// DEFINE PROCESSOR
// Opens for the Azure SDK, logging, and BCL types used below.
open System
open System.Text
open System.Threading
open System.Threading.Tasks
open Azure.Storage.Blobs
open Azure.Messaging.EventHubs
open Azure.Messaging.EventHubs.Primitives
open Microsoft.Extensions.Logging

// Custom processor that only ever claims ownership of one fixed partition.
type MyEventProcessorClient<'a>(blobContainer: BlobContainerClient, consumerGroup: string, connectionString: string, eventHubName: string, logger: ILogger<'a>, partitionID: string) =
    inherit EventProcessorClient(blobContainer, consumerGroup, connectionString, eventHubName)

    override __.ClaimOwnershipAsync(partitions: System.Collections.Generic.IEnumerable<_>, token: CancellationToken) = task {
        return partitions |> Seq.filter (fun p -> p.PartitionId = partitionID)
    }

// USAGE
let processor =
    new MyEventProcessorClient<Worker>(
        blobContainer,
        consumerGroup,
        eventHubsConnectionKey.Force(),
        "myevents",
        _logger,
        string partition)

processor.add_ProcessEventAsync(fun args ->
    task {
        let text = Encoding.UTF8.GetString(args.Data.EventBody.ToArray())
        let message = Newtonsoft.Json.JsonConvert.DeserializeObject<MyEvent> text
        _logger.LogInformation($"Partition: {partition} {args.Data.PartitionKey}")
        try
            do! EventHandler.run _logger message
        with
        | exn -> _logger.LogInformation($"Error: {exn.Message}")
        do! args.UpdateCheckpointAsync(stoppingToken)
    }
    :> Task
)

processor.add_ProcessErrorAsync(fun args ->
    task {
        _logger.LogInformation($"ProcessError: {args.Exception.Message}")
    } :> Task
)
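One thing the snippet above does not show is starting the processor once both handlers are registered. A minimal sketch, assuming this code runs inside the hosting worker's task (the same place the stoppingToken used for checkpointing comes from):

// Assumed to run inside a task { ... } body in the hosting BackgroundService.
do! processor.StartProcessingAsync(stoppingToken)
// ... keep the worker alive until shutdown is requested, then:
do! processor.StopProcessingAsync()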
It appears that you're checkpointing after every event. This is valid, but it will cause a high number of Storage requests, driving up costs and slowing down throughput. Generally, most applications use a strategy where they checkpoint every N events or after some amount of time has elapsed (for example, every 100 events or every 5 minutes).
The thresholds are specific to your application and depend on how expensive it is to reprocess events should there be a failure that requires rewinding to the last checkpoint recorded.
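To make that concrete, here is a minimal sketch of such a strategy as a variant of the ProcessEventAsync handler above; it replaces the per-event checkpoint. The names checkpointEvery, checkpointInterval, and checkpointState are illustrative, not part of the SDK, and the thresholds are placeholders.

open System.Collections.Concurrent

let checkpointEvery = 100
let checkpointInterval = TimeSpan.FromMinutes 5.

// Per-partition state: events handled since the last checkpoint and when that checkpoint was written.
let checkpointState = ConcurrentDictionary<string, int * DateTimeOffset>()

processor.add_ProcessEventAsync(fun args ->
    task {
        // ... deserialize and handle the event exactly as in the handler above ...
        let partitionId = args.Partition.PartitionId
        let count, lastCheckpoint = checkpointState.GetOrAdd(partitionId, (0, DateTimeOffset.UtcNow))
        let count = count + 1
        // Checkpoint only when enough events have been handled or enough time has passed.
        if count >= checkpointEvery || DateTimeOffset.UtcNow - lastCheckpoint >= checkpointInterval then
            do! args.UpdateCheckpointAsync(stoppingToken)
            checkpointState.[partitionId] <- (0, DateTimeOffset.UtcNow)
        else
            checkpointState.[partitionId] <- (count, lastCheckpoint)
    }
    :> Task
)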
type MyEventProcessorClient<'a>(blobContainer: BlobContainerClient, consumerGroup: string, connectionString: string, eventHubName: string, logger: ILogger<'a>, partitionID: string) =
    inherit EventProcessorClient(blobContainer, consumerGroup, connectionString, eventHubName)

    let assignedPartitions = [| partitionID |]

    // Claim whatever ownership is requested; this processor never competes for partitions.
    override __.ClaimOwnershipAsync(desiredOwnership: System.Collections.Generic.IEnumerable<_>, _cancellationToken: CancellationToken) = task {
        return
            desiredOwnership
            |> Seq.map (fun ownership ->
                ownership.LastModifiedTime <- DateTimeOffset.UtcNow
                ownership)
    }

    // Report only the assigned partition, so the processor never queries for the full partition set.
    override __.ListPartitionIdsAsync(_connection: EventHubConnection, _cancellationToken: CancellationToken) =
        Task.FromResult assignedPartitions

    // Report the assigned partition as already owned by this instance, so no ownership blobs are read from Storage.
    override this.ListOwnershipAsync(_cancellationToken: CancellationToken) = task {
        return
            assignedPartitions
            |> Seq.map (fun partition ->
                let ownership = EventProcessorPartitionOwnership()
                ownership.FullyQualifiedNamespace <- this.FullyQualifiedNamespace
                ownership.EventHubName <- this.EventHubName
                ownership.ConsumerGroup <- this.ConsumerGroup
                ownership.PartitionId <- partition
                ownership.OwnerIdentifier <- this.Identifier
                ownership.LastModifiedTime <- DateTimeOffset.UtcNow
                ownership)
    }
Does this look better?
That looks about right to me.
Based on the code, I think I understand the root of the additional cost. My F# is only intermediate and a bit rusty, so forgive me if I'm misreading parts.

Because you're returning a static set of data from ClaimOwnershipAsync and not overriding ListOwnershipAsync or ListPartitionIdsAsync, each time the load balancing loop ticks it makes a Storage request to list all of the ownership blobs in the container. Since you're not writing out any ownership data, when the processor inspects the ownership list and compares it to the set of available partitions, it does not believe that it has its fair share of work. Unless another processor is running without this customization, each processor will see itself as the only active processor for the cluster and will believe that it should own all partitions. Because of this, the processor will continue to try to claim a partition during each load balancing cycle and will never see ownership as balanced.

In v5.6.2, the processor defaulted to the Balanced strategy with more aggressive defaults for the load balancing intervals. This means that your load balancing loop was ticking every 10 seconds and making the aforementioned Storage calls. In the v5.7.x line, we made some adjustments based on the data we've gathered from production issues over the last two years. Part of those tweaks was setting the strategy to Greedy by default to offset slower values for loop iteration. For the normal case, this reduces Storage calls by 2/3 once the processor has reached a state where ownership is balanced. Therein lies the problem.

What the Greedy strategy does is ignore the load balancing interval when the processor believes that it has not reached its ownership quota. Because your processor will never see ownership as balanced, load balancing will continue to run without pause, each time making a Storage request.

The good news is that the basic mitigation is simple: setting the LoadBalancingStrategy to Balanced in the EventProcessorClientOptions will stop the constant loop iterations (see the sketch at the end of this comment). Because we increased the load balancing interval, your loop will run every 30 seconds rather than every 10.

That said, the better approach would be to override ListPartitionIdsAsync and ListOwnershipAsync as well, telling the processor that its assigned partition is the only one and that it already owns it. This will allow you to bypass Storage entirely. The details of doing so are demonstrated in this sample.
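For reference, a minimal sketch of the Balanced mitigation mentioned above. It assumes the custom client is given one extra constructor parameter so the options can be forwarded to the EventProcessorClient base constructor; the constructor in the gist does not currently take one.

open Azure.Messaging.EventHubs
open Azure.Messaging.EventHubs.Processor

let clientOptions =
    EventProcessorClientOptions(LoadBalancingStrategy = LoadBalancingStrategy.Balanced)

// Assumed change to the custom client, forwarding the options to the base constructor:
// inherit EventProcessorClient(blobContainer, consumerGroup, connectionString, eventHubName, clientOptions)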