Skip to content

Instantly share code, notes, and snippets.

@JKamsker
Created November 16, 2017 02:18
Show Gist options
  • Save JKamsker/8a088c148eab27707203acc241433923 to your computer and use it in GitHub Desktop.
Save JKamsker/8a088c148eab27707203acc241433923 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using JSocket.Utilities.Extensions;
namespace JSocket.Utilities.IO
{
internal class ConcurrentRingbuffer : IDisposable
{
private static readonly ConcurrentQueue<ConcurrentRingbuffer> _ringBuffer = new ConcurrentQueue<ConcurrentRingbuffer>();
private static bool _processExit = false;
private static int __DefaultBufferSize = 2 * 1024;
private static Timer _bufferThresholdTimer;
internal static int _entityCounter = 0;
public static int DefaultBufferSize => __DefaultBufferSize;
public static int BufferThreshold = 100;
public static double BufferThresholdPercentage = 1.5;
static ConcurrentRingbuffer()
{
AppDomain.CurrentDomain.ProcessExit += (_, __) => _processExit = true;
_bufferThresholdTimer = new Timer(BufferThresholdWatcher, null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
for (int i = 0; i < 50; i++)
{
new ConcurrentRingbuffer().ReIntegrate();
}
}
private static int Ticks = 1;
private static void BufferThresholdWatcher(object state)
{
var cTickCount = Interlocked.Increment(ref Ticks);
var minTickCount = cTickCount - 5;
var coldBuffers = _ringBuffer.Count(m => m.LastUseTime < minTickCount);
if (coldBuffers > BufferThreshold * BufferThresholdPercentage)
{
var overflow = coldBuffers - BufferThreshold;
for (int i = 0; i < overflow; i++)
{
ConcurrentRingbuffer result;
if (_ringBuffer.TryDequeue(out result))
{
result.DoDispose = true;
result.Value = null;
result = null;
}
}
GC.Collect();
Log.Write($"Disposed {overflow} instances of ConcurrentRingbuffer (Current: {_ringBuffer.Count})");
}
}
/// <summary>
/// Dequeues a buffer entity from the pool or creates a new one.
/// Notice that it's possible to get buffers of different sizes (DEFAULT: 8k)
/// </summary>
/// <returns></returns>
public static ConcurrentRingbuffer Create()
{
ConcurrentRingbuffer result;
if (!_ringBuffer.TryDequeue(out result))
{
result = new ConcurrentRingbuffer();
}
result.LastUseTime = Ticks;
result.IsIntegrated = false;
return result;
}
public int LastUseTime = 0;
/// <summary>
/// Enqueues a buffer entity to the pool
/// </summary>
/// <param name="buf"></param>
public void ReIntegrate()
{
Reset(true);
_ringBuffer.Enqueue(this);
}
private ConcurrentRingbuffer()
{
Value = new byte[__DefaultBufferSize];
}
internal ConcurrentRingbuffer(int bufferSize = -1)
{
bufferSize = bufferSize == -1 ? __DefaultBufferSize : bufferSize;
Value = new byte[bufferSize];
}
private ConcurrentRingbuffer(byte[] input)
{
if (input != null)
Value = input;
else
Value = new byte[__DefaultBufferSize];
}
~ConcurrentRingbuffer()
{
ReleaseUnmanagedResources();
}
public int Length => High - Low;
public int Capacity => Value.Length;
public int FreeBytes => Value.Length - High;
public bool IsFull => High == Value.Length;
internal static ConcurrentQueue<ConcurrentRingbuffer> RingBuffer
{
get
{
return _ringBuffer;
}
}
public readonly int EntityId = ++_entityCounter;
public byte[] Value;
public bool DoDispose = false;
public bool IsIntegrated = false;
public int High = 0;
public int Low = 0;
/// <summary>
/// Writes bytes to an array and returns the amount of written bytes
/// </summary>
/// <param name="sourceBuffer"></param>
/// <param name="sourceOffset"></param>
/// <param name="count"></param>
/// <returns></returns>
public int Write(byte[] sourceBuffer, int sourceOffset, int count = -1)
{
if (count == -1)
{
count = sourceBuffer.Length;
}
var lHigh = High;
var z1 = sourceBuffer.Length - sourceOffset;
var z2 = Value.Length - lHigh;
if (z1 < count)
{
count = z1;
}
if (z2 < count)
{
count = z2;
}
//count = Math.Min(count, sourceBuffer.Length - sourceOffset);
//count = Math.Min(count, Value.Length - High);
//count = count == -1
// ? Math.Min(sourceBuffer.Length - sourceOffset, Value.Length - High)
// : Math.Min(Math.Min(count, sourceBuffer.Length - sourceOffset), Value.Length - High);
if (count > 0)
Array.Copy(sourceBuffer, sourceOffset, Value, lHigh, count);
High += count;
return count;
}
public int ReadByte()
{
if (Length < 0)
{
return Value[Low++];
}
return -1;
}
public int Read(byte[] destBuffer, int destOffset, int count = -1)
{
if (count == -1)
count = destBuffer.Length - destOffset;
var cLen = Length;
if (cLen < count)
{
count = cLen;
}
if (count > 0)
{
Array.Copy(Value, Low, destBuffer, destOffset, count);
Low += count;
}
return count;
}
/// <summary>
/// Re-Enables R/W from the beginning
/// </summary>
/// <param name="isIntegration"></param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Reset(bool isIntegration = false)
{
High = 0;
Low = 0;
if (isIntegration)
{
LastUseTime = Ticks;
IsIntegrated = true;
}
}
private void ReleaseUnmanagedResources()
{
// TODO release unmanaged resources here
}
public void Dispose()
{
Reset();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment