Last active
November 3, 2017 13:19
-
-
Save freddi301/463710333d065ae26a9917fc76c24d46 to your computer and use it in GitHub Desktop.
Observables (alternative)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// alternative observables; knowledge of an Observables implementation (e.g. RxJS) is required
// this is fully type-annotatable (TypeScript, Flow)
// An observer is a function that takes an item from the stream | |
// and must return a function that will be called with the next item in the stream | |
// this way stateful observers can maintain state in closures, and every instance of them is replayable.
// observers should not have side effects | |
// type Observer<T> = (item: T) => Observer<T> | |
// example: | |
// logCount :: number => Observer<T>
// const logCount = step => item => { | |
// console.log(step, item); | |
// return logCount(step + 1); | |
// }; | |
// An observable is a function that takes an observer | |
// and calls it, retaining state when necessary | |
// type Observable<T> = (observer: Observer<T>) => Observer<T> | |
// example: | |
// from1to3 :: Observable<number> | |
const from1to3 = observer => observer(1)(2)(3); | |
// fromArray :: Array<T> => Observable<T> | |
const fromArray = array => observer => | |
array.reduce((memo, item) => memo(item), observer); | |
const mockObserver = expected => item => { | |
expect(item).toEqual(expected[0]); | |
return mockObserver(expected.slice(1)); | |
}; | |
// subscribing to an observable is a simple function call | |
// fromArray([1,2,3])(logCount(0)) | |
// a very useful operator over observables is map | |
// that takes an observable A and a function F
// and returns a new observable B whose items are the return values of F
// map :: (A => B) => Observable<A> => Observable<B> | |
const map = fun => observable => observer => { | |
const callout = observer => item => callout(observer(fun(item))); | |
return observable(callout(observer)); | |
}; | |
//map(x=>x*100)(fromArray([1,2,3]))(logCount(0)) | |
//map(x=>x/2)(map(x=>x*100)(fromArray([1,2,3])))(logCount(0)) | |
// works well with composition | |
const pipe = (...args) => arg => args.reduce((memo, item) => item(memo), arg); | |
// pipe(map(x=>x*100), map(x=>x/2))(fromArray([1,2,3]))(logCount(0)) | |
// filter :: (A => boolean) => Observable<A> => Observable<A> | |
const filter = predicate => observable => observer => { | |
const callout = observer => item => | |
predicate(item) ? callout(observer(item)) : callout(observer); | |
return observable(callout(observer)); | |
}; | |
// const isEven = n => n % 2 === 0; | |
// pipe(filter(isEven), map(x=>x*2))(fromArray([1,2,3,4]))(logCount(0)) | |
// flatMap :: (A => Observable<B>) => Observable<A> => Observable<B> | |
const flatMap = fun => observable => observer => { | |
const callout = observer => item => callout(fun(item)(observer)); | |
return observable(callout(observer)); | |
}; | |
// flatMap(n => fromArray([n, n]))(fromArray([1,2,3]))(logCount(0)) | |
// scan :: (M => A => M) => M => Observable<A> => Observable<M> | |
// the reducer cannot maintain internal state by returning next action, | |
// instead it receives previous result as first argument and the actual item as second | |
const scan = reducer => memo => observable => observer => { | |
const callout = observer => memo => item => { | |
const next = reducer(memo)(item); | |
return callout(observer(next))(next); | |
}; | |
return observable(callout(observer)(memo)); | |
}; | |
const sum = m => x => m + x; | |
// scan(sum)(0)(fromArray([10,20,30]))(logCount(0)) | |
// memoize :: Observable<T> => Observable<T> | |
const memoize = observable => { | |
const cache = []; | |
const callout = item => { | |
cache.push(item); | |
return callout; | |
}; | |
observable(callout); | |
return observer => cache.reduce((memo, item) => memo(item), observer); | |
}; | |
// const rangeAndLog = observer => { | |
// ["a", "b", "c"].reduce((memo, item) => { | |
// console.log(item); | |
// return memo(item); | |
// }, observer); | |
// }; | |
//const memoized = memoize(rangeAndLog); | |
// map(x=>x+" - cold")(rangeAndLog)(logCount(0)); | |
// map(x=>x+" - memoized")(memoized)(logCount(0)); | |
// fluent is a decorator that enables fluent usage of the api | |
const fluent = observable => { | |
const decorate = operator => arg => fluent(operator(arg)(observable)); | |
return { | |
observable, | |
map: decorate(map), | |
filter: decorate(filter), | |
flatMap: decorate(flatMap), | |
scan: r => m => fluent(scan(r)(m)(observable)), | |
memoize: decorate(memoize) | |
}; | |
}; | |
// fluent(fromArray([1,2,3,4])).subscribe(logCount) | |
// fluent(fromArray([7, 8, 9])) | |
// .map(x => x + 1) | |
// .filter(isEven) | |
// .flatMap(x => fromArray([x, x + 1, x * 2])) | |
// .memoize() | |
// .scan(sum)(-74); | |
//.observable(logCount(0)) | |
/** Returns its argument unchanged. */
const identity = x => {
  return x;
};
// once :: T => Observable<T> | |
const once = item => observer => observer(item); | |
const applyTo = arg => fun => fun(arg); | |
// creates a "hot" stateful observer (observers receive items from after subscription) | |
const hot = (operator = identity) => { | |
let observers = []; | |
const decorate = operator(identity); | |
const publish = item => { | |
observers = observers.map(applyTo(item)); | |
return publish; | |
}; | |
const subscribe = observer => { | |
observers.push(decorate(observer)); | |
}; | |
return { publish, subscribe }; | |
}; | |
// creates a "cold" stateful observer (observers also receive the items published before subscription)
const cold = (operator = identity) => { | |
let observers = []; | |
const items = []; | |
const decorate = operator(identity); | |
const publish = item => { | |
items.push(item); | |
observers = observers.map(applyTo(item)); | |
return publish; | |
}; | |
const subscribe = observer => { | |
observers.push(fromArray(items)(decorate(observer))); | |
}; | |
return { publish, subscribe }; | |
}; | |
module.exports = { | |
fromArray, | |
pipe, | |
fluent, | |
hot, | |
cold, | |
once, | |
identity, | |
map, | |
filter, | |
flatMap, | |
scan, | |
memoize | |
}; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { fromArray, hot, map, cold, flatMap, identity, fluent } = require("./"); | |
const mockObserver = expectedItems => { | |
let index = 0; | |
const mock = { | |
complete: expectedItems.length === 0, | |
observer: item => { | |
if (index >= expectedItems.length) | |
throw new Error("too much invocations"); | |
expect(item).toEqual(expectedItems[index]); | |
if (index === expectedItems.length - 1) { | |
mock.complete = true; | |
index++ | |
return mock.observer; | |
} else { | |
index++; | |
return mock.observer; | |
} | |
} | |
}; | |
return mock; | |
}; | |
// fromArray must deliver every element, in order, to the observer.
test("fromArray", () => {
  const items = [1, 2, 3];
  const mock = mockObserver(items);
  fromArray(items)(mock.observer);
  expect(mock.complete).toBe(true);
});
describe("hot", () => {
  // a hot subject only delivers items published after subscription,
  // transformed by the operator it was created with
  test("map", () => {
    const pair = x => [x, x];
    const hotObservable = hot(map(pair));
    // published before anyone subscribed: must not reach the observer
    hotObservable.publish("event1");
    const mock = mockObserver([pair("event2"), pair("event3")]);
    hotObservable.subscribe(mock.observer);
    ["event2", "event3"].forEach(event => {
      hotObservable.publish(event);
    });
    expect(mock.complete).toBe(true);
  });
});
describe("cold", () => {
  // a cold subject replays earlier items to a late subscriber
  test("flatMap", () => {
    const twice = x => fromArray([x, x]);
    const coldObservable = cold(flatMap(twice));
    coldObservable.publish("event1");
    const expected = ["event1", "event1", "event2", "event2"];
    const mock = mockObserver(expected);
    coldObservable.subscribe(mock.observer);
    coldObservable.publish("event2");
    expect(mock.complete).toBe(true);
  });
});
// subjects can be wired together because `publish` is itself an observer:
// subject fans out to adder and doubler, which both feed result
test("connecting stateful", () => {
  const subject = cold();
  const adder = cold(map(x => x + 1));
  const doubler = cold(map(x => x * 2));
  const branches = [adder, doubler];
  branches.forEach(branch => {
    subject.subscribe(branch.publish);
  });
  const result = cold();
  branches.forEach(branch => {
    branch.subscribe(result.publish);
  });
  const mock = mockObserver([2, 2, 3, 4, 4, 6]);
  result.subscribe(mock.observer);
  fromArray([1, 2, 3])(subject.publish);
  expect(mock.complete).toBe(true);
});
// the fluent wrapper chains map -> filter -> flatMap -> scan
test("fluent", () => {
  const increment = x => x + 1;
  const isEven = x => x % 2 === 0;
  const expand = x => fromArray([x, x + 1, x * 2]);
  const add = m => x => m + x;
  const pipeline = fluent(fromArray([7, 8, 9]))
    .map(increment)
    .filter(isEven)
    .flatMap(expand)
    .scan(add)(-74);
  const observable = pipeline.observable;
  const mock = mockObserver([-66, -57, -41, -31, -20, 0]);
  observable(mock.observer);
  expect(mock.complete).toBe(true);
});
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// alternative observables; knowledge of an Observables implementation (e.g. RxJS) is required
// these examples are fully type-annotatable (TypeScript, Flow)
// An observer is a function that takes an item from the stream
// and must return a function that will be called with the next item in the stream
// this way stateful observers can maintain state in closures, and every instance of them is replayable.
// observers should not have side effects | |
// type Observer<T> = (item: T) => Observer<T> | |
// example: | |
// logCount :: number => Observer<T>
/**
 * Example observer: logs a running counter together with each item, and
 * returns the observer for the next item with the counter incremented.
 */
const logCount = step => item => {
  console.log(step, item);
  const nextStep = step + 1;
  return logCount(nextStep);
};
// An observable is a function that takes an observer | |
// and calls it, retaining state when necessary | |
// type Observable<T> = (observer: Observer<T>) => Observer<T> | |
// example: | |
// from1to3 :: Observable<number> | |
const from1to3 = observer => observer(1)(2)(3) | |
// fromArray :: Array<T> => Observable<T> | |
const fromArray = array => observer => array.reduce((memo, item) => memo(item), observer) | |
// subscribing to an observable is a simple function call | |
//fromArray([1,2,3])(logCount(0)) | |
// a very useful operator over observables is map | |
// that takes an observable A and a function F
// and returns a new observable B whose items are the return values of F
// map :: (A => B) => Observable<A> => Observable<B> | |
const map = fun => observable => observer => { | |
const callout = observer => item => callout(observer(fun(item))); | |
return observable(callout(observer)); | |
} | |
//map(x=>x*100)(fromArray([1,2,3]))(logCount(0)) | |
//map(x=>x/2)(map(x=>x*100)(fromArray([1,2,3])))(logCount(0)) | |
// works well with composition | |
const pipe = (...args) => arg => args.reduce((memo, item)=> item(memo), arg) | |
//pipe(map(x=>x*100), map(x=>x/2))(fromArray([1,2,3]))(logCount(0)) | |
// filter :: (A => boolean) => Observable<A> => Observable<A> | |
const filter = predicate => observable => observer => { | |
const callout = observer => item => predicate(item) ? callout(observer(item)) : callout(observer); | |
return observable(callout(observer)); | |
} | |
/** True when `n` leaves no remainder when divided by 2. */
const isEven = n => {
  const remainder = n % 2;
  return remainder === 0;
};
//pipe(filter(isEven), map(x=>x*2))(fromArray([1,2,3,4]))(logCount(0)) | |
// flatMap :: (A => Observable<B>) => Observable<A> => Observable<B> | |
const flatMap = fun => observable => observer => { | |
const callout = observer => item => callout(fun(item)(observer)); | |
return observable(callout(observer)); | |
} | |
//flatMap(n => fromArray([n, n]))(fromArray([1,2,3]))(logCount(0)) | |
// scan :: (M => A => M) => M => Observable<A> => Observable<M> | |
// the reducer cannot maintain internal state by returning next action, | |
// instead it receives previous result as first argument and the actual item as second | |
const scan = reducer => memo => observable => observer => { | |
const callout = observer => memo => item => { | |
const next = reducer(memo)(item) | |
return callout(observer(next))(next) | |
} | |
return observable(callout(observer)(memo)) | |
} | |
/** Curried addition usable as a `scan` reducer: sum(m)(x) === m + x. */
const sum = m => x => {
  return m + x;
};
// scan(sum)(0)(fromArray([10,20,30]))(logCount(0)) | |
// memoize :: Observable<T> => Observable<T> | |
const memoize = observable => { | |
const cache = []; | |
const callout = item => { cache.push(item); return callout; }; | |
observable(callout); | |
return observer => cache.reduce((memo, item) => memo(item), observer); | |
} | |
const rangeAndLog = observer => { ["a", "b", "c"].reduce((memo, item) => { console.log(item); return memo(item); }, observer); } | |
//const memoized = memoize(rangeAndLog); | |
// map(x=>x+" - cold")(rangeAndLog)(logCount(0)); | |
// map(x=>x+" - memoized")(memoized)(logCount(0)); | |
// fluent is a decorator that enables fluent usage of the api | |
const fluent = observable => ({ | |
observable, | |
map: f => fluent(map(f)(observable)), | |
filter: p => fluent(filter(p)(observable)), | |
flatMap: f => fluent(flatMap(f)(observable)), | |
scan: r => m => fluent(scan(r)(m)(observable)), | |
memoize: () => fluent(memoize(observable)) | |
}) | |
//fluent(fromArray([1,2,3,4])).subscribe(logCount) | |
// Demo pipeline. It is only built here, never subscribed; note that the
// memoize step runs everything before it right away.
// Append `.observable(logCount(0))` to the last expression to log the output.
{
  const incremented = fluent(fromArray([7, 8, 9])).map(x => x + 1);
  const evens = incremented.filter(isEven);
  const expanded = evens.flatMap(x => fromArray([x, x + 1, x * 2]));
  expanded.memoize().scan(sum)(-74);
}
/** Returns its argument unchanged. */
const identity = x => {
  return x;
};
// once :: T => Observable<T> | |
const once = item => observer => observer(item) | |
// creates a "hot" stateful observer (observers receive items from after subscription) | |
const hot = (operator = map(identity)) => { | |
let observers = [] | |
return { | |
publish(item) { observers = observers.map(observer => observer(item)) }, | |
subscribe(observer) { observers.push(operator(identity)(observer)) } | |
} | |
} | |
// sample hot subject whose observers receive every event as a pair [event, event]
const hotObservable = hot(map(event => [event, event]));
/*hotObservable.publish("event1") | |
hotObservable.subscribe(logCount(0)) | |
hotObservable.publish("event2") | |
hotObservable.publish("event3")*/ | |
// creates a "cold" stateful observer (observers also receive the items published before subscription)
const cold = (operator = map(identity)) => { | |
let observers = [] | |
const items = [] | |
return { | |
publish(item) { | |
items.push(item) | |
observers = observers.map(observer => observer(item)) | |
}, | |
subscribe(observer) { observers.push(fromArray(items)(operator(identity)(observer))) } | |
} | |
} | |
// sample cold subject whose observers receive every event twice
const coldObservable = cold(flatMap(event => fromArray([event, event])));
/*coldObservable.publish("event1") | |
coldObservable.subscribe(logCount(0)) | |
coldObservable.publish("event2")*/ |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"scripts": { | |
"test": "jest" | |
}, | |
"devDependencies": { | |
"jest": "^21.2.1" | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment