Presentation - Data Flows with Async Streams and Channels
# Data Flows with Async Streams and Channels
Avi Levin (@Arithmomaniac)<br>
- `IEnumerable`
- `async`
- `IAsyncEnumerable`
- Channels
- Producer/Consumer
- Timing - deeper look
- Enhancing LINQ - `ParallelSelectAwait`
- Splitting and joining
**What is a data flow**
- Processing pipeline in which items "flow/stream" through the process
- Declarative, not imperative
- "Mountain Streams" analogy
- Often used as shorthand for a `for` loop, but it is not one
- Lazy evaluation (including on chaining - like pulling through a straw)
- (JS equivalent - iterators/generators)
```csharp
IEnumerable<int> Source()
{
    for (int i = 0; ; i++)
        yield return i; // infinite, but items are only produced on demand
}

IEnumerable<int> LowerDown()
{
    foreach (var item in Source())
    {
        if (item > 1)
            yield break;
        Console.WriteLine($"Retrieved {item} from Source");
        yield return item * 2;
    }
}

// the same method, with foreach desugared into explicit enumerator calls
IEnumerable<int> LowerDown_AsEnumerator()
{
    using var enumerator = Source().GetEnumerator();
    while (enumerator.MoveNext())
    {
        var item = enumerator.Current;
        if (item > 1)
            yield break;
        Console.WriteLine($"Retrieved {item} from Source");
        yield return item * 2;
    }
}

foreach (var i in LowerDown())
    Console.WriteLine(i);
```
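To make the "pulling through a straw" point concrete, here is a small, self-contained sketch (the `Numbers` source and the log messages are illustrative, not part of the original deck) that records when each stage of a `Where`/`Select` chain actually runs:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var log = new List<string>();

// Infinite source: only safe because downstream operators pull items lazily
IEnumerable<int> Numbers()
{
    for (int i = 0; ; i++)
    {
        log.Add($"produce {i}");
        yield return i;
    }
}

// Building the chain runs nothing yet
var query = Numbers()
    .Where(n => n % 2 == 0)
    .Select(n => { log.Add($"select {n}"); return n * 10; });
log.Add("query built");

// Each iteration pulls exactly one item through the whole chain
foreach (var value in query)
{
    log.Add($"consume {value}");
    if (value >= 20)
        break; // stopping here means Numbers() never produces 3
}

Console.WriteLine(string.Join(", ", log));
```

Nothing is produced when the query is built, and items are pulled one at a time, interleaved across the stages, only as far as the consumer asks.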
**How does this work?**
- Auto-generated state machine
```csharp
// Simplified decompiled output for Source(): fields and MoveNext of the generated class
private int <>1__state;
private int <>2__current;
private int <i>5__2; // captured variable i

int IEnumerator<int>.Current => <>2__current;

private bool MoveNext()
{
    int num = <>1__state;
    if (num != 0)
    {
        if (num != 1) // never hit here in practice; if it were, it would end the loop
            return false;
        <>1__state = -1;
        <i>5__2++; // i++
    }
    else
    {
        <>1__state = -1;
        <i>5__2 = 0; // int i = 0
    }
    <>2__current = <i>5__2; // i is the value to return
    <>1__state = 1;
    return true; // return it
}
```
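The decompiled members above can't be compiled (the `<>` names are reserved for the compiler). As a rough, runnable analogue, assuming we model the generated fields as captured locals and the generated class as a pair of delegates (`CreateSourceStateMachine` is a hypothetical name), the same state transitions look like this:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, hand-written analogue of the generated iterator class for Source():
// the captured locals play the role of the compiler-generated fields.
(Func<bool> MoveNext, Func<int> Current) CreateSourceStateMachine()
{
    int state = 0;   // <>1__state: 0 = not started, 1 = paused at "yield return", -1 = running
    int current = 0; // <>2__current
    int i = 0;       // <i>5__2, the loop variable kept alive between calls

    bool MoveNext()
    {
        switch (state)
        {
            case 0:          // first call: run up to the first yield
                state = -1;
                i = 0;       // int i = 0
                break;
            case 1:          // resuming after a yield
                state = -1;
                i++;         // i++
                break;
            default:         // any other state ends the sequence
                return false;
        }
        current = i;         // yield return i ...
        state = 1;           // ... and remember where to resume
        return true;
    }

    Func<bool> moveNext = MoveNext;
    Func<int> getCurrent = () => current;
    return (moveNext, getCurrent);
}

var machine = CreateSourceStateMachine();
var values = new List<int>();
while (values.Count < 3 && machine.MoveNext())
    values.Add(machine.Current());

Console.WriteLine(string.Join(",", values)); // 0,1,2
```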
- I/O operations are inherently asynchronous (waiting on them doesn't need to hold a CPU thread)
- Getting work "off" the calling thread keeps a UI responsive and improves scalability
- But how?
```csharp
interface Sample
{
    // callback style
    void X(Action<int> onX);
    void Y(int x, Action<int> onY);
    void Z(int x, int y);
    // Task style
    Task<int> X();
    Task<int> Y(int x);
}

Sample sample = null!;

void Callback() =>
    sample.X(onX: x => sample.Y(x, onY: y => sample.Z(x, y)));

Task Promise() =>
    sample.X()
        .ContinueWith(xTask => sample.Y(xTask.Result)
            .ContinueWith(yTask => new { x = xTask.Result, y = yTask.Result }))
        .Unwrap()
        .ContinueWith(task2 => sample.Z(task2.Result.x, task2.Result.y));

async Task Async()
{
    var x = await sample.X();
    var y = await sample.Y(x);
    sample.Z(x, y);
}
```
- Note that `async` uses the `Task` type, originally created for multithreading but repurposed for async
- Underneath, `async` chains continuations together with a compiler-generated state machine
- Can mix and match: the same `Task` can be `await`ed or chained like a promise with `ContinueWith`
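A minimal sketch of the mix-and-match point, assuming a stand-in `GetNumberAsync` method (hypothetical): one `Task` object can be consumed promise-style and with `await` at the same time.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical async operation
async Task<int> GetNumberAsync()
{
    await Task.Delay(10);
    return 21;
}

Task<int> task = GetNumberAsync();

// Promise style: chain a continuation onto the Task
Task<int> doubled = task.ContinueWith(t => t.Result * 2);

// async style: await the very same Task object
int original = await task;
int viaPromise = await doubled;

Console.WriteLine($"{original} {viaPromise}"); // 21 42
```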
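```csharp
using System.Diagnostics;
using System.Threading;

var sw = Stopwatch.StartNew();
var task = Delay(); // returns right away, before the delay has elapsed
Console.WriteLine($"{sw.ElapsedMilliseconds}ms: got the task back");
await task;         // this is where the ~800ms is spent
Console.WriteLine($"{sw.ElapsedMilliseconds}ms: task completed");

async Task Delay() => await Task.Delay(800);
```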
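A closer look at the timing, as a sketch: an `async` method runs synchronously until its first `await` on something unfinished, and only then hands a `Task` back to the caller. Here a `TaskCompletionSource` (an assumption, standing in for the 800ms delay so the order is deterministic) controls exactly when the "I/O" completes:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();

// Stands in for an I/O operation that hasn't completed yet
var gate = new TaskCompletionSource();

async Task WorkAsync()
{
    log.Add("WorkAsync started");  // runs synchronously on the caller's thread
    await gate.Task;               // first await on an unfinished task: WorkAsync returns its Task here
    log.Add("WorkAsync resumed");  // runs later, as a continuation
}

var task = WorkAsync();
log.Add("caller has the Task");
gate.SetResult();                  // the "I/O" completes, releasing the continuation
await task;

Console.WriteLine(string.Join(" -> ", log));
// WorkAsync started -> caller has the Task -> WorkAsync resumed
```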
Further reading:
- Putting the two together: an enumerable where each item is retrieved asynchronously
```csharp
async IAsyncEnumerable<int> DoubleAsync(IAsyncEnumerable<int> source)
{
    await foreach (var item in source)
    {
        Console.WriteLine($"Retrieved {item} from Source");
        yield return item * 2;
    }
}

// the same method, with await foreach desugared into explicit enumerator calls
async IAsyncEnumerable<int> DoubleAsyncWithEnumerator(IAsyncEnumerable<int> source)
{
    await using var enumerator = source.GetAsyncEnumerator();
    while (await enumerator.MoveNextAsync())
    {
        var item = enumerator.Current;
        Console.WriteLine($"Retrieved {item} from source using enumerator");
        yield return item * 2;
    }
}

// nearly impossible to model with regular tasks: this version can't even yield,
// and disposing the enumerator correctly would take still more continuations
Task DoubleTaskWithEnumerator(IAsyncEnumerable<int> source)
{
    var enumerator = source.GetAsyncEnumerator();
    return Do();

    Task Do() =>
        enumerator.MoveNextAsync().AsTask().ContinueWith(b =>
        {
            if (!b.Result)
                return Task.CompletedTask;
            var item = enumerator.Current;
            Console.WriteLine($"Retrieved {item} from source using enumerator");
            return Do(); // recurse for the next item
        }).Unwrap();
}
```
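Disposal matters for async streams too. This sketch (the `ProduceAsync` producer is hypothetical) shows that breaking out of `await foreach` disposes the iterator, which runs its `finally` block; the comments in `ParallelSelectAwaitImpl` later on rely on the same behavior.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();

async IAsyncEnumerable<int> ProduceAsync()
{
    try
    {
        for (int i = 0; i < 100; i++)
        {
            await Task.Yield(); // stand-in for real async I/O
            log.Add($"produced {i}");
            yield return i;
        }
    }
    finally
    {
        // runs when the consumer stops early: await foreach calls DisposeAsync
        log.Add("producer cleaned up");
    }
}

await foreach (var item in ProduceAsync())
{
    log.Add($"consumed {item}");
    if (item == 1)
        break;
}

Console.WriteLine(string.Join(", ", log));
// produced 0, consumed 0, produced 1, consumed 1, producer cleaned up
```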
Introducing the sample scenario used in the rest of the examples
```csharp
#r "nuget:SuperLinq.Async"
using SuperLinq.Async; // pretty cool helper library, check it out
using System.IO;
using System.Text;
using System.Text.Json;

// read items from a single DB partition (as is often done in Cosmos DB)
async IAsyncEnumerable<string> ReadPartition(char prefix)
{
    for (int i = 0; i < 10; i++)
    {
        var value = $"{prefix}{i}";
        yield return value;
        Console.WriteLine($"{value} read from partition");
        await Task.Delay(10);
    }
}

// make a remote call to transform your input
async Task<int> DoRemoteCalculation(string value)
{
    await Task.Delay(25);
    return value.Sum(x => (int)x);
}

// stand-in for serializing to a very large JSON file in streaming format
async Task<string> Serialize(string tag, IAsyncEnumerable<string> values)
{
    using var stream = new MemoryStream();
    await JsonSerializer.SerializeAsync(
        stream,
        new { Tag = tag, Values = values.Do(async _ => await Task.Delay(10)) });
    return Encoding.UTF8.GetString(stream.ToArray());
}
```
Using standard LINQ chaining, there's a lot of "dead" time: because execution is lazy, reading and serializing take turns instead of overlapping
```csharp
using System.Diagnostics;

await NaiveSinglePartitionAndFile();

async Task NaiveSinglePartitionAndFile()
{
    var sw = Stopwatch.StartNew();
    await Serialize("A", ReadPartition('a'));
    Console.WriteLine($"took {sw.ElapsedMilliseconds}ms");
}
```
- Async-compatible Producer/Consumer collection
- Buffer that can be added to on one thread and consumed on another
- Can tell the difference between "empty for now" and "completed: no new items will ever arrive"
- Compare to non-async (thread-holding) `BlockingCollection`
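A minimal sketch of the "empty vs. no new items" distinction, using only `System.Threading.Channels` (nothing here depends on the rest of the deck):

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();

// Empty but still open: nothing to read right now, but more may arrive
bool readWhileEmpty = channel.Reader.TryRead(out _);
bool completedWhileOpen = channel.Reader.Completion.IsCompleted;

channel.Writer.TryWrite(1);
channel.Writer.Complete(); // promise: no new items will ever arrive

// WaitToReadAsync: true while items remain, false once completed *and* drained
bool moreBeforeDrain = await channel.Reader.WaitToReadAsync();
channel.Reader.TryRead(out var value);
bool moreAfterDrain = await channel.Reader.WaitToReadAsync();

Console.WriteLine($"{readWhileEmpty} {completedWhileOpen} {moreBeforeDrain} {value} {moreAfterDrain}");
// False False True 1 False
```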
```csharp
using System.Diagnostics;
using System.Threading.Channels;

await ChannelSinglePartitionAndFile();

// this example mimics the EventProcessor/PreAggregator flow in XIDS
async Task ChannelSinglePartitionAndFile()
{
    var sw = Stopwatch.StartNew();
    var channel = Channel.CreateUnbounded<string>();
    // start the consumer first; it serializes items as they arrive
    var writeTask = Serialize("A", channel.Reader.ReadAllAsync());
    await ReadPartition('a').ForEachAwaitAsync(async item => await channel.Writer.WriteAsync(item));
    // signal "no more items" so ReadAllAsync, and therefore Serialize, can finish
    channel.Writer.Complete();
    await writeTask;
    Console.WriteLine($"took {sw.ElapsedMilliseconds}ms");
}
```
Other advantages to this pattern:
- Can wait on and create new channels for "checkpointing"
- Lets the channel's consumer be "stateful" without creating a state object to receive events
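As a sketch of the "stateful consumer" point, assuming a hypothetical batching consumer `SumInBatchesAsync`: the consumer's state is just a local variable in one `async` method, and the final partial batch is flushed when the writer completes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical consumer: sums items in batches of batchSize.
// Its state (the current batch) is a plain local, not a separate state object.
async Task<List<int>> SumInBatchesAsync(ChannelReader<int> reader, int batchSize)
{
    var sums = new List<int>();
    var batch = new List<int>();
    await foreach (var item in reader.ReadAllAsync())
    {
        batch.Add(item);
        if (batch.Count == batchSize)
        {
            sums.Add(batch.Sum());
            batch.Clear();
        }
    }
    if (batch.Count > 0)
        sums.Add(batch.Sum()); // flush the final, partial batch once the writer completes
    return sums;
}

var channel = Channel.CreateUnbounded<int>();
var consumer = SumInBatchesAsync(channel.Reader, batchSize: 3);

for (int i = 1; i <= 7; i++)
    await channel.Writer.WriteAsync(i);
channel.Writer.Complete();

var batchSums = await consumer;
Console.WriteLine(string.Join(",", batchSums)); // 6,15,7
```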
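- What's with the `async` in `ReadAsync`/`WriteAsync`?
- On the read side it's easy to understand: the reader waits for items to show up
- On the write side it matters for a `Bounded` channel: when the buffer is full, the writer waits for room
- Alternatively, a bounded channel can drop items instead of waiting (`BoundedChannelFullMode`)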
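A minimal sketch contrasting the default `Wait` behavior with `BoundedChannelFullMode.DropOldest` (the other modes are `DropNewest` and `DropWrite`), using synchronous `TryWrite`/`TryRead` so the outcome is deterministic:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Default FullMode is Wait: once the buffer is full, TryWrite fails (and WriteAsync would wait)
var waiting = Channel.CreateBounded<int>(2);
bool first = waiting.Writer.TryWrite(0);
bool second = waiting.Writer.TryWrite(1);
bool third = waiting.Writer.TryWrite(2); // false: no room until a reader takes an item

// DropOldest: writes always succeed, evicting the oldest buffered item instead
var dropping = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
{
    FullMode = BoundedChannelFullMode.DropOldest
});
for (int i = 0; i < 5; i++)
    dropping.Writer.TryWrite(i);
dropping.Writer.Complete();

var kept = new List<int>();
while (dropping.Reader.TryRead(out var item))
    kept.Add(item);

Console.WriteLine($"{first} {second} {third}; kept {string.Join(",", kept)}");
// True True False; kept 3,4
```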
```csharp
using System.Diagnostics;
using System.Threading.Channels;
var sw = Stopwatch.StartNew();
var channel = Channel.CreateBounded<int>(2); // buffer holds at most 2 items
await Task.WhenAll(WriteChannel(), ReadChannel());

async Task WriteChannel()
{
    foreach (var i in Enumerable.Range(0, 10))
    {
        while (!channel.Writer.TryWrite(i)) // false while the buffer is full
        {
            Console.WriteLine($"{sw.ElapsedMilliseconds}: Cannot write {i} yet");
            await Task.Delay(100);
        }
        Console.WriteLine($"{sw.ElapsedMilliseconds}: Wrote {i}");
        if (i == 5)
            await Task.Delay(500);
    }
    channel.Writer.Complete(); // without this, the reader would never see "all done"
}

async Task ReadChannel()
{
    await Task.Delay(500);
    while (true)
    {
        if (!channel.Reader.TryRead(out var i))
        {
            if (channel.Reader.Completion.IsCompletedSuccessfully)
            {
                Console.WriteLine($"{sw.ElapsedMilliseconds}: all done");
                return;
            }
            Console.WriteLine($"{sw.ElapsedMilliseconds}: Cannot read yet");
            await Task.Delay(100);
            continue;
        }
        Console.WriteLine($"{sw.ElapsedMilliseconds}: Read {i}");
    }
}
```
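The polling loops above make the buffering visible. In practice, `WriteAsync` and `ReadAllAsync` do the waiting for you; a sketch of the same flow, assuming the same capacity of 2 and a simplified delay schedule:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<int>(2);
var received = new List<int>();

async Task WriteChannelAsync()
{
    foreach (var i in Enumerable.Range(0, 10))
        await channel.Writer.WriteAsync(i); // waits, without holding a thread, while the buffer is full
    channel.Writer.Complete();
}

async Task ReadChannelAsync()
{
    await Task.Delay(100); // slow start: the writer fills the buffer and then waits
    // ends once the writer has completed and the buffer is drained
    await foreach (var i in channel.Reader.ReadAllAsync())
        received.Add(i);
}

await Task.WhenAll(WriteChannelAsync(), ReadChannelAsync());
Console.WriteLine(string.Join(",", received)); // 0,1,2,3,4,5,6,7,8,9
```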
**Enhancing standard LINQ chaining**
```csharp
using System.Diagnostics;

await ForEachAsync();
await ParallelForEachAsync();

async Task ForEachAsync()
{
    var sw = Stopwatch.StartNew();
    foreach (var i in Enumerable.Range(0, 50)) { await Task.Delay(5); }
    Console.WriteLine($"foreach: took {sw.ElapsedMilliseconds}ms");
}

async Task ParallelForEachAsync()
{
    var sw = Stopwatch.StartNew();
    await Parallel.ForEachAsync(
        Enumerable.Range(0, 50),
        async (i, _) => { await Task.Delay(5); });
    Console.WriteLine($"Parallel.ForEachAsync: took {sw.ElapsedMilliseconds}ms");
}

async Task<int> Work(int i) { await Task.Delay(5); return i * 2; }
```
```csharp
await SelectAwait();

async Task SelectAwait()
{
    var sw = Stopwatch.StartNew();
    var array = await Enumerable.Range(0, 50).ToAsyncEnumerable()
        .SelectAwait(async i => await Work(i)) // built-in: System.Linq.Async
        .ToArrayAsync();
    Console.WriteLine($"took {sw.ElapsedMilliseconds}ms");
    Console.WriteLine(string.Join(',', array));
}
```
- How to do _this_ in parallel?
- `Parallel.ForEachAsync` doesn't return any results
- PLINQ `.AsParallel()` ties up threads rather than awaiting, and doesn't work with `IAsyncEnumerable` anyway
- Instead, use `Parallel.ForEachAsync` to write results to a channel, and read the channel as the output
- Used this often in ADP Data Ingestion, e.g. for getting signed URIs for files
- This solution doesn't preserve ordering; more complex ones exist (e.g. enqueuing `Task`s in a channel)
```csharp
using System.Threading;
static async IAsyncEnumerable<TOutput> ParallelSelectAwaitImpl<TInput, TOutput>(
    IAsyncEnumerable<TInput> source,
    Func<TInput, ValueTask<TOutput>> body)
{
    var channel = System.Threading.Channels.Channel.CreateBounded<TOutput>(Environment.ProcessorCount);
    using var cts = new CancellationTokenSource();
    _ = RunForEachAsync(cts);
    try
    {
        // do not pass in the cancellation token, so that an error from ParallelForEachAsync
        // still bubbles up, even after ParallelForEachAsync has cancelled its token
        await foreach (var item in channel.Reader.ReadAllAsync(CancellationToken.None))
            yield return item;
    }
    finally
    {
        // force ParallelForEachAsync to stop once the iterator is disposed (finally runs on
        // disposal), e.g. when Any(), First(), etc. get their answer before the end
        cts.Cancel();
    }

    // Run ParallelForEachAsync and, when done, tell the reader nothing more is coming.
    // ParallelForEachAsync is responsible for halting on the cancellation token,
    // which we need so that TryComplete is always called.
    async Task RunForEachAsync(CancellationTokenSource cts)
    {
        try
        {
            await Parallel.ForEachAsync(
                source,
                cts.Token,
                async (item, token) =>
                {
                    var output = await body(item);
                    await channel.Writer.WriteAsync(output, token);
                });
            channel.Writer.TryComplete();
        }
        catch (Exception ex)
        {
            channel.Writer.TryComplete(ex);
        }
    }
}
```
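```csharp
await ParallelSelectAwait();

async Task ParallelSelectAwait()
{
    var sw = Stopwatch.StartNew();
    var array = await ParallelSelectAwaitImpl(
        Enumerable.Range(0, 50).ToAsyncEnumerable(),
        async i => await Work(i)
    ).ToArrayAsync();
    Console.WriteLine($"took {sw.ElapsedMilliseconds}ms");
    Console.WriteLine(string.Join(',', array)); // unordered: items appear as they finish
}
```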
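The last bullet before `ParallelSelectAwaitImpl` mentions ordered variants that enqueue `Task`s in a channel. A minimal sketch of that idea, with a hypothetical `OrderedParallelSelect` helper: it assumes a plain `IEnumerable` source and has no cancellation, so a consumer that stops early can leave the producer waiting on a full channel.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: start each body call in order, queue its Task in a bounded channel,
// and await the queued Tasks in the same order, so outputs keep the source order.
async IAsyncEnumerable<TOutput> OrderedParallelSelect<TInput, TOutput>(
    IEnumerable<TInput> source, Func<TInput, Task<TOutput>> body, int maxQueued)
{
    // Bounded: the producer pauses while maxQueued started-but-unread Tasks are waiting
    var channel = Channel.CreateBounded<Task<TOutput>>(maxQueued);

    var producer = Task.Run(async () =>
    {
        try
        {
            foreach (var item in source)
                await channel.Writer.WriteAsync(body(item)); // body(item) starts the work immediately
            channel.Writer.Complete();
        }
        catch (Exception ex)
        {
            channel.Writer.Complete(ex); // surface producer errors to the reader
        }
    });

    await foreach (var task in channel.Reader.ReadAllAsync())
        yield return await task; // awaiting in queue order restores source order

    await producer;
}

var results = new List<int>();

// Later items get shorter delays, so they tend to finish before earlier ones
await foreach (var r in OrderedParallelSelect(
    Enumerable.Range(0, 10),
    async i => { await Task.Delay((10 - i) * 5); return i * 2; },
    maxQueued: 4))
{
    results.Add(r);
}

Console.WriteLine(string.Join(",", results)); // 0,2,4,6,8,10,12,14,16,18
```

The trade-off: one slow item at the head of the queue holds back results that are already finished behind it.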
- How to "funnel" two enumerables into a single pipeline?
- Lower-level solutions exist, see e.g. [SuperLinq.ConcurrentMerge](
```csharp
using System.Diagnostics;
using System.Threading.Channels;

await ChannelTwoPartitionsAndFile();

async Task ChannelTwoPartitionsAndFile()
{
    var sw = Stopwatch.StartNew();
    // note optimization in channel options - thanks EliA
    var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(5) { SingleReader = true });
    var writeTask = Serialize("A", channel.Reader.ReadAllAsync());
    // both partitions write into the same channel concurrently
    await Task.WhenAll(
        ReadPartition('a').ForEachAwaitAsync(async item => await channel.Writer.WriteAsync(item)),
        ReadPartition('b').ForEachAwaitAsync(async item => await channel.Writer.WriteAsync(item)));
    channel.Writer.Complete(); // only after *both* writers are done
    await writeTask;
    Console.WriteLine($"took {sw.ElapsedMilliseconds}ms");
}
```
- Split a single enumerable into multiple outputs
- Unlike joining, no lower-level way to do this
```csharp
await ChannelPartitionAndTwoFiles();

async Task ChannelPartitionAndTwoFiles()
{
    var sw = Stopwatch.StartNew();
    var channelA = Channel.CreateBounded<string>(new BoundedChannelOptions(5) { SingleWriter = true });
    var channelB = Channel.CreateBounded<string>(new BoundedChannelOptions(5) { SingleWriter = true });
    var readTaskA = Serialize("A", channelA.Reader.ReadAllAsync());
    var readTaskB = Serialize("B", channelB.Reader.ReadAllAsync());
    // route even-indexed items to A and odd-indexed items to B
    await ReadPartition('a').ForEachAwaitAsync(async (item, i) =>
    {
        var channel = i % 2 == 0 ? channelA : channelB;
        await channel.Writer.WriteAsync(item);
    });
    channelA.Writer.Complete();
    channelB.Writer.Complete();
    await Task.WhenAll(readTaskA, readTaskB);
    Console.WriteLine($"took {sw.ElapsedMilliseconds}ms");
}
```
Careful when firing and (temporarily) forgetting groups of tasks!
- Cancellation and error propagation - when one fails, stop the other
Recommend [`StructuredConcurrency`]( library
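A minimal sketch of "when one fails, stop the other" using only a shared `CancellationTokenSource`. `CancelOthersOnFailure`, `FailingProducer`, and `EndlessConsumer` are hypothetical names, and a structured-concurrency library packages this pattern more robustly.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();

// Hypothetical wrapper: if this piece of work fails, cancel the shared token so its siblings stop too
async Task CancelOthersOnFailure(Func<CancellationToken, Task> work)
{
    try
    {
        await work(cts.Token);
    }
    catch
    {
        cts.Cancel();
        throw;
    }
}

async Task FailingProducer(CancellationToken token)
{
    await Task.Delay(20, token);
    throw new InvalidOperationException("producer failed");
}

async Task EndlessConsumer(CancellationToken token)
{
    await Task.Delay(Timeout.Infinite, token); // would never finish without cancellation
}

var producerTask = CancelOthersOnFailure(FailingProducer);
var consumerTask = CancelOthersOnFailure(EndlessConsumer);

var firstError = "";
try
{
    await Task.WhenAll(producerTask, consumerTask);
}
catch (Exception ex)
{
    firstError = ex.Message; // the WhenAll task is faulted by the producer's exception
}

Console.WriteLine($"{firstError}; consumer canceled: {consumerTask.IsCanceled}");
// producer failed; consumer canceled: True
```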
Further reading on Channels
- [TPL Dataflow](
  - Links "blocks" together
  - Extremely powerful, but overkill for most scenarios
  - Has its own set of LINQ-like operations, with a steep learning curve
- [Rx.NET](
  - Explicit pub/sub with `IObservable<T>`
  - Limited development and support
**In conclusion**
- Enumerables for just-in-time processing
- Async enables concurrency (overlapping I/O waits) almost for free
- Decouple components by linking via Channels
