Skip to content

Instantly share code, notes, and snippets.

@abiodun0
Last active October 28, 2019 12:30
Show Gist options
  • Save abiodun0/7bd759574724fe73dae2c7c3548fc8f1 to your computer and use it in GitHub Desktop.
Save abiodun0/7bd759574724fe73dae2c7c3548fc8f1 to your computer and use it in GitHub Desktop.
Observable, André Staltz
// Clear any earlier console output before this snippet starts logging.
console.clear();
/**
 * Contrived producer used to feed the "observable" below.
 *
 * Every 200 ms it emits the next integer, starting at 0. After emitting 10 it
 * signals completion and stops its own timer. Nothing in this class ever
 * invokes an `onerror` handler.
 *
 * Consumers attach optional callbacks as plain properties:
 *   - `ondata(n)`     called for every emitted value
 *   - `oncomplete()`  called once, right after the value 10 is delivered
 */
class DataSource {
  constructor() {
    let counter = 0;
    const tick = () => {
      // Read-then-increment, so the counter advances even if emit() throws.
      const current = counter;
      counter += 1;
      this.emit(current);
    };
    this._id = setInterval(tick, 200);
  }

  /**
   * Deliver `n` to the data handler and finish the stream when `n` is 10.
   * @param {number} n value to emit
   */
  emit(n) {
    if (this.ondata) {
      this.ondata(n);
    }
    if (n !== 10) {
      return;
    }
    if (this.oncomplete) {
      this.oncomplete();
    }
    this.destroy();
  }

  /** Stop the interval timer started by the constructor. */
  destroy() {
    clearInterval(this._id);
  }
}
/**
 * Our observable: a plain function that, when called with an observer,
 * creates a fresh DataSource and forwards its callbacks to that observer.
 *
 * @param {{next: Function, error: Function, complete: Function}} observer
 *        receives data, error and completion notifications
 * @returns {Function} call it to destroy the underlying DataSource
 */
function myObservable(observer) {
  const source = new DataSource();

  source.ondata = (value) => observer.next(value);
  source.onerror = (failure) => observer.error(failure);
  source.oncomplete = () => observer.complete();

  const teardown = () => {
    source.destroy();
  };
  return teardown;
}
/**
 * Now let's use it: subscribe with an observer that logs every notification.
 */
const unsub = myObservable({
  next: (value) => {
    console.log(value);
  },
  error: (failure) => {
    console.error(failure);
  },
  complete: () => {
    console.log('done');
  },
});

/**
 * Try out unsubscription: tear the source down after 500 ms.
 * Comment this call out to let the stream run until it completes.
 */
setTimeout(() => unsub(), 500);
// Second snippet: the same demo, now with a SafeObserver-backed Observable.
// Clear any earlier console output before this snippet starts logging.
console.clear();
/**
 * Contrived producer for the "safe observable" demo.
 *
 * A 200 ms interval pushes 0, 1, 2, ... through emit(). The value 10 is the
 * last one: after it is delivered, `oncomplete` (if set) is called and the
 * interval is cleared. This class never invokes an `onerror` handler.
 */
class DataSource {
  constructor() {
    let next = 0;
    this._id = setInterval(() => {
      this.emit(next++);
    }, 200);
  }

  /**
   * Hand `n` to `ondata` (if set) and close the stream when `n` is the last value.
   * @param {number} n value to emit
   */
  emit(n) {
    const isLast = n === 10;
    if (this.ondata) {
      this.ondata(n);
    }
    if (isLast && this.oncomplete) {
      this.oncomplete();
    }
    if (isLast) {
      this.destroy();
    }
  }

  /** Clear the interval created in the constructor. */
  destroy() {
    clearInterval(this._id);
  }
}
/**
 * Safe Observer
 *
 * Wraps a (possibly partial) observer and enforces the observable contract:
 *   - nothing is delivered after unsubscription, error or completion;
 *   - error() and complete() always release the producer, whether or not the
 *     destination supplies the matching handler;
 *   - if a destination handler throws, resources are torn down and the
 *     exception is rethrown to the caller;
 *   - the teardown stored in `unsub` runs at most once.
 *
 * The producer-specific teardown is attached by assigning a function to
 * `unsub` after construction.
 */
class SafeObserver {
  /**
   * @param {{next?: Function, error?: Function, complete?: Function}} destination
   *        the observer to forward notifications to; every handler is optional
   */
  constructor(destination) {
    this.destination = destination;
    this.isUnsubscribed = false;
  }

  /**
   * Forward a value, if still subscribed and the destination has a `next` handler.
   * @param {*} value
   * @throws rethrows whatever the destination handler throws, after teardown
   */
  next(value) {
    if (this.isUnsubscribed || !this.destination.next) {
      return;
    }
    try {
      this.destination.next(value);
    } catch (err) {
      // If the provided handler errors, tear down resources, then rethrow.
      this.unsubscribe();
      throw err;
    }
  }

  /**
   * Forward an error and end the subscription. A destination without an
   * `error` handler receives nothing, but the producer is still released.
   * @param {*} err
   * @throws rethrows whatever the destination handler throws, after teardown
   */
  error(err) {
    if (this.isUnsubscribed) {
      return;
    }
    try {
      if (this.destination.error) {
        this.destination.error(err);
      }
    } finally {
      // A terminal notification always releases the producer.
      this.unsubscribe();
    }
  }

  /**
   * Forward completion and end the subscription. A destination without a
   * `complete` handler receives nothing, but the producer is still released.
   * @throws rethrows whatever the destination handler throws, after teardown
   */
  complete() {
    if (this.isUnsubscribed) {
      return;
    }
    try {
      if (this.destination.complete) {
        this.destination.complete();
      }
    } finally {
      this.unsubscribe();
    }
  }

  /**
   * Mark the observer as unsubscribed and run the teardown, if one is set.
   * The teardown reference is cleared before it is invoked so repeated calls
   * never run it twice; a teardown assigned later still runs on the next call.
   */
  unsubscribe() {
    this.isUnsubscribed = true;
    const teardown = this.unsub;
    if (teardown) {
      this.unsub = undefined;
      teardown();
    }
  }
}
/**
 * Minimal Observable: wraps a subscriber function and guarantees that every
 * observer handed to it is a SafeObserver.
 */
class Observable {
  /**
   * @param {Function} _subscribe called with a SafeObserver on each
   *        subscribe(); may return a teardown function that releases the
   *        producer it set up
   */
  constructor(_subscribe) {
    this._subscribe = _subscribe;
  }

  /**
   * Start a subscription.
   * @param {{next?: Function, error?: Function, complete?: Function}} observer
   * @returns {Function} unsubscribe function; calling it stops delivery and
   *          runs the teardown returned by the subscriber function
   */
  subscribe(observer) {
    const safeObserver = new SafeObserver(observer);
    const teardown = this._subscribe(safeObserver);
    if (typeof teardown === 'function') {
      if (safeObserver.isUnsubscribed) {
        // The producer finished synchronously, before its teardown could be
        // attached: release it right away.
        teardown();
      } else {
        // Wire the teardown so unsubscribe/error/complete release the producer.
        safeObserver.unsub = teardown;
      }
    }
    return safeObserver.unsubscribe.bind(safeObserver);
  }
}
/**
 * Our observable, built on the Observable class.
 *
 * Each subscription wraps the incoming observer in its own SafeObserver,
 * creates a DataSource, routes the source's callbacks through that wrapper,
 * and registers the source's destroy() as the wrapper's teardown. The
 * wrapper's bound unsubscribe() is returned from the subscriber function.
 */
const myObservable = new Observable((observer) => {
  const guarded = new SafeObserver(observer);
  const source = new DataSource();

  source.ondata = (value) => guarded.next(value);
  source.onerror = (failure) => guarded.error(failure);
  source.oncomplete = () => guarded.complete();

  // Teardown invoked by guarded.unsubscribe().
  guarded.unsub = () => {
    source.destroy();
  };

  const stop = guarded.unsubscribe.bind(guarded);
  return stop;
});
/**
 * Now let's use it: subscribe with an observer that logs every notification
 * and keep the returned unsubscribe function.
 */
const unsub = myObservable.subscribe({
  next: (value) => {
    console.log(value);
  },
  error: (failure) => {
    console.error(failure);
  },
  complete: () => {
    console.log('done');
  },
});
// Contrived example with a map operator
/**
 * map operator
 *
 * Builds a new Observable that subscribes to `source` and passes every value
 * through `project` before handing it downstream. Errors and completion are
 * forwarded unchanged.
 *
 * @param {{subscribe: Function}} source upstream observable
 * @param {Function} project transformation applied to each value
 * @returns {Observable} observable of projected values; its subscriber
 *          function returns whatever `source.subscribe` returns
 */
function map(source, project) {
  const subscribeMapped = (downstream) =>
    source.subscribe({
      next(value) {
        return downstream.next(project(value));
      },
      error(failure) {
        return downstream.error(failure);
      },
      complete() {
        return downstream.complete();
      },
    });

  return new Observable(subscribeMapped);
}
// Subscribe to a mapped view of myObservable in which every value is added
// to itself, logging each notification.
const unsubMap = map(myObservable, (value) => value + value).subscribe({
  next: (value) => {
    console.log(value);
  },
  error: (failure) => {
    console.error(failure);
  },
  complete: () => {
    console.log('done');
  },
});
/**
 * uncomment to try out unsubscription
 */
// setTimeout(unsub, 500);
// Naive, but similarly approached, functional observables
/**
 * Curried event observable: observable(type)(target)(observe).
 *
 * Registers `observe` as a capturing listener for `type` on `target` and
 * returns a "subscription" function. The subscription takes an action and
 * calls it with (observe, target, type), returning the action's result.
 */
const observable = function (type) {
  return function (target) {
    return function (observe) {
      target.addEventListener(type, observe, true);
      return function (f) {
        return f(observe, target, type);
      };
    };
  };
};
/**
 * Subscription action: removes the capturing listener `observe` for `type`
 * from `target`. Returns whatever removeEventListener returns.
 */
const unsubscribe = (observe, target, type) => {
  const useCapture = true;
  return target.removeEventListener(type, observe, useCapture);
};
// Placeholder subscription actions: both accept the (observe, target, type)
// triple that a subscription passes to its action, and deliberately do nothing.
const complete = (observe, target, type) => undefined;
const error = (observe, target, type) => undefined;
/**
 * Curried application: subscribe(observable)(observe) calls the observable
 * function with the observer and returns its result.
 */
const subscribe = (observable) => (observe) => {
  return observable(observe);
};
/**
 * Curried map operator: map(f)(observable)(observe).
 *
 * Subscribes to `observable` with an observer that applies `f` to each value
 * before handing it to `observe`. Returns whatever the subscription returns.
 */
const map = (f) => (observable) => (observe) => {
  const projectedObserver = (x) => observe(f(x));
  return subscribe(observable)(projectedObserver);
};
/** Right-to-left composition of two unary functions: comp(f)(g)(x) is f(g(x)). */
const comp = (f) => (g) => (x) => {
  const inner = g(x);
  return f(inner);
};
/** Builds a logger that prints `prefix` followed by the value it is called with. */
const log = function (prefix) {
  return function (x) {
    return console.log(prefix, x);
  };
};
// Observable of click events on the document.
const observableClick = observable("click")(document);

// Subscription #1: log the sum of each click's screen coordinates.
const clickObserver = map((e) => e.screenX + e.screenY)(observableClick)(
  log("clickObserver #1"),
);

// Subscription #2: the same sum, then "!" appended by a second map that is
// composed in front of the first with comp.
const clickObserver2 = comp(map((x) => x + "!"))(
  map((x) => x.screenX + x.screenY),
)(observableClick)(log("clickObserver #2"));

// Remove subscription #1's listener straight away; #2 stays attached.
clickObserver(unsubscribe);
// Best implementation so far
// const arr = [10, 20, 40, 50, 60, 70]
// console.log('before');
// arr.forEach((x) => console.log(x));
// console.log('end');
// first iteration thinking process. just three call backs.
// function nextCallBack(data) {
// console.log(data);
// }
// function errorCallback(err) {
// console.error(err);
// }
// function completeCallback() {
// console.log('done');
// }
// function giveMeSomeData(obj) {
// [10, 20, 40, 50, 60, 70].forEach(obj.next);
// }
// giveMeSomeData(
// nextCallBack,
// errorCallback,
// completeCallback
// )
// const arr = [10, 20, 40, 50, 60, 70]
// console.log('before');
// arr.forEach((x) => console.log(x));
// console.log('end');
// Second iteration Turning it to an object
// let observer = {
// next: function nextCallBack(data) {
// console.log(data);
// },
// error: function errorCallback(err) {
// console.error(err);
// },
// complete: function completeCallback() {
// console.log('done');
// }
// }
// function subscribe(obj) {
// [10, 20, 40, 50, 60, 70].forEach(obj.next);
// obj.complete();
// }
// Second iteration with the subscribe
// subscribe(observer);
// Third iteration turning it to an observable
/**
 * Console-logging observer shared by the demo pipelines: values and the
 * completion marker go to console.log, errors go to console.error.
 */
const observer = {
  next: function nextCallBack(value) {
    console.log(value);
  },

  error: function errorCallback(failure) {
    console.error(failure);
  },

  complete: function completeCallback() {
    console.log('done');
  },
};
/**
 * debounce operator, meant to be called as a method of an observable
 * (`this` is the upstream observable).
 *
 * A value is emitted only after `period` milliseconds pass without a newer
 * value arriving; each new value cancels the pending one and restarts the
 * timer. On completion the pending value, if any, is flushed first so that
 * completion never overtakes data. On error the pending value is dropped and
 * the error is forwarded immediately.
 *
 * @param {number} period quiet time in milliseconds
 * @returns {object} a new observable created with createObservable
 */
function debounce(period) {
  const inputObservable = this;
  const outPutObservable = createObservable(function subscribe(outputObserver) {
    let timerId = null;
    let hasPending = false;
    let pendingValue;

    // Emit the stored value (if any) and reset the pending state.
    const flush = () => {
      if (!hasPending) {
        return;
      }
      clearTimeout(timerId);
      const value = pendingValue;
      hasPending = false;
      pendingValue = undefined;
      outputObserver.next(value);
    };

    inputObservable.subscribe({
      next(x) {
        // A newer value supersedes whatever was waiting.
        clearTimeout(timerId);
        pendingValue = x;
        hasPending = true;
        timerId = setTimeout(flush, period);
      },
      // Handlers are invoked on outputObserver so observers that rely on
      // `this` keep working.
      error(err) {
        clearTimeout(timerId);
        hasPending = false;
        pendingValue = undefined;
        outputObserver.error(err);
      },
      complete() {
        flush();
        outputObserver.complete();
      },
    });
  });
  return outPutObservable;
}
/**
 * map operator, meant to be called as a method of an observable
 * (`this` is the upstream observable).
 *
 * Produces a new observable that emits `transformFn(x)` for every upstream
 * value `x` and forwards errors and completion unchanged.
 *
 * @param {Function} transformFn applied to each upstream value
 * @returns {object} a new observable created with createObservable
 */
function map(transformFn) {
  const inputObservable = this;
  const outPutObservable = createObservable(function subscribe(outputObserver) {
    inputObservable.subscribe({
      next(x) {
        outputObserver.next(transformFn(x));
      },
      // Call the handlers on outputObserver instead of passing them unbound,
      // so observers whose handlers rely on `this` keep working.
      error(err) {
        outputObserver.error(err);
      },
      complete() {
        outputObserver.complete();
      },
    });
  });
  return outPutObservable;
}
/**
 * filter operator, meant to be called as a method of an observable
 * (`this` is the upstream observable).
 *
 * Produces a new observable that re-emits only the upstream values for which
 * `transformFn` returns a truthy result; errors and completion are forwarded
 * unchanged.
 *
 * @param {Function} transformFn predicate called with each upstream value
 * @returns {object} a new observable created with createObservable
 */
function filter(transformFn) {
  const inputObservable = this;
  const outPutObservable = createObservable(function subscribe(outputObserver) {
    inputObservable.subscribe({
      next(x) {
        if (transformFn(x)) {
          outputObserver.next(x);
        }
      },
      // Call the handlers on outputObserver instead of passing them unbound,
      // so observers whose handlers rely on `this` keep working.
      error(err) {
        outputObserver.error(err);
      },
      complete() {
        outputObserver.complete();
      },
    });
  });
  return outPutObservable;
}
/**
 * Builds an observable object from a subscribe function and attaches the
 * chainable operators (map, filter, debounce) as methods.
 *
 * @param {Function} subscribe called with an observer to start the stream
 * @returns {{subscribe: Function, map: Function, filter: Function, debounce: Function}}
 */
function createObservable(subscribe) {
  const operators = { map, filter, debounce };
  return { subscribe, ...operators };
}
/**
 * Observable that synchronously emits the fixed values 10, 20, 40, 50, 60, 70
 * and then completes.
 */
const arrayObservable = createObservable(function subscribe(obj) {
  // Call next as a method with the value only: passing obj.next straight to
  // forEach would lose `this` and also hand it the index and the array.
  [10, 20, 40, 50, 60, 70].forEach((value) => obj.next(value));
  obj.complete();
});
/**
 * Observable of click events on the document. It never completes, and the
 * listener it registers is never removed.
 */
const clickObservable = createObservable(function subscribe(obj) {
  // Wrap the handler so next is invoked on the observer; registering obj.next
  // directly would detach it from the observer it belongs to.
  document.addEventListener('click', (event) => obj.next(event));
});
// Array pipeline: divide each value by 10, keep the even results, and log
// them with the shared observer.
arrayObservable
  .map((value) => value / 10)
  .filter((scaled) => scaled % 2 === 0)
  .subscribe(observer);

// Click pipeline: take each click's clientX, pass it through debounce(200),
// and log it with the shared observer.
clickObservable
  .map((event) => event.clientX)
  .debounce(200)
  .subscribe(observer);
// Key takeaways: these are just higher-order functions that keep decorating the `subscribe` key.
// Using plain objects is a little more convoluted than using pure functions, though.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment