Created
April 11, 2019 20:43
-
-
Save ronnieoverby/67636c5e519e28644bf21294337e048e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class BlockingStreams | |
{ | |
public Stream WriteableStream { get; } | |
public Stream ReadableStream { get; } | |
private readonly CancellationToken _ct; | |
private readonly BlockingCollection<MemoryStream> _blocks; | |
public BlockingStreams(int? maxWrites = null, CancellationToken ct = default) | |
{ | |
_ct = ct; | |
_blocks = maxWrites.HasValue | |
? new BlockingCollection<MemoryStream>(maxWrites.Value) | |
: new BlockingCollection<MemoryStream>(); | |
var readDisposed = new TaskCompletionSource<object>(); | |
var writeDisposed = new TaskCompletionSource<object>(); | |
ReadableStream = new ReadStream(_blocks, ct, readDisposed); | |
WriteableStream = new WriteStream(_blocks, ct, writeDisposed); | |
Task.WhenAll(readDisposed.Task, writeDisposed.Task) | |
.ContinueWith(t => _blocks.Dispose()); | |
} | |
class ReadStream : Stream | |
{ | |
readonly TaskCompletionSource<object> _disposed; | |
readonly BlockingCollection<MemoryStream> _blocks; | |
readonly CancellationToken _ct; | |
MemoryStream _current; | |
long _position; | |
public override bool CanRead => true; | |
public override bool CanSeek => false; | |
public override bool CanWrite => false; | |
public override long Length => throw new NotSupportedException(); | |
public ReadStream(BlockingCollection<MemoryStream> blocks, CancellationToken ct, TaskCompletionSource<object> disposed) | |
{ | |
_blocks = blocks; | |
_ct = ct; | |
_disposed = disposed; | |
} | |
public override long Position | |
{ | |
get => _position; | |
set => throw new NotSupportedException(); | |
} | |
public override void Flush() { } | |
public override int Read(byte[] buffer, int offset, int count) | |
{ | |
var read = 0; | |
while (read < count) | |
{ | |
if (_current == null) | |
{ | |
// only wait if no bytes read yet | |
var timeout = read == 0 ? -1 : 0; | |
if (!_blocks.TryTake(out _current, timeout, _ct)) | |
return read; | |
} | |
var thisRead = _current.Read(buffer, offset + read, count - read); | |
read += thisRead; | |
_position += thisRead; | |
// is current block exhausted? | |
if (_current.Position == _current.Length) | |
{ | |
using (_current) | |
_current = null; | |
} | |
} | |
return read; | |
} | |
public override long Seek(long offset, SeekOrigin origin) => | |
throw new NotSupportedException(); | |
public override void SetLength(long value) => | |
throw new NotSupportedException(); | |
public override void Write(byte[] buffer, int offset, int count) => | |
throw new NotSupportedException(); | |
protected override void Dispose(bool disposing) | |
{ | |
base.Dispose(disposing); | |
_disposed.SetResult(default); | |
} | |
} | |
class WriteStream : Stream | |
{ | |
readonly TaskCompletionSource<object> _disposed; | |
readonly BlockingCollection<MemoryStream> _blocks; | |
readonly CancellationToken _ct; | |
readonly RecyclableMemoryStreamManager _streamManager = new RecyclableMemoryStreamManager(); | |
long _position = 0; | |
public WriteStream(BlockingCollection<MemoryStream> blocks, CancellationToken ct, TaskCompletionSource<object> disposed) | |
{ | |
_blocks = blocks; | |
_ct = ct; | |
_disposed = disposed; | |
} | |
protected override void Dispose(bool disposing) | |
{ | |
base.Dispose(disposing); | |
_blocks.CompleteAdding(); | |
_disposed.SetResult(default); | |
} | |
public override bool CanRead => false; | |
public override bool CanSeek => false; | |
public override bool CanWrite => true; | |
public override long Length => throw new NotImplementedException(); | |
public override void Write(byte[] buffer, int offset, int count) | |
{ | |
var stream = _streamManager.GetStream(tag: default, buffer, offset, count); | |
_position += stream.Length; | |
_blocks.Add(stream, _ct); | |
} | |
public override long Position | |
{ | |
get => _position; | |
set => throw new NotSupportedException(); | |
} | |
public override void Flush() { } | |
public override int Read(byte[] buffer, int offset, int count) => | |
throw new NotSupportedException(); | |
public override long Seek(long offset, SeekOrigin origin) => | |
throw new NotSupportedException(); | |
public override void SetLength(long value) => | |
throw new NotSupportedException(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment