Skip to content

Instantly share code, notes, and snippets.

@freezy
Created October 22, 2015 21:03
Show Gist options
  • Save freezy/ed0dfd34e970fc235d4f to your computer and use it in GitHub Desktop.
Save freezy/ed0dfd34e970fc235d4f to your computer and use it in GitHub Desktop.
Non-blocking JobQueue and ParallelJobQueue

I need a decent job queue for a c# project, preferably using Reactive Extensions. I came across this excellent post by Andreas Köpf, but this was quite some time ago and obviously for an older version of Rx.

Below you can see my updated code. The (major) changes made are the following:

  • MutableDisposable becomes MultipleAssignmentDisposable

  • Then this block:

    whenJobCompletes.Where(n => n.Kind == NotificationKind.OnError)
       .Select(n => ((Notification<Unit>.OnError)n).Exception)
       .Subscribe(whenJobFails);

    becomes

    whenJobCompletes.Where(n => n.Kind == NotificationKind.OnError)
       .Select(n => n.Exception)
       .Subscribe(_whenJobFails);
    
  • whenJobCompletes.Hide() becomes whenJobCompletes.AsObservable():

    public IObservable<Notification<Unit>> WhenJobCompletes => _whenJobCompletes.AsObservable();
    
  • GroupDisposable becomes CompositeDisposable, so the cancelable in Add() becomes:

    var cancelable = Observable.Create<Unit>(o =>
    new CompositeDisposable(
        job.CompletionHandler.Subscribe(o),     // main job subscription
        job.JobSubscription,
        job.Cancel)
    );
    
  • And finally, the Replace() method doesn't exist anymore, so the subscription at StartJob is replaced by:

    job.JobSubscription.Disposable = jobSubscription;
    

You can see the whole class below. However, for some reason it doesn't seem to work. Running this sample code:

var jobQueue = new ParallelJobQueue(5);

// subscribe to failures
jobQueue.InnerQueue.WhenJobFails.Subscribe(e => Console.WriteLine("Job failed: {0}", e.Message));

// subscribe to empty queue notification
jobQueue.InnerQueue.WhenQueueEmpty.Subscribe(n => Console.WriteLine("Empty!"));

int completed1 = 0, completed2 = 0, errors = 0;     // test counters
foreach (var i in Enumerable.Range(0, 100)) {
	var x = i;
	jobQueue.Add(() => {

		Console.WriteLine("Thread {0}: {1}", Thread.CurrentThread.ManagedThreadId, x);

		if ((Interlocked.Increment(ref completed1) % 10) == 0) {
			// generate test exceptions
			throw new Exception("Text exception " + completed1);
		}

	}).Subscribe(n => Interlocked.Increment(ref completed2), e => Interlocked.Increment(ref errors));
}

jobQueue.InnerQueue.WhenQueueEmpty.Subscribe(_ =>
{
	Console.WriteLine("DONE! Received 1: {0}, 2: {1}, errors: {2}", completed1, completed2, errors);
});

Stops at around 65 and the DONE line is never printed. I'm still new to Rx, so maybe @andreaskoepf you have an idea? I'm sorry I registered at Microsoft specifically to reply to your thread, but somehow the handycapped software over there won't let me post.

/// <summary>
/// A class which does not automatically start queued jobs but
/// requires a user of the class to explicitely start new jobs by calling
/// StartNext() or StartUpTo(maxConcurrentCount).
/// </summary>
/// <a href="https://social.msdn.microsoft.com/Forums/en-US/2817c6e5-e5a4-4aac-91c1-97ba7de88ff7/nonblocking-jobqueue-and-paralleljobqueue-sample?forum=rx">Source</a>
public class JobQueue
{
public struct Job
{
public Func<IObservable<Unit>> AsyncStart;
public AsyncSubject<Unit> CompletionHandler;
public BooleanDisposable Cancel;
public MultipleAssignmentDisposable JobSubscription;
}
int _runningCount;
readonly ConcurrentQueue<Job> _queue = new ConcurrentQueue<Job>();
readonly Subject<Unit> _whenQueueEmpty = new Subject<Unit>();
readonly Subject<Notification<Unit>> _whenJobCompletes = new Subject<Notification<Unit>>();
readonly Subject<Exception> _whenJobFails = new Subject<Exception>();
public IObservable<Notification<Unit>> WhenJobCompletes => _whenJobCompletes.AsObservable();
public IObservable<Unit> WhenQueueEmpty => _whenQueueEmpty.AsObservable();
public IObservable<Exception> WhenJobFails => _whenJobFails;
public int RunningCount => _runningCount;
public int QueuedCount => _queue.Count;
public JobQueue()
{
// whenJobFails subscription
_whenJobCompletes.Where(n => n.Kind == NotificationKind.OnError)
.Select(n => n.Exception)
.Subscribe(_whenJobFails);
// whenQueueEmpty subscription
_whenJobCompletes.Synchronize(this)
.Where(n => _queue.Count == 0 && _runningCount == 0)
.Select(n => new Unit()).Subscribe(_whenQueueEmpty);
}
public IObservable<Unit> Add(Action action)
{
return Add(action.ToAsync());
}
public IObservable<Unit> Add(Func<IObservable<Unit>> asyncStart)
{
var job = new Job() {
AsyncStart = asyncStart,
CompletionHandler = new AsyncSubject<Unit>(),
Cancel = new BooleanDisposable(),
JobSubscription = new MultipleAssignmentDisposable()
};
var cancelable = Observable.Create<Unit>(o =>
new CompositeDisposable(
job.CompletionHandler.Subscribe(o), // main job subscription
job.JobSubscription,
job.Cancel)
);
job.CompletionHandler
.Materialize()
.Where(n => n.Kind == NotificationKind.OnCompleted || n.Kind == NotificationKind.OnError)
.Subscribe(_whenJobCompletes.OnNext); // pass on errors and completions
_queue.Enqueue(job);
return cancelable;
}
public int StartUpTo(int maxConcurrentlyRunning)
{
var started = 0;
for (;;) {
for (;;) {
int running;
do // test and increment with compare and swap
{
running = _runningCount;
if (running >= maxConcurrentlyRunning)
return started;
} while (Interlocked.CompareExchange(ref _runningCount, running + 1, running) != running);
Job job;
if (TryDequeNextJob(out job)) {
StartJob(job);
++started;
} else {
// dequeing job failed but we already incremented running count
Interlocked.Decrement(ref _runningCount);
// ensure that no other thread queued an item and did not start it
// because the running count was too high
if (_queue.Count == 0) {
// if there is nothing in the queue after the decrement
// we can safely return
return started;
}
}
}
}
}
public bool StartNext()
{
Job job;
if (TryDequeNextJob(out job)) {
Interlocked.Increment(ref _runningCount);
StartJob(job);
return true;
}
return false;
}
bool TryDequeNextJob(out Job job)
{
do {
if (!_queue.TryDequeue(out job))
return false;
} while (job.Cancel.IsDisposed);
return true;
}
void StartJob(Job job)
{
try {
var jobSubscription =
job.AsyncStart().Subscribe(
u => OnJobCompleted(job, null),
e => OnJobCompleted(job, e)
);
job.JobSubscription.Disposable = jobSubscription;
if (job.Cancel.IsDisposed)
job.JobSubscription.Dispose();
} catch (Exception ex) {
OnJobCompleted(job, ex);
throw;
}
}
public void CancelOutstandingJobs()
{
Job job;
while (TryDequeNextJob(out job))
job.CompletionHandler.OnError(new OperationCanceledException());
}
void OnJobCompleted(Job job, Exception error)
{
Interlocked.Decrement(ref _runningCount);
if (error == null)
job.CompletionHandler.OnNext(new Unit());
else
job.CompletionHandler.OnError(error);
}
}
/// <summary>
/// A class which uses the JobQueue internally and starts new jobs as soon
/// as the number of currently running jobs drops below a given threshold.
/// </summary>
public class ParallelJobQueue
{
readonly int _maxConcurrent;
public ParallelJobQueue(int maxConcurrent)
{
if (maxConcurrent < 1)
throw new ArgumentOutOfRangeException(nameof(maxConcurrent));
_maxConcurrent = maxConcurrent;
InnerQueue = new JobQueue();
InnerQueue.WhenJobCompletes.Subscribe(OnJobCompleted);
}
public JobQueue InnerQueue { get; private set; }
public IObservable<Unit> Add(Action action)
{
return Add(action.ToAsync());
}
public IObservable<Unit> Add(Func<IObservable<Unit>> asyncStart)
{
var whenCompletes = InnerQueue.Add(asyncStart);
InnerQueue.StartUpTo(_maxConcurrent);
return whenCompletes;
}
/// <summary>
/// Stops starting new jobs of the old queue by replacing
/// the inner queue with an empty new one.
/// </summary>
public void Stop()
{
var oldQueue = InnerQueue;
InnerQueue = new JobQueue();
oldQueue.CancelOutstandingJobs();
}
void OnJobCompleted(Notification<Unit> notification)
{
InnerQueue.StartUpTo(_maxConcurrent);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment