Last active
November 3, 2015 16:53
-
-
Save endash/776938fad064a7fd88de to your computer and use it in GitHub Desktop.
Angular2 Alpha39 rx async pipe: based on using rxjs 4.0. NOTE: removing the `observeOn` will work if an observable resolves asynchronously, but will break for any observer that resolves synchronously.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Creates a pipe suitable for a RxJS observable: | |
* | |
* @View({ | |
* template: '{{ someObservable | rx}}' | |
* pipes: [RxPipe] | |
* }) | |
* | |
* Originally written by @gdi2290 but updated for 2.0.0.alpha-35 and use AsyncPipe | |
* (Soon the Angular team will be using RxJS natively and this pipe will be | |
* unnecessary because we'll be able to use the `async` pipe.) | |
* | |
* References: | |
* * rxPipeRegistry.ts https://gist.github.com/gdi2290/e9b2880a1d13057197d7 by @gdi2290 | |
* * AsyncPipe https://github.com/angular/angular/blob/master/modules/angular2/src/pipes/async_pipe.ts | |
* | |
* @class | |
*/ | |
import {Pipe, bind, ChangeDetectorRef, Inject, Injectable, NgZone, AsyncPipe} from "angular2/angular2"; | |
import {Observable, Scheduler} from "rx"; | |
function isObservable(obs: any): boolean { | |
return obs && typeof obs.subscribe === "function"; | |
} | |
class RxStrategy { | |
constructor(public _zone: NgZone) {} | |
createSubscription(async: any, updateLatestValue: any): any { | |
return async.observeOn(Scheduler.async).subscribe((val) => { this._zone.run(() => { updateLatestValue(val); }); }, (e: Error) => { throw e; }); | |
} | |
dispose(subscription: any): void { subscription.dispose(); } | |
onDestroy(subscription: any): void { subscription.dispose(); } | |
} | |
@Pipe({name: "rx", pure: false}) | |
export class RxPipe extends AsyncPipe { | |
constructor(public _ref: ChangeDetectorRef, public _zone: NgZone) { super(_ref); } | |
supports(obs: any): boolean { return isObservable(obs); } | |
_selectStrategy(obj: Observable<any>): any { | |
return new RxStrategy(this._zone); | |
} | |
} | |
@Pipe({ | |
name: 'get' | |
}) | |
export class GetPipe { | |
transform(value, args) { if (value) { return value[args[0]]; } } | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
On alpha 45 I was getting the following on line 29:
Property 'async' does not exist on type '{ nextTick: NextTickScheduler; immediate: ImmediateScheduler; }'.
I removed the .observeOn which fixed the TypeScript errors and the pipe was working correctly in the browser, but I was getting the following console error:
LifeCycle.tick is called recursively
Removing the
this._zone.run
removed the console error.Not sure what my change implicates but changing line 29 to
return async.subscribe((val) => { updateLatestValue(val); }, (e: Error) => { throw e; });
seems to work.