Skip to content

Instantly share code, notes, and snippets.

@amulyakashyap09
Created August 20, 2022 15:15
Show Gist options
  • Save amulyakashyap09/b2420c79426e8f26c3ff05bd698e0e34 to your computer and use it in GitHub Desktop.
Save amulyakashyap09/b2420c79426e8f26c3ff05bd698e0e34 to your computer and use it in GitHub Desktop.
MongoDB to Elasticsearch sync using Node.js streams.
const {
MongoClient, ObjectId
} = require("mongodb");
const {
Client
} = require('@elastic/elasticsearch');
const Transform = require("stream").Transform;
/**
 * Index-level settings used when none are supplied explicitly:
 * the index is configured with zero replica shards.
 */
const defaultElasticSettings = {
  index: { number_of_replicas: 0 },
};
/**
 * Builds the mapping for a `text` field that also has a `keyword` sub-field
 * (`<field>.keyword`); strings longer than 256 characters are skipped by the
 * keyword sub-field (`ignore_above`). A fresh object is returned on every
 * call so no two fields share mapping objects.
 * @returns {object} Elasticsearch field mapping.
 */
function _textWithKeywordMapping() {
  return {
    type: "text",
    fields: {
      keyword: {
        type: "keyword",
        ignore_above: 256,
      },
    },
  };
}

/**
 * Elasticsearch field mappings for documents synced from the `users` collection.
 *
 * No mapping-level `boost` is set: field boosts in mappings are deprecated and
 * are not supported for indices created by Elasticsearch 8+. If email, city or
 * picture should weigh more, boost them at query time instead
 * (e.g. `fields: ["email^7", "city^7", "picture^7"]`).
 */
const userMappings = {
  last_sync_at: { type: "date" },
  username: _textWithKeywordMapping(),
  fullname: _textWithKeywordMapping(),
  email: _textWithKeywordMapping(),
  city: _textWithKeywordMapping(),
  picture: _textWithKeywordMapping(),
  state: _textWithKeywordMapping(),
  country: _textWithKeywordMapping(),
  status: _textWithKeywordMapping(),
  zip: _textWithKeywordMapping(),
};
// Delay before the script exits once the Mongo stream has ended.
// NOTE: despite the "Mins" suffix, this value is in milliseconds (2 minutes).
const serverShutDownMins = 120 * 1000;
// Shared Elasticsearch client; replaced with a real `Client` by _connectToES().
let esClient = {};
/**
 * Opens a MongoDB connection and returns a handle to the "usersdb" database.
 * The server address is read from MONGO_URI, defaulting to a local instance.
 * The MongoClient itself is not returned, so callers cannot close it; the
 * script ends via process.exit() instead.
 * @returns {Promise<Db>} the connected database handle.
 */
async function _connectToMongo() {
  const client = new MongoClient(process.env.MONGO_URI || "mongodb://localhost:27017");
  await client.connect();
  return client.db("usersdb");
}
/**
 * Creates the shared Elasticsearch client (stored in `esClient`) and makes
 * sure `index` exists. If it does not, the index is created and then settings
 * and the user mappings are applied to it.
 *
 * NOTE(review): `indices.exists` is treated as resolving to a boolean, which is
 * the @elastic/elasticsearch v8 behaviour (v7 resolves to a response object) —
 * confirm the client version.
 *
 * @param {string} [index="users"] - name of the index to verify/create.
 */
async function _connectToES(index = "users") {
  const esUri = process.env.ES_URI || "http://localhost:9200";
  esClient = new Client({
    node: esUri
  });
  const indexFound = await esClient.indices.exists({ index });
  if (!indexFound) {
    // Every setup step needs the index name; without it the requests target `undefined`.
    await _createIndex(index);
    await _putSettings(index);
    await _putMapping(index);
  }
}
/**
 * Deletes `index` from Elasticsearch using the shared `esClient`.
 * Not referenced by the code visible in this file.
 * @param {string} index - name of the index to delete.
 */
async function _deleteIndex(index) {
  const request = { index };
  await esClient.indices.delete(request);
}
/**
 * Creates `index` in Elasticsearch with the given index settings.
 * Defaults to the shared `defaultElasticSettings`, so the create call and the
 * later settings update cannot drift apart.
 * @param {string} index - name of the index to create.
 * @param {object} [settings=defaultElasticSettings] - index settings, e.g. `{ index: { number_of_replicas: 0 } }`.
 */
async function _createIndex(index, settings = defaultElasticSettings) {
  await esClient.indices.create({
    index,
    body: { settings },
  });
}
/**
 * Applies index settings to an existing index.
 *
 * The index is closed first (static settings can only be changed on a closed
 * index) and is always reopened afterwards — even if the update fails — so it
 * is never left closed and unusable for reads and writes.
 *
 * @param {string} index - name of the index to update.
 * @param {object} [settings=defaultElasticSettings] - settings to apply.
 * @throws whatever the close/putSettings/open requests reject with.
 */
async function _putSettings(index, settings = defaultElasticSettings) {
  await esClient.indices.close({ index });
  try {
    await esClient.indices.putSettings({
      index,
      body: { settings },
    });
  } finally {
    await esClient.indices.open({ index });
  }
}
/**
 * Opens `index` and then applies field mappings to it.
 * @param {string} index - name of the index to update.
 * @param {object} [properties=userMappings] - field name → mapping definition.
 */
async function _putMapping(index, properties = userMappings) {
  // Ensure the index is open before updating its mapping; an earlier
  // setup step (the settings update) closes it.
  const openRequest = { index };
  await esClient.indices.open(openRequest);

  const mappingRequest = { index, body: { properties } };
  await esClient.indices.putMapping(mappingRequest);
}
/**
 * Object-mode transform applied to each MongoDB document before indexing.
 * It stamps the document with `last_sync_at` (the time it passed through)
 * and forwards it unchanged otherwise. Extend it to reshape documents.
 *
 * A falsy chunk is reported as a stream error instead of killing the process
 * from inside the stream, so the consumer's error handling runs.
 * Note: this is a single module-level stream and can be consumed only once.
 */
const transformedStreamData = new Transform({
  readableObjectMode: true,
  writableObjectMode: true,
  transform(item, _encoding, callback) {
    if (!item) {
      callback(new Error(`Unexpected empty document in sync stream: ${item}`));
      return;
    }
    item.last_sync_at = new Date();
    callback(null, item);
  },
});
/**
 * Streams documents from the MongoDB `users` collection through
 * `transformedStreamData` into Elasticsearch using the bulk helper.
 *
 * After the Mongo stream ends, the process is scheduled to exit after
 * `serverShutDownMins` milliseconds.
 *
 * @param {Db} db - connected MongoDB database handle.
 * @param {ObjectId|null} mongoId - if set, only documents with `_id` greater
 *   than it are fetched (incremental sync); otherwise the whole collection.
 * @param {string} [index="users"] - Elasticsearch index to write into.
 * @returns {Promise<object>} whatever `esClient.helpers.bulk` resolves to
 *   (its run statistics in the official client).
 */
async function _fetchFromMongodb(db, mongoId, index = "users") {
  const collection = db.collection('users');
  const query = mongoId ? { _id: { $gt: mongoId } } : {};
  const options = {};
  const stream = collection.find(query, options).stream();

  // pipe() does not forward source errors to the destination. Without this,
  // a cursor failure would leave transformedStreamData open and the bulk
  // helper waiting on it.
  stream.on("error", (err) => transformedStreamData.destroy(err));

  // 'end' is emitted without arguments, so there is no error to report here.
  stream.on("end", () => {
    console.info(`closing stream now AND shutting down script in ${serverShutDownMins / 60000} mins `);
    setTimeout(() => {
      process.exit();
    }, serverShutDownMins);
  });

  const stats = await esClient.helpers.bulk({
    retries: 3,
    wait: 3000,
    datasource: stream.pipe(transformedStreamData),
    onDocument(doc) {
      // `_id` is Elasticsearch metadata and may not appear in the document
      // body, so move it from the document into the index action.
      const _id = String(doc._id);
      delete doc._id;
      return {
        index: {
          _index: index,
          _id
        }
      };
    },
    onDrop(dropped) {
      // onDrop receives a descriptor ({ status, error, operation, document, ... }),
      // not the raw document; the id lives on the index action.
      const action = dropped && dropped.operation && dropped.operation.index;
      console.error("FAILED DOC ", action ? action._id : undefined, dropped && dropped.error);
    }
  });
  return stats;
}
/**
 * Finds the MongoDB `_id` to resume an incremental sync from.
 *
 * Fetches the 20 most recent documents by `created_at` and returns the largest
 * of their `_id`s as an ObjectId, or `null` when the index has none (which
 * makes the caller run a full sync).
 *
 * `unmapped_type` keeps the sort from failing on an index where `created_at`
 * has no mapping yet (e.g. a freshly created, empty index).
 * NOTE(review): if synced documents do not carry `created_at`, the top-20
 * window is arbitrary and newer `_id`s may be missed — confirm the schema.
 *
 * @param {string} index - Elasticsearch index to inspect.
 * @returns {Promise<ObjectId|null>} the greatest `_id` seen, always as an ObjectId.
 */
async function _getLastDocFromES(index) {
  const result = await esClient.search({
    index,
    size: 20,
    sort: {
      created_at: { order: "desc", unmapped_type: "date" }
    },
    query: {
      match_all: {}
    }
  });
  const hits = result.hits.hits || [];
  let lastDocId = null;
  for (const hit of hits) {
    // Always return an ObjectId: a plain string would make Mongo's `$gt`
    // compare ObjectIds against a string and match nothing.
    const candidate = new ObjectId(hit._id);
    // Hex strings of ObjectIds have a fixed length, so lexical order == id order.
    if (lastDocId === null || candidate.toHexString() > lastDocId.toHexString()) {
      lastDocId = candidate;
    }
  }
  return lastDocId;
}
/**
 * Main entry point: connects to MongoDB and Elasticsearch, determines where
 * the previous sync stopped, and streams the remaining documents into the
 * `users` index. Any failure is logged and the process exits with code 1.
 */
async function sync() {
  const searchIndex = "users";
  try {
    const db = await _connectToMongo();
    await _connectToES(searchIndex);
    const lastDocId = await _getLastDocFromES(searchIndex);
    await _fetchFromMongodb(db, lastDocId, searchIndex);
  } catch (ex) {
    // Log the error object itself: JSON.stringify on an Error usually yields
    // "{}" (and can throw on circular structures).
    console.error("EXCEPTION : ", ex);
    process.exit(1);
  }
}
// Kick off one sync run; sync() handles and logs its own errors.
void sync();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment