Skip to content

Instantly share code, notes, and snippets.

@alxhub
Last active March 19, 2016 00:18
Show Gist options
  • Save alxhub/b3c6eb3ec15926da6b75 to your computer and use it in GitHub Desktop.
Save alxhub/b3c6eb3ec15926da6b75 to your computer and use it in GitHub Desktop.
import {Injectable} from 'angular2/core';
import {Store, Action} from '@ngrx/store';
import {Query} from './query';
interface QueryEvent {
type: string;
query: Query;
}
function trackQueryEvents(queries: Observable<Query[]>): Observable<QueryEvent> {
var oldQueries = [];
return queries.flatMap((queries: Query[]) => {
var res = Observable.fromArray([]
.concat(queries
.filter(query =>
oldQueries.find(oldQuery => oldQuery.match(query)) === undefined)
.map(query => {type: 'added', query}))
.concat(oldQueries
.filter(oldQuery => queries.find(query => query.match(oldQuery)) === undefined)
.map(oldQuery => {type: 'removed', query})));
oldQueries = queries;
return res;
});
}
export interface BackendAdapter {
query(query: Query): Observable<any[]>;
}
function makeToAction(resource: string): Function {
return (source: Observable<any[]>): Observable<Action> => source
.flatMap(resp => Observable.fromArray(resp))
.map(entity => {
type: RESOURCE_ADD,
payload: {
resource: query.resource,
entity
}
});
}
export class BackendManager {
queryEvents: Observable<QueryEvent>
constructor(store: Store<any>, adapter: BackendAdapter) {
queryEvents = store.select('queries').let(trackQueryEvents);
this
.queries
// Map queries to actions that update entities in the store.
.flatMap(query => {
// Filter the stream of cancelled queries to detect when this query
// is cancelled.
var queryCancelled = this
.cancelledQueries
.filter(cancelled => cancelled.match(query))
.take(1);
// Stream of responses with reference counting, so one query is made
// despite multiple subscriptions below.
var responses = adapter
.query(query)
.takeUntil(queryCancelled)
.publish()
.refCount();
// Make an operator function that turns responses into actions.
var responsesToActions = makeToAction(query.resource);
// Join together first response entities, query fulfillment action, and subsequent responses.
return Observable
.concat(
// This is the first response (which could span multiple RESOURCE_ADD actions).
responses
.first()
.let(responsesToActions),
// After the first responses is received, dispatch a QUERY_FULFILL action to inform consumers
// that authoritative data is available.
responses
.first()
.mapTo({
type: QUERY_FULFILL,
payload: query
}),
// The backend may continue to send updated entities, so dispatch those as they come in.
responses
.skip(1)
.let(responsesToActions);
);
})
// Reflect all actions to the store.
.subscribe(action => store.dispatch(action));
}
get queries(): Observable<Query> {
return this
.queryEvents
.filter(event => event.type == 'added')
.map(event => event.query);
}
get cancelledQueries(): Observable<Query> {
return this
.queryEvents
.filter(event => event.type == 'removed')
.map(event => event.query);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment