Created
July 4, 2023 17:20
-
-
Save TheBuzzSaw/19f16c67bffa5557c650db68d27541a9 to your computer and use it in GitHub Desktop.
Pitch for "unmanaged queue"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Runtime.CompilerServices; | |
using System.Runtime.InteropServices; | |
namespace Piranha.Jawbone; | |
file static class UnmanagedQueueExtensions | |
{ | |
public static Span<byte> Write<T>( | |
this Span<byte> destination, | |
in T source | |
) where T : unmanaged | |
{ | |
MemoryMarshal.AsBytes( | |
new ReadOnlySpan<T>(source)).CopyTo(destination); | |
return destination[Unsafe.SizeOf<T>()..]; | |
} | |
public static ReadOnlySpan<byte> Read<T>( | |
this ReadOnlySpan<byte> source, | |
out T destination | |
) where T : unmanaged | |
{ | |
Unsafe.SkipInit(out destination); | |
source[..Unsafe.SizeOf<T>()].CopyTo( | |
MemoryMarshal.AsBytes( | |
new Span<T>(ref destination))); | |
return source[Unsafe.SizeOf<T>()..]; | |
} | |
} | |
public sealed class UnmanagedQueue | |
{ | |
private readonly object _lock = new(); | |
private readonly Dictionary<Type, int> _blobHandlerIndicesByType = new(); | |
private readonly List<BlobHandler> _blobHandlers = new(); | |
private byte[] _bytes = Array.Empty<byte>(); | |
private int _begin = 0; | |
private int _length = 0; | |
public void Register<T>(Action<T> action) where T : unmanaged | |
{ | |
var blobHandler = new BlobHandler<T>(action); | |
lock (_lock) | |
{ | |
if (_blobHandlerIndicesByType.TryGetValue(typeof(T), out var index)) | |
{ | |
_blobHandlers[index] = blobHandler; | |
} | |
else | |
{ | |
index = _blobHandlers.Count; | |
_blobHandlers.Add(blobHandler); | |
_blobHandlerIndicesByType.Add(typeof(T), index); | |
} | |
} | |
} | |
public bool TryEnqueue<T>(T item) where T : unmanaged | |
{ | |
lock (_lock) | |
{ | |
if (!_blobHandlerIndicesByType.TryGetValue(typeof(T), out var index)) | |
return false; | |
var bytes = Allocate(Unsafe.SizeOf<int>() + Unsafe.SizeOf<T>()); | |
bytes.Write(index).Write(item); | |
return true; | |
} | |
} | |
public bool TryDequeue() | |
{ | |
lock (_lock) | |
{ | |
if (_length == 0) | |
return false; | |
ReadOnlySpan<byte> bytes = _bytes.AsSpan(_begin); | |
var blob = bytes.Read(out int index); | |
var handler = _blobHandlers[index]; | |
handler.Handle(blob[..handler.Size]); | |
var sizeOfBlobWithHeader = Unsafe.SizeOf<int>() + handler.Size; | |
_length -= sizeOfBlobWithHeader; | |
_begin = _length == 0 ? 0 : (_begin + sizeOfBlobWithHeader) % _bytes.Length; | |
return true; | |
} | |
} | |
public void DequeueAll() | |
{ | |
while (TryDequeue()) | |
; | |
} | |
private Span<byte> Allocate(int size) | |
{ | |
var available = _bytes.Length - _length; | |
var end = _bytes.Length == 0 ? 0 : (_begin + _length) % _bytes.Length; | |
if (available < size) | |
{ | |
var bytes = new byte[Math.Max((_length + size) * 4, size * 16)]; | |
if (0 < _length) | |
{ | |
if (_begin < end) | |
{ | |
_bytes.AsSpan(_begin, _length).CopyTo(bytes); | |
} | |
else | |
{ | |
_bytes.AsSpan(_begin).CopyTo(bytes); | |
_bytes.AsSpan(0, end).CopyTo(bytes.AsSpan(_bytes.Length - _begin)); | |
} | |
} | |
_bytes = bytes; | |
_begin = 0; | |
end = _length; | |
} | |
else if (_begin < end) | |
{ | |
var availableBytesAtEnd = _bytes.Length - end; | |
if (availableBytesAtEnd < size) | |
{ | |
// Move to the far end of the array to minimize | |
// the amount of memory overlap. | |
if (_begin < availableBytesAtEnd) | |
{ | |
var begin = _bytes.Length - _length; | |
_bytes.AsSpan(_begin, _length).CopyTo(_bytes.AsSpan(begin)); | |
_begin = begin; | |
end = 0; | |
} | |
else | |
{ | |
_bytes.AsSpan(_begin, _length).CopyTo(_bytes); | |
_begin = 0; | |
end = _length; | |
} | |
} | |
} | |
var result = _bytes.AsSpan(end, size); | |
_length += size; | |
return result; | |
} | |
private abstract class BlobHandler | |
{ | |
public int Size { get; protected init; } | |
public abstract void Handle(ReadOnlySpan<byte> blob); | |
} | |
private sealed class BlobHandler<T> : BlobHandler where T : unmanaged | |
{ | |
private readonly Action<T> _action; | |
public BlobHandler(Action<T> action) | |
{ | |
Size = Unsafe.SizeOf<T>(); | |
_action = action; | |
} | |
public override void Handle(ReadOnlySpan<byte> blob) | |
{ | |
var item = default(T); | |
var span = new Span<T>(ref item); | |
var asBytes = MemoryMarshal.AsBytes(span); | |
blob.CopyTo(asBytes); | |
_action.Invoke(item); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Numerics; | |
using Xunit; | |
namespace Piranha.Jawbone.Test; | |
public class UnmanagedQueueTest | |
{ | |
[Fact] | |
public void QueueCallsHandlers() | |
{ | |
var intMessage = 1337; | |
var matrixMessage = Matrix4x4.Identity; | |
var dateTimeMessage = new DateTime(2000, 1, 1, 11, 30, 29); | |
var intMessageWasHandled = false; | |
var matrixMessageWasHandled = false; | |
var dateTimeMessageWasHandled = false; | |
var queue = new UnmanagedQueue(); | |
queue.Register<int>(item => | |
{ | |
Assert.Equal(intMessage, item); | |
intMessageWasHandled = true; | |
}); | |
queue.Register<Matrix4x4>(item => | |
{ | |
Assert.Equal(matrixMessage, item); | |
matrixMessageWasHandled = true; | |
}); | |
queue.Register<DateTime>(item => | |
{ | |
Assert.Equal(dateTimeMessage, item); | |
dateTimeMessageWasHandled = true; | |
}); | |
Assert.True(queue.TryEnqueue(intMessage)); | |
Assert.True(queue.TryEnqueue(matrixMessage)); | |
Assert.True(queue.TryEnqueue(dateTimeMessage)); | |
queue.DequeueAll(); | |
Assert.True(intMessageWasHandled); | |
Assert.True(matrixMessageWasHandled); | |
Assert.True(dateTimeMessageWasHandled); | |
} | |
[Fact] | |
public void QueueWorks() | |
{ | |
var queue = new UnmanagedQueue(); | |
queue.Register<TimeSpan>(static _ => {}); | |
queue.Register<byte>(static _ => {}); | |
for (int i = 0; i < 12; ++i) | |
Assert.True(queue.TryEnqueue(TimeSpan.MaxValue)); | |
// Toss in a weird shape to prevent even fit. | |
Assert.True(queue.TryEnqueue(byte.MaxValue)); | |
while (queue.TryDequeue() && queue.TryDequeue()) | |
Assert.True(queue.TryEnqueue(TimeSpan.MinValue)); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment