Last active
September 5, 2023 16:36
-
-
Save govert/1bf0a1026ec3aaee19a8 to your computer and use it in GitHub Desktop.
Async Batching Sample
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using System.Timers; | |
using Timer = System.Timers.Timer; | |
using ExcelDna.Integration; | |
namespace ExcelDnaSamples | |
{ | |
/// <summary>
/// Sample worksheet functions whose calls are collected and processed together
/// through a shared <see cref="AsyncBatchUtil"/> instance.
/// </summary>
public static class AsyncBatchExample
{
    // Step 1: a single shared batcher. It is configured with a size threshold of 1000 calls,
    // a 250 ms timer interval, and the delegate (RunBatch) that processes each batch.
    static readonly AsyncBatchUtil BatchRunner =
        new AsyncBatchUtil(1000, TimeSpan.FromMilliseconds(250), RunBatch);

    // Step 2: the batch processor, invoked once per batch by the batcher.
    // Every AsyncCall carries the function name and the arguments of one worksheet call.
    // The returned list must hold one result per call, in the same order as 'calls'.
    static Task<List<object>> RunBatch(List<AsyncBatchUtil.AsyncCall> calls)
    {
        // Captured before the work starts so every result in the batch reports the same timestamp.
        DateTime startedAt = DateTime.Now;

        return Task.Factory.StartNew(() =>
        {
            // Stand-in for slow work.
            Thread.Sleep(TimeSpan.FromSeconds(10));

            // One informative string per call: name, first argument, position in batch, batch size, batch start time.
            var output = new List<object>();
            for (int index = 0; index < calls.Count; index++)
            {
                AsyncBatchUtil.AsyncCall current = calls[index];
                string text = string.Format(
                    "{0} - {1} : {2}/{3} @ {4:HH:mm:ss.fff}",
                    current.FunctionName,
                    current.Arguments[0],
                    index,
                    calls.Count,
                    startedAt);
                output.Add(text);
            }
            return output;
        });
    }

    // Step 3: the exported function simply forwards its name and arguments to the batcher.
    [ExcelFunction(Description = "A function that runs in batches")]
    public static object BatchedFunction(string code, int value)
    {
        return BatchRunner.Run("BatchedFunction", code, value);
    }
}
// This is the main helper class for supporting batched async calls | |
// To use: | |
// 1. Create an instance of AsyncBatchUtil, passing in a Func<List<AsyncCall>, Task<List<object>>> delegate to run each batch. | |
// 2. The batch runner delegate will run on a threadpool thread, and returns a Task object | |
// which completes and returns a list of results when the batch is done. | |
// 3. Call from inside a batched function as: | |
// public static object SlowFunction(string code, int value) | |
// { | |
// return BatchRunner.Run("SlowFunction", code, value); | |
// } | |
public class AsyncBatchUtil | |
{ | |
/// <summary>
/// One function call waiting in a batch: the function's name, its arguments,
/// and the completion source through which its result is delivered.
/// </summary>
public class AsyncCall
{
    // Completed by the batching code once the batch containing this call has finished.
    internal TaskCompletionSource<object> TaskCompletionSource;

    /// <summary>Stores the completion source, function name and argument array as given.</summary>
    /// <param name="taskCompletion">Completion source that will receive this call's result or error.</param>
    /// <param name="functionName">Name of the function that made the call.</param>
    /// <param name="args">Arguments the function was called with (kept by reference, not copied).</param>
    public AsyncCall(TaskCompletionSource<object> taskCompletion, string functionName, object[] args)
    {
        this.Arguments = args;
        this.FunctionName = functionName;
        this.TaskCompletionSource = taskCompletion;
    }

    /// <summary>Name of the function that made the call.</summary>
    public string FunctionName { get; private set; }

    /// <summary>Arguments the function was called with.</summary>
    public object[] Arguments { get; private set; }
}
// Not a hard limit | |
readonly int _maxBatchSize; | |
readonly Func<List<AsyncCall>, Task<List<object>>> _batchRunner; | |
readonly object _lock = new object(); | |
readonly Timer _batchTimer; // Timer events will fire from a ThreadPool thread | |
List<AsyncCall> _currentBatch; | |
/// <summary>
/// Creates a batcher that collects calls until either the size threshold is reached
/// or the timer interval elapses, and then hands them to <paramref name="batchRunner"/>.
/// </summary>
/// <param name="maxBatchSize">Number of queued calls at which a batch is scheduled immediately; must be at least 1.</param>
/// <param name="batchTimeout">Interval of the timer that flushes a batch which has not reached the size threshold.</param>
/// <param name="batchRunner">Delegate that processes one batch and returns a task producing one result per call.</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="maxBatchSize"/> is less than 1.</exception>
/// <exception cref="ArgumentNullException"><paramref name="batchRunner"/> is null.</exception>
public AsyncBatchUtil(int maxBatchSize, TimeSpan batchTimeout, Func<List<AsyncCall>, Task<List<object>>> batchRunner)
{
    // Validate up front, size first and delegate second, so a bad argument fails here
    // rather than later inside a thread-pool callback.
    if (!(maxBatchSize >= 1))
    {
        throw new ArgumentOutOfRangeException("maxBatchSize", "Max batch size must be positive");
    }
    if (batchRunner == null)
    {
        throw new ArgumentNullException("batchRunner");
    }

    _batchRunner = batchRunner;
    _maxBatchSize = maxBatchSize;
    _currentBatch = new List<AsyncCall>();

    // One-shot timer: it is created stopped and is only started when a call is queued.
    var flushTimer = new Timer(batchTimeout.TotalMilliseconds) { AutoReset = false };
    flushTimer.Elapsed += TimerElapsed;
    _batchTimer = flushTimer;
}
/// <summary>
/// Entry point for a batched worksheet function: registers the call as an Excel observable
/// whose value is supplied when the batch containing the call completes.
/// </summary>
/// <param name="functionName">Name of the calling function; passed to Excel-DNA and recorded on the queued call.</param>
/// <param name="args">Arguments of the calling function; passed to Excel-DNA and recorded on the queued call.</param>
/// <returns>Whatever <c>ExcelAsyncUtil.Observe</c> returns for this call.</returns>
/// <remarks>The original author notes that this only runs on Excel's main thread.</remarks>
public object Run(string functionName, params object[] args)
{
    return ExcelAsyncUtil.Observe(functionName, args, () =>
    {
        // Each call gets its own completion source; the batch continuation completes it later.
        var completion = new TaskCompletionSource<object>();
        EnqueueAsyncCall(completion, functionName, args);
        return new TaskExcelObservable(completion.Task);
    });
}
/// <summary>
/// Adds one call to the batch being collected and makes sure that batch will eventually run:
/// immediately (via the thread pool) when the size threshold is reached, otherwise when the timer fires.
/// </summary>
/// <param name="taskCompletion">Completion source that will receive this call's result or error.</param>
/// <param name="functionName">Name of the function that made the call.</param>
/// <param name="args">Arguments of the call.</param>
/// <remarks>The original author notes that this only runs on Excel's main thread.</remarks>
void EnqueueAsyncCall(TaskCompletionSource<object> taskCompletion, string functionName, object[] args)
{
    var pending = new AsyncCall(taskCompletion, functionName, args);

    lock (_lock)
    {
        _currentBatch.Add(pending);

        if (_currentBatch.Count < _maxBatchSize)
        {
            // Threshold not reached: the batch holding this call depends on the timer,
            // so start the timer unless it is already running.
            if (!_batchTimer.Enabled)
            {
                _batchTimer.Start();
            }
            return;
        }

        // Threshold reached: hand the full batch to the thread pool (it runs soon, not inline),
        // begin collecting into a fresh list, and stop the timer.
        List<AsyncCall> fullBatch = _currentBatch;
        ThreadPool.QueueUserWorkItem(state => RunBatch((List<AsyncCall>)state), fullBatch);
        _currentBatch = new List<AsyncCall>();
        _batchTimer.Stop();
    }
}
/// <summary>
/// Timer callback: detaches whatever has been collected so far and runs it as a batch.
/// </summary>
/// <param name="sender">Unused.</param>
/// <param name="e">Unused.</param>
/// <remarks>The original author notes that this runs on a ThreadPool thread.</remarks>
void TimerElapsed(object sender, ElapsedEventArgs e)
{
    List<AsyncCall> due;

    // Swap the lists under the lock so no call can be added to the batch after it is detached.
    lock (_lock)
    {
        due = _currentBatch;
        _currentBatch = new List<AsyncCall>();
    }

    // Run outside the lock; the detached list may be empty, which RunBatch tolerates.
    RunBatch(due);
}
/// <summary>
/// Runs one detached batch through the batch runner delegate and, when the returned task finishes,
/// completes every call's <c>TaskCompletionSource</c> with its result or with the batch's error.
/// </summary>
/// <param name="batch">Calls to process; may be empty, in which case nothing happens.</param>
/// <remarks>
/// Invoked from the timer callback and from a thread-pool work item, so it may be entered concurrently
/// for different batches. It never lets an exception escape: any failure (the delegate throwing,
/// returning null, faulting, being cancelled, or returning the wrong number of results) is delivered
/// to every call in the batch, so no call is left waiting forever.
/// </remarks>
void RunBatch(List<AsyncCall> batch)
{
    // A timer callback that lost the race against a size-triggered flush sees an empty list.
    if (batch.Count == 0)
    {
        return;
    }

    Task<List<object>> batchTask;
    try
    {
        batchTask = _batchRunner(batch);
        if (batchTask == null)
        {
            throw new InvalidOperationException("Batch runner returned a null Task.");
        }
    }
    catch (Exception ex)
    {
        // The delegate failed before producing a task: fail the calls now instead of
        // letting the exception escape onto the timer / thread-pool thread.
        FailBatch(batch, ex);
        return;
    }

    batchTask.ContinueWith(t =>
    {
        try
        {
            if (t.IsFaulted)
            {
                // Report the underlying error rather than the AggregateException wrapper.
                FailBatch(batch, UnwrapFault(t.Exception));
                return;
            }
            if (t.IsCanceled)
            {
                FailBatch(batch, new TaskCanceledException(t));
                return;
            }

            List<object> resultList = t.Result;
            if (resultList == null)
            {
                throw new InvalidOperationException("Batch runner completed with a null result list.");
            }
            if (resultList.Count != batch.Count)
            {
                throw new InvalidOperationException(string.Format("Batch result size incorrect. Batch Count: {0}, Result Count: {1}", batch.Count, resultList.Count));
            }

            // Results correspond to calls by position.
            for (int i = 0; i < resultList.Count; i++)
            {
                batch[i].TaskCompletionSource.TrySetResult(resultList[i]);
            }
        }
        catch (Exception ex)
        {
            FailBatch(batch, ex);
        }
    });
}

// Delivers the same error to every call in the batch; calls that are already completed are left as they are.
static void FailBatch(List<AsyncCall> batch, Exception error)
{
    foreach (var call in batch)
    {
        call.TaskCompletionSource.TrySetException(error);
    }
}

// Returns the single underlying exception of a faulted task, or the flattened aggregate when there are several.
static Exception UnwrapFault(AggregateException aggregate)
{
    AggregateException flat = aggregate.Flatten();
    return flat.InnerExceptions.Count == 1 ? flat.InnerExceptions[0] : flat;
}
/// <summary>
/// Adapts a task to an <c>IExcelObservable</c>: a successful task pushes its result and completes,
/// a faulted or cancelled task pushes an error.
/// </summary>
class TaskExcelObservable : IExcelObservable
{
    readonly Task<object> _task;

    public TaskExcelObservable(Task<object> task)
    {
        _task = task;
    }

    /// <summary>
    /// Reports the task's outcome to <paramref name="observer"/>: immediately if the task has already
    /// finished, otherwise from a continuation that runs when it does.
    /// </summary>
    /// <param name="observer">Receiver of the result or error.</param>
    /// <returns>A shared disposable whose Dispose does nothing.</returns>
    public IDisposable Subscribe(IExcelObserver observer)
    {
        if (_task.IsCompleted)
        {
            // Already in a final state: notify synchronously on the subscribing thread.
            Notify(_task, observer);
        }
        else
        {
            // Still running: notify once the task reaches a final state.
            _task.ContinueWith(finished => Notify(finished, observer));
        }
        return DefaultDisposable.Instance;
    }

    // Translates a finished task's status into the matching observer notification(s).
    static void Notify(Task<object> finished, IExcelObserver observer)
    {
        TaskStatus outcome = finished.Status;
        if (outcome == TaskStatus.RanToCompletion)
        {
            observer.OnNext(finished.Result);
            observer.OnCompleted();
        }
        else if (outcome == TaskStatus.Faulted)
        {
            observer.OnError(finished.Exception.InnerException);
        }
        else if (outcome == TaskStatus.Canceled)
        {
            observer.OnError(new TaskCanceledException(finished));
        }
    }

    // An IDisposable with nothing to dispose, exposed as a single shared instance.
    sealed class DefaultDisposable : IDisposable
    {
        public static readonly DefaultDisposable Instance = new DefaultDisposable();

        // Private so that Instance is the only instance.
        DefaultDisposable()
        {
        }

        public void Dispose()
        {
        }
    }
}
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment