Last active
June 6, 2019 12:46
-
-
Save zplume/f1a11d584a9d526dee5f6887528ccbeb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Microsoft.Azure.WebJobs; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Threading.Tasks; | |
namespace LS.DurableFunctions.Examples | |
{ | |
public static class DurableOrchestrationContextExtensions | |
{ | |
/// <summary> | |
/// Call activity functions in parallel with a maximum degree of parallelism (e.g. to prevent overloading other services). | |
/// Adapted from f2bo's code here: https://github.com/Azure/azure-functions-durable-extension/issues/596#issuecomment-459906400 | |
/// </summary> | |
/// <typeparam name="TResult">The type returned from each instance of the activity function.</typeparam> | |
/// <typeparam name="TInput">The type of input provided to each instance of the activity function.</typeparam> | |
/// <param name="context">The DurableOrchestrationContext instance this method is attached to.</param> | |
/// <param name="functionName">The activity function name.</param> | |
/// <param name="inputs">The sequence of inputs to map to activity function instances.</param> | |
/// <param name="degreeOfParallelism">The maximum number of activities to execute in parallel.</param> | |
public async static Task<IEnumerable<TResult>> CallActivitiesAsync<TResult, TInput>( | |
this DurableOrchestrationContext context, | |
string functionName, | |
IEnumerable<TInput> inputs, | |
int degreeOfParallelism) | |
{ | |
var runningActivities = new List<Task<TResult>>(inputs.Count()); | |
foreach (var input in inputs) | |
{ | |
var pendingOperations = runningActivities.Where(p => !p.IsCompleted); | |
if (pendingOperations.Count() >= degreeOfParallelism) | |
{ | |
await Task.WhenAny(pendingOperations); | |
} | |
Task<TResult> result = context.CallActivityAsync<TResult>(functionName, input); | |
runningActivities.Add(result); | |
} | |
TResult[] results = await Task.WhenAll(runningActivities); | |
return results; | |
} | |
/// <summary> | |
/// Call activity functions in parallel with a maximum degree of parallelism (e.g. to prevent overloading other services). | |
/// Adapted from f2bo's code here: https://github.com/Azure/azure-functions-durable-extension/issues/596#issuecomment-459906400 | |
/// </summary> | |
/// <typeparam name="TInput">The type of input provided to each instance of the activity function.</typeparam> | |
/// <param name="context">The DurableOrchestrationContext instance this method is attached to.</param> | |
/// <param name="functionName">The activity function name.</param> | |
/// <param name="inputs">The sequence of inputs to map to activity function instances.</param> | |
/// <param name="degreeOfParallelism">The maximum number of activities to execute in parallel.</param> | |
public async static Task CallActivitiesAsync<TInput>( | |
this DurableOrchestrationContext context, | |
string functionName, | |
IEnumerable<TInput> inputs, | |
int degreeOfParallelism) | |
{ | |
var runningActivities = new List<Task>(inputs.Count()); | |
foreach (var input in inputs) | |
{ | |
var pendingOperations = runningActivities.Where(p => !p.IsCompleted); | |
if (pendingOperations.Count() >= degreeOfParallelism) | |
{ | |
await Task.WhenAny(pendingOperations); | |
} | |
Task task = context.CallActivityAsync(functionName, input); | |
runningActivities.Add(task); | |
} | |
await Task.WhenAll(runningActivities); | |
} | |
/// <summary> | |
/// Call sub-orchestration functions in parallel with a maximum degree of parallelism (e.g. to prevent overloading other services). | |
/// Adapted from f2bo's code here: https://github.com/Azure/azure-functions-durable-extension/issues/596#issuecomment-459906400 | |
/// </summary> | |
/// <typeparam name="TResult">The type returned from each instance of the sub-orchestrator function.</typeparam> | |
/// <typeparam name="TInput">The type of input provided to each instance of the sub-orchestrator function.</typeparam> | |
/// <param name="context">The DurableOrchestrationContext instance this method is attached to.</param> | |
/// <param name="functionName">The sub-orchestration function name.</param> | |
/// <param name="inputs">The sequence of inputs to map to sub-orchestration instances.</param> | |
/// <param name="degreeOfParallelism">The maximum number of sub-orchestrations to execute in parallel.</param> | |
public async static Task<IEnumerable<TResult>> CallSubOrchestratorsAsync<TResult, TInput>( | |
this DurableOrchestrationContext context, | |
string functionName, | |
IEnumerable<TInput> inputs, | |
int degreeOfParallelism) | |
{ | |
var runningSubOrchestrations = new List<Task<TResult>>(inputs.Count()); | |
foreach (var input in inputs) | |
{ | |
var pendingOperations = runningSubOrchestrations.Where(p => !p.IsCompleted); | |
if (pendingOperations.Count() >= degreeOfParallelism) | |
{ | |
await Task.WhenAny(pendingOperations); | |
} | |
Task<TResult> result = context.CallSubOrchestratorAsync<TResult>(functionName, input); | |
runningSubOrchestrations.Add(result); | |
} | |
TResult[] results = await Task.WhenAll(runningSubOrchestrations); | |
return results; | |
} | |
/// <summary> | |
/// Call sub-orchestration functions in parallel with a maximum degree of parallelism (e.g. to prevent overloading other services). | |
/// Adapted from f2bo's code here: https://github.com/Azure/azure-functions-durable-extension/issues/596#issuecomment-459906400 | |
/// </summary> | |
/// <typeparam name="TInput">The type of input provided to each instance of the sub-orchestrator function.</typeparam> | |
/// <param name="context">The DurableOrchestrationContext instance this method is attached to.</param> | |
/// <param name="functionName">The sub-orchestration function name.</param> | |
/// <param name="inputs">The sequence of inputs to map to sub-orchestration instances.</param> | |
/// <param name="degreeOfParallelism">The maximum number of sub-orchestrations to execute in parallel.</param> | |
public async static Task CallSubOrchestratorsAsync<TInput>( | |
this DurableOrchestrationContext context, | |
string functionName, | |
IEnumerable<TInput> inputs, | |
int degreeOfParallelism) | |
{ | |
var runningSubOrchestrations = new List<Task>(inputs.Count()); | |
foreach (var input in inputs) | |
{ | |
var pendingOperations = runningSubOrchestrations.Where(p => !p.IsCompleted); | |
if (pendingOperations.Count() >= degreeOfParallelism) | |
{ | |
await Task.WhenAny(pendingOperations); | |
} | |
Task result = context.CallSubOrchestratorAsync(functionName, input); | |
runningSubOrchestrations.Add(result); | |
} | |
await Task.WhenAll(runningSubOrchestrations); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment