Last active
January 4, 2017 18:09
-
-
Save hifi/3882f07f9064ce87e1a43d27a83888b3 to your computer and use it in GitHub Desktop.
An idea for stream splitting
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.IO; | |
using System.Threading; | |
namespace Renci.SshNet.Common | |
{ | |
internal class Pipe | |
{ | |
/// <summary>
/// A single chunk of queued bytes together with a read cursor. Entries are chained
/// into a singly linked list through <see cref="Next"/>.
/// </summary>
class PipeEntry
{
    // Private snapshot of the bytes handed to the constructor.
    private readonly byte[] _data;
    // Number of valid bytes in _data.
    private readonly int _length;
    // Index of the next unread byte in _data.
    private int _position;

    /// <summary>Gets or sets the entry that follows this one in the list.</summary>
    public PipeEntry Next { get; set; }

    /// <summary>
    /// Creates an entry holding <paramref name="count"/> bytes taken from
    /// <paramref name="data"/> starting at <paramref name="offset"/>.
    /// </summary>
    /// <param name="data">Source buffer; it is copied, not retained.</param>
    /// <param name="offset">Index of the first byte to take from <paramref name="data"/>.</param>
    /// <param name="count">Number of bytes to take.</param>
    public PipeEntry(byte[] data, int offset, int count)
    {
        // Copy the slice instead of aliasing the caller's array: the writer is free to
        // reuse its buffer as soon as the write call returns, which would otherwise
        // corrupt bytes that are still waiting in the queue.
        _data = new byte[count];
        Buffer.BlockCopy(data, offset, _data, 0, count);

        // The snapshot starts at index 0, so the cursor and the length no longer
        // depend on the caller's offset (the previous code mixed a start index with
        // a byte count and lost data whenever offset was non-zero).
        _position = 0;
        _length = count;
    }

    /// <summary>
    /// Copies up to <paramref name="count"/> unread bytes into <paramref name="dst"/>
    /// and advances the read cursor.
    /// </summary>
    /// <param name="dst">Destination buffer.</param>
    /// <param name="offset">Index in <paramref name="dst"/> at which to start writing.</param>
    /// <param name="count">Maximum number of bytes to copy.</param>
    /// <returns>The number of bytes actually copied; 0 when the entry is exhausted.</returns>
    public int Read(byte[] dst, int offset, int count)
    {
        var bytesToCopy = Math.Min(count, _length - _position);
        Buffer.BlockCopy(_data, _position, dst, offset, bytesToCopy);
        _position += bytesToCopy;
        return bytesToCopy;
    }

    /// <summary>Gets a value indicating whether every byte of this entry has been read.</summary>
    public bool IsEmpty
    {
        get { return _position == _length; }
    }
}
/// <summary>
/// A blocking FIFO of byte chunks shared between a writer and a reader. All state is
/// guarded by a single monitor; readers wait on it and writers pulse it.
/// </summary>
class ByteQueue
{
    private readonly object _lock = new object();
    private bool _isClosed;

    // Head and tail of the linked list of pending entries. Both are null when the
    // queue holds no entries.
    private PipeEntry First { get; set; }
    private PipeEntry Last { get; set; }

    /// <summary>
    /// Marks the queue as closed and wakes every waiting reader. Entries that are
    /// already queued remain readable; later <see cref="Enqueue"/> calls are ignored.
    /// </summary>
    public void Close()
    {
        lock (_lock)
        {
            _isClosed = true;
            Monitor.PulseAll(_lock);
        }
    }

    /// <summary>
    /// Appends <paramref name="count"/> bytes of <paramref name="buffer"/>, starting at
    /// <paramref name="offset"/>, to the end of the queue and wakes waiting readers.
    /// Does nothing once the queue has been closed.
    /// </summary>
    public void Enqueue(byte[] buffer, int offset, int count)
    {
        lock (_lock)
        {
            if (_isClosed)
                return;

            var entry = new PipeEntry(buffer, offset, count);
            if (First == null)
            {
                // Empty list: the new entry becomes the head.
                First = entry;
            }
            else
            {
                Last.Next = entry;
            }
            Last = entry;

            Monitor.PulseAll(_lock);
        }
    }

    /// <summary>
    /// Copies queued bytes into <paramref name="buffer"/>. Blocks until
    /// <paramref name="count"/> bytes have been copied, or until the queue has been
    /// closed and no entries are left.
    /// </summary>
    /// <param name="buffer">Destination buffer.</param>
    /// <param name="offset">Index in <paramref name="buffer"/> at which to start writing.</param>
    /// <param name="count">Number of bytes requested.</param>
    /// <returns>
    /// The number of bytes copied; less than <paramref name="count"/> only when the
    /// queue was closed before enough data arrived.
    /// </returns>
    public int Dequeue(byte[] buffer, int offset, int count)
    {
        lock (_lock)
        {
            var totalBytesRead = 0;
            while (count > 0)
            {
                // Wait for data or for the writer to close the queue.
                while (First == null && !_isClosed)
                    Monitor.Wait(_lock);

                // Closed and drained: report whatever was gathered so far.
                if (First == null)
                    return totalBytesRead;

                var bytesRead = First.Read(buffer, offset, count);
                if (First.IsEmpty)
                {
                    First = First.Next;
                    if (First == null)
                    {
                        // Drop the tail reference as well so a drained queue does not
                        // keep the last consumed entry (and its byte array) alive.
                        Last = null;
                    }
                }

                count -= bytesRead;
                totalBytesRead += bytesRead;
                offset += bytesRead;
            }
            return totalBytesRead;
        }
    }
}
/// <summary>
/// One end of a pipe: a non-seekable <see cref="Stream"/> that either reads from or
/// writes to a shared <see cref="ByteQueue"/>, depending on the <see cref="Type"/>
/// it was created with.
/// </summary>
class PipeStream : Stream
{
    /// <summary>Selects the single direction a pipe stream supports.</summary>
    public enum Type
    {
        Read,
        Write
    };

    private readonly ByteQueue _queue;
    private readonly bool _isReadable;
    private readonly bool _isWritable;
    private bool _isDisposed;
    private bool _isClosed;

    /// <summary>
    /// Creates a stream bound to <paramref name="queue"/>. <see cref="Type.Read"/>
    /// yields a read-only stream; any other value yields a write-only stream.
    /// </summary>
    public PipeStream(ByteQueue queue, Type type)
    {
        _queue = queue;
        _isReadable = type == Type.Read;
        _isWritable = !_isReadable;
    }

    /// <summary>Does nothing; written data is handed to the queue immediately.</summary>
    public override void Flush()
    {
    }

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    /// <summary>
    /// Reads bytes from the queue into <paramref name="buffer"/>; the call is forwarded
    /// to <c>ByteQueue.Dequeue</c>. Returns 0 without touching the queue once this
    /// stream has been closed.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">offset or count is negative.</exception>
    /// <exception cref="ArgumentException">offset + count exceeds the buffer length.</exception>
    /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
    /// <exception cref="NotSupportedException">The stream is write-only.</exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        CheckBufferArguments(buffer, offset, count);
        if (_isDisposed)
            throw CreateObjectDisposedException();
        if (!_isReadable)
            throw new NotSupportedException("This stream is for writing only.");
        if (_isClosed)
            return 0;

        return _queue.Dequeue(buffer, offset, count);
    }

    /// <summary>
    /// Appends bytes from <paramref name="buffer"/> to the queue. Silently does nothing
    /// once this stream has been closed.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">offset or count is negative.</exception>
    /// <exception cref="ArgumentException">offset + count exceeds the buffer length.</exception>
    /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
    /// <exception cref="NotSupportedException">The stream is read-only.</exception>
    public override void Write(byte[] buffer, int offset, int count)
    {
        CheckBufferArguments(buffer, offset, count);
        if (_isDisposed)
            throw CreateObjectDisposedException();
        if (!_isWritable)
            throw new NotSupportedException("This pipe is for reading only.");
        if (_isClosed)
            return;

        _queue.Enqueue(buffer, offset, count);
    }

    /// <summary>Gets a value indicating whether this is the read end.</summary>
    public override bool CanRead
    {
        get { return _isReadable; }
    }

    /// <summary>Always false; pipe streams cannot seek.</summary>
    public override bool CanSeek
    {
        get { return false; }
    }

    /// <summary>Gets a value indicating whether this is the write end.</summary>
    public override bool CanWrite
    {
        get { return _isWritable; }
    }

    /// <summary>Not supported.</summary>
    public override long Length
    {
        get
        {
            throw new NotSupportedException();
        }
    }

    /// <summary>Not supported.</summary>
    public override long Position
    {
        get
        {
            throw new NotSupportedException();
        }
        set
        {
            throw new NotSupportedException();
        }
    }

    /// <summary>
    /// Closes this end of the pipe. Closing the write end also closes the queue, which
    /// releases any reader blocked waiting for data. Calling it again has no effect.
    /// </summary>
    public override void Close()
    {
        if (_isClosed)
            return;

        // Record the closed state; without this the _isClosed guards in Read, Write
        // and Dispose could never trigger.
        _isClosed = true;

        if (_isWritable)
        {
            _queue.Close();
        }

        // NOTE(review): base.Close() is deliberately still not called, to keep the
        // behaviour callers see after closing unchanged. The framework's Stream.Dispose()
        // goes through Close(), so Dispose(bool) below is presumably only reached when
        // invoked directly - confirm whether Close should also dispose.
    }

    /// <summary>Closes the stream if necessary and marks it as disposed.</summary>
    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);

        if (_isDisposed)
            return;

        if (!_isClosed)
            Close();
        _isDisposed = true;
    }

    // Validates the (buffer, offset, count) triple shared by Read and Write.
    // Negative values are rejected before the range check, and the range check uses a
    // subtraction so that offset + count cannot overflow int.
    private static void CheckBufferArguments(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
            throw new ArgumentNullException("buffer");
        if (offset < 0 || count < 0)
            throw new ArgumentOutOfRangeException("offset", "offset or count is negative.");
        if (buffer.Length - offset < count)
            throw new ArgumentException("The sum of offset and count is greater than the buffer length.");
    }

    // Builds the exception thrown when a disposed stream is used.
    private ObjectDisposedException CreateObjectDisposedException()
    {
        return new ObjectDisposedException(GetType().FullName);
    }
}
/// <summary>
/// Gets the stream used to feed data into the pipe; the constructor creates it as
/// the write end.
/// </summary>
public Stream InputStream { get; private set; }
/// <summary>
/// Gets the stream used to take data out of the pipe; the constructor creates it as
/// the read end.
/// </summary>
public Stream OutputStream { get; private set; }
/// <summary>
/// Creates a pipe whose two streams share one byte queue: bytes written to
/// <see cref="InputStream"/> become readable from <see cref="OutputStream"/>.
/// </summary>
public Pipe()
{
    // Both ends are bound to the same queue instance.
    var sharedQueue = new ByteQueue();

    InputStream = new PipeStream(sharedQueue, PipeStream.Type.Write);
    OutputStream = new PipeStream(sharedQueue, PipeStream.Type.Read);
}
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment