Skip to content

Instantly share code, notes, and snippets.

@nodew
Created November 28, 2018 06:15
Show Gist options
  • Save nodew/81bcf9ca7e9bec1c122b56c4bf4a54a5 to your computer and use it in GitHub Desktop.
Save nodew/81bcf9ca7e9bec1c122b56c4bf4a54a5 to your computer and use it in GitHub Desktop.
observable pattern
function noop() { /* nothing */}
interface Unsubscribable {
unsubscribe(): void;
}
interface Subscription extends Unsubscribable {
unsubscribe(): void
}
export interface Subscribable<T> {
subscribe(observer?: Observer<T>): Subscription;
}
type SubscriberFunction<T> = (observer: Observer<T>) => (() => void) | Subscription;
interface Observer<T> {
start?: (subscription : Subscription) => void;
next(value: T);
error(err: any);
complete();
}
export interface UnaryFunction<T, R> { (source: T): R; }
export interface OperatorFunction<T, R> extends UnaryFunction<Observable<T>, Observable<R>> {}
export interface Operator<T, R> {
call(subscriber: Subscriber<R>, source: any): Subscription;
}
class Subscription {
_unsubscribe: () => void;
private _closed: boolean;
constructor(unsubscribe?: () => void) {
this._unsubscribe = unsubscribe;
this._closed = false;
}
get closed() {
return this._closed;
}
unsubscribe() {
this._closed = true;
this._unsubscribe();
}
}
class Subscriber<T> extends Subscription implements Observer<T> {
destination: Observer<T>;
isStopped: boolean;
constructor(observer: Observer<T>) {
super(() => {});
this.destination = observer;
this.isStopped = false;
};
unsubscribe() {
if (this.closed) {
return;
}
this.isStopped = true;
super.unsubscribe();
};
next(value: T) {
if (!this.isStopped) {
this._next(value);
}
}
protected _next(value: T) {
this.destination.next(value);
}
error(err: any) {
if (!this.isStopped) {
this.isStopped = true;
this._error(err);
this.unsubscribe();
}
}
protected _error(err) {
this.destination.error(err);
}
complete() {
if (!this.isStopped) {
this.isStopped = true;
this._complete();
this.unsubscribe();
}
}
protected _complete() {
this.destination.complete();
}
}
class Observable<T> implements Subscribable<T> {
_subscribe: SubscriberFunction<T>;
operator: Operator<any, T>;
source: any;
constructor(subscribe?: SubscriberFunction<T>) {
this._subscribe = subscribe;
};
static create<T>(subscribe: SubscriberFunction<T>) : Observable<T> {
return new Observable<T>(subscribe);
}
static of<T>(...items: T[]): Observable<T> {
return Observable.create((observer: Observer<T>) => {
for (const item of items) {
observer.next(item);
}
observer.complete();
return noop;
});
}
static from<T>(observer: Observable<T>): Observable<T> {
return Observable.create(nextObserver => {
return observer.subscribe(nextObserver);
});
}
subscribe(observer: Observer<T>) : Subscription {
const { operator } = this;
const subscriber = new Subscriber<T>(observer);
let subscription;
if (operator) {
subscription = operator.call(subscriber, this.source);
} else {
subscription = this._subscribe(subscriber);
}
return subscription;
}
map<R>(fn: (value: T) => R) : Observable<R> {
return new Observable<R>((observer: Observer<R>) => {
return this.subscribe({
next: (value: T) => {
observer.next(fn(value))
},
error: (err) => observer.error(err),
complete: () => observer.complete(),
});
})
}
lift<R>(operator: Operator<T, R>) : Observable<R> {
const observable = new Observable<R>();
observable.operator = operator;
observable.source = this;
return observable;
}
take(n): Observable<T> {
return take<T>(n)(this);
}
}
export const empty: Observer<any> = {
// closed: true,
next(value: any): void {},
error(err: any): void {},
complete(): void {}
};
export function take<T>(count: number): OperatorFunction<T, T> {
return (source: Observable<T>) => {
return source.lift(new TakeOperator(count));
};
}
class TakeOperator<T> implements Operator<T, T> {
constructor(private total: number) {
if (this.total < 1) {
throw new Error('count less than 1');
}
}
call(subscriber: Subscriber<T>, source: any): Subscription {
return source.subscribe(new TakeSubscriber(subscriber, this.total));
}
}
class TakeSubscriber<T> extends Subscriber<T> {
private count: number = 0;
constructor(destination: Subscriber<T>, private total: number) {
super(destination);
}
protected _next(value: T): void {
const total = this.total;
const count = ++this.count;
if (count <= total) {
this.destination.next(value);
if (count === total) {
this.destination.complete();
this.unsubscribe();
}
}
}
}
function fromEvent<T>(target: HTMLElement, eventName): Observable<T> {
return Observable.create((observer) => {
const handler = (e: T) => {
observer.next(e);
};
target.addEventListener(eventName, handler)
return new Subscription(() => {
target.removeEventListener(eventName, handler)
});
});
}
function interval(n): Observable<number> {
let count = 0;
return Observable.create((observer) => {
const tick = setInterval(() => {
count++;
observer.next(count);
}, n);
return new Subscription(() => {
clearInterval(tick);
});
});
}
Observable
.of(1, 2, 3, 4, 5)
.map(v => v * 2)
.take(3)
.map(v => `${v}`)
.subscribe({
next: (v) => {
console.log(v);
},
error: (err) => {
console.log(err);
},
complete: () => {
console.log('completed');
}
});
const subscription = interval(1000).subscribe({
next: (v) => {
console.log('tick', v);
},
error: (err) => {
console.log(err);
},
complete: () => {
console.log('completed');
}
});
setTimeout(() => {
subscription.unsubscribe();
}, 3500);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment