Skip to content

Instantly share code, notes, and snippets.

@santaklouse
Last active November 23, 2018 14:34
Show Gist options
  • Select an option

  • Save santaklouse/690ed876ff90c3a426e0e9f06daa5bd9 to your computer and use it in GitHub Desktop.

Select an option

Save santaklouse/690ed876ff90c3a426e0e9f06daa5bd9 to your computer and use it in GitHub Desktop.
Node.js script for importing ElasticSearch data into MongoDB
const elasticClient = require( "elasticsearch" );
// Load connection settings (e.g. config.esHosts) from the `config` module that sits next to this file
const path = require("path");
const config = require( path.join( __dirname, "config" ) );
/*
 * ElasticSearch init section
 */

/**
 * Shared ElasticSearch client for the configured hosts.
 * The 1-hour default request timeout is presumably there to tolerate slow
 * scroll pages during large exports — TODO confirm.
 */
const ESClient = new elasticClient.Client({
  host: config.esHosts,
  requestTimeout: 1000 * 60 * 60, // 1 hour
  keepAlive: true,
  // Add `log: 'debug'` here to trace every request/response.
});

// Fire-and-forget connectivity check: it only reports cluster availability at
// startup and does not block or abort the importer. The error is included so
// the actual cause (DNS, refused connection, timeout, ...) is visible.
ESClient.ping({ requestTimeout: 30000 }, (error) => {
  if (error) {
    console.error("elastic-search is down", error.message);
    return;
  }
  console.log("elastic-search is ready");
});
/**
 * Async iterable over every document matched by an ElasticSearch scroll query.
 *
 * Each iteration step yields one scroll page as `{ records, total }`, where
 * `records` is the page's `hits.hits` array and `total` is `hits.total` exactly
 * as returned by ElasticSearch. Scroll state lives in the generator's locals,
 * so the same instance can be iterated more than once. Each iteration opens its
 * own scroll context and clears it when iteration ends, fails or is abandoned
 * (e.g. `break` in a `for await` loop).
 */
class FetchAllIterator {
  /**
   * @param {object} [options]
   * @param {string} [options.index='_all'] - index name or pattern to scan.
   * @param {string} [options.scroll='30s'] - scroll keep-alive between pages.
   * @param {object} [options.query={ match_all: {} }] - query DSL used as `body.query`.
   */
  constructor({ index = '_all', scroll = '30s', query = { match_all: {} } } = {}) {
    this.index = index;
    this.scroll = scroll;
    this.query = query;
  }

  async *[Symbol.asyncIterator]() {
    const { index, scroll, query } = this;
    let scrollId;
    try {
      // The initial search opens the scroll context and returns the first page.
      let { _scroll_id: nextId, hits } = await ESClient.search({
        index,
        scroll,
        body: { query },
      });
      scrollId = nextId;
      while (hits && hits.hits.length) {
        yield { records: hits.hits, total: hits.total };
        // ElasticSearch may hand back a new scroll id with each page; always use the latest.
        ({ _scroll_id: nextId, hits } = await ESClient.scroll({ scrollId, scroll }));
        scrollId = nextId;
      }
    } finally {
      // Free the server-side scroll context now instead of waiting for the
      // keep-alive to expire. A failure here must not mask the original error.
      if (scrollId) {
        try {
          await ESClient.clearScroll({ scrollId });
        } catch (err) {
          console.warn('Unable to clear ElasticSearch scroll context:', err.message);
        }
      }
    }
  }
}

/**
 * Scan documents page by page with the scroll API.
 *
 * @param {object} [options] - see FetchAllIterator; omit to scan `_all` with match_all.
 * @returns {FetchAllIterator} async iterable of `{ records, total }` pages.
 */
ESClient.fetchAll = function (options) {
  return new FetchAllIterator(options);
};

module.exports = ESClient;
#!/usr/bin/env node
const mongoDbClient = require("./mongodb-import");
const elasticClient = require('./elastic-client');
const yargs = require('yargs');
// Parsed for validation (`--import` is required and restricted to the choices
// below) and for `--help`.
// NOTE(review): both choices currently run the same ElasticSearch import.
const argv = yargs
  .option('import', {
    alias: 'i',
    describe: 'import to MongoDB',
    demandOption: true
  })
  .choices('import', ['all', 'elastic-search'])
  .help()
  .argv;
/******-End Of Init Section-***** */

/**
 * Connect to MongoDB, then stream every ElasticSearch document page by page via
 * `elasticClient.fetchAll()`, handing each `{ records, total }` page to the
 * MongoDB importer. The Mongo connection is always closed afterwards so the
 * process can exit.
 *
 * @returns {Promise<void>} rejects on connection, read or import failure.
 */
async function main() {
  try {
    await mongoDbClient.connect();
  } catch (err) {
    console.error('Unable connect to MongoDB!');
    throw err;
  }

  try {
    let retrieved = 0;
    for await (const page of elasticClient.fetchAll()) {
      await mongoDbClient.import(page);
      retrieved += page.records.length;
    }
    console.log(`Complete: ${retrieved} records retrieved`);
  } finally {
    await mongoDbClient.close();
  }
}

main().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});
const MongoClient = require('mongodb').MongoClient;
const _ = require('lodash');
const _cliProgress = require('cli-progress');
// Single progress bar shared by every import in this module.
const progressBar = new _cliProgress.Bar({}, _cliProgress.Presets.shades_classic);
// Connection URL: set MONGO_URL to target another server; defaults to a local
// MongoDB on the standard port.
const url = process.env.MONGO_URL || 'mongodb://localhost:27017';
// Shared MongoClient; this module exports it (extended with import helpers).
const client = new MongoClient(url, { useNewUrlParser: true });
/**
 * Insert one group of ElasticSearch hits into a MongoDB collection.
 *
 * Each hit's ElasticSearch `_id` is preserved as `_es_id`, and the `_id` key is
 * dropped so MongoDB assigns its own. The caller's array and hit objects are not
 * modified. An insert failure is logged with the ElasticSearch ids of the group
 * and then swallowed, so one bad group does not abort the whole import.
 *
 * @param {string} [collectionName='main'] - target collection; the string
 *   'false' is also mapped to 'main'.
 * @param {object[]} data - hits to insert; empty or missing data is a no-op.
 * @param {object} db - MongoDB database handle exposing `collection(name)`.
 * @returns {Promise<void>}
 */
const insertChunk = async function (collectionName = 'main', data, db) {
  if (!data || data.length === 0) {
    // insertMany rejects an empty document list, so there is nothing to send.
    return;
  }
  const name = collectionName === 'false' ? 'main' : collectionName;
  // Build fresh documents instead of mutating the caller's hits.
  const docs = data.map(({ _id, ...rest }) => ({ ...rest, _es_id: _id }));
  try {
    await db.collection(name).insertMany(docs);
  } catch (e) {
    // Report the ElasticSearch ids: the Mongo `_id`s are generated and say nothing about the source.
    console.log('Error while importing, ids:', docs.map((doc) => doc._es_id), e.message);
  }
};
/**
 * Total configured on the progress bar (the ElasticSearch hit total of the
 * current/last import). Wrapped in an arrow so the bar's method runs with
 * `this === progressBar`; assigning `progressBar.getTotal` directly would call
 * it with `this === client`.
 *
 * @returns {number}
 */
client.getTotal = () => progressBar.getTotal();

/**
 * Normalise ElasticSearch `hits.total`: a plain number in ES <= 6, an object
 * `{ value, relation }` in ES 7+.
 */
const toCount = (total) =>
  total !== null && typeof total === 'object' ? total.value : total;

/**
 * Import ElasticSearch hits into the `elasticsearch` MongoDB database.
 *
 * Accepts either one scroll page `{ records, total }` (the shape yielded by
 * `ESClient.fetchAll()`) or an async iterable of such pages. The hits of each
 * page are grouped by `_type`, and each group is inserted into the collection
 * of that name. The progress bar is started with the first page's `total`,
 * advanced per page, and stopped at the end.
 *
 * @param {{records: object[], total: (number|{value: number})}|AsyncIterable<object>} scroll
 * @returns {Promise<void>} resolves once every page has been inserted; a
 *   no-op when `scroll` is null/undefined.
 */
client.import = async function (scroll) {
  if (scroll == null) {
    return;
  }
  const pages = typeof scroll[Symbol.asyncIterator] === 'function' ? scroll : [scroll];
  const db = client.db('elasticsearch');
  let barStarted = false;
  try {
    for await (const { records = [], total } of pages) {
      if (!barStarted) {
        progressBar.start(toCount(total), 0);
        barStarted = true;
      }
      // Group the page's hits (not the fields of a single hit) by type.
      const byType = _.groupBy(records, (rec) => rec._type);
      await Promise.all(_.map(byType, (docs, type) => insertChunk(type, docs, db)));
      progressBar.increment(records.length);
    }
  } finally {
    // Stop the bar so the following log line is not drawn over it.
    if (barStarted) {
      progressBar.stop();
    }
  }
  console.log('ES => Mongo Done!');
};

module.exports = client;
@santaklouse
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment