@rebcabin
Created November 24, 2013 04:27
A basic Observable / Observer in Mathematica / Wolfram
Or, we can emulate the Reactive Framework's "GenerateWithTime".
Cold GenerateWithTime
Rx.Observable.GenerateWithTime = function(
initialState, // : State
condition, // : State -> bool
resultSelector, // : State -> Result
timeSelector, // : State -> int
iterate) // : State -> State
An observable is a function that returns an object with a Subscribe method. The Subscribe method takes an observer as input. An observer is an object exposing one, two, or three functions: onNext[value], onCompleted, and onError[error]. The Subscribe method, when called with an observer, returns a Disposable: an interface with one method, Dispose. Dispose, when called, unsubscribes the observer and cleans up any fixed resources held by the subscription.
A cold observable is one that starts to produce values only when an observer subscribes to it. A hot observable produces values continuously, whether or not any observer is listening; examples are the mouse position and a clock. An observer that subscribes to a hot observable "hops on the train" and starts receiving callbacks immediately.
Represent objects as symbols with string keys (just like JavaScript).
ClearAll[createObserver];
createObserver[onNext_, onCompleted_: False, onError_: False] :=
Module[{o = Unique[]},
o["OnNext"] = onNext;
If[onCompleted =!= False, o["OnCompleted"] = onCompleted];
If[onError =!= False, o["OnError"] = onError];
o];
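The contract described above (Subscribe takes an observer and returns a disposable) can be sketched with a much simpler source than a timed generator. The following is a minimal, synchronous illustration only; fromList, seen, observer, and subscription are hypothetical names that are not part of this gist, and the observer is built by hand so that the block does not depend on any other definition.

```mathematica
(* Hypothetical helper, not part of the gist: a synchronous cold observable
   that replays a list to every subscriber. Assumes the observer defines
   both "OnNext" and "OnCompleted". *)
ClearAll[fromList];
fromList[values_List] :=
 Module[{observable = Unique[]},
  observable["Subscribe"] = Function[observer,
    Module[{disposable = Unique[], disposed = False},
     (* same disposable shape as in generateWithTime: a flag setter *)
     disposable["Dispose"] = Function[disposed = True];
     (* push every value, then signal completion *)
     Scan[If[Not[disposed], observer["OnNext"][#]] &, values];
     If[Not[disposed], observer["OnCompleted"][]];
     disposable]];
  observable]

(* An observer is a unique symbol with string keys *)
seen = {};
observer = Unique[];
observer["OnNext"] = Function[v, AppendTo[seen, v]];
observer["OnCompleted"] = Function[AppendTo[seen, "done"]];

subscription = fromList[{1, 2, 3}]["Subscribe"][observer];
seen  (* {1, 2, 3, "done"} *)
```

Because this sketch delivers everything before Subscribe returns, calling subscription["Dispose"][] afterwards has nothing left to cancel; it is returned only to keep the same shape as the timed version.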
GenerateWithTime returns a restartable, interleavable observable:
ClearAll[generateWithTime];
generateWithTime[
initialState_,
condition_, (* State \[Rule] Bool *)
resultSelector_, (* State \[Rule] Result *)
timeSelector_, (* State \[Rule] Number; delay in seconds, passed to CreateScheduledTask *)
iterate_ (* State \[Rule] State *)] :=
Module[{
observable = Unique[],
(* Mathematica "Catch" wraps the try block -
so is more like a combo of C# catch & finally.
"Catch" must return something. We'll have it return a unique symbol.
The default case signals that no Throw was evaluated. *)
defaultCatchResult = Unique[]
},
observable["Subscribe"] = Function[observer,
Module[{
(* unpack the observer *)
(* ValueQ is the Mathematica equivalent of JavaScript testing against undefined.
If ValueQ[expr] is true, then the expression has a value other than itself. *)
hasOnNext = ValueQ[observer["OnNext"]],
OnNext = observer["OnNext"],
hasOnCompleted = ValueQ[observer["OnCompleted"]],
OnCompleted = observer["OnCompleted"],
hasOnError = ValueQ[observer["OnError"]],
OnError = observer["OnError"],
(* set up non-interleaving task structure *)
lastTaskForCleanup,
runNextTask,
(* initialize stuff when observer subscribes *)
state = initialState,
disposed = False,
disposable = Unique[]
},
(* Don't catch this throw here. This is a bad call of generateWithTime *)
If[Not[hasOnNext], Throw["Observer must have OnNext"]];
(* The following ClearAll is a trick that makes runNextTask invulnerable to garbage collection in the time window between two invocations. *)
(* see http://mathematica.stackexchange.com/questions/3807/module-variable-scoping-in-scheduled-tasks *)
ClearAll[runNextTask];
(* "runNextTask" is a function that does work and then schedules another invocation of itself. *)
runNextTask = Function[
Module[{oldState = state, proceed, catchResult},
(* Clean up the last task -- the one that scheduled the current invocation *)
(Quiet@RemoveScheduledTask@lastTaskForCleanup;
(* catch errors in creating the task and in the user-supplied time-selector function *)
catchResult = Catch[
(* create a task to do the work on the current invocation and save it for later cleanup by the next invocation *)
lastTaskForCleanup =
CreateScheduledTask[
(* catch errors in the observer's OnNext, OnCompleted,
and in the user-supplied condition, resultSelector,
and iterate functions *)
catchResult = Catch[
(* find out if we should do this iteration and all following iterations -- invoke caller-supplied "condition" function. *)
proceed = condition[oldState];
If[proceed && Not[disposed],
(* if we should go, then call the observer's OnNext with the result of the caller-supplied resultSelector[oldState] *)
(OnNext[resultSelector[oldState]];
(* update state via user-supplied lambda *)
state = iterate[oldState];
(* recurse to create the next task *)
runNextTask[]),
(* else, if we should NOT go, clean up, call user's OnCompleted, and don't schedule next task *)
(Quiet@RemoveScheduledTask@lastTaskForCleanup;
If[hasOnCompleted, OnCompleted[]])];
(* if nothing was thrown, return the following as the value of the Catch *)
(* this is the end of the INNER catch block *)
defaultCatchResult];
(* check if something was thrown *)
If[
catchResult =!= defaultCatchResult && hasOnError,
(* if something was thrown, call OnError and clean up.
If observer's OnError throws, let that escape; so be it *)
(OnError[catchResult];
Quiet@RemoveScheduledTask@lastTaskForCleanup)],
(* the following is how long to wait before starting the task.
If it throws, catch in our outer Catch block *)
{timeSelector[oldState]}];
(* if nothing was thrown, return the following as the value of the Catch *)
(* this is the end of the OUTER catch block *)
defaultCatchResult];
(* Check for throw by timeSelector *)
If[
catchResult =!= defaultCatchResult && hasOnError,
(* if caller-supplied timeSelector threw or something else went wrong,
call OnError and clean up. If OnError throws, let it escape; so be it *)
(OnError[catchResult];
Quiet@RemoveScheduledTask@lastTaskForCleanup),
(* This version of generateWithTime does not start the first task immediately,
but rather after the delay specified by the result of calling the user's time selector on the initial state value.
That gives the user the opportunity to race the task and call dispose before the first observation is created. *)
StartScheduledTask @ lastTaskForCleanup]
)(* End of Module internal to runNextTask *)
](* return value of runNextTask *)
];(* end of def runNextTask *)
(* start everything up by calling runNextTask the first time *)
runNextTask[];
(* create a disposable that can asynchronously set the flag that's checked in the inner loop. *)
disposable["Dispose"] = Function[disposed = True];
disposable]];(* end of def Subscribe; return disposable *)
observable](* end of generateWithTime; return observable *)
ClearAll[MergeColdObservables];
MergeColdObservables[ervables_List] :=
Module[{mergedErvable},
(* Technically, there is a race here between all the observers running to subscribe to all the observables.
Should none of the observables start until all the observers have signed up? *)
mergedErvable["Subscribe"] = Function[erver,
(* keep the completion count per subscription, so that the merged observable can be subscribed to more than once *)
Module[{completionCount = Length@ervables},
MapThread[#1["Subscribe"][#2] &, {
ervables,
Table[createObserver[
Function[ervation, erver["OnNext"][ervation]],
Function[If[(--completionCount) === 0, erver["OnCompleted"][]]],
Function[err, erver["OnError"][err]]],
{Length@ervables}]}]]];
mergedErvable]
ClearAll[MapObservable];
(* See my ACM paper on why the function needs to be the second parameter *)
MapObservable[ervable_, func_] :=
Module[{newErvable},
newErvable["Subscribe"] = Function[erver,
ervable["Subscribe"][createObserver[
Function[ervation, erver["OnNext"][func[ervation]]],
Function[erver["OnCompleted"][]],
Function[err, erver["OnError"][err]]]]];
newErvable]
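A quick way to check MapObservable without waiting on scheduled tasks is to feed it a synchronous source. The sketch below assumes that createObserver and MapObservable have already been evaluated as defined above; source, mapped, and results are hypothetical names introduced only for this illustration.

```mathematica
(* Hypothetical synchronous source: pushes 1, 2, 3 and then completes.
   It returns no disposable, which is enough for this illustration. *)
source = Unique[];
source["Subscribe"] = Function[erver,
   Scan[erver["OnNext"], {1, 2, 3}];
   erver["OnCompleted"][]];

(* Square every observation on its way to the downstream observer *)
mapped = MapObservable[source, #^2 &];

results = {};
mapped["Subscribe"][createObserver[
   Function[v, AppendTo[results, v]],
   Function[AppendTo[results, "done"]]]];
results  (* {1, 4, 9, "done"} *)
```

Note that MapObservable forwards OnCompleted and OnError unconditionally, so the downstream observer should define every handler that the source might call; here the source never signals an error, so OnError is omitted.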
Unit Tests
The following should display traces of two interleaved observers.
(*ClearAll[observer,observable,disposable,task];
observable=
generateWithTime[
(* Initial State *)0,
(* Continue Condition *)#<3&,
(* Result Selector *)#&,
(* Time Selector *)0.10&,
(* State Updater *)#+1&];
observer1=createObserver[
(* OnNext *)Print["Rx 1 ONNEXT: "<>ToString@#]&,
(* OnCompleted *)Print["Rx 1 DONE!"]&,
(* OnError *)Print["Rx 1 ERROR: "<>ToString@#]&];
observer2=createObserver[
(* OnNext *)Print["Rx 2 ONNEXT: "<>ToString@#]&,
(* OnCompleted *)Print["Rx 2 DONE!"]&,
(* OnError *)Print["Rx 2 ERROR: "<>ToString@#]&];*)
(*{observable["Subscribe"][observer1],observable["Subscribe"][observer2]}*)
(*observable["Subscribe"][observer2]*)
(*ClearAll[observer,observable,disposable,task];
observable=
generateWithTime[
(* Initial State *)0,
(* Continue Condition *)#<3&,
(* Result Selector *)If[#===2,Throw[#],#]&,
(* Time Selector *)0.10&,
(* State Updater *)#+1&];
observer=createObserver[
(* OnNext *)Print["Rx ONNEXT: "<>ToString@#]&,
(* OnCompleted *)Print["Rx DONE!"]&,
(* OnError *)Print["Rx ERROR: "<>ToString@#]&];
(* expect two ONNEXT messages and then an ERROR *)
disposable=observable["Subscribe"][observer];
(* unless you run the following task; then expect two ONNEXT messages and then a DONE *)
(*task=CreateScheduledTask[
(disposable["Dispose"][];
RemoveScheduledTask[task]),
{0.25}];
StartScheduledTask[task];*)*)
The following should always print a zero. Otherwise, generateWithTime or the unit test is leaking tasks.
resetTasks[] := (Print[ScheduledTasks[] // Length];
RemoveScheduledTask[ScheduledTasks[]])
resetTasks[];
0