Created
November 3, 2023 15:28
-
-
Save pitermarx/8d25cdb168bd18929d44abdd633cd109 to your computer and use it in GitHub Desktop.
Idea for an abstraction over TPL Dataflow
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Fluent wrapper around a TPL Dataflow source block. Each call to <c>Activity</c>
/// links a new <see cref="TransformBlock{TInput,TOutput}"/> to the current block and
/// returns a pipeline over that new block, so stages can be chained.
/// </summary>
/// <typeparam name="T">Type of the items produced by the wrapped source block.</typeparam>
public class DataPipeline<T>
{
    // A fresh options instance per stage. BoundedCapacity = 1 makes every stage hold
    // at most one item, which applies back-pressure to the stage feeding it.
    private static ExecutionDataflowBlockOptions Options => new() { BoundedCapacity = 1 };

    private readonly IReceivableSourceBlock<T> sourceBlock;

    /// <summary>Wraps an existing source block.</summary>
    /// <param name="sourceBlock">Block whose output this pipeline exposes.</param>
    /// <exception cref="ArgumentNullException"><paramref name="sourceBlock"/> is null.</exception>
    public DataPipeline(IReceivableSourceBlock<T> sourceBlock) =>
        // Fail at construction instead of with a NullReferenceException on first use.
        this.sourceBlock = sourceBlock ?? throw new ArgumentNullException(nameof(sourceBlock));

    /// <summary>Completion task of the wrapped block.</summary>
    public Task Completion => sourceBlock.Completion;

    /// <summary>Streams the items produced by the wrapped block.</summary>
    /// <param name="ct">Token forwarded to the underlying <c>ReceiveAllAsync</c> call.</param>
    public IAsyncEnumerable<T> ReceiveAllAsync(CancellationToken ct = default) =>
        sourceBlock.ReceiveAllAsync(ct);

    /// <summary>Appends an asynchronous transformation stage.</summary>
    /// <param name="action">Asynchronous transformation applied to each item.</param>
    /// <returns>A pipeline over the newly linked stage.</returns>
    public DataPipeline<TOut> Activity<TOut>(Func<T, Task<TOut>> action) =>
        Link(new TransformBlock<T, TOut>(action, Options));

    /// <summary>Appends a synchronous transformation stage.</summary>
    /// <param name="action">Transformation applied to each item.</param>
    /// <returns>A pipeline over the newly linked stage.</returns>
    public DataPipeline<TOut> Activity<TOut>(Func<T, TOut> action) =>
        Link(new TransformBlock<T, TOut>(action, Options));

    // Links the new stage behind the current block and wraps it.
    // PropagateCompletion forwards completion and faults to the new stage.
    private DataPipeline<TOut> Link<TOut>(TransformBlock<T, TOut> block)
    {
        sourceBlock.LinkTo(block, new DataflowLinkOptions { PropagateCompletion = true });
        return new DataPipeline<TOut>(block);
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Helpers that run an <see cref="IEnumerable{T}"/> through a <c>DataPipeline</c>.
/// </summary>
public static class DataPipelineExtensions
{
    /// <summary>
    /// Creates an async stream from <paramref name="source"/> after passing every item
    /// through the activities configured by <paramref name="flowConfig"/>.
    /// Each activity runs in parallel with the other activities, but only one item at a
    /// time passes through each activity.
    /// </summary>
    /// <param name="source">Items to feed into the pipeline.</param>
    /// <param name="flowConfig">Builds the stages on top of the initial pipeline.</param>
    /// <param name="ct">
    /// Stops feeding new items when cancelled. Items already inside the pipeline are
    /// still processed and yielded.
    /// </param>
    /// <returns>The pipeline output. Rethrows a pipeline or source fault once the output is drained.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="flowConfig"/> is null (raised when enumeration starts).
    /// </exception>
    public static async IAsyncEnumerable<TOut> CreateDataPipeline<T, TOut>(
        this IEnumerable<T> source,
        Func<DataPipeline<T>, DataPipeline<TOut>> flowConfig,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        if (source is null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        if (flowConfig is null)
        {
            throw new ArgumentNullException(nameof(flowConfig));
        }

        var bufferBlock = new BufferBlock<T>(new DataflowBlockOptions { BoundedCapacity = 1 });
        var flow = flowConfig(new DataPipeline<T>(bufferBlock));

        // Linked source: lets this method stop the producer on its own (consumer stopped
        // early, or the pipeline faulted) in addition to the caller's token.
        using var producerCts = CancellationTokenSource.CreateLinkedTokenSource(ct);

        // Fire and forget: the producer reports failures by faulting the buffer block.
        _ = source.ConsumeItemsAsync(bufferBlock, producerCts.Token);

        try
        {
            // Always drains everything that entered the pipeline; cancellation only
            // stops the producer, it does not abort the read.
            await foreach (var item in flow.ReceiveAllAsync(CancellationToken.None))
            {
                yield return item;
            }

            // Await the completion to surface exceptions raised inside the pipeline.
            await flow.Completion;
        }
        finally
        {
            // Without this the producer would stay parked in SendAsync forever when the
            // consumer abandons the stream or a downstream stage faults.
            producerCts.Cancel();
        }
    }

    /// <summary>
    /// Pushes every item of <paramref name="source"/> into <paramref name="bufferBlock"/>,
    /// then completes the block. A failure while enumerating faults the block instead.
    /// </summary>
    /// <param name="source">Items to push.</param>
    /// <param name="bufferBlock">Entry block of the pipeline.</param>
    /// <param name="ct">Stops the feed early when cancelled.</param>
    private static async Task ConsumeItemsAsync<T>(this IEnumerable<T> source, BufferBlock<T> bufferBlock, CancellationToken ct = default)
    {
        try
        {
            foreach (var item in source)
            {
                if (ct.IsCancellationRequested)
                {
                    break;
                }

                // SendAsync yields false once the block permanently declines input;
                // enumerating the rest of the source would be wasted work.
                if (!await bufferBlock.SendAsync(item, ct))
                {
                    break;
                }
            }
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Requested cancellation is a normal way to stop feeding, not a fault.
        }
        catch (Exception e)
        {
            // BufferBlock implements Fault explicitly, hence the interface cast.
            ((IDataflowBlock)bufferBlock).Fault(e);
        }

        // No effect if the block was faulted above.
        bufferBlock.Complete();
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Runtime.CompilerServices; | |
using System.Threading.Tasks.Dataflow; | |
// Demo: runs a small letter sequence through a three-stage pipeline and cancels early.
// The source is disposed at program exit so its resources are released.
using var cancellationTokenSource = new CancellationTokenSource();

// If this enumerable is fully consumed, then it will throw
var enumerableThatThrows = new[] { "a", "b", "c", "d", "e", "f", "g", "h", "i" }
    .Select(l => l == "h" ? throw new Exception() : l);

var dataFlow = enumerableThatThrows
    // Helper method to create data pipeline from IEnumerable
    .CreateDataPipeline(
        pipeline => pipeline
            // Each Activity runs in parallel with other activities
            .Activity(letter => { Console.WriteLine(letter); return letter[0]; })
            // But only one item is being processed at a time in each activity
            .Activity(async letter => { await Task.Delay(1000); return new string(letter, (byte)letter); })
            // Can be sync or async
            .Activity(item => item + "\n"),
        cancellationTokenSource.Token);

Console.WriteLine("Starting...");

await foreach (var item in dataFlow)
{
    Console.Write(item);

    // we can cancel and terminate the pipeline early
    if (item.Contains('c'))
    {
        cancellationTokenSource.Cancel();
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment