Created
March 26, 2025 23:28
-
-
Save StephenCleary/9777f2dcb9834a05efb261cc36839b3c to your computer and use it in GitHub Desktop.
Distributed lease implementations in C#
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Microsoft.Azure.Cosmos; | |
using Newtonsoft.Json; | |
using System.Diagnostics; | |
using System.Net; | |
using Nito.Logging; | |
/// <summary>
/// Distributed lease provider backed by an Azure Cosmos DB container.
/// </summary>
/// <remarks>
/// <para>The container should have TTL enabled with a default TTL of -1.</para>
/// <para>
/// The container must be partitioned on <c>/leaseName</c>. Each lease is stored as a single item with the
/// fixed id <c>"lease"</c> inside the logical partition named after the lease, so creating the item fails
/// with a conflict while another holder's item still exists.
/// </para>
/// <para>
/// Expiry is delegated to the item's <c>ttl</c>; the item's ETag is used as the fencing token for
/// extending and releasing the lease.
/// </para>
/// </remarks>
/// <param name="logger">Logger that receives acquire/extend/release outcomes.</param>
/// <param name="container">Cosmos DB container that stores the lease items.</param>
/// <param name="owner">Owner id written into lease items; a random GUID is generated when omitted.</param>
public sealed class CosmosDbDistributedLeaseProvider(ILogger logger, Container container, string? owner = null)
    : IDistributedLeaseProvider
{
    private readonly ILogger _logger = logger;
    private readonly Container _container = container;
    private readonly string _owner = owner ?? Guid.NewGuid().ToString("N");

    /// <summary>
    /// Tries to acquire the lease by creating its item.
    /// </summary>
    /// <param name="leaseName">Name of the lease; also the partition key value of the lease item.</param>
    /// <param name="leaseTimeout">Lease duration, or <see cref="Timeout.InfiniteTimeSpan"/> for no expiry.</param>
    /// <returns>The acquired lease, or <c>null</c> when the lease item already exists.</returns>
    public async ValueTask<IDistributedLease?> TryAcquireLeaseAsync(string leaseName, TimeSpan leaseTimeout)
    {
        using var _ = _logger.BeginDataScope(("LeaseName", leaseName), ("LeaseOwner", _owner));
        try
        {
            var response = await _container.CreateItemAsync(
                NewLeaseItem(leaseName, leaseTimeout),
                new PartitionKey(leaseName),
                new ItemRequestOptions
                {
                    EnableContentResponseOnWrite = false,
                });
            if (response.StatusCode == HttpStatusCode.Created)
            {
                _logger.LogInformation("Acquired lease with timeout {leaseTimeout}.", leaseTimeout);
                return new DistributedLease(this, leaseName, response.Headers.ETag);
            }
        }
        catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.Conflict)
        {
            // The SDK reports "item already exists" by throwing; that is the normal
            // "someone else holds the lease" outcome, not an error.
        }

        _logger.LogInformation("Failed to acquire lease.");
        return null;
    }

    /// <summary>
    /// Tries to replace the lease item with a new TTL, provided its ETag still matches.
    /// </summary>
    /// <returns>The new ETag, or <c>null</c> when the item changed or no longer exists.</returns>
    private async ValueTask<string?> TryExtendLeaseAsync(string leaseName, string eTag, TimeSpan leaseTimeout)
    {
        using var _ = _logger.BeginDataScope(("LeaseName", leaseName), ("LeaseOwner", _owner));
        try
        {
            var response = await _container.ReplaceItemAsync(
                NewLeaseItem(leaseName, leaseTimeout),
                LeaseItem.LeaseId,
                new PartitionKey(leaseName),
                new ItemRequestOptions
                {
                    IfMatchEtag = eTag,
                    EnableContentResponseOnWrite = false,
                });
            if (response.StatusCode == HttpStatusCode.OK)
            {
                _logger.LogInformation("Extended lease to {leaseTimeout}.", leaseTimeout);
                return response.Headers.ETag;
            }
        }
        catch (CosmosException ex) when (ex.StatusCode is HttpStatusCode.PreconditionFailed or HttpStatusCode.NotFound)
        {
            // 412: the ETag no longer matches (the item was rewritten).
            // 404: the item is gone (expired by TTL or deleted). Either way the lease is lost.
        }

        _logger.LogInformation("Failed to extend lease.");
        return null;
    }

    /// <summary>
    /// Deletes the lease item, provided its ETag still matches. A lost lease is logged, not thrown.
    /// </summary>
    private async ValueTask ReleaseLeaseAsync(string leaseName, string eTag)
    {
        using var _ = _logger.BeginDataScope(("LeaseName", leaseName), ("LeaseOwner", _owner));
        try
        {
            var response = await _container.DeleteItemAsync<LeaseItem>(
                LeaseItem.LeaseId,
                new PartitionKey(leaseName),
                new ItemRequestOptions
                {
                    IfMatchEtag = eTag,
                    EnableContentResponseOnWrite = false,
                });
            if (response.StatusCode == HttpStatusCode.NoContent)
            {
                _logger.LogInformation("Released lease.");
                return;
            }
        }
        catch (CosmosException ex) when (ex.StatusCode is HttpStatusCode.PreconditionFailed or HttpStatusCode.NotFound)
        {
            // The lease was already lost (rewritten or expired); nothing left to release.
        }

        _logger.LogInformation("Failed to release lease.");
    }

    /// <summary>Builds the item body written on acquire and on extend.</summary>
    private LeaseItem NewLeaseItem(string leaseName, TimeSpan leaseTimeout) => new()
    {
        LeaseName = leaseName,
        Ttl = CosmosDbTtl(leaseTimeout),
        Owner = _owner,
    };

    /// <summary>
    /// Converts a timeout to a Cosmos DB item TTL in whole seconds: -1 for
    /// <see cref="Timeout.InfiniteTimeSpan"/>, otherwise clamped to the range [1, <see cref="int.MaxValue"/>].
    /// </summary>
    private static int CosmosDbTtl(TimeSpan timespan)
    {
        if (timespan == Timeout.InfiniteTimeSpan)
        {
            return -1;
        }

        // Clamp before casting so very long timeouts cannot overflow the int conversion.
        return (int)Math.Clamp(timespan.TotalSeconds, 1d, int.MaxValue);
    }

    /// <summary>Handle for an acquired lease; carries the ETag of the most recent write.</summary>
    private sealed class DistributedLease(CosmosDbDistributedLeaseProvider provider, string leaseName, string eTag)
        : IDistributedLease
    {
        private readonly CosmosDbDistributedLeaseProvider _provider = provider;
        private readonly string _leaseName = leaseName;
        private string _eTag = eTag;

        public async ValueTask<bool> TryExtendLeaseAsync(TimeSpan newLeaseTimeout)
        {
            var newETag = await _provider.TryExtendLeaseAsync(_leaseName, _eTag, newLeaseTimeout);
            if (newETag is null)
            {
                return false;
            }

            _eTag = newETag;
            return true;
        }

        public ValueTask DisposeAsync() => _provider.ReleaseLeaseAsync(_leaseName, _eTag);
    }

    /// <summary>JSON shape of the stored lease item.</summary>
    private sealed class LeaseItem
    {
        public const string LeaseId = "lease";

        [JsonProperty("id"), DebuggerBrowsable(DebuggerBrowsableState.Never)]
        public string Id { get; set; } = LeaseId;

        // Must hold the same value as the PartitionKey passed with the request,
        // otherwise Cosmos DB rejects the write as a partition key mismatch.
        [JsonProperty("leaseName")]
        public string LeaseName { get; set; } = null!;

        [JsonProperty("ttl")]
        public int Ttl { get; set; }

        [JsonProperty("_etag")]
        public string ETag { get; set; } = null!;

        public string Owner { get; set; } = null!;
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Amazon.DynamoDBv2.Model; | |
using Amazon.DynamoDBv2; | |
using Nito.Logging; | |
/// <summary>
/// Distributed lease provider backed by an Amazon DynamoDB table.
/// </summary>
/// <remarks>
/// <para>
/// The table should have a primary (partition) key "id" (string), and TTL enabled with an attribute name "ttl".
/// </para>
/// <para>
/// Each lease is one item holding <c>id</c>, <c>owner</c>, <c>version</c> and (unless the lease is infinite)
/// <c>ttl</c> as a Unix timestamp in seconds. Acquisition treats an item whose <c>ttl</c> is in the past as
/// free, so it does not depend on the item having been physically removed yet.
/// </para>
/// </remarks>
/// <param name="logger">Logger that receives acquire/extend/release outcomes.</param>
/// <param name="dynamoDb">DynamoDB client used for all requests.</param>
/// <param name="tableName">Name of the table that stores the lease items.</param>
/// <param name="owner">Owner id written into lease items; a random GUID is generated when omitted.</param>
public sealed class DynamoDbDistributedLeaseProvider(ILogger logger, IAmazonDynamoDB dynamoDb, string tableName, string? owner = null)
    : IDistributedLeaseProvider
{
    // "ttl" and "owner" are DynamoDB reserved words, so expressions must refer to attributes through
    // these placeholders (mapped via ExpressionAttributeNames) rather than by their raw names.
    private const string TtlName = "#ttl";
    private const string OwnerName = "#owner";
    private const string VersionName = "#version";

    private readonly ILogger _logger = logger;
    private readonly IAmazonDynamoDB _dynamoDb = dynamoDb;
    private readonly string _tableName = tableName;
    private readonly string _owner = owner ?? Guid.NewGuid().ToString("N");

    /// <summary>
    /// Tries to acquire the lease by writing its item, provided no unexpired item exists.
    /// </summary>
    /// <param name="leaseName">Name of the lease; used as the item's <c>id</c>.</param>
    /// <param name="leaseTimeout">Lease duration, or <see cref="Timeout.InfiniteTimeSpan"/> for no expiry.</param>
    /// <returns>The acquired lease, or <c>null</c> when another unexpired lease item exists.</returns>
    public async ValueTask<IDistributedLease?> TryAcquireLeaseAsync(string leaseName, TimeSpan leaseTimeout)
    {
        using var _ = _logger.BeginDataScope(("LeaseName", leaseName), ("LeaseOwner", _owner));
        var utcNow = DateTimeOffset.UtcNow;

        var item = new Dictionary<string, AttributeValue>
        {
            { "id", new AttributeValue { S = leaseName } },
            { "owner", new AttributeValue { S = _owner } },
            { "version", Number(0) },
        };
        if (leaseTimeout != Timeout.InfiniteTimeSpan)
        {
            // An infinite lease has no ttl attribute at all: "#ttl < :now" is then never true,
            // so nobody else can take it over until it is released.
            item.Add("ttl", Number(utcNow.Add(leaseTimeout).ToUnixTimeSeconds()));
        }

        var request = new PutItemRequest
        {
            TableName = _tableName,
            Item = item,
            ConditionExpression = $"attribute_not_exists(id) OR {TtlName} < :now",
            ExpressionAttributeNames = new Dictionary<string, string>
            {
                { TtlName, "ttl" },
            },
            ExpressionAttributeValues = new Dictionary<string, AttributeValue>
            {
                { ":now", Number(utcNow.ToUnixTimeSeconds()) },
            },
        };

        try
        {
            await _dynamoDb.PutItemAsync(request);
            _logger.LogInformation("Acquired lease with timeout {leaseTimeout}.", leaseTimeout);
            return new DistributedLease(this, leaseName);
        }
        catch (ConditionalCheckFailedException)
        {
            _logger.LogInformation("Failed to acquire lease.");
            return null;
        }
    }

    /// <summary>
    /// Tries to move the lease expiry forward, provided the item is still owned by this provider
    /// and still at <paramref name="version"/>.
    /// </summary>
    /// <param name="leaseName">Name of the lease to extend.</param>
    /// <param name="version">Version the caller last observed for the lease item.</param>
    /// <param name="leaseTimeout">New duration from now, or <see cref="Timeout.InfiniteTimeSpan"/> for no expiry.</param>
    /// <returns>The new version on success, or <c>null</c> when the condition check failed.</returns>
    public async ValueTask<uint?> TryExtendLeaseAsync(string leaseName, uint version, TimeSpan leaseTimeout)
    {
        using var _ = _logger.BeginDataScope(("LeaseName", leaseName), ("LeaseOwner", _owner));
        var newVersion = version + 1;

        var values = new Dictionary<string, AttributeValue>
        {
            { ":newVersion", Number(newVersion) },
            { ":version", Number(version) },
            { ":owner", new AttributeValue { S = _owner } },
        };

        string updateExpression;
        if (leaseTimeout == Timeout.InfiniteTimeSpan)
        {
            updateExpression = $"SET {VersionName} = :newVersion REMOVE {TtlName}";
        }
        else
        {
            // Only add :newTtl when it is referenced; DynamoDB rejects unused expression values.
            values.Add(":newTtl", Number(DateTimeOffset.UtcNow.Add(leaseTimeout).ToUnixTimeSeconds()));
            updateExpression = $"SET {TtlName} = :newTtl, {VersionName} = :newVersion";
        }

        var request = new UpdateItemRequest
        {
            TableName = _tableName,
            Key = KeyFor(leaseName),
            UpdateExpression = updateExpression,
            // The owner check stops a stale holder from extending a lease that expired
            // and was re-acquired by someone else, which would again be at version 0.
            ConditionExpression = $"{OwnerName} = :owner AND {VersionName} = :version",
            ExpressionAttributeNames = new Dictionary<string, string>
            {
                { TtlName, "ttl" },
                { OwnerName, "owner" },
                { VersionName, "version" },
            },
            ExpressionAttributeValues = values,
        };

        try
        {
            await _dynamoDb.UpdateItemAsync(request);
            _logger.LogInformation("Extended lease to {leaseTimeout}.", leaseTimeout);
            return newVersion;
        }
        catch (ConditionalCheckFailedException)
        {
            _logger.LogInformation("Failed to extend lease.");
            return null;
        }
    }

    /// <summary>
    /// Deletes the lease item, provided it is still owned by this provider and still at
    /// <paramref name="version"/>. A lost lease is logged, not thrown.
    /// </summary>
    private async ValueTask ReleaseLeaseAsync(string leaseName, uint version)
    {
        using var _ = _logger.BeginDataScope(("LeaseName", leaseName), ("LeaseOwner", _owner));
        var request = new DeleteItemRequest
        {
            TableName = _tableName,
            Key = KeyFor(leaseName),
            ConditionExpression = $"{OwnerName} = :owner AND {VersionName} = :version",
            ExpressionAttributeNames = new Dictionary<string, string>
            {
                { OwnerName, "owner" },
                { VersionName, "version" },
            },
            ExpressionAttributeValues = new Dictionary<string, AttributeValue>
            {
                { ":owner", new AttributeValue { S = _owner } },
                { ":version", Number(version) },
            },
        };

        try
        {
            await _dynamoDb.DeleteItemAsync(request);
            _logger.LogInformation("Released lease.");
        }
        catch (ConditionalCheckFailedException)
        {
            _logger.LogInformation("Failed to release lease.");
        }
    }

    /// <summary>Builds the primary key of the lease item.</summary>
    private static Dictionary<string, AttributeValue> KeyFor(string leaseName) => new()
    {
        { "id", new AttributeValue { S = leaseName } },
    };

    /// <summary>Builds a DynamoDB number attribute, formatted independently of the current culture.</summary>
    private static AttributeValue Number(long value) =>
        new() { N = value.ToString(System.Globalization.CultureInfo.InvariantCulture) };

    /// <summary>Handle for an acquired lease; tracks the item version of the most recent write.</summary>
    private sealed class DistributedLease(DynamoDbDistributedLeaseProvider provider, string leaseName) : IDistributedLease
    {
        private readonly DynamoDbDistributedLeaseProvider _provider = provider;
        private readonly string _leaseName = leaseName;
        private uint _version; // a freshly acquired lease item is written with version 0

        public ValueTask DisposeAsync() => _provider.ReleaseLeaseAsync(_leaseName, _version);

        public async ValueTask<bool> TryExtendLeaseAsync(TimeSpan newLeaseTimeout)
        {
            var newVersion = await _provider.TryExtendLeaseAsync(_leaseName, _version, newLeaseTimeout);
            if (newVersion is null)
            {
                return false;
            }

            _version = newVersion.Value;
            return true;
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Acquires named distributed leases.
/// </summary>
public interface IDistributedLeaseProvider
{
    /// <summary>
    /// Makes a single attempt to acquire the lease named <paramref name="leaseName"/>.
    /// </summary>
    /// <param name="leaseName">Name identifying the lease.</param>
    /// <param name="leaseTimeout">How long the lease is held before it expires unless it is extended.</param>
    /// <returns>
    /// A handle to the acquired lease, or <c>null</c> when the lease could not be acquired.
    /// Dispose the handle to release the lease.
    /// </returns>
    ValueTask<IDistributedLease?> TryAcquireLeaseAsync(string leaseName, TimeSpan leaseTimeout);
}
/// <summary>
/// A lease that has been acquired. Disposing the instance asynchronously releases the lease.
/// </summary>
public interface IDistributedLease : IAsyncDisposable
{
    /// <summary>
    /// Attempts to replace the lease's timeout with <paramref name="newLeaseTimeout"/>.
    /// </summary>
    /// <param name="newLeaseTimeout">The new timeout for the lease.</param>
    /// <returns><c>true</c> when the lease was extended; <c>false</c> when the extension failed.</returns>
    ValueTask<bool> TryExtendLeaseAsync(TimeSpan newLeaseTimeout);
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using StackExchange.Redis; | |
using Nito.Logging; | |
/// <summary>
/// Distributed lease provider backed by Redis. Each lease is the string key <c>lease:{leaseName}</c>
/// whose value is the owner id; expiry is delegated to the key's time-to-live.
/// </summary>
/// <param name="logger">Logger that receives acquire/extend/release outcomes.</param>
/// <param name="database">Redis database used for all commands.</param>
/// <param name="owner">Owner id stored as the key's value; a random GUID is generated when omitted.</param>
public sealed class RedisDistributedLeaseProvider(ILogger logger, IDatabaseAsync database, string? owner = null)
    : IDistributedLeaseProvider
{
    private readonly ILogger _logger = logger;
    private readonly IDatabaseAsync _database = database;
    private readonly string _owner = owner ?? Guid.NewGuid().ToString("N");

    /// <summary>
    /// Tries to acquire the lease by setting its key only if the key does not exist.
    /// </summary>
    /// <param name="leaseName">Name of the lease.</param>
    /// <param name="leaseTimeout">Lease duration, or <see cref="Timeout.InfiniteTimeSpan"/> for no expiry.</param>
    /// <returns>The acquired lease, or <c>null</c> when the key already exists.</returns>
    public async ValueTask<IDistributedLease?> TryAcquireLeaseAsync(string leaseName, TimeSpan leaseTimeout)
    {
        var leaseKey = $"lease:{leaseName}";
        using var _ = _logger.BeginDataScope(("LeaseName", leaseName), ("LeaseOwner", _owner));

        TimeSpan? expiry = leaseTimeout == Timeout.InfiniteTimeSpan ? null : leaseTimeout;
        if (await _database.StringSetAsync(leaseKey, _owner, expiry, When.NotExists))
        {
            _logger.LogInformation("Acquired lease with timeout {leaseTimeout}.", leaseTimeout);
            return new DistributedLease(this, leaseName);
        }

        _logger.LogInformation("Failed to acquire lease.");
        return null;
    }

    /// <summary>
    /// Tries to reset the key's time-to-live, provided the key still holds this provider's owner id.
    /// </summary>
    /// <returns><c>true</c> when the lease was extended; <c>false</c> when it is no longer held.</returns>
    private async ValueTask<bool> TryExtendLeaseAsync(string leaseName, TimeSpan leaseTimeout)
    {
        var leaseKey = $"lease:{leaseName}";
        using var _ = _logger.BeginDataScope(("LeaseName", leaseName), ("LeaseOwner", _owner));

        // long, not int: an int millisecond count overflows for timeouts beyond ~24.8 days.
        var timeoutMilliseconds = leaseTimeout == Timeout.InfiniteTimeSpan ? -1L : (long)leaseTimeout.TotalMilliseconds;
        if (await DoExtendAsync(_database, leaseKey, _owner, timeoutMilliseconds))
        {
            _logger.LogInformation("Extended lease to {leaseTimeout}.", leaseTimeout);
            return true;
        }

        _logger.LogInformation("Failed to extend lease.");
        return false;

        static async ValueTask<bool> DoExtendAsync(IDatabaseAsync database, RedisKey leaseKey, RedisValue owner, RedisValue leaseTimeoutMilliseconds)
        {
            // Script notes:
            //  * ARGV entries reach Lua as strings, so the infinite marker is compared as "-1".
            //  * PERSIST returns 0 when the key already has no expiry; the lease is still held in
            //    that case, so the script returns 1 itself instead of forwarding PERSIST's reply.
            const string script = """
                if redis.call("get", KEYS[1]) == ARGV[1] then
                    if ARGV[2] == "-1" then
                        redis.call("persist", KEYS[1])
                        return 1
                    else
                        return redis.call("pexpire", KEYS[1], ARGV[2])
                    end
                else
                    return 0
                end
                """;
            var scriptResult = await database.ScriptEvaluateAsync(script, keys: [leaseKey], values: [owner, leaseTimeoutMilliseconds]);
            return (int)scriptResult == 1;
        }
    }

    /// <summary>
    /// Deletes the key, provided it still holds this provider's owner id. A lost lease is logged, not thrown.
    /// </summary>
    private async ValueTask ReleaseLeaseAsync(string leaseName)
    {
        var leaseKey = $"lease:{leaseName}";
        using var _ = _logger.BeginDataScope(("LeaseName", leaseName), ("LeaseOwner", _owner));

        if (await DoReleaseAsync(_database, leaseKey, _owner))
        {
            _logger.LogInformation("Released lease.");
        }
        else
        {
            _logger.LogInformation("Failed to release lease.");
        }

        static async ValueTask<bool> DoReleaseAsync(IDatabaseAsync database, RedisKey leaseKey, RedisValue owner)
        {
            // Compare-and-delete in one script so another owner's key is never removed.
            const string script = """
                if redis.call("get", KEYS[1]) == ARGV[1] then
                    return redis.call("del", KEYS[1])
                else
                    return 0
                end
                """;
            var scriptResult = await database.ScriptEvaluateAsync(script, keys: [leaseKey], values: [owner]);
            return (int)scriptResult == 1;
        }
    }

    /// <summary>Handle for an acquired lease; identified by the lease name and the provider's owner id.</summary>
    private sealed class DistributedLease(RedisDistributedLeaseProvider provider, string name)
        : IDistributedLease
    {
        private readonly RedisDistributedLeaseProvider _provider = provider;
        private readonly string _name = name;

        public ValueTask<bool> TryExtendLeaseAsync(TimeSpan newLeaseTimeout) => _provider.TryExtendLeaseAsync(_name, newLeaseTimeout);

        public async ValueTask DisposeAsync() => await _provider.ReleaseLeaseAsync(_name);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Warning: untested! ;)