Created
October 18, 2020 10:21
-
-
Save oguzhaneren/9edc2902e5c67dafff75d47800ae50e9 to your computer and use it in GitHub Desktop.
TPL Dataflow Extensions for IEnumerable<T>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Extension methods that feed the items of an <see cref="IEnumerable{T}"/> through
/// TPL Dataflow blocks, either to run an action per item (<c>ForeachWithTpl</c>) or to
/// map every item to a result (<c>TransformWithTpl</c>).
/// </summary>
public static class EnumerableTplExtensions
{
    /// <summary>
    /// Runs a synchronous <paramref name="action"/> for every item of <paramref name="source"/>
    /// inside an <see cref="ActionBlock{TInput}"/> configured with <paramref name="options"/>.
    /// </summary>
    /// <param name="source">Items to process.</param>
    /// <param name="action">Delegate invoked once per item.</param>
    /// <param name="options">Dataflow options used to build the block; its token also cancels posting.</param>
    /// <returns>A task that completes when the block has completed.</returns>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public static Task ForeachWithTpl<TSource>(this IEnumerable<TSource> source,
        Action<TSource> action,
        ExecutionDataflowBlockOptions options)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }
        if (options == null)
        {
            throw new ArgumentNullException(nameof(options));
        }

        return ForeachWithTpl(source, new ActionBlock<TSource>(action, options), options);
    }

    /// <summary>
    /// Runs an asynchronous <paramref name="action"/> for every item of <paramref name="source"/>
    /// inside an <see cref="ActionBlock{TInput}"/> configured with <paramref name="options"/>.
    /// </summary>
    /// <param name="source">Items to process.</param>
    /// <param name="action">Asynchronous delegate invoked once per item.</param>
    /// <param name="options">Dataflow options used to build the block; its token also cancels posting.</param>
    /// <returns>A task that completes when the block has completed.</returns>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public static Task ForeachWithTpl<TSource>(this IEnumerable<TSource> source,
        Func<TSource, Task> action,
        ExecutionDataflowBlockOptions options)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }
        if (options == null)
        {
            throw new ArgumentNullException(nameof(options));
        }

        return ForeachWithTpl(source, new ActionBlock<TSource>(action, options), options);
    }

    /// <summary>
    /// Convenience overload of the synchronous <c>ForeachWithTpl</c> that builds its options
    /// via <see cref="BuildOptions"/> (unordered, single producer).
    /// </summary>
    /// <param name="source">Items to process.</param>
    /// <param name="action">Delegate invoked once per item.</param>
    /// <param name="maxDegreeOfParallelism">Value assigned to <c>MaxDegreeOfParallelism</c>; defaults to -1.</param>
    /// <param name="cancellationToken">Token assigned to the block options.</param>
    /// <returns>A task that completes when the block has completed.</returns>
    public static Task ForeachWithTpl<TSource>(this IEnumerable<TSource> source,
        Action<TSource> action,
        int maxDegreeOfParallelism = -1,
        CancellationToken cancellationToken = default)
    {
        return ForeachWithTpl(source, action, BuildOptions(maxDegreeOfParallelism, cancellationToken));
    }

    /// <summary>
    /// Convenience overload of the asynchronous <c>ForeachWithTpl</c> that builds its options
    /// via <see cref="BuildOptions"/> (unordered, single producer).
    /// </summary>
    /// <param name="source">Items to process.</param>
    /// <param name="action">Asynchronous delegate invoked once per item.</param>
    /// <param name="maxDegreeOfParallelism">Value assigned to <c>MaxDegreeOfParallelism</c>; defaults to -1.</param>
    /// <param name="cancellationToken">Token assigned to the block options.</param>
    /// <returns>A task that completes when the block has completed.</returns>
    public static Task ForeachWithTpl<TSource>(this IEnumerable<TSource> source,
        Func<TSource, Task> action,
        int maxDegreeOfParallelism = -1,
        CancellationToken cancellationToken = default)
    {
        return ForeachWithTpl(source, action, BuildOptions(maxDegreeOfParallelism, cancellationToken));
    }

    /// <summary>
    /// Maps every item of <paramref name="source"/> through a synchronous
    /// <paramref name="transformer"/> running in a <see cref="TransformBlock{TInput,TOutput}"/>.
    /// </summary>
    /// <param name="source">Items to transform.</param>
    /// <param name="transformer">Delegate producing one result per item.</param>
    /// <param name="options">Dataflow options used to build the block; its token also cancels posting.</param>
    /// <returns>A task yielding all produced results; never <c>null</c>.</returns>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public static Task<IEnumerable<TResult>> TransformWithTpl<TSource, TResult>(this IEnumerable<TSource> source,
        Func<TSource, TResult> transformer,
        ExecutionDataflowBlockOptions options)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }
        if (transformer == null)
        {
            throw new ArgumentNullException(nameof(transformer));
        }
        if (options == null)
        {
            throw new ArgumentNullException(nameof(options));
        }

        return TransformWithTpl(source, new TransformBlock<TSource, TResult>(transformer, options), options);
    }

    /// <summary>
    /// Maps every item of <paramref name="source"/> through an asynchronous
    /// <paramref name="transformer"/> running in a <see cref="TransformBlock{TInput,TOutput}"/>.
    /// </summary>
    /// <param name="source">Items to transform.</param>
    /// <param name="transformer">Asynchronous delegate producing one result per item.</param>
    /// <param name="options">Dataflow options used to build the block; its token also cancels posting.</param>
    /// <returns>A task yielding all produced results; never <c>null</c>.</returns>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public static Task<IEnumerable<TResult>> TransformWithTpl<TSource, TResult>(this IEnumerable<TSource> source,
        Func<TSource, Task<TResult>> transformer,
        ExecutionDataflowBlockOptions options)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }
        if (transformer == null)
        {
            throw new ArgumentNullException(nameof(transformer));
        }
        if (options == null)
        {
            throw new ArgumentNullException(nameof(options));
        }

        return TransformWithTpl(source, new TransformBlock<TSource, TResult>(transformer, options), options);
    }

    /// <summary>
    /// Convenience overload of the asynchronous <c>TransformWithTpl</c> that builds its options
    /// via <see cref="BuildOptions"/>. Those options set <c>EnsureOrdered = false</c>, so the
    /// results are not guaranteed to follow the order of <paramref name="source"/>.
    /// </summary>
    /// <param name="source">Items to transform.</param>
    /// <param name="transformer">Asynchronous delegate producing one result per item.</param>
    /// <param name="maxDegreeOfParallelism">Value assigned to <c>MaxDegreeOfParallelism</c>; defaults to -1.</param>
    /// <param name="cancellationToken">Token assigned to the block options.</param>
    /// <returns>A task yielding all produced results; never <c>null</c>.</returns>
    public static Task<IEnumerable<TResult>> TransformWithTpl<TSource, TResult>(this IEnumerable<TSource> source,
        Func<TSource, Task<TResult>> transformer,
        int maxDegreeOfParallelism = -1,
        CancellationToken cancellationToken = default)
    {
        return TransformWithTpl(source, transformer, BuildOptions(maxDegreeOfParallelism, cancellationToken));
    }

    /// <summary>
    /// Convenience overload of the synchronous <c>TransformWithTpl</c> that builds its options
    /// via <see cref="BuildOptions"/>. Those options set <c>EnsureOrdered = false</c>, so the
    /// results are not guaranteed to follow the order of <paramref name="source"/>.
    /// </summary>
    /// <param name="source">Items to transform.</param>
    /// <param name="transformer">Delegate producing one result per item.</param>
    /// <param name="maxDegreeOfParallelism">Value assigned to <c>MaxDegreeOfParallelism</c>; defaults to -1.</param>
    /// <param name="cancellationToken">Token assigned to the block options.</param>
    /// <returns>A task yielding all produced results; never <c>null</c>.</returns>
    public static Task<IEnumerable<TResult>> TransformWithTpl<TSource, TResult>(this IEnumerable<TSource> source,
        Func<TSource, TResult> transformer,
        int maxDegreeOfParallelism = -1,
        CancellationToken cancellationToken = default)
    {
        return TransformWithTpl(source, transformer, BuildOptions(maxDegreeOfParallelism, cancellationToken));
    }

    /// <summary>
    /// Posts every item of <paramref name="source"/> to <paramref name="actionBlock"/>, signals
    /// completion, and waits for the block to finish.
    /// </summary>
    private static async Task ForeachWithTpl<TSource>(this IEnumerable<TSource> source, ITargetBlock<TSource> actionBlock, DataflowBlockOptions options)
    {
        foreach (var item in source)
        {
            var accepted = await actionBlock.SendAsync(item, options.CancellationToken).ConfigureAwait(false);
            if (!accepted)
            {
                // The block declined the item (it already completed or faulted), so it
                // will decline every later one too; stop enumerating the source and let
                // the await on Completion below surface the outcome.
                break;
            }
        }

        actionBlock.Complete();
        await actionBlock.Completion.ConfigureAwait(false);
    }

    /// <summary>
    /// Posts every item of <paramref name="source"/> to <paramref name="transformBlock"/>,
    /// collects the block's output in a linked buffer, and returns everything that was
    /// produced once the transform block has completed.
    /// </summary>
    private static async Task<IEnumerable<TResult>> TransformWithTpl<TSource, TResult>(this IEnumerable<TSource> source,
        IPropagatorBlock<TSource, TResult> transformBlock,
        DataflowBlockOptions options)
    {
        var readBlock = new BufferBlock<TResult>(new DataflowBlockOptions
        {
            CancellationToken = options.CancellationToken,
            EnsureOrdered = options.EnsureOrdered,
            // The buffer is only drained after the transform block completes, so it must
            // be able to hold every result. Copying a bounded capacity from the caller's
            // options would let the buffer fill up and stall the pipeline forever.
            BoundedCapacity = DataflowBlockOptions.Unbounded,
            TaskScheduler = options.TaskScheduler,
            MaxMessagesPerTask = options.MaxMessagesPerTask,
            NameFormat = options.NameFormat
        });

        transformBlock.LinkTo(readBlock, new DataflowLinkOptions
        {
            PropagateCompletion = true
        });

        foreach (var item in source)
        {
            var accepted = await transformBlock.SendAsync(item, options.CancellationToken).ConfigureAwait(false);
            if (!accepted)
            {
                // The block declined the item (it already completed or faulted); further
                // sends would be declined as well, so stop pulling from the source.
                break;
            }
        }

        transformBlock.Complete();
        await transformBlock.Completion.ConfigureAwait(false);

        // TryReceiveAll leaves the out list null when nothing was buffered (e.g. an
        // empty source); hand back an empty sequence instead of null.
        if (!readBlock.TryReceiveAll(out var results))
        {
            return Array.Empty<TResult>();
        }

        return results;
    }

    /// <summary>
    /// Builds the options used by the convenience overloads: the given parallelism and
    /// token, with ordering disabled and the single-producer optimisation enabled.
    /// </summary>
    private static ExecutionDataflowBlockOptions BuildOptions(int maxDegreeOfParallelism = -1, CancellationToken cancellationToken = default)
    {
        return new ExecutionDataflowBlockOptions
        {
            CancellationToken = cancellationToken,
            MaxDegreeOfParallelism = maxDegreeOfParallelism,
            EnsureOrdered = false,
            SingleProducerConstrained = true,
        };
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment