Skip to content

Instantly share code, notes, and snippets.

@JKamsker
Created October 19, 2017 21:13
Show Gist options
  • Save JKamsker/f401c44ab749f79d8e7eab9fe1501f9e to your computer and use it in GitHub Desktop.
Save JKamsker/f401c44ab749f79d8e7eab9fe1501f9e to your computer and use it in GitHub Desktop.
public class FifoStream2 : Stream
{
private static readonly ConcurrentQueue<CByte> _ringBuffer = new ConcurrentQueue<CByte>();
static FifoStream2()
{
for (int i = 0; i < 1000; i++)
_ringBuffer.Enqueue(new CByte());
}
private static CByte PickBuf()
{
CByte result;
if (!_ringBuffer.TryDequeue(out result))
result = new CByte();
return result;
}
private readonly ConcurrentQueue<CByte> _localBuffer = new ConcurrentQueue<CByte>();
private int _currentStreamOffset = 0;
private long _length;
public override void Flush()
{
throw new NotImplementedException();
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotImplementedException();
}
public override void SetLength(long value)
{
throw new NotImplementedException();
}
public override int Read(byte[] buffer, int offset, int count)
{
var currentReadBytes = 0;
CByte enqueuedByteArray;
while (currentReadBytes < count && _localBuffer.TryPeek(out enqueuedByteArray))
{
var requiredBytes = enqueuedByteArray.Length - _currentStreamOffset;
var toRead2 = count - currentReadBytes;
requiredBytes = Math.Min(count, requiredBytes);
requiredBytes = Math.Min(toRead2, requiredBytes);
Array.Copy(enqueuedByteArray.Value, _currentStreamOffset, buffer, currentReadBytes, requiredBytes);
currentReadBytes += requiredBytes;
_currentStreamOffset += requiredBytes;
if (_currentStreamOffset >= enqueuedByteArray.Length)
{
CByte cBuf2;
if (_localBuffer.TryDequeue(out cBuf2))
enqueuedByteArray = cBuf2;
_currentStreamOffset = 0;
_ringBuffer.Enqueue(enqueuedByteArray);
}
}
_length -= currentReadBytes;
return currentReadBytes;
}
public override int ReadByte()
{
CByte cBuf;
if (!_localBuffer.TryPeek(out cBuf))
return -1;
var result = cBuf.Value[_currentStreamOffset];
_currentStreamOffset += 1;
_length -= 1;
if (_currentStreamOffset >= cBuf.Length)
{
CByte cBuf2;
if (_localBuffer.TryDequeue(out cBuf2))
cBuf = cBuf2;
_currentStreamOffset = 0;
_ringBuffer.Enqueue(cBuf);
}
return result;
}
public override void Write(byte[] buffer, int offset, int count)
{
while (count > 0)
{
var cwb = PickBuf();
var wcnt = Math.Min(count, cwb.Value.Length);
count -= wcnt;
Array.Copy(buffer, offset, cwb.Value, 0, wcnt);
cwb.Length = wcnt;
_localBuffer.Enqueue(cwb);
offset += wcnt;
_length += wcnt;
}
}
public int ReadInt32()
{
if (_length >= 4)
{
var nBuf = new byte[4];
Read(nBuf, 0, 4);
return (int)(nBuf[0] | nBuf[1] << 8 | nBuf[2] << 16 | nBuf[3] << 24);
}
else
{
return -1;
}
}
public ulong ReadUInt64()
{
if (_length >= 8)
{
var buffer = new byte[8];
Read(buffer, 0, 8);
uint lo = (uint)(buffer[0] | buffer[1] << 8 | buffer[2] << 16 | buffer[3] << 24);
uint hi = (uint)(buffer[4] | buffer[5] << 8 | buffer[6] << 16 | buffer[7] << 24);
return ((ulong)hi) << 32 | lo;
}
else
{
return 0;
}
}
public long ReadInt64()
{
if (_length >= 8)
{
var buffer = new byte[8];
Read(buffer, 0, 8);
uint lo = (uint)(buffer[0] | buffer[1] << 8 | buffer[2] << 16 | buffer[3] << 24);
uint hi = (uint)(buffer[4] | buffer[5] << 8 | buffer[6] << 16 | buffer[7] << 24);
return (long)((ulong)hi) << 32 | lo;
}
else
{
return 0;
}
}
public byte[] ReadBytes(long count) => ReadBytes((int)count);
public byte[] ReadBytes(int count)
{
if (count == 0)
return null;
var retArr = new byte[count];
Read(retArr, 0, count);
return retArr;
}
public override bool CanRead { get; }
public override bool CanSeek { get; }
public override bool CanWrite { get; }
public override long Length => _length;
public override long Position { get; set; }
private class CByte
{
private const int DefaultBufferSize = 8 * 1024 * 1024;
/// <summary>
/// Fields because of performance
/// </summary>
public byte[] Value;
public int Length = 0;
public CByte(int bufferSize = DefaultBufferSize)
{
Value = new byte[bufferSize];
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment