Last active
October 25, 2016 21:41
-
-
Save eiriklv/a498112b3a73fc2b92975649924cf5c3 to your computer and use it in GitHub Desktop.
Handling effects declaratively with generators and custom runtimes (fun and learning inspired by redux-saga - but for broader use)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict';

const { EventEmitter } = require('events');

const { createStore, applyMiddleware } = require('./redux');
const { addActionTakersToStore, addLoggingToStore } = require('./middleware');
const {
  isPromise,
  isFunction,
  isObject,
  isSpecObject,
  fork,
  delay,
  call,
  cps,
  race,
  parallel,
  putAction,
  takeAction,
  putStream,
  takeStream,
  putEvent,
  takeEvent,
  handleFork,
  handleSpawn,
  handleDelay,
  handleCall,
  handleCps,
  handleRace,
  handleParallel,
  createRuntime,
} = require('./utils');
/**
 * Logging middleware: prints whatever object it is invoked with as `this`.
 * It yields no effects, so it completes on its first step.
 */
function* logMiddleware() {
  const context = this;
  console.log(context);
}
/**
 * Responder half of the ping/pong demo: waits for a `PING` action,
 * yields a 2000 ms delay effect, then puts a `PONG` action. Loops forever.
 */
function* processOne() {
  for (;;) {
    yield takeAction('PING');
    yield delay(2000);
    yield putAction({ type: 'PONG' });
  }
}
/**
 * Initiator half of the ping/pong demo: puts a `PING` action, waits for
 * the `PONG` answer, then yields a 2000 ms delay effect. Loops forever.
 */
function* processTwo() {
  for (;;) {
    yield putAction({ type: 'PING' });
    yield takeAction('PONG');
    yield delay(2000);
  }
}
/**
 * Takes one chunk at a time from stdin and writes it back to stdout
 * prefixed with "message received: ". Loops forever.
 */
function* streamProcess() {
  for (;;) {
    const chunk = yield takeStream(process.stdin);
    yield putStream(process.stdout, `message received: ${chunk}`);
  }
}
/**
 * First peer of the emitter demo: after a 2000 ms delay effect it emits
 * `my_event` with "ping!", waits for the next `my_event` and prints the
 * received payload to stdout. Loops forever.
 *
 * @param {{ socket: object }} deps - injected dependencies; `socket` is the shared emitter
 */
function* socketProcessOne({ socket }) {
  for (;;) {
    yield delay(2000);
    yield putEvent(socket, 'my_event', 'ping!');
    const payload = yield takeEvent(socket, 'my_event');
    yield putStream(process.stdout, `(1) event received: ${payload}\n`);
  }
}
/**
 * Second peer of the emitter demo: waits for `my_event`, prints the
 * payload to stdout, yields a 2000 ms delay effect and then emits
 * `my_event` with "pong!". Loops forever.
 *
 * @param {{ socket: object }} deps - injected dependencies; `socket` is the shared emitter
 */
function* socketProcessTwo({ socket }) {
  for (;;) {
    const payload = yield takeEvent(socket, 'my_event');
    yield putStream(process.stdout, `(2) event received: ${payload}\n`);
    yield delay(2000);
    yield putEvent(socket, 'my_event', 'pong!');
  }
}
/**
 * Plain echo: every chunk taken from stdin is written to stdout
 * (converted to a string through a template literal). Loops forever.
 */
function* stdEchoProcess() {
  for (;;) {
    const chunk = yield takeStream(process.stdin);
    yield putStream(process.stdout, `${chunk}`);
  }
}
/**
 * Repeatedly races four timed calls (two direct, two inside a nested
 * race) and logs the winning value. The delay table is rotated after
 * every round so each call is paired with a different delay next time.
 */
function* raceProcess() {
  const delayTable = [200, 500, 1000, 1500];

  /**
   * Build a call effect whose function resolves with `value` after the
   * delay stored at `slot`. The table is read only when the function
   * runs, so each round sees the rotated delays.
   */
  const timedCall = (value, slot) => call(() => new Promise((resolve) => {
    setTimeout(() => {
      resolve(value);
    }, delayTable[slot]);
  }));

  for (;;) {
    const winner = yield race([
      timedCall(10, 0),
      timedCall(20, 1),
      race([
        timedCall(30, 2),
        timedCall(40, 3),
      ]),
    ]);

    // Rotate the table: the last delay moves to the front
    delayTable.unshift(delayTable.pop());

    yield call(console.log.bind(console), `${winner}`);
  }
}
/**
 * A sub-process that writes a string to stdout one character at a time,
 * yielding a delay effect after every character.
 *
 * Iterates the string by code point (string iterator) rather than
 * `split('')`, so characters outside the BMP are written whole instead
 * of as two lone surrogates.
 *
 * @param {string} str - text to print
 * @param {number} interval - value passed to the delay effect between characters
 */
function* slowPrint(str, interval) {
  for (const char of str) {
    yield putStream(process.stdout, char);
    yield delay(interval);
  }
}
/**
 * Slow echo: takes a chunk from stdin and delegates to `slowPrint` to
 * write it back with an interval of 50 between characters. Loops forever.
 */
function* slowEchoProcess() {
  for (;;) {
    const input = yield takeStream(process.stdin);
    const text = input.toString();
    yield* slowPrint(text, 50);
  }
}
/**
 * A process that forks two independent `slowEchoProcess` instances
 * and then finishes.
 *
 * NOTE: relies on `fork` being imported from './utils' at the top of
 * the file; without that import the first step throws a ReferenceError.
 */
function* slowEchoForkProcess() {
  yield fork(slowEchoProcess);
  yield fork(slowEchoProcess);
}
/**
 * Repeatedly runs two races in parallel (each between two timed calls)
 * and logs the pair of winners. The delay table is rotated after every
 * round so each call is paired with a different delay next time.
 */
function* parallelProcess() {
  const delayTable = [200, 500, 1000, 1500];

  /**
   * Build a call effect whose function resolves with `value` after the
   * delay stored at `slot`. The table is read only when the function
   * runs, so each round sees the rotated delays.
   */
  const timedCall = (value, slot) => call(() => new Promise((resolve) => {
    setTimeout(() => {
      resolve(value);
    }, delayTable[slot]);
  }));

  for (;;) {
    const winners = yield parallel([
      race([
        timedCall(10, 0),
        timedCall(20, 1),
      ]),
      race([
        timedCall(30, 2),
        timedCall(40, 3),
      ]),
    ]);

    // Rotate the table: the last delay moves to the front
    delayTable.unshift(delayTable.pop());

    yield call(console.log.bind(console), `${winners}`);
  }
}
/**
 * Handler invoked when a program finishes; `this` is the context the
 * program was run with.
 *
 * Reports a failure on stderr instead of silently dropping it, then
 * logs the context as before.
 *
 * @param {?Error} err - error that ended the program, if any
 * @param {*} value - final value of the program (not used here)
 */
function finalHandler(err, value) {
  if (err) {
    console.error(err);
  }
  console.log(this);
}
/**
 * Build the factory for the effects handler from a set of
 * per-effect handler functions.
 *
 * The returned factory takes the runtime and the context (so fork and
 * spawn can start nested processes) and produces `handleEffects`, which
 * validates a yielded effect description and forwards the relevant
 * fields plus the callback to the handler matching its `type`.
 */
function createEffectsHandler({
  handleFork,
  handleSpawn,
  handleSelect,
  handleDelay,
  handleRace,
  handleParallel,
  handleCall,
  handleCps,
  handleTakeAction,
  handlePutAction,
  handleTakeStream,
  handlePutStream,
  handleTakeEvent,
  handlePutEvent,
}) {
  return function (runtime, context) {
    /**
     * Effect type -> function that picks the fields the matching
     * handler needs and invokes it
     */
    const resolvers = new Map([
      ['@@spawn', (effect, done) => handleSpawn({
        proc: effect.proc,
        args: effect.args,
        runtime,
        context,
      }, done)],
      ['@@fork', (effect, done) => handleFork({
        proc: effect.proc,
        args: effect.args,
        runtime,
        context,
      }, done)],
      ['@@select', (effect, done) => handleSelect({
        selector: effect.selector,
      }, done)],
      ['@@delay', (effect, done) => handleDelay({
        time: effect.time,
        val: effect.val,
      }, done)],
      // parallel and race receive handleEffects so they can resolve nested effects
      ['@@parallel', (effect, done) => handleParallel({
        handleEffects,
        effects: effect.effects,
      }, done)],
      ['@@race', (effect, done) => handleRace({
        handleEffects,
        effects: effect.effects,
      }, done)],
      ['@@call', (effect, done) => handleCall({
        func: effect.func,
        args: effect.args,
      }, done)],
      ['@@cps', (effect, done) => handleCps({
        func: effect.func,
        args: effect.args,
      }, done)],
      ['@@putStream', (effect, done) => handlePutStream({
        stream: effect.stream,
        data: effect.data,
      }, done)],
      ['@@takeStream', (effect, done) => handleTakeStream({
        stream: effect.stream,
      }, done)],
      ['@@putEvent', (effect, done) => handlePutEvent({
        emitter: effect.emitter,
        event: effect.event,
        data: effect.data,
      }, done)],
      ['@@takeEvent', (effect, done) => handleTakeEvent({
        emitter: effect.emitter,
        event: effect.event,
      }, done)],
      ['@@putAction', (effect, done) => handlePutAction({
        action: effect.action,
      }, done)],
      ['@@takeAction', (effect, done) => handleTakeAction({
        actionType: effect.actionType,
      }, done)],
    ]);

    /**
     * Resolve a single yielded effect description and report the
     * outcome through the node-style callback
     */
    function handleEffects(
      value = {},
      cb = () => { console.warn('No callback passed for resolving effect') }
    ) {
      // Only object descriptions with a non-empty string type are effects
      if (!isSpecObject(value)) {
        return cb(new Error('You can only yield effects as object descriptions'));
      }

      const resolveEffect = resolvers.get(value.type);

      if (!resolveEffect) {
        return cb(new Error(`Unrecognized effect type: ${value.type}`));
      }

      return resolveEffect(value, cb);
    }

    return handleEffects;
  };
}
/**
 * Placeholder state reducer: every action, including `SOME_ACTION`,
 * leaves the state untouched. The state defaults to an empty object.
 */
function reducer(state = {}, action) {
  const { type } = action;

  if (type === 'SOME_ACTION') {
    return state;
  }

  return state;
}
/**
 * Run the program using our runtime: builds the store, the effects
 * handler and the runtime, then starts every demo process with a
 * shared context and shared injected dependencies.
 */
function application() {
  /**
   * Create instance of takeActionsMiddleware
   */
  const takeActionsMiddleware = addActionTakersToStore({});

  /**
   * Create instance of logger middleware
   */
  const loggerMiddleware = addLoggingToStore({});

  /**
   * Application state handler
   */
  const store = createStore(
    reducer,
    applyMiddleware(
      takeActionsMiddleware,
      loggerMiddleware
    )
  );

  /**
   * Create subscriber for state changes
   */
  store.subscribe(() => {
    console.log('state changed!', store.getState());
  });

  /**
   * Effects handler
   *
   * Every handler receives an object with the effect's fields and a
   * node-style callback.
   */
  const effectsHandler = createEffectsHandler({
    handleFork,
    handleSpawn,
    handleDelay,
    handleRace,
    handleParallel,
    handleCall,
    handleCps,
    // The selector arrives wrapped in an object like every other
    // effect payload, so it has to be destructured before use
    handleSelect: ({ selector }, cb) => {
      cb(null, selector(store.getState()));
    },
    handlePutAction: ({ action }, cb) => {
      cb(null, store.dispatch(action));
    },
    handleTakeAction: ({ actionType }, cb) => {
      takeActionsMiddleware.take(actionType)
        .then((action) => cb(null, action))
        .catch((error) => cb(error));
    },
    handlePutStream: ({ stream, data }, cb) => {
      stream.write(data);
      cb(null);
    },
    // `once` removes the listener after the first chunk / event
    handleTakeStream: ({ stream }, cb) => {
      stream.once('data', (data) => {
        cb(null, data);
      });
    },
    handlePutEvent: ({ emitter, event, data }, cb) => {
      emitter.emit(event, data);
      cb(null);
    },
    handleTakeEvent: ({ emitter, event }, cb) => {
      emitter.once(event, (data) => {
        cb(null, data);
      });
    },
  });

  /**
   * TODO:
   *
   * Create channels / emitters for
   * - input (key, stdin)
   * - events
   * - sockets
   * - streams
   * - what else..?
   *
   * CSP (Communicating Sequential Processes) ?
   *
   * NOTE:
   * - eventEmitters/sockets do not have buffering and are asynchronous
   * - csp channels have buffering and are "synchronous" (put will wait until message is taken)
   *
   */
  const socket = new EventEmitter();

  /**
   * Create a runtime
   */
  const runtime = createRuntime([logMiddleware], effectsHandler);

  /**
   * Gather all the processes
   */
  const processes = [
    processOne,
    processTwo,
    streamProcess,
    socketProcessOne,
    socketProcessTwo,
    stdEchoProcess,
    raceProcess,
    parallelProcess,
    slowEchoProcess,
  ];

  /**
   * Arguments for each process,
   * dependencies
   * - channels
   * - emitters
   * - streams
   * - whatever is needed as injected deps
   */
  const args = {
    socket,
  };

  /**
   * Create a global context
   */
  const context = {};

  /**
   * Run all the processes
   */
  processes.forEach((proc) => {
    runtime(proc, context, finalHandler, args);
  });
}

/**
 * Start the application
 */
application();
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Create a logging middleware: prints the type of every dispatched
 * action and then passes the action on unchanged.
 *
 * @param {object} options - currently unused
 */
function addLoggingToStore(options) {
  return (store) => (next) => (action) => {
    console.log(`dispatching: ${action.type}`);
    return next(action);
  };
}
/**
 * Create a middleware for function-valued actions: a dispatched
 * function is called with the store's dispatch, its getState and the
 * extra arguments; anything else is passed on to the next middleware.
 *
 * @param {*} extraArguments - forwarded as third argument to function actions
 */
function addActionCreatorsToStore(extraArguments) {
  return (store) => (next) => (action) => {
    if (typeof action !== 'function') {
      return next(action);
    }

    return action(store.dispatch, store.getState, extraArguments);
  };
}
/**
 * Create a middleware that lets callers wait for dispatched actions.
 *
 * The returned middleware carries an extra `take(actionType)` method
 * that returns a promise resolved with the next dispatched action of
 * that type (or with any action when no type is given). Each taker is
 * resolved at most once.
 *
 * @param {object} options - currently unused
 * @returns {Function} the middleware, with a `take` method attached
 */
function addActionTakersToStore(options) {
  /**
   * Pending takers: `{ actionType, resolve }` records waiting for a dispatch
   */
  const dispatchTakers = [];

  const storeEnhancer = (store) => (next) => (action) => {
    /**
     * Takers waiting for this action type, plus takers
     * without a type (they accept any action)
     */
    const matchingTakers = dispatchTakers.filter(({ actionType }) => (
      !actionType ||
      (action.type && action.type === actionType)
    ));

    matchingTakers.forEach((taker) => {
      // A taker fires once, so drop it before resolving
      dispatchTakers.splice(dispatchTakers.indexOf(taker), 1);

      // Resolve asynchronously, after the current dispatch call returns
      setImmediate(() => {
        taker.resolve(action);
      });
    });

    /**
     * Pass the action on asynchronously as well. An exception thrown by
     * the rest of the chain rejects the returned promise instead of
     * escaping from the setImmediate callback as an uncaught exception.
     */
    return new Promise((resolve, reject) => {
      setImmediate(() => {
        try {
          resolve(next(action));
        } catch (error) {
          reject(error);
        }
      });
    });
  };

  /**
   * Wait for a single occurrence of an action.
   *
   * NOTE: We're monkey patching the middleware
   */
  storeEnhancer.take = (actionType) => new Promise((resolve) => {
    dispatchTakers.push({ actionType, resolve });
  });

  return storeEnhancer;
}
/** | |
* Exports | |
*/ | |
module.exports = { | |
addLoggingToStore, | |
addActionCreatorsToStore, | |
addActionTakersToStore, | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Create a store enhancer that wraps `store.dispatch` with the given
 * middlewares. They are applied from last to first, so the first
 * middleware listed ends up outermost. The store object is patched
 * in place and returned.
 */
function applyMiddleware(...middlewares) {
  return (store) => middlewares.reduceRight((wrapped, middleware) => {
    wrapped.dispatch = middleware(wrapped)(wrapped.dispatch);
    return wrapped;
  }, store);
}
/**
 * Create a super simple implementation of Redux.
 *
 * The store keeps no state of its own: it records every dispatched
 * action and derives the current state by reducing that log from
 * scratch on each `getState()` call. This is slow on purpose (it is for
 * educational use) but makes things like time travel possible.
 *
 * @param {Function} reducer - `(state, action) => state`
 * @param {Function} storeEnhancer - receives the store and returns the (possibly extended) store
 * @returns {object} whatever the enhancer returns; by default `{ getState, subscribe, dispatch }`
 */
function createStore(
  reducer = (state, action) => state,
  storeEnhancer = (store) => store
) {
  /**
   * Action used only to obtain the reducer's initial state
   */
  const initAction = { type: '@@INIT' };

  /**
   * Every action ever dispatched to the store
   */
  const actions = [];

  /**
   * Functions to call after each dispatch
   */
  const subscribers = [];

  /**
   * The original dispatch function: records the
   * action and notifies all subscribers
   */
  const dispatch = (action) => {
    console.log('* dispatching with pure dispatch *', action.type);
    actions.push(action);
    subscribers.forEach((subscriber) => {
      subscriber();
    });
  };

  /**
   * Derive the current state from the action log. The fold starts from
   * the reducer's own initial state, so a store that has not seen any
   * action yet returns that initial state rather than `undefined`.
   */
  const getState = () => actions.reduce(
    (state, action) => reducer(state, action),
    reducer(undefined, initAction)
  );

  /**
   * Register a function to be called after every dispatch
   */
  const subscribe = (fn) => {
    subscribers.push(fn);
  };

  return storeEnhancer({
    getState,
    subscribe,
    dispatch,
  });
}
/** | |
* Exports | |
*/ | |
module.exports = { | |
createStore, | |
applyMiddleware, | |
}; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Check whether a value is promise-like: an object
 * (as defined by `isObject`) with a callable `then`.
 *
 * The previous check used `typeof isFunction(obj.then)`, which is
 * always the truthy string "boolean", so any object with a `then`
 * property passed.
 *
 * @param {*} obj - value to test
 * @returns {boolean}
 */
function isPromise(obj) {
  return isObject(obj) && isFunction(obj.then);
}
/**
 * Check whether a value is callable (its `typeof` is 'function').
 */
function isFunction(obj) {
  const kind = typeof obj;
  return kind === 'function';
}
/**
 * Check whether a value is a non-null object that inherits from
 * `Object` (functions and prototype-less objects do not qualify).
 */
function isObject(obj) {
  if (typeof obj !== 'object') {
    return false;
  }

  return obj instanceof Object;
}
/**
 * Check whether a value looks like an effect description:
 * an object whose `type` is a non-empty string.
 */
function isSpecObject(obj) {
  if (!isObject(obj)) {
    return false;
  }

  const { type } = obj;
  return typeof type === 'string' && type.length > 0;
}
/**
 * Create an effect spec for forking
 * a generator function / process
 *
 * The spec always carries an `args` array (empty when no extra
 * arguments are given), because the fork handler spreads it when
 * starting the process.
 *
 * @param {GeneratorFunction} proc - process to start
 * @param {...*} args - arguments to start the process with
 */
function fork(proc, ...args) {
  return {
    type: '@@fork',
    proc,
    args,
  };
}
/**
 * Create an effect spec for spawning
 * a generator function / process
 *
 * The spec always carries an `args` array (empty when no extra
 * arguments are given), because the spawn handler spreads it when
 * starting the process.
 *
 * @param {GeneratorFunction} proc - process to start
 * @param {...*} args - arguments to start the process with
 */
function spawn(proc, ...args) {
  return {
    type: '@@spawn',
    proc,
    args,
  };
}
/**
 * Describe a "select" effect: carries the selector
 * function to be applied to the store state.
 */
function select(selector) {
  const effect = { type: '@@select' };
  effect.selector = selector;
  return effect;
}
/**
 * Describe a "call" effect: a function and the arguments to invoke it
 * with. The function may return a plain value or a promise and may
 * have side effects.
 */
function call(func, ...args) {
  const effect = { type: '@@call' };
  effect.func = func;
  effect.args = args;
  return effect;
}
/**
 * Describe a "cps" effect: a node-callback / continuation-passing
 * style function and the arguments that precede its callback.
 */
function cps(func, ...args) {
  const effect = { type: '@@cps' };
  effect.func = func;
  effect.args = args;
  return effect;
}
/**
 * Describe a "putAction" effect: an action
 * to be put into the chain for processing.
 */
function putAction(action) {
  return Object.assign({ type: '@@putAction' }, { action });
}
/**
 * Describe a "takeAction" effect: wait for an
 * action of the given type from the chain.
 */
function takeAction(actionType) {
  return Object.assign({ type: '@@takeAction' }, { actionType });
}
/**
 * Describe a "putStream" effect: data to be
 * written to the given stream.
 */
function putStream(stream, data) {
  const effect = { type: '@@putStream' };
  effect.stream = stream;
  effect.data = data;
  return effect;
}
/**
 * Describe a "takeStream" effect: wait for
 * data from the given stream.
 */
function takeStream(stream) {
  return Object.assign({ type: '@@takeStream' }, { stream });
}
/**
 * Describe a "putEvent" effect: an event name and
 * payload to be emitted on the given emitter.
 */
function putEvent(emitter, event, data) {
  const effect = { type: '@@putEvent' };
  effect.emitter = emitter;
  effect.event = event;
  effect.data = data;
  return effect;
}
/**
 * Describe a "takeEvent" effect: wait for the
 * named event on the given emitter.
 */
function takeEvent(emitter, event) {
  const effect = { type: '@@takeEvent' };
  effect.emitter = emitter;
  effect.event = event;
  return effect;
}
/**
 * Describe a "race" effect over a list of effects.
 */
function race(effects) {
  return Object.assign({ type: '@@race' }, { effects });
}
/**
 * Describe a "parallel" effect over a list of effects.
 */
function parallel(effects) {
  return Object.assign({ type: '@@parallel' }, { effects });
}
/**
 * Describe a "delay" effect: a time to wait and an
 * optional value to resolve with afterwards.
 */
function delay(time, val) {
  const effect = { type: '@@delay' };
  effect.time = time;
  effect.val = val;
  return effect;
}
/**
 * Handle forks of processes
 *
 * NOTE: Starts a new process and immediately
 * continues (non-blocking)
 *
 * `args` defaults to an empty list so that a fork effect without
 * arguments does not fail when the arguments are spread.
 *
 * @param {{ proc: Function, args: ?Array, runtime: Function, context: object }} obj
 * @param {Function} cb - called without arguments once the process has been started
 */
function handleFork(obj, cb) {
  const { proc, args = [], runtime, context } = obj;
  // No final handler is passed, so the runtime's default one is used
  runtime(proc, context, undefined, ...args);
  cb();
}
/**
 * Handle spawns of processes
 *
 * NOTE: The callback is handed to the runtime as the final handler of
 * the new process, so the caller continues only when that process has
 * finished (blocking)
 *
 * `args` defaults to an empty list so that a spawn effect without
 * arguments does not fail when the arguments are spread.
 *
 * @param {{ proc: Function, args: ?Array, runtime: Function, context: object }} obj
 * @param {Function} cb - passed to the runtime as the final handler
 */
function handleSpawn(obj, cb) {
  const { proc, args = [], runtime, context } = obj;
  runtime(proc, context, cb, ...args);
}
/**
 * Handle an effect spec of the delay type: after `time` has elapsed
 * (as measured by `setTimeout`) the callback is invoked with no error
 * and `val` as result.
 */
function handleDelay(obj, cb) {
  const { time: wait, val: payload } = obj;

  const fire = () => {
    cb(null, payload);
  };

  setTimeout(fire, wait);
}
/**
 * Handle an effect spec of the parallel type: resolves every nested
 * effect through `handleEffects` and reports the list of results (in
 * the order of `effects`) once all have succeeded, or the first error.
 *
 * The outcome is delivered with the two-argument form of `then`, so
 * the callback is invoked exactly once; with `.then().catch()` an
 * exception thrown by the callback would have triggered a second
 * invocation with that exception as the error.
 *
 * @param {{ effects: Array, handleEffects: Function }} obj
 * @param {Function} cb - node-style callback `(err, results)`
 * @returns {Promise} settles after the callback has been invoked
 */
function handleParallel(obj, cb) {
  const { effects, handleEffects } = obj;

  // Adapt each callback-based effect resolution to a promise
  const pending = effects.map((effect) => new Promise((resolve, reject) => {
    handleEffects(effect, (err, result) => (err ? reject(err) : resolve(result)));
  }));

  return Promise.all(pending).then(
    (results) => cb(null, results),
    (error) => cb(error)
  );
}
/**
 * Handle an effect spec of the race type: resolves every nested effect
 * through `handleEffects` and reports the outcome of whichever settles
 * first.
 *
 * The outcome is delivered with the two-argument form of `then`, so
 * the callback is invoked exactly once; with `.then().catch()` an
 * exception thrown by the callback would have triggered a second
 * invocation with that exception as the error.
 *
 * NOTE: with an empty `effects` list `Promise.race` never settles, so
 * the callback is never invoked.
 *
 * @param {{ effects: Array, handleEffects: Function }} obj
 * @param {Function} cb - node-style callback `(err, result)`
 * @returns {Promise} settles after the callback has been invoked
 */
function handleRace(obj, cb) {
  const { effects, handleEffects } = obj;

  // Adapt each callback-based effect resolution to a promise
  const contenders = effects.map((effect) => new Promise((resolve, reject) => {
    handleEffects(effect, (err, result) => (err ? reject(err) : resolve(result)));
  }));

  return Promise.race(contenders).then(
    (result) => cb(null, result),
    (error) => cb(error)
  );
}
/**
 * Handle an effect spec of the call type
 * which handles both synchronous function
 * calls and function calls that return a promise
 *
 * A synchronous throw and a rejected promise are both reported as the
 * callback's error. The outcome is delivered with the two-argument
 * form of `then`, so the callback is invoked exactly once; with
 * `.then().catch()` an exception thrown by the callback would have
 * triggered a second invocation with that exception as the error.
 *
 * @param {{ func: Function, args: Array }} obj
 * @param {Function} cb - node-style callback `(err, result)`
 * @returns {Promise} settles after the callback has been invoked
 */
function handleCall(obj, cb) {
  const { func, args } = obj;

  // The executor turns a synchronous throw into a rejection and
  // `resolve` adopts a returned promise
  const outcome = new Promise((resolve) => {
    resolve(func(...args));
  });

  return outcome.then(
    (res) => cb(null, res),
    (err) => cb(err)
  );
}
/**
 * Handle an effect spec of the cps type: invokes the node-callback
 * style function with its arguments followed by the callback, and
 * returns whatever that function returns.
 */
function handleCps(obj, cb) {
  const { func: target, args: leading } = obj;
  const params = [...leading, cb];
  return target(...params);
}
/**
 * Create the runtime responsible for running / stepping through a
 * program and for resolving everything the program yields.
 *
 * @param {Array<GeneratorFunction>} middleware - generator functions run,
 *   in order, before every program
 * @param {Function} effectsHandler - `(runtime, context) => (effect, cb) => void`;
 *   produces the function that resolves a yielded effect and reports the
 *   outcome through a node-style callback
 * @returns {Function} `runtime(program, context, finalHandler, ...args)`
 */
function createRuntime(
  middleware = [],
  effectsHandler = function () {}
) {
  /**
   * Run `program` (after all middlewares) with `context` as `this` and
   * `args` as arguments.
   *
   * `finalHandler` is called with `context` as `this` and node-style
   * arguments: `(null, returnValue)` when the program finishes, or
   * `(error)` when an error escapes it. The default handler rethrows
   * the error so failures of processes started without a handler are
   * not silently dropped.
   */
  return function runtime(
    program = function* () {},
    context = {},
    finalHandler = function (err) { if (err) { throw err; } },
    ...args
  ) {
    /**
     * Create an instance of the effects handler where
     * we inject the runtime recursively, to be able to
     * yield forks and spawns of nested processes
     */
    const yieldHandler = effectsHandler(runtime, context);

    /**
     * Generator that runs the middlewares and then the program in
     * sequence, delegating all nested yields. Only the program's return
     * value is forwarded; middleware return values are ignored.
     */
    const runner = function* (...runnerArgs) {
      for (const currentMiddleware of middleware) {
        yield* currentMiddleware.call(context, ...runnerArgs);
      }

      return yield* program.call(context, ...runnerArgs);
    };

    const iterator = runner.call(context, ...args);

    /**
     * Advance the iterator with `next(input)` or `throw(input)` and
     * deal with the resulting state:
     * - an error escaping the flow ends it with `finalHandler(error)`
     * - a finished flow ends with `finalHandler(null, value)`, without
     *   consulting the effects handler
     * - a yielded effect is resolved and its outcome fed back in, so a
     *   process that catches an effect error keeps running
     */
    function step(method, input) {
      let state;

      try {
        state = iterator[method](input);
      } catch (error) {
        return finalHandler.call(context, error);
      }

      if (state.done) {
        return finalHandler.call(context, null, state.value);
      }

      return yieldHandler(state.value, (err, result) => (
        err ? step('throw', err) : step('next', result)
      ));
    }

    step('next');
  };
}
module.exports = { | |
isPromise, | |
isFunction, | |
isObject, | |
isSpecObject, | |
fork, | |
spawn, | |
delay, | |
call, | |
cps, | |
race, | |
parallel, | |
putAction, | |
takeAction, | |
putStream, | |
takeStream, | |
putEvent, | |
takeEvent, | |
handleFork, | |
handleSpawn, | |
handleDelay, | |
handleCall, | |
handleRace, | |
handleParallel, | |
createRuntime, | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment