Lesson learnt: a streaming producer/consumer issue when using BlockingCollection<T> with Parallel.ForEach(_blockingCollection.GetConsumingEnumerable(...)).
Beware the default Partitioner algorithm, which chunks and buffers.
The GetConsumingPartitioner heading covers it, and the first comment from Stephen Toub totally nails it. We implemented something similar to what commenter Hernan pointed out.
See: Stephen Toub Blog
My absolute saviour: NoBuffering in EnumerablePartitionerOptions.
The issue, quoted inline:
This issue you were hitting... was the Parallel.ForEach call returning, or did the ForEach call block forever? If the latter, it would make sense that it was waiting for enough items to fill the chunk, but if those items were never arriving, it would wait forever for them to. Or, more specifically, it would wait until either enough elements arrived or until the blocking collection's CompletedAdding method was called, which would inform the Parallel.ForEach that no more data would arrive and so it should stop waiting to fill the chunk.
We had occasional "missing" elements that were successfully added to the collection and successfully iterated, but the delegate never ran for them. That lasted until we gracefully shut down the service, which signalled the CancellationToken, and it would then happily process the work. Man, that ate a few hours!
//Before
var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount * 5, CancellationToken = cancel.Token };
var results = Parallel.ForEach(_blockingCollection.GetConsumingEnumerable(cancel.Token), options, work =>
//After
var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount * 5, CancellationToken = cancel.Token };
var partitioner = Partitioner.Create(_blockingCollection.GetConsumingEnumerable(cancel.Token), EnumerablePartitionerOptions.NoBuffering);
var results = Parallel.ForEach(partitioner, options, work =>
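For anyone who wants to try the "After" version end to end, here is a minimal, self-contained sketch. The class name NoBufferingDemo, the Run helper, the slow producer and the item counts are all made up for illustration and are not from our service; only the Partitioner.Create(..., EnumerablePartitionerOptions.NoBuffering) call mirrors the fix above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class; names are illustrative, not from the original service.
public static class NoBufferingDemo
{
    // Streams itemCount items through a BlockingCollection and returns
    // how many of them the Parallel.ForEach delegate actually processed.
    public static int Run(int itemCount)
    {
        int processed = 0;

        using (var queue = new BlockingCollection<int>())
        using (var cancel = new CancellationTokenSource())
        {
            // Producer: trickles items in slowly, then signals that no more will arrive.
            Task producer = Task.Run(() =>
            {
                for (int i = 0; i < itemCount; i++)
                {
                    queue.Add(i);
                    Thread.Sleep(10);
                }
                queue.CompleteAdding();
            });

            var options = new ParallelOptions
            {
                MaxDegreeOfParallelism = Environment.ProcessorCount,
                CancellationToken = cancel.Token
            };

            // NoBuffering takes items from the source one at a time,
            // rather than waiting to fill a chunk before handing work out.
            var partitioner = Partitioner.Create(
                queue.GetConsumingEnumerable(cancel.Token),
                EnumerablePartitionerOptions.NoBuffering);

            // Returns once CompleteAdding has been called and the queue is drained.
            Parallel.ForEach(partitioner, options, work =>
            {
                Interlocked.Increment(ref processed);
            });

            producer.Wait();
        }

        return processed;
    }

    public static void Main()
    {
        Console.WriteLine(NoBufferingDemo.Run(20));
    }
}
```

Run(20) should return 20 once the producer calls CompleteAdding. The trade-off, as far as I understand it, is that NoBuffering gives up some throughput (more synchronization per item) in exchange for each item being dispatched as soon as it arrives, which is usually what you want for a long-running streaming consumer.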
Just saved me going crazy after 3 hours of trying to figure out why my process was getting stuck randomly.
Seems like this is definitely a bug in the .NET Framework.