Skip to content

Instantly share code, notes, and snippets.

@mkulke
Last active October 13, 2017 22:14
Show Gist options
  • Save mkulke/58441f5614b464ee1e8541ab8128a074 to your computer and use it in GitHub Desktop.
Save mkulke/58441f5614b464ee1e8541ab8128a074 to your computer and use it in GitHub Desktop.
rx_tx_handling.ts
import Rx = require('rxjs/Rx');
/** A record carrying a numeric id. */
interface Obj {
// Numeric identifier of the record.
id: number;
}
/** A batch of records; the observables in this file emit one Chunk per notification. */
type Chunk = Obj[]
/**
 * Sample input: four chunks emitted in order. Ids 1 and 2 recur in later
 * chunks, and the last chunk holds three records.
 */
const chunks$: Rx.Observable<Chunk> = Rx.Observable.of<Chunk>(
  [{ id: 1 }, { id: 2 }],
  [{ id: 3 }, { id: 2 }],
  [{ id: 1 }],
  [{ id: 5 }, { id: 7 }, { id: 4 }],
);
/** Accumulator state: an optional chunk together with a list of ids. */
interface Acc {
// Optional, so an accumulator value can exist before any chunk has been produced.
chunk?: Chunk;
// Ids gathered so far.
ids: number[];
}
/** Initial accumulator value: an empty id list and no chunk. */
const seed: Acc = { ids: [] };
/**
 * Removes every record whose id has already been emitted, keeping the first
 * occurrence only.
 *
 * Duplicates are dropped both across chunks and within a single chunk. One
 * output chunk is emitted per input chunk; it may be empty when all of its
 * records were already seen.
 *
 * @param chunks$ Source stream of chunks.
 * @returns A stream of chunks containing only records with previously unseen ids.
 */
function omitDuplicates(chunks$: Rx.Observable<Chunk>): Rx.Observable<Chunk> {
  return chunks$
    .scan((acc: Acc, chunk: Chunk): Acc => {
      // Set lookup replaces the linear indexOf scan over all known ids.
      const seenIds = new Set<number>(acc.ids);
      const freshIds: number[] = [];
      const filteredChunk = chunk.filter(obj => {
        if (seenIds.has(obj.id)) {
          return false;
        }
        // Register the id immediately so a repeat later in the same chunk is dropped too.
        seenIds.add(obj.id);
        freshIds.push(obj.id);
        return true;
      });
      // Build a new accumulator instead of mutating: `seed` is shared by every subscription.
      return {
        chunk: filteredChunk,
        ids: acc.ids.concat(freshIds),
      };
    }, seed)
    // `chunk` is always set once scan has run; the fallback only satisfies strict null checks.
    .map(acc => acc.chunk || []);
}
/** Promise-based client with transaction control and chunk insertion. */
interface Client {
// Begins a transaction.
startTx: () => Promise<void>;
// Ends the transaction.
endTx: () => Promise<void>;
// Aborts the transaction.
abortTx: () => Promise<void>;
// Inserts one chunk of records.
insert: (chunk: Chunk) => Promise<void>;
}
/**
 * Creates a stand-in client whose operations only log what they were asked to
 * do and then resolve immediately.
 *
 * @returns A promise that resolves to the logging client.
 */
function connect(): Promise<Client> {
  // Logs a fixed message and reports success.
  const announce = (message: string): Promise<void> => {
    console.log(message);
    return Promise.resolve();
  };

  const client: Client = {
    startTx: () => announce('start tx'),
    endTx: () => announce('end tx'),
    abortTx: () => announce('abort tx'),
    insert: (chunk: Chunk) => {
      console.log('written: %j', chunk);
      return Promise.resolve();
    },
  };

  return Promise.resolve(client);
}
/**
 * Runs `queries$` inside a transaction on `client`.
 *
 * The transaction is started on subscription, `queries$` is subscribed and
 * drained, and the transaction is ended once `queries$` completes. If starting,
 * running the queries, or ending fails, the transaction is aborted and the
 * original error is re-emitted to the subscriber.
 *
 * @param client Client that owns the transaction.
 * @param queries$ Work to perform while the transaction is open.
 * @returns An observable that emits once after the transaction has ended, or
 *   errors after the transaction has been aborted.
 */
function tx(client: Client, queries$: Rx.Observable<Chunk>): Rx.Observable<void> {
  // Call the methods through `client` so implementations that rely on `this` keep working.
  return Rx.Observable.defer(() => client.startTx())
    // Drain the queries and emit exactly one marker on completion. Unlike forkJoin,
    // this also emits when `queries$` is empty, so the transaction is always ended.
    .mergeMap(() => queries$.ignoreElements().defaultIfEmpty(null))
    .mergeMap(() => Rx.Observable.defer(() => client.endTx()))
    .catch(error => {
      return Rx.Observable.defer(() => client.abortTx())
        .mergeMap(() => Rx.Observable.throw(error));
    });
}
// De-duplicated chunks. A chunk with more than two records raises an error;
// every chunk that passes the check is logged.
const queries$ = omitDuplicates(chunks$)
  .do(batch => {
    if (batch.length <= 2) {
      return;
    }
    throw new Error('fail!');
  })
  .do(batch => console.log(batch));
// Obtain the client, then run the queries inside a transaction on it.
// subscribe() is called without handlers, so no error callback is registered here.
const connection = connect();
Rx.Observable
  .fromPromise(connection)
  .mergeMap(client => tx(client, queries$))
  .subscribe();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment