Created
October 9, 2024 11:01
-
-
Save bjorkstromm/b439e5588e7e31fbf000d4b2ba30c351 to your computer and use it in GitHub Desktop.
Distributed Lock using Lease Blob
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Azure; | |
using Azure.Storage.Blobs; | |
using Azure.Storage.Blobs.Specialized; | |
using Microsoft.Azure.Functions.Worker; | |
using Microsoft.Azure.Functions.Worker.Http; | |
using Microsoft.DurableTask; | |
using Microsoft.DurableTask.Client; | |
using Microsoft.Extensions.Configuration; | |
using Microsoft.Extensions.DependencyInjection; | |
using Microsoft.Extensions.Hosting; | |
using Microsoft.Extensions.Logging; | |
using Polly; | |
using System.Net; | |
var host = new HostBuilder() | |
.ConfigureFunctionsWorkerDefaults() | |
.ConfigureServices( services => | |
{ | |
services.AddTransient( provider => | |
{ | |
var config = provider.GetRequiredService<IConfiguration>(); | |
return new Sample.DistributedLock( new BlobServiceClient( config[ "AzureWebJobsStorage" ] ) ); | |
} ); | |
} ) | |
.Build(); | |
host.Run(); | |
namespace Sample | |
{ | |
public class HttpTrigger | |
{ | |
[Function( nameof( HttpTrigger ) )] | |
public async Task<HttpResponseData> Run( | |
[HttpTrigger( AuthorizationLevel.Function, "get", "post" )] HttpRequestData req, | |
[DurableClient] DurableTaskClient client ) | |
{ | |
var count = int.TryParse( req.Query.Get( "count" ), out var v ) ? v : 10; | |
for( var i = 0; i < count; i++ ) | |
{ | |
await client.ScheduleNewOrchestrationInstanceAsync( nameof( Orchestrator ) ); | |
} | |
return req.CreateResponse( HttpStatusCode.Accepted ); | |
} | |
} | |
public class Orchestrator | |
{ | |
[Function( nameof( Orchestrator ) )] | |
public async Task Run( [OrchestrationTrigger] TaskOrchestrationContext context ) | |
{ | |
var result = await context.CallActivityAsync<string>( | |
nameof( Activity ), | |
context.InstanceId ); | |
context | |
.CreateReplaySafeLogger( "SingletonEntity" ) | |
.LogWarning( "{Time} : {Result}", context.CurrentUtcDateTime, result ); | |
} | |
} | |
public class Activity( DistributedLock @lock ) | |
{ | |
[Function( nameof( Activity ) )] | |
public async Task<string> Run( [ActivityTrigger] string name ) | |
{ | |
await using( var handle = await @lock.Acquire( "foobar", CancellationToken.None ) ) | |
{ | |
// Do some critical stuff here. | |
await Task.Delay( 1000 ); | |
} | |
return $"{DateTime.UtcNow:O} - Hello {name}"; | |
} | |
} | |
public class DistributedLock( BlobServiceClient blobServiceClient ) | |
{ | |
public async Task<IAsyncDisposable> Acquire( string name, CancellationToken cancellationToken ) | |
{ | |
try | |
{ | |
var container = blobServiceClient.GetBlobContainerClient( "locks" ); | |
if( !await container.ExistsAsync( cancellationToken ) ) | |
{ | |
await container.CreateIfNotExistsAsync( cancellationToken: cancellationToken ); | |
} | |
// Unfortuneately UploadAsync doesn't expose ErrorOptions, so we need to wrap this using Polly. | |
// There might be a slight chance that the blob doesn't exist when we first check, | |
// but then exist and have a lock whwn we try to create. | |
var blob = await Policy | |
.Handle<RequestFailedException>() | |
.WaitAndRetryAsync( 5, i => TimeSpan.FromSeconds( Math.Pow( 2, i ) ) ) | |
.ExecuteAsync( async () => | |
{ | |
var blob = container.GetBlockBlobClient( $"{name}.lock" ); | |
if( !await blob.ExistsAsync( cancellationToken ) ) | |
{ | |
await blob.UploadAsync( Stream.Null, cancellationToken: cancellationToken ); | |
} | |
return blob; | |
} ); | |
var leaseBlob = blob.GetBlobLeaseClient(); | |
while( !cancellationToken.IsCancellationRequested ) | |
{ | |
var leaseResponse = await leaseBlob.AcquireAsync( | |
TimeSpan.FromSeconds( 60 ), | |
new RequestConditions(), | |
new RequestContext | |
{ | |
CancellationToken = cancellationToken, | |
ErrorOptions = ErrorOptions.NoThrow | |
} ); | |
// If we got the lease, return. | |
if( !leaseResponse.IsError ) | |
{ | |
break; | |
} | |
// If we didn't get the lock, then wait and retry | |
await Task.Delay( 300, cancellationToken ); | |
} | |
return new LockFile( leaseBlob ); | |
} | |
catch( RequestFailedException e ) | |
{ | |
throw new InvalidOperationException( $"Failed to acquire lock '{name}'", e ); | |
} | |
} | |
private class LockFile( BlobLeaseClient leaseClient ) : IAsyncDisposable | |
{ | |
public async ValueTask DisposeAsync() => await leaseClient.ReleaseAsync( cancellationToken: CancellationToken.None ); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment