Skip to content

Instantly share code, notes, and snippets.

@jdaigle
Last active February 19, 2021 08:35
Show Gist options
  • Save jdaigle/cc1449a99d4e6448672d to your computer and use it in GitHub Desktop.
Save jdaigle/cc1449a99d4e6448672d to your computer and use it in GitHub Desktop.
public QueueEntry Enqueue(object item)
{
var entry = new QueueEntry(item);
while (true)
{
var prevTail = this.tail;
var prevTailNext = prevTail.Next;
if (prevTail == this.tail)
{
if (prevTailNext == null)
{
entry.SeqNum = prevTail.SeqNum + 1;
if (prevTail.TrySetNext(entry, null)) // compare and swap, returns if successful
{
// it shouldn't be possible for two threads to get here at the same time
Interlocked.CompareExchange(ref this.tail, entry, this.tail); // compare and swap
return entry;
}
}
else
{
// the tail has been updated before we had a chance. so compare and swap
// if it fails... that's okay
Interlocked.CompareExchange(ref this.tail, prevTailNext, prevTail);
}
}
}
}
private readonly AutoResetEvent messageDeliveryPumpSignal = new AutoResetEvent(false);
private readonly RegisteredWaitHandle messageDeliveryPumpSignalWaitHandle;
private volatile int messageDeliveryPumpLoopIsRunning = 0; // 0 = false, 1 = true
public ConcurrentQueue()
{
...
messageDeliveryPumpSignalWaitHandle
= ThreadPool.RegisterWaitForSingleObject(messageDeliveryPumpSignal
, (state, timedOut) => MessageDeliveryPumpLoop()
, null, -1, false);
messageDeliveryPumpSignal.Set(); // signal to start pump right away
}
public void Enqueue()
{
...
messageDeliveryPumpSignal.Set();
}
private void MessageDeliveryPumpLoop()
{
if (Interlocked.CompareExchange(ref messageDeliveryPumpLoopIsRunning, 1, 0) != 0)
{
return; // if the pump is already running, so we don't want to execute again
}
trace.Debug("MessageDeliveryPumpLoop Started.");
try
{
while (true)
{
var head = queueList.Head;
var next = head.GetNextValidEntry();
if (next == null)
{
return; // no more work to do
}
lock (consumers) // lock to prevent adding new consumers
{
if (consumers.IsEmpty)
{
return;
}
}
// Loop over consumers and attempt delivery
var consumersToIterate = new List<Consumer>(consumers);
foreach (var consumer in consumersToIterate)
{
consumer.TryDelivery(head);
}
}
}
finally
{
Interlocked.Exchange(ref messageDeliveryPumpLoopIsRunning, 0); // indicate that the pump has stopped
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment