Last active
October 5, 2020 18:19
-
-
Save dcolthorp/0ee87537624c596ccc1d72bffa73387b to your computer and use it in GitHub Desktop.
Ref and Store: a Redux-like implementation for C#
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// A function that receives the state by reference and returns a result.
/// Ref.Observe and Ref.Project accept this delegate as their projection; in DEBUG
/// builds Ref throws if the projection leaves the state unequal to its prior value.
/// </summary>
/// <typeparam name="TState">Type of the state passed by reference.</typeparam>
/// <typeparam name="TResult">Type of the value produced.</typeparam>
/// <param name="current">The state, passed by reference.</param>
public delegate TResult RefFunc<TState, TResult>(ref TState current); | |
/// <summary>
/// A procedure that receives the state by reference and may modify it in place.
/// Ref.SubmitAndPublishAsync and Ref.Submit accept this delegate as the update to apply.
/// </summary>
/// <typeparam name="TState">Type of the state passed by reference.</typeparam>
/// <param name="current">The state, passed by reference.</param>
public delegate void RefAction<TState>(ref TState current); | |
/// <summary>
/// A procedure that receives the state by reference together with an action/event value.
/// Store uses this delegate as the shape of its registered handlers.
/// </summary>
/// <typeparam name="TState">Type of the state passed by reference.</typeparam>
/// <typeparam name="TEvent">Type of the action/event handed to the handler.</typeparam>
/// <param name="state">The state, passed by reference.</param>
/// <param name="action">The action/event being handled.</param>
public delegate void RefAction<TState, TEvent>(ref TState state, TEvent action); | |
namespace Framework | |
{ | |
/// <summary>
/// Ready-made publish strategies. A publish strategy is an <c>Action&lt;Action&gt;</c>:
/// it is handed a callback and decides where and when that callback is invoked.
/// </summary>
public static class RefPublishStrategies
{
    /// <summary>
    /// Invokes <paramref name="act"/> immediately, on the calling thread.
    /// </summary>
    /// <param name="act">The callback to run.</param>
    /// <exception cref="ArgumentNullException"><paramref name="act"/> is null.</exception>
    public static void DefaultStrategy(Action act)
    {
        if (act == null)
        {
            throw new ArgumentNullException(nameof(act));
        }

        act();
    }

    /// <summary>
    /// Builds a strategy that hands every callback to <paramref name="scheduler"/>
    /// via <c>Schedule</c> instead of running it inline.
    /// </summary>
    /// <param name="scheduler">The scheduler that will run the callbacks.</param>
    /// <returns>A strategy that schedules each callback on <paramref name="scheduler"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
    public static Action<Action> PublishWithScheduler(IScheduler scheduler)
    {
        // Fail at construction time rather than with a NullReferenceException
        // on the first publish, far away from the faulty call site.
        if (scheduler == null)
        {
            throw new ArgumentNullException(nameof(scheduler));
        }

        return (Action act) =>
        {
            // The value returned by Schedule is intentionally not kept:
            // scheduled callbacks are never cancelled from here.
            scheduler.Schedule(() =>
            {
                act();
            });
        };
    }
}
/// <summary>
/// Holds a single value-type state. Updates are applied in place under a lock and the
/// resulting value is then published to observers through a publish strategy.
/// </summary>
/// <typeparam name="TState">The state type; must be a struct.</typeparam>
public class Ref<TState> where TState : struct
{
    private TState _value;
    private readonly Action<Action> _refPublishStrategy;
    private readonly BehaviorSubject<StrongBox<TState>> _subject;
    private readonly object _lock = new object();

    // True while an update delegate is running; used to reject re-entrant submissions.
    private bool _handling = false;

    // Every publish action that has been started and has not finished yet.
    private readonly ConcurrentDictionary<TaskCompletionSource<object>, bool> _completions =
        new ConcurrentDictionary<TaskCompletionSource<object>, bool>();

    /// <summary>
    /// Creates a ref that publishes with <see cref="RefPublishStrategies.DefaultStrategy"/>.
    /// </summary>
    /// <param name="start">The initial state.</param>
    public Ref(TState start) : this(start, RefPublishStrategies.DefaultStrategy)
    {
    }

    /// <summary>
    /// Creates a ref that publishes through <paramref name="refPublishStrategy"/>.
    /// </summary>
    /// <param name="start">The initial state; it is also the first value observers receive.</param>
    /// <param name="refPublishStrategy">Decides where and when each publish callback runs.</param>
    /// <exception cref="ArgumentNullException"><paramref name="refPublishStrategy"/> is null.</exception>
    public Ref(TState start, Action<Action> refPublishStrategy)
    {
        if (refPublishStrategy == null)
        {
            throw new ArgumentNullException(nameof(refPublishStrategy));
        }

        _value = start;
        _refPublishStrategy = refPublishStrategy;
        _subject = new BehaviorSubject<StrongBox<TState>>(new StrongBox<TState>(_value));
    }

    /// <summary>A copy of the current state, read under the lock.</summary>
    public TState Value
    {
        get
        {
            lock (_lock)
            {
                return _value;
            }
        }
    }

    /// <summary>
    /// A task that completes once every publish action that is pending at the moment
    /// this property is read has finished. Actions started afterwards are not included.
    /// </summary>
    public Task AllComplete => Task.WhenAll(_completions.Keys.Select(a => a.Task).ToArray());

    /// <summary>
    /// Applies <paramref name="f"/> to the state in place and publishes the resulting
    /// value to observers through the publish strategy.
    /// </summary>
    /// <remarks>
    /// <paramref name="f"/> receives a reference to the stored state itself. If it throws,
    /// whatever it had already changed stays in place and nothing is published.
    /// </remarks>
    /// <param name="f">The update to apply.</param>
    /// <returns>A task that completes when the new value has been pushed to observers.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="f"/> is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// Called again while an update is still running under the lock.
    /// </exception>
    public Task SubmitAndPublishAsync(RefAction<TState> f)
    {
        if (f == null)
        {
            throw new ArgumentNullException(nameof(f));
        }

        var publish = new StrongBox<TState>();
        lock (_lock)
        {
            if (_handling)
            {
                throw new InvalidOperationException("Handler resubmitted");
            }

            _handling = true;
            try
            {
                f(ref _value);

                // Publish a snapshot so observers never see a later mutation.
                publish.Value = _value;
                return PublishAction(() =>
                {
                    _subject.OnNext(publish);
                    return Task.CompletedTask;
                });
            }
            finally
            {
                _handling = false;
            }
        }
    }

    /// <summary>
    /// Runs <paramref name="act"/> through the publish strategy and tracks it until it
    /// finishes, so that <see cref="AllComplete"/> can wait for it.
    /// </summary>
    /// <param name="act">The asynchronous work to run.</param>
    /// <returns>
    /// A task that completes when <paramref name="act"/> completes and faults with the
    /// exception <paramref name="act"/> produced, if any.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="act"/> is null.</exception>
    public Task PublishAction(Func<Task> act)
    {
        if (act == null)
        {
            throw new ArgumentNullException(nameof(act));
        }

        // Continuations of the returned task must not run inline on the publishing
        // thread, which may still be holding _lock.
        var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
        _completions[tcs] = true;

        try
        {
            // The strategy takes a plain Action, so this lambda is compiled as async void:
            // an exception escaping it could not be observed by any caller. Every outcome
            // is therefore routed into tcs and nothing is allowed to escape.
            _refPublishStrategy(async () =>
            {
                try
                {
                    await act();
                    tcs.TrySetResult(null);
                }
                catch (Exception exception)
                {
                    tcs.TrySetException(exception);
                }
                finally
                {
                    // Unregister only after tcs is completed, so a concurrent AllComplete
                    // snapshot either sees this task or sees it already finished.
                    bool ignored;
                    _completions.TryRemove(tcs, out ignored);
                }
            });
        }
        catch (Exception strategyFailure)
        {
            // The strategy itself failed (for example it could not schedule the callback).
            // Without this the entry would stay registered and AllComplete could wait forever.
            bool removed;
            tcs.TrySetException(strategyFailure);
            _completions.TryRemove(tcs, out removed);
            throw;
        }

        return tcs.Task;
    }

    /// <summary>
    /// Fire-and-forget variant of <see cref="SubmitAndPublishAsync"/>: the update is applied
    /// before this method returns, but the publish task is discarded.
    /// </summary>
    /// <param name="f">The update to apply.</param>
    public void Submit(RefAction<TState> f)
    {
        SubmitAndPublishAsync(f);
    }

    /// <summary>
    /// Returns an observable that applies <paramref name="projection"/> to every published
    /// state snapshot, starting with the most recently published one.
    /// </summary>
    /// <param name="projection">Maps a state snapshot to the observed value.</param>
    public IObservable<TResult> Observe<TResult>(RefFunc<TState, TResult> projection)
    {
        return _subject.Select(publish => PerformProjection(ref publish.Value, projection));
    }

    /// <summary>
    /// Applies <paramref name="projection"/> to the current state and returns its result.
    /// </summary>
    /// <param name="projection">Maps the current state to the returned value.</param>
    public TResult Project<TResult>(RefFunc<TState, TResult> projection)
    {
        return PerformProjection(ref _value, projection);
    }

    /// <summary>
    /// Runs <paramref name="projection"/> on <paramref name="state"/> while holding the lock.
    /// In DEBUG builds, throws <see cref="ArgumentException"/> if the projection left the
    /// state unequal (per <c>Equals</c>) to what it was before the call.
    /// </summary>
    private TResult PerformProjection<TResult>(ref TState state, RefFunc<TState, TResult> projection)
    {
        lock (_lock)
        {
#if DEBUG
            var original = state;
#endif
            try
            {
                var ret = projection(ref state);
#if DEBUG
                if (!state.Equals(original))
                {
                    throw new ArgumentException("Observer mutated state!");
                }
#endif
                return ret;
            }
            // NOTE(review): ExceptionHandling.Never is defined outside this file; presumably
            // it always returns false and exists so the exception can be inspected before
            // the stack unwinds. Confirm before changing.
            catch (Exception e) when (ExceptionHandling.Never(e))
            {
                throw;
            }
        }
    }
}
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace UnitTests | |
{ | |
/// <summary>Marker interface implemented by every action dispatched in these tests.</summary>
interface IAction { } | |
/// <summary>State held by the store under test: a single counter.</summary>
struct DummyState | |
{ | |
public int Count; | |
} | |
/// <summary>Action the tests register sagas for.</summary>
class SagaAction : IAction { } | |
/// <summary>Action whose handler in TestSaga adds one to the counter.</summary>
class IncAction : IAction { } | |
/// <summary>
/// Action that carries the counter value its handler expects to find before incrementing.
/// </summary>
class ExpectAndIncAction : IAction
{
    private readonly int _expected;

    /// <param name="expected">The counter value the handler should see when it runs.</param>
    public ExpectAndIncAction(int expected)
    {
        _expected = expected;
    }

    /// <summary>The counter value supplied at construction.</summary>
    public int Expected => _expected;
}
/// <summary>
/// Tests for saga registration and dispatch on <c>Store</c>, using a store whose
/// publishes are run on a dedicated <see cref="EventLoopScheduler"/>.
/// </summary>
[TestFixture]
public class SagaTest
{
    private Store<IAction, DummyState> store;

    // Kept in a field so it can be disposed after each test.
    private EventLoopScheduler scheduler;

    /// <summary>Projection that reads the counter out of the state.</summary>
    static int Count(ref DummyState s)
    {
        return s.Count;
    }

    /// <summary>Creates a fresh scheduler and store before every test.</summary>
    [SetUp]
    public void Setup()
    {
        scheduler = new EventLoopScheduler();
        store = new Store<IAction, DummyState>(
            default(DummyState),
            RefPublishStrategies.PublishWithScheduler(scheduler));
    }

    /// <summary>
    /// Disposes the scheduler created in <see cref="Setup"/>; it is IDisposable and
    /// would otherwise be left undisposed after every test.
    /// </summary>
    [TearDown]
    public void TearDown()
    {
        scheduler?.Dispose();
        scheduler = null;
    }

    /// <summary>
    /// A saga that dispatches one increment after each of two gates: the counter reaches 1
    /// after the first gate opens and 2 once the saga task has completed.
    /// </summary>
    [Test]
    public async Task TestSaga()
    {
        var tc1 = new TaskCompletionSource<object>();
        var tc2 = new TaskCompletionSource<object>();

        store.RegisterHandler((ref DummyState s, IncAction action) =>
        {
            s.Count += 1;
        });
        store.RegisterSaga<SagaAction>(async (sagaStore, evt) =>
        {
            await tc1.Task;
            sagaStore.Dispatch(new IncAction());
            await tc2.Task;
            sagaStore.Dispatch(new IncAction());
        });

        Assert.That(store.Project(Count), Is.EqualTo(0));

        var sagaTask = store.DispatchAndPublishAsync(new SagaAction());
        tc1.SetResult(null);
        await store.Observe(Count).Where(c => c == 1).FirstAsync();
        Assert.That(store.Project(Count), Is.EqualTo(1));

        tc2.SetResult(null);
        await sagaTask;
        Assert.That(store.Project(Count), Is.EqualTo(2));
    }

    /// <summary>
    /// Two sagas registered for the same action interleave: each handler invocation asserts
    /// the counter value it expects, and the counter ends at 3.
    /// </summary>
    [Test]
    public async Task SagasRunInParallel()
    {
        var tc1 = new TaskCompletionSource<object>();
        var tc2 = new TaskCompletionSource<object>();

        store.RegisterHandler((ref DummyState s, ExpectAndIncAction action) =>
        {
            Assert.AreEqual(action.Expected, s.Count);
            s.Count += 1;
        });
        store.RegisterSaga<SagaAction>(async (sagaStore, evt) =>
        {
            sagaStore.Dispatch(new ExpectAndIncAction(0));
            await tc2.Task;
            sagaStore.Dispatch(new ExpectAndIncAction(2));
        });
        store.RegisterSaga<SagaAction>(async (sagaStore, evt) =>
        {
            await tc1.Task;
            sagaStore.Dispatch(new ExpectAndIncAction(1));
        });

        Assert.That(store.Project(Count), Is.EqualTo(0));

        store.Dispatch(new SagaAction());
        await store.Observe(Count).Where(c => c == 1).FirstAsync();

        tc1.SetResult(null);
        await store.Observe(Count).Where(c => c == 2).FirstAsync();

        tc2.SetResult(null);
        await store.AllComplete;
        Assert.That(store.Project(Count), Is.EqualTo(3));
    }

    /// <summary>
    /// The task returned by DispatchAndPublishAsync does not complete until the slower
    /// saga (which waits 500 ms) has dispatched as well.
    /// </summary>
    [Test]
    public async Task TaskWaitsForAllToComplete()
    {
        store.RegisterHandler((ref DummyState s, ExpectAndIncAction action) =>
        {
            Assert.AreEqual(action.Expected, s.Count);
            s.Count += 1;
        });
        store.RegisterSaga<SagaAction>((sagaStore, evt) =>
        {
            sagaStore.Dispatch(new ExpectAndIncAction(0));
            return Task.CompletedTask;
        });
        store.RegisterSaga<SagaAction>(async (sagaStore, evt) =>
        {
            await Task.Delay(500);
            sagaStore.Dispatch(new ExpectAndIncAction(1));
        });

        Assert.That(store.Project(Count), Is.EqualTo(0));

        await store.DispatchAndPublishAsync(new SagaAction());
        Assert.That(store.Project(Count), Is.EqualTo(2));
    }

    /// <summary>
    /// An exception thrown by a handler that a saga dispatches to surfaces from the task
    /// returned by DispatchAndPublishAsync.
    /// </summary>
    [Test]
    public async Task DispatchAndPublishAsyncThrowsSagaException()
    {
        store.RegisterHandler((ref DummyState s, ExpectAndIncAction action) =>
        {
            throw new InvalidCastException();
        });
        store.RegisterSaga<SagaAction>((sagaStore, evt) =>
        {
            sagaStore.Dispatch(new ExpectAndIncAction(0));
            return Task.CompletedTask;
        });

        Assert.That(store.Project(Count), Is.EqualTo(0));

        try
        {
            await store.DispatchAndPublishAsync(new SagaAction());
            Assert.Fail("exception not thrown");
        }
        catch (InvalidCastException)
        {
            // Expected: the handler's exception reached the awaiting caller.
        }
    }
}
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Redux-like store built on <c>Ref</c>: actions are dispatched to the handlers and sagas
/// registered for the action's runtime type.
/// </summary>
/// <typeparam name="TActionBase">Common base type of every action.</typeparam>
/// <typeparam name="TState">The state type; must be a struct.</typeparam>
public class Store<TActionBase, TState> where TState : struct
{
    /// <summary>
    /// Asynchronous reaction to an action. It receives the store, so it can dispatch
    /// further actions, and the action that triggered it.
    /// </summary>
    public delegate Task Saga(Store<TActionBase, TState> store, TActionBase evt);

    protected readonly Ref<TState> _ref;

    private readonly Dictionary<Type, List<RefAction<TState, TActionBase>>> _handlerMap =
        new Dictionary<Type, List<RefAction<TState, TActionBase>>>();

    private readonly List<RefAction<TState, TActionBase>> _globalBeforeHandlers =
        new List<RefAction<TState, TActionBase>>();

    private readonly Dictionary<Type, List<Saga>> _sagaMap = new Dictionary<Type, List<Saga>>();

    /// <summary>Creates a store whose underlying ref uses its default publish strategy.</summary>
    /// <param name="start">The initial state.</param>
    public Store(TState start)
    {
        _ref = new Ref<TState>(start);
    }

    /// <summary>Creates a store whose underlying ref publishes through the given strategy.</summary>
    /// <param name="start">The initial state.</param>
    /// <param name="refPublishStrategy">Passed through to the underlying ref.</param>
    public Store(TState start, Action<Action> refPublishStrategy)
    {
        _ref = new Ref<TState>(start, refPublishStrategy);
    }

    /// <summary>
    /// Registers a state handler for actions whose runtime type is exactly
    /// <typeparamref name="TEvent"/>. Handlers for one type run in registration order.
    /// </summary>
    /// <param name="f">The handler; it receives the state by reference and the action.</param>
    /// <exception cref="ArgumentNullException"><paramref name="f"/> is null.</exception>
    public void RegisterHandler<TEvent>(RefAction<TState, TEvent> f) where TEvent : TActionBase
    {
        if (f == null)
        {
            throw new ArgumentNullException(nameof(f));
        }

        var key = typeof(TEvent);
        List<RefAction<TState, TActionBase>> handlers;
        if (!_handlerMap.TryGetValue(key, out handlers))
        {
            handlers = new List<RefAction<TState, TActionBase>>();
            _handlerMap[key] = handlers;
        }

        // Handlers are stored against the base action type; cast back when invoking.
        handlers.Add((ref TState state, TActionBase e) => f(ref state, (TEvent)e));
    }

    /// <summary>
    /// Registers a handler that runs before the type-specific handlers of an action.
    /// </summary>
    /// <param name="f">The handler; it receives the state by reference and the action.</param>
    /// <exception cref="ArgumentNullException"><paramref name="f"/> is null.</exception>
    public void RegisterGlobalHandler(RefAction<TState, TActionBase> f)
    {
        if (f == null)
        {
            throw new ArgumentNullException(nameof(f));
        }

        _globalBeforeHandlers.Add(f);
    }

    /// <summary>
    /// Registers a saga for actions whose runtime type is exactly <typeparamref name="TEvent"/>.
    /// </summary>
    /// <param name="f">The saga to start whenever such an action is dispatched.</param>
    /// <exception cref="ArgumentNullException"><paramref name="f"/> is null.</exception>
    public void RegisterSaga<TEvent>(Saga f) where TEvent : TActionBase
    {
        if (f == null)
        {
            throw new ArgumentNullException(nameof(f));
        }

        var key = typeof(TEvent);
        List<Saga> sagas;
        if (!_sagaMap.TryGetValue(key, out sagas))
        {
            sagas = new List<Saga>();
            _sagaMap[key] = sagas;
        }

        // The cast checks that the action really is a TEvent before the saga sees it.
        sagas.Add((store, e) => f(store, (TEvent)e));
    }

    /// <summary>
    /// Applies the handlers registered for the runtime type of <paramref name="e"/>, then
    /// starts the sagas registered for that type.
    /// </summary>
    /// <param name="e">The action to dispatch.</param>
    /// <returns>
    /// A task that completes when the state publish (if any handler ran) and every started
    /// saga have completed; it faults if any of them fault.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="e"/> is null.</exception>
    public Task DispatchAndPublishAsync(TActionBase e)
    {
        if (e == null)
        {
            throw new ArgumentNullException(nameof(e));
        }

        var eventType = e.GetType();
        var tasks = new List<Task>();

        // NOTE(review): global handlers only run when at least one type-specific handler
        // is registered for this action type. That matches the existing behaviour, but
        // may not be what "global" is meant to imply - confirm with the author.
        List<RefAction<TState, TActionBase>> handlers;
        if (_handlerMap.TryGetValue(eventType, out handlers))
        {
            var publishTask = _ref.SubmitAndPublishAsync((ref TState state) =>
            {
                foreach (var beforeHandler in _globalBeforeHandlers)
                {
                    beforeHandler(ref state, e);
                }

                foreach (var handler in handlers)
                {
                    handler(ref state, e);
                }
            });
            tasks.Add(publishTask);
        }

        List<Saga> sagas;
        if (_sagaMap.TryGetValue(eventType, out sagas))
        {
            // Start the sagas here, explicitly, rather than as a side effect of a lazily
            // enumerated query. Iterating a snapshot means a saga that registers another
            // saga for the same action type cannot invalidate the enumeration.
            foreach (var saga in sagas.ToArray())
            {
                tasks.Add(_ref.PublishAction(() => saga(this, e)));
            }
        }

        return Task.WhenAll(tasks);
    }

    /// <summary>
    /// Fire-and-forget variant of <see cref="DispatchAndPublishAsync"/>: the returned task
    /// is discarded, so failures of the publish or of sagas are not reported to the caller.
    /// </summary>
    /// <param name="e">The action to dispatch.</param>
    public void Dispatch(TActionBase e)
    {
        DispatchAndPublishAsync(e);
    }

    /// <summary>Forwards to <c>AllComplete</c> on the underlying ref.</summary>
    public Task AllComplete => _ref.AllComplete;

    /// <summary>
    /// Observes a projection of the state, suppressing consecutive duplicate values.
    /// </summary>
    /// <param name="project">Maps a state snapshot to the observed value.</param>
    public IObservable<TResult> Observe<TResult>(RefFunc<TState, TResult> project) =>
        _ref.Observe(project).DistinctUntilChanged();

    /// <summary>Applies <paramref name="projection"/> to the current state.</summary>
    /// <param name="projection">Maps the current state to the returned value.</param>
    public TResult Project<TResult>(RefFunc<TState, TResult> projection) =>
        _ref.Project(projection);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment