@jsquire
Last active April 24, 2021 08:23

Design Proposal: Idempotent Event Publishing

Event Hubs: Idempotent Event Publishing

When publishing events to Event Hubs, timeouts or other transient failures may introduce ambiguity into the understanding of whether a batch of events was received by the service. Because Event Hubs has an at-least-once guarantee for delivery and consumers are strongly encouraged to be idempotent in their processing, the common approach is to resend any batch where the status of receipt was unknown.

In some specialized scenarios, producers need to make an effort to avoid publishing duplicate events. To support these scenarios, the Event Hubs service is adding support for annotating messages with metadata that indicates the sequence in which events were intended to be published, and for using that sequence as an indicator of which events were already received by the target partition. This functionality comes at a performance cost, however, and requires producers to follow a strict set of semantics to allow the service to perform server-side deduplication based on the intent of the producer.

It is important to note that idempotent publishing endeavors to reduce the number of duplicate events that are published, but cannot fully eliminate them. The guarantee of the Event Hubs service is not altered by this feature and remains an at-least-once guarantee.

Things to know before reading

  • The names used in this document are intended for illustration only. Some names are not ideal and will need to be refined during discussions.

  • Some details not related to the high-level concept are not illustrated; the scope of this is limited to the high level shape and paradigms for the feature area.

  • Fake methods are used to illustrate "something needs to happen, but the details are unimportant." As a general rule, if an operation is not directly related to one of the Service Bus or Event Hubs types, it can likely be assumed that it is for illustration only. These methods will most often use ellipses for the parameter list, in order to help differentiate them.

Target segment: Developers with specialized needs

These are developers working on products with needs that do not fit the majority case for Event Hubs client library users. While this segment has a much smaller addressable market, those that fall into it often drive a large amount of Azure Consumed Revenue (ACR).

In the context of the idempotent publishing feature, these are developers for whom duplicates are expensive to process when consuming events. In order to reduce duplicates when consuming, they are willing to take on additional complexity, conform to a restrictive set of semantics, and experience some performance degradation when publishing.

Why this is needed

The Event Hubs service is offering published event deduplication as a new feature. Because usage requires sending metadata when establishing the connection to the Event Hubs service and reacting to information received at the transport level, client library support will be needed to take advantage of the feature.

Terminology

  • Idempotent Producer is the name used by the Event Hubs service team for the feature performing server-side deduplication.

  • Idempotent denotes that an operation may be safely performed multiple times without the end state changing after the first successful execution. In the context of this feature, the use of idempotent is constrained to the publishing of events to a specific partition, by a specific producer client and is limited to the period of time in which the Event Hubs service retains the associated state.
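To make the definition concrete, the following is a minimal sketch of deduplication keyed by producer group and sequence number. It is a simplified model written for illustration, not a description of how the Event Hubs service is implemented; the `PartitionLog` type and all of its members are hypothetical.

```csharp
using System.Collections.Generic;

// Hypothetical model of a single partition that deduplicates by sequence number.
// This is an assumption made for illustration, not the service's actual logic.
public sealed class PartitionLog
{
    // Last sequence number accepted from each producer group (assumed state).
    private readonly Dictionary<long, int> _lastAccepted = new Dictionary<long, int>();
    private readonly List<string> _events = new List<string>();

    public IReadOnlyList<string> Events => _events;

    // Returns true if the event was appended and false if it was recognized
    // as a duplicate of something already accepted from this producer group.
    public bool Publish(long producerGroupId, int sequenceNumber, string body)
    {
        if (_lastAccepted.TryGetValue(producerGroupId, out var last) && sequenceNumber <= last)
        {
            return false;
        }

        _lastAccepted[producerGroupId] = sequenceNumber;
        _events.Add(body);
        return true;
    }
}
```

In this model, publishing the same body twice with the same sequence number leaves the log unchanged after the first success, which is the sense of "idempotent" used in this document.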

High level scenarios

Real-time data aggregation and analysis

A financial company offers a stock brokerage service to its customers, providing a fully managed experience where brokers monitor market activity and make trades in response. Because of the volatility of the markets, access to real-time information with the highest accuracy and lowest possible latency is paramount. An extra second of delay or too large an inaccuracy in a calculation can make a significant difference in the valuation of a trade, sometimes swinging the amount by millions of dollars.

For the platform that compiles market data, the cost of guaranteeing idempotency when processing data is too high. While a very small amount of duplicate processing is acceptable, there exists a threshold where too many duplicates would negatively impact the margin of accuracy, introducing unreliable aggregate data. Because data is compiled from multiple sources, each flowing into a set of message brokers, there is slightly less sensitivity over timing and availability for a single source. This provides an opportunity to apply some form of duplicate prevention without unreasonable risk of impact to the downstream system.

Though there is reduced sensitivity to latency, it is still highly important to maintain a consistently high throughput when publishing events to the broker. Ideally, this is a function of the broker, allowing the approach for deduplication to be applied consistently across data sources and allowing producers to concentrate on publishing.

Proposed approach

  • Extend the EventHubProducerClient to allow opting into idempotent publishing. The majority use case will be to treat the Event Hubs service as the system of record and accept its values for partition state.

  • To support advanced scenarios, the client will accept a set of options for each partition, allowing metadata such as the producer group identifier, owner level, and starting sequence number to be specified. These overrides will be communicated to the service when a link is established.

  • When idempotent publishing is enabled, the client will only support idempotent semantics; publishing to the Event Hubs gateway for automated routing or use of a partition key will not be allowed.

  • The client will allow publishing to any valid partition of the Event Hub, applying idempotent semantics individually. No state will be shared between partitions.

  • The EventHubProducerClient owns and manages the state associated with idempotent publishing. To avoid corruption, callers are not permitted to mutate that state once it has been initialized.

  • When the EventHubProducerClient performs a publishing operation, it will apply the sequencing to each event. Those sequence numbers will be stable and consistent within the scope of that call, across any retries. At the conclusion of the call, if publishing was successful, each of the events will retain the assigned sequence number. This sequencing mutates a specific instance of the event, such that the data that was sent to the service and the reference to the event instance held by the caller will both have the same sequence number.

  • If publishing for a set of events ends in failure, after all retries have been exhausted, the events will not retain their sequence numbers. Should a developer wish to send the same set again, the EventHubProducerClient will recognize it as a new request.

  • To ensure consistency of state, the EventHubProducerClient will ensure that there is only one publishing operation active for a given partition. The client will not allow concurrent publishing requests to a partition, but since partitions are treated independently, concurrent requests for different partitions are allowed.

  • The EventHubProducerClient will hold responsibility for recovering connections and links to the service as part of its current design for resilience. In the case that the producer becomes unstable and must be recreated, its state will be reset.
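The rule of a single active publishing operation per partition could be enforced with one gate per partition. The sketch below is a minimal illustration, assuming that competing callers for the same partition wait their turn rather than fail. `PartitionGate` and `RunAsync` are hypothetical names and are not part of the proposed API.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: serializes operations for a given partition while
// allowing operations for different partitions to run concurrently.
public sealed class PartitionGate
{
    // One semaphore per partition identifier, created on first use.
    private readonly ConcurrentDictionary<string, SemaphoreSlim> _gates =
        new ConcurrentDictionary<string, SemaphoreSlim>();

    public async Task RunAsync(
        string partitionId,
        Func<Task> publish,
        CancellationToken cancellationToken = default)
    {
        var gate = _gates.GetOrAdd(partitionId, _ => new SemaphoreSlim(1, 1));

        // Wait until no other operation is active for this partition.
        await gate.WaitAsync(cancellationToken).ConfigureAwait(false);

        try
        {
            await publish().ConfigureAwait(false);
        }
        finally
        {
            // Always release, so a failed publish does not block the partition.
            gate.Release();
        }
    }
}
```

Because each partition has its own semaphore, two calls for partition "0" run one after the other, while calls for partitions "0" and "1" do not block each other.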

Usage examples (mainstream)

Create a producer that opts into event publishing idempotency

var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerOptions();
options.EnableIdempotentPartitions = true;

await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);

Publish an event batch

// Create the client
var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerOptions();
options.EnableIdempotentPartitions = true;

await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);

// Create and fill the batch
var batchOptions = new CreateBatchOptions  
{
    PartitionId = "0"
};

var shouldAddEvent = true;
using var batch = await producer.CreateBatchAsync(batchOptions);

while (shouldAddEvent)
{
    shouldAddEvent = batch.TryAdd(GenerateEvent(...));
}

// Sequence numbering is applied by the producer when SendAsync is called.
await producer.SendAsync(batch);

Publish a set of events

// Create the client
var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerOptions();
options.EnableIdempotentPartitions = true;

await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);

// Create the events
var eventsToSend = Enumerable
    .Range(0, 24)
    .Select(index => GenerateEvent(...));

// Sequence numbering is applied by the producer when SendAsync is called.
await producer.SendAsync(eventsToSend, new SendEventOptions
{
    PartitionId = "1"
});

Publish events to multiple partitions

// Create the client
var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerOptions();
options.EnableIdempotentPartitions = true;

await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);

// Create the events
var firstPartitionEvents = Enumerable
    .Range(0, 24)
    .Select(index => GenerateEvent(...));

var secondPartitionEvents = Enumerable
    .Range(0, 24)
    .Select(index => GenerateEvent(...));

// Send to the first partition
await producer.SendAsync(firstPartitionEvents, new SendEventOptions 
{
    PartitionId = "0"
});

// Send to the second partition. 
await producer.SendAsync(secondPartitionEvents, new SendEventOptions 
{
    PartitionId = "1"
});

Publish to multiple partitions concurrently

// Create the client
var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerOptions();
options.EnableIdempotentPartitions = true;

await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);

// Create the events
var firstPartitionEvents = Enumerable
    .Range(0, 24)
    .Select(index => GenerateEvent(...));

var secondPartitionEvents = Enumerable
    .Range(0, 24)
    .Select(index => GenerateEvent(...));

// Send to the first partition
var partitionZeroTask = producer.SendAsync(firstPartitionEvents, new SendEventOptions 
{
    PartitionId = "0"
});

// Send to the second partition. 
var partitionOneTask = producer.SendAsync(secondPartitionEvents, new SendEventOptions 
{
    PartitionId = "1"
});

await Task.WhenAll(partitionZeroTask, partitionOneTask);

Inspect sequence numbers for a published batch

// Create the client
var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerOptions();
options.EnableIdempotentPartitions = true;

await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);

// Create and fill the batch
using var batch = await producer.CreateBatchAsync(new CreateBatchOptions
{
    PartitionId = "0"
});

batch.TryAdd(GenerateEvent(...));
batch.TryAdd(GenerateEvent(...));
batch.TryAdd(GenerateEvent(...));

// Publish the batch
try
{
    await producer.SendAsync(batch);

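    var startingSequenceNumber = batch.StartingPublishedSequenceNumber;

    // The starting number belongs to the first event, so the last event
    // in the batch is (Count - 1) positions after it.
    var endingSequenceNumber = (startingSequenceNumber + batch.Count - 1);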
}
catch
{
    // Publishing has failed; the batch will not have a starting
    // sequence number assigned.
}
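The full set of sequence numbers covered by a published batch can be derived from the starting number and the event count. The helper below is a small sketch, assuming that the starting sequence number is the one assigned to the first event in the batch and that an unpublished batch has no starting number. `BatchSequencing` is a hypothetical name used only for illustration.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper; not part of the proposed API surface.
public static class BatchSequencing
{
    // Enumerates the sequence numbers covered by a batch. An unpublished
    // batch (no starting sequence number) yields an empty sequence.
    public static IEnumerable<int> SequenceNumbers(int? startingSequenceNumber, int count) =>
        startingSequenceNumber.HasValue
            ? Enumerable.Range(startingSequenceNumber.Value, count)
            : Enumerable.Empty<int>();
}
```

For a batch of three events that started at 5, this enumerates 5, 6, and 7.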

Inspect the sequence number of a published event

// Create the client
var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerOptions();
options.EnableIdempotentPartitions = true;

await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);

// Create and publish the event
var eventData = GenerateEvent(...);

try
{
    await producer.SendAsync(new[] { eventData }, new SendEventOptions
    {
        PartitionId = "1"
    });

    var sequenceNumber = eventData.PublishedSequenceNumber;
}
catch
{
    // Publishing has failed; the event will not have a
    // sequence number assigned.
}

Retry publishing after a failure

// Create the client
var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerOptions();
options.EnableIdempotentPartitions = true;

await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);

// Create and fill the batch
using var batch = await producer.CreateBatchAsync(new CreateBatchOptions
{
    PartitionId = "0"
});

batch.TryAdd(GenerateEvent(...));
batch.TryAdd(GenerateEvent(...));
batch.TryAdd(GenerateEvent(...));

// Publish the batch
var maxRetries = 5;
var tryCount = 0;

while (++tryCount <= maxRetries)
{
    try
    {
        await producer.SendAsync(batch);
        // Publishing succeeded; stop retrying.
        break;
    }
    catch (TimeoutException)
    {
        // The state of publishing is unknown; retrying in this case
        // may result in duplicates.
    }
    catch (EventHubsException ex) 
        when (ex.IsTransient || ex.Reason == EventHubsException.FailureReason.ServiceTimeout)
    {
        // For most cases, the state is deterministic and publishing 
        // has failed; retrying would not produce duplicates.
        //
        // However, in cases such as when the reason is ServiceTimeout,
        // the state may be unclear; retrying may result in duplicates.
    }
}

Detect that another producer claimed a partition

// Create the client
var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerOptions();
options.EnableIdempotentPartitions = true;

// If not specified, the OwnerLevel is assumed to be 0.
options.PartitionOptions["0"] = new PartitionPublishingOptions
{
    OwnerLevel = 2
};

await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);

// Create and send the events
var eventSet = Enumerable
    .Range(0, 24)
    .Select(index => GenerateEvent(...));
   
try
{
    await producer.SendAsync(eventSet, new SendEventOptions 
    {
        PartitionId = "0"
    });
}
catch (EventHubsException ex) when
   (ex.Reason == EventHubsException.FailureReason.ProducerDisconnected)
{
    // The partition has been claimed by another producer.  This application
    // should no longer attempt to publish to the partition.
}

Detect invalid client state

// Create the client
var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerOptions();
options.EnableIdempotentPartitions = true;

await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);

// Create and send the events
try
{
    await producer.SendAsync(new[] { GenerateEvent(...) }, new SendEventOptions 
    {
        PartitionId = "1"
    });
}
catch (EventHubsException ex) 
    when (ex.Reason == EventHubsException.FailureReason.InvalidClientState)
{
    // The client state is invalid and the producer should be recreated.
    // Under normal circumstances, this should not happen.
}

Usage examples (advanced)

Create a producer that opts into event publishing idempotency and sets optional metadata

var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerOptions();
options.EnableIdempotentPartitions = true;

// The other values are populated by the values returned
// by the service when the link is opened.
options.PartitionOptions["0"] = new PartitionPublishingOptions
{
    OwnerLevel = 9
};

// These values are sent to the service when opening the link and will
// attempt to override any service state.
options.PartitionOptions["1"] = new PartitionPublishingOptions
{
    ProducerGroupId = 8675309,
    OwnerLevel = 3,
    StartingSequenceNumber = 121
};

// Partitions without explicit options will apply the defaults.
await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);

Inspect the producer state for a partition

// Create the client
var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerOptions();
options.EnableIdempotentPartitions = true;

await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);

// Query the state for the partition.
var state = await producer.GetPartitionPublishingPropertiesAsync("0");

Log.Information($"Partition { state.PartitionId }:");
Log.Information($"    Idempotent Publishing Enabled is { state.IsIdempotentPublishingEnabled }");
Log.Information($"    Producer Group Id is { state.ProducerGroupId?.ToString() ?? "not set" }");
Log.Information($"    Owner Level is { state.OwnerLevel?.ToString() ?? "not set" }");
Log.Information($"    Last Sequence Number is { state.LastPublishedSequenceNumber?.ToString() ?? "not set" }");

Topics for Discussion

Event Hubs Service

  • The type for the epoch (OwnerIdentifier) is short for the producer but long for the consumer. Can we unify on long? (see: PartitionReceiver.Epoch from the T1 client)

Producer

  • Should any published EventBatch or EventData be marked up with a sequence number (as currently illustrated) or should we instead use a boolean property to denote "WasPublished"?

  • GetPartitionPublishingPropertiesAsync sounds very close to GetPartitionPropertiesAsync and, likewise, PartitionPublishingProperties is close to PartitionProperties. This could be potentially confusing, as one is reading client state and the other is querying the service for runtime information.

    This member is meant to be a reflection of the ReadLastEnqueuedEventProperties approach that we use on the consumer side of things. This is an area that we should definitely discuss naming.

Exception Cases

  • It's unfortunate that the existing error reason is called ConsumerDisconnected rather than being more generic. Should we use the existing reason when the producer is disconnected due to the OwnerLevel (epoch) competition, or should we create one with more clear semantics, such as ProducerDisconnected?

API Skeleton

Azure.Messaging.EventHubs.Producer

// =================
//  New Types
// =================

public class PartitionPublishingOptions
{
    public long? ProducerGroupId { get; set; }
    public short? OwnerLevel { get; set; }
    public int? StartingSequenceNumber { get; set; }
}

public class PartitionPublishingProperties
{
    public bool IsIdempotentPublishingEnabled { get; }
    public string PartitionId { get; }
    public long? ProducerGroupId { get; }
    public short? OwnerLevel { get; }
    public int? LastPublishedSequenceNumber { get; }
}

// =================
//  Existing Types
// =================

public class EventHubProducerOptions
{
    // New members
    public bool EnableIdempotentPartitions { get; set; }
    public Dictionary<string, PartitionPublishingOptions> PartitionOptions { get; }
    
    // Existing members
    public EventHubConnectionOptions ConnectionOptions { get; set; }
    public EventHubsRetryOptions RetryOptions { get; set; }
}

public class EventHubProducerClient
{
    // New members
    public Task<PartitionPublishingProperties> GetPartitionPublishingPropertiesAsync(string partitionId);
    
    // Existing members
    public string EventHubName { get; }
    public string FullyQualifiedNamespace { get; }
    public bool IsClosed { get; protected set; }
    public virtual Task CloseAsync(CancellationToken cancellationToken = default);
    public virtual ValueTask<EventDataBatch> CreateBatchAsync(CancellationToken cancellationToken = default);
    public virtual ValueTask<EventDataBatch> CreateBatchAsync(CreateBatchOptions options, CancellationToken cancellationToken = default);
    public virtual ValueTask DisposeAsync();
    public virtual Task<EventHubProperties> GetEventHubPropertiesAsync(CancellationToken cancellationToken = default);
    public virtual Task<string[]> GetPartitionIdsAsync(CancellationToken cancellationToken = default);
    public virtual Task<PartitionProperties> GetPartitionPropertiesAsync(string partitionId, CancellationToken cancellationToken = default);
    public virtual Task SendAsync(IEnumerable<EventData> eventBatch, CancellationToken cancellationToken = default);
    public virtual Task SendAsync(IEnumerable<EventData> eventBatch, SendEventOptions options, CancellationToken cancellationToken = default);
    public virtual Task SendAsync(EventDataBatch eventBatch, CancellationToken cancellationToken = default);
}

public sealed class EventDataBatch : IDisposable 
{
    // New members
    public int? StartingPublishedSequenceNumber { get; }
    
    // Existing members
    public int Count { get; }
    public long MaximumSizeInBytes { get; }
    public long SizeInBytes { get; }
    public void Dispose();
    public bool TryAdd(EventData eventData);
}

Azure.Messaging.EventHubs

public class EventData 
{
    // New members
    public int? PublishedSequenceNumber { get; }
    
    // Existing members
    public EventData(ReadOnlyMemory<byte> eventBody);
    public ReadOnlyMemory<byte> Body { get; }
    public Stream BodyAsStream { get; }
    public DateTimeOffset EnqueuedTime { get; }
    public long Offset { get; }
    public string PartitionKey { get; }
    public IDictionary<string, object> Properties { get; }
    public long SequenceNumber { get; }
    public IReadOnlyDictionary<string, object> SystemProperties { get; }
}

public class EventHubsException : Exception
{
    public enum FailureReason 
    {
        // New members (will be appended to the existing set to ensure stable values)
        ProducerDisconnected,
        InvalidClientState,
        
        // Existing members
        GeneralError,
        ClientClosed,
        ConsumerDisconnected,
        ResourceNotFound,
        MessageSizeExceeded,
        QuotaExceeded,
        ServiceBusy,
        ServiceTimeout,
        ServiceCommunicationProblem
    }
}

Appendix: Requirements use cases

Target segment: Developers with specialized needs

  • Developers should be able to create a producer for idempotent publishing.

  • Once a producer is created, developers should not be able to disable idempotent publishing if it was enabled; the state of the feature is determined at the time of creation.

  • Once a producer is created, developers should not be able to enable idempotent publishing if it was disabled; the state of the feature is determined at the time of creation.

  • Producers with idempotent publishing enabled should only support idempotent publishing. Developers should be presented with a single, consistent set of semantics.

  • Developers should not be able to publish to the Event Hubs gateway for automatic routing or with a partition key using a producer with idempotent publishing enabled, as idempotency is only supported when publishing directly to a partition.

  • Developers should be able to publish to any partition of the Event Hub using the producer; each partition should function independently and not interfere or influence the state of other partitions.

  • Developers should be able to create events and batches without consideration of idempotency. Sequence numbering should be applied by the client when the user attempts to publish an event batch or set of events.

  • Developers should be able to publish an event batch to a partition without the need to manually coordinate sequencing.

  • Developers should be able to publish a set of events to a partition without the need to manually coordinate sequencing.

  • If an event batch was successfully published, the batch should be updated to reflect the starting sequence number applied for publishing; using the starting sequence number and count of events in the batch, developers should be able to understand the full set of sequence numbers that were published by the associated SendAsync call.

  • If a set of events was successfully published, each event should be updated to reflect the sequence number that was published; by inspecting the sequence number of each event in the set, developers should be able to understand the full set of sequence numbers that were published by the associated SendAsync call.

  • If an event batch or set of events is successfully published, developers should be notified of success.

  • If a publishing operation fails with a transient error, the producer should perform retries as governed by the configured retry policy. During retries, the sequence numbers assigned to events should not change; each attempt should pass the same set of events using the same sequence numbers to the Event Hubs service.

  • If a publishing operation fails with a transient error after all retry attempts have been exhausted, developers should be presented with a meaningful error.

  • If a publishing operation fails with a fatal error, the producer will not attempt to retry and will consider the operation a failure immediately. Developers should be presented with a meaningful error.

  • If a publishing operation fails with a fatal error related to sequencing, the producer will not attempt to retry and will consider the operation a failure immediately. Developers should be presented with a meaningful error that indicates that it is not safe to continue using the producer to publish events to the associated partition; developers should be encouraged to close and recreate the producer.

  • If publishing of an event batch completes with failure, the batch should not be updated to reflect sequencing. Developers should be able to provide the same batch to SendAsync and trigger a new publishing operation that will sequence the batch and treat it as if no previous attempt to publish was made.

  • If publishing of a set of events completes with failure, none of the events should be updated to reflect sequencing. Developers should be able to provide the same events, in this set or in different sets, to SendAsync and trigger a new publishing operation that will sequence the events and treat them as if no previous attempt to publish was made.

  • If a user cancels a publishing operation, the producer should respect the request while ensuring that it takes place at a safe and deterministic point. If cancellation is triggered and events have not been confirmed as accepted by the service, behavior will mirror the failure scenario. If events have already been accepted by the service, behavior will mirror the successful send scenario.

Target segment: Advanced developers with specialized needs

  • All use cases from the “Developers with specialized needs” segment apply here as well.

  • Developers should be able to query the state of a partition using the producer. The state should reflect the critical attributes for idempotent publishing, including the Producer Group Id, Owner Level, and Latest Sequence Number.

  • Developers should be able to choose the sequence number at which sequencing starts for a partition and express that preference to both the producer and the Event Hubs service. For partitions where the user does not provide explicit configuration, the sequence number specified by the Event Hubs service should be used.

Appendix: Client behavior

A new client is created (majority case)

An EventHubProducerClient is created by calling one of the constructors and passing a set of options with EventHubProducerOptions.EnableIdempotentPartitions set.

1) When a link is opened for a specific partition:

  - The desired capability for an idempotent producer is set.
  - When the link is created, the service returns: Producer Group Id, Owner Level, Starting Sequence Number.
  - The returned data is set as part of the state of the associated partition for the producer.

2) The link and client state have been initialized; the producer is ready to use.

A new batch is created

- Creation of a batch is performed by calling EventHubProducerClient.CreateBatchAsync.
- For the batch to be used with idempotent publishing, the CreateBatchOptions.PartitionId must be passed.
- On creation, the batch is unlocked and events may be added to the batch normally with no behavioral changes.

A new event is created

- Creation of an event is performed by calling the constructor for EventData.
- On creation, the event is initialized to denote that it has not been published.
- The event may be populated and manipulated normally with no behavioral changes.

An event is added to a batch

  - Events are not assigned an actual sequence number when added to a batch.  This ensures that 
    batches do not have the constraint of having to be published in the order they were created.  
  
  - The events in a batch will be assigned a sequence number when SendAsync is called; this allows 
    the batch to follow the same semantics as when an IEnumerable<EventData> is sent.
  
  - To ensure that an event can fit into the batch, the event is temporarily assigned an artificial 
    sequence number of Int32.MaxValue for measurement purposes.

A batch of events is published

Publishing begins by calling EventHubProducerClient.SendAsync(EventDataBatch)

1) Determine if there is an active operation for the requested partition.

  IF there is:
    - Wait for the current operation to complete.
    - Set this as the active operation.
  
  ELSE:
    - Set this as the active operation.

2) Validate and annotate the batch.

  IF the batch has already been published:
    - Fail as an invalid operation.

  ELSE:
    - Each event in the batch is assigned a sequence number.
    - The batch is marked as having been published.
    - The batch is locked to prevent edits.
    - The batch is annotated to denote that it was published.

3) Send the events to the Event Hubs service.

  IF the send was successful:
    - The call completes and the events have been published or were identified as known duplicates.
    - The batch has a starting sequence number assigned to it.
    - The batch remains locked for edits.
    - The batch cannot be published as a new request, but may be resent using idempotent semantics.
    - Producer state is updated to reflect the batch sequencing.
    
  IF the send was cancelled by the caller: 
    - The batch has no sequence number assigned.
    - The batch is not locked for edits.
    - The batch may be published again and will be treated as a new request.
    - Producer state is not updated.
  
  ELSE:
    - Retry policies are followed, branching to the success case or returning here.
    - If all eligible retries were exhausted, an exception is surfaced.
    - Depending on the exception, events may or may not have been published.
    - The batch has no sequence number assigned.
    - The batch is not locked for edits.
    - The batch may be published again and will be treated as a new request.
    - Producer state is not updated.

A set of events is published

Publishing begins by calling EventHubProducerClient.SendAsync(IEnumerable<EventData>)

1) Determine if there is an active operation for the requested partition.

  IF there is:
    - Wait for the current operation to complete.
    - Set this as the active operation.
  
  ELSE:
    - Set this as the active operation.

2) Validate the set of events.

  IF any event in the set has been assigned a sequence number:
    - Fail as an invalid operation.

  ELSE:
    - Each event in the set is assigned a sequence number.

3) Send the events to the Event Hubs service.

  IF the send was successful:
    - The call completes and the events have been published or were identified as known duplicates.
    - Each event is mutated with a sequence number annotation; callers may view the assigned sequence number.
    - The events cannot be published as a new request, but may be resent using idempotent semantics.
    - Producer state is updated to reflect the event sequencing.
    
  ELSE IF the send was cancelled by the caller:
    - The sequence number is unassigned from all events.
    - Each event is left without annotation; the client will view these as unpublished.
    - The events may be published again and will be treated as a new request.
    - Producer state is not updated.
  
  ELSE:
    - Retry policies are followed, branching to the success case or returning here.
    - If all eligible retries were exhausted, an exception is surfaced.
    - Depending on the exception, events may or may not have been published.
    - Each event is left without annotation; the client will view these as unpublished.
    - The events may be published again and will be treated as a new request.
    - Producer state is not updated.
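
One way to picture the "active operation" gate in step 1, together with the annotate-on-success behavior in step 3, is a single-permit semaphore per partition. The sketch below is an assumption-laden illustration rather than the proposed implementation: `SketchEvent`, `PartitionPublisher`, and the `sendAsync` delegate are hypothetical, and a cancellation or failure is modeled as an exception thrown by that delegate.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for an event; this is not the EventData type.
public sealed class SketchEvent
{
    public int? PublishedSequenceNumber { get; set; }
}

// Hypothetical publisher for a single partition.
public sealed class PartitionPublisher
{
    // One permit: at most one active operation for the partition.
    private readonly SemaphoreSlim _activeOperation = new SemaphoreSlim(1, 1);

    public int LastPublishedSequenceNumber { get; private set; }

    public async Task PublishAsync(
        IReadOnlyList<SketchEvent> events,
        Func<IReadOnlyList<SketchEvent>, int, Task> sendAsync)
    {
        // Step 1: wait for any active operation, then become the active operation.
        await _activeOperation.WaitAsync();

        try
        {
            // Step 2: events that already carry a sequence number are rejected.
            foreach (var item in events)
            {
                if (item.PublishedSequenceNumber.HasValue)
                {
                    throw new InvalidOperationException("An event has already been published.");
                }
            }

            var startingSequence = LastPublishedSequenceNumber + 1;

            // Step 3: a cancellation or failure throws here, leaving events and state untouched.
            await sendAsync(events, startingSequence);

            // Success: annotate each event and commit the sequencing to producer state.
            for (var index = 0; index < events.Count; index++)
            {
                events[index].PublishedSequenceNumber = startingSequence + index;
            }

            LastPublishedSequenceNumber += events.Count;
        }
        finally
        {
            _activeOperation.Release();
        }
    }
}
```

Because the events are annotated only after the send completes, the cancellation and failure branches need no explicit rollback in this sketch.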

A set of events is published, but the sequence is not recognized as valid by the service

- This scenario should not be possible without state corruption.
- The service will return an exception which the client should consider fatal.
- The client should surface an exception that indicates invalid client state.
- Callers should not attempt to use the client and should recreate it.

Recreating a link for a partition that was previously established

This is an implicit activity; no user interaction is needed. This flow is used only to recreate a previously established link that has since been closed or disconnected.

1) When a link is re-opened for a specific partition:

  - The desired capability for an idempotent producer is set.
  
  - When the link is requested, the client sends any of the following that were found in the 
    existing client state: 
      - Producer Group Id
      - Owner Level
      - Starting Sequence Number 
  
  - When the link is created, the service returns: Producer Group Id, Owner Level, Starting Sequence Number.
  
  IF the service accepts the client state:
    - The returned data overrides the state of the associated partition for the producer.
    - The link and client state have been initialized; the producer is ready to use.
    
  ELSE:
    - The client is in an invalid state.
    - The client should surface an exception that indicates invalid client state.
    - Callers should not attempt to use the client and should recreate it.
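
The exchange above can be sketched as two small operations: building the link request from whatever client state exists, and applying the service response. Everything here is illustrative: `PartitionLinkState`, `LinkNegotiation`, the property key strings, and the numeric types are assumptions made for the sketch, not the wire format or the library's types.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical per-partition state; property names follow the flow above.
public sealed class PartitionLinkState
{
    public long? ProducerGroupId { get; set; }
    public short? OwnerLevel { get; set; }
    public int? StartingSequenceNumber { get; set; }
}

public static class LinkNegotiation
{
    // Builds the values sent when the link is requested; only values found in
    // the existing client state are included. The key names are placeholders.
    public static Dictionary<string, object> BuildRequest(PartitionLinkState state)
    {
        var request = new Dictionary<string, object>();

        if (state.ProducerGroupId.HasValue) request["producer-group-id"] = state.ProducerGroupId.Value;
        if (state.OwnerLevel.HasValue) request["owner-level"] = state.OwnerLevel.Value;
        if (state.StartingSequenceNumber.HasValue) request["starting-sequence-number"] = state.StartingSequenceNumber.Value;

        return request;
    }

    // Applies the service response. When the service rejects the client state,
    // the client is invalid and callers are expected to recreate it.
    public static void ApplyResponse(PartitionLinkState state, bool accepted, PartitionLinkState response)
    {
        if (!accepted)
        {
            throw new InvalidOperationException("The client state is invalid; recreate the client.");
        }

        // The returned data overrides the local state for the partition.
        state.ProducerGroupId = response.ProducerGroupId;
        state.OwnerLevel = response.OwnerLevel;
        state.StartingSequenceNumber = response.StartingSequenceNumber;
    }
}
```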

Recovering after a crash (advanced case)

Recovering after a crash or from a corrupted client can take two forms: resetting state or restoring the previous state of the producer.

In the majority of cases, the overhead of tracking state durably enough to survive a process crash makes doing so undesirable. In these cases, the application accepts the possibility of seeing as many duplicates as there were events in the last set that was published.

Applications that have an extremely low tolerance for consuming duplicate events will need to persist the producer state for each partition intermittently. This process can be visualized as "checkpoints for producers."

1) An EventHubProducerClient is created by calling one of the constructors and passing a set of options with
  EventHubProducerClientOptions.EnableIdempotentPartitions set.

  IF the producer state is being reset to match the service state:
    - No additional options are used when creating the producer client.
    
  ELSE:
    - The state of each partition is specified as part of the IdempotentPublishingOptions.PartitionOptions
      dictionary when creating the producer client.
      
    - Each IdempotentPublishingOptions.PartitionOptions member is used to initialize the client state for the 
      associated partition.

2) When a link is re-opened for a specific partition:

  - The desired capability for an idempotent producer is set.
  
  - When the link is requested, the client sends any of the following that were found in the 
    existing client state: 
      - Producer Group Id
      - Owner Level
      - Starting Sequence Number 
  
  IF the service accepts the client state:
    - The returned data overrides the state of the associated partition for the producer.
    - The link and client state have been initialized; the producer is ready to use.
    
  ELSE:
    - The client is in an invalid state.
    - The client should surface an exception that indicates invalid client state.
    - Callers should not attempt to use the client and should recreate it.
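
A "checkpoint for producers" could be as simple as writing the per-partition state to durable storage after a successful publish and reading it back on startup. The sketch below assumes a local JSON file and uses `System.Text.Json`; `PartitionCheckpoint` and `ProducerCheckpointStore` are hypothetical names, and the choice of storage and serialization is not part of this proposal.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Text.Json;

// Hypothetical snapshot of the producer state for one partition.
public sealed class PartitionCheckpoint
{
    public long ProducerGroupId { get; set; }
    public short OwnerLevel { get; set; }
    public int StartingSequenceNumber { get; set; }
}

public static class ProducerCheckpointStore
{
    // Persists the state of every partition, keyed by partition identifier.
    public static void Save(string path, Dictionary<string, PartitionCheckpoint> partitions) =>
        File.WriteAllText(path, JsonSerializer.Serialize(partitions));

    // Restores the persisted state. An empty result corresponds to the reset
    // case, where no partition state is supplied when creating the client.
    public static Dictionary<string, PartitionCheckpoint> Load(string path) =>
        File.Exists(path)
            ? JsonSerializer.Deserialize<Dictionary<string, PartitionCheckpoint>>(File.ReadAllText(path))
            : new Dictionary<string, PartitionCheckpoint>();
}
```

On restart, each restored entry would be used to populate the partition state passed in the options when creating the producer client, as described in step 1.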