@jsquire
Last active June 24, 2021 14:06
Streaming Producer: Early Concept

The current story for publishing events offers a natural idiom when an application has a discrete set of events that it would like to treat as a single unit. When the producer is invoked, it offers a strong contract that allows the application to immediately understand whether publishing was successful. This approach has many advantages, such as ensuring that the client is not bound to any particular partition and can support concurrent publishing.

For applications that are unable to efficiently batch events into a discrete set, the approach is less compelling. Publishing a single event or a small set at a time can be inefficient and may reduce throughput. For applications that view an event as a single item of data, the responsibility of managing batches introduces additional overhead and boilerplate code to collect those single events, track them, and package them efficiently as a batch.
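To make that overhead concrete, the following is a minimal, self-contained sketch of the batching code an application owns today. It assumes strings in place of EventData, a hypothetical 1024-byte limit in place of EventDataBatch.TryAdd, and a hypothetical PublishAsync that records each batch instead of sending it; PublishInBatchesAsync is likewise an illustrative name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

// Hypothetical size limit; a real application would rely on EventDataBatch.TryAdd.
const int MaxBatchBytes = 1024;

var published = new List<List<string>>();

// Stand-in for sending a batch; it records the batch instead of publishing it.
Task PublishAsync(List<string> batch)
{
    published.Add(batch);
    return Task.CompletedTask;
}

// The boilerplate owned by the application: accumulate single events, track the
// running size, and publish whenever the next event would not fit.
async Task PublishInBatchesAsync(IEnumerable<string> events)
{
    var batch = new List<string>();
    var batchBytes = 0;

    foreach (var item in events)
    {
        var size = Encoding.UTF8.GetByteCount(item);

        // Oversized single events are not handled here; they simply form their own batch.
        if (batch.Count > 0 && batchBytes + size > MaxBatchBytes)
        {
            await PublishAsync(batch);
            batch = new List<string>();
            batchBytes = 0;
        }

        batch.Add(item);
        batchBytes += size;
    }

    // The final, partially filled batch is easy to forget.
    if (batch.Count > 0)
    {
        await PublishAsync(batch);
    }
}

// One hundred 100-byte events fit ten to a batch under the 1024-byte limit.
await PublishInBatchesAsync(Enumerable.Range(0, 100).Select(_ => new string('x', 100)));
Console.WriteLine($"Published {published.Count} batches.");
```

Every application that publishes single events ends up owning some version of this loop, including the final flush; the streaming producer is intended to move that work into the client.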

Things to know before reading

  • This is an early concept meant to serve as a point of discussion. It hasn't been fully designed and should be expected to have gaps and undiscovered scenarios. Nothing has been discussed with the SDK feature team or architects at this point.

  • The names used in this document are intended for illustration only. Some names are not ideal and will need to be refined during discussions.

  • Details unrelated to the high-level concept are not illustrated; the scope is limited to the high-level shape and paradigms for the feature area.

  • Fake methods are used to illustrate "something needs to happen, but the details are unimportant." As a general rule, if an operation is not directly related to one of the Event Hubs types, it can likely be assumed that it is for illustration only. These methods will most often use ellipses for the parameter list, in order to help differentiate them.

Goal

Support a paradigm where callers can request to queue an event for publishing and receive notification when publishing of that event succeeds or fails. The client should handle all details around queuing events, building efficient batches, and scheduling publishing.
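A minimal sketch of that paradigm, using the .NET System.Threading.Channels library as a stand-in for the client's internal queue. It is not the proposed implementation: strings stand in for EventData, PublishAsync is a hypothetical publish step that fails any batch containing "bad", and the succeeded and failed lists stand in for the PublishSucceeded and PublishFailed events. The capacity of 500 and the batch size of 10 are arbitrary.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

const int MaxEventsPerBatch = 10;   // arbitrary stand-in for the service's batch size limit

// Bounded queue: WriteAsync waits for room when the queue is full.
var queue = Channel.CreateBounded<string>(new BoundedChannelOptions(500)
{
    FullMode = BoundedChannelFullMode.Wait,
    SingleReader = true
});

var succeeded = new List<string>();   // stands in for PublishSucceeded notifications
var failed = new List<string>();      // stands in for PublishFailed notifications

// Hypothetical publish step; any batch containing "bad" fails, to exercise the failure path.
Task PublishAsync(List<string> batch) =>
    batch.Contains("bad")
        ? Task.FromException(new InvalidOperationException("Publishing failed."))
        : Task.CompletedTask;

// Background loop: build a batch from whatever is queued, publish it, report the outcome.
async Task PublishLoopAsync()
{
    // WaitToReadAsync returns false once the writer is completed and the queue is empty.
    while (await queue.Reader.WaitToReadAsync())
    {
        var batch = new List<string>();

        while (batch.Count < MaxEventsPerBatch && queue.Reader.TryRead(out var item))
        {
            batch.Add(item);
        }

        try
        {
            await PublishAsync(batch);
            succeeded.AddRange(batch);
        }
        catch (Exception ex)
        {
            failed.AddRange(batch);
            Console.WriteLine($"A batch of {batch.Count} events failed: {ex.Message}");
        }
    }
}

var publisher = Task.Run(() => PublishLoopAsync());

// Application side: queue events one at a time; batching happens in the background.
foreach (var eventData in new[] { "a", "b", "bad", "c" })
{
    await queue.Writer.WriteAsync(eventData);
}

// Roughly what CloseAsync would do: stop accepting events, then wait for the queue to drain.
queue.Writer.Complete();
await publisher;

Console.WriteLine($"Succeeded: {succeeded.Count}, failed: {failed.Count}");
```

Because the loop is the only reader, each event is reported exactly once, as either succeeded or failed; how the events are grouped into batches depends on timing.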

Key concepts

  • Each event queued for publishing is considered individual; there is no support for bundling events and forcing them to be batched together.

  • The streaming functionality should be contained in a dedicated client type; the EventHubProducerClient API should not be made more complicated by supporting two significantly different sets of publishing semantics and guarantees.

  • Streaming support requires a stateful client for each partition or partition key used; to keep the resource cost known, each streaming client will be bound to exactly one of: automatic routing, a single partition, or a single partition key.

  • Streaming support wraps an EventHubProducerClient instance. This allows a single producer/connection to be shared and resource use to be controlled.
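The binding rule can be expressed as a small validation step. This sketch assumes, since the concept does not specify it, that setting both PartitionId and PartitionKey is rejected with an ArgumentException; DescribeRouting is a hypothetical helper, not part of the proposed API.

```csharp
using System;

// Hypothetical helper: a streaming producer is bound to exactly one routing mode.
string DescribeRouting(string? partitionId, string? partitionKey)
{
    var hasPartitionId = !string.IsNullOrEmpty(partitionId);
    var hasPartitionKey = !string.IsNullOrEmpty(partitionKey);

    // Assumed rule: combining the two is a configuration error.
    if (hasPartitionId && hasPartitionKey)
    {
        throw new ArgumentException("Specify either a PartitionId or a PartitionKey, not both.");
    }

    if (hasPartitionId)
    {
        return $"partition {partitionId}";
    }

    return hasPartitionKey ? $"partition key {partitionKey}" : "automatic routing";
}

Console.WriteLine(DescribeRouting(null, null));         // automatic routing
Console.WriteLine(DescribeRouting("0", null));          // partition 0
Console.WriteLine(DescribeRouting(null, "some_key"));   // partition key some_key
```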

Usage examples

Creating the client for automatic partition routing

// Create the producer client

var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var client = new EventHubProducerClient(connectionString, eventHubName);

// Create the streaming producer with default options

var producer = new StreamingProducer(client);

Creating the client for a partition

// Create the producer client

var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var client = new EventHubProducerClient(connectionString, eventHubName);

// Create the streaming producer

var producer = new StreamingProducer(client, new StreamingProducerOptions
{
    PartitionId = "0"
});

Creating the client for a partition key

// Create the producer client

var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var client = new EventHubProducerClient(connectionString, eventHubName);

// Create the streaming producer

var producer = new StreamingProducer(client, new StreamingProducerOptions
{
    PartitionKey = "some_key"
});

Creating the client with custom options

// Create the producer client
    
var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var client = new EventHubProducerClient(connectionString, eventHubName, new EventHubProducerClientOptions
{
    EnableIdempotentPartitions = true,
    RetryOptions = new EventHubsRetryOptions { TryTimeout = TimeSpan.FromMinutes(5) }
});
    
// Create the streaming producer

var producer = new StreamingProducer(client, new StreamingProducerOptions
{
    Identifier = "Streaming Producer for Partition 0",
    PartitionId = "0",
    MaximumWaitTime = TimeSpan.FromMilliseconds(250),
    MaximumQueuedEventCount = 500
});    
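The concept does not define MaximumWaitTime precisely. One plausible reading, assumed in this sketch, is that the background loop keeps filling a batch while events arrive but publishes a partial batch once no new event has arrived within that interval. CollectBatchAsync is hypothetical, and a System.Threading.Channels queue stands in for the producer's internal queue.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var maximumWaitTime = TimeSpan.FromMilliseconds(250);   // mirrors MaximumWaitTime above
var queue = Channel.CreateBounded<int>(500);             // mirrors MaximumQueuedEventCount above

// Assumed semantics: keep filling the batch while events arrive, but once no event
// has arrived within maximumWaitTime, stop waiting and return the partial batch.
async Task<List<int>> CollectBatchAsync(ChannelReader<int> reader, int maxBatchSize)
{
    var batch = new List<int>();

    while (batch.Count < maxBatchSize)
    {
        if (reader.TryRead(out var item))
        {
            batch.Add(item);
            continue;
        }

        using var timeout = new CancellationTokenSource(maximumWaitTime);

        try
        {
            // False means the queue was closed and is empty; nothing more will arrive.
            if (!await reader.WaitToReadAsync(timeout.Token))
            {
                break;
            }
        }
        catch (OperationCanceledException)
        {
            break;   // waited long enough; publish what has been collected
        }
    }

    return batch;
}

// Three events arrive and then the stream goes quiet; after about 250 ms the loop
// gives up waiting and returns a partial batch rather than waiting for ten events.
await queue.Writer.WriteAsync(1);
await queue.Writer.WriteAsync(2);
await queue.Writer.WriteAsync(3);

var result = await CollectBatchAsync(queue.Reader, maxBatchSize: 10);
Console.WriteLine($"Collected a partial batch of {result.Count} events.");
```

Under this reading, a smaller MaximumWaitTime trades batch efficiency for lower latency when traffic is sparse.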

Publish events using the Streaming Producer

// Define handlers for the streaming events
    
void PublishSuccessfulHandler(StreamingProducer sender, PublishSucceededEventArgs args)
{
    foreach (var eventData in args.Events)
    {
        var eventId = eventData.Properties["event-id"];
        Console.WriteLine($"Event: { eventId } was published.");
    }
}

void PublishFailedHandler(StreamingProducer sender, PublishFailedEventArgs args)
{
    Console.WriteLine("Publishing FAILED!");        
    Console.WriteLine("\tEvents:");
    
    foreach (var eventData in args.Events)
    {
        var eventId = eventData.Properties["event-id"];
        Console.WriteLine($"\t\t{ eventId } ");
    }
    
    Console.WriteLine($"\tException: { args.Exception.ToString() }");
}

// Create the producer client

var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var client = new EventHubProducerClient(connectionString, eventHubName);

// Create the streaming producer with default options

var producer = new StreamingProducer(client);

producer.PublishSucceeded += PublishSuccessfulHandler;    
producer.PublishFailed += PublishFailedHandler;

try
{
    while (TryGetNextEvent(out var eventData))
    {
        // The queue is bounded and has limited capacity to prevent unchecked 
        // growth.  When the queue is full, this call waits for room.
        // 
        // Publishing will take place in the background, transparent to the caller.
        
        await producer.QueueForSendAsync(eventData);
        Console.WriteLine($"There are { producer.QueuedEventCount } events queued for publishing.");
    }
}
finally
{
    // Closing the producer will flush the queue, publishing any events that are still pending.
    
    await producer.CloseAsync();
    producer.PublishSucceeded -= PublishSuccessfulHandler;
    producer.PublishFailed -= PublishFailedHandler;
    
    await client.DisposeAsync();
}

API Skeleton

Azure.Messaging.EventHubs.Producer

public class PublishSucceededEventArgs : EventArgs
{
    public IEnumerable<EventData> Events { get; init; }
    public PublishSucceededEventArgs(IEnumerable<EventData> events);
}

public class PublishFailedEventArgs : EventArgs
{
    public IEnumerable<EventData> Events { get; init; }
    public Exception Exception { get; init; }
    public PublishFailedEventArgs(IEnumerable<EventData> events, Exception ex);
}

public class StreamingProducerOptions
{
    public string Identifier { get; set; }
    public string PartitionKey { get; set; }
    public string PartitionId { get; set; }
    public TimeSpan? MaximumWaitTime { get; set; }
    public int MaximumQueuedEventCount { get; set; }      
}

public class StreamingProducer
{
    public event Action<StreamingProducer, PublishSucceededEventArgs> PublishSucceeded;
    public event Action<StreamingProducer, PublishFailedEventArgs> PublishFailed;

    public string FullyQualifiedNamespace { get; }
    public string EventHubName { get; }
    public string Identifier { get; }
    public int QueuedEventCount { get; }
    
    public bool IsActive { get; protected set; }
    public bool IsClosed { get; protected set; }
    
    public StreamingProducer(EventHubProducerClient producer, StreamingProducerOptions options = default);
    protected StreamingProducer();
    
    public virtual Task QueueForSendAsync(EventData eventData, CancellationToken cancellationToken = default);
    public virtual Task FlushAsync(CancellationToken cancellationToken = default);
    public virtual Task CloseAsync(CancellationToken cancellationToken = default);
    
    protected virtual void OnPublishSucceeded(IEnumerable<EventData> events);
    protected virtual void OnPublishFailed(IEnumerable<EventData> events, Exception ex);
}
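The CancellationToken parameters and the IsClosed property suggest two behaviors for QueueForSendAsync: a caller can abandon a wait for room in a full queue, and queuing is refused after the producer is closed. A minimal sketch of those assumed semantics, using a one-slot System.Threading.Channels queue with no reader; the local QueueForSendAsync is a hypothetical stand-in, and ChannelClosedException is what Channels throws, not necessarily what the streaming producer would.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// One slot and no reader, so the second event cannot be queued until room appears.
var queue = Channel.CreateBounded<string>(new BoundedChannelOptions(1)
{
    FullMode = BoundedChannelFullMode.Wait
});

var gaveUpWaiting = false;
var rejectedAfterClose = false;

// Hypothetical stand-in for StreamingProducer.QueueForSendAsync.
Task QueueForSendAsync(string eventData, CancellationToken cancellationToken = default) =>
    queue.Writer.WriteAsync(eventData, cancellationToken).AsTask();

await QueueForSendAsync("first");   // room is available; completes immediately

// The queue is full; cancelling the token abandons the wait for room.
using (var cancellationSource = new CancellationTokenSource(TimeSpan.FromMilliseconds(100)))
{
    try
    {
        await QueueForSendAsync("second", cancellationSource.Token);
    }
    catch (OperationCanceledException)
    {
        gaveUpWaiting = true;
    }
}

// Closing the queue (as CloseAsync would) causes later calls to be refused.
queue.Writer.Complete();

try
{
    await QueueForSendAsync("third");
}
catch (ChannelClosedException)
{
    rejectedAfterClose = true;
}

Console.WriteLine($"Gave up waiting: {gaveUpWaiting}; rejected after close: {rejectedAfterClose}");
```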

Improvements to Consider

  • Add a synchronous QueueForSend that throws when the queue is full
  • Add the ability to request that an event or batch be resent by signaling through the EventArgs from the handler
  • Add an option to ignore the ordering of events, and use it to build and publish batches concurrently rather than waiting for each send to complete
  • Consider having the producer hash partition keys and send directly to a partition rather than using the gateway
  • Consider the ability to send to the gateway (no partition key) or to a partition dynamically, rather than binding to one on creation
  • Consider whether this should be constructed directly or by calling a CreateStreamingProducer method from the producer client
  • Add a pre-send event that exposes the set of events about to be published (read-only) and blocks until the handler returns; this could be used for checkpointing.
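The first improvement, a synchronous QueueForSend that throws when the queue is full, maps naturally onto a non-blocking write. A minimal sketch, assuming a System.Threading.Channels queue; QueueForSend is a hypothetical local function, and InvalidOperationException is an assumed choice of exception.

```csharp
using System;
using System.Threading.Channels;

var queue = Channel.CreateBounded<string>(2);   // room for two events
var rejected = false;

// Hypothetical synchronous QueueForSend: never waits for room.
// TryWrite also returns false once the queue is closed; a fuller version
// would report that case differently.
void QueueForSend(string eventData)
{
    if (!queue.Writer.TryWrite(eventData))
    {
        throw new InvalidOperationException("The queue is full; the event was not queued.");
    }
}

QueueForSend("one");
QueueForSend("two");

try
{
    QueueForSend("three");   // the queue is full, so this throws instead of waiting
}
catch (InvalidOperationException ex)
{
    rejected = true;
    Console.WriteLine(ex.Message);
}
```

This keeps the asynchronous QueueForSendAsync as the back-pressure path, while callers that cannot block get an immediate, explicit failure.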