Last active
November 23, 2018 14:34
-
-
Save santaklouse/690ed876ff90c3a426e0e9f06daa5bd9 to your computer and use it in GitHub Desktop.
Node.js script that imports data from Elasticsearch into MongoDB
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Shared Elasticsearch connection used by the importing scripts.
const elasticsearch = require("elasticsearch");
const path = require("path");

// Project configuration (ES hosts, etc.) lives next to this file.
const config = require(path.join(__dirname, "config"));

/*
 * ElasticSearch init section
 */
const ONE_HOUR_MS = 1000 * 60 * 60;
const ESClient = new elasticsearch.Client({
    host: config.esHosts,
    // Long scrolls over big indices can take a while; allow up to an hour.
    requestTimeout: ONE_HOUR_MS,
    keepAlive: true,
    // log: 'debug',
});

// Ping the cluster once at startup so connectivity problems surface early.
ESClient.ping({ requestTimeout: 30000 }, (error) => {
    if (error) {
        console.error("elastic-search is down");
    } else {
        console.log("elastic-search is ready");
    }
});
/**
 * Async-iterable wrapper around the Elasticsearch scroll API.
 *
 * Usage: `for await (const { records, total } of ESClient.fetchAll()) { ... }`
 * Each iteration yields one page of hits (`records`) plus the overall hit
 * count (`total`, as reported by the first search response).
 */
let _fetchAll = function _fetchAll () {
};
_fetchAll.prototype[Symbol.asyncIterator] = async function* values () {
    // first we do a search, and specify a scroll timeout
    let { _scroll_id, hits } = await ESClient.search({
        index: '_all',
        // type: '*',
        scroll: '30s',
        body: {
            query: {
                "match_all": {}
            },
        }
    });
    this.scroll_id = _scroll_id;
    this.hits = hits;
    try {
        while (this.hits && this.hits.hits.length) {
            yield { records: this.hits.hits, total: this.hits.total };
            // Fetch the next page; the scroll id may change between calls,
            // so always carry forward the most recent one.
            const page = await ESClient.scroll({
                scrollId: this.scroll_id,
                scroll: '30s'
            });
            this.scroll_id = page._scroll_id;
            this.hits = page.hits;
        }
    } finally {
        // BUG FIX: the original never released the server-side scroll
        // context, leaking a cursor on the cluster every run until the
        // 30s keep-alive expired. Best-effort cleanup; ignore failures.
        if (this.scroll_id) {
            await ESClient.clearScroll({ scrollId: this.scroll_id }).catch(() => {});
        }
    }
};
// Public factory: returns a fresh async-iterable scroll over all indices.
ESClient.fetchAll = () => new _fetchAll();

module.exports = ESClient;
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env node
const mongoDbClient = require("./mongodb-import");
const elasticClient = require('./elastic-client');
const yargs = require('yargs');

// CLI: `--import all|elastic-search`. Both choices currently trigger the
// same Elasticsearch -> MongoDB copy; the option is validated for
// compatibility with the documented interface.
const argv = yargs
    .option('import', {
        alias: 'i',
        describe: 'import to MongoDB',
        demandOption: true
    })
    .choices('import', ['all', 'elastic-search'])
    .help()
    .argv;
/******-End Of Init Section-***** */
mongoDbClient
    .connect()
    .then(() => {
        // BUG FIX: the original referenced an undeclared `esRecords`
        // (ReferenceError at runtime). Build the scroll iterator here.
        const esRecords = elasticClient.fetchAll();
        // Handle import failures separately so the outer .catch below keeps
        // its original meaning (connection errors only); the original left
        // this promise floating with no rejection handler.
        mongoDbClient.import(esRecords).then(() => {
            console.log(`Complete: ${mongoDbClient.getTotal()} records retrieved`);
        }).catch((err) => {
            console.error('Import failed:', err);
        });
    })
    .catch(() => {
        console.error('Unable connect to MongoDB!');
    });
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// MongoDB side of the Elasticsearch -> MongoDB import.
const MongoClient = require('mongodb').MongoClient;
const _ = require('lodash');
const cliProgress = require('cli-progress');

// Single shared progress bar for the whole import run.
const progressBar = new cliProgress.Bar({}, cliProgress.Presets.shades_classic);

// Connection URL
const url = 'mongodb://localhost:27017';

// Database Name
const client = new MongoClient(url, { useNewUrlParser: true });
/**
 * Insert one chunk of Elasticsearch hits into a MongoDB collection.
 *
 * The ES `_id` is preserved under `_es_id` and the `_id` key is removed so
 * MongoDB can generate its own ObjectIds. Note: mutates `data` in place.
 *
 * @param {string} [collectionName='main'] target collection ('false' maps to 'main')
 * @param {Object[]} data ES hit documents
 * @param {Db} db connected mongodb Db handle
 */
const insertChunk = async function(collectionName = 'main', data, db) {
    collectionName = collectionName === 'false' ? 'main' : collectionName;
    const collection = db.collection(collectionName);
    for (let i = 0, l = data.length; i < l; i++) {
        data[i]._es_id = data[i]._id;
        // BUG FIX: was `data[i]._id = undefined`, which keeps an `_id` key
        // that the driver serializes as null (ignoreUndefined is off by
        // default) — every doc after the first in a collection would then
        // collide on the unique _id index. Remove the key entirely.
        delete data[i]._id;
    }
    try {
        await collection.insertMany(data);
    } catch(e) {
        // BUG FIX: the original logged `_.map(data, '_id')`, but `_id` was
        // just cleared above, so it printed a list of undefineds. Log the
        // preserved ES ids instead.
        return console.log('Error while importing, ids:', data.map(d => d._es_id), e.message);
    }
};
// BUG FIX: the original assigned the unbound method (`progressBar.getTotal`),
// so `client.getTotal()` ran with `this === client` and read the wrong
// object's state. Delegate the call with the bar as the receiver instead.
client.getTotal = () => progressBar.getTotal();
/**
 * Drain an Elasticsearch scroll into MongoDB, grouping each page of hits by
 * document `_type` (one MongoDB collection per type) and showing a progress
 * bar sized by the scroll's reported total.
 *
 * @param {{records: AsyncIterable, total: number}|undefined} scroll
 *        scroll handle from the elastic client; a no-op when undefined.
 */
client.import = async function(scroll) {
    if (scroll === undefined) {
        return;
    }
    progressBar.start(scroll.total, 0);
    const db = client.db('elasticsearch');
    try {
        for await (const record of scroll.records) {
            // One insertChunk per ES type; each type lands in its own collection.
            const promises = _.chain(record)
                .groupBy(rec => rec._type)
                .map((value, key) => insertChunk(key, value, db))
                .value(); // BUG FIX: `.values()` left the lodash chain unterminated
            await Promise.all(promises);
            // Count the page only after its inserts have settled, so the bar
            // reflects completed work rather than in-flight work.
            progressBar.increment(record.length);
        }
    } finally {
        // BUG FIX: the original never stopped the bar, leaving the terminal
        // in a redrawing state (especially on error).
        progressBar.stop();
    }
    console.log('ES => Mongo Done!');
};

module.exports = client;
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
http://codephoto.ru/i/Rmeo2