Skip to content

Instantly share code, notes, and snippets.

@eiriklv
Last active October 25, 2016 21:41
Show Gist options
  • Save eiriklv/a498112b3a73fc2b92975649924cf5c3 to your computer and use it in GitHub Desktop.
Save eiriklv/a498112b3a73fc2b92975649924cf5c3 to your computer and use it in GitHub Desktop.
Handling effects declaratively with generators and custom runtimes (fun and learning inspired by redux-saga - but for broader use)
'use strict';
const { createStore, applyMiddleware } = require('./redux');
const { addActionTakersToStore, addLoggingToStore } = require('./middleware');
const { EventEmitter } = require('events');
const {
  isPromise,
  isFunction,
  isObject,
  isSpecObject,
  fork,
  delay,
  call,
  cps,
  race,
  parallel,
  putAction,
  takeAction,
  putStream,
  takeStream,
  putEvent,
  takeEvent,
  handleFork,
  handleSpawn,
  handleDelay,
  handleCall,
  handleCps,
  handleRace,
  handleParallel,
  createRuntime,
} = require('./utils');
/**
 * Demo middleware: prints the context it is invoked with (`this`)
 * and completes without yielding any effect.
 */
function* logMiddleware() {
  const context = this;
  console.log(context);
}
/**
 * Endless "ponger" process.
 *
 * Each round it yields a take for a `PING` action, then a
 * 2000 ms delay, then a put of a `PONG` action.
 */
function* processOne() {
  for (;;) {
    yield takeAction('PING');
    yield delay(2000);
    yield putAction({ type: 'PONG' });
  }
}
/**
 * Endless "pinger" process.
 *
 * Each round it yields a put of a `PING` action, then a take
 * for a `PONG` action, then a 2000 ms delay.
 */
function* processTwo() {
  for (;;) {
    yield putAction({ type: 'PING' });
    yield takeAction('PONG');
    yield delay(2000);
  }
}
/**
 * Endless process that yields a take on `process.stdin` and then
 * yields a put on `process.stdout` of the taken value prefixed
 * with "message received: ".
 */
function* streamProcess() {
  for (;;) {
    const chunk = yield takeStream(process.stdin);
    const message = `message received: ${chunk}`;
    yield putStream(process.stdout, message);
  }
}
/**
 * Endless process that talks over an injected emitter.
 *
 * Each round: delay 2000 ms, put a `my_event` event carrying
 * 'ping!' on the socket, take the next `my_event` from it, then put
 * the taken payload to `process.stdout` prefixed with "(1)".
 *
 * @param {{ socket: object }} deps - injected dependencies; `socket`
 *   is handed unchanged to `putEvent` / `takeEvent`.
 */
function* socketProcessOne({ socket }) {
  for (;;) {
    yield delay(2000);
    yield putEvent(socket, 'my_event', 'ping!');
    const payload = yield takeEvent(socket, 'my_event');
    const line = `(1) event received: ${payload}\n`;
    yield putStream(process.stdout, line);
  }
}
/**
 * Endless process that answers over an injected emitter.
 *
 * Each round: take the next `my_event` from the socket, put the
 * taken payload to `process.stdout` prefixed with "(2)", delay
 * 2000 ms, then put a `my_event` event carrying 'pong!'.
 *
 * @param {{ socket: object }} deps - injected dependencies; `socket`
 *   is handed unchanged to `putEvent` / `takeEvent`.
 */
function* socketProcessTwo({ socket }) {
  for (;;) {
    const payload = yield takeEvent(socket, 'my_event');
    const line = `(2) event received: ${payload}\n`;
    yield putStream(process.stdout, line);
    yield delay(2000);
    yield putEvent(socket, 'my_event', 'pong!');
  }
}
/**
 * Endless echo process: yields a take on `process.stdin` and then
 * yields a put of the taken value, stringified, on `process.stdout`.
 */
function* stdEchoProcess() {
  for (;;) {
    const chunk = yield takeStream(process.stdin);
    const text = `${chunk}`;
    yield putStream(process.stdout, text);
  }
}
/**
 * Endless process that races timed calls and rotates the delays
 * each round, so a different contender has the shortest delay.
 *
 * Each round yields a race of three contenders: a call resolving
 * to 10, a call resolving to 20, and a nested race between calls
 * resolving to 30 and 40. The value the race yields back is then
 * passed, stringified, to a `console.log` call effect.
 */
function* raceProcess() {
  const delayTable = [200, 500, 1000, 1500];

  // Builds a call effect whose function returns a promise resolving to
  // `value` after `delayTable[slot]` ms. The table is read when the
  // function is invoked, not when the effect is created.
  const resolveAfter = (value, slot) => call(() => new Promise((resolve) => {
    setTimeout(() => {
      resolve(value);
    }, delayTable[slot]);
  }));

  for (;;) {
    const winner = yield race([
      resolveAfter(10, 0),
      resolveAfter(20, 1),
      race([
        resolveAfter(30, 2),
        resolveAfter(40, 3),
      ]),
    ]);

    // Rotate the table: the last delay moves to the front
    delayTable.unshift(delayTable.pop());

    yield call(console.log.bind(console), `${winner}`);
  }
}
/**
 * A sub-process that writes a string to stdout one character at
 * a time, yielding a delay after every character.
 *
 * The string is walked by code point (string iteration) rather than
 * by UTF-16 unit, so characters outside the Basic Multilingual Plane
 * (e.g. emoji) are written whole instead of as two broken halves.
 *
 * @param {string} str - text to print
 * @param {number} interval - value passed to `delay` after each character
 */
function* slowPrint(str, interval) {
  for (const char of str) {
    yield putStream(process.stdout, char);
    yield delay(interval);
  }
}
/**
 * Endless process that yields a take on `process.stdin` and
 * delegates to `slowPrint` with the stringified value and an
 * interval of 50.
 */
function* slowEchoProcess() {
  for (;;) {
    const chunk = yield takeStream(process.stdin);
    const text = chunk.toString();
    yield* slowPrint(text, 50);
  }
}
/**
 * A process that yields two fork effects, each for
 * `slowEchoProcess`, and then finishes.
 *
 * NOTE: relies on the `fork` effect creator being in scope; it must
 * be destructured from './utils' together with the other effect
 * creators, otherwise the first step throws a ReferenceError.
 */
function* slowEchoForkProcess() {
  yield fork(slowEchoProcess);
  yield fork(slowEchoProcess);
}
/**
 * Endless process that yields two races as one parallel effect and
 * rotates the delays each round.
 *
 * The first race is between calls resolving to 10 and 20, the second
 * between calls resolving to 30 and 40. The value the parallel
 * effect yields back is passed, stringified, to a `console.log`
 * call effect.
 */
function* parallelProcess() {
  const delayTable = [200, 500, 1000, 1500];

  // Builds a call effect whose function returns a promise resolving to
  // `value` after `delayTable[slot]` ms. The table is read when the
  // function is invoked, not when the effect is created.
  const resolveAfter = (value, slot) => call(() => new Promise((resolve) => {
    setTimeout(() => {
      resolve(value);
    }, delayTable[slot]);
  }));

  for (;;) {
    const winners = yield parallel([
      race([
        resolveAfter(10, 0),
        resolveAfter(20, 1),
      ]),
      race([
        resolveAfter(30, 2),
        resolveAfter(40, 3),
      ]),
    ]);

    // Rotate the table: the last delay moves to the front
    delayTable.unshift(delayTable.pop());

    yield call(console.log.bind(console), `${winners}`);
  }
}
/**
 * Handler invoked when a program finishes. It is called with the
 * program's context as `this`.
 *
 * Reports a failure on stderr instead of dropping it silently, then
 * logs the context.
 *
 * @param {?Error} err - error the program ended with, if any
 * @param {*} value - final value of the program (unused here)
 */
function finalHandler(err, value) {
  if (err) {
    console.error(err);
  }
  console.log(this);
}
/**
 * Build the factory for the function that resolves yielded effects.
 *
 * Takes one handler per effect type and returns a function the runtime
 * calls with `(runtime, context)`. That call returns `handleEffects`,
 * which validates a yielded value, picks the handler matching its
 * `type` and passes on only the fields that handler needs.
 *
 * @param {object} handlers - one `(payload, cb)` function per effect type
 * @returns {function} `(runtime, context) => handleEffects(value, cb)`
 */
function createEffectsHandler({
  handleFork,
  handleSpawn,
  handleSelect,
  handleDelay,
  handleRace,
  handleParallel,
  handleCall,
  handleCps,
  handleTakeAction,
  handlePutAction,
  handleTakeStream,
  handlePutStream,
  handleTakeEvent,
  handlePutEvent,
}) {
  return (runtime, context) => {
    /**
     * Lookup table: effect type -> adapter that extracts the payload
     * for the matching handler. Process-starting effects also get the
     * runtime and context; composite effects get `handleEffects`
     * itself so they can resolve nested effects.
     */
    const resolvers = new Map([
      ['@@spawn', (effect, done) => handleSpawn({
        proc: effect.proc,
        args: effect.args,
        runtime,
        context,
      }, done)],
      ['@@fork', (effect, done) => handleFork({
        proc: effect.proc,
        args: effect.args,
        runtime,
        context,
      }, done)],
      ['@@select', (effect, done) => handleSelect({
        selector: effect.selector,
      }, done)],
      ['@@delay', (effect, done) => handleDelay({
        time: effect.time,
        val: effect.val,
      }, done)],
      ['@@parallel', (effect, done) => handleParallel({
        handleEffects,
        effects: effect.effects,
      }, done)],
      ['@@race', (effect, done) => handleRace({
        handleEffects,
        effects: effect.effects,
      }, done)],
      ['@@call', (effect, done) => handleCall({
        func: effect.func,
        args: effect.args,
      }, done)],
      ['@@cps', (effect, done) => handleCps({
        func: effect.func,
        args: effect.args,
      }, done)],
      ['@@putStream', (effect, done) => handlePutStream({
        stream: effect.stream,
        data: effect.data,
      }, done)],
      ['@@takeStream', (effect, done) => handleTakeStream({
        stream: effect.stream,
      }, done)],
      ['@@putEvent', (effect, done) => handlePutEvent({
        emitter: effect.emitter,
        event: effect.event,
        data: effect.data,
      }, done)],
      ['@@takeEvent', (effect, done) => handleTakeEvent({
        emitter: effect.emitter,
        event: effect.event,
      }, done)],
      ['@@putAction', (effect, done) => handlePutAction({
        action: effect.action,
      }, done)],
      ['@@takeAction', (effect, done) => handleTakeAction({
        actionType: effect.actionType,
      }, done)],
    ]);

    /**
     * Resolve one yielded effect description and report through `cb`.
     * Values that are not effect descriptions, and unknown effect
     * types, are reported as errors.
     */
    function handleEffects(
      value = {},
      cb = () => { console.warn('No callback passed for resolving effect') }
    ) {
      if (!isSpecObject(value)) {
        return cb(new Error('You can only yield effects as object descriptions'));
      }

      const resolve = resolvers.get(value.type);

      if (resolve === undefined) {
        return cb(new Error(`Unrecognized effect type: ${value.type}`));
      }

      return resolve(value, cb);
    }

    return handleEffects;
  };
}
/**
 * Placeholder state reducer: every action, including 'SOME_ACTION',
 * leaves the state untouched.
 *
 * @param {object} [state={}] - current state
 * @param {{ type: string }} action - dispatched action
 * @returns {object} the state it was given
 */
function reducer(state = {}, action) {
  const { type } = action;

  if (type === 'SOME_ACTION') {
    return state;
  }

  return state;
}
/**
 * Wire everything together and start the demo processes.
 *
 * Creates the store with its middleware, builds the effects handler
 * on top of the store / streams / emitters, creates a runtime and runs
 * every process with a shared context and the injected dependencies.
 */
function application() {
  /**
   * Middleware that lets processes wait for dispatched actions
   */
  const takeActionsMiddleware = addActionTakersToStore({});

  /**
   * Middleware that logs every dispatched action
   */
  const loggerMiddleware = addLoggingToStore({});

  /**
   * Application state handler
   */
  const store = createStore(
    reducer,
    applyMiddleware(
      takeActionsMiddleware,
      loggerMiddleware
    )
  );

  /**
   * Subscriber for state changes
   */
  store.subscribe(() => {
    console.log('state changed!', store.getState());
  });

  /**
   * Effects handler.
   *
   * Every handler receives a payload object and a node-style callback.
   */
  const effectsHandler = createEffectsHandler({
    handleFork,
    handleSpawn,
    handleDelay,
    handleRace,
    handleParallel,
    handleCall,
    handleCps,
    // The payload is `{ selector }`, not the selector function itself
    handleSelect: ({ selector }, cb) => {
      cb(null, selector(store.getState()));
    },
    handlePutAction: ({ action }, cb) => {
      cb(null, store.dispatch(action));
    },
    handleTakeAction: ({ actionType }, cb) => {
      // Two-argument `then`, so an error thrown by `cb` itself does not
      // cause `cb` to be invoked a second time
      takeActionsMiddleware.take(actionType)
        .then((action) => cb(null, action), (error) => cb(error));
    },
    handlePutStream: ({ stream, data }, cb) => {
      stream.write(data);
      cb(null);
    },
    handleTakeStream: ({ stream }, cb) => {
      // One-shot listener: resolve with the next 'data' event only
      stream.once('data', (data) => {
        cb(null, data);
      });
    },
    handlePutEvent: ({ emitter, event, data }, cb) => {
      emitter.emit(event, data);
      cb(null);
    },
    handleTakeEvent: ({ emitter, event }, cb) => {
      // One-shot listener: resolve with the next occurrence only
      emitter.once(event, (data) => {
        cb(null, data);
      });
    },
  });

  /**
   * TODO:
   *
   * Create channels / emitters for
   * - input (key, stdin)
   * - events
   * - sockets
   * - streams
   * - what else..?
   *
   * CSP (Communicating Sequential Processes) ?
   *
   * NOTE:
   * - eventEmitters/sockets do not have buffering and are asynchronous
   * - csp channels have buffering and are "synchronous" (put will wait until message is taken)
   *
   */
  const socket = new EventEmitter();

  /**
   * Create a runtime
   */
  const runtime = createRuntime([logMiddleware], effectsHandler);

  /**
   * Gather all the processes
   */
  const processes = [
    processOne,
    processTwo,
    streamProcess,
    socketProcessOne,
    socketProcessTwo,
    stdEchoProcess,
    raceProcess,
    parallelProcess,
    slowEchoProcess,
  ];

  /**
   * Arguments for each process,
   * dependencies
   * - channels
   * - emitters
   * - streams
   * - whatever is needed as injected deps
   */
  const args = {
    socket,
  };

  /**
   * Create a global context
   */
  const context = {};

  /**
   * Run all the processes
   */
  processes.forEach((proc) => {
    runtime(proc, context, finalHandler, args);
  });
}
/**
 * Start the application as soon as this file is loaded
 */
application();
/**
 * Create a middleware that logs the type of every action it sees
 * before handing the action to the next dispatch in the chain.
 *
 * @param {object} options - currently unused
 * @returns {function} middleware `(store) => (next) => (action) => result`
 */
function addLoggingToStore(options) {
  const middleware = (store) => (next) => (action) => {
    console.log(`dispatching: ${action.type}`);
    return next(action);
  };

  return middleware;
}
/**
 * Create a middleware for function actions (action creators).
 *
 * A dispatched function is called with `store.dispatch`,
 * `store.getState` and the extra arguments, and its return value is
 * returned. Anything else is passed on to the next dispatch.
 *
 * @param {*} extraArguments - passed as third argument to function actions
 * @returns {function} middleware `(store) => (next) => (action) => result`
 */
function addActionCreatorsToStore(extraArguments) {
  return (store) => (next) => (action) => {
    if (typeof action !== 'function') {
      return next(action);
    }

    return action(store.dispatch, store.getState, extraArguments);
  };
}
/**
 * Create a middleware that lets callers wait for dispatched actions.
 *
 * The returned middleware carries an extra `take(actionType)` method
 * returning a promise that resolves with the next dispatched action
 * of that type (or with any action when no type is given).
 *
 * @param {object} options - currently unused
 * @returns {function} middleware `(store) => (next) => (action) => Promise`
 *   with a `take` method attached
 */
function addActionTakersToStore(options) {
  /**
   * Pending takers: `{ actionType, resolve }` records waiting for a dispatch
   */
  const dispatchTakers = [];

  /**
   * A taker matches when it has no action type (takes anything)
   * or when its type equals the dispatched action's type
   */
  const matches = (taker, action) => (
    !taker.actionType || action.type === taker.actionType
  );

  const storeEnhancer = (store) => {
    return (next) => {
      return (action) => {
        /**
         * Remove every matching taker from the pending list and
         * resolve it on a later tick (scheduled with setImmediate)
         */
        dispatchTakers
          .filter((taker) => matches(taker, action))
          .forEach((taker) => {
            dispatchTakers.splice(dispatchTakers.indexOf(taker), 1);

            setImmediate(() => {
              taker.resolve(action);
            });
          });

        /**
         * Pass the action on in a later tick as well (scheduled after
         * the takers). A throwing `next` rejects the returned promise
         * instead of surfacing as an uncaught exception and leaving
         * the promise pending forever.
         */
        return new Promise((resolve, reject) => {
          setImmediate(() => {
            try {
              resolve(next(action));
            } catch (error) {
              reject(error);
            }
          });
        });
      };
    };
  };

  /**
   * Wait for a single occurrence of an action.
   *
   * NOTE: We're monkey patching the middleware
   */
  storeEnhancer.take = (actionType) => {
    return new Promise((resolve) => {
      dispatchTakers.push({ actionType, resolve });
    });
  };

  return storeEnhancer;
}
/**
 * Exports: the three store middleware factories defined above
 */
module.exports = {
addLoggingToStore,
addActionCreatorsToStore,
addActionTakersToStore,
}
/**
 * Create a store enhancer that wraps `store.dispatch` with the given
 * middlewares.
 *
 * The middlewares are applied from last to first, so the first one
 * listed ends up outermost and sees each action first. The store
 * object is patched in place and returned.
 *
 * @param {...function} middlewares - `(store) => (next) => (action) => result`
 * @returns {function} store enhancer `(store) => store`
 */
function applyMiddleware(...middlewares) {
  return (store) => middlewares.reduceRight((enhanced, middleware) => {
    const wrapDispatch = middleware(enhanced);
    enhanced.dispatch = wrapDispatch(enhanced.dispatch);
    return enhanced;
  }, store);
}
/**
 * Create a super simple implementation of Redux.
 *
 * The store keeps no state of its own, only the log of every action
 * ever dispatched; the current state is derived by reducing that log.
 *
 * @param {function} [reducer] - `(state, action) => state`
 * @param {function} [storeEnhancer] - `(store) => store`, e.g. the
 *   result of `applyMiddleware`
 * @returns {object} whatever the enhancer returns for the store
 *   `{ getState, subscribe, dispatch }`
 */
function createStore(
  reducer = (state, action) => state,
  storeEnhancer = (store) => store
) {
  /**
   * Every action ever dispatched to the store
   */
  const actions = [];

  /**
   * Registered change listeners
   */
  const subscribers = [];

  /**
   * The original dispatch function for the store.
   * Records the action, notifies the subscribers and returns the
   * action, as Redux's base dispatch does.
   */
  const dispatch = (action) => {
    console.log('* dispatching with pure dispatch *', action.type);

    actions.push(action);

    // Iterate over a copy so a subscriber may unsubscribe while notified
    [...subscribers].forEach((subscriber) => {
      subscriber();
    });

    return action;
  };

  /**
   * Return the current state by recreating it from scratch.
   *
   * The fold is seeded with the reducer's answer to an init action,
   * so the reducer's initial state is available before anything has
   * been dispatched (instead of `undefined`).
   *
   * NOTE: Yes this is slow and will increase in slowness,
   * but it is just for educational purposes where we want
   * to show that the state can be derived from reducing all
   * actions that have ever been dispatched into the current state
   *
   * This will also give us the possibility to do things such as time travel
   */
  const getState = () => {
    return actions.reduce(
      (state, action) => reducer(state, action),
      reducer(undefined, { type: '@@INIT' })
    );
  };

  /**
   * Add a subscriber (function) for state changes.
   * Returns a function that removes that subscriber again.
   */
  const subscribe = (fn) => {
    subscribers.push(fn);

    return () => {
      const index = subscribers.indexOf(fn);

      if (index !== -1) {
        subscribers.splice(index, 1);
      }
    };
  };

  /**
   * Return the store and containing methods
   */
  return storeEnhancer({
    getState,
    subscribe,
    dispatch,
  });
}
/**
 * Exports: the store factory and the middleware enhancer defined above
 */
module.exports = {
createStore,
applyMiddleware,
};
/**
 * Check whether a value looks like a promise: an object with a
 * callable `then` member.
 *
 * @param {*} obj - value to inspect
 * @returns {boolean} true when `obj` is an object whose `then` is a function
 */
function isPromise(obj) {
  return (
    isObject(obj) &&
    'then' in obj &&
    isFunction(obj.then)
  );
}
/**
 * Check whether a value is a function.
 *
 * @param {*} obj - value to inspect
 * @returns {boolean} true when `typeof obj` is 'function'
 */
function isFunction(obj) {
  const kind = typeof obj;
  return kind === 'function';
}
/**
 * Check whether a value is a non-null object that inherits from
 * `Object`. Functions, primitives, `null` and objects created
 * without a prototype are rejected.
 *
 * @param {*} obj - value to inspect
 * @returns {boolean}
 */
function isObject(obj) {
  if (typeof obj !== 'object') {
    return false;
  }

  return obj instanceof Object;
}
/**
 * Check whether a value is an effect description: an object
 * (per `isObject`) whose `type` is a non-empty string.
 *
 * @param {*} obj - value to inspect
 * @returns {boolean}
 */
function isSpecObject(obj) {
  if (!isObject(obj)) {
    return false;
  }

  return typeof obj.type === 'string' && obj.type !== '';
}
/**
 * Create an effect spec for forking
 * a generator function / process.
 *
 * The spec always carries an `args` array (empty when no arguments are
 * given), because the fork handler spreads `args` into the new process.
 *
 * @param {function} proc - generator function to start
 * @param {...*} args - arguments for the process
 * @returns {{ type: string, proc: function, args: Array }}
 */
function fork(proc, ...args) {
  return {
    type: '@@fork',
    proc,
    args,
  };
}
/**
 * Create an effect spec for spawning
 * a generator function / process.
 *
 * The spec always carries an `args` array (empty when no arguments are
 * given), because the spawn handler spreads `args` into the new process.
 *
 * @param {function} proc - generator function to start
 * @param {...*} args - arguments for the process
 * @returns {{ type: string, proc: function, args: Array }}
 */
function spawn(proc, ...args) {
  return {
    type: '@@spawn',
    proc,
    args,
  };
}
/**
 * Build the effect description for a select: it carries
 * the selector function under `selector`.
 *
 * @param {function} selector - selector function
 * @returns {{ type: string, selector: function }}
 */
function select(selector) {
  const effect = { type: '@@select', selector };
  return effect;
}
/**
 * Build the effect description for a function call.
 * The function may return a plain value or a promise.
 *
 * @param {function} func - function to call
 * @param {...*} args - arguments to call it with
 * @returns {{ type: string, func: function, args: Array }}
 */
function call(func, ...args) {
  const effect = { type: '@@call', func, args };
  return effect;
}
/**
 * Build the effect description for calling a node-callback
 * (continuation passing style) function.
 *
 * @param {function} func - function expecting a callback as last argument
 * @param {...*} args - arguments that precede the callback
 * @returns {{ type: string, func: function, args: Array }}
 */
function cps(func, ...args) {
  const effect = { type: '@@cps', func, args };
  return effect;
}
/**
 * Build the effect description for putting (dispatching) an action.
 *
 * @param {object} action - action to put
 * @returns {{ type: string, action: object }}
 */
function putAction(action) {
  const effect = { type: '@@putAction', action };
  return effect;
}
/**
 * Build the effect description for taking (waiting for) an action.
 *
 * @param {string} actionType - type of the action to wait for
 * @returns {{ type: string, actionType: string }}
 */
function takeAction(actionType) {
  const effect = { type: '@@takeAction', actionType };
  return effect;
}
/**
 * Build the effect description for putting data on a stream.
 *
 * @param {object} stream - target stream
 * @param {*} data - data to put on it
 * @returns {{ type: string, stream: object, data: * }}
 */
function putStream(stream, data) {
  const effect = { type: '@@putStream', stream, data };
  return effect;
}
/**
 * Build the effect description for taking data from a stream.
 *
 * @param {object} stream - source stream
 * @returns {{ type: string, stream: object }}
 */
function takeStream(stream) {
  const effect = { type: '@@takeStream', stream };
  return effect;
}
/**
 * Build the effect description for putting an event on an emitter.
 *
 * @param {object} emitter - target emitter
 * @param {string} event - event name
 * @param {*} data - payload for the event
 * @returns {{ type: string, emitter: object, event: string, data: * }}
 */
function putEvent(emitter, event, data) {
  const effect = { type: '@@putEvent', emitter, event, data };
  return effect;
}
/**
 * Build the effect description for taking an event from an emitter.
 *
 * @param {object} emitter - source emitter
 * @param {string} event - event name
 * @returns {{ type: string, emitter: object, event: string }}
 */
function takeEvent(emitter, event) {
  const effect = { type: '@@takeEvent', emitter, event };
  return effect;
}
/**
 * Build the effect description for racing several effects.
 *
 * @param {Array} effects - effect descriptions that compete
 * @returns {{ type: string, effects: Array }}
 */
function race(effects) {
  const effect = { type: '@@race', effects };
  return effect;
}
/**
 * Build the effect description for running several effects in parallel.
 *
 * @param {Array} effects - effect descriptions to resolve together
 * @returns {{ type: string, effects: Array }}
 */
function parallel(effects) {
  const effect = { type: '@@parallel', effects };
  return effect;
}
/**
 * Build the effect description for a delay.
 *
 * @param {number} time - how long to wait
 * @param {*} [val] - value carried along in the description
 * @returns {{ type: string, time: number, val: * }}
 */
function delay(time, val) {
  const effect = { type: '@@delay', time, val };
  return effect;
}
/**
 * Handle forks of processes
 *
 * NOTE: Starts a new process and immediately
 * continues (non-blocking)
 *
 * @param {object} obj - `{ proc, args, runtime, context }`; `args`
 *   defaults to an empty list, so a spec created without arguments
 *   does not fail when it is spread
 * @param {function} cb - called without arguments once the process
 *   has been started
 */
function handleFork(obj, cb) {
  const { proc, args = [], runtime, context } = obj;
  // No final handler is passed: the runtime falls back to its default
  runtime(proc, context, undefined, ...args);
  cb();
}
/**
 * Handle spawns of processes
 *
 * NOTE: Waits for the return value of the
 * process before continuing (blocking)
 *
 * @param {object} obj - `{ proc, args, runtime, context }`; `args`
 *   defaults to an empty list, so a spec created without arguments
 *   does not fail when it is spread
 * @param {function} cb - handed to the runtime as the final handler
 *   of the spawned process
 */
function handleSpawn(obj, cb) {
  const { proc, args = [], runtime, context } = obj;
  runtime(proc, context, cb, ...args);
}
/**
 * Handle a delay effect: after `time` ms, call back
 * with `val` as the result.
 *
 * @param {{ val: *, time: number }} obj - delay payload
 * @param {function} cb - node-style callback, called with `(null, val)`
 */
function handleDelay(obj, cb) {
  const { val, time } = obj;

  const resume = () => {
    cb(null, val);
  };

  setTimeout(resume, time);
}
/**
 * Handle an effect spec of the parallel type.
 *
 * Resolves every nested effect through `handleEffects` and calls back
 * with the list of results (in effect order), or with the first error.
 *
 * The callback is attached with the two-argument form of `then`, so it
 * is invoked exactly once: an error thrown by `cb` itself rejects the
 * returned promise instead of triggering a second `cb(error)` call.
 *
 * @param {{ effects: Array, handleEffects: function }} obj
 * @param {function} cb - node-style callback
 * @returns {Promise} settles after `cb` has run
 */
function handleParallel(obj, cb) {
  const { effects, handleEffects } = obj;

  // Adapt each callback-style effect resolution to a promise
  const pending = effects.map((effect) => new Promise((resolve, reject) => {
    handleEffects(effect, (err, result) => {
      if (err) {
        return reject(err);
      }

      return resolve(result);
    });
  }));

  return Promise.all(pending).then(
    (results) => cb(null, results),
    (error) => cb(error)
  );
}
/**
 * Handle an effect spec of the race type.
 *
 * Starts resolving every nested effect through `handleEffects` and
 * calls back with the outcome of whichever settles first.
 *
 * The callback is attached with the two-argument form of `then`, so it
 * is invoked exactly once: an error thrown by `cb` itself rejects the
 * returned promise instead of triggering a second `cb(error)` call.
 *
 * @param {{ effects: Array, handleEffects: function }} obj
 * @param {function} cb - node-style callback
 * @returns {Promise} settles after `cb` has run
 */
function handleRace(obj, cb) {
  const { effects, handleEffects } = obj;

  // Adapt each callback-style effect resolution to a promise
  const contenders = effects.map((effect) => new Promise((resolve, reject) => {
    handleEffects(effect, (err, result) => {
      if (err) {
        return reject(err);
      }

      return resolve(result);
    });
  }));

  return Promise.race(contenders).then(
    (result) => cb(null, result),
    (error) => cb(error)
  );
}
/**
 * Handle an effect spec of the call type
 * which handles both synchronous function
 * calls and function calls that return a promise.
 *
 * The function is invoked inside a promise executor, so anything it
 * throws synchronously (including falsy values) becomes an error for
 * `cb`. The callback is attached with the two-argument form of `then`,
 * so it is invoked exactly once: an error thrown by `cb` itself rejects
 * the returned promise instead of triggering a second `cb(err)` call.
 *
 * @param {{ func: function, args: Array }} obj - `args` defaults to `[]`
 * @param {function} cb - node-style callback
 * @returns {Promise} settles after `cb` has run
 */
function handleCall(obj, cb) {
  const { func, args = [] } = obj;

  return new Promise((resolve) => resolve(func(...args))).then(
    (res) => cb(null, res),
    (err) => cb(err)
  );
}
/**
 * Handle an effect spec of the cps type: call the function with
 * its arguments followed by the callback, and return whatever the
 * function returns.
 *
 * @param {{ func: function, args: Array }} obj - cps payload
 * @param {function} cb - callback appended as the last argument
 * @returns {*} the return value of `func`
 */
function handleCps(obj, cb) {
  const { func: target, args: leading } = obj;
  const allArgs = [...leading, cb];

  return target(...allArgs);
}
/**
 * The runtime responsible
 * of running / stepping through
 * our program, and to handle all yields.
 *
 * @param {Array<function>} [middleware] - generator functions run, in
 *   order, before every program (same context and arguments)
 * @param {function} [effectsHandler] - `(runtime, context) =>
 *   (value, cb)`; resolves one yielded value and reports through the
 *   node-style `cb`. The default hands every yielded value straight
 *   back to the program.
 * @returns {function} `runtime(program, context, finalHandler, ...args)`
 */
function createRuntime(
  middleware = [],
  effectsHandler = () => (value, cb) => cb(null, value)
) {
  /**
   * Run a program.
   *
   * `finalHandler` is called with the context as `this`, as
   * `(null, returnValue)` when the program completes or as `(error)`
   * when it ends with an uncaught error. The default final handler
   * rethrows such an error so it is not lost.
   */
  return function runtime(
    program = function* () {},
    context = {},
    finalHandler = function (err) { if (err) { throw err; } },
    ...args
  ) {
    /**
     * Create an instance of the effects handler where
     * we inject the runtime recursively, to be able to
     * yield forks and spawns of nested processes
     */
    const yieldHandler = effectsHandler(runtime, context);

    /**
     * The middlewares followed by the program itself
     */
    const programs = [...middleware, program];

    /**
     * Generator that runs the middlewares and the program in
     * sequence and returns what the program (the last entry) returned
     */
    const runner = function* (...runnerArgs) {
      let result;

      for (const currentProgram of programs) {
        result = yield* currentProgram.call(context, ...runnerArgs);
      }

      return result;
    };

    const iterator = runner.call(context, ...args);

    /**
     * Advance the iterator with `advance` and deal with the outcome:
     * - the generator threw: report the error to the final handler
     * - the generator finished: report its return value
     * - the generator yielded: resolve the effect, then feed the
     *   result (or throw the error) back into the generator
     */
    function step(advance) {
      let state;

      try {
        state = advance();
      } catch (error) {
        return finalHandler.call(context, error);
      }

      if (state.done) {
        return finalHandler.call(context, null, state.value);
      }

      return yieldHandler(state.value, (err, result) => (
        err ?
          step(() => iterator.throw(err)) :
          step(() => iterator.next(result))
      ));
    }

    step(() => iterator.next());
  };
}
module.exports = {
isPromise,
isFunction,
isObject,
isSpecObject,
fork,
spawn,
delay,
call,
cps,
race,
parallel,
putAction,
takeAction,
putStream,
takeStream,
putEvent,
takeEvent,
handleFork,
handleSpawn,
handleDelay,
handleCall,
handleRace,
handleParallel,
createRuntime,
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment