Skip to content

Instantly share code, notes, and snippets.

@musukvl
Created September 4, 2018 12:13
Show Gist options
  • Select an option

  • Save musukvl/e9e44760f9d9071f0f2b3362ee176a4a to your computer and use it in GitHub Desktop.

Select an option

Save musukvl/e9e44760f9d9071f0f2b3362ee176a4a to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Common.Async
{
public class LimitedLengthActionPool : IDisposable
{
private readonly SemaphoreSlim _semaphore;
private readonly ConcurrentDictionary<Guid, Task> _executingTasks = new ConcurrentDictionary<Guid, Task>();
public LimitedLengthActionPool(int parallelism = 0)
{
if (parallelism == 0)
{
parallelism = Environment.ProcessorCount - 1;
}
_semaphore = new SemaphoreSlim(parallelism);
}
public int Length
{
get { return _executingTasks.Count(x => !x.Value.IsCompleted); }
}
public async Task AddAction(Func<Task> func)
{
await _semaphore.WaitAsync();
var completed = _executingTasks.Where(x => x.Value.IsCompleted).Select(x => x.Key).ToArray();
if (completed.Any())
{
foreach (var item in completed)
{
_executingTasks.TryRemove(item, out var _);
}
}
_executingTasks.TryAdd(Guid.NewGuid(), CreateTask(_semaphore, func));
}
private async Task CreateTask(SemaphoreSlim semaphore, Func<Task> func)
{
try
{
await func().ConfigureAwait(false);
}
finally
{
semaphore.Release();
}
}
public void Process()
{
Task.WaitAll(_executingTasks.Values.ToArray());
}
public void Dispose()
{
Process();
_semaphore.Dispose();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment