Skip to content

Instantly share code, notes, and snippets.

Created July 27, 2013 09:08
Show Gist options
  • Save 0x53A/6094323 to your computer and use it in GitHub Desktop.
Save 0x53A/6094323 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace xxx
public class BlockingStream : Stream
private object _lockForRead;
private object _lockForAll;
private Queue<byte[]> _chunks;
private byte[] _currentChunk;
private int _currentChunkPosition;
private ManualResetEvent _doneWriting;
private ManualResetEvent _dataAvailable;
private WaitHandle[] _events;
private int _doneWritingHandleIndex;
private volatile bool _illegalToWrite;
public BlockingStream()
_chunks = new Queue<byte[]>();
_doneWriting = new ManualResetEvent(false);
_dataAvailable = new ManualResetEvent(false);
_events = new WaitHandle[] { _dataAvailable, _doneWriting };
_doneWritingHandleIndex = 1;
_lockForRead = new object();
_lockForAll = new object();
public override bool CanRead { get { return true; } }
public override bool CanSeek { get { return false; } }
public override bool CanWrite { get { return !_illegalToWrite; } }
public override void Flush() { }
public override long Length
get { throw new NotSupportedException(); }
public override long Position
get { throw new NotSupportedException(); }
set { throw new NotSupportedException(); }
public override long Seek(long offset, SeekOrigin origin)
throw new NotSupportedException();
public override void SetLength(long value)
throw new NotSupportedException();
public override int Read(byte[] buffer, int offset, int count)
if (buffer == null) throw new ArgumentNullException("buffer");
if (offset < 0 || offset >= buffer.Length)
throw new ArgumentOutOfRangeException("offset");
if (count < 0 || offset + count > buffer.Length)
throw new ArgumentOutOfRangeException("count");
if (_dataAvailable == null)
throw new ObjectDisposedException(GetType().Name);
if (count == 0) return 0;
while (true)
int handleIndex = WaitHandle.WaitAny(_events);
lock (_lockForRead)
lock (_lockForAll)
if (_currentChunk == null)
if (_chunks.Count == 0)
if (handleIndex == _doneWritingHandleIndex)
return 0;
else continue;
_currentChunk = _chunks.Dequeue();
_currentChunkPosition = 0;
int bytesAvailable =
_currentChunk.Length - _currentChunkPosition;
int bytesToCopy;
if (bytesAvailable > count)
bytesToCopy = count;
Buffer.BlockCopy(_currentChunk, _currentChunkPosition,
buffer, offset, count);
_currentChunkPosition += count;
bytesToCopy = bytesAvailable;
Buffer.BlockCopy(_currentChunk, _currentChunkPosition,
buffer, offset, bytesToCopy);
_currentChunk = null;
_currentChunkPosition = 0;
lock (_lockForAll)
if (_chunks.Count == 0) _dataAvailable.Reset();
return bytesToCopy;
public override void Write(byte[] buffer, int offset, int count)
if (buffer == null) throw new ArgumentNullException("buffer");
if (offset < 0 || offset >= buffer.Length)
throw new ArgumentOutOfRangeException("offset");
if (count < 0 || offset + count > buffer.Length)
throw new ArgumentOutOfRangeException("count");
if (_dataAvailable == null)
throw new ObjectDisposedException(GetType().Name);
if (count == 0) return;
byte[] chunk = new byte[count];
Buffer.BlockCopy(buffer, offset, chunk, 0, count);
lock (_lockForAll)
if (_illegalToWrite)
throw new InvalidOperationException(
"Writing has already been completed.");
public void SetEndOfStream()
if (_dataAvailable == null)
throw new ObjectDisposedException(GetType().Name);
lock (_lockForAll)
_illegalToWrite = true;
public override void Close()
if (_dataAvailable != null)
_dataAvailable = null;
if (_doneWriting != null)
_doneWriting = null;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment