Skip to content

Instantly share code, notes, and snippets.

@edwardsmarkf
Created December 17, 2016 20:23
Show Gist options
  • Save edwardsmarkf/089b30971d7690bcafcf1a62eba89dd2 to your computer and use it in GitHub Desktop.
Save edwardsmarkf/089b30971d7690bcafcf1a62eba89dd2 to your computer and use it in GitHub Desktop.
'use strict'
const defaults = require('../defaults.js');
const dyncol = require('dyncol'); // added 2016-12-17
const events = require('events');
const pckg = require('../package.json');
const sequelize = require('sequelize'); // added 2016-12-17
const util = require('util');
const sql = require('mariasql');
const dbGenerator = (options, self) => {
let dbOptions = Object.assign({}, options.mariasql);
delete dbOptions.db;
let db = new sql(dbOptions);
db.on('ready', () => {
db.query('CREATE DATABASE IF NOT EXISTS ' + options.mariasql.db, (e, rows) => {
if (e) return self.emit('error', e);
db.query('use ' + options.mariasql.db, (e2, rows2) => {
if (e2) return self.emit('error', e2);
db.query(
('CREATE TABLE IF NOT EXISTS ' + options.table.name +
' (pk bigint AUTO_INCREMENT PRIMARY KEY, ds_key ' + options.table.keyType +
', ds_value ' + options.table.valueType +
', KEY ds_key (ds_key(32)), CONSTRAINT `ds_key_unique` UNIQUE(`ds_key`(32))) ENGINE=InnoDB DEFAULT CHARSET UTF8;'),
(e3, rows3) => {
if (e3) return self.emit('error', e3);
db.query('SHOW TABLES', (e4, rows4) => {
if (e4) return self.emit('error', e4);
rows4.forEach((x, i, ar) => {
self._tableList.push(x.Tables_in_deepstream);
});
});
}
);
self.isReady = true;
self.emit('ready');
});
});
});
db.on('error', (e) => {
if (e.message.includes(`Unknown database \'${self._dbName}\'`)) {
let cleanOptions = Object.assign({}, options);
cleanOptions.mariasql = Object.assign({}, options.mariasql);
cleanOptions.table = Object.assign({}, options.table);
delete cleanOptions.mariasql.db;
db = dbGenerator(cleanOptions, self);
} else self.emit('error', e);
});
db.end();
db.connect();
return db;
};
class Connector extends events.EventEmitter {
constructor (options) {
super();
if (! (typeof options === 'object')) throw new TypeError('Incorrect connection options passed');
this.isReady = false;
this.name = pckg.name;
this.version = pckg.version;
options = Object.assign({}, defaults, options);
options.mariasql = Object.assign({}, defaults.mariasql, options.mariasql);
options.table = Object.assign({}, defaults.table, options.table);
if (process.env.ds_host) options.mariasql.host = process.env.ds_host;
if (process.env.ds_user) options.mariasql.user = process.env.ds_user;
if (process.env.ds_password) options.mariasql.password = process.env.ds_password;
if (process.env.ds_databaseName) options.mariasql.db = process.env.ds_databaseName;
if (process.env.ds_tableName) options.table.name = process.env.ds_tableName;
if (process.env.ds_keyType) options.table.keyType = process.env.ds_keyType;
if (process.env.ds_valueType) options.table.valueType = process.env.ds_valueType;
if (process.env.ds_splitter) options.splitter = process.env.ds_splitter;
this.options = Object.assign({}, options);
this._dbName = options.mariasql.db;
this._table = options.table;
this._splitter = options.splitter;
this._tableList = [];
this._db = dbGenerator(options, this);
}
_upsert (tableName, key, value) {
return new Promise((go, stop) => {
if ( value._v && value._d ) { // ignore the blank ones
var sqlInsertUpdate = 'INSERT INTO '
+ tableName
+ ' SET ds_key = ?'
+ ' , ds_value ='
+ dyncol.createQuery(value)
+ ' ON DUPLICATE KEY UPDATE '
+ ' ds_value ='
+ dyncol.updateQuery('ds_value', value)
+ ';'
;
this._db.query(sqlInsertUpdate, [ key ], {metadata: true}, (e, rows) => {
if (e) {
console.log('update error! ' + e);
return stop(e);
} else {
return go();
}
});
} else {
return go(); // ignore the blank ones
}
});
}
set (key, value, callback) {
var splitted = undefined;
try {
splitted = key.split(this._splitter);
} catch (e) {
return callback(e);
};
let tableName = (splitted.length > 1) ? splitted[0] : this.options.table.name;
if (this._tableList.includes(tableName)) {
this._upsert(tableName, key, value).then(() => {
return callback(null);
}).catch((e) => {
return callback(e);
});
} else {
this._db.query(
('CREATE TABLE IF NOT EXISTS ' + tableName +
' (pk bigint AUTO_INCREMENT PRIMARY KEY, ds_key ' + this._table.keyType +
', ds_value ' + this._table.valueType +
', KEY `ds_key` (`ds_key`(32)), CONSTRAINT ds_key_unique UNIQUE (`ds_key`(32)) ) ENGINE=InnoDB DEFAULT CHARSET UTF8;'),
(e, rows) => {
if (e) return this.emit('error', e);
this._upsert(tableName, key, value).then(() => {
return callback(null);
}).catch((e) => {
return cllback(e);
});
}
);
};
}
get (key, callback) {
console.log('IN get ' );
var splitted = undefined;
try {
splitted = key.split(this._splitter);
} catch (e) {
console.log ('error on split! ' + e );
return callback(e);
};
let tableName = (splitted.length > 1) ? splitted[0] : this._table.name;
console.log('tableName: ' + tableName );
let sqlSelect = 'SELECT COLUMN_JSON(ds_value) "ds_value" '
+ 'FROM '
+ tableName
+ ' WHERE 1'
+ ' AND ds_key = ?'
;
console.log('sqlSelect: ' + sqlSelect);
this._db.query(sqlSelect, [ key ], (e, rows) => {
console.log('rows: ' + util.inspect(rows));
if (e) {
console.log('error on initial SELECT: ' + e );
return callback(e);
}
if ( rows[0] ) {
var sequelizedJson = sequelize.col(rows[0].ds_value);
//return callback(null, JSON.parse('{"_v":7,"_d":{"atata":"fake fake!"}}'));
return callback(null, JSON.parse(sequelizedJson.col));
} else {
return callback(null, null);
}
});
}
delete (key, callback) {
var splitted = undefined;
try {
splitted = key.split(this._splitter);
} catch (e) {
return callback(e);
};
let tableName = (splitted.length > 1) ? splitted[0] : this._table.name;
this._db.query('delete from ' + tableName + ' where ds_key = ?', [ key ], (e, rows) => {
if (e) callback(e);
return callback(null);
});
}
}
module.exports = Connector
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment