Skip to content

Instantly share code, notes, and snippets.

View cartant's full-sized avatar

Nicholas Jamieson cartant

View GitHub Profile
const findAddresses = actions$ => actions$.pipe(
ofType(actions.FIND_ADDRESSES),
map(action => action.partialAddress),
debounceTime(400),
distinctUntilChanged(),
switchMap(partialAddress => this.backend
.findAddresses(partialAddress)
.pipe(
map(results => actions.findAddressesFulfilled(results)),
catchError(error => of(actions.findAddressesRejected(error)))
@Effect()
public findAddresses = this.actions.pipe(
ofType(LocationActionTypes.FindAddresses),
map(action => action.partialAddress),
debounceTime(400),
distinctUntilChanged(),
switchMap(partialAddress => this.backend
.findAddresses(partialAddress)
.pipe(
map(results => new FindAddressesFulfilled(results)),
const removeFromCart = actions$ => actions$.pipe(
ofType(actions.REMOVE_FROM_CART),
switchMap(action => backend
.removeFromCart(action.payload)
.pipe(
map(response => actions.removeFromCartFulfilled(response)),
catchError(error => of(actions.removeFromCartRejected(error)))
)
)
);
@Effect()
public removeFromCart = this.actions.pipe(
ofType(CartActionTypes.RemoveFromCart),
switchMap(action => this.backend
.removeFromCart(action.payload)
.pipe(
map(response => new RemoveFromCartFulfilled(response)),
catchError(error => of(new RemoveFromCartRejected(error)))
)
)
import { empty } from "rxjs/observable/empty";
import { concatMap, expand } from "rxjs/operators";
import { get } from "./get";
const url = "https://api.github.com/users/sindresorhus/repos";
const repos = get(url).pipe(
expand(({ next }) => next ? get(next) : empty()),
concatMap(({ content }) => content)
);
repos.subscribe(repo => console.log(repo));
import { Observable } from "rxjs/Observable";
import { ajax } from "rxjs/observable/dom/ajax";
import { AjaxResponse } from "rxjs/observable/dom/AjaxObservable";
import { map } from "rxjs/operators";
export function get(url: string): Observable<{
content: object[],
next: string | null
}> {
return ajax.get(url).pipe(
const published = publish<T>()(source) as ConnectableObservable<T>;
const prioritized = new Subject<T>();
const subscription = published.subscribe(prioritized);
subscription.add(selector(prioritized, published).subscribe(observer));
subscription.add(published.connect());
return subscription;
interface AnonymousSubscription {
unsubscribe(): void;
}
type TeardownLogic = AnonymousSubscription | Function | void;
interface ISubscription extends AnonymousSubscription {
unsubscribe(): void;
readonly closed: boolean;
}
function subsequent<T>(
count: number,
operator: (source: Observable<T>) => Observable<T>
): (source: Observable<T>) => Observable<T> {
return (source: Observable<T>) => new Observable<T>(observer => {
const published = source.pipe(publish()) as ConnectableObservable<T>;
const concatenated = concat(
published.pipe(take(count)),
published.pipe(operator)
);
function prioritize<T, R>(
selector: (
prioritized: Observable<T>,
deprioritized: Observable<T>
) => Observable<R>
): (source: Observable<T>) => Observable<R> {
return (source: Observable<T>) => new Observable<T>(observer => {
const published = publish<T>()(source) as ConnectableObservable<T>;
const prioritized = new Subject<T>();
const subscription = new Subscription();