Created August 21, 2015 00:24
RxJS Support for Angular 2 Async Pipes - (angular-2.0.0.alpha-35 and above)
/// <reference path="../../typings/app.d.ts" />
//
// Creates a pipe suitable for a RxJS observable:
//
// @View({
//   template: '{{ someObservable | rx}}',
//   pipes: [RxPipe]
// })
//
// Originally written by @gdi2290 but updated for 2.0.0.alpha-35 and to use AsyncPipe
// (Soon the Angular team will be using RxJS natively and this pipe will be
// unnecessary because we'll be able to use the `async` pipe.)
//
// References:
// * rxPipeRegistry.ts https://gist.github.com/gdi2290/e9b2880a1d13057197d7 by @gdi2290
// * AsyncPipe https://github.com/angular/angular/blob/master/modules/angular2/src/pipes/async_pipe.ts

import {PipeFactory, Pipe, Injectable, bind, ChangeDetectorRef} from "angular2/angular2";
import {AsyncPipe} from "angular2/pipes";
import * as Rx from 'rx';
import {Observable} from 'rx';

function isObservable(obs) {
  return obs && typeof obs.subscribe === 'function';
}

class RxStrategy {
  createSubscription(async: any, updateLatestValue: any): any {
    return async.subscribe(updateLatestValue, e => { throw e; });
  }
  dispose(subscription: any): void { subscription.dispose(); }
  onDestroy(subscription: any): void { subscription.dispose(); }
}

var _rxStrategy = new RxStrategy();

@Pipe({name: 'rx'})
export class RxPipe extends AsyncPipe {
  constructor(public _ref: ChangeDetectorRef) { super(_ref); }
  supports(obs) { return isObservable(obs); }
  _selectStrategy(obj: Observable<any>): any {
    return _rxStrategy;
  }
}

export var rxPipeInjectables: Array<any> = [
  bind(RxPipe).toValue(RxPipe)
];
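
The strategy object in the file above only has to create a subscription and later dispose of it. As a rough, standalone illustration of that lifecycle (no Angular or RxJS involved), the sketch below uses a hypothetical `TinySubject` in place of an Rx observable and a hypothetical `SketchStrategy` that mirrors the shape of `RxStrategy` without the error callback. None of these names come from the gist or from either library.

```typescript
// Standalone sketch: TinySubject is a hypothetical stand-in for an Rx
// observable whose subscribe() returns an object with a dispose() method.
interface SubscriptionLike { dispose(): void; }

class TinySubject<T> {
  private observers: Array<(value: T) => void> = [];

  subscribe(onNext: (value: T) => void): SubscriptionLike {
    this.observers.push(onNext);
    return {
      dispose: () => {
        this.observers = this.observers.filter(o => o !== onNext);
      }
    };
  }

  next(value: T): void {
    // Iterate over a copy so disposing during emission is safe.
    this.observers.slice().forEach(o => o(value));
  }
}

// Same shape as RxStrategy above (hypothetical name, no error handler).
class SketchStrategy {
  createSubscription(source: TinySubject<any>,
                     updateLatestValue: (value: any) => void): SubscriptionLike {
    return source.subscribe(updateLatestValue);
  }
  dispose(subscription: SubscriptionLike): void { subscription.dispose(); }
}

const demoStrategy = new SketchStrategy();
const demoSource = new TinySubject<number>();
const seenValues: number[] = [];

const demoSubscription = demoStrategy.createSubscription(
  demoSource, v => { seenValues.push(v); });
demoSource.next(1);
demoSource.next(2);
demoStrategy.dispose(demoSubscription);
demoSource.next(3); // not recorded: the subscription was disposed
```

After this runs, `seenValues` holds only the values emitted while the subscription was live, which is the behaviour the pipe relies on when it is torn down.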
To get this to work in alpha37, change line 39 to
(after importing `Inject`). The manual `bind().toValue()` no longer seems necessary.
_Edit_ The above stops the errors, but it doesn't necessarily work (it may still appear to work as a side effect). Change detection doesn't automatically kick off again after the Observable fires. What I've done, which could be cleaned up a bit:
- Import `NgZone` and `Rx.Scheduler`
- Inject `NgZone` into the pipe. Pass it to the strategy, either in the constructor or by setting a property
- Change line 29 to
_Note_ The combined use of `observeOn`/`NgZone` is to regularize behaviour regardless of whether an observable is asynchronous or synchronous. Otherwise, coding in expectation of a synchronous observable and in fact receiving an asynchronous observable, or vice-versa, can result in anomalous behaviour.

Here's a working pipe for .38 (using rx 4.0): https://gist.github.com/endash/776938fad064a7fd88de
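
The idea in the note about combining a scheduler with `NgZone` can be illustrated without Angular or RxJS. The sketch below is an assumption about the general shape of that approach, not the commenter's actual code: `ManualQueue` is a hypothetical stand-in for an Rx scheduler (the role `observeOn` would play), `FakeZone` is a hypothetical stand-in for `NgZone`, and `regularize` is an illustrative helper name.

```typescript
// Hypothetical stand-in for a scheduler: tasks run only when flush() is called.
class ManualQueue {
  private tasks: Array<() => void> = [];
  schedule(task: () => void): void { this.tasks.push(task); }
  flush(): void {
    while (this.tasks.length > 0) {
      const task = this.tasks.shift()!;
      task();
    }
  }
}

// Hypothetical stand-in for NgZone: counts how often work re-enters the zone.
class FakeZone {
  runCount = 0;
  run<T>(fn: () => T): T {
    this.runCount += 1;
    return fn();
  }
}

// Wrap an update callback so that every value is (1) deferred through the
// queue, whether the source is synchronous or not, and (2) delivered inside
// zone.run, so a zone-aware framework could notice the change.
function regularize<T>(queue: ManualQueue, zone: FakeZone,
                       update: (value: T) => void): (value: T) => void {
  return (value: T) => queue.schedule(() => zone.run(() => update(value)));
}

const taskQueue = new ManualQueue();
const fakeZone = new FakeZone();
const receivedValues: string[] = [];
const zonedNext = regularize<string>(
  taskQueue, fakeZone, v => { receivedValues.push(v); });

zonedNext("emitted synchronously");   // a synchronous source calls this right away
const deliveredBeforeFlush = receivedValues.length; // nothing delivered yet
taskQueue.flush();                    // the "scheduler" turn: update runs in the zone
```

Because every value goes through the queue first, calling code sees the same ordering whether the source emits during `subscribe` or later, which is the regularity the note describes.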