Skip to content

Instantly share code, notes, and snippets.

@Fasteroid
Last active July 3, 2024 19:21
Show Gist options
  • Save Fasteroid/cec4e2bcd51b6b298f9258f2c5d4e73e to your computer and use it in GitHub Desktop.
Save Fasteroid/cec4e2bcd51b6b298f9258f2c5d4e73e to your computer and use it in GitHub Desktop.
Some C# utilities for parallelization
using System.Collections.Generic;
using System.Threading.Tasks;
using System;
using System.Linq;
namespace Fasteroid {
    /// <summary>
    /// Helpers that split a sequence into fixed-size chunks and process or translate each chunk on the thread pool.
    /// </summary>
    public static class ParallelUtils {

        // ------------ Polyfill for older versions of C# ------------
#if !NET6_0_OR_GREATER
        /// <summary>
        /// Splits <paramref name="master"/> into consecutive chunks of at most <paramref name="chunkSize"/> elements
        /// (the last chunk may be shorter). Stand-in for <c>Enumerable.Chunk</c>, which only exists on .NET 6 and later.
        /// </summary>
        /// <param name="master">Sequence to split. It is enumerated lazily as the result is enumerated.</param>
        /// <param name="chunkSize">Maximum number of elements per chunk. Must be at least 1.</param>
        /// <returns>A lazy sequence of chunks; every chunk is a distinct list instance.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="master"/> is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="chunkSize"/> is less than 1.</exception>
        public static IEnumerable<IEnumerable<T>> Chunk<T>(this IEnumerable<T> master, int chunkSize){
            // Checks live outside the iterator so they fire at call time, not on the first MoveNext.
            // Without the size check a chunkSize of 0 never triggers the flush below and everything
            // would come back as a single chunk.
            if (master == null) throw new ArgumentNullException(nameof(master));
            if (chunkSize < 1) throw new ArgumentOutOfRangeException(nameof(chunkSize), "chunkSize must be at least 1.");
            return ChunkIterator(master, chunkSize);
        }

        // Lazy worker behind Chunk. A fresh list is started after every yield, so callers may keep each chunk.
        private static IEnumerable<IEnumerable<T>> ChunkIterator<T>(IEnumerable<T> master, int chunkSize){
            List<T> chunk = new List<T>(chunkSize);
            foreach (var item in master){
                chunk.Add(item);
                if (chunk.Count == chunkSize){
                    yield return chunk;
                    chunk = new List<T>(chunkSize);
                }
            }
            if (chunk.Count > 0){
                yield return chunk;
            }
        }
#endif

        // ------------ TranslateInChunks ------------

        /// <summary>
        /// Translates every element of <paramref name="input"/> with <paramref name="translator"/>, splitting the work into
        /// chunks of at most <paramref name="chunkSize"/> elements; each chunk is handled by its own thread-pool task (Task.Run).<br></br>
        /// Returns a <see cref="Task"/> that resolves to a <see cref="List{T}"/> of the translated values, in the same order as <paramref name="input"/>.
        /// </summary>
        /// <param name="input">Elements to translate. Enumerated once, synchronously, before this method returns.</param>
        /// <param name="translator">Applied once per element. It may be invoked concurrently from several threads.</param>
        /// <param name="chunkSize">Maximum number of elements handled by a single task. Must be at least 1.</param>
        /// <returns>A task resolving to the translated values in input order; it faults if any translator call throws.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="input"/> or <paramref name="translator"/> is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="chunkSize"/> is less than 1.</exception>
        public static Task<List<Out>> TranslateInChunks<In, Out>( IEnumerable<In> input, Func<In, Out> translator, int chunkSize ) {
            if (input == null) throw new ArgumentNullException(nameof(input));
            if (translator == null) throw new ArgumentNullException(nameof(translator));
            if (chunkSize < 1) throw new ArgumentOutOfRangeException(nameof(chunkSize), "chunkSize must be at least 1.");

            // One task per chunk, stored in input order so the results can be stitched back in order.
            // ToArray() forces the translator to run here, inside the worker task; a deferred Select would
            // instead run wherever it is finally enumerated (previously: serially, inside a shared lock).
            List<Task<Out[]>> chunkTasks = new List<Task<Out[]>>();
            foreach (var chunk in input.Chunk(chunkSize)) {
                chunkTasks.Add( Task.Run( () => chunk.Select( translator ).ToArray() ) );
            }
            return ConcatChunksAsync(chunkTasks);
        }

        // Awaits every chunk task (without blocking a thread) and concatenates their results in chunk order.
        private static async Task<List<Out>> ConcatChunksAsync<Out>( List<Task<Out[]>> chunkTasks ) {
            Out[][] parts = await Task.WhenAll( chunkTasks ).ConfigureAwait(false);
            List<Out> results = new List<Out>( parts.Sum( part => part.Length ) );
            foreach (var part in parts) {
                results.AddRange( part );
            }
            return results;
        }

        /// <summary>
        /// Awaits <paramref name="input"/>, then translates its elements with <paramref name="translator"/> in chunks of at most
        /// <paramref name="chunkSize"/> elements, exactly like the <see cref="IEnumerable{T}"/> overload.<br></br>
        /// Returns a <see cref="Task"/> that resolves to a <see cref="List{T}"/> of the translated values, in input order.
        /// </summary>
        /// <param name="input">Task producing the elements to translate.</param>
        /// <param name="translator">Applied once per element. It may be invoked concurrently from several threads.</param>
        /// <param name="chunkSize">Maximum number of elements handled by a single task. Must be at least 1.</param>
        /// <returns>A task resolving to the translated values; it faults if <paramref name="input"/> or any translation fails.</returns>
        public static Task<List<Out>> TranslateInChunks<In, Out>( Task<IEnumerable<In>> input, Func<In, Out> translator, int chunkSize ) {
            return Task.Run( async () => {
                return await TranslateInChunks(await input, translator, chunkSize);
            });
        }

        /// <summary>
        /// Translates every element of <paramref name="input"/> with the asynchronous <paramref name="translator"/>. The translator
        /// is started for each element from chunk worker tasks (chunks of at most <paramref name="chunkSize"/> elements).<br></br>
        /// Returns a <see cref="Task"/> that resolves to a <see cref="List{T}"/> of the awaited results, in input order,
        /// once every translator task has completed.
        /// </summary>
        /// <param name="input">Elements to translate.</param>
        /// <param name="translator">Started once per element. It may be invoked concurrently from several threads.</param>
        /// <param name="chunkSize">Maximum number of elements handled by a single task. Must be at least 1.</param>
        /// <returns>A task resolving to the translated values; it faults if any translator task faults.</returns>
        public static Task<List<Out>> TranslateInChunks<In, Out>( IEnumerable<In> input, Func<In, Task<Out>> translator, int chunkSize ) {
            return Task.Run( async () => {
                // Explicit type arguments select the synchronous overload with Out = Task<Out>,
                // yielding the per-element tasks in input order.
                var tasks = await TranslateInChunks<In, Task<Out>>(input, translator, chunkSize);
                return (await Task.WhenAll(tasks)).ToList();
            });
        }

        /// <summary>
        /// Awaits <paramref name="input"/>, then translates its elements with the asynchronous <paramref name="translator"/>
        /// in chunks of at most <paramref name="chunkSize"/> elements.<br></br>
        /// Returns a <see cref="Task"/> that resolves to a <see cref="List{T}"/> of the awaited results, in input order.
        /// </summary>
        /// <param name="input">Task producing the elements to translate.</param>
        /// <param name="translator">Started once per element. It may be invoked concurrently from several threads.</param>
        /// <param name="chunkSize">Maximum number of elements handled by a single task. Must be at least 1.</param>
        /// <returns>A task resolving to the translated values; it faults if <paramref name="input"/> or any translator task faults.</returns>
        public static Task<List<Out>> TranslateInChunks<In, Out>( Task<IEnumerable<In>> input, Func<In, Task<Out>> translator, int chunkSize ) {
            return Task.Run( async () => {
                // Explicit type arguments select the Task-input synchronous overload with Out = Task<Out>.
                var tasks = await TranslateInChunks<In, Task<Out>>(input, translator, chunkSize);
                return (await Task.WhenAll(tasks)).ToList();
            });
        }

        // ------------ ProcessInChunks ------------

        /// <summary>A synchronous action applied to a single element.</summary>
        public delegate void SideEffect<T>(T value);

        /// <summary>
        /// Runs <paramref name="process"/> on every element of <paramref name="data"/>, splitting the work into chunks of at most
        /// <paramref name="chunkSize"/> elements; each chunk is handled by its own thread-pool task (Task.Run), and the elements of one
        /// chunk are processed sequentially in input order.<br></br>
        /// Returns a <see cref="Task"/> that resolves when processing completes.<br></br>
        /// <br></br>
        /// For translating elements, see <see cref="TranslateInChunks{In, Out}(IEnumerable{In}, Func{In, Out}, int)"/>
        /// </summary>
        /// <param name="data">Elements to process. Enumerated once, synchronously, before this method returns.</param>
        /// <param name="process">Applied once per element. It may be invoked concurrently from several threads.</param>
        /// <param name="chunkSize">Maximum number of elements handled by a single task. Must be at least 1.</param>
        /// <returns>A task that completes when every chunk is done; it faults if any call to <paramref name="process"/> throws.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="data"/> or <paramref name="process"/> is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="chunkSize"/> is less than 1.</exception>
        public static Task ProcessInChunks<In>( IEnumerable<In> data, SideEffect<In> process, int chunkSize ) {
            if (data == null) throw new ArgumentNullException(nameof(data));
            if (process == null) throw new ArgumentNullException(nameof(process));
            if (chunkSize < 1) throw new ArgumentOutOfRangeException(nameof(chunkSize), "chunkSize must be at least 1.");

            // No Count() pre-sizing: it would enumerate data an extra time.
            List<Task> tasks = new List<Task>();
            foreach ( var chunk in data.Chunk(chunkSize) ){
                tasks.Add( Task.Run( () => {
                    foreach ( var value in chunk ) {
                        process(value);
                    }
                }));
            }
            return Task.WhenAll(tasks);
        }

        /// <summary>
        /// Awaits <paramref name="data"/>, then runs <paramref name="process"/> on every element in chunks of at most
        /// <paramref name="chunkSize"/> elements.<br></br>
        /// Returns a <see cref="Task"/> that resolves when processing completes.<br></br>
        /// <br></br>
        /// For translating elements, see <see cref="TranslateInChunks{In, Out}(IEnumerable{In}, Func{In, Out}, int)"/>
        /// </summary>
        /// <param name="data">Task producing the elements to process.</param>
        /// <param name="process">Applied once per element. It may be invoked concurrently from several threads.</param>
        /// <param name="chunkSize">Maximum number of elements handled by a single task. Must be at least 1.</param>
        /// <returns>A task that completes after every element has been processed.</returns>
        public static Task ProcessInChunks<In>( Task<IEnumerable<In>> data, SideEffect<In> process, int chunkSize ) {
            return Task.Run( async () => {
                // Await the inner task: returning it un-awaited would let the outer task complete
                // before processing finishes and would drop any processing exception.
                await ProcessInChunks(await data, process, chunkSize);
            });
        }

        /// <summary>
        /// Runs the asynchronous <paramref name="process"/> on every element of <paramref name="data"/>. The calls are started from
        /// chunk worker tasks (chunks of at most <paramref name="chunkSize"/> elements).<br></br>
        /// Returns a <see cref="Task"/> that resolves when every returned task has completed.<br></br>
        /// <br></br>
        /// For translating elements, see <see cref="TranslateInChunks{In, Out}(IEnumerable{In}, Func{In, Task{Out}}, int)"/>
        /// </summary>
        /// <param name="data">Elements to process.</param>
        /// <param name="process">Started once per element. It may be invoked concurrently from several threads.</param>
        /// <param name="chunkSize">Maximum number of elements handled by a single task. Must be at least 1.</param>
        /// <returns>A task that completes after every per-element task has completed.</returns>
        public static Task ProcessInChunks<In>( IEnumerable<In> data, Func<In, Task> process, int chunkSize ) {
            return Task.Run( async () => {
                await Task.WhenAll( await TranslateInChunks(data, process, chunkSize) );
            });
        }

        /// <summary>
        /// Awaits <paramref name="data"/>, then runs the asynchronous <paramref name="process"/> on every element in chunks of
        /// at most <paramref name="chunkSize"/> elements.<br></br>
        /// Returns a <see cref="Task"/> that resolves when every returned task has completed.<br></br>
        /// <br></br>
        /// For translating elements, see <see cref="TranslateInChunks{In, Out}(IEnumerable{In}, Func{In, Task{Out}}, int)"/>
        /// </summary>
        /// <param name="data">Task producing the elements to process.</param>
        /// <param name="process">Started once per element. It may be invoked concurrently from several threads.</param>
        /// <param name="chunkSize">Maximum number of elements handled by a single task. Must be at least 1.</param>
        /// <returns>A task that completes after every per-element task has completed.</returns>
        public static Task ProcessInChunks<In>( Task<IEnumerable<In>> data, Func<In, Task> process, int chunkSize ) {
            return Task.Run( async () => {
                await Task.WhenAll( await TranslateInChunks(data, process, chunkSize) );
            });
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment