@jsquire
Last active August 17, 2021 12:44
Manual Batching with the `EventHubProducerClient`

Scenario

This scenario is based on a question on Stack Overflow, slightly modified to improve the handling of failure cases.

A WinForms application collects telemetry from a number of IoT devices deployed on company delivery trucks, applies some light data transformations, and then publishes the results to Event Hubs.

Because telemetry flows from the devices consistently, the application performs collection and processing in parallel. To maximize throughput, it prioritizes batch density: it does not flush partially filled batches to account for lulls in telemetry collection, except when the queue of collected telemetry is completely empty.
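The batching policy described above can be simulated without Event Hubs to make the flow concrete. The following is a minimal sketch, not the gist's implementation: `SizeLimitedBatch` and `DensityFirstBatcher` are hypothetical names, and the stand-in batch measures only the UTF-8 byte count of each item, whereas `EventDataBatch` also accounts for per-event and protocol overhead.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;

// Hypothetical stand-in for EventDataBatch: accepts items until a byte limit is hit.
public sealed class SizeLimitedBatch
{
    private readonly int _maxBytes;
    private int _currentBytes;

    public SizeLimitedBatch(int maxBytes) => _maxBytes = maxBytes;

    public List<string> Events { get; } = new List<string>();

    public int Count => Events.Count;

    // Mirrors the TryAdd contract: returns false instead of throwing when full.
    public bool TryAdd(string item)
    {
        var size = Encoding.UTF8.GetByteCount(item);

        if (_currentBytes + size > _maxBytes)
        {
            return false;
        }

        _currentBytes += size;
        Events.Add(item);
        return true;
    }
}

public static class DensityFirstBatcher
{
    // Drains the queue into dense batches. A partial batch is flushed only once the
    // queue is empty; an item that does not fit even an empty batch is dead-lettered.
    public static List<List<string>> Drain(
        ConcurrentQueue<string> queue,
        int maxBatchBytes,
        List<string> deadLetters)
    {
        var sent = new List<List<string>>();
        SizeLimitedBatch? batch = null;

        while (queue.TryPeek(out var item))
        {
            batch ??= new SizeLimitedBatch(maxBatchBytes);

            if (batch.TryAdd(item))
            {
                queue.TryDequeue(out _);
            }
            else if (batch.Count == 0)
            {
                // Too large to ever fit; remove it so the loop can make progress.
                deadLetters.Add(item);
                queue.TryDequeue(out _);
            }
            else
            {
                // The batch is full: "send" it, leaving the item at the front of
                // the queue so the next pass adds it to a fresh batch.
                sent.Add(batch.Events);
                batch = null;
            }
        }

        // The queue is empty: flush whatever remains in the partial batch.
        if (batch != null && batch.Count > 0)
        {
            sent.Add(batch.Events);
        }

        return sent;
    }
}
```

For example, draining `aaaa`, `bbbb`, `cccc`, a 20-character item, and `dd` with a 10-byte limit produces three batches (`aaaa` with `bbbb`, then `cccc`, then `dd`) and dead-letters the oversized item.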

Event Publishing Code

The code responsible for running as a background task and processing one category of telemetry looks similar to the following. Note that most of this code is concerned with batch management.

One important call-out is that this scenario sends telemetry without a partition key, leaving the Event Hubs gateway to assign partitions automatically. If it used partition keys to group related data, each partition key would require a dedicated batch, and the logic would grow in complexity because it would need to track multiple in-progress batches concurrently.
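To make that extra bookkeeping concrete, here is a minimal sketch that keeps one in-progress batch per partition key. It is hypothetical: `KeyedBatcher` is not part of the gist or the Event Hubs SDK, and it limits batches by event count for simplicity. With the real client, each key's batch would be an `EventDataBatch` created by calling `CreateBatchAsync` with a `CreateBatchOptions` whose `PartitionKey` is set.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: one in-progress batch per partition key, limited by event
// count for simplicity (EventDataBatch is limited by size in bytes instead).
public sealed class KeyedBatcher
{
    private readonly int _maxEventsPerBatch;
    private readonly Dictionary<string, List<string>> _inProgress = new Dictionary<string, List<string>>();

    public KeyedBatcher(int maxEventsPerBatch) => _maxEventsPerBatch = maxEventsPerBatch;

    // Batches that have been "sent", tagged with their partition key.
    public List<(string Key, List<string> Events)> Sent { get; } = new List<(string Key, List<string> Events)>();

    public void Add(string partitionKey, string item)
    {
        if (!_inProgress.TryGetValue(partitionKey, out var batch))
        {
            batch = new List<string>();
            _inProgress[partitionKey] = batch;
        }

        // Only this key's batch is sent when it fills; other keys are untouched.
        if (batch.Count == _maxEventsPerBatch)
        {
            Sent.Add((partitionKey, batch));
            batch = new List<string>();
            _inProgress[partitionKey] = batch;
        }

        batch.Add(item);
    }

    // Called when the queue is empty: every key's partial batch must be flushed.
    public void FlushAll()
    {
        foreach (var pair in _inProgress)
        {
            if (pair.Value.Count > 0)
            {
                Sent.Add((pair.Key, pair.Value));
            }
        }

        _inProgress.Clear();
    }
}
```

The flush step is where the complexity shows: instead of one partial batch, there is potentially one per key, and each must be sent separately.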

public static Task ProcessTruckTelemetry(EventHubProducerClient producer,
                                         VirtualThingsController controller,
                                         CancellationToken cancellationToken,
                                         Action<int> progressCallback) =>
    Task.Run(async () =>
    {
        var eventBatch = default(EventDataBatch);
        var totalEventCount = 0;        
        var interval = controller.GetTruckTelemetryInterval();        
        
        // To support concurrent collection and processing, a ConcurrentQueue<T> is
        // used for the telemetry interactions via the controller.
        
        var telemetryQueue = controller.GetTruckTelemetry();
                
        while (!cancellationToken.IsCancellationRequested)
        {
            // In this example, we'll pump the telemetry queue as long
            // as cancellation has not taken place. If we wanted to limit  
            // the queue size, we would need to manage that manually or change
            // to a channel-based approach.
            
            while ((!cancellationToken.IsCancellationRequested)
                && (telemetryQueue.TryPeek(out var telemetry)))
            {
                // Create a batch if we don't currently have one.

                eventBatch ??= (await producer.CreateBatchAsync().ConfigureAwait(false));
                
                // Translate the telemetry data.
                
                telemetry.Timestamp = DateTimeOffset.UtcNow;                
                var serializedTelemetry = JsonSerializer.Serialize(telemetry);
                var eventData = new EventData(serializedTelemetry);
                
                // Attempt to add the event to the batch.  If the batch is full, 
                // send it and clear state so that we know to create a new one.
                
                if (!eventBatch.TryAdd(eventData))
                {
                    // If there are no events in the batch, this event is
                    // too large to ever publish.  We can't recover automatically.
                    //
                    // We'll move the event to a dead-letter area using a method
                    // that is guaranteed not to throw.
                    
                    if (eventBatch.Count == 0)
                    {
                        await DeadLetterEventSafe(eventData).ConfigureAwait(false);
                        telemetryQueue.TryDequeue(out _);
                    }
                    else
                    {
                        // The batch is full; send it.  The event that did not fit is
                        // still at the front of the queue, so it will be added to a
                        // new batch on the next pass.  See implementation below.

                        totalEventCount +=
                            await SendWithRetriesAndDeadLetterAsync(
                                producer,
                                eventBatch,
                                cancellationToken)
                            .ConfigureAwait(false);

                        eventBatch.Dispose();
                        eventBatch = default;
                    }
                }
                else
                {
                    telemetryQueue.TryDequeue(out _);
                }
            }

            // Once we hit this point, there were no telemetry items left
            // in the queue.  Send any that are held in the event batch.

            if ((eventBatch != default) && (eventBatch.Count > 0))
            {
                // See implementation below.

                totalEventCount +=
                    await SendWithRetriesAndDeadLetterAsync(
                        producer,
                        eventBatch,
                        cancellationToken)
                    .ConfigureAwait(false);

                eventBatch.Dispose();
                eventBatch = default;
            }

            // Invoke the progress callback with the total count.

            progressCallback(totalEventCount);

            // Pause for the requested delay before attempting to
            // pump the telemetry queue again.

            try
            {
                await Task.Delay(interval, cancellationToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                // Thrown if cancellation is requested while we're in the
                // delay.  This example is assuming that it isn't interesting
                // to the application and swallows it.
            }
        }

        // Release any batch that is still held when cancellation ends the loop.

        eventBatch?.Dispose();
    }, cancellationToken);

// Sends the batch, retrying transient failures, and moves it to a dead-letter
// area if a non-transient failure occurs.  Returns the number of events sent.
// The caller retains ownership of the batch and is responsible for disposing it.
//
// ShouldRetry and DeadLetterBatchSafe are application-specific helpers that
// are not shown in this example.

private static async Task<int> SendWithRetriesAndDeadLetterAsync(EventHubProducerClient producer,
                                                                 EventDataBatch batch,
                                                                 CancellationToken cancellationToken)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        try
        {
            await producer.SendAsync(batch).ConfigureAwait(false);
            return batch.Count;
        }
        catch (Exception ex) when (ShouldRetry(ex))
        {
            // The failure is transient; loop around and try again.
        }
        catch (Exception)
        {
            // The failure is not recoverable.  We'll move the batch to a
            // dead-letter area using a method that is guaranteed not to throw.

            await DeadLetterBatchSafe(batch).ConfigureAwait(false);
            return 0;
        }
    }

    // Cancellation was requested before the batch could be sent.

    return 0;
}
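`ShouldRetry` is not shown in the gist, and as written the loop retries a transient failure for as long as it keeps occurring, with no pause in between. A variation that caps the attempts and backs off exponentially might look like the following minimal sketch. `BoundedRetry` is a hypothetical helper, and the send, retry check, and dead-letter operations are passed in as delegates so that the logic can be exercised without Event Hubs. In a real application, `shouldRetry` might check `EventHubsException.IsTransient`; keep in mind that `EventHubProducerClient` already retries transient failures according to `EventHubProducerClientOptions.RetryOptions`, so an application-level loop only sees failures that outlasted that policy.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedRetry
{
    // Tries the send up to maxAttempts times, waiting with exponential backoff
    // between transient failures. Returns true if the send succeeded; otherwise
    // dead-letters and returns false. The delegates are hypothetical stand-ins
    // for the application's own helpers.
    public static async Task<bool> SendOrDeadLetterAsync(
        Func<CancellationToken, Task> send,
        Func<Exception, bool> shouldRetry,
        Func<Exception, Task> deadLetterSafe,
        int maxAttempts,
        TimeSpan baseDelay,
        CancellationToken cancellationToken)
    {
        for (var attempt = 1; ; ++attempt)
        {
            try
            {
                await send(cancellationToken).ConfigureAwait(false);
                return true;
            }
            catch (Exception ex) when (!(ex is OperationCanceledException)
                && shouldRetry(ex)
                && attempt < maxAttempts)
            {
                // Transient failure with attempts remaining: back off, then loop.
                var delay = TimeSpan.FromMilliseconds(
                    baseDelay.TotalMilliseconds * Math.Pow(2, attempt - 1));

                await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
            }
            catch (Exception ex) when (!(ex is OperationCanceledException))
            {
                // Non-transient failure, or out of attempts: dead-letter it.
                await deadLetterSafe(ex).ConfigureAwait(false);
                return false;
            }
        }
    }
}
```

Cancellation is deliberately not treated as a failure: an `OperationCanceledException` propagates to the caller rather than sending the batch to the dead-letter area.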

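The comment in the publishing loop notes that limiting the queue size would require manual management or a channel-based approach. The following minimal sketch shows the channel option using `System.Threading.Channels`, which ships with .NET Core 3.0 and later. The `BoundedTelemetryPipeline` type and its integer "readings" are hypothetical placeholders for the controller's telemetry. With `BoundedChannelFullMode.Wait`, the collector pauses whenever the channel is full, so memory use stays capped while the publisher catches up.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BoundedTelemetryPipeline
{
    // Collector side: WriteAsync waits whenever the channel is at capacity.
    public static async Task CollectAsync(ChannelWriter<int> writer, int readings, CancellationToken token)
    {
        for (var reading = 0; reading < readings; ++reading)
        {
            await writer.WriteAsync(reading, token).ConfigureAwait(false);
        }

        // Signal that no more telemetry will arrive so the reader loop can end.
        writer.Complete();
    }

    // Publisher side: drains items as they arrive. A real application would add
    // each item to an event batch here instead of a list.
    public static async Task<List<int>> PublishAsync(ChannelReader<int> reader, CancellationToken token)
    {
        var published = new List<int>();

        while (await reader.WaitToReadAsync(token).ConfigureAwait(false))
        {
            while (reader.TryRead(out var item))
            {
                published.Add(item);
            }

            // The channel is momentarily empty here: the point where a
            // partial batch would be flushed.
        }

        return published;
    }

    public static async Task<List<int>> RunAsync(int capacity, int readings)
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait
        });

        var collector = CollectAsync(channel.Writer, readings, CancellationToken.None);
        var publisher = PublishAsync(channel.Reader, CancellationToken.None);

        await collector.ConfigureAwait(false);
        return await publisher.ConfigureAwait(false);
    }
}
```

The inner `TryRead` loop plays the same role as the `TryPeek` loop in the publishing code: when it exits, there is nothing left to read for the moment.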
Application Code

The core of the application manages these background tasks, similar to the following:

// This is the application shutdown signal.  When cancellation is 
// requested for this token, all of the background tasks will terminate.

using var cancellationSource = new CancellationTokenSource();
    
var producer = GetProducerClient();
var virtualThingsController = new VirtualThingsController();

try
{
    // Define a simple progress callback.
    
    Action<int> progressCallback = totalCount => 
        Debug.WriteLine($"There have been a total of { totalCount } items published.");
        
    // Start the background processing and capture the tasks.  Once the
    // call is made, telemetry is being processed in the background until
    // the cancellation token is signaled.
    
    var backgroundTasks = new List<Task>();
    
    backgroundTasks.Add
    (
        ProcessTruckTelemetry(
            producer, 
            virtualThingsController, 
            cancellationSource.Token, 
            progressCallback)
    );

    backgroundTasks.Add
    (
        ProcessOtherTelemetry(
            producer,
            virtualThingsController,
            cancellationSource.Token,
            progressCallback)
    );
    
    // ============================================================
    //  The application UI and related elements are present, but
    //  not relevant to the focus of the scenario.
    //
    //  << UI and Interactivity code... >>
    // ============================================================
    
    // When the application is ready to stop, signal the cancellation
    // token and wait for the tasks.  We're not calling ConfigureAwait(false)
    // here because the application is WinForms, which has some 
    // sensitivity to the synchronization context.
    
    cancellationSource.Cancel();
    await Task.WhenAll(backgroundTasks);
}
finally
{        
    await producer.CloseAsync();
}
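One subtlety in this shutdown sequence: `Task.Run` is given the same cancellation token, so a background task whose delegate has not started yet when `Cancel` is called completes in the Canceled state, and awaiting `Task.WhenAll` then throws. The following minimal sketch shows a shutdown helper that treats cancellation as a normal exit while still surfacing real failures. `BackgroundShutdown`, `RunLoopAsync`, and `StopAsync` are hypothetical names, and the loop is a simplified stand-in for the telemetry processing.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BackgroundShutdown
{
    // Hypothetical stand-in for a telemetry loop: counts iterations and swallows
    // the OperationCanceledException raised by Task.Delay on shutdown.
    public static Task<int> RunLoopAsync(TimeSpan interval, CancellationToken token) =>
        Task.Run(async () =>
        {
            var iterations = 0;

            while (!token.IsCancellationRequested)
            {
                ++iterations;

                try
                {
                    await Task.Delay(interval, token).ConfigureAwait(false);
                }
                catch (OperationCanceledException)
                {
                    // Expected when shutdown is signaled during the delay.
                }
            }

            return iterations;
        }, token);

    // Signals cancellation, then waits for every task. Task.WhenAll reports a
    // fault in preference to a cancellation, so catching OperationCanceledException
    // here does not hide a genuine failure in one of the tasks.
    public static async Task StopAsync(CancellationTokenSource source, IReadOnlyList<Task> tasks)
    {
        source.Cancel();

        try
        {
            await Task.WhenAll(tasks).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Only cancellations were observed; treat them as a clean shutdown.
        }
    }
}
```

In the WinForms application, the caller would still await `StopAsync` without `ConfigureAwait(false)`, so its own continuation resumes on the UI thread as the original comment intends.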