Skip to content

Instantly share code, notes, and snippets.

@johndstein
Last active June 15, 2021 20:16
Show Gist options
  • Save johndstein/88a136f3f0fbfdfb4623b18e4da97d2d to your computer and use it in GitHub Desktop.
Save johndstein/88a136f3f0fbfdfb4623b18e4da97d2d to your computer and use it in GitHub Desktop.
Node.js writable stream that connects to db, drops and creates table, and then bulk loads rows into table
#!/usr/bin/env node
'use strict';
// Stream unlimited rows into a Sql Server table.
// WARNING!!! WE DROP and RE-CREATE the table. Then stream the data into it.
// Source stream must be an object stream. Object property names must match
// table column names. Since SQL Server isn't case sensitive, don't think case
// matters. Object data types must match column data types. See 'JS Data Type
// To SQL Data Type Map' https://github.com/patriksimek/node-
// mssql/tree/12255523f892d72d440f06d1ee86a2b214961db1
const mssql = require('mssql');
// Unfortunately node's writable doesn't supply a _flush method :( so this is
// a drop in replacement that does.
const FlushWritable = require('flushwritable');
class Whatever extends FlushWritable {
constructor(options) {
options.writable = options.writable || {};
options.writable.objectMode = true;
super(options.writable);
this.options = options;
// REQUIRED this.options.tableName
// REQUIRED this.options.mssql
// {
// driver: 'msnodesqlv8',
// server: 'MY_SERVER',
// database: 'SomeDb',
// stream: true, // You can enable streaming globally or per request
// options: {
// trustedConnection: true,
// encrypt: true
// }
// }
// REQUIRED this.options.tableDefinition
// [
// [ 're_legacy_id', mssql.Int, { nullable: false }],
// [ 'sf_id', mssql.VarChar(18), { nullable: false }],
// [ 're_constiuent_id', mssql.VarChar(20), { nullable: true }]
// ]
// verbose causes mssql to log stuff.
// this.options.verbose;
this.options.batchSize = this.options.batchSize || 1000;
this.state = {};
this.state.connection = null;
this.state.connected = false;
this.state.tableDropped = false;
this.state.intervalId = null;
this.state.rows = [];
}
// Turn row which is an object into an array of it's values in the proper
// sequence (as they are listed in the colNames array).
_rowToArray(colNames, row) {
const a = [];
colNames.forEach((name) => {
a.push(row[name]);
});
return a;
}
// OK, we're sending the rows to good old microsoft.
_sendRows(cb) {
const table = new mssql.Table(this.options.tableName);
table.create = true;
const colNames = [];
// table.columns.add('re_id', mssql.VarChar(77), {
// nullable: false
// });
this.options.tableDefinition.forEach((def) => {
colNames.push(def[0]);
table.columns.add.apply(table.columns, def);
});
// this.state.rows.forEach((row) => {
// table.rows.add(`REID_${ix++}`, `SFID_${ix}`);
// });
this.state.rows.forEach((row) => {
let rowA = this._rowToArray(colNames, row);
table.rows.add.apply(table.rows, rowA);
});
const request = new mssql.Request(this.state.connection);
request.verbose = this.options.verbose;
request.bulk(table)
.on('done', () => {
this.state.rows = [];
cb();
})
.on('error', (err) => {
cb(err);
});
}
_dropTableIfNecessary(cb) {
if (this.state.tableDropped) {
process.nextTick(cb);
} else {
// For some stupid reason you need to attach an on error event to
// request. You are supposed to be able to do the callback style and
// handle error, but it complains that there's no error handler if you
// try that.
const request = new mssql.Request(this.state.connection);
request.verbose = this.options.verbose;
request.batch(`IF OBJECT_ID('${this.options.tableName}', 'U') IS NOT NULL
DROP TABLE ${this.options.tableName}`)
.on('done', () => {
this.state.tableDropped = true;
cb();
})
.on('error', (err) => {
cb(err);
});
}
}
_makeConnectionIfNecessary(cb) {
if (this.state.connected) {
process.nextTick(cb);
} else {
// For some stupid reason you need to attach an on error event to
// connection. You are supposed to be able to do the callback style and
// handle error, but it complains that there's no error handler if you
// try that.
this.state.connection = new mssql.Connection(
this.options.mssql,
(err) => {
if (err) {
cb(err);
} else {
this.state.connected = true;
cb();
}
});
}
}
_sendBatch(cb) {
if (this.state.rows.length === 0) {
// this will only happen if we were called by _flush and there just
// happen to be no remainder in the buffer.
process.nextTick(cb);
} else {
this._makeConnectionIfNecessary((err) => {
if (err) {
cb(err);
} else {
this._dropTableIfNecessary((err) => {
if (err) {
cb(err);
} else {
this._sendRows(cb);
}
});
}
});
}
}
// Built in node stream method used internally to send us data.
_write(object, encoding, cb) {
this.state.rows.push(object);
if (this.state.rows.length === this.options.batchSize) {
this._sendBatch(cb);
} else {
cb();
}
}
// Flush out whatever is remaining in the buffer if anything.
// Unfortunately node doesn't provide this method on it's writable, so need
// to use a custom one -- FlushWritable.
_flush(cb) {
this.state.flush = true;
this._sendBatch(cb);
}
}
exports = module.exports = Whatever;
@4xle
Copy link

4xle commented Jun 7, 2018

@johndstein Fantastic snippet.
Is there a license with this?
If it's not open source, I'd like to ask for permission to:

  • fix the issues listed,
  • extend it's functionality,
  • and use it.

Regards, 4xle

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment