@mannharleen
Last active June 19, 2019 03:53
// An async function always returns a promise.
// (1) with await: execution of the function pauses at the await, but the event loop is NOT blocked
// (2) without await: the remaining statements in the function run immediately
// output of (1): 1 is printed, ~3s wait, then 2 & p2 are printed
// output of (2): 1 & 2 are printed, ~3s wait, then p2 is printed
async function f1() {
  console.log(1);
  const res = await new Promise((resolve, reject) => { // use of await here
    setTimeout(f2, 3000);
    function f2() { resolve('p2'); }
  });
  console.log(2);
  return res;
}
f1().then(res => console.log(res));
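// For comparison, a minimal sketch of variant (2), assuming the same 3-second timer:
// without await, the promise is created but not awaited, so 1 and 2 print immediately
// and p2 arrives only when the timer fires.
async function f1NoAwait() {
  console.log(1);
  const p = new Promise((resolve) => setTimeout(() => resolve('p2'), 3000));
  console.log(2);
  return p; // the still-pending promise is returned; callers can .then()/await it as usual
}
f1NoAwait().then(res => console.log(res));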
Node.js child processes
// node.js <-> OS
1. spawn: executes a command in a new process.
   on('exit')       child exited, with code and signal
   on('disconnect') emitted after the parent calls child.disconnect()
   on('error')      the child could not be spawned or killed
   on('message')    the child used process.send() to send a message
   on('close')      the child process's stdio streams (stdin, stdout, stderr) have closed
   spawn can also create a shell with the {shell: true} option
   spawn can inherit the parent's stdio with the {stdio: 'inherit'} option
   Other options: env, cwd
   Option: detached => the parent process can exit independently of the child; {stdio: 'ignore'} must be used (see the detached sketch after the spawn example below)
2. exec: (vs spawn) it creates a shell. Also, IMPORTANT: it buffers the generated output and passes the whole value to the callback function
3. fork: for spawning Node.js processes; a communication channel is established (see the fork sketch after the spawn example below)
   forked.on('message')
   forked.send()
   And in the child:
   process.on('message')
   process.send()
4. execFile: just like exec, but no shell is created
const { spawn } = require('child_process');
const child = spawn('ls', ['-lrt']);
child.stdout.on('data', (data) => console.log(`stdout = ${data}`));
child.on('exit', (code, signal) => console.log(`child exited with code ${code}, signal ${signal}`));
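// A minimal fork sketch for the notes above (the child script name 'echo-child.js' is an
// assumption for illustration): forked.send()/forked.on('message') in the parent pair with
// process.send()/process.on('message') in the child.
const { fork } = require('child_process');
const forked = fork('echo-child.js');
forked.on('message', (msg) => console.log('parent got:', msg));
forked.send({ hello: 'child' });
// echo-child.js (assumed content):
// process.on('message', (msg) => process.send({ echo: msg }));

// Detached spawn sketch: with {detached: true, stdio: 'ignore'} plus unref(),
// the parent can exit independently of the child.
const detachedChild = spawn('node', ['echo-child.js'], { detached: true, stdio: 'ignore' });
detachedChild.unref();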
const fs = require('fs');
const http = require('http');
// PART 1: generate a big file, respecting backpressure via write()'s return value and the 'drain' event
const file = fs.createWriteStream('bigfile.txt');
async function f1() {
  console.time('totalTime');
  for (let i = 0; i <= 1e5; i++) {
    const data = Date.now().toString() + 'a\n';
    if (!file.write(data)) {
      // write() returned false: the internal buffer is full, so pause until 'drain' fires
      await new Promise((resolve, reject) => file.once('drain', resolve));
    }
  }
}
f1().then(() => {
  file.end();
  console.timeEnd('totalTime');
});
// PART 2: serve the big file over HTTP (assumes bigfile.txt already exists, e.g. from PART 1)
http.createServer((req, res) => {
  // create a fresh read stream per request; a single shared stream would only serve the first request
  const inpStream = fs.createReadStream('bigfile.txt');
  inpStream.pipe(res);
  // Alternative (buffers the whole file in memory before responding):
  // fs.readFile('bigfile.txt',
  //   (err, data) => {
  //     if (err) {
  //       throw err
  //     }
  //     console.log('finish reading file');
  //     res.end(data);
  //   }
  // )
  console.log('begin reading file');
}).listen(8080, () => console.log('Listening on 8080'));
// bind is used to fix what value "this" will be inside a function
function f1() {
  x = 10; // non-strict mode: this creates/overwrites a global x, so a plain f1() call sees this.x === 10
  return this.x;
}
data = { x: 20 }; // we bind f1 to data so that this.x becomes 20
console.log(f1() + " - " + f1.bind(data)()); // prints "10 - 20"
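// A small follow-on sketch (not from the original gist): bind returns a new function whose
// "this" stays fixed even when it is passed around as a callback.
const user = { name: 'amber', hello() { console.log('hello ' + this.name); } };
setTimeout(user.hello, 100);            // "this" is lost -> prints "hello undefined"
setTimeout(user.hello.bind(user), 200); // bound -> prints "hello amber"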
"use strict"
// the simplest custom stream
const stream = require('stream');
let rs = new stream.Readable();
let ws = new stream.Writable();
rs._read = () => {};
ws._write = function(chunk, encoding, callback) {
console.log(chunk.toString());
callback()
};
rs.pipe(ws);
rs.push("msg1");
rs.push("msg2");
// end
// tg creating streams.js
const stream = require('stream');
const util = require('util');
// consumption APIs: on('readable') + read(), and pause()/resume() (see the sketch after the pipelines below)
r1data = "123456789"
r1 = new stream.Readable({
  objectMode: true,
  read() {
    size = 1
    x = r1data.substring(0, size)
    r1data = r1data.substring(size)
    if (x) this.push(x)
    else this.push(null)
  }
});
r2data = [{a:1}, {b:2}, {c:3}, {d:4}]
r2 = new stream.Readable({
  objectMode: true,
  read() {
    size = 1
    x = r2data.splice(0, size)
    // data = data.splice(size)
    if (x.length > 0) this.push(x[0])
    else this.push(null)
  }
})
t1 = new stream.Transform({
  objectMode: true,
  transform(data, encoding, callback) {
    this.push(parseFloat(Object.values(data)) + 90)
    callback(null)
  }
});
w1 = new stream.Writable({
  objectMode: true,
  write(chunk, encoding, callback) {
    console.log("w1 ** " + util.inspect(chunk, false))
    callback()
  }
});
async function etl(name, ...streams) {
  console.log('starting pipeline ' + name)
  // stream.pipeline() with a callback does not return a promise, so promisify it before awaiting;
  // otherwise the "finishing" log fires before the pipeline has actually completed
  const pipelineAsync = util.promisify(stream.pipeline)
  try {
    await pipelineAsync(...streams)
    // console.log('Pipeline success')
  } catch (err) {
    console.error("ERROR ", err)
  }
  console.log('finishing pipeline ' + name)
}
// note: the three pipelines below share r2 and w1; a finished pipeline destroys its streams,
// so in practice each pipeline should get its own stream instances
etl('pl1', r1, w1)
etl('pl2', r2, w1)
etl('pl3', r2, t1, w1)
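// A minimal consumption sketch (separate from the pipelines above) for the APIs noted at the top of
// this file: 'readable'/read() pull-style consumption, and pause()/resume() on a flowing stream.
// It uses its own Readable instances so it does not interfere with r1/r2.
const r3 = new stream.Readable({ read() {} });
r3.on('readable', () => {
  let chunk;
  while ((chunk = r3.read()) !== null) console.log('read(): ' + chunk); // pull until the buffer is empty
});
r3.push('a'); r3.push('b'); r3.push(null);

const r4 = new stream.Readable({ read() {} });
r4.on('data', (chunk) => {
  console.log('data: ' + chunk);
  r4.pause();                          // stop the flow of 'data' events...
  setTimeout(() => r4.resume(), 100);  // ...and resume it shortly after
});
r4.push('x'); r4.push('y'); r4.push(null);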
//usage: node <nameOfFile> "<sql>"
// demonstrates using __content (own property per chunk) and __proto__ (shared metadata) for storing attributes
const stream = require('stream');
const util = require('util')
const alasql = require('alasql')
/* example sqls
select id, name, if age>10 then "yes" from ?
select id, name, (case when age>10 then 'yes' else 'no' end) as gt10 from ?
*/
r1data = [{id:1, name: 'amber', age:10, class_: 'A'},{id:2, name: 'bear', age:11, class_:'A'}]
sql = process.argv[2] || "select * from ?"
source = {}
source.__filename = 'xxx';
r1 = new stream.Readable({
objectMode: true,
read() {
x = Object.create(source)
x.__content = r1data.shift(1)
// console.log(x.__proto__.__filename)
if (x.__content != null) this.push(x)
else this.push(null)
}
})
t1 = new stream.Transform({
objectMode: true,
transform(chunk, e, cb) {
x = Object.create(chunk.__proto__)
x.__content = alasql(sql, [[chunk.__content]])
if (x.__content.length > 0) this.push(x)
cb()
}
})
// implementing t1 using extends
t2 = class T2 extends stream.Transform {
constructor(options) {
super(options);
}
_transform(chunk, e, cb) {
x = Object.create(chunk.__proto__)
x.__content = alasql(sql, [[chunk.__content]])
if (x.__content.length > 0) this.push(x)
cb()
}
}
w1 = new stream.Writable({
objectMode: true,
write(chunk, encoding, cb) {
console.log('w1 - ' + util.inspect(chunk, false) + ' & ' + util.inspect(chunk.__proto__, false))
cb(null)
}
})
// r1.pipe(w1)
r1.pipe(new t2({objectMode: true})).pipe(w1)
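// A short follow-on sketch (not from the original gist) of the __proto__/__content idea used above:
// metadata set once on a shared prototype object is visible from every chunk created with
// Object.create(), while __content remains an own property of each chunk.
const meta = { __filename: 'xxx' };   // shared metadata, same role as `source` above
const chunkA = Object.create(meta); chunkA.__content = { id: 1 };
const chunkB = Object.create(meta); chunkB.__content = { id: 2 };
console.log(chunkA.__filename, chunkB.__filename); // both read 'xxx' via the prototype chain
console.log(Object.keys(chunkA));                  // only ['__content'] is an own property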
const stream = require('stream');
const fs = require('fs');
const util = require('util')
const alasql = require('alasql')
const csv_parser = require('csv-parser')().__proto__.constructor
const bufferFrom = require('buffer-from')
const bufferAlloc = require('buffer-alloc')
const [cr] = bufferFrom('\r')
const [nl] = bufferFrom('\n')
const defaults = {
escape: '"',
headers: null,
mapHeaders: ({ header }) => header,
mapValues: ({ value }) => value,
newline: '\n',
quote: '"',
raw: false,
separator: ',',
skipComments: false,
skipLines: null,
maxRowBytes: Number.MAX_SAFE_INTEGER,
strict: false
}
//////////////////////
class ArrayStream extends stream.Readable {
constructor(data, options = {}) {
options.objectMode = true
super(options)
this.data = data.data
// console.log(data.data)
}
_read() {
let x = this.data.shift(1)
// console.log(x)
if (x != null) this.push(x)
else this.push(null)
}
}
class Dir extends stream.Transform {
constructor(path, options = {}) {
options.objectMode = true
super(options)
this.info = options.info || {};
this.path = path
this.info.__path = path;
fs.readdir(path, options, (err, files) => {
// console.log(files)
const obj = Object.create(this.info);
if (files && files.length > 0) obj.data = files;
else obj.data = [];
new ArrayStream(obj).pipe(this)
})
}
_transform(chunk, encoding, callback) {
this.push({"data": chunk})
callback(null)
}
}
class File extends stream.Transform {
constructor(file, options = {}) {
options.objectMode = true
super(options)
// file = file;
fs.createReadStream(file, options).pipe(this)
let filename = file.split('/');
filename = filename[filename.length-1];
this.info = options.info || {};
this.info.__path = file;
this.info.__name = filename;
}
_transform(chunk, e, cb) {
const obj = Object.create(this.info);
obj.data = chunk;
this.push(obj);
cb()
}
}
class Csv extends csv_parser {
constructor(options = {}) {
options.objectMode = true
super(options)
}
_transform (chunk, enc, cb) {
this.row = chunk
let data = chunk.data
if (typeof data === 'string') data = bufferFrom(data)
let start = 0
let buf = data
if (this._prev) {
start = this._prev.length
buf = Buffer.concat([this._prev, data])
this._prev = null
}
const bufLen = buf.length
for (let i = start; i < bufLen; i++) {
const chr = buf[i]
const nextChr = i + 1 < bufLen ? buf[i + 1] : null
this._currentRowBytes++
if (this._currentRowBytes > this.maxRowBytes) {
return cb(new Error('Row exceeds the maximum size'))
}
if (!this._escaped && chr === this.escape && nextChr === this.quote && i !== start) {
this._escaped = true
continue
} else if (chr === this.quote) {
if (this._escaped) {
this._escaped = false
// non-escaped quote (quoting the cell)
} else {
this._quoted = !this._quoted
}
continue
}
if (!this._quoted) {
if (this._first && !this.customNewline) {
if (chr === nl) {
this.newline = nl
} else if (chr === cr) {
if (nextChr !== nl) {
this.newline = cr
}
}
}
if (chr === this.newline) {
this._online(buf, this._prevEnd, i + 1)
this._prevEnd = i + 1
this._currentRowBytes = 0
}
}
}
if (this._prevEnd === bufLen) {
this._prevEnd = 0
return cb()
}
if (bufLen - this._prevEnd < data.length) {
this._prev = data
this._prevEnd -= (bufLen - data.length)
return cb()
}
this._prev = buf
cb()
}
_emit (Row, cells) {
let row = Object.create(this.row.__proto__)
row.data = new Row(cells)
this.push(row)
}
}
class Sql extends stream.Transform {
constructor(sql, options = {}) {
options.objectMode = true
super(options);
this.sql = sql;
this.__line = 0;
}
_transform(chunk, e, cb) {
// this._data = chunk
console.log(chunk)
this.__line++
let row = Object.create(chunk.__proto__)
row.data = alasql(this.sql, [[{...chunk.data, ...chunk.__proto__, __line: this.__line}]])
if (row.data.length > 0) this.push(row)
cb()
}
}
class ConsoleWriter extends stream.Writable {
constructor(options = {}) {
options.objectMode = true
super(options)
}
_write(chunk, encoding, cb) {
console.log('w1 - ' + util.inspect(chunk, false) + ' & ' + util.inspect(chunk.__proto__, false))
cb(null)
}
}
// all these work:
// new File('test.csv').pipe(new Csv()).pipe(w1)
// new File('test.csv').pipe(new Csv()).pipe(new Sql('select * from ?')).pipe(new ConsoleWriter())
// new File('test.csv').pipe(new Csv()).pipe(new Sql("select * from ? where id == '2' ")).pipe(w1) // ->__ssproto__->__name = 'test.csv'
// new Dir(".").pipe(new ConsoleWriter())
new Dir(".").pipe(new Sql("select * from ? where name = 'text.csv' ")).pipe(new ConsoleWriter())
// works -> fs.createReadStream('test.csv').pipe(csv()).pipe(w1)
const stream = require('stream')
const fs = require('fs')
const { StringDecoder } = require('string_decoder');
const csvParser = require('csv-parser')().__proto__.constructor
const alasql = require('alasql')
const util = require('util')
class BaseTransformer extends stream.Transform {
constructor(data = null, options = {}) {
/*
For extending the BaseTransformer, the options should provide at a minimum:
1. an initialization function, e.g. {f_init: fs.createReadStream} or {f_init: (x) => fs.createReadStream(x)}
2. a transform function, e.g. {f_transform: (x) => parseInt(x)}
-. If no f_init is provided, the object/string is passed as a single chunk (i.e. not really streaming)
*/
if (options.objectMode === undefined) options.objectMode = true // default to objectMode without clobbering an explicit false
options.f_init_send_options = true
options.f_transform_send_options = false
super(options)
this.options = options
if (data != null) {
if (options.f_init) {
if (options.f_init_send_options) options.f_init(data, options).pipe(this)
else options.f_init(data).pipe(this)
}
else {
let s = new stream.Readable(options)
s._read = () => {};
s.push(data)
s.push(null)
s.pipe(this)
}
}
}
_transform(chunk, e, cb) {
// console.log(chunk) !!!
let options = this.options
if (options.f_transform) {
if (options.f_transform_send_options) {
let d = options.f_transform(chunk, options)
if (d && Object.entries(d).length !== 0) this.push(d)
}
else {
let d = options.f_transform(chunk)
if (d && Object.entries(d).length !== 0) this.push(d)
}
}
else this.push(chunk)
cb(null)
}
}
// new BaseTransformer("hi mate").pipe(process.stdout)
// new BaseTransformer('test.csv', {f_init: fs.createReadStream}).pipe(process.stdout)
// new BaseTransformer().pipe(process.stdout)
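// A small illustrative subclass (hypothetical, not from the original gist) following the contract
// documented in BaseTransformer: f_init builds the source stream from `data`, f_transform maps each chunk.
class UpperCaseFileTransformer extends BaseTransformer {
  constructor(data = null, options = {}) {
    options.f_init = fs.createReadStream                     // stream the file named by `data`
    options.f_transform = (x) => x.toString().toUpperCase()  // upper-case each chunk
    super(data, options)
  }
}
// new UpperCaseFileTransformer('test.csv').pipe(process.stdout)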
class StdoutTransformer extends BaseTransformer {
constructor(data = null, options = {}) {
// options.f_init = null
options.f_transform = (x) => {
if (typeof x === 'string') process.stdout.write(x)
else console.log(util.inspect(x, null))
}
super(data, options)
}
}
// new BaseTransformer("yoyo").pipe(new StdoutTransformer())
class ReadFileTransformer extends BaseTransformer {
constructor(data = null, options = {}) {
const decoder = new StringDecoder('utf8');
options.f_init = fs.createReadStream
options.f_transform = (x) => decoder.write(x)
super(data, options)
}
}
// class ReadFileTransformer extends BaseTransformer {
// constructor(data = null, options = {}) {
// const decoder = new StringDecoder('utf8');
// options.f_init = fs.createReadStream
// // if data == null => filename wasn't provided by the user but rather is coming as a stream into this
// if (data != null) {
// options.f_transform = (x) => decoder.write(x)
// }
// else {
// options.f_transform = (x) => {
// let y = fs.createReadStream(x)
// let chunk;
// y.on('readable', () => {
// chunk = y.read()
// // while (null !== (chunk = y.read())) {
// // // console.log(`Received ${chunk} `);
// // }
// // return chunk
// });
// return decoder.write(chunk)
// }
// }
// super(data, options)
// }
// }
// new ReadFileTransformer("test.csv").pipe(new StdoutTransformer())
class ParseCsvTransformer extends csvParser {
constructor(data = null, options = {}) {
super(data, options)
}
}
// new ReadFileTransformer("test.csv").pipe(new ParseCsvTransformer()).pipe(new StdoutTransformer())
// new ReadFileTransformer("test.csv").pipe(new ParseCsvTransformer({ headers:['a', 'b', 'c', 'd'], skipLines:1})).pipe(new StdoutTransformer())
class StreamingSqlTransformer extends BaseTransformer {
constructor(data = null, options = {}) {
// const decoder = new StringDecoder('utf8');
// options.f_init = null
options.f_transform = (x) => alasql(data, [[x]])
super(null, options) // not sending data so that data is obtained from pipe predecessor
}
}
// new ReadFileTransformer("test.csv").pipe(new ParseCsvTransformer()).pipe(new StreamingSqlTransformer("select * from ? where id = '3'")).pipe(new StdoutTransformer())
class StdinTransformer extends BaseTransformer {
constructor(data = null, options = {}) {
options.f_init = (x) => process.stdin
const decoder = new StringDecoder('utf8');
options.f_transform = (x) => decoder.write(x)
super('', options) // dummy data so that f_init is run
this.on('data', data => { process.stdin.destroy() }) // this line helps take only 1 line of input
}
}
// new StdinTransformer().pipe(new StdoutTransformer)
// var globalNs = {}
// class SetVar extends BaseTransformer {
// constructor(varName, options = {}) {
// // options.objectMode = true
// // this.varName = varName
// options.f_transform = (y) => { globalNs[varName] = y.trim(); return y.trim() }
// super(null, options)
// }
// // _transform(chunk, e, cb) {
// // globalNs
// // }
// }
// these don't work:
// new StdinTransformer().pipe(new SetVar('variable')).pipe(new StdoutTransformer())
// new StdinTransformer().pipe(new SetVar('file_name')).pipe(new ReadFileTransformer()).pipe(new StdoutTransformer())
// new ReadFileTransformer().pipe(new StdoutTransformer())
// new StdinTransformer().pipe(new StdoutTransformer())
const JSONStream = require('JSONStream')
const es = require('event-stream')
const csv = require('csv-parser')
const fs = require('fs')
const alasql = require('alasql')
const through = require('through')
//Q
// new BaseTransformer("hi mate").pipe(process.stdout)
// new BaseTransformer('test.csv', {f_init: fs.createReadStream}).pipe(process.stdout)
//A
// es.readArray(["hi mate"]).pipe(process.stdout)
// fs.createReadStream("test.csv").pipe(process.stdout)
//Q
// new ReadFileTransformer("test.csv").pipe(new ParseCsvTransformer({ headers:['a', 'b', 'c', 'd'], skipLines:1})).pipe(new StdoutTransformer())
//A
// fs.createReadStream("test.csv").pipe(csv({ headers:['a', 'b', 'c', 'd'], skipLines:1})).pipe(es.stringify()).pipe(process.stdout)
//Q
// new ReadFileTransformer("test.csv").pipe(new ParseCsvTransformer()).pipe(new StreamingSqlTransformer("select * from ? where id = '3'")).pipe(new StdoutTransformer())
//A
// fs.createReadStream("test.csv").pipe(csv()).pipe(es.mapSync(x=> alasql("select * from ? where id = '3' ", [[x]]))).pipe(es.filterSync(x =>x.length >0 )).pipe(es.stringify()).pipe(process.stdout)
//Q
// new ReadFileTransformer("config.json").pipe(new ParseJsonTransformer()).pipe(es.mapSync( data => data.filename)).pipe(new StdoutTransformer()) //.pipe(JSONStream.parse('*')) //
//A
// fs.createReadStream("config.json").pipe(JSONStream.parse()).pipe(es.mapSync(x=> x.filename)).pipe(es.wait((err, x)=> fs.createReadStream(x).pipe(process.stdout) ))
//Q. Print number of characters in a file
//A
// for a small file. < 16kb
// fs.createReadStream("bigfile.txt").pipe(es.mapSync(ch => ch.length)).pipe(es.stringify()).pipe(process.stdout)
//.
// for a big file, but reasonable to fit in memory
// fs.createReadStream("bigfile.txt").pipe(es.wait((e, x) => es.readArray([x.length]).pipe(es.stringify()).pipe(process.stdout) ))
//.
// for a very big file
// reduce-style through stream: accumulate chunk lengths, then emit the single total on end
function red(acc = 0) {
  return through(
    (chunk) => { acc = acc + chunk.length },
    function () { this.emit("data", acc); this.emit('end') }
  )
}
//fs.createReadStream("bigfile.txt").pipe(red()).pipe(es.stringify()).pipe(process.stdout)
const stream = require('stream')
const JSONStream = require('JSONStream')
const es = require('event-stream')
const csv_parser = require('csv-parser')
// const csv_parse = require('csv-parse')
const fs = require('fs')
const alasql = require('alasql')
const through = require('through')
const pump = require('pump');
const winston = require('winston');
const util = require('util')
const { Transform } = require('json2csv');
// var jsoncsv = require('json-csv')
// const stringify = require('csv-stringify')
// var duplexify = require('duplexify');
//Q
// new BaseTransformer("hi mate").pipe(process.stdout)
// new BaseTransformer('test.csv', {f_init: fs.createReadStream}).pipe(process.stdout)
//A
// es.readArray(["hi mate"]).pipe(process.stdout)
// fs.createReadStream("test.csv").pipe(process.stdout)
//Q
// new ReadFileTransformer("test.csv").pipe(new ParseCsvTransformer({ headers:['a', 'b', 'c', 'd'], skipLines:1})).pipe(new StdoutTransformer())
//A
// fs.createReadStream("test.csv").pipe(csv({ headers:['a', 'b', 'c', 'd'], skipLines:1})).pipe(es.stringify()).pipe(process.stdout)
//Q
// new ReadFileTransformer("test.csv").pipe(new ParseCsvTransformer()).pipe(new StreamingSqlTransformer("select * from ? where id = '3'")).pipe(new StdoutTransformer())
//A
// fs.createReadStream("test.csv").pipe(csv()).pipe(es.mapSync(x=> alasql("select * from ? where id = '3' ", [[x]]))).pipe(es.filterSync(x =>x.length >0 )).pipe(es.stringify()).pipe(process.stdout)
//Q
// new ReadFileTransformer("config.json").pipe(new ParseJsonTransformer()).pipe(es.mapSync( data => data.filename)).pipe(new StdoutTransformer()) //.pipe(JSONStream.parse('*')) //
//A
// fs.createReadStream("config.json").pipe(JSONStream.parse()).pipe(es.mapSync(x=> x.filename)).pipe(es.wait((err, x)=> fs.createReadStream(x).pipe(process.stdout) ))
//Q. Print number of characters in a file
//A
// for a small file. < 16kb
// fs.createReadStream("bigfile.txt").pipe(es.mapSync(ch => ch.length)).pipe(es.stringify()).pipe(process.stdout)
//.
// for a big file, but reasonable to fit in memory
// fs.createReadStream("bigfile.txt").pipe(es.wait((e, x) => es.readArray([x.length]).pipe(es.stringify()).pipe(process.stdout) ))
//.
// for a very big file
// function red(acc = 0) {
// return through(
// (chunk) => {acc = acc + chunk.length},
// function() {this.emit("data", acc); this.emit('end')}
// )
// }
//fs.createReadStream("bigfile.txt").pipe(red()).pipe(es.stringify()).pipe(process.stdout)
//Q. Alasql
// fs.createReadStream("test.csv").pipe(csv()).pipe(es.mapSync(x=> alasql("select * from ? where id = '3' ", [[x]]))).pipe(es.filterSync(x =>x.length >0 )).pipe(es.stringify()).pipe(process.stdout)
// fs.createReadStream('test.csv').pipe(csv()).pipe(sql("select * from ? where id ='3'")).pipe(es.stringify()).pipe(process.stdout)
/////
////
const logger = winston.createLogger({
level: 'info',
format: winston.format.json(),
// defaultMeta: { service: 'user-service' },
transports: [
new winston.transports.Console({level: 'debug', format: winston.format.simple()})
]
});
// job related functions
let jobs = {} // {jobId: [jobSteps]}
let jobId = 0
function job(...steps) {
jobId++
jobs[jobId] = []
for (const step of steps) {
  // if the step is an array (e.g. [stringify(), writeStream]), spread it into the job's step list
  if (Array.isArray(step)) jobs[jobId].push(...step)
  else jobs[jobId].push(step)
}
logger.info('Created job with jobId = ' + jobId)
return jobId
}
function run(jobId) {
logger.info('Running job with jobId = ' + jobId)
console.time(`Time taken for jobId ${jobId} = `)
pump(...jobs[jobId], function(err) {
if (err) {
// logger.error(`Pipeline error: ${err}`)
logger.error(err.stack)
}
else {
logger.info(`Pipeline success`)
console.timeEnd(`Time taken for jobId ${jobId} = `)
}
})
}
// helper functions
class Csv extends csv_parser().__proto__.constructor {
constructor(options = {}) {
options.objectMode = true
super(options)
}
}
csv2json = function(options = {}) {
logger.debug("Running csv2json")
options.mapHeaders = ({ header, index }) => header.trim().toLowerCase().replace(/ /g, '_') // normalize every space, not just the first
// return csv_parser()
return new Csv(options)
}
json2csv = function(options = {}) {
logger.debug("Running json2csv")
return [stringify(), new Transform(null, options)] // original
}
collect = function (numObj, callback) {
/*
Collects a stream until numObj objects have arrived, then emits them as one array.
With numObj = 0 or unspecified, everything is collected until the stream ends (like es.wait).
*/
logger.debug("Running collect")
let arr = []
return through(
function (data) {
  if (Array.isArray(data)) {
    arr = arr.concat(data) // concat returns a new array, so reassign it
  }
  else
    arr.push(data)
  if (arr.length === numObj) {
    this.emit('data', arr)
    arr = []
  }
},
function () {
  let body = null
  if (arr.length > 0) {
    body = arr // emit whatever is still collected as a single array
    this.emit('data', body)
  }
  this.emit('end')
  if (callback) callback(null, body)
}
)
}
stringify = function () {
logger.debug("Running stringify")
var Buffer = require('buffer').Buffer
return es.mapSync(function (e){
// console.log(e)
if (typeof e !== 'string') {
// console.log(JSON.stringify(e))
// return JSON.stringify(Buffer.isBuffer(e) ? e.toString() : e) + '\n'
return Buffer.isBuffer(e) ? e.toString() : JSON.stringify(e) + '\n'
}
else return e
})
}
// etl functions
function sql(query, tableName) {
/*
Takes a query and an optional tableName. Use tableName together with collect() so the table can be
looked up further down the job.
If the result is an empty array (0 rows), nothing is emitted downstream, i.e. the stream stops there.
*/
logger.debug("Running sql")
if (tableName == null) return [es.mapSync(ch => alasql(query, [[ch]])), es.filterSync(x =>x.length >0 )]
else {
let tableCreated = 0
return [
es.mapSync(ch => {
// console.log(ch)
let keys = Array.isArray(ch) ? Object.keys(ch[0]) : Object.keys(ch)
let ddl = `create table if not exists ${tableName} ( ${keys.toString()} ) `
if (!tableCreated) {
logger.info(`creating table ${tableName} with cols = ${keys.toString()}`)
tableCreated = alasql(ddl)
} else {
// delete existing data
alasql(`delete from ${tableName} `)
}
Array.isArray(ch) ? alasql(`insert into ${tableName} select * from ?`, [ch])
: alasql(`insert into ${tableName} values ?`, [ch])
let result = alasql(query.replace("?", tableName))
// console.log(result)
return result
}),
es.filterSync(x => x.length >0 )
]
}
}
function print() {
/*
Converts objects into strings and prints them to stdout.
It doesn't print the console.time/timeEnd output.
*/
logger.debug("Running print")
return [stringify(), process.stdout]
}
function readFile(filename, options = {}) {
/*
Reads the file and returns a readable stream
*/
logger.debug("Running readFile")
return fs.createReadStream(filename, options)
}
function writeFile(filename, options = {}) {
/*
Writes to the file after stringifying the chunks
*/
logger.debug("Running writeFile")
options.flags = options.flags || 'a'
return [stringify(), fs.createWriteStream(filename, options)]
}
// lets create jobs!
// recipes
// 1.
// works
// let jobId = job(
// readFile('test.csv'),
// csv(),
// sql("select * from ? where id ='3'"),
// print()
// )
//2.
let jobId1 = job(
readFile('test.csv'),
csv2json(),
collect(),
sql("select * from ? ", "testtable"), //where age::NUMBER > 9
// print()
// json2csv(),
// writeFile('filteredbigfile.csv', {flags: "w"}),
// stringify(),
print()
)
//3. read 2 files and join them
let jobId2 = job(
readFile('test1.csv'),
csv2json(),
collect(),
sql("select * from testtable a, test1table b where a.id = b.id", "test1table"), //where age::NUMBER > 9
print()
)
run(jobId1)
run(jobId2)
// stream.pipeline(readFile('test1.csv'))
// TODOs:
// 1. Implement mapSync, filterSync, reduceSync (see red() above and the reduceSync sketch below)
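// A hedged sketch toward TODO 1 (not from the original gist): reduceSync built on through,
// generalising the red() accumulator above; mapSync/filterSync already exist in event-stream.
function reduceSync(fn, acc) {
  return through(
    function (chunk) { acc = fn(acc, chunk) },                // fold each chunk into the accumulator
    function () { this.emit('data', acc); this.emit('end') } // emit the final value once the input ends
  )
}
// example: count characters in a file (equivalent to red() above)
// fs.createReadStream('bigfile.txt').pipe(reduceSync((acc, ch) => acc + ch.length, 0)).pipe(stringify()).pipe(process.stdout)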