Last active
August 29, 2015 14:19
-
-
Save TrevorBasinger/23fd73f85b78761c0596 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* jshint laxbreak: true, laxcomma: true */ | |
/* Create a view using the following: | |
* create view {{resource}}_view as | |
* select * from {{resource}} r1 | |
* where r1.inserted_at = (select r2.inserted_at from {{resource}} r2 | |
* where r1.internalid = r2.internalid | |
* order by CAST(r2.inserted_at as UNSIGNED) desc limit 1); | |
* | |
*/ | |
var R = require ('ramda'), | |
B = require ('baconjs'), | |
Future = require ('data.task'), | |
request = require ('request'), | |
c = require ('../lib/helpers/common'), | |
ns = require ('../lib/netsuite'), | |
mysql = require ('../lib/drivers/mysql'), | |
log = console.log, | |
logI = function (x) { log (x); return x; }, | |
logM = R.curry (function (m, x) { log (m); log (x); return x; }), | |
logF = R.curry (function (f, x) { log (f (x)); return x; }), | |
id = function (x) { return x; }, | |
konst = R.curry (function (a, b) { return a; }), | |
config = require ('../config.json'), | |
// [NSRecord] | |
employees = require ('./employees.json'), | |
// insertDate :: Record -> Record | |
insertDate = function (r) { | |
var record = R.clone (r); | |
if (record && record.columns) { | |
record.columns.inserted_at = new Date().getTime(); | |
} | |
return record; | |
}, | |
// pluckColumns :: Object -> Object | |
pluckColumns = R.prop ('columns'), | |
// pluckAndFlattenColumns :: Object -> Object | |
pluckAndFlattenColumns = R.compose (R.mapObj (ns.flatten), pluckColumns), | |
// evalColumnTypes :: [NSRecord] -> Object | |
evalColumnTypes = R.compose (R.mapObj (c.typeOf), R.mergeAll), | |
// buildCol :: String -> String -> String | |
buildCol = R.curry (function (name, type) { return name + " " + type; }), | |
// escapeColumns :: [String] -> [String] | |
escapeColumns = function (xs) { | |
var fst = "`" + (xs[0]) + "`", | |
snd = xs[1]; | |
return [fst, snd]; | |
}, | |
// getBuildStrings :: [NSRecord] -> [String] | |
getBuildStrings = R.pipe ( R.map (insertDate) | |
, R.map (pluckAndFlattenColumns) | |
, evalColumnTypes | |
, R.mapObj (ns.str2type) | |
, R.toPairs | |
, R.map (escapeColumns) | |
, R.map (R.apply (buildCol)) ), | |
// prependPrimaryKey :: [String] -> [String] | |
prependPrimaryKey = R.prepend ('id INT PRIMARY KEY AUTO_INCREMENT'), | |
// surround :: String -> String -> String -> String | |
surround = R.curry (function (start, end, str) { | |
return R.join ('', R.prepend (start, R.append (end, str.toString()))); | |
}), | |
// mkCreateTableStr :: String -> [NSRecord] -> String | |
mkCreateViewStr= R.curry (function (resource) { | |
var mkView = "select * from {{resource}} r1" | |
+ " where r1.inserted_at = (select r2.inserted_at from {{resource}} r2" | |
+ " where r1.internalid = r2.internalid" | |
+ " order by CAST(r2.inserted_at as UNSIGNED) desc limit 1);"; | |
return "CREATE VIEW "+ resource + "_view as " + R.replace (/{{resource}}/g, resource, mkView); | |
}), | |
// mkCreateTableStr :: String -> [NSRecord] -> String | |
mkCreateTableStr = R.curry (function (resource, records) { | |
var joinColStr = R.pipe ( getBuildStrings | |
, prependPrimaryKey | |
, R.join (', ') | |
, surround ('(', ')') ); | |
return "CREATE TABLE "+ resource + " " + joinColStr (records); | |
}), | |
// mkSelectStr :: String -> Record -> String | |
mkSelectStr = R.curry (function (resource, record) { | |
var rec = pluckAndFlattenColumns (record), | |
keys = R.join (", ", R.keys (rec)), | |
stmt = "select " + keys + " from " + resource + '_view where ', | |
mkSelectFromRecord = R.pipe ( R.toPairs | |
, R.map (preparePairs) | |
, R.map (R.join (' = ')) | |
, R.join (' and ') | |
, surround (stmt, '') ); | |
return mkSelectFromRecord (rec); | |
}), | |
// mkInsertStr :: String -> Record -> String | |
mkInsertStr = R.curry (function (str, obj) { | |
var r = pluckAndFlattenColumns (obj); | |
return mysql.mkInsert (str, R.keys (r), [r]); | |
}), | |
// tableExists :: Conn -> String -> Future Boolean | |
tableExists = function (conn, name) { | |
return mysql.queryC (conn, 'SHOW TABLES LIKE ' + mysql.escape (name)) | |
.map (R.compose (R.lt (0), R.length)); | |
}, | |
// exists :: Conf -> String -> Record -> Future Boolean | |
exists = function (conf, name, record) { | |
return mysql.queryC (conf, mkSelectStr (name, record)) | |
.map (R.compose (R.lt (0), R.length)); | |
}, | |
// createTable :: Conn -> String -> [NSRecord] -> Future [NSRecord] | |
createTable = R.curry (function (conn, resource, records) { | |
var mkTbl = mysql.queryC (conn, mkCreateTableStr (resource, records)) | |
.chain (konst (createView (conn, resource))); | |
return tableExists (conn, resource, records) | |
.chain (branch (Future.of ({}), mkTbl)) | |
.map (konst (records)); | |
}), | |
// createView :: Conf -> String -> [NSRecord] -> Future Unit | |
createView = R.curry (function (conf, resource) { | |
var str = mkCreateViewStr (resource); | |
return mysql.queryC (conf, str).map (konst ({})); | |
}), | |
// insertRecord :: Conf -> String -> NSRecord -> Future Unit | |
insertRecord = R.curry (function (conf, resource, record) { | |
var str = mkInsertStr (resource, insertDate (record)); | |
return mysql.queryC (conf, str).map (logI).map (konst ({})); | |
}), | |
// branch :: a -> a -> Boolean -> a | |
branch = R.curry (function (a, b, bool) { return bool ? a : b; }), | |
// preparePairs :: [[ | |
preparePairs = function (xs) { | |
var ys = escapeColumns (xs); | |
return [ys[0], mysql.escapeVals (ys[1])]; | |
}, | |
// selectiveInsert :: Conf -> String -> Record -> Future Unit | |
selectiveInsert = R.curry (function (conf, resource, record) { | |
var insertF = insertRecord (conf, resource, record); | |
return exists (conf, resource, record) | |
.chain (branch (Future.of ({}), insertF)); | |
}), | |
search = function (searchName) { | |
return new Future (function (reject, resolve) { | |
var params = { record_type: null , search_id: searchName , lower_bound: 0 , batch_size: 5000 }; | |
var options = { url: "https://rest.sandbox.netsuite.com/app/site/hosting/restlet.nl?script=109&deploy=1" | |
, method: "POST" | |
, headers: { 'Authorization': "NLAuth nlauth_account=" + config.account | |
+ ", nlauth_email=" + config.email | |
+ ", nlauth_signature=" + config.password | |
+ ", nlauth_role=1099" | |
, 'content-type': 'application/json' } | |
, json: params }; | |
request.post (options, function (err, data) { | |
if (err) { return reject (err); } | |
else { return resolve (data.body);} | |
}); | |
}); | |
}, | |
// emitFutures :: Stream -> Stream -> [Future a] -> Future Eff Unit | |
emitFutures = R.curry (function (e, s, futures) { | |
return new Future (function (reject, resolve) { | |
forkCPS (R.head (futures), R.tail (futures), function (f) { f.fork (emit (e), emit (s)); }) | |
resolve ({}); | |
}); | |
}), | |
forkCPS = R.curry (function (x, xs, cb) { | |
if (xs.length !== 0) { forkCPS (R.head (xs), R.tail (xs), cb); } | |
cb (x); | |
}), | |
fork = R.curry (function (e, s, f) { f.fork (e, s); }), | |
// emit :: Stream -> a -> Eff Unit | |
emit = R.curry (function (s, v) { s.push(v); }), | |
// closeConnF :: Connection -> Future Eff Unit | |
closeConnF = function (conn) { | |
return new Future (function (reject, resolve) { | |
logI ("ENDING CONNECTION"); | |
conn.end(); | |
resolve ({}); | |
}); | |
}, | |
// toStream :: a -> Stream a | |
toStream = function (xs) { return B.fromArray(xs); } | |
nil = null; | |
(function main (args) { | |
var conf = 'mysql://localhost/test', | |
resource = args[2], | |
e = new B.Bus(), | |
s = new B.Bus(), | |
conn = mysql.getConnection (conf), | |
doMe = search (args[3]) | |
.map (R.prop ('result')) | |
.chain (createTable (conn, resource)) | |
.map (toStream) | |
.map (R.map (selectiveInsert (conn, resource))); | |
s.onValue (log); | |
e.onValue (log); | |
doMe.fork (log, function (x) { | |
x.onValue (fork (emit (e), emit (s))); | |
}); | |
})(process.argv); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment