Skip to content

Instantly share code, notes, and snippets.

@Wind010
Created January 11, 2021 18:10
Show Gist options
  • Select an option

  • Save Wind010/84561c43e547747caf4375909b987d84 to your computer and use it in GitHub Desktop.

Select an option

Save Wind010/84561c43e547747caf4375909b987d84 to your computer and use it in GitHub Desktop.
Wrapper around TPL for spawning multiple threads calling the passed in method returning an IEnumerable of the results.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Misc.Engines
{
/// <summary>
/// Meant for CPU bound operations. Long running I/O operations should be run
/// asynchronously. This can be used for combined concurrent and asynchronous operations.
/// </summary>
public class JobEngine : IJobEngine
{
private const int MaxThreadLimit = 250;
private int _maxNumberOfThreads;
public JobEngine(int maxNumberOfThreads)
{
if (maxNumberOfThreads < 0 || maxNumberOfThreads > 250)
{
throw new ArgumentException($"{nameof(maxNumberOfThreads)} must be greater than 0 and less than {MaxThreadLimit}");
}
_maxNumberOfThreads = maxNumberOfThreads;
}
public async Task<IList<TResult>> RunJobsAsync<T, TResult>(
Func<T, Task<TResult>> func, T[] t, int throttleInMs = 0
, CancellationToken cancellationToken = default)
{
// Useful? IAsyncEnumerable
var results = new List<TResult>();
var tasks = new List<Task<TResult>>();
int threadCount = 0;
int index = 0;
// If less than 2 we just run the delegate asynchronously in same thread.
if (_maxNumberOfThreads <= 1)
{
foreach (var item in t.ToList())
{
results.Add(await func(item).ConfigureAwait(false));
}
return results;
}
while (threadCount < _maxNumberOfThreads && index < t.Length)
{
// Interesting to note accessing the index directly inline passed into func
// will yield index out of range. Ex func(t[jobCount]);
var item = t[index];
// Alternatively could use Task.Run.
tasks.Add(Task.Factory.StartNew(async () =>
{
await Task.Delay(throttleInMs);
return await func(item).ConfigureAwait(false);
}, cancellationToken).Unwrap());
index++;
threadCount++;
while (tasks.Any())
{
Task<TResult> completedTask = await Task.WhenAny(tasks);
TResult result = await completedTask;
if (result != null)
{
results.Add(result);
}
tasks.Remove(completedTask);
threadCount--; // Allow new thread to start.
}
}
return results;
}
}
}
@Wind010
Copy link
Copy Markdown
Author

Wind010 commented Jan 11, 2021

Example method:

protected internal async Task<IEnumerable<Response>> MakeApiCall(
    IEnumerable<Request> requests, string jwt, string correlationid)
{
    Func<Request, Task<Response>> func =
        async pi =>
        {
            try
            {
                // Make API Call
                var response = _client.MakeAPICall();
                
                // Validation
                
                return response;
            }
            catch (Exception ex)
            {
                // Log and handle errors.
            }
        };

    IEnumerable<Response> responses= await JobEngine.RunJobsAsync(func
        , requests.ToArray(), throttleInMs); 

    return responses;
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment