Created
November 8, 2012 15:09
-
-
Save tonistiigi/4039357 to your computer and use it in GitHub Desktop.
data replicate stream
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Experimental data store test
 * - Dynamic data, any JS object structure
 * - All changes evented
 * - Stream based replication/synchronization
 * - Parts inspired by substack/emit-stream
 * - Persistence can be done via dominictarr/kv
 **/
var Stream = require('stream').Stream | |
var through = require('through') | |
/**
 * Evented data object that doubles as a writable stream endpoint.
 *
 * - `emit(...)` delivers the event to local listeners and, when
 *   `this._replicateStreams` exists, also writes the arguments array to every
 *   stream in that list.
 * - `write(args)` takes such an arguments array and re-emits it, which lets a
 *   DataStream be used as the destination of `pipe()`.
 * - When something is piped into the instance, a local-only `'clear'` event is
 *   fired (through the un-replicated `_emit`).
 *
 * @constructor
 */
function DataStream() {
  // Initialise the inherited Stream/EventEmitter state for this instance.
  Stream.call(this);

  // todo: clear if not inited
  this.writable = true;

  // Keep the inherited emit around so events can be fired locally without
  // being forwarded to replicas.
  this._emit = this.emit;
  var self = this;

  /**
   * Emit locally and forward the arguments array to every replica.
   * @returns {boolean} result of the inherited emit (true if listeners ran).
   */
  this.emit = function () {
    if (self._replicateStreams) {
      var args = [].slice.call(arguments);
      // Iterate over a snapshot: a replica may remove itself from the live
      // list while handling the write, which would otherwise make forEach
      // skip the stream that follows it.
      self._replicateStreams.slice().forEach(function (s) {
        s.write(args);
      });
    }
    return self._emit.apply(self, arguments);
  };

  /**
   * Writable-side entry point: `data` is an arguments array for `emit`.
   * @param {Array} data - event name followed by its payload values.
   * @returns {boolean} always true; this endpoint never applies backpressure.
   */
  this.write = function (data) {
    self.emit.apply(self, data);
    return true;
  };

  // Ending the incoming stream is a no-op; the object stays usable.
  this.end = function () {};

  this.on('pipe', function () {
    self._emit('clear');
  });
}

// Inherit from Stream while keeping `constructor` pointing at DataStream.
DataStream.prototype = Object.create(Stream.prototype, {
  constructor: { value: DataStream, writable: true, configurable: true }
});
/**
 * Create a stream that carries this object's events to a remote copy.
 *
 * The returned `through` stream first receives whatever the `'replicate'`
 * listeners send via the `emit` helper they are handed (the current state),
 * and afterwards every event emitted on this object. Everything is buffered
 * until the next tick so the caller has a chance to `pipe()` the stream before
 * any `'data'` is emitted.
 *
 * Ending the stream removes it from `this._replicateStreams` and emits
 * `'end'`. If `end()` arrives before the first tick, that work is postponed
 * until the buffered events have been flushed.
 *
 * @returns {Stream} through stream emitting one arguments array per event.
 */
DataStream.prototype.replicateStream = function () {
  var self = this;
  var flowing = false;      // becomes true once the deferred flush has run
  var endRequested = false; // end() was called before the flush
  var finished = false;     // finish() already ran
  var pending = [];         // events buffered until the flush

  // Detach from the owner and signal end-of-stream, at most once.
  function finish() {
    if (finished) return;
    finished = true;
    var ix = self._replicateStreams.indexOf(s);
    // indexOf returns -1 when absent; splice(-1, 1) would remove an
    // unrelated stream from the end of the list.
    if (ix !== -1) self._replicateStreams.splice(ix, 1);
    s.emit('end');
  }

  var s = through(
    function write(args) {
      if (flowing) s.emit('data', args);
      else pending.push(args);
    },
    function end() {
      if (flowing) finish();
      else endRequested = true;
    }
  );

  process.nextTick(function () {
    // Drain in arrival order. Events written while draining are appended to
    // `pending` (flowing is still false) and therefore cannot overtake
    // earlier ones.
    while (pending.length) {
      s.emit('data', pending.shift());
    }
    flowing = true;
    // Calling s.end() again here would be ignored by through once the stream
    // has been ended, so complete the postponed end directly.
    if (endRequested) finish();
  });

  // Let listeners push the current state into this stream only.
  this._emit('replicate', {
    emit: function () {
      s.write([].slice.call(arguments));
    }
  });

  if (!this._replicateStreams) this._replicateStreams = [];
  this._replicateStreams.push(s);
  return s;
};

module.exports = DataStream;
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var Data = require('./data.js') | |
/**
 * Build a demo data object with a `children` list and a `foo` value.
 *
 * Handlers wired up on the new `Data` instance:
 * - 'add:child' appends the payload to `children`
 * - 'del:child' drops every child whose `id` loosely equals the payload
 * - 'set:foo'   stores the payload in `foo`
 * - 'clear'     resets `children` to [] and `foo` to 'default'
 * - 'replicate' sends the current state to the given copy: one 'add:child'
 *               per child, then 'set:foo' if `foo` is not 'default'
 *
 * The object is cleared once before being returned.
 *
 * @returns {Data} the initialised data object.
 */
function getObject() {
  var store = new Data();

  function appendChild(child) {
    store.children.push(child);
  }

  function removeChildById(id) {
    // Loose comparison is intentional here: it matches the original `!=`.
    store.children = store.children.filter(function (child) {
      return child.id != id;
    });
  }

  function assignFoo(value) {
    store.foo = value;
  }

  function resetState() {
    store.children = [];
    store.foo = 'default';
  }

  function sendSnapshot(copy) {
    store.children.forEach(function (child) {
      copy.emit('add:child', child);
    });
    if (store.foo == 'default') {
      return;
    }
    copy.emit('set:foo', store.foo);
  }

  store.on('add:child', appendChild);
  store.on('del:child', removeChildById);
  store.on('set:foo', assignFoo);
  store.on('clear', resetState);
  store.on('replicate', sendSnapshot);

  store.emit('clear');
  return store;
}
// Demo: d1 replicates into d2 straight away; d2 starts replicating into d3
// later, so d3 begins from d2's state at that moment.
var d1 = getObject();
var d2 = getObject();
d1.replicateStream().pipe(d2);
d1.emit('add:child', {id: 1, name: 'foo'});
var d3 = getObject();

// Print one object's children and foo value.
function show(store) {
  console.log(store.children, store.foo);
}

// After 100ms: mutate d1, print d1/d2, then chain d2 -> d3 and mutate again.
function mutateAndChain() {
  d1.emit('add:child', {id: 2, name: 'bar'});
  show(d1);
  show(d2);
  d1.emit('set:foo', 'bar');
  d2.replicateStream().pipe(d3);
  d1.emit('add:child', {id: 3, name: 'baz'});
  d1.emit('del:child', 1);
}

// After 200ms: print the final state of all three objects.
function reportFinalState() {
  console.log('---\n');
  show(d1);
  show(d2);
  show(d3);
}

setTimeout(mutateAndChain, 100);
setTimeout(reportFinalState, 200);
You can do
s.pipe(obj.replicateStream()).pipe(s)
and then the replicate stream will be properly cleaned up after the remote stream goes away.
But you can't pipe data out of the object itself, only in. That's because the information you send to the remote depends on the time at which it connects: you only get the output of the replicate event, not every event that has been emitted before.
Also, if you pipe from the replicate stream back to the object itself (through some remote loop, for example), you will currently get an infinite loop. This is something that could easily be fixed.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
cool, can you use this duplex style, double ended?