Skip to content

Instantly share code, notes, and snippets.

@georgevanburgh
Created August 28, 2020 18:15
Show Gist options
  • Save georgevanburgh/c07889e9fc70799ad532721b11ad681b to your computer and use it in GitHub Desktop.
Save georgevanburgh/c07889e9fc70799ad532721b11ad681b to your computer and use it in GitHub Desktop.
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Text.Json;
using System.Threading.Tasks;
namespace span
{
/// <summary>
/// Streams newline-delimited JSON from <c>test.txt</c> through a <see cref="Pipe"/> and prints the
/// integer <c>abc</c> property of every line to the console.
/// </summary>
class Program
{
    /// <summary>File that is streamed into the pipe.</summary>
    private const string InputPath = "test.txt";

    /// <summary>JSON property whose integer value is printed for every line.</summary>
    private const string ValuePropertyName = "abc";

    /// <summary>Minimum buffer size requested from the pipe for each file read.</summary>
    private const int ReadChunkSize = 1024 * 1024 * 4;

    /// <summary>Amount of unconsumed data at which the writer's flush starts waiting for the reader.</summary>
    private const long PauseWriterThreshold = 2L * ReadChunkSize;

    /// <summary>
    /// Amount of unconsumed data below which a paused writer is resumed. A partial line must stay
    /// below this size, otherwise reader and writer would wait on each other forever.
    /// </summary>
    private const long ResumeWriterThreshold = ReadChunkSize;

    /// <summary>
    /// Entry point: starts the file producer and the JSON consumer and waits for both to finish.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    /// <returns>A task that completes when the whole file has been processed.</returns>
    static async Task Main(string[] args)
    {
        // Explicit thresholds: the defaults are far smaller than one read chunk and would stall
        // the pipe on any line longer than the default resume threshold.
        var pipeline = new Pipe(new PipeOptions(
            pauseWriterThreshold: PauseWriterThreshold,
            resumeWriterThreshold: ResumeWriterThreshold));

        Task producer = WriteFileIntoPipeline(pipeline.Writer);
        Task consumer = ReadJsonFromPipeline(pipeline.Reader);

        await Task.WhenAll(producer, consumer).ConfigureAwait(false);
    }

    /// <summary>
    /// Consumes the pipe line by line, parsing each line as JSON and printing its <c>abc</c> value.
    /// </summary>
    /// <param name="reader">Reading end of the pipe. It is always completed before this method returns.</param>
    /// <returns>A task that completes once the writer has completed and all data was processed.</returns>
    /// <exception cref="InvalidDataException">A single line is too long to fit in the pipe.</exception>
    /// <exception cref="JsonException">A line is not valid JSON.</exception>
    static async Task ReadJsonFromPipeline(PipeReader reader)
    {
        Exception failure = null;
        try
        {
            while (true)
            {
                // Awaiting instead of spinning on TryRead keeps the thread free while no data is available.
                ReadResult readResult = await reader.ReadAsync().ConfigureAwait(false);
                ReadOnlySequence<byte> buffer = readResult.Buffer;

                while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
                {
                    ProcessLine(line);
                }

                if (readResult.IsCompleted)
                {
                    // The last line of the file may not be terminated by a newline.
                    ProcessLine(buffer);
                    reader.AdvanceTo(buffer.End);
                    break;
                }

                if (buffer.Length >= ResumeWriterThreshold)
                {
                    // Fail loudly instead of deadlocking against a writer that is paused by back-pressure.
                    throw new InvalidDataException(
                        $"A single line exceeds the supported maximum of {ResumeWriterThreshold} bytes.");
                }

                // Everything before buffer.Start was consumed; the partial line that remains was
                // examined, so the next read only returns once more data has arrived.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
        catch (Exception ex)
        {
            failure = ex;
            throw;
        }
        finally
        {
            // Completing the reader (with the error, if any) unblocks a writer waiting in FlushAsync.
            await reader.CompleteAsync(failure).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Copies the input file into the pipe in chunks, flushing after every chunk so the reader can
    /// start working before the whole file has been read.
    /// </summary>
    /// <param name="writer">Writing end of the pipe. It is always completed before this method returns.</param>
    /// <returns>A task that completes once the file was fully written or the reader stopped reading.</returns>
    static async Task WriteFileIntoPipeline(PipeWriter writer)
    {
        Exception failure = null;
        try
        {
            // Buffer size 1 disables FileStream's own buffering; reads go straight into pipe memory.
            using (var stream = new FileStream(
                InputPath,
                FileMode.Open,
                FileAccess.Read,
                FileShare.Read,
                1,
                FileOptions.Asynchronous | FileOptions.SequentialScan))
            {
                while (true)
                {
                    Memory<byte> buffer = writer.GetMemory(ReadChunkSize);
                    int bytesRead = await stream.ReadAsync(buffer).ConfigureAwait(false);

                    // Only a zero-byte read signals end of file; a short read does not.
                    if (bytesRead == 0)
                    {
                        break;
                    }

                    writer.Advance(bytesRead);

                    // Flushing makes the data visible to the reader and applies back-pressure.
                    FlushResult flushResult = await writer.FlushAsync().ConfigureAwait(false);
                    if (flushResult.IsCompleted)
                    {
                        // The reader is gone; writing more data would be pointless.
                        break;
                    }
                }
            }
        }
        catch (Exception ex)
        {
            failure = ex;
            throw;
        }
        finally
        {
            // Completing the writer (with the error, if any) lets the reader finish instead of waiting forever.
            await writer.CompleteAsync(failure).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Splits the next newline-terminated line off the front of <paramref name="buffer"/>.
    /// </summary>
    /// <param name="buffer">Unprocessed data; advanced past the line and its newline on success.</param>
    /// <param name="line">The line without its terminating newline.</param>
    /// <returns><c>true</c> if a complete line was found; otherwise <c>false</c>.</returns>
    private static bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
    {
        SequencePosition? newline = buffer.PositionOf((byte)'\n');
        if (!newline.HasValue)
        {
            line = default;
            return false;
        }

        line = buffer.Slice(0, newline.Value);
        buffer = buffer.Slice(buffer.GetPosition(1, newline.Value));
        return true;
    }

    /// <summary>
    /// Parses one line as JSON and prints its <c>abc</c> property. Blank lines are skipped.
    /// </summary>
    /// <param name="line">The raw UTF-8 bytes of a single line.</param>
    private static void ProcessLine(ReadOnlySequence<byte> line)
    {
        if (IsBlank(line))
        {
            return;
        }

        // JsonDocument rents pooled buffers and must be disposed to return them.
        using (JsonDocument document = JsonDocument.Parse(line))
        {
            Console.WriteLine(document.RootElement.GetProperty(ValuePropertyName).GetInt32());
        }
    }

    /// <summary>
    /// Determines whether a line is empty or consists solely of JSON whitespace bytes.
    /// </summary>
    /// <param name="line">The raw bytes of a single line.</param>
    /// <returns><c>true</c> if there is nothing to parse; otherwise <c>false</c>.</returns>
    private static bool IsBlank(ReadOnlySequence<byte> line)
    {
        foreach (ReadOnlyMemory<byte> segment in line)
        {
            foreach (byte value in segment.Span)
            {
                if (value != (byte)' ' && value != (byte)'\t' && value != (byte)'\r' && value != (byte)'\n')
                {
                    return false;
                }
            }
        }

        return true;
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment