Skip to content

Instantly share code, notes, and snippets.

@cajuncoding
Created September 22, 2023 05:51
Show Gist options
  • Save cajuncoding/48e8b655ff75cfb8789cb55358bb6536 to your computer and use it in GitHub Desktop.
Save cajuncoding/48e8b655ff75cfb8789cb55358bb6536 to your computer and use it in GitHub Desktop.
Transform any IEnumerable set into an IAsyncEnumerable with Async Transformation in Chunks (e.g. Async stream REST/GraphQL API results in Chunks)
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading;
using System.Threading.Tasks;
using System.Runtime.CompilerServices;
namespace CajunCoding.Async
{
    public static class IEnumerableChunkCustomExtensions
    {
        /// <summary>
        /// A custom extension method that takes an IEnumerable set of items that can be managed as chunks/batches, along with
        /// an async transform Func (the producer) that converts each batch, and streams the transformed results for consumption
        /// as an <see cref="IAsyncEnumerable{T}"/>.
        ///
        /// A great use case for this is when you have a set of Ids for which you need to retrieve data from an external API
        /// in chunks and want to process the results as fast as possible. The producer and the consumer run concurrently,
        /// so results are yielded as soon as each batch has been transformed.
        ///
        /// NOTE: Results are buffered in an unbounded channel; if the producer is faster than the consumer, all pending
        /// results are held in memory until they are consumed.
        ///
        /// The transformer of this overload cannot observe cancellation; prefer the overload whose transformer accepts a
        /// <see cref="CancellationToken"/> so an in-flight batch can be cancelled when enumeration is cancelled or abandoned.
        ///
        /// Adapted from concepts presented on Stack Overflow:
        /// Here: https://stackoverflow.com/a/70026589/7293142
        /// And Here: https://stackoverflow.com/a/74201074/7293142
        /// </summary>
        /// <typeparam name="TInput">The type of the source items.</typeparam>
        /// <typeparam name="TResult">The type of the results produced by the transformer.</typeparam>
        /// <param name="items">The source items to split into batches.</param>
        /// <param name="batchSize">The maximum number of items per batch; must be greater than zero.</param>
        /// <param name="asyncTransformerFunc">Transforms one batch into its results; a null result is treated as an empty batch.</param>
        /// <param name="cancellationToken">Cancels the enumeration and stops the producer.</param>
        /// <returns>The transformed results, in batch order.</returns>
        /// <exception cref="ArgumentNullException">Thrown when enumeration starts if <paramref name="items"/> or <paramref name="asyncTransformerFunc"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException">Thrown when enumeration starts if <paramref name="batchSize"/> is less than 1.</exception>
        public static async IAsyncEnumerable<TResult> ChunkAndTransformAsAsyncEnumerable<TInput, TResult>(
            this IEnumerable<TInput> items,
            int batchSize,
            Func<IEnumerable<TInput>, Task<IEnumerable<TResult>>> asyncTransformerFunc,
            [EnumeratorCancellation] CancellationToken cancellationToken = default
        )
        {
            if (asyncTransformerFunc is null)
                throw new ArgumentNullException(nameof(asyncTransformerFunc));

            //Adapt the token-unaware transformer to the token-aware overload, which does all of the actual work...
            var results = items.ChunkAndTransformAsAsyncEnumerable<TInput, TResult>(
                batchSize,
                (batch, _) => asyncTransformerFunc(batch),
                cancellationToken
            );

            await foreach (var result in results.ConfigureAwait(false))
                yield return result;
        }

        /// <summary>
        /// Splits <paramref name="items"/> into batches of <paramref name="batchSize"/>, transforms each batch with
        /// <paramref name="asyncTransformerFunc"/> (the producer), and streams every transformed result to the caller
        /// as an <see cref="IAsyncEnumerable{T}"/> as soon as it is available.
        ///
        /// The token passed to the transformer is cancelled when the caller's <paramref name="cancellationToken"/> is
        /// cancelled OR when the consumer stops enumerating early (break, exception, disposal), so in-flight work
        /// (e.g. an HTTP request) can be abandoned instead of completing needlessly.
        ///
        /// NOTE: Results are buffered in an unbounded channel; if the producer is faster than the consumer, all pending
        /// results are held in memory until they are consumed.
        /// </summary>
        /// <typeparam name="TInput">The type of the source items.</typeparam>
        /// <typeparam name="TResult">The type of the results produced by the transformer.</typeparam>
        /// <param name="items">The source items to split into batches.</param>
        /// <param name="batchSize">The maximum number of items per batch; must be greater than zero.</param>
        /// <param name="asyncTransformerFunc">
        /// Transforms one batch into its results, observing the supplied token; a null result is treated as an empty batch.
        /// </param>
        /// <param name="cancellationToken">Cancels the enumeration and stops the producer.</param>
        /// <returns>The transformed results, in batch order.</returns>
        /// <exception cref="ArgumentNullException">Thrown when enumeration starts if <paramref name="items"/> or <paramref name="asyncTransformerFunc"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException">Thrown when enumeration starts if <paramref name="batchSize"/> is less than 1.</exception>
        public static async IAsyncEnumerable<TResult> ChunkAndTransformAsAsyncEnumerable<TInput, TResult>(
            this IEnumerable<TInput> items,
            int batchSize,
            Func<IEnumerable<TInput>, CancellationToken, Task<IEnumerable<TResult>>> asyncTransformerFunc,
            [EnumeratorCancellation] CancellationToken cancellationToken = default
        )
        {
            if (items is null)
                throw new ArgumentNullException(nameof(items));
            if (asyncTransformerFunc is null)
                throw new ArgumentNullException(nameof(asyncTransformerFunc));
            if (batchSize < 1)
                throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, "The batch size must be greater than zero.");

            var channel = Channel.CreateUnbounded<TResult>();

            //The producer token is linked to the caller's token and is additionally cancelled below if the consumer
            //  stops enumerating early, so the producer never keeps transforming batches that nobody will read...
            using var producerCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            var producerCancellationToken = producerCancellationTokenSource.Token;

            //Task.Run queues the producer on the thread pool so it runs concurrently with the consumer; because it is
            //  fully async it does NOT block a thread while awaiting the transformer...
            var producerTask = Task.Run(async () =>
            {
                try
                {
                    foreach (var batch in items.Chunk(batchSize))
                    {
                        //Stop before starting another (potentially expensive) transform once cancellation is requested;
                        //  checking only per result would keep transforming batches that yield no results...
                        producerCancellationToken.ThrowIfCancellationRequested();

                        var transformedBatch = await asyncTransformerFunc(batch, producerCancellationToken).ConfigureAwait(false);
                        if (transformedBatch is null)
                            continue;

                        foreach (var result in transformedBatch)
                            await channel.Writer.WriteAsync(result, producerCancellationToken).ConfigureAwait(false);
                    }
                }
                catch (ChannelClosedException) { } //The consumer completed the channel; there is nobody left to write to.
                finally { channel.Writer.TryComplete(); }
            }, producerCancellationToken);

            //NOW stream all items being produced to our caller so they can be consumed as fast as possible!
            try
            {
                await foreach (var result in channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
                {
                    yield return result;
                    //The ThrowIfCancellationRequested inside the await foreach loop here might seem redundant,
                    // but it is actually required because of a by-design behavior of the ReadAllAsync method, that is explained here:
                    // https://stackoverflow.com/questions/67569758/channelreader-readallasynccancellationtoken-not-actually-cancelled-mid-iterati
                    cancellationToken.ThrowIfCancellationRequested();
                }

                //Await the producer so any exception it threw (e.g. from the transformer) propagates to the caller.
                await producerTask.ConfigureAwait(false);
            }
            finally
            {
                //If the consumer stopped early (break, exception, or cancellation) while the producer is still running,
                //  signal it to stop and wait for it so no work keeps running after the enumeration has ended.
                if (!producerTask.IsCompleted)
                {
                    producerCancellationTokenSource.Cancel();
                    channel.Writer.TryComplete();
                    try { await producerTask.ConfigureAwait(false); }
                    catch (OperationCanceledException) { } //Expected: we just requested cancellation.
                }
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment