Created
April 10, 2021 21:56
-
-
Save nickwesselman/3176835929ee4fc5c80bafab49cf507c to your computer and use it in GitHub Desktop.
Read from a multiplexed stream provided by Docker.DotNet for container logs. In parallel, read from the multiplexed streams using a StreamReader and write to the gRPC response. Currently this isn't working -- the gRPC call never returns.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public override async Task StreamContainerLogs(StreamContainerLogsRequest request, IServerStreamWriter<StreamContainerLogsResponse> responseStream, ServerCallContext context) | |
{ | |
const int bufferSize = 81920; | |
var streamTask = DockerClient.Containers.GetContainerLogsAsync(request.Id, false, new ContainerLogsParameters | |
{ | |
Follow = true, | |
Tail = "500", | |
ShowStdout = true, | |
ShowStderr = true | |
}, context.CancellationToken); | |
var buffer = ArrayPool<byte>.Shared.Rent(bufferSize); | |
try | |
{ | |
using (var stream = await streamTask) | |
using (MemoryStream outStream = new MemoryStream(), errStream = new MemoryStream()) | |
using (StreamReader outReader = new StreamReader(outStream), errReader = new StreamReader(errStream)) | |
{ | |
var inTask = Task.Run(async () => | |
{ | |
while (!context.CancellationToken.IsCancellationRequested) | |
{ | |
var result = await stream.ReadOutputAsync(buffer, 0, buffer.Length, context.CancellationToken).ConfigureAwait(false); | |
if (result.EOF) | |
{ | |
return; | |
} | |
Stream target; | |
switch (result.Target) | |
{ | |
case MultiplexedStream.TargetStream.StandardOut: | |
target = outStream; | |
break; | |
case MultiplexedStream.TargetStream.StandardError: | |
target = errStream; | |
break; | |
default: | |
throw new InvalidOperationException($"Unexpected TargetStream: {result.Target}"); | |
} | |
await target.WriteAsync(buffer, 0, result.Count, context.CancellationToken).ConfigureAwait(false); | |
} | |
}); | |
Func<StreamReader, Task> readTheReader = async (StreamReader reader) => | |
{ | |
while (!context.CancellationToken.IsCancellationRequested) | |
{ | |
var line = await reader.ReadLineAsync(); | |
if (line == null) | |
{ | |
return; | |
} | |
await responseStream.WriteAsync(new StreamContainerLogsResponse | |
{ | |
Log = line | |
}); | |
} | |
}; | |
var outTask = Task.Run(() => readTheReader(outReader)); | |
var errTask = Task.Run(() => readTheReader(errReader)); | |
Task.WaitAll(new[] { inTask, outTask, errTask }); | |
} | |
} | |
finally | |
{ | |
ArrayPool<byte>.Shared.Return(buffer); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment