Last active
June 8, 2019 03:29
-
-
Save mmurrell/9225ed7c4d107c2195057f77e07f0f68 to your computer and use it in GitHub Desktop.
Concurrency issues in Batch extension method.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static class BatchLinq | |
{ | |
public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int size) | |
{ | |
if (size <= 0) | |
throw new ArgumentOutOfRangeException("size", "Must be greater than zero."); | |
using (IEnumerator<T> enumerator = source.GetEnumerator()) | |
while (enumerator.MoveNext()) | |
yield return TakeIEnumerator(enumerator, size); | |
} | |
private static IEnumerable<T> TakeIEnumerator<T>(IEnumerator<T> source, int size) | |
{ | |
int i = 0; | |
do | |
yield return source.Current; | |
while (++i < size && source.MoveNext()); | |
} | |
} | |
static ConcurrentBag<int> processed_numbers = new ConcurrentBag<int>(); | |
void Main() | |
{ | |
var sample = Enumerable.Range(1,22); | |
var batches = sample.Batch(5); | |
Console.WriteLine("Beginning threaded processing"); | |
var options = new ParallelOptions() { MaxDegreeOfParallelism = 5 }; | |
var results = Parallel.ForEach(batches, options, ProcessBatch); | |
Console.WriteLine("End threaded processing"); | |
Assert(results.IsCompleted, "Parallel processing failed."); | |
Assert(processed_numbers.Count() == sample.Count(), "Input and output counts are different."); | |
var counter = 0; | |
foreach(var (b, a) in sample.Zip(processed_numbers.OrderBy(x=>x), (b,a)=>(b,a))) | |
{ | |
Console.WriteLine($"Index: {++counter:00} Before: {b:00} -- After: {a:00}"); | |
} | |
} | |
private void ProcessBatch(IEnumerable<int> batch) | |
{ | |
int counter = 0; | |
var currentBatchList = new List<int>(); | |
foreach (var val in batch) | |
{ | |
Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} - Adding value {val} to bag, index {counter} in this batch."); | |
counter++; | |
currentBatchList.Add(val); | |
processed_numbers.Add(val); | |
} | |
Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} - Batch completed, processed {currentBatchList.Count()} numbers."); | |
} | |
private void Assert(bool condition, string message){ | |
if(!condition) | |
Console.WriteLine("*** FAILURE: " + message); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This code produces the following results on my machine. The code is a bit of boilerplate, butI tried to demonstrate that in a multi-threaded environment, several items are not processed at all, and several items are processed multiple times. Theoretically, I would expect all numbers to appear in one and only one batch, and the counts to be distributed evenly. I would also expect that after removing the random ordering, the input list and the output list would match exactly.