Created
June 20, 2014 20:43
-
-
Save FLYBYME/895532bd7d6415298624 to your computer and use it in GitHub Desktop.
TimeSeries
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var _ = require("underscore"); | |
var mongoose = require('mongoose'), Schema = mongoose.Schema, Mixed = mongoose.Schema.Types.Mixed; | |
var options = { | |
db : { | |
native_parser : true | |
}, | |
server : { | |
poolSize : 15 | |
} | |
}; | |
mongoose.connect('mongodb://localhost/data/db', options); | |
/** | |
* Module dependencies. | |
*/ | |
var _ = require("underscore"); | |
var mongoose = require('mongoose'), Schema = mongoose.Schema, Mixed = mongoose.Schema.Types.Mixed; | |
var fs = require('fs'); | |
var options = { | |
actor : 0, | |
interval : 1, // seconds | |
millisecond : false, | |
verbose : false, | |
postProcessImmediately : false, | |
paths : { | |
value : { | |
type : 'number' | |
}, | |
metadata : { | |
type : Mixed | |
} | |
} | |
}; | |
var roundDay = function(d) { | |
var t = new Date(d.getFullYear(), d.getMonth(), d.getDate()); | |
return t; | |
}; | |
/** | |
* Schema definition | |
*/ | |
var TimeSeries = new Schema({ | |
day : { | |
type : Date, | |
index : true, | |
required : true | |
}, | |
metadata : { | |
interval : { | |
type : Number | |
} | |
}, | |
group : { | |
type : String, | |
required : true, | |
'default' : 'default' | |
}, | |
name : String, | |
token : String, | |
latest : { | |
timestamp : { | |
type : Date | |
}, | |
value : { | |
type : Number, | |
'default' : 0 | |
}, | |
metadata : { | |
type : Mixed | |
}, | |
}, | |
createdAt : { | |
date : { | |
type : Date, | |
'default' : Date | |
}, | |
user : { | |
type : String | |
} | |
}, | |
updatedAt : { | |
date : { | |
type : Date | |
}, | |
user : { | |
type : String | |
} | |
}, | |
statistics : { | |
i : { | |
type : Number, | |
'default' : 0 | |
}, | |
avg : { | |
type : Number | |
}, | |
max : { | |
value : { | |
type : Number | |
}, | |
timestamp : { | |
type : Date | |
} | |
}, | |
min : { | |
value : { | |
type : Number | |
}, | |
timestamp : { | |
type : Date | |
} | |
} | |
}, | |
seconds : [Schema.Types.Mixed] | |
}); | |
/** | |
* Post hook. | |
*/ | |
TimeSeries.pre('save', function(next) { | |
//console.log(this); | |
if (this.isNew) { | |
console.log('saving new..'); | |
this.metadata.interval = options.interval; | |
this.statistics.i = 1; | |
if (this.latest) { | |
this.statistics.min.value = this.latest.value; | |
this.statistics.min.timestamp = this.latest.timestamp; | |
this.statistics.max.value = this.latest.value; | |
this.statistics.max.timestamp = this.latest.timestamp; | |
this.statistics.avg = this.latest.value; | |
} | |
} else { | |
console.log('updating old..'); | |
} | |
next(); | |
}); | |
var dataFormat = function(timestamp, value, format, ext) { | |
switch(format) { | |
case('[ms,y]'): | |
return [timestamp.getTime(), value] | |
case('[x,y]'): | |
return [timestamp, value] | |
default: | |
case('hash'): | |
return _.extend({ | |
timestamp : timestamp, | |
value : value | |
}, ext); | |
} | |
}; | |
/** | |
* Virtual methods | |
*/ | |
TimeSeries.method('getData', function(interval, format) { | |
var data = []; | |
var year = this.day.getFullYear(); | |
var month = this.day.getMonth(); | |
var day = this.day.getDate(); | |
var dateObj = new Date() | |
if (interval == 'second') { | |
for (var hour = 0; hour < this.seconds.length; hour++) { | |
for (var minute = 0; minute < this.seconds[hour].length; minute++) { | |
for (var second = 0; second < this.seconds[hour][minute].length; second++) { | |
if (roundDay(dateObj).getTime() == roundDay(this.day).getTime()) { | |
if (hour == dateObj.getHours() && minute == dateObj.getMinutes() && second == dateObj.getSeconds()) { | |
return data; | |
} | |
} | |
var timestamp = new Date(year, month, day, hour, minute, second); | |
var d = this.seconds[hour][minute][second]; | |
data.push([timestamp.getTime(), d ? d.value : d]); | |
if (roundDay(dateObj).getTime() == roundDay(this.day).getTime()) { | |
if (hour == dateObj.getHours() && minute == dateObj.getMinutes() && second == dateObj.getSeconds()) { | |
return data; | |
} | |
} | |
}; | |
}; | |
}; | |
} | |
if (interval == 'minute') { | |
for (var hour = 0; hour < this.seconds.length; hour++) { | |
for (var minute = 0; minute < this.seconds[hour].length; minute++) { | |
var secondSum = null; | |
for (var second = 0; second < this.seconds[hour][minute].length; second++) { | |
var d = this.seconds[hour][minute][second]; | |
if (d) { | |
if (secondSum == null) | |
secondSum = 0; | |
secondSum += d.value; | |
} | |
}; | |
var timestamp = new Date(year, month, day, hour, minute, 0); | |
data.push([timestamp.getTime(), secondSum]); | |
}; | |
}; | |
} | |
if (interval == 'hour') { | |
for (var hour = 0; hour < this.seconds.length; hour++) { | |
var minuteSum = null; | |
for (var minute = 0; minute < this.seconds[hour].length; minute++) { | |
var secondSum = null; | |
for (var second = 0; second < this.seconds[hour][minute].length; second++) { | |
var d = this.seconds[hour][minute][second]; | |
if (d) { | |
if (secondSum == null) | |
secondSum = 0; | |
if (minuteSum == null) | |
minuteSum = 0; | |
secondSum += d.value; | |
} | |
}; | |
if (secondSum != null) | |
minuteSum += secondSum; | |
}; | |
var timestamp = new Date(year, month, day, hour, 0, 0); | |
data.push([timestamp.getTime(), minuteSum]); | |
}; | |
} | |
if (interval == 'day') { | |
var hourSum = null; | |
for (var hour = 0; hour < this.seconds.length; hour++) { | |
var minuteSum = null; | |
for (var minute = 0; minute < this.seconds[hour].length; minute++) { | |
var secondSum = null; | |
for (var second = 0; second < this.seconds[hour][minute].length; second++) { | |
var d = this.seconds[hour][minute][second]; | |
if (d) { | |
if (secondSum == null) | |
secondSum = 0; | |
if (minuteSum == null) | |
minuteSum = 0; | |
if (hourSum == null) | |
hourSum = 0; | |
secondSum += d.value; | |
} | |
}; | |
if (secondSum != null) | |
minuteSum += secondSum; | |
}; | |
if (minuteSum != null) | |
hourSum += minuteSum; | |
}; | |
var timestamp = new Date(year, month, day, hour, 0, 0); | |
data.push([timestamp.getTime(), hourSum]); | |
} | |
return data; | |
}); | |
TimeSeries.method('minmax', function(timestamp, value) { | |
var updates = {}, needToSave = false; | |
if (_.isNumber(this.statistics.max.value)) { | |
if (value > this.statistics.max.value) { | |
updates['statistics.max.timestamp'] = timestamp; | |
updates['statistics.max.value'] = value; | |
needToSave = true; | |
} | |
} else { | |
updates['statistics.max.timestamp'] = timestamp; | |
updates['statistics.max.value'] = value; | |
needToSave = true; | |
} | |
if (_.isNumber(this.statistics.min.value)) { | |
if (value < this.statistics.min.value) { | |
updates['statistics.min.timestamp'] = timestamp; | |
updates['statistics.min.value'] = value; | |
needToSave = true; | |
} | |
} else { | |
updates['statistics.min.timestamp'] = timestamp; | |
updates['statistics.min.value'] = value; | |
needToSave = true; | |
} | |
if (needToSave) { | |
this.set(updates); | |
this.save(function(error, ok) { | |
if (error) | |
console.log(error); | |
}); | |
} | |
}); | |
/** | |
* Static methods | |
*/ | |
TimeSeries.static('findMax', function(conditions, callback) { | |
var condition = { | |
'$and' : [{ | |
'day' : { | |
$gte : conditions.from | |
} | |
}, { | |
'day' : { | |
$lte : conditions.to | |
} | |
}], | |
'group' : conditions.group, | |
'name' : conditions.name | |
}; | |
//console.log('findMax: '+JSON.stringify(condition)); | |
this.find(condition).limit(1).select('statistics.max').sort({ | |
'statistics.max.value' : -1 | |
}).exec(function(error, doc) { | |
if (error) | |
callback(error) | |
else if (doc.length == 1) { | |
callback(null, doc[0].statistics.max); | |
} else | |
callback(null, NaN); | |
}); | |
}); | |
TimeSeries.static('findMin', function(conditions, callback) { | |
var condition = { | |
'$and' : [{ | |
'day' : { | |
$gte : conditions.from | |
} | |
}, { | |
'day' : { | |
$lte : conditions.to | |
} | |
}], | |
'group' : conditions.group, | |
'name' : conditions.name | |
}; | |
console.log('findMin: ' + JSON.stringify(condition)); | |
this.find(condition).limit(1).select('statistics.min').sort({ | |
'statistics.min.value' : 1 | |
}).exec(function(error, doc) { | |
if (error) | |
callback(error); | |
else if (doc.length == 1) { | |
//console.log(doc); | |
callback(null, doc[0].statistics.min); | |
} else | |
callback(null, NaN); | |
}); | |
}); | |
TimeSeries.static('findData', function(request, callback) { | |
var condition = { | |
'$and' : [], | |
'group' : request.group, | |
'name' : request.name | |
}; | |
_.extend(condition, request.condition); | |
if (!request.to) | |
request.to = new Date(); | |
if (!request.dir) | |
request.dir = 1; | |
if (Object.keys(request.condition).length > 0) | |
condition['$and'].push(request.condition); | |
condition['$and'].push({ | |
'day' : { | |
'$gte' : request.from, | |
} | |
}); | |
condition['$and'].push({ | |
'day' : { | |
'$lte' : request.to, | |
} | |
}); | |
if (options.verbose) { | |
console.log(request); | |
console.log(JSON.stringify(condition)); | |
} | |
this.find(condition).sort({ | |
'day' : request.dir | |
}).exec(function(error, docs) { | |
if (error) { | |
callback(error); | |
} else { | |
if (options.verbose) | |
console.log('Doc count: ' + docs.length); | |
var data = [], i; | |
docs.forEach(function(doc) { | |
doc.getData(request.interval, request.format).forEach(function(row) { | |
data.push(row); | |
}); | |
}); | |
callback(null, data); | |
} | |
}); | |
}); | |
function getInitializer() { | |
var updates = {}; | |
updates.seconds = []; | |
for (var i = 0; i < 24; i++) { | |
updates.seconds[i] = []; | |
for (var j = 0; j < 60; j++) { | |
updates.seconds[i][j] = []; | |
//initialize length | |
updates.seconds[i][j][59] = null; | |
//initialize length | |
} | |
} | |
return updates; | |
} | |
function getUpdates(timestamp, value, metadata, first) { | |
var updates = {} | |
var set = { | |
value : value | |
}; | |
updates['seconds.' + timestamp.getHours() + '.' + timestamp.getMinutes() + '.' + timestamp.getSeconds()] = set | |
//statistics | |
updates['updatedAt.date'] = new Date(); | |
updates['latest.timestamp'] = timestamp; | |
updates['latest.value'] = value; | |
updates['$inc'] = { | |
'statistics.i' : 1 | |
}; | |
return updates; | |
} | |
TimeSeries.method('minmax', function(timestamp, value) { | |
var updates = {}, needToSave = false; | |
if (_.isNumber(this.statistics.max.value)) { | |
if (value > this.statistics.max.value) { | |
updates['statistics.max.timestamp'] = timestamp; | |
updates['statistics.max.value'] = value; | |
needToSave = true; | |
} | |
} else { | |
updates['statistics.max.timestamp'] = timestamp; | |
updates['statistics.max.value'] = value; | |
needToSave = true; | |
} | |
if (_.isNumber(this.statistics.min.value)) { | |
if (value < this.statistics.min.value) { | |
updates['statistics.min.timestamp'] = timestamp; | |
updates['statistics.min.value'] = value; | |
needToSave = true; | |
} | |
} else { | |
updates['statistics.min.timestamp'] = timestamp; | |
updates['statistics.min.value'] = value; | |
needToSave = true; | |
} | |
if (needToSave) { | |
this.set(updates); | |
this.save(function(error, ok) { | |
if (error) | |
console.log(error); | |
}); | |
} | |
}); | |
TimeSeries.method('recalc', function(timestamp, value, cb) { | |
var updates = {} | |
var sum = 0, i = 0, hour, min, sec, ms; | |
if (options.verbose) | |
console.log('s recalc'); | |
sum = 0; | |
i = 0; | |
for (hour in this.seconds) { | |
if (isNaN(parseInt(hour))) | |
break; | |
for (min in this.seconds[hour]) { | |
if (isNaN(parseInt(min))) | |
break; | |
for (sec in this.seconds[hour][min]) { | |
if (isNaN(parseInt(sec))) | |
break; | |
if (!this.seconds[hour][min][sec]) | |
continue; | |
if (isNaN(parseInt(this.seconds[hour][min][sec].value))) | |
continue; | |
sum += this.seconds[hour][min][sec].value; | |
i++; | |
} | |
} | |
} | |
if (i <= 0) | |
i = 1; | |
if (i > 0) { | |
if (_.isNumber(sum)) { | |
updates['statistics.avg'] = sum / i; | |
} | |
} | |
if (options.verbose) | |
console.log(updates); | |
this.set(updates); | |
this.save(cb); | |
}); | |
TimeSeries.static('recalc', function(timestamp, extraCondition, cb) { | |
var day = roundDay(timestamp); | |
var condition = { | |
'day' : day | |
}; | |
_.extend(condition, extraCondition); | |
this.findOne(condition, function(e, doc) { | |
if (e) { | |
cb(e); | |
} else { | |
doc.recalc(timestamp, doc.latest.value, cb); | |
} | |
}); | |
}); | |
TimeSeries.method('push', function(timestamp, value, cb) { | |
var ts = new Date(timestamp); | |
var h = ts.getHours(), m = ts.getMinutes(), s = ts.getSeconds(); | |
this.set('daily', value); | |
this.set('hourly.' + h + '', value); | |
this.set('minute.' + h + '.' + m, value); | |
this.set('minute.' + h + '.' + m + '.' + s, value); | |
this.save(cb); | |
}); | |
TimeSeries.static('push', function(timestamp, value, metadata, cb) { | |
var day = roundDay(timestamp); | |
var condition = { | |
'day' : day, | |
group : metadata.group, | |
name : metadata.name | |
}; | |
var updates = getUpdates(timestamp, value); | |
var self = this; | |
if (options.verbose) | |
console.log('\nCond: ' + JSON.stringify(condition)); | |
if (options.verbose) | |
console.log('Upda: ' + JSON.stringify(updates)); | |
this.findOneAndUpdate(condition, updates, function(error, doc) { | |
if (error) { | |
if (cb) | |
cb(error); | |
} else if (doc) { | |
//doc.minmax(timestamp, value); | |
if (cb) | |
cb(null, doc); | |
//doc.recalc(timestamp, value); | |
} else { | |
//console.log('Create new'); | |
var datainit = getInitializer(); | |
var doc = new self({ | |
day : day, | |
group : metadata.group, | |
name : metadata.name | |
}); | |
doc.set(datainit); | |
doc.set(updates); | |
doc.save(cb); | |
} | |
}); | |
}); | |
/**** | |
* | |
* | |
* | |
*/ | |
var TimeSeriesModel = function(collection, options) { | |
var model, schema; | |
/** | |
* Methods | |
*/ | |
/** | |
* Model initialization | |
*/ | |
function init(collection, options) { | |
if (mongoose.connection.modelNames().indexOf(collection) >= 0) { | |
model = connection.model(collection); | |
} else { | |
model = mongoose.model(collection, TimeSeries); | |
} | |
} | |
/** | |
* Push new value to collection | |
*/ | |
var push = function push(timestamp, value, metadata, cb) { | |
model.push(timestamp, value, metadata, cb); | |
}; | |
/** | |
* Find data of given period | |
*/ | |
var findData = function(options, cb) { | |
model.findData(options, cb); | |
}; | |
/** | |
* Find Max value of given period | |
*/ | |
var findMax = function(options, cb) { | |
model.findMax(options, cb); | |
}; | |
/** | |
* Find Min value of given period | |
*/ | |
var findMin = function(options, cb) { | |
model.findMin(options, cb); | |
}; | |
var getModel = function() { | |
return model; | |
}; | |
init(collection, options); | |
/* Return model api */ | |
return { | |
push : push, | |
findData : findData, | |
findMax : findMax, | |
findMin : findMin, | |
model : model | |
}; | |
}; | |
var ts = TimeSeriesModel('my-token', { | |
interval : 1 | |
}); | |
var usage = require('usage'); | |
var pid = process.pid; | |
setInterval(function() { | |
usage.lookup(pid, { | |
keepHistory : true | |
}, function(err, result) { | |
ts.push(new Date(), result.cpu, { | |
group : 'usage', | |
name : 'cpu' | |
}); | |
ts.push(new Date(), result.memory, { | |
group : 'usage', | |
name : 'memory' | |
}); | |
}); | |
}, 1000); | |
(function() { | |
var express = require('express'); | |
var app = module.exports = express.createServer(); | |
app.configure(function() { | |
app.use(express.bodyParser()); | |
app.use(express.methodOverride()); | |
app.use(app.router); | |
app.all('*', function(req, res, next) { | |
res.header("Access-Control-Allow-Origin", "*"); | |
res.header('Access-Control-Allow-Methods', 'GET'); | |
res.header("Access-Control-Allow-Headers", "X-Requested-With"); | |
next(); | |
}); | |
}); | |
app.configure('production', function() { | |
app.use(express.errorHandler()); | |
}); | |
// Routes | |
app.get('/', function(req, res) { | |
ts.findData({ | |
condition : {}, | |
from : new Date(new Date() - 1000 * 60 * 60 * 48), | |
to : new Date(), | |
group : 'usage', | |
name : 'cpu', | |
interval : req.query.interval || 'second'//day,hour,minute,second | |
}, function(error, data) { | |
if (error) | |
console.log(error); | |
else | |
console.log('len: ' + data.length); | |
res.setHeader('x-count', data.length) | |
res.send(data) | |
}); | |
}); | |
app.get('/find', function(req, res) { | |
ts.model.find({}, function(err, docs) { | |
res.send(docs) | |
}); | |
}); | |
app.get('/min', function(req, res) { | |
ts.findMin({ | |
group : 'usage', | |
name : 'cpu', | |
from : new Date(new Date() - 1000 * 60 * 60 * 24), | |
to : new Date() | |
}, function(error, data) { | |
if (error) | |
console.log(error); | |
else | |
res.send(data) | |
}); | |
}); | |
app.get('/max', function(req, res) { | |
ts.findMax({ | |
group : 'usage', | |
name : 'cpu', | |
from : new Date(new Date() - 1000 * 60 * 60 * 24), | |
to : new Date() | |
}, function(error, data) { | |
if (error) | |
console.log(error); | |
else | |
res.send(data) | |
}); | |
}); | |
app.get('/minmax', function(req, res) { | |
ts.findMax({ | |
from : new Date(new Date() - 1000 * 60 * 60 * 24), | |
to : new Date() | |
}, function(error, data) { | |
if (error) | |
console.log(error); | |
else | |
res.send(data) | |
}); | |
}); | |
app.listen(3001); | |
})(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment