Created
October 21, 2016 16:51
-
-
Save johndstein/7ddb8ca68bbf2b2460ff9ad905a7a431 to your computer and use it in GitHub Desktop.
Working example of Node.js readable and writable stream implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; | |
// This is a working example of a readable stream sending rows to insert into | |
// SQL Server. | |
const Readable = require('stream').Readable; | |
const mssql = require('mssql'); | |
const MsSqlStreamInsert = require('../../lib/source/mssql-stream-insert'); | |
const showProgress = require('../../lib/transform/show-progress')(); | |
const options = require('../../.config'); | |
const AnySourceReadable = require('../../lib/source/any-source-readable'); | |
options.mssql.database = 'RE_Reporter'; | |
options.verbose = false; | |
options.tableName = 'sf_contact'; | |
options.tableDefinition = [ | |
['re_legacy_id', mssql.Int, { | |
nullable: false | |
}], | |
['sf_id', mssql.VarChar(18), { | |
nullable: false | |
}], | |
['re_constiuent_id', mssql.VarChar(20), { | |
nullable: true | |
}] | |
]; | |
const msSqlStreamInsert = new MsSqlStreamInsert(options); | |
// Trivial implementation of a source that sends rows till max is reached. | |
class Source { | |
constructor(max) { | |
this.i = 0; | |
this.max = max; | |
this.stopped = false; | |
this.intervalId = null; | |
} | |
// readStart() gets called every time _read() is called. which could be | |
// millions of times. we don't need to do anything here except say stopped = | |
// false and the very first time we are called, kick off our data generating | |
// interval. | |
readStart() { | |
this.stopped = false; | |
if (!this.intervalId) { | |
// Kick off the data generating function on an interval. | |
this.intervalId = setInterval(() => { | |
// If we are stopped, do nothing. | |
if (!this.stopped) { | |
// as long as we are not stopped we keep sending data. | |
if (this.i++ < this.max) { | |
showProgress(); | |
this.onData({ | |
re_legacy_id: this.i, | |
sf_id: `SF_${this.i}`, | |
re_constiuent_id: `RE_${this.i}` | |
}); | |
} else { | |
// once we have sent all our rows, clear the interval that sends | |
// data and tell the downstream that we are done by calling the | |
// onEnd() method. | |
clearInterval(this.intervalId); | |
this.onEnd(); | |
} | |
} | |
}, 2); | |
} | |
} | |
// This will only be called when the downstream can't handle any more data. | |
// So we nicely stop sending it data till it calls _read() again. | |
readStop() { | |
console.log('readStop'); | |
this.stopped = true; | |
} | |
} | |
// The following shows that we correctly get back pressure on the stream and | |
// are able to start pushing again when the downstream is ready. | |
// jstein@VORLPC105 MINGW64 ~/sfloader (master) | |
// $ node spottest/john/mssql-stream-insert-10000000.js | |
// .readStop | |
// .readStop | |
// .readStop | |
// .readStop | |
// .readStop | |
// Look Ma! No callbacks! Just pure note streaming joy. | |
// MsSqlStreamInsert logs into SQL Server, drops the table, creates it, | |
// and when it's all good and ready starts accepting input from AnySourceReadable. | |
new AnySourceReadable( | |
new Source(3333), { | |
objectMode: true, | |
highWaterMark: 3 | |
}) | |
.pipe(msSqlStreamInsert) | |
.on('error', (err) => { | |
throw err; | |
}) | |
.on('finish', () => { | |
console.log('finish'); | |
// TODO must be something else i need to close because we are't quitting | |
// immediately. | |
mssql.close(); | |
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment