Skip to content

Instantly share code, notes, and snippets.

@Arithmomaniac
Last active October 20, 2022 08:43
Show Gist options
  • Save Arithmomaniac/e7754b41c156ef33a3b86151b48155bf to your computer and use it in GitHub Desktop.
Save Arithmomaniac/e7754b41c156ef33a3b86151b48155bf to your computer and use it in GitHub Desktop.
Stream from async byte enumerator
<Query Kind="Statements" />
using System;
using System.Buffers;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
// based on https://github.com/dotnet/runtime/blob/main/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReaderStream.cs
internal sealed class AsyncEnumeratorStream : Stream
{
private readonly IAsyncEnumerator<ReadOnlyMemory<byte>> _enumerator;
Task<bool> _readNextTask;
ReadOnlyMemory<byte> _buffer = Array.Empty<byte>();
int _pointer = 0;
public AsyncEnumeratorStream(IAsyncEnumerator<ReadOnlyMemory<byte>> enumerator)
{
_enumerator = enumerator;
_readNextTask = enumerator.MoveNextAsync().AsTask();
}
public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => false;
public override long Length => throw new NotSupportedException();
public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
internal bool LeaveOpen { get; set; }
public override void Flush()
{
}
public override int Read(byte[] buffer, int offset, int count)
{
return ReadInternal(new Span<byte>(buffer, offset, count));
}
public override int ReadByte()
{
Span<byte> oneByte = stackalloc byte[1];
return ReadInternal(oneByte) == 0 ? -1 : oneByte[0];
}
private int ReadInternal(Span<byte> buffer)
{
if (!CheckBuffer(async: false).GetAwaiter().GetResult())
return 0;
return HandleReadResult(buffer);
}
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
public override void SetLength(long value) => throw new NotSupportedException();
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return ReadAsyncInternal(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
}
public override int Read(Span<byte> buffer)
{
return ReadInternal(buffer);
}
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
return ReadAsyncInternal(buffer, cancellationToken);
}
private async ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, CancellationToken cancellationToken)
{
if (await CheckBuffer(async: true).ConfigureAwait(false) == false)
return 0;
cancellationToken.ThrowIfCancellationRequested();
return HandleReadResult(buffer.Span);
}
private async ValueTask<bool> CheckBuffer(bool async)
{
if (_buffer.Length == _pointer)
{
var canStillRead = async
? await _readNextTask.ConfigureAwait(false)
: _readNextTask.Result;
if (!canStillRead)
return false;
_buffer = _enumerator.Current;
_pointer = 0;
_readNextTask = _enumerator.MoveNextAsync().AsTask();
}
return true;
}
private int HandleReadResult(Span<byte> buffer)
{
var currentSlice = _buffer.Slice(_pointer);
int actual = Math.Min(buffer.Length, currentSlice.Length);
var slice = actual == buffer.Length ? currentSlice : currentSlice.Slice(0, actual);
currentSlice.Span.CopyTo(buffer);
_pointer += actual;
return actual;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment