Skip to content

Instantly share code, notes, and snippets.

@TheDeveloper
Last active February 15, 2022 01:36
Show Gist options
  • Save TheDeveloper/e91f7b7044d4f3a33b2f to your computer and use it in GitHub Desktop.
Script to migrate documents from one Elasticsearch cluster to another using Scan/Scroll
/*
Migrate documents from one elasticsearch endpoint to another.
To squeeze out extra performance, disable index refresh and replicas on the target cluster:
curl -XPUT 'http://elasticsearch:9200/_settings' -d '{
"index" : {
"refresh_interval" : "-1",
"number_of_replicas": "0"
}
}'
*/
// Dependencies: ES client, CLI parsing, redis + distributed lock, promises.
const elasticsearch = require('elasticsearch');
const cli = require('commander');
const Redis = require('redis');
const Warlock = require('node-redis-warlock');
const bluebird = require('bluebird');
const url = require('url');
const _ = require('underscore');

// Job definition lives in ./copyDocs: the source query, a per-hit transform
// producing bulk actions, and an optional target-cluster setup step.
const { index: transformFn, setupFn, query: search } = require('./copyDocs');

const env = process.env;

// Expose promise-returning *Async variants on the redis client.
bluebird.promisifyAll(Redis.RedisClient.prototype);
bluebird.promisifyAll(Redis.Multi.prototype);
// CLI definition: source/target endpoints plus tuning knobs for bulk size,
// scroll page size/lifetime, timeouts and optional redis co-ordination.
// Defaults are the third argument of each .option().
cli
.version('0.1.0')
.option('-f, --from [value]', 'source index, eg. http://192.168.1.100:9200/old_index/old_type')
.option('-t, --to [value]', 'to index, eg. http://192.168.1.100:9200/new_index/new_type')
.option('-b, --bulk [value]', 'number of operations in bulk batches', 100)
.option('-q, --query_size [value]', 'number of documents per shard to retrieve on each scroll', 100)
.option('-s, --scroll [value]', 'lifetime of the scroll context', '1m')
.option('-o, --request_timeout [value]', 'default 60000', 60000)
.option('-r, --trace', 'default false', false)
.option('-n, --max_docs [value]', 'default -1 unlimited', -1)
.option('-z, --compress [value]', 'if set, requests compression of data in transit', false)
.option('-d, --database [value]', 'if set, co-ordinate distributed workers via redis', env.ESMIGRATE_REDIS_PORT_6379_TCP)
.parse(process.argv);
// Stable job ID derived from the migration definition, so every distributed
// worker started with the same query + source co-ordinates on the same redis
// lock and scroll keys.
// Fix: hash the serialized query concatenated with the source URL. The
// previous `JSON.stringify(search + cli.from)` coerced the query object to
// the string "[object Object]", so the ID ignored the query entirely and
// two different migrations from the same source would collide.
let jobID = require('crypto')
  .createHash('sha1')
  .update(JSON.stringify(search) + cli.from)
  .digest('hex');

let f = url.parse(cli.from); // source endpoint (host + /index path)
let t = url.parse(cli.to);   // target endpoint

let shouldQuit;   // set by the SIGINT/SIGTERM handler; checked between scrolls
let unlockFn;     // release callback handed back by warlock on lock()
let lastScrollId; // last scroll id seen, used to detect server-side rotation
let scrollKey = 'migrate:scroll-id:'+jobID; // redis key holding the live scroll id
let counts = { scrolls: 0, docs: 0, hits: 0 }; // progress counters for logging
/**
 * Build an elasticsearch client with the shared connection settings.
 * Extracted because the source and target clients were configured with
 * two identical copy-pasted option objects.
 * @param {string} host - host:port of the cluster
 * @returns {elasticsearch.Client}
 */
function createEsClient(host) {
  return new elasticsearch.Client({
    host: host,
    requestTimeout: cli.request_timeout,
    suggestCompression: cli.compress,
    sniffOnStart: true,            // discover cluster nodes up-front
    sniffOnConnectionFault: false,
    sniffInterval: 60000,
    log: cli.trace ? 'trace' : null
  });
}

let from_client = createEsClient(f.host); // cluster we scan/scroll from
let to_client = createEsClient(t.host);   // cluster we bulk-index into
console.log(cli, 'job:', jobID);

// Redis co-ordinates distributed workers: it stores the shared scroll id
// and backs the warlock mutex used around scroll initialisation.
const r = url.parse(cli.database);
const redis = Redis.createClient(r.port, r.hostname);
const warlock = Warlock(redis);

// Main pipeline: wait for the ES clients to settle, run the job's setup
// step, then start scrolling. Any rejection anywhere aborts the worker.
redis.once('ready', () => {
  waitForES()
    .then(() => setupFn(from_client, to_client))
    .then(() => start())
    .then(process.exit)
    .catch(abort);
});
/**
 * Acquire the distributed lock for this job's scroll initialisation.
 * Retries optimistically before giving up.
 * @returns {Promise<Function>} resolves with warlock's unlock callback
 */
function lock() {
  const lockKey = 'migrate:scroll:' + jobID;
  const ttlMs = 30000;      // lock auto-expires after 30s
  const maxAttempts = 60;
  const retryDelayMs = 500;
  return new Promise((resolve, reject) => {
    warlock.optimistic(lockKey, ttlMs, maxAttempts, retryDelayMs, (err, unlock) => {
      if (err) {
        reject(err);
        return;
      }
      resolve(unlock);
    });
  });
}
/**
 * Release the lock previously acquired via lock().
 * Assumes unlockFn has been set by start(); rejects on warlock error.
 * @returns {Promise<void>}
 */
function unlock() {
  return new Promise((resolve, reject) => {
    unlockFn((err) => {
      if (err) {
        reject(err);
      } else {
        resolve();
      }
    });
  });
}
/**
 * Graceful shutdown: only set a flag here. scroll() checks the flag between
 * batches and exits at a safe point rather than mid-bulk.
 * @param {string} signal - name of the signal received
 */
function shutDown(signal) {
  console.log(`Received ${signal}. Shutting down...`);
  shouldQuit = true;
}

for (const signal of ['SIGINT', 'SIGTERM']) {
  process.on(signal, () => shutDown(signal));
}
// Read the persisted scroll id for this job from redis (null if none yet).
function fetchScroll() {
return redis.getAsync(scrollKey);
}
// Persist the current scroll id so any worker (or a restarted one) resumes
// the same scroll context.
function setScroll(scrollId) {
return redis.setAsync(scrollKey, scrollId);
}
// Begin (or join) the migration:
//  1. take the distributed lock so only one worker opens the scan,
//  2. if another worker already persisted a scroll id, just release the lock,
//  3. otherwise open a scan-type search on the source index and persist its
//     scroll id for all workers,
//  4. release the lock and enter the scroll/batch loop.
function start() {
return lock()
.then(u => unlockFn = u)
.then(fetchScroll)
.then(scrollId => {
// Another worker already initialised the scroll; nothing to set up.
if (scrollId) return unlock();
search.search_type = 'scan'; // scan: unscored, index-order retrieval (ES 1.x/2.x)
search.scroll = cli.scroll;
search.size = cli.query_size;
search.index = f.pathname.replace('/', ''); // strip leading '/' from URL path
console.log(search);
return from_client.search(search)
.then((result) => {
// NOTE(review): when there are zero hits no scroll id is stored, yet
// scroll() still runs below and will call the scroll API with a null
// id — confirm this path is never hit in practice.
if (!result.hits.total) return;
console.log(`Starting scroll. ${result.hits.total} hits.`);
return setScroll(result._scroll_id);
})
.then(unlock);
}).then(scroll)
}
/**
 * One iteration of the scroll loop: re-read the shared scroll id from redis,
 * pull the next page from the source cluster, and hand it to batch().
 * @returns {Promise} resolves when the whole scroll is drained
 */
async function scroll() {
  // Safe point between batches for a graceful shutdown.
  if (shouldQuit) process.exit();

  const scrollId = await fetchScroll();
  lastScrollId = scrollId;

  const result = await from_client.scroll({
    scrollId: scrollId,
    scroll: cli.scroll,
    size: cli.query_size
  });
  return batch(result);
}
/**
 * Index one page of scroll hits into the target cluster, then recurse into
 * scroll() for the next page. Returns undefined when the page is empty,
 * which unwinds the loop and lets start()'s chain resolve.
 *
 * Fix: setScroll() was fired and forgotten; a redis failure there became an
 * unhandled rejection outside the .catch(abort) chain. The write is now part
 * of the returned chain.
 * @param {Object} result - elasticsearch scroll response
 */
function batch(result) {
  const scrollId = result._scroll_id;
  const numHits = result.hits.hits.length;
  if (!numHits) return; // scroll exhausted

  if (!counts.hits) counts.hits = result.hits.total; // record total once
  counts.docs += numHits;
  console.log(counts);

  // The server may rotate the scroll id; persist the new one for workers.
  let persistScroll = Promise.resolve();
  if (scrollId !== lastScrollId) {
    console.log("new scroll id", scrollId);
    persistScroll = setScroll(scrollId);
  }

  // transformFn maps each hit to its bulk action/document line(s).
  const bulk = _.flatten(result.hits.hits.map(transformFn));

  return persistScroll
    .then(() => to_client.bulk({ body: bulk }))
    .then(bulkResult => {
      if (bulkResult.errors) {
        // TODO: retry failed items instead of only logging them.
        console.log("bulk failed with errors");
        console.log(bulkResult.items.filter(i => i.index.error));
        console.log(JSON.stringify(bulk, null, 2));
      }
    })
    .then(scroll);
}
// Log the error with a stack trace and terminate the worker immediately.
// NOTE(review): exits with status 0 even on failure — supervisors cannot
// distinguish success from abort; consider process.exit(1).
function abort(e) {
console.trace(e);
process.exit();
}
/**
 * Give the elasticsearch clients a moment to finish their initial sniff
 * before the migration starts.
 * @param {number} [ms=2000] - delay in milliseconds (generalized from the
 *   previously hard-coded 2000 so callers and tests can tune it; the
 *   default preserves the original behavior)
 * @returns {Promise<void>} resolves after the delay, never rejects
 */
function waitForES(ms = 2000) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment