nodejs tg
// async always returns a promise
// (1) with await: execution of the function pauses at the await; the event loop is NOT blocked
// (2) without await: the remaining statements in the function run immediately
// output of (1): 1 is printed, wait..., then 2 and p2 are printed
// output of (2): 1 and 2 are printed, wait..., then p2 is printed
async function f1() {
    console.log(1);
    const res = await new Promise((resolve, reject) => { // use of await here
        setTimeout(f2, 3000);
        function f2() { resolve('p2'); }
    });
    console.log(2);
    return res;
}
f1().then(res => console.log(res));
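// A minimal sketch of variant (2), without await (assumed addition, not in the
// original gist): console.log(2) runs before the promise settles.
async function f1b() {
    console.log(1);
    const p = new Promise((resolve) => setTimeout(() => resolve('p2'), 3000)); // no await
    console.log(2);
    return p; // the returned promise still resolves to 'p2' after ~3s
}
f1b().then(res => console.log(res)); // 1 and 2 print immediately, p2 after the wait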
nodejs child processes
// node.js <-> OS
1. spawn: executes a command in a new process.
   on('exit')       child exited, with code and signal
   on('disconnect') parent called child.disconnect()
   on('error')      child errored or could not be killed
   on('message')    child used process.send() to send a message
   on('close')      the child process's stdio streams (stdin, stdout, stderr) are closed
   Spawn can also create a shell with the {shell: true} option
   Spawn can inherit the stdios with the {stdio: 'inherit'} option
   Other options: env, cwd
   Option: detached => parent process can exit independently of the child; {stdio: 'ignore'} must be used
2. exec: (vs spawn) it creates a shell. Also, importantly: it buffers the generated output and passes the whole value to the callback function
3. fork: for spawning node processes; a communication channel is established (see the sketch after the spawn example below)
   forked.on('message')
   forked.send()
   And in the child:
   process.on('message')
   process.send()
4. execFile: just like exec, but with no shell
const {spawn} = require('child_process');
const child = spawn('ls', ['-lrt']);
child.stdout.on('data', (data) => console.log(`stdout = ${data}`));
child.on('exit', (code, signal) => console.log(`exit: code=${code} signal=${signal}`));
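// A minimal fork sketch (file names parent.js/child.js are assumed for illustration):
// parent.js
const { fork } = require('child_process');
const forked = fork('child.js'); // spawns a new node process running child.js
forked.on('message', (m) => console.log('from child:', m));
forked.send({ hello: 'child' }); // goes over the built-in IPC channel
// and in child.js:
// process.on('message', (m) => {
//     console.log('from parent:', m);
//     process.send({ hello: 'parent' });
// });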
const fs = require('fs');
const http = require('http');
// PART 1: generate a big file, honouring backpressure via 'drain'
const file = fs.createWriteStream('bigfile.txt');
async function f1() {
    console.time('totalTime');
    for (let i = 0; i <= 1e5; i++) {
        const data = Date.now().toString() + 'a\n';
        if (!file.write(data)) { // write buffer is full: wait for it to drain
            await new Promise((resolve) => file.once('drain', resolve));
        }
    }
}
f1().then(() => {
    file.end();
    console.timeEnd('totalTime');
});
// PART 2: serve a big file
http.createServer((req, res) => {
    // create a fresh read stream per request; a single shared stream would be
    // consumed by the first request and leave later ones hanging
    const inpStream = fs.createReadStream('bigfile.txt');
    inpStream.pipe(res);
    // the non-streaming alternative buffers the whole file in memory first:
    // fs.readFile('bigfile.txt', (err, data) => {
    //     if (err) throw err;
    //     console.log('finish reading file');
    //     res.end(data);
    // });
    console.log('begin reading file');
}).listen(8080, () => console.log('Listening on 8080'));
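// A sketch of the same handler using stream.pipeline (available since Node 10), which
// also destroys the file stream if the client disconnects mid-download (assumed
// addition; port 8081 chosen here to avoid clashing with the server above):
const { pipeline } = require('stream');
http.createServer((req, res) => {
    pipeline(fs.createReadStream('bigfile.txt'), res, (err) => {
        if (err) console.error('stream failed:', err.message);
    });
}).listen(8081, () => console.log('Listening on 8081'));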
// bind fixes the value that will be used as "this" inside the function
function f1() {
    x = 10; // no declaration: creates/overwrites the global x (sloppy mode)
    return this.x;
}
const data = {x: 20}; // we will bind f1 to data so that this.x becomes 20
console.log(f1() + " - " + f1.bind(data)()); // "10 - 20": in a plain call, this is the global object
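// For comparison, the standard call/apply methods give the same effect one invocation at a time:
console.log(f1.call(data));      // 20: this = data for this single call
console.log(f1.apply(data, [])); // 20: like call, but arguments are passed as an array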
"use strict" | |
// the simplest custom stream | |
const stream = require('stream'); | |
let rs = new stream.Readable(); | |
let ws = new stream.Writable(); | |
rs._read = () => {}; | |
ws._write = function(chunk, encoding, callback) { | |
console.log(chunk.toString()); | |
callback() | |
}; | |
rs.pipe(ws); | |
rs.push("msg1"); | |
rs.push("msg2"); | |
// end |
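// A minimal Transform in the same style (assumed addition, not in the original gist):
// a Transform is writable and readable at once, so it can sit in the middle of a pipe
const rs2 = new stream.Readable({ read() {} });
const ts = new stream.Transform({
    transform(chunk, encoding, callback) {
        this.push(chunk.toString().toUpperCase()); // uppercase each chunk
        callback();
    }
});
rs2.pipe(ts).pipe(process.stdout);
rs2.push("msg3\n"); // prints MSG3
rs2.push(null);     // end the stream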
// tg creating streams.js
const stream = require('stream');
const util = require('util');
const pipeline = util.promisify(stream.pipeline); // the callback form of pipeline doesn't return a promise, so promisify it for await
// on('readable')/read()
// pause()/resume()
let r1data = "123456789";
const r1 = new stream.Readable({
    objectMode: true,
    read() {
        const size = 1;
        const x = r1data.substring(0, size);
        r1data = r1data.substring(size);
        if (x) this.push(x);
        else this.push(null); // null signals end-of-stream
    }
});
const r2data = [{a: 1}, {b: 2}, {c: 3}, {d: 4}];
const r2 = new stream.Readable({
    objectMode: true,
    read() {
        const x = r2data.splice(0, 1); // take one object at a time
        if (x.length > 0) this.push(x[0]);
        else this.push(null);
    }
});
const t1 = new stream.Transform({
    objectMode: true,
    transform(data, encoding, callback) {
        this.push(parseFloat(Object.values(data)) + 90);
        callback(null);
    }
});
const w1 = new stream.Writable({
    objectMode: true,
    write(chunk, encoding, callback) {
        console.log("w1 ** " + util.inspect(chunk, false));
        callback();
    }
});
async function etl(name, ...streams) {
    console.log('starting pipeline ' + name);
    await pipeline(...streams).catch((err) => console.error("ERROR ", err));
    console.log('finishing pipeline ' + name);
}
// note: r2 and w1 are shared across the pipelines below; pipeline() ends/destroys
// the streams it is handed, so the later jobs race the earlier ones (see the sketch below)
etl('pl1', r1, w1);
etl('pl2', r2, w1);
etl('pl3', r2, t1, w1);
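// A sketch of one safer arrangement (assumed, not in the original gist): give each
// pipeline fresh streams from a factory instead of sharing a writable across jobs.
const makeW = () => new stream.Writable({
    objectMode: true,
    write(chunk, encoding, callback) {
        console.log("w ** " + util.inspect(chunk, false));
        callback();
    }
});
// etl('pl1', r1, makeW());
// etl('pl3', r2, t1, makeW());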
// usage: node <nameOfFile> "<sql>"
// demonstrates use of __content and __proto__ for carrying metadata alongside each chunk
const stream = require('stream');
const util = require('util');
const alasql = require('alasql');
/* example sqls
select id, name, if age>10 then "yes" from ?
select id, name, (case when age>10 then 'yes' else 'no' end) as gt10 from ?
*/
const r1data = [{id: 1, name: 'amber', age: 10, class_: 'A'}, {id: 2, name: 'bear', age: 11, class_: 'A'}];
const sql = process.argv[2] || "select * from ?";
const source = {};
source.__filename = 'xxx';
const r1 = new stream.Readable({
    objectMode: true,
    read() {
        const x = Object.create(source); // metadata lives on the prototype
        x.__content = r1data.shift();    // the payload lives on the object itself
        // console.log(x.__proto__.__filename)
        if (x.__content != null) this.push(x);
        else this.push(null);
    }
});
const t1 = new stream.Transform({
    objectMode: true,
    transform(chunk, e, cb) {
        const x = Object.create(chunk.__proto__); // keep the incoming metadata
        x.__content = alasql(sql, [[chunk.__content]]);
        if (x.__content.length > 0) this.push(x);
        cb();
    }
});
// implementing t1 as a subclass instead of constructor options
const t2 = class T2 extends stream.Transform {
    constructor(options) {
        super(options);
    }
    _transform(chunk, e, cb) {
        const x = Object.create(chunk.__proto__);
        x.__content = alasql(sql, [[chunk.__content]]);
        if (x.__content.length > 0) this.push(x);
        cb();
    }
};
const w1 = new stream.Writable({
    objectMode: true,
    write(chunk, encoding, cb) {
        console.log('w1 - ' + util.inspect(chunk, false) + ' & ' + util.inspect(chunk.__proto__, false));
        cb(null);
    }
});
// r1.pipe(w1)
r1.pipe(new t2({objectMode: true})).pipe(w1);
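// example run (hypothetical invocation; the script name is assumed):
//   node streamingSql.js "select id, name from ?"
// w1 then prints each surviving row plus its prototype metadata ({ __filename: 'xxx' })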
const stream = require('stream');
const fs = require('fs');
const util = require('util');
const alasql = require('alasql');
// grab the csv-parser class itself (the module exports a factory function)
const csv_parser = require('csv-parser')().__proto__.constructor;
const bufferFrom = require('buffer-from');
const [cr] = bufferFrom('\r');
const [nl] = bufferFrom('\n');
// option defaults mirroring csv-parser's own
const defaults = {
    escape: '"',
    headers: null,
    mapHeaders: ({ header }) => header,
    mapValues: ({ value }) => value,
    newline: '\n',
    quote: '"',
    raw: false,
    separator: ',',
    skipComments: false,
    skipLines: null,
    maxRowBytes: Number.MAX_SAFE_INTEGER,
    strict: false
};
//////////////////////
class ArrayStream extends stream.Readable {
    constructor(data, options = {}) {
        options.objectMode = true;
        super(options);
        this.data = data.data;
    }
    _read() {
        const x = this.data.shift();
        if (x != null) this.push(x);
        else this.push(null);
    }
}
class Dir extends stream.Transform {
    constructor(path, options = {}) {
        options.objectMode = true;
        super(options);
        this.info = options.info || {};
        this.path = path;
        this.info.__path = path;
        fs.readdir(path, options, (err, files) => {
            const obj = Object.create(this.info);
            if (files && files.length > 0) obj.data = files;
            else obj.data = [];
            new ArrayStream(obj).pipe(this); // feed the directory listing into ourselves
        });
    }
    _transform(chunk, encoding, callback) {
        this.push({"data": chunk});
        callback(null);
    }
}
class File extends stream.Transform {
    constructor(file, options = {}) {
        options.objectMode = true;
        super(options);
        fs.createReadStream(file, options).pipe(this);
        let filename = file.split('/');
        filename = filename[filename.length - 1];
        this.info = options.info || {};
        this.info.__path = file;
        this.info.__name = filename;
    }
    _transform(chunk, e, cb) {
        const obj = Object.create(this.info); // __path/__name ride along on the prototype
        obj.data = chunk;
        this.push(obj);
        cb();
    }
}
class Csv extends csv_parser {
    constructor(options = {}) {
        options.objectMode = true;
        super(options);
    }
    // _transform/_emit are adapted from csv-parser's internals: the incoming chunk is
    // the {data, __proto__: info} wrapper, so unwrap chunk.data here and rewrap in _emit
    _transform(chunk, enc, cb) {
        this.row = chunk;
        let data = chunk.data;
        if (typeof data === 'string') data = bufferFrom(data);
        let start = 0;
        let buf = data;
        if (this._prev) {
            start = this._prev.length;
            buf = Buffer.concat([this._prev, data]);
            this._prev = null;
        }
        const bufLen = buf.length;
        for (let i = start; i < bufLen; i++) {
            const chr = buf[i];
            const nextChr = i + 1 < bufLen ? buf[i + 1] : null;
            this._currentRowBytes++;
            if (this._currentRowBytes > this.maxRowBytes) {
                return cb(new Error('Row exceeds the maximum size'));
            }
            if (!this._escaped && chr === this.escape && nextChr === this.quote && i !== start) {
                this._escaped = true;
                continue;
            } else if (chr === this.quote) {
                if (this._escaped) {
                    this._escaped = false;
                // non-escaped quote (quoting the cell)
                } else {
                    this._quoted = !this._quoted;
                }
                continue;
            }
            if (!this._quoted) {
                if (this._first && !this.customNewline) {
                    if (chr === nl) {
                        this.newline = nl;
                    } else if (chr === cr) {
                        if (nextChr !== nl) {
                            this.newline = cr;
                        }
                    }
                }
                if (chr === this.newline) {
                    this._online(buf, this._prevEnd, i + 1);
                    this._prevEnd = i + 1;
                    this._currentRowBytes = 0;
                }
            }
        }
        if (this._prevEnd === bufLen) {
            this._prevEnd = 0;
            return cb();
        }
        if (bufLen - this._prevEnd < data.length) {
            this._prev = data;
            this._prevEnd -= (bufLen - data.length);
            return cb();
        }
        this._prev = buf;
        cb();
    }
    _emit(Row, cells) {
        const row = Object.create(this.row.__proto__); // carry the file metadata forward
        row.data = new Row(cells);
        this.push(row);
    }
}
class Sql extends stream.Transform {
    constructor(sql, options = {}) {
        options.objectMode = true;
        super(options);
        this.sql = sql;
        this.__line = 0;
    }
    _transform(chunk, e, cb) {
        // console.log(chunk)
        this.__line++;
        const row = Object.create(chunk.__proto__);
        // expose the row data plus its metadata (and a line counter) to the query
        row.data = alasql(this.sql, [[{...chunk.data, ...chunk.__proto__, __line: this.__line}]]);
        if (row.data.length > 0) this.push(row);
        cb();
    }
}
class ConsoleWriter extends stream.Writable {
    constructor(options = {}) {
        options.objectMode = true;
        super(options);
    }
    _write(chunk, encoding, cb) {
        console.log('w1 - ' + util.inspect(chunk, false) + ' & ' + util.inspect(chunk.__proto__, false));
        cb(null);
    }
}
// all these work:
// new File('test.csv').pipe(new Csv()).pipe(w1)
// new File('test.csv').pipe(new Csv()).pipe(new Sql('select * from ?')).pipe(new ConsoleWriter())
// new File('test.csv').pipe(new Csv()).pipe(new Sql("select * from ? where id == '2' ")).pipe(w1) // ->__proto__->__name = 'test.csv'
// new Dir(".").pipe(new ConsoleWriter())
new Dir(".").pipe(new Sql("select * from ? where name = 'text.csv' ")).pipe(new ConsoleWriter());
// works -> fs.createReadStream('test.csv').pipe(csv()).pipe(w1)
const stream = require('stream');
const fs = require('fs');
const { StringDecoder } = require('string_decoder');
// grab the csv-parser class itself (the module exports a factory function)
const csvParser = require('csv-parser')().__proto__.constructor;
const alasql = require('alasql');
const util = require('util');
class BaseTransformer extends stream.Transform {
    constructor(data = null, options = {}) {
        /*
        To extend BaseTransformer, the options should contain at minimum:
        1. an initialization function, e.g. {f_init: fs.createReadStream} or {f_init: (x) => fs.createReadStream(x)}
        2. a transform function, e.g. {f_transform: (x) => parseInt(x)}
        If no f_init is provided, the object/string is passed as a single chunk (i.e. not really streaming)
        */
        options.objectMode = options.objectMode !== undefined ? options.objectMode : true; // default true ("|| true" would force it on)
        options.f_init_send_options = true;
        options.f_transform_send_options = false;
        super(options);
        this.options = options;
        if (data != null) {
            if (options.f_init) {
                if (options.f_init_send_options) options.f_init(data, options).pipe(this);
                else options.f_init(data).pipe(this);
            }
            else {
                const s = new stream.Readable(options);
                s._read = () => {};
                s.push(data);
                s.push(null);
                s.pipe(this);
            }
        }
    }
    _transform(chunk, e, cb) {
        // console.log(chunk)
        const options = this.options;
        if (options.f_transform) {
            if (options.f_transform_send_options) {
                const d = options.f_transform(chunk, options);
                if (d && Object.entries(d).length !== 0) this.push(d);
            }
            else {
                const d = options.f_transform(chunk);
                if (d && Object.entries(d).length !== 0) this.push(d);
            }
        }
        else this.push(chunk);
        cb(null);
    }
}
// new BaseTransformer("hi mate").pipe(process.stdout)
// new BaseTransformer('test.csv', {f_init: fs.createReadStream}).pipe(process.stdout)
// new BaseTransformer().pipe(process.stdout)
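// A hypothetical extension in the same pattern (assumed, not in the original gist):
// wrap a synchronous mapping function into a transformer via f_transform
class UpperCaseTransformer extends BaseTransformer {
    constructor(data = null, options = {}) {
        options.f_transform = (x) => x.toString().toUpperCase();
        super(data, options);
    }
}
// new BaseTransformer("yoyo").pipe(new UpperCaseTransformer()).pipe(process.stdout)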
class StdoutTransformer extends BaseTransformer {
    constructor(data = null, options = {}) {
        // options.f_init = null
        options.f_transform = (x) => {
            if (typeof x === 'string') process.stdout.write(x);
            else console.log(util.inspect(x, null));
        };
        super(data, options);
    }
}
// new BaseTransformer("yoyo").pipe(new StdoutTransformer())
class ReadFileTransformer extends BaseTransformer {
    constructor(data = null, options = {}) {
        const decoder = new StringDecoder('utf8');
        options.f_init = fs.createReadStream;
        options.f_transform = (x) => decoder.write(x);
        super(data, options);
    }
}
// class ReadFileTransformer extends BaseTransformer {
//     constructor(data = null, options = {}) {
//         const decoder = new StringDecoder('utf8');
//         options.f_init = fs.createReadStream
//         // if data == null => the filename wasn't provided by the user but is coming as a stream into this
//         if (data != null) {
//             options.f_transform = (x) => decoder.write(x)
//         }
//         else {
//             options.f_transform = (x) => {
//                 let y = fs.createReadStream(x)
//                 let chunk;
//                 y.on('readable', () => {
//                     chunk = y.read()
//                 });
//                 return decoder.write(chunk)
//             }
//         }
//         super(data, options)
//     }
// }
// new ReadFileTransformer("test.csv").pipe(new StdoutTransformer())
class ParseCsvTransformer extends csvParser {
    constructor(data = null, options = {}) {
        super(data, options);
    }
}
// new ReadFileTransformer("test.csv").pipe(new ParseCsvTransformer()).pipe(new StdoutTransformer())
// new ReadFileTransformer("test.csv").pipe(new ParseCsvTransformer({headers: ['a', 'b', 'c', 'd'], skipLines: 1})).pipe(new StdoutTransformer())
class StreamingSqlTransformer extends BaseTransformer {
    constructor(data = null, options = {}) {
        options.f_transform = (x) => alasql(data, [[x]]); // here data holds the sql text
        super(null, options); // not sending data, so input is obtained from the pipe predecessor
    }
}
// new ReadFileTransformer("test.csv").pipe(new ParseCsvTransformer()).pipe(new StreamingSqlTransformer("select * from ? where id = '3'")).pipe(new StdoutTransformer())
class StdinTransformer extends BaseTransformer {
    constructor(data = null, options = {}) {
        options.f_init = (x) => process.stdin;
        const decoder = new StringDecoder('utf8');
        options.f_transform = (x) => decoder.write(x);
        super('', options); // dummy data so that f_init is run
        this.on('data', data => { process.stdin.destroy(); }); // this line makes it take only one line of input
    }
}
// new StdinTransformer().pipe(new StdoutTransformer())
// var globalNs = {}
// class SetVar extends BaseTransformer {
//     constructor(varName, options = {}) {
//         options.f_transform = (y) => { globalNs[varName] = y.trim(); return y.trim() }
//         super(null, options)
//     }
// }
// these don't work:
// new StdinTransformer().pipe(new SetVar('variable')).pipe(new StdoutTransformer())
// new StdinTransformer().pipe(new SetVar('file_name')).pipe(new ReadFileTransformer()).pipe(new StdoutTransformer())
// new ReadFileTransformer().pipe(new StdoutTransformer())
// new StdinTransformer().pipe(new StdoutTransformer())
const JSONStream = require('JSONStream');
const es = require('event-stream');
const csv = require('csv-parser');
const fs = require('fs');
const alasql = require('alasql');
const through = require('through');
//Q
// new BaseTransformer("hi mate").pipe(process.stdout)
// new BaseTransformer('test.csv', {f_init: fs.createReadStream}).pipe(process.stdout)
//A
// es.readArray(["hi mate"]).pipe(process.stdout)
// fs.createReadStream("test.csv").pipe(process.stdout)
//Q
// new ReadFileTransformer("test.csv").pipe(new ParseCsvTransformer({headers: ['a', 'b', 'c', 'd'], skipLines: 1})).pipe(new StdoutTransformer())
//A
// fs.createReadStream("test.csv").pipe(csv({headers: ['a', 'b', 'c', 'd'], skipLines: 1})).pipe(es.stringify()).pipe(process.stdout)
//Q
// new ReadFileTransformer("test.csv").pipe(new ParseCsvTransformer()).pipe(new StreamingSqlTransformer("select * from ? where id = '3'")).pipe(new StdoutTransformer())
//A
// fs.createReadStream("test.csv").pipe(csv()).pipe(es.mapSync(x => alasql("select * from ? where id = '3' ", [[x]]))).pipe(es.filterSync(x => x.length > 0)).pipe(es.stringify()).pipe(process.stdout)
//Q
// new ReadFileTransformer("config.json").pipe(new ParseJsonTransformer()).pipe(es.mapSync(data => data.filename)).pipe(new StdoutTransformer())
//A
// fs.createReadStream("config.json").pipe(JSONStream.parse()).pipe(es.mapSync(x => x.filename)).pipe(es.wait((err, x) => fs.createReadStream(x).pipe(process.stdout)))
//Q. Print the number of characters in a file
//A
// for a small file (< 16kb, i.e. a single chunk)
// fs.createReadStream("bigfile.txt").pipe(es.mapSync(ch => ch.length)).pipe(es.stringify()).pipe(process.stdout)
//
// for a big file that still fits in memory
// fs.createReadStream("bigfile.txt").pipe(es.wait((e, x) => es.readArray([x.length]).pipe(es.stringify()).pipe(process.stdout)))
//
// for a very big file: reduce the chunk lengths as they stream past
function red(acc = 0) {
    return through(
        (chunk) => { acc = acc + chunk.length; },
        function () { this.emit("data", acc); this.emit('end'); }
    );
}
// fs.createReadStream("bigfile.txt").pipe(red()).pipe(es.stringify()).pipe(process.stdout)
const stream = require('stream');
const JSONStream = require('JSONStream');
const es = require('event-stream');
const csv_parser = require('csv-parser');
// const csv_parse = require('csv-parse')
const fs = require('fs');
const alasql = require('alasql');
const through = require('through');
const pump = require('pump');
const winston = require('winston');
const util = require('util');
const { Transform } = require('json2csv');
// var jsoncsv = require('json-csv')
// const stringify = require('csv-stringify')
// var duplexify = require('duplexify');
// (the Q&A recipes from the previous file are repeated here unchanged; see above)
//Q. Alasql
// fs.createReadStream("test.csv").pipe(csv()).pipe(es.mapSync(x => alasql("select * from ? where id = '3' ", [[x]]))).pipe(es.filterSync(x => x.length > 0)).pipe(es.stringify()).pipe(process.stdout)
// fs.createReadStream('test.csv').pipe(csv()).pipe(sql("select * from ? where id ='3'")).pipe(es.stringify()).pipe(process.stdout)
/////
////
const logger = winston.createLogger({
    level: 'info',
    format: winston.format.json(),
    // defaultMeta: { service: 'user-service' },
    transports: [
        new winston.transports.Console({level: 'debug', format: winston.format.simple()})
    ]
});
// job-related functions
let jobs = {}; // {jobId: [jobSteps]}
let jobId = 0;
function job(...steps) {
    jobId++;
    jobs[jobId] = [];
    for (const step of steps) {
        // a step may be an array of streams (e.g. from sql() or print()); spread it
        if (Array.isArray(step)) jobs[jobId].push(...step);
        else jobs[jobId].push(step);
    }
    logger.info('Created job with jobId = ' + jobId);
    return jobId;
}
function run(jobId) {
    logger.info('Running job with jobId = ' + jobId);
    console.time(`Time taken for jobId ${jobId} = `);
    pump(...jobs[jobId], function(err) {
        if (err) {
            logger.error(err.stack);
        }
        else {
            logger.info(`Pipeline success`);
            console.timeEnd(`Time taken for jobId ${jobId} = `);
        }
    });
}
// helper functions
class Csv extends csv_parser().__proto__.constructor {
    constructor(options = {}) {
        options.objectMode = true;
        super(options);
    }
}
csv2json = function(options = {}) {
    logger.debug("Running csv2json");
    options.mapHeaders = ({ header, index }) => header.trim().toLowerCase().replace(' ', '_');
    return new Csv(options);
};
json2csv = function(options = {}) {
    logger.debug("Running json2csv");
    return [stringify(), new Transform(null, options)]; // stringify first, since json2csv's Transform consumes JSON text
};
collect = function (numObj, callback) {
    /*
    Collects a stream until the given number of objects has arrived.
    numObj = 0 or 1 behaves the same as not specifying it, i.e. collect() = collect(0) = collect(1)
    */
    logger.debug("Running collect");
    let arr = [];
    return through(
        function (data) {
            if (Array.isArray(data)) {
                arr = arr.concat(data); // concat returns a new array; reassign (a bare arr.concat(data) was a no-op)
            }
            else
                arr.push(data);
            if (arr.length === numObj) {
                this.emit('data', arr);
                arr = [];
            }
        },
        function () {
            let body = null;
            if (arr.length > 0) {
                // flush the remainder: concatenate buffers, otherwise emit the array of objects
                body = Buffer.isBuffer(arr[0]) ? Buffer.concat(arr) : arr;
                this.emit('data', body);
            }
            this.emit('end');
            if (callback) callback(null, body);
        }
    );
};
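// usage note (hypothetical): collect(2) emits arrays of two objects at a time;
// collect() with no argument buffers the whole stream and flushes it at end-of-stream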
stringify = function () {
    logger.debug("Running stringify");
    const Buffer = require('buffer').Buffer;
    return es.mapSync(function (e) {
        if (typeof e !== 'string') {
            return Buffer.isBuffer(e) ? e.toString() : JSON.stringify(e) + '\n';
        }
        else return e;
    });
};
// etl functions
function sql(query, tableName) {
    /*
    Takes a query and an optional tableName. The table name, together with collect(),
    makes this table available for lookup further down the job.
    If the result is an empty array (0 rows), the stream emits nothing, i.e. stops there.
    */
    logger.debug("Running sql");
    if (tableName == null) return [es.mapSync(ch => alasql(query, [[ch]])), es.filterSync(x => x.length > 0)];
    else {
        let tableCreated = 0;
        return [
            es.mapSync(ch => {
                const keys = Array.isArray(ch) ? Object.keys(ch[0]) : Object.keys(ch);
                const ddl = `create table if not exists ${tableName} ( ${keys.toString()} ) `;
                if (!tableCreated) {
                    logger.info(`creating table ${tableName} with cols = ${keys.toString()}`);
                    tableCreated = alasql(ddl);
                } else {
                    // delete existing data
                    alasql(`delete from ${tableName} `);
                }
                Array.isArray(ch) ? alasql(`insert into ${tableName} select * from ?`, [ch])
                                  : alasql(`insert into ${tableName} values ?`, [ch]);
                const result = alasql(query.replace("?", tableName));
                return result;
            }),
            es.filterSync(x => x.length > 0)
        ];
    }
}
function print() {
    /*
    Converts objects into strings and prints them on stdout.
    It doesn't print the console.time/timeEnd output.
    */
    logger.debug("Running print");
    return [stringify(), process.stdout];
}
function readFile(filename, options = {}) {
    /*
    Reads the file and returns the streaming buffer.
    */
    logger.debug("Running readFile");
    return fs.createReadStream(filename, options);
}
function writeFile(filename, options = {}) {
    /*
    Writes the file after stringifying the stream.
    */
    logger.debug("Running writeFile");
    options.flags = options.flags || 'a';
    return [stringify(), fs.createWriteStream(filename, options)];
}
// let's create jobs!
// recipes
// 1.
// works
// let jobId = job(
//     readFile('test.csv'),
//     csv(),
//     sql("select * from ? where id ='3'"),
//     print()
// )
// 2.
let jobId1 = job(
    readFile('test.csv'),
    csv2json(),
    collect(),
    sql("select * from ? ", "testtable"), // where age::NUMBER > 9
    // json2csv(),
    // writeFile('filteredbigfile.csv', {flags: "w"}),
    // stringify(),
    print()
);
// 3. read 2 files and join them
let jobId2 = job(
    readFile('test1.csv'),
    csv2json(),
    collect(),
    sql("select * from testtable a, test1table b where a.id = b.id", "test1table"), // where age::NUMBER > 9
    print()
);
run(jobId1);
run(jobId2);
// stream.pipeline(readFile('test1.csv'))
// TODOs:
// 1. Implement mapSync, filterSync, reduceSync (see red() above); a sketch follows below
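// A sketch of the TODO items (assumed implementations, not in the original gist),
// built on through() in the same style as red() above:
function mapSync(f) {
    return through(function (data) { this.emit('data', f(data)); });
}
function filterSync(pred) {
    return through(function (data) { if (pred(data)) this.emit('data', data); });
}
function reduceSync(f, acc) {
    return through(
        function (data) { acc = f(acc, data); },          // fold each chunk into the accumulator
        function () { this.emit('data', acc); this.emit('end'); } // emit the result at end-of-stream
    );
}
// e.g. fs.createReadStream("bigfile.txt").pipe(reduceSync((a, ch) => a + ch.length, 0)).pipe(stringify()).pipe(process.stdout)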