Transform any IEnumerable set into an IAsyncEnumerable with Async Transformation in Chunks (e.g. Async stream REST/GraphQL API results in Chunks)
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading;
using System.Threading.Tasks;
using System.Runtime.CompilerServices;

namespace CajunCoding.Async
{
    public static class IEnumerableChunkCustomExtensions
    {
        /// <summary>
        /// A custom extension method that takes an IEnumerable set of items that can be processed in chunks/batches, along with
        /// an async transform Func provided as the producer that converts each batch and streams the data for consumption
        /// as an IAsyncEnumerable.
        ///
        /// A great use case for this is when you have a set of Ids for which you need to retrieve data from an external API
        /// in chunks and want to be able to process the results as fast as possible with automatically managed buffering.
        /// This streaming provides maximum throughput between producer and consumer regardless of whether the consumer is
        /// slower than the producer or the producer is slower than the consumer.
        ///
        /// Adapted from concepts presented on Stack Overflow:
        /// Here: https://stackoverflow.com/a/70026589/7293142
        /// And Here: https://stackoverflow.com/a/74201074/7293142
        /// </summary>
        /// <typeparam name="TInput">The type of the source items.</typeparam>
        /// <typeparam name="TResult">The type of the transformed results.</typeparam>
        /// <param name="items">The source set of items to be chunked and transformed.</param>
        /// <param name="batchSize">The number of items in each chunk/batch passed to the transformer.</param>
        /// <param name="asyncTransformerFunc">The async Func that transforms each chunk of inputs into a set of results.</param>
        /// <param name="cancellationToken">Optional token to cancel both the producer and the consuming enumeration.</param>
        /// <returns>An IAsyncEnumerable that streams the transformed results as they are produced.</returns>
        public static async IAsyncEnumerable<TResult> ChunkAndTransformAsAsyncEnumerable<TInput, TResult>(
            this IEnumerable<TInput> items,
            int batchSize,
            Func<IEnumerable<TInput>, Task<IEnumerable<TResult>>> asyncTransformerFunc,
            [EnumeratorCancellation] CancellationToken cancellationToken = default
        )
        {
            var channel = Channel.CreateUnbounded<TResult>();

            //Allow the Producer to run on its own Thread to produce data as fast as possible...
            //NOTE: This does NOT BLOCK a Thread since it's fully async!
            using CancellationTokenSource producerCancellationTokenSource = new();
            var producerCancellationToken = producerCancellationTokenSource.Token;
            var producerTask = Task.Run(async () =>
            {
                try
                {
                    foreach (var batch in items.Chunk(batchSize))
                    {
                        var transformedBatch = await asyncTransformerFunc(batch).ConfigureAwait(false);
                        foreach (var result in transformedBatch)
                        {
                            if (producerCancellationToken.IsCancellationRequested) return;
                            await channel.Writer.WriteAsync(result, cancellationToken).ConfigureAwait(false);
                        }
                    }
                }
                catch (ChannelClosedException) { } // Ignore
                finally { channel.Writer.TryComplete(); }
            }, producerCancellationToken);

            //NOW stream all items being produced to our caller so they can be consumed as fast as possible!
            try
            {
                await foreach (var result in channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
                {
                    yield return result;

                    //The ThrowIfCancellationRequested inside the await foreach loop here might seem redundant,
                    // but it is actually required because of a by-design behavior of the ReadAllAsync method, explained here:
                    // https://stackoverflow.com/questions/67569758/channelreader-readallasynccancellationtoken-not-actually-cancelled-mid-iterati
                    cancellationToken.ThrowIfCancellationRequested();
                }

                //We need to safely await the producerTask to complete...
                //NOTE: This also allows the propagation of any possible source exception.
                await producerTask.ConfigureAwait(false);
            }
            finally
            {
                //Prevent a potential fire-and-forget scenario when the calling code abandons the enumeration, and
                // handle cleanup of the Producer Task if cancelled while the Producer is still running...
                if (!producerTask.IsCompleted)
                {
                    producerCancellationTokenSource.Cancel();
                    channel.Writer.TryComplete();

                    //We need to safely wait for the Task to actually complete after signaling cancellation...
                    try { await producerTask.ConfigureAwait(false); }
                    catch (OperationCanceledException) { } // Ignore
                }
            }
        }
    }
}
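A minimal usage sketch of the extension method above: LookupWidgetNamesAsync and the Widget naming are hypothetical stand-ins for a batched REST/GraphQL client call that accepts a chunk of Ids and returns the matching results.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using CajunCoding.Async;

public static class ChunkAndTransformExample
{
    //Hypothetical batch lookup standing in for a real REST/GraphQL client call that accepts
    // a chunk of Ids and returns the matching results...
    private static async Task<IEnumerable<string>> LookupWidgetNamesAsync(IEnumerable<int> idChunk)
    {
        await Task.Delay(100); //Simulate network latency for the batch request...
        return idChunk.Select(id => $"Widget #{id}");
    }

    public static async Task RunAsync(CancellationToken cancellationToken = default)
    {
        IEnumerable<int> widgetIds = Enumerable.Range(1, 1_000);

        //Stream results as they arrive, 100 Ids per chunk; the unbounded Channel buffers automatically
        // so a slow consumer never stalls the producer (and vice versa)...
        await foreach (var widgetName in widgetIds.ChunkAndTransformAsAsyncEnumerable(
            100,
            idChunk => LookupWidgetNamesAsync(idChunk),
            cancellationToken
        ))
        {
            Console.WriteLine(widgetName);
        }
    }
}

Because the producer runs on its own Task and writes into the Channel, the first results are available to the await foreach loop as soon as the first chunk has been transformed, rather than after all 1,000 Ids have been processed.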