Skip to content

Instantly share code, notes, and snippets.

@shovon
Last active September 12, 2020 06:59
Show Gist options
  • Save shovon/77b0cf9da0fa21edd3b1cf67449ec24f to your computer and use it in GitHub Desktop.
Save shovon/77b0cf9da0fa21edd3b1cf67449ec24f to your computer and use it in GitHub Desktop.
My attempt at implementing the Observable proposal put forward by TC39

Attempt at Implementing TC39's Observable Proposal

This is my attempt at implementing TC39's Observable proposal.

Unfortunately, way too many tests fail.

If you want, you can try to get all the tests to pass.

Running the Tests

First, install the dependencies:

npm i

Then, run the tests:

npx ts-node index

Note

I deleted the .gitignore file for this gist, because otherwise, the gist's highlight is no longer the README file, for some reason.

// Test-runner entry point: loads the local Observable class and passes it to
// the `runTests` function exported by the "es-observable-tests" package.
import Observable from "./observable";
// `require` is used here (rather than `import`) to load es-observable-tests.
require("es-observable-tests").runTests(Observable);
/** Handle returned by `subscribe`; lets the consumer stop the subscription. */
type Subscription = {
  /** Stops delivery and runs the producer's teardown (at most once). */
  unsubscribe(): void;
  /** True once the subscription has completed, errored or been unsubscribed. */
  readonly closed: boolean;
};

/** Object handed to the subscriber function so it can push notifications. */
type SubscriptionObserver<T> = {
  next(value: T): void;
  error(errorValue: any): void;
  complete(value?: T): void;
  readonly closed: boolean;
};

/** Consumer-side callbacks; every member is optional. */
type Observer<T> = {
  start?(subscription: Subscription): void;
  next?(value: T): void;
  error?(errorValue: any): void;
  complete?(value?: T): void;
};

/**
 * What a subscriber function may return: a teardown function, an object with
 * `unsubscribe`, or nothing when there is nothing to tear down.
 */
type SubscriberFunctionOutput = (() => void) | Subscription | void;

type SubscriberFunction<T> = (
  observer: SubscriptionObserver<T>
) => SubscriberFunctionOutput;

/**
 * Turns the arguments of `subscribe` into a single observer object.
 *
 * @throws TypeError when `onNext` is neither a function nor an object.
 */
function toObserver<T>(
  onNext: Observer<T> | ((value: T) => void),
  onError: (error: any) => void,
  onComplete: (x?: T) => void
): Observer<T> {
  if (typeof onNext === "function") {
    // Capture the narrowed callback so the closures below stay well-typed.
    const handleNext = onNext;
    return {
      next(value: T): void {
        handleNext(value);
      },
      error(errorValue: any): void {
        onError(errorValue);
      },
      complete(x?: T): void {
        onComplete(x);
      },
    };
  }
  if (!onNext || typeof onNext !== "object") {
    throw new TypeError("Observer needs to be an object");
  }
  return onNext;
}

/** Converts a subscriber's return value into a teardown function, if any. */
function toCleanup(result: SubscriberFunctionOutput): (() => void) | undefined {
  if (typeof result === "function") {
    return result;
  }
  if (
    typeof result === "object" &&
    result !== null &&
    typeof result.unsubscribe === "function"
  ) {
    return () => result.unsubscribe();
  }
  return undefined;
}

/** Pushes every item to the observer, stopping early if it closes, then completes. */
function emitAll<T>(observer: SubscriptionObserver<T>, items: readonly T[]): void {
  for (const item of items) {
    if (observer.closed) {
      return;
    }
    observer.next(item);
  }
  observer.complete();
}

/** Duck-type check for "already an observable" that is safe on primitives. */
function isObservableLike<T>(
  value: Iterable<T> | Observable<T>
): value is Observable<T> {
  // The `in` operator throws on primitives such as strings, so guard first.
  return (
    (typeof value === "object" || typeof value === "function") &&
    value !== null &&
    "subscribe" in value
  );
}

/**
 * A minimal push-based stream modelled on the TC39 Observable proposal.
 */
export default class Observable<T> {
  /**
   * @param subscriber Called once per `subscribe` with an observer to push
   *   values into; may return a teardown function or subscription.
   * @throws TypeError when `subscriber` is not a function.
   */
  constructor(private readonly subscriber: SubscriberFunction<T>) {
    if (typeof subscriber !== "function") {
      throw new TypeError("Must provide a subscriber");
    }
  }

  /**
   * Starts the producer and routes its notifications to the given observer.
   *
   * @param onNext Either an observer object or a callback for each value.
   * @param onError Error callback (only used when `onNext` is a function).
   * @param onComplete Completion callback (only used when `onNext` is a function).
   * @returns A subscription whose `unsubscribe` runs the producer's teardown
   *   exactly once and whose `closed` reflects the subscription state.
   * @throws TypeError when `onNext` is neither a function nor an object.
   */
  subscribe(
    onNext: Observer<T> | ((value: T) => void),
    onError: (error: any) => void = () => {},
    onComplete: (x?: T) => void = () => {}
  ): Subscription {
    const observer = toObserver(onNext, onError, onComplete);

    let closed = false;
    let cleanup: (() => void) | undefined;

    // Marks the subscription closed and runs the teardown at most once. The
    // teardown is cleared before it runs so re-entrant calls are harmless.
    const close = (): void => {
      closed = true;
      const pending = cleanup;
      cleanup = undefined;
      if (pending) {
        pending();
      }
    };

    const subscription: Subscription = {
      unsubscribe(): void {
        close();
      },
      get closed(): boolean {
        return closed;
      },
    };

    if (observer.start) {
      observer.start(subscription);
      // The observer may cancel from within `start`; then never run the producer.
      if (closed) {
        return subscription;
      }
    }

    const result = this.subscriber({
      next(value: T): void {
        if (closed) {
          return;
        }
        if (observer.next) {
          observer.next(value);
        }
      },
      error(errorValue: any): void {
        if (closed) {
          return;
        }
        closed = true;
        try {
          if (observer.error) {
            observer.error(errorValue);
          } else {
            // No handler: surface the error to the producer, as before.
            throw errorValue;
          }
        } finally {
          // Teardown must run whether or not the handler (or rethrow) throws.
          close();
        }
      },
      complete(x?: T): void {
        if (closed) {
          return;
        }
        closed = true;
        try {
          if (observer.complete) {
            observer.complete(x);
          }
        } finally {
          close();
        }
      },
      get closed(): boolean {
        return closed;
      },
    });

    cleanup = toCleanup(result);
    // The producer may have finished synchronously, before its teardown was
    // known; run the teardown now instead of deferring it to a timer.
    if (closed) {
      close();
    }
    return subscription;
  }

  /** Interop hook: an Observable is its own observable. */
  ["@@observable"](): Observable<T> {
    return this;
  }

  /** Creates an Observable that emits the given items in order, then completes. */
  static of<T>(...items: T[]): Observable<T> {
    return new Observable<T>((observer) => {
      emitAll(observer, items);
    });
  }

  /**
   * Returns `observable` unchanged when it already exposes `subscribe`;
   * otherwise snapshots the iterable and emits its items on each subscription.
   *
   * @throws TypeError when the argument is neither observable-like nor iterable.
   */
  static from<T>(observable: Iterable<T> | Observable<T>): Observable<T> {
    if (isObservableLike(observable)) {
      return observable;
    }
    // Array spread throws a TypeError for non-iterables and, unlike spreading
    // into call arguments, is not bounded by the engine's argument limit.
    const items = [...observable];
    return new Observable<T>((observer) => {
      emitAll(observer, items);
    });
  }
}
{
"requires": true,
"lockfileVersion": 1,
"dependencies": {
"@types/node": {
"version": "14.10.0",
"resolved": "https://registry.npmjs.org/@types/node/-/node-14.10.0.tgz",
"integrity": "sha512-SOIyrdADB4cq6eY1F+9iU48iIomFAPltu11LCvA9PKcyEwHadjCFzNVPotAR+oEJA0bCP4Xvvgy+vwu1ZjVh8g=="
},
"es-observable-tests": {
"version": "0.3.0",
"resolved": "https://registry.npmjs.org/es-observable-tests/-/es-observable-tests-0.3.0.tgz",
"integrity": "sha1-NJKFB2dlQFyJTjZ/CX/BiVrq2yA="
}
}
}
{
"dependencies": {
"@types/node": "^14.10.0",
"es-observable-tests": "^0.3.0"
}
}
{
"compilerOptions": {
"target": "ES2020"
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment