Created
October 26, 2017 23:21
-
-
Save langpavel/bc02d9ca024760aa2acdc354bfcaf7f6 to your computer and use it in GitHub Desktop.
Node Streams: HashThroughStream, ObjectStreamToJSON and CsvTransformStream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Transform } from 'stream'; | |
/**
 * Transform stream that turns delimiter-separated text into one object-mode
 * chunk per row.
 *
 * The writable side accepts utf8 strings only (`decodeStrings: false`); the
 * readable side is in object mode. Each row is emitted either as an array of
 * string cells or, when `options.columns` is given, as an object keyed by
 * column name.
 *
 * Parsing is deliberately simple: cells are split on a single delimiter
 * character, rows end at `\r` or `\n`, blank lines produce no row, and there
 * is no quoting or escaping support. Every U+FEFF character (byte order mark)
 * in the input is dropped.
 *
 * @param {object} [options]
 * @param {string[]} [options.columns] - Names for the cells, by position.
 *   Cells without a name are stored as `column<index>`.
 * @param {boolean} [options.ignoreUnnamed] - When truthy, cells that have no
 *   name in `columns` are omitted instead of stored as `column<index>`.
 * @param {boolean} [options.emptyAsNull] - When truthy, empty cells are
 *   emitted as `null` instead of `''` (only applies when `columns` is set).
 * @param {string} [options.delimiter=';'] - Single character separating cells.
 * @throws {TypeError} If `options.delimiter` is not a one-character string.
 */
export default class CsvTransformStream extends Transform {
  constructor(options) {
    super({
      decodeStrings: false,
      readableObjectMode: true,
    });
    const { columns, ignoreUnnamed, emptyAsNull, delimiter } = options || {};

    // True while nothing has been read for the current row; this is what
    // makes "\r\n" and blank lines flush at most one row.
    this.rowFlushed = true;
    // Characters of the cell currently being read.
    this.currentColumn = [];
    // Completed cells of the row currently being read.
    this.currentRow = [];

    this.columns = columns;
    this.ignoreUnnamed = ignoreUnnamed;
    this.emptyAsNull = emptyAsNull;

    if (delimiter === undefined || delimiter === null) {
      this.delimiter = ';';
    } else if (typeof delimiter === 'string' && delimiter.length === 1) {
      this.delimiter = delimiter;
    } else {
      throw new TypeError('delimiter must be a single-character string');
    }
  }

  /**
   * Converts an array of cell values into an object keyed by `this.columns`.
   *
   * @param {string[]} row - Cell values in column order.
   * @returns {object} Record built from the row.
   */
  buildRecord(row) {
    const record = {};
    row.forEach((value, index) => {
      const name = this.columns[index];
      if (!name && this.ignoreUnnamed) {
        return;
      }
      const isEmpty = this.emptyAsNull && value === '';
      record[name || `column${index}`] = isEmpty ? null : value;
    });
    return record;
  }

  /**
   * Finishes the pending cell and pushes the pending row downstream.
   * Does nothing when no character has been read since the last flush.
   */
  flushRow() {
    if (this.rowFlushed) {
      return;
    }
    this.currentRow.push(this.currentColumn.join(''));
    this.currentColumn = [];

    const row = this.currentRow;
    this.push(this.columns ? this.buildRecord(row) : row);

    this.currentRow = [];
    this.rowFlushed = true;
  }

  /**
   * Consumes one string chunk, emitting a row for every line end seen.
   * A row may span several chunks; its state is kept on the instance.
   *
   * @param {string} data - Text chunk.
   * @param {string} encoding - Encoding reported by the writable side.
   * @param {Function} callback - Called with an Error when the chunk is not a
   *   utf8 string, otherwise with no arguments.
   */
  _transform(data, encoding, callback) {
    if (encoding !== 'utf8' && encoding !== 'utf-8') {
      // Report through the callback rather than throwing: a throw here would
      // escape from write() (or crash a pipe) instead of emitting 'error'.
      callback(new Error('Use utf8 encoding please'));
      return;
    }

    for (const char of data) {
      if (char === '\uFEFF') {
        // Byte order mark: skipped without marking the row as started.
      } else if (char === this.delimiter) {
        this.rowFlushed = false;
        this.currentRow.push(this.currentColumn.join(''));
        this.currentColumn = [];
      } else if (char === '\r' || char === '\n') {
        this.flushRow();
      } else {
        this.rowFlushed = false;
        this.currentColumn.push(char);
      }
    }
    callback();
  }

  /**
   * Emits the final row when the input does not end with a line break.
   *
   * @param {Function} callback - Called once the last row has been pushed.
   */
  _flush(callback) {
    this.flushRow();
    callback();
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* eslint-disable no-underscore-dangle */ | |
import { Transform } from 'stream'; | |
import { createHash } from 'crypto'; | |
/**
 * Pass-through Transform stream that hashes and counts the bytes flowing
 * through it. The results become available once the stream has been flushed
 * (i.e. after the writable side has ended).
 *
 * @param {object} [options] - Transform options, plus:
 * @param {string} [options.hash='sha512'] - Algorithm name for `createHash`.
 * @param {string} [options.digest='hex'] - Encoding of the final digest.
 */
export default class HashThroughStream extends Transform {
  constructor(options) {
    const { hash, digest, ...opts } = options || {};
    super(opts);
    // Final digest; stays null until _flush runs.
    this.digest = null;
    this._hash = createHash(hash || 'sha512');
    this._digest = digest || 'hex';
    this._size = 0;
  }

  /**
   * Hashes and counts one chunk, then forwards it downstream.
   *
   * String chunks (possible when the caller passes `decodeStrings: false`)
   * are decoded with their declared encoding first, so the hash, the size and
   * the forwarded data all describe the same bytes.
   *
   * @param {Buffer|string} chunk - Data written to the stream.
   * @param {string} encoding - Encoding of `chunk` when it is a string.
   * @param {Function} cb - Called with `(null, bytes)`, or with an Error if
   *   the chunk cannot be hashed.
   */
  _transform(chunk, encoding, cb) {
    let bytes;
    try {
      bytes = typeof chunk === 'string' ? Buffer.from(chunk, encoding) : chunk;
      this._hash.update(bytes);
    } catch (err) {
      cb(err);
      return;
    }
    // Counted after decoding: a string's .length is UTF-16 units, not bytes.
    this._size += bytes.length;
    cb(null, bytes);
  }

  /**
   * Finalizes the hash once all input has been processed.
   *
   * @param {Function} cb - Called after the digest has been stored.
   */
  _flush(cb) {
    this.digest = this._hash.digest(this._digest);
    cb();
  }

  /**
   * @returns {string|Buffer} Digest of everything that passed through.
   * @throws {Error} If the stream has not been flushed yet.
   */
  getHash() {
    if (this.digest === null) {
      throw new Error('Stream is not flushed');
    }
    return this.digest;
  }

  /**
   * @returns {number} Total length of all chunks that passed through.
   * @throws {Error} If the stream has not been flushed yet.
   */
  getSize() {
    if (this.digest === null) {
      throw new Error('Stream is not flushed');
    }
    return this._size;
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Transform } from 'stream'; | |
/**
 * Transform stream that serializes a stream of objects into the text of a
 * single JSON array, one element per written object.
 *
 * The writable side is in object mode; the readable side emits text. The
 * opening bracket is pushed at construction time and the closing bracket
 * when the stream is flushed, so an empty input yields an empty array.
 *
 * @param {object} [options]
 * @param {boolean} [options.pretty] - When truthy, each element is indented
 *   with two spaces.
 */
export default class ObjectStreamToJSON extends Transform {
  constructor(options) {
    super({
      writableObjectMode: true,
    });
    this.pretty = options && options.pretty;
    // Tracks whether an element has been emitted, so separators go BETWEEN
    // elements; a comma after the last element would make the JSON invalid.
    this.hasWritten = false;
    this.push('[\n');
  }

  /**
   * Serializes one value and appends it to the array.
   *
   * @param {*} data - Value to serialize.
   * @param {string} encoding - Unused (object mode).
   * @param {Function} callback - Called with an Error if serialization
   *   throws, otherwise with no arguments.
   */
  _transform(data, encoding, callback) {
    let json;
    try {
      json = JSON.stringify(data, null, this.pretty ? 2 : null);
    } catch (err) {
      callback(err);
      return;
    }
    if (json === undefined) {
      // JSON.stringify yields undefined for values such as functions; write
      // null, as JSON.stringify itself does for such array elements.
      json = 'null';
    }
    this.push(this.hasWritten ? `,\n${json}` : json);
    this.hasWritten = true;
    callback();
  }

  /**
   * Closes the JSON array.
   *
   * @param {Function} callback - Called once the closing bracket is pushed.
   */
  _flush(callback) {
    this.push('\n]');
    callback();
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment