Created
January 11, 2021 18:10
-
-
Save Wind010/84561c43e547747caf4375909b987d84 to your computer and use it in GitHub Desktop.
Wrapper around the Task Parallel Library (TPL) that runs a passed-in method concurrently over multiple inputs and returns a list of the results.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System; | |
| using System.Collections.Generic; | |
| using System.Linq; | |
| using System.Threading; | |
| using System.Threading.Tasks; | |
namespace Misc.Engines
{
    /// <summary>
    /// Runs an asynchronous delegate over an array of inputs with a bounded number of
    /// jobs in flight at any one time.
    /// Meant for CPU bound operations. Long running I/O operations should be run
    /// asynchronously. This can be used for combined concurrent and asynchronous operations.
    /// </summary>
    public class JobEngine : IJobEngine
    {
        /// <summary>Upper bound (inclusive) accepted for the concurrency limit.</summary>
        private const int MaxThreadLimit = 250;

        /// <summary>Maximum number of jobs allowed to run at the same time.</summary>
        private readonly int _maxNumberOfThreads;

        /// <summary>
        /// Creates an engine that runs at most <paramref name="maxNumberOfThreads"/> jobs concurrently.
        /// </summary>
        /// <param name="maxNumberOfThreads">
        /// Concurrency limit, from 0 to 250 inclusive. Values of 0 or 1 make
        /// <see cref="RunJobsAsync{T, TResult}"/> process the inputs one at a time.
        /// </param>
        /// <exception cref="ArgumentException">
        /// <paramref name="maxNumberOfThreads"/> is negative or greater than 250.
        /// </exception>
        public JobEngine(int maxNumberOfThreads)
        {
            if (maxNumberOfThreads < 0 || maxNumberOfThreads > MaxThreadLimit)
            {
                throw new ArgumentException(
                    $"{nameof(maxNumberOfThreads)} must be between 0 and {MaxThreadLimit} (inclusive).",
                    nameof(maxNumberOfThreads));
            }

            _maxNumberOfThreads = maxNumberOfThreads;
        }

        /// <summary>
        /// Invokes <paramref name="func"/> once for every element of <paramref name="t"/> and
        /// collects the results in input order.
        /// </summary>
        /// <typeparam name="T">Type of each input item.</typeparam>
        /// <typeparam name="TResult">Type produced by <paramref name="func"/>.</typeparam>
        /// <param name="func">Asynchronous delegate to run for each item.</param>
        /// <param name="t">Items to process.</param>
        /// <param name="throttleInMs">
        /// Delay in milliseconds applied before each job when running concurrently.
        /// It is not applied when the concurrency limit is 0 or 1.
        /// </param>
        /// <param name="cancellationToken">Token used to stop scheduling and to cancel pending delays.</param>
        /// <returns>
        /// The results in the order of the inputs. When running concurrently, <c>null</c>
        /// results are omitted; when running one at a time they are kept.
        /// </returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="func"/> or <paramref name="t"/> is <c>null</c>.
        /// </exception>
        /// <exception cref="OperationCanceledException">
        /// <paramref name="cancellationToken"/> was cancelled before all jobs completed.
        /// </exception>
        public async Task<IList<TResult>> RunJobsAsync<T, TResult>(
            Func<T, Task<TResult>> func, T[] t, int throttleInMs = 0
            , CancellationToken cancellationToken = default)
        {
            if (func is null)
            {
                throw new ArgumentNullException(nameof(func));
            }

            if (t is null)
            {
                throw new ArgumentNullException(nameof(t));
            }

            var results = new List<TResult>(t.Length);

            // With a limit below 2 there is nothing to parallelize: await each job in turn
            // on the calling flow.
            if (_maxNumberOfThreads <= 1)
            {
                foreach (T item in t)
                {
                    cancellationToken.ThrowIfCancellationRequested();
                    results.Add(await func(item).ConfigureAwait(false));
                }

                return results;
            }

            // 'started' keeps every job in input order so results can be returned in that order;
            // 'inFlight' holds only the jobs still counted against the concurrency limit.
            var started = new List<Task<TResult>>(t.Length);
            var inFlight = new List<Task<TResult>>(_maxNumberOfThreads);

            foreach (T item in t)
            {
                if (inFlight.Count >= _maxNumberOfThreads)
                {
                    // The window is full: wait for any job to finish before starting another.
                    Task<TResult> finished = await Task.WhenAny(inFlight).ConfigureAwait(false);
                    inFlight.Remove(finished);

                    if (finished.Status != TaskStatus.RanToCompletion)
                    {
                        // A job faulted or was cancelled: stop scheduling new work.
                        // Task.WhenAll below observes every started job and rethrows.
                        break;
                    }
                }

                // The foreach variable is scoped per iteration, so capturing it in the lambda is safe.
                // If the token is already cancelled, Task.Run returns a cancelled task without running the delegate.
                Task<TResult> job = Task.Run(async () =>
                {
                    await Task.Delay(throttleInMs, cancellationToken).ConfigureAwait(false);
                    return await func(item).ConfigureAwait(false);
                }, cancellationToken);

                started.Add(job);
                inFlight.Add(job);
            }

            // Awaits the remaining jobs and propagates the first failure or cancellation, if any.
            TResult[] completed = await Task.WhenAll(started).ConfigureAwait(false);

            foreach (TResult result in completed)
            {
                // Null results are skipped on this path (always false for non-nullable value types).
                if (result != null)
                {
                    results.Add(result);
                }
            }

            return results;
        }
    }
}
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Example method: