Created
May 29, 2014 03:32
-
-
Save meisinger/53ad8187770e75d7a813 to your computer and use it in GitHub Desktop.
example of a task controller
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using StructureMap;
namespace Task.Services.Service | |
{ | |
public class Controller | |
{ | |
private readonly IActiveTaskContextProvider activeContexts; | |
private readonly IScheduleProvider scheduleProvider; | |
private readonly ITaskProvider taskProvider; | |
private readonly ILogger logger; | |
/// <summary>
/// Creates a controller over the supplied providers and obtains a logger
/// named "Controller" from <c>LoggerFactory</c>.
/// </summary>
/// <param name="scheduleProvider">Source of due task identifiers and execution plans.</param>
/// <param name="taskProvider">Source of task configurations.</param>
/// <param name="activeContexts">Registry of contexts for tasks that have been started.</param>
public Controller(IScheduleProvider scheduleProvider, ITaskProvider taskProvider, IActiveTaskContextProvider activeContexts)
{
    logger = LoggerFactory.CreateLogger("Controller");

    this.activeContexts = activeContexts;
    this.taskProvider = taskProvider;
    this.scheduleProvider = scheduleProvider;
}
/// <summary>
/// Scheduling loop: every 15 seconds asks the schedule provider which tasks are due,
/// starts each one that is not already running, and repeats until
/// <paramref name="token"/> is cancelled. After the loop ends it blocks until the
/// tasks that are still running have finished.
/// </summary>
/// <param name="token">Cancellation token that stops the loop and is handed to every started task.</param>
public void RunThread(CancellationToken token)
{
    var pollInterval = TimeSpan.FromSeconds(15);

    // The event is never set; it only serves as a sleep that wakes early on cancellation.
    using (var manual = new ManualResetEventSlim())
    {
        while (!token.IsCancellationRequested)
        {
            var identifiers = scheduleProvider.IdentifyTasksToExecute(DateTime.Now);
            logger.DebugFormat("Identified {0} tasks that should be ran.", identifiers.Length);

            foreach (var identifier in identifiers)
            {
                logger.TraceFormat("Identified \"{0}\" should be ran.", identifier);

                ActiveTaskContext context;
                if (activeContexts.TryGetValue(identifier, out context) && context.IsRunning)
                {
                    logger.WarnFormat("Task \"{0}\" has been scheduled to run but is currently active.",
                        identifier);
                    // "HH" (24-hour clock): "hh" printed an ambiguous 12-hour time without AM/PM.
                    logger.InfoFormat("Task \"{0}\" has been active since {1:yyyy-MM-dd HH:mm:ss}.",
                        context.TaskName, context.StartDate);
                    continue;
                }

                ExecuteTask(identifier, token);
            }

            try
            {
                manual.Wait(pollInterval, token);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
    }

    // A context can be disposed by a completion continuation while this snapshot is
    // built, which leaves RunningTask null. Task.WaitAll rejects null entries, so
    // they are filtered out instead of aborting the wait for the remaining tasks.
    var runningTasks = activeContexts.GetRunningContexts()
        .Select(x => x.RunningTask)
        .Where(x => x != null)
        .ToArray();

    if (runningTasks.Length != 0)
        WaitOnTasks(runningTasks);
}
/// <summary>
/// Starts the task identified by <paramref name="taskId"/>. The task is first removed
/// from the schedule; its configuration is then loaded and either run directly or,
/// for a container, expanded into its child tasks which are run in sequence.
/// </summary>
/// <remarks>
/// When no configuration is found a warning is logged and nothing is started; the
/// identifier has already been removed from the schedule at that point.
/// </remarks>
/// <param name="taskId">Identifier of the task to start.</param>
/// <param name="token">Service-level cancellation token passed on to the started task.</param>
public void ExecuteTask(string taskId, CancellationToken token)
{
    scheduleProvider.PopTaskFromConsideration(taskId);

    var model = taskProvider.LoadTaskConfiguration(taskId);
    if (model == null)
    {
        logger.WarnFormat("Unable to identify configuration for task \"{0}\".", taskId);
        return;
    }

    // ActiveTaskContext.Create takes exactly (taskId, taskName); the former third
    // argument matched no overload and did not compile.
    var context = ActiveTaskContext.Create(model.TaskId, model.TaskName);

    if (model.IsContainer)
    {
        var collection = IdentifyTasksToExecute(model)
            .OfType<TaskModel>()
            .ToArray();
        ExecuteTaskContainer(context, collection, token);
        return;
    }

    ExecuteTask(context, model, token);
}
/// <summary>
/// Runs a single task on a dedicated long-running <see cref="Task"/>, registers
/// <paramref name="context"/> with the active-context provider and attaches the
/// completion, cancellation and fault continuations.
/// </summary>
/// <param name="context">Context that tracks the started task and supplies its cancellation token.</param>
/// <param name="model">Configuration of the task to run.</param>
/// <param name="token">Service-level cancellation token; the task is created with it.</param>
public void ExecuteTask(ActiveTaskContext context, TaskModel model, CancellationToken token)
{
    var task = new Task(() =>
    {
        try
        {
            var instance = LoadTaskInstance(model);
            logger.InfoFormat("Executing task \"{0}\".", model.TaskName);
            instance.Execute(context.Token);
        }
        catch (OperationCanceledException)
        {
            // Rethrow only for a service-level cancellation; any other
            // cancellation is swallowed and the task runs to completion.
            context.ServiceToken.ThrowIfCancellationRequested();
        }
    }, token, TaskCreationOptions.LongRunning);

    task.ContinueWith(_ => ContinueOnComplete(context), TaskContinuationOptions.OnlyOnRanToCompletion);
    task.ContinueWith(_ => ContinueOnCancellation(context), TaskContinuationOptions.OnlyOnCanceled);
    // The antecedent must be the lambda parameter: the previous "_ => ...(x, ...)"
    // referenced an undefined name and did not compile.
    task.ContinueWith(faulted => ContinueOnFault(faulted, context), TaskContinuationOptions.OnlyOnFaulted);

    activeContexts.TryAdd(context.TaskId, context);
    context.SetActiveTask(task, token);
    task.Start();
}
/// <summary>
/// Runs the given child tasks one after another on a single long-running
/// <see cref="Task"/> tracked by <paramref name="context"/>.
/// </summary>
/// <param name="context">Context that tracks the container task and supplies its cancellation token.</param>
/// <param name="models">Child task configurations, executed in array order.</param>
/// <param name="token">Service-level cancellation token; the task is created with it.</param>
public void ExecuteTaskContainer(ActiveTaskContext context, TaskModel[] models, CancellationToken token)
{
    Action runSequence = () =>
    {
        try
        {
            foreach (var child in models)
            {
                // Service shutdown aborts the sequence with an exception ...
                context.ServiceToken.ThrowIfCancellationRequested();

                // ... while a cancellation of this task just stops before the next child.
                if (context.Token.IsCancellationRequested)
                    break;

                var instance = LoadTaskInstance(child);
                logger.InfoFormat("Executing task \"{0}\".", child.TaskName);
                instance.Execute(context.Token);
            }
        }
        catch (OperationCanceledException)
        {
            // Rethrow only when the service itself is shutting down.
            context.ServiceToken.ThrowIfCancellationRequested();
        }
    };

    var worker = new Task(runSequence, token, TaskCreationOptions.LongRunning);

    worker.ContinueWith(done => ContinueOnComplete(context), TaskContinuationOptions.OnlyOnRanToCompletion);
    worker.ContinueWith(cancelled => ContinueOnCancellation(context), TaskContinuationOptions.OnlyOnCanceled);
    worker.ContinueWith(faulted => ContinueOnFault(faulted, context), TaskContinuationOptions.OnlyOnFaulted);

    activeContexts.TryAdd(context.TaskId, context);
    context.SetActiveTask(worker, token);
    worker.Start();
}
/// <summary>
/// Continuation for a task that ran to completion: logs the outcome, removes the
/// context from the active set, re-queues the task with the schedule provider
/// unless it was cancelled, and disposes the context.
/// </summary>
/// <param name="context">Context of the task that finished.</param>
private void ContinueOnComplete(ActiveTaskContext context)
{
    string messageFormat;
    if (context.WasTaskCancelled)
        messageFormat = "Task \"{0}\" was cancelled. This task may not have completed.";
    else
        messageFormat = "Task \"{0}\" completed.";
    logger.InfoFormat(messageFormat, context.TaskName);

    ActiveTaskContext removed;
    if (activeContexts.TryRemove(context.TaskId, out removed))
    {
        // Only a task that was not cancelled is put back on the schedule.
        if (!context.WasTaskCancelled)
            scheduleProvider.PushTaskForConsideration(context.TaskId);

        context.Dispose();
        return;
    }

    logger.WarnFormat(
        "Unable to remove task \"{0}\". This may occur if the task was already removed or evicted from the dictionary.",
        context.TaskName);
}
/// <summary>
/// Continuation for a task that ended in the cancelled state: logs the shutdown,
/// removes the context from the active set and disposes it.
/// </summary>
/// <param name="context">Context of the task that was cancelled.</param>
private void ContinueOnCancellation(ActiveTaskContext context)
{
    logger.InfoFormat("Task \"{0}\" was asked to stop. This task shut down gracefully.", context.TaskName);

    ActiveTaskContext removed;
    var wasTracked = activeContexts.TryRemove(context.TaskId, out removed);
    if (wasTracked)
    {
        logger.WarnFormat("Removed task \"{0}\" from the dictionary.", context.TaskName);
        context.Dispose();
        return;
    }

    logger.WarnFormat(
        "Unable to remove task \"{0}\". This may occur if the task was already removed or evicted from the dictionary.",
        context.TaskName);
}
/// <summary>
/// Continuation for a faulted task: logs the task's exception, removes the
/// context from the active set and disposes it.
/// </summary>
/// <param name="task">The faulted task whose <c>Exception</c> is logged.</param>
/// <param name="context">Context of the task that faulted.</param>
private void ContinueOnFault(Task task, ActiveTaskContext context)
{
    logger.ErrorFormat(task.Exception, "Fatal error occured with task \"{0}\".", context.TaskName);

    ActiveTaskContext removed;
    var wasTracked = activeContexts.TryRemove(context.TaskId, out removed);
    if (wasTracked)
    {
        logger.WarnFormat("Removed task \"{0}\" from the dictionary.", context.TaskName);
        context.Dispose();
        return;
    }

    logger.WarnFormat(
        "Unable to remove task \"{0}\". This may occur if the task was already removed or evicted from the dictionary.",
        context.TaskName);
}
/// <summary>
/// Resolves the child tasks of a container. Without an execution plan all of
/// <c>model.Items</c> are returned as they are; with a plan, only the items named
/// in the plan are returned, in the order the plan lists them.
/// </summary>
/// <param name="model">Container task whose items are resolved.</param>
/// <returns>The child task models to execute.</returns>
private IEnumerable<TaskModel> IdentifyTasksToExecute(TaskModel model)
{
    var executionPlan = scheduleProvider.IdentifyExecutionPlan(model.TaskId);
    if (!executionPlan.Any())
        return model.Items;

    // Plan entries that do not correspond to any configured item are reported, not fatal.
    var invalidEntries = executionPlan
        .Except(model.Items.Select(x => x.TaskId))
        .ToArray();
    if (invalidEntries.Any())
    {
        logger.WarnFormat(
            "Invalid execution plan found for task \"{0}\". The following tasks are invalid for this configuration \"{1}\".",
            model.TaskId, string.Join(", ", invalidEntries));
    }

    // Enumerable.Join keeps the order of its OUTER sequence. The plan must therefore
    // be the outer side; joining from model.Items discarded the plan's ordering.
    return executionPlan
        .Join(model.Items, planId => planId, item => item.TaskId, (planId, item) => item)
        .ToArray();
}
/// <summary>
/// Creates the runnable instance for <paramref name="model"/> by asking the first
/// registered <c>ITaskFactory</c> that reports it is satisfied by the task's id and name.
/// </summary>
/// <param name="model">Configuration of the task to instantiate.</param>
/// <returns>The task instance produced by the matching factory.</returns>
/// <exception cref="InvalidOperationException">
/// No factory accepts the task, or the accepting factory returned null.
/// </exception>
private ITask LoadTaskInstance(TaskModel model)
{
    var descriptor = new TaskInfoModel();
    descriptor.TaskId = model.TaskId;
    descriptor.TaskName = model.TaskName;

    var factories = ObjectFactory.GetAllInstances<ITaskFactory>();
    var factory = factories.FirstOrDefault(candidate => candidate.IsSatisfiedBy(descriptor));
    if (factory == null)
    {
        throw new InvalidOperationException(
            string.Format("No corresponding Task Factory could be found for task \"{0}\". Task will not be executed.",
                model.TaskName));
    }

    var created = factory.GetTask(descriptor);
    if (created == null)
    {
        throw new InvalidOperationException(
            string.Format("Unable to create instance for task \"{0}\". Task will not be executed.",
                model.TaskName));
    }

    return created;
}
/// <summary>
/// Blocks until every task in <paramref name="tasks"/> has finished. Exceptions
/// raised by the wait are logged and not rethrown.
/// </summary>
/// <param name="tasks">Tasks to wait for; an empty array returns immediately.</param>
private void WaitOnTasks(Task[] tasks)
{
    if (tasks.Any())
    {
        try
        {
            Task.WaitAll(tasks);
        }
        catch (AggregateException aggregate)
        {
            // Raised by WaitAll when a waited task was cancelled or faulted.
            logger.WarnFormat(aggregate, "Expected error occurred while waiting for \"{0}\" task(s) to complete.",
                tasks.Length);
        }
        catch (Exception unexpected)
        {
            logger.ErrorFormat(unexpected, "Unknown error occurred while waiting for \"{0}\" task(s) to complete.",
                tasks.Length);
        }
    }
}
} | |
/// <summary>
/// Registry of <see cref="ActiveTaskContext"/> instances keyed by task id, backed
/// by a <see cref="ConcurrentDictionary{TKey,TValue}"/>.
/// </summary>
public class ActiveTaskContextProvider : IActiveTaskContextProvider
{
    private readonly ConcurrentDictionary<string, ActiveTaskContext> activeContexts =
        new ConcurrentDictionary<string, ActiveTaskContext>();

    /// <summary>All contexts currently registered.</summary>
    public ICollection<ActiveTaskContext> Values
    {
        get { return activeContexts.Values; }
    }

    /// <summary>Registers <paramref name="context"/> under <paramref name="taskId"/>.</summary>
    /// <returns>False when a context is already registered for that id.</returns>
    public bool TryAdd(string taskId, ActiveTaskContext context)
    {
        var added = activeContexts.TryAdd(taskId, context);
        return added;
    }

    /// <summary>Looks up the context registered for <paramref name="taskId"/>.</summary>
    /// <returns>True when a context was found.</returns>
    public bool TryGetValue(string taskId, out ActiveTaskContext context)
    {
        var found = activeContexts.TryGetValue(taskId, out context);
        return found;
    }

    /// <summary>Removes the context registered for <paramref name="taskId"/>.</summary>
    /// <returns>True when a context was removed.</returns>
    public bool TryRemove(string taskId, out ActiveTaskContext context)
    {
        var removed = activeContexts.TryRemove(taskId, out context);
        return removed;
    }

    /// <summary>Returns the registered contexts whose <c>IsRunning</c> is true.</summary>
    public ActiveTaskContext[] GetRunningContexts()
    {
        var running = new List<ActiveTaskContext>();
        foreach (var candidate in activeContexts.Values)
        {
            if (candidate.IsRunning)
                running.Add(candidate);
        }
        return running.ToArray();
    }

    /// <summary>Reports whether at least one context is registered.</summary>
    public bool Any()
    {
        return !activeContexts.IsEmpty;
    }

    /// <summary>Requests cancellation of the task registered for <paramref name="taskId"/>.</summary>
    /// <returns>
    /// The result of the context's <c>CancelTask</c>, or false when no context is registered.
    /// </returns>
    public bool CancelTask(string taskId)
    {
        ActiveTaskContext context;
        return activeContexts.TryGetValue(taskId, out context) && context.CancelTask();
    }
}
/// <summary>
/// Tracks one started task: its identity, the <see cref="Task"/> executing it, its
/// start time, and the cancellation sources that let the task be stopped either
/// individually (<see cref="CancelTask"/>) or through the service-level token.
/// </summary>
/// <remarks>
/// <see cref="Token"/> and <see cref="WasTaskCancelled"/> read the linked source that
/// is created by <see cref="SetActiveTask"/> and cleared by <see cref="Dispose"/>;
/// they must only be used between those two calls.
/// </remarks>
public class ActiveTaskContext : IDisposable
{
    private Task activeTask;
    private CancellationTokenSource linkedSource;
    private CancellationTokenSource internalSource;
    // Time of the first successful CancelTask call; written here but not read by this class.
    private DateTime cancelTime = DateTime.MaxValue;

    /// <summary>Identifier of the tracked task.</summary>
    public string TaskId { get; private set; }

    /// <summary>Display name of the tracked task.</summary>
    public string TaskName { get; private set; }

    /// <summary>Local time at which <see cref="SetActiveTask"/> was called.</summary>
    public DateTime StartDate { get; private set; }

    /// <summary>The token passed to <see cref="SetActiveTask"/>.</summary>
    public CancellationToken ServiceToken { get; private set; }

    /// <summary>True while a task is attached and its status is <see cref="TaskStatus.Running"/>.</summary>
    public bool IsRunning
    {
        get
        {
            // Read the field once: Dispose may null it between the check and the status read.
            var task = activeTask;
            return task != null && task.Status == TaskStatus.Running;
        }
    }

    /// <summary>The attached task, or null before <see cref="SetActiveTask"/> and after <see cref="Dispose"/>.</summary>
    public Task RunningTask
    {
        get { return activeTask; }
    }

    /// <summary>Token that is cancelled by either <see cref="CancelTask"/> or the service token.</summary>
    public CancellationToken Token
    {
        get { return linkedSource.Token; }
    }

    /// <summary>True once the linked token has been cancelled (by either source).</summary>
    public bool WasTaskCancelled
    {
        get { return (linkedSource.IsCancellationRequested); }
    }

    /// <summary>True once the service token has been cancelled.</summary>
    public bool WasServiceCancelled
    {
        get { return (ServiceToken.IsCancellationRequested); }
    }

    /// <summary>
    /// Releases the attached task and both cancellation sources. Safe to call more
    /// than once and safe to call while the task has not finished.
    /// </summary>
    public void Dispose()
    {
        var task = activeTask;
        activeTask = null;
        // Task.Dispose throws unless the task has completed; an unfinished task is
        // only detached so that the cancellation sources below are still released.
        if (task != null && task.IsCompleted)
            task.Dispose();

        var linked = linkedSource;
        linkedSource = null;
        if (linked != null)
            linked.Dispose();

        var own = internalSource;
        internalSource = null;
        if (own != null)
            own.Dispose();
    }

    /// <summary>Requests cancellation of this task only.</summary>
    /// <returns>
    /// True when cancellation was requested by this call; false when no task is
    /// attached, cancellation was already requested, or the context has been disposed.
    /// </returns>
    public bool CancelTask()
    {
        // Work on a local copy: Dispose may clear or dispose the source concurrently.
        var source = internalSource;
        if (source == null)
            return false;

        try
        {
            if (source.IsCancellationRequested)
                return false;
            source.Cancel();
        }
        catch (ObjectDisposedException)
        {
            return false;
        }

        if (cancelTime == DateTime.MaxValue)
            cancelTime = DateTime.Now;
        return true;
    }

    /// <summary>
    /// Attaches <paramref name="task"/> to this context, records the start time and
    /// creates the cancellation sources linking <paramref name="token"/> with this
    /// context's own cancellation.
    /// </summary>
    /// <param name="task">The task that executes the work.</param>
    /// <param name="token">Service-level cancellation token.</param>
    public void SetActiveTask(Task task, CancellationToken token)
    {
        activeTask = task;
        internalSource = new CancellationTokenSource();
        linkedSource = CancellationTokenSource.CreateLinkedTokenSource(token, internalSource.Token);
        StartDate = DateTime.Now;
        ServiceToken = token;
    }

    /// <summary>Creates a context for the given task identity with no task attached yet.</summary>
    public static ActiveTaskContext Create(string taskId, string taskName)
    {
        return new ActiveTaskContext
        {
            TaskId = taskId,
            TaskName = taskName
        };
    }
}
/// <summary>
/// Task configuration lookup over a fixed array of <c>TaskModel</c> entries.
/// </summary>
public class TaskProvider : ITaskProvider
{
    private readonly TaskModel[] configurations;

    /// <summary>Creates a provider over the given configurations.</summary>
    public TaskProvider(TaskModel[] collection)
    {
        configurations = collection;
    }

    /// <summary>
    /// Returns the first configuration whose <c>TaskId</c> equals
    /// <paramref name="taskId"/> ignoring case (ordinal), or the default value when none matches.
    /// </summary>
    public TaskModel LoadTaskConfiguration(string taskId)
    {
        Func<TaskModel, bool> hasRequestedId =
            candidate => candidate.TaskId.Equals(taskId, StringComparison.OrdinalIgnoreCase);

        return configurations.FirstOrDefault(hasRequestedId);
    }
}
/// <summary>
/// Keeps the next execution time of every scheduled task and, for container tasks,
/// the ordered list of child task ids (the "execution plan").
/// </summary>
/// <remarks>
/// Every access to <c>schedules</c> is made under <c>lock (schedules)</c> and every
/// access to <c>executionPlans</c> under <c>lock (executionPlans)</c>.
/// </remarks>
public class ScheduleProvider : IScheduleProvider
{
    private readonly ILogger logger;
    private readonly List<TaskScheduleModel> collection;
    // Ids of every task passed to PushTaskForInitialConsideration.
    private readonly List<string> considerations;
    // Task id -> next execution time.
    private readonly Dictionary<string, DateTime> schedules;
    // Container task id -> child task ids in execution order.
    private readonly Dictionary<string, List<string>> executionPlans;

    /// <summary>
    /// Creates the provider and schedules every model in <paramref name="collection"/>.
    /// </summary>
    public ScheduleProvider(TaskScheduleModel[] collection)
    {
        this.collection = new List<TaskScheduleModel>(collection);
        logger = LoggerFactory.CreateLogger("Scheduler");
        considerations = new List<string>();
        schedules = new Dictionary<string, DateTime>();
        executionPlans = new Dictionary<string, List<string>>();
        PushTasksForInitialConsideration(collection);
    }

    /// <summary>Returns the ids of all tasks whose next execution time is at or before <paramref name="date"/>.</summary>
    public string[] IdentifyTasksToExecute(DateTime date)
    {
        // Other members add to and remove from the dictionary under this lock;
        // enumerating it unlocked can fail while such a change is in progress.
        lock (schedules)
        {
            return schedules
                .Where(x => x.Value <= date)
                .Select(x => x.Key)
                .ToArray();
        }
    }

    /// <summary>
    /// Returns a copy of the execution plan stored for <paramref name="taskId"/>,
    /// or an empty array when there is none.
    /// </summary>
    public string[] IdentifyExecutionPlan(string taskId)
    {
        lock (executionPlans)
        {
            List<string> plans;
            if (executionPlans.TryGetValue(taskId, out plans) && plans.Count != 0)
                return plans.ToArray();
            return new string[0];
        }
    }

    /// <summary>Removes <paramref name="taskId"/> from the schedule if it is present.</summary>
    public void PopTaskFromConsideration(string taskId)
    {
        bool removed;
        // Test and removal happen in one locked step instead of an unlocked
        // ContainsKey followed by a locked Remove.
        lock (schedules)
        {
            removed = schedules.Remove(taskId);
        }

        if (removed)
            logger.DebugFormat("Removing task \"{0}\" from consideration.", taskId);
    }

    /// <summary>
    /// Puts the task identified by <paramref name="taskId"/> back on the schedule
    /// with its next execution time. Does nothing but log when the task is already
    /// scheduled.
    /// </summary>
    /// <remarks>
    /// When no schedule model matches the id (ordinal, case-sensitive), an error is
    /// logged only if the id is known under a case-insensitive comparison; otherwise
    /// the call returns silently.
    /// </remarks>
    public void PushTaskForConsideration(string taskId)
    {
        bool alreadyScheduled;
        lock (schedules)
        {
            alreadyScheduled = schedules.ContainsKey(taskId);
        }

        if (alreadyScheduled)
        {
            logger.WarnFormat("Task \"{0}\" has already been scheduled. Task will not be considered.", taskId);
            return;
        }

        // NOTE(review): this lookup is case-sensitive while the considerations check
        // below ignores case — confirm which comparison is intended.
        var model = collection.FirstOrDefault(x => x.TaskId.Equals(taskId, StringComparison.Ordinal));
        if (model == null)
        {
            if (!considerations.Contains(taskId, StringComparer.OrdinalIgnoreCase))
                return;
            logger.ErrorFormat("Task \"{0}\" was not found. Task cannot be considered.", taskId);
            return;
        }

        PushTaskForConsideration(model);
    }

    /// <summary>
    /// Schedules <paramref name="model"/> at its next entry after the current local
    /// time and, for a container, rebuilds its execution plan.
    /// </summary>
    public void PushTaskForConsideration(TaskScheduleModel model)
    {
        var executionDate = model.IdentifyNextEntry(DateTime.Now);
        logger.DebugFormat("Task \"{0}\" next execution is: \"{1}\".", model.TaskId, executionDate);

        lock (schedules)
        {
            schedules[model.TaskId] = executionDate;
        }

        if (!model.IsContainer)
            return;

        IdentifyExecutionPlan(model.TaskId, model.ScheduleItems);
    }

    /// <summary>Schedules every model in <paramref name="models"/> for its first run.</summary>
    public void PushTasksForInitialConsideration(TaskScheduleModel[] models)
    {
        foreach (var model in models)
            PushTaskForInitialConsideration(model);
    }

    /// <summary>
    /// Schedules <paramref name="model"/> for its first run: immediately when it is
    /// continuous, otherwise at its next entry. Also records the id as known and,
    /// for a container, builds its execution plan.
    /// </summary>
    public void PushTaskForInitialConsideration(TaskScheduleModel model)
    {
        var executionDate = model.IdentifyNextEntry(DateTime.Now);
        if (model.IsContinuous)
            executionDate = DateTime.Now;
        logger.DebugFormat("Task \"{0}\" next execution is: \"{1}\".", model.TaskId, executionDate);

        considerations.Add(model.TaskId);

        // This method is public, so the write takes the same lock as every other writer.
        lock (schedules)
        {
            schedules[model.TaskId] = executionDate;
        }

        if (!model.IsContainer)
            return;

        IdentifyExecutionPlan(model.TaskId, model.ScheduleItems);
    }

    /// <summary>
    /// Stores the child ids of <paramref name="items"/>, sorted by <c>Order</c>, as the
    /// execution plan of <paramref name="taskId"/>, then does the same recursively
    /// for every item that has schedule items of its own.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="taskId"/> is null, empty or whitespace.</exception>
    private void IdentifyExecutionPlan(string taskId, ICollection<TaskScheduleItemModel> items)
    {
        if (string.IsNullOrWhiteSpace(taskId))
            throw new ArgumentNullException("taskId");
        if (items == null || items.Count == 0)
            return;

        var ordering = items
            .OrderBy(x => x.Order)
            .Select(x => x.TaskId)
            .ToArray();
        logger.DebugFormat("Task \"{0}\" is a container task. Execution plan: \"{1}\".",
            taskId, string.Join(", ", ordering));

        lock (executionPlans)
        {
            executionPlans[taskId] = new List<string>(ordering);
        }

        // The filter alone suffices; a separate Any() pre-check only repeated the scan.
        foreach (var item in items.Where(x => x.HasScheduleItems))
            IdentifyExecutionPlan(item.TaskId, item.ScheduleItems);
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment