Skip to content

Instantly share code, notes, and snippets.

@dead-claudia
Last active December 28, 2016 09:45
Show Gist options
  • Save dead-claudia/da874c2e2cb8c20ed9d54e8914a29bd4 to your computer and use it in GitHub Desktop.
Save dead-claudia/da874c2e2cb8c20ed9d54e8914a29bd4 to your computer and use it in GitHub Desktop.
A prollyfill for a proposed observer-related augmentation to the async iterator proposal. It also features two sample event emitter adapters.
// A simple adapter for DOM events. Listens for `name` events on `target`,
// with various `opts` (optional).
//
// `opts` accepts any `addEventListener` options like `capture`, etc., as well
// as the following:
// - `preventDefault` - Call `event.preventDefault()`
// - `stopImmediatePropagation` - Call `event.stopImmediatePropagation()`
// - `stopPropagation` - Call `event.stopPropagation()`
// - `buffered` - Whether to buffer each event received.
module.exports = (target, name, opts = undefined) => {
return new Observer(context => {
function callback(event) {
if (typeof opts !== "object" && opts !== null) {
if (opts.preventDefault) event.preventDefault()
if (opts.stopImmediatePropagation) event.stopImmediatePropagation()
if (opts.stopPropagation) event.stopPropagation()
}
context.next(event)
}
target.addEventListener(name, callback, opts)
return () => target.removeEventListener(name, callback, opts)
}, opts != null && !!opts.buffered)
}
// A simple adapter for Node.js events. Listens for `name` events on `target`,
// optionally buffering them (if `buffered` is truthy).
module.exports = (target, name, buffered = false) => {
return new Observer(context => {
function callback(event) { context.next(event) }
target.on(name, callback)
return () => target.removeListener(name, callback)
}, buffered)
}
/**
* This polyfill is intentionally written in a very speccy way, so it should be
* easier to follow what I'm proposing.
*
* Notes:
* - This is designed to be subclassable.
* - This should extend `%AsyncIteratorPrototype%` in the actual spec.
* - The polyfill requires a global `Symbol.asyncIterator` polyfill.
*
* Here's a quick overview of the types/protocols this uses, using
* TypeScript-ish syntax:
*
* class Observer {
* // Buffering is optional and not the default.
* constructor(
* init: (context: ObserverContext) => ObserverCleanup?,
* buffer = false
* );
*
* static from(iterable: Iterable | Array): Observer;
* static of(...items): Observer;
*
* [Symbol.asyncIterator](): this;
* closed: boolean; // `true` if the observer is closed
* poll(): boolean; // `true` if there's any value waiting
* next(): Promise<IteratorResult>;
* throw(error): Promise<never>;
* return(value): Promise<IteratorResult>;
* }
*
* type ObserverCleanup = () => void;
* interface ObserverContext {
* next(value): void;
* throw(error): void;
* return(value): void;
* }
*/
"use strict"
// Two queues, to make it easier to manage enqueued values. Not optimal, but
// it's the easiest.
class UnitQueue {
constructor() {
this._value = undefined
}
empty() {
return this._value !== undefined
}
push(value) {
this._value = value
}
pull() {
const value = this._value
this._value = undefined
return value
}
}
class MultiQueue {
constructor() {
this._value = []
}
empty() {
return !this._value.length
}
push(value) {
this._value.push(value)
}
pull() {
return this._value.shift()
}
}
// Abstract operations
function isObject(value) {
return value !== null &&
(typeof value === "object" || typeof value === "function")
}
function observerPoll(observer) {
return observer._values !== undefined && !observer._values.empty()
}
function observerClosing(observer) {
return observer._context === undefined
}
function observerClosed(observer) {
return observerClosing(observer) && !observerPoll(observer)
}
function contextClosed(context) {
return context._observer === undefined
}
function initCleanup(observer, ref) {
if (observerClosing(observer)) {
return undefined
}
const cleanup = observer._cleanup
// Clean up the initial state and shut down the generation.
observer._cleanup = undefined
observer._context._observer = undefined
observer._context = undefined
if (cleanup === undefined) {
return undefined
}
try {
const result = cleanup()
if (!isObject(result)) {
return undefined
}
const then = result.then
if (typeof then !== "function") {
return undefined
}
return new Promise(resolve => {
then.call(value,
resolve,
e => {
ref.isThrow = true
ref.value = e
resolve()
})
})
} catch (e) {
ref.isThrow = true
ref.value = e
return undefined
}
}
function finishCleanup(observer) {
// Clean up everything else and shut down the resolution.
observer._values = undefined
observer._callbacks = undefined
}
function observerResolve(data, value, done) {
if (!isObject(value)) {
const resolve = data.resolve
resolve({value, done})
return
}
try {
const then = value.then
if (then === null) {
resolve({value, done})
return
}
then.call(value,
value => {
const resolve = data.resolve
resolve({value, done})
},
data.reject)
} catch (e) {
const reject = data.reject
reject(e)
}
}
function observerEmitComplete(observer, ref, data) {
finishCleanup(observer)
if (ref.isThrow) {
const reject = data.reject
reject(ref.value)
} else {
observerResolve(data, data.value, true)
}
}
function observerEmitNext(observer, value) {
const data = observer._callbacks.shift()
if (data === undefined) {
const data = {
value,
isThrow: false,
}
observer._values.push(data)
} else {
observerResolve(data, data.value, false)
}
}
function observerEmitAbrupt(observer, value, isThrow) {
const ref = {value, isThrow}
const data = observer._callbacks.shift()
if (data === undefined) {
observer._values.push(ref)
initCleanup(observer, ref)
} else {
const p = initCleanup(observer, ref)
if (p === undefined) {
observerEmitComplete(observer, ref, data)
} else {
p.then(() => {
observerEmitComplete(observer, ref, data)
})
}
}
}
function observerResult(value, done) {
if (!isObject(value)) {
return Promise.resolve({value, done})
}
try {
const then = value.then
if (then === null) {
return Promise.resolve({value, done})
}
return new Promise((resolve, reject) => {
then.call(value,
value => resolve({value, done}),
reject)
})
} catch (e) {
return Promise.reject(e)
}
}
class ObserverContext {
constructor(observer) {
this._observer = observer
}
get closed() {
return contextClosed(this)
}
next(value) {
if (contextClosed(this)) return
observerEmitNext(this._observer, value)
}
throw(error) {
if (contextClosed(this)) return
observerEmitAbrupt(this._observer, error, true)
}
return(value) {
if (contextClosed(this)) return
observerEmitAbrupt(this._observer, value, false)
}
}
module.exports = class Observer {
constructor(init, buffered) {
if (typeof init !== "function") {
throw new TypeError("`init` must be a function")
}
if (buffered) {
this._values = new MultiQueue()
} else {
this._values = new UnitQueue()
}
this._callbacks = []
this._context = new ObserverContext(this)
this._cleanup = undefined
try {
const cleanup = init(this._context)
if (typeof cleanup === "function") this._cleanup = cleanup
} catch (e) {
observerEmitAbrupt(this, value, true)
}
}
static from(iterable) {
let context
const observer = new this(ctx => context = ctx, true)
if (typeof iterable[Symbol.iterator] === "function") {
for (const item of iterable) {
context.next(item)
}
} else {
for (let i = 0; i < iterable.length; i++) {
context.next(iterable[i])
}
}
context.return()
return observer
}
static of(...items) {
let context
const observer = new this(ctx => context = ctx, true)
for (let i = 0; i < items.length; i++) {
context.next(items[i])
}
context.return()
return observer
}
[Symbol.asyncIterator]() {
return this
}
poll() {
return observerPoll(this)
}
get closed() {
return observerClosed(this)
}
next() {
if (observerClosed(this)) {
return observerResult(undefined, true)
}
const value = this._values.pull()
if (value === undefined) {
return new Promise((resolve, reject) => {
this._callbacks.push({resolve, reject})
})
}
if (value.isThrow) {
finishCleanup(this)
return Promise.reject(value.value)
}
let done = false
if (observerClosed(this)) {
done = true
finishCleanup(this)
}
return observerResult(value.value, result)
}
return(value) {
const resolved = observerResult(value, true)
if (observerClosed(this)) {
return resolved
}
if (this._context === undefined) {
finishCleanup(this)
return resolved
}
const p = initCleanup(this)
if (p === undefined) {
finishCleanup(this)
return resolved
}
return p.then(() => {
// Resolve all outstanding callbacks as done, since this is
// finishing the iterator.
for (const data of this._callbacks) {
observerResolve(data, undefined, true)
}
finishCleanup(this)
return resolved
})
}
throw(value) {
const rejection = Promise.reject(value)
if (observerClosed(this)) {
return rejection
}
if (observerClosing(this)) {
finishCleanup(this)
return rejection
}
const p = initCleanup(this)
if (p === undefined) {
finishCleanup(this)
return rejection
}
return p.then(() => {
// Reject all outstanding callbacks, since this is finishing the
// iterator.
for (const data of this._callbacks) {
const reject = data.reject
reject(rejection)
}
finishCleanup(this)
return rejection
})
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment