Skip to content

Instantly share code, notes, and snippets.

@mganss
Created January 2, 2013 12:55
Show Gist options
  • Save mganss/4434399 to your computer and use it in GitHub Desktop.
Save mganss/4434399 to your computer and use it in GitHub Desktop.
A buffer pool and an in-memory stream that uses it.
using System;
using System.Collections.Concurrent;
namespace XY
{
    /// <summary>
    /// A source of reusable objects: callers take an object, use it and hand it back.
    /// </summary>
    /// <typeparam name="T">The type of objects in the pool.</typeparam>
    public interface IPool<T>
    {
        /// <summary>Obtains an object from the pool.</summary>
        T Take();

        /// <summary>Hands an object back to the pool.</summary>
        /// <param name="t">The object being returned.</param>
        void Return(T t);
    }

    /// <summary>
    /// A pool of byte arrays that also reports the buffer size it works with.
    /// </summary>
    public interface IBufferPool : IPool<byte[]>
    {
        /// <summary>Size, in bytes, of the buffers associated with this pool.</summary>
        int BufferSize { get; }
    }

    /// <summary>
    /// A pool of reusable objects backed by a <see cref="ConcurrentStack{T}"/>.
    /// </summary>
    /// <typeparam name="T">The type of objects in the pool.</typeparam>
    public class Pool<T> : IPool<T>
    {
        /// <summary>Objects that have been returned and are waiting to be reused.</summary>
        protected ConcurrentStack<T> _Pool = new ConcurrentStack<T>();

        /// <summary>Factory invoked by <see cref="Take"/> when no pooled object is available.</summary>
        protected Func<T> Allocator;

        /// <summary>
        /// Creates an empty pool.
        /// </summary>
        /// <param name="allocator">Factory used to create objects when the pool is empty.</param>
        /// <exception cref="ArgumentNullException"><paramref name="allocator"/> is null.</exception>
        public Pool(Func<T> allocator)
        {
            // Fail at construction instead of with a NullReferenceException on the
            // first Take() against an empty pool.
            if (allocator == null)
                throw new ArgumentNullException("allocator");

            Allocator = allocator;
        }

        /// <summary>
        /// Pops a previously returned object, or creates a new one through the allocator
        /// when nothing is pooled.
        /// </summary>
        public T Take()
        {
            T t;
            if (!_Pool.TryPop(out t))
                t = Allocator();
            return t;
        }

        /// <summary>
        /// Pushes <paramref name="t"/> onto the pool so a later <see cref="Take"/> can hand it out.
        /// </summary>
        public void Return(T t)
        {
            _Pool.Push(t);
        }
    }

    /// <summary>
    /// A pool of reusable byte array buffers of 8192 bytes each.
    /// </summary>
    public class BufferPool : Pool<byte[]>, IBufferPool
    {
        private const int _BufferSize = 8192;

        /// <summary>Size, in bytes, of every buffer this pool allocates.</summary>
        public int BufferSize
        {
            get { return _BufferSize; }
        }

        /// <summary>Creates a pool whose allocator produces new 8192-byte arrays.</summary>
        public BufferPool()
            : base(() => new byte[_BufferSize])
        {
        }

        /// <summary>A shared, process-wide pool instance.</summary>
        public static BufferPool Instance = new BufferPool();
    }
}
using System;
using System.IO;
using System.Collections.Generic;
namespace XY
{
/// <summary>
/// An alternative to <see cref="System.IO.MemoryStream"/> that uses a number of buffers
/// taken from a pool as its backing store instead of a single buffer.
/// </summary>
public class PoolMemoryStream : Stream
{
/// <summary>
/// The pooled buffers backing this stream, in stream order; each one accounts for
/// <c>Pool.BufferSize</c> bytes of capacity.
/// </summary>
private List<byte[]> Buffers = new List<byte[]>();

/// <summary>
/// The pool that buffers are taken from and returned to. Assigned by the constructors.
/// </summary>
public IBufferPool Pool { get; private set; }
/// <summary>
/// Creates an empty stream.
/// </summary>
/// <param name="pool">Pool to take buffers from; <c>BufferPool.Instance</c> is used when null.</param>
public PoolMemoryStream(IBufferPool pool = null)
{
    Pool = pool ?? BufferPool.Instance;
}
/// <summary>
/// Creates an empty stream and immediately requests the given capacity through SetCapacity.
/// </summary>
/// <param name="capacity">Initial capacity in bytes.</param>
/// <param name="pool">Pool to take buffers from; <c>BufferPool.Instance</c> is used when null.</param>
public PoolMemoryStream(int capacity, IBufferPool pool = null)
{
    Pool = pool ?? BufferPool.Instance;
    SetCapacity(capacity);
}
/// <summary>
/// Creates a writable stream initialised with a copy of the whole of <paramref name="buffer"/>.
/// </summary>
/// <param name="buffer">Bytes to copy into the stream.</param>
/// <param name="pool">Pool to take buffers from; <c>BufferPool.Instance</c> is used when null.</param>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
public PoolMemoryStream(byte[] buffer, IBufferPool pool = null)
{
    // Check here: buffer.Length below would otherwise raise NullReferenceException
    // before the shared Constructor helper gets a chance to validate.
    if (buffer == null)
        throw new ArgumentNullException("buffer");

    Constructor(buffer, 0, buffer.Length, true, false, pool);
}
/// <summary>
/// Creates a stream initialised with a copy of the whole of <paramref name="buffer"/>.
/// </summary>
/// <param name="buffer">Bytes to copy into the stream.</param>
/// <param name="writable">Whether the stream accepts writes after construction.</param>
/// <param name="pool">Pool to take buffers from; <c>BufferPool.Instance</c> is used when null.</param>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
public PoolMemoryStream(byte[] buffer, bool writable, IBufferPool pool = null)
{
    // Check here: buffer.Length below would otherwise raise NullReferenceException
    // before the shared Constructor helper gets a chance to validate.
    if (buffer == null)
        throw new ArgumentNullException("buffer");

    Constructor(buffer, 0, buffer.Length, writable, false, pool);
}
/// <summary>
/// Creates a writable stream initialised with a copy of <paramref name="count"/> bytes of
/// <paramref name="buffer"/>, starting at <paramref name="index"/>.
/// </summary>
/// <param name="buffer">Source of the initial content.</param>
/// <param name="index">Offset of the first byte to copy.</param>
/// <param name="count">Number of bytes to copy.</param>
/// <param name="pool">Pool to take buffers from; <c>BufferPool.Instance</c> is used when null.</param>
public PoolMemoryStream(byte[] buffer, int index, int count, IBufferPool pool = null)
{
    Constructor(buffer, index, count, writable: true, publiclyVisible: false, pool: pool);
}
/// <summary>
/// Creates a stream initialised with a copy of <paramref name="count"/> bytes of
/// <paramref name="buffer"/>, starting at <paramref name="index"/>.
/// </summary>
/// <param name="buffer">Source of the initial content.</param>
/// <param name="index">Offset of the first byte to copy.</param>
/// <param name="count">Number of bytes to copy.</param>
/// <param name="writable">Whether the stream accepts writes after construction.</param>
/// <param name="pool">Pool to take buffers from; <c>BufferPool.Instance</c> is used when null.</param>
public PoolMemoryStream(byte[] buffer, int index, int count, bool writable, IBufferPool pool = null)
{
    Constructor(buffer, index, count, writable, publiclyVisible: false, pool: pool);
}
/// <summary>
/// Creates a stream initialised with a copy of <paramref name="count"/> bytes of
/// <paramref name="buffer"/>, starting at <paramref name="index"/>.
/// </summary>
/// <param name="buffer">Source of the initial content.</param>
/// <param name="index">Offset of the first byte to copy.</param>
/// <param name="count">Number of bytes to copy.</param>
/// <param name="writable">Whether the stream accepts writes after construction.</param>
/// <param name="publiclyVisible">Whether GetBuffer is allowed on this stream.</param>
/// <param name="pool">Pool to take buffers from; <c>BufferPool.Instance</c> is used when null.</param>
public PoolMemoryStream(byte[] buffer, int index, int count, bool writable, bool publiclyVisible, IBufferPool pool = null)
{
    Constructor(
        buffer: buffer,
        index: index,
        count: count,
        writable: writable,
        publiclyVisible: publiclyVisible,
        pool: pool);
}
/// <summary>
/// Shared initialisation for the byte[]-based constructors: validates the arguments,
/// copies the requested range into pooled buffers and rewinds the stream.
/// </summary>
/// <param name="buffer">Source of the initial content.</param>
/// <param name="index">Offset of the first byte to copy.</param>
/// <param name="count">Number of bytes to copy.</param>
/// <param name="writable">Whether the stream accepts writes after construction.</param>
/// <param name="publiclyVisible">Whether GetBuffer is allowed on this stream.</param>
/// <param name="pool">Pool to take buffers from; <c>BufferPool.Instance</c> is used when null.</param>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="index"/> or <paramref name="count"/> is negative.</exception>
/// <exception cref="ArgumentException">The range described by index and count does not fit in the buffer.</exception>
private void Constructor(byte[] buffer, int index, int count, bool writable, bool publiclyVisible, IBufferPool pool)
{
    if (buffer == null)
        throw new ArgumentNullException("buffer");
    if (index < 0)
        throw new ArgumentOutOfRangeException("index", "index is negative");
    if (count < 0)
        throw new ArgumentOutOfRangeException("count", "count is negative");
    if (buffer.Length - index < count)
        // ArgumentException takes the message first; the original passed it as the parameter name.
        throw new ArgumentException("size of buffer is less than index + count");

    Pool = pool ?? BufferPool.Instance;

    // Copy the content while the stream is still writable and expandable (the field
    // defaults); the restrictions requested by the caller are applied afterwards.
    Write(buffer, index, count);
    Position = 0;

    canWrite = writable;
    visible = publiclyVisible;
    expandable = false;
}
// When false, SetCapacity refuses to grow beyond the current capacity.
// Cleared by the byte[]-based constructors.
private bool expandable = true;
// When false, SetLength, Write and WriteByte throw NotSupportedException; also reported via CanWrite.
private bool canWrite = true;
// When false, GetBuffer throws UnauthorizedAccessException.
private bool visible = true;
/// <summary>
/// Copies the stream content (bytes 0 to Length) into a newly allocated array.
/// The current Position is neither used nor changed.
/// </summary>
/// <returns>A new array holding a copy of the stream content.</returns>
public virtual byte[] ToArray()
{
    var result = new byte[Length];
    var bufferSize = Pool.BufferSize;
    var copied = 0;
    var bufNum = 0;

    // Walk the backing buffers from the start; only the final chunk can be shorter
    // than a full buffer. Position is deliberately left untouched.
    while (copied < result.Length)
    {
        var chunk = Math.Min(bufferSize, result.Length - copied);
        Buffer.BlockCopy(Buffers[bufNum], 0, result, copied, chunk);
        copied += chunk;
        bufNum++;
    }

    return result;
}
/// <summary>
/// Returns a newly allocated array of Capacity bytes filled with the content of every
/// backing buffer, in order. The result is a copy, not the live backing store.
/// </summary>
/// <exception cref="UnauthorizedAccessException">The stream was created as not publicly visible.</exception>
public virtual byte[] GetBuffer()
{
    if (!visible)
    {
        throw new UnauthorizedAccessException();
    }

    var snapshot = new byte[Capacity];
    var written = 0;
    for (var i = 0; i < Buffers.Count; i++)
    {
        var chunk = Buffers[i];
        Buffer.BlockCopy(chunk, 0, snapshot, written, chunk.Length);
        written += chunk.Length;
    }
    return snapshot;
}
#region implemented abstract members of Stream
/// <summary>
/// Does nothing.
/// </summary>
public override void Flush() { }
/// <summary>
/// Throws <see cref="ObjectDisposedException"/> once the stream has been disposed.
/// </summary>
private void CheckIfDisposed()
{
    if (!_disposed)
    {
        return;
    }

    throw new ObjectDisposedException("PoolMemoryStream");
}
/// <summary>
/// Copies up to <paramref name="count"/> bytes from the current position into
/// <paramref name="buffer"/> and advances the position by the number of bytes copied.
/// </summary>
/// <param name="buffer">Destination array.</param>
/// <param name="offset">Index in <paramref name="buffer"/> at which to start storing bytes.</param>
/// <param name="count">Maximum number of bytes to read.</param>
/// <returns>The number of bytes copied; 0 when the position is at or past the end, or count is 0.</returns>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
/// <exception cref="ArgumentException">offset or count is negative, or the range does not fit in the buffer.</exception>
/// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
public override int Read(byte[] buffer, int offset, int count)
{
    if (buffer == null)
        throw new ArgumentNullException("buffer");
    if (offset < 0)
        throw new ArgumentException("offset must be >= 0", "offset");
    if (count < 0)
        throw new ArgumentException("count must be >= 0", "count");
    // Subtract instead of adding: offset + count can overflow int and slip past the check.
    if (buffer.Length - offset < count)
        throw new ArgumentException("buffer too small", "buffer");
    CheckIfDisposed();

    var position = Position;
    var available = Length - position;
    if (available <= 0 || count == 0)
        return 0;

    var bufferSize = Pool.BufferSize;
    var bytesToRead = (int)Math.Min(count, available);
    var bufNum = (int)(position / bufferSize);
    var posInBuf = (int)(position % bufferSize);
    var bytesLeft = bytesToRead;

    while (bytesLeft > 0)
    {
        // Never copy past the end of the current backing buffer.
        var bytesToCopy = Math.Min(bytesLeft, bufferSize - posInBuf);
        Buffer.BlockCopy(Buffers[bufNum], posInBuf, buffer, offset, bytesToCopy);
        offset += bytesToCopy;
        bytesLeft -= bytesToCopy;
        bufNum++;
        posInBuf = 0;
    }

    Position = position + bytesToRead;
    return bytesToRead;
}
/// <summary>
/// Moves the current position relative to the start, the current position or the end of the stream.
/// </summary>
/// <param name="offset">Distance to move, relative to <paramref name="origin"/>.</param>
/// <param name="origin">Reference point for <paramref name="offset"/>.</param>
/// <returns>The new position.</returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> exceeds <see cref="int.MaxValue"/>.</exception>
/// <exception cref="ArgumentException"><paramref name="origin"/> is not a valid <see cref="SeekOrigin"/>.</exception>
/// <exception cref="IOException">The resulting position would be negative.</exception>
/// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
public override long Seek(long offset, SeekOrigin origin)
{
    CheckIfDisposed();
    if (offset > (long)int.MaxValue)
        // (paramName, message): the single-string overload would treat the text as the parameter name.
        throw new ArgumentOutOfRangeException("offset", "offset out of range. " + offset);

    switch (origin)
    {
        case SeekOrigin.Begin:
            break;
        case SeekOrigin.Current:
            offset += Position;
            break;
        case SeekOrigin.End:
            offset += Length;
            break;
        default:
            // ArgumentException takes (message, paramName), in that order.
            throw new ArgumentException("invalid SeekOrigin", "origin");
    }

    if (offset < 0)
        throw new IOException("Attempted to seek before start of PoolMemoryStream.");

    Position = offset;
    return Position;
}
/// <summary>
/// Number of bytes the currently held buffers can store (buffer count times the pool's
/// buffer size). Setting it passes the value to SetCapacity after checking that it is not
/// negative and not below the stream length.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">The new value is negative or smaller than Length.</exception>
/// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
public int Capacity
{
    get
    {
        CheckIfDisposed();
        return Pool.BufferSize * Buffers.Count;
    }
    set
    {
        CheckIfDisposed();
        var invalid = value < 0 || value < Length;
        if (invalid)
        {
            throw new ArgumentOutOfRangeException("value", "capacity cannot be negative or smaller than length of stream.");
        }
        SetCapacity(value);
    }
}
// Set whenever the stream is shortened; SetCapacity consults it to decide whether the
// bytes beyond Length have to be zeroed before the stream grows again.
private bool dirty = true;

/// <summary>
/// Sets the length of the stream, adjusting the capacity through SetCapacity and
/// pulling the position back when it would end up past the new length.
/// </summary>
/// <param name="value">The new length in bytes.</param>
/// <exception cref="NotSupportedException">The stream is not writable.</exception>
public override void SetLength(long value)
{
    if (!canWrite)
    {
        throw new NotSupportedException("cannot write to stream");
    }

    SetCapacity(value);

    var shrinking = value < _length;
    _length = value;
    if (shrinking)
    {
        dirty = true;
    }

    if (Position > value)
    {
        Position = value;
    }
}
/// <summary>
/// Adjusts the number of pooled buffers so that exactly enough are held to store
/// <paramref name="value"/> bytes. Surplus buffers go back to the pool; missing ones
/// are taken from it.
/// </summary>
/// <param name="value">Requested capacity in bytes.</param>
/// <exception cref="NotSupportedException">The stream is not expandable and <paramref name="value"/> exceeds the current capacity.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="value"/> is negative or larger than <see cref="int.MaxValue"/>.</exception>
/// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
private void SetCapacity(long value)
{
    CheckIfDisposed();
    if (!expandable && value > Capacity)
        throw new NotSupportedException("cannot expand stream");
    if (value < 0 || value > int.MaxValue)
        throw new ArgumentOutOfRangeException();

    var bufferSize = Pool.BufferSize;
    // Number of buffers needed, rounding up; 0 for an empty stream.
    var required = (int)((value + bufferSize - 1) / bufferSize);

    if (required < Buffers.Count)
    {
        // Keep buffers [0, required) and hand everything from index 'required' onwards
        // back to the pool. (Starting at required + 1 leaked a buffer and made
        // RemoveRange throw because index + count ran past the end of the list.)
        for (var i = required; i < Buffers.Count; i++)
        {
            Pool.Return(Buffers[i]);
        }
        Buffers.RemoveRange(required, Buffers.Count - required);
        return;
    }

    if (dirty && Buffers.Count > 0)
    {
        // The stream was shortened earlier, so bytes in [Length, Capacity) may be stale.
        // Zero them across every buffer they touch, not just the last one, so a later
        // extension of the stream exposes zeros.
        var start = Length;
        var posInBuf = (int)(start % bufferSize);
        for (var i = (int)(start / bufferSize); i < Buffers.Count; i++)
        {
            Array.Clear(Buffers[i], posInBuf, bufferSize - posInBuf);
            posInBuf = 0;
        }
        dirty = false;
    }

    while (Buffers.Count < required)
    {
        var buf = Pool.Take();
        // A recycled buffer still holds whatever its previous user wrote; clear it so
        // regions that are extended but never written read back as zeros.
        Array.Clear(buf, 0, buf.Length);
        Buffers.Add(buf);
    }
}
/// <summary>
/// Copies <paramref name="count"/> bytes from <paramref name="buffer"/> into the stream at the
/// current position, extending the stream through SetLength when the write reaches past its
/// end, and advances the position by <paramref name="count"/>.
/// </summary>
/// <param name="buffer">Source array.</param>
/// <param name="offset">Index in <paramref name="buffer"/> of the first byte to write.</param>
/// <param name="count">Number of bytes to write.</param>
/// <exception cref="NotSupportedException">The stream is not writable.</exception>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
/// <exception cref="ArgumentException">offset or count is negative, or the range does not fit in the buffer.</exception>
/// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
public override void Write(byte[] buffer, int offset, int count)
{
    if (!canWrite)
        throw new NotSupportedException("cannot write to stream");
    if (buffer == null)
        throw new ArgumentNullException("buffer");
    if (offset < 0)
        throw new ArgumentException("offset must be >= 0", "offset");
    if (count < 0)
        throw new ArgumentException("count must be >= 0", "count");
    // Subtract instead of adding: offset + count can overflow int, pass the check and
    // let SetLength resize the stream before the copy finally fails.
    if (buffer.Length - offset < count)
        throw new ArgumentException("buffer too small", "buffer");
    CheckIfDisposed();

    var position = Position;
    if (position > Length - count)
        SetLength(position + count);

    var bufferSize = Pool.BufferSize;
    var bufNum = (int)(position / bufferSize);
    var posInBuf = (int)(position % bufferSize);
    var bytesLeft = count;

    while (bytesLeft > 0)
    {
        // Never copy past the end of the current backing buffer.
        var bytesToCopy = Math.Min(bytesLeft, bufferSize - posInBuf);
        Buffer.BlockCopy(buffer, offset, Buffers[bufNum], posInBuf, bytesToCopy);
        offset += bytesToCopy;
        bytesLeft -= bytesToCopy;
        bufNum++;
        posInBuf = 0;
    }

    Position = position + count;
}
/// <summary>
/// Stores one byte at the current position, extending the stream by one byte through
/// SetLength when the position is at or past the end, then advances the position.
/// </summary>
/// <param name="value">The byte to store.</param>
/// <exception cref="NotSupportedException">The stream is not writable.</exception>
/// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
public override void WriteByte(byte value)
{
    if (!canWrite)
    {
        throw new NotSupportedException("cannot write to stream");
    }
    CheckIfDisposed();

    if (Position >= Length)
    {
        SetLength(Position + 1);
    }

    var blockSize = Pool.BufferSize;
    var index = (int)Position;
    var block = Buffers[index / blockSize];
    block[index % blockSize] = value;
    Position++;
}
/// <summary>
/// Reads the byte at the current position and advances the position by one.
/// </summary>
/// <returns>The byte read, or -1 when the position is at or past the end of the stream.</returns>
/// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
public override int ReadByte()
{
    CheckIfDisposed();
    if (Position >= Length)
    {
        return -1;
    }

    var blockSize = Pool.BufferSize;
    var index = (int)Position;
    var block = Buffers[index / blockSize];
    Position++;
    return block[index % blockSize];
}
/// <summary>True until the stream has been disposed.</summary>
public override bool CanRead
{
    get { return !_disposed; }
}
/// <summary>True until the stream has been disposed.</summary>
public override bool CanSeek
{
    get { return !_disposed; }
}
/// <summary>True while the stream is writable and has not been disposed.</summary>
public override bool CanWrite
{
    get
    {
        if (_disposed)
        {
            return false;
        }
        return canWrite;
    }
}
// Number of bytes in the stream; updated by SetLength.
private long _length;

/// <summary>The length of the stream in bytes.</summary>
/// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
public override long Length
{
    get { CheckIfDisposed(); return _length; }
}
// Current read/write offset from the start of the stream.
private long _position;

/// <summary>
/// The current position within the stream. It may be set beyond Length; a subsequent
/// write then extends the stream.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">The value being set is negative.</exception>
/// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
public override long Position
{
    get
    {
        CheckIfDisposed();
        return _position;
    }
    set
    {
        CheckIfDisposed();
        // A negative position would only surface later as an index error inside
        // Read/Write; reject it where it is introduced.
        if (value < 0)
            throw new ArgumentOutOfRangeException("value", "position cannot be negative");
        _position = value;
    }
}
#endregion
// Set once Dispose has run; checked by CheckIfDisposed and the Can* properties.
private bool _disposed;

/// <summary>
/// Returns every backing buffer to the pool (when called from Dispose rather than a
/// finalizer) and marks the stream as disposed. Calling it again has no further effect
/// on the buffers.
/// </summary>
/// <param name="disposing">True when called from Dispose/Close; false when called from a finalizer.</param>
protected override void Dispose(bool disposing)
{
    try
    {
        if (!_disposed)
        {
            if (disposing)
            {
                foreach (var buffer in Buffers)
                {
                    Pool.Return(buffer);
                }
                Buffers.Clear();
            }
            _disposed = true;
        }
    }
    finally
    {
        // Let the base Stream release whatever it holds, even if returning buffers failed.
        base.Dispose(disposing);
    }
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment