Created
March 5, 2012 21:04
-
-
Save vbedegi/1981082 to your computer and use it in GitHub Desktop.
A transactional, concurrent queue for .NET
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Example: an Enqueue made inside a transaction is deferred until the transaction commits.
var numbers = new TransactionalConcurrentQueue<int>();

using (var tx = new TransactionScope())
{
    numbers.Enqueue(1);

    // Prints nothing: the item has not been added to the queue yet,
    // so Dequeue finds it empty and never invokes the callback.
    numbers.Dequeue(value => Console.WriteLine(value));

    tx.Complete();
}

// Prints the item: the scope was completed, so the deferred Enqueue has been applied.
numbers.Dequeue(value => Console.WriteLine(value));
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Example: a Dequeue made inside a transaction is undone when the transaction rolls back.
var numbers = new TransactionalConcurrentQueue<int>();

// No ambient transaction here, so the item is added immediately.
numbers.Enqueue(1);

using (var tx = new TransactionScope())
{
    // Prints the item: it is already in the queue.
    numbers.Dequeue(value => Console.WriteLine(value));

    // Complete() is deliberately not called, so leaving the scope rolls the
    // transaction back and the dequeued item is put back into the queue.
}

// Prints the item a second time: the rollback restored it.
numbers.Dequeue(value => Console.WriteLine(value));
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// A FIFO queue built on <see cref="ConcurrentQueue{T}"/> that enlists in the ambient
/// <see cref="Transaction"/>, when there is one, as a volatile resource.
/// </summary>
/// <remarks>
/// Inside a transaction an enqueued item is only added to the queue when the transaction
/// commits, and a dequeued item is put back when the transaction rolls back. A restored
/// item is appended to the tail of the queue, so it does not regain its original position.
/// Without an ambient transaction both operations act on the queue immediately.
/// </remarks>
/// <typeparam name="T">Type of the queued items.</typeparam>
public class TransactionalConcurrentQueue<T>
{
    /// <summary>
    /// Base class for a single queue operation enlisted in a transaction. It votes "prepared"
    /// unconditionally and, by default, does nothing on commit or rollback except report
    /// completion to the transaction manager.
    /// </summary>
    private abstract class QueueOperation : IEnlistmentNotification
    {
        private readonly ConcurrentQueue<T> queue;
        private readonly T item;

        /// <param name="queue">The underlying queue the operation acts on.</param>
        /// <param name="item">The item being enqueued or dequeued.</param>
        protected QueueOperation(ConcurrentQueue<T> queue, T item)
        {
            this.queue = queue;
            this.item = item;
        }

        /// <summary>Phase one: always votes to commit.</summary>
        public void Prepare(PreparingEnlistment preparingEnlistment)
        {
            preparingEnlistment.Prepared();
        }

        /// <summary>
        /// Phase two, commit outcome. Overrides must call the base implementation after
        /// doing their work so that completion is reported.
        /// </summary>
        public virtual void Commit(Enlistment enlistment)
        {
            // A resource manager must tell the transaction manager it has finished.
            enlistment.Done();
        }

        /// <summary>
        /// Phase two, rollback outcome. Overrides must call the base implementation after
        /// doing their work so that completion is reported.
        /// </summary>
        public virtual void Rollback(Enlistment enlistment)
        {
            enlistment.Done();
        }

        /// <summary>
        /// The outcome of the transaction is unknown. No queue change is made in that case;
        /// the enlistment is only marked as finished.
        /// </summary>
        public void InDoubt(Enlistment enlistment)
        {
            enlistment.Done();
        }

        protected ConcurrentQueue<T> Queue { get { return queue; } }

        protected T Item { get { return item; } }
    }

    /// <summary>A deferred enqueue: the item is added to the queue only on commit.</summary>
    private sealed class EnqueueOperation : QueueOperation
    {
        public EnqueueOperation(ConcurrentQueue<T> queue, T item)
            : base(queue, item)
        {
        }

        public override void Commit(Enlistment enlistment)
        {
            Queue.Enqueue(Item);
            base.Commit(enlistment);
        }
    }

    /// <summary>A compensating action for a dequeue: the item is put back on rollback.</summary>
    private sealed class DequeueOperation : QueueOperation
    {
        public DequeueOperation(ConcurrentQueue<T> queue, T item)
            : base(queue, item)
        {
        }

        public override void Rollback(Enlistment enlistment)
        {
            // ConcurrentQueue can only append, so the item goes back to the tail.
            Queue.Enqueue(Item);
            base.Rollback(enlistment);
        }
    }

    private readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    /// <summary>
    /// Adds <paramref name="item"/> to the queue. With an ambient transaction the item is
    /// added only when that transaction commits; otherwise it is added immediately.
    /// </summary>
    /// <param name="item">The item to enqueue.</param>
    public void Enqueue(T item)
    {
        var tx = Transaction.Current;
        if (tx == null)
        {
            queue.Enqueue(item);
        }
        else
        {
            tx.EnlistVolatile(new EnqueueOperation(queue, item), EnlistmentOptions.None);
        }
    }

    /// <summary>
    /// Removes the item at the head of the queue, if any, and passes it to
    /// <paramref name="action"/>. With an ambient transaction the item is put back into the
    /// queue if that transaction rolls back.
    /// </summary>
    /// <param name="action">
    /// Callback invoked with the dequeued item. Must not be null. It is not invoked when the
    /// queue is empty. If it throws while there is no ambient transaction, the item has
    /// already been removed and is not restored.
    /// </param>
    /// <returns>
    /// <c>true</c> if an item was dequeued and handed to <paramref name="action"/>;
    /// <c>false</c> if the queue was empty.
    /// </returns>
    public bool Dequeue(Action<T> action)
    {
        T item;
        if (!queue.TryDequeue(out item))
        {
            return false;
        }

        // Enlist before running the callback so that a rollback caused by the callback
        // throwing inside a transaction scope still restores the item.
        var tx = Transaction.Current;
        if (tx != null)
        {
            tx.EnlistVolatile(new DequeueOperation(queue, item), EnlistmentOptions.None);
        }

        action(item);
        return true;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment