Created
November 18, 2019 13:50
-
-
Save radleta/bbc3bbb7234c7003044f6969f4257ce6 to your computer and use it in GitHub Desktop.
Cargo is a fire and forget processing queue that ensures items are run in payloads of a fixed size when possible based on currency and delay.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Collections.Concurrent; | |
using System; | |
using System.Collections.Generic; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Nito.AsyncEx; | |
namespace RichardAdleta | |
{ | |
/// <summary> | |
/// Cargo is a fire and forget processing queue that ensures items are run in payloads of a fixed | |
/// size when possible based on currency and delay. | |
/// </summary> | |
/// <typeparam name="TValue">The type of items to be processed.</typeparam> | |
public class Cargo<TValue> : IDisposable | |
{ | |
private bool _disposedValue = false; // To detect redundant calls | |
private readonly CancellationTokenSource _disposeCancellationTokenSource = new CancellationTokenSource(); | |
private readonly Func<List<TValue>, CancellationToken, Task> _cargo; | |
private readonly int _payload; | |
private readonly int _concurrency; | |
private readonly TimeSpan _timeout; | |
private readonly TimeSpan _delay; | |
private readonly bool _hasDelay; | |
private readonly ConcurrentQueue<TValue> _queue = new ConcurrentQueue<TValue>(); | |
private readonly AsyncAutoResetEvent _moreThanPayloadEvent = new AsyncAutoResetEvent(); | |
private readonly AsyncAutoResetEvent _taskCompletedEvent = new AsyncAutoResetEvent(); | |
private int _activeTask; | |
public int Count => _queue.Count; | |
/// <summary> | |
/// Initializes a new instance of this class. | |
/// </summary> | |
/// <param name="cargo">The func to call to process the <see cref="TValue"/> instances.</param> | |
/// <param name="payload">The size of the payload to send to <c>cargo</c>. Must be equal to or greater than 1.</param> | |
/// <param name="concurrency">The total number of concurrent tasks running to process the queue at any one time.</param> | |
/// <param name="delay">The amount of delay to wait when the queue has less than the total amount in queue.</param> | |
/// <param name="timeout">The timeout of the <see cref="CancellationToken"/> passed to cargo on each execute.</param> | |
public Cargo(Func<List<TValue>, CancellationToken, Task> cargo, int payload, int concurrency, TimeSpan delay, TimeSpan timeout) | |
{ | |
if (payload < 1) | |
{ | |
throw new ArgumentOutOfRangeException(nameof(payload), payload, "Value cannot be less than one."); | |
} | |
if (concurrency < 1) | |
{ | |
throw new ArgumentOutOfRangeException(nameof(concurrency), concurrency, "Value cannot be less than one."); | |
} | |
if (timeout <= TimeSpan.Zero) | |
{ | |
throw new ArgumentOutOfRangeException(nameof(timeout), timeout, "Value cannot be equal or less than zero."); | |
} | |
_cargo = cargo ?? throw new ArgumentNullException(nameof(cargo)); | |
_payload = payload; | |
_concurrency = concurrency; | |
_timeout = timeout; | |
_delay = delay; | |
_hasDelay = delay > TimeSpan.Zero; | |
} | |
private void ThrowObjectDisposedExceptionWhenDisposed() | |
{ | |
if (_disposedValue) throw new ObjectDisposedException(GetType().FullName); | |
} | |
/// <summary> | |
/// Enqueues a single item. | |
/// </summary> | |
/// <param name="value">The item to enqueue.</param> | |
public void Enqueue(TValue value) | |
{ | |
ThrowObjectDisposedExceptionWhenDisposed(); | |
_queue.Enqueue(value); | |
// ensure a task is spawned to process the queue | |
EnsureTaskRunning(); | |
} | |
/// <summary> | |
/// Enqueues multiple items. | |
/// </summary> | |
/// <param name="values">The values to enqueue.</param> | |
public void Enqueue(IEnumerable<TValue> values) | |
{ | |
if (values is null) | |
{ | |
throw new ArgumentNullException(nameof(values)); | |
} | |
ThrowObjectDisposedExceptionWhenDisposed(); | |
foreach (var value in values) | |
{ | |
_queue.Enqueue(value); | |
} | |
// ensure a task is spawned to process the queue | |
EnsureTaskRunning(); | |
} | |
/// <summary> | |
/// Determines whether or not the cargo is currently processing items. | |
/// </summary> | |
/// <returns><c>True</c> the cargo is processing items; otherwise, <c>false</c>.</returns> | |
public bool IsRunning() | |
{ | |
ThrowObjectDisposedExceptionWhenDisposed(); | |
return _activeTask > 0 | |
|| _queue.Count > 0; | |
} | |
/// <summary> | |
/// Waits for the cargo to finish processing items. | |
/// </summary> | |
/// <param name="cancellationToken">The cancellation token.</param> | |
/// <returns>Awaitable task.</returns> | |
public async Task<bool> WaitAsync(System.Threading.CancellationToken cancellationToken) | |
{ | |
ThrowObjectDisposedExceptionWhenDisposed(); | |
while (IsRunning()) | |
{ | |
// check for disposed state | |
if (cancellationToken.IsCancellationRequested) | |
return false; | |
// just check to make sure a task is running | |
EnsureTaskRunning(); | |
// wait for a task to complete | |
await Task.WhenAny( | |
// wait 1s to recheck everything | |
Task.Delay(5000), | |
// wait for a task to complete | |
_taskCompletedEvent.WaitAsync(cancellationToken) | |
); | |
} | |
// success, we've waited till we weren't running | |
return true; | |
} | |
/// <summary> | |
/// Ensures a task is running to process the queue. | |
/// </summary> | |
private void EnsureTaskRunning() | |
{ | |
// we want to signal any tasks | |
// that are being delayed for items | |
if (_hasDelay | |
&& _activeTask > 0 | |
&& _payload < _queue.Count) | |
{ | |
_moreThanPayloadEvent.Set(); | |
} | |
// determine whether or not to spawn a task | |
if (!_disposedValue | |
&& 0 < _queue.Count) | |
{ | |
// capture the current state of active task | |
var currentActiveTask = _activeTask; | |
// when its less than the concurrency | |
if (currentActiveTask < _concurrency) | |
{ | |
// lets try incrementing it | |
var nextActiveTask = currentActiveTask + 1; | |
// fancy thread magic to exchange the value with the next one | |
if (Interlocked.CompareExchange(ref _activeTask, nextActiveTask, currentActiveTask) == currentActiveTask) | |
{ | |
// when we won then we spawn another task | |
Task.Run(ExecuteAsync); | |
} | |
} | |
} | |
} | |
/// <summary> | |
/// The main execution method for cargo. It will loop until no more work to do. | |
/// </summary> | |
/// <returns>Awaitable.</returns> | |
private async Task ExecuteAsync() | |
{ | |
try | |
{ | |
// loop while this class is not disposed | |
while (!_disposedValue | |
&& _queue.Count > 0) | |
{ | |
// delay between executes when its present | |
// except when we have more than one payload | |
if (_hasDelay | |
&& _queue.Count < _payload) | |
{ | |
var delay = new System.Diagnostics.Stopwatch(); | |
delay.Start(); | |
while (!_disposedValue | |
&& _queue.Count < _payload | |
&& delay.Elapsed < _delay) | |
{ | |
var remaining = (int)Math.Max(0, _delay.TotalMilliseconds - delay.ElapsedMilliseconds); | |
if (remaining > 0) | |
{ | |
// go to sleep here waiting for | |
// either our time to be up | |
// or there is enough work to do | |
await Task.WhenAny( | |
Task.Delay(remaining), | |
_moreThanPayloadEvent.WaitAsync(_disposeCancellationTokenSource.Token) | |
); | |
} | |
} | |
// check to see whether we're disposed and bail | |
if (_disposedValue) return; | |
} | |
// loop and build a payload based on the items in the queue | |
var payload = new List<TValue>(); | |
while (!_disposedValue | |
&& payload.Count < _payload | |
&& _queue.TryDequeue(out TValue value)) | |
{ | |
payload.Add(value); | |
} | |
// check to see whether we're disposed and bail | |
if (_disposedValue) return; | |
// exit loop once we have no more work | |
if (payload.Count == 0) break; | |
// do the work by processing the payload by passing it to the cargo func | |
try | |
{ | |
// build a cts to do cancellation based on the requested timeout | |
using (var timeoutCts = new CancellationTokenSource(_timeout)) | |
using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(timeoutCts.Token, _disposeCancellationTokenSource.Token)) | |
{ | |
await _cargo(payload, linkedCts.Token); | |
} | |
} | |
catch (Exception ex) | |
{ | |
TelemetryClientManager.Default.TrackException(ex); | |
} | |
} | |
} | |
catch (Exception ex) | |
{ | |
TelemetryClientManager.Default.TrackException(ex); | |
} | |
finally | |
{ | |
// decrement the total number of active threads | |
Interlocked.Decrement(ref _activeTask); | |
// check to see whether we should spawn another task | |
// since we just stopped and things might have changed | |
// since we started stopping | |
EnsureTaskRunning(); | |
// denote a task completed | |
_taskCompletedEvent.Set(); | |
} | |
} | |
#region IDisposable Support | |
protected virtual void Dispose(bool disposing) | |
{ | |
if (!_disposedValue) | |
{ | |
_disposeCancellationTokenSource.Dispose(); | |
_disposedValue = true; | |
} | |
} | |
~Cargo() | |
{ | |
// Do not change this code. Put cleanup code in Dispose(bool disposing) above. | |
Dispose(false); | |
} | |
// This code added to correctly implement the disposable pattern. | |
public void Dispose() | |
{ | |
// Do not change this code. Put cleanup code in Dispose(bool disposing) above. | |
Dispose(true); | |
GC.SuppressFinalize(this); | |
} | |
#endregion | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment