Despite being a key requirement for extending EventProcessor<T>, no abstraction exists for processor storage operations. Developers who wish to extend the processor must implement the storage operations required by its abstract members and are responsible for ensuring that the implementation is production-ready. They must also infer which storage operations the application may need that the processor itself does not perform, such as writing the checkpoints that the processor consumes, and provide an implementation for them. This places a burden on developers and introduces a barrier to entry for extending EventProcessor<T>.
- The names used in this document are intended for illustration only. Some names are not ideal and will need to be refined during discussions.
- Some details not related to the high-level concept are not illustrated; the scope of this document is limited to the high-level shape and paradigms for the feature area.
- Fake methods are used to illustrate "something needs to happen, but the details are unimportant." As a general rule, if an operation is not directly related to one of the Event Hubs types, it can likely be assumed that it is for illustration only.
- Allow developers to extend EventProcessor<T> without the burden of implementing storage operations.
- Developers should be able to use our existing storage implementations with custom processors with minimal effort.
- Developers should be able to implement a custom storage provider in a standard way that can be easily consumed by others.
Over the last 24 months, developers have consistently reported that having to implement a custom storage solution makes using EventProcessor<T> undesirable. Because our batch processing story is built around EventProcessor<T>, this friction causes developers to see the Azure.Messaging.EventHubs package as lacking parity with the legacy packages, and it has been cited as a migration blocker. Developers have expressed a strong desire to use the existing Blob Storage implementation from the Azure.Messaging.EventHubs.Processor package, which is not currently possible because that implementation is internal.
A new abstract CheckpointStore type is created that serves as the contract for all storage operations associated with the processor. The Azure.Messaging.EventHubs.Processor package exposes its Blob Storage implementation of the checkpoint store as BlobCheckpointStore, which allows interested parties to make use of it without introducing dependencies on Azure Storage in other packages.
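As a rough sketch of what such a contract could look like, the member signatures below are inferred from the storage calls that the revision 1 example in this document forwards to the checkpoint store. The exact shape is an assumption for illustration, not the proposed API. The two empty classes are stand-ins for the types of the same names in Azure.Messaging.EventHubs.Primitives, declared here only so that the sketch compiles on its own.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Stand-ins for the Azure.Messaging.EventHubs.Primitives types of the same
// names; they are left empty because only the contract shape matters here.
public class EventProcessorPartitionOwnership { }
public class EventProcessorCheckpoint { }

// Hypothetical contract: one abstract member for each storage operation that
// a custom processor would otherwise have to implement itself.
public abstract class CheckpointStore
{
    // Lists the current ownership records for the given Event Hub and consumer group.
    public abstract Task<IEnumerable<EventProcessorPartitionOwnership>> ListOwnershipAsync(
        string fullyQualifiedNamespace,
        string eventHubName,
        string consumerGroup,
        CancellationToken cancellationToken);

    // Attempts to claim the requested partitions, returning the claims that succeeded.
    public abstract Task<IEnumerable<EventProcessorPartitionOwnership>> ClaimOwnershipAsync(
        IEnumerable<EventProcessorPartitionOwnership> desiredOwnership,
        CancellationToken cancellationToken);

    // Reads the checkpoint for a single partition.
    public abstract Task<EventProcessorCheckpoint> GetCheckpointAsync(
        string fullyQualifiedNamespace,
        string eventHubName,
        string consumerGroup,
        string partitionId,
        CancellationToken cancellationToken);

    // Writes the checkpoint for a single partition; the processor itself does
    // not call this, but applications need it to record their progress.
    public abstract Task UpdateCheckpointAsync(
        string fullyQualifiedNamespace,
        string eventHubName,
        string consumerGroup,
        string partitionId,
        long offset,
        long? sequenceNumber,
        CancellationToken cancellationToken);
}
```

Passing the namespace, Event Hub name, and consumer group to each call, rather than to the constructor, would let a single store instance be shared by several processors; whether that is desirable is a design choice left to discussion.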
These changes are enough to meet the minimum desired feature set, but still require developers to write boilerplate code to wire up the processor storage methods to the checkpoint store. If we are willing to make additional adjustments to EventProcessor<T>, a default checkpoint store implementation can be introduced which preserves backwards compatibility and flexibility while also removing the need for this boilerplate.
Assume that an application is using a custom processor; the basic flow is consistent across options and looks quite a bit like the pattern for EventProcessorClient, other than not needing to wire up event handlers.
var blobClient = new BlobContainerClient("<< CONNECTION >>", "<< CONTAINER >>");
var checkpointStore = new BlobCheckpointStore(blobClient);

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

var processor = new CustomProcessor(100, consumerGroup, eventHubsConnectionString, eventHubName, checkpointStore);

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.
    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

await processor.StopProcessingAsync();
Option: Additional constructors and hidden overloads
This option corresponds to revision 2 in the API View for Azure.Messaging.EventHubs and would be the preferred developer experience.
public class CustomProcessor : EventProcessor<EventProcessorPartition>
{
    public CustomProcessor(int maximumBatchSize,
                           string consumerGroup,
                           string connectionString,
                           string eventHubName,
                           CheckpointStore checkpointStore,
                           EventProcessorOptions options = default) : base(maximumBatchSize, consumerGroup, connectionString, eventHubName, checkpointStore, options)
    {
    }

    // =============================================
    //  Processing Logic (stubbed for illustration)
    // =============================================

    protected async override Task OnProcessingEventBatchAsync(IEnumerable<EventData> events,
                                                              EventProcessorPartition partition,
                                                              CancellationToken cancellationToken)
    {
        try
        {
            await Application.ProcessEventBatchAsync(events, cancellationToken);
        }
        catch (Exception ex)
        {
            await Application.ReportProcessingErrorAsync(partition.PartitionId, events, ex);
        }
    }

    protected async override Task OnProcessingErrorAsync(Exception exception,
                                                         EventProcessorPartition partition,
                                                         string operationDescription,
                                                         CancellationToken cancellationToken)
    {
        try
        {
            await Application.HandleExceptionAsync(partition.PartitionId, exception);
        }
        catch (Exception ex)
        {
            // Report under the name of the handler that actually failed.
            Application.ReportHandlerError(nameof(OnProcessingErrorAsync), ex);
        }
    }

    protected async override Task OnInitializingPartitionAsync(EventProcessorPartition partition,
                                                               CancellationToken cancellationToken)
    {
        try
        {
            await Application.HandlePartitionInitializingAsync(partition.PartitionId, cancellationToken);
        }
        catch (Exception ex)
        {
            Application.ReportHandlerError(nameof(OnInitializingPartitionAsync), ex);
        }
    }

    protected async override Task OnPartitionProcessingStoppedAsync(EventProcessorPartition partition,
                                                                    ProcessingStoppedReason reason,
                                                                    CancellationToken cancellationToken)
    {
        try
        {
            await Application.HandlePartitionStoppedAsync(partition.PartitionId, reason, cancellationToken);
        }
        catch (Exception ex)
        {
            // Report under the name of the handler that actually failed.
            Application.ReportHandlerError(nameof(OnPartitionProcessingStoppedAsync), ex);
        }
    }
}
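Option: Forwarding storage operations without changes to EventProcessor<T>
This option corresponds to revision 1 in the API View for Azure.Messaging.EventHubs. It requires no changes to EventProcessor<T>, but the custom processor must hold the checkpoint store and forward each storage operation to it.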
public class CustomProcessor : EventProcessor<EventProcessorPartition>
{
    private readonly CheckpointStore _checkpointStore;

    public CustomProcessor(int maximumBatchSize,
                           string consumerGroup,
                           string connectionString,
                           string eventHubName,
                           CheckpointStore checkpointStore,
                           EventProcessorOptions options = default) : base(maximumBatchSize, consumerGroup, connectionString, eventHubName, options)
    {
        _checkpointStore = checkpointStore ?? throw new ArgumentNullException(nameof(checkpointStore));
    }

    // =============================
    //  Checkpoint Store Forwarding
    // =============================

    protected override Task<IEnumerable<EventProcessorPartitionOwnership>> ClaimOwnershipAsync(IEnumerable<EventProcessorPartitionOwnership> desiredOwnership,
                                                                                               CancellationToken cancellationToken) =>
        _checkpointStore.ClaimOwnershipAsync(desiredOwnership, cancellationToken);

    protected override Task<IEnumerable<EventProcessorPartitionOwnership>> ListOwnershipAsync(CancellationToken cancellationToken) =>
        _checkpointStore.ListOwnershipAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, cancellationToken);

    protected override Task<EventProcessorCheckpoint> GetCheckpointAsync(string partitionId,
                                                                         CancellationToken cancellationToken) =>
        _checkpointStore.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId, cancellationToken);

    protected override Task UpdateCheckpointAsync(string partitionId,
                                                  long offset,
                                                  long? sequenceNumber,
                                                  CancellationToken cancellationToken) =>
        _checkpointStore.UpdateCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId, offset, sequenceNumber, cancellationToken);

    protected override Task<IEnumerable<EventProcessorCheckpoint>> ListCheckpointsAsync(CancellationToken cancellationToken) =>
        throw new NotImplementedException($"Unnecessary; { nameof(GetCheckpointAsync) } will be called instead.");

    // =============================================
    //  Processing Logic (stubbed for illustration)
    // =============================================

    protected async override Task OnProcessingEventBatchAsync(IEnumerable<EventData> events,
                                                              EventProcessorPartition partition,
                                                              CancellationToken cancellationToken)
    {
        try
        {
            await Application.ProcessEventBatchAsync(events, cancellationToken);
        }
        catch (Exception ex)
        {
            await Application.ReportProcessingErrorAsync(partition.PartitionId, events, ex);
        }
    }

    protected async override Task OnProcessingErrorAsync(Exception exception,
                                                         EventProcessorPartition partition,
                                                         string operationDescription,
                                                         CancellationToken cancellationToken)
    {
        try
        {
            await Application.HandleExceptionAsync(partition.PartitionId, exception);
        }
        catch (Exception ex)
        {
            // Report under the name of the handler that actually failed.
            Application.ReportHandlerError(nameof(OnProcessingErrorAsync), ex);
        }
    }

    protected async override Task OnInitializingPartitionAsync(EventProcessorPartition partition,
                                                               CancellationToken cancellationToken)
    {
        try
        {
            await Application.HandlePartitionInitializingAsync(partition.PartitionId, cancellationToken);
        }
        catch (Exception ex)
        {
            Application.ReportHandlerError(nameof(OnInitializingPartitionAsync), ex);
        }
    }

    protected async override Task OnPartitionProcessingStoppedAsync(EventProcessorPartition partition,
                                                                    ProcessingStoppedReason reason,
                                                                    CancellationToken cancellationToken)
    {
        try
        {
            await Application.HandlePartitionStoppedAsync(partition.PartitionId, reason, cancellationToken);
        }
        catch (Exception ex)
        {
            // Report under the name of the handler that actually failed.
            Application.ReportHandlerError(nameof(OnPartitionProcessingStoppedAsync), ex);
        }
    }
}