Skip to content

Instantly share code, notes, and snippets.

@stdray
Last active February 28, 2017 22:21
Show Gist options
  • Save stdray/4bfc34e72f1a2f36867f811487f12217 to your computer and use it in GitHub Desktop.
Save stdray/4bfc34e72f1a2f36867f811487f12217 to your computer and use it in GitHub Desktop.
/// <summary>
/// Specifies where a fork starts from.
/// </summary>
public enum ForkOrigin
{
/// <summary>
/// From the beginning of the stream.
/// </summary>
Begin,
/// <summary>
/// From the current position.
/// </summary>
Current
}
/// <summary>
/// Contract for an object that can produce a <see cref="Stream"/> "fork" of itself.
/// </summary>
public interface IForkableStream
{
/// <summary>
/// Creates a fork.
/// </summary>
/// <param name="oring">Where the fork starts from; defaults to <see cref="ForkOrigin.Current"/>.</param>
/// <returns>The forked <see cref="Stream"/>.</returns>
// NOTE(review): the parameter is spelled "oring" here but "origin" in ForkableStream.Fork.
// Renaming it would break callers that pass it as a named argument, so it is left as is.
Stream Fork(ForkOrigin oring = ForkOrigin.Current);
}
/// <summary>
/// Write-only, append-only in-memory stream. Readers do not read from this object
/// directly; they call <see cref="Fork"/> to obtain a <see cref="ForkedStream"/> that
/// reads the accumulated bytes from its own position.
/// </summary>
/// <remarks>
/// The byte buffer and the list of wake-up events are guarded by a
/// <see cref="ReaderWriterLockSlim"/>: appends and fork registration take the write lock,
/// fork reads take the read lock. Every lock is released in a <c>finally</c> block so an
/// exception cannot leave it held.
/// NOTE(review): wake-up events are never removed from the list and there is no
/// end-of-stream signal, so a fork waiting for data waits until the next write.
/// </remarks>
public sealed class ForkableStream : Stream, IForkableStream
{
    readonly List<byte> _bytes;
    readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
    readonly List<ManualResetEventSlim> _resetEvents = new List<ManualResetEventSlim>();

    /// <summary>
    /// Creates an empty stream.
    /// </summary>
    /// <param name="capacity">Initial capacity of the internal byte list.</param>
    public ForkableStream(int capacity)
    {
        _bytes = new List<byte>(capacity);
    }

    /// <summary>
    /// Creates a new reader over this stream.
    /// </summary>
    /// <param name="origin">
    /// <see cref="ForkOrigin.Begin"/> starts the reader at byte 0; any other value starts it
    /// at the number of bytes written so far.
    /// </param>
    /// <returns>A <see cref="ForkedStream"/> positioned as requested.</returns>
    public Stream Fork(ForkOrigin origin = ForkOrigin.Current)
    {
        var resetEvent = new ManualResetEventSlim(false);
        int position;

        _lock.EnterWriteLock();
        try
        {
            // The position is captured and the event registered under the same write lock,
            // so no append can happen between the two.
            position = origin == ForkOrigin.Begin ? 0 : _bytes.Count;
            _resetEvents.Add(resetEvent);
        }
        finally
        {
            _lock.ExitWriteLock();
        }

        return new ForkedStream(this, position, resetEvent);
    }

    /// <summary>
    /// Appends the whole array and signals every registered fork.
    /// </summary>
    /// <param name="bytes">Bytes to append.</param>
    /// <exception cref="ArgumentNullException"><paramref name="bytes"/> is null.</exception>
    public void Write(byte[] bytes)
    {
        if (bytes == null)
        {
            throw new ArgumentNullException(nameof(bytes));
        }

        Append(new ArraySegment<byte>(bytes, 0, bytes.Length));
    }

    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;

    /// <summary>Number of bytes written so far (read without taking the lock).</summary>
    public override long Length => _bytes.Count;

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }

    /// <summary>
    /// Copies up to <paramref name="count"/> bytes, starting at <paramref name="position"/>
    /// in the accumulated data, into <paramref name="buffer"/>.
    /// </summary>
    /// <param name="position">Index in the accumulated data to start reading from.</param>
    /// <param name="buffer">Destination array.</param>
    /// <param name="offset">Index in <paramref name="buffer"/> to start writing at.</param>
    /// <param name="count">Maximum number of bytes to copy.</param>
    /// <param name="resetEvent">
    /// The caller's wake-up event. It is reset when nothing could be copied. The reset happens
    /// under the read lock and appends set the event under the write lock, so a later append
    /// always sets the event after this reset.
    /// </param>
    /// <returns>Number of bytes copied; 0 when no data is available at <paramref name="position"/>.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="position"/> is negative.</exception>
    public int Read(int position, byte[] buffer, int offset, int count, ManualResetEventSlim resetEvent)
    {
        if (position < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(position));
        }

        _lock.EnterReadLock();
        try
        {
            // Count - position cannot overflow for a non-negative position,
            // unlike the former position + count.
            var available = _bytes.Count - position;
            var n = Math.Min(available, count);
            if (n > 0)
            {
                _bytes.CopyTo(position, buffer, offset, n);
                return n;
            }

            resetEvent.Reset();
            return 0;
        }
        finally
        {
            _lock.ExitReadLock();
        }
    }

    public override void Flush()
    {
        //todo: Perhaps add an option where forks see data only on Flush
    }

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override int Read(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }

    /// <summary>
    /// Appends <paramref name="count"/> bytes of <paramref name="buffer"/>, starting at
    /// <paramref name="offset"/>, and signals every registered fork.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
    /// <exception cref="ArgumentException">The range does not fit inside <paramref name="buffer"/>.</exception>
    public override void Write(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
        {
            throw new ArgumentNullException(nameof(buffer));
        }
        if (offset < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(offset));
        }
        if (count < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(count));
        }
        if (buffer.Length - offset < count)
        {
            throw new ArgumentException("Offset and count exceed the length of the buffer.");
        }

        Append(new ArraySegment<byte>(buffer, offset, count));
    }

    // Appends the segment under the write lock and wakes every registered fork.
    void Append(ArraySegment<byte> data)
    {
        _lock.EnterWriteLock();
        try
        {
            _bytes.AddRange(data);
            foreach (var resetEvent in _resetEvents)
            {
                resetEvent.Set();
            }
        }
        finally
        {
            _lock.ExitWriteLock();
        }
    }
}
/// <summary>
/// Read-only, seekable reader over the data accumulated by a <see cref="ForkableStream"/>.
/// Each instance keeps its own position.
/// </summary>
/// <remarks>
/// Positions are stored as <see cref="int"/>, so valid positions are 0 through
/// <see cref="int.MaxValue"/>. A position past the data currently written is allowed;
/// <see cref="Read"/> then waits until the base stream signals and data is available there.
/// </remarks>
public sealed class ForkedStream : Stream
{
    readonly ForkableStream _baseStream;
    readonly ManualResetEventSlim _resetEvent;
    int _position;

    /// <summary>
    /// Creates a reader over <paramref name="baseStream"/>.
    /// </summary>
    /// <param name="baseStream">Stream whose data is read.</param>
    /// <param name="position">Initial read position.</param>
    /// <param name="resetEvent">Event that <see cref="Read"/> waits on when no data is available.</param>
    internal ForkedStream(ForkableStream baseStream, int position, ManualResetEventSlim resetEvent)
    {
        _baseStream = baseStream;
        _position = position;
        _resetEvent = resetEvent;
    }

    public override bool CanRead => true;
    public override bool CanSeek => true;
    public override bool CanWrite => false;

    /// <summary>Length of the base stream.</summary>
    public override long Length => _baseStream.Length;

    /// <summary>
    /// Current read position.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException">
    /// The value being set is negative or greater than <see cref="int.MaxValue"/>.
    /// </exception>
    public override long Position
    {
        get { return _position; }
        set
        {
            // Reject instead of silently truncating the long to an int.
            if (value < 0 || value > int.MaxValue)
            {
                throw new ArgumentOutOfRangeException(nameof(value));
            }
            _position = (int)value;
        }
    }

    /// <summary>Does nothing.</summary>
    public override void Flush()
    {
    }

    /// <summary>
    /// Reads up to <paramref name="count"/> bytes from the current position and advances it
    /// by the number of bytes read. Blocks until at least one byte can be read.
    /// </summary>
    /// <returns>
    /// The number of bytes read. This is 0 only when <paramref name="count"/> is 0; otherwise
    /// the call does not return until data is available.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
    /// <exception cref="ArgumentException">The range does not fit inside <paramref name="buffer"/>.</exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        // Validate here so that bad arguments never throw inside the base stream's lock.
        if (buffer == null)
        {
            throw new ArgumentNullException(nameof(buffer));
        }
        if (offset < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(offset));
        }
        if (count < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(count));
        }
        if (buffer.Length - offset < count)
        {
            throw new ArgumentException("Offset and count exceed the length of the buffer.");
        }

        // A zero-byte read can never obtain data, so without this guard the loop below
        // would wait forever.
        if (count == 0)
        {
            return 0;
        }

        while (true)
        {
            var cnt = _baseStream.Read(_position, buffer, offset, count, _resetEvent);
            if (cnt > 0)
            {
                _position += cnt;
                return cnt;
            }

            // Nothing available: the base stream has reset the event; wait for it to be set.
            _resetEvent.Wait();
        }
    }

    /// <summary>
    /// Moves the read position.
    /// </summary>
    /// <param name="offset">Offset relative to <paramref name="origin"/>.</param>
    /// <param name="origin">Reference point: start, current position, or current <see cref="Length"/>.</param>
    /// <returns>The new position.</returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="origin"/> is not a known value, or the resulting position exceeds <see cref="int.MaxValue"/>.
    /// </exception>
    /// <exception cref="IOException">The resulting position is negative.</exception>
    public override long Seek(long offset, SeekOrigin origin)
    {
        long start;
        switch (origin)
        {
            case SeekOrigin.Begin:
                start = 0;
                break;
            case SeekOrigin.Current:
                start = _position;
                break;
            case SeekOrigin.End:
                start = Length;
                break;
            default:
                throw new ArgumentOutOfRangeException(nameof(origin));
        }

        // start is in [0, int.MaxValue], so this check also rules out overflow of start + offset.
        if (offset > int.MaxValue - start)
        {
            throw new ArgumentOutOfRangeException(nameof(offset));
        }

        var target = start + offset;
        if (target < 0)
        {
            throw new IOException("An attempt was made to move the position before the beginning of the stream.");
        }

        _position = (int)target;
        return _position;
    }

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment