Last active
March 1, 2021 05:53
-
-
Save yelouafi/a79d74e16ee731b6ee76 to your computer and use it in GitHub Desktop.
Observables with pure FP
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Observable is an Union Type, with the following variants | |
const Empty = () => ['EMPTY'] | |
const Cons = (head, tail) => ['CONS', head, tail] | |
const Future = promise => ['FUTURE', promise] | |
// race between 2 promises; each promise will resolve to a lazy value | |
const lazyRace = (p1, p2) => Promise.race([p1,p2]).then(lazy => lazy()) | |
// function composition | |
const compose = (...fns) => (arg) => fns.reduceRight((res, f) => f(res), arg) | |
// Pattern matching helper: W'll use it to match an Observable instance | |
const match = cases => ([type, ...args]) => { | |
const handler = cases[type] || cases._ | |
if(!handler) | |
throw 'Unkown case ' + type | |
return handler(...args) | |
} | |
// f: any → any | |
// map: f → Observable → Observable | |
const map = fn => | |
match({ | |
EMPTY : Empty, | |
CONS : (head, tail) => Cons( fn(head), map(fn)(tail) ), | |
FUTURE : (promise) => Future( promise.then(map(fn)) ) | |
}) | |
// p: any → Boolean | |
// filter: p → Observable → Observable | |
const filter = predicate => | |
match({ | |
EMPTY : Empty, | |
CONS : (head, tail) => ( | |
predicate(head) | |
? Cons(head, filter(predicate)(tail) ) | |
: filter(predicate)(tail) | |
), | |
FUTURE : (promise) => Future( promise.then(filter(predicate)) ) | |
}) | |
// first : Observable → Promise | |
const first = match({ | |
EMPTY : () => Promise.reject('Empty Observable'), | |
CONS : (head) => Promise.resolve(head), | |
FUTURE : (promise) => promise.then(first) | |
}) | |
// takeUntil: Promise → Observable → Observable | |
const takeUntil = untilP => | |
match({ | |
EMPTY : Empty, | |
CONS : (head, tail) => Cons(head, takeUntil(untilP)(tail)), | |
FUTURE : (promise) => Future( | |
lazyRace( | |
untilP.then(() => Empty), | |
promise.then(o => () => takeUntil(untilP)(o)) | |
) | |
) | |
}) | |
// skipUntil: Promise → Observable → Observable | |
const skipUntil = untilP => o => | |
match({ | |
EMPTY : Empty, | |
CONS : (_, tail) => skipUntil(untilP)(tail), | |
FUTURE : (promise) => Future( | |
lazyRace( | |
untilP.then(() => o), | |
promise.then(ro => () => skipUntil(untilP)(ro)) | |
) | |
) | |
})(o) | |
// concat: (Observable, Observable) → Observable | |
const concat = (o1, o2) => | |
match({ | |
EMPTY : () => o2, | |
CONS : (head, tail) => Cons(head, concat(tail, o2)), | |
FUTURE : (promise) => Future( promise.then(ro1 => concat(ro1, o2)) ) | |
})(o1) | |
// merge: (Observable, Observable) → Observable | |
const merge = (o1, o2) => | |
match({ | |
EMPTY : () => o2, | |
CONS : (head, tail) => Cons(head, merge(tail, o2)), | |
FUTURE : (promise1) => ( | |
match({ | |
FUTURE: promise2 => Future( | |
lazyRace( | |
promise1.then(ro1 => () => merge(ro1, o2)), | |
promise2.then(ro2 => () => merge(ro2, o1)) | |
) | |
), | |
_ : () => merge(o2, o1) | |
})(o2) | |
) | |
})(o1) | |
// relay: (Observable, Observable) → Observable | |
const relay = (o1, o2) => | |
match({ | |
EMPTY : () => o1, | |
_ : () => concat( takeUntil(first(o2))(o1), o2 ) | |
})(o2) | |
// f : (Observable, Observable) → Observable | |
// flattenBy: f → Observable<Observable> → Observable | |
const flattenBy = fn => | |
match({ | |
EMPTY : Empty, | |
CONS : (head, tail) => fn(head, flattenBy(fn)(tail)), | |
FUTURE : (promise) => Future(promise.then(flattenBy(fn))) | |
}) | |
// *: Observable<Observable> → Observable | |
const mergeAll = flattenBy(merge) | |
const concatAll = flattenBy(concat) | |
const relayall = flattenBy(relay) | |
// f: any → Observable | |
// *: f → Observable → Observable | |
const concatMap = fn => compose(flattenBy(concat), map(fn)) | |
const flatMap = fn => compose(flattenBy(merge), map(fn)) | |
const flatMapLatest = fn => compose(flattenBy(relay), map(fn)) | |
// onNext: any → () | |
// onDone: () → () | |
const forEach = (onNext, onDone) => | |
match({ | |
EMPTY : onDone, | |
CONS : (head, tail) => { | |
onNext(head) | |
forEach(onNext, onDone)(tail) | |
}, | |
FUTURE : (promise) => promise.then(forEach(onNext, onDone)) | |
}) | |
// log: String → Observable → () | |
const log = prefix => | |
forEach( | |
console.log.bind(console), | |
console.log.bind(console, 'done') | |
) | |
// fromArray: Array<a> → Observable<a> | |
const fromArray = arr => | |
(!arr.length) | |
? Empty() | |
: Cons(arr[0], fromArray(arr.slice(1))) | |
// sequence: (Array<a>, Number) → Observable<a> | |
const sequence = (arr, ms) => | |
(!arr.length) | |
? Empty() | |
: Future(new Promise(resolve => | |
setTimeout(() => resolve( | |
Cons(arr[0], sequence(arr.slice(1), ms)) | |
), ms) | |
)) | |
// range: (Number, Number, Number) → Observable<Number> | |
const range = (min, max, ms) => | |
(min > max) | |
? Empty() | |
: Future(new Promise(resolve => | |
setTimeout(() => resolve( | |
Cons(min, range(min + 1, max, ms)) | |
), ms) | |
)) | |
// nextEvent: (EventTarget, String) → Promise<anEvent> | |
const nextEvent = (elm, eventType) => { | |
return new Promise(resolve => { | |
const cb = e => { | |
resolve(e) | |
elm.removeEventListener(eventType, cb) | |
} | |
elm.addEventListener(eventType, cb) | |
}) | |
} | |
// fromEvent: (EventTarget, String) → Observable<anEvent> | |
const fromEvent = (elm, eventType) => | |
Future(nextEvent(elm, eventType) | |
.then(e => Cons(e, fromEvent(elm, eventType)))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment