Skip to content

Instantly share code, notes, and snippets.

@neuecc
Last active December 18, 2019 13:51
Show Gist options
  • Save neuecc/6ab043f38082743cba4c7b7b07f6b039 to your computer and use it in GitHub Desktop.
Save neuecc/6ab043f38082743cba4c7b7b07f6b039 to your computer and use it in GitHub Desktop.
public class AsyncBufferedUtf8ConsoleLogger : ILogger, IAsyncDisposable
{
readonly int bufferCount;
readonly byte[] newLine;
readonly int newLineLength;
readonly Stream stream;
readonly Channel<string> channel;
readonly Task writeLoop;
readonly CancellationTokenSource cancellationTokenSource;
public AsyncBufferedUtf8ConsoleLogger(int bufferCount = 100, bool withNewLine = true, CancellationToken cancellationToken = default)
{
this.newLine = withNewLine ? Encoding.UTF8.GetBytes(Environment.NewLine) : null;
this.newLineLength = withNewLine ? newLine.Length : 0;
this.bufferCount = bufferCount;
this.cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
this.stream = Console.OpenStandardOutput();
this.channel = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
{
AllowSynchronousContinuations = false,
SingleWriter = false,
SingleReader = true,
});
this.writeLoop = WriteLoop();
}
async Task WriteLoop()
{
var index = 0;
var totalWriteSize = 0;
var stringBuffers = new string[bufferCount];
var reader = channel.Reader;
try
{
while (await reader.WaitToReadAsync(cancellationTokenSource.Token).ConfigureAwait(false))
{
while (reader.TryRead(out var value))
{
var size = Encoding.UTF8.GetMaxByteCount(value.Length);
// avoid to allocate LOH buffer
if (65536 < (totalWriteSize + size + newLineLength))
{
goto FLUSH;
}
else
{
stringBuffers[index++] = value;
totalWriteSize += size;
value = null;
}
if (index != stringBuffers.Length) continue;
FLUSH:
var buffer = ArrayPool<byte>.Shared.Rent(totalWriteSize);
try
{
WriteMessage(stringBuffers, index, buffer);
index = 0;
totalWriteSize = 0;
}
finally
{
ArrayPool<byte>.Shared.Return(buffer, false);
}
if (value != null)
{
stringBuffers[index++] = value;
totalWriteSize += size;
}
}
}
}
catch (OperationCanceledException)
{
}
if (index != 0)
{
var buffer = ArrayPool<byte>.Shared.Rent(totalWriteSize);
try
{
WriteMessage(stringBuffers, index, buffer);
}
finally
{
ArrayPool<byte>.Shared.Return(buffer, false);
}
}
}
void WriteMessage(string[] stringBuffers, int index, byte[] buffer)
{
int written = 0;
for (int i = 0; i < index; i++)
{
var str = stringBuffers[i];
written += Encoding.UTF8.GetBytes(str, 0, str.Length, buffer, written);
if (newLine != null)
{
Buffer.BlockCopy(newLine, 0, buffer, written, newLineLength);
written += newLineLength;
}
stringBuffers[i] = null;
}
// ConsolePal does not support async write, use Write(byte[]) API is most primitive.
stream.Write(buffer, 0, written);
}
public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func<TState, Exception, string> formatter)
{
var str = formatter(state, exception);
channel.Writer.TryWrite(str);
}
public IDisposable BeginScope<TState>(TState state)
{
return NUllDisposable.Instance;
}
public bool IsEnabled(LogLevel logLevel)
{
return true;
}
public async ValueTask DisposeAsync()
{
channel.Writer.Complete();
await channel.Reader.Completion;
cancellationTokenSource.Cancel();
await writeLoop;
}
class NUllDisposable : IDisposable
{
public static IDisposable Instance = new NUllDisposable();
NUllDisposable()
{
}
public void Dispose()
{
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment