Last active
July 3, 2024 19:21
-
-
Save Fasteroid/cec4e2bcd51b6b298f9258f2c5d4e73e to your computer and use it in GitHub Desktop.
Some C# utilities for parallelization
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Collections.Generic; | |
using System.Threading.Tasks; | |
using System; | |
using System.Linq; | |
namespace Fasteroid { | |
public static class ParallelUtils { | |
// ------------ Polyfill for older versions of C# ------------ | |
#if !NET6_0_OR_GREATER | |
public static IEnumerable<IEnumerable<T>> Chunk<T>(this IEnumerable<T> master, int chunkSize){ | |
List<T> chunk = new List<T>(chunkSize); | |
foreach (var item in master){ | |
chunk.Add(item); | |
if (chunk.Count == chunkSize){ | |
yield return chunk; | |
chunk = new List<T>(chunkSize); | |
} | |
} | |
if (chunk.Count > 0){ | |
yield return chunk; | |
} | |
} | |
#endif | |
// ------------ TranslateInChunks ------------ | |
/// <summary> | |
/// Rapidly translates elements of <paramref name="input"/> according to <paramref name="translator"/>, in chunks of size <paramref name="chunkSize"/>.<br></br> | |
/// Returns a <see cref="Task"/> that will resolve to a <see cref="List{Out}"/> when processing completes. | |
/// </summary> | |
public static Task<List<Out>> TranslateInChunks<In, Out>( IEnumerable<In> input, Func<In, Out> translator, int chunkSize ) { | |
List<Task> tasks = new List<Task>( input.Count() / chunkSize ); | |
List<Out> results = new List<Out>( input.Count() ); | |
foreach (var chunk in input.Chunk(chunkSize)){ | |
tasks.Add( Task.Run( () => { | |
var processed = chunk.Select( translator ); | |
lock (results) { | |
results.AddRange( processed ); | |
} | |
})); | |
} | |
return Task.Run( () => { | |
Task.WaitAll(tasks.ToArray()); | |
return results; | |
}); | |
} | |
/// <summary> | |
/// Rapidly translates elements of <paramref name="input"/> according to <paramref name="translator"/>, in chunks of size <paramref name="chunkSize"/>.<br></br> | |
/// Returns a <see cref="Task"/> that will resolve to a <see cref="List{Out}"/> when processing completes. | |
/// </summary> | |
public static Task<List<Out>> TranslateInChunks<In, Out>( Task<IEnumerable<In>> input, Func<In, Out> translator, int chunkSize ) { | |
return Task.Run( async () => { | |
return await TranslateInChunks(await input, translator, chunkSize); | |
}); | |
} | |
/// <summary> | |
/// Rapidly translates elements of <paramref name="input"/> according to <paramref name="translator"/>, in chunks of size <paramref name="chunkSize"/>.<br></br> | |
/// Returns a <see cref="Task"/> that will resolve to a <see cref="List{Out}"/> when processing completes. | |
/// </summary> | |
public static Task<List<Out>> TranslateInChunks<In, Out>( IEnumerable<In> input, Func<In, Task<Out>> translator, int chunkSize ) { | |
return Task.Run( async () => { | |
var tasks = await TranslateInChunks<In, Task<Out>>(input, translator, chunkSize); | |
return (await Task.WhenAll(tasks)).ToList(); | |
}); | |
} | |
/// <summary> | |
/// Rapidly translates elements of <paramref name="input"/> according to <paramref name="translator"/>, in chunks of size <paramref name="chunkSize"/>.<br></br> | |
/// Returns a <see cref="Task"/> that will resolve to a <see cref="List{Out}"/> when processing completes. | |
/// </summary> | |
public static Task<List<Out>> TranslateInChunks<In, Out>( Task<IEnumerable<In>> input, Func<In, Task<Out>> translator, int chunkSize ) { | |
return Task.Run( async () => { | |
var tasks = await TranslateInChunks<In, Task<Out>>(input, translator, chunkSize); | |
return (await Task.WhenAll(tasks)).ToList(); | |
}); | |
} | |
// ------------ ProcessInChunks ------------ | |
public delegate void SideEffect<T>(T value); | |
/// <summary> | |
/// Rapidly processes elements of <paramref name="input"/> according to <paramref name="process"/>, in chunks of size <paramref name="chunkSize"/>.<br></br> | |
/// Returns a <see cref="Task"/> that will resolve when processing completes.<br></br> | |
/// <br></br> | |
/// For translating elements, see <see cref="TranslateInChunks"/> | |
/// </summary> | |
public static Task ProcessInChunks<In>( IEnumerable<In> data, SideEffect<In> process, int chunkSize ) { | |
List<Task> tasks = new List<Task>(data.Count() / chunkSize); | |
foreach ( var chunk in data.Chunk(chunkSize) ){ | |
tasks.Add( Task.Run( () => { | |
foreach ( var value in chunk ) { | |
process(value); | |
} | |
})); | |
} | |
return Task.WhenAll(tasks); | |
} | |
/// <summary> | |
/// Rapidly processes elements of <paramref name="input"/> according to <paramref name="process"/>, in chunks of size <paramref name="chunkSize"/>.<br></br> | |
/// Returns a <see cref="Task"/> that will resolve when processing completes.<br></br> | |
/// <br></br> | |
/// For translating elements, see <see cref="TranslateInChunks"/> | |
/// </summary> | |
public static Task ProcessInChunks<In>( Task<IEnumerable<In>> data, SideEffect<In> process, int chunkSize ) { | |
return Task.Run( async () => { | |
return ProcessInChunks(await data, process, chunkSize); | |
}); | |
} | |
/// <summary> | |
/// Rapidly processes elements of <paramref name="input"/> according to <paramref name="process"/>, in chunks of size <paramref name="chunkSize"/>.<br></br> | |
/// Returns a <see cref="Task"/> that will resolve when processing completes.<br></br> | |
/// <br></br> | |
/// For translating elements, see <see cref="TranslateInChunks"/> | |
/// </summary> | |
public static Task ProcessInChunks<In>( IEnumerable<In> data, Func<In, Task> process, int chunkSize ) { | |
return Task.Run( async () => { | |
await Task.WhenAll( await TranslateInChunks(data, process, chunkSize) ); | |
}); | |
} | |
/// <summary> | |
/// Rapidly processes elements of <paramref name="input"/> according to <paramref name="process"/>, in chunks of size <paramref name="chunkSize"/>.<br></br> | |
/// Returns a <see cref="Task"/> that will resolve when processing completes.<br></br> | |
/// <br></br> | |
/// For translating elements, see <see cref="TranslateInChunks"/> | |
/// </summary> | |
public static Task ProcessInChunks<In>( Task<IEnumerable<In>> data, Func<In, Task> process, int chunkSize ) { | |
return Task.Run( async () => { | |
await Task.WhenAll( await TranslateInChunks(data, process, chunkSize) ); | |
}); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment