Created
November 24, 2013 04:27
A basic Observable / Observer in Mathematica / Wolfram
| Or, we can emulate the Reactive Framework's "GenerateWithTime". | |
| Cold GenerateWithTime | |
| Rx.Observable.GenerateWithTime = function( | |
| initialState, // : State | |
| condition, // : State -> bool | |
| resultSelector, // : State -> Result | |
| timeSelector, // : State -> int | |
| iterate) // : State -> State | |
| An observable is an object with a Subscribe method. Subscribe takes an observer as input. An observer is an object exposing one, two, or three functions: OnNext[value], OnCompleted[], and OnError[error]; only OnNext is required. When called with an observer, Subscribe returns a disposable: an object with one method, Dispose. Calling Dispose unsubscribes the observer and cleans up any resources held by the subscription. | |
| A cold observable starts producing values only when an observer subscribes to it, and each subscription gets its own run of values. A hot observable produces values whether or not any observer is listening; examples are the mouse position or a clock. An observer that subscribes to a hot observable "hops on the train" and starts receiving callbacks immediately. | |
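As a minimal sketch of the contract just described, the following builds a synchronous cold observable by hand. The names countTo, watcher, source, and seen are hypothetical and are not part of this gist. A timed source delivers its values from scheduled tasks instead of inside Subscribe, so treat this only as an illustration of the Subscribe / OnNext / OnCompleted / Dispose shape.

```mathematica
(* Hypothetical helper, not part of the gist: a cold observable that emits
   1..n synchronously inside Subscribe and then completes. *)
ClearAll[countTo, watcher, source, seen];
countTo[n_Integer] := Module[{ervable = Unique[]},
  ervable["Subscribe"] = Function[erver,
    Module[{disposable = Unique[]},
      Do[erver["OnNext"][i], {i, n}];
      erver["OnCompleted"][];
      (* nothing is left to cancel once Subscribe returns, so Dispose is a no-op *)
      disposable["Dispose"] = Function[Null];
      disposable]];
  ervable];

(* An observer is just a symbol with string keys. *)
seen = {};
watcher = Unique[];
watcher["OnNext"] = Function[v, AppendTo[seen, v]];
watcher["OnCompleted"] = Function[AppendTo[seen, "done"]];

source = countTo[3];
source["Subscribe"][watcher];
source["Subscribe"][watcher];  (* cold: the second subscription restarts at 1 *)
seen
(* {1, 2, 3, "done", 1, 2, 3, "done"} *)
```

Subscribing twice to the same source replays the whole sequence, which is the defining behaviour of a cold observable.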
| Represent objects as symbols with string keys (just like JavaScript). | |
| ClearAll[createObserver]; | |
| createObserver[onNext_, onCompleted_: False, onError_: False] := | |
| Module[{o = Unique[]}, | |
| o["OnNext"] = onNext; | |
| If[onCompleted =!= False, o["OnCompleted"] = onCompleted]; | |
| If[onError =!= False, o["OnError"] = onError]; | |
| o]; | |
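The optional handlers in createObserver rely on a property of this encoding: a key that was never assigned simply stays unevaluated, much as a missing property in JavaScript yields undefined. A small standalone sketch, using a hypothetical symbol point that does not appear in the gist:

```mathematica
(* "point" is an illustrative symbol, not part of the gist. *)
ClearAll[point];
point["X"] = 3;
point["Y"] = 4;

point["X"] + point["Y"]    (* 7 *)
point["Z"]                 (* no rule for this key, so it stays point["Z"] *)
Length[DownValues[point]]  (* 2: one stored rule per assigned key *)
```

Each assignment is stored as an ordinary down-value on the symbol, so an "object" built this way can be inspected with DownValues.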
| generateWithTime returns a restartable, interleavable observable: | |
| ClearAll[generateWithTime]; | |
| generateWithTime[ | |
| initialState_, (* State *) | |
| condition_, (* State \[Rule] Bool *) | |
| resultSelector_, (* State \[Rule] Result *) | |
| timeSelector_, (* State \[Rule] delay in seconds *) | |
| iterate_ (* State \[Rule] State *)] := | |
| Module[{ | |
| observable = Unique[], | |
| (* Mathematica "Catch" wraps the try block - | |
| so is more like a combo of C# catch & finally. | |
| "Catch" must return something. We'll have it return a unique symbol. | |
| The default case signals that no Throw was evaluated. *) | |
| defaultCatchResult = Unique[] | |
| }, | |
| observable["Subscribe"] = Function[observer, | |
| Module[{ | |
| (* unpack the observer *) | |
| (* ValueQ is the Mathematica counterpart of testing against undefined in JavaScript. | |
| If ValueQ[expr] is True, then expr has a value other than itself. *) | |
| hasOnNext = ValueQ[observer["OnNext"]], | |
| OnNext = observer["OnNext"], | |
| hasOnCompleted = ValueQ[observer["OnCompleted"]], | |
| OnCompleted = observer["OnCompleted"], | |
| hasOnError = ValueQ[observer["OnError"]], | |
| OnError = observer["OnError"], | |
| (* set up non-interleaving task structure *) | |
| lastTaskForCleanup, | |
| runNextTask, | |
| (* initialize stuff when observer subscribes *) | |
| state = initialState, | |
| disposed = False, | |
| disposable = Unique[] | |
| }, | |
| (* Don't catch this throw here. This is a bad call of generateWithTime *) | |
| If[Not[hasOnNext], Throw["Observer must have OnNext"]]; | |
| (* The following ClearAll is a trick that protects runNextTask from garbage collection in the time window between two invocations. | |
| See http://mathematica.stackexchange.com/questions/3807/module-variable-scoping-in-scheduled-tasks *) | |
| ClearAll[runNextTask]; | |
| (* "runNextTask" is a function that does work and then schedules another \ | |
| invocation of itself. *) | |
| runNextTask = Function[ | |
| Module[{oldState = state, proceed, catchResult}, | |
| (* | |
| Clean up the last task -- the one that scheduled the current \ | |
| invocation *) | |
| (Quiet@RemoveScheduledTask@lastTaskForCleanup; | |
| (* catch errors in creating the task and in the user-supplied timeSelector function *) | |
| catchResult = Catch[ | |
| (* | |
| create a task to do the work on the current invocation and save it \ | |
| for later cleanup by the next invocation *) | |
| lastTaskForCleanup = | |
| CreateScheduledTask[ | |
| (* catch errors in the observer's OnNext, OnCompleted, | |
| and in the user-supplied condition, resultSelector, | |
| and iterate functions *) | |
| catchResult = Catch[ | |
| (* | |
| find out if we should do this iteration and all following \ | |
| iterations -- invoke caller-supplied "condition" function. *) | |
| proceed = condition[oldState]; | |
| If[proceed && Not[disposed], | |
| (* if we should go, call the observer's OnNext with the result of the caller-supplied resultSelector[oldState] *) | |
| (OnNext[ | |
| resultSelector[oldState]]; | |
| (* update state via user-supplied lambda *) | |
| state = iterate[oldState]; | |
| (* recurse to create the next task *) | |
| runNextTask[]), | |
| (* else, if we should NOT go, clean up, | |
| call user's OnCompleted, | |
| and don't schedule next task *) | |
| (Quiet@ | |
| RemoveScheduledTask@lastTaskForCleanup; | |
| If[hasOnCompleted, OnCompleted[]])]; | |
| (* if nothing was thrown, return the following as the value of the Catch; | |
| this is the end of the INNER catch block *) | |
| defaultCatchResult]; | |
| (* check if something was thrown *) | |
| If[ | |
| catchResult =!= defaultCatchResult && hasOnError, | |
| (* if something was thrown, call OnError and clean up. | |
| If the observer's OnError throws, let that escape; so be it *) | |
| (OnError[catchResult]; | |
| Quiet@RemoveScheduledTask@lastTaskForCleanup)], | |
| (* the following is how long to wait before starting the task. | |
| If it throws, | |
| catch in our outer Catch block *) | |
| {timeSelector[ | |
| oldState]}]; | |
| (* if nothing was thrown, return the following as the value of the Catch; | |
| this is the end of the OUTER catch block *) | |
| defaultCatchResult]; | |
| (* Check for throw by timeSelector *) | |
| If[ | |
| catchResult =!= defaultCatchResult && hasOnError, | |
| (* if the caller-supplied timeSelector threw or something else went wrong, | |
| call OnError and clean up. If OnError throws, let it escape; so be it *) | |
| (OnError[catchResult]; | |
| Quiet@RemoveScheduledTask@lastTaskForCleanup), | |
| (* | |
| This version of generateWithTime does not start the first task \ | |
| immediately, | |
| but rather after the delay specified by the result of calling the \ | |
| user's time selector on the initial state value. | |
| That gives the user the opportunity to race the task and call \ | |
| dispose before the first observation is created. *) | |
| StartScheduledTask @ lastTaskForCleanup] | |
| )(* End of Module internal to runNextTask *) | |
| ](* return value of runNextTask *) | |
| ];(* end of def runNextTask *) | |
| (* start everything up by calling runNextTask the first time *) | |
| runNextTask[]; | |
| (* create a disposable that can asynchronously set the flag that's \ | |
| checked in the inner loop. *) | |
| disposable["Dispose"] = Function[disposed = True]; | |
| disposable]];(* end of def Subscribe; return disposable *) | |
| observable](* end of generateWithTime; return observable *) | |
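The sentinel technique described in the comments of generateWithTime (have Catch return a unique symbol when nothing was thrown) can be tried in isolation. The sketch below uses a hypothetical helper, tryCall, that is not part of the gist:

```mathematica
(* Hypothetical helper, not part of the gist. Returns "ok" when f[arg]
   finishes normally, or {"threw", value} when f[arg] calls Throw[value]. *)
ClearAll[tryCall];
tryCall[f_, arg_] := Module[{sentinel = Unique[], result},
  (* Catch yields the thrown value, or the sentinel if nothing was thrown *)
  result = Catch[f[arg]; sentinel];
  If[result === sentinel, "ok", {"threw", result}]];

tryCall[# + 1 &, 1]                      (* "ok" *)
tryCall[If[# === 2, Throw[#], #] &, 2]   (* {"threw", 2} *)
```

Because the sentinel is freshly generated on every call, no value thrown by a user-supplied function can be mistaken for the no-error case.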
| ClearAll[MergeColdObservables]; | |
| MergeColdObservables[ervables_List] := | |
| Module[{mergedErvable, completionCount = Length@ervables}, | |
| (* Technically, | |
| there is a race here between all the observers running to subscribe to all \ | |
| the observables. | |
| Should none of the observables start until all the observers have signed \ | |
| up? *) | |
| mergedErvable["Subscribe"] = Function[erver, | |
| MapThread[#1["Subscribe"][#2] &, { | |
| ervables, | |
| Table[createObserver[ | |
| Function[ervation, erver["OnNext"][ervation]], | |
| Function[If[(--completionCount) === 0, erver["OnCompleted"][]]], | |
| Function[err, erver["OnError"][err]]], | |
| {Length@ervables}]}]]; | |
| mergedErvable] | |
| ClearAll[MapObservable]; | |
| (* See my ACM paper on why the function needs to be the second parameter *) | |
| MapObservable[ervable_, func_] := | |
| Module[{newErvable}, | |
| newErvable["Subscribe"] = Function[erver, | |
| ervable["Subscribe"][createObserver[ | |
| Function[ervation, erver["OnNext"][func[ervation]]], | |
| Function[erver["OnCompleted"][]], | |
| Function[err, erver["OnError"][err]]]]]; | |
| newErvable] | |
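One way to exercise MapObservable and MergeColdObservables without waiting on scheduled tasks is to feed them a synchronous source. The sketch below assumes that createObserver, MapObservable, and MergeColdObservables from this file have already been evaluated; fromList, collector, and seen are hypothetical names introduced only for illustration.

```mathematica
(* Hypothetical synchronous cold source, not part of the gist:
   emits every element of values inside Subscribe, then completes. *)
ClearAll[fromList, collector, seen];
fromList[values_List] := Module[{ervable = Unique[]},
  ervable["Subscribe"] = Function[erver,
    Module[{disposable = Unique[]},
      Scan[erver["OnNext"], values];
      erver["OnCompleted"][];
      disposable["Dispose"] = Function[Null];
      disposable]];
  ervable];

(* Collect everything the observer sees into a list. *)
seen = {};
collector = createObserver[
   Function[v, AppendTo[seen, v]],
   Function[AppendTo[seen, "done"]]];

MapObservable[fromList[{1, 2, 3}], #^2 &]["Subscribe"][collector];
seen   (* {1, 4, 9, "done"} *)

seen = {};
MergeColdObservables[{fromList[{1, 2}], fromList[{10, 20}]}]["Subscribe"][collector];
seen   (* {1, 2, 10, 20, "done"} *)
```

With synchronous sources the merged output is not interleaved: each source runs to completion inside its own Subscribe call. Note also that MergeColdObservables keeps completionCount in the merged observable itself, so as written a merged observable appears to be good for one subscription only.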
| Unit Tests | |
| The following should display traces of two interleaved observers. | |
| (*ClearAll[observer1,observer2,observable,disposable,task]; | |
| observable= | |
| generateWithTime[ | |
| (* Initial State *)0, | |
| (* Continue Condition *)#<3&, | |
| (* Result Selector *)#&, | |
| (* Time Selector *)0.10&, | |
| (* State Updater *)#+1&]; | |
| observer1=createObserver[ | |
| (* OnNext *)Print["Rx 1 ONNEXT: "<>ToString@#]&, | |
| (* OnCompleted *)Print["Rx 1 DONE!"]&, | |
| (* OnError *)Print["Rx 1 ERROR: "<>ToString@#]&]; | |
| observer2=createObserver[ | |
| (* OnNext *)Print["Rx 2 ONNEXT: "<>ToString@#]&, | |
| (* OnCompleted *)Print["Rx 2 DONE!"]&, | |
| (* OnError *)Print["Rx 2 ERROR: "<>ToString@#]&];*) | |
| (*{observable["Subscribe"][observer1],observable["Subscribe"][observer2]}*) | |
| (*observable["Subscribe"][observer2]*) | |
| (*ClearAll[observer,observable,disposable,task]; | |
| observable= | |
| generateWithTime[ | |
| (* Initial State *)0, | |
| (* Continue Condition *)#<3&, | |
| (* Result Selector *)If[#===2,Throw[#],#]&, | |
| (* Time Selector *)0.10&, | |
| (* State Updater *)#+1&]; | |
| observer=createObserver[ | |
| (* OnNext *)Print["Rx ONNEXT: "<>ToString@#]&, | |
| (* OnCompleted *)Print["Rx DONE!"]&, | |
| (* OnError *)Print["Rx ERROR: "<>ToString@#]&]; | |
| (* expect two ONNEXT messages and then an ERROR *) | |
| disposable=observable["Subscribe"][observer]; | |
| (* unless you run the following task; then expect two ONNEXT messages and \ | |
| then a DONE *) | |
| (*task=CreateScheduledTask[ | |
| (disposable["Dispose"][]; | |
| RemoveScheduledTask[task]), | |
| {0.25}]; | |
| StartScheduledTask[task];*)*) | |
| The following should always print a zero. Otherwise, generateWithTime or the unit test is leaking tasks. | |
| resetTasks[] := (Print[ScheduledTasks[] // Length]; | |
| RemoveScheduledTask[ScheduledTasks[]]) | |
| resetTasks[]; | |
| 0 |