Skip to content

Instantly share code, notes, and snippets.

@programmation
Created May 4, 2015 04:17
Show Gist options
  • Save programmation/9282cce5a22f2e4af8af to your computer and use it in GitHub Desktop.
Save programmation/9282cce5a22f2e4af8af to your computer and use it in GitHub Desktop.
TPL-based job processor
// http://stackoverflow.com/a/14933245
class JobProcessor<TInput, TOutput> : IDisposable
{
private readonly Func<TInput, TOutput> m_transform;
// or a custom type instead of Tuple
private readonly
BlockingCollection<Tuple<TInput, TaskCompletionSource<TOutput>>>
m_queue =
new BlockingCollection<Tuple<TInput, TaskCompletionSource<TOutput>>>();
public JobProcessor(Func<TInput, TOutput> transform)
{
m_transform = transform;
Task.Factory.StartNew(ProcessQueue, TaskCreationOptions.LongRunning);
}
private void ProcessQueue()
{
Tuple<TInput, TaskCompletionSource<TOutput>> tuple;
while (m_queue.TryTake(out tuple, Timeout.Infinite))
{
var input = tuple.Item1;
var tcs = tuple.Item2;
try
{
tcs.SetResult(m_transform(input));
}
catch (Exception ex)
{
tcs.SetException(ex);
}
}
}
public Task<TOutput> QueueJob(TInput input)
{
var tcs = new TaskCompletionSource<TOutput>();
m_queue.Add(Tuple.Create(input, tcs));
return tcs.Task;
}
public void Dispose()
{
m_queue.CompleteAdding();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment