Skip to content

Instantly share code, notes, and snippets.

@joonhwan
Last active October 20, 2024 23:50
Show Gist options
  • Save joonhwan/21a8530244735523fa8a82b76fbc6cbb to your computer and use it in GitHub Desktop.
Save joonhwan/21a8530244735523fa8a82b76fbc6cbb to your computer and use it in GitHub Desktop.
distributed read write lock in c# / redis
using StackExchange.Redis;
namespace RedisLockTest;
public enum LockStatus
{
Success,
Failure,
Timeout
}
public class DistributedReadWriteLock
{
private readonly ConnectionMultiplexer _redis;
private readonly IDatabase _db;
private readonly string _lockKey;
private readonly string _readersKey;
private readonly TimeSpan _lockExpiry;
// LuaScript 및 LoadedLuaScript 인스턴스
private readonly LuaScript _acquireReadLockScript;
private readonly LuaScript _releaseReadLockScript;
private readonly LuaScript _acquireWriteLockScript;
private readonly LuaScript _releaseWriteLockScript;
private LoadedLuaScript _acquireReadLockLoadedScript;
private LoadedLuaScript _releaseReadLockLoadedScript;
private LoadedLuaScript _acquireWriteLockLoadedScript;
private LoadedLuaScript _releaseWriteLockLoadedScript;
public DistributedReadWriteLock(string lockKey, ConnectionMultiplexer redis, TimeSpan lockExpiry)
{
_redis = redis;
_db = redis.GetDatabase();
_lockKey = lockKey;
_readersKey = $"{lockKey}:readers";
_lockExpiry = lockExpiry;
// Lua 스크립트 준비
_acquireReadLockScript = LuaScript.Prepare(@"
if (redis.call('exists', @writeLockKey) == 0) then
redis.call('incr', @readersKey);
redis.call('pexpire', @readersKey, @lockExpiry);
return 1;
else
return 0;
end");
_releaseReadLockScript = LuaScript.Prepare(@"
local count = redis.call('decr', @readersKey);
if (count <= 0) then
redis.call('del', @readersKey);
else
redis.call('pexpire', @readersKey, @lockExpiry);
end
return nil");
_acquireWriteLockScript = LuaScript.Prepare(@"
if (redis.call('exists', @writeLockKey) == 0 and redis.call('exists', @readersKey) == 0) then
redis.call('set', @writeLockKey, @lockValue, 'PX', @lockExpiry);
return 1;
else
return 0;
end");
_releaseWriteLockScript = LuaScript.Prepare(@"
return redis.call('del', @writeLockKey)");
// 스크립트 로드
PreloadScripts();
}
private void PreloadScripts()
{
foreach (var endPoint in _redis.GetEndPoints())
{
var server = _redis.GetServer(endPoint);
_acquireReadLockLoadedScript = _acquireReadLockScript.Load(server);
_releaseReadLockLoadedScript = _releaseReadLockScript.Load(server);
_acquireWriteLockLoadedScript = _acquireWriteLockScript.Load(server);
_releaseWriteLockLoadedScript = _releaseWriteLockScript.Load(server);
}
}
public async Task<LockStatus> AcquireReadLockAsync(TimeSpan timeout)
{
var timeoutTime = DateTime.UtcNow + timeout;
int retryDelay = 100; // 초기 딜레이(ms)
int maxDelay = 1000; // 최대 딜레이(ms)
while (DateTime.UtcNow < timeoutTime)
{
try
{
var parameters = new
{
writeLockKey = (RedisKey)_lockKey,
readersKey = (RedisKey)_readersKey,
lockExpiry = (long)_lockExpiry.TotalMilliseconds
};
var result = (int)(long)await _acquireReadLockLoadedScript.EvaluateAsync(_db, parameters);
if (result == 1)
{
return LockStatus.Success;
}
}
catch (RedisServerException ex) when (ex.Message.StartsWith("NOSCRIPT"))
{
await ReloadScriptsAsync();
continue;
}
await Task.Delay(retryDelay);
retryDelay = Math.Min(retryDelay * 2, maxDelay); // 지수 백오프 적용
}
return LockStatus.Timeout;
}
public async Task ReleaseReadLockAsync()
{
try
{
var parameters = new
{
readersKey = (RedisKey)_readersKey,
lockExpiry = (long)_lockExpiry.TotalMilliseconds
};
await _releaseReadLockLoadedScript.EvaluateAsync(_db, parameters);
}
catch (RedisServerException ex) when (ex.Message.StartsWith("NOSCRIPT"))
{
await ReloadScriptsAsync();
await ReleaseReadLockAsync();
}
}
public async Task<LockStatus> AcquireWriteLockAsync(TimeSpan timeout)
{
var timeoutTime = DateTime.UtcNow + timeout;
int retryDelay = 100; // 초기 딜레이(ms)
int maxDelay = 1000; // 최대 딜레이(ms)
while (DateTime.UtcNow < timeoutTime)
{
try
{
var parameters = new
{
writeLockKey = (RedisKey)_lockKey,
readersKey = (RedisKey)_readersKey,
lockValue = "write_lock",
lockExpiry = (long)_lockExpiry.TotalMilliseconds
};
var result = (int)(long)await _acquireWriteLockLoadedScript.EvaluateAsync(_db, parameters);
if (result == 1)
{
return LockStatus.Success;
}
}
catch (RedisServerException ex) when (ex.Message.StartsWith("NOSCRIPT"))
{
await ReloadScriptsAsync();
continue;
}
await Task.Delay(retryDelay);
retryDelay = Math.Min(retryDelay * 2, maxDelay); // 지수 백오프 적용
}
return LockStatus.Timeout;
}
public async Task ReleaseWriteLockAsync()
{
try
{
var parameters = new
{
writeLockKey = (RedisKey)_lockKey
};
await _releaseWriteLockLoadedScript.EvaluateAsync(_db, parameters);
}
catch (RedisServerException ex) when (ex.Message.StartsWith("NOSCRIPT"))
{
await ReloadScriptsAsync();
await ReleaseWriteLockAsync();
}
}
private async Task ReloadScriptsAsync()
{
foreach (var endPoint in _redis.GetEndPoints())
{
var server = _redis.GetServer(endPoint);
_acquireReadLockLoadedScript = await _acquireReadLockScript.LoadAsync(server);
_releaseReadLockLoadedScript = await _releaseReadLockScript.LoadAsync(server);
_acquireWriteLockLoadedScript = await _acquireWriteLockScript.LoadAsync(server);
_releaseWriteLockLoadedScript = await _releaseWriteLockScript.LoadAsync(server);
}
}
}
using StackExchange.Redis;
namespace RedisLeaderElectionTest;
public class RedisLeaderElection : IDisposable
{
private readonly IDatabase _db;
private readonly string _lockKey;
private readonly TimeSpan _lockExpiry;
private readonly string _instanceId;
private Timer? _renewalTimer;
private Timer? _acquisitionTimer;
private volatile bool _isLeader;
private readonly object _leaderLock = new object();
private readonly CancellationTokenSource _cts;
// Lua 스크립트 관련 변수들
private static readonly LuaScript RenewLockScript = LuaScript.Prepare(@"
if redis.call('get', @key) == @value then
return redis.call('pexpire', @key, @expiry)
else
return 0
end");
private static readonly LuaScript ReleaseLockScript = LuaScript.Prepare(@"
if redis.call('get', @key) == @value then
return redis.call('del', @key)
else
return 0
end");
// 리더가 되었을 때 통지하는 이벤트
public event Action? BecameLeader;
public RedisLeaderElection(IDatabase database, string lockKey = "leader_lock", TimeSpan? lockExpiry = null)
{
_db = database ?? throw new ArgumentNullException(nameof(database));
_lockKey = lockKey;
_lockExpiry = lockExpiry ?? TimeSpan.FromSeconds(10);
_instanceId = Guid.NewGuid().ToString();
_cts = new CancellationTokenSource();
}
/// <summary>
/// 현재 인스턴스가 리더인지 확인합니다.
/// </summary>
public bool IsLeader => _isLeader;
/// <summary>
/// 리더 선출 프로세스를 시작합니다.
/// </summary>
public void Start()
{
TryAcquireLeadership();
// 리더 여부에 따라 타이머 시작
if (_isLeader)
{
// 갱신 타이머 시작
_renewalTimer = new Timer(async _ => await RenewLockAsync(), null, _lockExpiry / 2, _lockExpiry / 2);
}
else
{
// 획득 시도 타이머 시작
_acquisitionTimer = new Timer(async _ => await TryAcquireLeadershipAsync(), null, _lockExpiry / 2, _lockExpiry / 2);
}
}
private void TryAcquireLeadership()
{
// 락 획득 시도
bool acquired = _db.StringSet(_lockKey, _instanceId, _lockExpiry, When.NotExists);
if (acquired)
{
BecomeLeader();
}
else
{
_isLeader = false;
}
}
private async Task TryAcquireLeadershipAsync()
{
if (_isLeader)
return;
bool acquired = await _db.StringSetAsync(_lockKey, _instanceId, _lockExpiry, When.NotExists);
if (acquired)
{
BecomeLeader();
}
}
private void BecomeLeader()
{
lock (_leaderLock)
{
if (_isLeader)
return;
_isLeader = true;
// 획득 시도 타이머 중지
_acquisitionTimer?.Dispose();
// 갱신 타이머 시작
_renewalTimer = new Timer(async _ => await RenewLockAsync(), null, _lockExpiry / 2, _lockExpiry / 2);
// 리스너에게 통지
BecameLeader?.Invoke();
}
}
private async Task RenewLockAsync()
{
if (!_isLeader)
return;
var parameters = new
{
key = (RedisKey)_lockKey,
value = (RedisValue)_instanceId,
expiry = (long)_lockExpiry.TotalMilliseconds
};
var result = (long)(await _db.ScriptEvaluateAsync(RenewLockScript, parameters));
if (result == 0)
{
// 리더십 상실
_isLeader = false;
_renewalTimer?.Dispose();
// 획득 시도 타이머 시작
_acquisitionTimer = new Timer(async _ => await TryAcquireLeadershipAsync(), null, _lockExpiry / 2, _lockExpiry / 2);
}
}
/// <summary>
/// 리더 선출 프로세스를 중지하고 자원을 해제합니다.
/// </summary>
public void Stop()
{
_cts.Cancel();
if (_isLeader)
{
// 락 해제
ReleaseLock();
}
_renewalTimer?.Dispose();
_acquisitionTimer?.Dispose();
}
private void ReleaseLock()
{
var parameters = new
{
key = (RedisKey)_lockKey,
value = (RedisValue)_instanceId
};
_db.ScriptEvaluate(ReleaseLockScript, parameters);
}
public void Dispose()
{
Stop();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment