Adobe Experience Manager – Setup Azure Devops CICD

Overview

CI/CD Git Flow

Above we can see that we would like our developers to push to their own git repo (Customer Git = Azure Devops).

From here we can then Sync Azure git with AEM Cloud Manager Git.

Below is a sample build pipeline you can use in Azure Devops.

azure-pipeline.yml

trigger:
  batch: true
  branches:
    include:
    - master

variables:
- name: remote_git
  value: rangerrom/africa-p46502-uk11112

stages:
- stage: AEM_Cloud_Manager
  jobs:
  - job: Push_To_Cloudmanager
    timeoutInMinutes: 10
    condition: succeeded()
    workspace:
      clean: all
    steps:
    #steps: [ script | bash | pwsh | powershell | checkout | task | templateReference ]
    
    - task: AzureKeyVault@1
      displayName: pull secrets
      inputs:
        azureSubscription: PROD
        KeyVaultName: mykeyvault
        SecretsFilter: aem_dm_cm_credentials
    - checkout: self
      clean: true
    - bash: echo "##vso[task.setvariable variable=git_ref]https://$(aem_dm_cm_credentials)@git.cloudmanager.adobe.com/$(remote_git)/"
      displayName: Set remote adobe git URL 
    - bash: git remote add adobe $(git_ref)
      displayName: Add git remote to Adobe CloudManager
    - bash: cat .git/config
      displayName: Show git config
    - bash: git checkout $(Build.SourceBranchName)
      displayName: Checkout $(Build.SourceBranchName) branch
    - bash: git push -f -v adobe $(Build.SourceBranchName)
      displayName: Push changes from $(Build.SourceBranchName) branch to Adobe CloudManager

That is pretty much the minimum required to sync the two git repos. Happy AEMing and building your CMS solution.

Kali Linux – Steam and Rocksmith 2014

I enjoy playing my bass guitar with Rocksmith. However I do not use Windows at all and wanted it to work with Kali Linux 2021.2, a Debian 10 Distro.

This is how I got it to work with my audio and Nvidia GeForce RTX 2070

Install crossover bin file

wget https://media.codeweavers.com/pub/crossover/cxlinux/demo/install-crossover-21.0.0.bin

chmod 755 install-crossover-21.0.0.bin

./install-crossover-21.0.0.bin

Activate critical 32 bit libraries so that 32 bit games run on 64 bit linux

sudo dpkg --add-architecture i386

sudo apt install nvidia-driver-libs:i386

In Crossover install steam

In Steam install rocksmith and Rocksmith 2014

Run cxdiag – works

Run cxdiag64 – works

Manual Fixes for 32 bit libraries so audio can work perfectly and other things like odbc etc

./cxfix missinglibgsm

./cxfix  missinglibasound

./cxfix missinglibpcap

./cxfix missinglibosmesa8

./cxfix missinglibopencl 

./cxfix missinglibodbc

./cxfix missinglibcapi20

./cxfix missinggstreamer1libav

Manual fix

apt-get install gstreamer1.0-plugins-bad:i386

Change audio to alsa

Winetricks – wget https://raw.githubusercontent.com/Winetricks/winetricks/master/src/winetricks

chmod +x winetricks

./winetricks

set audio to alsa

Run crossover regedit

Export HKEY_CURRENT_USER\Software\Wine\Drivers

Run bottle command regedit

Import registry file to bottle

Change driver to alsa

Rocksmith Config

Now configure your steam bottle to use alsa audio drivers for Rocksmith to work.

~/cxoffice/bin/wine --bottle "Steam" winecfg

Configure Rocksmith INI

Win32UltraLowLatencyMode=0

Enjoy playing your guitar on linux with rocksmith!

Adobe Experience Manager – Remote SPA vs Headless

It is important to consider the implications regarding the workflow content authors will use when publishing content to a website via

  • Headless
  • Remote SPA

Headless

API driven solution.

  • Author does not require a WYSIWYG experience when editing and updating a particular component
  • Re-usable, presentation-agnostic content, composed of structured data elements (text, dates, references, etc.)
  • Implemented as a DAM asset
  • Used via the GraphQL Assets APIs for 3rd party consumption e.g. Trader

Content fragments drive the data model. Usually managed via AEM assets.

Remote SPA

A totally separate website based on Angular/React/Next.js. Your site does not have to be a “true” remote SPA, as long as it leverages a modern JavaScript-based framework.

An IFRAME in the AEM author instance enables the WYSIWYG experience.

Usually managed via AEM Sites

Developer activates areas of a website for content authors to use AEM components

This is the high level architecture to allow AEM to integrate with an existing angular site.

A great tutorial to work through regarding remote spa is https://experienceleague.adobe.com/docs/experience-manager-learn/getting-started-with-aem-headless/spa-editor/remote-spa/quick-setup.html?lang=en

Remote SPA Iframe Model

1. SPA Editor loads.
2. SPA is loaded in a separated frame.
3. SPA requests JSON content and renders components client-side.
4. SPA Editor detects rendered components and generates overlays.
5. Author clicks overlay, displaying the component’s edit toolbar.
6. SPA Editor persists edits with a POST request to the server.
7. SPA Editor requests updated JSON to the SPA Editor, which is sent to the SPA with a DOM Event.
8. SPA re-renders the concerned component, updating its DOM.

Advantages

The content author will not experience any difference when editing or creating content.

Enables in context editing of content & configuring components.

Enables in context layout management of components.

Provides the content authors with same user experience both in author and publish mode.

It supports React and Angular framework which are widely used to create SPAs.

Much improved seamless user experience.

Improved application performance, as all content is retrieved once in a single page load, with additional resources loaded asynchronously as needed based on user interaction within the page.

Clear separation of front end and back end development which reduces the integration dependency on each other.

Gives the front-end developers the flexibility to use their choice of JavaScript frameworks and build tools to create highly interactive applications.

By being faster, fluid, and more like a native application, a SPA becomes a very attractive experience not only for the visitor of the webpage, but also for marketers and developers due to the nature of how SPAs work.

Kubernetes – Prometheus – use an existing persistent volume claim

We use the Prometheus Operator Chart to deploy the Prometheus, Alert Manager and Grafana stack.

Please note that, as of October 2020, the official Prometheus Operator Chart is:

prometheus-community: https://prometheus-community.github.io/helm-charts

To add this chart to your Helm repo:

helm repo add prometheus-community https://prometheus-community.github.io/helm-charts

What usually happens is that you will initially install the chart and by default your kubernetes PV will have a default policy of DELETE. This means if you uninstall the chart, the Persistent Volume in the cloud (Azure, AWS, GCP etc) will also be deleted. Not a great outcome if you want historic metrics.

What you want is a PV that has a reclaim policy of Retain, so that if the chart is ever uninstalled, your managed disks in the cloud are retained.

So how do you go about doing this?

  • Install the chart initially with a persistent volume configured in the values files for Prometheus. (The default way)
  • Configure Grafana correctly on the first install.

Stage 1

Prometheus

We are using managed GKE/GCP, so the standard storage class is fine. Your cloud provider may be different.

  • Configure your Prometheus Operator chart with the following in the values file.
prometheus:    
    prometheusSpec:
      storageSpec:
        volumeClaimTemplate:
          spec:
            storageClassName: standard
            resources:
              requests:
                storage: 64Gi

Grafana

With Grafana, you can get away with setting it up correctly first time round.

Create the PVC

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: pv-claim-grafana
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi

Add the following to the Grafana values files.

grafana:
  persistence:
    enabled: true
    type: pvc
    existingClaim: pv-claim-grafana

  • Deploy your chart

Stage 2

Once the chart is deployed, go to your cloud provider and note the disk IDs. I am using GCP, so I note them down here:

In the above, the Name column is the disk id for GCP. Azure/AWS will be different e.g. Disk URI etc.

Go back to your helm chart repository and let's alter the chart so that Prometheus and Grafana are always bound to these disks, even if you uninstall the chart.

Prometheus

If you would like to keep the data of the current persistent volumes, it should be possible to attach existing volumes to new PVCs and PVs that are created using the conventions in the new chart. For example, in order to use an existing disk (a GCE persistent disk in my case) for a helm release called `prometheus-operator` the following resources can be created:

  • Note down the RELEASE NAME of your prometheus operator chart. Mine is called prometheus-operator.

Configure the following yaml template. This is a HACK: by making the names of the PV and PVC EXACTLY the same as the ones the chart generates, Prometheus will reuse the PV/PVC.

apiVersion: v1
kind: PersistentVolume
metadata:
  name: pvc-prometheus-operator-prometheus-0
spec:
  storageClassName: "standard"
  capacity:
    storage: 64Gi
  accessModes:
    - ReadWriteOnce
  gcePersistentDisk:
    pdName: gke-dev-xyz-aae-pvc-d8971937-85f8-4566-b90e-110dfbc17cbb
    fsType: ext4
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  labels:
    app: prometheus
    prometheus: prometheus-operator-prometheus
  name:  prometheus-prometheus-operator-prometheus-db-prometheus-prometheus-operator-prometheus-0
spec:
  storageClassName: "standard"
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 64Gi

  • Configure the above to always run as a pre-install hook, e.g. with Helmfile:

  hooks:
  - events: ["presync"]
    showlogs: true
    command: "kubectl"
    args:
    - apply
    - -n
    - monitoring
    - -f
    - ./pv/pv-prometheus.yaml
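
Because the hack depends on getting the PVC name exactly right, it can help to generate the name rather than type it. The sketch below only reproduces the shape of the name used in the template above. The function `prometheus_pvc_name` is a hypothetical helper, and the pattern is inferred from this one example, so check it against `kubectl get pvc` on your own cluster, as other chart versions may name things differently.

```python
def prometheus_pvc_name(release, replica=0):
    """Build a PVC name in the same shape as the template in this post.

    Assumption: the pattern is inferred from a single example
    (release name "prometheus-operator"); it is not taken from the chart source.
    """
    prometheus_name = f"{release}-prometheus"
    return f"prometheus-{prometheus_name}-db-prometheus-{prometheus_name}-{replica}"


if __name__ == "__main__":
    # Prints the PVC name for the release used in this post.
    print(prometheus_pvc_name("prometheus-operator"))
```

If the printed name differs from what your first install created, use the name from the cluster, not the one from this helper.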

Grafana

Grafana is not so fussy. So we can do the following:

Configure the following yaml template.

apiVersion: v1
kind: PersistentVolume
metadata:
  name: pv-grafana
spec:
  storageClassName: "standard"
  capacity:
    storage: 10Gi
  accessModes:
    - ReadWriteOnce
  claimRef:
    namespace: monitoring
    name: pv-claim-grafana
  gcePersistentDisk:
    pdName: gke-dev-xyz-aae-pvc-4b450590-8ec0-471d-bf1a-4f6aaa9c4e81
    fsType: ext4
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: pv-claim-grafana
spec:
  storageClassName: "standard"
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi

Finally, set up a presync hook if you are using Helmfile:

  hooks:
  - events: ["presync"]
    showlogs: true
    command: "kubectl"
    args:
    - apply
    - -n
    - monitoring
    - -f
    - ./pv/pv-grafana.yaml

With the above in place, you will be able to rerun chart installs for updates and uninstall the chart. Your final check is to ensure the PVs are RETAIN and not on the DELETE policy.
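
That final check can be scripted. The sketch below is one way to do it, assuming you feed it the JSON produced by `kubectl get pv -o json`. The sample document here is made up for illustration (it borrows the PV names from this post) and the function name `pvs_not_retained` is hypothetical.

```python
import json


def pvs_not_retained(pv_list):
    """Return the names of PVs whose reclaim policy is anything other than Retain."""
    return [
        item["metadata"]["name"]
        for item in pv_list.get("items", [])
        if item["spec"].get("persistentVolumeReclaimPolicy") != "Retain"
    ]


# Made-up sample in the shape of `kubectl get pv -o json` output.
SAMPLE = json.loads("""
{
  "items": [
    {"metadata": {"name": "pv-grafana"},
     "spec": {"persistentVolumeReclaimPolicy": "Retain"}},
    {"metadata": {"name": "pvc-prometheus-operator-prometheus-0"},
     "spec": {"persistentVolumeReclaimPolicy": "Delete"}}
  ]
}
""")

if __name__ == "__main__":
    for name in pvs_not_retained(SAMPLE):
        print(f"{name} is not on the Retain policy")
```

In practice you would replace `SAMPLE` with `json.load(sys.stdin)` and pipe the kubectl output into the script.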

Using Kubernetes to measure homepage speed index

Hi,

We decided to try out using a Kubernetes cronjob to measure the SPEED INDEX score of a system. We thought this would be a great idea. We can have a job that polls a server and measures the performance of the HomePage loading.

We use Cypress and a lighthouse plugin for synthetics.
https://github.com/mfrachet/cypress-audit

Why Speed Index and not something like BrowserTimings?

Speed index measures the time it takes for the visible (above-the-fold) parts of a webpage to appear to the users. It became part of the WebPageTest in April 2012. The Lighthouse-powered Audits panel in the Chrome DevTools also measures it.

Speed index can be used to compare performance against competitors or previous versions of the same website. It is best utilised alongside other metrics like load time and start-render to get a better understanding of a site’s quality.

That's pretty awesome. You get a good metric that shows you the user experience. BrowserTimings usually do not factor in when the user can interact with the page. Even if a web page takes 10 seconds to load, if it is usable within 4 seconds, we are good as gold.
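
To make the idea concrete, here is a rough sketch of how a Speed Index style score can be computed: it is the area above the visual-completeness curve, so a page that paints most of its content early scores lower (better) even if the full load takes just as long. This is a simplified illustration, not the Lighthouse or WebPageTest implementation, and the sample timings are invented.

```python
def speed_index(samples):
    """Approximate Speed Index in milliseconds.

    samples: list of (time_ms, visual_completeness_percent) tuples,
    sorted by time, starting at time 0 and ending at 100 percent.
    Each interval contributes its duration weighted by how incomplete
    the page looked at the start of that interval.
    """
    total = 0.0
    for (t0, vc0), (t1, _) in zip(samples, samples[1:]):
        total += (100 - vc0) * (t1 - t0) / 100
    return total


# Two invented pages that both finish loading after 10 seconds.
paints_early = [(0, 0), (1000, 80), (4000, 90), (10000, 100)]
paints_late = [(0, 0), (8000, 10), (10000, 100)]

if __name__ == "__main__":
    print(speed_index(paints_early))  # 2200.0
    print(speed_index(paints_late))   # 9800.0
```

Both pages have the same load time, but the first one is usable much sooner, and the score reflects that.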

The Problem

SpeedIndex uses your browser and it requires CPU/Memory, both of which can be throttled by kubernetes.

We noticed that when we used a normal machine the speed index was good – around 5 seconds. We noticed in kubernetes it would be around 12 seconds!

We had set some resource limits, but never thought the job would get throttled.

resources:
  limits:
    cpu: 1500m
    memory: 1500M
  requests:
    cpu: "1"
    memory: 1G

However, it was getting throttled!

We decided to remove the resource limits

resources:
 {}

Now, there is no throttling and the speed index measured similar to our home device.

The above graph shows that throttling disabled improved the metric.

Conclusion

Even though Speed Index is a great way to measure performance, we cannot use Kubernetes as the measuring device, because even with throttling off, you just never know if another container/pod is stealing resources. The best way forward when measuring metrics (NOT GATHERING) is to measure as close as possible to the source, e.g. client-side JavaScript.

So for us, it is back to the drawing board and pick a measurement as close to the client as possible.

Possible solutions:

https://github.com/WPO-Foundation/RUM-SpeedIndex/blob/master/src/rum-speedindex.js

Avoid using kubernetes to be a measuring device that measures metrics related to CPU/Memory

Questions

There seems to be a bug in Kubernetes with some linux kernels regarding CFS/Throttling

https://github.com/kubernetes/kubernetes/issues/67577

What still puzzles me is that I have a physical node that is barely loaded, and a process using 0.4 CPU with a limit of 2.5 CPUs still gets throttled!

This is on Azure Kubernetes 1.16. However I can reproduce the issue on Google GKE as well. So it is a kubernetes algorithm of some sort that limits more than you would expect!

Site Reliability Engineering with Gitops & Kubernetes – Part 1

Intro

One of the key pillars regarding SRE is being able to make quantitative decisions based on key metrics.

The major challenge is deciding what the key metrics are, and the plethora of monitoring software out in the wild today is testament to this.

At a foundational level you want to ensure your services are always running; however, 100% availability is not practical.

Class SRE implements Devops
{
  MeasureEverything()
}

Availability/Error Budget

You then figure out what availability is practical for your application and services.

Availability

Your error budget will then be the downtime figure e.g. 99% is 7.2 hours of downtime a month that you can afford to have.
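
The arithmetic behind that figure is simple enough to script. The sketch below assumes a 30-day month, which is what gives 7.2 hours for 99%. The function name `error_budget_hours` is just for illustration.

```python
def error_budget_hours(availability_percent, days=30):
    """Hours of downtime allowed in the period for a given availability target.

    Assumption: a 30-day month (720 hours) unless `days` says otherwise.
    """
    total_hours = days * 24
    return total_hours * (100 - availability_percent) / 100


if __name__ == "__main__":
    for target in (99, 99.9, 99.99):
        print(f"{target}% availability allows {error_budget_hours(target):.2f} hours of downtime")
```

Each extra nine cuts the budget by a factor of ten, which is why the availability target is worth deciding on before anything else.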

SLAs, SLOs and SLIs

This will be the starting point on your journey to implementing quantitative analysis to

  • Service Level Agreements
  • Service Level Objectives
  • Service Level Indicators

This blog post is all about how you can measure Service Level Objectives without breaking the bank. You do not need to spend millions of dollars on bloated monitoring solutions to observe key metrics that really impact your customers.

Just like baking a cake, these are the ingredients we will use to implement an agile, scalable monitoring platform that is solely dedicated to doing one thing well.

Outcome

This is what we want our cake to deliver:

  • Measuring your SLA Compliance Level
  • Measuring your Error Budget Burn Rate
  • Measuring if you have exhausted your error budget

Service Level Compliance – SLAs -> SLOs -> SLIs

If you look at the cake above, you can see all your meaningful information in one dashboard.

  1. Around 11am the error budget burn rate went up. (Akin to your kids spending all their pocket money in one day!)
  2. Compliance was breached (99% availability) – The purple line (burn rate) went above the maximum budget (yellow line)

These are metrics you will want to ALERT on at any time of the day. These sort of metrics matter. They are violating a Service Level Agreement.

What about my other metrics?

Aaaah, like %Disk Queue Length, Processor Time, Kubernetes Nodes/Pods/Pools etc? Well…

I treat these metrics as second class citizens. Like a layered onion. Your initial metric should be: am I violating my SLA? If not, then you can use the other metrics that we have enjoyed over the decades to complement your observability into the systems and as a visual aid for diagnostics and troubleshooting.

Alerting

Another important consideration is the evolution of infrastructure. In 1999 you would have wanted to receive an alert if a server ran out of disk space. In 2020, you are running container orchestration clusters and other high availability systems. A container running out of disk space is not as critical as it used to be in 1999.

  • Evaluate every single alert you have and ask yourself. Do I really need to wake someone up at 3am for this problem?
  • Always alert on Service Level Compliance levels ABOUT to breach
  • Always alert on Error Budget Burn Rates going up sharply
  • Do not alert someone out of hours because the CPU is at 100% for 5 minutes, unless Service Level Compliance is being affected too

You will have happier engineers and a more productive team. You will be cool-headed during an incident because you know the difference between a cluster node going down and a Service Level Compliance violation. Always solve the Service Level Compliance problem first and then fix the other problems.

Ingredients

Where are the ingredients you promised? You said it will not break the bank, I am intrigued.

Summary

In this post we have touched on the FOUNDATION of what we want out of monitoring.

We know exactly what we want to measure – Service Level Compliance, Error Budget Burn Rate and Max Budget. All this revolves around deciding on the level of availability we give a service.

We touched on the basic ingredients that we can use to build this solution.

In my next blog post we will discuss how we mix all these ingredients together to provide a platform that is good at doing one thing well.

Measuring Service Level Compliance & Error Budget Burn Rates

When you give your child $30 a month to spend and they need to save $10 of it, you need to be alerted if they are spending too fast (Burn Rate).
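
The pocket money analogy can be turned into a number. In the sketch below, burn rate is the share of the budget already spent divided by the share of the period already elapsed, so a value of 1 means the budget runs out exactly at the end of the month and anything above 1 means it runs out early. I am assuming the spendable budget is $20 (the $30 allowance minus the $10 to be saved). The same ratio works for an error budget if you swap dollars for minutes of downtime.

```python
def burn_rate(spent, budget, elapsed_days, period_days=30):
    """Ratio of budget consumed to time elapsed.

    1.0 means the budget lasts exactly the whole period;
    above 1.0 means it will be exhausted before the period ends.
    """
    return (spent * period_days) / (budget * elapsed_days)


def should_alert(spent, budget, elapsed_days, period_days=30, threshold=1.0):
    """Alert when spending is faster than the budget allows (threshold is an assumption)."""
    return burn_rate(spent, budget, elapsed_days, period_days) > threshold


if __name__ == "__main__":
    # $15 of the $20 budget is gone after 10 of 30 days.
    print(burn_rate(15, 20, 10))     # 2.25
    print(should_alert(15, 20, 10))  # True
```

A burn rate of 2.25 means the money will be gone in under half the month if nothing changes, which is exactly the situation worth an alert.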

Microsoft Azure Devops – Dynamic Docker Agent (Ansible)

Often you may require a unique custom build/release agent with a specific set of tools.

A good example is a dynamic Ansible Agent that can manage post deployment configuration. This ensures configuration drift is minimised.

Secondly this part of a release is not too critical, so we can afford to spend a bit of time downloading a docker image if it is not already cached.

This article demonstrates how you can dynamically spawn a docker container during your release pipeline to apply configuration leveraging Ansible. It will also demonstrate how to use Ansible Dynamic Inventory to detect Azure Virtual machine scale set instances – in the past you would run hacks on facter.

Prerequisites

You will require:

  • A docker image with ansible – You can use mine as a starting point – https://github.com/Romiko/DockerUbuntuDev
    The above is hosted at: dockerhub – romiko/ansible:latest (See reference at bottom of this page)
  • A self-hosted Azure DevOps agent – Linux
  • Docker installed on the self-hosted agent
  • Docker configured to expose Docker Socket
    docker run -v /var/run/docker.sock:/var/run/docker.sock -d --name some_container some_image

Release Pipeline

Configure a CLI Task in your release pipeline.

variables:
  env: 'dev'

steps:
- task: AzureCLI@2
  displayName: 'Azure CLI Ansible'
  inputs:
    azureSubscription: 'RangerRom'
    scriptType: bash
    scriptLocation: inlineScript
    inlineScript: |
     set -x
     
     docker run --rm -v $(System.DefaultWorkingDirectory)/myproject/config:/playbooks/ romiko/ansible:latest \
      "cd  /playbooks/ansible; ansible-playbook --version; az login --service-principal --username $servicePrincipalId --password $servicePrincipalKey --tenant $tenantId; az account set --subscription $subscription;ansible-playbook my-playbook.yaml -i inventory_$(env)_azure_rm.yml --extra-vars \"ansible_ssh_pass=$(clientpassword)\""
    addSpnToEnvironment: true
    workingDirectory: '$(System.DefaultWorkingDirectory)/myproject/config/ansible'

In the above the code that is causing a SIBLING container to spawn on the self-hosted devops agent is:

docker run --rm -v $(System.DefaultWorkingDirectory)/myproject/config:/playbooks/ romiko/ansible:latest \ <command to execute inside the container>

Here we have a mount point occurring, where the config folder in the repo will be mounted into the docker container.

-v <SourceFolder>:<MountPointInDockerContainer>

The rest of the code after the \ will execute on the docker container. So in the above,

  • The container will become a sibling,
  • Entry into a bash shell
  • Container will mount a /playbooks folder containing the source code from the build artifacts
  • Connect to azure
  • Run an Ansible playbook.
  • The playbook will find all virtual machine scale sets in a resource group with a name pattern
  • Apply a configuration by configuring logstash to auto reload config files when they change
  • Apply a configuration by copying files

Ansible

The above is used to deploy configurations to an Azure Virtual Machine Scale Set. Ansible has a feature called dynamic inventory. We will leverage this feature to detect all active nodes/instances in a VMSS.

The structure of ansible is as follows:

Ansible Dynamic Inventory

So let's see how Ansible can be used to detect all running instances in an Azure Virtual Machine Scale Set.

inventory_dev_azure_rm.yml

Below it will detect any VMSS cluster in resourcegroup rom-dev-elk-stack that has logstash in the name

plugin: azure_rm

include_vmss_resource_groups:
- rom-dev-elk-stack

conditional_groups:
  logstash_hosts: "'logstash' in name"

auth_source: auto

logstash_hosts.yml (Ensure this lives in a group_vars folder)

Now, I can configure ssh using a username or ssh keys.

---
ansible_connection: ssh
ansible_ssh_user: logstash

logstash-playbook.yaml

Below I now have ansible doing some configuration checks for me on a logstash pipeline (upstream/downstream architecture).


    - name: Logstash auto reloads check interval
      lineinfile:
        path: /etc/logstash/logstash.yml
        regexp: '^config\.reload\.interval'
        line: "config.reload.interval: 30s"
      become: true
      notify:
        - restart_service

    - name: Copy pipeline configs
      copy:
        src: ../pipelines/conf.d/
        dest: /etc/logstash/conf.d/
        owner: logstash
        group: logstash
      become: true
    
    - name: Copy pipeline settings
      copy:
        src: ../pipelines/register/
        dest: /etc/logstash/
        owner: logstash
        group: logstash
      become: true

To improve security – replace user/password ansible login with an SSH key pair.

References

To read up more about Docker Socket mount points, check out:

https://www.develves.net/blogs/asd/2016-05-27-alternative-to-docker-in-docker/

https://docs.ansible.com/ansible/latest/user_guide/intro_dynamic_inventory.html

Thanks to Shawn Wang and Ducas Francis for the inspirations on Docker Socket.

https://azure.microsoft.com/en-au/services/devops/

Azure Streaming Analytics – Beware – JSON Engine

After investigating an issue with Azure Streaming Analytics, we discovered it cannot deserialise JSON documents that have property names which differ only in case, e.g.

{
  "Proxy": "abc",
  "proxy": "def"
}

If you send the above payload to a Streaming Analytics Job, it will fail.

Source ‘<unknown_location>’ had 1 occurrences of kind ‘InputDeserializerError.InvalidData’ between processing times ‘2020-03-30T00:19:27.8689879Z’ and ‘2020-03-30T00:19:27.8689879Z’. Could not deserialize the input event(s) from resource ‘Partition: [8], Offset: [1], SequenceNumber: [1]’ as Json. Some possible reasons: 1) Malformed events 2) Input source configured with incorrect serialization format

We opened a ticket with Microsoft. This was the response.

“Hi Romiko,

Thank you for being patience with us. I had further discussion with our ASA PG and here’s our findings.

Findings

ASA unfortunately does not support case sensitive column. We understand it is possible for json documents to add to have two columns that differ only in case and that some libraries support it. However there hasn’t been a compelling use case to support it. We will update the documentation as well. 

We are sorry for the inconvenience. If you have any questions or concerns, please feel free to reach out to me. I will be happy to assist you.”

Indeed, other languages and libraries do support this, such as PowerShell, C#, Python etc.

C#

using Newtonsoft.Json;

var data = "{'name': 'Ashley', 'Name': 'Romiko'}";

dynamic message = JsonConvert.DeserializeObject(data);

var text = JsonConvert.SerializeObject(message);

I have built the following tool on github as a workaround for streaming data to elastic. https://github.com/Romiko/EventHub-CheckMalformedEvents
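
The core of such a check is small. The sketch below is not the code of the tool linked above; it is an independent illustration in Python of how to spot property names that differ only in case before a payload reaches the Streaming Analytics job. The function name `case_colliding_keys` is hypothetical.

```python
import json
from collections import defaultdict


def case_colliding_keys(payload):
    """Return groups of property names that differ only in case.

    Uses json.loads with object_pairs_hook so that every object in the
    document, including nested ones, is inspected before keys are merged.
    """
    collisions = []

    def check(pairs):
        seen = defaultdict(list)
        for key, _ in pairs:
            seen[key.lower()].append(key)
        # Only flag groups with genuinely different spellings.
        collisions.extend(names for names in seen.values() if len(set(names)) > 1)
        return dict(pairs)

    json.loads(payload, object_pairs_hook=check)
    return collisions


if __name__ == "__main__":
    print(case_colliding_keys('{"Proxy": "abc", "proxy": "def"}'))
```

Events that return a non-empty list can be routed elsewhere or have their keys renamed before they are sent on.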

A significant reason why Microsoft should support it – is the Elastic Common Schema. (ECS), a new specification that provides a consistent and customizable way to structure your data in Elasticsearch, facilitating the analysis of data from diverse sources. With ECS, analytics content such as dashboards and machine learning jobs can be applied more broadly, searches can be crafted more narrowly, and field names are easier to remember.

When introducing a new schema, you always have to deal with existing/custom data. Elastic have an ingenious way to solve this. All fields in ECS are lower case, so your existing data can be guaranteed not to conflict if you use upper case in your custom field names.

Let us reference Elastic’s advice

https://www.elastic.co/guide/en/ecs/current/ecs-custom-fields-in-ecs.html


Elastic, who deal with Big Data all the time, recommend using Proxy vs proxy to ensure that migrating to ECS is a viable, conflict-free solution.

Conclusion

If you are migrating huge amounts of data to Elastic Common Schema (ECS), consider whether Azure Streaming Analytics is a good fit, given this JSON limitation.

You can also vote to fix this issue here and improve Microsoft’s product offering 🙂

https://feedback.azure.com/forums/270577-stream-analytics/suggestions/40122079-azure-stream-analytics-to-be-able-to-handle-case-s

Debugging Azure Event Hubs and Stream Analytics Jobs

When you are dealing with millions of events per day (in JSON format), you need a debugging tool to deal with events that do not behave as expected.

Recently we had an issue where an Azure Streaming analytics job was in a degraded state. A colleague eventually found the issue to be the output of the Azure Streaming Analytics Job.

The error message was very misleading.

[11:36:35] Source 'EventHub' had 76 occurrences of kind 'InputDeserializerError.TypeConversionError' between processing times '2020-03-24T00:31:36.1109029Z' and '2020-03-24T00:36:35.9676583Z'. Could not deserialize the input event(s) from resource 'Partition: [11], Offset: [86672449297304], SequenceNumber: [137530194]' as Json. Some possible reasons: 1) Malformed events 2) Input source configured with incorrect serialization format\r\n"

The source of the issue was CosmosDB; we needed to increase the RUs. However, the error seemed to indicate a serialization issue.

We developed a tool that could subscribe to events at exactly the same time of the error, using the sequence number and partition.

We also wanted to be able to use the tool for a large number of events, roughly 1 million per hour.

Please follow the link to the EventHub .NET client. This tool is optimised to use as little memory as possible and leverages asynchronous file writes for an optimal event subscription experience (a console app, of course).

I have purposely avoided the Newtonsoft library for the final file write to improve the performance.

The output will be a json array of events.

The next time you need to be able to subscribe to event hubs to diagnose an issue with a particular event, I would recommend using this tool to get the events you are interested in analysing.

Thank you.

Troubleshooting Azure Event Hubs and Azure Streaming Analytics

using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Consumer;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;

namespace CheckMalformedEvents
{
    class GetMalformedEvents
    {
        private static string partitionId;
        private static IConfigurationRoot configuration;
        private static string connectionString;
        private static string consumergroup;
        private static EventHubConsumerClient eventHubClient;
        private static EventPosition startingPosition;
        private static DateTimeOffset processingEnqueueEndTimeUTC;

        static void Main(string[] args)

        {
            Init();
            ShowIntro();

            try
            {
                GetEvents(eventHubClient, startingPosition, processingEnqueueEndTimeUTC).Wait();
            }
            catch (AggregateException e)
            {
                Console.WriteLine($"{e.Message}");

            }
            catch (Exception e)
            {
                Console.WriteLine($"{e.Message}");
            }
        }

        private static void Init()
        {
            var builder = new ConfigurationBuilder()
                            .SetBasePath(Directory.GetCurrentDirectory())
                            .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true);

            configuration = builder.Build();

            connectionString = configuration.GetConnectionString("eventhub");
            consumergroup = configuration.GetConnectionString("consumergroup");
            eventHubClient = new EventHubConsumerClient(consumergroup, connectionString);

            partitionId = configuration["partitionId"];
            if (long.TryParse(configuration["SequenceNumber"], out long sequenceNumber) == false)
                throw new ArgumentException("Invalid SequenceNumber");

            processingEnqueueEndTimeUTC = DateTimeOffset.Parse(configuration["ProcessingEnqueueEndTimeUTC"]);
            startingPosition = EventPosition.FromSequenceNumber(sequenceNumber);
        }

        private static void ShowIntro()
        {
            Console.WriteLine("This tool is used to troubleshoot malformed messages in an Azure EventHub");
            Console.WriteLine("Sample Error Message to troubleshoot - First get the errors from the Stream Analytics job's Input blade.\r\n");
        }

        private static async Task<CancellationTokenSource> GetEvents(EventHubConsumerClient eventHubClient, EventPosition startingPosition, DateTimeOffset endEnqueueTime)
        {
            var cancellationSource = new CancellationTokenSource();
            if (int.TryParse(configuration["TerminateAfterSeconds"], out int TerminateAfterSeconds) == false)
                throw new ArgumentException("Invalid TerminateAfterSeconds");

            cancellationSource.CancelAfter(TimeSpan.FromSeconds(TerminateAfterSeconds));
            string path = Path.Combine(Directory.GetCurrentDirectory(), $"{Path.GetRandomFileName()}.json");

            int count = 0;
            byte[] encodedText;
            using (FileStream sourceStream = new FileStream(path, FileMode.Append, FileAccess.Write, FileShare.Write, bufferSize: 4096, useAsync: true))
            {
                encodedText = Encoding.Unicode.GetBytes("{\r\n\"events\": [" + Environment.NewLine);
                await sourceStream.WriteAsync(encodedText, 0, encodedText.Length);
                encodedText = Encoding.Unicode.GetBytes("");
                await foreach (PartitionEvent receivedEvent in eventHubClient.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
                {
                    if (encodedText.Length > 0)
                        await sourceStream.WriteAsync(encodedText, 0, encodedText.Length);

                    count++;
                    using var sr = new StreamReader(receivedEvent.Data.BodyAsStream);
                    var data = sr.ReadToEnd();
                    var partition = receivedEvent.Data.PartitionKey;
                    var offset = receivedEvent.Data.Offset;
                    var sequence = receivedEvent.Data.SequenceNumber;

                    try
                    {
                        IsEventValidJson(count, receivedEvent, data, partition, offset, sequence);
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine($"Serialization issue Partition: {partition}, Offset: {offset}, Sequence Number: {sequence}");
                        Console.WriteLine(ex.Message);
                    }

                    if (receivedEvent.Data.EnqueuedTime > endEnqueueTime)
                    {
                        Console.WriteLine($"Last Message EnqueueTime: {receivedEvent.Data.EnqueuedTime:o}, Offset: {receivedEvent.Data.Offset}, Sequence: {receivedEvent.Data.SequenceNumber}");
                        Console.WriteLine($"Total Events Streamed: {count}");
                        Console.WriteLine($"-----------");
                        break;
                    }

                    // Buffer this event only. It is written at the top of the next iteration, or by
                    // FinaliseFile (which drops the trailing comma), so writing it here would duplicate it.
                    encodedText = Encoding.Unicode.GetBytes(data + ",\r\n");
                }
                encodedText = await FinaliseFile(encodedText, sourceStream);
            }

            Console.WriteLine($"\r\n Output located at: {path}");
            return cancellationSource;
        }

        private static async Task<byte[]> FinaliseFile(byte[] encodedText, FileStream sourceStream)
        {
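            // Write the buffered event minus its trailing ",\r\n" (3 UTF-16 chars = 6 bytes).
            // Skip this when nothing was buffered, otherwise the length would be negative.
            if (encodedText.Length >= 6)
                await sourceStream.WriteAsync(encodedText, 0, encodedText.Length - 6);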
            encodedText = Encoding.Unicode.GetBytes("]\r\n}" + Environment.NewLine);
            await sourceStream.WriteAsync(encodedText, 0, encodedText.Length);
            return encodedText;
        }

        private static void IsEventValidJson(int count, PartitionEvent receivedEvent, string data, string partition, long offset, long sequence)
        {
            dynamic message = JsonConvert.DeserializeObject(data);
            message.AzureEventHubsPartition = partition;
            message.AzureEventHubsOffset = offset;
            message.AzureEventHubsSequence = sequence;
            message.AzureEnqueuedTime = receivedEvent.Data.EnqueuedTime.ToString("o");

            if (count == 1) // count is incremented before this call, so the first event arrives as 1
                Console.WriteLine($"First Message EnqueueTime: {message.AzureEnqueuedTime}, Offset: {message.AzureEventHubsOffset}, Sequence: {message.AzureEventHubsSequence}");
        }
    }
}
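
For reference, the program reads all of its settings from an appsettings.json file in the working directory. The key names below are taken from the code above; every value is a hypothetical placeholder that you would replace with your own. Because the client is created from only a consumer group and a connection string, the connection string most likely needs to name the event hub itself (the EntityPath part). A minimal sketch:

```json
{
  "ConnectionStrings": {
    "eventhub": "Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<your-event-hub>",
    "consumergroup": "$Default"
  },
  "partitionId": "0",
  "SequenceNumber": "0",
  "ProcessingEnqueueEndTimeUTC": "2021-06-01T00:00:00Z",
  "TerminateAfterSeconds": "60"
}
```

SequenceNumber is where reading starts on the chosen partition, ProcessingEnqueueEndTimeUTC is the enqueue time at which the tool stops, and TerminateAfterSeconds is a safety timeout that cancels the read if that time is never reached.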

The next time you need to subscribe to an event hub to diagnose an issue with a particular event, I recommend using this tool to retrieve the events you want to analyse.

Thank you.