Skip to content

Instantly share code, notes, and snippets.

@mfellner
Last active August 3, 2017 04:32
Show Gist options
  • Save mfellner/9c03cb2f2fa54bf2f1a1 to your computer and use it in GitHub Desktop.
Save mfellner/9c03cb2f2fa54bf2f1a1 to your computer and use it in GitHub Desktop.
Playing with RxJS Observables and ES6
// The npm package is published under the lowercase name `rx`; requiring 'Rx'
// only resolves on case-insensitive file systems.
const Rx = require('rx');
// Babel's require hook, loaded before the ES6 module below is required.
require('babel/register');
const Observables = require('./observables.es6');
/**
 * A very simple, router-like object that maps a route key to a single callback.
 */
class Router {
  constructor() {
    // Prototype-less dictionary: only routes registered through `on` exist, so
    // keys such as 'toString' or 'constructor' never resolve to members
    // inherited from Object.prototype.
    this.routes = Object.create(null);
  }

  /**
   * Register a route. Registering the same route again replaces the callback.
   *
   * @param route {String} Key identifying the route.
   * @param callback {Function} Called with the argument given to `onRoute`.
   */
  on(route, callback) {
    this.routes[route] = callback;
  }

  /**
   * Activate a route by calling its registered callback.
   *
   * @param route {String} Key of a previously registered route.
   * @param arg {*} Value handed to the callback.
   * @throws {Error} If no callback function is registered for `route`.
   */
  onRoute(route, arg) {
    const { routes } = this;
    if (typeof routes[route] !== 'function') {
      throw new Error(`No callback registered for route "${route}"`);
    }
    // Invoked as a member of `routes`, so `this` inside the callback is the
    // same object it has always been.
    routes[route](arg);
  }
}
/**
 * Little helper: builds an iterable over 1, 1 + step, 1 + 2 * step, ...
 * stopping once the value would exceed `max` (`max` itself is included).
 *
 * @param max {Number} Inclusive upper bound; nothing is yielded when max < 1.
 * @param step {Number} Increment between values, must be greater than 0.
 *                      Defaults to 1.
 * @returns {Object} An iterable; every `for...of` or spread over it starts
 *                   again from 1.
 * @throws {RangeError} If `step` is not a positive number, because the loop
 *                      could then never reach `max`.
 */
function sequence(max, step = 1) {
  // `!(step > 0)` also rejects NaN and non-numeric steps.
  if (!(step > 0)) {
    throw new RangeError(`step must be a positive number, got ${step}`);
  }
  return {
    *[Symbol.iterator]() {
      for (let i = 1; i <= max; i += step) {
        yield i;
      }
    },
  };
}
// Create the router that every example below registers its routes on
const router = new Router();
// A. Register a plain old callback ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
router.on('/callback', (arg) => {
  console.log('on /callback', arg);
});
// Trigger the route once for every value of the sequence
for (const n of sequence(2)) {
  router.onRoute('/callback', `(${n})`);
}
// Output:
// on /callback (1)
// on /callback (2)
// B. Create Observable from callback ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// Rx.Observable.fromCallback(func, [context], [selector])
// Wraps router.on (bound to router) in a function that returns an Observable.
const routerOn = Rx.Observable.fromCallback(router.on, router);
// Create the Observable sequence for the route
const observableFromCallback = routerOn('/observableFromCallback');
// Subscribe a callback
observableFromCallback.subscribe((arg) => {
  console.log('on /observableFromCallback', arg);
});
// Trigger the route twice
for (const n of sequence(2)) {
  router.onRoute('/observableFromCallback', `(${n})`);
}
// Output (only the first activation is delivered):
// on /observableFromCallback (1)
// C.1. Manual Observable creation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// Rx.Observable.create(subscribe)
// On subscription the observer's onNext is registered as the route callback,
// so every activation of the route is pushed to the observer.
const customObservable = Rx.Observable.create(function subscribe(observer) {
  const emit = observer.onNext.bind(observer);
  router.on('/customObservable', emit);
});
// Subscribe a callback
customObservable.subscribe((arg) => {
  console.log('on /customObservable', arg);
});
// Trigger the route twice
for (const n of sequence(2)) {
  router.onRoute('/customObservable', `(${n})`);
}
// Output:
// on /customObservable (1)
// on /customObservable (2)
// C.2. Custom Observable creation (from Callback) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// Same idea as B, but using the custom fromCallback from observables.es6.
const customRouterOn = Observables.fromCallback(router.on, router);
const customObservableFromCallback = customRouterOn('/customObservableFromCallback');
customObservableFromCallback.subscribe((arg) => {
  console.log('on /customObservableFromCallback', arg);
});
// Trigger the route twice
for (const n of sequence(2)) {
  router.onRoute('/customObservableFromCallback', `(${n})`);
}
// Output:
// on /customObservableFromCallback (1)
// on /customObservableFromCallback (2)
// D. Use Subject as a proxy ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
const subject = new Rx.Subject();
// The Subject subscribes to the route Observable and re-emits its values.
customObservableFromCallback.subscribe(subject);
subject
  .map((arg) => arg.replace('(', '[').replace(')', ']'))
  .subscribe((arg) => {
    console.log('on /customObservableFromCallback subscription', arg);
  });
// Trigger the route twice
for (const n of sequence(2)) {
  router.onRoute('/customObservableFromCallback', `(${n})`);
}
// Output:
// on /customObservableFromCallback subscription [1]
// on /customObservableFromCallback subscription [2]
// E. Chaining Subjects ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/**
 * A class that contains 'actions' as Subjects, attached as static properties.
 */
class Actions {
  /**
   * Maps every route emitted by `navigateTo` to an update operation and
   * subscribes `updates` to the resulting stream of operations.
   *
   * An operation is a function that stores the route on the navigation
   * object it receives and returns that same object.
   *
   * @param updates {Object} Observer (e.g. a Subject) receiving the operations.
   */
  static register(updates) {
    const toOperation = (route) => function applyRoute(nav) {
      nav.route = route;
      return nav;
    };
    const operations = this.navigateTo.map((route) => toOperation(route));
    operations.subscribe(updates);
  }
}
// The single 'action' of this example: a Subject that receives routes.
Actions.navigateTo = new Rx.Subject();
// The 'state' of the application's navigation.
const NAV = {route: null};
// A BehaviorSubject of the navigation 'state' that will receive updates.
const navUpdates = new Rx.BehaviorSubject(NAV);
// A proxy Subject from which the up-to-date navigation 'state' can be read.
const currentNav = new Rx.Subject();
// Apply every incoming update operation to the accumulated navigation 'state'
// and forward the result to the proxy Subject, skipping states without a route.
navUpdates
  .scan((nav, operation) => operation(nav))
  .filter((nav) => Boolean(nav.route))
  .subscribe(currentNav);
// Subscribe navUpdates to all the 'action' Subjects.
Actions.register(navUpdates);
// A list of routes for the router
const ROUTES = [
  {index: 0, name: 'Home', path: '/home'},
  {index: 1, name: 'About', path: '/about'}
];
// For each route create an Observable from the router callback, replace
// whatever the callback emits with the route object itself, and subscribe
// the navigateTo action to the result.
ROUTES.forEach((route) => {
  customRouterOn(route.path)
    .map(() => route)
    .subscribe(Actions.navigateTo);
});
// Read the current navigation 'state'.
currentNav.subscribe((nav) => {
  console.log('currentNav (early)', nav);
});
router.onRoute('/home');
// This subscriber joins after '/home' was activated and therefore only sees
// the '/about' update.
currentNav.subscribe((nav) => {
  console.log('currentNav (late)', nav);
});
router.onRoute('/about');
console.log('final nav:', NAV);
// Route updates are propagated from the event source to the navigation 'store':
// Router.onRoute -> Router.on -> customRouterOn Observable ->
// Actions.navigateTo Subject -> navUpdates BehaviorSubject -> NAV
// Output:
// currentNav (early) { route: { index: 0, name: 'Home', path: '/home' } }
// currentNav (early) { route: { index: 1, name: 'About', path: '/about' } }
// currentNav (late) { route: { index: 1, name: 'About', path: '/about' } }
// final nav: { route: { index: 1, name: 'About', path: '/about' } }
const Rx = require('rx');
const Observables = {};
/**
 * Converts a callback function to an observable sequence.
 *
 * An alternative to Rx.Observable.fromCallback that never completes the
 * sequence, which allows Subscribers of the Observable to receive multiple
 * callback events.
 *
 * Every subscription calls `fn` again with a fresh callback, and nothing is
 * unregistered when a subscription is disposed.
 *
 * @param fn {Function} Function with a callback as the last parameter to
 *                      convert to an Observable sequence.
 *
 * @param context {Object} The `this` value used when calling `fn` and
 *                         `selector`. If not specified, defaults to undefined.
 *
 * @param selector {Function} A selector which takes the arguments from the
 *                            callback to produce a single item to yield on
 *                            next.
 *
 * @returns {Function} A function, when executed with the required parameters
 *                     minus the callback, produces an Observable sequence that
 *                     yields one item per callback invocation: the object
 *                     created by the selector function if one is given, else
 *                     the callback's only argument, else (zero or several
 *                     arguments) the arguments as an array.
 */
Observables.fromCallback = (fn, context = undefined, selector = null) => {
  return (...fnArgs) => {
    return Rx.Observable.create((observer) => {
      const callbackHandler = (...callbackArgs) => {
        let item;
        if (typeof selector === 'function') {
          item = selector.apply(context, callbackArgs);
        } else if (callbackArgs.length === 1) {
          item = callbackArgs[0];
        } else {
          // A real array, as documented, rather than the array-like
          // `arguments` object.
          item = callbackArgs;
        }
        observer.onNext(item);
      };
      // The generated callback is always appended as the last argument.
      fn.apply(context, [...fnArgs, callbackHandler]);
    });
  };
};
module.exports = Observables;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment