Created
December 20, 2020 00:53
-
-
Save pksorensen/410b3b4b3bdc44d1b1979c7688922c2e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public abstract class DurableStateMachineState<TStats> | |
where TStats : Enum | |
{ | |
public TStats OrchestratorState { get; set; } | |
public TimeSpan? DelayExecution { get; set; } | |
public DateTime StartTime { get; set; } | |
public int Depth { get; set; } = 0; | |
public string InstanceId { get; set; } | |
public Guid InvocationId { get; set; } | |
} | |
public class SubOrchestratrationResult<T> | |
{ | |
public T Value { get; set; } | |
public Exception Fault { get; set; } | |
} | |
public class DurableStateMachineAttribute : Attribute | |
{ | |
public string statename { get; } | |
public bool ContinueWhenStateNotChanged { get; set; } | |
public string TaskHub { get; set; } | |
public DurableStateMachineAttribute(string statename) | |
{ | |
this.statename = statename; | |
} | |
public DurableStateMachineAttribute() | |
{ | |
} | |
} | |
public abstract class DurableStateMachine<TState, TStates> | |
where TStates : Enum | |
where TState : DurableStateMachineState<TStates> | |
{ | |
private readonly string getInstanceInfoActivityName; | |
public DurableStateMachine(string GetInstanceInfoActivityName) | |
{ | |
getInstanceInfoActivityName = GetInstanceInfoActivityName; | |
} | |
public virtual string GetActivityName(TStates state) | |
{ | |
var method = this.GetType().GetMethods() | |
.FirstOrDefault(m => string.Equals(m.GetCustomAttribute<DurableStateMachineAttribute>()?.statename, state.ToString(), StringComparison.OrdinalIgnoreCase)); | |
var name = method?.GetCustomAttribute<FunctionNameAttribute>()?.Name; | |
return name; | |
} | |
public virtual bool ContinueWhenStateNotChanged(TStates state) | |
{ | |
var method = this.GetType().GetMethods() | |
.FirstOrDefault(m => string.Equals(m.GetCustomAttribute<DurableStateMachineAttribute>()?.statename, state.ToString(), StringComparison.OrdinalIgnoreCase)); | |
var cancontinue = method?.GetCustomAttribute<DurableStateMachineAttribute>()?.ContinueWhenStateNotChanged ??false; | |
return cancontinue; | |
} | |
public virtual bool IsInCompletedState(TStates state) | |
{ | |
var method = this.GetType().GetMethods() | |
.FirstOrDefault(m => string.Equals(m.GetCustomAttribute<DurableStateMachineAttribute>()?.statename, state.ToString(), StringComparison.OrdinalIgnoreCase)); | |
var name = method?.GetCustomAttribute<FunctionNameAttribute>()?.Name; | |
return string.IsNullOrEmpty(name); | |
} | |
public virtual async Task<TState> RunOrchestrationAsync(Microsoft.Azure.WebJobs.ExecutionContext executionContext, IDurableOrchestrationContext context, ILogger log) | |
{ | |
var state = context.GetInput<TState>(); | |
state.InstanceId = context.InstanceId; | |
state.InvocationId = executionContext.InvocationId; | |
if (state.Depth == 0) | |
state.StartTime = context.CurrentUtcDateTime; | |
if (!context.IsReplaying) | |
log.LogInformation("Running DurableStateMachine Orchestrator for {InstanceId} with {State}", context.InstanceId, state.OrchestratorState.ToString()); | |
if (state.DelayExecution.HasValue) | |
{ | |
await context.CreateTimer(context.CurrentUtcDateTime.Add(state.DelayExecution.Value),CancellationToken.None); | |
} | |
var oldState = state.OrchestratorState; | |
await OnBeforeActivityCallAsync(context, state, log); | |
var status = await context.CallActivityWithRetryAsync<DurableOrchestrationStatus[]>( | |
getInstanceInfoActivityName, | |
new RetryOptions(TimeSpan.FromSeconds(30), Debugger.IsAttached ? 1 : 3), | |
new OrchestrationStatusQueryCondition | |
{ | |
InstanceIdPrefix = context.InstanceId | |
}); | |
// var childs = status.Select(c => new { c.InstanceId, RuntimeStatus = c.RuntimeStatus.ToString(), c.LastUpdatedTime }).ToArray(); | |
context.SetCustomStatus(new | |
{ | |
depth = state.Depth, | |
duration = context.CurrentUtcDateTime - state.StartTime, | |
childs = status.GroupBy(k => k.RuntimeStatus).ToDictionary(k => k.Key, v => v.Count()), | |
state = oldState.ToString(), | |
processing = true, | |
}); | |
while (true) | |
{ | |
var name = GetActivityName(state.OrchestratorState); | |
log.LogInformation("DurableStateMachine Orchestrator calling {ActivityName} for {InstanceId} with {@State}", name, context.InstanceId, state); | |
state = await context.CallActivityWithRetryAsync<TState>( | |
name, | |
new RetryOptions(TimeSpan.FromSeconds(30), Debugger.IsAttached ? 1 : 3) | |
{ | |
Handle = (ex) => | |
{ | |
log.LogWarning("Retrying Activity for {activityName}: {message}", name, ex.Message); | |
return true; | |
} | |
}, | |
state); | |
var newName = GetActivityName(state.OrchestratorState); | |
if(name != newName || !ContinueWhenStateNotChanged(state.OrchestratorState)) | |
{ | |
break; | |
} | |
} | |
var isCompleted = IsInCompletedState(state.OrchestratorState); | |
context.SetCustomStatus(new | |
{ | |
depth = state.Depth, | |
duration = context.CurrentUtcDateTime - state.StartTime, | |
childs = status.GroupBy(k => k.RuntimeStatus).ToDictionary(k => k.Key, v => v.Count()), | |
state = oldState.ToString(), | |
newState = state.OrchestratorState.ToString(), | |
isCompleted, | |
}); | |
await context.CallActivityWithRetryAsync<DurableOrchestrationStatus[]>( | |
"PersistHistoryInformation", | |
new RetryOptions(TimeSpan.FromSeconds(30), Debugger.IsAttached ? 1 : 3), | |
new InstanceInfoRequest | |
{ | |
InstanceId = context.InstanceId, | |
TaskHub = this.GetType().GetCustomAttribute<DurableStateMachineAttribute>().TaskHub, | |
InvocationId = state.InvocationId, | |
State = state.OrchestratorState.ToString() | |
}); | |
if (!isCompleted) | |
{ | |
state.Depth = Convert.ToInt32(oldState) == Convert.ToInt32(state.OrchestratorState) ? state.Depth + 1 : 0; | |
context.ContinueAsNew(state); | |
log.LogInformation("DurableStateMachine Orchestrator countinuing as new for {InstanceId} with {@State}", context.InstanceId, state); | |
} | |
return state; | |
} | |
protected virtual Task OnBeforeActivityCallAsync(IDurableOrchestrationContext context, TState state, ILogger log) | |
{ | |
return Task.CompletedTask; | |
} | |
protected async virtual Task<SubOrchestratrationResult<T>[]> RunSubOrchestratorsAsync<T>(IDurableOrchestrationContext context, TState state, Task<T>[] tasks) where T : class | |
{ | |
// var tasks = GetSubOrchestrators(context, state); | |
var queue = new List<Task<T>>(tasks); | |
while (queue.Any()) | |
{ | |
context.SetCustomStatus(new | |
{ | |
depth = state.Depth, | |
duration = context.CurrentUtcDateTime - state.StartTime, | |
state = state.OrchestratorState, | |
suborchestrating = true, | |
total = tasks.Length, | |
done = tasks.Length - queue.Count | |
}); | |
var any = await Task.WhenAny(queue); | |
queue.Remove(any); | |
} | |
if (tasks.Any(k => !k.IsCompletedSuccessfully)) | |
{ | |
throw new AggregateException("Failed to Run SubOrchestratorsAsync", tasks.Where(t => !t.IsCompletedSuccessfully).Select(k => k.Exception)); | |
} | |
return tasks.Select(t => new SubOrchestratrationResult<T> | |
{ | |
Value = t.IsCompletedSuccessfully ? t.Result : null as T, | |
Fault = t.Exception | |
}).ToArray(); | |
// return await Task.WhenAll(tasks); | |
} | |
} | |
public class InstanceInfoRequest | |
{ | |
public string InstanceId { get; set; } | |
public string TaskHub { get; set; } | |
public string State { get; set; } | |
public Guid InvocationId { get; set; } | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment