Skip to content

Instantly share code, notes, and snippets.

@mattpodwysocki
Forked from jhusain/gist:c8b7703e60bdd2a8fd5f
Last active May 6, 2024 03:19

Revisions

  1. mattpodwysocki renamed this gist Jul 9, 2014. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  2. mattpodwysocki revised this gist Jul 9, 2014. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -40,9 +40,9 @@
    },
    observer.onError.bind(observer),
    function () {
    hasCurrent = false;
    // Don't want to prematurely stop the outer if the inner is still going
    isStopped = true;
    if (g.length === 1) {
    if (!hasCurrent && g.length === 1) {
    observer.onCompleted();
    }
    }));
  3. @jhusain jhusain revised this gist Jul 9, 2014. 1 changed file with 22 additions and 18 deletions.
    40 changes: 22 additions & 18 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -7,41 +7,45 @@
    var sources = this;
    return new AnonymousObservable(function (observer) {
    var hasCurrent = false,
    isStopped = true,
    isStopped = false,
    m = new SingleAssignmentDisposable(),
    g = new CompositeDisposable();

    if (!hasCurrent) {
    hasCurrent = true;
    g.add(m);

    m.setDisposable(sources.subscribe(
    function (innerSource) {
    g.add(m);

    m.setDisposable(sources.subscribe(
    function (innerSource) {
    if (!hasCurrent) {
    hasCurrent = true;

    isPromise(innerSource) && (innerSource = observableFromPromise(innerSource));

    var innerSubscription = new SingleAssignmentDisposable();
    g.add(innerSubscription);

    innerSubscription.setDisposable(innerSource.subscribe(
    observer.onNext.bind(observer),
    observer.onError.bind(onError),
    function(e) {
    hasCurrent = false;
    observer.onError(e);
    },
    function () {
    g.remove(innerSubscription);
    if (isStopped && !hasCurrent && g.length === 1) {
    hasCurrent = false;
    if (isStopped && g.length === 1) {
    observer.onCompleted();
    }
    ));
    },
    observer.onError.bind(observer),
    function () {
    hasCurrent = false;
    isStopped = true;
    if (g.length === 1) {
    observer.onCompleted();
    }
    }));
    }
    },
    observer.onError.bind(observer),
    function () {
    hasCurrent = false;
    isStopped = true;
    if (g.length === 1) {
    observer.onCompleted();
    }
    }));

    return g;
    });
  4. mattpodwysocki created this gist Jul 8, 2014.
    48 changes: 48 additions & 0 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,48 @@
    /*
    * Performs a exclusive waiting for the first to finish before subscribing to another observable.
    * Observables that come in between subscriptions will be dropped on the floor.
    * @returns {Observable} A exclusive observable with only the results that happen when subscribed.
    */
    observableProto.exclusive = function () {
    var sources = this;
    return new AnonymousObservable(function (observer) {
    var hasCurrent = false,
    isStopped = true,
    m = new SingleAssignmentDisposable(),
    g = new CompositeDisposable();

    if (!hasCurrent) {
    hasCurrent = true;
    g.add(m);

    m.setDisposable(sources.subscribe(
    function (innerSource) {

    isPromise(innerSource) && (innerSource = observableFromPromise(innerSource));

    var innerSubscription = new SingleAssignmentDisposable();
    g.add(innerSubscription);

    innerSubscription.setDisposable(innerSource.subscribe(
    observer.onNext.bind(observer),
    observer.onError.bind(onError),
    function () {
    g.remove(innerSubscription);
    if (isStopped && !hasCurrent && g.length === 1) {
    observer.onCompleted();
    }
    ));
    },
    observer.onError.bind(observer),
    function () {
    hasCurrent = false;
    isStopped = true;
    if (g.length === 1) {
    observer.onCompleted();
    }
    }));
    }

    return g;
    });
    };