Created
August 21, 2018 18:59
-
-
Save leidegre/5dea27d9caa841526d17d44372e16481 to your computer and use it in GitHub Desktop.
Go channel abstraction for JavaScript
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { default: Channel } = require("./channel"); | |
const timeout = 100; | |
async function unbuffered() { | |
const xs = []; | |
var ch = new Channel(); | |
xs.push("start"); | |
setTimeout(async () => { | |
xs.push("timeout"); | |
await ch.receive(); | |
xs.push("received"); | |
}, timeout); | |
xs.push("send"); | |
await ch.send(1); | |
assertEqual(["start", "send", "timeout", "received"], xs); | |
} | |
async function unbuffered2() { | |
const xs = []; | |
var ch = new Channel(); | |
xs.push("start"); | |
setTimeout(async () => { | |
xs.push("timeout 1"); | |
const x = await ch.receive(); | |
xs.push("received " + x); | |
}, timeout); | |
setTimeout(async () => { | |
xs.push("timeout 2"); | |
const x = await ch.receive(); | |
xs.push("received " + x); | |
}, 2 * timeout); | |
setTimeout(async () => { | |
xs.push("timeout 3"); | |
const x = await ch.receive(); | |
xs.push("received " + x); | |
}, 3 * timeout); | |
xs.push("send 1"); | |
await ch.send(1); | |
xs.push("send 2"); | |
await ch.send(2); | |
xs.push("send 3"); | |
await ch.send(3); | |
assertEqual( | |
[ | |
"start", | |
"send 1", | |
"timeout 1", | |
"received 1", | |
"send 2", | |
"timeout 2", | |
"received 2", | |
"send 3", | |
"timeout 3", | |
"received 3" | |
], | |
xs | |
); | |
} | |
async function buffered() { | |
const xs = []; | |
var ch = new Channel(2); | |
xs.push("start"); | |
setTimeout(async () => { | |
xs.push("timeout 1"); | |
const x = await ch.receive(); | |
xs.push("received " + x); | |
}, timeout); | |
setTimeout(async () => { | |
xs.push("timeout 2"); | |
const x = await ch.receive(); | |
xs.push("received " + x); | |
}, 2 * timeout); | |
setTimeout(async () => { | |
xs.push("timeout 3"); | |
const x = await ch.receive(); | |
xs.push("received " + x); | |
}, 3 * timeout); | |
xs.push("send 1"); | |
await ch.send(1); | |
xs.push("send 2"); | |
await ch.send(2); | |
xs.push("send 3"); | |
await ch.send(3); | |
xs.push("send done"); | |
await new Promise(resolve => setTimeout(resolve, 4 * timeout)); // wait... | |
xs.push("receive done"); | |
assertEqual( | |
[ | |
"start", | |
"send 1", | |
"send 2", | |
"send 3", | |
"timeout 1", | |
"received 1", | |
"send done", | |
"timeout 2", | |
"received 2", | |
"timeout 3", | |
"received 3", | |
"receive done" | |
], | |
xs | |
); | |
} | |
async function close() { | |
const xs = []; | |
var ch = new Channel(); | |
xs.push("start"); | |
setTimeout(async () => { | |
xs.push("receive"); | |
const x = await ch.receive(); | |
xs.push("received " + x); | |
}, timeout); | |
setTimeout(() => { | |
xs.push("close"); | |
ch.close(); | |
}, 2 * timeout); | |
setTimeout(async () => { | |
xs.push("send 1"); | |
await ch.send(1); | |
xs.push("send done"); | |
}, 3 * timeout); | |
await new Promise(resolve => setTimeout(resolve, 4 * timeout)); // wait... | |
xs.push("done"); | |
assertEqual( | |
[ | |
"start", | |
"receive", | |
"close", | |
"received undefined", | |
"send 1", | |
"send done", | |
"done" | |
], | |
xs | |
); | |
} | |
async function receive() { | |
const xs = []; | |
var ch = new Channel(); | |
xs.push("start"); | |
setTimeout(async () => { | |
xs.push("receive"); | |
const x = await ch.receive(); | |
xs.push("received " + x); | |
}, timeout); | |
setTimeout(async () => { | |
xs.push("send 1"); | |
await ch.send(1); | |
xs.push("send done"); | |
}, 2 * timeout); | |
await new Promise(resolve => setTimeout(resolve, 3 * timeout)); // wait... | |
xs.push("done"); | |
assertEqual( | |
["start", "receive", "send 1", "received 1", "send done", "done"], | |
xs | |
); | |
} | |
async function main() { | |
const tests = [unbuffered(), unbuffered2(), buffered(), close(), receive()]; | |
await Promise.all(tests); | |
} | |
function assertEqual(expected, actual) { | |
const a = JSON.stringify(expected); | |
const b = JSON.stringify(actual); | |
if (a !== b) { | |
console.error(arguments.callee.caller.name, ":", a, "does not equal", b); | |
} else { | |
console.debug(arguments.callee.caller.name, ":", "ok"); | |
} | |
} | |
main(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
Object.defineProperty(exports, "__esModule", { value: true }); | |
class Channel { | |
/** | |
* The capacity, in number of elements, sets the size of the buffer in the channel. | |
* succeeds only when both a sender and receiver are ready. | |
* | |
* see https://golang.org/ref/spec#Channel_types | |
*/ | |
constructor(cap = 0) { | |
this._producers = []; | |
this._consumers = []; | |
this._buffer = []; | |
this._closed = false; | |
this._cap = cap; | |
} | |
/** | |
* The number of elements queued (unread) in the channel buffer. | |
*/ | |
get size() { | |
return this._buffer.length; | |
} | |
async send(value) { | |
if (value === undefined) { | |
throw new TypeError("value cannot be undefined"); | |
} | |
if (!this._closed) { | |
this._buffer.push(value); | |
// unblock a consumer | |
if (0 < this._consumers.length) { | |
const consumer = this._consumers.shift(); | |
consumer(); | |
} | |
if (!(this._buffer.length <= this._cap)) { | |
// block the producer | |
const waitTask = new Promise(resolve => { | |
this._producers.push(resolve); | |
}); | |
await waitTask; // yield | |
} | |
} | |
} | |
async receive() { | |
if (!(0 < this._buffer.length)) { | |
// block the consumer | |
const waitTask = new Promise(resolve => { | |
this._consumers.push(resolve); | |
}); | |
await waitTask; // yield | |
} | |
if (!this._closed) { | |
if (0 < this._buffer.length) { | |
const value = this._buffer.shift(); | |
// unblock a producer | |
if (0 < this._producers.length) { | |
const producer = this._producers.shift(); | |
producer(); | |
} | |
return value; | |
} | |
} | |
return undefined; | |
} | |
/** | |
* Close the channel | |
* note: this is not a Promise-based asynchrnous API | |
*/ | |
close() { | |
this._closed = true; | |
// unblock! | |
while (0 < this._consumers.length) { | |
const consumer = this._consumers.shift(); | |
consumer(); | |
} | |
while (0 < this._producers.length) { | |
const producer = this._producers.shift(); | |
producer(); | |
} | |
} | |
} | |
exports.default = Channel; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* starts the execution of a function call as an independent concurrent thread of control. | |
*/ | |
function go(f: () => Promise<any>) { | |
setImmediate(() => | |
f().catch((err: any) => { | |
console.error(err); | |
process.exit(1); | |
}) | |
); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment