Last active
February 28, 2017 22:21
-
-
Save stdray/4bfc34e72f1a2f36867f811487f12217 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary> | |
/// Откуда форкаемся | |
/// </summary> | |
public enum ForkOrigin | |
{ | |
/// <summary> | |
/// C начала потока | |
/// </summary> | |
Begin, | |
/// <summary> | |
/// С текущего места | |
/// </summary> | |
Current | |
} | |
public interface IForkableStream | |
{ | |
Stream Fork(ForkOrigin oring = ForkOrigin.Current); | |
} | |
public sealed class ForkableStream : Stream, IForkableStream | |
{ | |
readonly List<byte> _bytes; | |
readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion); | |
readonly List<ManualResetEventSlim> _resetEvents = new List<ManualResetEventSlim>(); | |
public ForkableStream(int capacity) | |
{ | |
_bytes = new List<byte>(capacity); | |
} | |
public Stream Fork(ForkOrigin origin = ForkOrigin.Current) | |
{ | |
_lock.EnterWriteLock(); | |
var resetEvent = new ManualResetEventSlim(false); | |
var position = origin == ForkOrigin.Begin ? 0 : _bytes.Count; | |
_resetEvents.Add(resetEvent); | |
_lock.ExitWriteLock(); | |
return new ForkedStream(this, position, resetEvent); | |
} | |
public void Write(byte[] bytes) | |
{ | |
_lock.EnterWriteLock(); | |
_bytes.AddRange(bytes); | |
foreach (var resetEvent in _resetEvents) | |
resetEvent.Set(); | |
_lock.ExitWriteLock(); | |
} | |
public override bool CanRead => false; | |
public override bool CanSeek => false; | |
public override bool CanWrite => true; | |
public override long Length => _bytes.Count; | |
public override long Position | |
{ | |
get { throw new NotSupportedException(); } | |
set { throw new NotSupportedException(); } | |
} | |
public int Read(int position, byte[] buffer, int offset, int count, ManualResetEventSlim resetEvent) | |
{ | |
_lock.EnterReadLock(); | |
var max = Math.Min(_bytes.Count, position + count); | |
var n = max - position; | |
if (n > 0) | |
for (var i = 0; i < n; i++) | |
buffer[offset + i] = _bytes[position + i]; | |
else | |
resetEvent.Reset(); | |
_lock.ExitReadLock(); | |
return n; | |
} | |
public override void Flush() | |
{ | |
//todo: Возможно стоит сделать опцию, при которой форки видят данные только по Flush | |
} | |
public override long Seek(long offset, SeekOrigin origin) | |
{ | |
throw new NotSupportedException(); | |
} | |
public override void SetLength(long value) | |
{ | |
throw new NotSupportedException(); | |
} | |
public override int Read(byte[] buffer, int offset, int count) | |
{ | |
throw new NotSupportedException(); | |
} | |
public override void Write(byte[] buffer, int offset, int count) | |
{ | |
var max = Math.Min(buffer.Length, offset + count); | |
for (var i = 0; i < max; i++) | |
_bytes.Add(buffer[i]); | |
} | |
} | |
public sealed class ForkedStream : Stream | |
{ | |
readonly ForkableStream _baseStream; | |
readonly ManualResetEventSlim _resetEvent; | |
int _position; | |
internal ForkedStream(ForkableStream baseStream, int position, ManualResetEventSlim resetEvent) | |
{ | |
_baseStream = baseStream; | |
_position = position; | |
_resetEvent = resetEvent; | |
} | |
public override bool CanRead => true; | |
public override bool CanSeek => true; | |
public override bool CanWrite => false; | |
public override long Length => _baseStream.Length; | |
public override long Position | |
{ | |
get { return _position; } | |
set { _position = (int)value; } | |
} | |
public override void Flush() | |
{ | |
} | |
public override int Read(byte[] buffer, int offset, int count) | |
{ | |
while (true) | |
{ | |
var cnt = _baseStream.Read(_position, buffer, offset, count, _resetEvent); | |
if (cnt > 0) | |
{ | |
_position += cnt; | |
return cnt; | |
} | |
_resetEvent.Wait(); | |
} | |
} | |
public override long Seek(long offset, SeekOrigin origin) | |
{ | |
var len = Length; | |
long pos; | |
switch (origin) | |
{ | |
case SeekOrigin.Begin: | |
pos = offset; | |
break; | |
case SeekOrigin.Current: | |
unchecked { pos = _position + offset; } | |
break; | |
case SeekOrigin.End: | |
unchecked { pos = len + offset; } | |
break; | |
default: | |
throw new ArgumentOutOfRangeException(nameof(origin)); | |
} | |
//todo: CheckPos(pos); | |
_position = (int)pos; | |
return _position; | |
} | |
public override void SetLength(long value) | |
{ | |
throw new NotSupportedException(); | |
} | |
public override void Write(byte[] buffer, int offset, int count) | |
{ | |
throw new NotSupportedException(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment