Skip to content

Instantly share code, notes, and snippets.

@freddi301
Last active October 18, 2019 10:25
Show Gist options
  • Save freddi301/4f02967482f23f7dca85caef64e557e4 to your computer and use it in GitHub Desktop.
Save freddi301/4f02967482f23f7dca85caef64e557e4 to your computer and use it in GitHub Desktop.
Channel based coroutines with await syntax and backpressure
/*
Channel based coroutines with await syntax and backpressure
`await send(item);` blocks execution until a corresponding `const item = await receive()` is called
`const item = await receive();` blocks execution until a corresponding `await send(item);` is called
the send function should be used by a single producer (for code readability)
the receive function should be used by a single consumer (for code readability)
see examples at bottom
a channel is meant for a single consumer (if multiple consumers call receive, each item is delivered to only one of them, in the order the receives were queued, i.e. round robin)
if multicast semantics are needed, this could be accommodated with an operator
backpressure comes for free, if send and receive are awaited
stacktrace should be clear
channel is "synchronous" because pulls wait for pushes and vice versa
preferred syntax is async functions with await on every send or receive
*/
/**
 * Offers one item to a channel. In `makeChannel` the returned promise resolves
 * once the item has been paired with a `receive` call.
 */
type Send<T> = (item: T) => Promise<void>;
/**
 * Requests the next item from a channel. In `makeChannel` the returned promise
 * resolves with the item once it has been paired with a `send` call.
 */
type Receive<T> = () => Promise<T>;
/** The two ends of a channel carrying items of type `T`. */
type Channel<T> = {
// Producer side.
send: Send<T>;
// Consumer side.
receive: Receive<T>;
};
/**
 * Creates an unbuffered rendezvous channel.
 *
 * Each `send` parks its item until a `receive` claims it, and each `receive`
 * parks until an item is offered; the promise returned by either side settles
 * only once the two have been matched. Parked calls on both sides are matched
 * in FIFO order.
 *
 * @returns The `send` / `receive` pair sharing one set of queues.
 */
function makeChannel<T>(): Channel<T> {
  type Sender = { item: T; resolve(): void };
  type Receiver = { resolve(item: T): void };
  const pendingSends: Sender[] = [];
  const pendingReceives: Receiver[] = [];

  // Hands the oldest offered item to the oldest waiting receiver, if both exist.
  const matchOne = (): void => {
    if (pendingSends.length === 0 || pendingReceives.length === 0) {
      return;
    }
    const offer = pendingSends.shift();
    const taker = pendingReceives.shift();
    if (offer === undefined || taker === undefined) {
      return;
    }
    // The receiver gets its item first, then the sender is released.
    taker.resolve(offer.item);
    offer.resolve();
  };

  const send: Send<T> = item => {
    const delivered = new Promise<void>(resolve => {
      pendingSends.push({ item, resolve });
    });
    matchOne();
    return delivered;
  };

  const receive: Receive<T> = () => {
    const arrived = new Promise<T>(resolve => {
      pendingReceives.push({ resolve });
    });
    matchOne();
    return arrived;
  };

  return { send, receive };
}
/**
 * Builds a channel, gives its sending end to `factory`, and exposes only the
 * receiving end to the caller.
 *
 * @param factory Invoked once, synchronously, with the channel's `send`; its
 *   return value is ignored.
 * @returns The channel's `receive` function.
 */
function makeSimpleChannel<T>(factory: (send: Send<T>) => any) {
  const { send, receive } = makeChannel<T>();
  factory(send);
  return receive;
}
/**
 * Produces an arithmetic sequence on a fresh channel.
 *
 * Values start at `from`, advance by `step`, and are emitted while the
 * current value is `<= to` (the upper bound is inclusive). Each value is sent
 * only after the previous one has been received.
 *
 * @param from First value (default `0`).
 * @param to Inclusive upper bound (default `Infinity`).
 * @param step Increment between values (default `1`).
 * @returns The `receive` function of the channel carrying the sequence.
 */
function range(from: number = 0, to: number = Infinity, step: number = 1) {
  const { send, receive } = makeChannel<number>();
  const emit = async (): Promise<void> => {
    let current = from;
    while (current <= to) {
      await send(current);
      current += step;
    }
  };
  emit();
  return receive;
}
/**
 * Re-emits items from `receive` on a new channel, pausing between items.
 *
 * After an item has been pulled from upstream and handed to the downstream
 * consumer, the next pull is scheduled with `setTimeout` after `interval`
 * milliseconds.
 *
 * @param interval Delay in milliseconds between one hand-off and the next pull.
 * @param receive Upstream receive function.
 * @returns The `receive` function of the throttled channel.
 */
function receiveAtInterval<T>(interval: number, receive: Receive<T>) {
  const { send, receive: receiveThrottled } = makeChannel<T>();
  const forwardOne = async (): Promise<void> => {
    const item = await receive();
    await send(item);
    // Re-arm only after the consumer has taken the item.
    setTimeout(forwardOne, interval);
  };
  forwardOne();
  return receiveThrottled;
}
/**
 * Consumes a channel forever, invoking `callback` with every item received.
 *
 * The loop is started immediately and never terminates; the callback's return
 * value is ignored and not awaited.
 *
 * @param receive Receive function of the channel to consume.
 * @param callback Called once per item, in arrival order.
 */
function forEach<T>(receive: Receive<T>, callback: (item: T) => any) {
  const drain = async (): Promise<void> => {
    for (;;) {
      const item = await receive();
      callback(item);
    }
  };
  drain();
}
/**
 * Transforms every item of a channel with `f` and emits the results on a new
 * channel.
 *
 * The next upstream item is pulled only after the previous mapped value has
 * been received downstream.
 *
 * @param receive Upstream receive function.
 * @param f Mapping applied to each item.
 * @returns The `receive` function of the channel carrying mapped values.
 */
function map<A, B>(receive: Receive<A>, f: (a: A) => B): Receive<B> {
  const output = makeChannel<B>();
  const pump = async (): Promise<void> => {
    for (;;) {
      const input = await receive();
      await output.send(f(input));
    }
  };
  pump();
  return output.receive;
}
/**
 * Emits a running reduction of a channel.
 *
 * The first value emitted is `initial`; afterwards, each upstream item is
 * folded into the accumulator with `reducer` and the new accumulator is
 * emitted.
 *
 * @param receive Upstream receive function.
 * @param reducer Combines the current accumulator with the next item.
 * @param initial Starting accumulator, also the first value emitted.
 * @returns The `receive` function of the channel carrying accumulator values.
 */
function scan<T, M>(
  receive: Receive<T>,
  reducer: (m: M, item: T) => M,
  initial: M
): Receive<M> {
  const { send, receive: receiveAccumulated } = makeChannel<M>();
  const run = async (): Promise<void> => {
    let accumulator = initial;
    await send(accumulator);
    for (;;) {
      const item = await receive();
      accumulator = reducer(accumulator, item);
      await send(accumulator);
    }
  };
  run();
  return receiveAccumulated;
}
/**
 * Decouples a producer from its consumer with a bounded FIFO buffer.
 *
 * Two independent loops share the buffer: one pulls from `receive` while there
 * is room, the other sends the oldest buffered item downstream whenever the
 * buffer is non-empty. A consumer is therefore served as soon as one item is
 * buffered, instead of only after the buffer has been over-filled, and a
 * source that yields fewer items than the buffer holds no longer starves it.
 *
 * @param size Maximum number of buffered items; values below 1 are treated
 *   as 1 so that items can still flow.
 * @param receive Upstream receive function.
 * @returns The `receive` function of the buffered channel.
 */
function mailbox<T>(size: number, receive: Receive<T>) {
  const channel = makeChannel<T>();
  const items: T[] = [];
  const capacity = Math.max(1, size);
  // Resolvers parked by whichever loop is currently waiting on the buffer.
  let resumeFill: (() => void) | undefined;
  let resumeDrain: (() => void) | undefined;

  // Pulls from upstream whenever the buffer has room.
  async function fill(): Promise<void> {
    while (true) {
      while (items.length >= capacity) {
        await new Promise<void>(resolve => {
          resumeFill = resolve;
        });
      }
      items.push(await receive());
      if (resumeDrain) {
        const wake = resumeDrain;
        resumeDrain = undefined;
        wake();
      }
    }
  }

  // Pushes the oldest buffered item downstream whenever one is available.
  async function drain(): Promise<void> {
    while (true) {
      while (items.length === 0) {
        await new Promise<void>(resolve => {
          resumeDrain = resolve;
        });
      }
      // The head stays in the buffer until it has actually been taken, so an
      // in-flight item still counts against the capacity.
      await channel.send(items[0] as T);
      items.shift();
      if (resumeFill) {
        const wake = resumeFill;
        resumeFill = undefined;
        wake();
      }
    }
  }

  fill();
  drain();
  return channel.receive;
}
/**
 * Emits only the items of a channel for which `predicate` returns true.
 *
 * @param receive Upstream receive function.
 * @param predicate Decides whether an item is forwarded.
 * @returns The `receive` function of the channel carrying accepted items.
 */
function filter<T>(
  receive: Receive<T>,
  predicate: (item: T) => boolean
): Receive<T> {
  return makeSimpleChannel<T>(async send => {
    while (true) {
      const item = await receive();
      if (predicate(item)) {
        // Awaiting the send keeps backpressure intact: the next upstream item
        // is pulled only after this one has been received downstream.
        await send(item);
      }
    }
  });
}
/**
 * Adapts a receive function to the async-iteration protocol so it can be
 * consumed with `for await`.
 *
 * Every `next()` call awaits one `receive()` and reports `done: false`; the
 * resulting iteration never completes on its own.
 *
 * @param receive Receive function of the channel to iterate.
 * @returns An object exposing `Symbol.asyncIterator`.
 */
function toAsyncIterable<T>(receive: Receive<T>) {
  function createIterator() {
    return {
      async next() {
        const value = await receive();
        return { done: false, value };
      }
    };
  }
  return { [Symbol.asyncIterator]: createIterator };
}
// Example 1: log the doubled values of range(), throttled by a 1000 ms interval.
{
  const ticks = receiveAtInterval(1000, range());
  const doubled = map(ticks, x => x * 2);
  forEach(doubled, console.log);
}
// Example 2: buffer range() in a mailbox, keep a running sum, throttle it by a
// 500 ms interval, and log only the even sums.
{
  const buffered = mailbox(5, range());
  const runningSum = scan(buffered, (m, i) => m + i, 0);
  const throttled = receiveAtInterval(500, runningSum);
  const evenSums = filter(throttled, x => x % 2 === 0);
  forEach(evenSums, console.log);
}
/**
 * Creates a channel that emits the Fibonacci sequence starting 1, 1, 2, 3, ...
 *
 * The next number is sent only after the previous one has been received.
 *
 * @returns The `receive` function of the channel carrying the sequence.
 */
const fib = () =>
  makeSimpleChannel<number>(async send => {
    let current = 1;
    let next = 1;
    while (true) {
      await send(current);
      [current, next] = [next, current + next];
    }
  });
// Example 3: consume fib() with `for await`, waiting 500 ms before logging
// each number.
(async function printFibonacci() {
  const pause = (ms: number) =>
    new Promise(resolve => setTimeout(resolve, ms));
  for await (const item of toAsyncIterable(fib())) {
    await pause(500);
    console.log(item);
  }
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment