Last active
August 3, 2017 04:32
-
-
Save mfellner/9c03cb2f2fa54bf2f1a1 to your computer and use it in GitHub Desktop.
Playing with RxJS Observables and ES6
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const Rx = require('Rx'); | |
require('babel/register'); | |
const Observables = require('./observables.es6'); | |
// Define a very simple, router-like object | |
class Router { | |
constructor() { | |
this.routes = {}; | |
} | |
// Register a route | |
on(route, callback) { | |
this.routes[route] = callback; | |
} | |
// Activate the route | |
onRoute(route, arg) { | |
this.routes[route](arg); | |
} | |
} | |
// Little helper generator | |
function sequence(max, step = 1) { | |
return { | |
[Symbol.iterator]: function*() { | |
for (let i = 1; i <= max; i += step) yield i; | |
} | |
}; | |
} | |
// Create the router | |
const router = new Router(); | |
// A. Register a plain old callback ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
router.on('/callback', arg => console.log('on /callback', arg)); | |
// Trigger the route | |
for (let i of sequence(2)) { | |
router.onRoute('/callback', `(${i})`); | |
} | |
// Output: | |
// on /callback (1) | |
// on /callback (2) | |
// B. Create Observable from callback ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
// Rx.Observable.fromCallback(func, [context], [selector]) | |
const routerOn = Rx.Observable.fromCallback(router.on, router); | |
// Create Observable sequence for route | |
const observableFromCallback = routerOn('/observableFromCallback'); | |
// Subscribe a callback | |
observableFromCallback.subscribe(arg => console.log('on /observableFromCallback', arg)); | |
// Trigger the route | |
for (let i of sequence(2)) { | |
router.onRoute('/observableFromCallback', `(${i})`); | |
} | |
// Output: | |
// on /observableFromCallback (1) | |
// C.1. Manual Observable creation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
// Rx.Observable.create(subscribe) | |
const customObservable = Rx.Observable.create((observer) => { | |
router.on('/customObservable', observer.onNext.bind(observer)); | |
}); | |
// Subscribe a callback | |
customObservable.subscribe(arg => console.log('on /customObservable', arg)); | |
// Trigger the route | |
for (let i of sequence(2)) { | |
router.onRoute('/customObservable', `(${i})`); | |
} | |
// Output: | |
// on /customObservable (1) | |
// on /customObservable (2) | |
// C.2. Custom Observable creation (from Callback) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
const customRouterOn = Observables.fromCallback(router.on, router); | |
const customObservableFromCallback = customRouterOn('/customObservableFromCallback'); | |
customObservableFromCallback.subscribe(arg => console.log('on /customObservableFromCallback', arg)); | |
// Trigger the route | |
for (let i of sequence(2)) { | |
router.onRoute('/customObservableFromCallback', `(${i})`); | |
} | |
// Output: | |
// on /customObservableFromCallback (1) | |
// on /customObservableFromCallback (2) | |
// D. Use Subject as a proxy ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
const subject = new Rx.Subject(); | |
customObservableFromCallback.subscribe(subject); | |
subject | |
.map(arg => { | |
return arg.replace('(', '[').replace(')', ']'); | |
}) | |
.subscribe(arg => console.log('on /customObservableFromCallback subscription', arg)); | |
// Trigger the route | |
for (let i of sequence(2)) { | |
router.onRoute('/customObservableFromCallback', `(${i})`); | |
} | |
// Output: | |
// on /customObservableFromCallback subscription [1] | |
// on /customObservableFromCallback subscription [2] | |
// E. Chaining Subjects ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
// A class that contains 'actions' as Subjects. | |
class Actions { | |
static register(updates) { | |
this.navigateTo | |
.map(route => { | |
return nav => { | |
nav.route = route; | |
return nav; | |
}; | |
}) | |
.subscribe(updates); | |
} | |
} | |
Actions.navigateTo = new Rx.Subject(); | |
// The 'state' of the application's navigation. | |
const NAV = {route: null}; | |
// A BehaviorSubject of the navigation 'state' that will receive updates. | |
const navUpdates = new Rx.BehaviorSubject(NAV); | |
// A proxy Subject from which the up-to-date navigation 'state' can be read. | |
const currentNav = new Rx.Subject(); | |
// Subscribe the Subject to a stream of updates applied to the BehaviorSubject. | |
navUpdates | |
.scan((nav, operation) => { | |
return operation(nav); | |
}) | |
.filter(nav => { | |
return !!nav.route; | |
}) | |
.subscribe(currentNav); | |
// Subscribe navUpdates to all the 'action' Subjects. | |
Actions.register(navUpdates); | |
// A list of routes for the router | |
const ROUTES = [ | |
{index: 0, name: 'Home', path: '/home'}, | |
{index: 1, name: 'About', path: '/about'} | |
]; | |
// For each route create an Observable from the router callback | |
// and subscribe the navigateTo action to its updates. | |
for (let route of ROUTES) { | |
customRouterOn(route.path) | |
.map(() => { | |
return route; | |
}) | |
.subscribe(Actions.navigateTo); | |
} | |
// Read the current navigation 'state'. | |
currentNav.subscribe(nav => console.log('currentNav (early)', nav)); | |
router.onRoute('/home'); | |
currentNav.subscribe(nav => console.log('currentNav (late)', nav)); | |
router.onRoute('/about'); | |
console.log('final nav:', NAV); | |
// Route updates are propagated from the event source to the navigation 'store': | |
// Router.onRoute -> Router.on -> customRouterOn Observable -> | |
// Actions.navigateTo Subject -> navUpdates BehaviorSubject -> NAV | |
// Output: | |
// currentNav (early) { route: { index: 0, name: 'Home', path: '/home' } } | |
// currentNav (early) { route: { index: 1, name: 'About', path: '/about' } } | |
// currentNav (late) { route: { index: 1, name: 'About', path: '/about' } } | |
// final nav: { route: { index: 1, name: 'About', path: '/about' } } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const Rx = require('rx'); | |
const Observables = {}; | |
/** | |
* Converts a callback function to an observable sequence. | |
* | |
* An alternative to Rx.Observable.fromCallback that does | |
* not call onComplete, which allows Subscribers of the | |
* Observable to receive multiple callback events. | |
* | |
* @param fn {Function} Function with a callback as the last parameter to | |
* convert to an Observable sequence. | |
* | |
* @param context {Object} The context for the func parameter to be executed. | |
* If not specified, defaults to undefined. | |
* | |
* @param selector {Function} A selector which takes the arguments from the | |
* callback to produce a single item to yield on | |
* next. | |
* | |
* @returns {Function} A function, when executed with the required parameters | |
* minus the callback, produces an Observable sequence with | |
* a single value of the arguments to the callback as an | |
* array if no selector given, else the object created by | |
* the selector function. | |
*/ | |
Observables.fromCallback = (fn, context = undefined, selector = null) => { | |
return function () { | |
const fnArgs = Array.prototype.slice.call(arguments); | |
return Rx.Observable.create(observer => { | |
const callbackHandler = function () { | |
const arg = typeof selector === 'function' | |
? selector.apply(context, arguments) | |
: arguments.length === 1 ? arguments[0] : arguments; | |
observer.onNext(arg); | |
}; | |
fn.apply(context, fnArgs.concat(callbackHandler)); | |
}); | |
}; | |
}; | |
module.exports = Observables; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment