Skip to content

Instantly share code, notes, and snippets.

@ajcrites
Created March 6, 2018 23:02
Show Gist options
  • Save ajcrites/5d4c76495759ae8104b75d59b568845c to your computer and use it in GitHub Desktop.
Save ajcrites/5d4c76495759ae8104b75d59b568845c to your computer and use it in GitHub Desktop.
import { timer } from 'rxjs/observable/timer';
import { fromEvent } from 'rxjs/observable/fromEvent';
import {
concatMap,
filter,
map,
switchMap,
takeUntil,
tap,
} from 'rxjs/operators';
import { createReadStream, createWriteStream } from 'fs';
// After a one-second delay, copy ./index.ts to ./index.ts.out with the
// characters of each chunk reversed.
//
// Notes:
// - The reversal is applied per chunk as it is read, not to the file as a
//   whole, so a file that arrives in several chunks keeps its chunk order.
// - The return value of writer.write() is ignored, i.e. no backpressure is
//   applied; pending output is buffered by the write stream.
timer(1000)
  .pipe(
    switchMap(() => {
      // Decode as UTF-8 inside the read stream so that a multi-byte character
      // straddling two chunks is not split before the text is reversed.
      const reader = createReadStream('./index.ts', { encoding: 'utf8' });
      const writer = createWriteStream('./index.ts.out');

      // Turns a stream's 'error' event into an observable error. Used as a
      // takeUntil notifier below so the failure reaches the subscriber's
      // error handler instead of being an unhandled 'error' event.
      const failureOf = (stream) =>
        fromEvent(stream, 'error').pipe(
          map((err) => {
            throw err;
          }),
        );

      // Reads until read() returns null, as recommended for 'readable'
      // handlers, and returns the chunks obtained (possibly none).
      const drain = () => {
        const chunks = [];
        let chunk = reader.read();
        while (chunk !== null) {
          chunks.push(chunk);
          chunk = reader.read();
        }
        return chunks;
      };

      // Once all input has been consumed, close the output so that it
      // flushes and emits 'finish'.
      reader.once('end', () => writer.end());

      return fromEvent(reader, 'readable').pipe(
        // read when data is available (from 'readable' event)
        concatMap(drain),
        // spread iterates by code point, so surrogate pairs stay intact
        tap((chunk) => writer.write([...chunk].reverse().join(''))),
        // fail the observable if either stream reports an error
        takeUntil(failureOf(reader)),
        takeUntil(failureOf(writer)),
        // complete only after the output has been fully flushed; without a
        // completion signal the observable would never complete
        takeUntil(fromEvent(writer, 'finish')),
        // on failure, release both file descriptors
        tap({
          error() {
            reader.destroy();
            writer.destroy();
          },
        }),
      );
    }),
  )
  .subscribe({
    error(err) {
      console.error(`Processing error: ${err}`);
    },
    complete() {
      console.log('Processing done');
    },
  });
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment