Last active
October 28, 2019 12:30
-
-
Save abiodun0/7bd759574724fe73dae2c7c3548fc8f1 to your computer and use it in GitHub Desktop.
Observable, Andre Stalltz
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
console.clear(); | |
/** | |
* A contrived data source to use in our "observable" | |
* NOTE: this will clearly never error | |
*/ | |
class DataSource { | |
constructor() { | |
let i = 0; | |
this._id = setInterval(() => this.emit(i++), 200); | |
} | |
emit(n) { | |
const limit = 10; | |
if (this.ondata) { | |
this.ondata(n); | |
} | |
if (n === limit) { | |
if (this.oncomplete) { | |
this.oncomplete(); | |
} | |
this.destroy(); | |
} | |
} | |
destroy() { | |
clearInterval(this._id); | |
} | |
} | |
/** | |
* our observable | |
*/ | |
function myObservable(observer) { | |
let datasource = new DataSource(); | |
datasource.ondata = (e) => observer.next(e); | |
datasource.onerror = (err) => observer.error(err); | |
datasource.oncomplete = () => observer.complete(); | |
return () => { | |
datasource.destroy(); | |
}; | |
} | |
/** | |
* now let's use it | |
*/ | |
const unsub = myObservable({ | |
next(x) { console.log(x); }, | |
error(err) { console.error(err); }, | |
complete() { console.log('done')} | |
}); | |
/** | |
* uncomment to try out unsubscription | |
*/ | |
setTimeout(unsub, 500); | |
// with safe Observable... | |
console.clear(); | |
/** | |
* A contrived data source to use in our "observable" | |
* NOTE: this will clearly never error | |
*/ | |
class DataSource { | |
constructor() { | |
let i = 0; | |
this._id = setInterval(() => this.emit(i++), 200); | |
} | |
emit(n) { | |
const limit = 10; | |
if (this.ondata) { | |
this.ondata(n); | |
} | |
if (n === limit) { | |
if (this.oncomplete) { | |
this.oncomplete(); | |
} | |
this.destroy(); | |
} | |
} | |
destroy() { | |
clearInterval(this._id); | |
} | |
} | |
/** | |
* Safe Observer | |
*/ | |
class SafeObserver { | |
constructor(destination) { | |
this.destination = destination; | |
} | |
next(value) { | |
// only try to next if you're subscribed have a handler | |
if (!this.isUnsubscribed && this.destination.next) { | |
try { | |
this.destination.next(value); | |
} catch (err) { | |
// if the provided handler errors, teardown resources, then throw | |
this.unsubscribe(); | |
throw err; | |
} | |
} | |
} | |
error(err) { | |
// only try to emit error if you're subscribed and have a handler | |
if (!this.isUnsubscribed && this.destination.error) { | |
try { | |
this.destination.error(err); | |
} catch (e2) { | |
// if the provided handler errors, teardown resources, then throw | |
this.unsubscribe(); | |
throw e2; | |
} | |
this.unsubscribe(); | |
} | |
} | |
complete() { | |
// only try to emit completion if you're subscribed and have a handler | |
if (!this.isUnsubscribed && this.destination.complete) { | |
try { | |
this.destination.complete(); | |
} catch (err) { | |
// if the provided handler errors, teardown resources, then throw | |
this.unsubscribe(); | |
throw err; | |
} | |
this.unsubscribe(); | |
} | |
} | |
unsubscribe() { | |
this.isUnsubscribed = true; | |
if (this.unsub) { | |
this.unsub(); | |
} | |
} | |
} | |
class Observable { | |
constructor(_subscribe) { | |
this._subscribe = _subscribe; | |
} | |
subscribe(observer) { | |
const safeObserver = new SafeObserver(observer); | |
this._subscribe(safeObserver); | |
return safeObserver.unsubscribe.bind(safeObserver); | |
} | |
} | |
/** | |
* our observable | |
*/ | |
const myObservable = new Observable((observer) => { | |
const safeObserver = new SafeObserver(observer); | |
const datasource = new DataSource(); | |
datasource.ondata = (e) => safeObserver.next(e); | |
datasource.onerror = (err) => safeObserver.error(err); | |
datasource.oncomplete = () => safeObserver.complete(); | |
safeObserver.unsub = () => { | |
datasource.destroy(); | |
}; | |
return safeObserver.unsubscribe.bind(safeObserver); | |
}) | |
/** | |
* now let's use it | |
*/ | |
const unsub = myObservable.subscribe({ | |
next(x) { console.log(x); }, | |
error(err) { console.error(err); }, | |
complete() { console.log('done')} | |
}); | |
// contrieved example with a map operator | |
/** | |
* map operator | |
*/ | |
function map(source, project) { | |
return new Observable((observer) => { | |
const mapObserver = { | |
next: (x) => observer.next(project(x)), | |
error: (err) => observer.error(err), | |
complete: () => observer.complete() | |
}; | |
return source.subscribe(mapObserver); | |
}); | |
} | |
const unsubMap = map(myObservable, (x) => x + x).subscribe({ | |
next(x) { console.log(x); }, | |
error(err) { console.error(err); }, | |
complete() { console.log('done')} | |
}); | |
/** | |
* uncomment to try out unsubscription | |
*/ | |
// setTimeout(unsub, 500); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Naive but similaly apporached functional observables | |
const observable = type => target => observe => { | |
target.addEventListener(type, observe, true); | |
return f => f(observe, target, type); | |
}; | |
const unsubscribe = (observe, target, type) => | |
target.removeEventListener(type, observe, true); | |
const complete = (observe, target, type) => {}; | |
const error = (observe, target, type) => {}; | |
const subscribe = observable => observe => observable(observe); | |
const map = f => observable => observe => { | |
return subscribe(observable) (x => observe(f(x))); | |
} | |
const comp = f => g => x => f(g(x)); | |
const log = prefix => x => console.log(prefix, x); | |
const observableClick = observable("click") (document); | |
const clickObserver = map(e => e.screenX + e.screenY) | |
(observableClick) (log("clickObserver #1")); | |
const clickObserver2 = comp( | |
map(x => x+ "!")) (map(x => x.screenX + x.screenY) | |
) (observableClick) (log("clickObserver #2")); | |
clickObserver(unsubscribe); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Best implementation so far | |
// const arr = [10, 20, 40, 50, 60, 70] | |
// console.log('before'); | |
// arr.forEach((x) => console.log(x)); | |
// console.log('end'); | |
// first iteration thinking process. just three call backs. | |
// function nextCallBack(data) { | |
// console.log(data); | |
// } | |
// function errorCallback(err) { | |
// console.error(err); | |
// } | |
// function completeCallback() { | |
// console.log('done'); | |
// } | |
// function giveMeSomeData(obj) { | |
// [10, 20, 40, 50, 60, 70].forEach(obj.next); | |
// } | |
// giveMeSomeData( | |
// nextCallBack, | |
// errorCallback, | |
// completeCallback | |
// ) | |
// const arr = [10, 20, 40, 50, 60, 70] | |
// console.log('before'); | |
// arr.forEach((x) => console.log(x)); | |
// console.log('end'); | |
// Second iteration Turning it to an object | |
// let observer = { | |
// next: function nextCallBack(data) { | |
// console.log(data); | |
// }, | |
// error: function errorCallback(err) { | |
// console.error(err); | |
// }, | |
// complete: function completeCallback() { | |
// console.log('done'); | |
// } | |
// } | |
// function subscribe(obj) { | |
// [10, 20, 40, 50, 60, 70].forEach(obj.next); | |
// obj.complete(); | |
// } | |
// Second iteration with the subscribe | |
// subscribe(observer); | |
// Third iteration turning it to an observable | |
const observer = { | |
next: function nextCallBack(data) { | |
console.log(data); | |
}, | |
error: function errorCallback(err) { | |
console.error(err); | |
}, | |
complete: function completeCallback() { | |
console.log('done'); | |
} | |
} | |
function debounce(period) { | |
const inputObservable = this; | |
const outPutObservable = createObservable(function subscribe(outputObserver){ | |
inputObservable.subscribe({ | |
next: function(x) { | |
setTimeout(() => outputObserver.next(x), period) | |
}, | |
error: outputObserver.error, | |
complete: outputObserver.complete | |
}) | |
}); | |
return outPutObservable; | |
} | |
function map(transformFn) { | |
const inputObservable = this; | |
const outPutObservable = createObservable(function subscribe(outputObserver){ | |
inputObservable.subscribe({ | |
next: function(x) { | |
const y = transformFn(x); | |
outputObserver.next(y); | |
}, | |
error: outputObserver.error, | |
complete: outputObserver.complete | |
}) | |
}); | |
return outPutObservable; | |
} | |
function filter(transformFn) { | |
const inputObservable = this; | |
const outPutObservable = createObservable(function subscribe(outputObserver){ | |
inputObservable.subscribe({ | |
next: function(x) { | |
if(transformFn(x)) outputObserver.next(x); | |
}, | |
error: outputObserver.error, | |
complete: outputObserver.complete | |
}) | |
}); | |
return outPutObservable; | |
} | |
function createObservable(subscribe) { | |
return { | |
subscribe: subscribe, | |
map: map, | |
filter: filter, | |
debounce: debounce | |
} | |
} | |
const arrayObservable = createObservable(function subscribe(obj) { | |
[10, 20, 40, 50, 60, 70].forEach(obj.next); | |
obj.complete()}); | |
const clickObservable = createObservable(function subscribe(obj){ | |
document.addEventListener('click', obj.next); | |
}); | |
// Second iteration with the subscribe | |
arrayObservable.map(x => x/10).filter(x => x % 2 === 0 ).subscribe(observer); | |
clickObservable.map(e => e.clientX).debounce(200).subscribe(observer); | |
// Key take aways.. This is just higher order functions that keeps decorating the subscribe Key | |
// Using plain objects is a little bit kind of coninceited though than using pure functions. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment