Last active
February 15, 2022 01:36
-
-
Save TheDeveloper/e91f7b7044d4f3a33b2f to your computer and use it in GitHub Desktop.
Script to migrate documents from one Elasticsearch cluster to another using Scan/Scroll
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
Migrate documents from one elasticsearch endpoint to another.

To squeeze out extra performance, disable index refresh and replicas on the target cluster:

curl -XPUT 'http://elasticsearch:9200/_settings' -d '{
  "index" : {
    "refresh_interval" : "-1",
    "number_of_replicas": "0"
  }
}'
*/
// Third-party deps: ES client, CLI parsing, redis + distributed locking,
// and bluebird for promisifying the callback-style redis client.
let elasticsearch = require('elasticsearch');
let cli = require('commander');
let Redis = require('redis');
let Warlock = require('node-redis-warlock');
let bluebird = require('bluebird');
let url = require('url');
let _ = require('underscore');
// Project-local copyDocs module supplies the per-document bulk transform,
// an optional target-cluster setup step, and the source query.
let transformFn = require('./copyDocs').index;
let setupFn = require('./copyDocs').setupFn;
let search = require('./copyDocs').query;
let env = process.env;
// Adds *Async promise variants (getAsync/setAsync) used by fetchScroll/setScroll.
bluebird.promisifyAll(Redis.RedisClient.prototype);
bluebird.promisifyAll(Redis.Multi.prototype);
// CLI definition. -f/-t are the source/target endpoints (host + /index/type
// path); -d points at redis so multiple workers can co-ordinate one job.
cli
  .version('0.1.0')
  .option('-f, --from [value]', 'source index, eg. http://192.168.1.100:9200/old_index/old_type')
  .option('-t, --to [value]', 'to index, eg. http://192.168.1.100:9200/new_index/new_type')
  .option('-b, --bulk [value]', 'number of operations in bulk batches', 100)
  .option('-q, --query_size [value]', 'number of documents per shard to retrieve on each scroll', 100)
  .option('-s, --scroll [value]', 'lifetime of the scroll context', '1m')
  .option('-o, --request_timeout [value]', 'default 60000', 60000)
  .option('-r, --trace', 'default false', false)
  .option('-n, --max_docs [value]', 'default -1 unlimited', -1)
  .option('-z, --compress [value]', 'if set, requests compression of data in transit', false)
  .option('-d, --database [value]', 'if set, co-ordinate distributed workers via redis', env.ESMIGRATE_REDIS_PORT_6379_TCP)
  .parse(process.argv);
// console.log(process.env);
// process.exit();
// console.log(cli);
// Stable job id derived from the query + source endpoint, so distributed
// workers running the same migration agree on the same redis keys.
// FIX: the original hashed `search+cli.from`, which stringifies the query
// object as "[object Object]" — two different queries against the same
// source collided on one job id. Hash the serialized query instead.
// (Note: this changes job ids vs. earlier runs of the original script.)
let jobID = require('crypto')
  .createHash('sha1')
  .update(JSON.stringify(search) + cli.from)
  .digest('hex');
let f = url.parse(cli.from);
let t = url.parse(cli.to);
let shouldQuit;   // set by signal handlers; scroll() exits when true
let unlockFn;     // unlock callback for the currently-held warlock lock
let lastScrollId; // last scroll id seen, to detect when ES rotates it
let scrollKey = 'migrate:scroll-id:'+jobID;
let counts = { scrolls: 0, docs: 0, hits: 0 };
// Both clusters get identical client settings apart from the host:
// sniff members once on startup, never re-sniff on faults, refresh the
// topology every 60s, and honour the CLI timeout/compression/trace flags.
const clientConfig = (host) => ({
  host,
  requestTimeout: cli.request_timeout,
  suggestCompression: cli.compress,
  sniffOnStart: true,
  sniffOnConnectionFault: false,
  sniffInterval: 60000,
  log: cli.trace ? 'trace' : null
});
let from_client = new elasticsearch.Client(clientConfig(f.host));
let to_client = new elasticsearch.Client(clientConfig(t.host));
// Echo the parsed config and job id so a run can be identified in logs.
console.log(cli, 'job:', jobID);
// Redis connection used for the distributed lock and the shared scroll id.
let r = url.parse(cli.database);
let redis = Redis.createClient(r.port, r.hostname);
let warlock = Warlock(redis);
// Entry point: once redis is up, give ES a moment, run the optional
// target-cluster setup, then begin (or join) the migration. Any failure
// anywhere in the pipeline lands in abort().
redis.once('ready', async () => {
  try {
    await waitForES();
    await setupFn(from_client, to_client);
    await start();
    process.exit();
  } catch (err) {
    abort(err);
  }
});
/**
 * Acquire the per-job distributed lock via warlock's optimistic retry:
 * up to 60 attempts, 500ms apart, with a 30s lock TTL.
 * @returns {Promise<Function>} resolves with the unlock callback
 */
function lock() {
  const lockKey = 'migrate:scroll:'+jobID;
  const maxAttempts = 60;
  const wait = 500;
  return new Promise((resolve, reject) => {
    warlock.optimistic(lockKey, 30000, maxAttempts, wait, (err, unlock) => {
      if (err) {
        reject(err);
      } else {
        resolve(unlock);
      }
    });
  });
}
/**
 * Release the lock most recently acquired by lock() (stored in unlockFn).
 * @returns {Promise<void>}
 */
function unlock() {
  return new Promise((resolve, reject) => {
    unlockFn((err) => {
      if (err) {
        reject(err);
      } else {
        resolve();
      }
    });
  });
}
let signals = ['SIGINT', 'SIGTERM'];

/**
 * Graceful shutdown: flip shouldQuit so scroll() exits between batches,
 * rather than cutting off an in-flight bulk request.
 * @param {string} signal the signal name received
 */
function shutDown(signal) {
  console.log(`Received ${signal}. Shutting down...`);
  shouldQuit = true;
}

for (const signal of signals) {
  process.on(signal, () => shutDown(signal));
}
// The active scroll id is kept in redis under scrollKey so any worker in
// the job can pick up and continue the shared scroll context.

/** @returns {Promise<?string>} the stored scroll id, or null if none yet */
function fetchScroll() {
  return redis.getAsync(scrollKey);
}

/** Persist the current scroll id for other workers / crash recovery. */
function setScroll(scrollId) {
  return redis.setAsync(scrollKey, scrollId);
}
/**
 * Begin or join the migration. Under the distributed lock: if a scroll id
 * already exists in redis, another worker opened the scroll and we just
 * join in; otherwise open a new scan/scroll on the source and persist its
 * id. Either way the lock is released before scrolling starts.
 * @returns {Promise} resolves when the scroll loop finishes
 */
async function start() {
  unlockFn = await lock();
  const scrollId = await fetchScroll();
  if (scrollId) {
    // Scroll already open — nothing to initialise.
    await unlock();
  } else {
    search.search_type = 'scan';
    search.scroll = cli.scroll;
    search.size = cli.query_size;
    search.index = f.pathname.replace('/', '');
    console.log(search);
    const result = await from_client.search(search);
    if (result.hits.total) {
      console.log(`Starting scroll. ${result.hits.total} hits.`);
      await setScroll(result._scroll_id);
    }
    await unlock();
  }
  return scroll();
}
/**
 * One iteration of the scroll loop: read the shared scroll id from redis,
 * fetch the next page from the source cluster, and hand it to batch().
 * Exits the process if a shutdown signal was received.
 * @returns {Promise} resolves when this and all subsequent batches finish
 */
async function scroll() {
  if (shouldQuit) process.exit();
  const scrollId = await fetchScroll();
  lastScrollId = scrollId;
  const result = await from_client.scroll({
    scrollId: scrollId,
    scroll: cli.scroll,
    size: cli.query_size
  });
  return batch(result);
}
/**
 * Process one scroll page: update progress counters, persist a rotated
 * scroll id, transform hits into bulk operations and index them on the
 * target cluster, then recurse into scroll() for the next page.
 * Returns (ending the whole chain) when the scroll is exhausted.
 *
 * Fixes vs. original:
 *  - setScroll() was fired and forgotten; a crash (or another worker
 *    reading redis) between then and persistence could lose/skew the
 *    shared scroll id. It is now awaited before indexing proceeds.
 *  - the -n/--max_docs CLI option was parsed but never honoured; it now
 *    requests a graceful stop (via shouldQuit) once enough docs are copied.
 *
 * @param {Object} result a scroll response from the source cluster
 * @returns {Promise|undefined}
 */
async function batch(result) {
  const scrollId = result._scroll_id;
  const numHits = result.hits.hits.length;
  if (!numHits) return; // scroll exhausted — let the chain unwind
  if (!counts.hits) counts.hits = result.hits.total;
  counts.docs += numHits;
  console.log(counts);
  if (scrollId !== lastScrollId) {
    // scroll id has changed. update it.
    console.log("new scroll id", scrollId);
    await setScroll(scrollId);
  }
  // Honour -n/--max_docs: stop gracefully after this batch.
  const maxDocs = Number(cli.max_docs);
  if (maxDocs > -1 && counts.docs >= maxDocs) {
    console.log(`Reached max_docs (${maxDocs}). Stopping.`);
    shouldQuit = true;
  }
  const bulk = _.flatten(result.hits.hits.map(transformFn));
  const bulkResult = await to_client.bulk({
    body: bulk
  });
  if (bulkResult.errors) {
    // Per-item failures are logged but not retried; the scroll continues.
    console.log("bulk failed with errors");
    console.log(bulkResult.items.filter(i => i.index.error));
    console.log(JSON.stringify(bulk, null, 2));
  }
  return scroll();
}
/**
 * Terminal failure handler: dump the error with a stack trace, then exit.
 * @param {Error} e
 */
function abort(e) {
  console.trace(e);
  process.exit();
}
/**
 * Delay startup to give the ES clients time to sniff cluster members.
 * Generalized: the delay is now a parameter (previously hard-coded 2000ms);
 * callers with no argument get the original behaviour.
 * @param {number} [ms=2000] delay in milliseconds
 * @returns {Promise<void>} resolves after the delay; never rejects
 */
function waitForES(ms = 2000) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment