Skip to content

Instantly share code, notes, and snippets.

@jvanlangen
Last active October 22, 2019 08:02
Show Gist options
  • Save jvanlangen/fa40f6a2e3b719d066f974ca98065206 to your computer and use it in GitHub Desktop.
Save jvanlangen/fa40f6a2e3b719d066f974ca98065206 to your computer and use it in GitHub Desktop.
Async/Await-Enabled Threads in C#
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace VanLangen
{
/// <summary>
/// A dedicated worker thread that runs a simple message loop and installs itself as the
/// <see cref="SynchronizationContext"/> of that thread. Continuations of <c>await</c>
/// expressions that were started on the thread are therefore posted back to, and resumed
/// on, the same thread.
/// </summary>
/// <remarks>
/// By JvanLangen.
/// [email protected]
/// http://www.vanlangen.biz
/// <para>
/// A callback queued with <see cref="Post"/> that throws terminates the message loop;
/// the exception is then surfaced (wrapped in an <see cref="AggregateException"/>) by
/// <see cref="Dispose"/>. Callbacks still queued when the loop ends are never executed.
/// </para>
/// </remarks>
public class ASyncThread : IDisposable
{
    // Immutable holder for a queued callback and the state object handed to it.
    private sealed class ActionWithState
    {
        public ActionWithState(SendOrPostCallback action, object state)
        {
            Action = action;
            State = state;
        }

        public SendOrPostCallback Action { get; }

        public object State { get; }
    }

    // SynchronizationContext that forwards Post/Send to the owning ASyncThread.
    private sealed class ASyncThreadSynchronizationContext : SynchronizationContext
    {
        private readonly ASyncThread _asyncThread;

        public ASyncThreadSynchronizationContext(ASyncThread asyncThread) => _asyncThread = asyncThread;

        public override void Post(SendOrPostCallback d, object state) => _asyncThread.Post(d, state);

        public override void Send(SendOrPostCallback d, object state) => _asyncThread.Send(d, state);

        // The base implementation returns a plain SynchronizationContext, which would
        // silently stop routing work to this thread; return an equivalent context instead.
        public override SynchronizationContext CreateCopy() => new ASyncThreadSynchronizationContext(_asyncThread);
    }

    private readonly Task _mainTask;

    // Written once by the worker, read by arbitrary threads in Send/Dispose.
    private volatile int _mainTaskThreadId;

    // 0 = running, 1 = Dispose has been requested.
    private int _disposeState;

    private readonly ManualResetEvent _terminate = new ManualResetEvent(false);
    private readonly AutoResetEvent _actionAdded = new AutoResetEvent(false);
    private readonly ConcurrentQueue<ActionWithState> _actions = new ConcurrentQueue<ActionWithState>();

    // Body of the worker thread: the message loop.
    private void TaskMethod()
    {
        // Make this instance the SynchronizationContext of the worker thread.
        SynchronizationContext.SetSynchronizationContext(new ASyncThreadSynchronizationContext(this));

        // Capture the managed thread id; Send and Dispose use it to detect re-entrant calls.
        _mainTaskThreadId = Thread.CurrentThread.ManagedThreadId;

        var waitHandles = new WaitHandle[] { _terminate, _actionAdded };

        // Index 0 is _terminate: leave the loop as soon as termination is requested.
        while (WaitHandle.WaitAny(waitHandles) != 0)
        {
            // Drain everything that is queued before going back to sleep.
            while (_actions.TryDequeue(out var actionWithState))
            {
                actionWithState.Action(actionWithState.State);
            }
        }
    }

    /// <summary>
    /// Optional entry point for derived classes that want to build something autonomous.
    /// It is posted to the thread by the constructor and does nothing unless overridden.
    /// </summary>
    /// <param name="state">The state object that was passed to the constructor.</param>
    /// <remarks>
    /// Because it is queued from the base constructor and runs on the worker thread, it may
    /// start executing before a derived class constructor has finished.
    /// </remarks>
    protected virtual void Run(object state) { }

    /// <summary>
    /// Initializes a new instance of the <see cref="ASyncThread"/> class.
    /// </summary>
    /// <param name="state">Passes this state to the initial virtual run method.</param>
    public ASyncThread(object state = null)
    {
        // LongRunning requests a dedicated thread. TaskScheduler.Default is passed explicitly so
        // that constructing this object inside a task running on a custom scheduler does not
        // schedule the message loop on that scheduler.
        _mainTask = Task.Factory.StartNew(
            TaskMethod,
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);

        // Post the run method as the first callback to be executed.
        Post(Run, state);
    }

    /// <summary>
    /// Posts an action to the thread's execution queue.
    /// </summary>
    /// <param name="d">The <see cref="T:System.Threading.SendOrPostCallback"></see> delegate to call.</param>
    /// <param name="state">The object passed to the delegate.</param>
    /// <exception cref="ArgumentNullException"><paramref name="d"/> is <c>null</c>.</exception>
    public void Post(SendOrPostCallback d, object state = null)
    {
        // Reject null here; otherwise it would only blow up later on the worker and kill the loop.
        if (d == null)
        {
            throw new ArgumentNullException(nameof(d));
        }

        _actions.Enqueue(new ActionWithState(d, state));

        try
        {
            _actionAdded.Set();
        }
        catch (ObjectDisposedException)
        {
            // The thread has already been disposed, so there is nothing left to wake up.
            // Late posts (for example an await continuation firing after Dispose) must not
            // throw on whatever thread happens to deliver them.
        }
    }

    /// <summary>
    /// Sends an action to the thread's execution queue and wait until it has been executed. (remark: watch out for deadlocks between waiting threads)
    /// </summary>
    /// <param name="d">The <see cref="T:System.Threading.SendOrPostCallback"></see> delegate to call.</param>
    /// <param name="state">The object passed to the delegate.</param>
    /// <exception cref="ArgumentNullException"><paramref name="d"/> is <c>null</c>.</exception>
    /// <exception cref="InvalidOperationException">
    /// The thread terminated before the delegate could be executed.
    /// </exception>
    /// <remarks>
    /// An exception thrown by <paramref name="d"/> is rethrown on the calling thread, exactly
    /// as it is when the call is made from the worker thread itself.
    /// </remarks>
    public void Send(SendOrPostCallback d, object state = null)
    {
        if (d == null)
        {
            throw new ArgumentNullException(nameof(d));
        }

        // Already on the worker thread: run inline, queueing and waiting would deadlock.
        if (Thread.CurrentThread.ManagedThreadId == _mainTaskThreadId)
        {
            d(state);
            return;
        }

        using (var done = new ManualResetEvent(false))
        {
            Exception failure = null;

            Post(s =>
            {
                try
                {
                    d(s);
                }
                catch (Exception exception)
                {
                    // Hand the failure to the waiting caller instead of tearing down the loop.
                    failure = exception;
                }
                finally
                {
                    done.Set();
                }
            }, state);

            // Wait for the delegate to finish, but also wake up when the message loop has ended,
            // because then the queued delegate will never run and waiting would block forever.
            var handles = new WaitHandle[] { done, ((IAsyncResult)_mainTask).AsyncWaitHandle };
            if (WaitHandle.WaitAny(handles) != 0 && !done.WaitOne(0))
            {
                throw new InvalidOperationException("The thread has terminated before the action could be executed.");
            }

            if (failure != null)
            {
                // Rethrow on the caller while preserving the original stack trace.
                System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(failure).Throw();
            }
        }
    }

    /// <summary>
    /// Stops the message loop, waits for the thread to end and releases the wait handles.
    /// Calling it more than once is harmless.
    /// </summary>
    /// <remarks>
    /// When called from the worker thread itself, termination is only requested: the thread
    /// cannot wait for its own completion, and the wait handles are left alive because the
    /// message loop still needs them to observe the request.
    /// </remarks>
    /// <exception cref="AggregateException">A posted callback threw and ended the message loop.</exception>
    public void Dispose()
    {
        // Only the first call does any work.
        if (Interlocked.Exchange(ref _disposeState, 1) != 0)
        {
            return;
        }

        // Ask the message loop to stop.
        _terminate.Set();

        if (Thread.CurrentThread.ManagedThreadId == _mainTaskThreadId)
        {
            // Waiting for our own task would deadlock.
            return;
        }

        try
        {
            // Wait for the thread to end.
            _mainTask.Wait();
        }
        finally
        {
            _terminate.Dispose();
            _actionAdded.Dispose();
        }
    }

    /// <summary>
    /// Gets a value indicating whether termination has not been requested yet,
    /// i.e. <see cref="Dispose"/> has not been called.
    /// </summary>
    public bool Running => Volatile.Read(ref _disposeState) == 0;
}
}
using System;
using System.Threading;
using System.Threading.Tasks;
namespace VanLangen
{
/// <summary>
/// Convenience overloads for <see cref="ASyncThread"/> that accept a parameterless
/// <see cref="Action"/> and that expose queued work as an awaitable <see cref="Task"/>.
/// </summary>
public static class ASyncThreadExtension
{
    /// <summary>
    /// Posts an action to the thread's execution queue.
    /// </summary>
    /// <param name="thread">The thread that should execute the action.</param>
    /// <param name="action">The action to call.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="thread"/> or <paramref name="action"/> is <c>null</c>.
    /// </exception>
    public static void Post(this ASyncThread thread, Action action)
    {
        if (thread == null)
        {
            throw new ArgumentNullException(nameof(thread));
        }

        // Reject null here; a null action would otherwise only fail later, on the worker thread.
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }

        thread.Post(_ => action(), null);
    }

    /// <summary>
    /// Sends an action to the thread's execution queue and wait until it has been executed. (remark: watch out for deadlocks between waiting threads)
    /// </summary>
    /// <param name="thread">The thread that should execute the action.</param>
    /// <param name="action">The action to call.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="thread"/> or <paramref name="action"/> is <c>null</c>.
    /// </exception>
    public static void Send(this ASyncThread thread, Action action)
    {
        if (thread == null)
        {
            throw new ArgumentNullException(nameof(thread));
        }

        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }

        // Delegate to the SendOrPostCallback based Send.
        thread.Send(_ => action(), null);
    }

    /// <summary>
    /// Posts a delegate to the ASyncThread and returns an awaitable task for it
    /// </summary>
    /// <param name="thread">The thread</param>
    /// <param name="action">An Action</param>
    /// <returns>A task that completes when the action has run, or faults with the exception it threw.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="thread"/> or <paramref name="action"/> is <c>null</c>.
    /// </exception>
    public static Task SendASync(this ASyncThread thread, Action action)
    {
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }

        return SendASync(thread, _ => action(), null);
    }

    /// <summary>
    /// Posts a delegate to the ASyncThread and returns an awaitable task for it
    /// </summary>
    /// <param name="thread">The thread</param>
    /// <param name="d">The delegate</param>
    /// <param name="state">The state</param>
    /// <returns>A task that completes when the delegate has run, or faults with the exception it threw.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="thread"/> or <paramref name="d"/> is <c>null</c>.
    /// </exception>
    public static Task SendASync(this ASyncThread thread, SendOrPostCallback d, object state = null)
    {
        if (thread == null)
        {
            throw new ArgumentNullException(nameof(thread));
        }

        if (d == null)
        {
            throw new ArgumentNullException(nameof(d));
        }

        // RunContinuationsAsynchronously keeps the awaiter's continuation from being executed
        // inline on the worker thread when the awaiter has no SynchronizationContext of its own.
        var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);

        thread.Post(s =>
        {
            try
            {
                // Execute the delegate with the state that travelled through the queue.
                d(s);
            }
            catch (Exception exception)
            {
                // Report the failure through the task instead of letting it escape into the message loop.
                tcs.TrySetException(exception);
                return;
            }

            tcs.TrySetResult(null);
        }, state);

        return tcs.Task;
    }
}
}
using System;
using System.Threading;
using System.Threading.Tasks;
using VanLangen;
namespace InheritanceExample
{
// Example: derive from ASyncThread and override Run to get an autonomous loop
// whose await continuations stay on the ASyncThread's own thread.
class Program
{
    public class MyThread : ASyncThread
    {
        // The signature must match the base method Run(object state); an override without
        // the parameter does not compile. async void is intentional here: the method is
        // posted to the thread and nobody awaits it.
        protected override async void Run(object state)
        {
            while (true)
            {
                Console.WriteLine($"{DateTime.Now}| Run on Thread: {Thread.CurrentThread.ManagedThreadId}");
                await Task.Delay(1000);
            }
        }
    }

    static void Main(string[] args)
    {
        Console.WriteLine($"Hello World! on Thread: {Thread.CurrentThread.ManagedThreadId}");

        // Keep the thread alive until the user presses Enter; Dispose then stops it.
        using (var myThread = new MyThread())
        {
            Console.ReadLine();
        }
    }
}
}
using System;
using System.Threading;
using System.Threading.Tasks;
using VanLangen;
namespace PostExample
{
// Example: queue two independent async loops on one ASyncThread and let them
// print the id of the thread they are running on.
internal class Program
{
    // Prints a line once per second, forever.
    private static async void First()
    {
        for (;;)
        {
            Console.WriteLine($"{DateTime.Now}| First on Thread: {Thread.CurrentThread.ManagedThreadId}");
            await Task.Delay(1000);
        }
    }

    // Prints a line twice per second, forever.
    private static async void Second()
    {
        for (;;)
        {
            Console.WriteLine($"{DateTime.Now}| Second on Thread: {Thread.CurrentThread.ManagedThreadId}");
            await Task.Delay(500);
        }
    }

    private static void Main(string[] args)
    {
        Console.WriteLine($"Hello World! on Thread: {Thread.CurrentThread.ManagedThreadId}");

        using (var myThread = new ASyncThread())
        {
            // Both loops are started on the ASyncThread.
            myThread.Post(state => First());
            myThread.Post(state => Second());

            // Block the main thread until Enter is pressed; leaving the using block disposes the thread.
            Console.ReadLine();
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment