Created
May 20, 2021 17:49
-
-
Save qti3e/99c57dfeefca2e88b4746d19e860c45f to your computer and use it in GitHub Desktop.
Go concurrency in JavaScript
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Go, Chan, Lock } from "./go"; | |
// Example: add up every number that travels through a channel.
const chan = Chan<number>();

// Consumer routine: accumulates messages until the channel is closed.
// The wait function parameter is omitted here because this routine never
// yields explicitly; its type would otherwise be inferred from `Go`, so the
// (non-imported) `WaitFn` type does not have to be named.
const sumPromise = Go(async () => {
  let total = 0;
  for await (const value of chan.receive()) {
    console.log('Received', value);
    total += value;
  }
  return total;
});

// Producer routine: sends 0..999 and then closes the channel so that the
// consumer's loop terminates. `$` is contextually typed by `Go`.
Go(async ($) => {
  for await (const i of $.range(1000)) {
    console.log('Send', i);
    chan.send(i);
  }
  chan.close();
});

const sum = await sumPromise;
console.log('Sum =', sum);
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Go, Chan, Lock } from "./go"; | |
// Example: two routines that run concurrently and interleave their output.
// `$` is contextually typed by `Go`, so the `WaitFn` type (which this file
// does not import) does not have to be spelled out.

// First routine: logs F1 0..4.
Go(async ($) => {
  for await (const i of $.range(5)) {
    console.log('F1', i);
  }
});

// Second routine: logs F2 0..9.
Go(async ($) => {
  for await (const i of $.range(10)) {
    console.log('F2', i);
  }
});
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { once } from './once'; | |
// The code in this file gives us a concurrency model similar to Go's:
// the exported function `Go` works somewhat like the `go` keyword in
// Golang.
// | |
// ```ts | |
// Go(async ($: WaitFn) => { | |
// for await (const i of $.range(5)) { | |
// console.log('F1', i); | |
// } | |
// }); | |
// | |
// Go(async ($: WaitFn) => { | |
// for await (const i of $.range(10)) { | |
// console.log('F2', i); | |
// } | |
// }); | |
// ``` | |
// | |
// Both of these functions will run concurrently. | |
// | |
// The current implementation uses a global task scheduler that is designed
// to block the main thread for at most 100 consecutive ms (if used correctly,
// of course). Calling the WaitFn (`$`) returns a promise that gets resolved
// once the function is allowed to continue execution. The utility functions
// `waitFn.range()` and `waitFn.iter()` use the WaitFn in their
// implementation, and Channels and Locks are implemented on the same idea.
export interface WaitFn {
  /**
   * Yields control to the scheduler.
   * @returns A promise that settles as soon as the calling routine may carry
   * on with its work.
   */
  (): Promise<void>;

  /**
   * Counts upwards from zero, one value per iteration.
   * @param n Exclusive upper bound; the last value produced is `n - 1`.
   * @returns An async iterable over `0, 1, ..., n - 1`.
   */
  range(n: number): AsyncIterable<number>;

  /**
   * Adapts a synchronous iterable so that it can be consumed with
   * `for await`.
   * @param iterable The source whose items are handed out one by one.
   * @returns An async iterable over the items of `iterable`.
   */
  iter<T>(iterable: Iterable<T>): AsyncIterable<T>;
}
/**
 * Callback handed out when a lock is retained; invoking it gives the lock
 * back. It may be invoked only once, a second invocation throws.
 */
export type LockReleaseCb = () => void;
/**
 * Guards a resource so that only one holder can use it at a time.
 */
export interface Lock {
  /**
   * Waits until the lock is free and then takes it.
   * @returns A promise for the callback that releases the lock again.
   */
  retain(): Promise<LockReleaseCb>;

  /**
   * Takes the lock immediately when it is free.
   * @returns The release callback, or `undefined` when somebody else is
   * currently holding the lock.
   */
  retainSync(): LockReleaseCb | undefined;
}
/**
 * A channel carries messages between routines.
 *
 * # Example
 *
 * ```ts
 * const chan = Chan<number>();
 * const sumPromise = Go(async ($: WaitFn) => {
 *   let sum = 0;
 *   for await (const msg of chan.receive()) {
 *     console.log('Received', msg);
 *     sum += msg;
 *   }
 *   return sum;
 * });
 *
 * Go(async ($: WaitFn) => {
 *   for await (const i of $.range(1000)) {
 *     console.log('Send', i);
 *     chan.send(i);
 *   }
 *   chan.close();
 * });
 *
 * const sum = await sumPromise;
 * console.log('Sum =', sum);
 * ```
 */
export interface Chan<T> {
  /**
   * `true` once the channel has been closed. No further messages can be sent
   * through a closed channel.
   */
  readonly isClosed: boolean;

  /**
   * Closes the channel.
   */
  close(): void;

  /**
   * Sends `data` through the channel. Only receivers that are active at the
   * time of the call (i.e. that were registered before it) get the message.
   * @param data The message to deliver.
   */
  send(data: T): void;

  /**
   * Returns an async iterable over the messages of this channel.
   */
  receive(): AsyncIterable<T>;

  /**
   * Resolves with the first message that arrives, or with `undefined` when
   * the channel is closed before any message shows up.
   */
  first(): Promise<T | undefined>;
}
/**
 * Shape of a routine that `Go` can run: it receives the wait function as its
 * first argument, followed by any extra arguments, and returns a promise for
 * its result.
 */
export type ConcurrentFunction<Args extends any[] = [], R = void> = (
  $: WaitFn,
  ...args: Args
) => Promise<R>;
/* tslint:disable:no-namespace */ | |
namespace Concurrency { | |
// The implementation. | |
// A promise that carries its own `resolve` function, so that it can be
// settled from the outside.
interface Resolvable extends Promise<void> {
  resolve(): void;
}

// Builds a fresh, still-pending Resolvable.
function createResolvable(): Resolvable {
  // The promise executor runs synchronously, so `settle` is always assigned
  // before it is read below.
  let settle!: () => void;
  const pending = new Promise<void>(done => {
    settle = done;
  });
  return Object.assign(pending, { resolve: settle });
}
// Max time in ms that each execution round blocks the main thread.
const LIMIT = 100;

// A task in the tasks queue, it is usually the resolve cb of a promise,
// returned by waitFn().
type Task = () => void;

// The task queue (first in, first out).
const tasks: Task[] = [];

// Timestamp (ms) at which the most recent execution round began; stays 0
// until the first round runs.
let started = 0;

// Minimal local declaration of the host timer function.
declare function setTimeout(cb: () => void, t: number): any;

// Executes tasks from the queue until it is empty or the round has been
// running for more than LIMIT ms. The elapsed time is checked after each
// task, so a single long task can still exceed the limit.
function tick() {
  started = Date.now();
  while (tasks.length) {
    tasks.shift()!();
    if (Date.now() - started > LIMIT) {
      break;
    }
  }
  // When the round ran out of time, the remaining tasks need another round.
  // `enqueue` only starts the timer for an empty queue, so without this
  // re-arm the leftover tasks (and everything queued after them) would never
  // be executed.
  if (tasks.length) {
    startTimer();
  }
}

// Schedules the next execution round.
function startTimer() {
  setTimeout(tick, 0);
}

// Pushes the given task in the tasks queue, starting the timer when the
// queue was empty (a non-empty queue always has a round pending already).
function enqueue(task: Task) {
  if (tasks.length === 0) {
    startTimer();
  }
  tasks.push(task);
}
// Tells whether waitFn() may hand out an already-resolved promise: nothing is
// queued and the current execution round is still within its time budget.
function returnResolved() {
  if (tasks.length !== 0) {
    return false;
  }
  const elapsed = Date.now() - started;
  return elapsed <= LIMIT;
}
// Implementation of the `WaitFn`: resolves right away while the routine is
// allowed to keep running, otherwise parks the continuation in the task
// queue until the scheduler gets to it.
function waitFn() {
  return returnResolved()
    ? Promise.resolve()
    : new Promise<void>(wake => {
        enqueue(wake);
      });
}

namespace waitFn {
  // Yields 0, 1, ..., n - 1, waiting for the scheduler before each value.
  export async function* range(n: number): AsyncIterable<number> {
    let next = 0;
    while (next < n) {
      await waitFn();
      yield next;
      next += 1;
    }
  }

  // Yields every item of `iterable`, waiting for the scheduler after each
  // item has been consumed.
  export async function* iter<T>(iterable: Iterable<T>): AsyncIterable<T> {
    for (const item of iterable) {
      yield item;
      await waitFn();
    }
  }
}
// Implementation of the `Go` function.
//
// Starts `fn` immediately (synchronously, up to its first `await`), passing
// it the wait function followed by `args`, and returns a promise that
// mirrors the routine's outcome.
//
// Both fulfilment and rejection are forwarded: previously only the
// fulfilment handler was attached, so a routine that threw left the returned
// promise pending forever and surfaced as an unhandled rejection. Running
// `fn` inside the promise executor also turns a synchronous throw from a
// non-async `fn` into a rejection.
export function go<Args extends any[] = [], R = void>(
  fn: ConcurrentFunction<Args, R>,
  ...args: Args
): Promise<R> {
  return new Promise<R>((resolve, reject) => {
    fn(waitFn, ...args).then(resolve, reject);
  });
}
// Channel implementation.
//
// Every active receiver owns a private FIFO buffer; `send` appends the
// message to all buffers (broadcast) and wakes the receivers up through
// `newMessageNotify`.
class Channel<T> implements Chan<T> {
  // One buffer per receiver that is currently iterating.
  private instances: T[][] = [];
  // Resolved (and then replaced) on every send; resolved for good on close.
  private newMessageNotify: Resolvable = createResolvable();
  isClosed = false;

  // Delivers `data` to every registered receiver.
  // Throws an Error when the channel has already been closed.
  send(data: T): void {
    if (this.isClosed) {
      throw new Error('Cannot send new data to a closed channel.');
    }
    for (const instance of this.instances) {
      instance.push(data);
    }
    this.newMessageNotify.resolve();
    this.newMessageNotify = createResolvable();
  }

  // Marks the channel as closed and wakes up waiting receivers so that they
  // can drain their buffers and finish.
  close(): void {
    this.isClosed = true;
    this.newMessageNotify.resolve();
  }

  // Async iterable over the messages sent while the receiver is registered.
  // Being an async generator, the receiver registers itself when iteration
  // starts (first `next()`), not when `receive()` is called.
  async *receive(): AsyncIterable<T> {
    const instance: T[] = [];
    this.instances.push(instance);
    try {
      while (true) {
        // Hand out buffered messages first. Waiting is only correct when the
        // buffer is empty: `send` replaces the notifier on every call, so
        // awaiting it with messages still buffered would stall them until
        // the next send or close.
        if (instance.length) {
          yield instance.shift() as T;
          continue;
        }
        if (this.isClosed) {
          return;
        }
        await this.newMessageNotify;
      }
    } finally {
      // Runs on normal completion and when the consumer stops early
      // (`break` / `return` in a `for await`), so that `send` stops filling
      // a buffer nobody reads anymore.
      const index = this.instances.indexOf(instance);
      if (index >= 0) {
        this.instances.splice(index, 1);
      }
    }
  }

  // Resolves with the first message received, or `undefined` when the
  // channel gets closed before any message arrives. Leaving the loop early
  // finishes the generator, which unregisters this receiver; no other
  // receiver's buffer is touched.
  async first(): Promise<T | undefined> {
    for await (const message of this.receive()) {
      return message;
    }
    return undefined;
  }
}
// Creates a new, open channel for messages of type `T`.
export function chan<T>(): Chan<T> {
  const channel = new Channel<T>();
  return channel;
}
// Lock implementation.
class ZLock implements Lock {
  private isLocked = false;
  // Resolved (and then replaced) every time the lock is released.
  private lockChangeNotify: Resolvable = createResolvable();

  // Takes the lock if it is free and returns the release callback; returns
  // undefined when the lock is already held.
  retainSync(): LockReleaseCb | undefined {
    if (this.isLocked) {
      return undefined;
    }
    this.isLocked = true;
    const release = (): void => {
      this.lockChangeNotify.resolve();
      this.lockChangeNotify = createResolvable();
      this.isLocked = false;
    };
    // `once` makes a second release attempt throw.
    return once(release);
  }

  // Keeps trying to take the lock, sleeping until the next release between
  // attempts.
  async retain(): Promise<LockReleaseCb> {
    let release = this.retainSync();
    while (!release) {
      await this.lockChangeNotify;
      release = this.retainSync();
    }
    return release;
  }
}
// Creates a new lock that is initially free.
export function lock(): Lock {
  const freshLock = new ZLock();
  return freshLock;
}
} | |
/* tslint:enable:no-namespace */ | |
// Public entry points: `Go` runs a routine, `Chan` creates a channel and
// `Lock` creates a lock.
export const Go = Concurrency.go,
  Chan = Concurrency.chan,
  Lock = Concurrency.lock;
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Wraps `fn` so that it can be invoked a single time only.
 *
 * The wrapper counts as used from the moment it is first invoked, even when
 * `fn` itself throws.
 *
 * @param fn The function to guard.
 * @returns A function that forwards its arguments to `fn` on the first call
 * and throws an `Error` on every later call.
 */
export function once<A extends any[], R>(
  fn: (...args: A) => R
): (...args: A) => R {
  let used = false;
  return (...params: A): R => {
    if (!used) {
      used = true;
      return fn(...params);
    }
    throw new Error('This function can only be called once.');
  };
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment