Skip to content

Instantly share code, notes, and snippets.

@default-writer
Created March 27, 2019 10:08
Show Gist options
  • Save default-writer/20021e2ede4ea29bb8f5c77d906e3463 to your computer and use it in GitHub Desktop.
Save default-writer/20021e2ede4ea29bb8f5c77d906e3463 to your computer and use it in GitHub Desktop.
Queued ArrayPool<T> memory strings
public class BufferSegment : IDisposable
{
public BufferSegment(int size) => Buffer = ArrayPool<byte>.Shared.Rent(size);
public byte[] Buffer { get; }
public int ReadBytes { get; set; }
public int WrittenBytes { get; set; }
public int AvailableBytes => WrittenBytes - ReadBytes;
public void Dispose() => ArrayPool<byte>.Shared.Return(Buffer);
}
public class LockFreeQueue<T>
{
private readonly LockFreeStack<T> _tail = new LockFreeStack<T>();
private readonly LockFreeStack<T> _head = new LockFreeStack<T>();
#region Consumer
/// <summary>
/// Used only for reading by consumer
/// </summary>
/// <param name="result"></param>
/// <returns></returns>
public bool Pop(out T result)
{
result = default;
// pop and return the element from head
if (_head.Pop(out result))
{
return true;
}
// if head is empty, pop all elements from tail to head till tail is empty
_tail.Move(_head);
return false;
}
#endregion
#region Producer
/// <summary>
/// Used only for writing by producer
/// </summary>
/// <param name="item"></param>
public void Push(T item)
{
_tail.Push(item);
}
#endregion
public bool IsEmpty() => _head.IsEmpty() && _tail.IsEmpty();
}
public class LockFreeStack<T>
{
private volatile Node _node;
class Node
{
private static volatile int _id;
public int Id { get; } = _id++;
public Node Next;
public T Value;
}
public void Push(T item)
{
var spin = new SpinWait();
var node = new Node {Value = item};
while (true)
{
var head = this._node;
node.Next = head;
if (Interlocked.CompareExchange(ref this._node, node, head) == head)
{
if (head != null && head.Id + 1 != node.Id)
throw new LockFreeStackException(node.Id);
break;
}
spin.SpinOnce();
}
}
public bool Pop(out T result)
{
result = default;
var spin = new SpinWait();
while (true)
{
var head = _node;
if (head == null)
{
return false;
}
if (Interlocked.CompareExchange(ref _node, head.Next, head) == head)
{
result = head.Value;
return true;
}
spin.SpinOnce();
}
}
public void Move(LockFreeStack<T> stack)
{
while (Pop(out Node node))
{
stack.Push(node);
}
}
private bool Pop(out Node node)
{
node = default;
var spin = new SpinWait();
while (true)
{
var head = this._node;
if (head == null)
{
return false;
}
if (Interlocked.CompareExchange(ref this._node, head.Next, head) == head)
{
node = head;
return true;
}
spin.SpinOnce();
}
}
private void Push(Node node)
{
var spin = new SpinWait();
while (true)
{
var head = this._node;
node.Next = head;
if (Interlocked.CompareExchange(ref this._node, node, head) == head)
{
if (head != null && head.Id - 1 != node.Id)
throw new LockFreeStackException(node.Id);
break;
}
spin.SpinOnce();
}
}
public bool IsEmpty() =>_node == null;
}
@default-writer
Copy link
Author

        public unsafe static void Main()
        {
            const int Iteration = 10000;
            int counter = 0;

            var done = new SemaphoreSlim(0);

            for (int i = 0; i < Iteration; i++)
            {
                Task.Run(() =>
                {
                    int workerCoreId = Thread.GetCurrentProcessorId();
                    var overlapped = new Overlapped();
                    var nativeOverlapped = overlapped.Pack((code, bytes, overlap) =>
                    {
                        if (Thread.GetCurrentProcessorId() != workerCoreId)
                            Interlocked.Increment(ref counter);

                        Overlapped.Free(overlap);
                        done.Release();
                    }, null);

                    ThreadPool.UnsafeQueueNativeOverlapped(nativeOverlapped);
                });
            }

            for (int i = 0; i < Iteration; i++)
                done.Wait();

            Console.WriteLine($"IOCP callback on different core: {counter}/{Iteration}");
        }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment