Last active
March 9, 2016 14:05
-
-
Save AndyPook/cc91f90a20c807e39573 to your computer and use it in GitHub Desktop.
Allows for separation of a producer (eg a network receiver) from a potentially more expensive consumer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class QueueHandler<T> | |
{ | |
public static QueueHandler<T> Start(Action<T> handler, CancellationToken? token = null, ParallelOptions options = null) | |
{ | |
var q = new QueueHandler<T>(handler, token, options); | |
q.Start(); | |
return q; | |
} | |
public QueueHandler(Action<T> handler, CancellationToken? token = null, ParallelOptions options = null) | |
{ | |
if (handler == null) | |
throw new ArgumentNullException(nameof(handler)); | |
this.handler = handler; | |
this.token = token ?? CancellationToken.None; | |
this.options = options ?? new ParallelOptions(); | |
} | |
private readonly Action<T> handler; | |
private readonly CancellationToken token; | |
private readonly ParallelOptions options; | |
private readonly BlockingCollection<T> queue = new BlockingCollection<T>(); | |
public void Start() | |
{ | |
Task.Factory | |
.StartNew(Loop, token, TaskCreationOptions.LongRunning, TaskScheduler.Default) | |
.ContinueWith(t => | |
{ | |
if (t.Exception != null) | |
Trace.TraceError(t.Exception.Message); | |
Trace.TraceInformation("HandlerLoop finished"); | |
}, token); | |
} | |
private void Loop() | |
{ | |
var partitioner = Partitioner.Create(queue.GetConsumingEnumerable(token), EnumerablePartitionerOptions.None); | |
Parallel.ForEach(partitioner, options, handler); | |
} | |
public void Add(T item) | |
{ | |
queue.Add(item, token); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment