Skip to content

Instantly share code, notes, and snippets.

@alefcarlos
Last active July 6, 2020 21:26
Show Gist options
  • Save alefcarlos/6ad22e3917c5431d1344048cb4b97c24 to your computer and use it in GitHub Desktop.
Save alefcarlos/6ad22e3917c5431d1344048cb4b97c24 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Concurrent;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
namespace Super.GlobalPlatform.HSM.Application
{
public class SocketPool : IDisposable
{
internal readonly Func<Task<Socket>> itemFactoryFunc;
internal readonly ConcurrentQueue<Socket> availableBuffer;
internal readonly int maxConnections;
internal int currentConnections = 0;
internal bool disposedValue;
private static SemaphoreSlim semaphore;
// Default time of 10s for cleanup seems reasonable.
// Quick math:
// 10 distinct named clients * expiry time >= 1s = approximate cleanup queue of 100 items
//
// This seems frequent enough. We also rely on GC occurring to actually trigger disposal.
private readonly TimeSpan DefaultCleanupInterval = TimeSpan.FromSeconds(10);
// We use a new timer for each regular cleanup cycle, protected with a lock. Note that this scheme
// doesn't give us anything to dispose, as the timer is started/stopped as needed.
//
// There's no need for the factory itself to be disposable. If you stop using it, eventually everything will
// get reclaimed.
private Timer _cleanupTimer;
private readonly object _cleanupTimerLock;
private readonly object _cleanupActiveLock;
public SocketPool(int max, Func<Task<Socket>> factory) : this(max, factory, TimeSpan.FromMilliseconds(50))
{
}
public SocketPool(int max, Func<Task<Socket>> factory, TimeSpan waitTime)
{
maxConnections = max;
itemFactoryFunc = factory;
availableBuffer = new ConcurrentQueue<Socket>();
semaphore = new SemaphoreSlim(max, max);
}
public TimeSpan WaitTime { get; }
public int Available => this.availableBuffer.Count;
public async Task<IAcquisitonController<Socket>> AcquireAsync()
{
Socket tmpBufferElement;
System.Diagnostics.Debug.WriteLine($"SocketPool | Acquiring connection. Disponibilidade:{availableBuffer.Count}, CurrentConnections: {currentConnections}");
while (this.availableBuffer.TryDequeue(out tmpBufferElement) == false)
{
System.Diagnostics.Debug.WriteLine($"SocketPool | No available connection, need do create on or waiting for. Disponibilidade:{availableBuffer.Count}, CurrentConnections: {currentConnections}");
if (await semaphore.WaitAsync(WaitTime))
{
if (currentConnections < maxConnections) //Se tiver espaço para crescer, bora!
{
System.Diagnostics.Debug.WriteLine($"SocketPool | Creating new Socket.. Disponibilidade:{availableBuffer.Count}, CurrentConnections: {currentConnections}");
Interlocked.Increment(ref currentConnections);
System.Diagnostics.Debug.WriteLine($"SocketPool | Created.. Disponibilidade:{availableBuffer.Count}, CurrentConnections: {currentConnections}");
try
{
return new SocketAcquisitonController(availableBuffer, await itemFactoryFunc());
}
catch
{
semaphore.Release();
throw;
}
}
}
}
System.Diagnostics.Debug.WriteLine($"SocketPool | Ok! Disponibilidade: {availableBuffer.Count}, CurrentConnections: {currentConnections}");
if (!tmpBufferElement.IsConnected())
{
try
{
tmpBufferElement = await itemFactoryFunc();
}
catch
{
semaphore.Release();
throw;
}
}
return new SocketAcquisitonController(availableBuffer, tmpBufferElement);
}
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
var buffer = this.availableBuffer.ToArray();
this.availableBuffer.Clear();
foreach (Socket item in buffer)
item.Dispose();
}
disposedValue = true;
}
}
public void Dispose()
{
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
internal class SocketAcquisitonController : IAcquisitonController<Socket>
{
private readonly ConcurrentQueue<Socket> availableBuffer;
internal SocketAcquisitonController(ConcurrentQueue<Socket> availableBuffer, Socket item)
{
this.availableBuffer = availableBuffer;
this.Current = item;
}
public Socket Current { get; }
public void Dispose()
{
System.Diagnostics.Debug.WriteLine("SocketPool | Returned pooled connection.");
this.availableBuffer.Enqueue(this.Current);
GC.SuppressFinalize(this);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment