Created
October 15, 2023 13:57
-
-
Save michel-pi/f766cc3ac6efb066ab69d807744d2db8 to your computer and use it in GitHub Desktop.
Provides services for managing the queue of work items for a thread.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace Snippets.Threading | |
{ | |
/// <summary> | |
/// Provides services for managing the queue of work items for a thread. | |
/// </summary> | |
public sealed class WorkerThreadDispatcher | |
{ | |
private readonly Queue<WorkQueueItem> _queue; | |
/// <summary> | |
/// Initializes a new instance of the <see cref="WorkerThreadDispatcher"/> class. | |
/// </summary> | |
public WorkerThreadDispatcher() | |
{ | |
_queue = new Queue<WorkQueueItem>(); | |
} | |
/// <summary> | |
/// Executes the specified <see cref="Action"/> synchronously on the thread the <see cref="WorkerThreadDispatcher"/> is associated with. | |
/// </summary> | |
/// <param name="callback">A delegate to invoke through the dispatcher.</param> | |
/// <remarks> | |
/// Blocks the current thread until the <paramref name="callback" /> is executed. | |
/// </remarks> | |
public void Invoke(Action callback) | |
{ | |
_ = Invoke(() => | |
{ | |
callback(); | |
return (object?)null; | |
}); | |
} | |
/// <summary> | |
/// Executes the specified <see cref="Func{TResult}"/> synchronously on the thread the <see cref="WorkerThreadDispatcher"/> is associated with. | |
/// </summary> | |
/// <typeparam name="T">The return value type of the specified delegate.</typeparam> | |
/// <param name="callback">A delegate to invoke through the dispatcher.</param> | |
/// <returns>The value returned by <paramref name="callback"/>.</returns> | |
/// <remarks> | |
/// Blocks the current thread until the <paramref name="callback" /> is executed. | |
/// </remarks> | |
public T? Invoke<T>(Func<T?> callback) | |
{ | |
WorkQueueItem item; | |
lock (this) | |
{ | |
item = EnqueueWorkItem(() => callback(), true); | |
} | |
object? a = item.WaitExecuted(); | |
return (T?)a; | |
} | |
/// <summary> | |
/// Asynchronously executes the specified <see cref="Action"/> synchronously on the thread the <see cref="WorkerThreadDispatcher"/> is associated with. | |
/// </summary> | |
/// <param name="callback">A delegate to invoke through the dispatcher.</param> | |
/// <returns>A task that represents the asynchronous operation.</returns> | |
public Task InvokeAsync(Action callback) | |
{ | |
return InvokeAsync(() => | |
{ | |
callback(); | |
return (object?)null; | |
}); | |
} | |
/// <summary> | |
/// Asynchronously executes the specified <see cref="Func{TResult}"/> synchronously on the thread the <see cref="WorkerThreadDispatcher"/> is associated with. | |
/// </summary> | |
/// <typeparam name="T">The return value type of the specified delegate.</typeparam> | |
/// <param name="callback">A delegate to invoke through the dispatcher.</param> | |
/// <returns>A task that represents the asynchronous operation which wraps the result.</returns> | |
public Task<T?> InvokeAsync<T>(Func<T?> callback) | |
{ | |
WorkQueueItem item; | |
lock (this) | |
{ | |
item = EnqueueWorkItem(() => callback(), true); | |
} | |
return Task.Factory.StartNew(() => (T?)item.WaitExecuted()); | |
} | |
/// <summary> | |
/// Executes the specified <see cref="Action"/> on the thread the <see cref="WorkerThreadDispatcher"/> is associated with. | |
/// </summary> | |
/// <param name="callback">A delegate to invoke through the dispatcher.</param> | |
/// <remarks> | |
/// Does not wait until the <paramref name="callback"/> is executed. | |
/// </remarks> | |
public void InvokeWithoutWait(Action callback) | |
{ | |
lock (this) | |
{ | |
_ = EnqueueWorkItem(() => | |
{ | |
callback(); | |
return null; | |
}, false); | |
} | |
} | |
/// <summary> | |
/// Process a single work item. | |
/// </summary> | |
/// <returns>Returns <see langword="true"/> when more work items can be processed; otherwise <see langword="false"/>.</returns> | |
/// <remarks> | |
/// Use <see langword="lock"/> on this instance for thread safety and convenience. | |
/// </remarks> | |
public bool ProcessWork() | |
{ | |
if (_queue.Count == 0) | |
{ | |
return false; | |
} | |
var item = _queue.Dequeue(); | |
if (item.Execute()) | |
{ | |
// dispose the item when no one is waiting for the result. fire and forget. | |
item.Dispose(); | |
} | |
return _queue.Count != 0; | |
} | |
private WorkQueueItem EnqueueWorkItem(Func<object?> callback, bool deferredDispose) | |
{ | |
var item = new WorkQueueItem(callback, deferredDispose); | |
_queue.Enqueue(item); | |
return item; | |
} | |
internal class WorkQueueItem : IDisposable | |
{ | |
private readonly AutoResetEvent _autoResetEvent; | |
private readonly bool _deferredDispose; | |
private readonly Func<object?> _callback; | |
private object? _callbackResult; | |
public WorkQueueItem(Func<object?> callback, bool deferredDispose) | |
{ | |
ArgumentNullException.ThrowIfNull(callback, nameof(callback)); | |
_callback = callback; | |
_autoResetEvent = new AutoResetEvent(false); | |
_deferredDispose = deferredDispose; | |
} | |
// returns true when the caller should dispose this item. | |
public bool Execute() | |
{ | |
var callbackResult = _callback.Invoke(); | |
Volatile.Write(ref _callbackResult, callbackResult); | |
_autoResetEvent.Set(); | |
return !_deferredDispose; | |
} | |
public object? WaitExecuted() | |
{ | |
_autoResetEvent.WaitOne(); | |
var result = Volatile.Read(ref _callbackResult); | |
if (_deferredDispose) | |
{ | |
Dispose(); | |
} | |
return result; | |
} | |
public void Dispose() | |
{ | |
_autoResetEvent.Set(); | |
_autoResetEvent.Dispose(); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment