Skip to content

Instantly share code, notes, and snippets.

@darrelmiller
Last active October 26, 2015 16:36
Show Gist options
  • Save darrelmiller/8a3ac2dc46ba0e399344 to your computer and use it in GitHub Desktop.
Save darrelmiller/8a3ac2dc46ba0e399344 to your computer and use it in GitHub Desktop.
async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (EventData eventData in messages)
{
string message = Encoding.UTF8.GetString(eventData.GetBytes());
// Why does this line cause this event to block indefinitely?
// This is not the actual line of code I want to run, but it was the simplest example I found that reproduced the blocking.
// What I want to run is HttpContentExtensions.ReadAsHttpMessageContent() from System.Net.Http.Formatting but it blocks in the same way.
var scheduler = TaskScheduler.FromCurrentSynchronizationContext();
_Queue.Enqueue(message); // This line is never executed. Removing the above line fixes the problem.
// Using a queue and processing the queue with another task is my current workaround.
_Logger.LogInfo(string.Format("Event received from partition: '{0}'", context.Lease.PartitionId));
}
//Call checkpoint every 5 minutes, so that worker can resume processing from the 5 minutes back if it restarts.
if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
{
_Logger.LogInfo("Checkpointing");
await context.CheckpointAsync();
this.checkpointStopWatch.Restart();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment