Created
June 22, 2015 02:20
-
-
Save programmation/3fd37bbc583f63341400 to your computer and use it in GitHub Desktop.
Task dependency manager from Stephen Toub MSDN article. Similar to NSOperationQueue / NSOperation on MacOS / iOS.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// https://msdn.microsoft.com/en-us/magazine/dd569760.aspx | |
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace OperationQueueTest | |
{ | |
class Program | |
{ | |
static void Main(string[] args) | |
{ | |
Action oneSecond = () => { ComputeForOneSecond(); }; | |
DependencyManager dm = new DependencyManager(); | |
dm.OperationCompleted += dm_OperationCompleted; | |
dm.AddOperation(1, oneSecond); | |
dm.AddOperation(2, oneSecond); | |
dm.AddOperation(3, oneSecond); | |
dm.AddOperation(4, oneSecond, 1); | |
dm.AddOperation(5, oneSecond, 1, 2, 3); | |
dm.AddOperation(6, oneSecond, 3, 4); | |
dm.AddOperation(7, oneSecond, 5, 6); | |
dm.AddOperation(8, oneSecond, 5); | |
dm.Execute(); | |
Console.ReadLine(); | |
} | |
static void dm_OperationCompleted(object sender, DependencyManager.OperationCompletedEventArgs e) | |
{ | |
Console.WriteLine("Operation completed: {0} {1} -> {2}", e.Id, e.Start, e.End); | |
} | |
public static void ComputeForOneSecond() | |
{ | |
Task.Delay(1000).Wait(); | |
} | |
} | |
public class DependencyManager | |
{ | |
private class OperationData | |
{ | |
internal int Id; | |
internal Action Operation; | |
internal int[] Dependencies; | |
internal ExecutionContext Context; | |
internal int NumRemainingDependencies; | |
internal DateTimeOffset Start, End; | |
} | |
private Dictionary<int, OperationData> _operations = new Dictionary<int, OperationData>(); | |
private Dictionary<int, List<int>> _dependenciesFromTo; | |
private object _stateLock = new object(); | |
private ManualResetEvent _doneEvent; | |
private int _remainingCount; | |
public void AddOperation(int id, Action operation, params int[] dependencies) | |
{ | |
if (operation == null) | |
throw new ArgumentNullException("operation"); | |
if (dependencies == null) | |
throw new ArgumentNullException("dependencies"); | |
var data = new OperationData | |
{ | |
Context = ExecutionContext.Capture(), | |
Id = id, | |
Operation = operation, | |
Dependencies = dependencies | |
}; | |
Console.WriteLine("Add operation {0}", id); | |
_operations.Add(id, data); | |
} | |
public event EventHandler<OperationCompletedEventArgs> OperationCompleted; | |
public void Execute() | |
{ | |
VerifyThatAllOperationsHaveBeenRegistered(); | |
VerifyThereAreNoCycles(); | |
// Fill dependency data structures | |
_dependenciesFromTo = new Dictionary<int, List<int>>(); | |
foreach (var op in _operations.Values) | |
{ | |
op.NumRemainingDependencies = op.Dependencies.Length; | |
foreach (var from in op.Dependencies) | |
{ | |
List<int> toList; | |
if (!_dependenciesFromTo.TryGetValue(from, out toList)) | |
{ | |
toList = new List<int>(); | |
_dependenciesFromTo.Add(from, toList); | |
} | |
toList.Add(op.Id); | |
} | |
} | |
// Launch and wait | |
_remainingCount = _operations.Count; | |
using (_doneEvent = new ManualResetEvent(false)) | |
{ | |
lock (_stateLock) | |
{ | |
foreach (var op in _operations.Values) | |
{ | |
if (op.NumRemainingDependencies == 0) | |
QueueOperation(op); | |
} | |
} | |
_doneEvent.WaitOne(); | |
} | |
} | |
private void QueueOperation(OperationData data) | |
{ | |
Console.WriteLine("Queue operation {0}", data.Id); | |
ThreadPool.UnsafeQueueUserWorkItem(state => | |
ProcessOperation((OperationData)state), data); | |
} | |
private void ProcessOperation(OperationData data) | |
{ | |
// Time and run the operation's delegate | |
data.Start = DateTimeOffset.Now; | |
if (data.Context != null) | |
{ | |
ExecutionContext.Run(data.Context.CreateCopy(), | |
op => ((OperationData)op).Operation(), data); | |
} | |
else data.Operation(); | |
data.End = DateTimeOffset.Now; | |
// Raise the operation completed event | |
OnOperationCompleted(data); | |
// Signal to all that depend on this operation of its | |
// completion, and potentially launch newly available | |
lock (_stateLock) | |
{ | |
List<int> toList; | |
if (_dependenciesFromTo.TryGetValue(data.Id, out toList)) | |
{ | |
foreach (var targetId in toList) | |
{ | |
OperationData targetData = _operations[targetId]; | |
if (--targetData.NumRemainingDependencies == 0) | |
QueueOperation(targetData); | |
} | |
} | |
_dependenciesFromTo.Remove(data.Id); | |
if (--_remainingCount == 0) _doneEvent.Set(); | |
} | |
} | |
private void OnOperationCompleted(OperationData data) | |
{ | |
var handler = OperationCompleted; | |
if (handler != null) | |
handler(this, new OperationCompletedEventArgs( | |
data.Id, data.Start, data.End)); | |
} | |
public class OperationCompletedEventArgs : EventArgs | |
{ | |
internal OperationCompletedEventArgs( | |
int id, DateTimeOffset start, DateTimeOffset end) | |
{ | |
Id = id; Start = start; End = end; | |
} | |
public int Id { get; private set; } | |
public DateTimeOffset Start { get; private set; } | |
public DateTimeOffset End { get; private set; } | |
} | |
private void VerifyThatAllOperationsHaveBeenRegistered() | |
{ | |
foreach (var op in _operations.Values) | |
{ | |
foreach (var dependency in op.Dependencies) | |
{ | |
if (!_operations.ContainsKey(dependency)) | |
{ | |
throw new InvalidOperationException( | |
"Missing operation: " + dependency); | |
} | |
} | |
} | |
} | |
private void VerifyThereAreNoCycles() | |
{ | |
if (CreateTopologicalSort() == null) | |
throw new InvalidOperationException("Cycle detected"); | |
} | |
private List<int> CreateTopologicalSort() | |
{ | |
// Build up the dependencies graph | |
var dependenciesToFrom = new Dictionary<int, List<int>>(); | |
var dependenciesFromTo = new Dictionary<int, List<int>>(); | |
foreach (var op in _operations.Values) | |
{ | |
// Note that op.Id depends on each of op.Dependencies | |
dependenciesToFrom.Add(op.Id, new List<int>(op.Dependencies)); | |
// Note that each of op.Dependencies is relied on by op.Id | |
foreach (var depId in op.Dependencies) | |
{ | |
List<int> ids; | |
if (!dependenciesFromTo.TryGetValue(depId, out ids)) | |
{ | |
ids = new List<int>(); | |
dependenciesFromTo.Add(depId, ids); | |
} | |
ids.Add(op.Id); | |
} | |
} | |
// Create the sorted list | |
var overallPartialOrderingIds = new List<int>(dependenciesToFrom.Count); | |
var thisIterationIds = new List<int>(dependenciesToFrom.Count); | |
while (dependenciesToFrom.Count > 0) | |
{ | |
thisIterationIds.Clear(); | |
foreach (var item in dependenciesToFrom) | |
{ | |
// If an item has zero input operations, remove it. | |
if (item.Value.Count == 0) | |
{ | |
thisIterationIds.Add(item.Key); | |
// Remove all outbound edges | |
List<int> depIds; | |
if (dependenciesFromTo.TryGetValue(item.Key, out depIds)) | |
{ | |
foreach (var depId in depIds) | |
{ | |
dependenciesToFrom[depId].Remove(item.Key); | |
} | |
} | |
} | |
} | |
// If nothing was found to remove, there's no valid sort. | |
if (thisIterationIds.Count == 0) return null; | |
// Remove the found items from the dictionary and | |
// add them to the overall ordering | |
foreach (var id in thisIterationIds) dependenciesToFrom.Remove(id); | |
overallPartialOrderingIds.AddRange(thisIterationIds); | |
} | |
return overallPartialOrderingIds; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment