Skip to content

Instantly share code, notes, and snippets.

@jtmthf
Created October 20, 2021 02:59
Show Gist options
  • Save jtmthf/697c0bb2663989cab5d1f465e56ac5cc to your computer and use it in GitHub Desktop.
Save jtmthf/697c0bb2663989cab5d1f465e56ac5cc to your computer and use it in GitHub Desktop.
// Install the `Symbol.observable` interop symbol, but only when the runtime
// (or an earlier polyfill) has not already provided a truthy value for it.
Symbol.observable ||= Symbol('observable');
/**
 * A minimal push-based stream.
 *
 * An `Observable` wraps a subscriber function. Nothing happens until
 * `subscribe` is called; each call invokes the subscriber function once,
 * synchronously, handing it an observer through which it can emit values and
 * a single terminal notification (`error` or `complete`).
 */
export class Observable<T> {
  /** Producer function invoked once for every `subscribe` call. */
  readonly #subscriber: SubscriberFunction<T>;

  /**
   * @param subscriber - Function called on every subscription with the
   *   observer it should push notifications to.
   */
  constructor(subscriber: SubscriberFunction<T>) {
    this.#subscriber = subscriber;
  }

  /**
   * Starts a new, independent subscription.
   *
   * The subscriber function runs synchronously inside this call. Notifications
   * are forwarded to `observer` until the subscription is closed, which
   * happens on the first `error`/`complete` notification or when
   * `unsubscribe` is called. Missing callbacks on `observer` are skipped.
   *
   * @param observer - Callbacks to receive notifications; all are optional.
   * @returns A handle that reports whether the subscription is closed and
   *   lets the caller close it. Unsubscribing only stops delivery; no
   *   teardown logic is run on the producer side.
   */
  subscribe(observer: Observer<T>): Subscription {
    // Shared by the producer-facing observer and the consumer-facing
    // subscription, so either side can observe or trigger closing.
    let closed = false;

    this.#subscriber({
      next(value) {
        if (!closed) {
          observer.next?.(value);
        }
      },
      error(errorValue) {
        if (!closed) {
          // Close BEFORE notifying: a handler that re-enters this observer or
          // throws must not be able to receive a second terminal notification.
          closed = true;
          observer.error?.(errorValue);
        }
      },
      complete() {
        if (!closed) {
          // Same ordering as `error`, for the same reason.
          closed = true;
          observer.complete?.();
        }
      },
      get closed() {
        return closed;
      },
    });

    return {
      unsubscribe() {
        closed = true;
      },
      get closed() {
        return closed;
      },
    };
  }

  /**
   * Interop hook keyed by `Symbol.observable`.
   *
   * @returns This observable itself.
   */
  [Symbol.observable](): this {
    return this;
  }
}
/**
 * An observable that is also an observer: every notification pushed into the
 * subject through `next`, `error` or `complete` is multicast to the observers
 * of all currently registered subscriptions.
 *
 * Once `error` or `complete` has been called the subject is closed: further
 * notifications are ignored, and observers that subscribe afterwards are not
 * registered and receive nothing.
 */
export class Subject<T> extends Observable<T> implements SubscriptionObserver<T> {
  /** Observers of the subscriptions that are still registered. */
  readonly #subscribers = new Set<SubscriptionObserver<T>>();
  /** Becomes `true` on the first `error` or `complete` call. */
  #closed = false;

  constructor() {
    // The callback only runs from `subscribe`, i.e. after `super()` has
    // returned and the private fields above have been initialised.
    super((observer) => {
      // Registering on a closed subject would retain the observer forever and
      // let a later `next` call leak values out of a terminated stream.
      if (!this.#closed) {
        this.#subscribers.add(observer);
      }
    });
  }

  /**
   * Forwards `value` to every live subscriber. Does nothing once the subject
   * is closed.
   *
   * @param value - The value to multicast.
   */
  next(value: T): void {
    if (this.#closed) {
      return;
    }
    for (const subscriber of this.#subscribers) {
      if (subscriber.closed) {
        // The subscription was unsubscribed; forget it so the set does not
        // grow without bound. Deleting during Set iteration is safe.
        this.#subscribers.delete(subscriber);
      } else {
        subscriber.next(value);
      }
    }
  }

  /**
   * Closes the subject and forwards `errorValue` to every registered
   * subscriber. Does nothing if the subject is already closed.
   *
   * @param errorValue - The error to multicast.
   */
  error(errorValue: unknown): void {
    if (this.#closed) {
      return;
    }
    // Close first so a re-entrant or throwing handler cannot trigger a second
    // terminal notification.
    this.#closed = true;
    for (const subscriber of this.#drain()) {
      subscriber.error(errorValue);
    }
  }

  /**
   * Closes the subject and notifies every registered subscriber of
   * completion. Does nothing if the subject is already closed.
   */
  complete(): void {
    if (this.#closed) {
      return;
    }
    this.#closed = true;
    for (const subscriber of this.#drain()) {
      subscriber.complete();
    }
  }

  /** `true` once `error` or `complete` has been called. */
  get closed(): boolean {
    return this.#closed;
  }

  /** Empties the subscriber set and returns its former members in insertion order. */
  #drain(): SubscriptionObserver<T>[] {
    const drained = [...this.#subscribers];
    this.#subscribers.clear();
    return drained;
  }
}
/**
 * Producer logic handed to the `Observable` constructor. It is invoked once
 * per `subscribe` call with the observer through which it pushes
 * notifications; its return value is ignored.
 */
type SubscriberFunction<T> = (observer: SubscriptionObserver<T>) => void;
/**
 * The producer-facing side of a subscription: the object a subscriber
 * function uses to push notifications to the consumer.
 */
interface SubscriptionObserver<T> {
  /** Pushes a value to the consumer. */
  next(value: T): void;
  /** Signals that the stream terminated with an error. */
  error(errorValue: unknown): void;
  /** Signals that the stream terminated successfully. */
  complete(): void;
  /**
   * Whether the subscription has been closed. Typed as the primitive
   * `boolean` (not the boxed `Boolean` wrapper) so the value can be assigned
   * to `boolean` and matches `Subscription.closed`.
   */
  get closed(): boolean;
}
/**
 * Consumer-facing handle for one running subscription, as returned by
 * `subscribe`.
 */
interface Subscription {
  /** `true` once the subscription no longer delivers notifications. */
  readonly closed: boolean;
  /** Closes the subscription so that no further notifications are delivered. */
  unsubscribe(): void;
}
/**
 * Callbacks a consumer passes to `Observable.subscribe`. Every callback is
 * optional; a notification whose callback is missing is skipped.
 */
interface Observer<T> {
/** Called with each value emitted while the subscription is open. */
next?(value: T): void;
/** Called when the stream terminates with an error. */
error?(errorValue: unknown): void;
/** Called when the stream terminates successfully. */
complete?(): void;
}
// Type-level augmentation only: merges an `observable` member into the
// built-in `SymbolConstructor` interface so `Symbol.observable` type-checks.
// The runtime value is installed by the guarded assignment at the top of this
// file.
declare global {
interface SymbolConstructor {
// Declared as a writable plain `symbol` so it can be assigned at runtime.
observable: symbol;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment