Created
October 19, 2017 21:13
-
-
Save JKamsker/f401c44ab749f79d8e7eab9fe1501f9e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class FifoStream2 : Stream | |
{ | |
private static readonly ConcurrentQueue<CByte> _ringBuffer = new ConcurrentQueue<CByte>(); | |
static FifoStream2() | |
{ | |
for (int i = 0; i < 1000; i++) | |
_ringBuffer.Enqueue(new CByte()); | |
} | |
private static CByte PickBuf() | |
{ | |
CByte result; | |
if (!_ringBuffer.TryDequeue(out result)) | |
result = new CByte(); | |
return result; | |
} | |
private readonly ConcurrentQueue<CByte> _localBuffer = new ConcurrentQueue<CByte>(); | |
private int _currentStreamOffset = 0; | |
private long _length; | |
public override void Flush() | |
{ | |
throw new NotImplementedException(); | |
} | |
public override long Seek(long offset, SeekOrigin origin) | |
{ | |
throw new NotImplementedException(); | |
} | |
public override void SetLength(long value) | |
{ | |
throw new NotImplementedException(); | |
} | |
public override int Read(byte[] buffer, int offset, int count) | |
{ | |
var currentReadBytes = 0; | |
CByte enqueuedByteArray; | |
while (currentReadBytes < count && _localBuffer.TryPeek(out enqueuedByteArray)) | |
{ | |
var requiredBytes = enqueuedByteArray.Length - _currentStreamOffset; | |
var toRead2 = count - currentReadBytes; | |
requiredBytes = Math.Min(count, requiredBytes); | |
requiredBytes = Math.Min(toRead2, requiredBytes); | |
Array.Copy(enqueuedByteArray.Value, _currentStreamOffset, buffer, currentReadBytes, requiredBytes); | |
currentReadBytes += requiredBytes; | |
_currentStreamOffset += requiredBytes; | |
if (_currentStreamOffset >= enqueuedByteArray.Length) | |
{ | |
CByte cBuf2; | |
if (_localBuffer.TryDequeue(out cBuf2)) | |
enqueuedByteArray = cBuf2; | |
_currentStreamOffset = 0; | |
_ringBuffer.Enqueue(enqueuedByteArray); | |
} | |
} | |
_length -= currentReadBytes; | |
return currentReadBytes; | |
} | |
public override int ReadByte() | |
{ | |
CByte cBuf; | |
if (!_localBuffer.TryPeek(out cBuf)) | |
return -1; | |
var result = cBuf.Value[_currentStreamOffset]; | |
_currentStreamOffset += 1; | |
_length -= 1; | |
if (_currentStreamOffset >= cBuf.Length) | |
{ | |
CByte cBuf2; | |
if (_localBuffer.TryDequeue(out cBuf2)) | |
cBuf = cBuf2; | |
_currentStreamOffset = 0; | |
_ringBuffer.Enqueue(cBuf); | |
} | |
return result; | |
} | |
public override void Write(byte[] buffer, int offset, int count) | |
{ | |
while (count > 0) | |
{ | |
var cwb = PickBuf(); | |
var wcnt = Math.Min(count, cwb.Value.Length); | |
count -= wcnt; | |
Array.Copy(buffer, offset, cwb.Value, 0, wcnt); | |
cwb.Length = wcnt; | |
_localBuffer.Enqueue(cwb); | |
offset += wcnt; | |
_length += wcnt; | |
} | |
} | |
public int ReadInt32() | |
{ | |
if (_length >= 4) | |
{ | |
var nBuf = new byte[4]; | |
Read(nBuf, 0, 4); | |
return (int)(nBuf[0] | nBuf[1] << 8 | nBuf[2] << 16 | nBuf[3] << 24); | |
} | |
else | |
{ | |
return -1; | |
} | |
} | |
public ulong ReadUInt64() | |
{ | |
if (_length >= 8) | |
{ | |
var buffer = new byte[8]; | |
Read(buffer, 0, 8); | |
uint lo = (uint)(buffer[0] | buffer[1] << 8 | buffer[2] << 16 | buffer[3] << 24); | |
uint hi = (uint)(buffer[4] | buffer[5] << 8 | buffer[6] << 16 | buffer[7] << 24); | |
return ((ulong)hi) << 32 | lo; | |
} | |
else | |
{ | |
return 0; | |
} | |
} | |
public long ReadInt64() | |
{ | |
if (_length >= 8) | |
{ | |
var buffer = new byte[8]; | |
Read(buffer, 0, 8); | |
uint lo = (uint)(buffer[0] | buffer[1] << 8 | buffer[2] << 16 | buffer[3] << 24); | |
uint hi = (uint)(buffer[4] | buffer[5] << 8 | buffer[6] << 16 | buffer[7] << 24); | |
return (long)((ulong)hi) << 32 | lo; | |
} | |
else | |
{ | |
return 0; | |
} | |
} | |
public byte[] ReadBytes(long count) => ReadBytes((int)count); | |
public byte[] ReadBytes(int count) | |
{ | |
if (count == 0) | |
return null; | |
var retArr = new byte[count]; | |
Read(retArr, 0, count); | |
return retArr; | |
} | |
public override bool CanRead { get; } | |
public override bool CanSeek { get; } | |
public override bool CanWrite { get; } | |
public override long Length => _length; | |
public override long Position { get; set; } | |
private class CByte | |
{ | |
private const int DefaultBufferSize = 8 * 1024 * 1024; | |
/// <summary> | |
/// Fields because of performance | |
/// </summary> | |
public byte[] Value; | |
public int Length = 0; | |
public CByte(int bufferSize = DefaultBufferSize) | |
{ | |
Value = new byte[bufferSize]; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment