-
-
Save jashmenn/d8f5cbf5fc20640bac30 to your computer and use it in GitHub Desktop.
/// <reference path="../../typings/app.d.ts" /> | |
// | |
// Creates a pipe suitable for a RxJS observable: | |
// | |
// @View({ | |
// template: '{{ someObservable | rx}}' | |
// pipes: [RxPipe] | |
// }) | |
// | |
// Originally written by @gdi2290 but updated for 2.0.0.alpha-35 and use AsyncPipe | |
// (Soon the Angular team will be using RxJS natively and this pipe will be | |
// unnecessary because we'll be able to use the `async` pipe.) | |
// | |
// References: | |
// * rxPipeRegistry.ts https://gist.github.com/gdi2290/e9b2880a1d13057197d7 by @gdi2290 | |
// * AsyncPipe https://github.com/angular/angular/blob/master/modules/angular2/src/pipes/async_pipe.ts | |
import {PipeFactory, Pipe, Injectable, bind, ChangeDetectorRef} from "angular2/angular2"; | |
import {AsyncPipe} from "angular2/pipes"; | |
import * as Rx from 'rx'; | |
import {Observable} from 'rx'; | |
function isObservable(obs) { | |
return obs && typeof obs.subscribe === 'function'; | |
} | |
class RxStrategy { | |
createSubscription(async: any, updateLatestValue: any): any { | |
return async.subscribe(updateLatestValue, e => { throw e; }); | |
} | |
dispose(subscription: any): void { subscription.dispose(); } | |
onDestroy(subscription: any): void { subscription.dispose(); } | |
} | |
var _rxStrategy = new RxStrategy(); | |
@Pipe({name: 'rx'}) | |
export class RxPipe extends AsyncPipe { | |
constructor(public _ref: ChangeDetectorRef) { super(_ref); } | |
supports(obs) { return isObservable(obs); } | |
_selectStrategy(obj: Observable<any>): any { | |
return _rxStrategy; | |
} | |
} | |
export var rxPipeInjectables: Array<any> = [ | |
bind(RxPipe).toValue(RxPipe) | |
]; | |
To get this to work in alpha37, change line 39 to
constructor(@Inject(ChangeDetectorRef) public _ref: ChangeDetectorRef) { super(_ref); }
(After importing Inject). The manual bind().toValue() no longer seem necessary.
_Edit_ The above will stop the errors but it doesn't actually work (necessarily... it could still work as a side-effect). The change detection doesn't automatically kick-off again, after the Observable fires. What I've done, which could be cleaned up a bit:
-
Import NgZone, and Rx.Scheduler
-
Inject NgZone into the pipe. Pass it to the strategy, either in the constructor or by setting a property
-
Change line 29 to
return async.observeOn(Scheduler.timeout).subscribe((val) => { this._zone.run(() => { updateLatestValue(val); }); }, (e: Error) => { throw e; });
_Note_ The combined use of observeOn
/NgZone
is to regularize behaviour regardless of whether an observable is asynchronous or synchronous. Otherwise, coding in expectation of a synchronous observable and in fact receiving an asynchronous observable, or vice-versa, can result in anomalous behaviour.
Here's a working pipe for .38, (using rx 4.0): https://gist.github.com/endash/776938fad064a7fd88de
nice one jashmenn, most helpful!