
@jsquire
Last active September 23, 2020 19:13
Event Hubs: Idempotent Publishing Examples by Use Case

Event Hubs: Idempotent Event Publishing

Things to know before reading

  • The names used in this document are intended for illustration only. Some names are not ideal and will need to be refined during discussions.

  • Some details not related to the high-level concept are not illustrated; the scope of this document is limited to the high-level shape and paradigms of the feature area.

  • Fake methods are used to illustrate "something needs to happen, but the details are unimportant." As a general rule, if an operation is not directly related to one of the Event Hubs types, it can likely be assumed that it is for illustration only. These methods will most often use ellipses for the parameter list, in order to help differentiate them.

Use Case Skeleton

Target segment: Developers with specialized needs

  • Users should be able to create a producer for idempotent publishing.

  • Once a producer is created, users should not be able to disable idempotent publishing if it was enabled; the state of the feature is determined at the time of creation.

  • Once a producer is created, users should not be able to enable idempotent publishing if it was disabled; the state of the feature is determined at the time of creation.

var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerClientOptions();
options.EnableIdempotentPartitions = true;

await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);

  • Producers with idempotent publishing enabled should only support idempotent publishing. Users should be presented with a single, consistent set of semantics.

  • Users should not be able to publish to the Event Hubs gateway for automatic routing or with a partition key using a producer with idempotent publishing enabled, as idempotency is only supported when publishing directly to a partition.

var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerClientOptions();
options.EnableIdempotentPartitions = true;

await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);

try
{
    var eventData = CreateEvent(...);
    await producer.SendAsync(new[] { eventData });
}
catch (InvalidOperationException)
{
    // Must send to a partition; automatic routing and use of a partition
    // key are not allowed.
}
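
The restriction above amounts to a validation step that runs before anything is sent. The following is a minimal, self-contained sketch of such a check, assuming it only needs the producer's idempotency setting plus the requested partition id and partition key; `SendValidation` and `EnsurePartitionTarget` are hypothetical names, not part of the client library.

```csharp
using System;

// Hypothetical sketch: validates where a producer is being asked to publish.
public static class SendValidation
{
    // Throws when an idempotent producer is asked to publish without an
    // explicit partition, or with a partition key.
    public static void EnsurePartitionTarget(
        bool idempotentEnabled,
        string partitionId,
        string partitionKey)
    {
        if (!idempotentEnabled)
        {
            // Non-idempotent producers may use any routing option.
            return;
        }

        if (!string.IsNullOrEmpty(partitionKey))
        {
            throw new InvalidOperationException(
                "A partition key may not be used with idempotent publishing.");
        }

        if (string.IsNullOrEmpty(partitionId))
        {
            throw new InvalidOperationException(
                "Idempotent publishing requires an explicit partition.");
        }
    }
}
```

Raising `InvalidOperationException` mirrors the exception the snippet above catches; the check happens on the client, so no network call is attempted for an invalid target.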

  • Users should be able to publish to any partition of the Event Hub using the producer; each partition should function independently and should not interfere with or influence the state of other partitions.

var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerClientOptions();
options.EnableIdempotentPartitions = true;

await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);

var partitionZeroTask = producer.SendAsync(new[] { CreateEvent(...) }, new SendEventOptions
{
    PartitionId = "0"
});

var partitionOneTask = producer.SendAsync(new[] { CreateEvent(...) }, new SendEventOptions
{
    PartitionId = "1"
});

await Task.WhenAll(partitionZeroTask, partitionOneTask);
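
Partition independence follows if the producer keeps a separate sequence counter for each partition. The sketch below is a minimal, hypothetical model of that bookkeeping, not the client library's implementation; `PartitionSequencer` is an illustrative name, and starting each partition at sequence number 0 is an assumption.

```csharp
using System.Collections.Generic;

// Hypothetical sketch: one independent sequence counter per partition.
public sealed class PartitionSequencer
{
    // Last sequence number handed out for each partition; absent means none yet.
    private readonly Dictionary<string, long> _lastAssigned = new Dictionary<string, long>();
    private readonly object _sync = new object();

    // Reserves `count` consecutive sequence numbers for one partition and
    // returns the first of them. Counters for other partitions are untouched.
    public long Reserve(string partitionId, int count)
    {
        lock (_sync)
        {
            var start = _lastAssigned.TryGetValue(partitionId, out var last) ? last + 1 : 0;
            _lastAssigned[partitionId] = start + count - 1;
            return start;
        }
    }
}
```

This sketch advances a counter as soon as numbers are reserved and does not model failures or retries; it only shows that sends to partition "0" and partition "1" never share state.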

  • Users should be able to create events and batches without consideration of idempotency. Sequence numbering should be applied by the client when the user attempts to publish an event batch or set of events.

// Events are created as usual; nothing related to idempotency is set here.
var eventData = new EventData(Encoding.UTF8.GetBytes("Hello"));
eventData.Properties["app_unique_id"] = Guid.NewGuid().ToString("n");

var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerClientOptions();
options.EnableIdempotentPartitions = true;

await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);

using var batch = await producer.CreateBatchAsync(new CreateBatchOptions
{
    PartitionId = "0"
});

// No sequence number is assigned yet; sequencing is applied when the
// batch is published.
batch.TryAdd(eventData);

  • Users should be able to publish an event batch to a partition without the need to manually coordinate sequencing.

var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerClientOptions();
options.EnableIdempotentPartitions = true;

await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);

using var batch = await producer.CreateBatchAsync(new CreateBatchOptions
{
    PartitionId = "0"
});

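// TryAdd returns false once an event does not fit; that event is not
// added to the batch and is discarded by this loop.
var batchHasRoom = true;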

while (batchHasRoom)
{
    batchHasRoom = batch.TryAdd(CreateEvent(...));
}

await producer.SendAsync(batch);

  • Users should be able to publish a set of events to a partition without the need to manually coordinate sequencing.

var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerClientOptions();
options.EnableIdempotentPartitions = true;

await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);

var eventsToSend = Enumerable
    .Range(0, 24)
    .Select(index => CreateEvent(...));

await producer.SendAsync(eventsToSend, new SendEventOptions
{
    PartitionId = "1"
});

  • If an event batch was successfully published, the batch should be updated to reflect the starting sequence number applied for publishing; using the starting sequence number and count of events in the batch, users should be able to understand the full set of sequence numbers that were published by the associated SendAsync call.

var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerClientOptions();
options.EnableIdempotentPartitions = true;

await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);

using var batch = await producer.CreateBatchAsync(new CreateBatchOptions
{
    PartitionId = "0"
});

batch.TryAdd(CreateEvent(...));
batch.TryAdd(CreateEvent(...));
batch.TryAdd(CreateEvent(...));

await producer.SendAsync(batch);

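// Populated only after a successful send; the last sequence number is
// one less than the start plus the count.
var startingSequenceNumber = batch.StartingPublishedSequenceNumber.Value;
var endingSequenceNumber = startingSequenceNumber + batch.Count - 1;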
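
Assuming the sequence numbers applied to a batch are consecutive, as the use case above implies, the starting number and the event count identify the full set: a batch that starts at `start` and holds `count` events covers `start` through `start + count - 1`. A small sketch of that arithmetic, using a hypothetical helper:

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper; not part of the client library.
public static class SequenceRange
{
    // Returns every sequence number covered by a batch that started at
    // `start` and contained `count` events.
    public static IReadOnlyList<long> Expand(long start, int count) =>
        Enumerable.Range(0, count).Select(offset => start + offset).ToList();
}
```

For example, a batch starting at 121 with three events covers 121, 122, and 123.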

  • If a set of events was successfully published, each event should be updated to reflect the sequence number that was published; by inspecting the sequence number of each event in the set, users should be able to understand the full set of sequence numbers that were published by the associated SendAsync call.

var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerClientOptions();
options.EnableIdempotentPartitions = true;

await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);

var eventData = CreateEvent(...);

await producer.SendAsync(new[] { eventData }, new SendEventOptions 
{
    PartitionId = "1"
});

var sequenceNumber = eventData.PublishedSequenceNumber;

  • If an event batch or set of events is successfully published, users should be notified of success.

var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerClientOptions();
options.EnableIdempotentPartitions = true;

await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);

await producer.SendAsync(new[] { CreateEvent(...) }, new SendEventOptions 
{
    PartitionId = "1"
});

// If no exception is thrown when awaiting the send, then it was successful.

  • If a publishing operation fails with a transient error, the producer should perform retries as governed by the configured retry policy. During retries, the sequence numbers assigned to events should not change; each attempt should pass the same set of events using the same sequence numbers to the Event Hubs service.

  • If a publishing operation fails with a transient error after all retry attempts have been exhausted, users should be presented with a meaningful error.

var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerClientOptions();
options.EnableIdempotentPartitions = true;

await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);

try
{
    await producer.SendAsync(new[] { CreateEvent(...) }, new SendEventOptions 
    {
        PartitionId = "1"
    });
}
catch (TimeoutException)
{
    // There is ambiguity here; it is unknown whether the events were
    // received by the service.
}
catch (EventHubsException ex) when (ex.IsTransient)
{
    // A transient exception persisted after all retries were exhausted.
    // The failure reason and message provide more context.  Unlike the
    // timeout above, these usually represent deterministic failure states.
}

// If no exception is thrown when awaiting the send, then it was successful.
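
Keeping sequence numbers fixed across retries is what makes the ambiguous timeout case safe to retry. The following in-memory simulation sketches why, under an assumed service rule that is not specified in this document: a partition accepts only the next expected sequence number and acknowledges an already-accepted set without storing it again. `SimulatedPartition` and `RetryingPublisher` are hypothetical; no Event Hubs types are involved.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for one partition on the service.
public sealed class SimulatedPartition
{
    public List<string> Stored { get; } = new List<string>();
    public long LastSequenceNumber { get; private set; } = -1;

    // When true, the next accepted send "loses" its acknowledgement:
    // the events are stored, but the caller sees a timeout.
    public bool DropNextAcknowledgement { get; set; }

    public void Receive(long startingSequenceNumber, IReadOnlyList<string> events)
    {
        // Assumed rule: a set that was already accepted is a duplicate and
        // is acknowledged without being stored again.
        if (startingSequenceNumber <= LastSequenceNumber)
        {
            return;
        }

        if (startingSequenceNumber != LastSequenceNumber + 1)
        {
            throw new InvalidOperationException("Out-of-order sequence number.");
        }

        Stored.AddRange(events);
        LastSequenceNumber = startingSequenceNumber + events.Count - 1;

        if (DropNextAcknowledgement)
        {
            DropNextAcknowledgement = false;
            throw new TimeoutException("Acknowledgement lost.");
        }
    }
}

public static class RetryingPublisher
{
    // Sequence numbers are chosen once, before the first attempt; every retry
    // resends the same events with the same starting number. The timeout
    // propagates once the attempts are exhausted.
    public static long Publish(SimulatedPartition partition, long startingSequenceNumber,
        IReadOnlyList<string> events, int maximumAttempts)
    {
        for (var attempt = 1; ; ++attempt)
        {
            try
            {
                partition.Receive(startingSequenceNumber, events);
                return startingSequenceNumber;
            }
            catch (TimeoutException) when (attempt < maximumAttempts)
            {
                // Transient failure: retry without re-sequencing.
            }
        }
    }
}
```

In this simulation, the first attempt stores two events but reports a timeout; the retry carries the same starting number, is recognized as a duplicate, and the partition still holds exactly two events.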

  • If a publishing operation fails with a fatal error, the producer will not attempt to retry and will consider the operation a failure immediately. Users should be presented with a meaningful error.

var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerClientOptions();
options.EnableIdempotentPartitions = true;

await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);

try
{
    await producer.SendAsync(new[] { CreateEvent(...) }, new SendEventOptions 
    {
        PartitionId = "1"
    });
}
catch (EventHubsException ex) 
    when (ex.Reason == EventHubsException.FailureReason.ProducerDisconnected)
{
    // Another producer has claimed the partition.  This producer should not
    // try to publish to it again.
}

// If no exception is thrown when awaiting the send, then it was successful.

  • If a publishing operation fails with a fatal error related to sequencing, the producer will not attempt to retry and will consider the operation a failure immediately. Users should be presented with a meaningful error that indicates that it is not safe to continue using the producer to publish events to the associated partition; users should be encouraged to close and recreate the producer.

var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerClientOptions();
options.EnableIdempotentPartitions = true;

await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);

try
{
    await producer.SendAsync(new[] { CreateEvent(...) }, new SendEventOptions 
    {
        PartitionId = "1"
    });
}
catch (EventHubsException ex) 
    when (ex.Reason == EventHubsException.FailureReason.InvalidClientState)
{
    // The client state is invalid and the producer should be recreated.
    // The failure message provides context. 
}

// If no exception is thrown when awaiting the send, then it was successful.

  • If publishing of an event batch completes with failure, the batch should not be updated to reflect sequencing. Users should be able to provide the same batch to SendAsync and trigger a new publishing operation that will sequence the batch and treat it as if no previous attempt to publish was made.

var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerClientOptions();
options.EnableIdempotentPartitions = true;

await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);

using var batch = await producer.CreateBatchAsync(new CreateBatchOptions
{
    PartitionId = "0"
});

batch.TryAdd(CreateEvent(...));
batch.TryAdd(CreateEvent(...));
batch.TryAdd(CreateEvent(...));

try
{
    await producer.SendAsync(batch);
}
catch
{
    // ...
}

// The following is false, because the batch has no sequence number.
var hasSequence = batch.StartingPublishedSequenceNumber.HasValue;

  • If publishing of a set of events completes with failure, none of the events should be updated to reflect sequencing. Users should be able to provide the same events, in this set or in different sets, to SendAsync and trigger a new publishing operation that will sequence the events and treat them as if no previous attempt to publish was made.

var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerClientOptions();
options.EnableIdempotentPartitions = true;

await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);

var eventData = CreateEvent(...);

try
{
    await producer.SendAsync(new[] { eventData }, new SendEventOptions 
    {
        PartitionId = "1"
    });
}
catch
{
    // ...
}

// The following is false, because the event has no sequence number.
var hasSequence = eventData.PublishedSequenceNumber.HasValue;
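
The "no sequencing on failure" behavior can be modeled by computing sequence numbers tentatively and writing them back only once the send succeeds. This is a hypothetical sketch of that contract for a single partition, not the client library's implementation; `SequencedEvent` and `CommitOnSuccessProducer` are illustrative names, and it models only a failure the service definitively rejected, not the ambiguous timeout case.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for an event; only the sequencing state is modeled.
public sealed class SequencedEvent
{
    public long? PublishedSequenceNumber { get; internal set; }
}

// Hypothetical producer for a single partition.
public sealed class CommitOnSuccessProducer
{
    private long _lastPublished = -1;

    // `send` stands in for the network call; it receives the tentative
    // starting sequence number and throws if the service rejects the events.
    public void Send(IReadOnlyList<SequencedEvent> events, Action<long> send)
    {
        var start = _lastPublished + 1;

        // If this throws, neither the events nor the producer state have been
        // touched, so a later attempt is treated as the first one.
        send(start);

        // Success: stamp each event and advance the partition's counter.
        for (var index = 0; index < events.Count; ++index)
        {
            events[index].PublishedSequenceNumber = start + index;
        }

        _lastPublished = start + events.Count - 1;
    }
}
```

After a rejected send, `PublishedSequenceNumber.HasValue` stays false for every event, matching the snippet above; resending the same events then stamps them starting from the same number the failed attempt would have used.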

  • If a user cancels a publishing operation, the producer should respect the request while ensuring that cancellation takes place at a safe and deterministic point. If cancellation is triggered and events have not been confirmed as accepted by the service, behavior will mirror the failure scenario. If events have already been accepted by the service, behavior will mirror the successful send scenario.
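
The two cancellation outcomes described above can be sketched by checking the token only at a point where nothing has been sent yet. This is a simplified, synchronous illustration under that assumption; `CancellableSend` is hypothetical, and the sketch does not model cancellation while a send is in flight but not yet confirmed.

```csharp
using System;
using System.Threading;

// Hypothetical sketch: cancellation is honored only at a safe point.
public static class CancellableSend
{
    // `send` stands in for transmitting the events and waiting for the
    // service to accept them.
    public static void Send(Action send, CancellationToken cancellationToken)
    {
        // Safe point: nothing has been sent yet, so cancelling here
        // behaves like the failure scenario.
        cancellationToken.ThrowIfCancellationRequested();

        send();

        // Deliberately no check after this point: the events were accepted,
        // so a late cancellation request behaves like a successful send.
    }
}
```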

Target segment: Advanced developers with specialized needs

  • All use cases from the “Developers with specialized needs” segment apply here as well.

  • Users should be able to query the state of a partition using the producer. The state should reflect the critical attributes for idempotent publishing, including the Producer Group Id, Owner Level, and Latest Sequence Number.

var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerClientOptions();
options.EnableIdempotentPartitions = true;

await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);

var state = await producer.GetPartitionPublishingPropertiesAsync("0");

Log.Information($"Partition { state.PartitionId }:");
Log.Information($"    Idempotent Publishing Enabled is { state.IsIdempotentPublishingEnabled }");
Log.Information($"    Producer Group Id is { state.ProducerGroupId?.ToString() ?? "not set" }");
Log.Information($"    Owner Level is { state.OwnerLevel?.ToString() ?? "not set" }");
Log.Information($"    Last Sequence Number is { state.LastPublishedSequenceNumber?.ToString() ?? "not set" }");

  • Users should be able to choose the sequence number at which sequencing starts for a partition and express that preference to both the producer and the Event Hubs service. For partitions without explicit configuration, the sequence number specified by the Event Hubs service should be used.

var connectionString = "<< CONNECTION STRING >>";
var eventHubName = "<< EVENT HUB NAME >>";

var options = new EventHubProducerClientOptions();
options.EnableIdempotentPartitions = true;

// The other values are populated by the values returned
// by the service when the link is opened.
options.PartitionOptions["0"] = new PartitionOptions
{
    OwnerLevel = 9
};

// These values are sent to the service when opening the link and will
// override any service state.
options.PartitionOptions["1"] = new PartitionOptions
{
    ProducerGroupId = 8675309,
    OwnerLevel = 3,
    StartingSequenceNumber = 121
};

// Partitions without explicit options will apply the defaults.
await using var producer = new EventHubProducerClient(connectionString, eventHubName, options);
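
The comments in the snippet above describe a simple precedence rule: a value the user configured wins, and anything left unspecified comes from the state the service returns when the link is opened. A minimal sketch of that resolution follows; `PartitionState` and `PartitionStateResolver` are hypothetical, and the property types (`long?`, `short?`, `long?`) are assumptions made for illustration.

```csharp
using System.Collections.Generic;

// Hypothetical model of per-partition idempotency values; null means "not specified".
public sealed record PartitionState(long? ProducerGroupId, short? OwnerLevel, long? StartingSequenceNumber);

public static class PartitionStateResolver
{
    // User-configured values take precedence; unspecified values fall back
    // to the state reported by the service. Partitions with no configuration
    // use the service state unchanged.
    public static PartitionState Resolve(
        IReadOnlyDictionary<string, PartitionState> configured,
        string partitionId,
        PartitionState fromService)
    {
        if (!configured.TryGetValue(partitionId, out var requested))
        {
            return fromService;
        }

        return new PartitionState(
            requested.ProducerGroupId ?? fromService.ProducerGroupId,
            requested.OwnerLevel ?? fromService.OwnerLevel,
            requested.StartingSequenceNumber ?? fromService.StartingSequenceNumber);
    }
}
```

Using the values from the snippet, partition "0" keeps owner level 9 and takes its producer group id and starting sequence number from the service, partition "1" uses all three configured values, and any other partition uses the service state as-is.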

Out of scope

These items are not necessary to satisfy the use cases required for the initial release of the idempotent producer feature and/or warrant being treated as an independent feature. This categorization does not imply that they are without merit or should not be included in the client library, only that consideration and discussion should be reserved for a different context.

  • When publishing an event batch or set of events that has already been sequenced, users should be able to request that the existing sequence numbers be ignored, and a new set of sequencing be applied.

  • A streaming model for publishing events has been discussed in some detail in the context of idempotent publishing. While the streaming model is something that should be given consideration, it is large enough in scope to warrant being treated as a feature unto itself and is orthogonal to idempotent publishing.
