@jsquire
Created February 11, 2022 20:21
Event Hubs: Checkpoint Store Proposal

Despite being a key requirement for extending EventProcessor<T>, no abstraction exists for processor storage operations. Developers wishing to extend the processor must implement the storage operations required by its abstract members and are responsible for ensuring that the implementation is production-ready. They must also infer which storage operations the application needs that the processor itself does not use (such as writing the checkpoints that the processor reads) and implement those as well. This places a burden on developers and creates a barrier to entry for extending EventProcessor<T>.

Things to know before reading

  • The names used in this document are intended for illustration only. Some names are not ideal and will need to be refined during discussions.

  • Some details not related to the high-level concept are not illustrated; the scope of this document is limited to the high-level shape and paradigms for the feature area.

  • Fake methods are used to illustrate "something needs to happen, but the details are unimportant." As a general rule, if an operation is not directly related to one of the Event Hubs types, it can likely be assumed that it is for illustration only.

Goals

  • Allow developers to extend EventProcessor<T> without the burden of implementing storage operations.
  • Developers should be able to use our existing storage implementations with custom processors with minimal effort.
  • Developers should be able to implement a custom storage provider in a standard way that can be easily consumed by others.

Why this is needed

Developers have provided consistent feedback over the last 24 months that having to implement a custom storage solution makes using EventProcessor<T> undesirable. Because our batch processing story is built around EventProcessor<T>, this friction causes developers to see the Azure.Messaging.EventHubs package as lacking parity with the legacy packages, and it has been cited as a migration blocker. Developers have expressed a strong desire to use our existing Blob Storage implementation from the Azure.Messaging.EventHubs.Processor package. This is not possible because that implementation is internal.

API View

Design thoughts

A new abstract CheckpointStore type is introduced to serve as the contract for all storage operations associated with the processor. The Azure.Messaging.EventHubs.Processor package exposes its Blob Storage implementation of the checkpoint store as BlobCheckpointStore. This lets interested parties use it without introducing dependencies on Azure Storage in other packages.
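The four storage members that the minimum feature set option forwards to the checkpoint store indicate what the CheckpointStore contract needs to contain. The sketch below spells that contract out. It also adds a hypothetical InMemoryCheckpointStore to show how a custom provider could implement the contract.

The following are assumptions for illustration rather than the proposed API:

- the member signatures (inferred from the forwarding calls);
- the InMemoryCheckpointStore class itself;
- its version-matching claim logic.

EventProcessorPartitionOwnership, EventProcessorCheckpoint, and EventPosition are existing types from the Azure.Messaging.EventHubs package.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Primitives;

// Hypothetical contract; signatures are inferred from the forwarding calls in the
// minimum feature set option and may differ from the final API.
public abstract class CheckpointStore
{
    public abstract Task<IEnumerable<EventProcessorPartitionOwnership>> ListOwnershipAsync(
        string fullyQualifiedNamespace, string eventHubName, string consumerGroup, CancellationToken cancellationToken);
    public abstract Task<IEnumerable<EventProcessorPartitionOwnership>> ClaimOwnershipAsync(
        IEnumerable<EventProcessorPartitionOwnership> desiredOwnership, CancellationToken cancellationToken);
    public abstract Task<EventProcessorCheckpoint> GetCheckpointAsync(string fullyQualifiedNamespace,
        string eventHubName, string consumerGroup, string partitionId, CancellationToken cancellationToken);
    public abstract Task UpdateCheckpointAsync(string fullyQualifiedNamespace, string eventHubName,
        string consumerGroup, string partitionId, long offset, long? sequenceNumber, CancellationToken cancellationToken);
}

// Hypothetical in-memory provider for local testing. State lives in one process and is lost on
// restart, so it cannot coordinate processor instances running on different hosts.
public class InMemoryCheckpointStore : CheckpointStore
{
    private readonly object _sync = new();
    private readonly Dictionary<(string, string, string, string), EventProcessorPartitionOwnership> _ownership = new();
    private readonly Dictionary<(string, string, string, string), EventProcessorCheckpoint> _checkpoints = new();

    public override Task<IEnumerable<EventProcessorPartitionOwnership>> ListOwnershipAsync(
        string fullyQualifiedNamespace, string eventHubName, string consumerGroup, CancellationToken cancellationToken)
    {
        lock (_sync)
        {
            IEnumerable<EventProcessorPartitionOwnership> owned = _ownership.Values
                .Where(o => o.FullyQualifiedNamespace == fullyQualifiedNamespace
                    && o.EventHubName == eventHubName && o.ConsumerGroup == consumerGroup)
                .ToList();
            return Task.FromResult(owned);
        }
    }

    public override Task<IEnumerable<EventProcessorPartitionOwnership>> ClaimOwnershipAsync(
        IEnumerable<EventProcessorPartitionOwnership> desiredOwnership, CancellationToken cancellationToken)
    {
        var claimed = new List<EventProcessorPartitionOwnership>();
        lock (_sync)
        {
            foreach (var desired in desiredOwnership)
            {
                var key = (desired.FullyQualifiedNamespace, desired.EventHubName, desired.ConsumerGroup, desired.PartitionId);
                _ownership.TryGetValue(key, out var current);

                // Optimistic concurrency: a claim succeeds only if it carries the current version
                // (null for a partition never owned before); stale claims are skipped.
                if (current?.Version != desired.Version) continue;

                var granted = new EventProcessorPartitionOwnership
                {
                    FullyQualifiedNamespace = desired.FullyQualifiedNamespace, EventHubName = desired.EventHubName,
                    ConsumerGroup = desired.ConsumerGroup, PartitionId = desired.PartitionId,
                    OwnerIdentifier = desired.OwnerIdentifier, LastModifiedTime = DateTimeOffset.UtcNow,
                    Version = Guid.NewGuid().ToString()
                };
                _ownership[key] = granted;
                claimed.Add(granted);
            }
        }
        return Task.FromResult<IEnumerable<EventProcessorPartitionOwnership>>(claimed);
    }

    public override Task<EventProcessorCheckpoint> GetCheckpointAsync(string fullyQualifiedNamespace,
        string eventHubName, string consumerGroup, string partitionId, CancellationToken cancellationToken)
    {
        // A null result means "no checkpoint"; the processor then uses its default starting position.
        lock (_sync)
        {
            _checkpoints.TryGetValue((fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId), out var checkpoint);
            return Task.FromResult(checkpoint);
        }
    }

    public override Task UpdateCheckpointAsync(string fullyQualifiedNamespace, string eventHubName,
        string consumerGroup, string partitionId, long offset, long? sequenceNumber, CancellationToken cancellationToken)
    {
        // Exclusive position: processing resumes after the checkpointed event. This sketch
        // records the offset only; sequenceNumber is accepted to match the contract but unused.
        var checkpoint = new EventProcessorCheckpoint
        {
            FullyQualifiedNamespace = fullyQualifiedNamespace, EventHubName = eventHubName,
            ConsumerGroup = consumerGroup, PartitionId = partitionId,
            StartingPosition = EventPosition.FromOffset(offset, isInclusive: false)
        };
        lock (_sync) { _checkpoints[(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId)] = checkpoint; }
        return Task.CompletedTask;
    }
}
```

The store compares the caller's Version against the stored one. As a result, two processors racing for the same partition with the same view of ownership cannot both succeed. This is the kind of optimistic concurrency that any shared store needs. An in-memory store like this is only suitable for tests or a single processor instance.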

These changes are enough to meet the minimum desired feature set, but they still require developers to write boilerplate code to wire up the processor's storage methods to the checkpoint store. If we are willing to make additional adjustments to EventProcessor<T>, we can introduce a default checkpoint store implementation. That default would preserve backwards compatibility and flexibility while also removing the need for this boilerplate.
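One way to picture that default is a base class that forwards every storage member to a CheckpointStore once, so that derived processors only supply processing logic. The sketch below approximates the idea from outside EventProcessor<T>. It moves the forwarding section of the minimum feature set option into a hypothetical CheckpointStoreEventProcessor<TPartition>.

The class name is illustrative. The sketch also assumes two things that it does not define:

- the proposed CheckpointStore type;
- the EventProcessor<T> storage members (including UpdateCheckpointAsync) exactly as that option overrides them.

In the proposal itself, this forwarding would presumably live inside EventProcessor<T> behind the new constructors.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Primitives;

// Hypothetical base class. Assumes the proposed CheckpointStore contract and the
// EventProcessor<T> storage members as used by the minimum feature set option.
public abstract class CheckpointStoreEventProcessor<TPartition> : EventProcessor<TPartition>
    where TPartition : EventProcessorPartition, new()
{
    private readonly CheckpointStore _checkpointStore;

    protected CheckpointStoreEventProcessor(int maximumBatchSize,
                                            string consumerGroup,
                                            string connectionString,
                                            string eventHubName,
                                            CheckpointStore checkpointStore,
                                            EventProcessorOptions options = default)
        : base(maximumBatchSize, consumerGroup, connectionString, eventHubName, options)
    {
        _checkpointStore = checkpointStore ?? throw new ArgumentNullException(nameof(checkpointStore));
    }

    // Each storage operation is forwarded to the store, scoped to this processor's
    // namespace, Event Hub, and consumer group.
    protected override Task<IEnumerable<EventProcessorPartitionOwnership>> ClaimOwnershipAsync(
        IEnumerable<EventProcessorPartitionOwnership> desiredOwnership, CancellationToken cancellationToken) =>
        _checkpointStore.ClaimOwnershipAsync(desiredOwnership, cancellationToken);

    protected override Task<IEnumerable<EventProcessorPartitionOwnership>> ListOwnershipAsync(CancellationToken cancellationToken) =>
        _checkpointStore.ListOwnershipAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, cancellationToken);

    protected override Task<EventProcessorCheckpoint> GetCheckpointAsync(string partitionId, CancellationToken cancellationToken) =>
        _checkpointStore.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId, cancellationToken);

    protected override Task UpdateCheckpointAsync(string partitionId, long offset, long? sequenceNumber,
                                                  CancellationToken cancellationToken) =>
        _checkpointStore.UpdateCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId,
                                               offset, sequenceNumber, cancellationToken);

    // GetCheckpointAsync is overridden above, so the processor does not need this member.
    protected override Task<IEnumerable<EventProcessorCheckpoint>> ListCheckpointsAsync(CancellationToken cancellationToken) =>
        throw new NotImplementedException($"Unnecessary; {nameof(GetCheckpointAsync)} will be called instead.");

    // OnProcessingEventBatchAsync and OnProcessingErrorAsync stay abstract for derived processors.
}
```

With a base class like this, the CustomProcessor in the minimum feature set option could do the following:

- derive from CheckpointStoreEventProcessor<EventProcessorPartition>;
- pass its checkpointStore to the base constructor;
- drop its "Checkpoint Store Forwarding" section.

What remains is roughly the code shown for the additional constructors option.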

Usage example

Assume that an application is using a custom processor. The basic flow is the same for both options and closely resembles the pattern for EventProcessorClient, except that there are no event handlers to wire up.

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using Azure.Storage.Blobs;

    // BlobCheckpointStore is the proposed public type; CustomProcessor is defined in the options below.
    var blobClient = new BlobContainerClient("<< STORAGE ACCOUNT CONNECTION STRING >>", "<< BLOB CONTAINER NAME >>");
    var checkpointStore = new BlobCheckpointStore(blobClient);

    var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
    var eventHubName = "<< NAME OF THE EVENT HUB >>";
    var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

    var processor = new CustomProcessor(100, consumerGroup, eventHubsConnectionString, eventHubName, checkpointStore);

    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await processor.StartProcessingAsync();

    try
    {
        // The processor performs its work in the background; block until cancellation
        // to allow processing to take place.

        await Task.Delay(Timeout.Infinite, cancellationSource.Token);
    }
    catch (TaskCanceledException)
    {
        // This is expected when the delay is canceled.
    }
    finally
    {
        // Stop the processor even if an unexpected exception escapes the delay.
        await processor.StopProcessingAsync();
    }

Option: Additional constructors and hidden overloads

This option corresponds to revision 2 in the API View for Azure.Messaging.EventHubs and would be the preferred developer experience.

public class CustomProcessor : EventProcessor<EventProcessorPartition>
{
    public CustomProcessor(int maximumBatchSize,
                           string consumerGroup,
                           string connectionString,
                           string eventHubName,
                           CheckpointStore checkpointStore,
                           EventProcessorOptions options = default) : base(maximumBatchSize, consumerGroup, connectionString, eventHubName, checkpointStore, options)
    {        
    }

    // =============================================
    //  Processing Logic (stubbed for illustration)
    // =============================================

    protected async override Task OnProcessingEventBatchAsync(IEnumerable<EventData> events,
                                                              EventProcessorPartition partition,
                                                              CancellationToken cancellationToken)
    {
        try
        {
            await Application.ProcessEventBatchAsync(events, cancellationToken);
        }
        catch (Exception ex)
        {
            await Application.ReportProcessingErrorAsync(partition.PartitionId, events, ex);
        }
    }

    protected async override Task OnProcessingErrorAsync(Exception exception,
                                                         EventProcessorPartition partition,
                                                         string operationDescription,
                                                         CancellationToken cancellationToken)
    {
        try
        {
            await Application.HandleExceptionAsync(partition.PartitionId, exception);
        }
        catch (Exception ex)
        {
            Application.ReportHandlerError(nameof(OnProcessingErrorAsync), ex);
        }
    }

    protected async override Task OnInitializingPartitionAsync(EventProcessorPartition partition,
                                                               CancellationToken cancellationToken)
    {
        try
        {
            await Application.HandlePartitionInitializingAsync(partition.PartitionId, cancellationToken);
        }
        catch (Exception ex)
        {
            Application.ReportHandlerError(nameof(OnInitializingPartitionAsync), ex);
        }
    }

    protected async override Task OnPartitionProcessingStoppedAsync(EventProcessorPartition partition,
                                                                    ProcessingStoppedReason reason,
                                                                    CancellationToken cancellationToken)
    {
        try
        {
            await Application.HandlePartitionStoppedAsync(partition.PartitionId, reason, cancellationToken);
        }
        catch (Exception ex)
        {
            Application.ReportHandlerError(nameof(OnPartitionProcessingStoppedAsync), ex);
        }
    }
}

Option: Minimum feature set

This option corresponds to revision 1 in the API View for Azure.Messaging.EventHubs and represents the minimum set of changes. Custom processors must forward each storage member to the checkpoint store themselves.

public class CustomProcessor : EventProcessor<EventProcessorPartition>
{
    private readonly CheckpointStore _checkpointStore;
    
    public CustomProcessor(int maximumBatchSize,
                           string consumerGroup,
                           string connectionString, 
                           string eventHubName, 
                           CheckpointStore checkpointStore, 
                           EventProcessorOptions options = default) : base (maximumBatchSize, consumerGroup, connectionString, eventHubName, options)
    {
        _checkpointStore = checkpointStore ?? throw new ArgumentNullException(nameof(checkpointStore));
    }
 
    // ==============================
    //  Checkpoint Store Forwarding
    // ==============================
    
    protected override Task<IEnumerable<EventProcessorPartitionOwnership>> ClaimOwnershipAsync(IEnumerable<EventProcessorPartitionOwnership> desiredOwnership, 
                                                                                               CancellationToken cancellationToken) => 
        _checkpointStore.ClaimOwnershipAsync(desiredOwnership, cancellationToken);

    protected override Task<IEnumerable<EventProcessorPartitionOwnership>> ListOwnershipAsync(CancellationToken cancellationToken) => 
        _checkpointStore.ListOwnershipAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, cancellationToken);

    protected override Task<EventProcessorCheckpoint> GetCheckpointAsync(string partitionId, 
                                                                         CancellationToken cancellationToken) =>
        _checkpointStore.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId, cancellationToken);

    protected override Task UpdateCheckpointAsync(string partitionId, 
                                                  long offset, 
                                                  long? sequenceNumber, 
                                                  CancellationToken cancellationToken) => 
        _checkpointStore.UpdateCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId, offset, sequenceNumber, cancellationToken);

    protected override Task<IEnumerable<EventProcessorCheckpoint>> ListCheckpointsAsync(CancellationToken cancellationToken) =>
        throw new NotImplementedException($"Unnecessary; { nameof(GetCheckpointAsync) } will be called instead.");

    // =============================================
    //  Processing Logic (stubbed for illustration)
    // =============================================

    protected async override Task OnProcessingEventBatchAsync(IEnumerable<EventData> events, 
                                                              EventProcessorPartition partition, 
                                                              CancellationToken cancellationToken)
    {
        try
        {
            await Application.ProcessEventBatchAsync(events, cancellationToken);
        }
        catch (Exception ex)
        {
            await Application.ReportProcessingErrorAsync(partition.PartitionId, events, ex);
        }
    }

    protected async override Task OnProcessingErrorAsync(Exception exception, 
                                                         EventProcessorPartition partition, 
                                                         string operationDescription, 
                                                         CancellationToken cancellationToken)
    {
        try
        {
            await Application.HandleExceptionAsync(partition.PartitionId, exception);
        }
        catch (Exception ex)
        {
            Application.ReportHandlerError(nameof(OnProcessingErrorAsync), ex);
        }
    }

    protected async override Task OnInitializingPartitionAsync(EventProcessorPartition partition, 
                                                               CancellationToken cancellationToken)
    {
        try
        {
            await Application.HandlePartitionInitializingAsync(partition.PartitionId, cancellationToken);
        }
        catch (Exception ex)
        {
            Application.ReportHandlerError(nameof(OnInitializingPartitionAsync), ex);
        }
    }

    protected async override Task OnPartitionProcessingStoppedAsync(EventProcessorPartition partition, 
                                                                    ProcessingStoppedReason reason, 
                                                                    CancellationToken cancellationToken)
    {
        try
        {
            await Application.HandlePartitionStoppedAsync(partition.PartitionId, reason, cancellationToken);
        }
        catch (Exception ex)
        {
            Application.ReportHandlerError(nameof(OnPartitionProcessingStoppedAsync), ex);
        }
    }
}

References and Related

  • [FEATURE REQ] Better support for batch processing in Event Hubs SDK (#24075)
  • [Event Hubs] Support for Batch Processing (#26775) (proposed minimal update to EventProcessorClient, as an alternative approach)