Presentation - Data Flows with Async Streams and Channels
#!meta
{"kernelInfo":{"defaultKernelName":"csharp","items":[{"aliases":[],"languageName":"csharp","name":"csharp"}]}}
#!markdown
# Data Flows with Async Streams and Channels
Avi Levin (@Arithmomaniac)<br>
2023-07-30
#!markdown
- `IEnumerable`
- `async`
- `IAsyncEnumerable`
- Channels
- Producer/Consumer
- Timing - deeper look
- Enhancing LINQ - `ParallelSelectAwait`
- Splitting and joining
#!markdown
**What is a data flow?**
- A processing pipeline in which items "flow"/"stream" through the process
- Declarative, not imperative (see the small sketch in the next cell)
- "Mountain Streams" analogy
#!markdown
**IEnumerable**
- Often used as a `for` shorthand, but it is not just that
- Lazy evaluation (including across chained operators - like pulling through a straw)
- (JS equivalent: iterators/generators)
#!csharp
IEnumerable<int> Source()
{
    for (int i = 0; ; i++)
    {
        yield return i;
    }
}

IEnumerable<int> LowerDown()
{
    foreach (var item in Source())
    {
        if (item > 1)
        {
            yield break;
        }
        Console.WriteLine($"Retrieved {item} from Source");
        yield return item * 2;
    }
}

IEnumerable<int> LowerDown_AsEnumerator()
{
    using var enumerator = Source().GetEnumerator();
    while (enumerator.MoveNext())
    {
        var item = enumerator.Current;
        if (item > 1)
        {
            yield break;
        }
        Console.WriteLine($"Retrieved {item} from Source");
        yield return item * 2;
    }
}

foreach (var i in LowerDown())
{
    Console.WriteLine(i);
}
#!markdown
**How does this work?**
- Auto-generated state machine
#!markdown
```csharp
// simplified shape of the compiler-generated iterator class behind Source()
private sealed class <Source>d__0 : IEnumerable<int>, IEnumerator<int>
{
    private int <>1__state;
    private int <>2__current;
    private int <i>5__2; // the captured variable i

    int IEnumerator<int>.Current => <>2__current;

    bool MoveNext()
    {
        int num = <>1__state;
        if (num != 0)
        {
            if (num != 1) // never hit here (the loop is infinite), but this is how the loop would end
                return false;
            <>1__state = -1;
            <i>5__2++; // i++
        }
        else
        {
            <>1__state = -1;
            <i>5__2 = 0; // int i = 0
        }
        <>2__current = <i>5__2; // i is the value to return
        <>1__state = 1;
        return true; // report that a value is available
    }
}
```
#!markdown
**async**
- I/O operations are inherently asynchronous (they do not hold a CPU thread while waiting)
- Getting work "off" the calling thread can unfreeze a UI, or support increased scalability
- But how?

![image](https://i.stack.imgur.com/8gA9P.png)
#!csharp
interface Sample
{
    // callback style
    void X(Action<int> onX);
    void Y(int x, Action<int> onY);
    void Z(int x, int y);

    // Task style
    Task<int> X();
    Task<int> Y(int x);
}

Sample sample = null!;

void Callback()
{
    sample.X(onX: x => sample.Y(x, onY: y => sample.Z(x, y)));
}

void Promise()
{
    sample.X()
        .ContinueWith(task => sample.Y(task.Result)
            .ContinueWith(task2 => sample.Z(task.Result, task2.Result)));
}

async Task Async()
{
    var x = await sample.X();
    var y = await sample.Y(x);
    sample.Z(x, y);
}
#!markdown
- Note that `async` uses the `Task` object, initially created for multithreading but repurposed for async
- Underneath, `async` chains continuations together with a state machine
- You can mix and match, using `Task`s in either await or promise (`ContinueWith`) style - see the small sketch in the next cell
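#!markdown
A small sketch of that mix-and-match (added here, not from the original talk): the same `Task` returned by an `async` method can either be awaited or given a `ContinueWith` continuation.
#!csharp
async Task<int> ComputeAsync()
{
    await Task.Delay(10);
    return 42;
}

// promise style on the Task an async method returns
await ComputeAsync().ContinueWith(t => Console.WriteLine($"promise style: {t.Result}"));

// async/await style on the same kind of Task
Console.WriteLine($"await style: {await ComputeAsync()}");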
#!csharp
using System.Diagnostics;
using System.Threading;

var sw = Stopwatch.StartNew();
var task = Delay();
Thread.Sleep(400);
Console.WriteLine(sw.ElapsedMilliseconds);
await task;
Console.WriteLine(sw.ElapsedMilliseconds);

async Task Delay()
{
    await Task.Delay(800);
    Console.WriteLine(sw.ElapsedMilliseconds);
}
#!markdown
Further reading:
- https://tirania.org/blog/archive/2013/Aug-15.html
- https://blog.stephencleary.com/2013/11/there-is-no-thread.html
- https://devblogs.microsoft.com/dotnet/how-async-await-really-works/
#!markdown
**IAsyncEnumerable**
- Putting the two together: an enumerable where each item is retrieved asynchronously
#!csharp
async IAsyncEnumerable<int> DoubleAsync(IAsyncEnumerable<int> source)
{
    await foreach (var item in source)
    {
        Console.WriteLine($"Retrieved {item} from Source");
        yield return item * 2;
    }
}

async IAsyncEnumerable<int> DoubleAsyncWithEnumerator(IAsyncEnumerable<int> source)
{
    await using var enumerator = source.GetAsyncEnumerator();
    while (await enumerator.MoveNextAsync())
    {
        var item = enumerator.Current;
        Console.WriteLine($"Retrieved {item} from source using enumerator");
        yield return item * 2;
    }
}

// nearly impossible to model with regular tasks
Task DoubleTaskWithEnumerator(IAsyncEnumerable<int> source)
{
    var enumerator = source.GetAsyncEnumerator();
    return Do();

    Task Do()
    {
        return enumerator.MoveNextAsync().AsTask().ContinueWith(b => {
            if (b.Result)
            {
                var item = enumerator.Current;
                Console.WriteLine($"Retrieved {item} from source using enumerator");
                return Do();
            }
            else
            {
                return Task.CompletedTask;
            }
        }).Unwrap();
    }
}
#!markdown
Introducing our sample space for the rest of the examples
#!csharp
#r "nuget:SuperLinq.Async"

using SuperLinq.Async; // pretty cool helper library, check it out
using System.Text;
using System.Text.Json;
using System.IO;

// read items from a single db partition (as you often do in Cosmos)
async IAsyncEnumerable<string> ReadPartition(char prefix)
{
    for (int i = 0; i < 10; i++)
    {
        var value = $"{prefix}{i}";
        yield return value;
        Console.WriteLine($"{value} read from partition");
        await Task.Delay(10);
    }
}

// make a remote call to transform your input
async Task<int> DoRemoteCalculation(string value)
{
    await Task.Delay(25);
    return value.Sum(x => (int)x);
}

// serialize to a very large JSON file in streaming format
async Task<string> Serialize(string tag, IAsyncEnumerable<string> values)
{
    using var stream = new MemoryStream();
    await JsonSerializer.SerializeAsync(
        stream,
        new { Tag = tag, Values = values.Do(async _ => await Task.Delay(10)) });
    return Encoding.UTF8.GetString(stream.ToArray());
}
#!markdown
Using standard LINQ chaining: lots of "dead" time, because lazy evaluation means the source only does work while the serializer is asking for the next item
#!csharp
using System.Diagnostics;

await NaiveSinglePartitionAndFile();

async Task NaiveSinglePartitionAndFile()
{
    var sw = Stopwatch.StartNew();
    await Serialize("A", ReadPartition('a'));
    Console.WriteLine($"took {sw.ElapsedMilliseconds}ms");
}
#!markdown
**Channels**
- An async-compatible producer/consumer collection
- A buffer that can be added to on one thread and consumed on another
- Can tell the difference between "empty right now" and "no new items are coming" (see the small cell below)
- Compare to the non-async (thread-holding) `BlockingCollection`
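#!markdown
A minimal sketch of the core `Channel` API (added here, not from the original talk): completing the writer is how the reader tells "empty right now" apart from "no new items will ever come".
#!csharp
using System.Threading.Channels;

var demo = Channel.CreateUnbounded<int>();
await demo.Writer.WriteAsync(1);
await demo.Writer.WriteAsync(2);
demo.Writer.Complete(); // signal that nothing more will ever be written

// ReadAllAsync drains the buffer and then ends, because the channel is marked complete;
// without Complete(), it would keep waiting for more items.
await foreach (var item in demo.Reader.ReadAllAsync())
{
    Console.WriteLine(item);
}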
#!csharp
using System.Diagnostics;
using System.Threading.Channels;

await ChannelSinglePartitionAndFile();

// this example mimics the EventProcessor/PreAggregator flow in XIDS
async Task ChannelSinglePartitionAndFile()
{
    var sw = Stopwatch.StartNew();
    var channel = Channel.CreateUnbounded<string>();
    var writeTask = Serialize("A", channel.Reader.ReadAllAsync());
    await ReadPartition('a').ForEachAwaitAsync(async item => await channel.Writer.WriteAsync(item));
    channel.Writer.Complete();
    await writeTask;
    Console.WriteLine($"took {sw.ElapsedMilliseconds}ms");
}
#!markdown
Other advantages of this pattern:
- Can wait on, and create new, channels for "checkpointing"
- Allows the consumer of the channel to be "stateful" without creating a state object to receive events
#!markdown
- What's with the "async" here?
- Easy to understand on the read side - items need to show up before they can be read
- But writes can also wait when the buffer is full - a `Bounded` channel...
- ...which also supports alternative dropping behaviors (see the sketch in the next cell)
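#!markdown
A small sketch of those alternative behaviors (added here, not from the original talk): instead of the default `Wait`, a bounded channel can be configured to drop items when its buffer is full.
#!csharp
using System.Threading.Channels;

var dropping = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
{
    FullMode = BoundedChannelFullMode.DropOldest // or DropNewest / DropWrite
});

foreach (var i in Enumerable.Range(0, 5))
{
    dropping.Writer.TryWrite(i); // never waits; once the buffer is full, the oldest item is evicted
}
dropping.Writer.Complete();

await foreach (var i in dropping.Reader.ReadAllAsync())
{
    Console.WriteLine(i); // with DropOldest and a capacity of 2, only 3 and 4 remain
}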
#!csharp
using System.Diagnostics;
using System.Threading.Channels;

var sw = Stopwatch.StartNew();
var channel = Channel.CreateBounded<int>(2);
await Task.WhenAll(WriteChannel(), ReadChannel());

async Task WriteChannel()
{
    foreach (var i in Enumerable.Range(0, 10))
    {
        while (!channel.Writer.TryWrite(i))
        {
            Console.WriteLine($"{sw.ElapsedMilliseconds}: Cannot write {i} yet");
            await Task.Delay(100);
        }
        Console.WriteLine($"{sw.ElapsedMilliseconds}: Wrote {i}");
        if (i == 5)
        {
            await Task.Delay(500);
        }
    }
    channel.Writer.Complete();
}

async Task ReadChannel()
{
    await Task.Delay(500);
    while (true)
    {
        if (!channel.Reader.TryRead(out var i))
        {
            if (channel.Reader.Completion.IsCompletedSuccessfully)
            {
                Console.WriteLine($"{sw.ElapsedMilliseconds}: all done");
                break;
            }
            else
            {
                Console.WriteLine($"{sw.ElapsedMilliseconds}: Cannot read yet");
                await Task.Delay(100);
            }
        }
        else
        {
            Console.WriteLine($"{sw.ElapsedMilliseconds}: Read {i}");
        }
    }
}
#!markdown
**Enhancing standard LINQ chaining**
#!csharp
await ForEachAsync();
await ParallelForEachAsync();

async Task ForEachAsync()
{
    var sw = Stopwatch.StartNew();
    foreach (var i in Enumerable.Range(0, 50)) { await Task.Delay(5); }
    Console.WriteLine(sw.ElapsedMilliseconds);
}

async Task ParallelForEachAsync()
{
    var sw = Stopwatch.StartNew();
    await Parallel.ForEachAsync(
        Enumerable.Range(0, 50),
        async (i, _) => { await Task.Delay(5); });
    Console.WriteLine(sw.ElapsedMilliseconds);
}
#!csharp
async Task<int> Work(int i)
{
    await Task.Delay(5);
    return i * 2;
}

await SelectAwait();

async Task SelectAwait()
{
    var sw = Stopwatch.StartNew();
    var array = await Enumerable.Range(0, 50).ToAsyncEnumerable()
        .SelectAwait(async i => await Work(i)) // built-in - Linq.Async
        .ToArrayAsync();
    Console.WriteLine(string.Join(',', array));
    Console.WriteLine(sw.ElapsedMilliseconds);
}
#!markdown
- How do we do _this_ in parallel?
- `Parallel.ForEachAsync` doesn't produce return values
- PLINQ's `.AsParallel()` allocates threads, and doesn't work with `IAsyncEnumerable` anyway
- Use `Parallel.ForEachAsync` to write to a channel, and read the channel as the result
- Used this often in ADP Data Ingestion, e.g. for getting signed URIs for files
- This solution doesn't preserve ordering; more complex ones exist (e.g. enqueuing `Task`s in a channel - a sketch follows the `ParallelSelectAwait` example below)
#!csharp
using System.Threading;

static async IAsyncEnumerable<TOutput> ParallelSelectAwaitImpl<TInput, TOutput>(
    IAsyncEnumerable<TInput> source,
    Func<TInput, ValueTask<TOutput>> body)
{
    var channel = System.Threading.Channels.Channel.CreateBounded<TOutput>(Environment.ProcessorCount);
    using var cts = new CancellationTokenSource();
    _ = RunForEachAsync(cts);

    // Force Parallel.ForEachAsync to stop once the caller stops iterating
    // (the finally block runs when the generated iterator is disposed).
    // This ensures Parallel.ForEachAsync halts in case of an invocation of Any(), First(), etc.,
    // where the desired result is achieved before getting to the end of the enumerable.
    try
    {
        // Do not pass in the cancellation token, so that errors bubble up from Parallel.ForEachAsync
        // even after the cancellation token is set.
        await foreach (var item in channel.Reader.ReadAllAsync(CancellationToken.None))
        {
            yield return item;
        }
    }
    finally
    {
        cts.Cancel();
    }

    // Run Parallel.ForEachAsync and, when done, notify the writer that the work is done (nothing more to yield).
    // Parallel.ForEachAsync takes responsibility for halting on the cancellation token, which we need
    // in order to ensure TryComplete is fired.
    async Task RunForEachAsync(CancellationTokenSource cts)
    {
        try
        {
            await Parallel.ForEachAsync(
                source,
                cts.Token,
                async (item, token) =>
                {
                    var output = await body(item);
                    await channel.Writer.WriteAsync(output, token);
                });
            channel.Writer.TryComplete();
        }
        catch (Exception ex)
        {
            channel.Writer.TryComplete(ex);
        }
    }
}
#!csharp
await ParallelSelectAwait();

async Task ParallelSelectAwait()
{
    var sw = Stopwatch.StartNew();
    var array = await ParallelSelectAwaitImpl(
        Enumerable.Range(0, 50).ToAsyncEnumerable(),
        async i => await Work(i)
    ).ToArrayAsync();
    Console.WriteLine(string.Join(',', array));
    Console.WriteLine(sw.ElapsedMilliseconds);
}
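#!markdown
For reference, a minimal sketch of the "enqueue `Task`s in a channel" idea mentioned above (added here, not part of the original presentation): work items start in parallel, but the `Task`s are queued and awaited in source order, so output order is preserved. For brevity it does not handle the consumer abandoning the iteration early the way `ParallelSelectAwaitImpl` above does.
#!csharp
using System.Threading;
using System.Threading.Channels;

static async IAsyncEnumerable<TOutput> OrderedParallelSelectAwait<TInput, TOutput>(
    IAsyncEnumerable<TInput> source,
    Func<TInput, ValueTask<TOutput>> body,
    int maxConcurrency = 4)
{
    // the channel holds in-flight Tasks in source order; the semaphore caps concurrency
    var channel = Channel.CreateBounded<Task<TOutput>>(maxConcurrency);
    var throttle = new SemaphoreSlim(maxConcurrency);

    var producer = Task.Run(async () =>
    {
        try
        {
            await foreach (var item in source)
            {
                await throttle.WaitAsync();
                var task = Task.Run(async () =>
                {
                    try { return await body(item); }
                    finally { throttle.Release(); }
                });
                await channel.Writer.WriteAsync(task); // enqueue the Task itself, preserving order
            }
            channel.Writer.TryComplete();
        }
        catch (Exception ex)
        {
            channel.Writer.TryComplete(ex);
        }
    });

    // results come back in input order, regardless of which work item finished first
    await foreach (var task in channel.Reader.ReadAllAsync())
    {
        yield return await task;
    }
    await producer;
}

var ordered = await OrderedParallelSelectAwait(
    Enumerable.Range(0, 50).ToAsyncEnumerable(),
    async i => await Work(i)
).ToArrayAsync();
Console.WriteLine(string.Join(',', ordered));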
#!markdown
**Split/Join**
- How to "funnel" two enumerables into a single pipeline?
- Lower-level solutions exist, see e.g. [SuperLinq.ConcurrentMerge](https://viceroypenguin.github.io/SuperLinq/api/SuperLinq.Async.AsyncSuperEnumerable.ConcurrentMerge)
#!csharp
await ChannelTwoPartitionsAndFile();

async Task ChannelTwoPartitionsAndFile()
{
    var sw = Stopwatch.StartNew();
    // note optimization in channel options - thanks EliA
    var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(5) { SingleReader = true });
    var writeTask = Serialize("A", channel.Reader.ReadAllAsync());
    await Task.WhenAll(
        ReadPartition('a').ForEachAwaitAsync(async item => await channel.Writer.WriteAsync(item)),
        ReadPartition('b').ForEachAwaitAsync(async item => await channel.Writer.WriteAsync(item))
    );
    channel.Writer.Complete();
    await writeTask;
    Console.WriteLine($"took {sw.ElapsedMilliseconds}ms");
}
#!markdown
- Split a single enumerable into multiple outputs
- Unlike joining, no lower-level way to do this
#!csharp
await ChannelPartitionAndTwoFiles();

async Task ChannelPartitionAndTwoFiles()
{
    var sw = Stopwatch.StartNew();
    var channelA = Channel.CreateBounded<string>(new BoundedChannelOptions(5) { SingleWriter = true });
    var channelB = Channel.CreateBounded<string>(new BoundedChannelOptions(5) { SingleWriter = true });
    var readTaskA = Serialize("A", channelA.Reader.ReadAllAsync());
    var readTaskB = Serialize("B", channelB.Reader.ReadAllAsync());
    await ReadPartition('a').ForEachAwaitAsync(async (item, i) => {
        var channel = (i % 2 == 0 ? channelA : channelB);
        await channel.Writer.WriteAsync(item);
    });
    channelA.Writer.Complete();
    channelB.Writer.Complete();
    await Task.WhenAll(readTaskA, readTaskB);
    Console.WriteLine($"took {sw.ElapsedMilliseconds}ms");
}
#!markdown
Be careful when firing and (temporarily) forgetting groups of tasks!
- Cancellation and error propagation: when one fails, stop the others (a hand-rolled sketch follows in the next cell)

Recommend the [`StructuredConcurrency`](https://github.com/StephenCleary/StructuredConcurrency) library
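#!markdown
A hand-rolled sketch of the problem being described (this is *not* the `StructuredConcurrency` API): run a group of tasks and, if any one of them fails, cancel the rest rather than leaving them running.
#!csharp
using System.Threading;

var cts = new CancellationTokenSource();

async Task RunAndCancelSiblingsAsync(Func<CancellationToken, Task> work)
{
    try
    {
        await work(cts.Token);
    }
    catch
    {
        cts.Cancel(); // a failure (or cancellation) in one branch stops the others
        throw;
    }
}

try
{
    await Task.WhenAll(
        RunAndCancelSiblingsAsync(token => Task.Delay(5000, token)), // would run for 5 seconds on its own
        RunAndCancelSiblingsAsync(async token => { await Task.Delay(100, token); throw new Exception("boom"); }));
}
catch (Exception ex)
{
    Console.WriteLine($"Pipeline stopped early: {ex.Message}");
}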
#!markdown
Further reading on Channels:
- https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/
#!markdown
**Alternatives**
- [TPL Dataflow](https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library)
  - Join "Blocks" together (see the small taste in the next cell)
  - Extremely powerful, but overkill for most scenarios
  - Has its own separate set of LINQ-like operations, making for a steep learning curve
- [Rx.NET](https://github.com/dotnet/reactive)
  - Explicit pub/sub with `IObservable<T>`
  - Limited development and support
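#!markdown
A tiny taste of the TPL Dataflow "Blocks" model mentioned above (a sketch added for illustration, not part of the original presentation): blocks are linked into a pipeline, and completion propagates along the links.
#!csharp
#r "nuget:System.Threading.Tasks.Dataflow"

using System.Threading.Tasks.Dataflow;

// a block that transforms items, linked to a block that consumes them
var doubler = new TransformBlock<int, int>(async i => { await Task.Delay(5); return i * 2; });
var printer = new ActionBlock<int>(i => Console.WriteLine(i));
doubler.LinkTo(printer, new DataflowLinkOptions { PropagateCompletion = true });

foreach (var i in Enumerable.Range(0, 5)) { doubler.Post(i); }
doubler.Complete();           // completion flows through the link to the printer
await printer.Completion;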
#!markdown
**In conclusion**
- Enumerables for just-in-time processing
- Async enables parallelism for free
- Decouple components by linking via Channels