
When you are dealing with millions of events per day (in JSON format), you need a debugging tool to deal with events that do not behave as expected.
Recently we had an issue where an Azure Stream Analytics job was in a degraded state. A colleague eventually found the issue to be the output of the Azure Stream Analytics job.
The error message was very misleading.
[11:36:35] Source 'EventHub' had 76 occurrences of kind 'InputDeserializerError.TypeConversionError' between processing times '2020-03-24T00:31:36.1109029Z' and '2020-03-24T00:36:35.9676583Z'. Could not deserialize the input event(s) from resource 'Partition: [11], Offset: [86672449297304], SequenceNumber: [137530194]' as Json. Some possible reasons: 1) Malformed events 2) Input source configured with incorrect serialization format\r\n"
The source of the issue was Cosmos DB: we needed to increase the RUs (request units). However, the error seemed to indicate a serialization issue.
We developed a tool that could subscribe to events at exactly the same time of the error, using the sequence number and partition.
We also wanted to be able to use the tool for a large number of events, roughly 1 million per hour.
Please follow the link to the Event Hubs .NET client. This tool is optimised to use as little memory as possible and leverages asynchronous file writes for an optimal event subscription experience (a console app, of course).
I have purposely avoided the Newtonsoft.Json library for the final file write to improve performance.
The output will be a JSON document containing an "events" array.
using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Consumer;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
namespace CheckMalformedEvents
{
    /// <summary>
    /// Console tool that replays events from one Event Hub partition, starting at a configured
    /// sequence number, runs each payload through Json.NET to flag events that fail to parse,
    /// and dumps the raw payloads into a JSON file in the current directory.
    /// </summary>
    /// <remarks>
    /// Settings are read from <c>appsettings.json</c>: connection strings <c>eventhub</c> and
    /// <c>consumergroup</c>, plus <c>partitionId</c>, <c>SequenceNumber</c>,
    /// <c>ProcessingEnqueueEndTimeUTC</c> and <c>TerminateAfterSeconds</c>.
    /// </remarks>
    class GetMalformedEvents
    {
        // Encoding of the dump file. Kept as UTF-16 so existing consumers of the file are unaffected.
        private static readonly Encoding OutputEncoding = Encoding.Unicode;

        private static string partitionId;
        private static IConfigurationRoot configuration;
        private static string connectionString;
        private static string consumergroup;
        private static EventHubConsumerClient eventHubClient;
        private static EventPosition startingPosition;
        private static DateTimeOffset processingEnqueueEndTimeUTC;

        /// <summary>
        /// Entry point: loads configuration, prints the intro and streams the events to disk.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static async Task Main(string[] args)
        {
            Init();
            ShowIntro();
            try
            {
                // Awaiting (instead of .Wait()) surfaces the real exception rather than an AggregateException wrapper.
                await GetEvents(eventHubClient, startingPosition, processingEnqueueEndTimeUTC);
            }
            catch (Exception e)
            {
                Console.WriteLine($"{e.Message}");
            }
            finally
            {
                // Release the AMQP connection held by the consumer client.
                await eventHubClient.DisposeAsync();
            }
        }

        /// <summary>
        /// Reads <c>appsettings.json</c> from the current directory and populates the static settings.
        /// </summary>
        /// <exception cref="ArgumentException">
        /// <c>partitionId</c>, <c>SequenceNumber</c> or <c>ProcessingEnqueueEndTimeUTC</c> is missing or malformed.
        /// </exception>
        private static void Init()
        {
            var builder = new ConfigurationBuilder()
                .SetBasePath(Directory.GetCurrentDirectory())
                .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true);
            configuration = builder.Build();

            connectionString = configuration.GetConnectionString("eventhub");
            consumergroup = configuration.GetConnectionString("consumergroup");

            partitionId = configuration["partitionId"];
            if (string.IsNullOrWhiteSpace(partitionId))
            {
                throw new ArgumentException("Invalid partitionId");
            }

            if (!long.TryParse(configuration["SequenceNumber"], out long sequenceNumber))
            {
                throw new ArgumentException("Invalid SequenceNumber");
            }

            // The setting is documented as UTC by its name, so a value without an explicit offset is
            // treated as UTC rather than local time, and parsing does not depend on the machine culture.
            if (!DateTimeOffset.TryParse(
                    configuration["ProcessingEnqueueEndTimeUTC"],
                    System.Globalization.CultureInfo.InvariantCulture,
                    System.Globalization.DateTimeStyles.AssumeUniversal,
                    out processingEnqueueEndTimeUTC))
            {
                throw new ArgumentException("Invalid ProcessingEnqueueEndTimeUTC");
            }

            startingPosition = EventPosition.FromSequenceNumber(sequenceNumber);

            // Created last so that a configuration error does not leave an undisposed client behind.
            eventHubClient = new EventHubConsumerClient(consumergroup, connectionString);
        }

        /// <summary>
        /// Prints a short description of the tool.
        /// </summary>
        private static void ShowIntro()
        {
            Console.WriteLine("This tool is used to troubleshoot malformed messages in an Azure EventHub");
            Console.WriteLine("Sample Error Message to troubleshoot - First get the errors from the Streaming Analytics Jobs Input blade.\r\n");
        }

        /// <summary>
        /// Streams events from the configured partition into a randomly named <c>.json</c> file in the
        /// current directory until an event enqueued after <paramref name="endEnqueueTime"/> arrives or
        /// the <c>TerminateAfterSeconds</c> timeout elapses. The file is always closed as a complete
        /// JSON document, even when no events were received or the timeout fired.
        /// </summary>
        /// <param name="eventHubClient">Consumer client used to read the partition.</param>
        /// <param name="startingPosition">Position in the partition at which reading starts.</param>
        /// <param name="endEnqueueTime">Events enqueued after this instant end the run and are not written.</param>
        /// <exception cref="ArgumentException"><c>TerminateAfterSeconds</c> is missing or not an integer.</exception>
        private static async Task GetEvents(EventHubConsumerClient eventHubClient, EventPosition startingPosition, DateTimeOffset endEnqueueTime)
        {
            if (!int.TryParse(configuration["TerminateAfterSeconds"], out int terminateAfterSeconds))
            {
                throw new ArgumentException("Invalid TerminateAfterSeconds");
            }

            using var cancellationSource = new CancellationTokenSource();
            cancellationSource.CancelAfter(TimeSpan.FromSeconds(terminateAfterSeconds));

            string path = Path.Combine(Directory.GetCurrentDirectory(), $"{Path.GetRandomFileName()}.json");
            int received = 0;
            int written = 0;

            using (var sourceStream = new FileStream(path, FileMode.Append, FileAccess.Write, FileShare.Write, bufferSize: 4096, useAsync: true))
            {
                await WriteTextAsync(sourceStream, "{\r\n\"events\": [" + Environment.NewLine);

                try
                {
                    await foreach (PartitionEvent receivedEvent in eventHubClient.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
                    {
                        received++;

                        string data;
                        using (var reader = new StreamReader(receivedEvent.Data.BodyAsStream))
                        {
                            data = reader.ReadToEnd();
                        }

                        var offset = receivedEvent.Data.Offset;
                        var sequence = receivedEvent.Data.SequenceNumber;

                        try
                        {
                            // Report the partition ID being read (not the event's partition key) so the
                            // output lines up with the "Partition: [n]" shown in the Stream Analytics error.
                            IsEventValidJson(received, receivedEvent, data, partitionId, offset, sequence);
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine($"Serialization issue Partition: { partitionId}, Offset: {offset}, Sequence Number: { sequence }");
                            Console.WriteLine(ex.Message);
                        }

                        if (receivedEvent.Data.EnqueuedTime > endEnqueueTime)
                        {
                            // The boundary event is counted as streamed but is not written to the file.
                            Console.WriteLine($"Last Message EnqueueTime: {receivedEvent.Data.EnqueuedTime:o}, Offset: {receivedEvent.Data.Offset}, Sequence: {receivedEvent.Data.SequenceNumber}");
                            Console.WriteLine($"Total Events Streamed: {received}");
                            Console.WriteLine($"-----------");
                            break;
                        }

                        // Emit the separator BEFORE every event except the first. Each event is then
                        // written exactly once and the last one never carries a trailing comma,
                        // whatever Environment.NewLine happens to be.
                        if (written > 0)
                        {
                            await WriteTextAsync(sourceStream, "," + Environment.NewLine);
                        }

                        await WriteTextAsync(sourceStream, data);
                        written++;
                    }
                }
                catch (OperationCanceledException) when (cancellationSource.IsCancellationRequested)
                {
                    // The TerminateAfterSeconds timeout fired; fall through so the file is still finalised.
                    Console.WriteLine($"Stopped after {terminateAfterSeconds} seconds. Total Events Streamed: {received}");
                }

                await FinaliseFile(sourceStream);
            }

            Console.WriteLine($"\r\n Output located at: {path}");
        }

        /// <summary>
        /// Encodes <paramref name="text"/> with the output encoding and writes it to <paramref name="stream"/>.
        /// </summary>
        private static async Task WriteTextAsync(FileStream stream, string text)
        {
            byte[] bytes = OutputEncoding.GetBytes(text);
            await stream.WriteAsync(bytes, 0, bytes.Length);
        }

        /// <summary>
        /// Closes the <c>events</c> array and the enclosing JSON object, then flushes the stream.
        /// </summary>
        private static async Task FinaliseFile(FileStream sourceStream)
        {
            await WriteTextAsync(sourceStream, "]\r\n}" + Environment.NewLine);
            await sourceStream.FlushAsync();
        }

        /// <summary>
        /// Parses <paramref name="data"/> with Json.NET and attaches the Event Hubs metadata to the
        /// parsed message; any exception thrown here is treated by the caller as a serialization issue.
        /// For the first event of the run it also prints the event's enqueue time, offset and sequence.
        /// </summary>
        /// <param name="count">1-based index of the event within this run.</param>
        /// <param name="receivedEvent">The event as received from the partition.</param>
        /// <param name="data">The event body decoded as text.</param>
        /// <param name="partition">Partition identifier to attach to the message.</param>
        /// <param name="offset">Offset of the event in the partition.</param>
        /// <param name="sequence">Sequence number of the event in the partition.</param>
        private static void IsEventValidJson(int count, PartitionEvent receivedEvent, string data, string partition, long offset, long sequence)
        {
            dynamic message = JsonConvert.DeserializeObject(data);

            // NOTE(review): these dynamic assignments presumably throw when the payload is empty or is
            // not a JSON object, which the caller then reports as a serialization issue - confirm.
            message.AzureEventHubsPartition = partition;
            message.AzureEventHubsOffset = offset;
            message.AzureEventHubsSequence = sequence;
            message.AzureEnqueuedTime = receivedEvent.Data.EnqueuedTime.ToString("o");

            // The caller increments the counter before calling, so the first event arrives as 1.
            if (count == 1)
            {
                Console.WriteLine($"First Message EnqueueTime: {message.AzureEnqueuedTime}, Offset: {message.AzureEventHubsOffset}, Sequence: {message.AzureEventHubsSequence}");
            }
        }
    }
}
The next time you need to be able to subscribe to event hubs to diagnose an issue with a particular event, I would recommend using this tool to get the events you are interested in analysing.
Thank you.
You must be logged in to post a comment.