Skip to content

Instantly share code, notes, and snippets.

@blackakula
Last active September 4, 2019 08:32
Show Gist options
  • Save blackakula/b52513a2f81ebe2e74ca3d8f6868cae1 to your computer and use it in GitHub Desktop.
Save blackakula/b52513a2f81ebe2e74ca3d8f6868cae1 to your computer and use it in GitHub Desktop.
async iterator
import {} from 'ts-jest';
import * as parse from 'csv-parse';
import {StreamReadCallback, StreamReadable, streamReadable} from './Readable'
import {CsvRow, csvStreamReadApplyCallbacks} from './Readable_Csv'
import { testing } from 'bs-logger';
/**
 * Narrow view of a CSV record used by the spec below: only the `first` and
 * `second` columns, both carried as strings.
 */
type SpecificCSVRow = Record<'first' | 'second', string>;
/**
 * Spec for the CSV flavour of the readable-stream chain: several callbacks are
 * registered on one parser and every parsed record is expected to reach each
 * of them. The suite is declared with `describe.skip`, so jest does not run it
 * unless `.skip` is removed.
 */
describe.skip('CSV implementation', () => {
    const columns = ['first', 'second', 'third'];
    // The whole CSV text is handed to the parser up front; `columns` names the
    // fields of each record.
    const parser: parse.Parser = parse(`
"1","2",3
"a","b","c"
`, {
        trim: true,
        skip_empty_lines: true,
        columns
    });

    test('Chain several callbacks for CSV reading stream', async () => {
        // Sees every record through the generic string-keyed row type.
        const genericCsvCallback: StreamReadCallback<CsvRow> = (row: CsvRow) => {
            expect(Object.keys(row)).toEqual(columns);
        };
        // Sees the same records through a narrower, column-specific type.
        const specificCsvCallback: StreamReadCallback<SpecificCSVRow> = (row: SpecificCSVRow) => {
            expect(typeof row.first).toEqual('string');
            expect(typeof row.second).toEqual('string');
        };
        // Accumulates every record so the full result can be checked at the end.
        const memoryCollection: object[] = [];
        const memoryCallback: StreamReadCallback<CsvRow> = (row: object) => {
            memoryCollection.push(row);
        };

        const streamReadableCsv: StreamReadable<parse.Parser, object, parse.Parser> = streamReadable;
        const stream = streamReadableCsv(parser, csvStreamReadApplyCallbacks)(genericCsvCallback)(specificCsvCallback)(memoryCallback)();

        // Parsing is asynchronous. Wait for the stream to finish (or fail)
        // before asserting; otherwise the test would return before the final
        // expectation ever runs and could never fail on it.
        await new Promise<void>((resolve, reject) => {
            stream.on('end', () => resolve()).on('error', reject);
        });

        expect(memoryCollection).toEqual([
            {first: '1', second: '2', third: '3'},
            {first: 'a', second: 'b', third: 'c'}
        ]);
    });
});
import { StreamReadApplyCallbacks, StreamReadCallback, streamReadable, StreamReadable, StreamReadChainable } from "./Readable";
/** One record of the in-memory table: an ordered list of cell values. */
type Row = string[];
/** The in-memory table that plays the role of the "stream": a list of rows. */
type Source = Row[];
/** Value produced once the whole source has been walked. */
type Result = number;
/**
 * Demo table: one header row followed by five rows holding `value1` through
 * `value15`, three cells per row.
 */
const data: Source = [
    ['first', 'second', 'third'],
    ...Array.from({ length: 5 }, (_unused, rowIndex): string[] =>
        [1, 2, 3].map((offset) => `value${rowIndex * 3 + offset}`)
    )
];
/**
 * Walks the in-memory table from first row to last, handing each row to
 * `callback`, and reports how many rows the table holds.
 */
const applyCallbacks: StreamReadApplyCallbacks<Source, Row, Result> = (injectedStream: Source, callback: StreamReadCallback<Row>): Result => {
    for (const currentRow of injectedStream) {
        callback(currentRow);
    }
    return injectedStream.length;
};
/** Rows converted to objects whose keys are header names and whose values are cells. */
type Objects = Array<Record<string, string>>;
/** What `aggregator()` hands back: the growing result list and the row callback that feeds it. */
interface InitializedAggregator {
    objects: Objects;
    callback: StreamReadCallback<Row>;
}
/**
 * Creates a stateful row callback that turns a header row plus data rows into
 * a list of objects.
 *
 * The first row passed to `callback` is remembered as the header row and
 * produces no object. Every later row is converted into an object that maps
 * each header to the cell at the same position, and is appended to `objects`.
 * A row shorter than the header row yields `undefined` for the missing cells;
 * cells beyond the header count are ignored.
 *
 * Rows are only read, never modified, so other callbacks chained on the same
 * source (and the source itself) keep seeing intact rows.
 *
 * @returns the shared `objects` array and the `callback` that fills it.
 */
const aggregator = (): InitializedAggregator => {
    // Stays undefined until the first row (the header row) arrives.
    let headers: Row | undefined;
    const objects: Objects = [];
    return {
        objects,
        callback: (row: Row) => {
            if (headers === undefined) {
                headers = row;
                return;
            }
            const rowObject: Objects[number] = {};
            // Pair header and cell by position instead of shift()-ing cells
            // off the row, which would empty the caller's array.
            headers.forEach((header: string, index: number) => {
                rowObject[header] = row[index];
            });
            objects.push(rowObject);
        }
    };
};
// Demo run: log every row, aggregate the rows into objects, then print the
// aggregated objects and the value returned by the apply step.
const initializedAggregator: InitializedAggregator = aggregator();
const myReadableStream: StreamReadable<Source, Row, Result> = streamReadable;
// Each call with a callback registers it; callbacks fire in registration order.
const myReadableChainable: StreamReadChainable<Row, Result> = myReadableStream(data, applyCallbacks)(
    (row: Row) => console.log(row)
)(
    initializedAggregator.callback
);
// Calling with no argument runs the chain over the source.
const result: Result = myReadableChainable();
console.log(initializedAggregator.objects);
console.log(result);
/**
 * A consumer of stream entries: a function that receives one entry of type
 * `T`. It is declared as returning `void`, i.e. callers are not expected to
 * use whatever it returns.
 */
export interface StreamReadCallback<T> {
(entry: T): void
}
/**
 * A callable with two call shapes:
 *  - called with a callback, it returns another `StreamReadChainable<T, R>`,
 *    so registrations can be chained;
 *  - called with no arguments, it returns `R`.
 *
 * NOTE(review): the accepted callback is typed `StreamReadCallback<any>`, not
 * `StreamReadCallback<T>`, so the entry type of registered callbacks is not
 * checked against `T`. This looks deliberate (it lets callbacks with
 * different row types share one chain) — confirm before tightening.
 */
export interface StreamReadChainable<T, R> {
// Registration form: returns a chainable of the same T and R.
(callback: StreamReadCallback<any>): StreamReadChainable<T, R>
// Terminal form: no callback given, yields the result.
(): R
}
/**
 * Strategy that connects a concrete source of type `S` to a single callback
 * over entries of type `T` and produces a result of type `R`.
 *
 * How and when the callback is invoked is up to the implementation; this
 * interface only fixes the call shape.
 */
export interface StreamReadApplyCallbacks<S, T, R> {
(injectedStream: S, callback: StreamReadCallback<T>): R
}
/**
 * Call shape of the internal collector: given the source, the apply strategy
 * and the callbacks gathered so far, it yields a chainable for `T` and `R`.
 */
type CollectStreamCallbacks<S, T, R> = (
    streamInjection: S,
    applyCallbacks: StreamReadApplyCallbacks<S, T, R>,
    callbacks: StreamReadCallback<T>[]
) => StreamReadChainable<T, R>;
/**
 * Builds one link of the callback chain.
 *
 * The returned function behaves in two ways:
 *  - called with a callback, it returns a new link whose callback list is the
 *    current list plus that callback (the current list is copied, never
 *    modified, so an earlier link can be extended in several directions);
 *  - called with no argument (or `undefined`), it runs `applyCallbacks` on the
 *    source with a single fan-out callback that passes each record to every
 *    collected callback in registration order, and returns its result.
 *
 * @param streamInjection source handed unchanged to `applyCallbacks`.
 * @param applyCallbacks  strategy that drives the fan-out callback over the source.
 * @param callbacks       callbacks collected so far, in registration order.
 * @returns the chainable for this link.
 */
function collectStreamCallbacks<S, T, R>(
    streamInjection: S,
    applyCallbacks: StreamReadApplyCallbacks<S, T, R>,
    callbacks: StreamReadCallback<T>[]
): StreamReadChainable<T, R> {
    // Overloads mirror StreamReadChainable so the link type-checks without a cast.
    function chain(callback: StreamReadCallback<any>): StreamReadChainable<T, R>;
    function chain(): R;
    function chain(callback?: StreamReadCallback<T>): R | StreamReadChainable<T, R> {
        if (callback === undefined) {
            // Terminal call: fan each record out to every registered callback.
            // The fan-out returns nothing, matching the `void` callback contract.
            return applyCallbacks(streamInjection, (record: T): void => {
                callbacks.forEach((registered: StreamReadCallback<T>) => registered(record));
            });
        }
        // Registration call: extend a copy of the list and hand back a new link.
        return collectStreamCallbacks(streamInjection, applyCallbacks, [...callbacks, callback]);
    }
    return chain;
}
/**
 * Call shape of the chain entry point: given a source of type `S` and an
 * apply strategy for it, returns a `StreamReadChainable<T, R>`.
 */
export interface StreamReadable<S, T, R> {
(streamInjection: S, applyCallbacks: StreamReadApplyCallbacks<S, T, R>): StreamReadChainable<T, R>
}
/**
 * Entry point of the chain: starts collecting callbacks for the given source
 * with an empty callback list.
 *
 * @param streamInjection source that will later be handed to `applyCallbacks`.
 * @param applyCallbacks  strategy that drives the callbacks over the source.
 * @returns a chainable with no callbacks registered yet.
 */
export const streamReadable = function<S, T, R>(
    streamInjection: S,
    applyCallbacks: StreamReadApplyCallbacks<S, T, R>
): StreamReadChainable<T, R> {
    // The collector is viewed through its declared call shape so the result
    // is typed as a chainable.
    return (collectStreamCallbacks as CollectStreamCallbacks<S, T, R>)(streamInjection, applyCallbacks, []);
}
import {Parser} from 'csv-parse';
import {StreamReadCallback, StreamReadApplyCallbacks} from './Readable';
/**
 * A CSV record represented as an object: any string key maps to a string
 * value.
 */
export interface CsvRow {
[key: string]: string
}
/**
 * The apply-strategy shape specialised for csv-parse: both the source and the
 * result are a `Parser`, and `T` is the record type passed to the callback.
 * Adds no members of its own.
 */
export interface CsvStreamReadApplyCallbacks<T> extends StreamReadApplyCallbacks<Parser, T, Parser> {}
/**
 * Apply strategy for a csv-parse `Parser`: subscribes to the parser's
 * `'readable'` event and, each time it fires, passes every currently
 * available record to `rowCallback`.
 *
 * Draining stops only when `read()` returns `null`, so a falsy record (for
 * example an empty string or `0`) does not cut the loop short.
 *
 * @param injectedStream parser to read records from.
 * @param rowCallback    invoked once per record, in the order `read()` returns them.
 * @returns whatever `injectedStream.on(...)` returns, so callers can attach
 *          further listeners such as `'end'`.
 */
export const csvStreamReadApplyCallbacks = function<T>(
    injectedStream: Parser,
    rowCallback: StreamReadCallback<T>
): Parser {
    // Read from the captured parser rather than an untyped `this`.
    return injectedStream.on('readable', () => {
        let record: T | null;
        while ((record = injectedStream.read() as T | null) !== null) {
            rowCallback(record);
        }
    });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment