Created
July 1, 2016 15:52
-
-
Save fatihky/99c939a2ea6ab6539d9e9070aa42068b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| var levelup = require('levelup'); | |
| var Promise = require('any-promise'); | |
| var co = require('co'); | |
| var levelPromisify = require('level-promisify'); | |
| var fsp = require('fs-promise'); | |
| var fs = require('fs'); | |
| var Emitter = require('events').EventEmitter; | |
| var util = require('util'); | |
| var Measured = require('measured'); | |
| var histogram = new Measured.Histogram(); | |
| var meter = new Measured.Meter(); | |
| var path = require('path'); | |
| var byw = 0; | |
| var lts = Date.now(); | |
| setInterval(function() { | |
| var dur = Date.now() - lts; | |
| var thr = 0; | |
| if (byw > 0) | |
| thr = (byw * 1000) / dur; | |
| console.log(thr); | |
| lts = Date.now(); | |
| byw = 0; | |
| }, 5000 * 100); | |
| setInterval(function() { | |
| console.log(meter.toJSON()); | |
| console.log(histogram.toJSON()); | |
| }, 30000 * 100); | |
| function gendefer() { | |
| var obj = {}; | |
| obj.promise = new Promise(function (resolve, reject) { | |
| obj.resolve = resolve; | |
| obj.reject = reject; | |
| }); | |
| return obj; | |
| } | |
| function sleep(ms) { | |
| return new Promise(function (resolve) { | |
| setTimeout(resolve, ms); | |
| }); | |
| } | |
| function genpoint(m, ts, v) { | |
| var buf = new Buffer(12); | |
| buf.writeInt32LE(m, 0); | |
| buf.writeInt32LE(ts, 4); | |
| buf.writeInt32LE(v, 8); | |
| return buf; | |
| } | |
| function deferredWrite(fd, start, buf, log) { | |
| var defer = gendefer(); | |
| var written = 0; | |
| var towrite = buf.length; | |
| if (log === true) | |
| console.log('deferredWrite .write', start, buf, buf.length); | |
| r(); | |
| function r() { | |
| if (written >= towrite) | |
| return defer.resolve(); | |
| fs.write(fd, buf, written, towrite - written, start + written, function (err, written_) { | |
| if (log === true) | |
| console.log('deferredWrite .write', err, written_); | |
| if (err) | |
| return defer.reject(err); | |
| written += written_; | |
| if (written >= towrite) | |
| return defer.resolve(); | |
| r(); | |
| }); | |
| } | |
| return defer.promise; | |
| } | |
| function genTsKey(mon, ts) { | |
| var dt = new Date(ts * 1000); | |
| dt = new Date(dt.getFullYear(), dt.getMonth(), dt.getDate()); | |
| dt = '' + parseInt('' + dt.getTime() / 1000, 10); | |
| return '' + mon + '.' + dt; | |
| } | |
| function encodeTsKeyBuf(buf, key) { | |
| var arr = key.split('.'); | |
| var mon = parseInt(arr[0], 10); | |
| var ts = parseInt(arr[1], 10); | |
| console.log('encodeTsKeyBuf', key, buf, arr, mon, ts); | |
| buf.writeInt32LE(mon, 0); | |
| buf.writeInt32LE(ts, 4); | |
| } | |
| function URTSDB(opts) { | |
| if (!opts) | |
| throw new Error('opts is required'); | |
| var _self = this; | |
| this.bulks = []; | |
| this.currBulk = []; | |
| this.blobSz = opts.blobSz || (128 * 1024 * 1024); | |
| this.maxBulkSz = opts.maxBulkSz || 100; | |
| this.maxWalSz = opts.maxWalSz || (2 * 1024 * 1024); | |
| this.lastWal = null; | |
| this.currWalPath = null; | |
| this.lastWalOffset = 0; | |
| this.lastBlob = null; | |
| this.lastBlobOffset = 0; | |
| this.walFileSize = 0; | |
| this.writePointsScheduled = false; | |
| this.bulkTmo = null; | |
| this.metaDB = levelPromisify(levelup('ts/meta')); | |
| this.blobfds = {}; | |
| Emitter.call(this); | |
| //this.load(); | |
| this | |
| .openWal() | |
| .then(function () { | |
| _self.emit('ready'); | |
| }) | |
| .catch(function (err) { | |
| _self.emit('error', err); | |
| }); | |
| this.once('ready', writeBulk); | |
| setTimeout(clearWalFiles, 1000); | |
| function *readWal(path) { | |
| var ret = { | |
| mons: { | |
| // monitorID > [Buffer(4) int32le, Buffer(4) int32le] | |
| } | |
| }; | |
| var data = yield fsp.readFile('ts/wals/' + path); | |
| var remaing = data.length; | |
| var pos = 0; | |
| var buf = new Buffer(12); | |
| var mon; | |
| var ts; | |
| var key; | |
| while (remaing >= 12) { | |
| data.copy(buf, 0, pos, pos + 12); | |
| mon = buf.readInt32LE(0); | |
| ts = buf.readInt32LE(4); | |
| key = genTsKey(mon, ts); | |
| if (!ret.mons[key]) | |
| ret.mons[key] = []; | |
| var nbuf = new Buffer(8); | |
| buf.copy(nbuf, 0, 4); | |
| ret.mons[key].push(nbuf); | |
| pos += 12; | |
| remaing -= 12; | |
| } | |
| return ret; | |
| } | |
| function clearWalFiles() { | |
| co(function *() { | |
| var wals = fs.readdirSync('ts/wals'); | |
| var currWal = path.basename(_self.currWalPath); | |
| var idx = wals.indexOf(currWal); | |
| var keybuf = new Buffer(8); | |
| if (idx >= 0) | |
| wals.splice(idx, 1); | |
| if (wals.length === 0) | |
| return setTimeout(clearWalFiles, 2000); | |
| console.log(wals, currWal); | |
| for (var i = 0; i < wals.length; i++) { | |
| var walPath = wals[i]; | |
| console.log('clean wal:', walPath); | |
| var wal = yield readWal(walPath); | |
| // find corresponding blob | |
| for (var key in wal.mons) { | |
| var blobfd; | |
| var blobname; | |
| var blobOffset = 0; | |
| var dataOffset = 0; | |
| try { | |
| var data; | |
| var blobLocationMeta = yield _self.metaDB.get('blob-of-' + key); | |
| console.log('blob found.', blobLocationMeta, key); | |
| try { | |
| data = JSON.parse(blobLocationMeta); | |
| } catch (err) { | |
| throw new Error('blob bilgisi json olarak kaydedilmemiş'); | |
| } | |
| if (!_self.blobfds[data.name]) { | |
| _self.blobfds[data.name] = yield fsp.open('ts/blobs/' + data.name, 'r+'); | |
| } | |
| blobfd = _self.blobfds[data.name]; | |
| blobname = data.name; | |
| blobOffset = data.blobOffset; | |
| dataOffset = data.dataOffset; | |
| if (_self.lastBlob === null) { | |
| _self.lastBlob = blobfd; | |
| _self.lastBlobOffset = blobOffset + (24 * 12 * 8) + 8; | |
| } | |
| } catch (err) { | |
| if (err.type !== 'NotFoundError') | |
| throw err; | |
| // blob bulunamadı. son blob'u kullanayım eğer varsa, ve yeterince alana sahipse. | |
| if (_self.lastBlob !== null && (_self.lastBlobOffset + ((24 * 12 * 8) + 8)) < (_self.blobSz)) { | |
| blobfd = _self.lastBlob; | |
| blobOffset = _self.lastBlobOffset; | |
| blobname = _self.lastBlobName; | |
| } else { | |
| // ya şu an bir blob yok ya da yeterince alanı kalmamış | |
| blobname = Date.now() + '.blob'; | |
| var blobpath = 'ts/blobs/' + blobname; | |
| yield truncateFile(blobpath, _self.blobSz); | |
| blobfd = yield fsp.open(blobpath, 'r+'); | |
| /* referans sayımlı bir veri yapısı ile bunları periyoduk olarak | |
| zamanaşımına uğratıp silmeliyim */ | |
| _self.blobfds[blobname] = blobfd; | |
| _self.lastBlob = blobfd; | |
| _self.lastBlobOffset = 0; | |
| _self.lastBlobName = blobname; | |
| } | |
| // şimdi bu anahtarın ait olduğu blob leveldb'de bulunmuyor. bu yüzden | |
| // veriyi ben yazmalıyım | |
| yield _self.metaDB.put('blob-of-' + key, JSON.stringify({ | |
| name: blobname, | |
| blobOffset: blobOffset, | |
| dataOffset: 0 | |
| })); | |
| _self.lastBlobOffset += ((24 * 12 * 8) + 8); | |
| } | |
| // şimdi bu anahtara dair verileri ilgili bloba yazmalıyım | |
| var pointbuf = wal.mons[key]; | |
| pointbuf = Buffer.concat(pointbuf); | |
| wal.mons[key] = null; | |
| // keybuf'u her zaman yazılmış sayıyorum ve pos'a 8'i ekliyorum | |
| // ve dataOffset'i de pos'a ekliyorum ki bu anahtara daha önceden veri | |
| // yazıldı ise üzerine yazılmasın, devamına yazılsın. | |
| var pos = 8 + dataOffset; | |
| // bir de, veri tümden yazılamaz. yazılabilecek boyut sınırlı ve bu | |
| // boyutu aşmamak lazım. | |
| var sz = pointbuf.length <= (24 * 12 * 8) ? pointbuf.length : (24 * 12 * 8); | |
| // data önceden veri yazıldı yazılan kadarki kısmı yazılacak miktardan | |
| // düşürmek lazım | |
| sz -= dataOffset; | |
| // eğer daha yeni veri yazmaya başlıyorsam öncelikle anahtarı yazmalıyım | |
| if (dataOffset === 0) { | |
| console.log('encodeTsKeyBuf', key); | |
| encodeTsKeyBuf(keybuf, key); | |
| // direkt bloboffset'e yazıyorum, çünkü verinin başlangıç noktası orası | |
| yield deferredWrite(blobfd, blobOffset, keybuf); | |
| } | |
| // şimdi de asıl veriyi yazıp blob'daki şu anki işlemimi bitireyim | |
| console.log('veri yazılacak.', { | |
| blobname: blobname, | |
| len: sz, | |
| actuallen: pointbuf.length, | |
| blobfd: blobfd, | |
| pos: pos | |
| }); | |
| yield deferredWrite(blobfd, pos, pointbuf.slice(0, sz), true); | |
| // meta'yı güncelleyeyim | |
| console.log('bloba yazma şu anahtar için tamam:', key); | |
| yield _self.metaDB.put('blob-of-' + key, JSON.stringify({ | |
| name: blobname, | |
| blobOffset: blobOffset, | |
| dataOffset: dataOffset + sz | |
| })); | |
| } | |
| // wal'ı tümden bloblara aktardım. şimdi onu silebilirim | |
| yield fsp.unlink('ts/wals/' + walPath); | |
| } | |
| setTimeout(clearWalFiles, 10000); | |
| }) | |
| .catch(function(err) { | |
| console.log(err.stack); | |
| }); | |
| } | |
| function writeBulk() { | |
| if (_self.bulks.length === 0) | |
| return setTimeout(writeBulk, 500); | |
| var bulk = _self.bulks.shift(); | |
| co (function *() { | |
| if ((_self.lastWalOffset + bulk.length) > _self.maxWalSz) | |
| yield _self.genNewWal(); | |
| var start = Date.now(); | |
| yield deferredWrite(_self.lastWal, _self.lastWalOffset, bulk); | |
| var end = Date.now(); | |
| _self.lastWalOffset += bulk.length; | |
| yield _self.metaDB.put('lastWalOffset', '' + _self.lastWalOffset); | |
| byw += bulk.length; | |
| meter.mark(); | |
| histogram.update(end - start); | |
| process.nextTick(writeBulk); | |
| }) | |
| .catch(function(err) { | |
| _self.emit('error', err); | |
| // do not try to write points anymore | |
| console.log('do not try to write points anymore'); | |
| }); | |
| } | |
| } | |
| util.inherits(URTSDB, Emitter); | |
| URTSDB.prototype.openWal = function () { | |
| var defer = gendefer(); | |
| var _self = this; | |
| co(function *() { | |
| var lastWal = yield _self.metaDB.get('lastWal'); | |
| var lastWalOffset = yield _self.metaDB.get('lastWalOffset'); | |
| lastWalOffset = parseInt(lastWalOffset, 10); | |
| console.log('lastWal:', lastWal, lastWalOffset); | |
| if (!fs.existsSync(lastWal)) { | |
| var tmp = fs.openSync(lastWal, 'w+'); | |
| fs.closeSync(tmp); | |
| lastWalOffset = 0; | |
| } | |
| var fd = yield fsp.open(lastWal, 'r+'); | |
| _self.lastWal = fd; | |
| _self.lastWalOffset = lastWalOffset; | |
| _self.currWalPath = lastWal; | |
| defer.resolve(); | |
| }).catch(function (err) { | |
| if (!err.type || err.type !== 'NotFoundError') | |
| return defer.reject(err); | |
| _self | |
| .genNewWal() | |
| .then(defer.resolve) | |
| .catch(defer.reject); | |
| }); | |
| return defer.promise; | |
| }; | |
| function truncateFile(filename, sz){ | |
| var defer = gendefer(); | |
| fs.open(filename, 'w', function (err, fd) { | |
| if (err) | |
| return defer.reject(err); | |
| try { | |
| fs.closeSync(fd); | |
| } catch (err) { | |
| return defer.reject(err); | |
| } | |
| fs.truncate(filename, sz, function (err) { | |
| if (err) { | |
| console.log('truncateFile err', err); | |
| return defer.reject(err); | |
| } | |
| defer.resolve(); | |
| }); | |
| }); | |
| return defer.promise; | |
| } | |
| URTSDB.prototype.genNewWal = function () { | |
| var _self = this; | |
| var defer = gendefer(); | |
| var filename = 'ts/wals/' + Date.now() + '.wal'; | |
| co(function *() { | |
| yield truncateFile(filename, _self.maxWalSz); | |
| yield _self.metaDB.put('lastWal', filename); | |
| yield _self.metaDB.put('lastWalOffset', '0'); | |
| var fd = yield fsp.open(filename, 'r+'); | |
| _self.lastWal = fd; | |
| _self.lastWalOffset = 0; | |
| _self.currWalPath = filename; | |
| defer.resolve(); | |
| }).catch(function(err) { | |
| defer.reject(err); | |
| }); | |
| return defer.promise; | |
| }; | |
| function scheduleBulkTmo(_self) { | |
| if (_self.bulkTmo === null) { | |
| _self.bulkTmo = setTimeout(function () { | |
| if (_self.currBulk.length === 0) | |
| return scheduleBulkTmo(_self); | |
| var bulk; | |
| do { | |
| bulk = _self.currBulk.length <= 200 ? _self.currBulk.length : 200; | |
| bulk = _self.currBulk.splice(0, bulk); | |
| _self.bulks.push(Buffer.concat(bulk)); | |
| } while(_self.currBulk.length > 0); | |
| _self.currBulk = []; | |
| _self.bulkTmo = null; | |
| }, 300); | |
| } | |
| } | |
| URTSDB.prototype.put = function (m, ts, val) { | |
| var buf = genpoint(m, ts, val); | |
| if (this.currBulk.length >= this.maxBulkSz) { | |
| this.bulks.push(Buffer.concat(this.currBulk)); | |
| this.currBulk = []; | |
| } | |
| this.currBulk.push(buf); | |
| scheduleBulkTmo(this); | |
| }; | |
| URTSDB.prototype.scheduleWrite = function () { | |
| var _self = this; | |
| if (this.writePointsScheduled === true) | |
| return; | |
| this.writePointsScheduled = true; | |
| process.nextTick(function () { | |
| _self.writePointsScheduled = false; | |
| _self.writePoints(); | |
| }); | |
| }; | |
| var tsdb = new URTSDB({ | |
| maxWalSz: 12 * 24 * 8 * 200, | |
| maxBulkSz: 300, | |
| blobSz: 12 * 24 * 8 * 200 | |
| }); | |
| tsdb.on('error', function (err) { | |
| console.log(err.stack); | |
| }); | |
| /*function r() { | |
| for (var i = 0; i < (2 * 1024 * 1024) / 12; i++) | |
| tsdb.put(i % 1000, i, 274); | |
| } | |
| setInterval(r, 50);*/ | |
| console.log(tsdb); | |
| co(function *() { | |
| var filename = 'sa.wal'; | |
| var fd = yield fsp.open(filename, 'r+'); | |
| yield truncateFile(filename, 100); | |
| yield fsp.write(fd, 'sa', 50); | |
| console.log('fd:', fd); | |
| yield fsp.close(fd); | |
| }); | |
| co(function *() { | |
| var dt = new Date(); | |
| dt = new Date(dt.getFullYear(), dt.getMonth(), dt.getDate()); | |
| var daystart = parseInt('' + dt.getTime() / 1000, 10); | |
| for (var i = 0; i < 400; i++) { | |
| for (var j = 0; j < 288; j++) { | |
| tsdb.put(i + 1, daystart + (j * 300), 274); | |
| } | |
| yield sleep(10); | |
| } | |
| console.log('tüm verileri yazdım'); | |
| }); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment