Last active
February 15, 2020 10:11
-
-
Save isaacmg/0001d94625dd441d484f2fd8e53b0e30 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//require everything | |
var app = require('express')(); | |
var http = require('http').Server(app); | |
var io = require('socket.io')(http); | |
//Basic NodeJS stuff | |
app.get('/', function(req, res){ | |
res.sendFile(__dirname + '/index.html'); | |
}); | |
//Call SocketIO with the message from Kafka | |
function callSockets(io, message){ | |
io.sockets.emit('channel', message); | |
} | |
// Init the Kafka client. Basically just make topic the same topic as your producer and you are ready to go. group-id can be anything. | |
var kafka = require('kafka-node'), | |
HighLevelConsumer = kafka.HighLevelConsumer, | |
client = new kafka.Client(), | |
consumer = new HighLevelConsumer( | |
client, | |
[ | |
{ topic: 'test' } | |
], | |
{ | |
groupId: 'my-group' | |
} | |
); | |
consumer.on('message', function (message) { | |
//Call our SocketIO function | |
callSockets(io,message); | |
// Saving the message is optional but reccomended in case you need it again | |
// In production you could write the record directly to your database. But for now we are just going to create an index of files. | |
var fs = require('fs'); | |
var key = message.key | |
var seconds = new Date().getTime() / 1000; | |
fs.writeFile( __dirname + "/public/river_data/" + key +"/" + seconds + ".json", JSON.stringify(message), function(err) { | |
if(err) { | |
return console.log(err); | |
} | |
console.log("The file was saved!"); | |
}); | |
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment