Last active
January 19, 2021 06:03
-
-
Save kriskowal/abcba770b5dba526a1af801d692fd94e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// count lazily yields the integers 0, 1, ..., n - 1 in ascending order.
// Nothing is yielded when n is zero or negative.
function* count(n) {
  let next = 0;
  while (next < n) {
    yield next;
    next += 1;
  }
}
// delay returns a promise that fulfills (with undefined) after roughly `ms`
// milliseconds, as scheduled by setTimeout.
const delay = (ms) =>
  new Promise((wake) => {
    setTimeout(wake, ms);
  });
// asyncForEach visits each item of a sync or async iterable strictly in
// series: the callback's result is awaited before the next item is requested.
// The returned promise rejects if the iteration or the callback fails.
const asyncForEach = async (values, callback) => {
  for await (const item of values) {
    await callback(item);
  }
};
// Demo: prints 0..9 one at a time, pausing a random 0-100ms before each line.
const demoAsyncForEach = async () => {
  console.log('demo serial async for each');
  const printAfterPause = async (n) => {
    await delay(Math.random() * 100);
    console.log(n);
  };
  await asyncForEach(count(10), printAfterPause);
};
// parallel invokes `callback` once per worker index (0 .. limit - 1) and
// returns a promise for the array of all results, in worker order.
// The calls are made lazily from a generator that Promise.all drains, so a
// callback that throws synchronously surfaces as a rejection of the result.
const parallel = (limit, callback) => {
  const launched = (function* launch() {
    for (const index of count(limit)) {
      yield callback(index);
    }
  })();
  return Promise.all(launched);
};
// parallelForEach starts `limit` asyncForEach loops over the same `values`.
// When `values` is a one-shot iterator (as in the demos below) the loops pull
// from it concurrently, so each item is handled by exactly one loop.
const parallelForEach = async (limit, values, callback) => {
  const worker = () => asyncForEach(values, callback);
  return parallel(limit, worker);
};
// Demo: prints 0..19 using five concurrent workers, each pausing a random
// 0-100ms before printing, so the output order varies from run to run.
const demoParallelForEach = async () => {
  console.log('demo parallel async for each');
  const printAfterPause = async (n) => {
    await delay(Math.random() * 100);
    console.log(n);
  };
  await parallelForEach(5, count(20), printAfterPause);
};
// asyncReduce folds a sync or async iterable into a single value, in series.
// `zero` seeds the accumulator; `callback(accumulator, item)` may be async and
// its awaited result becomes the next accumulator. An empty input yields `zero`.
const asyncReduce = async (zero, values, callback) => {
  let accumulator = zero;
  for await (const item of values) {
    accumulator = await callback(accumulator, item);
  }
  return accumulator;
};
// parallelReduce runs `limit` concurrent asyncReduce passes over the same
// `values`, each seeded with `zero`, then reduces the per-worker partial
// results (again seeded with `zero`) into the final value.
const parallelReduce = async (limit, zero, values, callback) => {
  const reduceShare = () => asyncReduce(zero, values, callback);
  const partials = await parallel(limit, reduceShare);
  return asyncReduce(zero, partials, callback);
};
// Demo: sums 0..9 with ten concurrent reducers and prints each addition.
// Adding to a zero accumulator is short-circuited (no pause, no log line).
const demoParallelReduce = async () => {
  console.log('demo parallel reduce');
  const sum = await parallelReduce(10, 0, count(10), async (a, b) => {
    // Strict comparison: only a numeric zero accumulator takes the shortcut.
    if (a === 0) {
      return b;
    }
    await delay(Math.random() * 100);
    console.log(a, '+', b, '=', a + b);
    return a + b;
  });
  console.log(sum);
};
// defer creates a promise together with the functions that settle it, so the
// promise can be resolved or rejected from outside its executor.
// Returns { promise, resolve, reject }.
const defer = () => {
  const settlers = {};
  const promise = new Promise((resolve, reject) => {
    Object.assign(settlers, { resolve, reject });
  });
  return { promise, ...settlers };
};
// queue is an unbounded asynchronous FIFO built as a linked list of promises.
// Each node is { value, promise }, where `promise` resolves to the next node.
//   put(value) fills the current tail node and appends a fresh, unresolved tail.
//   get() returns a promise for the next unread value and advances the head;
//         it may be called before the matching put().
const queue = () => {
  const first = defer();
  let head = first.promise; // resolves to the next node to be read
  let fillTail = first.resolve; // settles the node the next put() provides
  return {
    put(value) {
      const { promise, resolve } = defer();
      fillTail({ value, promise });
      fillTail = resolve;
    },
    get() {
      const value = head.then((node) => node.value);
      head = head.then((node) => node.promise);
      return value;
    },
  };
};
// stream builds one end of a duplex channel out of two queues: every call
// sends a message on `up` and returns a promise for the peer's next message
// from `down`. The object is its own async iterator, so it works with
// `for await`.
//   next(value)   sends { value, done: false }
//   return(value) sends { value, done: true }
//   throw(error)  sends a rejected promise, so the peer's pending or next read
//                 rejects with `error`
const stream = (up, down) => {
  const exchange = (message) => {
    up.put(message);
    return down.get();
  };
  return {
    next(value) {
      return exchange({ value, done: false });
    },
    return(value) {
      return exchange({ value, done: true });
    },
    throw(error) {
      const failure = Promise.reject(error);
      // The rejection may sit in the queue until the peer reads it. Attach a
      // no-op handler so it is not reported as an unhandled rejection in the
      // meantime; the peer still observes the rejection through `failure`.
      failure.catch(() => {});
      return exchange(failure);
    },
    [Symbol.asyncIterator]() {
      return this;
    },
  };
};
// pipe creates two connected stream ends. Whatever one end sends (via
// next/return/throw) is what the other end's calls resolve to, and vice versa.
// Returns [input, output]; the two ends are symmetric.
const pipe = () => {
  const [syn, ack] = [queue(), queue()];
  return [stream(syn, ack), stream(ack, syn)];
};
// Demo: a producer pushes 0..9 into one end of a pipe while a consumer reads
// them from the other end with `for await`, pausing a random 0-100ms per token.
// The producer's next() only resolves once the consumer asks for more, so the
// producer is paced by the consumer.
const demoPipe = async () => {
  console.log('demo pipe');
  const producer = async (output) => {
    // `token` is declared per loop: it used to be an implicit global shared
    // with the consumer, which is a ReferenceError in strict/module code.
    for (const token of count(10)) {
      console.log(token, '->');
      await output.next(token);
    }
    // Close the stream and wait for the consumer to pick up the end marker
    // instead of leaving the promise floating.
    await output.return();
  };
  const consumer = async (input) => {
    for await (const token of input) {
      console.log('->', token);
      await delay(Math.random() * 100);
    }
  };
  const [input, output] = pipe();
  await Promise.all([
    producer(output),
    consumer(input),
  ]);
};
// pump pulls from one stream and pushes to another.
// The pump slows down for output back-pressure: it waits for each
// output.next() to settle before pulling the next value from input.
// When input finishes, its final value is passed to output.return(); if
// pulling or pushing fails, the error is passed to output.throw().
// Resolves to whatever output.return() / output.throw() resolves to.
const pump = async (output, input) => {
  try {
    for (;;) {
      const { value, done } = await input.next();
      if (done) {
        return output.return(value);
      }
      await output.next(value);
    }
  } catch (error) {
    return output.throw(error);
  }
};
// Demo: the same producer/consumer exchange as demoPipe, but the producer is
// an async generator and pump() moves its values into the pipe.
const demoPump = async () => {
  console.log('demo pump');
  async function* producer() {
    // `token` is declared per loop: it used to be an implicit global shared
    // with the consumer, which is a ReferenceError in strict/module code.
    for (const token of count(10)) {
      console.log(token, '->');
      // Wait on whatever the puller passes back through next().
      await (yield token);
    }
  }
  const consumer = async (input) => {
    for await (const token of input) {
      console.log('->', token);
      await delay(Math.random() * 100);
    }
  };
  const [input, output] = pipe();
  await Promise.all([
    pump(output, producer()),
    consumer(input),
  ]);
};
// asyncFlatten concatenates a (sync or async) iterable of (sync or async)
// iterables: every value of the first inner iterable is yielded, then every
// value of the second, and so on.
async function* asyncFlatten(streams) {
  for await (const inner of streams) {
    for await (const item of inner) {
      yield item;
    }
  }
}
// Demo: flattens two three-item counters and prints 0 1 2 0 1 2.
const demoAsyncFlatten = async () => {
  console.log('demo async flatten');
  const flattened = asyncFlatten([count(3), count(3)]);
  for await (const item of flattened) {
    console.log(item);
  }
};
// asyncMap lazily yields callback(item) (awaited) for each item of a sync or
// async iterable. After each yield it also awaits whatever the consumer passed
// back through next(), so a consumer can hand in a promise to make the mapper
// wait before it continues.
async function* asyncMap(values, callback) {
  for await (const item of values) {
    const mapped = await callback(item);
    const acknowledgement = yield mapped;
    await acknowledgement;
  }
}
// Demo: maps each of three items to its own three-item counter, then flattens
// the resulting stream of streams and prints 0 1 2 three times.
const demoAsyncFlattenMap = async () => {
  // The banner used to repeat demoAsyncFlatten's text, which made the two
  // demos indistinguishable in the output.
  console.log('demo async flatten map');
  const streams = asyncMap(count(3), () => count(3));
  for await (const value of asyncFlatten(streams)) {
    console.log(value);
  }
};
// parallelMap maps `values` through `callback` with up to `limit` concurrent
// workers and returns an async-iterable stream of the results, in the order
// the results become available.
//
// Each worker pulls from the shared `values` through its own asyncMap and
// pushes results into a pipe, waiting for the consumer to take each one. The
// pipe is closed exactly once, after every worker has finished. Previously
// each worker closed it on its own, so the consumer could see the end of the
// stream while other workers still had results in flight, or before any
// result at all when `limit` exceeded the number of values.
// If a worker fails, the error is forwarded to the consumer.
const parallelMap = (limit, values, callback) => {
  const [input, output] = pipe();
  const worker = async () => {
    for await (const mapped of asyncMap(values, callback)) {
      await input.next(mapped);
    }
  };
  // Deliberately not awaited: the caller consumes `output` while this runs.
  parallel(limit, worker).then(
    () => input.return(),
    (error) => input.throw(error),
  );
  return output;
};
// Demo: maps three items to five-item streams through parallelMap (limit 7),
// with random 0-100ms pauses at both levels, then flattens and prints the
// values. A printed value is (m + 1) * 10 + n for outer item n, inner item m.
const demoParallelMap = async () => {
  console.log('demo parallel map');
  const pause = () => delay(Math.random() * 100);
  const innerStream = (n) =>
    asyncMap(count(5), async (m) => {
      await pause();
      return (m + 1) * 10 + n;
    });
  const streams = parallelMap(7, count(3), async (n) => {
    await pause();
    return innerStream(n);
  });
  for await (const item of asyncFlatten(streams)) {
    console.log(item);
  }
};
// delayWithContext is a cancellable delay. The returned promise fulfills after
// `ms` milliseconds, unless `context.cancelled` rejects first; in that case it
// rejects with the same reason and the pending timer is cleared.
const delayWithContext = (context, ms) => {
  const deferred = defer();
  const timer = setTimeout(deferred.resolve, ms);
  context.cancelled.catch((reason) => {
    deferred.reject(reason);
    clearTimeout(timer);
  });
  return deferred.promise;
};
// A promise that never settles; the root context's cancellation signal.
const never = defer().promise;

// background is the root context: it is never cancelled and has no deadline.
// A context is a frozen object with
//   cancelled - a promise that rejects (with the reason) when the context is
//               cancelled, and otherwise stays pending
//   deadline  - the time (ms since epoch, as from Date.now()) by which the
//               context will be cancelled; Infinity if none
// Child contexts inherit from their parent through the prototype chain, and
// cancelling a parent cancels every context derived from it.
const background = Object.freeze({
  cancelled: never,
  deadline: Infinity,
  // Derives a frozen child that inherits from this context and overrides the
  // properties given in `child`.
  with(child) {
    return Object.freeze({
      __proto__: this,
      ...child,
    });
  },
  // Derives a context that is cancelled `timeout` milliseconds from now.
  withTimeout(timeout) {
    const deadline = Date.now() + timeout;
    return this.withTimeoutAndDeadline(timeout, deadline);
  },
  // Derives a context that is cancelled at the absolute time `deadline`.
  withDeadline(deadline) {
    const timeout = deadline - Date.now();
    return this.withTimeoutAndDeadline(timeout, deadline);
  },
  // Shared implementation of withTimeout/withDeadline. Returns this context
  // unchanged when it already expires before the requested deadline.
  withTimeoutAndDeadline(timeout, deadline) {
    if (deadline > this.deadline) {
      return this;
    }
    const { cancel, context } = this.withCancel();
    delayWithContext(this, timeout).then(
      () => cancel(new Error('Timed out')),
      // The delay rejects when this (parent) context is cancelled first.
      // withCancel already forwards that cancellation to the child, so there
      // is nothing left to do except keep the rejection from going unhandled.
      () => {},
    );
    return context.with({ deadline });
  },
  // Derives a child context plus the `cancel(reason)` function that rejects
  // the child's `cancelled` promise. Returns { cancel, context }.
  withCancel() {
    const { promise, reject } = defer();
    // Cancellation is a signal, not a failure: a context that nobody happens
    // to be observing when it is cancelled must not raise an unhandled
    // rejection. Observers attached later still see the rejection.
    promise.catch(() => {});
    const context = this.with({ cancelled: promise });
    this.cancelled.catch(reject);
    return { cancel: reject, context };
  },
});
// streamWithContext wraps an iterator so that each next/return/throw call is
// raced against `context.cancelled`: the call settles with the underlying
// stream's result, or rejects as soon as the context is cancelled. The
// underlying call itself is not interrupted. The wrapper is its own async
// iterator.
const streamWithContext = (context, stream) => ({
  next(value) {
    const { cancelled } = context;
    const step = stream.next(value);
    return Promise.race([cancelled, step]);
  },
  return(value) {
    const { cancelled } = context;
    const step = stream.return(value);
    return Promise.race([cancelled, step]);
  },
  throw(error) {
    const { cancelled } = context;
    const step = stream.throw(error);
    return Promise.race([cancelled, step]);
  },
  [Symbol.asyncIterator]() {
    return this;
  },
});
// Demo: ten workers print numbers from a 1000-item counter, each after a
// random 0-1000ms cancellable pause, under a context that times out after one
// second. The resulting cancellation error's message is printed at the end.
const demoStreamTimeout = async () => {
  console.log('streaming with a timeout');
  const context = background.withTimeout(1000);
  const numbers = streamWithContext(context, count(1000));
  const printAfterPause = async (n) => {
    await delayWithContext(context, Math.random() * 1000);
    console.log(n);
  };
  try {
    await parallelForEach(10, numbers, printAfterPause);
  } catch (error) {
    console.log(error.message);
  }
};
// Entry point: runs every demo in sequence, waiting for each to finish before
// starting the next.
(async () => {
  const demos = [
    demoAsyncForEach,
    demoParallelForEach,
    demoParallelReduce,
    demoAsyncFlatten,
    demoAsyncFlattenMap,
    demoParallelMap,
    demoPipe,
    demoPump,
    demoStreamTimeout,
  ];
  for (const demo of demos) {
    await demo();
  }
})();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment