Last active
December 29, 2016 09:55
-
-
Save raed667/abfc528ba541fda0773fc2e337544586 to your computer and use it in GitHub Desktop.
Bulk insertion with a buffer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const elasticsearch = require('elasticsearch'); | |
const mqtt = require('mqtt'); | |
const chalk = require('chalk'); | |
const jsonSize = require('json-size'); | |
const ProgressBar = require('ascii-progress'); | |
const memwatch = require('memwatch-next'); | |
const os = require('os'); | |
// Elasticsearch index and type written into every bulk "create" action.
const INDEX = 'mqtt';
const TYPE = 'eclipse';
// Size the data buffer must reach before it is flushed, in kilobytes.
const BUFFER_SIZE = 100;

// Console progress bar showing how full the data buffer is relative to BUFFER_SIZE.
const bar = new ProgressBar({
    schema: ':bar.green [:current.bold.yellow/:total] Kilobytes (:percent) :elapseds.yellow',
    total: BUFFER_SIZE,
    colors: ':bar.red :percent.green'
});

// Data buffer: alternating action/document entries handed to the bulk API.
let bulkRecords = [];
// Temporary buffer: receives entries while a bulk request is in flight.
let tempRecords = [];
// True while a bulk request is in flight and bulkRecords must not be touched.
let mutex = false;

// Elasticsearch client; only error-level messages are logged by the client itself.
const ESclient = new elasticsearch.Client({
    host: 'localhost:9200',
    log: 'error'
});
// Connect to the MQTT broker at iot.eclipse.org.
const client = mqtt.connect('mqtt://iot.eclipse.org');

client.on('connect', () => {
    // '#' is the MQTT multi-level wildcard, i.e. subscribe to every topic.
    client.subscribe('#', (err) => {
        if (err) {
            console.error(chalk.bold.red(`MQTT SUBSCRIBE ERROR : ${err}`));
        }
    });
});

// An 'error' event with no listener is thrown by EventEmitter and would
// terminate the process; log it instead so ingestion can continue.
client.on('error', (err) => {
    console.error(chalk.bold.red(`MQTT ERROR : ${err}`));
});

// Every received message is buffered for bulk insertion; the payload is
// converted to a string before being stored.
client.on('message', (topic, message) => {
    addARecord(message.toString(), topic);
});
/**
 * Buffer one MQTT message for bulk insertion.
 *
 * Each message becomes two consecutive buffer entries: a bulk "create" action
 * followed by the document itself. While a bulk request is in flight
 * (`mutex` is true) the pair goes to the temporary buffer; otherwise it goes
 * to the data buffer, which is flushed via `saveBulk()` once its JSON size
 * reaches BUFFER_SIZE kilobytes.
 *
 * @param {string} message - Message payload.
 * @param {string} topic - Topic the message was received on.
 */
function addARecord(message, topic) {
    const doc = {
        date: new Date().toISOString(),
        message,
        topic
    };
    const action = {create: {_index: INDEX, _type: TYPE}};

    // A flush is running: park the pair and leave the data buffer alone.
    if (mutex) {
        tempRecords.push(action, doc);
        return;
    }

    // Entries parked during the last flush become the new data buffer.
    if (tempRecords.length > 0) {
        console.log(`Temporary : ${chalk.yellow(tempRecords.length)} Size : ${jsonSize(tempRecords) / 1024} Kilobyte`);
        bulkRecords = tempRecords;
        tempRecords = [];
        console.log("Passed temporary buffer to data buffer");
    }

    bulkRecords.push(action, doc);

    const sizeInKb = jsonSize(bulkRecords) / 1024;
    if (sizeInKb < BUFFER_SIZE) {
        // Still filling up: only refresh the progress bar.
        bar.update(sizeInKb / BUFFER_SIZE);
        return;
    }

    // Threshold reached: report the buffer and flush it.
    bar.clear();
    console.log();
    console.log(`Total size : ${chalk.bold.underline(sizeInKb)} Kilobytes`);
    console.log(`Number of elements ${bulkRecords.length}`);
    saveBulk();
}
/**
 * Send the data buffer (`bulkRecords`) to Elasticsearch in one bulk request.
 *
 * `mutex` is held for the duration of the request so that `addARecord`
 * diverts new entries to the temporary buffer. When the request completes the
 * data buffer is emptied and the mutex released, whether or not the request
 * succeeded: failed batches are reported but not retried.
 *
 * Does nothing if a request is already in flight or the buffer is empty.
 */
function saveBulk() {
    // Never start a second request over the same buffer, and never send an
    // empty bulk body.
    if (mutex || bulkRecords.length === 0) {
        return;
    }

    mutex = true;
    ESclient.bulk({
        body: bulkRecords
    }, (err, resp) => {
        bulkRecords = []; // Reset the array
        mutex = false; // release the mutex

        // Transport-level failure: the whole request was not processed.
        if (err) {
            console.error(chalk.bold.red(`ERROR : ${err}`));
            return;
        }

        // The request went through, but individual items may have been
        // rejected; only claim success when none were.
        if (resp && resp.errors) {
            // Every action queued by this script is a "create", so the
            // per-item result is expected under that key.
            const rejected = (resp.items || []).filter((item) => {
                const result = item && item.create;
                return Boolean(result && result.error);
            }).length;
            console.error(chalk.bold.red(`WITH ERRORS : ${rejected} item(s) rejected`));
            return;
        }

        console.log(chalk.green('All is good'));
    });
}
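/* Output sample (illustrative only: the labels and buffer size shown differ from the log strings and BUFFER_SIZE in this file):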
▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇———————— [260/300] Kilobytes (87%) 2732.2s | |
Total size : 443.671875 Kilobytes | |
Number of elements 41764 | |
All is good | |
TEMP : 44642 | |
Passed from temp to data | |
*/ | |
// Print whatever memwatch reports whenever it emits a 'leak' event.
memwatch.on('leak', (leakInfo) => {
    console.error('Memory leak detected: ', leakInfo);
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment