Created
March 23, 2020 19:19
-
-
Save bryanknox/8a0db9305c11e0fe9a1195c08011ddd6 to your computer and use it in GitHub Desktop.
Use paging to process the status of many Azure Durable Functions orchestration instances.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Microsoft.Azure.WebJobs.Extensions.DurableTask; | |
namespace K0x.DurableFunctionsHelpers | |
{ | |
public static class DurableOrchestrationStatusProcessor
{
    /// <summary>
    /// Continuation token value that is treated as "there are no more pages".
    /// It is the Base64 encoding of the UTF-8 text <c>null</c>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): presumably some versions of the Durable Functions extension return
    /// this sentinel instead of a real <c>null</c> on the last page - confirm against the
    /// extension version in use.
    /// </remarks>
    private const string EndOfResultsContinuationToken = "bnVsbA==";

    /// <summary>
    /// Processes DurableOrchestrationStatus in PageSize batches by iterating through
    /// the following steps:
    /// 1) Find DurableOrchestrationStatus in the given client context based on the
    /// given condition.
    /// 2) Filter the statuses found using the given filter delegate.
    /// 3) Process the filtered statuses using the given processor delegate.
    /// </summary>
    /// <remarks>
    /// At least one page is always requested. Iteration stops when the query result
    /// carries no continuation token (null, empty, or the Base64 "null" sentinel), or
    /// when cancellation has been requested after a page has been processed; in the
    /// latter case the method returns without throwing.
    /// The <c>ContinuationToken</c> of the given <paramref name="condition"/> is
    /// overwritten by this method: it is reset to null before the first query and set
    /// to the token of the previous page before each following query.
    /// </remarks>
    /// <param name="client">
    /// The IDurableOrchestrationClient context to use to call the GetStatusAsync(..) method
    /// to perform the find.
    /// </param>
    /// <param name="condition">
    /// The query condition for the find. It is passed into the GetStatusAsync(..) method
    /// of the given client. Paging is used. Specify the PageSize in the condition
    /// to control how many statuses are returned in each iteration of the status query.
    /// Larger numbers require more memory and more data sent across the network. Smaller
    /// numbers require more calls to the GetStatusAsync(..), filter and process methods
    /// (more iterations).
    /// </param>
    /// <param name="filter">
    /// A delegate for filtering the DurableOrchestrationStatus returned from each iteration
    /// of the find. It accepts an <see cref="IEnumerable{T}"/> of DurableOrchestrationStatus
    /// and returns the filtered results as an <see cref="IEnumerable{T}"/> of
    /// DurableOrchestrationStatus.
    /// </param>
    /// <param name="process">
    /// A delegate that processes the filtered DurableOrchestrationStatus.
    /// It accepts an <see cref="IEnumerable{T}"/> of DurableOrchestrationStatus.
    /// </param>
    /// <param name="cancellationToken">
    /// The cancellation token that can be used to cancel the status find operation
    /// and iterations.
    /// </param>
    /// <returns>
    /// A task that completes when all pages have been processed or iteration has been
    /// stopped by cancellation.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="client"/>, <paramref name="condition"/>, <paramref name="filter"/>
    /// or <paramref name="process"/> is null.
    /// </exception>
    public static async Task ProcessPagedStatusAsync(
        IDurableOrchestrationClient client,
        OrchestrationStatusQueryCondition condition,
        Func<IEnumerable<DurableOrchestrationStatus>, IEnumerable<DurableOrchestrationStatus>> filter,
        Action<IEnumerable<DurableOrchestrationStatus>> process,
        CancellationToken cancellationToken)
    {
        if (client == null)
        {
            throw new ArgumentNullException(nameof(client));
        }

        if (condition == null)
        {
            throw new ArgumentNullException(nameof(condition));
        }

        if (filter == null)
        {
            throw new ArgumentNullException(nameof(filter));
        }

        if (process == null)
        {
            throw new ArgumentNullException(nameof(process));
        }

        string continuationToken = null;
        do
        {
            // Feed the previous page's token back into the query (null for the first page).
            condition.ContinuationToken = continuationToken;

            OrchestrationStatusQueryResult queryResult = await client.GetStatusAsync(
                condition,
                cancellationToken);

            IEnumerable<DurableOrchestrationStatus> foundInPage = filter(
                queryResult.DurableOrchestrationState);

            process(foundInPage);

            continuationToken = queryResult.ContinuationToken;
        }
        while (HasMorePages(continuationToken)
            && !cancellationToken.IsCancellationRequested);
    }

    /// <summary>
    /// Tells whether the given continuation token points at a further page of results.
    /// </summary>
    /// <param name="continuationToken">The token taken from the latest query result.</param>
    /// <returns>
    /// false when the token is null, empty, or the Base64 "null" sentinel; otherwise true.
    /// </returns>
    private static bool HasMorePages(string continuationToken)
    {
        // A null or empty token must end the loop: passing it back into the query would
        // repeat the query without a token and the loop would never terminate.
        return !string.IsNullOrEmpty(continuationToken)
            && !string.Equals(
                continuationToken,
                EndOfResultsContinuationToken,
                StringComparison.Ordinal);
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment