Skip to content

Instantly share code, notes, and snippets.

@fiddyschmitt
Last active June 28, 2023 14:31
Show Gist options
  • Select an option

  • Save fiddyschmitt/c03e3004822a350f11db9da5ea2e823a to your computer and use it in GitHub Desktop.

Select an option

Save fiddyschmitt/c03e3004822a350f11db9da5ea2e823a to your computer and use it in GitHub Desktop.
Multiple producers, multiple consumers
//DON'T USE THESE. It spins the CPU when calling TryTake()
//Original from:
//https://www.meziantou.net/2017/05/09/mixed-producer-consumer-scenario-in-net
//Feed it a ConcurrentQueue
Task Process<T>(IProducerConsumerCollection<T> collection, Func<T, IEnumerable<T>> processItem, int maxDegreeOfParallelism, CancellationToken ct)
{
var tasks = new Task[maxDegreeOfParallelism];
int activeThreadsNumber = 0;
for (int i = 0; i < tasks.Length; i++)
{
tasks[i] = Task.Factory.StartNew(() =>
{
while (true)
{
Interlocked.Increment(ref activeThreadsNumber);
while (collection.TryTake(out T item))
{
var nextItems = processItem(item);
foreach (var nextItem in nextItems)
{
collection.TryAdd(nextItem);
}
}
Interlocked.Decrement(ref activeThreadsNumber);
if (activeThreadsNumber == 0) //all tasks finished
return;
}
}, ct);
}
return Task.WhenAll(tasks);
}
//DON'T USE THESE. It spins the CPU when calling TryTake()
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace RecursiveUnzip
{
public class MultiProducerMultiConsumer
{
/*
public static void Process<T>(IProducerConsumerCollection<T> collection, Func<T, IEnumerable<T>> processItem, int maxDegreeOfParallelism, CancellationToken ct)
{
var tasks = new Thread[maxDegreeOfParallelism];
bool stop = false;
ManualResetEvent marshall = new ManualResetEvent(true);
int activeThreadsNumber = 0;
for (int i = 0; i < tasks.Length; i++)
{
tasks[i] = new Thread(() =>
{
while (true)
{
marshall.WaitOne();
if (stop) break;
Interlocked.Increment(ref activeThreadsNumber);
while (collection.TryTake(out T item))
{
var nextItems = processItem(item);
foreach (var nextItem in nextItems)
{
collection.TryAdd(nextItem);
}
}
Interlocked.Decrement(ref activeThreadsNumber);
}
});
tasks[i].IsBackground = true;
tasks[i].Start();
}
//once in a while, take a survey to determine if we've finished
var check = new Thread(new ThreadStart(() =>
{
while (!ct.IsCancellationRequested)
{
if (activeThreadsNumber == 0 && collection.Count() == 0)
{
//hold all the workers up to do a final survey
marshall.Reset();
Thread.Sleep(1000);
//final survey
if (activeThreadsNumber == 0 && collection.Count() == 0)
{
//finished
stop = true;
marshall.Set();
break;
}
marshall.Set();
}
else
{
Thread.Sleep(1000);
}
}
}))
{
IsBackground = true
};
check.Start();
tasks.ToList().ForEach(t => t.Join());
}
*/
//https://www.meziantou.net/2017/05/09/mixed-producer-consumer-scenario-in-net
public static Task ProcessQueue<T>(IProducerConsumerCollection<T> collection, Func<T, IEnumerable<T>> processItem, int maxDegreeOfParallelism, CancellationToken ct)
{
var tasks = new Task[maxDegreeOfParallelism];
bool stop = false;
ManualResetEvent marshall = new ManualResetEvent(true);
int activeThreadsNumber = 0;
for (int i = 0; i < tasks.Length; i++)
{
tasks[i] = Task.Factory.StartNew(() =>
{
while (true)
{
marshall.WaitOne();
if (stop) break;
Interlocked.Increment(ref activeThreadsNumber);
while (collection.TryTake(out T item))
{
var nextItems = processItem(item);
foreach (var nextItem in nextItems)
{
collection.TryAdd(nextItem);
}
}
Interlocked.Decrement(ref activeThreadsNumber);
}
}, ct);
}
//once in a while, take a survey to determine if we've finished
var check = new Thread(new ThreadStart(() =>
{
while (!ct.IsCancellationRequested)
{
if (collection.Count() == 0)
{
//hold all the workers up to do a final survey
marshall.Reset();
Thread.Sleep(1000);
Debug.WriteLine($"activeThreadsNumber: {activeThreadsNumber}, collection.Count(): {collection.Count()}");
//final survey
if (activeThreadsNumber == 0 && collection.Count() == 0)
{
//finished
stop = true;
marshall.Set();
break;
}
marshall.Set();
}
else
{
Thread.Sleep(1000);
}
}
}))
{
IsBackground = true
};
check.Start();
return Task.WhenAll(tasks);
}
}
}
public static void Recurse2<T>(this IEnumerable<T> source, Func<T, IEnumerable<T>> childSelector, int maxThreads, CancellationToken ct)
{
var collection = new BlockingCollection<T>();
foreach (var item in source)
{
collection.Add(item);
}
var marshall = new ManualResetEvent(true);
var activeThreadsNumber = 0;
var stop = false;
var tasks = new List<Thread>();
for (int i = 0; i < maxThreads; i++)
{
var newThread = new Thread(() =>
{
while (!ct.IsCancellationRequested)
{
marshall.WaitOne();
if (stop) break;
Interlocked.Increment(ref activeThreadsNumber);
while (collection.TryTake(out T item, 100))
{
var subItems = childSelector(item);
foreach (var subItem in subItems)
{
collection.Add(subItem);
}
}
Interlocked.Decrement(ref activeThreadsNumber);
}
})
{
IsBackground = true
};
newThread.Start();
tasks.Add(newThread);
}
//once in a while, take a survey to determine if we've finished
var check = new Thread(new ThreadStart(() =>
{
while (!ct.IsCancellationRequested)
{
if (collection.Count == 0)
{
//hold all the workers up to do a survey
marshall.Reset();
Thread.Sleep(1000);
//Debug.WriteLine($"activeThreadsNumber: {activeThreadsNumber}, collection.Count(): {collection.Count}");
//final survey
if (activeThreadsNumber == 0 && collection.Count == 0)
{
//finished
stop = true;
marshall.Set();
break;
}
marshall.Set();
}
else
{
Thread.Sleep(1000);
}
}
}))
{
IsBackground = true
};
check.Start();
tasks.ToList().ForEach(t => t.Join());
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment