Last active
February 27, 2018 17:08
-
-
Save ShlomoAbraham/1b6e8dc1c7e00fc59f48fc690f4375cf to your computer and use it in GitHub Desktop.
An example of state with Rx
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// LINQPad entry point. Runs three demos of the stateful operators declared in Extensions.
// In every demo the state is the running sum of the even-indexed numbers seen so far, and
// the selectors are handed the state as it stood before the current number was folded in.
void Main()
{
    // Each invocation builds a new stream of 100 random integers, every one paired with its index.
    Func<IObservable<Tuple<int, int>>> indexedRandoms = () =>
        Observable
            .Generate(new Random(), _ => true, rng => rng, rng => rng.Next(-5, 10))
            .Take(100)
            .Select((value, position) => Tuple.Create(value, position));

    // State transition shared by all demos: only even-indexed numbers contribute to the sum.
    Func<int, Tuple<int, int>, int> sumEvenIndexed =
        (sum, entry) => entry.Item2 % 2 == 0 ? sum + entry.Item1 : sum;

    // Demo 1: if the previous even-indexed numbers sum to more than 10, multiply the number by -1.
    var numbers = indexedRandoms().StateSelect(
        0,
        (sum, entry) => sum > 10 ? entry.Item1 * -1 : entry.Item1, // result selector
        sumEvenIndexed);
    numbers.Dump(); //LinqPad

    // Demo 2: skip the number when the previous even-indexed sum plus the number exceeds 10.
    // Numbers that pass the filter still go through the same negating result selector as demo 1.
    var numbers2 = indexedRandoms().StateSelectFiltered(
        0,
        (sum, entry) => sum > 10 ? entry.Item1 * -1 : entry.Item1, // result selector
        sumEvenIndexed,
        (sum, entry) => !(sum + entry.Item1 > 10));
    numbers2.Dump(); //LinqPad

    // Demo 3: if the previous even-indexed numbers sum to more than 10, emit the number and also a -1.
    var numbers3 = indexedRandoms().StateSelectMany(
        0,
        (sum, entry) => sum > 10
            ? new[] { entry.Item1, -1 }.ToObservable()
            : Observable.Return(entry.Item1), // result selector
        sumEvenIndexed);
    numbers3.Dump(); //LinqPad
}
// Define other methods and classes here
/// <summary>
/// Rx operators that carry an accumulated state alongside a sequence, so that each element
/// can be projected (or filtered) using the state built up from the elements before it.
/// </summary>
public static class Extensions
{
    /// <summary>
    /// Projects each element to a single result using the state accumulated from the
    /// preceding elements.
    /// </summary>
    /// <param name="source">The sequence to project.</param>
    /// <param name="initialState">The state seen by the first element.</param>
    /// <param name="resultSelector">Maps (state before this element, element) to the emitted result.</param>
    /// <param name="stateSelector">Maps (state before this element, element) to the state seen by the next element.</param>
    /// <returns>A sequence with one result per source element.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/>, <paramref name="resultSelector"/> or <paramref name="stateSelector"/> is null.
    /// </exception>
    public static IObservable<TResult> StateSelect<TSource, TState, TResult>(
        this IObservable<TSource> source,
        TState initialState,
        Func<TState, TSource, TResult> resultSelector,
        Func<TState, TSource, TState> stateSelector
    )
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }
        // Checked here because the wrapper lambda below is never null, so the
        // StateSelectMany guard would not catch a null resultSelector.
        if (resultSelector == null)
        {
            throw new ArgumentNullException(nameof(resultSelector));
        }
        if (stateSelector == null)
        {
            throw new ArgumentNullException(nameof(stateSelector));
        }

        return source
            .StateSelectMany(initialState, (state, item) => Observable.Return(resultSelector(state, item)), stateSelector);
    }

    /// <summary>
    /// Projects each element to an inner sequence using the state accumulated from the
    /// preceding elements, and flattens the inner sequences with SelectMany.
    /// </summary>
    /// <param name="source">The sequence to project.</param>
    /// <param name="initialState">The state seen by the first element.</param>
    /// <param name="resultSelector">Maps (state before this element, element) to the inner sequence to emit.</param>
    /// <param name="stateSelector">Maps (state before this element, element) to the state seen by the next element.</param>
    /// <returns>The flattened results of every inner sequence.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/>, <paramref name="resultSelector"/> or <paramref name="stateSelector"/> is null.
    /// </exception>
    public static IObservable<TResult> StateSelectMany<TSource, TState, TResult>(
        this IObservable<TSource> source,
        TState initialState,
        Func<TState, TSource, IObservable<TResult>> resultSelector,
        Func<TState, TSource, TState> stateSelector
    )
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }
        if (resultSelector == null)
        {
            throw new ArgumentNullException(nameof(resultSelector));
        }
        if (stateSelector == null)
        {
            throw new ArgumentNullException(nameof(stateSelector));
        }

        // The accumulator pairs the next state with the inner sequence produced for the
        // current element. The seed's empty sequence is only a placeholder for the pair.
        // Both selectors are given the state from before the current element.
        return source
            .Scan(
                Tuple.Create(initialState, Observable.Empty<TResult>()),
                (state, item) => Tuple.Create(stateSelector(state.Item1, item), resultSelector(state.Item1, item)))
            .SelectMany(t => t.Item2);
    }

    /// <summary>
    /// Like <c>StateSelect</c>, but emits a result only for elements accepted by
    /// <paramref name="filter"/>. The state is advanced for every element, accepted or not.
    /// </summary>
    /// <param name="source">The sequence to project.</param>
    /// <param name="initialState">The state seen by the first element.</param>
    /// <param name="resultSelector">Maps (state before this element, element) to the emitted result; called only for accepted elements.</param>
    /// <param name="stateSelector">Maps (state before this element, element) to the state seen by the next element.</param>
    /// <param name="filter">Returns true for (state before this element, element) pairs whose result should be emitted.</param>
    /// <returns>A sequence with one result per accepted source element.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/>, <paramref name="resultSelector"/>, <paramref name="stateSelector"/>
    /// or <paramref name="filter"/> is null.
    /// </exception>
    public static IObservable<TResult> StateSelectFiltered<TSource, TState, TResult>(
        this IObservable<TSource> source,
        TState initialState,
        Func<TState, TSource, TResult> resultSelector,
        Func<TState, TSource, TState> stateSelector,
        Func<TState, TSource, bool> filter
    )
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }
        if (resultSelector == null)
        {
            throw new ArgumentNullException(nameof(resultSelector));
        }
        if (stateSelector == null)
        {
            throw new ArgumentNullException(nameof(stateSelector));
        }
        if (filter == null)
        {
            throw new ArgumentNullException(nameof(filter));
        }

        return source
            .StateSelectMany(initialState, (state, item) =>
                filter(state, item) ? Observable.Return(resultSelector(state, item)) : Observable.Empty<TResult>(),
                stateSelector);
    }
}
/// <summary>
/// Re-implementations of built-in Rx operators, written in a functional (recursive,
/// mutation-free) style for illustration.
/// </summary>
public static class RxReimplementations
{
    /// <summary>
    /// A functional implementation of Observable.Scan: for every source element, emits the
    /// state obtained by applying <paramref name="f"/> to the previous state and the element.
    /// The initial state itself is not emitted.
    /// </summary>
    /// <param name="source">The sequence to accumulate over.</param>
    /// <param name="initialState">The state that is combined with the first element.</param>
    /// <param name="f">Maps (previous state, element) to the next state.</param>
    /// <returns>The sequence of successive states.</returns>
    public static IObservable<TState> FunctionalScan<TSource, TState>(this IObservable<TSource> source, TState initialState, Func<TState, TSource, TState> f)
    {
        // Publish shares one subscription to the source between the head (Take(1)) and the
        // recursive call that handles the remaining elements.
        // NOTE(review): each recursion step wraps the shared sequence in a further Publish,
        // so this looks illustrative rather than efficient - confirm before production use.
        return source.Publish(shared =>
        {
            var head = shared.Take(1);
            return head.SelectMany(first =>
                f(initialState, first).Using(next =>
                    Observable
                        .Return(next)
                        .Concat(shared.FunctionalScan(next, f))));
        });
    }

    /// <summary>
    /// A functional way to re-use a function result: passes <paramref name="t"/> to
    /// <paramref name="f"/> and returns what <paramref name="f"/> returns.
    /// </summary>
    public static U Using<T, U>(this T t, Func<T, U> f) => f(t);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
There are three parts here: the `Extensions` class, which has methods to help observe observables with state; a `Main` runner, which shows the use of the new extensions; and the `RxReimplementations` class, which shows how you can implement `Scan` in a functional way.