Skip to content

Instantly share code, notes, and snippets.

@latentflip
Forked from jaz303/gist:d56ee88acf652ca79bf7
Created September 17, 2014 09:25
Show Gist options
  • Save latentflip/c1480de35fcaec5e2e55 to your computer and use it in GitHub Desktop.
Save latentflip/c1480de35fcaec5e2e55 to your computer and use it in GitHub Desktop.
//
// Minimal (deliberately naive) channel implementation: an unbounded item
// queue paired with a queue of promise-based takers.
/**
 * Factory for channels, so callers can write `channel()` instead of
 * `new Channel()`.
 *
 * @returns {Channel} A freshly constructed channel (no options are passed).
 */
function channel() {
  var fresh = new Channel();
  return fresh;
}
// Counter used to give every channel a distinct, increasing `id`.
var id = 0;

/**
 * Channel constructor: holds a FIFO of items that were put but not yet taken
 * (`_queue`) and a FIFO of takers still waiting for an item (`_waiters`).
 *
 * @param {Object} [opts] - Optional settings. When supplied, `opts.parent` is
 *   copied onto the instance as `parent`; when omitted, no `parent` property
 *   is created at all.
 */
function Channel(opts) {
  this._queue = [];
  this._waiters = [];

  // Take the current counter value, then advance it for the next channel.
  this.id = id;
  id += 1;

  if (!opts) {
    return;
  }
  this.parent = opts.parent;
}
/**
 * Asks the channel for its next item.
 *
 * A new promise is registered at the back of the waiter list and a drain is
 * scheduled. The promise carries its own `resolve` function as a property so
 * that `_drain` can settle it later.
 *
 * @returns {Promise} The registered promise, with an extra `resolve` property.
 */
Channel.prototype.take = function () {
  var settle;
  var pending = new Promise(function (fulfil) {
    settle = fulfil;
  });

  // The executor above runs synchronously, so `settle` is assigned by now.
  pending.resolve = settle;

  this._waiters.push(pending);
  this._drain();
  return pending;
};
/**
 * Appends an item to the channel's queue and schedules a drain so that a
 * waiting taker (if any) can receive it.
 *
 * @param {*} item - Value to enqueue.
 * @returns {undefined}
 */
Channel.prototype.put = function (item) {
  var backlog = this._queue;
  backlog.push(item);
  this._drain();
};
/**
 * Schedules a deferred pass that pairs queued items with waiting takers.
 *
 * While a pass is already scheduled (`_drainRequested` is set) further calls
 * do nothing. The pass itself runs from a zero-delay timer, clears the flag,
 * and then resolves waiters in order, oldest waiter with oldest item, until
 * either list is empty.
 */
Channel.prototype._drain = function () {
  var owner = this;

  if (owner._drainRequested) {
    return;
  }
  owner._drainRequested = true;

  function flush() {
    // Clear the flag first so put/take calls made from now on can schedule
    // a fresh pass.
    owner._drainRequested = false;

    var items = owner._queue;
    var takers = owner._waiters;
    while (items.length > 0 && takers.length > 0) {
      var taker = takers.shift();
      taker.resolve(items.shift());
    }
  }

  setTimeout(flush, 0);
};
/**
 * Builds a derived channel that emits a running accumulation of this
 * channel's items.
 *
 * A task is started through the file-level `spawn`; it loops forever, taking
 * one item at a time from this channel and calling `fn(accumulator, item)`.
 * When `fn` returns anything other than `undefined`, that value becomes the
 * new accumulator and is put on the derived channel. When it returns
 * `undefined`, the item is skipped and the accumulator is left unchanged.
 *
 * @param {function(*, *): *} fn - Called with (accumulator, item).
 * @param {*} [initialValue] - Starting accumulator.
 * @returns {Channel} The derived channel; its `parent` is this channel's `id`.
 */
Channel.prototype.reduce = function (fn, initialValue) {
  var source = this;
  var output = new Channel({ parent: source.id });

  function* pump() {
    var acc = initialValue;
    for (;;) {
      var incoming = yield source.take();
      var candidate = fn(acc, incoming);
      if (candidate === undefined) {
        // Nothing to emit for this item.
        continue;
      }
      acc = candidate;
      output.put(acc);
    }
  }

  spawn(pump);
  return output;
};
/**
 * Builds a derived channel carrying `fn(item)` for each item of this channel.
 *
 * Implemented on top of `reduce` with the accumulator ignored, so a result of
 * `undefined` is not emitted (that is how `reduce` treats `undefined`).
 *
 * @param {function(*): *} fn - Transformation applied to each item.
 * @returns {Channel} The derived channel.
 */
Channel.prototype.map = function (fn) {
  function transform(_ignoredAcc, item) {
    return fn(item);
  }
  return this.reduce(transform);
};
/**
 * Builds a derived channel carrying only the items for which `fn(item)` is
 * truthy.
 *
 * Implemented on top of `reduce`: rejected items yield `undefined`, which
 * `reduce` does not emit.
 *
 * @param {function(*): *} fn - Predicate applied to each item.
 * @returns {Channel} The derived channel.
 */
Channel.prototype.filter = function (fn) {
  function keepIfAccepted(_ignoredAcc, item) {
    return fn(item) ? item : undefined;
  }
  return this.reduce(keepIfAccepted);
};
//
// Timer helper, generator task engine, and demo
/**
 * Returns a promise that is resolved (with no value) by a timer after
 * `delay` milliseconds.
 *
 * @param {number} delay - Timer delay passed straight to `setTimeout`.
 * @returns {Promise<undefined>}
 */
function wait(delay) {
  function startTimer(done) {
    setTimeout(done, delay);
  }
  return new Promise(startTimer);
}
/**
 * Creates a small cooperative scheduler for generator-based tasks and returns
 * its `spawn` function.
 *
 * A task is a generator. Each time it yields:
 *   - a thenable: the task is resumed with the fulfilment value once the
 *     thenable fulfils, or has the rejection reason thrown into it (so a
 *     `try/catch` around the `yield` can handle it) if the thenable rejects;
 *   - anything else: the task is re-queued and resumed on the next tick with
 *     the same value it was last resumed with.
 * A task that finishes is simply dropped.
 *
 * @returns {function(Function, Array=, *=): undefined} `spawn(fn, args, ctx)`:
 *   calls generator function `fn` with `args` (and `ctx`, or `null`, as
 *   `this`), queues the resulting iterator and immediately runs one tick.
 */
function engine() {
  // Pending resumptions, oldest first. Each entry is
  // [iterator, valueToDeliver, deliverAsThrow].
  var tasks = [];

  function spawn(fn, args, ctx) {
    tasks.push([fn.apply(ctx || null, args), undefined, false]);
    // Every push is paired with exactly one tick, so tick never finds the
    // queue empty. Note that tick takes the OLDEST entry, which is not
    // necessarily the task that was just spawned.
    tick();
  }

  function tick() {
    var task = tasks.shift();
    var iterator = task[0];
    var result = task[2] ? iterator.throw(task[1]) : iterator.next(task[1]);

    if (result.done) {
      // Task complete; nothing to re-queue.
      return;
    }

    if (result.value && typeof result.value.then === 'function') {
      result.value.then(function (res) {
        tasks.push([iterator, res, false]);
        tick();
      }, function (err) {
        // Without this handler a rejected promise would leave the task
        // suspended forever. Throwing into the generator lets it recover;
        // if it does not catch, the error propagates out of tick exactly
        // as an unhandled rejection did before.
        tasks.push([iterator, err, true]);
        tick();
      });
      return;
    }

    // Non-thenable yield: treat as a cooperative yield point and resume with
    // the previously delivered value (or nothing, if the last delivery was a
    // thrown error).
    tasks.push([iterator, task[2] ? undefined : task[1], false]);
    process.nextTick(tick);
  }

  return spawn;
}
// Shared scheduler entry point; Channel.prototype.reduce also relies on this
// file-level `spawn`.
var spawn = engine();
var ch = channel();

// Three endless producers feeding the same channel: each waits `delay`
// milliseconds, puts its `item`, and repeats.
[[250, 1], [1000, 2], [1500, 3]].forEach(function (spec) {
  spawn(function* (delay, item) {
    for (;;) {
      yield wait(delay);
      ch.put(item);
    }
  }, spec);
});

// Pipeline: square every item, keep squares greater than 3, then emit the
// running sum of what is left.
var modifiedCh = ch
  .map(function (n) {
    return n * n;
  })
  .filter(function (n) {
    return n > 3;
  })
  .reduce(function (total, n) {
    return total + n;
  }, 0);

// Consumer: print every running sum as it arrives.
spawn(function* () {
  for (;;) {
    var latest = yield modifiedCh.take();
    console.log('Value: ' + latest);
  }
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment