The "idempotent publishing" feature in the Event Hubs client library was introduced as a means to help reduce the potential for duplication when publishing events using the EventHubProducerClient
. It appeared in several beta packages, starting in September of 2020 and was last available in March of 2021.
Primarily driven by a desire for parity with Kafka, the feature was built on service infrastructure created for the Event Hubs Kafka head. Its API and user experience are heavily influenced by Kafka's approach, which is centered around their buffered producer model. As a result, the feature is not well-suited to the EventHubProducerClient
and has significant potential for causing customer confusion and providing a poor development experience.
The major concerns are:

- The concept of "idempotent" in this context does not meet developer expectations.
- Event Hubs continues to have an at-least-once delivery guarantee; even when publishing is tightly managed, duplication is still possible for consumers and in edge cases.
- The advanced surface is difficult to use and potentially dangerous.
- Enabling the feature alters client behavior in unexpected ways.
The feature should not be released for general availability as part of the `EventHubProducerClient`; it does not meet the quality standards for the Azure SDK developer experience.

A simplified version of idempotent publishing, without many of the drawbacks, exists in the `EventHubBufferedProducerClient`. The buffered producer more closely matches Kafka's model and should be the focal point for providing parity with the feature. It is expected that this version of idempotent publishing will be part of the `EventHubBufferedProducerClient` when it becomes generally available.
Despite being in preview for a year as part of several Event Hubs beta packages and having been marketed by the Event Hubs team, the idempotent publishing feature has seen no customer engagement. To date, there have been no GitHub issues which discuss it, nor questions on Stack Overflow.
We do not believe there is customer demand for this feature.
The Azure Stream Analytics team drove some of the requirements for the feature, participated in the service and SDK designs, and is actively using the feature. They are invested and their feedback has been the primary reason that the path forward is still under discussion.
Other than ASA, we are not aware of any internal partners using the feature.
A common definition of idempotency is: "A request method is considered 'idempotent' if the intended effect on the server of multiple identical requests with that method is the same as the effect for a single such request." (source: IETF RFC 7231) This is not the behavior exhibited by the Event Hubs idempotent publishing feature.
```csharp
var options = new EventHubProducerClientOptions { EnableIdempotentPartitions = true };
await using var producer = new EventHubProducerClient("<< CONNECTION >>", "<< HUB >>", options);

var data = new[]
{
    new EventData("Body")
    {
        MessageId = "12345"
    }
};

await producer.SendAsync(data);
await producer.SendAsync(data);
```
No matter how many times you call the operation using `data`, event "12345" will be published once.
Each call to `SendAsync` publishes `data`, resulting in duplicates. Idempotency is only guaranteed across any implicit retries, not explicit calls.
No. Event Hubs lacks the concept of an identity for an event; deduplication for the idempotent publishing feature is based on the sequence number assigned for the send operation and protects across retries for that operation. Each time `SendAsync` is called, a new sequence number must be assigned. Failure to do so could result in events being incorrectly considered duplicates and lost.
While many hard failures are deterministic and events can be resent without introducing duplicates, some failures represent an ambiguous outcome, such as a timeout. In these cases, resending events may or may not cause duplication. Here again, the definition of "idempotent" leads to the expectation that resending on any failure is safe from duplicates.
```csharp
var options = new EventHubProducerClientOptions { EnableIdempotentPartitions = true };
await using var producer = new EventHubProducerClient("<< CONNECTION >>", "<< HUB >>", options);

var data = new[]
{
    new EventData("Body")
    {
        MessageId = "12345"
    }
};

var published = false;

while (!published)
{
    try
    {
        await producer.SendAsync(data);
        published = true;
    }
    catch (TimeoutException)
    {
        // Allow indefinite retries.
    }
}
```
Once the `published` flag is set, `data` has been published once, and only once.
The state of publishing is ambiguous in the face of a timeout; it is possible that multiple duplicates of `data` exist in the Event Hub when `SendAsync` succeeds. Idempotency is only guaranteed across any implicit retries, not explicit `SendAsync` calls.
No. Event Hubs lacks the concept of an identity for an event; deduplication for the idempotent publishing feature is based on the sequence number assigned for the send operation and protects across retries for that operation.

Each time `SendAsync` is called after an exception, the publisher identifier has changed and client state has been reset. Subsequent send operations will use the new state to guard against potential data loss due to incorrect duplicate identification. (see Addendum: Data loss scenarios)
The producer exposes a set of options intended to restore the previous state of each partition. Successful use requires specifying three independent values such that they match the current Event Hubs service state. Failure to match the expected service state when specifying these values to construct a producer results in a corrupted instance; to recover, the producer must be disposed and recreated.

The Event Hubs service does not support querying the state; an application must track state using the client across operations, ensure that no drift occurs, and hold responsibility for persistence.

One of the options in the advanced set is a "producer owner level", which allows a producer to assert exclusive ownership of publishing to a partition. This opens a potential denial-of-service vector, where partially trusted publishers, such as IoT devices, can interfere with normal operation.
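To make the shape of this advanced surface concrete, the following is a minimal sketch of restoring prior partition state through these options. The partition identifier and values are placeholders; they must match the service's current state exactly, or the producer instance will be corrupted and need to be recreated.

```csharp
// Minimal sketch: restoring previously-tracked partition state. The values shown
// are placeholders; they must match the current Event Hubs service state exactly.
var options = new EventHubProducerClientOptions { EnableIdempotentPartitions = true };

options.PartitionOptions.Add("0", new PartitionPublishingOptions
{
    ProducerGroupId = 27,          // Must match the producer group known to the service.
    OwnerLevel = 14,               // Asserts exclusive ownership of publishing to the partition.
    StartingSequenceNumber = 99    // The next sequence number the service expects for this producer.
});

await using var producer = new EventHubProducerClient("<< CONNECTION >>", "<< HUB >>", options);
```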
This example illustrates a long-running publishing pipeline with the following goals:

- Read data from an external system and transform it into events to be published.
- When data has been successfully published, inform the external system. Doing so will ensure that the next read receives fresh data.
- Be resilient in the face of exceptions; fail the current operation and restart in a safe manner.
- Avoid duplication when publishing events, to the extent that doing so is possible.
```csharp
public async Task PublishEventsAsync(CancellationToken cancellationToken)
{
    Dictionary<string, PartitionPublishingProperties> partitionState = null;

    while (!cancellationToken.IsCancellationRequested)
    {
        try
        {
            if (partitionState == null)
            {
                // If there is no partition state held in local memory, attempt to read the
                // last persisted set from external storage.
                partitionState = await ReadPartitionStateExternalAsync(cancellationToken);
            }

            var options = CreateProducerOptions(partitionState);
            await using var producer = new EventHubProducerClient("<< CONNECTION >>", "<< HUB >>", options);

            var (partitionId, currentEvents) = await ReadExternalDataToEventsAsync(cancellationToken);

            while (currentEvents != null)
            {
                try
                {
                    await producer.SendAsync(currentEvents, new SendEventOptions { PartitionId = partitionId }, cancellationToken);
                }
                catch (OperationCanceledException)
                {
                    throw;
                }
                catch (Exception ex)
                {
                    // If we hit this code path, then all retries have been exhausted
                    // and the events could not be published. Report them as bad data to
                    // the external system so that they are ignored. This will cause
                    // ReadExternalDataToEventsAsync to move to the next set of data,
                    // which we'll publish using new client state.
                    await ReportBadDataExternalAsync(partitionId, currentEvents, cancellationToken);

                    // Remove the current state for the failed partition; it will
                    // have been reset by the client to avoid potential data loss.
                    partitionState.Remove(partitionId);
                }

                try
                {
                    partitionState[partitionId] = await producer.GetPartitionPublishingPropertiesAsync(partitionId, cancellationToken);
                }
                catch (OperationCanceledException)
                {
                    throw;
                }
                catch (Exception ex)
                {
                    // Remove the current state for the failed partition; it is safer
                    // to reset state and potentially have duplicates than to put the
                    // producer into a bad state because we failed to update local state.
                    partitionState.Remove(partitionId);
                }

                // For the sake of simplicity, assume this call is responsible for its own retries
                // and will throw if those are exhausted without success.
                //
                // We should ignore failures; there's really no good recovery mechanism
                // and if we throw to the outer loop and force a producer reset, then we'll
                // be knowingly introducing potential duplicates.
                //
                // This approach is risky, but works because we've got cached local state.
                try
                {
                    await PersistPartitionStateAsync(partitionState, cancellationToken);
                }
                catch (OperationCanceledException)
                {
                    throw;
                }
                catch (Exception ex)
                {
                    ReportException(ex);
                }

                // For the sake of simplicity, assume this call is responsible for its own retries
                // and will throw if those are exhausted without success. That will cause bubbling to
                // the outer loop, which will force a producer reset.
                //
                // In that scenario, ReadExternalDataToEventsAsync will return the same set as it did
                // the last time that it was called, and publishing will use the same state; this allows
                // the service to detect duplicates and remove them.
                //
                // If we do not force a producer reset, the call to ReadExternalDataToEventsAsync will
                // still return the same set as it previously did, but new producer state would be used
                // and the service would not detect them as duplicates.
                //
                // Note that this path may still introduce duplication if persisting the partition state
                // also previously failed.
                await MarkExternalDataAsPublishedAsync(partitionId, currentEvents, cancellationToken);
                (partitionId, currentEvents) = await ReadExternalDataToEventsAsync(cancellationToken);
            }

            // If no events were available, delay before checking again.
            await Task.Delay(TimeSpan.FromSeconds(30), cancellationToken);
        }
        catch (OperationCanceledException)
        {
            throw;
        }
        catch (Exception ex)
        {
            // Report the exception and fall through; that will force a
            // producer reset based on the current partition state
            // held in memory.
            ReportException(ex);
        }
    }

    // If we reach this point, cancellation was signaled.
    throw new TaskCanceledException();
}

EventHubProducerClientOptions CreateProducerOptions(IDictionary<string, PartitionPublishingProperties> partitionState = default)
{
    var options = new EventHubProducerClientOptions { EnableIdempotentPartitions = true };

    if (partitionState != default)
    {
        foreach (var stateItem in partitionState)
        {
            options.PartitionOptions.Add(stateItem.Key, new PartitionPublishingOptions
            {
                OwnerLevel = stateItem.Value.OwnerLevel,
                ProducerGroupId = stateItem.Value.ProducerGroupId,
                StartingSequenceNumber = stateItem.Value.LastPublishedSequenceNumber + 1
            });
        }
    }

    return options;
}

// ==============================================================
//  These methods are stubbed; the implementation is unimportant
//  for this example illustration.
// ==============================================================

public Task<(string PartitionId, EventData[] Events)> ReadExternalDataToEventsAsync(CancellationToken cancellationToken) => Task.FromResult(("0", new[] { new EventData("Body") }));
public Task MarkExternalDataAsPublishedAsync(string partitionId, EventData[] events, CancellationToken cancellationToken) => Task.CompletedTask;
public Task<Dictionary<string, PartitionPublishingProperties>> ReadPartitionStateExternalAsync(CancellationToken cancellationToken) => Task.FromResult(new Dictionary<string, PartitionPublishingProperties>());
public Task PersistPartitionStateAsync(Dictionary<string, PartitionPublishingProperties> partitionState, CancellationToken cancellationToken) => Task.CompletedTask;
public Task ReportBadDataExternalAsync(string partitionId, EventData[] events, CancellationToken cancellationToken) => Task.CompletedTask;
public void ReportException(Exception ex) { }
```
The producer exposes the ability to explicitly manage idempotent state for advanced scenarios like server fail-over and coordinating long-running processes where there is a need to roll back client state for an extended retry.

The producer meets the expectation functionally, but is forced to expose a large amount of complexity due to the service infrastructure available. The poor experience around this complexity is exacerbated by its brittleness; if state in the client is overridden, it can become out of sync with the service, requiring the client to be recreated in order to recover.

No. The Event Hubs service does not offer any tracking or synchronization mechanism for state; callers are expected to manage state and implicitly keep it in sync. When allowing external state management, the client has no ability to validate state nor influence the service values.
We believe there are four options to consider, moving forward:

- Do not expose the feature on the `EventHubProducerClient`; continue to support the API via the Experimental package. This allows Azure Stream Analytics to continue to use idempotent publishing as a non-public API.
- Expose the feature on the `EventHubProducerClient` as `protected` members. This would allow callers to use the feature by explicitly extending some types and surfacing public members. (see Addendum: Protected API surface for idempotent publishing)
- Do not expose the feature on the `EventHubProducerClient`; remove the API and the extensions in the Experimental package that enable Azure Stream Analytics to use idempotent publishing without it being part of the public API surface.
- Plan for release of the feature as part of the public API surface of the `EventHubProducerClient`, potentially revisiting areas of concern and investing in improvements. (this may involve both the client library and the Event Hubs service)
NOTE: Support for idempotent publishing in the `EventHubBufferedProducerClient` is not impacted by these options; they apply only to the `EventHubProducerClient`.

Because we have not observed customer engagement with the idempotent publishing feature from anyone other than the Azure Stream Analytics team, we believe that any usage impact from next steps will be limited to their scenarios. As a result, we believe that the costs and impact of next steps should be considered with a focus on the Event Hubs service team, the Azure SDK team, and the Azure Stream Analytics team.
- This option requires no additional development investment upfront, as it makes use of the current implementation. Going forward, minimal costs would be required for maintaining the feature and, potentially, handling bug fixes.
- This would require a small amount of development upfront for the Azure SDK team to adjust the API surface and for the Azure Stream Analytics team to adapt their implementation to take advantage of it. Going forward, minimal costs would be required for maintaining the feature and, potentially, handling bug fixes.
- This would cause a larger impact to the Azure Stream Analytics team, requiring development efforts to remove dependencies on the feature, and may potentially incur operational costs.
- The costing for this approach could vary greatly, depending on decisions around potential improvements. In the scenario where the feature is released as-is, there would be minor efforts required by the Azure SDK team to make the feature GA-ready and by the ASA team to change dependencies from the Experimental package over to directly using the `EventHubProducerClient`.
The primary business value for Event Hubs is ensuring parity with Kafka's offerings. Because the `EventHubBufferedProducerClient` will continue to offer idempotent publishing in a paradigm similar to Kafka's, we believe the business goal is met without inclusion of idempotent publishing in the `EventHubProducerClient`, and we do not see justification for including it given the lack of customer engagement with the betas.
Unfortunately, we lack sufficient context to understand the business value that the idempotent publishing feature brings to Azure Stream Analytics and what impact its loss may have. Knowing the following would help to understand that value:
- How many duplicates are observed by ASA for a given volume of publishing attempts without the idempotent publishing feature?
- For that same volume, how many fewer duplicates are observed with the idempotent publishing feature enabled?
- How can we quantify the cost savings to Microsoft for the ASA service with the idempotent publishing feature enabled?
- How can we quantify the cost savings to customers of the ASA service with the idempotent publishing feature enabled?
Service Bus offers a form of idempotency through its "duplicate detection" feature (a brief sketch follows the list below):

- This is true idempotency within a specific and configurable window of time.
- Each message has metadata used by Service Bus to uniquely identify it; other messages with that identifier are ignored when published within the configured window of time.
- Applications control the identifier, allowing them to decide what constitutes "the same message" in their context.
- The service guarantee of "at-least-once" delivery still applies.
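For comparison, the following is a minimal sketch of that pattern, assuming the `Azure.Messaging.ServiceBus` and `Azure.Messaging.ServiceBus.Administration` APIs and a hypothetical queue named "orders" created with duplicate detection enabled; the names and window are illustrative only.

```csharp
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;

// Duplicate detection is a queue/topic-level setting with a configurable history window.
// (Illustrative only; assumes the "orders" queue does not already exist.)
var adminClient = new ServiceBusAdministrationClient("<< CONNECTION >>");

await adminClient.CreateQueueAsync(new CreateQueueOptions("orders")
{
    RequiresDuplicateDetection = true,
    DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(10)
});

await using var client = new ServiceBusClient("<< CONNECTION >>");
ServiceBusSender sender = client.CreateSender("orders");

// The application-controlled MessageId is the identity used for deduplication; sending
// the same MessageId twice within the window results in a single message in the queue.
await sender.SendMessageAsync(new ServiceBusMessage("Body") { MessageId = "12345" });
await sender.SendMessageAsync(new ServiceBusMessage("Body") { MessageId = "12345" });
```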
Kafka offers a form of idempotency through its "Idempotent Producer" (a brief sketch follows the list below):

- Can achieve true idempotency for publishing if used with compacted topics.
- Non-compacted topics can have duplicates if a publish operation fails its implicit retries and the application resends.
- The streaming publish model allows the client to control network requests, apply advanced logic, and hide complexity.
- Concurrent publishing is possible due to client control of network requests and retries for out-of-sequence issues.
- State is managed exclusively by the client, reducing complexity and the danger of mistakes.
- Duplicates are still possible when consuming if topics are not compacted.
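For comparison, the following is a minimal sketch of enabling the idempotent producer, assuming the Confluent.Kafka .NET client; the broker address and topic name are illustrative only.

```csharp
using Confluent.Kafka;

var config = new ProducerConfig
{
    BootstrapServers = "<< BROKER >>",

    // Enabling idempotence causes the client to assign sequence numbers and perform
    // its internal retries without introducing duplicates for those retries.
    EnableIdempotence = true,
    Acks = Acks.All
};

using var producer = new ProducerBuilder<Null, string>(config).Build();

// Retries performed inside ProduceAsync are deduplicated by the broker; an explicit
// resend by the application after a reported failure is treated as a new publish.
await producer.ProduceAsync("some-topic", new Message<Null, string> { Value = "Body" });
```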
If the producer did not guard against data loss by forcing the publishing state to reset after a failure, it is possible that data could be lost due to a mismatch between service and client state. This is especially likely with ambiguous outcomes, such as a timeout.
There is a new partition to which no events were previously published. Assume that the client publishes the following:
| Payload | Publisher Sequence Id |
| --- | --- |
| A | 1 |
| B | 2 |
| C | 3 |
The call completes with a timeout; the operation was successful from the service perspective, but the client must assume a failure. At this point, the service state looks like:
| Payload | Publisher Sequence Id | Offset | Sequence Number |
| --- | --- | --- | --- |
| A | 1 | [ SOME VALUE ] | [ SOME VALUE ] |
| B | 2 | [ SOME VALUE ] | [ SOME VALUE ] |
| C | 3 | [ SOME VALUE ] | [ SOME VALUE ] |
The application decides to consider these events failed, marks them as such, and moves on. A new set of events is published using the client, reusing the same sequence numbers because the previous attempt was considered a failure (timeout):

| Payload | Publisher Sequence Id |
| --- | --- |
| D | 1 |
| E | 2 |
| F | 3 |
What happens?
The service sees that the publisher sequence numbers are within the last 5 published and implicitly no-ops, returning success. The client, in this case, considers the operation a success and reports it to the application. The application believes that the partition contains "D", "E", and "F". In reality, the partition contains "A", "B", and "C".
There is a new partition to which no events were previously published. Assume that the client publishes the following:
| Payload | Publisher Sequence Id |
| --- | --- |
| A | 1 |
| B | 2 |
| C | 3 |
| D | 4 |
| E | 5 |
The call completes with a timeout; the operation was successful from the service perspective, but the client must assume a failure. At this point, the service state looks like:
| Payload | Publisher Sequence Id | Offset | Sequence Number |
| --- | --- | --- | --- |
| A | 1 | [ SOME VALUE ] | [ SOME VALUE ] |
| B | 2 | [ SOME VALUE ] | [ SOME VALUE ] |
| C | 3 | [ SOME VALUE ] | [ SOME VALUE ] |
| D | 4 | [ SOME VALUE ] | [ SOME VALUE ] |
| E | 5 | [ SOME VALUE ] | [ SOME VALUE ] |
The application decides to consider these events failed, marks them as such, and moves on. A new set of events is published using the client, reusing the same sequence numbers because the previous attempt was considered a failure (timeout):

| Payload | Publisher Sequence Id |
| --- | --- |
| F | 1 |
| G | 2 |
| H | 3 |
| I | 4 |
| J | 5 |
| K | 6 |
What happens?
The second batch now has an additional sequence number that is not within the last 5 published. The service state now looks like the following:
| Payload | Publisher Sequence Id | Offset | Sequence Number |
| --- | --- | --- | --- |
| A | 1 | [ SOME VALUE ] | [ SOME VALUE ] |
| B | 2 | [ SOME VALUE ] | [ SOME VALUE ] |
| C | 3 | [ SOME VALUE ] | [ SOME VALUE ] |
| D | 4 | [ SOME VALUE ] | [ SOME VALUE ] |
| E | 5 | [ SOME VALUE ] | [ SOME VALUE ] |
| K | 6 | [ SOME VALUE ] | [ SOME VALUE ] |
The behavior of the `EventHubProducerClient` changes implicitly when the idempotent publishing feature is enabled, in ways that are not obvious. Patterns commonly used to publish events result in an exception, requiring use of a specific overload.
```csharp
var options = new EventHubProducerClientOptions { EnableIdempotentPartitions = false };
await using var producer = new EventHubProducerClient("<< CONNECTION >>", "<< HUB >>", options);

var data = new[]
{
    new EventData("Body")
    {
        MessageId = "12345"
    }
};

// Request automatic partition assignment; the recommended (and most common)
// usage pattern.
await producer.SendAsync(data);

// Automatically assigns a partition, guaranteeing data with identical keys
// is kept together and ordered; the recommended pattern for grouping data.
await producer.SendAsync(data, new SendEventOptions { PartitionKey = "abc123" });

// Assign a specific partition; this is not a frequently used pattern.
await producer.SendAsync(data, new SendEventOptions { PartitionId = "0" });
```
```csharp
var options = new EventHubProducerClientOptions { EnableIdempotentPartitions = true };
await using var producer = new EventHubProducerClient("<< CONNECTION >>", "<< HUB >>", options);

var data = new[]
{
    new EventData("Body")
    {
        MessageId = "12345"
    }
};

// This overload is the only one able to publish events when idempotent publishing
// is enabled.
await producer.SendAsync(data, new SendEventOptions { PartitionId = "0" });

// Both of these overloads result in an exception; you must manually assign a partition
// identifier when idempotent publishing is enabled.
await producer.SendAsync(data);
await producer.SendAsync(data, new SendEventOptions { PartitionKey = "abc123" });
```
Events can be published in the same manner, regardless of whether idempotent publishing is enabled.
Behavior of the client differs based on the state of the feature:
- When idempotent publishing is not enabled, events may be published without assigning a specific partition, allowing the Event Hubs service to assign one automatically. This is the recommended pattern for most customers and enables higher availability.
- When idempotent publishing is enabled, the Event Hubs service requires that a specific partition be assigned to events when publishing. This is not a common pattern; it requires specific knowledge about the structure of the Event Hub, as well as taking responsibility for evenly distributing events across partitions.
Somewhat; this is a service requirement and the client must conform. We could consider performing client-side partition assignment when a specific partition is not requested, but this comes with trade-offs (a rough sketch follows the list below):

- The client could implement partition assignment in exactly the same manner as the service and stay in sync with any service changes over time. This places a limit on service evolution, as well as a burden of ensuring that the client libraries across languages are kept consistent. Any subtle difference from the current service behavior would be considered breaking. It would also reduce publishing availability, as the client has no way of knowing when a partition becomes temporarily unavailable.
- The client could implement its own version of partition assignment. This would potentially change the existing behavior, causing breaking changes. It would also risk drift from service fixes and updates, and different versions of the client library would behave differently.
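To illustrate the trade-off, the following is a rough sketch of what naive client-side assignment might look like; `GetPartitionIdsAsync` is an existing producer method, but the round-robin selection and the `sourceOfEventBatches` placeholder are hypothetical stand-ins rather than the service's actual assignment algorithm.

```csharp
// Illustrative only: a naive round-robin stand-in for the service's automatic partition
// assignment. The real service algorithm also accounts for partition health, which the
// client cannot observe.
var options = new EventHubProducerClientOptions { EnableIdempotentPartitions = true };
await using var producer = new EventHubProducerClient("<< CONNECTION >>", "<< HUB >>", options);

// Placeholder for an application-provided source of event sets.
IEnumerable<EventData[]> sourceOfEventBatches = new[]
{
    new[] { new EventData("One") },
    new[] { new EventData("Two") }
};

string[] partitions = await producer.GetPartitionIdsAsync();
var nextPartitionIndex = 0;

foreach (var eventSet in sourceOfEventBatches)
{
    // Idempotent publishing requires an explicit partition, so one is chosen on the client.
    var partitionId = partitions[nextPartitionIndex];
    nextPartitionIndex = (nextPartitionIndex + 1) % partitions.Length;

    await producer.SendAsync(eventSet, new SendEventOptions { PartitionId = partitionId });
}
```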
For this approach, the necessary API surface is exposed in the `Azure.Messaging.EventHubs` package on the `EventHubProducerClient` and its associated types as a `protected` surface, with support types excluded from code completion listings and documentation.

The intent is that the feature can be used by extending a small number of members on two types to shadow the relevant `protected` members with a `public` shadow. The feature would intentionally be difficult to discover, but could be used by any consumers of the package.
```csharp
//
// using Azure.Messaging.EventHubs.Producer;
//

private async Task Main()
{
    var conn = "<< CONNECTION STRING >>";
    var hub = "<< EVENT HUB NAME >>";

    var options = new IdempotentProducerOptions { EnableIdempotentPartitions = true };

    options.PartitionOptions.Add("fake", new PartitionPublishingOptions
    {
        OwnerLevel = 14,
        ProducerGroupId = 27,
        StartingSequenceNumber = 99
    });

    await using var producer = new IdempotentProducer(conn, hub, options);

    var publishingProperties = await producer.GetPartitionPublishingPropertiesAsync("0");
    Logger.LogPublishingProperties(publishingProperties);

    var partitions = new[] { "0", "1", "2", "3" };

    for (var index = 0; index < 100; ++index)
    {
        using var batch = await producer.CreateBatchAsync(new CreateBatchOptions
        {
            PartitionId = partitions[index % partitions.Length]
        });

        var eventData = new EventData($"Test-{ index }-{ batch.Count }");

        while (batch.TryAdd(eventData))
        {
            eventData = new EventData($"Test-{ index }-{ batch.Count }");
        }

        try
        {
            await producer.SendAsync(batch);
            Logger.Information($"Batch { index } sent.");
        }
        catch (Exception ex)
        {
            Logger.Error($"ERROR: { ex.Message }{ Environment.NewLine }{ ex.StackTrace }");
        }
    }
}
```
```csharp
//
// using Azure.Messaging.EventHubs.Producer;
//

public class IdempotentProducerOptions : EventHubProducerClientOptions
{
    public new bool EnableIdempotentPartitions
    {
        get => base.EnableIdempotentPartitions;
        set => base.EnableIdempotentPartitions = value;
    }

    public new Dictionary<string, PartitionPublishingOptions> PartitionOptions =>
        base.PartitionOptions;
}

// A subset of available constructors was used for illustration; any
// constructors required for the application would need to be shadowed
// in this client type.
public class IdempotentProducer : EventHubProducerClient
{
    public IdempotentProducer(string connectionString,
                              string eventHubName,
                              IdempotentProducerOptions clientOptions = default)
        : base(connectionString, eventHubName, clientOptions)
    {
    }

    public IdempotentProducer(string fullyQualifiedNamespace,
                              string eventHubName,
                              TokenCredential credential,
                              IdempotentProducerOptions clientOptions = default)
        : base(fullyQualifiedNamespace, eventHubName, credential, clientOptions)
    {
    }

    protected IdempotentProducer() : base()
    {
    }

    public virtual new async Task<PartitionPublishingProperties> GetPartitionPublishingPropertiesAsync(
        string partitionId,
        CancellationToken cancellationToken = default) =>
            await base.GetPartitionPublishingPropertiesAsync(partitionId, cancellationToken).ConfigureAwait(false);
}
```
This skeleton shows only the changes that would be applied to the current implementation. The remainder of each type would be unchanged.
```csharp
public class EventHubProducerClient : IAsyncDisposable
{
    // Previously "internal"
    [EditorBrowsable(EditorBrowsableState.Never)]
    protected internal virtual async Task<PartitionPublishingProperties> GetPartitionPublishingPropertiesAsync(
        string partitionId,
        CancellationToken cancellationToken = default)
    {
    }
}

public class EventHubProducerClientOptions
{
    // Previously "internal"
    [EditorBrowsable(EditorBrowsableState.Never)]
    protected internal bool EnableIdempotentPartitions { get; set; }
}

// Previously "internal"
[EditorBrowsable(EditorBrowsableState.Never)]
public class PartitionPublishingOptions
{
}

// Previously "internal"
[EditorBrowsable(EditorBrowsableState.Never)]
public class PartitionPublishingProperties
{
}
```