Skip to content

Instantly share code, notes, and snippets.

@freddi301
Last active November 3, 2017 13:19
Show Gist options
  • Save freddi301/463710333d065ae26a9917fc76c24d46 to your computer and use it in GitHub Desktop.
Save freddi301/463710333d065ae26a9917fc76c24d46 to your computer and use it in GitHub Desktop.
Observables (alternative)
// alternative observables; knowledge of an Observables implementation is required (e.g. RxJS)
// this is fully type-annotatable (TypeScript, Flow)
// An observer is a function that takes an item from the stream
// and must return a function that will be called with the next item in the stream
// this way stateful observers can maintain state in closures and every instance of them is replayable.
// observers should not have side effects
// type Observer<T> = (item: T) => Observer<T>
// example:
// logCount :: number => Observer<T>
// const logCount = step => item => {
// console.log(step, item);
// return logCount(step + 1);
// };
// An observable is a function that takes an observer
// and calls it, retaining state when necessary
// type Observable<T> = (observer: Observer<T>) => Observer<T>
// example:
// from1to3 :: Observable<number>
const from1to3 = observer => observer(1)(2)(3);
// fromArray :: Array<T> => Observable<T>
const fromArray = array => observer =>
array.reduce((memo, item) => memo(item), observer);
const mockObserver = expected => item => {
expect(item).toEqual(expected[0]);
return mockObserver(expected.slice(1));
};
// subscribing to an observable is a simple function call
// fromArray([1,2,3])(logCount(0))
// a very useful operator over observables is map
// that takes an observable A and a function F
// and returns a new observable B whose items are the return values of F
// map :: (A => B) => Observable<A> => Observable<B>
const map = fun => observable => observer => {
const callout = observer => item => callout(observer(fun(item)));
return observable(callout(observer));
};
//map(x=>x*100)(fromArray([1,2,3]))(logCount(0))
//map(x=>x/2)(map(x=>x*100)(fromArray([1,2,3])))(logCount(0))
// works well with composition
const pipe = (...args) => arg => args.reduce((memo, item) => item(memo), arg);
// pipe(map(x=>x*100), map(x=>x/2))(fromArray([1,2,3]))(logCount(0))
// filter :: (A => boolean) => Observable<A> => Observable<A>
const filter = predicate => observable => observer => {
const callout = observer => item =>
predicate(item) ? callout(observer(item)) : callout(observer);
return observable(callout(observer));
};
// const isEven = n => n % 2 === 0;
// pipe(filter(isEven), map(x=>x*2))(fromArray([1,2,3,4]))(logCount(0))
// flatMap :: (A => Observable<B>) => Observable<A> => Observable<B>
const flatMap = fun => observable => observer => {
const callout = observer => item => callout(fun(item)(observer));
return observable(callout(observer));
};
// flatMap(n => fromArray([n, n]))(fromArray([1,2,3]))(logCount(0))
// scan :: (M => A => M) => M => Observable<A> => Observable<M>
// the reducer cannot maintain internal state by returning next action,
// instead it receives previous result as first argument and the actual item as second
const scan = reducer => memo => observable => observer => {
const callout = observer => memo => item => {
const next = reducer(memo)(item);
return callout(observer(next))(next);
};
return observable(callout(observer)(memo));
};
const sum = m => x => m + x;
// scan(sum)(0)(fromArray([10,20,30]))(logCount(0))
// memoize :: Observable<T> => Observable<T>
const memoize = observable => {
const cache = [];
const callout = item => {
cache.push(item);
return callout;
};
observable(callout);
return observer => cache.reduce((memo, item) => memo(item), observer);
};
// const rangeAndLog = observer => {
// ["a", "b", "c"].reduce((memo, item) => {
// console.log(item);
// return memo(item);
// }, observer);
// };
//const memoized = memoize(rangeAndLog);
// map(x=>x+" - cold")(rangeAndLog)(logCount(0));
// map(x=>x+" - memoized")(memoized)(logCount(0));
// fluent is a decorator that enables fluent usage of the api
const fluent = observable => {
const decorate = operator => arg => fluent(operator(arg)(observable));
return {
observable,
map: decorate(map),
filter: decorate(filter),
flatMap: decorate(flatMap),
scan: r => m => fluent(scan(r)(m)(observable)),
memoize: decorate(memoize)
};
};
// fluent(fromArray([1,2,3,4])).subscribe(logCount)
// fluent(fromArray([7, 8, 9]))
// .map(x => x + 1)
// .filter(isEven)
// .flatMap(x => fromArray([x, x + 1, x * 2]))
// .memoize()
// .scan(sum)(-74);
//.observable(logCount(0))
const identity = x => x;
// once :: T => Observable<T>
const once = item => observer => observer(item);
const applyTo = arg => fun => fun(arg);
// creates a "hot" stateful observer (observers only receive items published after they subscribe)
const hot = (operator = identity) => {
let observers = [];
const decorate = operator(identity);
const publish = item => {
observers = observers.map(applyTo(item));
return publish;
};
const subscribe = observer => {
observers.push(decorate(observer));
};
return { publish, subscribe };
};
// creates a "cold" stateful observer (observers also receive the items published before they subscribed)
const cold = (operator = identity) => {
let observers = [];
const items = [];
const decorate = operator(identity);
const publish = item => {
items.push(item);
observers = observers.map(applyTo(item));
return publish;
};
const subscribe = observer => {
observers.push(fromArray(items)(decorate(observer)));
};
return { publish, subscribe };
};
// Public API of the module (CommonJS export object).
// from1to3, mockObserver, sum and applyTo are not part of this list.
module.exports = {
fromArray,
pipe,
fluent,
hot,
cold,
once,
identity,
map,
filter,
flatMap,
scan,
memoize
};
const { fromArray, hot, map, cold, flatMap, identity, fluent } = require("./");
const mockObserver = expectedItems => {
let index = 0;
const mock = {
complete: expectedItems.length === 0,
observer: item => {
if (index >= expectedItems.length)
throw new Error("too much invocations");
expect(item).toEqual(expectedItems[index]);
if (index === expectedItems.length - 1) {
mock.complete = true;
index++
return mock.observer;
} else {
index++;
return mock.observer;
}
}
};
return mock;
};
// fromArray must deliver every element, in order, to the subscribed observer.
test("fromArray", () => {
  const probe = mockObserver([1, 2, 3]);
  const source = fromArray([1, 2, 3]);
  source(probe.observer);
  expect(probe.complete).toBe(true);
});
describe("hot", () => {
  // A hot subject only forwards items published after subscription, and the
  // operator passed to hot() is applied to each forwarded item.
  test("map", () => {
    const duplicate = x => [x, x];
    const subject = hot(map(duplicate));
    // Published before anyone subscribed: must not reach the observer.
    subject.publish("event1");
    const probe = mockObserver([
      ["event2", "event2"],
      ["event3", "event3"]
    ]);
    subject.subscribe(probe.observer);
    ["event2", "event3"].forEach(event => {
      subject.publish(event);
    });
    expect(probe.complete).toBe(true);
  });
});
describe("cold", () => {
  // A cold subject replays earlier items to a new subscriber; with flatMap
  // every published item is expanded into two emissions.
  test("flatMap", () => {
    const twice = x => fromArray([x, x]);
    const subject = cold(flatMap(twice));
    // Published before subscription: must be replayed on subscribe.
    subject.publish("event1");
    const probe = mockObserver(["event1", "event1", "event2", "event2"]);
    subject.subscribe(probe.observer);
    subject.publish("event2");
    expect(probe.complete).toBe(true);
  });
});
// Wires subjects together by using one subject's publish as an observer of
// another: source -> (increment, double) -> sink.
test("connecting stateful", () => {
  const source = cold();
  const increment = cold(map(x => x + 1));
  const double = cold(map(x => x * 2));
  const stages = [increment, double];
  stages.forEach(stage => {
    source.subscribe(stage.publish);
  });
  const sink = cold();
  stages.forEach(stage => {
    stage.subscribe(sink.publish);
  });
  // For every source item the sink sees x + 1 first, then x * 2.
  const probe = mockObserver([2, 2, 3, 4, 4, 6]);
  sink.subscribe(probe.observer);
  fromArray([1, 2, 3])(source.publish);
  expect(probe.complete).toBe(true);
});
// Chains every non-memoizing operator through the fluent wrapper.
// 7,8,9 -> +1 -> 8,9,10 -> even -> 8,10 -> expand -> 8,9,16,10,11,20
// -> running sum starting at -74.
test("fluent", () => {
  const isEven = x => x % 2 === 0;
  const add = m => x => m + x;
  const expand = x => fromArray([x, x + 1, x * 2]);
  const pipeline = fluent(fromArray([7, 8, 9]))
    .map(x => x + 1)
    .filter(isEven)
    .flatMap(expand)
    .scan(add)(-74);
  const probe = mockObserver([-66, -57, -41, -31, -20, 0]);
  pipeline.observable(probe.observer);
  expect(probe.complete).toBe(true);
});
// alternative observables; knowledge of an Observables implementation is required (e.g. RxJS)
// these examples are fully type-annotatable (TypeScript, Flow)
// An observer is a function that takes an item from the stream
// and must return a function that will be called with the next item in the stream
// this way stateful observers can maintain state in closures and every instance of them is replayable.
// observers should not have side effects
// type Observer<T> = (item: T) => Observer<T>
// example:
// logCount :: number => Observer<T>
const logCount = step => item => { console.log(step, item); return logCount(step + 1); }
// An observable is a function that takes an observer
// and calls it, retaining state when necessary
// type Observable<T> = (observer: Observer<T>) => Observer<T>
// example:
// from1to3 :: Observable<number>
const from1to3 = observer => observer(1)(2)(3)
// fromArray :: Array<T> => Observable<T>
const fromArray = array => observer => array.reduce((memo, item) => memo(item), observer)
// subscribing to an observable is a simple function call
//fromArray([1,2,3])(logCount(0))
// a very useful operator over observables is map
// that takes an observable A and a function F
// and returns a new observable B whose items are the return values of F
// map :: (A => B) => Observable<A> => Observable<B>
const map = fun => observable => observer => {
const callout = observer => item => callout(observer(fun(item)));
return observable(callout(observer));
}
//map(x=>x*100)(fromArray([1,2,3]))(logCount(0))
//map(x=>x/2)(map(x=>x*100)(fromArray([1,2,3])))(logCount(0))
// works well with composition
const pipe = (...args) => arg => args.reduce((memo, item)=> item(memo), arg)
//pipe(map(x=>x*100), map(x=>x/2))(fromArray([1,2,3]))(logCount(0))
// filter :: (A => boolean) => Observable<A> => Observable<A>
const filter = predicate => observable => observer => {
const callout = observer => item => predicate(item) ? callout(observer(item)) : callout(observer);
return observable(callout(observer));
}
const isEven = n => n % 2 === 0
//pipe(filter(isEven), map(x=>x*2))(fromArray([1,2,3,4]))(logCount(0))
// flatMap :: (A => Observable<B>) => Observable<A> => Observable<B>
const flatMap = fun => observable => observer => {
const callout = observer => item => callout(fun(item)(observer));
return observable(callout(observer));
}
//flatMap(n => fromArray([n, n]))(fromArray([1,2,3]))(logCount(0))
// scan :: (M => A => M) => M => Observable<A> => Observable<M>
// the reducer cannot maintain internal state by returning next action,
// instead it receives previous result as first argument and the actual item as second
const scan = reducer => memo => observable => observer => {
const callout = observer => memo => item => {
const next = reducer(memo)(item)
return callout(observer(next))(next)
}
return observable(callout(observer)(memo))
}
const sum = m => x => m + x
// scan(sum)(0)(fromArray([10,20,30]))(logCount(0))
// memoize :: Observable<T> => Observable<T>
const memoize = observable => {
const cache = [];
const callout = item => { cache.push(item); return callout; };
observable(callout);
return observer => cache.reduce((memo, item) => memo(item), observer);
}
const rangeAndLog = observer => { ["a", "b", "c"].reduce((memo, item) => { console.log(item); return memo(item); }, observer); }
//const memoized = memoize(rangeAndLog);
// map(x=>x+" - cold")(rangeAndLog)(logCount(0));
// map(x=>x+" - memoized")(memoized)(logCount(0));
// fluent is a decorator that enables fluent usage of the api
const fluent = observable => ({
observable,
map: f => fluent(map(f)(observable)),
filter: p => fluent(filter(p)(observable)),
flatMap: f => fluent(flatMap(f)(observable)),
scan: r => m => fluent(scan(r)(m)(observable)),
memoize: () => fluent(memoize(observable))
})
//fluent(fromArray([1,2,3,4])).subscribe(logCount)
// Demo of the fluent API: builds a pipeline over [7, 8, 9].
// The resulting wrapper is discarded and no observer is subscribed here
// (the subscription on the following line is commented out).
fluent(fromArray([7,8,9]))
.map(x => x + 1)
.filter(isEven)
.flatMap(x => fromArray([x, x+1, x*2]))
// memoize subscribes to the upstream pipeline immediately to fill its cache
.memoize()
// scan is curried: reducer first, then the initial accumulator
.scan(sum)(-74)
//.observable(logCount(0))
const identity = x => x;
// once :: T => Observable<T>
const once = item => observer => observer(item)
// creates a "hot" stateful observer (observers only receive items published after they subscribe)
const hot = (operator = map(identity)) => {
let observers = []
return {
publish(item) { observers = observers.map(observer => observer(item)) },
subscribe(observer) { observers.push(operator(identity)(observer)) }
}
}
// Demo hot subject whose subscribers receive every event as an [event, event] pair.
const duplicateEvent = event => [event, event]
const hotObservable = hot(map(duplicateEvent))
/*hotObservable.publish("event1")
hotObservable.subscribe(logCount(0))
hotObservable.publish("event2")
hotObservable.publish("event3")*/
// creates a "cold" stateful observer (observers also receive the items published before they subscribed)
const cold = (operator = map(identity)) => {
let observers = []
const items = []
return {
publish(item) {
items.push(item)
observers = observers.map(observer => observer(item))
},
subscribe(observer) { observers.push(fromArray(items)(operator(identity)(observer))) }
}
}
// Demo cold subject whose subscribers receive every event twice.
const emitTwice = event => fromArray([event, event])
const coldObservable = cold(flatMap(emitTwice))
/*coldObservable.publish("event1")
coldObservable.subscribe(logCount(0))
coldObservable.publish("event2")*/
{
"scripts": {
"test": "jest"
},
"devDependencies": {
"jest": "^21.2.1"
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment