MultiProducerConcurrentDispatcher
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace PushCollection
{
    class Program
    {
        static void Main(string[] args)
        {
            var collection = new MultiProducerConcurrentDispatcher<int>(batchSize: 10, pushInterval: TimeSpan.FromSeconds(5), maxConcurrency: 2, numberOfSlots: 10);

            // Items pushed before Start are buffered in the slot queues.
            for (int i = 0; i < 65; i++)
            {
                collection.Push(slotNumber: 1, item: i);
            }

            collection.Start((items, slot, state, token) =>
            {
                Console.WriteLine($"New Batch on {Thread.CurrentThread.ManagedThreadId} for slot {slot}");
                for (int i = 0; i < items.Count; i++)
                {
                    Console.Write($"{items[i]} ");
                }
                Console.WriteLine();
                return Task.FromResult(0);
            }, null);

            for (int i = 0; i < 4000; i++)
            {
                collection.Push(1, i);
            }

            for (int i = 0; i < 10010; i++)
            {
                collection.Push(2, i);
            }

            Thread.Sleep(10000);

            var tcs = new CancellationTokenSource();

            // Run a task so that we can cancel from another thread.
            Task.Run(() =>
            {
                if (Console.ReadKey().KeyChar == 'c')
                    tcs.Cancel();
                Console.WriteLine("press any key to exit");
            });

            var randomLocal = new ThreadLocal<Random>(() => new Random());

            var t1 = Task.Run(() =>
            {
                ParallelOptions po1 = new ParallelOptions();
                po1.CancellationToken = tcs.Token;
                Parallel.For(10001, 100000, po1, (i, state) =>
                {
                    if (po1.CancellationToken.IsCancellationRequested)
                    {
                        state.Stop();
                    }
                    //Thread.Sleep(randomLocal.Value.Next(1, 10000));
                    var slot = randomLocal.Value.Next(0, 9);
                    collection.Push(slot, i);
                });
            });

            var t2 = Task.Run(() =>
            {
                ParallelOptions po2 = new ParallelOptions();
                po2.CancellationToken = tcs.Token;
                Parallel.For(100001, 200000, po2, (i, state) =>
                {
                    if (po2.CancellationToken.IsCancellationRequested)
                    {
                        state.Stop();
                    }
                    Thread.Sleep(randomLocal.Value.Next(1, 1000));
                    var slot = randomLocal.Value.Next(0, 9);
                    collection.Push(slot, i);
                });
            });

            try
            {
                Task.WaitAll(t1, t2);
            }
            catch (AggregateException)
            {
            }

            Console.WriteLine("Hit enter to complete");
            Console.ReadLine();
            Console.WriteLine("Completing");
            collection.Complete(drain: true).GetAwaiter().GetResult();
            Console.WriteLine("Completed");
            Console.ReadLine();
        }
    }
    class MultiProducerConcurrentDispatcher<TItem>
    {
        private readonly int batchSize;
        private ConcurrentQueue<TItem>[] queues;
        private TimeSpan pushInterval;
        private Func<List<TItem>, int, object, CancellationToken, Task> pump;
        private Task timer;
        private CancellationTokenSource tokenSource = new CancellationTokenSource();
        private AsyncAutoResetEvent syncEvent = new AsyncAutoResetEvent(false);
        private bool started;
        private object state;
        private int maxConcurrency;
        private long numberOfPushedItems;
        private int numberOfSlots;

        public MultiProducerConcurrentDispatcher(int batchSize, TimeSpan pushInterval, int maxConcurrency, int numberOfSlots)
        {
            this.maxConcurrency = maxConcurrency;
            this.pushInterval = pushInterval;
            this.batchSize = batchSize;
            this.numberOfSlots = numberOfSlots;

            // One independent queue per slot so producers for different slots don't contend.
            queues = new ConcurrentQueue<TItem>[numberOfSlots];
            for (int i = 0; i < numberOfSlots; i++)
            {
                queues[i] = new ConcurrentQueue<TItem>();
            }
        }

        // Enqueues the item into its slot and wakes the timer loop early once more than a batch is pending.
        public void Push(int slotNumber, TItem item)
        {
            queues[slotNumber].Enqueue(item);
            var incrementedCounter = Interlocked.Increment(ref numberOfPushedItems);
            if (incrementedCounter > batchSize)
            {
                syncEvent.Set();
            }
        }

        public void Start(Func<List<TItem>, int, object, CancellationToken, Task> pump)
        {
            Start(pump, null);
        }

        public void Start(Func<List<TItem>, int, object, CancellationToken, Task> pump, object state)
        {
            if (started)
            {
                throw new InvalidOperationException("Already started");
            }

            timer = Task.Run(TimerLoop);
            this.pump = pump;
            this.state = state;
        }

        // Cancels the timer loop and, when drain is true, keeps pumping until all queued items have been dispatched.
        public async Task Complete(bool drain = true)
        {
            tokenSource.Cancel();
            await timer.ConfigureAwait(false);

            if (drain)
            {
                do
                {
                    await PushInBatches().ConfigureAwait(false);
                } while (Interlocked.Read(ref numberOfPushedItems) > 0);
            }
        }
        // Waits for either the push interval to elapse or the "batch available" signal, then pumps batches.
        private async Task TimerLoop()
        {
            var token = tokenSource.Token;
            while (!tokenSource.IsCancellationRequested)
            {
                try
                {
                    await Task.WhenAny(Task.Delay(pushInterval, token), syncEvent.WaitAsync(token)).ConfigureAwait(false);
                    await PushInBatches().ConfigureAwait(false);
                }
                catch (Exception)
                {
                }
            }
        }

        private Task PushInBatches()
        {
            if (Interlocked.Read(ref numberOfPushedItems) == 0)
            {
                return Task.FromResult(0);
            }

            var tasks = new List<Task>(maxConcurrency * numberOfSlots);
            for (int i = 0; i < numberOfSlots; i++)
            {
                var queue = queues[i];
                PushInBatchesUpToConcurrencyPer(queue, i, tasks);
            }
            return Task.WhenAll(tasks);
        }

        // Dequeues up to batchSize items at a time from the slot's queue and hands each batch to the pump,
        // starting at most maxConcurrency pump calls per slot per pass.
        private void PushInBatchesUpToConcurrencyPer(ConcurrentQueue<TItem> queue, int currentSlotNumber, List<Task> tasks)
        {
            int numberOfItems;
            var concurrency = 1;
            do
            {
                numberOfItems = 0;
                List<TItem> items = null;
                for (int i = 0; i < batchSize; i++)
                {
                    TItem item;
                    if (!queue.TryDequeue(out item))
                    {
                        break;
                    }

                    // late allocate
                    if (items == null)
                    {
                        items = new List<TItem>(batchSize);
                    }

                    items.Add(item);
                    numberOfItems++;
                }

                if (numberOfItems <= 0)
                {
                    return;
                }

                Interlocked.Add(ref numberOfPushedItems, -numberOfItems);
                concurrency++;
                tasks.Add(pump(items, currentSlotNumber, state, tokenSource.Token));
            } while (numberOfItems == batchSize && concurrency <= maxConcurrency);
        }
    }
    /// <summary>
    /// Inspired by http://blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266920.aspx
    /// </summary>
    class AsyncAutoResetEvent
    {
        private volatile TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();

        public AsyncAutoResetEvent(bool initialState)
        {
            if (initialState)
            {
                tcs.SetResult(true);
            }
        }

        public Task WaitAsync()
        {
            return WaitAsync(CancellationToken.None);
        }

        public async Task WaitAsync(CancellationToken cancellationToken)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                return;
            }

            try
            {
                using (cancellationToken.Register(() => tcs.TrySetCanceled()))
                {
                    await tcs.Task.ConfigureAwait(false);
                }
            }
            finally
            {
                Reset();
            }
        }

        public void Set()
        {
            tcs.TrySetResult(true);
        }

        public void Reset()
        {
            var sw = new SpinWait();
            do
            {
                var tcs1 = tcs;
                if (!tcs1.Task.IsCompleted)
                    return;

                var taskCompletionSource = new TaskCompletionSource<bool>();
#pragma warning disable 420
                if (Interlocked.CompareExchange(ref tcs, taskCompletionSource, tcs1) == tcs1)
#pragma warning restore 420
                    return;

                sw.SpinOnce();
            } while (true);
        }
    }
}
PushInBatchesUpToConcurrencyPer() is iterating over a concurrent collection, performing for (int i = 0; i < batchSize; i++) for each slot, times the maxConcurrency. Is that better than having to pop a range?
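For comparison, a minimal sketch of what popping a range could look like, assuming each slot were backed by a plain List<TItem> guarded by a lock instead of the gist's ConcurrentQueue<TItem>; LockedSlot and PopRange are hypothetical names used only for illustration, not part of the gist:

using System;
using System.Collections.Generic;

class LockedSlot<TItem>
{
    private readonly List<TItem> items = new List<TItem>();
    private readonly object gate = new object();

    public void Push(TItem item)
    {
        lock (gate)
        {
            items.Add(item);
        }
    }

    // Removes and returns up to batchSize items in one locked operation,
    // instead of dequeuing them one by one.
    public List<TItem> PopRange(int batchSize)
    {
        lock (gate)
        {
            var count = Math.Min(batchSize, items.Count);
            var batch = items.GetRange(0, count);
            items.RemoveRange(0, count);
            return batch;
        }
    }
}

The trade-off is that every Push now takes the lock, whereas the gist's per-item TryDequeue loop stays lock-free at the cost of up to batchSize dequeues per batch, repeated up to maxConcurrency times per slot.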
started is always false. What's the purpose of this member field if it's always false?
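Presumably the guard was meant to be armed inside Start. A minimal sketch of that assumed intent, where the started = true assignment is the only addition and is not part of the gist:

public void Start(Func<List<TItem>, int, object, CancellationToken, Task> pump, object state)
{
    if (started)
    {
        throw new InvalidOperationException("Already started");
    }

    started = true; // assumed missing assignment; without it the guard above can never trigger
    timer = Task.Run(TimerLoop);
    this.pump = pump;
    this.state = state;
}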