Skip to content

Instantly share code, notes, and snippets.

@StephenCleary
Created March 26, 2025 23:28
Show Gist options
  • Save StephenCleary/9777f2dcb9834a05efb261cc36839b3c to your computer and use it in GitHub Desktop.
Save StephenCleary/9777f2dcb9834a05efb261cc36839b3c to your computer and use it in GitHub Desktop.
Distributed lease implementations in C#
using Microsoft.Azure.Cosmos;
using Newtonsoft.Json;
using System.Diagnostics;
using System.Net;
using Nito.Logging;
/// <summary>
/// The container should have TTL enabled with a default TTL of -1.
/// </summary>
public sealed class CosmosDbDistributedLeaseProvider(ILogger logger, Container container, string? owner = null)
: IDistributedLeaseProvider
{
private readonly ILogger _logger = logger;
private readonly Container _container = container;
private readonly string _owner = owner ?? Guid.NewGuid().ToString("N");
public async ValueTask<IDistributedLease?> TryAcquireLeaseAsync(string leaseName, TimeSpan leaseTimeout)
{
using var _ = _logger.BeginDataScope(("LeaseName", leaseName), ("LeaseOwner", _owner));
var ttl = CosmosDbTtl(leaseTimeout);
var partitionKey = new PartitionKey(leaseName);
var response = await _container.CreateItemAsync(
new LeaseItem
{
Ttl = ttl,
Owner = _owner,
},
partitionKey,
new ItemRequestOptions()
{
EnableContentResponseOnWrite = false,
});
if (response.StatusCode == HttpStatusCode.Created)
{
_logger.LogInformation("Acquired lease with timeout {leaseTimeout}.", leaseTimeout);
return new DistributedLease(this, partitionKey, response.Headers.ETag);
}
else
{
_logger.LogInformation("Failed to acquire lease.");
return null;
}
}
private async ValueTask<string?> TryExtendLeaseAsync(PartitionKey leaseName, string eTag, TimeSpan leaseTimeout)
{
using var _ = _logger.BeginDataScope(("LeaseName", leaseName), ("LeaseOwner", _owner));
var ttl = CosmosDbTtl(leaseTimeout);
var response = await _container.ReplaceItemAsync(
new LeaseItem
{
Ttl = ttl,
Owner = _owner,
},
LeaseItem.LeaseId,
leaseName,
new ItemRequestOptions()
{
IfMatchEtag = eTag,
EnableContentResponseOnWrite = false,
});
if (response.StatusCode == HttpStatusCode.OK)
{
_logger.LogInformation("Extended lease to {leaseTimeout}.", leaseTimeout);
return response.Headers.ETag;
}
else
{
_logger.LogInformation("Failed to extend lease.");
return null;
}
}
private async ValueTask ReleaseLeaseAsync(PartitionKey leaseName, string eTag)
{
using var _ = _logger.BeginDataScope(("LeaseName", leaseName), ("LeaseOwner", _owner));
var response = await _container.DeleteItemAsync<LeaseItem>(
LeaseItem.LeaseId,
leaseName,
new ItemRequestOptions()
{
IfMatchEtag = eTag,
EnableContentResponseOnWrite = false,
});
if (response.StatusCode == HttpStatusCode.NoContent)
_logger.LogInformation("Released lease.");
else
_logger.LogInformation("Failed to release lease.");
}
private static int CosmosDbTtl(TimeSpan timespan)
{
if (timespan == Timeout.InfiniteTimeSpan)
return -1;
var seconds = (int)timespan.TotalSeconds;
return seconds < 1 ? 1 : seconds;
}
private sealed class DistributedLease(CosmosDbDistributedLeaseProvider provider, PartitionKey leaseName, string eTag)
: IDistributedLease
{
private readonly CosmosDbDistributedLeaseProvider _provider = provider;
public PartitionKey _leaseName = leaseName;
private string _eTag = eTag;
public async ValueTask<bool> TryExtendLeaseAsync(TimeSpan newLeaseTimeout)
{
var eTag = await _provider.TryExtendLeaseAsync(_leaseName, _eTag, newLeaseTimeout);
if (eTag == null)
return false;
_eTag = eTag;
return true;
}
public ValueTask DisposeAsync() => _provider.ReleaseLeaseAsync(_leaseName, _eTag);
}
private class LeaseItem
{
public static string LeaseId = "lease";
[JsonProperty("id"), DebuggerBrowsable(DebuggerBrowsableState.Never)]
public string Id { get; set; } = LeaseId;
[JsonProperty("ttl")]
public int Ttl { get; set; }
[JsonProperty("_etag")]
public string ETag { get; set; } = null!;
public string Owner { get; set; } = null!;
}
}
using Amazon.DynamoDBv2.Model;
using Amazon.DynamoDBv2;
using Nito.Logging;
/// <summary>
/// The table should have a primary (partition) key "id" (string), and TTL enabled with an attribute name "ttl".
/// </summary>
public sealed class DynamoDbDistributedLeaseProvider(ILogger logger, IAmazonDynamoDB dynamoDb, string tableName, string? owner = null)
: IDistributedLeaseProvider
{
private readonly ILogger _logger = logger;
private readonly IAmazonDynamoDB _dynamoDb = dynamoDb;
private readonly string _tableName = tableName;
private readonly string _owner = owner ?? Guid.NewGuid().ToString("N");
public async ValueTask<IDistributedLease?> TryAcquireLeaseAsync(string leaseName, TimeSpan leaseTimeout)
{
using var _ = _logger.BeginDataScope(("LeaseName", leaseName), ("LeaseOwner", _owner));
var utcNow = DateTimeOffset.UtcNow;
var now = utcNow.ToUnixTimeSeconds();
var leaseExpiration = utcNow.Add(leaseTimeout).ToUnixTimeSeconds();
var request = new PutItemRequest
{
TableName = _tableName,
Item = new Dictionary<string, AttributeValue>
{
{ "id", new AttributeValue { S = leaseName } },
{ "ttl", new AttributeValue { N = leaseExpiration.ToString() } },
{ "owner", new AttributeValue { S = _owner } },
{ "version", new AttributeValue { N = "0" } },
},
ConditionExpression = "attribute_not_exists(id) OR ttl < :now",
ExpressionAttributeValues = new Dictionary<string, AttributeValue>
{
{ ":now", new AttributeValue { N = now.ToString() } },
}
};
try
{
await _dynamoDb.PutItemAsync(request);
_logger.LogInformation("Acquired lease with timeout {leaseTimeout}.", leaseTimeout);
return new DistributedLease(this, leaseName);
}
catch (ConditionalCheckFailedException)
{
_logger.LogInformation("Failed to acquire lease.");
return null;
}
}
public async ValueTask<uint?> TryExtendLeaseAsync(string leaseName, uint version, TimeSpan leaseTimeout)
{
using var _ = _logger.BeginDataScope(("LeaseName", leaseName), ("LeaseOwner", _owner));
var utcNow = DateTimeOffset.UtcNow;
var leaseExpiration = utcNow.Add(leaseTimeout).ToUnixTimeSeconds();
var newVersion = version + 1;
var request = new UpdateItemRequest
{
TableName = _tableName,
Key = new Dictionary<string, AttributeValue>
{
{ "id", new AttributeValue { S = leaseName } },
},
UpdateExpression = "SET ttl = :newTtl, version = :newVersion",
ConditionExpression = "version = :version",
ExpressionAttributeValues = new Dictionary<string, AttributeValue>
{
{ ":newTtl", new AttributeValue { N = leaseExpiration.ToString() } },
{ ":newVersion", new AttributeValue { N = newVersion.ToString() } },
{ ":version", new AttributeValue { N = version.ToString() } }
},
};
try
{
await _dynamoDb.UpdateItemAsync(request);
_logger.LogInformation("Extended lease to {leaseTimeout}.", leaseTimeout);
return newVersion;
}
catch (ConditionalCheckFailedException)
{
_logger.LogInformation("Failed to extend lease.");
return null;
}
}
private async ValueTask ReleaseLeaseAsync(string leaseName, uint version)
{
var request = new DeleteItemRequest
{
TableName = _tableName,
Key = new Dictionary<string, AttributeValue>
{
{ "id", new AttributeValue { S = leaseName } },
},
ConditionExpression = "version = :version",
ExpressionAttributeValues = new Dictionary<string, AttributeValue>
{
{ ":version", new AttributeValue { N = version.ToString() } },
},
};
try
{
await _dynamoDb.DeleteItemAsync(request);
_logger.LogInformation("Released lease.");
}
catch (ConditionalCheckFailedException)
{
_logger.LogInformation("Failed to release lease.");
}
}
private sealed class DistributedLease(DynamoDbDistributedLeaseProvider provider, string leaseName) : IDistributedLease
{
private readonly DynamoDbDistributedLeaseProvider _provider = provider;
private readonly string _leaseName = leaseName;
private uint _version;
public ValueTask DisposeAsync() => _provider.ReleaseLeaseAsync(_leaseName, _version);
public async ValueTask<bool> TryExtendLeaseAsync(TimeSpan newLeaseTimeout)
{
var result = await _provider.TryExtendLeaseAsync(_leaseName, _version, newLeaseTimeout);
if (result == null)
return false;
_version = result.Value;
return true;
}
}
}
public interface IDistributedLeaseProvider
{
ValueTask<IDistributedLease?> TryAcquireLeaseAsync(string leaseName, TimeSpan leaseTimeout);
}
public interface IDistributedLease : IAsyncDisposable
{
ValueTask<bool> TryExtendLeaseAsync(TimeSpan newLeaseTimeout);
}
using StackExchange.Redis;
using Nito.Logging;
public sealed class RedisDistributedLeaseProvider(ILogger logger, IDatabaseAsync database, string? owner = null)
: IDistributedLeaseProvider
{
private readonly ILogger _logger = logger;
private readonly IDatabaseAsync _database = database;
private readonly string _owner = owner ?? Guid.NewGuid().ToString("N");
public async ValueTask<IDistributedLease?> TryAcquireLeaseAsync(string leaseName, TimeSpan leaseTimeout)
{
var leaseKey = $"lease:{leaseName}";
using var _ = _logger.BeginDataScope(("LeaseName", leaseName), ("LeaseOwner", _owner));
if (await _database.StringSetAsync(leaseKey, _owner, leaseTimeout == Timeout.InfiniteTimeSpan ? null : leaseTimeout, When.NotExists))
{
_logger.LogInformation("Acquired lease with timeout {leaseTimeout}.", leaseTimeout);
return new DistributedLease(this, leaseName);
}
else
{
_logger.LogInformation("Failed to acquire lease.");
return null;
}
}
private async ValueTask<bool> TryExtendLeaseAsync(string leaseName, TimeSpan leaseTimeout)
{
var leaseKey = $"lease:{leaseName}";
using var _ = _logger.BeginDataScope(("LeaseName", leaseName), ("LeaseOwner", _owner));
if (await DoExtendAsync(_database, leaseKey, _owner, leaseTimeout == Timeout.InfiniteTimeSpan ? -1 : (int)leaseTimeout.TotalMilliseconds))
{
_logger.LogInformation("Extended lease to {leaseTimeout}.", leaseTimeout);
return true;
}
else
{
_logger.LogInformation("Failed to extend lease.");
return false;
}
static async ValueTask<bool> DoExtendAsync(IDatabaseAsync database, RedisKey leaseKey, RedisValue owner, RedisValue leaseTimeoutMilliseconds)
{
const string script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
if ARGV[2] == -1 then
return redis.call("persist", KEYS[1])
else
return redis.call("pexpire", KEYS[1], ARGV[2])
else
return 0
end
""";
var scriptResult = await database.ScriptEvaluateAsync(script, keys: [leaseKey], values: [owner, leaseTimeoutMilliseconds]);
return (int)scriptResult == 1;
}
}
private async ValueTask ReleaseLeaseAsync(string leaseName)
{
var leaseKey = $"lease:{leaseName}";
using var _ = _logger.BeginDataScope(("LeaseName", leaseName), ("LeaseOwner", _owner));
if (await DoReleaseAsync(_database, leaseKey, _owner))
_logger.LogInformation("Released lease.");
else
_logger.LogInformation("Failed to release lease.");
static async ValueTask<bool> DoReleaseAsync(IDatabaseAsync database, RedisKey leaseKey, RedisValue owner)
{
const string script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
""";
var scriptResult = await database.ScriptEvaluateAsync(script, keys:[leaseKey], values:[owner]);
return (int)scriptResult == 1;
}
}
private sealed class DistributedLease(RedisDistributedLeaseProvider provider, string name)
: IDistributedLease
{
private readonly RedisDistributedLeaseProvider _provider = provider;
private readonly string _name = name;
public ValueTask<bool> TryExtendLeaseAsync(TimeSpan newLeaseTimeout) => _provider.TryExtendLeaseAsync(_name, newLeaseTimeout);
public async ValueTask DisposeAsync() => await _provider.ReleaseLeaseAsync(_name);
}
}
@StephenCleary
Copy link
Author

Warning: untested! ;)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment