Created
May 9, 2014 01:11
-
-
Save habahut/06a79a201c4c91a58ab4 to your computer and use it in GitHub Desktop.
data viz beginnings
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
My understanding so far:
one object per topic: for example TPS could be called the TPSTracker
Each object will have many little processes it is forwarding data to; for example, in TPS there would be a little process for "# of jobs started", "# of jobs on each TPS server", etc.
The TPSTracker will forward each kafka message it receives to each of the little entities, which will maintain their own state on the aggregate totals for their piece, throwing away the rest. So the TPS load piece would receive the entire TPS message from kafka, and only update the state for the load on the TPS servers, ignoring everything else.
Then, we stringify the state of each individual piece and send that to the client, and allow them to put that data into the charts on the client's page.
btw Kyle, your idea of not bothering to verify producer output makes a lot more sense when we look at it this way, though I still think we should enforce some regularity :)
// HTTP server that serves the dashboard page (see handler below), with
// socket.io attached to the same server so aggregates can be pushed to
// connected browsers.
var app = require('http').createServer(handler)
  , io = require('socket.io').listen(app)
  , fs = require('fs')
// Dashboard listens on port 9180.
app.listen(9180);
// Serve the dashboard page for every HTTP request: read index.html from the
// directory this script lives in and send it back verbatim.
function handler (req, res) {
  var indexPath = __dirname + '/index.html';
  fs.readFile(indexPath, function (err, data) {
    if (err) {
      // Could not read the page; report a server error to the client.
      res.writeHead(500);
      return res.end('Error loading index.html');
    }
    res.writeHead(200);
    res.end(data);
  });
}
// ------------- start config --------------
// Trackers that expose an aggregate() method; polled on a timer below.
var aggregators = [];
var aggregationInterval = 5000; // 5 seconds
// to set this to listen to a new topic, all you have to do is add the
// javascript code for that topic below, and then call createTopicListener
// with a tracker object
// BUG FIX: the original called `new tpsTracker(...)`, but the only
// constructor defined in this file is tpsLoadTracker, so startup threw a
// ReferenceError. Fixed to use the constructor that actually exists.
createTopicListener(new tpsLoadTracker('/tps', 'TPS_REPORTS'));
// start aggregation: every aggregationInterval ms, ask each registered
// tracker to compute and publish its aggregates.
setInterval(function() {
  var count = aggregators.length;
  for (var i = 0; i < count; i++) {
    aggregators[i].aggregate();
  }
}, aggregationInterval);
// ------------- end config ----------------
// Wire a tracker object up to both socket.io (outbound) and kafka (inbound):
// browsers connecting to trackerObj.homepage get a welcome, and every kafka
// message on trackerObj.topicname is forwarded to trackerObj.onMessage.
function createTopicListener (trackerObj) {
  //if (typeof(trackerObj) === 'undefined') { console.log('trackerObj is undefined!'); return }

  // Outbound: greet each browser that connects to this tracker's namespace.
  var socketConn = io
    .of(trackerObj.homepage)
    .on('connection', function (socket) {
      trackerObj.welcome(socket);
    });

  // Inbound: consume the tracker's kafka topic (partition 0, via the local
  // zookeeper at localhost:2181) and hand every message to the tracker.
  var kafka = require('kafka-node');
  var Consumer = kafka.Consumer;
  var client = new kafka.Client('localhost:2181');
  var consumer = new Consumer(
    client,
    [
      { topic: trackerObj.topicname, partition: 0 }
    ],
    {
      autoCommit: true,
    }
  );
  consumer.on('message', function (message) {
    trackerObj.onMessage(message);
  });

  // Trackers that do periodic aggregation join the shared timer loop
  // (see the config section).
  if (trackerObj.hasAggregator === true) {
    aggregators.push(trackerObj);
  }
}
// TODO: add inheritance to make adding new stuff easier to manage!
//
// Tracker for the TPS topic. One instance per topic: it owns a socket.io
// namespace (homepage) and a kafka topic (topicname), keeps running totals
// from the message stream, and periodically publishes aggregates.
function tpsLoadTracker(homepage, topicname) {
  // BUG FIX: the original had a stray bare identifier `tpsRunningJbosT`
  // here, which threw a ReferenceError as soon as the constructor ran.
  var self = this;

  // Initialize all per-topic state.
  self.init = function(homepage, topicname) {
    self.homepage = homepage;   // socket.io namespace, e.g. '/tps'
    self.topicname = topicname; // kafka topic to consume
    self.longestTask = '';
    self.averageLength = 0;
    // BUG FIX: was initialized as an array but used as a counter
    // (`+= 1` / `-= 1` below) — start it at 0.
    self.currentJobs = 0;
    self.currentJobsMax = 20;
    self.currentJobsIndex = 0;
    self.allMessages = {};      // id -> message, everything seen so far
    self.newMessages = {};      // id -> message since the last aggregate()
    // BUG FIX: was misspelled `hasAgregator`, so createTopicListener's
    // `hasAggregator === true` check never registered this tracker.
    self.hasAggregator = true;
  }

  // Called once per browser connection to this tracker's namespace.
  self.welcome = function(socket) {
    // push aggregations to viewer!!!
    socket.emit('aggregates','welcome');
  }

  // Called for every kafka message on this tracker's topic.
  self.onMessage = function(message) {
    // NOTE(review): the original called tpsLoad.add(message) and
    // runningJobs.add(message), but neither name is defined anywhere in
    // this file — removed so onMessage does not throw a ReferenceError.

    // NOTE(review): assumes `message` is a JSON string; kafka-node usually
    // delivers an object whose payload is in message.value — confirm.
    var o = JSON.parse(message);
    self.newMessages[o.id] = o;
    switch(o.type) {
      case "T1": // job started
        self.currentJobs += 1;
        break;
      case "T2": // job finished
        if (self.allMessages[o.id]) {
          // delete that one
          self.currentJobs -= 1;
        }
        break;
      default:
        // unknown message type: ignore
        break;
    }
    // Fan the raw message out to every connected dashboard.
    io.of('/tps').emit('aggregates', message);
  }

  // Timer callback (see the config section): compute aggregations and
  // publish them to this tracker's namespace.
  self.aggregate = function() {
    // calculate all agregations here!
    // then emit them to the webpages
    // BUG FIX: `tps` was undefined here; emit on this tracker's own
    // namespace instead, matching the pattern used in onMessage.
    io.of(self.homepage).emit('aggregates', 'hello!');
  }

  self.init(homepage, topicname);
}
// Sketch of a tracker for the API topic. The original body was pseudocode
// (not valid JavaScript — it broke the whole file's parse); the working
// pieces are kept and the design notes are preserved below as comments.
function apiTracker() {
  var self = this;
  self.aggtotals = {}; // task_id -> aggregate totals

  // TODO(review): the original pseudocode intended something like:
  //   event.on('message', function (data) {
  //     countAggregate(data, self.aggtotals, 'task_id');
  //     totalsAgg();
  //   });
  // but `event`, `countAggregate`, and `totalsAgg` are not defined anywhere
  // in this file, so the hookup is left out until those exist.
  //
  // Intended per-task record shape (reconstructed from the original notes):
  //   123546: {
  //     "status": 1,
  //     "messages": [],        // append each incoming message here
  //     "start": <time1>,      // e.g. "635469843216584163"
  //     "stop":  <time2>       // e.g. "2341654324132165341"
  //   }
}
// this is a comment!!! | |
// consumer.on -> parse -> aggregate | |
// -> send relevant data to sockets | |
// some sort of ongoing timer that fires off aggregate jobs; should it be synchronized?
//io.sockets.on('connection', function (socket) { | |
// socket.on('my other event', function (data) { | |
// console.log(data); | |
// }); | |
//}); | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment