Skip to content

Instantly share code, notes, and snippets.

@thejohnfreeman
Created January 8, 2019 05:12
Show Gist options
  • Save thejohnfreeman/cd1549ce3125483f2455a52c5da3c72e to your computer and use it in GitHub Desktop.
Save thejohnfreeman/cd1549ce3125483f2455a52c5da3c72e to your computer and use it in GitHub Desktop.
import {
from,
Observable,
ObservableInput,
Observer,
Subscriber,
Subscription,
} from 'rxjs'
import { take } from 'rxjs/operators'
export function spawn(f: () => Iterator<any>) {
return async(f)()
}
export function async<T = any>(
f: (...args: any[]) => Iterator<T>,
): (...args: any[]) => Observable<T> {
// Return a function that constructs an Observable.
return function(...args) {
return Observable.create(observer => {
const iterator = f.apply(this, args)
return new IteratorSubscriber(observer, iterator)
})
}
}
class IteratorSubscriber<T> extends Subscriber<T> {
private subscription: Subscription | null
public constructor(
observer: Observer<T>,
private iterator: Iterator<any> | null,
) {
super(observer)
this.step('next')
}
private step(key: string, arg?): void {
// Unsubscribe now.
if (this.subscription) {
this.subscription.unsubscribe()
this.subscription = null
}
let result
try {
result = this.iterator![key](arg)
} catch (error) {
this.error(error)
return
}
const { value, done } = result
if (done) {
this.next(value)
this.complete()
return
}
// We're going to create an observable from the value and `subscribe` to
// it. It is possible that our subscription's `next` method is called
// before `subscribe` returns the subscription. This happens for
// "synchronous" observables, e.g. arrays. In such cases, the `next`
// method will have already saved a subscription for an observable later
// in the generator's execution, and we don't want to override it with
// our subscription. Thus, we save our subscription to a temporary
// variable and copy it to `this.subscription` only after checking that
// it does not already hold another subscription.
//
// The observables proposal adds a `start` method to the `Observer` type
// that accepts the `Subscription` before `next`, `complete`, or `error`
// are ever called. That will solve our problem, but no implementation is
// yet available.
//
// Being able to cancel the subscription before it completes will let us
// stop counting values and lose the `take` operator. It will let us
// assume in our completion callback that no values have been sent.
let count = 0
const subscription = from(value)
.pipe(take(1))
.subscribe(
value => {
count += 1
if (count > 1) {
// This means `take` failed to cancel our subscription before
// a second value was delivered.
console.error('awaited observable returned too many values')
}
// Recurse.
this.step('next', value)
},
error => this.step('throw', error),
() => {
if (count < 1) {
// This stops progress. How should we handle it?
console.error('awaited observable completed with no value')
}
},
)
if (!this.subscription) {
this.subscription = subscription
}
}
public unsubscribe() {
this.iterator = null
if (this.subscription) {
this.subscription.unsubscribe()
this.subscription = null
}
super.unsubscribe()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment