Last active
October 18, 2024 11:32
-
-
Save bjorkstromm/0c362f2a3a1c231a960970feada06055 to your computer and use it in GitHub Desktop.
Distributed Durable Semaphore using Durable Entities
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"IsEncrypted": false, | |
"Values": { | |
"AzureWebJobsStorage": "UseDevelopmentStorage=true", | |
"FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated", | |
"OrchestratorSemaphore:Orchestrator:MaxConcurrent": "2", | |
"OrchestratorSemaphore:Orchestrator:MaxLeaseTime": "0.00:01:00" | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Microsoft.Extensions.Hosting; | |
// Builds the Functions worker host with the default worker configuration and runs it.
new HostBuilder()
    .ConfigureFunctionsWorkerDefaults()
    .Build()
    .Run();
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Microsoft.Azure.Functions.Worker; | |
using Microsoft.Azure.Functions.Worker.Http; | |
using Microsoft.DurableTask; | |
using Microsoft.DurableTask.Client; | |
using Microsoft.DurableTask.Entities; | |
using Microsoft.Extensions.Logging; | |
using System.Net; | |
using Microsoft.Extensions.Configuration; | |
using Microsoft.Extensions.DependencyInjection; | |
namespace SemaphoreTest; | |
/// <summary>
/// HTTP entry point that schedules new <see cref="Orchestrator"/> instances.
/// </summary>
public class Test
{
    /// <summary>
    /// Schedules the requested number of orchestrations and replies with 202 Accepted.
    /// </summary>
    /// <param name="req">
    /// Incoming request. The optional <c>count</c> query value is the number of instances
    /// to schedule; 1 is used when it is missing or not an integer.
    /// </param>
    /// <param name="client">Durable client used to schedule the orchestrations.</param>
    /// <returns>A response with status <see cref="HttpStatusCode.Accepted"/>.</returns>
    [Function("start")]
    public async Task<HttpResponseData> Run(
        [HttpTrigger(AuthorizationLevel.Function, "get", "post")] HttpRequestData req,
        [DurableClient] DurableTaskClient client)
    {
        if (!int.TryParse(req.Query.Get("count"), out var requested))
        {
            requested = 1;
        }

        // Each instance is scheduled and awaited before the next one is started.
        var scheduled = 0;
        while (scheduled < requested)
        {
            await client.ScheduleNewOrchestrationInstanceAsync(nameof(Orchestrator));
            scheduled++;
        }

        return req.CreateResponse(HttpStatusCode.Accepted);
    }
}
/// <summary>
/// Sample orchestration that runs its activities while holding a semaphore permit.
/// </summary>
public class Orchestrator
{
    /// <summary>
    /// Acquires a semaphore permit, calls <see cref="Activity"/> for three cities in order,
    /// waits on a five second timer, releases the permit and logs the collected results.
    /// </summary>
    /// <param name="context">Orchestration context supplied by the trigger.</param>
    /// <returns>The activity results, i.e. ["Hello Tokyo!", "Hello Seattle!", "Hello London!"].</returns>
    [Function("Orchestrator")]
    public async Task<List<string>> Run([OrchestrationTrigger] TaskOrchestrationContext context)
    {
        var log = context.CreateReplaySafeLogger<Orchestrator>();
        var greetings = new List<string>();

        // The permit is released when the using block is left.
        await using (await context.AcquireSemaphorePermitAsync())
        {
            foreach (var city in new[] { "Tokyo", "Seattle", "London" })
            {
                var greeting = await context.CallActivityAsync<string>(nameof(Activity), city);
                greetings.Add(greeting);
            }

            await context.CreateTimer(TimeSpan.FromSeconds(5), CancellationToken.None);
        }

        var joined = string.Join(", ", greetings);

        // ReSharper disable once TemplateIsNotCompileTimeConstantProblem
        log.LogInformation(
            "{Timestamp}: Hello from instance {Instance}, with data: {Data}",
            context.CurrentUtcDateTime,
            context.InstanceId,
            joined);

        return greetings;
    }

    /// <summary>
    /// Builds the greeting for a single input value.
    /// </summary>
    /// <param name="input">Value to greet.</param>
    /// <returns>A completed task holding <c>"Hello " + input + "!"</c>.</returns>
    [Function(nameof(Activity))]
    public static Task<string> Activity([ActivityTrigger] string input)
    {
        var greeting = $"Hello {input}!";
        return Task.FromResult(greeting);
    }
}
/// <summary>
/// State kept by the semaphore entity.
/// </summary>
public sealed class OrchestratorSemaphoreState
{
    /// <summary>Ids of the instances that currently hold a permit.</summary>
    public HashSet<string> Running { get; set; } = new();

    /// <summary>Ids of the instances queued for a permit, in arrival order.</summary>
    public Queue<string> Waiting { get; set; } = new();
}
/// <summary>
/// Limits applied by a semaphore entity.
/// </summary>
public sealed class OrchestratorSemaphoreSettings
{
    /// <summary>Maximum number of instances that may hold a permit at once. Defaults to 15.</summary>
    public int MaxConcurrent { get; init; } = 15;

    /// <summary>Time after which a granted permit is released automatically. Defaults to five minutes.</summary>
    public TimeSpan MaxLeaseTime { get; init; } = new TimeSpan(0, 5, 0);

    /// <summary>Shared instance carrying the default values.</summary>
    public static OrchestratorSemaphoreSettings Default { get; } = new OrchestratorSemaphoreSettings();
}
/// <summary>
/// Durable entity that limits how many orchestration instances hold a permit at the same time.
/// Instances over the limit are queued and are granted a permit, in arrival order, as permits
/// are released. Every granted permit also gets a delayed release signal so that it is given
/// back after <see cref="OrchestratorSemaphoreSettings.MaxLeaseTime"/> at the latest.
/// </summary>
/// <param name="configuration">Configuration the per-entity settings are read from.</param>
/// <param name="client">Durable client used to raise the "permit acquired" event on an instance.</param>
/// <param name="logger">Logger for acquire/release diagnostics.</param>
public sealed class OrchestratorSemaphore(
    IConfiguration configuration,
    DurableTaskClient client,
    ILogger<OrchestratorSemaphore> logger)
    : TaskEntity<OrchestratorSemaphoreState>
{
    /// <summary>
    /// Settings bound from the configuration section "{entity name}:{entity key}";
    /// <see cref="OrchestratorSemaphoreSettings.Default"/> when that section cannot be bound.
    /// </summary>
    private OrchestratorSemaphoreSettings Settings => configuration
        .GetSection(Context.Id.Name)
        .GetSection(Context.Id.Key)
        .Get<OrchestratorSemaphoreSettings>() ?? OrchestratorSemaphoreSettings.Default;

    /// <summary>Raises the event the acquiring orchestration is waiting for.</summary>
    private async Task RaiseAcquiredEvent(string instanceId) =>
        await client.RaiseEventAsync(instanceId, "SemaphorePermitAcquired");

    /// <summary>Signals this entity to run <see cref="ReleasePermit"/> for the instance after <paramref name="delay"/>.</summary>
    private void ScheduleRelease(string instanceId, TimeSpan delay) =>
        Context.SignalEntity(Context.Id, nameof(ReleasePermit), instanceId, new SignalEntityOptions
        {
            SignalTime = DateTimeOffset.UtcNow.Add(delay)
        });

    /// <summary>Marks the instance as running, schedules its automatic release and notifies it.</summary>
    private async Task GrantPermit(string instanceId, OrchestratorSemaphoreSettings settings)
    {
        State.Running.Add(instanceId);
        ScheduleRelease(instanceId, settings.MaxLeaseTime);
        await RaiseAcquiredEvent(instanceId);
    }

    /// <summary>
    /// Requests a permit for an orchestration instance. The instance is notified immediately when
    /// it already holds a permit or a slot is free; otherwise it is put on the waiting queue.
    /// </summary>
    /// <param name="instanceId">Id of the orchestration instance requesting the permit.</param>
    public async Task AcquirePermit(string instanceId)
    {
        logger.LogInformation("Acquiring permit for instance {InstanceId}", instanceId);
        var settings = Settings;

        // If the instance is already running, we can immediately acquire a permit from the semaphore
        if (State.Running.Contains(instanceId))
        {
            logger.LogInformation("Instance {InstanceId} is already running", instanceId);
            await RaiseAcquiredEvent(instanceId);
        }
        // If we have slots left, we can acquire the semaphore
        else if (State.Running.Count < settings.MaxConcurrent)
        {
            await GrantPermit(instanceId, settings);
            logger.LogInformation("Instance {InstanceId} acquired permit. Permit count is {Count}", instanceId, State.Running.Count);
        }
        // Else put on the waiting queue
        else
        {
            State.Waiting.Enqueue(instanceId);
            logger.LogInformation("Instance {InstanceId} is waiting. Permit count is {Count}. Wait list is {WaitCount}",
                instanceId,
                State.Running.Count,
                State.Waiting.Count);
        }
    }

    /// <summary>
    /// Releases the permit held by an orchestration instance and hands the freed capacity to
    /// waiting instances. Does nothing when the instance holds no permit.
    /// </summary>
    /// <param name="instanceId">Id of the orchestration instance whose permit is released.</param>
    public async Task ReleasePermit(string instanceId)
    {
        logger.LogInformation("Releasing permit for instance {InstanceId}", instanceId);
        var settings = Settings;

        // If the instance isn't running, just exit.
        // Unfortunately, it isn't possible to cancel a scheduled signal, so the automatic release
        // still arrives even though the permit has already been released manually (via disposal of the slot).
        // See issue: https://github.com/Azure/azure-functions-durable-extension/issues/1455
        if (!State.Running.Remove(instanceId))
        {
            logger.LogDebug("Instance {InstanceId} is not running", instanceId);
            return;
        }

        // Fill every free slot from the waiting queue. A loop (rather than a single dequeue) is
        // needed because more than one slot can be free: the configured limit may have been raised
        // since the instances were queued, or a dequeued id may already be in the running set.
        while (State.Running.Count < settings.MaxConcurrent
               && State.Waiting.TryDequeue(out var waitingInstanceId))
        {
            await GrantPermit(waitingInstanceId, settings);
            logger.LogInformation("Instance {InstanceId} acquired permit. Permit count is {Count}. Wait list is {WaitCount}",
                waitingInstanceId,
                State.Running.Count,
                State.Waiting.Count);
        }
    }

    /// <summary>
    /// Entity trigger: builds the entity from the function's services and dispatches the operation to it.
    /// </summary>
    /// <param name="dispatcher">Dispatcher for the incoming entity operation.</param>
    /// <param name="client">Durable client passed on to the entity.</param>
    /// <param name="context">Function context used to resolve configuration and the logger.</param>
    /// <returns>The task returned by the dispatcher.</returns>
    [Function(nameof(OrchestratorSemaphore))]
    public static Task Run(
        [EntityTrigger] TaskEntityDispatcher dispatcher,
        [DurableClient] DurableTaskClient client,
        FunctionContext context)
    {
        var configuration = context.InstanceServices.GetRequiredService<IConfiguration>();
        var logger = context.GetLogger<OrchestratorSemaphore>();
        var semaphore = new OrchestratorSemaphore(configuration, client, logger);
        return dispatcher.DispatchAsync(semaphore);
    }
}
/// <summary>
/// Orchestration-side helpers for acquiring and releasing permits of <see cref="OrchestratorSemaphore"/>.
/// </summary>
public static class SemaphoreExtensions
{
    /// <summary>
    /// Signals the semaphore entity to acquire a permit for the current instance and waits for
    /// the "SemaphorePermitAcquired" event.
    /// </summary>
    /// <param name="context">Context of the calling orchestration.</param>
    /// <param name="key">Entity key of the semaphore; the orchestration name is used when null.</param>
    /// <returns>A handle that signals the release of the permit when disposed.</returns>
    public static async Task<IAsyncDisposable> AcquireSemaphorePermitAsync(
        this TaskOrchestrationContext context,
        string? key = null)
    {
        const string acquireOperation = nameof(OrchestratorSemaphore.AcquirePermit);

        await context.SignalSemaphoreAsync(acquireOperation, key);
        await context.WaitForExternalEvent<object>("SemaphorePermitAcquired");

        IAsyncDisposable permit = new SemaphorePermit(context, key);
        return permit;
    }

    /// <summary>
    /// Signals the semaphore entity to release the permit of the current instance.
    /// </summary>
    /// <param name="context">Context of the calling orchestration.</param>
    /// <param name="key">Entity key of the semaphore; the orchestration name is used when null.</param>
    public static async Task ReleaseSemaphorePermitAsync(
        this TaskOrchestrationContext context,
        string? key = null)
        => await context.SignalSemaphoreAsync(nameof(OrchestratorSemaphore.ReleasePermit), key);

    /// <summary>
    /// Sends <paramref name="operationName"/> to the semaphore entity with the current instance id as input.
    /// </summary>
    private static async Task SignalSemaphoreAsync(
        this TaskOrchestrationContext context,
        string operationName,
        string? key = null)
    {
        if (key is null)
        {
            key = context.Name;
        }

        var semaphoreId = new EntityInstanceId(nameof(OrchestratorSemaphore), key);
        await context.Entities.SignalEntityAsync(semaphoreId, operationName, context.InstanceId);
    }

    /// <summary>
    /// Handle returned by <see cref="AcquireSemaphorePermitAsync"/>; disposing it signals the release.
    /// </summary>
    private class SemaphorePermit : IAsyncDisposable
    {
        private readonly TaskOrchestrationContext _context;
        private readonly string? _key;

        public SemaphorePermit(TaskOrchestrationContext context, string? key)
        {
            _context = context;
            _key = key;
        }

        public async ValueTask DisposeAsync()
        {
            await _context.ReleaseSemaphorePermitAsync(_key);
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment