Created
March 12, 2014 20:45
-
-
Save anaisbetts/9515910 to your computer and use it in GitHub Desktop.
Async reader/writer lock via abusing ConcurrentExclusiveSchedulerPair
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace GitHub.Helpers
{
    /// <summary>
    /// An asynchronous reader/writer lock built on top of
    /// <see cref="ConcurrentExclusiveSchedulerPair"/>. Acquiring a lock queues a
    /// task on the pair's concurrent (read) or exclusive (write) scheduler; the
    /// lock is held for as long as that task keeps running, and it keeps running
    /// until the <see cref="IDisposable"/> handed to the caller is disposed.
    /// </summary>
    public sealed class AsyncReaderWriterLock
    {
        // Written by Shutdown() and read by every Acquire* call, potentially on
        // different threads, hence volatile.
        volatile bool isShutdown;

        readonly TaskFactory readScheduler;
        readonly TaskFactory writeScheduler;

        /// <summary>
        /// Creates the lock and the two task factories (one per scheduler of a
        /// single <see cref="ConcurrentExclusiveSchedulerPair"/>) that back it.
        /// </summary>
        public AsyncReaderWriterLock()
        {
            var pair = new ConcurrentExclusiveSchedulerPair();

            // LongRunning: each queued task blocks until its lock is released.
            readScheduler = new TaskFactory(
                CancellationToken.None, TaskCreationOptions.LongRunning, TaskContinuationOptions.None, pair.ConcurrentScheduler);
            writeScheduler = new TaskFactory(
                CancellationToken.None, TaskCreationOptions.LongRunning, TaskContinuationOptions.None, pair.ExclusiveScheduler);
        }

        /// <summary>
        /// Requests a shared (read) lock.
        /// </summary>
        /// <returns>
        /// An observable that yields a single <see cref="IDisposable"/> once the
        /// lock has been acquired; disposing it releases the lock. If the lock
        /// has been shut down the observable fails with
        /// <see cref="ObjectDisposedException"/>.
        /// </returns>
        public IObservable<IDisposable> AcquireRead()
        {
            return AcquireOnScheduler(readScheduler);
        }

        /// <summary>
        /// Requests an exclusive (write) lock.
        /// </summary>
        /// <returns>
        /// An observable that yields a single <see cref="IDisposable"/> once the
        /// lock has been acquired; disposing it releases the lock. If the lock
        /// has been shut down the observable fails with
        /// <see cref="ObjectDisposedException"/>.
        /// </returns>
        public IObservable<IDisposable> AcquireWrite()
        {
            return AcquireOnScheduler(writeScheduler);
        }

        /// <summary>
        /// Shuts the lock down: queues one final write lock and rejects every
        /// acquisition requested after this call.
        /// </summary>
        /// <returns>
        /// An observable that signals once the final write lock has been
        /// acquired. The write lock is released by the projection below, which
        /// only runs for subscribers, so callers must subscribe to (or await)
        /// the result for the lock to be released.
        /// </returns>
        public IObservable<Unit> Shutdown()
        {
            // NB: Just grab the write lock to shut down. The request must be
            // queued before the flag is set, otherwise it would be rejected too.
            var writeFuture = AcquireWrite();
            isShutdown = true;

            return writeFuture.Select(x => { x.Dispose(); return Unit.Default; });
        }

        /// <summary>
        /// Queues a lock-holding task on the given factory's scheduler.
        /// </summary>
        /// <param name="sched">
        /// Factory bound to either the concurrent or the exclusive scheduler.
        /// </param>
        /// <returns>
        /// A subject that yields the releasing <see cref="IDisposable"/> once the
        /// task is actually running on the scheduler, or an observable that
        /// fails with <see cref="ObjectDisposedException"/> after shutdown.
        /// </returns>
        IObservable<IDisposable> AcquireOnScheduler(TaskFactory sched)
        {
            if (isShutdown) return Observable.Throw<IDisposable>(new ObjectDisposedException("AsyncReaderWriterLock"));

            var ret = new AsyncSubject<IDisposable>();
            var gate = new AsyncSubject<Unit>();

            sched.StartNew(() =>
            {
                // NB: At this point we know that we are currently executing on the
                // scheduler (i.e. if this was the exclusive scheduler, we know that
                // all the readers have been thrown out)
                var disp = Disposable.Create(() => { gate.OnNext(Unit.Default); gate.OnCompleted(); });
                ret.OnNext(disp); ret.OnCompleted();

                // Trashing the returned Disposable will unlock this gate.
                //
                // NB: This must block rather than await. The scheduler pair only
                // tracks the task it was handed; an async lambda returns at its
                // first await, the scheduler would consider the task finished and
                // would let the next reader/writer in while the caller still
                // believes it holds the lock.
                gate.Wait();
            });

            return ret;
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment