-
-
Save vleboutouiller/322942e200aa2c50dd4b7a96c55568b1 to your computer and use it in GitHub Desktop.
Kevin Goss - Design and Code Tasks from Scratch - Clunky WIP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Threading; | |
using System.Collections.Concurrent; | |
using System.Runtime.CompilerServices; | |
using Microsoft.VisualStudio.Threading; | |
// Code adapted from https://www.youtube.com/watch?time_continue=20912&v=FMx3D5Q7upo&feature=emb_logo | |
// Need to figure out where the deadlock happens with SingleThreadedSynchronizationContext =|
namespace CSharpStuff | |
{ | |
public static class Helpers
{
    // Writes "SynchronizationContext.Current: " followed by the type name of the
    // calling thread's current synchronization context, or "null" when none is set.
    public static void DisplayCurrentSynchronizationContext()
    {
        var context = SynchronizationContext.Current;

        string description;
        if (context == null)
        {
            description = "null";
        }
        else
        {
            description = $"{context.GetType().Name}";
        }

        Console.Write($"{nameof(SynchronizationContext)}.{nameof(SynchronizationContext.Current)}: ");
        Console.WriteLine(description);
    }
}
// Demonstrates hand-made futures: one promise completed from a worker thread,
// a chain of two continuations and an independent third continuation.
public class ExampleA
{
    // Returns the promise's future right away; a dedicated thread completes
    // the promise with 42 after sleeping for 100 ms.
    private Future<int> DoSomethingAsync()
    {
        var promise = new Promise<int>();

        void CompleteLater()
        {
            Thread.Sleep(100);
            promise.Complete(42);
        }

        new Thread(CompleteLater).Start();
        return promise.Future;
    }

    public void Run()
    {
        Console.WriteLine($"{nameof(ExampleA)}: Start");

        var future = DoSomethingAsync();

        // First link of the chain: slow on purpose, yields the doubled result.
        var doubled = future.ContinueWith(f =>
        {
            Thread.Sleep(1000);
            Console.WriteLine($"First future completed with value {f.Result}");
            return f.Result * 2;
        });

        // Second link: only reports the value produced by the first link.
        var chainEnd = doubled.ContinueWith(f =>
        {
            Console.WriteLine($"Second future completed with value {f.Result}");
        });

        // Attached to the root future, so it should not have to wait for the chain above.
        var sibling = future.ContinueWith(f =>
        {
            Console.WriteLine("Second continuation");
        });

        future.Wait();
        Console.WriteLine($"{nameof(future)}.{nameof(future.Result)} = {future.Result}");

        // Wait for both continuations so the output is a little more deterministic.
        sibling.Wait();
        chainEnd.Wait();

        Console.WriteLine($"{nameof(ExampleA)}: Done");
        Console.WriteLine();
    }
}
// Demonstrates, by hand, the kind of state machine the compiler generates
// for an async method that awaits four futures in sequence.
public class ExampleB
{
    // Returns a future completed by a dedicated thread after sleeping for ms milliseconds.
    private Future Delay(int ms)
    {
        var promise = new Promise();
        new Thread(() =>
        {
            Thread.Sleep(ms);
            promise.Complete();
        }).Start();
        return promise.Future;
    }

    private Future<int> DoSomething1Async()
    {
        return Delay(500).ContinueWith(_ => 1);
    }

    private Future<int> DoSomething2Async()
    {
        return Delay(500).ContinueWith(_ => 10);
    }

    private Future<int> DoSomething3Async()
    {
        return Delay(500).ContinueWith(_ => 100);
    }

    private Future<int> DoSomethingElseAsync(int input)
    {
        return Delay(500).ContinueWith(_ => input * 2);
    }

    // Hand-written counterpart of what the compiler emits for an async method.
    private Future<int> CallAsync()
    {
        var stateMachine = new StateMachine();
        stateMachine.This = this;

        // The real generated code calls Start on the builder at this point,
        // which gives the builder a chance to perform some initialization work.
        stateMachine.MoveNext();
        return stateMachine.Builder.Task;
    }

    // Nested so that it can reach the (private) methods of the enclosing class.
    // In debug mode the struct can be turned into a class to make debugging easier.
    private struct StateMachine : IAsyncStateMachine
    {
        // Index of the next step to run.
        private int _state;

        // Awaiter of the operation currently in flight.
        private FutureAwaiter<int> _future;

        // Results of the three first operations.
        private int _i;
        private int _j;
        private int _k;

        // Reference to the enclosing instance, needed because the awaited methods are not static.
        public ExampleB This;

        public FutureBuilder<int> Builder;

        public void SetStateMachine(IAsyncStateMachine stateMachine)
        {
            Builder.SetStateMachine(stateMachine);
        }

        public void MoveNext()
        {
            switch (_state)
            {
                case 0:
                    _state = 1;
                    _future = This.DoSomething1Async().GetAwaiter();
                    if (SuspendUnlessCompleted())
                    {
                        return;
                    }
                    goto case 1;

                case 1:
                    _state = 2;
                    _i = _future.GetResult();
                    _future = This.DoSomething2Async().GetAwaiter();
                    if (SuspendUnlessCompleted())
                    {
                        return;
                    }
                    goto case 2;

                case 2:
                    _state = 3;
                    _j = _future.GetResult();
                    _future = This.DoSomething3Async().GetAwaiter();
                    if (SuspendUnlessCompleted())
                    {
                        return;
                    }
                    goto case 3;

                case 3:
                    _state = 4;
                    _k = _future.GetResult();
                    _future = This.DoSomethingElseAsync(_i + _j + _k).GetAwaiter();
                    if (SuspendUnlessCompleted())
                    {
                        return;
                    }
                    goto case 4;

                case 4:
                    Builder.SetResult(_future.GetResult());
                    return;
            }
        }

        // Optimization: when the awaited future is already completed the caller simply
        // falls through to the next step (returns false). Otherwise MoveNext is registered
        // as the continuation through the builder and the caller must return (returns true).
        private bool SuspendUnlessCompleted()
        {
            if (_future.IsCompleted)
            {
                return false;
            }

            Builder.AwaitOnCompleted(ref _future, ref this);
            return true;
        }
    }

    public void Run()
    {
        Console.WriteLine($"{nameof(ExampleB)}: Start");

        var result = CallAsync();
        result.Wait();
        Console.WriteLine(result.Result);

        Console.WriteLine($"{nameof(ExampleB)}: Done");
        Console.WriteLine();
    }
}
// Same scenario as a sequence of four awaits, this time written with async/await
// and no synchronization context installed by this class.
public class ExampleC
{
    // Returns a future completed by a dedicated thread after sleeping for ms milliseconds.
    private Future Delay(int ms)
    {
        var promise = new Promise();
        new Thread(() =>
        {
            Thread.Sleep(ms);
            promise.Complete();
        }).Start();
        return promise.Future;
    }

    private Future<int> DoSomethingAsync1Async()
    {
        return Delay(500).ContinueWith(_ => 1);
    }

    private Future<int> DoSomethingAsync2Async()
    {
        return Delay(500).ContinueWith(_ => 10);
    }

    private Future<int> DoSomethingAsync3Async()
    {
        return Delay(500).ContinueWith(_ => 100);
    }

    private Future<int> DoSomethingElseAsync(int input)
    {
        return Delay(500).ContinueWith(_ => input * 2);
    }

    // Awaits the three operations one after the other, printing the current
    // synchronization context before each await, then awaits the final operation
    // with the sum of the three results.
    private async Future<int> CallAsync()
    {
        Helpers.DisplayCurrentSynchronizationContext();
        var first = await DoSomethingAsync1Async();

        Helpers.DisplayCurrentSynchronizationContext();
        var second = await DoSomethingAsync2Async();

        Helpers.DisplayCurrentSynchronizationContext();
        var third = await DoSomethingAsync3Async();

        Helpers.DisplayCurrentSynchronizationContext();
        var total = first + second + third;
        return await DoSomethingElseAsync(total);
    }

    public void Run()
    {
        Console.WriteLine($"{nameof(ExampleC)}: Start");

        var result = CallAsync();
        result.Wait();
        Console.WriteLine(result.Result);

        Console.WriteLine($"{nameof(ExampleC)}: Done");
        Console.WriteLine();
    }
}
// Same async method as the previous example, but run under a single-threaded
// synchronization context that mimics a UI thread (WPF, WinForms, etc.).
public class ExampleD
{
    // Returns a future completed by a dedicated thread after sleeping for ms milliseconds.
    private Future Delay(int ms)
    {
        var promise = new Promise();
        var thread = new Thread(() =>
        {
            Thread.Sleep(ms);
            promise.Complete();
        });
        thread.Start();
        return promise.Future;
    }

    private Future<int> DoSomethingAsync1Async()
    {
        Console.WriteLine("meh");
        return Delay(500).ContinueWith(_ => 1);
    }

    private Future<int> DoSomethingAsync2Async()
    {
        Console.WriteLine("meh2");
        return Delay(500).ContinueWith(_ => 10);
    }

    private Future<int> DoSomethingAsync3Async() =>
        Delay(500).ContinueWith(_ => 100);

    private Future<int> DoSomethingElseAsync(int input) =>
        Delay(500).ContinueWith(_ => input * 2);

    // The first await captures the current synchronization context, the second one
    // opts out with ConfigureAwait(false); the context is printed before each await.
    private async Future<int> CallAsync()
    {
        Helpers.DisplayCurrentSynchronizationContext();
        var i = await DoSomethingAsync1Async();
        Helpers.DisplayCurrentSynchronizationContext();
        var j = await DoSomethingAsync2Async().ConfigureAwait(false);
        Helpers.DisplayCurrentSynchronizationContext();
        var k = await DoSomethingAsync3Async();
        Helpers.DisplayCurrentSynchronizationContext();
        return await DoSomethingElseAsync(i + j + k);
    }

    public void Run()
    {
        Console.WriteLine($"{nameof(ExampleD)}: Start");

        // Too lazy to copy paste MS code here and filter what we can't really get access to
        // => From Microsoft.VisualStudio.Threading
        // => Fake UI behaviour (WPF, Winforms stuff, etc.)
        var previousContext = SynchronizationContext.Current;
        var sc = new SingleThreadedSynchronizationContext();
        var frame = new SingleThreadedSynchronizationContext.Frame();

        // The previous version ran CallAsync inside sc.Send and then blocked this thread
        // with future.Wait(). The continuation of the first await is dispatched back to sc,
        // whose owning thread was the one stuck in Wait(): nobody was left to pump the
        // context's queue, hence the deadlock. Instead, the work is posted and this thread
        // pumps the queue (like a UI message loop) until the final result is available.
        sc.Post(_ =>
        {
            SynchronizationContext.SetSynchronizationContext(sc);
            var future = CallAsync();
            future.ContinueWith(f =>
            {
                Console.WriteLine(f.Result);
                // Result printed: tell PushFrame to stop pumping and return.
                frame.Continue = false;
            });
        }, null);

        try
        {
            // Runs the posted callbacks on this thread until frame.Continue is false.
            sc.PushFrame(frame);
        }
        finally
        {
            // Do not leave the fake UI context installed on the calling thread.
            SynchronizationContext.SetSynchronizationContext(previousContext);
        }

        Console.WriteLine($"{nameof(ExampleD)}: Done");
        Console.WriteLine();
    }
}
// Marker interface for the awaiters used by FutureBuilder.
// It only adds INotifyCompletion.OnCompleted (used to chain a continuation);
// the other members of the awaiter pattern, listed below for reference,
// are not declared on the interface:
//   T GetResult();
//   bool IsCompleted { get; }
//   void OnCompleted(Action action);
// ReSharper disable once UnusedTypeParameter
public interface IAwaiter<out T> : INotifyCompletion
{
}
// An awaiter relying on, spoiler alert...: Future<T>.
// When captureContext is true, the continuation is dispatched to the synchronization
// context that was current at the time OnCompleted was called.
public readonly struct FutureAwaiter<T> : IAwaiter<T>
{
    private readonly Future<T> _future;
    private readonly bool _captureContext;

    public FutureAwaiter(Future<T> future, bool captureContext)
    {
        _future = future;
        _captureContext = captureContext;
    }

    // Result of the underlying future (no waiting is performed here).
    public T GetResult() => _future.Result;

    public bool IsCompleted => _future.IsCompleted;

    // Registers action to run once the underlying future has completed.
    public void OnCompleted(Action action)
    {
        // Captured here, on the awaiting thread: the continuation below runs on
        // whatever thread the scheduler picks, where Current may be different.
        SynchronizationContext capturedContext = null;
        if (_captureContext)
        {
            capturedContext = SynchronizationContext.Current;
        }

        _future.ContinueWith(_ =>
        {
            if (capturedContext != null)
            {
                // Post instead of Send: Send blocks the completing thread until the
                // resumed code has run on the target context, which needlessly ties up
                // a thread and deadlocks as soon as the target context is itself waiting.
                // ReSharper disable once VSTHRD001
                capturedContext.Post(s => action(), null);
            }
            else
            {
                action();
            }
        });
    }
}
// Extension methods that make Future<T> usable with the await keyword.
public static class FutureExtensions
{
    // Default awaiter: captures the current synchronization context.
    public static FutureAwaiter<T> GetAwaiter<T>(this Future<T> future)
    {
        return new FutureAwaiter<T>(future, true);
    }

    // Lets the caller choose whether the awaiter captures the current synchronization context.
    public static ConfigureFutureAwaitable<T> ConfigureAwait<T>(this Future<T> future, bool captureContext)
    {
        return new ConfigureFutureAwaitable<T>(future, captureContext);
    }
}
// Awaitable returned by ConfigureAwait: remembers the future and the
// context-capture choice, and hands both to the awaiter it creates.
public readonly struct ConfigureFutureAwaitable<T>
{
    private readonly bool _capturedContext;
    private readonly Future<T> _future;

    public ConfigureFutureAwaitable(Future<T> future, bool capturedContext)
    {
        _capturedContext = capturedContext;
        _future = future;
    }

    public FutureAwaiter<T> GetAwaiter()
    {
        return new FutureAwaiter<T>(_future, _capturedContext);
    }
}
// Async method builder for Future<T>: the members below are the ones the compiler
// (or the hand-written state machine) calls to drive an async method.
public struct FutureBuilder<T>
{
    private Future<T> _future;

    // Store boxed state machine, so that it happens only once.
    private IAsyncStateMachine _stateMachine;

    // Future returned to the caller of the async method (lazy initialization).
    public Future<T> Task =>
        _future ??= new Future<T>();

    public static FutureBuilder<T> Create() =>
        new FutureBuilder<T>();

    // Runs the state machine up to its first suspension point.
    public void Start<TStateMachine>(ref TStateMachine stateMachine)
        where TStateMachine : IAsyncStateMachine =>
        stateMachine.MoveNext();

    // Note: we don't really have exception in our case
    // => oversimplified impl.
    public void SetException(Exception exception) =>
        throw new NotImplementedException();

    // Stores the result, then marks the future as completed.
    public void SetResult(T result)
    {
        Task.Result = result;
        Task.IsCompleted = true;
    }

    // TAwaiter + TAsyncStateMachine => avoid additional boxings and hence heap allocations
    // ref => avoid making copies of value type instances (and both structures can potentially be rather "big")
    //
    // TAwaiter is only required to be an INotifyCompletion: the previous IAwaiter<T>
    // constraint tied the awaited result type to the builder's own T, so an async
    // Future<int> method could not await anything producing another type.
    public void AwaitOnCompleted<TAwaiter, TAsyncStateMachine>(
        ref TAwaiter awaiter, ref TAsyncStateMachine stateMachine)
        where TAwaiter : INotifyCompletion
        where TAsyncStateMachine : IAsyncStateMachine
    {
        // The future must exist before the state machine gets boxed: the builder is a
        // value type living inside the state machine, so the boxed copy has to carry
        // the same Future<T> reference as the original.
        _future ??= new Future<T>();

        if (_stateMachine == null)
        {
            Console.WriteLine("Boxing: should happen only once");
            // Explicit cast much needed here, or _stateMachine will be "null-ed" every time.
            var boxedStateMachine = (IAsyncStateMachine)stateMachine;
            _stateMachine = boxedStateMachine;
            // Let the builder inside the boxed copy know about the box as well.
            boxedStateMachine.SetStateMachine(boxedStateMachine);
        }

        awaiter.OnCompleted(_stateMachine.MoveNext);
    }

    // In theory to support specifics about Execution Context
    public void AwaitUnsafeOnCompleted<TAwaiter, TAsyncStateMachine>(
        ref TAwaiter awaiter, ref TAsyncStateMachine stateMachine)
        where TAwaiter : INotifyCompletion
        where TAsyncStateMachine : IAsyncStateMachine =>
        AwaitOnCompleted(ref awaiter, ref stateMachine);

    public void SetStateMachine(IAsyncStateMachine stateMachine) =>
        _stateMachine = stateMachine;
}
// Entry point: runs the four examples one after the other.
public static class Program
{
    // .NET Terminology
    //   Promise => TCS
    //   Future  => Task
    // Calling a method => creates a Promise => returns the associated Future
    public static void Main()
    {
        var exampleA = new ExampleA();
        exampleA.Run();

        var exampleB = new ExampleB();
        exampleB.Run();

        var exampleC = new ExampleC();
        exampleC.Run();

        var exampleD = new ExampleD();
        exampleD.Run();
    }
}
// Task Equivalent
// A future can be waited on and can have continuations attached; continuations are
// handed to their scheduler once the future is marked as completed.
public class Future
{
    private readonly ConcurrentQueue<Future> _continuations = new ConcurrentQueue<Future>();
    private readonly ManualResetEventSlim _mutex = new ManualResetEventSlim();

    // Guards the "completed" flag against the check-then-enqueue done in AddContinuation.
    private readonly object _gate = new object();

    // volatile: written by the completing thread, read by others without taking the lock.
    private volatile bool _isCompleted;

    public FutureScheduler Scheduler { get; }

    internal Future(FutureScheduler scheduler = null)
    {
        Scheduler = scheduler;
    }

    // Setting it to true releases the waiters and starts the registered continuations.
    public bool IsCompleted
    {
        get => _isCompleted;
        internal set
        {
            // The flag flips under the lock, so a concurrent AddContinuation either
            // enqueued before the flip (and is drained below) or observes the flip
            // and schedules the continuation itself.
            lock (_gate)
            {
                _isCompleted = value;
            }

            if (value)
            {
                _mutex.Set();
                // Outside the lock: a continuation may run inline right here.
                InvokeContinuations();
            }
        }
    }

    // Convention: returned Future has completed when the callback is done
    public Future ContinueWith(Action<Future> continuation, FutureScheduler scheduler = null)
    {
        var future = new FutureContinuation(continuation, this, scheduler);
        AddContinuation(future);
        return future;
    }

    // ReSharper disable once VSTHRD200
    public Future<T> ContinueWith<T>(Func<Future, T> continuation, FutureScheduler scheduler = null)
    {
        var future = new FutureContinuation<T>(continuation, this, scheduler);
        AddContinuation(future);
        return future;
    }

    // Careful to call that one only when it makes sense
    // Aka when there is a proper impl. available
    internal virtual void Invoke() =>
        throw new NotImplementedException();

    // Hands this future to its own scheduler.
    internal void ScheduleAndStart() =>
        Scheduler.QueueFuture(this);

    private protected void AddContinuation(Future continuation)
    {
        // The check and the enqueue must be atomic with respect to completion.
        // Without the lock, the future could complete (and drain the queue) between
        // the two, leaving the continuation enqueued forever.
        lock (_gate)
        {
            if (!_isCompleted)
            {
                _continuations.Enqueue(continuation);
                return;
            }
        }

        // Already completed: nothing will ever dequeue it, so start it right away.
        continuation.ScheduleAndStart();
    }

    private void InvokeContinuations()
    {
        // A single continuation gets a chance to run inline on the completing thread.
        if (_continuations.Count == 1)
        {
            _continuations.TryDequeue(out var continuation);
            if (continuation != null && !continuation.Scheduler.TryExecuteFutureInline(continuation))
            {
                continuation.ScheduleAndStart();
            }
            return;
        }

        while (_continuations.TryDequeue(out var continuation))
        {
            continuation.ScheduleAndStart();
        }
    }

    // Blocks the calling thread until the future is marked as completed.
    public void Wait() =>
        _mutex.Wait();
}
// Task<T> Equivalent
// The attribute tells the compiler to use FutureBuilder<> for async methods returning this type.
[AsyncMethodBuilder(typeof(FutureBuilder<>))]
public class Future<T> : Future
{
    internal Future(FutureScheduler scheduler = null)
        : base(scheduler)
    {
    }

    public T Result { get; internal set; }

    // Continuation without a result, receiving this future typed as Future<T>.
    public Future ContinueWith(Action<Future<T>> continuation, FutureScheduler scheduler = null)
    {
        // Corners are cut here: 4 different types of continuation would actually be
        // needed; this adapter costs an extra heap allocation.
        void Adapter(Future parent) => continuation((Future<T>)parent);

        var next = new FutureContinuation(Adapter, this, scheduler);
        AddContinuation(next);
        return next;
    }

    // Continuation producing a TResult, receiving this future typed as Future<T>.
    // ReSharper disable once VSTHRD200
    public Future<TResult> ContinueWith<TResult>(Func<Future<T>, TResult> continuation, FutureScheduler scheduler = null)
    {
        TResult Adapter(Future parent) => continuation((Future<T>)parent);

        var next = new FutureContinuation<TResult>(Adapter, this, scheduler);
        AddContinuation(next);
        return next;
    }
}
// TaskScheduler Equivalent
public abstract class FutureScheduler
{
    // Scheduler used when a continuation is created without an explicit one.
    public static readonly FutureScheduler Default = new ThreadPoolFutureScheduler();

    // Original: QueueTask Equivalent
    protected internal abstract void QueueFuture(Future future);

    // Original: TryExecuteTaskInline
    // Base implementation never inlines.
    protected internal virtual bool TryExecuteFutureInline(Future future) => false;

    // Runs the future on the calling thread.
    // Throws InvalidOperationException when the future belongs to another scheduler.
    protected void ExecuteFuture(Future future)
    {
        // Sanity check
        var ownedByThisScheduler = ReferenceEquals(future.Scheduler, this);
        if (!ownedByThisScheduler)
        {
            throw new InvalidOperationException();
        }

        future.Invoke();
    }
}
// Scheduler running futures on the thread pool.
public class ThreadPoolFutureScheduler : FutureScheduler
{
    // Queues the execution of the future as a thread pool work item.
    protected internal override void QueueFuture(Future future)
    {
        ThreadPool.QueueUserWorkItem(state => ExecuteFuture(future));
    }

    // Runs the future on the calling thread, but only when that thread is a
    // thread pool thread; returns whether the future was executed.
    protected internal override bool TryExecuteFutureInline(Future future)
    {
        Console.WriteLine("Inlining");

        if (!Thread.CurrentThread.IsThreadPoolThread)
        {
            return false;
        }

        ExecuteFuture(future);
        return true;
    }
}
// Merges the features of Promise + Future into a single class
// => fewer (heap) allocations + easier impl. later on.
// Could be improved further (fewer allocations) by adding the two other
// specific TPL-equivalent variants (ie. 4 in total).
internal class FutureContinuation : Future
{
    private readonly Action<Future> _action;
    private readonly Future _parent;

    // Falls back to FutureScheduler.Default when no scheduler is given.
    public FutureContinuation(Action<Future> action, Future parent, FutureScheduler scheduler)
        : base(scheduler == null ? FutureScheduler.Default : scheduler)
    {
        _parent = parent;
        _action = action;
    }

    // Runs the callback with the parent future, then marks this future as completed.
    internal override void Invoke()
    {
        _action.Invoke(_parent);
        IsCompleted = true;
    }
}
// Continuation producing a value: the value returned by the callback becomes the Result.
internal class FutureContinuation<T> : Future<T>
{
    private readonly Func<Future, T> _func;
    private readonly Future _parent;

    // Falls back to FutureScheduler.Default when no scheduler is given.
    public FutureContinuation(Func<Future, T> func, Future parent, FutureScheduler scheduler)
        : base(scheduler == null ? FutureScheduler.Default : scheduler)
    {
        _parent = parent;
        _func = func;
    }

    // Stores the callback's return value first, then marks this future as completed.
    internal override void Invoke()
    {
        var value = _func.Invoke(_parent);
        Result = value;
        IsCompleted = true;
    }
}
// TCS Equivalent
// Owns a Future and is the only way offered here to complete it.
public class Promise
{
    public Promise()
    {
        Future = new Future();
    }

    public Future Future { get; }

    // Marks the associated future as completed.
    public void Complete() =>
        Future.IsCompleted = true;
}
// TCS<T> Equivalent
// Shamelessly copied from Promise, with a generic flavoring.
public class Promise<T>
{
    public Promise()
    {
        Future = new Future<T>();
    }

    public Future<T> Future { get; }

    // Stores the result first, then marks the associated future as completed.
    public void Complete(T result)
    {
        var future = Future;
        future.Result = result;
        future.IsCompleted = true;
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment