Last active
December 8, 2023 18:19
-
-
Save JKamsker/0de2d65ba0730d89022e7aea83ca7643 to your computer and use it in GitHub Desktop.
FileWriterMutex
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Collections.Concurrent; | |
namespace JKToolKit.Logging.File.Utils; | |
/// <summary>
/// An awaitable mutual-exclusion primitive. Instances are obtained through
/// <see cref="Create"/>; the lock is taken with <see cref="WaitOneAsync"/> and given back
/// by disposing the handle that call returns.
/// </summary>
public abstract class AsyncMutex : IDisposable
{
    // Process-local mutexes keyed by name, so every caller asking for the same name
    // shares one instance.
    // NOTE(review): disposing a cached LocalScope leaves the disposed instance in this
    // cache; later Create calls for the same name will receive it. Confirm whether
    // local mutexes are ever disposed by callers.
    private static readonly ConcurrentDictionary<string, AsyncMutex> _localMutexes = new();

    /// <summary>
    /// Waits for the mutex. When <paramref name="ct"/> is cancelled before the mutex is
    /// acquired, the call does not throw; it returns a handle whose disposal releases nothing.
    /// </summary>
    /// <param name="ct">Token used to abandon the wait.</param>
    /// <returns>A handle that releases the mutex (if it was acquired) when disposed.</returns>
    public abstract Task<IDisposable> WaitOneAsync(CancellationToken ct);

    /// <summary>Releases the mutex. Normally invoked by disposing the handle from <see cref="WaitOneAsync"/>.</summary>
    public abstract void Release();

    /// <summary>Disposes the underlying synchronization object.</summary>
    public abstract void Dispose();

    /// <summary>
    /// Returns a mutex for <paramref name="name"/> in the requested <paramref name="scope"/>.
    /// </summary>
    /// <param name="name">Name identifying the mutex.</param>
    /// <param name="scope">Which implementation to use.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="scope"/> is not a defined value.</exception>
    public static AsyncMutex Create(string name, Scope scope)
    {
        return scope switch
        {
            Scope.Global => CreateGlobal(name),
            Scope.Local => CreateLocal(name),
            Scope.None => NoMutex.Instance,
            _ => throw new ArgumentOutOfRangeException(nameof(scope), scope, null)
        };
    }

    // Returns the cached local mutex for the name, creating it on first use.
    private static AsyncMutex CreateLocal(string name)
    {
        return _localMutexes.GetOrAdd(name, _ => new LocalScope());
    }

    // Creates a new wrapper around the named semaphore on every call.
    private static AsyncMutex CreateGlobal(string name)
    {
        return new GlobalScope(name);
    }

    /// <summary>Selects the mutex implementation returned by <see cref="Create"/>.</summary>
    public enum Scope
    {
        /// <summary>Backed by a named <see cref="Semaphore"/>.</summary>
        Global,

        /// <summary>Backed by a <see cref="SemaphoreSlim"/> shared per name inside this process.</summary>
        Local,

        /// <summary>No locking at all.</summary>
        None
    }

    /// <summary>Mutex backed by a named <see cref="Semaphore"/> with a single permit.</summary>
    private class GlobalScope : AsyncMutex
    {
        private readonly Semaphore _mutex;

        public GlobalScope(string name)
        {
            _mutex = new Semaphore(1, 1, name);
        }

        public override async Task<IDisposable> WaitOneAsync(CancellationToken ct)
        {
            try
            {
                // Semaphore has no asynchronous wait, so the blocking wait runs on a
                // dedicated (LongRunning) thread instead of a thread-pool worker.
                return await Task.Factory.StartNew
                (
                    () => (IDisposable)new AsyncMutexDisposable(this, WaitForSemaphore(ct)),
                    ct,
                    TaskCreationOptions.LongRunning,
                    TaskScheduler.Default
                );
            }
            catch (OperationCanceledException)
            {
                // The token was cancelled before the task started: nothing was acquired.
                return new AsyncMutexDisposable(this, false);
            }
        }

        // Blocks until the semaphore is acquired (true) or the token is cancelled (false).
        private bool WaitForSemaphore(CancellationToken ct)
        {
            if (!ct.CanBeCanceled)
            {
                _mutex.WaitOne();
                return true;
            }

            // WaitAny returns the lowest signaled index, so the semaphore wins when both
            // handles are signaled at the same time.
            var signaledIndex = WaitHandle.WaitAny(new WaitHandle[] { _mutex, ct.WaitHandle });
            return signaledIndex == 0;
        }

        public override void Release()
        {
            _mutex.Release();
        }

        public override void Dispose()
        {
            _mutex.Dispose();
        }
    }

    /// <summary>Mutex backed by a <see cref="SemaphoreSlim"/>.</summary>
    private class LocalScope : AsyncMutex
    {
        private readonly SemaphoreSlim _semaphore;

        public LocalScope(int initialCount = 1, int maxCount = 1)
        {
            _semaphore = new SemaphoreSlim(initialCount, maxCount);
        }

        public override async Task<IDisposable> WaitOneAsync(CancellationToken ct)
        {
            try
            {
                await _semaphore.WaitAsync(ct);
                return new AsyncMutexDisposable(this);
            }
            catch (OperationCanceledException)
            {
                // Catching the base type matters: a wait cancelled while queued surfaces as
                // OperationCanceledException, not necessarily TaskCanceledException.
                return new AsyncMutexDisposable(this, false);
            }
        }

        public override void Release()
        {
            _semaphore.Release();
        }

        public override void Dispose()
        {
            _semaphore.Dispose();
        }
    }

    /// <summary>Mutex that never blocks; every operation is a no-op.</summary>
    public class NoMutex : AsyncMutex
    {
        public static readonly NoMutex Instance = new NoMutex();

        public override Task<IDisposable> WaitOneAsync(CancellationToken ct)
        {
            return Task.FromResult<IDisposable>(new AsyncMutexDisposable(this));
        }

        public override void Release()
        {
        }

        public override void Dispose()
        {
        }
    }

    /// <summary>
    /// Handle returned by <see cref="WaitOneAsync"/>. Disposing it releases the owning mutex
    /// once, and only if the mutex was actually acquired.
    /// </summary>
    private class AsyncMutexDisposable : IDisposable
    {
        private readonly AsyncMutex _mutex;
        private readonly bool _shouldRelease;

        // 0 = not yet disposed, 1 = disposed. Guards against releasing twice.
        private int _disposed;

        public AsyncMutexDisposable(AsyncMutex mutex, bool shouldRelease = true)
        {
            _mutex = mutex;
            _shouldRelease = shouldRelease;
        }

        public void Dispose()
        {
            if (_shouldRelease && Interlocked.Exchange(ref _disposed, 1) == 0)
            {
                _mutex.Release();
            }
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Queues messages on a channel and appends them to a file from a single background
/// worker. File access is guarded by an <see cref="AsyncMutex"/> whose name is derived
/// from the file path.
/// </summary>
public class ParallelFileWriter : IAsyncDisposable
{
    private readonly Channel<IMessage> _channel;
    private readonly AsyncMutex _mutex;
    private readonly string _filePath;
    private readonly FileWriterSettings _settings;

    // Null until Start is called.
    private Task? _workerTask;

    // Set by StopAsync so that Write can tell "closed" apart from "full".
    private volatile bool _isStopped;

    /// <summary>Creates a writer for <paramref name="filePath"/>.</summary>
    /// <param name="filePath">File that messages are appended to.</param>
    /// <param name="settings">
    /// Writer settings; when null, an unbounded queue with the Block strategy is used.
    /// </param>
    public ParallelFileWriter(string filePath, FileWriterSettings? settings)
    {
        settings ??= new FileWriterSettings
        {
            FullQueueStrategy = FullQueueStrategy.Block,
            QueueSize = null
        };
        _channel = ChannelFactory.CreateChannel<IMessage>(settings);
        _filePath = filePath;
        _settings = settings;

        // NOTE(review): the "1" suffix is kept as-is; its purpose is not visible here
        // (possibly a version tag for the mutex name) - confirm before changing.
        var mutexName = HashUtils.Md5String(filePath);
        _mutex = AsyncMutex.Create(mutexName + "1", settings.MutexScope);
    }

    /// <summary>Queues a text message, wrapped in a <c>Message</c>.</summary>
    public void Write(string message)
    {
        Write(new Message { Content = message });
    }

    /// <summary>
    /// Queues a message synchronously. With the Block strategy the calling thread sleeps
    /// with exponential backoff (capped at 500 ms) until the queue accepts the message.
    /// </summary>
    /// <exception cref="ChannelClosedException">The writer has been stopped.</exception>
    /// <exception cref="InvalidOperationException">
    /// The queue rejected the message and the strategy is not Block.
    /// </exception>
    public void Write(IMessage message)
    {
        // With the Block strategy TryWrite returns false while the channel is full.
        // With DropNewest or DropOldest the channel drops a message itself and accepts the write.
        var waittime = 1;
        while (!_channel.Writer.TryWrite(message))
        {
            // A completed channel rejects every write; without this check the Block
            // strategy would spin here forever after StopAsync.
            if (_isStopped)
            {
                throw new ChannelClosedException();
            }

            if (_settings.FullQueueStrategy != FullQueueStrategy.Block)
            {
                throw new InvalidOperationException("Channel is full");
            }

            Thread.Sleep(waittime);
            // Exponential backoff, max 500ms
            waittime = Math.Min(waittime * 2, 500);
        }
    }

    /// <summary>Queues a message, waiting asynchronously while the queue is full.</summary>
    /// <param name="message">Message to queue.</param>
    /// <param name="cancellationToken">Token used to abandon the wait.</param>
    public async Task WriteAsync(IMessage message, CancellationToken cancellationToken = default)
    {
        await _channel.Writer.WriteAsync(message, cancellationToken);
    }

    /// <summary>
    /// Starts the background worker. Exceptions thrown by the worker are written to the
    /// console rather than propagated.
    /// </summary>
    /// <param name="cancellationToken">Token that stops the worker.</param>
    public void Start(CancellationToken cancellationToken = default)
    {
        _workerTask = Task.Run(
            async () =>
            {
                try
                {
                    await Worker(cancellationToken);
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                }
            },
            cancellationToken
        );
    }

    /// <summary>
    /// Closes the queue for new messages and waits for the worker (if one was started)
    /// to finish. Safe to call more than once.
    /// </summary>
    public async Task StopAsync()
    {
        _isStopped = true;

        // TryComplete rather than Complete: Complete throws when the channel is already
        // completed, which would break StopAsync followed by DisposeAsync.
        _channel.Writer.TryComplete();

        if (_workerTask is { } worker)
        {
            await worker;
        }
    }

    // Drains the channel in batches. For each batch: take the mutex, open the file,
    // write up to 100,000 queued messages, then release everything.
    private async Task Worker(CancellationToken cancellationToken)
    {
        var isFirstTime = true;
        while (
            await _channel.Reader.WaitToReadAsync(cancellationToken)
            && !cancellationToken.IsCancellationRequested
        )
        {
            if (!isFirstTime)
            {
                // Wait for one millisecond, to allow other threads/processes to write to the file.
                // Instant relocking could cause the lock not actually being released fairly.
                await Task.Delay(1, cancellationToken);
            }
            isFirstTime = false;

            // Declared first so it is disposed last: the mutex is released only after
            // the writer and the file stream have been flushed and closed.
            using var lockHandle = await _mutex.WaitOneAsync(cancellationToken);
            if (cancellationToken.IsCancellationRequested)
            {
                break;
            }

            // The file is opened with FileOptions.Asynchronous, so dispose (and thereby
            // flush) asynchronously as well.
            await using FileStream fs = await OpenFileAsync();
            await using var writer = new StreamWriter(fs, Encoding.UTF8, _settings.CacheSize);

            var count = 0;
            while (count < 100_000 && _channel.Reader.TryRead(out var message))
            {
                await message.WriteToAsync(writer);
                count++;
            }
        }
    }

    // Opens the target file for appending, retrying on IOException with the delays
    // produced by Backoff.Logarithmic (presumably increasing up to maxDelay - confirm
    // against that helper).
    private async ValueTask<FileStream> OpenFileAsync(int maxAttempts = 20, TimeSpan? initialDelay = null, TimeSpan? maxDelay = null)
    {
        TimeSpan delay = initialDelay ?? TimeSpan.FromMilliseconds(10);
        TimeSpan maximumDelay = maxDelay ?? TimeSpan.FromMinutes(0.5);
        IOException? lastError = null;

        foreach (var backoffDelay in Backoff.Logarithmic(maxAttempts, delay, maximumDelay))
        {
            try
            {
                return File.Open(
                    _filePath,
                    new FileStreamOptions
                    {
                        Access = FileAccess.Write,
                        Mode = FileMode.Append,
                        Share = FileShare.Read, // maybe ReadWrite?
                        Options = FileOptions.Asynchronous | FileOptions.WriteThrough,
                        BufferSize = _settings.CacheSize
                    }
                );
            }
            catch (IOException e)
            {
                // Typically the file is locked by another writer; remember the error and retry.
                lastError = e;
                Debug.WriteLine($"Attempt failed: {e.Message}");
                await Task.Delay(backoffDelay);
            }
        }

        // Reached when every attempt failed; keep the last failure as the inner exception.
        throw new InvalidOperationException("Failed to open file after multiple attempts.", lastError);
    }

    /// <summary>Stops the writer; equivalent to <see cref="StopAsync"/>.</summary>
    public async ValueTask DisposeAsync()
    {
        await StopAsync();
    }
}
/// <summary>Builds the channel that matches a set of queue settings.</summary>
internal class ChannelFactory
{
    /// <summary>
    /// Creates a bounded channel when <c>settings.QueueSize</c> has a value and an
    /// unbounded one otherwise. Both are configured for a single reader and multiple writers
    /// where the option exists.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <c>settings.FullQueueStrategy</c> is not a known value (bounded channels only).
    /// </exception>
    public static Channel<T> CreateChannel<T>(IChannelSettings settings)
    {
        if (settings.QueueSize is { } capacity)
        {
            // Build the options first, then map the strategy, mirroring the order in
            // which an object initializer would evaluate them.
            var boundedOptions = new BoundedChannelOptions(capacity);
            boundedOptions.FullMode = settings.FullQueueStrategy switch
            {
                FullQueueStrategy.Block => BoundedChannelFullMode.Wait,
                FullQueueStrategy.DropNewest => BoundedChannelFullMode.DropWrite,
                FullQueueStrategy.DropOldest => BoundedChannelFullMode.DropOldest,
                _ => throw new ArgumentOutOfRangeException(nameof(settings.FullQueueStrategy))
            };
            return Channel.CreateBounded<T>(boundedOptions);
        }

        // No queue size configured: the queue may grow without limit.
        var unboundedOptions = new UnboundedChannelOptions
        {
            SingleReader = true,
            SingleWriter = false
        };
        return Channel.CreateUnbounded<T>(unboundedOptions);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment