Skip to content

Instantly share code, notes, and snippets.

@mattpodwysocki
Forked from jhusain/gist:c8b7703e60bdd2a8fd5f
Last active May 6, 2024 03:19
Show Gist options
  • Save mattpodwysocki/8fb0182a587b759ef478 to your computer and use it in GitHub Desktop.
Save mattpodwysocki/8fb0182a587b759ef478 to your computer and use it in GitHub Desktop.
/*
* Performs a exclusive waiting for the first to finish before subscribing to another observable.
* Observables that come in between subscriptions will be dropped on the floor.
* @returns {Observable} A exclusive observable with only the results that happen when subscribed.
*/
observableProto.exclusive = function () {
var sources = this;
return new AnonymousObservable(function (observer) {
var hasCurrent = false,
isStopped = false,
m = new SingleAssignmentDisposable(),
g = new CompositeDisposable();
g.add(m);
m.setDisposable(sources.subscribe(
function (innerSource) {
if (!hasCurrent) {
hasCurrent = true;
isPromise(innerSource) && (innerSource = observableFromPromise(innerSource));
var innerSubscription = new SingleAssignmentDisposable();
g.add(innerSubscription);
innerSubscription.setDisposable(innerSource.subscribe(
observer.onNext.bind(observer),
function(e) {
hasCurrent = false;
observer.onError(e);
},
function () {
g.remove(innerSubscription);
hasCurrent = false;
if (isStopped && g.length === 1) {
observer.onCompleted();
}
));
}
},
observer.onError.bind(observer),
function () {
// Don't want to prematurely stop the outer if the inner is still going
isStopped = true;
if (!hasCurrent && g.length === 1) {
observer.onCompleted();
}
}));
return g;
});
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment