Skip to content

Instantly share code, notes, and snippets.

@tonistiigi
Created November 8, 2012 15:09
Show Gist options
  • Save tonistiigi/4039357 to your computer and use it in GitHub Desktop.
Save tonistiigi/4039357 to your computer and use it in GitHub Desktop.
data replicate stream
/**
* Experimental data store test
* - Dynamic data, any JS object structure
* - All changes evented
* - Stream based replication/synchronization
* - Parts inspired by substack/emit-stream
* - Persistence can be done via dominictarr/kv
**/
var Stream = require('stream').Stream
var through = require('through')
/**
 * Event emitter that doubles as a writable stream of events.
 *
 * Every chunk written to an instance is treated as an argument array
 * (`[eventName, arg1, ...]`) and re-emitted on the instance. Every call to
 * `emit` is also forwarded, as such an array, to each stream currently
 * registered in `this._replicateStreams`.
 */
function DataStream() {
  // todo: clear if not inited
  var self = this
  this.writable = true

  // Keep the inherited emit reachable: `_emit` fires an event on this
  // instance only, without forwarding it to the replicate streams.
  this._emit = this.emit

  /**
   * Forwards the event to all replicate streams, then emits it locally.
   *
   * @returns {boolean} whatever the inherited `emit` returns, i.e. whether
   *   the event had listeners on this instance.
   */
  this.emit = function () {
    var replicas = self._replicateStreams
    if (replicas) {
      var args = [].slice.call(arguments)
      // Walk a snapshot: a replica may unregister itself (or another one)
      // while being written to, and mutating the live array mid-iteration
      // would make forEach skip the following entry.
      replicas.slice().forEach(function (s) {
        // Skip replicas that were unregistered earlier in this same pass.
        if (replicas.indexOf(s) !== -1) s.write(args)
      })
    }
    return self._emit.apply(self, arguments)
  }

  /**
   * Writable-stream entry point.
   *
   * @param {Array} data argument array for `emit`: event name first,
   *   followed by the event arguments.
   */
  this.write = function (data) {
    self.emit.apply(self, data)
  }

  // Ending the incoming stream is deliberately ignored; the object stays usable.
  this.end = function () {}

  // When a source is piped in, reset local state first. `_emit` is used so
  // this 'clear' is not forwarded to the replicate streams.
  this.on('pipe', function () {
    self._emit('clear')
  })
}
DataStream.prototype = Object.create(Stream.prototype)
/**
 * Creates a stream that replicates this object's events to a consumer.
 *
 * A 'replicate' event is emitted on this instance only (via `_emit`, so it
 * is not forwarded to other replicas) with a `copy` handle; anything the
 * listeners emit on `copy` is written to the new stream first. The stream is
 * then registered in `this._replicateStreams`, so every later `emit` on this
 * object is written to it as well.
 *
 * Output is buffered until the next tick so the caller can `.pipe()` the
 * returned stream before any 'data' event fires.
 *
 * @returns {Stream} stream whose 'data' chunks are argument arrays
 *   (`[eventName, arg1, ...]`).
 */
DataStream.prototype.replicateStream = function () {
  var self = this
  var flushed = false
  var endRequested = false
  var finished = false
  var pending = []

  // Unregisters the stream and signals 'end' exactly once.
  function finish() {
    if (finished) return
    finished = true
    var list = self._replicateStreams
    var ix = list ? list.indexOf(s) : -1
    // Guard against -1: splice(-1, 1) would drop an unrelated replica.
    if (ix !== -1) list.splice(ix, 1)
    s.emit('end')
  }

  var s = through(
    function write(args) {
      if (flushed) s.emit('data', args)
      else pending.push(args)
    },
    function end() {
      // Before the flush, only remember the request so the buffered data
      // still goes out ahead of 'end'.
      if (flushed) finish()
      else endRequested = true
    }
  )

  process.nextTick(function () {
    flushed = true
    pending.forEach(function (args) {
      s.emit('data', args)
    })
    // Finish directly instead of calling s.end() again: through ignores
    // repeated end() calls, so a second call would never reach the end
    // handler and the stream would stay registered forever.
    if (endRequested) finish()
  })

  this._emit('replicate', {
    emit: function () {
      s.write([].slice.call(arguments))
    }
  })

  if (!this._replicateStreams) this._replicateStreams = []
  this._replicateStreams.push(s)
  return s
}
module.exports = DataStream
var Data = require('./data.js')
/**
 * Builds a `Data` instance whose state (`children` list and `foo` value) is
 * driven entirely by events, and initialises it by emitting 'clear'.
 *
 * Events handled:
 *  - 'add:child' (child)  appends the child to `children`
 *  - 'del:child' (id)     drops every child whose `id` loosely equals `id`
 *  - 'set:foo'   (value)  stores the value in `foo`
 *  - 'clear'              resets `children` to [] and `foo` to 'default'
 *  - 'replicate' (copy)   replays the current state onto `copy` as events
 *
 * @returns {Data} the wired-up instance
 */
function getObject() {
  var store = new Data()

  var handlers = {
    'add:child': function (child) {
      store.children.push(child)
    },
    'del:child': function (id) {
      var kept = []
      store.children.forEach(function (child) {
        // Loose comparison on purpose: ids of different types still match.
        if (child.id != id) kept.push(child)
      })
      store.children = kept
    },
    'set:foo': function (value) {
      store.foo = value
    },
    'clear': function () {
      store.children = []
      store.foo = 'default'
    },
    'replicate': function (copy) {
      store.children.forEach(function (child) {
        copy.emit('add:child', child)
      })
      // The default value is what 'clear' produces, so it need not be sent.
      if (store.foo != 'default') {
        copy.emit('set:foo', store.foo)
      }
    }
  }

  Object.keys(handlers).forEach(function (eventName) {
    store.on(eventName, handlers[eventName])
  })

  store.emit('clear')
  return store
}
// Demo: chain three objects d1 -> d2 -> d3 and print their state at two
// points in time.
var d1 = getObject()
var d2 = getObject()
// Make d2 a replica of d1.
d1.replicateStream().pipe(d2)
// Emitted in the same tick as replicateStream(), i.e. while the replicate
// stream is still buffering its output.
d1.emit('add:child', {id: 1, name: 'foo'})
// d3 is created now but only connected later, inside the first timer.
var d3 = getObject()
setTimeout(function(){
d1.emit('add:child', {id: 2, name: 'bar'})
// Print d1 and d2 before d3 is attached.
console.log(d1.children, d1.foo)
console.log(d2.children, d2.foo)
d1.emit('set:foo', 'bar')
// Attach d3 as a replica of d2, after d2 has already accumulated state.
d2.replicateStream().pipe(d3)
// Further changes on d1 made after d3 was attached.
d1.emit('add:child', {id: 3, name: 'baz'})
d1.emit('del:child', 1)
}, 100)
setTimeout(function() {
// Final state of all three objects, printed after a separator.
console.log('---\n')
console.log(d1.children, d1.foo)
console.log(d2.children, d2.foo)
console.log(d3.children, d3.foo)
}, 200);
@dominictarr
Copy link

cool, can you use this duplex style, double ended?

@tonistiigi
Copy link
Author

You can do

s.pipe(obj.replicateStream()).pipe(s)

and then replicatestream will be properly cleaned up after remote stream goes away.

But you can't pipe data out of the object itself, only in. That's because the information you send to the remote depends on the time it connects: the remote only gets the output of the replicate event, not every event that was emitted before it connected.

Also, if you pipe from a replicateStream to the object itself (through some remote loop, for example), you will currently get an infinite loop. This is something that should be easy to fix.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment