Skip to content

Instantly share code, notes, and snippets.

@fatihky
Created July 1, 2016 15:52
Show Gist options
  • Select an option

  • Save fatihky/99c939a2ea6ab6539d9e9070aa42068b to your computer and use it in GitHub Desktop.

Select an option

Save fatihky/99c939a2ea6ab6539d9e9070aa42068b to your computer and use it in GitHub Desktop.
var levelup = require('levelup');
var Promise = require('any-promise');
var co = require('co');
var levelPromisify = require('level-promisify');
var fsp = require('fs-promise');
var fs = require('fs');
var Emitter = require('events').EventEmitter;
var util = require('util');
var Measured = require('measured');
var histogram = new Measured.Histogram();
var meter = new Measured.Meter();
var path = require('path');
var byw = 0;
var lts = Date.now();
setInterval(function() {
var dur = Date.now() - lts;
var thr = 0;
if (byw > 0)
thr = (byw * 1000) / dur;
console.log(thr);
lts = Date.now();
byw = 0;
}, 5000 * 100);
setInterval(function() {
console.log(meter.toJSON());
console.log(histogram.toJSON());
}, 30000 * 100);
function gendefer() {
var obj = {};
obj.promise = new Promise(function (resolve, reject) {
obj.resolve = resolve;
obj.reject = reject;
});
return obj;
}
function sleep(ms) {
return new Promise(function (resolve) {
setTimeout(resolve, ms);
});
}
function genpoint(m, ts, v) {
var buf = new Buffer(12);
buf.writeInt32LE(m, 0);
buf.writeInt32LE(ts, 4);
buf.writeInt32LE(v, 8);
return buf;
}
function deferredWrite(fd, start, buf, log) {
var defer = gendefer();
var written = 0;
var towrite = buf.length;
if (log === true)
console.log('deferredWrite .write', start, buf, buf.length);
r();
function r() {
if (written >= towrite)
return defer.resolve();
fs.write(fd, buf, written, towrite - written, start + written, function (err, written_) {
if (log === true)
console.log('deferredWrite .write', err, written_);
if (err)
return defer.reject(err);
written += written_;
if (written >= towrite)
return defer.resolve();
r();
});
}
return defer.promise;
}
function genTsKey(mon, ts) {
var dt = new Date(ts * 1000);
dt = new Date(dt.getFullYear(), dt.getMonth(), dt.getDate());
dt = '' + parseInt('' + dt.getTime() / 1000, 10);
return '' + mon + '.' + dt;
}
function encodeTsKeyBuf(buf, key) {
var arr = key.split('.');
var mon = parseInt(arr[0], 10);
var ts = parseInt(arr[1], 10);
console.log('encodeTsKeyBuf', key, buf, arr, mon, ts);
buf.writeInt32LE(mon, 0);
buf.writeInt32LE(ts, 4);
}
function URTSDB(opts) {
if (!opts)
throw new Error('opts is required');
var _self = this;
this.bulks = [];
this.currBulk = [];
this.blobSz = opts.blobSz || (128 * 1024 * 1024);
this.maxBulkSz = opts.maxBulkSz || 100;
this.maxWalSz = opts.maxWalSz || (2 * 1024 * 1024);
this.lastWal = null;
this.currWalPath = null;
this.lastWalOffset = 0;
this.lastBlob = null;
this.lastBlobOffset = 0;
this.walFileSize = 0;
this.writePointsScheduled = false;
this.bulkTmo = null;
this.metaDB = levelPromisify(levelup('ts/meta'));
this.blobfds = {};
Emitter.call(this);
//this.load();
this
.openWal()
.then(function () {
_self.emit('ready');
})
.catch(function (err) {
_self.emit('error', err);
});
this.once('ready', writeBulk);
setTimeout(clearWalFiles, 1000);
function *readWal(path) {
var ret = {
mons: {
// monitorID > [Buffer(4) int32le, Buffer(4) int32le]
}
};
var data = yield fsp.readFile('ts/wals/' + path);
var remaing = data.length;
var pos = 0;
var buf = new Buffer(12);
var mon;
var ts;
var key;
while (remaing >= 12) {
data.copy(buf, 0, pos, pos + 12);
mon = buf.readInt32LE(0);
ts = buf.readInt32LE(4);
key = genTsKey(mon, ts);
if (!ret.mons[key])
ret.mons[key] = [];
var nbuf = new Buffer(8);
buf.copy(nbuf, 0, 4);
ret.mons[key].push(nbuf);
pos += 12;
remaing -= 12;
}
return ret;
}
function clearWalFiles() {
co(function *() {
var wals = fs.readdirSync('ts/wals');
var currWal = path.basename(_self.currWalPath);
var idx = wals.indexOf(currWal);
var keybuf = new Buffer(8);
if (idx >= 0)
wals.splice(idx, 1);
if (wals.length === 0)
return setTimeout(clearWalFiles, 2000);
console.log(wals, currWal);
for (var i = 0; i < wals.length; i++) {
var walPath = wals[i];
console.log('clean wal:', walPath);
var wal = yield readWal(walPath);
// find corresponding blob
for (var key in wal.mons) {
var blobfd;
var blobname;
var blobOffset = 0;
var dataOffset = 0;
try {
var data;
var blobLocationMeta = yield _self.metaDB.get('blob-of-' + key);
console.log('blob found.', blobLocationMeta, key);
try {
data = JSON.parse(blobLocationMeta);
} catch (err) {
throw new Error('blob bilgisi json olarak kaydedilmemiş');
}
if (!_self.blobfds[data.name]) {
_self.blobfds[data.name] = yield fsp.open('ts/blobs/' + data.name, 'r+');
}
blobfd = _self.blobfds[data.name];
blobname = data.name;
blobOffset = data.blobOffset;
dataOffset = data.dataOffset;
if (_self.lastBlob === null) {
_self.lastBlob = blobfd;
_self.lastBlobOffset = blobOffset + (24 * 12 * 8) + 8;
}
} catch (err) {
if (err.type !== 'NotFoundError')
throw err;
// blob bulunamadı. son blob'u kullanayım eğer varsa, ve yeterince alana sahipse.
if (_self.lastBlob !== null && (_self.lastBlobOffset + ((24 * 12 * 8) + 8)) < (_self.blobSz)) {
blobfd = _self.lastBlob;
blobOffset = _self.lastBlobOffset;
blobname = _self.lastBlobName;
} else {
// ya şu an bir blob yok ya da yeterince alanı kalmamış
blobname = Date.now() + '.blob';
var blobpath = 'ts/blobs/' + blobname;
yield truncateFile(blobpath, _self.blobSz);
blobfd = yield fsp.open(blobpath, 'r+');
/* referans sayımlı bir veri yapısı ile bunları periyoduk olarak
zamanaşımına uğratıp silmeliyim */
_self.blobfds[blobname] = blobfd;
_self.lastBlob = blobfd;
_self.lastBlobOffset = 0;
_self.lastBlobName = blobname;
}
// şimdi bu anahtarın ait olduğu blob leveldb'de bulunmuyor. bu yüzden
// veriyi ben yazmalıyım
yield _self.metaDB.put('blob-of-' + key, JSON.stringify({
name: blobname,
blobOffset: blobOffset,
dataOffset: 0
}));
_self.lastBlobOffset += ((24 * 12 * 8) + 8);
}
// şimdi bu anahtara dair verileri ilgili bloba yazmalıyım
var pointbuf = wal.mons[key];
pointbuf = Buffer.concat(pointbuf);
wal.mons[key] = null;
// keybuf'u her zaman yazılmış sayıyorum ve pos'a 8'i ekliyorum
// ve dataOffset'i de pos'a ekliyorum ki bu anahtara daha önceden veri
// yazıldı ise üzerine yazılmasın, devamına yazılsın.
var pos = 8 + dataOffset;
// bir de, veri tümden yazılamaz. yazılabilecek boyut sınırlı ve bu
// boyutu aşmamak lazım.
var sz = pointbuf.length <= (24 * 12 * 8) ? pointbuf.length : (24 * 12 * 8);
// data önceden veri yazıldı yazılan kadarki kısmı yazılacak miktardan
// düşürmek lazım
sz -= dataOffset;
// eğer daha yeni veri yazmaya başlıyorsam öncelikle anahtarı yazmalıyım
if (dataOffset === 0) {
console.log('encodeTsKeyBuf', key);
encodeTsKeyBuf(keybuf, key);
// direkt bloboffset'e yazıyorum, çünkü verinin başlangıç noktası orası
yield deferredWrite(blobfd, blobOffset, keybuf);
}
// şimdi de asıl veriyi yazıp blob'daki şu anki işlemimi bitireyim
console.log('veri yazılacak.', {
blobname: blobname,
len: sz,
actuallen: pointbuf.length,
blobfd: blobfd,
pos: pos
});
yield deferredWrite(blobfd, pos, pointbuf.slice(0, sz), true);
// meta'yı güncelleyeyim
console.log('bloba yazma şu anahtar için tamam:', key);
yield _self.metaDB.put('blob-of-' + key, JSON.stringify({
name: blobname,
blobOffset: blobOffset,
dataOffset: dataOffset + sz
}));
}
// wal'ı tümden bloblara aktardım. şimdi onu silebilirim
yield fsp.unlink('ts/wals/' + walPath);
}
setTimeout(clearWalFiles, 10000);
})
.catch(function(err) {
console.log(err.stack);
});
}
function writeBulk() {
if (_self.bulks.length === 0)
return setTimeout(writeBulk, 500);
var bulk = _self.bulks.shift();
co (function *() {
if ((_self.lastWalOffset + bulk.length) > _self.maxWalSz)
yield _self.genNewWal();
var start = Date.now();
yield deferredWrite(_self.lastWal, _self.lastWalOffset, bulk);
var end = Date.now();
_self.lastWalOffset += bulk.length;
yield _self.metaDB.put('lastWalOffset', '' + _self.lastWalOffset);
byw += bulk.length;
meter.mark();
histogram.update(end - start);
process.nextTick(writeBulk);
})
.catch(function(err) {
_self.emit('error', err);
// do not try to write points anymore
console.log('do not try to write points anymore');
});
}
}
util.inherits(URTSDB, Emitter);
URTSDB.prototype.openWal = function () {
var defer = gendefer();
var _self = this;
co(function *() {
var lastWal = yield _self.metaDB.get('lastWal');
var lastWalOffset = yield _self.metaDB.get('lastWalOffset');
lastWalOffset = parseInt(lastWalOffset, 10);
console.log('lastWal:', lastWal, lastWalOffset);
if (!fs.existsSync(lastWal)) {
var tmp = fs.openSync(lastWal, 'w+');
fs.closeSync(tmp);
lastWalOffset = 0;
}
var fd = yield fsp.open(lastWal, 'r+');
_self.lastWal = fd;
_self.lastWalOffset = lastWalOffset;
_self.currWalPath = lastWal;
defer.resolve();
}).catch(function (err) {
if (!err.type || err.type !== 'NotFoundError')
return defer.reject(err);
_self
.genNewWal()
.then(defer.resolve)
.catch(defer.reject);
});
return defer.promise;
};
function truncateFile(filename, sz){
var defer = gendefer();
fs.open(filename, 'w', function (err, fd) {
if (err)
return defer.reject(err);
try {
fs.closeSync(fd);
} catch (err) {
return defer.reject(err);
}
fs.truncate(filename, sz, function (err) {
if (err) {
console.log('truncateFile err', err);
return defer.reject(err);
}
defer.resolve();
});
});
return defer.promise;
}
URTSDB.prototype.genNewWal = function () {
var _self = this;
var defer = gendefer();
var filename = 'ts/wals/' + Date.now() + '.wal';
co(function *() {
yield truncateFile(filename, _self.maxWalSz);
yield _self.metaDB.put('lastWal', filename);
yield _self.metaDB.put('lastWalOffset', '0');
var fd = yield fsp.open(filename, 'r+');
_self.lastWal = fd;
_self.lastWalOffset = 0;
_self.currWalPath = filename;
defer.resolve();
}).catch(function(err) {
defer.reject(err);
});
return defer.promise;
};
function scheduleBulkTmo(_self) {
if (_self.bulkTmo === null) {
_self.bulkTmo = setTimeout(function () {
if (_self.currBulk.length === 0)
return scheduleBulkTmo(_self);
var bulk;
do {
bulk = _self.currBulk.length <= 200 ? _self.currBulk.length : 200;
bulk = _self.currBulk.splice(0, bulk);
_self.bulks.push(Buffer.concat(bulk));
} while(_self.currBulk.length > 0);
_self.currBulk = [];
_self.bulkTmo = null;
}, 300);
}
}
URTSDB.prototype.put = function (m, ts, val) {
var buf = genpoint(m, ts, val);
if (this.currBulk.length >= this.maxBulkSz) {
this.bulks.push(Buffer.concat(this.currBulk));
this.currBulk = [];
}
this.currBulk.push(buf);
scheduleBulkTmo(this);
};
URTSDB.prototype.scheduleWrite = function () {
var _self = this;
if (this.writePointsScheduled === true)
return;
this.writePointsScheduled = true;
process.nextTick(function () {
_self.writePointsScheduled = false;
_self.writePoints();
});
};
var tsdb = new URTSDB({
maxWalSz: 12 * 24 * 8 * 200,
maxBulkSz: 300,
blobSz: 12 * 24 * 8 * 200
});
tsdb.on('error', function (err) {
console.log(err.stack);
});
/*function r() {
for (var i = 0; i < (2 * 1024 * 1024) / 12; i++)
tsdb.put(i % 1000, i, 274);
}
setInterval(r, 50);*/
console.log(tsdb);
co(function *() {
var filename = 'sa.wal';
var fd = yield fsp.open(filename, 'r+');
yield truncateFile(filename, 100);
yield fsp.write(fd, 'sa', 50);
console.log('fd:', fd);
yield fsp.close(fd);
});
co(function *() {
var dt = new Date();
dt = new Date(dt.getFullYear(), dt.getMonth(), dt.getDate());
var daystart = parseInt('' + dt.getTime() / 1000, 10);
for (var i = 0; i < 400; i++) {
for (var j = 0; j < 288; j++) {
tsdb.put(i + 1, daystart + (j * 300), 274);
}
yield sleep(10);
}
console.log('tüm verileri yazdım');
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment