Created
December 21, 2017 16:34
-
-
Save d00rman/f2002afd2d94f3afb9585d2221253eba to your computer and use it in GitHub Desktop.
Move Math data script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env nodejs | |
'use strict'; | |
const cassandra = require('cassandra-driver'); | |
const P = require('bluebird'); | |
const preq = require('preq'); | |
const yargs = require('yargs'); | |
// Command-line definition: one positional argument (the RESTBase URL named in
// the usage string) plus the Cassandra contact point and credentials.
const argv = yargs
    .usage('Usage: $0 <restbase-url>')
    .options({
        h: {
            alias: 'help'
        },
        H: {
            alias: 'hostname',
            default: 'localhost',
            describe: 'Contact hostname',
            type: 'string'
        },
        P: {
            alias: 'port',
            default: 9042,
            describe: 'Contact port number',
            type: 'number'
        },
        u: {
            alias: 'username',
            default: 'cassandra',
            describe: 'Cassandra username',
            type: 'string'
        },
        p: {
            alias: 'password',
            default: 'cassandra',
            describe: 'Cassandra password',
            type: 'string'
        }
    })
    .argv;

// --help / -h: print the usage text and stop before touching the database.
if (argv.help) {
    yargs.showHelp();
    process.exit(0);
}

// Settings derived from the parsed arguments and shared by the rest of the script.
const { hostname: host, port, username: user, password: pass } = argv;
// First positional argument, i.e. the <restbase-url> from the usage string.
const [ rbHost ] = argv._;
// "host:port" string handed to the driver as its single contact point.
const contact = `${host}:${port}`;
/**
 * Creates a single connection pool.
 *
 * Builds a cassandra-driver Client for the module-level `contact` point,
 * authenticating with `user` / `pass`, and connects it.
 *
 * @return {Promise} resolves with the Client instance once client.connect()
 *   has completed
 */
function connect() {
    const authProvider = new cassandra.auth.PlainTextAuthProvider(user, pass);
    const clientOptions = {
        contactPoints: [ contact ],
        authProvider,
        // NOTE(review): the CA path is '/dev/null'; presumably certificate
        // verification is not expected to succeed against a real CA - confirm.
        sslOptions: { ca: '/dev/null' },
        promiseFactory: P.fromCallback,
        queryOptions: { consistency: cassandra.types.consistencies.one },
    };
    const client = new cassandra.Client(clientOptions);
    return client.connect().then(() => client);
}
/**
 * Fetches one page of results for a query, retrying failed attempts.
 *
 * The caller's `options` object is never modified: the remaining retry budget
 * is carried forward on a copy handed to the recursive call.
 *
 * @param {object} client object exposing execute(query, params, queryOptions)
 * @param {string} query CQL query string
 * @param {array|object} params query parameters, passed through to execute()
 * @param {*} pageState paging token of the page to fetch, passed through to
 *   execute() unchanged
 * @param {object} options
 * @param {number} [options.fetchSize] rows per page; falls back to 5 when falsy
 * @param {number} [options.retries] extra attempts allowed after a failure;
 *   missing, zero, negative or non-numeric values mean "do not retry"
 * @return {Promise} resolves with the result of client.execute(); rejects with
 *   the error of the last failed attempt once the retry budget is exhausted
 */
function _nextPage(client, query, params, pageState, options) {
    // Normalise once so a negative or non-numeric budget cannot retry forever.
    const retriesLeft = Number(options.retries) || 0;
    // P.try also turns a synchronous throw from execute() into a rejection.
    return P.try(() => client.execute(query, params, {
        prepare: true,
        fetchSize: options.fetchSize || 5,
        pageState,
    }))
    .catch((err) => {
        if (retriesLeft <= 0) {
            throw err;
        }
        const retryOptions = Object.assign({}, options, { retries: retriesLeft - 1 });
        return _nextPage(client, query, params, pageState, retryOptions);
    });
}
/**
 * Async-safe Cassandra query execution
 *
 * Client#eachRow in the Cassandra driver relies upon a synchronous callback
 * to provide back-pressure during paging; this function can safely execute
 * async callback handlers. Pages are fetched one after another through
 * _nextPage(); the next page is only requested once every handler promise
 * for the current page has settled successfully.
 *
 * The caller's `options` object is not modified.
 *
 * @param {object} client cassandra-driver Client instance
 * @param {string} query CQL query string
 * @param {array|object} params CQL query params
 * @param {object} options options map, forwarded to _nextPage() for each page
 *   (a fresh copy per page, so every page starts with the full `retries` value)
 * @param {function} handler function to invoke for each row result; may return
 *   a promise. At most 16 handlers of one page run concurrently.
 * @return {Promise} resolves (with undefined) after the last page has been
 *   handled; rejects if fetching a page or any handler fails
 */
function eachRow(client, query, params, options, handler) {
    // Private copy: defaults are filled in here, never on the caller's object.
    const baseOptions = Object.assign({}, options);
    baseOptions.log = baseOptions.log || (() => {});

    function processPage(pageState) {
        // Fresh copy per page so the retry budget starts over for each page,
        // even if _nextPage() consumes `retries` on the object it receives.
        const pageOptions = Object.assign({}, baseOptions);
        return _nextPage(client, query, params, pageState, pageOptions)
        .then((res) => {
            // Guard before touching res.rows; nothing to do without a result.
            if (!res) {
                return undefined;
            }
            return P.map(res.rows || [], (row) => handler(row), { concurrency: 16 })
            .then(() => {
                // null or undefined both mean there is no further page; a
                // strict null check would re-request the first page forever
                // if the token were undefined.
                if (res.pageState == null) {
                    return undefined;
                }
                return processPage(res.pageState);
            });
        });
    }

    return processPage(null);
}
// Number of rows visited so far (duplicates included).
let count = 0;
// Connected client, remembered so it can be shut down when the scan ends.
let cc;
// Distinct row keys seen so far. Keys are stringified, which matches how they
// would collapse when used as property names, and keeps size lookups O(1).
const hh = new Set();

// Scan the whole table, counting rows and distinct keys. Every 1000 rows a
// progress line is printed: rows visited, distinct keys, and their difference.
connect().then((client) => {
    cc = client;
    return eachRow(
        client,
        'SELECT key, value FROM "local_group_globaldomain_T_mathoid_input".data',
        {},
        {
            retries: 10,
            fetchSize: 100,
            log: console.log
        },
        (row) => {
            hh.add(String(row.key));
            count++;
            if (count % 1000 !== 0) {
                return P.resolve();
            }
            const hhl = hh.size;
            const drift = count - hhl;
            console.log(`- ${count}\t${hhl}\t${drift}`);
            // Defer completion to the next tick after printing a progress line.
            return new P((resolve) => process.nextTick(resolve));
        }
    );
})
.then(() => console.log(`Total count: ${count}`))
.catch((err) => {
    // Report the failure and signal it through the exit code instead of
    // leaving an unhandled rejection.
    console.error(err);
    process.exitCode = 1;
})
// cc is still undefined when connect() itself failed; only shut down a client
// that was actually obtained.
.finally(() => (cc ? cc.shutdown() : undefined));
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment