This is a JavaScript script that helps with extracting and reindexing data in Elasticsearch. In my experience it uses less memory than the equivalent Logstash pipeline (I encountered errors with Logstash but not with this script). This iteration is single-threaded and aimed at someone comfortable writing JavaScript. Will evolve it to become more of…
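The script requires two local modules, ./esClient and ./esClient_readOnly, which are not part of the gist. Below is a minimal sketch of what ./esClient.js might look like; the host value and the ES_RW_HOST environment variable are placeholders for your own cluster details, and the read-only variant would point at the source cluster:

// esClient.js - hypothetical initialization of the read/write client.
var elasticsearch = require('elasticsearch');

module.exports = new elasticsearch.Client({
  host: process.env.ES_RW_HOST || 'localhost:9200', // placeholder host
  requestTimeout: 60000,
  log: 'error'
});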
require('dotenv').load({'path': '.env'});
var elasticSearch = require("elasticsearch");
var esRWClient = require("./esClient"); // This is the es client initialization (read/write, for the target)
var esRClient = require("./esClient_readOnly"); // Read-only client for the source
var bunyan = require('bunyan');
var target_index = process.env.target || 'reindex'; // Change this to your target index.
var source_index = process.env.source; // Change this to your source index.
var global_scroll_id;
var failedLogger = bunyan.createLogger({ // logs documents that failed to index
  name: 'failedLogger',
  streams: [{
    type: 'rotating-file',
    path: 'failIndexed.log',
    period: '1d', // daily rotation
    count: 3, // keep 3 back copies
    level: 'debug'
  }],
});
var log = bunyan.createLogger({
  name: 'reindex',
  streams: [
    {
      level: 'info',
      stream: process.stdout
    },
    {
      level: 'debug',
      path: 'reindex.debug.log' // log DEBUG and above to a file
    }
  ]
});
var name = __filename;
var doneCount = 0; // total records read
var indexCount = 0; // successfully indexed
var failCount = 0; // failed to index
var alreadyDone = 0; // already present in the target (used when redoing a run)
var offset = process.env.offset || 0;
var size = process.env.size || 1000; // Change this as per your need: a bigger number for a more powerful Elasticsearch source, a smaller one under resource constraints.
var totalPopulation = 0;
var batchWriteSize = process.env.batchwritesize || 500;
var startTime = 0, endTime = 0;
// Pass in an upserting function here if you need updates rather than plain creates.
function read(scrollID) {
  function printStatus(count, batchsize, success, failed, total) {
    if (count >= batchsize) {
      log.info({total: total, batch: batchsize, success: success, failed: failed, count: count, offset: offset, alreadyDone: alreadyDone}, "Progress so far");
    }
  }
  log.debug("read. ScrollID:- " + scrollID);
  function processResponse(resp) { // indexes one scroll page, one document at a time
    var resultArray = resp.hits.hits;
    var batchSize = resp.hits.hits.length;
    global_scroll_id = resp._scroll_id;
    var current = 0;
    log.debug({batchSize: batchSize});
    function nextStep() {
      if (doneCount >= totalPopulation && current >= batchSize) {
        log.info({total: totalPopulation, success: indexCount, failed: failCount}, "Total Completed.");
      } else if (current >= batchSize) {
        printStatus(doneCount, batchSize, indexCount, failCount, totalPopulation);
        read(resp._scroll_id);
      }
    }
    resultArray.forEach(function (instance) { // forEach gives each async callback its own instance binding
      log.trace({instance: instance});
      try {
        esRWClient.create({
          "index": target_index,
          "type": instance._type,
          "id": instance._id,
          "body": instance._source, // You may need to change this to suit your reindexing needs
          "timestamp": instance._source["@timestamp"]
        }).then(function (response) {
          log.trace({index: target_index, type: instance._type, id: instance._id}, "document");
          indexCount++;
          doneCount++;
          current++;
          nextStep();
        }, function (error) {
          if (error.status != 409) { // 409 means the document already exists in the target
            failedLogger.error({error: error.path}, "Error Indexing");
            failCount++;
          } else {
            alreadyDone++;
          }
          current++;
          doneCount++;
          nextStep();
        });
      } catch (e) {
        log.error(e);
      }
    });
  }
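  // NOTE: processInBatch below is an alternative that uses the bulk API instead
  // of one create() call per document. As written, the script never calls it;
  // to use it, swap processResponse(response) for processInBatch(response) in
  // the scroll and search callbacks further down.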
  function processInBatch(resp) {
    var resultArray = resp.hits.hits;
    var batchSize = resp.hits.hits.length;
    var current = 0;
    global_scroll_id = resp._scroll_id;
    var request_body = []; // This is the payload for the bulk index
    function composeAction(instance) {
      // The bulk API expects alternating lines: an action with metadata,
      // then the document body.
      request_body.push({
        create: {
          _index: target_index,
          _type: instance._type,
          _id: instance._id,
          _timestamp: instance._source["@timestamp"]
        }
      });
      request_body.push(instance._source);
    }
    function flush() {
      var flushed = request_body.length / 2; // two entries per document
      esRWClient.bulk({body: request_body}).then(
        function (response) {
          log.debug({response: response}, "success res");
          indexCount = indexCount + flushed;
          doneCount = doneCount + flushed;
        },
        function (error) {
          log.debug({error: error}, "err res");
          failCount = failCount + flushed;
          doneCount = doneCount + flushed;
        });
      request_body = []; // start a fresh payload for the next batch
    }
    resultArray.forEach(function (instance) {
      composeAction(instance);
      current++;
      // Flush when the write batch is full or the scroll page is exhausted.
      if (request_body.length / 2 >= batchWriteSize || current >= batchSize) {
        flush();
      }
    });
  }
  if (scrollID || global_scroll_id) {
    try {
      log.debug("To Next Frame");
      esRClient.scroll({scrollId: scrollID || global_scroll_id, scroll: '2m'}).then(
        function (response) {
          processResponse(response);
        }, function (err) {
          // TODO: differentiate between errors and only retry those due to the network.
          log.warn({error: err, doneCount: doneCount, totalPopulation: totalPopulation}, "Issue with Scrolling to next. Retrying");
          if (!(doneCount >= totalPopulation)) {
            setTimeout(function () { read(global_scroll_id); }, 1000); // pass a function, not the result of calling read()
          }
        });
    } catch (e) {
      // TODO: differentiate between errors and only retry those due to the network.
      log.warn({error: e, doneCount: doneCount, totalPopulation: totalPopulation}, "Issue with Scrolling to next. Retrying");
      if (!(doneCount >= totalPopulation)) {
        setTimeout(function () { read(global_scroll_id); }, 1000);
      }
    }
  } else {
    log.info("Getting Records");
    log.info({target_index: target_index, source_index: source_index});
    esRClient.search({
      'index': source_index,
      'size': size,
      'scroll': "2m",
      'body': {
        "query": {
          "filtered": { // ES 1.x/2.x filtered query; 5.x and later use a bool query instead
            "query": {"query_string": {"query": "*"}},
            "filter": {
              "bool": {
                "must": [{"range": {"@timestamp": {"gte": startTime || 0, "lte": endTime || 9000000000000, "format": "epoch_millis"}}}],
                "must_not": []
              }
            }
          }
        }
      },
      '_source': true,
      '_sourceInclude': ["message", "type", "@timestamp", "post_code", "device_id", "@version"]
    }).then(function (response) {
      totalPopulation = response.hits.total;
      log.info("Total Records Matched :" + totalPopulation);
      processResponse(response);
    }, function (err) {
      log.warn({err: err}, "Issue with Getting Records. Not Retrying");
      printStatus(0, -9, indexCount, failCount, 0); // batchsize of -9 forces the progress line to print
      log.error(err);
    });
  }
}
function printUsage() {
  console.log("Incorrect Usage");
  console.log("node javascript_reindexing.js <log level> [startTime epoch_millis] [endTime epoch_millis]");
}
process.on('uncaughtException', function (err) {
  log.warn({err: err});
  console.log("doneCount " + doneCount + " ::::indexCount " + indexCount + " ::::failCount " + failCount);
  if (global_scroll_id && esRClient) {
    console.log("Clear Scroll");
    esRClient.clearScroll({scrollId: global_scroll_id}); // release the scroll context on the cluster
  }
});
process.on('exit', function (code) { // 'exit' handlers receive the exit code, not an error
  if (code != 0) {
    log.warn({code: code});
    console.log("doneCount " + doneCount + " ::::indexCount " + indexCount + " ::::failCount " + failCount);
  }
  if (global_scroll_id && esRClient) {
    console.log("Clear Scroll");
    esRClient.clearScroll({scrollId: global_scroll_id}); // NOTE: async work in an 'exit' handler is not guaranteed to complete
  }
});
if (process.argv.length < 3) {
  printUsage();
  process.exit(9);
}
if (process.argv[2]) {
  var validLevels = ['trace', 'debug', 'info', 'warn', 'error', 'fatal'];
  if (validLevels.indexOf(process.argv[2]) < 0) {
    log = bunyan.createLogger({
      name: 'reindex',
      level: process.env.LOG_LEVEL || 'info'
    });
    log.warn("Incorrect log level specified at command line. Defaulting to " + (process.env.LOG_LEVEL || 'info'));
  } else {
    log = bunyan.createLogger({
      name: 'reindex',
      level: process.argv[2]
    });
  }
}
if (process.argv[3]) {
  startTime = process.argv[3]; // optional start of the @timestamp window, in epoch milliseconds
}
if (process.argv[4]) {
  endTime = process.argv[4]; // optional end of the @timestamp window, in epoch milliseconds
}
log.info("Reindexing");
read();
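A typical invocation, assuming a .env file that sets source and target: the log level argument is required, and the two trailing arguments are an optional @timestamp window in epoch milliseconds (the values below are illustrative).

node javascript_reindexing.js info 1451606400000 1454284800000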