Last active
February 19, 2021 08:35
-
-
Save jdaigle/cc1449a99d4e6448672d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Appends <paramref name="item"/> to the end of the linked queue without taking a lock
/// and returns the entry that now holds it.
/// </summary>
/// <param name="item">The payload to wrap in a new <c>QueueEntry</c>.</param>
/// <returns>
/// The newly linked entry; its <c>SeqNum</c> is one greater than the <c>SeqNum</c> of the
/// entry it was linked after.
/// </returns>
public QueueEntry Enqueue(object item)
{
    var entry = new QueueEntry(item);
    while (true)
    {
        // Take a snapshot of the tail and its successor, then make sure the tail did not
        // move between the two reads; otherwise the pair is inconsistent and we retry.
        var prevTail = this.tail;
        var prevTailNext = prevTail.Next;
        if (prevTail != this.tail)
        {
            continue;
        }

        if (prevTailNext != null)
        {
            // The tail pointer is lagging behind the real last node (another thread linked
            // an entry but has not swung the tail yet). Help it along; if this compare and
            // swap fails, somebody else already did the work, which is fine.
            Interlocked.CompareExchange(ref this.tail, prevTailNext, prevTail);
            continue;
        }

        entry.SeqNum = prevTail.SeqNum + 1;
        if (prevTail.TrySetNext(entry, null)) // compare and swap, returns if successful
        {
            // Swing the tail from the node we linked after to the new entry. The comparand
            // must be the snapshot (prevTail), not a fresh read of this.tail: a helping
            // thread may already have advanced the tail to this entry or beyond, and in
            // that case the tail must not be moved backwards. Failure here is expected
            // and harmless.
            Interlocked.CompareExchange(ref this.tail, entry, prevTail);
            return entry;
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Auto-reset event, created unsignaled, that is set by the constructor and by Enqueue()
// to request a run of MessageDeliveryPumpLoop.
private readonly AutoResetEvent messageDeliveryPumpSignal = new AutoResetEvent(false); | |
// Handle returned by ThreadPool.RegisterWaitForSingleObject in the constructor.
// NOTE(review): no Unregister call is visible in this excerpt - confirm it is released elsewhere.
private readonly RegisteredWaitHandle messageDeliveryPumpSignalWaitHandle; | |
// Guard flag flipped with Interlocked operations in MessageDeliveryPumpLoop so that only
// one invocation of the loop does work at a time.
private volatile int messageDeliveryPumpLoopIsRunning = 0; // 0 = false, 1 = true | |
// Constructor (other initialization is elided in this excerpt as "...").
// Registers a thread-pool wait on messageDeliveryPumpSignal whose callback invokes
// MessageDeliveryPumpLoop, then sets the signal once so the pump gets an initial run.
public ConcurrentQueue() | |
{ | |
... | |
// Arguments: state = null, timeout = -1 (wait indefinitely), executeOnlyOnce = false
// (the callback is re-armed and runs again each time the event is signaled).
messageDeliveryPumpSignalWaitHandle | |
= ThreadPool.RegisterWaitForSingleObject(messageDeliveryPumpSignal | |
, (state, timedOut) => MessageDeliveryPumpLoop() | |
, null, -1, false); | |
messageDeliveryPumpSignal.Set(); // signal to start pump right away | |
} | |
// Enqueue entry point; the actual enqueue work is elided in this excerpt as "...".
// The visible part sets messageDeliveryPumpSignal, the event the constructor registered
// with the thread pool to trigger MessageDeliveryPumpLoop.
public void Enqueue() | |
{ | |
... | |
// Because the event is auto-reset, several Set() calls made before the waiter is
// released may result in a single callback.
messageDeliveryPumpSignal.Set(); | |
} | |
/// <summary>
/// Drives delivery of queued entries to the registered consumers. At most one invocation
/// does work at a time; an invocation that finds the pump already running returns at once.
/// The loop ends when the head has no next valid entry or when there are no consumers.
/// </summary>
private void MessageDeliveryPumpLoop()
{
    // Atomically claim the "running" flag (0 -> 1). A non-zero previous value means some
    // other invocation currently owns the pump.
    var previousState = Interlocked.CompareExchange(ref messageDeliveryPumpLoopIsRunning, 1, 0);
    if (previousState != 0)
    {
        return;
    }

    // NOTE(review): a signal that arrives while this loop is on its way out (after its
    // last queue check but before the flag is cleared in the finally block) is rejected
    // by the guard above; confirm a later signal always picks up such an entry.
    trace.Debug("MessageDeliveryPumpLoop Started.");
    try
    {
        for (;;)
        {
            var currentHead = queueList.Head;
            if (currentHead.GetNextValidEntry() == null)
            {
                // Nothing left to deliver.
                return;
            }

            // The emptiness check is made under the consumers lock so that it cannot
            // interleave with code that adds consumers while holding the same lock.
            bool nobodyListening;
            lock (consumers)
            {
                nobodyListening = consumers.IsEmpty;
            }

            if (nobodyListening)
            {
                return;
            }

            // Work on a snapshot of the consumers and offer the head to each in turn.
            var snapshot = new List<Consumer>(consumers);
            foreach (var target in snapshot)
            {
                target.TryDelivery(currentHead);
            }
        }
    }
    finally
    {
        // Release the flag on every exit path (including exceptions) so a later signal
        // can start the pump again.
        Interlocked.Exchange(ref messageDeliveryPumpLoopIsRunning, 0);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment