Skip to content

Instantly share code, notes, and snippets.

Created January 2, 2013 12:55
Show Gist options
  • Save mganss/4434399 to your computer and use it in GitHub Desktop.
Save mganss/4434399 to your computer and use it in GitHub Desktop.
A buffer pool and an in-memory stream that uses it.
using System;
using System.Collections.Concurrent;
namespace XY
public interface IPool<T>
T Take();
void Return(T t);
public interface IBufferPool : IPool<byte[]>
int BufferSize { get; }
/// <summary>
/// A pool of reusable objects.
/// </summary>
/// <typeparam name="T">The type of objects in the pool.</typeparam>
public class Pool<T> : IPool<T>
protected ConcurrentStack<T> _Pool = new ConcurrentStack<T>();
protected Func<T> Allocator;
public Pool(Func<T> allocator)
Allocator = allocator;
public T Take()
T t;
if (!_Pool.TryPop(out t)) t = Allocator();
return t;
public void Return(T t)
/// <summary>
/// A pool of reusable byte array buffers.
/// </summary>
public class BufferPool : Pool<byte[]>, IBufferPool
private const int _BufferSize = 8192;
public int BufferSize
get { return _BufferSize; }
public BufferPool()
: base(() => new byte[_BufferSize])
public static BufferPool Instance = new BufferPool();
using System;
using System.IO;
using System.Collections.Generic;
namespace XY
/// <summary>
/// An alternative to <see cref="System.IO.MemoryStream"/> that uses a number of buffers
/// taken from a pool as its backing store instead of a single buffer.
/// </summary>
public class PoolMemoryStream : Stream
private List<byte[]> Buffers = new List<byte[]>();
private IBufferPool _pool;
public IBufferPool Pool
get { return _pool; }
private set { _pool = value; }
public PoolMemoryStream(IBufferPool pool = null)
if (pool == null) pool = BufferPool.Instance;
Pool = pool;
public PoolMemoryStream(int capacity, IBufferPool pool = null)
if (pool == null) pool = BufferPool.Instance;
Pool = pool;
public PoolMemoryStream(byte[] buffer, IBufferPool pool = null)
Constructor(buffer, 0, buffer.Length, true, false, pool);
public PoolMemoryStream(byte[] buffer, bool writable, IBufferPool pool = null)
Constructor(buffer, 0, buffer.Length, writable, false, pool);
public PoolMemoryStream(byte[] buffer, int index, int count, IBufferPool pool = null)
Constructor(buffer, index, count, true, false, pool);
public PoolMemoryStream(byte[] buffer, int index, int count, bool writable, IBufferPool pool = null)
Constructor(buffer, index, count, writable, false, pool);
public PoolMemoryStream(byte[] buffer, int index, int count, bool writable, bool publiclyVisible, IBufferPool pool = null)
Constructor(buffer, index, count, writable, publiclyVisible, pool);
private void Constructor(byte[] buffer, int index, int count, bool writable, bool publiclyVisible, IBufferPool pool)
if (buffer == null)
throw new ArgumentNullException("buffer");
if (index < 0)
throw new ArgumentOutOfRangeException("index", "index is negative");
if (count < 0)
throw new ArgumentOutOfRangeException("count", "count is negative");
if (buffer.Length - index < count)
throw new ArgumentException("index+count", "size of buffer is less than index + count");
if (pool == null) pool = BufferPool.Instance;
Pool = pool;
Write(buffer, index, count);
Position = 0;
canWrite = writable;
visible = publiclyVisible;
expandable = false;
private bool expandable = true;
private bool canWrite = true;
private bool visible = true;
public virtual byte[] ToArray()
var buffer = new byte[Length];
var bufNum = 0;
var posInBuf = 0;
var bytesToRead = Length;
var bytesRead = 0;
var bytesLeft = Length;
var offset = 0;
while (bytesLeft > 0)
var bytesToCopy = (posInBuf + bytesLeft) < Pool.BufferSize ? (int)bytesLeft : Pool.BufferSize - posInBuf;
var buf = Buffers[bufNum];
Buffer.BlockCopy(buf, posInBuf, buffer, offset, bytesToCopy);
Position += bytesToCopy;
offset += bytesToCopy;
bytesLeft -= bytesToCopy;
bytesRead += bytesToCopy;
posInBuf = 0;
return buffer;
public virtual byte[] GetBuffer()
if (!visible)
throw new UnauthorizedAccessException();
var buffer = new byte[Capacity];
var offset = 0;
foreach (var buf in Buffers)
Buffer.BlockCopy(buf, 0, buffer, offset, buf.Length);
offset += buf.Length;
return buffer;
#region implemented abstract members of Stream
public override void Flush()
private void CheckIfDisposed()
if (_disposed)
throw new ObjectDisposedException("PoolMemoryStream");
public override int Read(byte[] buffer, int offset, int count)
if (buffer == null)
throw new ArgumentNullException("buffer");
if ((offset + count) > buffer.Length)
throw new ArgumentException("buffer too small", "buffer");
if (offset < 0)
throw new ArgumentException("offset must be >= 0", "offset");
if (count < 0)
throw new ArgumentException("count must be >= 0", "count");
if (Position >= Length || count == 0)
return 0;
var bufNum = (int)Position / Pool.BufferSize;
var posInBuf = (int)Position - bufNum * Pool.BufferSize;
var bytesToRead = Math.Min(count, (int)(Length - Position));
var bytesRead = 0;
var bytesLeft = bytesToRead - bytesRead;
while (bytesLeft > 0)
var bytesToCopy = (posInBuf + bytesLeft) < Pool.BufferSize ? bytesLeft : Pool.BufferSize - posInBuf;
var buf = Buffers[bufNum];
Buffer.BlockCopy(buf, posInBuf, buffer, offset, bytesToCopy);
Position += bytesToCopy;
offset += bytesToCopy;
bytesLeft -= bytesToCopy;
bytesRead += bytesToCopy;
posInBuf = 0;
return bytesToRead;
public override long Seek(long offset, SeekOrigin origin)
if (offset > (long)int.MaxValue)
throw new ArgumentOutOfRangeException("offset out of range. " + offset);
switch (origin)
case SeekOrigin.Current:
offset += Position;
case SeekOrigin.End:
offset += Length;
case SeekOrigin.Begin:
throw new ArgumentException("origin", "invalid SeekOrigin");
if (offset < 0)
throw new IOException("Attempted to seek before start of PoolMemoryStream.");
Position = offset;
return Position;
public int Capacity
return Buffers.Count * Pool.BufferSize;
if (value < 0 || value < Length)
throw new ArgumentOutOfRangeException("value", "capacity cannot be negative or smaller than length of stream.");
private bool dirty = true;
public override void SetLength(long value)
if (!canWrite)
throw new NotSupportedException("cannot write to stream");
if (value < _length)
dirty = true;
_length = value;
if (Position > _length)
Position = _length;
private void SetCapacity(long value)
if (!expandable && value > Capacity)
throw new NotSupportedException("cannot expand stream");
if (value < 0 || value > int.MaxValue)
throw new ArgumentOutOfRangeException();
if (value == 0)
foreach (var buf in Buffers)
if (value == Capacity) return;
int buffers = (int)(value / Pool.BufferSize);
if ((value - buffers * Pool.BufferSize) > 0)
if (buffers < Buffers.Count)
for (int i = buffers + 1; i < Buffers.Count; i++)
Buffers.RemoveRange(buffers + 1, Buffers.Count - buffers);
if (dirty && Buffers.Count > 0)
var dirtyBytes = Capacity - Length;
var lastBuf = Buffers[Buffers.Count - 1];
for (var i = Pool.BufferSize - dirtyBytes; i < Pool.BufferSize; i++)
lastBuf[i] = 0;
dirty = false;
if (buffers > Buffers.Count)
for (int i = Buffers.Count; i < buffers; i++)
var buf = Pool.Take();
public override void Write(byte[] buffer, int offset, int count)
if (!canWrite)
throw new NotSupportedException("cannot write to stream");
if (buffer == null)
throw new ArgumentNullException("buffer");
if ((offset + count) > buffer.Length)
throw new ArgumentException("buffer too small", "buffer");
if (offset < 0)
throw new ArgumentException("offset must be >= 0", "offset");
if (count < 0)
throw new ArgumentException("count must be >= 0", "count");
if (Position > Length - count)
SetLength(Position + count);
var bufNum = (int)Position / Pool.BufferSize;
var posInBuf = (int)Position - bufNum * Pool.BufferSize;
var bytesWritten = 0;
var bytesLeft = count - bytesWritten;
while (bytesLeft > 0)
var bytesToCopy = (posInBuf + bytesLeft) < Pool.BufferSize ? bytesLeft : Pool.BufferSize - posInBuf;
var buf = Buffers[bufNum];
Buffer.BlockCopy(buffer, offset, buf, posInBuf, bytesToCopy);
Position += bytesToCopy;
offset += bytesToCopy;
bytesLeft -= bytesToCopy;
bytesWritten += bytesToCopy;
posInBuf = 0;
public override void WriteByte(byte value)
if (!canWrite)
throw new NotSupportedException("cannot write to stream");
if (Position >= Length)
SetLength(Position + 1);
var bufNum = (int)Position / Pool.BufferSize;
var posInBuf = (int)Position - bufNum * Pool.BufferSize;
var buf = Buffers[bufNum];
buf[posInBuf] = value;
public override int ReadByte()
if (Position >= Length)
return -1;
var bufNum = (int)Position / Pool.BufferSize;
var posInBuf = (int)Position - bufNum * Pool.BufferSize;
var buf = Buffers[bufNum];
return buf[posInBuf];
public override bool CanRead
return !_disposed;
public override bool CanSeek
return !_disposed;
public override bool CanWrite
return !_disposed && canWrite;
private long _length;
public override long Length
return _length;
private long _position;
public override long Position
return _position;
_position = value;
private bool _disposed;
protected override void Dispose(bool disposing)
if (!_disposed)
if (disposing)
foreach (var buffer in Buffers)
_disposed = true;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment