@puuble
Last active February 23, 2023 18:22
Lambda and CloudWatch trigger: sync data from MongoDB to Elasticsearch
const MongoClient = require('mongodb').MongoClient;
const { Client } = require('@elastic/elasticsearch');
const uri = 'mongodb://localhost:27017/mydb';
const dbName = 'mydb';
const collectionName = 'mycollection';
const elasticUrl = 'http://localhost:9200';
const indexName = 'myindex';
exports.handler = async function (event, context) {
  const client = new MongoClient(uri, { useNewUrlParser: true, useUnifiedTopology: true });
  const elasticClient = new Client({ node: elasticUrl });
  try {
    await client.connect();
    const collection = client.db(dbName).collection(collectionName);
    const data = await collection.find({}).toArray();
    const transformedData = transformData(data);
    const result = await elasticClient.bulk({ body: transformedData });
    console.log(`Sent ${result.body.items.length} documents to Elasticsearch`);
  } catch (error) {
    console.error(`Failed to transfer data: ${error.message}`);
    throw error;
  } finally {
    await client.close();
    await elasticClient.close();
  }
};
// Build the Elasticsearch bulk body: an action line followed by the
// document source for each MongoDB document.
function transformData(data) {
  return data.flatMap(doc => {
    const { _id, ...source } = doc;
    return [
      {
        index: {
          _index: indexName,
          _id: _id.toString()
        }
      },
      source
    ];
  });
}
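// For reference, a minimal (hypothetical) example of what transformData
// produces: the bulk body alternates an action line with the document
// source. The sample document below is made up for illustration.
const sampleDocs = [{ _id: '63f7a1b2c3d4e5f6a7b8c9d0', name: 'Alice', age: 30 }];
console.log(transformData(sampleDocs));
// [
//   { index: { _index: 'myindex', _id: '63f7a1b2c3d4e5f6a7b8c9d0' } },
//   { name: 'Alice', age: 30 }
// ]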
// Retry a failed bulk request a few times with a linearly increasing delay.
// Assumes elasticClient is available in the enclosing scope.
const maxRetries = 3;
const retryDelay = 1000; // 1 second

async function sendBatch(batch) {
  let retries = 0;
  while (retries < maxRetries) {
    try {
      const transformedData = transformData(batch);
      const result = await elasticClient.bulk({ body: transformedData });
      console.log(`Sent ${result.body.items.length} documents to Elasticsearch`);
      return;
    } catch (error) {
      console.error(`Failed to send batch: ${error.message}`);
      retries++;
      await new Promise(resolve => setTimeout(resolve, retryDelay * retries));
    }
  }
  throw new Error(`Failed to send batch after ${maxRetries} retries`);
}
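// A small sketch (not part of the original gist) of how sendBatch could be
// driven: split the documents into fixed-size chunks and send them one by one.
// The chunk size of 500 is an arbitrary example value.
async function sendInBatches(docs, batchSize = 500) {
  for (let i = 0; i < docs.length; i += batchSize) {
    await sendBatch(docs.slice(i, i + batchSize));
  }
}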
// Page through the collection instead of loading everything at once.
const pageSize = 1000;
let page = 0;
let data = [];
let hasMore = true;
while (hasMore) {
  data = await collection.find({})
    .skip(page * pageSize)
    .limit(pageSize)
    .toArray();
  hasMore = data.length === pageSize;
  if (data.length > 0) {
    const transformedData = transformData(data);
    const result = await elasticClient.bulk({ body: transformedData });
    console.log(`Sent ${result.body.items.length} documents to Elasticsearch`);
  }
  page++;
}
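// Optional hardening sketch (an assumption, not part of the original gist):
// the bulk response reports per-document failures, so each page's result can
// be checked instead of relying only on the item count. A helper like this
// would be called right after each elasticClient.bulk(...) call above.
function logBulkErrors(result) {
  if (result.body.errors) {
    const failed = result.body.items.filter(item => item.index && item.index.error);
    console.error(`${failed.length} documents failed to index`);
  }
}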
// Alternative to the Lambda/CloudWatch setup: run the same sync from a
// long-running process with node-cron, once every minute.
const cron = require('node-cron');
const mongoClient = new MongoClient(uri, { useNewUrlParser: true, useUnifiedTopology: true });
const elasticClient = new Client({ node: elasticUrl });

cron.schedule('* * * * *', async () => {
  await mongoClient.connect();
  const db = mongoClient.db('your-db-name');
  const collection = db.collection('your-collection-name');
  const data = await collection.find({}).toArray();
  const transformedData = transformData(data);
  const result = await elasticClient.bulk({ body: transformedData });
  console.log(`Sent ${result.body.items.length} documents to Elasticsearch`);
  await mongoClient.close();
});
puuble commented Feb 23, 2023

You can create a Lambda function and trigger it with a CloudWatch Events rule every minute.

puuble commented Feb 23, 2023

aws events put-rule --name "TransferDataRule" --schedule-expression "rate(1 minute)"
This is the CLI command for creating the CloudWatch trigger. You can also create it from the AWS console.
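If you wire it up entirely from the CLI, the rule also needs a target and the function needs permission to be invoked by CloudWatch Events. A rough sketch; the function name, region and account ID below are placeholders, not values from this gist:

aws events put-targets --rule "TransferDataRule" \
  --targets "Id"="1","Arn"="arn:aws:lambda:us-east-1:123456789012:function:transfer-data"

aws lambda add-permission --function-name transfer-data \
  --statement-id "TransferDataRulePermission" \
  --action "lambda:InvokeFunction" \
  --principal events.amazonaws.com \
  --source-arn "arn:aws:events:us-east-1:123456789012:rule/TransferDataRule"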
