Skip to content

Instantly share code, notes, and snippets.

@Ilchert
Last active June 6, 2020 10:11
Show Gist options
  • Save Ilchert/b483d9769db651c679b87ee2499d1222 to your computer and use it in GitHub Desktop.
Save Ilchert/b483d9769db651c679b87ee2499d1222 to your computer and use it in GitHub Desktop.
Read json array as async enumerable
public class Program
{
public static async Task Main(string[] args)
{
using var dataResponse = await client.GetAsync();
dataResponse.EnsureSuccessStatusCode();
await using var stream = await dataResponse.Content.ReadAsStreamAsync();
var pipe = PipeReader.Create(stream);
await foreach (var item in GetAsync(pipe))
{
var parsed = Parse(item, 2020);
Console.WriteLine(parsed.exchange);
}
}
private static async IAsyncEnumerable<ReadOnlySequence<byte>> GetAsync(PipeReader reader, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var state = new JsonReaderState(new JsonReaderOptions() { AllowTrailingCommas = true });
while (true)
{
var rr = await reader.ReadAsync(cancellationToken);
var buffer = rr.Buffer;
while (Process(out var data))
yield return data;
reader.AdvanceTo(buffer.Start, buffer.End);
if (rr.IsCompleted)
break;
bool Process(out ReadOnlySequence<byte> item)
{
var utfReader = new Utf8JsonReader(buffer, false, state);
item = default;
if (!ReadToArrayContent(ref utfReader))
return false;
var startIndex = utfReader.TokenStartIndex;
if (utfReader.TrySkip())
{
item = buffer.Slice(startIndex, utfReader.Position);
AdvanceBufferToReaderPosition(ref utfReader);
return true;
}
item = default;
return false;
}
bool ReadToArrayContent(ref Utf8JsonReader utfReader)
{
while (true)
{
switch (utfReader.TokenType)
{
case JsonTokenType.None:
case JsonTokenType.EndArray:
case JsonTokenType.StartArray:
case JsonTokenType.EndObject:
{
if (!utfReader.Read())
{
AdvanceBufferToReaderPosition(ref utfReader);
return false;
}
break;
}
default:
return true;
}
}
}
void AdvanceBufferToReaderPosition(ref Utf8JsonReader utfReader)
{
buffer = buffer.Slice(utfReader.Position);
state = utfReader.CurrentState;
}
}
await reader.CompleteAsync();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment