Skip to content

Instantly share code, notes, and snippets.

@raed667
Last active December 29, 2016 09:55
Show Gist options
  • Save raed667/abfc528ba541fda0773fc2e337544586 to your computer and use it in GitHub Desktop.
Save raed667/abfc528ba541fda0773fc2e337544586 to your computer and use it in GitHub Desktop.
Bulk insertion with a buffer
const elasticsearch = require('elasticsearch');
const mqtt = require('mqtt');
const chalk = require('chalk');
const jsonSize = require('json-size');
const ProgressBar = require('ascii-progress');
const memwatch = require('memwatch-next');
const os = require('os');
// Elasticsearch index/type that every buffered MQTT message is written to.
const INDEX = 'mqtt';
const TYPE = 'eclipse';
// Flush threshold for the bulk buffer, in kilobytes of serialized JSON.
const BUFFER_SIZE = 100;

// Action/document pairs waiting to be sent in the next bulk request.
let bulkRecords = [];
// Pairs that arrive while a bulk request is still outstanding.
let tempRecords = [];
// Set while a bulk request is in flight; cleared when it completes.
let mutex = false;

// Progress display of how full the bulk buffer is relative to BUFFER_SIZE.
const bar = new ProgressBar({
  schema: ':bar.green [:current.bold.yellow/:total] Kilobytes (:percent) :elapseds.yellow',
  total: BUFFER_SIZE,
  colors: ':bar.red :percent.green',
});
// Elasticsearch connection used for the bulk inserts.
const ESclient = new elasticsearch.Client({
  host: 'localhost:9200',
  log: 'error'
});

// Subscribe to every topic ('#' is the MQTT multi-level wildcard) on the
// public Eclipse broker and feed each message into the bulk buffer.
const client = mqtt.connect('mqtt://iot.eclipse.org');
client.on('connect', () => {
  client.subscribe('#', (err) => {
    // Without this callback a rejected subscription went unnoticed and the
    // script would simply receive nothing.
    if (err) {
      console.log(chalk.bold.red(`SUBSCRIBE ERROR : ${err}`));
    }
  });
});
// An EventEmitter with no 'error' listener throws when 'error' is emitted,
// which would kill this long-running ingest process; log it instead.
client.on('error', (err) => {
  console.log(chalk.bold.red(`MQTT ERROR : ${err}`));
});
client.on('message', (topic, message) => {
  addARecord(message.toString(), topic);
});
// Measurement state for bufferedKilobytes(): the array last measured, how many
// of its entries have been counted, and the byte size of JSON.stringify() over
// those entries (2 = the empty array "[]").
const bulkSizeCache = {records: null, count: 0, bytes: 2};

/**
 * Size of `JSON.stringify(records)` in kilobytes — the same figure as
 * `jsonSize(records) / 1024` — measuring only the entries appended since the
 * previous call on the same array. Re-serializing the whole buffer on every
 * MQTT message made filling one batch quadratic in its length.
 *
 * @param {Array<Object>} records - buffer that is only appended to or replaced
 * @param {Object} [cache=bulkSizeCache] - measurement state carried between calls
 * @returns {number} serialized size in kilobytes
 */
function bufferedKilobytes(records, cache = bulkSizeCache) {
  // A different array (buffer swapped or reset) or a shorter one invalidates
  // the cached figure: start again from the empty array "[]".
  if (cache.records !== records || records.length < cache.count) {
    cache.records = records;
    cache.count = 0;
    cache.bytes = 2;
  }
  for (let i = cache.count; i < records.length; i += 1) {
    // Every element after the first is preceded by a "," separator.
    cache.bytes += jsonSize(records[i]) + (i > 0 ? 1 : 0);
  }
  cache.count = records.length;
  return cache.bytes / 1024;
}

/**
 * Buffers one MQTT message as an Elasticsearch bulk `create` action followed by
 * its document, and calls saveBulk() once the buffer reaches BUFFER_SIZE kilobytes.
 *
 * While `mutex` is set the pair is parked in `tempRecords`; the first call made
 * with `mutex` cleared moves the parked pairs into `bulkRecords` before adding
 * its own pair.
 *
 * @param {string} message - message payload, already converted to a string
 * @param {string} topic - topic the message was received on
 */
function addARecord(message, topic) {
  const action = {create: {_index: INDEX, _type: TYPE}};
  const record = {
    date: new Date().toISOString(),
    message: message,
    topic: topic
  };
  if (mutex) {
    tempRecords.push(action, record);
    return;
  }
  if (tempRecords.length > 0) {
    console.log(`Temporary : ${chalk.yellow(tempRecords.length)} Size : ${bufferedKilobytes(tempRecords)} Kilobyte`);
    // Append instead of overwrite so nothing already in bulkRecords is lost;
    // reuse the parked array when the buffer is empty to avoid a copy.
    bulkRecords = bulkRecords.length === 0 ? tempRecords : bulkRecords.concat(tempRecords);
    tempRecords = [];
    console.log("Passed temporary buffer to data buffer");
  }
  bulkRecords.push(action, record);
  const bulkSize = bufferedKilobytes(bulkRecords);
  if (bulkSize >= BUFFER_SIZE) {
    bar.clear();
    console.log();
    console.log(`Total size : ${chalk.bold.underline(bulkSize)} Kilobytes`);
    console.log(`Number of elements ${bulkRecords.length}`);
    saveBulk();
  } else {
    bar.update(bulkSize / BUFFER_SIZE);
  }
}
/**
 * Sends the buffered bulk body (`bulkRecords`) to Elasticsearch in one request.
 *
 * `mutex` stays set until the request's callback runs, so addARecord() parks
 * new entries in `tempRecords` instead of appending to the array being sent.
 * The callback clears the buffer and releases the mutex whether or not the
 * request succeeded: a failed batch is logged and then discarded, not retried.
 */
function saveBulk() {
  mutex = true;
  ESclient.bulk({body: bulkRecords}, (err, resp) => {
    bulkRecords = []; // Reset the array
    mutex = false; // release the mutex
    if (err) {
      console.log(chalk.bold.red(`ERROR : ${err}`));
    } else if (resp.errors) {
      // The request itself succeeded but at least one item was rejected;
      // report that instead of also claiming success.
      console.log(chalk.bold.red('WITH ERRORS'));
    } else {
      console.log(chalk.green('All is good'));
    }
  });
}
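/* Output sample, captured from an earlier revision of this script. The bar
   total shows BUFFER_SIZE was 300 at the time, and the temporary-buffer log
   lines were worded "TEMP : <count>" / "Passed from temp to data". They now
   read "Temporary : <count> Size : <kB> Kilobyte" /
   "Passed temporary buffer to data buffer".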
▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇———————— [260/300] Kilobytes (87%) 2732.2s
Total size : 443.671875 Kilobytes
Number of elements 41764
All is good
TEMP : 44642
Passed from temp to data
*/
// Report each memwatch-next 'leak' event, with its info payload, on stderr.
const reportLeak = (info) => console.error('Memory leak detected: ', info);
memwatch.on('leak', (info) => {
  reportLeak(info);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment