Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save valerysntx/8973d007357a0161b41bfaad5d9b2049 to your computer and use it in GitHub Desktop.
Save valerysntx/8973d007357a0161b41bfaad5d9b2049 to your computer and use it in GitHub Desktop.
Limited Concurrency Level Task Scheduler
public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
[ThreadStatic]
private static bool _currentThreadIsProcessingItems;
private readonly LinkedList<Task> _tasks = new LinkedList<Task>();
private readonly int _maxDegreeOfParallelism;
private int _delegatesQueuedOrRunning = 0;
public sealed override int MaximumConcurrencyLevel
{
get
{
return this._maxDegreeOfParallelism;
}
}
public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
{
bool flag = maxDegreeOfParallelism < 1;
if (flag)
{
throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
}
this._maxDegreeOfParallelism = maxDegreeOfParallelism;
}
protected sealed override void QueueTask(Task task)
{
LinkedList<Task> tasks = this._tasks;
lock (tasks)
{
this._tasks.AddLast(task);
bool flag2 = this._delegatesQueuedOrRunning < this._maxDegreeOfParallelism;
if (flag2)
{
this._delegatesQueuedOrRunning++;
this.NotifyThreadPoolOfPendingWork();
}
}
}
private void NotifyThreadPoolOfPendingWork()
{
ThreadPool.UnsafeQueueUserWorkItem(delegate(object _)
{
LimitedConcurrencyLevelTaskScheduler._currentThreadIsProcessingItems = true;
try
{
while (true)
{
LinkedList<Task> tasks = this._tasks;
Task item;
lock (tasks)
{
bool flag2 = this._tasks.Count == 0;
if (flag2)
{
this._delegatesQueuedOrRunning--;
break;
}
item = this._tasks.First.Value;
this._tasks.RemoveFirst();
}
base.TryExecuteTask(item);
}
}
finally
{
LimitedConcurrencyLevelTaskScheduler._currentThreadIsProcessingItems = false;
}
}, null);
}
protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
bool flag = !LimitedConcurrencyLevelTaskScheduler._currentThreadIsProcessingItems;
bool result;
if (flag)
{
result = false;
}
else
{
if (taskWasPreviouslyQueued)
{
this.TryDequeue(task);
}
result = base.TryExecuteTask(task);
}
return result;
}
protected sealed override bool TryDequeue(Task task)
{
LinkedList<Task> tasks = this._tasks;
bool result;
lock (tasks)
{
result = this._tasks.Remove(task);
}
return result;
}
protected sealed override IEnumerable<Task> GetScheduledTasks()
{
bool lockTaken = false;
IEnumerable<Task> result;
try
{
Monitor.TryEnter(this._tasks, ref lockTaken);
bool flag = lockTaken;
if (!flag)
{
throw new NotSupportedException();
}
result = this._tasks.ToArray<Task>();
}
finally
{
bool flag2 = lockTaken;
if (flag2)
{
Monitor.Exit(this._tasks);
}
}
return result;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment