Created
March 30, 2011 01:58
-
-
Save anaisbetts/893726 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // C# reference (works): wraps each value of 1..10 in a single-element sequence delayed by 1000 ms and concatenates them, so one value is emitted per second. | |
| // The equivalent RxJS code instead waits 1 second and then emits the entire range at once: | |
| Observable.Range(1,10) | |
| .Select(x => Observable.Return(x).Delay(TimeSpan.FromMilliseconds(1000))) | |
| .Concat() | |
| // Here's a version I wrote that seems to work in CoffeeScript: | |
# Concatenates an observable of observables.
#
# Inner observables emitted by `x` are queued in arrival order and subscribed
# to one at a time: the next one is subscribed only after the current one has
# completed. Values and errors of the inner observables are forwarded to the
# subscriber; an error from `x` itself is forwarded as well.
#
# x       - outer observable whose values are themselves observables
# returns - an observable created with Rx.Observable.CreateWithDisposable;
#           the disposable it hands back groups the outer subscription and
#           the currently active inner subscription
Rx.Observable.WorkingConcat = (x) ->
  Rx.Observable.CreateWithDisposable (subj) ->
    isCompleted = false   # the outer sequence has completed
    isActive = false      # an inner sequence is currently subscribed
    queue = []            # inner sequences waiting for their turn

    ret = new Rx.CompositeDisposable()
    md = new Rx.MutableDisposable()
    md.Replace(Rx.Disposable.Empty)
    ret.Add(md)

    # Forward an error and drop everything still queued so that no further
    # inner sequence is started after the subscriber has seen the error.
    fail = (err) ->
      queue.length = 0
      subj.OnError(err)
      return

    dequeue = ->
      return if isActive or queue.length == 0
      isActive = true
      # Set when the inner sequence completes, possibly before Subscribe
      # has even returned (synchronous inner sequence).
      finished = false
      item = queue.shift()

      # Wrapped instead of passing subj.OnNext directly so the call keeps
      # `subj` as its receiver.
      onItemNext = (value) ->
        subj.OnNext(value)
        return

      onItemCompleted = ->
        finished = true
        isActive = false
        md.Replace(Rx.Disposable.Empty)
        if isCompleted and queue.length == 0
          subj.OnCompleted()
          return
        Rx.Scheduler.CurrentThread.Schedule(dequeue)
        return

      itemDisp = item.Subscribe(onItemNext, fail, onItemCompleted)
      # If the inner sequence already completed, `md` may by now hold the
      # subscription of a later item; storing the finished one would clobber it.
      md.Replace(itemDisp) unless finished
      return

    onNext = (y) ->
      queue.push(y)
      Rx.Scheduler.CurrentThread.Schedule(dequeue) unless isActive
      return

    onCompleted = ->
      isCompleted = true
      # Nothing running and nothing queued: no inner completion will come
      # along later to signal the end, so signal it here.
      subj.OnCompleted() if not isActive and queue.length == 0
      return

    disp = x.Subscribe(onNext, fail, onCompleted)
    ret.Add(disp)
    ret
| // Here's the JS version: | |
/**
 * Concatenates an observable of observables.
 *
 * Inner observables emitted by `x` are queued in arrival order and subscribed
 * to one at a time: the next one is subscribed only after the current one has
 * completed. Values and errors of the inner observables are forwarded to the
 * subscriber; an error from `x` itself is forwarded as well.
 *
 * @param {Object} x - Outer observable whose values are themselves observables.
 * @returns {Object} Observable created with Rx.Observable.CreateWithDisposable;
 *   the disposable it hands back groups the outer subscription and the
 *   currently active inner subscription.
 */
Rx.Observable.WorkingConcat = function(x) {
  return Rx.Observable.CreateWithDisposable(function(subj) {
    var isCompleted = false; // the outer sequence has completed
    var isActive = false;    // an inner sequence is currently subscribed
    var queue = [];          // inner sequences waiting for their turn

    var ret = new Rx.CompositeDisposable();
    var md = new Rx.MutableDisposable();
    md.Replace(Rx.Disposable.Empty);
    ret.Add(md);

    // Forward an error and drop everything still queued so that no further
    // inner sequence is started after the subscriber has seen the error.
    var fail = function(err) {
      queue.length = 0;
      subj.OnError(err);
    };

    var dequeue = function() {
      if (isActive || queue.length === 0) {
        return;
      }
      isActive = true;
      // Set when the inner sequence completes, possibly before Subscribe
      // has even returned (synchronous inner sequence).
      var finished = false;
      var item = queue.shift();

      var itemDisp = item.Subscribe(
        // Wrapped instead of passing subj.OnNext directly so the call keeps
        // `subj` as its receiver.
        function(value) {
          subj.OnNext(value);
        },
        fail,
        function() {
          finished = true;
          isActive = false;
          md.Replace(Rx.Disposable.Empty);
          if (isCompleted && queue.length === 0) {
            subj.OnCompleted();
            return;
          }
          Rx.Scheduler.CurrentThread.Schedule(dequeue);
        }
      );

      // If the inner sequence already completed, `md` may by now hold the
      // subscription of a later item; storing the finished one would clobber it.
      if (!finished) {
        md.Replace(itemDisp);
      }
    };

    var onNext = function(y) {
      queue.push(y);
      if (!isActive) {
        Rx.Scheduler.CurrentThread.Schedule(dequeue);
      }
    };

    var onCompleted = function() {
      isCompleted = true;
      // Nothing running and nothing queued: no inner completion will come
      // along later to signal the end, so signal it here.
      if (!isActive && queue.length === 0) {
        subj.OnCompleted();
      }
    };

    var disp = x.Subscribe(onNext, fail, onCompleted);
    ret.Add(disp);
    return ret;
  });
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment