Skip to content

Instantly share code, notes, and snippets.

@TrevorBasinger
Last active August 29, 2015 14:19
Show Gist options
  • Save TrevorBasinger/23fd73f85b78761c0596 to your computer and use it in GitHub Desktop.
Save TrevorBasinger/23fd73f85b78761c0596 to your computer and use it in GitHub Desktop.
/* jshint laxbreak: true, laxcomma: true */
/* Create a view using the following:
* create view {{resource}}_view as
* select * from {{resource}} r1
* where r1.inserted_at = (select r2.inserted_at from {{resource}} r2
* where r1.internalid = r2.internalid
* order by CAST(r2.inserted_at as UNSIGNED) desc limit 1);
*
*/
var R = require ('ramda'),
B = require ('baconjs'),
Future = require ('data.task'),
request = require ('request'),
c = require ('../lib/helpers/common'),
ns = require ('../lib/netsuite'),
mysql = require ('../lib/drivers/mysql'),
log = console.log,
logI = function (x) { log (x); return x; },
logM = R.curry (function (m, x) { log (m); log (x); return x; }),
logF = R.curry (function (f, x) { log (f (x)); return x; }),
id = function (x) { return x; },
konst = R.curry (function (a, b) { return a; }),
config = require ('../config.json'),
// [NSRecord]
employees = require ('./employees.json'),
// insertDate :: Record -> Record
insertDate = function (r) {
var record = R.clone (r);
if (record && record.columns) {
record.columns.inserted_at = new Date().getTime();
}
return record;
},
// pluckColumns :: Object -> Object
pluckColumns = R.prop ('columns'),
// pluckAndFlattenColumns :: Object -> Object
pluckAndFlattenColumns = R.compose (R.mapObj (ns.flatten), pluckColumns),
// evalColumnTypes :: [NSRecord] -> Object
evalColumnTypes = R.compose (R.mapObj (c.typeOf), R.mergeAll),
// buildCol :: String -> String -> String
buildCol = R.curry (function (name, type) { return name + " " + type; }),
// escapeColumns :: [String] -> [String]
escapeColumns = function (xs) {
var fst = "`" + (xs[0]) + "`",
snd = xs[1];
return [fst, snd];
},
// getBuildStrings :: [NSRecord] -> [String]
getBuildStrings = R.pipe ( R.map (insertDate)
, R.map (pluckAndFlattenColumns)
, evalColumnTypes
, R.mapObj (ns.str2type)
, R.toPairs
, R.map (escapeColumns)
, R.map (R.apply (buildCol)) ),
// prependPrimaryKey :: [String] -> [String]
prependPrimaryKey = R.prepend ('id INT PRIMARY KEY AUTO_INCREMENT'),
// surround :: String -> String -> String -> String
surround = R.curry (function (start, end, str) {
return R.join ('', R.prepend (start, R.append (end, str.toString())));
}),
// mkCreateTableStr :: String -> [NSRecord] -> String
mkCreateViewStr= R.curry (function (resource) {
var mkView = "select * from {{resource}} r1"
+ " where r1.inserted_at = (select r2.inserted_at from {{resource}} r2"
+ " where r1.internalid = r2.internalid"
+ " order by CAST(r2.inserted_at as UNSIGNED) desc limit 1);";
return "CREATE VIEW "+ resource + "_view as " + R.replace (/{{resource}}/g, resource, mkView);
}),
// mkCreateTableStr :: String -> [NSRecord] -> String
mkCreateTableStr = R.curry (function (resource, records) {
var joinColStr = R.pipe ( getBuildStrings
, prependPrimaryKey
, R.join (', ')
, surround ('(', ')') );
return "CREATE TABLE "+ resource + " " + joinColStr (records);
}),
// mkSelectStr :: String -> Record -> String
mkSelectStr = R.curry (function (resource, record) {
var rec = pluckAndFlattenColumns (record),
keys = R.join (", ", R.keys (rec)),
stmt = "select " + keys + " from " + resource + '_view where ',
mkSelectFromRecord = R.pipe ( R.toPairs
, R.map (preparePairs)
, R.map (R.join (' = '))
, R.join (' and ')
, surround (stmt, '') );
return mkSelectFromRecord (rec);
}),
// mkInsertStr :: String -> Record -> String
mkInsertStr = R.curry (function (str, obj) {
var r = pluckAndFlattenColumns (obj);
return mysql.mkInsert (str, R.keys (r), [r]);
}),
// tableExists :: Conn -> String -> Future Boolean
tableExists = function (conn, name) {
return mysql.queryC (conn, 'SHOW TABLES LIKE ' + mysql.escape (name))
.map (R.compose (R.lt (0), R.length));
},
// exists :: Conf -> String -> Record -> Future Boolean
exists = function (conf, name, record) {
return mysql.queryC (conf, mkSelectStr (name, record))
.map (R.compose (R.lt (0), R.length));
},
// createTable :: Conn -> String -> [NSRecord] -> Future [NSRecord]
createTable = R.curry (function (conn, resource, records) {
var mkTbl = mysql.queryC (conn, mkCreateTableStr (resource, records))
.chain (konst (createView (conn, resource)));
return tableExists (conn, resource, records)
.chain (branch (Future.of ({}), mkTbl))
.map (konst (records));
}),
// createView :: Conf -> String -> [NSRecord] -> Future Unit
createView = R.curry (function (conf, resource) {
var str = mkCreateViewStr (resource);
return mysql.queryC (conf, str).map (konst ({}));
}),
// insertRecord :: Conf -> String -> NSRecord -> Future Unit
insertRecord = R.curry (function (conf, resource, record) {
var str = mkInsertStr (resource, insertDate (record));
return mysql.queryC (conf, str).map (logI).map (konst ({}));
}),
// branch :: a -> a -> Boolean -> a
branch = R.curry (function (a, b, bool) { return bool ? a : b; }),
// preparePairs :: [[
preparePairs = function (xs) {
var ys = escapeColumns (xs);
return [ys[0], mysql.escapeVals (ys[1])];
},
// selectiveInsert :: Conf -> String -> Record -> Future Unit
selectiveInsert = R.curry (function (conf, resource, record) {
var insertF = insertRecord (conf, resource, record);
return exists (conf, resource, record)
.chain (branch (Future.of ({}), insertF));
}),
search = function (searchName) {
return new Future (function (reject, resolve) {
var params = { record_type: null , search_id: searchName , lower_bound: 0 , batch_size: 5000 };
var options = { url: "https://rest.sandbox.netsuite.com/app/site/hosting/restlet.nl?script=109&deploy=1"
, method: "POST"
, headers: { 'Authorization': "NLAuth nlauth_account=" + config.account
+ ", nlauth_email=" + config.email
+ ", nlauth_signature=" + config.password
+ ", nlauth_role=1099"
, 'content-type': 'application/json' }
, json: params };
request.post (options, function (err, data) {
if (err) { return reject (err); }
else { return resolve (data.body);}
});
});
},
// emitFutures :: Stream -> Stream -> [Future a] -> Future Eff Unit
emitFutures = R.curry (function (e, s, futures) {
return new Future (function (reject, resolve) {
forkCPS (R.head (futures), R.tail (futures), function (f) { f.fork (emit (e), emit (s)); })
resolve ({});
});
}),
forkCPS = R.curry (function (x, xs, cb) {
if (xs.length !== 0) { forkCPS (R.head (xs), R.tail (xs), cb); }
cb (x);
}),
fork = R.curry (function (e, s, f) { f.fork (e, s); }),
// emit :: Stream -> a -> Eff Unit
emit = R.curry (function (s, v) { s.push(v); }),
// closeConnF :: Connection -> Future Eff Unit
closeConnF = function (conn) {
return new Future (function (reject, resolve) {
logI ("ENDING CONNECTION");
conn.end();
resolve ({});
});
},
// toStream :: [a] -> Stream a
// Turns an array into a Bacon stream of its elements.
toStream = function (xs) { return B.fromArray (xs); },
// nil :: null
// The comma above keeps `nil` inside the `var` declaration; without it the
// statement ends early and `nil` is created as an implicit global.
nil = null;
// main :: [String] -> Eff Unit
// Entry point. args[2] is the resource (table) name and args[3] the saved
// search id. Runs the search, makes sure the table and view exist, then
// forks one selectiveInsert per returned record. Rejections are pushed to
// bus `e`, results to bus `s`; both buses are simply logged.
// NOTE(review): the connection is never closed here (closeConnF is not
// called), which presumably keeps the process alive - confirm.
(function main (args) {
  var conf = 'mysql://localhost/test',
      resource = args[2],
      searchName = args[3],
      e, s, conn, doMe;
  // Bail out before opening a connection: without both arguments the script
  // would create a table named "undefined" and send an empty search.
  if (!resource || !searchName) {
    console.error ("usage: node <script> <resource> <search_id>");
    process.exitCode = 1;
    return;
  }
  e = new B.Bus ();
  s = new B.Bus ();
  conn = mysql.getConnection (conf);
  doMe = search (searchName)
    .map (R.prop ('result'))
    .chain (createTable (conn, resource))
    .map (toStream)
    .map (R.map (selectiveInsert (conn, resource)));
  s.onValue (log);
  e.onValue (log);
  doMe.fork (log, function (x) {
    // x is a stream of Futures; fork each one as it is emitted.
    x.onValue (fork (emit (e), emit (s)));
  });
})(process.argv);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment