Created
September 18, 2015 06:40
-
-
Save andreasohlund/020eb2a81249689174d5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.IO; | |
using System.Transactions; | |
using MassTransit.Util; | |
namespace MsmqAsyncSpike | |
{ | |
using System; | |
using System.Messaging; | |
using System.Threading; | |
using System.Threading.Tasks; | |
class Program | |
{ | |
static int numMessages = 1000; | |
private static int bodySizeBytes = 1024 * 1000 | |
; | |
static void Main(string[] args) | |
{ | |
var queue = new MessageQueue(@".\private$\asyncspike"); | |
queue.Purge(); | |
FillQueue(queue,bodySizeBytes); | |
Console.Out.WriteLine("queue filled"); | |
var cancellationToken = new CancellationTokenSource(); | |
var task = Task.Factory.StartNew(() => new Program().StartMessagPump(queue, cancellationToken.Token), cancellationToken.Token) | |
.Unwrap() | |
.ContinueWith(t => | |
{ | |
Console.Out.WriteLine("Stopped " + t.Status); | |
}); | |
Console.ReadKey(); | |
cancellationToken.Cancel(); | |
task.Wait(); | |
queue.Dispose(); | |
Console.Out.WriteLine("stopped"); | |
Console.ReadKey(); | |
} | |
static void FillQueue(MessageQueue queue, int numBytes) | |
{ | |
Parallel.For(0, numMessages, (i, s) => | |
{ | |
//queue.Send(new Message(), MessageQueueTransactionType.Single); | |
using (var memStream = new MemoryStream(new byte[numBytes])) | |
{ | |
var m = new Message { BodyStream = memStream }; | |
queue.Send(m, MessageQueueTransactionType.Single); | |
} | |
}); | |
} | |
async Task StartMessagPump(MessageQueue queue, CancellationToken token) | |
{ | |
sw.Start(); | |
//var enumerator = ; | |
using (var enumerator = queue.GetMessageEnumerator2()) | |
{ | |
var maxConcurrency = 1000; | |
var semaphore = new SemaphoreSlim(maxConcurrency); | |
var tasks = new ConcurrentDictionary<Task, Task>(); | |
while (!token.IsCancellationRequested) | |
{ | |
if (!enumerator.MoveNext(TimeSpan.FromMilliseconds(10))) | |
{ | |
//Console.Out.WriteLine("Empty"); | |
continue; | |
} | |
await semaphore.WaitAsync(token); | |
var task = Task.Run(async () => | |
{ | |
//Console.Out.WriteLine("Started"); | |
await ProcessMessage(queue).ConfigureAwait(false); | |
//Console.Out.WriteLine("Processed"); | |
}, token); | |
task.ContinueWith(t => | |
{ | |
semaphore.Release(); | |
Task whoCares; | |
tasks.TryRemove(t, out whoCares); | |
}, TaskContinuationOptions.ExecuteSynchronously); | |
tasks.AddOrUpdate(task, task, (k, v) => task); | |
//Console.Out.WriteLine("Requested"); | |
} | |
Console.Out.WriteLine("Messages: " + numProcessed); | |
Console.Out.WriteLine("Tasks: " + tasks.Count); | |
await Task.WhenAll(tasks.Values); | |
} | |
} | |
async Task ProcessMessage(MessageQueue queue) | |
{ | |
Func<Message, Task> pipe = m => | |
{ | |
if (Interlocked.Increment(ref numProcessed) == numMessages) | |
{ | |
var msgPerSec = numMessages / sw.Elapsed.TotalSeconds; | |
Console.Out.WriteLine("Done: {0:0.000}", msgPerSec); | |
} | |
//return Task.Delay(TimeSpan.FromSeconds(1)); | |
return Task.FromResult(0); | |
}; | |
//// ------------- Native --------------- | |
using (var tx = new MessageQueueTransaction()) | |
{ | |
tx.Begin(); | |
var message = queue.Receive(TimeSpan.FromMilliseconds(10), tx); | |
using (var sr = new StreamReader(message.BodyStream)) | |
{ | |
var buffer = sr.ReadToEnd(); | |
await pipe(message).ConfigureAwait(false); | |
} | |
tx.Commit(); | |
} | |
// ------------- TX Scope --------------- | |
//using (var tx = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled)) | |
//{ | |
// var message = queue.Receive(TimeSpan.FromMilliseconds(10), MessageQueueTransactionType.Automatic); | |
// await pipe(message); | |
// tx.Complete(); | |
//} | |
// -------------No tx at all-------------- | |
//var message = queue.Receive(TimeSpan.FromSeconds(5)); | |
//await pipe(message); | |
} | |
static int numProcessed; | |
static Stopwatch sw = new Stopwatch(); | |
} | |
class LimitedConcurrencyLevelTaskScheduler : TaskScheduler | |
{ | |
// Indicates whether the current thread is processing work items. | |
[ThreadStatic] | |
private static bool _currentThreadIsProcessingItems; | |
// The list of tasks to be executed | |
private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks) | |
// The maximum concurrency level allowed by this scheduler. | |
private readonly int _maxDegreeOfParallelism; | |
// Indicates whether the scheduler is currently processing work items. | |
private int _delegatesQueuedOrRunning = 0; | |
// Creates a new instance with the specified degree of parallelism. | |
public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism) | |
{ | |
if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism"); | |
_maxDegreeOfParallelism = maxDegreeOfParallelism; | |
} | |
// Queues a task to the scheduler. | |
protected sealed override void QueueTask(Task task) | |
{ | |
// Add the task to the list of tasks to be processed. If there aren't enough | |
// delegates currently queued or running to process tasks, schedule another. | |
lock (_tasks) | |
{ | |
_tasks.AddLast(task); | |
if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism) | |
{ | |
++_delegatesQueuedOrRunning; | |
NotifyThreadPoolOfPendingWork(); | |
} | |
} | |
} | |
// Inform the ThreadPool that there's work to be executed for this scheduler. | |
private void NotifyThreadPoolOfPendingWork() | |
{ | |
ThreadPool.UnsafeQueueUserWorkItem(_ => | |
{ | |
// Note that the current thread is now processing work items. | |
// This is necessary to enable inlining of tasks into this thread. | |
_currentThreadIsProcessingItems = true; | |
try | |
{ | |
// Process all available items in the queue. | |
while (true) | |
{ | |
Task item; | |
lock (_tasks) | |
{ | |
// When there are no more items to be processed, | |
// note that we're done processing, and get out. | |
if (_tasks.Count == 0) | |
{ | |
--_delegatesQueuedOrRunning; | |
break; | |
} | |
// Get the next item from the queue | |
item = _tasks.First.Value; | |
_tasks.RemoveFirst(); | |
} | |
// Execute the task we pulled out of the queue | |
TryExecuteTask(item); | |
} | |
} | |
// We're done processing items on the current thread | |
finally { _currentThreadIsProcessingItems = false; } | |
}, null); | |
} | |
// Attempts to execute the specified task on the current thread. | |
protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) | |
{ | |
// If this thread isn't already processing a task, we don't support inlining | |
if (!_currentThreadIsProcessingItems) return false; | |
// If the task was previously queued, remove it from the queue | |
if (taskWasPreviouslyQueued) | |
// Try to run the task. | |
if (TryDequeue(task)) | |
return TryExecuteTask(task); | |
else | |
return false; | |
else | |
return TryExecuteTask(task); | |
} | |
// Attempt to remove a previously scheduled task from the scheduler. | |
protected sealed override bool TryDequeue(Task task) | |
{ | |
lock (_tasks) return _tasks.Remove(task); | |
} | |
// Gets the maximum concurrency level supported by this scheduler. | |
public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } } | |
// Gets an enumerable of the tasks currently scheduled on this scheduler. | |
protected sealed override IEnumerable<Task> GetScheduledTasks() | |
{ | |
var lockTaken = false; | |
try | |
{ | |
Monitor.TryEnter(_tasks, ref lockTaken); | |
if (lockTaken) return _tasks; | |
else throw new NotSupportedException(); | |
} | |
finally | |
{ | |
if (lockTaken) Monitor.Exit(_tasks); | |
} | |
} | |
} | |
} | |
// Copyright 2007-2015 Chris Patterson, Dru Sellers, Travis Smith, et. al. | |
// | |
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use | |
// this file except in compliance with the License. You may obtain a copy of the | |
// License at | |
// | |
// http://www.apache.org/licenses/LICENSE-2.0 | |
// | |
// Unless required by applicable law or agreed to in writing, software distributed | |
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR | |
// CONDITIONS OF ANY KIND, either express or implied. See the License for the | |
// specific language governing permissions and limitations under the License. | |
namespace MassTransit.Util | |
{ | |
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.Linq; | |
using System.Threading; | |
using System.Threading.Tasks; | |
/// <summary> | |
/// Provides a TaskScheduler that provides control over priorities, fairness, and the underlying threads utilized. | |
/// </summary> | |
[DebuggerTypeProxy(typeof(QueuedTaskSchedulerDebugView)), DebuggerDisplay("Id={Id}, Queues={DebugQueueCount}, ScheduledTasks = {DebugTaskCount}")] | |
public sealed class QueuedTaskScheduler : TaskScheduler, | |
IDisposable | |
{ | |
/// <summary>Whether we're processing tasks on the current thread.</summary> | |
static readonly ThreadLocal<bool> _taskProcessingThread = new ThreadLocal<bool>(); | |
/// <summary>The collection of tasks to be executed on our custom threads.</summary> | |
readonly BlockingCollection<Task> _blockingTaskQueue; | |
/// <summary> | |
/// The maximum allowed concurrency level of this scheduler. If custom threads are | |
/// used, this represents the number of created threads. | |
/// </summary> | |
readonly int _concurrencyLevel; | |
/// <summary>Cancellation token used for disposal.</summary> | |
readonly CancellationTokenSource _disposeCancellation = new CancellationTokenSource(); | |
// *** | |
// *** For when using a target scheduler | |
// *** | |
/// <summary>The queue of tasks to process when using an underlying target scheduler.</summary> | |
readonly Queue<Task> _nonthreadsafeTaskQueue; | |
/// <summary> | |
/// A sorted list of round-robin queue lists. Tasks with the smallest priority value | |
/// are preferred. Priority groups are round-robin'd through in order of priority. | |
/// </summary> | |
readonly SortedList<int, QueueGroup> _queueGroups = new SortedList<int, QueueGroup>(); | |
/// <summary>The scheduler onto which actual work is scheduled.</summary> | |
readonly TaskScheduler _targetScheduler; | |
// *** | |
// *** For when using our own threads | |
// *** | |
/// <summary>The threads used by the scheduler to process work.</summary> | |
readonly Thread[] _threads; | |
/// <summary>The number of Tasks that have been queued or that are running whiel using an underlying scheduler.</summary> | |
int _delegatesQueuedOrRunning; | |
// *** | |
/// <summary>Initializes the scheduler.</summary> | |
public QueuedTaskScheduler() | |
: this(Default, 0) | |
{ | |
} | |
/// <summary>Initializes the scheduler.</summary> | |
/// <param name="targetScheduler">The target underlying scheduler onto which this sceduler's work is queued.</param> | |
public QueuedTaskScheduler(TaskScheduler targetScheduler) | |
: this(targetScheduler, 0) | |
{ | |
} | |
/// <summary>Initializes the scheduler.</summary> | |
/// <param name="targetScheduler">The target underlying scheduler onto which this sceduler's work is queued.</param> | |
/// <param name="maxConcurrencyLevel">The maximum degree of concurrency allowed for this scheduler's work.</param> | |
public QueuedTaskScheduler(TaskScheduler targetScheduler, int maxConcurrencyLevel) | |
{ | |
if (targetScheduler == null) | |
throw new ArgumentNullException("targetScheduler"); | |
if (maxConcurrencyLevel < 0) | |
throw new ArgumentOutOfRangeException("maxConcurrencyLevel"); | |
// Initialize only those fields relevant to use an underlying scheduler. We don't | |
// initialize the fields relevant to using our own custom threads. | |
_targetScheduler = targetScheduler; | |
_nonthreadsafeTaskQueue = new Queue<Task>(); | |
// If 0, use the number of logical processors. But make sure whatever value we pick | |
// is not greater than the degree of parallelism allowed by the underlying scheduler. | |
_concurrencyLevel = maxConcurrencyLevel != 0 ? maxConcurrencyLevel : Environment.ProcessorCount; | |
if (targetScheduler.MaximumConcurrencyLevel > 0 && targetScheduler.MaximumConcurrencyLevel < _concurrencyLevel) | |
_concurrencyLevel = targetScheduler.MaximumConcurrencyLevel; | |
} | |
/// <summary>Initializes the scheduler.</summary> | |
/// <param name="threadCount">The number of threads to create and use for processing work items.</param> | |
public QueuedTaskScheduler(int threadCount) | |
: this(threadCount, string.Empty, false, ThreadPriority.Normal, ApartmentState.MTA, 0, null, null) | |
{ | |
} | |
/// <summary>Initializes the scheduler.</summary> | |
/// <param name="threadCount">The number of threads to create and use for processing work items.</param> | |
/// <param name="threadName">The name to use for each of the created threads.</param> | |
/// <param name="useForegroundThreads">A Boolean value that indicates whether to use foreground threads instead of background.</param> | |
/// <param name="threadPriority">The priority to assign to each thread.</param> | |
/// <param name="threadApartmentState">The apartment state to use for each thread.</param> | |
/// <param name="threadMaxStackSize">The stack size to use for each thread.</param> | |
/// <param name="threadInit">An initialization routine to run on each thread.</param> | |
/// <param name="threadFinally">A finalization routine to run on each thread.</param> | |
public QueuedTaskScheduler( | |
int threadCount, | |
string threadName = "", | |
bool useForegroundThreads = false, | |
ThreadPriority threadPriority = ThreadPriority.Normal, | |
ApartmentState threadApartmentState = ApartmentState.MTA, | |
int threadMaxStackSize = 0, | |
Action threadInit = null, | |
Action threadFinally = null) | |
{ | |
// Validates arguments (some validation is left up to the Thread type itself). | |
// If the thread count is 0, default to the number of logical processors. | |
if (threadCount < 0) | |
throw new ArgumentOutOfRangeException("threadCount"); | |
_concurrencyLevel = threadCount == 0 ? Environment.ProcessorCount : threadCount; | |
// Initialize the queue used for storing tasks | |
_blockingTaskQueue = new BlockingCollection<Task>(); | |
// Create all of the threads | |
_threads = new Thread[threadCount]; | |
for (int i = 0; i < threadCount; i++) | |
{ | |
_threads[i] = new Thread(() => ThreadBasedDispatchLoop(threadInit, threadFinally), threadMaxStackSize) | |
{ | |
Priority = threadPriority, | |
IsBackground = !useForegroundThreads, | |
}; | |
if (threadName != null) | |
_threads[i].Name = threadName + " (" + i + ")"; | |
_threads[i].SetApartmentState(threadApartmentState); | |
} | |
// Start all of the threads | |
foreach (Thread thread in _threads) | |
thread.Start(); | |
} | |
/// <summary>Gets the number of queues currently activated.</summary> | |
int DebugQueueCount | |
{ | |
get | |
{ | |
int count = 0; | |
foreach (var group in _queueGroups) | |
count += group.Value.Count; | |
return count; | |
} | |
} | |
/// <summary>Gets the number of tasks currently scheduled.</summary> | |
int DebugTaskCount | |
{ | |
get | |
{ | |
return (_targetScheduler != null | |
? _nonthreadsafeTaskQueue | |
: (IEnumerable<Task>)_blockingTaskQueue) | |
.Where(t => t != null).Count(); | |
} | |
} | |
/// <summary>Gets the maximum concurrency level to use when processing tasks.</summary> | |
public override int MaximumConcurrencyLevel | |
{ | |
get { return _concurrencyLevel; } | |
} | |
/// <summary>Initiates shutdown of the scheduler.</summary> | |
public void Dispose() | |
{ | |
_disposeCancellation.Cancel(); | |
} | |
/// <summary>The dispatch loop run by all threads in this scheduler.</summary> | |
/// <param name="threadInit">An initialization routine to run when the thread begins.</param> | |
/// <param name="threadFinally">A finalization routine to run before the thread ends.</param> | |
void ThreadBasedDispatchLoop(Action threadInit, Action threadFinally) | |
{ | |
_taskProcessingThread.Value = true; | |
if (threadInit != null) | |
threadInit(); | |
try | |
{ | |
// If the scheduler is disposed, the cancellation token will be set and | |
// we'll receive an OperationCanceledException. That OCE should not crash the process. | |
try | |
{ | |
// If a thread abort occurs, we'll try to reset it and continue running. | |
while (true) | |
{ | |
try | |
{ | |
// For each task queued to the scheduler, try to execute it. | |
foreach (Task task in _blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token)) | |
{ | |
// If the task is not null, that means it was queued to this scheduler directly. | |
// Run it. | |
if (task != null) | |
TryExecuteTask(task); | |
// If the task is null, that means it's just a placeholder for a task | |
// queued to one of the subschedulers. Find the next task based on | |
// priority and fairness and run it. | |
else | |
{ | |
// Find the next task based on our ordering rules... | |
Task targetTask; | |
QueuedTaskSchedulerQueue queueForTargetTask; | |
lock (_queueGroups) | |
FindNextTaskNeedsLock(out targetTask, out queueForTargetTask); | |
// ... and if we found one, run it | |
if (targetTask != null) | |
queueForTargetTask.ExecuteTask(targetTask); | |
} | |
} | |
} | |
catch (ThreadAbortException) | |
{ | |
// If we received a thread abort, and that thread abort was due to shutting down | |
// or unloading, let it pass through. Otherwise, reset the abort so we can | |
// continue processing work items. | |
if (!Environment.HasShutdownStarted && !AppDomain.CurrentDomain.IsFinalizingForUnload()) | |
Thread.ResetAbort(); | |
} | |
} | |
} | |
catch (OperationCanceledException) | |
{ | |
} | |
} | |
finally | |
{ | |
// Run a cleanup routine if there was one | |
if (threadFinally != null) | |
threadFinally(); | |
_taskProcessingThread.Value = false; | |
} | |
} | |
/// <summary>Find the next task that should be executed, based on priorities and fairness and the like.</summary> | |
/// <param name="targetTask">The found task, or null if none was found.</param> | |
/// <param name="queueForTargetTask"> | |
/// The scheduler associated with the found task. Due to security checks inside of TPL, | |
/// this scheduler needs to be used to execute that task. | |
/// </param> | |
void FindNextTaskNeedsLock(out Task targetTask, out QueuedTaskSchedulerQueue queueForTargetTask) | |
{ | |
targetTask = null; | |
queueForTargetTask = null; | |
// Look through each of our queue groups in sorted order. | |
// This ordering is based on the priority of the queues. | |
foreach (var queueGroup in _queueGroups) | |
{ | |
QueueGroup queues = queueGroup.Value; | |
// Within each group, iterate through the queues in a round-robin | |
// fashion. Every time we iterate again and successfully find a task, | |
// we'll start in the next location in the group. | |
foreach (int i in queues.CreateSearchOrder()) | |
{ | |
queueForTargetTask = queues[i]; | |
Queue<Task> items = queueForTargetTask._workItems; | |
if (items.Count > 0) | |
{ | |
targetTask = items.Dequeue(); | |
if (queueForTargetTask._disposed && items.Count == 0) | |
RemoveQueueNeedsLock(queueForTargetTask); | |
queues.NextQueueIndex = (queues.NextQueueIndex + 1) % queueGroup.Value.Count; | |
return; | |
} | |
} | |
} | |
} | |
/// <summary>Queues a task to the scheduler.</summary> | |
/// <param name="task">The task to be queued.</param> | |
protected override void QueueTask(Task task) | |
{ | |
// If we've been disposed, no one should be queueing | |
if (_disposeCancellation.IsCancellationRequested) | |
throw new ObjectDisposedException(GetType().Name); | |
// If the target scheduler is null (meaning we're using our own threads), | |
// add the task to the blocking queue | |
if (_targetScheduler == null) | |
_blockingTaskQueue.Add(task); | |
// Otherwise, add the task to the non-blocking queue, | |
// and if there isn't already an executing processing task, | |
// start one up | |
else | |
{ | |
// Queue the task and check whether we should launch a processing | |
// task (noting it if we do, so that other threads don't result | |
// in queueing up too many). | |
bool launchTask = false; | |
lock (_nonthreadsafeTaskQueue) | |
{ | |
_nonthreadsafeTaskQueue.Enqueue(task); | |
if (_delegatesQueuedOrRunning < _concurrencyLevel) | |
{ | |
++_delegatesQueuedOrRunning; | |
launchTask = true; | |
} | |
} | |
// If necessary, start processing asynchronously | |
if (launchTask) | |
{ | |
Task.Factory.StartNew(ProcessPrioritizedAndBatchedTasks, | |
CancellationToken.None, TaskCreationOptions.None, _targetScheduler); | |
} | |
} | |
} | |
/// <summary> | |
/// Process tasks one at a time in the best order. | |
/// This should be run in a Task generated by QueueTask. | |
/// It's been separated out into its own method to show up better in Parallel Tasks. | |
/// </summary> | |
void ProcessPrioritizedAndBatchedTasks() | |
{ | |
bool continueProcessing = true; | |
while (!_disposeCancellation.IsCancellationRequested && continueProcessing) | |
{ | |
try | |
{ | |
// Note that we're processing tasks on this thread | |
_taskProcessingThread.Value = true; | |
// Until there are no more tasks to process | |
while (!_disposeCancellation.IsCancellationRequested) | |
{ | |
// Try to get the next task. If there aren't any more, we're done. | |
Task targetTask; | |
lock (_nonthreadsafeTaskQueue) | |
{ | |
if (_nonthreadsafeTaskQueue.Count == 0) | |
break; | |
targetTask = _nonthreadsafeTaskQueue.Dequeue(); | |
} | |
// If the task is null, it's a placeholder for a task in the round-robin queues. | |
// Find the next one that should be processed. | |
QueuedTaskSchedulerQueue queueForTargetTask = null; | |
if (targetTask == null) | |
{ | |
lock (_queueGroups) | |
FindNextTaskNeedsLock(out targetTask, out queueForTargetTask); | |
} | |
// Now if we finally have a task, run it. If the task | |
// was associated with one of the round-robin schedulers, we need to use it | |
// as a thunk to execute its task. | |
if (targetTask != null) | |
{ | |
if (queueForTargetTask != null) | |
queueForTargetTask.ExecuteTask(targetTask); | |
else | |
TryExecuteTask(targetTask); | |
} | |
} | |
} | |
finally | |
{ | |
// Now that we think we're done, verify that there really is | |
// no more work to do. If there's not, highlight | |
// that we're now less parallel than we were a moment ago. | |
lock (_nonthreadsafeTaskQueue) | |
{ | |
if (_nonthreadsafeTaskQueue.Count == 0) | |
{ | |
_delegatesQueuedOrRunning--; | |
continueProcessing = false; | |
_taskProcessingThread.Value = false; | |
} | |
} | |
} | |
} | |
} | |
/// <summary>Notifies the pool that there's a new item to be executed in one of the round-robin queues.</summary> | |
void NotifyNewWorkItem() | |
{ | |
QueueTask(null); | |
} | |
/// <summary>Tries to execute a task synchronously on the current thread.</summary> | |
/// <param name="task">The task to execute.</param> | |
/// <param name="taskWasPreviouslyQueued">Whether the task was previously queued.</param> | |
/// <returns>true if the task was executed; otherwise, false.</returns> | |
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) | |
{ | |
// If we're already running tasks on this threads, enable inlining | |
return _taskProcessingThread.Value && TryExecuteTask(task); | |
} | |
/// <summary>Gets the tasks scheduled to this scheduler.</summary> | |
/// <returns>An enumerable of all tasks queued to this scheduler.</returns> | |
/// <remarks>This does not include the tasks on sub-schedulers. Those will be retrieved by the debugger separately.</remarks> | |
protected override IEnumerable<Task> GetScheduledTasks() | |
{ | |
// If we're running on our own threads, get the tasks from the blocking queue... | |
if (_targetScheduler == null) | |
{ | |
// Get all of the tasks, filtering out nulls, which are just placeholders | |
// for tasks in other sub-schedulers | |
return _blockingTaskQueue.Where(t => t != null).ToList(); | |
} | |
// otherwise get them from the non-blocking queue... | |
return _nonthreadsafeTaskQueue.Where(t => t != null).ToList(); | |
} | |
/// <summary>Creates and activates a new scheduling queue for this scheduler.</summary> | |
/// <returns>The newly created and activated queue at priority 0.</returns> | |
public TaskScheduler ActivateNewQueue() | |
{ | |
return ActivateNewQueue(0); | |
} | |
/// <summary>Creates and activates a new scheduling queue for this scheduler.</summary> | |
/// <param name="priority">The priority level for the new queue.</param> | |
/// <returns>The newly created and activated queue at the specified priority.</returns> | |
public TaskScheduler ActivateNewQueue(int priority) | |
{ | |
// Create the queue | |
var createdQueue = new QueuedTaskSchedulerQueue(priority, this); | |
// Add the queue to the appropriate queue group based on priority | |
lock (_queueGroups) | |
{ | |
QueueGroup list; | |
if (!_queueGroups.TryGetValue(priority, out list)) | |
{ | |
list = new QueueGroup(); | |
_queueGroups.Add(priority, list); | |
} | |
list.Add(createdQueue); | |
} | |
// Hand the new queue back | |
return createdQueue; | |
} | |
/// <summary>Removes a scheduler from the group.</summary> | |
/// <param name="queue">The scheduler to be removed.</param> | |
void RemoveQueueNeedsLock(QueuedTaskSchedulerQueue queue) | |
{ | |
// Find the group that contains the queue and the queue's index within the group | |
QueueGroup queueGroup = _queueGroups[queue._priority]; | |
int index = queueGroup.IndexOf(queue); | |
// We're about to remove the queue, so adjust the index of the next | |
// round-robin starting location if it'll be affected by the removal | |
if (queueGroup.NextQueueIndex >= index) | |
queueGroup.NextQueueIndex--; | |
// Remove it | |
queueGroup.RemoveAt(index); | |
} | |
/// <summary>A group of queues a the same priority level.</summary> | |
class QueueGroup : List<QueuedTaskSchedulerQueue> | |
{ | |
/// <summary>The starting index for the next round-robin traversal.</summary> | |
public int NextQueueIndex; | |
/// <summary>Creates a search order through this group.</summary> | |
/// <returns>An enumerable of indices for this group.</returns> | |
public IEnumerable<int> CreateSearchOrder() | |
{ | |
for (int i = NextQueueIndex; i < Count; i++) | |
yield return i; | |
for (int i = 0; i < NextQueueIndex; i++) | |
yield return i; | |
} | |
} | |
/// <summary>Debug view for the QueuedTaskScheduler.</summary> | |
class QueuedTaskSchedulerDebugView | |
{ | |
/// <summary>The scheduler.</summary> | |
readonly QueuedTaskScheduler _scheduler; | |
/// <summary>Initializes the debug view.</summary> | |
/// <param name="scheduler">The scheduler.</param> | |
public QueuedTaskSchedulerDebugView(QueuedTaskScheduler scheduler) | |
{ | |
if (scheduler == null) | |
throw new ArgumentNullException("scheduler"); | |
_scheduler = scheduler; | |
} | |
/// <summary>Gets all of the Tasks queued to the scheduler directly.</summary> | |
public IEnumerable<Task> ScheduledTasks | |
{ | |
get | |
{ | |
IEnumerable<Task> tasks = (_scheduler._targetScheduler != null) | |
? _scheduler._nonthreadsafeTaskQueue | |
: (IEnumerable<Task>)_scheduler._blockingTaskQueue; | |
return tasks.Where(t => t != null).ToList(); | |
} | |
} | |
/// <summary>Gets the prioritized and fair queues.</summary> | |
public IEnumerable<TaskScheduler> Queues | |
{ | |
get | |
{ | |
var queues = new List<TaskScheduler>(); | |
foreach (var group in _scheduler._queueGroups) | |
queues.AddRange(group.Value); | |
return queues; | |
} | |
} | |
} | |
/// <summary>Provides a scheduling queue associatd with a QueuedTaskScheduler.</summary> | |
[DebuggerDisplay("QueuePriority = {_priority}, WaitingTasks = {WaitingTasks}"), DebuggerTypeProxy(typeof(QueuedTaskSchedulerQueueDebugView))] | |
sealed class QueuedTaskSchedulerQueue : TaskScheduler, | |
IDisposable | |
{ | |
/// <summary>The scheduler with which this pool is associated.</summary> | |
readonly QueuedTaskScheduler _pool; | |
/// <summary>Gets the priority for this queue.</summary> | |
internal readonly int _priority; | |
/// <summary>The work items stored in this queue.</summary> | |
internal readonly Queue<Task> _workItems; | |
/// <summary>Whether this queue has been disposed.</summary> | |
internal bool _disposed; | |
/// <summary>Initializes the queue.</summary> | |
/// <param name="priority">The priority associated with this queue.</param> | |
/// <param name="pool">The scheduler with which this queue is associated.</param> | |
internal QueuedTaskSchedulerQueue(int priority, QueuedTaskScheduler pool) | |
{ | |
_priority = priority; | |
_pool = pool; | |
_workItems = new Queue<Task>(); | |
} | |
/// <summary>Gets the number of tasks waiting in this scheduler.</summary> | |
internal int WaitingTasks | |
{ | |
get { return _workItems.Count; } | |
} | |
/// <summary>Gets the maximum concurrency level to use when processing tasks.</summary> | |
public override int MaximumConcurrencyLevel | |
{ | |
get { return _pool.MaximumConcurrencyLevel; } | |
} | |
/// <summary>Signals that the queue should be removed from the scheduler as soon as the queue is empty.</summary> | |
public void Dispose() | |
{ | |
if (!_disposed) | |
{ | |
lock (_pool._queueGroups) | |
{ | |
// We only remove the queue if it's empty. If it's not empty, | |
// we still mark it as disposed, and the associated QueuedTaskScheduler | |
// will remove the queue when its count hits 0 and its _disposed is true. | |
if (_workItems.Count == 0) | |
_pool.RemoveQueueNeedsLock(this); | |
} | |
_disposed = true; | |
} | |
} | |
/// <summary>Gets the tasks scheduled to this scheduler.</summary> | |
/// <returns>An enumerable of all tasks queued to this scheduler.</returns> | |
protected override IEnumerable<Task> GetScheduledTasks() | |
{ | |
return _workItems.ToList(); | |
} | |
/// <summary>Queues a task to the scheduler.</summary> | |
/// <param name="task">The task to be queued.</param> | |
protected override void QueueTask(Task task) | |
{ | |
if (_disposed) | |
throw new ObjectDisposedException(GetType().Name); | |
// Queue up the task locally to this queue, and then notify | |
// the parent scheduler that there's work available | |
lock (_pool._queueGroups) | |
_workItems.Enqueue(task); | |
_pool.NotifyNewWorkItem(); | |
} | |
/// <summary>Tries to execute a task synchronously on the current thread.</summary> | |
/// <param name="task">The task to execute.</param> | |
/// <param name="taskWasPreviouslyQueued">Whether the task was previously queued.</param> | |
/// <returns>true if the task was executed; otherwise, false.</returns> | |
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) | |
{ | |
// If we're using our own threads and if this is being called from one of them, | |
// or if we're currently processing another task on this thread, try running it inline. | |
return _taskProcessingThread.Value && TryExecuteTask(task); | |
} | |
/// <summary>Runs the specified ask.</summary> | |
/// <param name="task">The task to execute.</param> | |
internal void ExecuteTask(Task task) | |
{ | |
TryExecuteTask(task); | |
} | |
/// <summary>A debug view for the queue.</summary> | |
sealed class QueuedTaskSchedulerQueueDebugView | |
{ | |
/// <summary>The queue.</summary> | |
readonly QueuedTaskSchedulerQueue _queue; | |
/// <summary>Initializes the debug view.</summary> | |
/// <param name="queue">The queue to be debugged.</param> | |
public QueuedTaskSchedulerQueueDebugView(QueuedTaskSchedulerQueue queue) | |
{ | |
if (queue == null) | |
throw new ArgumentNullException("queue"); | |
_queue = queue; | |
} | |
/// <summary>Gets the priority of this queue in its associated scheduler.</summary> | |
public int Priority | |
{ | |
get { return _queue._priority; } | |
} | |
/// <summary>Gets the ID of this scheduler.</summary> | |
public int Id | |
{ | |
get { return _queue.Id; } | |
} | |
/// <summary>Gets all of the tasks scheduled to this queue.</summary> | |
public IEnumerable<Task> ScheduledTasks | |
{ | |
get { return _queue.GetScheduledTasks(); } | |
} | |
/// <summary>Gets the QueuedTaskScheduler with which this queue is associated.</summary> | |
public QueuedTaskScheduler AssociatedScheduler | |
{ | |
get { return _queue._pool; } | |
} | |
} | |
} | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment