Last active
June 28, 2023 14:31
-
-
Save fiddyschmitt/c03e3004822a350f11db9da5ea2e823a to your computer and use it in GitHub Desktop.
Multiple producers, multiple consumers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| //DON'T USE THESE. It spins the CPU when calling TryTake() | |
| //Original from: | |
| //https://www.meziantou.net/2017/05/09/mixed-producer-consumer-scenario-in-net | |
| //Feed it a ConcurrentQueue | |
| Task Process<T>(IProducerConsumerCollection<T> collection, Func<T, IEnumerable<T>> processItem, int maxDegreeOfParallelism, CancellationToken ct) | |
| { | |
| var tasks = new Task[maxDegreeOfParallelism]; | |
| int activeThreadsNumber = 0; | |
| for (int i = 0; i < tasks.Length; i++) | |
| { | |
| tasks[i] = Task.Factory.StartNew(() => | |
| { | |
| while (true) | |
| { | |
| Interlocked.Increment(ref activeThreadsNumber); | |
| while (collection.TryTake(out T item)) | |
| { | |
| var nextItems = processItem(item); | |
| foreach (var nextItem in nextItems) | |
| { | |
| collection.TryAdd(nextItem); | |
| } | |
| } | |
| Interlocked.Decrement(ref activeThreadsNumber); | |
| if (activeThreadsNumber == 0) //all tasks finished | |
| return; | |
| } | |
| }, ct); | |
| } | |
| return Task.WhenAll(tasks); | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| //DON'T USE THESE. It spins the CPU when calling TryTake() | |
| using System; | |
| using System.Collections.Concurrent; | |
| using System.Collections.Generic; | |
| using System.Linq; | |
| using System.Text; | |
| using System.Threading; | |
| using System.Threading.Tasks; | |
| namespace RecursiveUnzip | |
| { | |
| public class MultiProducerMultiConsumer | |
| { | |
| /* | |
| public static void Process<T>(IProducerConsumerCollection<T> collection, Func<T, IEnumerable<T>> processItem, int maxDegreeOfParallelism, CancellationToken ct) | |
| { | |
| var tasks = new Thread[maxDegreeOfParallelism]; | |
| bool stop = false; | |
| ManualResetEvent marshall = new ManualResetEvent(true); | |
| int activeThreadsNumber = 0; | |
| for (int i = 0; i < tasks.Length; i++) | |
| { | |
| tasks[i] = new Thread(() => | |
| { | |
| while (true) | |
| { | |
| marshall.WaitOne(); | |
| if (stop) break; | |
| Interlocked.Increment(ref activeThreadsNumber); | |
| while (collection.TryTake(out T item)) | |
| { | |
| var nextItems = processItem(item); | |
| foreach (var nextItem in nextItems) | |
| { | |
| collection.TryAdd(nextItem); | |
| } | |
| } | |
| Interlocked.Decrement(ref activeThreadsNumber); | |
| } | |
| }); | |
| tasks[i].IsBackground = true; | |
| tasks[i].Start(); | |
| } | |
| //once in a while, take a survey to determine if we've finished | |
| var check = new Thread(new ThreadStart(() => | |
| { | |
| while (!ct.IsCancellationRequested) | |
| { | |
| if (activeThreadsNumber == 0 && collection.Count() == 0) | |
| { | |
| //hold all the workers up to do a final survey | |
| marshall.Reset(); | |
| Thread.Sleep(1000); | |
| //final survey | |
| if (activeThreadsNumber == 0 && collection.Count() == 0) | |
| { | |
| //finished | |
| stop = true; | |
| marshall.Set(); | |
| break; | |
| } | |
| marshall.Set(); | |
| } | |
| else | |
| { | |
| Thread.Sleep(1000); | |
| } | |
| } | |
| })) | |
| { | |
| IsBackground = true | |
| }; | |
| check.Start(); | |
| tasks.ToList().ForEach(t => t.Join()); | |
| } | |
| */ | |
| //https://www.meziantou.net/2017/05/09/mixed-producer-consumer-scenario-in-net | |
| public static Task ProcessQueue<T>(IProducerConsumerCollection<T> collection, Func<T, IEnumerable<T>> processItem, int maxDegreeOfParallelism, CancellationToken ct) | |
| { | |
| var tasks = new Task[maxDegreeOfParallelism]; | |
| bool stop = false; | |
| ManualResetEvent marshall = new ManualResetEvent(true); | |
| int activeThreadsNumber = 0; | |
| for (int i = 0; i < tasks.Length; i++) | |
| { | |
| tasks[i] = Task.Factory.StartNew(() => | |
| { | |
| while (true) | |
| { | |
| marshall.WaitOne(); | |
| if (stop) break; | |
| Interlocked.Increment(ref activeThreadsNumber); | |
| while (collection.TryTake(out T item)) | |
| { | |
| var nextItems = processItem(item); | |
| foreach (var nextItem in nextItems) | |
| { | |
| collection.TryAdd(nextItem); | |
| } | |
| } | |
| Interlocked.Decrement(ref activeThreadsNumber); | |
| } | |
| }, ct); | |
| } | |
| //once in a while, take a survey to determine if we've finished | |
| var check = new Thread(new ThreadStart(() => | |
| { | |
| while (!ct.IsCancellationRequested) | |
| { | |
| if (collection.Count() == 0) | |
| { | |
| //hold all the workers up to do a final survey | |
| marshall.Reset(); | |
| Thread.Sleep(1000); | |
| Debug.WriteLine($"activeThreadsNumber: {activeThreadsNumber}, collection.Count(): {collection.Count()}"); | |
| //final survey | |
| if (activeThreadsNumber == 0 && collection.Count() == 0) | |
| { | |
| //finished | |
| stop = true; | |
| marshall.Set(); | |
| break; | |
| } | |
| marshall.Set(); | |
| } | |
| else | |
| { | |
| Thread.Sleep(1000); | |
| } | |
| } | |
| })) | |
| { | |
| IsBackground = true | |
| }; | |
| check.Start(); | |
| return Task.WhenAll(tasks); | |
| } | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| public static void Recurse2<T>(this IEnumerable<T> source, Func<T, IEnumerable<T>> childSelector, int maxThreads, CancellationToken ct) | |
| { | |
| var collection = new BlockingCollection<T>(); | |
| foreach (var item in source) | |
| { | |
| collection.Add(item); | |
| } | |
| var marshall = new ManualResetEvent(true); | |
| var activeThreadsNumber = 0; | |
| var stop = false; | |
| var tasks = new List<Thread>(); | |
| for (int i = 0; i < maxThreads; i++) | |
| { | |
| var newThread = new Thread(() => | |
| { | |
| while (!ct.IsCancellationRequested) | |
| { | |
| marshall.WaitOne(); | |
| if (stop) break; | |
| Interlocked.Increment(ref activeThreadsNumber); | |
| while (collection.TryTake(out T item, 100)) | |
| { | |
| var subItems = childSelector(item); | |
| foreach (var subItem in subItems) | |
| { | |
| collection.Add(subItem); | |
| } | |
| } | |
| Interlocked.Decrement(ref activeThreadsNumber); | |
| } | |
| }) | |
| { | |
| IsBackground = true | |
| }; | |
| newThread.Start(); | |
| tasks.Add(newThread); | |
| } | |
| //once in a while, take a survey to determine if we've finished | |
| var check = new Thread(new ThreadStart(() => | |
| { | |
| while (!ct.IsCancellationRequested) | |
| { | |
| if (collection.Count == 0) | |
| { | |
| //hold all the workers up to do a survey | |
| marshall.Reset(); | |
| Thread.Sleep(1000); | |
| //Debug.WriteLine($"activeThreadsNumber: {activeThreadsNumber}, collection.Count(): {collection.Count}"); | |
| //final survey | |
| if (activeThreadsNumber == 0 && collection.Count == 0) | |
| { | |
| //finished | |
| stop = true; | |
| marshall.Set(); | |
| break; | |
| } | |
| marshall.Set(); | |
| } | |
| else | |
| { | |
| Thread.Sleep(1000); | |
| } | |
| } | |
| })) | |
| { | |
| IsBackground = true | |
| }; | |
| check.Start(); | |
| tasks.ToList().ForEach(t => t.Join()); | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment