Created
October 9, 2019 08:31
-
-
Save jrgcubano/909e3a45c7d6451754fcac1f7b616551 to your computer and use it in GitHub Desktop.
Throttling concurrent async tasks and running async code synchronously
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace Ylp.Cms.Web.Infrastructure | |
{ | |
/// <summary>
/// Helpers for running Task-returning delegates to completion from synchronous code.
/// The delegate is executed on the calling thread under a temporary, single-threaded
/// <see cref="SynchronizationContext"/> that pumps the awaited continuations.
/// </summary>
public static class AsyncHelpers
{
    /// <summary>
    /// Synchronously executes an async method that returns a <see cref="Task"/> (no result value).
    /// </summary>
    /// <param name="task">Delegate that starts the asynchronous operation.</param>
    /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
    /// <exception cref="AggregateException">
    /// The operation threw; the original exception is available as the inner exception.
    /// </exception>
    public static void RunSync(Func<Task> task)
    {
        if (task == null)
        {
            throw new ArgumentNullException("task");
        }

        RunOnExclusiveContext(task);
    }

    /// <summary>
    /// Synchronously executes an async method that returns a <see cref="Task{TResult}"/>
    /// and hands back its result.
    /// </summary>
    /// <typeparam name="T">Type of the result produced by the operation.</typeparam>
    /// <param name="task">Delegate that starts the asynchronous operation.</param>
    /// <returns>The value produced by the awaited operation.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
    /// <exception cref="AggregateException">
    /// The operation threw; the original exception is available as the inner exception.
    /// </exception>
    public static T RunSync<T>(Func<Task<T>> task)
    {
        if (task == null)
        {
            throw new ArgumentNullException("task");
        }

        T result = default(T);
        RunOnExclusiveContext(async () => { result = await task(); });
        return result;
    }

    /// <summary>
    /// Installs a private message-loop context on the current thread, runs
    /// <paramref name="work"/> to completion on it, and always restores the
    /// previously installed context, even when the work fails.
    /// </summary>
    private static void RunOnExclusiveContext(Func<Task> work)
    {
        var previousContext = SynchronizationContext.Current;
        var synch = new ExclusiveSynchronizationContext();
        SynchronizationContext.SetSynchronizationContext(synch);
        try
        {
            synch.Post(async _ =>
            {
                try
                {
                    await work();
                }
                catch (Exception e)
                {
                    // Hand the failure to the message loop, which rethrows it wrapped in an
                    // AggregateException. Rethrowing here would only queue the exception on a
                    // loop that is about to stop (this callback is effectively async void).
                    synch.InnerException = e;
                }
                finally
                {
                    synch.EndMessageLoop();
                }
            }, null);

            synch.BeginMessageLoop();
        }
        finally
        {
            // Without this the caller's thread would keep the dead pump context
            // whenever the message loop throws.
            SynchronizationContext.SetSynchronizationContext(previousContext);
        }
    }

    /// <summary>
    /// Single-threaded synchronization context: posted callbacks are queued and executed
    /// by whichever thread is running <see cref="BeginMessageLoop"/>.
    /// </summary>
    private sealed class ExclusiveSynchronizationContext : SynchronizationContext
    {
        // Pending callbacks. The queue object doubles as the lock and the wait/pulse monitor,
        // so no OS wait handle has to be created (and disposed) per RunSync call.
        private readonly Queue<Tuple<SendOrPostCallback, object>> _items =
            new Queue<Tuple<SendOrPostCallback, object>>();

        // Set by the callback queued from EndMessageLoop; read by the loop after each callback.
        private bool _done;

        /// <summary>
        /// Exception captured from the executed operation, if any. The message loop checks it
        /// after every callback.
        /// </summary>
        public Exception InnerException { get; set; }

        /// <summary>Synchronous dispatch is not supported by this context.</summary>
        public override void Send(SendOrPostCallback d, object state)
        {
            throw new NotSupportedException("We cannot send to our same thread");
        }

        /// <summary>Queues a callback for the message loop and wakes the loop if it is waiting.</summary>
        public override void Post(SendOrPostCallback d, object state)
        {
            lock (_items)
            {
                _items.Enqueue(Tuple.Create(d, state));
                Monitor.Pulse(_items);
            }
        }

        /// <summary>
        /// Requests loop termination. The request travels through the queue, so callbacks
        /// posted before it are still executed first.
        /// </summary>
        public void EndMessageLoop()
        {
            Post(_ => _done = true, null);
        }

        /// <summary>
        /// Executes queued callbacks on the calling thread until termination is requested.
        /// </summary>
        /// <exception cref="AggregateException">
        /// <see cref="InnerException"/> was set while a callback was running.
        /// </exception>
        public void BeginMessageLoop()
        {
            while (!_done)
            {
                Tuple<SendOrPostCallback, object> workItem;
                lock (_items)
                {
                    // Sleep until Post signals that something has been queued.
                    while (_items.Count == 0)
                    {
                        Monitor.Wait(_items);
                    }

                    workItem = _items.Dequeue();
                }

                // Run the callback outside the lock so it can post further work.
                workItem.Item1(workItem.Item2);

                if (InnerException != null) // the executed method threw an exception
                {
                    throw new AggregateException("AsyncHelpers.Run method threw an exception.", InnerException);
                }
            }
        }

        /// <summary>Returns this same instance so copies share the one queue.</summary>
        public override SynchronizationContext CreateCopy()
        {
            return this;
        }
    }
}
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Helper class for throttling concurrent async tasks.
/// </summary>
public static class Throttler
{
    /// <summary>
    /// Performs asynchronous work for every element of <paramref name="data"/> while keeping
    /// the number of simultaneously running tasks at or below <paramref name="maxConcurrent"/>.
    /// </summary>
    /// <typeparam name="T">Type of the elements being processed.</typeparam>
    /// <param name="data">Elements to process; enumerated once, in order.</param>
    /// <param name="doWorkAsync">Starts the asynchronous work for a single element.</param>
    /// <param name="maxConcurrent">Maximum number of tasks that can be running concurrently at any given time. Default is 50.</param>
    /// <returns>
    /// A task that completes when the work for every element has completed. Failures of
    /// individual work items surface when this task is awaited.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="data"/> or <paramref name="doWorkAsync"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxConcurrent"/> is less than 1.</exception>
    public static async Task ForEachAsync<T>(IEnumerable<T> data, Func<T, Task> doWorkAsync, int maxConcurrent = 50) {
        if (data == null) {
            throw new ArgumentNullException("data");
        }
        if (doWorkAsync == null) {
            throw new ArgumentNullException("doWorkAsync");
        }
        if (maxConcurrent < 1) {
            throw new ArgumentOutOfRangeException("maxConcurrent", maxConcurrent, "maxConcurrent must be at least 1.");
        }

        var allTasks = new List<Task>();     // every task started, awaited together at the end
        var runningTasks = new List<Task>(); // only the tasks not yet known to have completed

        foreach (var item in data) {
            // At the limit: wait for one (or more) running task to finish before letting the
            // next one in. Only unfinished tasks are passed to WhenAny; including completed
            // ones would make it return immediately and turn this loop into a busy spin.
            while (runningTasks.Count >= maxConcurrent) {
                await Task.WhenAny(runningTasks).ConfigureAwait(false);
                runningTasks.RemoveAll(t => t.IsCompleted);
            }

            var started = doWorkAsync(item);
            allTasks.Add(started);
            runningTasks.Add(started);
        }

        await Task.WhenAll(allTasks).ConfigureAwait(false);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment