Skip to content

Instantly share code, notes, and snippets.

@baetheus
Last active October 26, 2017 23:17
Show Gist options
  • Select an option

  • Save baetheus/b8986a9373b4d05fae26e3eba087cf8a to your computer and use it in GitHub Desktop.

Select an option

Save baetheus/b8986a9373b4d05fae26e3eba087cf8a to your computer and use it in GitHub Desktop.
fromStream POC
/// <reference path="../node_modules/@types/node/index.d.ts" />
'use strict';
import { fromStream } from './index';
import { createReadStream } from 'fs';
import 'rxjs/add/operator/reduce';
const stream = createReadStream('./example.ts', {encoding: 'utf-8'});
const obs = fromStream(stream, ['data'], ['error'], ['end', 'close']);
obs
.reduce((acc: string, curr: string) => acc += curr)
.subscribe(
n => console.log(n),
e => console.error('Uh oh!', e),
() => 'All Done!'
);
/// <reference path="../node_modules/@types/node/index.d.ts" />
'use strict';
import { Observable } from 'rxjs/Observable';
import { Observer } from 'rxjs';
import { EventEmitter } from 'events';
import { ok } from 'assert';
export interface metaListener {
type: 'next' | 'error' | 'complete';
event: string;
listener: (...args: any[]) => void;
}
export function fromStream<T>(
emitter: EventEmitter,
nexts: string[],
errors: string[] = [],
completes: string[] = []
): Observable<T> {
ok(nexts.length >= 0, 'Must have at least one event to listen for.');
return Observable.create((observer: Observer<T>) => {
let listeners: metaListener[] = [];
const next = (event: any) => observer.next(event);
const error = (event: any) => observer.error(event);
const complete = () => observer.complete();
nexts.forEach(n => {
emitter.on(n, next);
listeners.push({type: 'next', event: n, listener: next});
});
errors.forEach(e => {
emitter.once(e, error);
listeners.push({type: 'error', event: e, listener: error})
});
completes.forEach(c => {
emitter.once(c, complete);
listeners.push({type: 'complete', event: c, listener: complete})
});
return () => listeners.forEach(l => emitter.removeListener(l.event, l.listener));
});
}
@baetheus
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment