Skip to content

Instantly share code, notes, and snippets.

@anaisbetts
Created March 30, 2011 01:58
Show Gist options
  • Select an option

  • Save anaisbetts/893726 to your computer and use it in GitHub Desktop.

Select an option

Save anaisbetts/893726 to your computer and use it in GitHub Desktop.
// This works in C# (puts out 1..10, one each second), but the RxJS one
// waits 1 second then dumps out the entire Range:
Observable.Range(1,10)
.Select(x => Observable.Return(x).Delay(TimeSpan.FromMilliseconds(1000)))
.Concat()
// Here's a version I wrote that seems to work in CoffeeScript:
Rx.Observable.WorkingConcat = (x) ->
Rx.Observable.CreateWithDisposable (subj) ->
isCompleted = false
ret = new Rx.CompositeDisposable()
md = new Rx.MutableDisposable()
md.Replace(Rx.Disposable.Empty)
ret.Add(md)
queue = []
#console.log("Starting")
dequeue = () ->
#console.log("Dequeue start")
return if queue.length == 0 or md.Get() != Rx.Disposable.Empty
#console.log("succeeded")
item = queue.shift()
itemDisp = item.Subscribe subj.OnNext, subj.OnError, () ->
#console.log("Completing")
md.Replace(Rx.Disposable.Empty)
if isCompleted and queue.length == 0
subj.OnCompleted()
return
Rx.Scheduler.CurrentThread.Schedule(dequeue)
md.Replace(itemDisp)
onNext = (y) ->
#console.log("Pushing y")
queue.push(y)
return if md.Get() != Rx.Disposable.Empty
Rx.Scheduler.CurrentThread.Schedule(dequeue)
onErr = () -> {}
disp = x.Subscribe(onNext, onErr, () -> isCompleted = true)
ret.Add(disp)
ret
// Here's the JS version:
Rx.Observable.WorkingConcat = function(x) {
return Rx.Observable.CreateWithDisposable(function(subj) {
var dequeue, disp, isCompleted, md, onErr, onNext, queue, ret;
isCompleted = false;
ret = new Rx.CompositeDisposable();
md = new Rx.MutableDisposable();
md.Replace(Rx.Disposable.Empty);
ret.Add(md);
queue = [];
dequeue = function() {
var item, itemDisp;
if (queue.length === 0 || md.Get() !== Rx.Disposable.Empty) {
return;
}
item = queue.shift();
itemDisp = item.Subscribe(subj.OnNext, subj.OnError, function() {
md.Replace(Rx.Disposable.Empty);
if (isCompleted && queue.length === 0) {
subj.OnCompleted();
return;
}
return Rx.Scheduler.CurrentThread.Schedule(dequeue);
});
return md.Replace(itemDisp);
};
onNext = function(y) {
queue.push(y);
if (md.Get() !== Rx.Disposable.Empty) {
return;
}
return Rx.Scheduler.CurrentThread.Schedule(dequeue);
};
onErr = function() {
return {};
};
disp = x.Subscribe(onNext, onErr, function() {
return isCompleted = true;
});
ret.Add(disp);
return ret;
});
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment