Skip to content

Instantly share code, notes, and snippets.

@bjorkstromm
Created October 9, 2024 11:01
Show Gist options
  • Save bjorkstromm/b439e5588e7e31fbf000d4b2ba30c351 to your computer and use it in GitHub Desktop.
Save bjorkstromm/b439e5588e7e31fbf000d4b2ba30c351 to your computer and use it in GitHub Desktop.
Distributed Lock using Lease Blob
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Specialized;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Polly;
using System.Net;
// Build the isolated-worker Functions host and register the distributed lock
// so that function classes can take it as a constructor dependency.
var host = new HostBuilder()
    .ConfigureFunctionsWorkerDefaults()
    .ConfigureServices( services =>
    {
        // Azure SDK clients are thread-safe and intended to be reused, and
        // DistributedLock holds nothing but that client, so a single shared
        // instance is registered instead of creating a client per resolution.
        services.AddSingleton( provider =>
        {
            var config = provider.GetRequiredService<IConfiguration>();
            var connectionString = config[ "AzureWebJobsStorage" ];

            // Fail with an explicit message rather than an opaque error from
            // the BlobServiceClient constructor when the setting is absent.
            if( string.IsNullOrWhiteSpace( connectionString ) )
            {
                throw new InvalidOperationException(
                    "The 'AzureWebJobsStorage' setting is missing or empty; it must contain a storage connection string." );
            }

            return new Sample.DistributedLock( new BlobServiceClient( connectionString ) );
        } );
    } )
    .Build();

// Blocks until the host shuts down.
host.Run();
namespace Sample
{
/// <summary>
/// HTTP-triggered function that starts a batch of <see cref="Orchestrator"/> instances.
/// </summary>
public class HttpTrigger
{
    /// <summary>
    /// Schedules one orchestration per unit of the <c>count</c> query parameter
    /// and then answers with 202 Accepted.
    /// </summary>
    /// <param name="req">Incoming GET or POST request; only its query string is read.</param>
    /// <param name="client">Durable Task client used to schedule the orchestrations.</param>
    /// <returns>An empty response with status <see cref="HttpStatusCode.Accepted"/>.</returns>
    [Function( nameof( HttpTrigger ) )]
    public async Task<HttpResponseData> Run(
        [HttpTrigger( AuthorizationLevel.Function, "get", "post" )] HttpRequestData req,
        [DurableClient] DurableTaskClient client )
    {
        // Use 10 when "count" is absent or is not a valid integer.
        if( !int.TryParse( req.Query.Get( "count" ), out var total ) )
        {
            total = 10;
        }

        // Instances are scheduled one after another; a zero or negative
        // count schedules nothing.
        for( var remaining = total; remaining > 0; remaining-- )
        {
            await client.ScheduleNewOrchestrationInstanceAsync( nameof( Orchestrator ) );
        }

        var response = req.CreateResponse( HttpStatusCode.Accepted );
        return response;
    }
}
/// <summary>
/// Orchestration that runs a single <see cref="Activity"/> and logs what it returned.
/// </summary>
public class Orchestrator
{
    /// <summary>
    /// Calls the activity with this orchestration's instance id as input, then
    /// writes the result to a replay-safe logger at warning level.
    /// </summary>
    /// <param name="context">Orchestration context supplied by the Durable Task runtime.</param>
    [Function( nameof( Orchestrator ) )]
    public async Task Run( [OrchestrationTrigger] TaskOrchestrationContext context )
    {
        var instanceId = context.InstanceId;
        var activityResult = await context.CallActivityAsync<string>( nameof( Activity ), instanceId );

        // The logger is obtained from the context via CreateReplaySafeLogger
        // rather than injected.
        var logger = context.CreateReplaySafeLogger( "SingletonEntity" );
        logger.LogWarning( "{Time} : {Result}", context.CurrentUtcDateTime, activityResult );
    }
}
/// <summary>
/// Activity that performs its work while holding the distributed lock named "foobar".
/// </summary>
public class Activity( DistributedLock @lock )
{
    /// <summary>
    /// Acquires the lock, simulates one second of critical work, releases the
    /// lock and returns a timestamped greeting.
    /// </summary>
    /// <param name="name">Activity input; echoed back in the greeting.</param>
    /// <returns>The current UTC time in round-trip format followed by " - Hello " and <paramref name="name"/>.</returns>
    [Function( nameof( Activity ) )]
    public async Task<string> Run( [ActivityTrigger] string name )
    {
        // No cancellation is wired through, so this waits for the lock indefinitely.
        var handle = await @lock.Acquire( "foobar", CancellationToken.None );
        try
        {
            // Do some critical stuff here.
            await Task.Delay( 1000 );
        }
        finally
        {
            // Disposing the handle releases the lock, even if the work above throws.
            await handle.DisposeAsync();
        }

        return $"{DateTime.UtcNow:O} - Hello {name}";
    }
}
/// <summary>
/// Distributed lock implemented with Azure Blob Storage leases. Each lock name
/// maps to a blob called <c>{name}.lock</c> in the <c>locks</c> container, and
/// holding the lock means holding a lease on that blob.
/// </summary>
/// <remarks>
/// The lease is requested for 60 seconds and is not renewed by this class.
/// NOTE(review): work that runs longer than the lease presumably loses its
/// protection once the lease expires - confirm against the storage lease semantics.
/// </remarks>
public class DistributedLock( BlobServiceClient blobServiceClient )
{
    private const string ContainerName = "locks";

    // HTTP status returned by the service when the blob is already leased.
    private const int LeaseConflictStatus = 409;

    private static readonly TimeSpan LeaseDuration = TimeSpan.FromSeconds( 60 );
    private static readonly TimeSpan ContentionRetryDelay = TimeSpan.FromMilliseconds( 300 );

    /// <summary>
    /// Waits until the lock called <paramref name="name"/> can be taken and returns
    /// a handle that releases it when disposed.
    /// </summary>
    /// <param name="name">Logical lock name; the backing blob is <c>{name}.lock</c>.</param>
    /// <param name="cancellationToken">Cancels the wait for the lock.</param>
    /// <returns>A handle whose <see cref="IAsyncDisposable.DisposeAsync"/> releases the lease.</returns>
    /// <exception cref="InvalidOperationException">
    /// A storage operation failed for a reason other than the lock being held by someone else.
    /// </exception>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancellationToken"/> was cancelled before the lock was obtained.
    /// </exception>
    public async Task<IAsyncDisposable> Acquire( string name, CancellationToken cancellationToken )
    {
        try
        {
            var container = blobServiceClient.GetBlobContainerClient( ContainerName );
            if( !await container.ExistsAsync( cancellationToken ) )
            {
                await container.CreateIfNotExistsAsync( cancellationToken: cancellationToken );
            }

            // Unfortunately UploadAsync doesn't expose ErrorOptions, so the creation is wrapped with Polly.
            // There is a slight chance that the blob doesn't exist when we first check,
            // but then exists and is already leased when we try to create it.
            // The token is handed to Polly so the back-off waits are cancellable too.
            var lockBlob = await Policy
                .Handle<RequestFailedException>()
                .WaitAndRetryAsync( 5, attempt => TimeSpan.FromSeconds( Math.Pow( 2, attempt ) ) )
                .ExecuteAsync( async ct =>
                {
                    var candidate = container.GetBlockBlobClient( $"{name}.lock" );
                    if( !await candidate.ExistsAsync( ct ) )
                    {
                        await candidate.UploadAsync( Stream.Null, cancellationToken: ct );
                    }
                    return candidate;
                }, cancellationToken );

            var leaseClient = lockBlob.GetBlobLeaseClient();
            while( true )
            {
                // Never hand out a handle for a lease that was not obtained:
                // cancellation must surface as an exception, not as a "successful" return.
                cancellationToken.ThrowIfCancellationRequested();

                // NoThrow lets contention be detected from the status code
                // instead of through an exception on every attempt.
                var leaseResponse = await leaseClient.AcquireAsync(
                    LeaseDuration,
                    new RequestConditions(),
                    new RequestContext
                    {
                        CancellationToken = cancellationToken,
                        ErrorOptions = ErrorOptions.NoThrow
                    } );

                // If we got the lease, return.
                if( !leaseResponse.IsError )
                {
                    return new LockFile( leaseClient );
                }

                // Only "already leased" is worth waiting for. Any other error
                // (missing blob, authorization failure, ...) would otherwise
                // be retried forever, so it is reported to the caller.
                if( leaseResponse.Status != LeaseConflictStatus )
                {
                    throw new RequestFailedException(
                        leaseResponse.Status,
                        $"Lease request for '{name}' failed with status {leaseResponse.Status} ({leaseResponse.ReasonPhrase})." );
                }

                // Someone else holds the lock; wait and retry.
                await Task.Delay( ContentionRetryDelay, cancellationToken );
            }
        }
        catch( RequestFailedException e )
        {
            throw new InvalidOperationException( $"Failed to acquire lock '{name}'", e );
        }
    }

    /// <summary>
    /// Handle for a held lock; disposing it releases the blob lease.
    /// </summary>
    private class LockFile( BlobLeaseClient leaseClient ) : IAsyncDisposable
    {
        // Release is not cancellable; an exception from ReleaseAsync is not caught here.
        public async ValueTask DisposeAsync() => await leaseClient.ReleaseAsync( cancellationToken: CancellationToken.None );
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment