Last active
December 28, 2016 09:45
-
-
Save dead-claudia/da874c2e2cb8c20ed9d54e8914a29bd4 to your computer and use it in GitHub Desktop.
A prollyfill for a proposed observer-related augmentation to the async iterator proposal. It also features two sample event emitter adapters.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// A simple adapter for DOM events. Listens for `name` events on `target`, | |
// with various `opts` (optional). | |
// | |
// `opts` accepts any `addEventListener` options like `capture`, etc., as well | |
// as the following: | |
// - `preventDefault` - Call `event.preventDefault()` | |
// - `stopImmediatePropagation` - Call `event.stopImmediatePropagation()` | |
// - `stopPropagation` - Call `event.stopPropagation()` | |
// - `buffered` - Whether to buffer each event received. | |
module.exports = (target, name, opts = undefined) => { | |
return new Observer(context => { | |
function callback(event) { | |
if (typeof opts !== "object" && opts !== null) { | |
if (opts.preventDefault) event.preventDefault() | |
if (opts.stopImmediatePropagation) event.stopImmediatePropagation() | |
if (opts.stopPropagation) event.stopPropagation() | |
} | |
context.next(event) | |
} | |
target.addEventListener(name, callback, opts) | |
return () => target.removeEventListener(name, callback, opts) | |
}, opts != null && !!opts.buffered) | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// A simple adapter for Node.js events. Listens for `name` events on `target`, | |
// optionally buffering them (if `buffered` is truthy). | |
module.exports = (target, name, buffered = false) => { | |
return new Observer(context => { | |
function callback(event) { context.next(event) } | |
target.on(name, callback) | |
return () => target.removeListener(name, callback) | |
}, buffered) | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* This polyfill is intentionally written in a very speccy way, so it should be | |
* easier to follow what I'm proposing. | |
* | |
* Notes: | |
* - This is designed to be subclassable. | |
* - This should extend `%AsyncIteratorPrototype%` in the actual spec. | |
* - The polyfill requires a global `Symbol.asyncIterator` polyfill. | |
* | |
* Here's a quick overview of the types/protocols this uses, using | |
* TypeScript-ish syntax: | |
* | |
* class Observer { | |
* // Buffering is optional and not the default. | |
* constructor( | |
* init: (context: ObserverContext) => ObserverCleanup?, | |
* buffer = false | |
* ); | |
* | |
* static from(iterable: Iterable | Array): Observer; | |
* static of(...items): Observer; | |
* | |
* [Symbol.asyncIterator](): this; | |
* closed: boolean; // `true` if the observer is closed | |
* poll(): boolean; // `true` if there's any value waiting | |
* next(): Promise<IteratorResult>; | |
* throw(error): Promise<never>; | |
* return(value): Promise<IteratorResult>; | |
* } | |
* | |
* type ObserverCleanup = () => void; | |
* interface ObserverContext { | |
* next(value): void; | |
* throw(error): void; | |
* return(value): void; | |
* } | |
*/ | |
"use strict" | |
// Two queues, to make it easier to manage enqueued values. Not optimal, but | |
// it's the easiest. | |
class UnitQueue { | |
constructor() { | |
this._value = undefined | |
} | |
empty() { | |
return this._value !== undefined | |
} | |
push(value) { | |
this._value = value | |
} | |
pull() { | |
const value = this._value | |
this._value = undefined | |
return value | |
} | |
} | |
class MultiQueue { | |
constructor() { | |
this._value = [] | |
} | |
empty() { | |
return !this._value.length | |
} | |
push(value) { | |
this._value.push(value) | |
} | |
pull() { | |
return this._value.shift() | |
} | |
} | |
// Abstract operations | |
function isObject(value) { | |
return value !== null && | |
(typeof value === "object" || typeof value === "function") | |
} | |
function observerPoll(observer) { | |
return observer._values !== undefined && !observer._values.empty() | |
} | |
function observerClosing(observer) { | |
return observer._context === undefined | |
} | |
function observerClosed(observer) { | |
return observerClosing(observer) && !observerPoll(observer) | |
} | |
function contextClosed(context) { | |
return context._observer === undefined | |
} | |
function initCleanup(observer, ref) { | |
if (observerClosing(observer)) { | |
return undefined | |
} | |
const cleanup = observer._cleanup | |
// Clean up the initial state and shut down the generation. | |
observer._cleanup = undefined | |
observer._context._observer = undefined | |
observer._context = undefined | |
if (cleanup === undefined) { | |
return undefined | |
} | |
try { | |
const result = cleanup() | |
if (!isObject(result)) { | |
return undefined | |
} | |
const then = result.then | |
if (typeof then !== "function") { | |
return undefined | |
} | |
return new Promise(resolve => { | |
then.call(value, | |
resolve, | |
e => { | |
ref.isThrow = true | |
ref.value = e | |
resolve() | |
}) | |
}) | |
} catch (e) { | |
ref.isThrow = true | |
ref.value = e | |
return undefined | |
} | |
} | |
function finishCleanup(observer) { | |
// Clean up everything else and shut down the resolution. | |
observer._values = undefined | |
observer._callbacks = undefined | |
} | |
function observerResolve(data, value, done) { | |
if (!isObject(value)) { | |
const resolve = data.resolve | |
resolve({value, done}) | |
return | |
} | |
try { | |
const then = value.then | |
if (then === null) { | |
resolve({value, done}) | |
return | |
} | |
then.call(value, | |
value => { | |
const resolve = data.resolve | |
resolve({value, done}) | |
}, | |
data.reject) | |
} catch (e) { | |
const reject = data.reject | |
reject(e) | |
} | |
} | |
function observerEmitComplete(observer, ref, data) { | |
finishCleanup(observer) | |
if (ref.isThrow) { | |
const reject = data.reject | |
reject(ref.value) | |
} else { | |
observerResolve(data, data.value, true) | |
} | |
} | |
function observerEmitNext(observer, value) { | |
const data = observer._callbacks.shift() | |
if (data === undefined) { | |
const data = { | |
value, | |
isThrow: false, | |
} | |
observer._values.push(data) | |
} else { | |
observerResolve(data, data.value, false) | |
} | |
} | |
function observerEmitAbrupt(observer, value, isThrow) { | |
const ref = {value, isThrow} | |
const data = observer._callbacks.shift() | |
if (data === undefined) { | |
observer._values.push(ref) | |
initCleanup(observer, ref) | |
} else { | |
const p = initCleanup(observer, ref) | |
if (p === undefined) { | |
observerEmitComplete(observer, ref, data) | |
} else { | |
p.then(() => { | |
observerEmitComplete(observer, ref, data) | |
}) | |
} | |
} | |
} | |
function observerResult(value, done) { | |
if (!isObject(value)) { | |
return Promise.resolve({value, done}) | |
} | |
try { | |
const then = value.then | |
if (then === null) { | |
return Promise.resolve({value, done}) | |
} | |
return new Promise((resolve, reject) => { | |
then.call(value, | |
value => resolve({value, done}), | |
reject) | |
}) | |
} catch (e) { | |
return Promise.reject(e) | |
} | |
} | |
class ObserverContext { | |
constructor(observer) { | |
this._observer = observer | |
} | |
get closed() { | |
return contextClosed(this) | |
} | |
next(value) { | |
if (contextClosed(this)) return | |
observerEmitNext(this._observer, value) | |
} | |
throw(error) { | |
if (contextClosed(this)) return | |
observerEmitAbrupt(this._observer, error, true) | |
} | |
return(value) { | |
if (contextClosed(this)) return | |
observerEmitAbrupt(this._observer, value, false) | |
} | |
} | |
module.exports = class Observer { | |
constructor(init, buffered) { | |
if (typeof init !== "function") { | |
throw new TypeError("`init` must be a function") | |
} | |
if (buffered) { | |
this._values = new MultiQueue() | |
} else { | |
this._values = new UnitQueue() | |
} | |
this._callbacks = [] | |
this._context = new ObserverContext(this) | |
this._cleanup = undefined | |
try { | |
const cleanup = init(this._context) | |
if (typeof cleanup === "function") this._cleanup = cleanup | |
} catch (e) { | |
observerEmitAbrupt(this, value, true) | |
} | |
} | |
static from(iterable) { | |
let context | |
const observer = new this(ctx => context = ctx, true) | |
if (typeof iterable[Symbol.iterator] === "function") { | |
for (const item of iterable) { | |
context.next(item) | |
} | |
} else { | |
for (let i = 0; i < iterable.length; i++) { | |
context.next(iterable[i]) | |
} | |
} | |
context.return() | |
return observer | |
} | |
static of(...items) { | |
let context | |
const observer = new this(ctx => context = ctx, true) | |
for (let i = 0; i < items.length; i++) { | |
context.next(items[i]) | |
} | |
context.return() | |
return observer | |
} | |
[Symbol.asyncIterator]() { | |
return this | |
} | |
poll() { | |
return observerPoll(this) | |
} | |
get closed() { | |
return observerClosed(this) | |
} | |
next() { | |
if (observerClosed(this)) { | |
return observerResult(undefined, true) | |
} | |
const value = this._values.pull() | |
if (value === undefined) { | |
return new Promise((resolve, reject) => { | |
this._callbacks.push({resolve, reject}) | |
}) | |
} | |
if (value.isThrow) { | |
finishCleanup(this) | |
return Promise.reject(value.value) | |
} | |
let done = false | |
if (observerClosed(this)) { | |
done = true | |
finishCleanup(this) | |
} | |
return observerResult(value.value, result) | |
} | |
return(value) { | |
const resolved = observerResult(value, true) | |
if (observerClosed(this)) { | |
return resolved | |
} | |
if (this._context === undefined) { | |
finishCleanup(this) | |
return resolved | |
} | |
const p = initCleanup(this) | |
if (p === undefined) { | |
finishCleanup(this) | |
return resolved | |
} | |
return p.then(() => { | |
// Resolve all outstanding callbacks as done, since this is | |
// finishing the iterator. | |
for (const data of this._callbacks) { | |
observerResolve(data, undefined, true) | |
} | |
finishCleanup(this) | |
return resolved | |
}) | |
} | |
throw(value) { | |
const rejection = Promise.reject(value) | |
if (observerClosed(this)) { | |
return rejection | |
} | |
if (observerClosing(this)) { | |
finishCleanup(this) | |
return rejection | |
} | |
const p = initCleanup(this) | |
if (p === undefined) { | |
finishCleanup(this) | |
return rejection | |
} | |
return p.then(() => { | |
// Reject all outstanding callbacks, since this is finishing the | |
// iterator. | |
for (const data of this._callbacks) { | |
const reject = data.reject | |
reject(rejection) | |
} | |
finishCleanup(this) | |
return rejection | |
}) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment