Skip to content

Instantly share code, notes, and snippets.

@danielmarbach
Created October 17, 2016 15:44
Show Gist options
  • Save danielmarbach/2ef214b178366d0fa700400cd414f13f to your computer and use it in GitHub Desktop.
Save danielmarbach/2ef214b178366d0fa700400cd414f13f to your computer and use it in GitHub Desktop.
MultiProducerConcurrentDispatcher
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace PushCollection
{
class Program
{
static void Main(string[] args)
{
var collection = new MultiProducerConcurrentDispatcher<int>(batchSize: 10, pushInterval: TimeSpan.FromSeconds(5), maxConcurrency: 2, numberOfSlots: 10);
for (int i = 0; i < 65; i++)
{
collection.Push(slotNumber: 1, item : i);
}
collection.Start((items, slot, state, token) =>
{
Console.WriteLine($"New Batch on { Thread.CurrentThread.ManagedThreadId } for slot { slot }");
for (int i = 0; i < items.Count; i++)
{
Console.Write($"{ items[i]} ");
}
Console.WriteLine();
return Task.FromResult(0);
}, null);
for (int i = 0; i < 4000; i++)
{
collection.Push(1, i);
}
for (int i = 0; i < 10010; i++)
{
collection.Push(2, i);
}
Thread.Sleep(10000);
var tcs = new CancellationTokenSource();
// Run a task so that we can cancel from another thread.
Task.Run(() =>
{
if (Console.ReadKey().KeyChar == 'c')
tcs.Cancel();
Console.WriteLine("press any key to exit");
});
var randomLocal = new ThreadLocal<Random>(() => new Random());
var t1 = Task.Run(() =>
{
ParallelOptions po1 = new ParallelOptions();
po1.CancellationToken = tcs.Token;
Parallel.For(10001, 100000, po1, (i, state) =>
{
if (po1.CancellationToken.IsCancellationRequested)
{
state.Stop();
}
//Thread.Sleep(randomLocal.Value.Next(1, 10000));
var slot = randomLocal.Value.Next(0, 9);
collection.Push(slot, i);
});
});
var t2 = Task.Run(() =>
{
ParallelOptions po2 = new ParallelOptions();
po2.CancellationToken = tcs.Token;
Parallel.For(100001, 200000, (i, state) =>
{
if (po2.CancellationToken.IsCancellationRequested)
{
state.Stop();
}
Thread.Sleep(randomLocal.Value.Next(1, 1000));
var slot = randomLocal.Value.Next(0, 9);
collection.Push(slot, i);
});
});
try
{
Task.WaitAll(t1, t2);
}
catch (AggregateException)
{
}
Console.WriteLine("Hit enter to completed");
Console.ReadLine();
Console.WriteLine("Completting");
collection.Complete(drain: true).GetAwaiter().GetResult();
Console.WriteLine("Completed");
Console.ReadLine();
}
}
class MultiProducerConcurrentDispatcher<TItem>
{
private readonly int batchSize;
private ConcurrentQueue<TItem>[] queues;
private TimeSpan pushInterval;
private Func<List<TItem>, int, object, CancellationToken, Task> pump;
private Task timer;
private CancellationTokenSource tokenSource = new CancellationTokenSource();
private AsyncAutoResetEvent syncEvent = new AsyncAutoResetEvent(false);
private bool started;
private object state;
private int maxConcurrency;
private long numberOfPushedItems;
private int numberOfSlots;
public MultiProducerConcurrentDispatcher(int batchSize, TimeSpan pushInterval, int maxConcurrency, int numberOfSlots)
{
this.maxConcurrency = maxConcurrency;
this.pushInterval = pushInterval;
this.batchSize = batchSize;
this.numberOfSlots = numberOfSlots;
queues = new ConcurrentQueue<TItem>[numberOfSlots];
for (int i = 0; i < numberOfSlots; i++)
{
queues[i] = new ConcurrentQueue<TItem>();
}
}
public void Push(int slotNumber, TItem item)
{
queues[slotNumber].Enqueue(item);
var incrementedCounter = Interlocked.Increment(ref numberOfPushedItems);
if (incrementedCounter > batchSize)
{
syncEvent.Set();
}
}
public void Start(Func<List<TItem>, int, object, CancellationToken, Task> pump)
{
Start(pump, null);
}
public void Start(Func<List<TItem>, int, object, CancellationToken, Task> pump, object state)
{
if (started)
{
throw new InvalidOperationException("Already started");
}
timer = Task.Run(TimerLoop);
this.pump = pump;
this.state = state;
}
public async Task Complete(bool drain = true)
{
tokenSource.Cancel();
await timer.ConfigureAwait(false);
if (drain)
{
do
{
await PushInBatches().ConfigureAwait(false);
} while (Interlocked.Read(ref numberOfPushedItems) > 0);
}
}
private async Task TimerLoop()
{
var token = tokenSource.Token;
while (!tokenSource.IsCancellationRequested)
{
try
{
await Task.WhenAny(Task.Delay(pushInterval, token), syncEvent.WaitAsync(token)).ConfigureAwait(false);
await PushInBatches().ConfigureAwait(false);
}
catch (Exception)
{
}
}
}
private Task PushInBatches()
{
if (Interlocked.Read(ref numberOfPushedItems) == 0)
{
return Task.FromResult(0);
}
var tasks = new List<Task>(maxConcurrency * numberOfSlots);
for (int i = 0; i < numberOfSlots; i++)
{
var queue = queues[i];
PushInBatchesUpToConcurrencyPer(queue, i, tasks);
}
return Task.WhenAll(tasks);
}
private void PushInBatchesUpToConcurrencyPer(ConcurrentQueue<TItem> queue, int currentSlotNumber, List<Task> tasks)
{
int numberOfItems;
var concurrency = 1;
do
{
numberOfItems = 0;
List<TItem> items = null;
for (int i = 0; i < batchSize; i++)
{
TItem item;
if (!queue.TryDequeue(out item))
{
break;
}
// late allocate
if (items == null)
{
items = new List<TItem>(batchSize);
}
items.Add(item);
numberOfItems++;
}
if (numberOfItems <= 0)
{
return;
}
Interlocked.Add(ref numberOfPushedItems, -numberOfItems);
concurrency++;
tasks.Add(pump(items, currentSlotNumber, state, tokenSource.Token));
} while (numberOfItems == batchSize && concurrency <= maxConcurrency);
}
}
/// <summary>
/// Inspired by http://blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266920.aspx
/// </summary>
class AsyncAutoResetEvent
{
private volatile TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
public AsyncAutoResetEvent(bool initialState)
{
if (initialState)
{
tcs.SetResult(true);
}
}
public Task WaitAsync()
{
return WaitAsync(CancellationToken.None);
}
public async Task WaitAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return;
}
try
{
using (cancellationToken.Register(() => tcs.TrySetCanceled()))
{
await tcs.Task.ConfigureAwait(false);
}
}
finally
{
Reset();
}
}
public void Set()
{
tcs.TrySetResult(true);
}
public void Reset()
{
var sw = new SpinWait();
do
{
var tcs1 = tcs;
if (!tcs1.Task.IsCompleted)
return;
var taskCompletionSource = new TaskCompletionSource<bool>();
#pragma warning disable 420
if (Interlocked.CompareExchange(ref tcs, taskCompletionSource, tcs1) == tcs1)
#pragma warning restore 420
return;
sw.SpinOnce();
} while (true);
}
}
}
@SeanFeldman
Copy link

started is always false. What's the purpose if this member field if it's always false?

@SeanFeldman
Copy link

PushInBatchesUpToConcurrencyPer() is iterating over concurrent collection performing for (int i = 0; i < batchSize; i++) for each slot, times the maxConcurrency. Is that better than having to pop a range?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment