Created
June 22, 2024 17:07
-
-
Save justinobney/13faab60f196e8a3088c57782238dcc2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
async Task Main() | |
{ | |
// Initialize settings | |
var transformFileSettings = new TransformFileSettings { BlobPath = "some/path", BusinessUnitId = 1 }; | |
var uploadToBlobSettings = new UploadToBlobSettings { ContainerName = "some-container" }; | |
var firstTask = new TransformFile(transformFileSettings, Guid.Empty); | |
var secondTask = new UploadToBlob(uploadToBlobSettings, firstTask.TaskId); | |
var tasks = new List<IJobExecutionTask> { firstTask, secondTask }; | |
var context = new Dictionary<string, string>(); | |
var jobExecution = new JobExecution(tasks, context); | |
jobExecution.OnTaskStarted += (sender, args) => | |
Console.WriteLine($"[Event] Task {args.TaskType} has started."); | |
jobExecution.OnTaskCompleted += (sender, args) => | |
Console.WriteLine($"[Event] Task {args.TaskType} has completed."); | |
if (!jobExecution.ValidateTaskTree()) | |
{ | |
Console.WriteLine("Invalid task tree. Exiting..."); | |
return; | |
} | |
await jobExecution.Run(); | |
} | |
public class JobExecutionStateDto | |
{ | |
public int Id { get; set; } | |
public string ExecutionStatus { get; set; } | |
public Guid Identifier { get; set; } | |
public string ContextSnapshotJson { get; set; } | |
} | |
public interface IJobExecutionTask | |
{ | |
Guid TaskId { get; } | |
Guid ExecuteAfter { get; } | |
Task<Guid> ExecuteAsync(IJobExecution execution); | |
} | |
public class TransformFile : IJobExecutionTask | |
{ | |
public Guid TaskId { get; } | |
public Guid ExecuteAfter { get; } | |
public TransformFileSettings Settings { get; } | |
public TransformFile(TransformFileSettings settings, Guid executeAfter) | |
{ | |
TaskId = Guid.NewGuid(); | |
ExecuteAfter = executeAfter; | |
Settings = settings; | |
} | |
public async Task<Guid> ExecuteAsync(IJobExecution execution) | |
{ | |
// Simulate real work | |
Console.WriteLine("TransformFile. ExecuteAsync..."); | |
await Task.Delay(1000); | |
return TaskId; | |
} | |
} | |
public class UploadToBlob : IJobExecutionTask | |
{ | |
public Guid TaskId { get; } | |
public Guid ExecuteAfter { get; } | |
public UploadToBlobSettings Settings { get; } | |
public UploadToBlob(UploadToBlobSettings settings, Guid executeAfter) | |
{ | |
TaskId = Guid.NewGuid(); | |
ExecuteAfter = executeAfter; | |
Settings = settings; | |
} | |
public async Task<Guid> ExecuteAsync(IJobExecution execution) | |
{ | |
// Simulate real work | |
Console.WriteLine("UploadToBlob. ExecuteAsync..."); | |
await Task.Delay(1000); | |
return TaskId; | |
} | |
} | |
public class TransformFileSettings | |
{ | |
public string BlobPath { get; set; } | |
public int BusinessUnitId { get; set; } | |
} | |
public class UploadToBlobSettings | |
{ | |
public string ContainerName { get; set; } | |
} | |
public interface IJobExecution | |
{ | |
Dictionary<string, string> Context { get; set; } | |
string Status { get; } | |
Task Run(); | |
} | |
public class JobExecution : IJobExecution | |
{ | |
public event EventHandler<TaskEventArgs> OnTaskStarted; | |
public event EventHandler<TaskEventArgs> OnTaskCompleted; | |
// Simulated in-memory database | |
private static readonly Dictionary<Guid, JobExecutionStateDto> _db = new Dictionary<Guid, JobExecutionStateDto>(); | |
public Dictionary<string, string> Context { get; set; } | |
public string Status => ExecutionState.ExecutionStatus; | |
private readonly List<IJobExecutionTask> _tasks; | |
public JobExecutionStateDto ExecutionState { get; private set; } | |
public JobExecution(List<IJobExecutionTask> tasks, Dictionary<string, string> context) | |
{ | |
ExecutionState = new JobExecutionStateDto | |
{ | |
Identifier = Guid.NewGuid(), | |
ExecutionStatus = "Initialized" | |
}; | |
_db[ExecutionState.Identifier] = ExecutionState; | |
Context = context; | |
_tasks = tasks; | |
} | |
public bool ValidateTaskTree() | |
{ | |
// Storing all task IDs in a set for quick lookup | |
var taskIds = new HashSet<Guid>(_tasks.Select(t => t.TaskId)); | |
// Checking for orphan tasks | |
foreach (var task in _tasks) | |
{ | |
if (task.ExecuteAfter != Guid.Empty && !taskIds.Contains(task.ExecuteAfter)) | |
{ | |
Console.WriteLine($"Validation Failed: Task with ID {task.TaskId} has invalid ExecuteAfter value {task.ExecuteAfter}"); | |
return false; | |
} | |
} | |
return true; | |
} | |
public async Task Run() | |
{ | |
if (!ValidateTaskTree()) | |
{ | |
Console.WriteLine("Job aborted due to invalid task tree."); | |
return; | |
} | |
Console.WriteLine($"Job started. Status: {Status}"); | |
// Using a Dictionary to map task outcomes to the next task to be executed | |
var taskOutcomeMap = new Dictionary<Guid, Guid>(); | |
foreach (var task in _tasks) | |
{ | |
taskOutcomeMap[task.ExecuteAfter] = task.TaskId; | |
} | |
Guid currentTaskOutcome = Guid.Empty; | |
while (taskOutcomeMap.ContainsKey(currentTaskOutcome)) | |
{ | |
var nextTaskId = taskOutcomeMap[currentTaskOutcome]; | |
var nextTask = _tasks.First(t => t.TaskId == nextTaskId); | |
// Trigger the OnTaskStarted event | |
OnTaskStarted?.Invoke(this, new TaskEventArgs(nextTask.GetType().Name)); | |
currentTaskOutcome = await nextTask.ExecuteAsync(this); | |
// Trigger the OnTaskCompleted event | |
OnTaskCompleted?.Invoke(this, new TaskEventArgs(nextTask.GetType().Name)); | |
// Update in-memory database | |
ExecutionState.ContextSnapshotJson = JsonConvert.SerializeObject(Context); | |
ExecutionState.ExecutionStatus = "In Progress"; | |
_db[ExecutionState.Identifier] = ExecutionState; | |
} | |
ExecutionState.ExecutionStatus = "Completed"; | |
_db[ExecutionState.Identifier] = ExecutionState; | |
Console.WriteLine($"Job completed. Status: {Status}"); | |
} | |
} | |
public class TaskEventArgs : EventArgs | |
{ | |
public string TaskType { get; set; } | |
public TaskEventArgs(string taskType) | |
{ | |
TaskType = taskType; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment