Created
November 16, 2017 02:18
-
-
Save JKamsker/8a088c148eab27707203acc241433923 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.Linq; | |
using System.Runtime.CompilerServices; | |
using System.Runtime.InteropServices; | |
using System.Text; | |
using System.Threading; | |
using JSocket.Utilities.Extensions; | |
namespace JSocket.Utilities.IO | |
{ | |
internal class ConcurrentRingbuffer : IDisposable | |
{ | |
private static readonly ConcurrentQueue<ConcurrentRingbuffer> _ringBuffer = new ConcurrentQueue<ConcurrentRingbuffer>(); | |
private static bool _processExit = false; | |
private static int __DefaultBufferSize = 2 * 1024; | |
private static Timer _bufferThresholdTimer; | |
internal static int _entityCounter = 0; | |
public static int DefaultBufferSize => __DefaultBufferSize; | |
public static int BufferThreshold = 100; | |
public static double BufferThresholdPercentage = 1.5; | |
static ConcurrentRingbuffer() | |
{ | |
AppDomain.CurrentDomain.ProcessExit += (_, __) => _processExit = true; | |
_bufferThresholdTimer = new Timer(BufferThresholdWatcher, null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)); | |
for (int i = 0; i < 50; i++) | |
{ | |
new ConcurrentRingbuffer().ReIntegrate(); | |
} | |
} | |
private static int Ticks = 1; | |
private static void BufferThresholdWatcher(object state) | |
{ | |
var cTickCount = Interlocked.Increment(ref Ticks); | |
var minTickCount = cTickCount - 5; | |
var coldBuffers = _ringBuffer.Count(m => m.LastUseTime < minTickCount); | |
if (coldBuffers > BufferThreshold * BufferThresholdPercentage) | |
{ | |
var overflow = coldBuffers - BufferThreshold; | |
for (int i = 0; i < overflow; i++) | |
{ | |
ConcurrentRingbuffer result; | |
if (_ringBuffer.TryDequeue(out result)) | |
{ | |
result.DoDispose = true; | |
result.Value = null; | |
result = null; | |
} | |
} | |
GC.Collect(); | |
Log.Write($"Disposed {overflow} instances of ConcurrentRingbuffer (Current: {_ringBuffer.Count})"); | |
} | |
} | |
/// <summary> | |
/// Dequeues a buffer entity from the pool or creates a new one. | |
/// Notice that it's possible to get buffers of different sizes (DEFAULT: 8k) | |
/// </summary> | |
/// <returns></returns> | |
public static ConcurrentRingbuffer Create() | |
{ | |
ConcurrentRingbuffer result; | |
if (!_ringBuffer.TryDequeue(out result)) | |
{ | |
result = new ConcurrentRingbuffer(); | |
} | |
result.LastUseTime = Ticks; | |
result.IsIntegrated = false; | |
return result; | |
} | |
public int LastUseTime = 0; | |
/// <summary> | |
/// Enqueues a buffer entity to the pool | |
/// </summary> | |
/// <param name="buf"></param> | |
public void ReIntegrate() | |
{ | |
Reset(true); | |
_ringBuffer.Enqueue(this); | |
} | |
private ConcurrentRingbuffer() | |
{ | |
Value = new byte[__DefaultBufferSize]; | |
} | |
internal ConcurrentRingbuffer(int bufferSize = -1) | |
{ | |
bufferSize = bufferSize == -1 ? __DefaultBufferSize : bufferSize; | |
Value = new byte[bufferSize]; | |
} | |
private ConcurrentRingbuffer(byte[] input) | |
{ | |
if (input != null) | |
Value = input; | |
else | |
Value = new byte[__DefaultBufferSize]; | |
} | |
~ConcurrentRingbuffer() | |
{ | |
ReleaseUnmanagedResources(); | |
} | |
public int Length => High - Low; | |
public int Capacity => Value.Length; | |
public int FreeBytes => Value.Length - High; | |
public bool IsFull => High == Value.Length; | |
internal static ConcurrentQueue<ConcurrentRingbuffer> RingBuffer | |
{ | |
get | |
{ | |
return _ringBuffer; | |
} | |
} | |
public readonly int EntityId = ++_entityCounter; | |
public byte[] Value; | |
public bool DoDispose = false; | |
public bool IsIntegrated = false; | |
public int High = 0; | |
public int Low = 0; | |
/// <summary> | |
/// Writes bytes to an array and returns the amount of written bytes | |
/// </summary> | |
/// <param name="sourceBuffer"></param> | |
/// <param name="sourceOffset"></param> | |
/// <param name="count"></param> | |
/// <returns></returns> | |
public int Write(byte[] sourceBuffer, int sourceOffset, int count = -1) | |
{ | |
if (count == -1) | |
{ | |
count = sourceBuffer.Length; | |
} | |
var lHigh = High; | |
var z1 = sourceBuffer.Length - sourceOffset; | |
var z2 = Value.Length - lHigh; | |
if (z1 < count) | |
{ | |
count = z1; | |
} | |
if (z2 < count) | |
{ | |
count = z2; | |
} | |
//count = Math.Min(count, sourceBuffer.Length - sourceOffset); | |
//count = Math.Min(count, Value.Length - High); | |
//count = count == -1 | |
// ? Math.Min(sourceBuffer.Length - sourceOffset, Value.Length - High) | |
// : Math.Min(Math.Min(count, sourceBuffer.Length - sourceOffset), Value.Length - High); | |
if (count > 0) | |
Array.Copy(sourceBuffer, sourceOffset, Value, lHigh, count); | |
High += count; | |
return count; | |
} | |
public int ReadByte() | |
{ | |
if (Length < 0) | |
{ | |
return Value[Low++]; | |
} | |
return -1; | |
} | |
public int Read(byte[] destBuffer, int destOffset, int count = -1) | |
{ | |
if (count == -1) | |
count = destBuffer.Length - destOffset; | |
var cLen = Length; | |
if (cLen < count) | |
{ | |
count = cLen; | |
} | |
if (count > 0) | |
{ | |
Array.Copy(Value, Low, destBuffer, destOffset, count); | |
Low += count; | |
} | |
return count; | |
} | |
/// <summary> | |
/// Re-Enables R/W from the beginning | |
/// </summary> | |
/// <param name="isIntegration"></param> | |
[MethodImpl(MethodImplOptions.AggressiveInlining)] | |
public void Reset(bool isIntegration = false) | |
{ | |
High = 0; | |
Low = 0; | |
if (isIntegration) | |
{ | |
LastUseTime = Ticks; | |
IsIntegrated = true; | |
} | |
} | |
private void ReleaseUnmanagedResources() | |
{ | |
// TODO release unmanaged resources here | |
} | |
public void Dispose() | |
{ | |
Reset(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment