Skip to content

Instantly share code, notes, and snippets.

@oguzhaneren
Created October 18, 2020 10:21
Show Gist options
  • Save oguzhaneren/9edc2902e5c67dafff75d47800ae50e9 to your computer and use it in GitHub Desktop.
Save oguzhaneren/9edc2902e5c67dafff75d47800ae50e9 to your computer and use it in GitHub Desktop.
TPL Dataflow Extensions for IEnumerable<T>
/// <summary>
/// Extension methods that push every element of an <see cref="IEnumerable{T}"/> through a
/// TPL Dataflow block, either to run an action per element or to transform each element.
/// </summary>
public static class EnumerableTplExtensions
{
    /// <summary>
    /// Runs a synchronous <paramref name="action"/> for every element of <paramref name="source"/>
    /// inside an <see cref="ActionBlock{TInput}"/> configured with <paramref name="options"/>.
    /// </summary>
    /// <typeparam name="TSource">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence whose elements are processed.</param>
    /// <param name="action">Delegate invoked once per element.</param>
    /// <param name="options">Dataflow options (parallelism, capacity, cancellation, ...).</param>
    /// <returns>A task that completes when every accepted element has been processed.</returns>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public static Task ForeachWithTpl<TSource>(this IEnumerable<TSource> source,
        Action<TSource> action,
        ExecutionDataflowBlockOptions options)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }
        if (options == null)
        {
            throw new ArgumentNullException(nameof(options));
        }

        return ForeachWithTpl(source, new ActionBlock<TSource>(action, options), options);
    }

    /// <summary>
    /// Runs an asynchronous <paramref name="action"/> for every element of <paramref name="source"/>
    /// inside an <see cref="ActionBlock{TInput}"/> configured with <paramref name="options"/>.
    /// </summary>
    /// <typeparam name="TSource">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence whose elements are processed.</param>
    /// <param name="action">Asynchronous delegate invoked once per element.</param>
    /// <param name="options">Dataflow options (parallelism, capacity, cancellation, ...).</param>
    /// <returns>A task that completes when every accepted element has been processed.</returns>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public static Task ForeachWithTpl<TSource>(this IEnumerable<TSource> source,
        Func<TSource, Task> action,
        ExecutionDataflowBlockOptions options)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }
        if (options == null)
        {
            throw new ArgumentNullException(nameof(options));
        }

        return ForeachWithTpl(source, new ActionBlock<TSource>(action, options), options);
    }

    /// <summary>
    /// Convenience overload of the synchronous ForeachWithTpl that builds the block options from
    /// a degree of parallelism and a cancellation token.
    /// </summary>
    /// <typeparam name="TSource">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence whose elements are processed.</param>
    /// <param name="action">Delegate invoked once per element.</param>
    /// <param name="maxDegreeOfParallelism">
    /// Maximum number of elements processed concurrently; -1 (<c>DataflowBlockOptions.Unbounded</c>) means no limit.
    /// </param>
    /// <param name="cancellationToken">Token used to cancel feeding and processing.</param>
    /// <returns>A task that completes when every accepted element has been processed.</returns>
    public static Task ForeachWithTpl<TSource>(this IEnumerable<TSource> source,
        Action<TSource> action,
        int maxDegreeOfParallelism = -1,
        CancellationToken cancellationToken = default)
    {
        return ForeachWithTpl(source, action, BuildOptions(maxDegreeOfParallelism, cancellationToken));
    }

    /// <summary>
    /// Convenience overload of the asynchronous ForeachWithTpl that builds the block options from
    /// a degree of parallelism and a cancellation token.
    /// </summary>
    /// <typeparam name="TSource">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence whose elements are processed.</param>
    /// <param name="action">Asynchronous delegate invoked once per element.</param>
    /// <param name="maxDegreeOfParallelism">
    /// Maximum number of elements processed concurrently; -1 (<c>DataflowBlockOptions.Unbounded</c>) means no limit.
    /// </param>
    /// <param name="cancellationToken">Token used to cancel feeding and processing.</param>
    /// <returns>A task that completes when every accepted element has been processed.</returns>
    public static Task ForeachWithTpl<TSource>(this IEnumerable<TSource> source,
        Func<TSource, Task> action,
        int maxDegreeOfParallelism = -1,
        CancellationToken cancellationToken = default)
    {
        return ForeachWithTpl(source, action, BuildOptions(maxDegreeOfParallelism, cancellationToken));
    }

    /// <summary>
    /// Applies a synchronous <paramref name="transformer"/> to every element of
    /// <paramref name="source"/> inside a <see cref="TransformBlock{TInput,TOutput}"/> and collects the results.
    /// </summary>
    /// <typeparam name="TSource">Element type of the input sequence.</typeparam>
    /// <typeparam name="TResult">Element type of the produced results.</typeparam>
    /// <param name="source">Sequence whose elements are transformed.</param>
    /// <param name="transformer">Delegate producing one result per element.</param>
    /// <param name="options">Dataflow options (parallelism, capacity, ordering, cancellation, ...).</param>
    /// <returns>A task whose result holds the transformed elements (empty, never <c>null</c>, for an empty source).</returns>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public static Task<IEnumerable<TResult>> TransformWithTpl<TSource, TResult>(this IEnumerable<TSource> source,
        Func<TSource, TResult> transformer,
        ExecutionDataflowBlockOptions options)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }
        if (transformer == null)
        {
            throw new ArgumentNullException(nameof(transformer));
        }
        if (options == null)
        {
            throw new ArgumentNullException(nameof(options));
        }

        return TransformWithTpl(source, new TransformBlock<TSource, TResult>(transformer, options), options);
    }

    /// <summary>
    /// Applies an asynchronous <paramref name="transformer"/> to every element of
    /// <paramref name="source"/> inside a <see cref="TransformBlock{TInput,TOutput}"/> and collects the results.
    /// </summary>
    /// <typeparam name="TSource">Element type of the input sequence.</typeparam>
    /// <typeparam name="TResult">Element type of the produced results.</typeparam>
    /// <param name="source">Sequence whose elements are transformed.</param>
    /// <param name="transformer">Asynchronous delegate producing one result per element.</param>
    /// <param name="options">Dataflow options (parallelism, capacity, ordering, cancellation, ...).</param>
    /// <returns>A task whose result holds the transformed elements (empty, never <c>null</c>, for an empty source).</returns>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public static Task<IEnumerable<TResult>> TransformWithTpl<TSource, TResult>(this IEnumerable<TSource> source,
        Func<TSource, Task<TResult>> transformer,
        ExecutionDataflowBlockOptions options)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }
        if (transformer == null)
        {
            throw new ArgumentNullException(nameof(transformer));
        }
        if (options == null)
        {
            throw new ArgumentNullException(nameof(options));
        }

        return TransformWithTpl(source, new TransformBlock<TSource, TResult>(transformer, options), options);
    }

    /// <summary>
    /// Convenience overload of the asynchronous TransformWithTpl that builds the block options
    /// from a degree of parallelism and a cancellation token. The built options disable ordering,
    /// so results are not guaranteed to follow the order of <paramref name="source"/>.
    /// </summary>
    /// <typeparam name="TSource">Element type of the input sequence.</typeparam>
    /// <typeparam name="TResult">Element type of the produced results.</typeparam>
    /// <param name="source">Sequence whose elements are transformed.</param>
    /// <param name="transformer">Asynchronous delegate producing one result per element.</param>
    /// <param name="maxDegreeOfParallelism">
    /// Maximum number of elements processed concurrently; -1 (<c>DataflowBlockOptions.Unbounded</c>) means no limit.
    /// </param>
    /// <param name="cancellationToken">Token used to cancel feeding and processing.</param>
    /// <returns>A task whose result holds the transformed elements.</returns>
    public static Task<IEnumerable<TResult>> TransformWithTpl<TSource, TResult>(this IEnumerable<TSource> source,
        Func<TSource, Task<TResult>> transformer,
        int maxDegreeOfParallelism = -1,
        CancellationToken cancellationToken = default)
    {
        return TransformWithTpl(source, transformer, BuildOptions(maxDegreeOfParallelism, cancellationToken));
    }

    /// <summary>
    /// Convenience overload of the synchronous TransformWithTpl that builds the block options
    /// from a degree of parallelism and a cancellation token. The built options disable ordering,
    /// so results are not guaranteed to follow the order of <paramref name="source"/>.
    /// </summary>
    /// <typeparam name="TSource">Element type of the input sequence.</typeparam>
    /// <typeparam name="TResult">Element type of the produced results.</typeparam>
    /// <param name="source">Sequence whose elements are transformed.</param>
    /// <param name="transformer">Delegate producing one result per element.</param>
    /// <param name="maxDegreeOfParallelism">
    /// Maximum number of elements processed concurrently; -1 (<c>DataflowBlockOptions.Unbounded</c>) means no limit.
    /// </param>
    /// <param name="cancellationToken">Token used to cancel feeding and processing.</param>
    /// <returns>A task whose result holds the transformed elements.</returns>
    public static Task<IEnumerable<TResult>> TransformWithTpl<TSource, TResult>(this IEnumerable<TSource> source,
        Func<TSource, TResult> transformer,
        int maxDegreeOfParallelism = -1,
        CancellationToken cancellationToken = default)
    {
        return TransformWithTpl(source, transformer, BuildOptions(maxDegreeOfParallelism, cancellationToken));
    }

    /// <summary>
    /// Feeds <paramref name="source"/> into <paramref name="actionBlock"/>, marks the block as
    /// complete and waits for it to finish; a fault or cancellation of the block surfaces here.
    /// </summary>
    private static async Task ForeachWithTpl<TSource>(this IEnumerable<TSource> source, ITargetBlock<TSource> actionBlock, DataflowBlockOptions options)
    {
        await SendAllAsync(source, actionBlock, options.CancellationToken);

        actionBlock.Complete();
        await actionBlock.Completion;
    }

    /// <summary>
    /// Feeds <paramref name="source"/> into <paramref name="transformBlock"/> while a linked
    /// collector block gathers the outputs, then returns the gathered results.
    /// </summary>
    private static async Task<IEnumerable<TResult>> TransformWithTpl<TSource, TResult>(this IEnumerable<TSource> source,
        IPropagatorBlock<TSource, TResult> transformBlock,
        DataflowBlockOptions options)
    {
        var results = new List<TResult>();

        // The collector drains the transform block while items are still being sent. A passive
        // buffer with a bounded capacity would fill up, stall the transform block and make
        // SendAsync wait forever.
        var collector = new ActionBlock<TResult>(new Action<TResult>(results.Add), new ExecutionDataflowBlockOptions
        {
            CancellationToken = options.CancellationToken,
            EnsureOrdered = options.EnsureOrdered,
            BoundedCapacity = options.BoundedCapacity,
            TaskScheduler = options.TaskScheduler,
            MaxMessagesPerTask = options.MaxMessagesPerTask,
            NameFormat = options.NameFormat,
            // List<T> is not thread-safe, so results are appended one at a time.
            MaxDegreeOfParallelism = 1
        });

        transformBlock.LinkTo(collector, new DataflowLinkOptions
        {
            PropagateCompletion = true
        });

        await SendAllAsync(source, transformBlock, options.CancellationToken);

        transformBlock.Complete();

        // Awaiting the transform block first keeps the original failure semantics: its fault or
        // cancellation is what the caller observes.
        await transformBlock.Completion;

        // Completion propagates to the collector only after every output was handed over; wait
        // until the last of them has actually been appended.
        await collector.Completion;

        return results;
    }

    /// <summary>
    /// Sends every element of <paramref name="source"/> to <paramref name="target"/>, stopping
    /// early once the block no longer accepts input.
    /// </summary>
    private static async Task SendAllAsync<TSource>(IEnumerable<TSource> source, ITargetBlock<TSource> target, CancellationToken cancellationToken)
    {
        foreach (var item in source)
        {
            // SendAsync yields false when the block declined the message (e.g. it already
            // faulted); enumerating the rest of the source would be pointless and, for an
            // unbounded source, would never end.
            if (!await target.SendAsync(item, cancellationToken))
            {
                break;
            }
        }
    }

    /// <summary>
    /// Builds the execution options used by the convenience overloads: unordered output and a
    /// single producer (the feeding loop of this class).
    /// </summary>
    private static ExecutionDataflowBlockOptions BuildOptions(int maxDegreeOfParallelism = -1, CancellationToken cancellationToken = default)
    {
        return new ExecutionDataflowBlockOptions
        {
            CancellationToken = cancellationToken,
            MaxDegreeOfParallelism = maxDegreeOfParallelism,
            EnsureOrdered = false,
            SingleProducerConstrained = true,
        };
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment