Last active
December 15, 2015 16:49
-
-
Save creationix/5291866 to your computer and use it in GitHub Desktop.
await wrapper using node-fibers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var Fiber = require("fibers"); | |
module.exports = await; | |
function await(continuation) { | |
var fiber = Fiber.current; | |
var result; | |
var async; | |
continuation(function (err, value) { | |
if (async === undefined) { | |
async = false; | |
result = value; | |
if (err) { throw err; } | |
return; | |
} | |
if (err) fiber.throwInto(err); | |
else fiber.run(value); | |
}); | |
if (async === undefined) { | |
async = true; | |
return Fiber.yield(); | |
} | |
return result; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var Queue = require('./queue'); | |
module.exports = Pipe; | |
function Pipe() { | |
this.paused = false; | |
this.processing = false; | |
this.inputQueue = new Queue; | |
this.readerQueue = new Queue; | |
this.resumeList = []; | |
} | |
Pipe.prototype.highWaterMark = 1; | |
Pipe.prototype.lowWaterMark = 1; | |
Pipe.prototype.processReaders = function () { | |
// This function is not re-entrant. Keep out recursive calls with a semaphore. | |
if (this.processing) { return; } | |
this.processing = true; | |
// Let's play matchmaker and pair data with readers | |
while (this.inputQueue.length && this.readerQueue.length) { | |
var chunk = this.inputQueue.shift(); | |
var reader = this.readerQueue.shift(); | |
reader(null, chunk); | |
} | |
// Flow control logic for high-water/low-water and pause/resume. | |
var depth = this.inputQueue.length - this.readerQueue.length; | |
if (!this.paused && depth >= this.highWaterMark) { | |
// If there is too much data and not enough readers, | |
// tell the writer to pause. | |
this.paused = true; | |
} | |
else if (this.paused && depth <= this.lowWaterMark) { | |
// If we're paused and there is room for more data, | |
// tell the writer to resume | |
this.paused = false; | |
// and flush any pending write callbacks. | |
for (var i = 0, l = this.resumeList.length; i < l; i++) { | |
process.nextTick(this.resumeList[i]); | |
} | |
this.resumeList.length = 0; | |
} | |
// We're done here, allow this function to be called again. | |
this.processing = false; | |
}; | |
Pipe.prototype.read = function () { | |
var self = this; | |
return function (callback) { | |
self.readerQueue.push(callback); | |
self.processReaders(); | |
}; | |
}; | |
Pipe.prototype.write = function (chunk) { | |
var self = this; | |
return function (callback) { | |
self.inputQueue.push(chunk); | |
self.processReaders(); | |
if (!callback) { return; } | |
if (self.paused) { | |
self.resumeList.push(callback); | |
return; | |
} | |
callback(); | |
} | |
}; | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module.exports = Queue; | |
function Queue() { | |
this.head = []; | |
this.tail = []; | |
this.index = 0; | |
this.headLength = 0; | |
this.length = 0; | |
} | |
// Get an item from the front of the queue. | |
Queue.prototype.shift = function () { | |
if (this.index >= this.headLength) { | |
// When the head is empty, swap it with the tail to get fresh items. | |
var t = this.head; | |
t.length = 0; | |
this.head = this.tail; | |
this.tail = t; | |
this.index = 0; | |
this.headLength = this.head.length; | |
if (!this.headLength) { | |
return; | |
} | |
} | |
// There was an item in the head, let's pull it out. | |
var value = this.head[this.index]; | |
// And remove it from the head | |
if (this.index < 0) { | |
delete this.head[this.index++]; | |
} | |
else { | |
this.head[this.index++] = undefined; | |
} | |
this.length--; | |
return value; | |
}; | |
// Insert a new item at the front of the queue. | |
Queue.prototype.unshift = function (item) { | |
this.head[--this.index] = item; | |
this.length++; | |
return this; | |
}; | |
// Push a new item on the end of the queue. | |
Queue.prototype.push = function (item) { | |
// Pushes always go to the write-only tail | |
this.length++; | |
this.tail.push(item); | |
return this; | |
}; | |
/* | |
var q = new Queue; | |
q.push(1); | |
q.push(2); | |
q.push(3); | |
var i = 4; | |
while (q.length > 0) { | |
console.log(q.length, q.shift()); | |
q.unshift(i++); | |
console.log(q.length, q.shift()); | |
q.push(i++); | |
console.log(q.length, q.shift()); | |
} | |
*/ |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var await = require('./await'); | |
var Fiber = require('fibers'); | |
var Pipe = require('./pipe'); | |
// A simple awaitable sleep function | |
function sleep(ms) { return function (callback) { | |
setTimeout(callback, ms); | |
}} | |
// Writes to one end can be read out the other end | |
// but with internal buffering and flow-control. | |
var p = new Pipe(); | |
// Override the watermarks to buffer more. | |
p.highWaterMark = 3; | |
p.lowWaterMark = 3; | |
// Implement a producer fiber. | |
Fiber(function () { | |
var i = 0; | |
while (true) { | |
await(sleep(Math.random() * 800)); | |
console.log("Writing", i); | |
await(p.write(i++)); | |
} | |
}).run(); | |
// Implement a slightly slower consumer fiber. | |
// (slower to test the flow-control inside Pipe) | |
Fiber(function () { | |
do { | |
await(sleep(Math.random() * 1000)); | |
var chunk = await(p.read()); | |
console.log("Read", chunk); | |
} while (chunk !== undefined); | |
}).run(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment