@jsquire
Last active June 1, 2020 18:29
Event Hubs: Direct Partition Connections

Connections to the Azure Event Hubs service are typically made to a gateway endpoint, which is responsible for routing traffic to the appropriate service node for a given partition. This gives clients a simplified view of the service, with no need to be aware of individual nodes or to use a discovery service as an intermediate step when opening connections.

Using the gateway as an intermediary does, however, incur additional latency compared with connecting directly to a node. In cases that require maximum throughput, bypassing the gateway is desirable. The Event Hubs service offers an opt-in mechanism to discover the node currently hosting a given partition and to connect to it directly, eliminating the gateway as an intermediary. This feature of the Event Hubs service is colloquially known as "receiver redirect."
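
To make the difference in the traffic path concrete, the following Python sketch models a namespace as a gateway plus a map of partitions to the nodes hosting them. It is a toy model for illustration only: the `Namespace` class, its node names, and the placement map are hypothetical, and the real service does not expose partition placement this way.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Namespace:
    """Hypothetical toy model: a gateway plus a partition-to-node placement map."""

    gateway: str
    placement: Dict[str, str] = field(default_factory=dict)

    def route(self, partition_id: str, direct: bool) -> List[str]:
        """Return the hops that traffic for a partition travels, starting at the client."""
        node = self.placement[partition_id]
        if direct:
            # Direct partition connection: the client talks to the hosting node itself.
            return ["client", node]
        # Default: traffic passes through the gateway, which forwards it to the node.
        return ["client", self.gateway, node]


namespace = Namespace(gateway="gateway", placement={"0": "node-a", "1": "node-b"})
print(namespace.route("0", direct=False))  # ['client', 'gateway', 'node-a']
print(namespace.route("0", direct=True))   # ['client', 'node-a']
```

The gateway path always carries one extra hop; that hop is the latency receiver redirect is intended to remove, at the cost of the client needing to know where the partition currently lives.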

As the colloquial name implies, discovery of the Event Hubs node for a partition relies on a redirection error returned when establishing a link to the partition. This incurs an additional cost when first connecting to read events, as a new connection and link must then be opened directly to the partition's node. Additionally, the Event Hubs service retains ownership of its nodes and may migrate a partition to a different node without warning. In the event of a migration, the existing direct connection is terminated, and a new connection and link must be established before reads from the partition can resume.
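
The discovery and recovery flow can be sketched as a small Python simulation. Everything in it is hypothetical: `ToyService`, `RedirectError`, and `LinkDetached` stand in for the service's behavior and do not correspond to the Event Hubs wire protocol or to any client library type. It shows three steps: a link attempt through the gateway is answered with a redirect naming the hosting node, the client opens a new link directly to that node, and, after a migration terminates the direct link, the client returns to the gateway to rediscover the partition.

```python
from typing import Dict, Tuple

Link = Tuple[str, str]  # (endpoint, partition_id); the endpoint is "gateway" or a node name


class RedirectError(Exception):
    """Hypothetical stand-in for the redirection error that names the hosting node."""

    def __init__(self, node: str) -> None:
        super().__init__(f"partition is hosted on {node}")
        self.node = node


class LinkDetached(Exception):
    """Hypothetical stand-in for a direct connection being terminated."""


class ToyService:
    """Toy model of the service; it is not the Event Hubs protocol."""

    def __init__(self, placement: Dict[str, str]) -> None:
        self.placement = dict(placement)

    def attach_via_gateway(self, partition_id: str, allow_redirect: bool) -> Link:
        if allow_redirect:
            # Opted in: instead of routing, the gateway reports where the partition lives.
            raise RedirectError(self.placement[partition_id])
        return ("gateway", partition_id)

    def attach_direct(self, node: str, partition_id: str) -> Link:
        if self.placement[partition_id] != node:
            raise LinkDetached(f"{node} does not host partition {partition_id}")
        return (node, partition_id)

    def receive(self, link: Link) -> str:
        endpoint, partition_id = link
        if endpoint != "gateway" and self.placement[partition_id] != endpoint:
            # The partition moved, so the direct connection to the old node is gone.
            raise LinkDetached(f"{endpoint} no longer hosts partition {partition_id}")
        return f"event from partition {partition_id} via {endpoint}"

    def migrate(self, partition_id: str, new_node: str) -> None:
        # The service may do this at any time, without warning the client.
        self.placement[partition_id] = new_node


def open_receive_link(service: ToyService, partition_id: str, allow_redirect: bool = True) -> Link:
    try:
        return service.attach_via_gateway(partition_id, allow_redirect)
    except RedirectError as redirect:
        # The extra first-connect cost: a second connection and link to the node.
        return service.attach_direct(redirect.node, partition_id)


def receive_with_recovery(service: ToyService, link: Link) -> Tuple[Link, str]:
    try:
        return link, service.receive(link)
    except LinkDetached:
        # Rediscover the new location through the gateway, then retry once.
        link = open_receive_link(service, link[1])
        return link, service.receive(link)


service = ToyService({"0": "node-a"})
link = open_receive_link(service, "0")
print(link)  # ('node-a', '0')
service.migrate("0", "node-b")
link, event = receive_with_recovery(service, link)
print(link, event)  # ('node-b', '0') event from partition 0 via node-b
```

A production client would also need to bound retries and apply its retry policy to the rediscovery step; the sketch retries exactly once to keep the flow visible.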

Things to know before reading

  • The names used in this document are intended for illustration only. Some names are not ideal and will need to be refined during discussions.

  • Some details not related to the high-level concept are not illustrated; the scope of this document is limited to the high-level shape and paradigms for the feature area.

  • Fake methods are used to illustrate "something needs to happen, but the details are unimportant." As a general rule, if an operation is not directly related to one of the Service Bus or Event Hubs types, it can likely be assumed that it is for illustration only. These methods will most often use ellipses for the parameter list, in order to help differentiate them.

Terminology

  • Receiver Redirect is the colloquial name for the feature in the Event Hubs service which enables a direct connection to a partition's node by allowing discovery of the node's location when attempting to establish a receiving link for the partition.

  • Enable Link Redirect is the name by which the feature was known in the Microsoft.ServiceBus.Messaging library for .NET (track zero), where it was first introduced; the name comes from the associated transport option.

Why this is needed

The Azure Messaging team has positioned the Event Hubs service as its performance-oriented offering. Developers with high-throughput needs are encouraged to consider Event Hubs as their Azure message broker of choice. While the majority of scenarios favor the trade-offs associated with using the gateway endpoint over those of connecting directly to a partition, a direct connection is likely preferable for those who need maximum throughput.

Because Event Hubs has a per-connection costing model, one of the key goals for the client library was to keep how connections are created and managed transparent to developers. Each Event Hubs client type in the library requires a connection and will create one bound to its lifespan unless connection sharing is explicitly used. This gives developers a simple mental model for understanding the number of Event Hubs connections used by their application.

That model breaks down when direct partition connections are used, as an additional connection must be established and managed for each individual partition being read. Should the Event Hubs service decide to migrate a partition to a new node, the direct connection will be faulted, and the partition's new location must be discovered by once again using the gateway connection to open a link. The same holds when a direct partition connection faults for other reasons. Because of this, the number of active connections for a client may vary over time and is difficult to predict.
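
The effect on connection counts can be illustrated with a short Python sketch that replays a hypothetical timeline of partition links being opened and faulted. It assumes that the client keeps its gateway connection open for rediscovery and that each partition being read directly holds one additional connection; `connection_counts` and the event names are illustrative and are not part of any client library.

```python
from typing import Iterable, List, Set, Tuple

Event = Tuple[str, str]  # ("open" | "fault", partition_id); hypothetical event shape


def connection_counts(timeline: Iterable[Event], direct: bool) -> List[int]:
    """Return the client's active connection count after each event in the timeline.

    Assumptions: one gateway connection lives as long as the client; with direct
    partition connections enabled, each partition being read adds one connection,
    and a fault closes that connection until the partition is rediscovered.
    """
    direct_connections: Set[str] = set()
    counts: List[int] = []
    for kind, partition_id in timeline:
        if kind not in ("open", "fault"):
            raise ValueError(f"unknown event kind: {kind!r}")
        if direct:
            if kind == "open":
                direct_connections.add(partition_id)
            else:
                direct_connections.discard(partition_id)
        # The gateway connection is always counted.
        counts.append(1 + len(direct_connections))
    return counts


# Read three partitions; partition "1" migrates and is then rediscovered.
timeline = [("open", "0"), ("open", "1"), ("open", "2"), ("fault", "1"), ("open", "1")]
print(connection_counts(timeline, direct=False))  # [1, 1, 1, 1, 1]
print(connection_counts(timeline, direct=True))   # [2, 3, 4, 3, 4]
```

Through the gateway, the count is a constant that is easy to reason about; with direct connections, it depends on how many partitions are being read and on migrations outside the application's control.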

Additionally, a direct connection requires an additional range of ports to be opened for bi-directional communication. Although these ports are documented, they fall outside the standard ports defined by the AMQP specification (5672, and 5671 for TLS; see page 79, section 2.8.19) and may not be available in all environments.
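
For environments with restricted egress, a deployment check might compare the endpoints an application connects to against the standard AMQP ports. The Python sketch below does that; the port constants come from the AMQP 1.0 specification, while the function name and the example endpoints (hosts and port numbers) are made up for illustration and do not reflect the actual port range used by the Event Hubs service.

```python
from typing import Iterable, List, Tuple

# Constants from the AMQP 1.0 specification, section 2.8.19.
AMQP_PORT = 5672
AMQP_SECURE_PORT = 5671
STANDARD_AMQP_PORTS = frozenset({AMQP_PORT, AMQP_SECURE_PORT})


def ports_needing_firewall_rules(endpoints: Iterable[Tuple[str, int]]) -> List[int]:
    """Return the sorted, de-duplicated ports that fall outside the standard AMQP ports."""
    return sorted({port for _host, port in endpoints if port not in STANDARD_AMQP_PORTS})


# Hypothetical endpoints: a gateway on the TLS port and two nodes on made-up ports.
endpoints = [
    ("gateway.example.net", 5671),
    ("node-a.example.net", 20001),
    ("node-b.example.net", 20000),
]
print(ports_needing_firewall_rules(endpoints))  # [20000, 20001]
```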

High level scenarios

Processing of IoT device observations

A local tomato farm monitors the need to irrigate crops based on observations of the ambient temperature and moisture in the environment made by IoT devices deployed to strategic parts of its crop fields. Observations are communicated to an edge gateway in the farm's local office using a messaging pattern over the MQTT protocol. Once received, the observations are aggregated and time-sequenced across the different devices that collected the data and then published to Event Hubs for further processing.

Because observations consist of small, focused bits of data collected continually, the volume of events flowing through the associated Event Hub is quite high. To ensure that telemetry is processed quickly enough to keep a backlog of events from building up, a PartitionReceiver instance is assigned to each individual partition of the Event Hub and is responsible for handling its events. To maximize throughput, a direct connection to each partition's node would be desirable.

Central monitoring of alarm systems

A security firm sells its customers alarm systems backed by centralized monitoring. Its customers vary in size, with some having several hundred alarm sensors. Each alarm sensor captures observations about its state at a fixed interval and sends them to the Event Hub that the firm has provisioned for that customer.

To monitor efficiently, the security firm has automated the processing of events, grouping them into time series and using AI for anomaly detection. When suspicious activity is detected, that series of data is passed to a human operator, along with the corresponding video feed, for more thorough analysis.

Identifying a security issue is a time-sensitive activity; to maintain their competitive advantage, the security firm needs to achieve maximum throughput in their event system. They have done this by creating a specialized customization of the EventProcessor<TPartition> which conforms to their specific needs while offering built-in resilience and cooperative load balancing. The use of direct partition connections would also help minimize latency.

Proposed approach

  • Allow the primitive types, which are performance-focused, to opt in to the use of direct partition connections.
  • Do not offer the option on the higher-level client types, in order to limit the complexity of their options.

Usage examples

Create a Partition Receiver that uses direct partition connections

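var options = new PartitionReceiverOptions
{
    AllowDirectPartitionConnections = true
};

// This overload requires the connection string to include the Event Hub name (EntityPath).
await using var receiver = new PartitionReceiver(
    EventHubConsumerClient.DefaultConsumerGroupName,
    "0",
    EventPosition.Earliest,
    "<< CONNECTION STRING >>",
    options);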

Create an EventProcessor<T> derivative that uses direct partition connections

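using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Primitives;

var options = new EventProcessorOptions
{
    AllowDirectPartitionConnections = true
};

var processor = new CustomProcessor(
    100, // Maximum number of events delivered in a single batch
    EventHubConsumerClient.DefaultConsumerGroupName,
    "<< CONNECTION STRING >>",
    "<< EVENT HUB NAME >>",
    options);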

// This is a minimal custom processor; code is intentionally stubbed for illustration.
public class CustomProcessor : EventProcessor<EventProcessorPartition>
{
    // Constructors
    public CustomProcessor(int batchCount, string consumerGroup, string connectionString, string eventHubName, EventProcessorOptions options = default)
        : base(batchCount, consumerGroup, connectionString, eventHubName, options)
    {
    }
    
    // Stubbed implementation;
    protected override async Task OnProcessingEventBatchAsync(IEnumerable<EventData> events, EventProcessorPartition partition, CancellationToken cancellationToken = default)
    {
        await DoSomethingWithEvents(events).ConfigureAwait(false);
        Log.Information($"Received event batch for partition: { partition.PartitionId }");
    }
    
    protected override Task OnProcessingErrorAsync(Exception exception, EventProcessorPartition partition, string operationDescription, CancellationToken cancellationToken = default)
    {
        Log.Error(exception, $"Error occurred for partition: { partition.PartitionId } during { operationDescription }.");
        return Task.CompletedTask;
    }
    
    protected override Task<IEnumerable<EventProcessorCheckpoint>> ListCheckpointsAsync(CancellationToken cancellationToken) =>
        CustomStorage.GetCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup);
    
    protected override Task<IEnumerable<EventProcessorPartitionOwnership>> ListOwnershipAsync(CancellationToken cancellationToken) =>
        CustomStorage.GetOwnershipAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, Identifier);
    
    protected override Task<IEnumerable<EventProcessorPartitionOwnership>> ClaimOwnershipAsync(IEnumerable<EventProcessorPartitionOwnership> desiredOwnership, CancellationToken cancellationToken) =>
        CustomStorage.TryUpdateOwnershipAsync(desiredOwnership);
}

API Skeleton

Azure.Messaging.EventHubs.Primitives

public class PartitionReceiverOptions
{
    // New members
    public bool AllowDirectPartitionConnections { get; set; } = false;   
    
    // Existing members
    public EventHubConnectionOptions ConnectionOptions { get; set; }
    public EventHubsRetryOptions RetryOptions { get; set; }
    public TimeSpan? DefaultMaximumReceiveWaitTime { get; set; }
    public long? OwnerLevel { get; set; }
    public int PrefetchCount { get; set; }
    public bool TrackLastEnqueuedEventProperties { get; set; } = true;
}

public class EventProcessorOptions
{
    // New members
    public bool AllowDirectPartitionConnections { get; set; } = false;   
    
    // Existing members
    public EventHubConnectionOptions ConnectionOptions { get; set; }
    public EventHubsRetryOptions RetryOptions { get; set; }
    public TimeSpan? MaximumWaitTime { get; set; }
    public int PrefetchCount { get; set; }
    public TimeSpan LoadBalancingUpdateInterval { get; set; }
    public TimeSpan PartitionOwnershipExpirationInterval { get; set; }
    public string Identifier { get; set; }
    public bool TrackLastEnqueuedEventProperties { get; set; } = true;
    public EventPosition DefaultStartingPosition { get; set; } = EventPosition.Earliest;
}
@srnagar

srnagar commented May 26, 2020

Java API view:

https://apiview.dev/Assemblies/Review/8af570231c9a423e8aa2fe54a26fffad?diffRevisionId=d8c5a8d5745f43239a1fa1a4a3c630b7

User code sample:

        EventHubConsumerAsyncClient directConnectionClient = new EventHubClientBuilder()
            .connectionString("<connection-string>")
            .consumerGroup("<consumer-group>")
            .allowDirectPartitionConnection(true)
            .buildAsyncConsumerClient();

        directConnectionClient.receiveFromPartition("0", EventPosition.earliest())
            .subscribe(partitionEvent -> System.out.printf("Received event from partition %s with data %s%n",
                partitionEvent.getPartitionContext().getPartitionId(), partitionEvent.getData().getBodyAsString()));

@chradek

chradek commented May 28, 2020

TypeScript API view:
https://github.com/Azure/azure-sdk-for-js/pull/9141/files?file-filters%5B%5D=.md

User code sample

const client = new EventHubConsumerClient("consumerGroup", "connectionString", { allowDirectPartitionConnections: true });

client.subscribe({
  // Handlers are expected to return a Promise, so they are declared async.
  async processEvents(events, context) {
    // Your code to process the list of events
  },
  async processError(error, context) {
    // Your code to handle the error
  }
});

@YijunXieMS

Python API view:
https://apiview.dev/Assemblies/Review/fa294a29022e42c9b9b1b4d998e67821/d23f1fd66b5148fd8db1c2749bc65066

User code sample:

from azure.eventhub import EventHubConsumerClient

EVENTHUB_NAME = "<< EVENT HUB NAME >>"

def on_event(partition_context, event):
    # Put your code here.
    pass

def on_error(partition_context, error):
    # Put your code here.
    pass

consumer_client = EventHubConsumerClient.from_connection_string(
    "CONNECTION_STR",
    "CONSUMER_GROUP",
    eventhub_name=EVENTHUB_NAME,
    allow_direct_partition_connections=True,
)

with consumer_client:
    consumer_client.receive(
        on_event=on_event,
        on_error=on_error,
    )
