Last active
June 2, 2016 17:47
-
-
Save crdrost/66876fc880cb17c2a6763968aef1040d to your computer and use it in GitHub Desktop.
In ES6, async code can already be written synchronously with generators and yield -- no need to wait for ES7's async and await.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict";
/* This is mostly just an example workflow to illustrate how a combination of ES6 generators and promises
 * fully allow you to write your asynchronous workflow in a single synchronous-looking block.
 *
 * Unlike most such workflows which give you a really short snippet that can hide all of the thornier
 * problems, this one is approximated from some of my work at IntegriShield: this is all of the real
 * stuff that an actual daemon might have to do: manage a queue or three of concurrent workloads; load
 * each item of each queue in order; make a bunch of concurrent requests on each item; preprocess each
 * of these requests; save preprocessed results to a cache in the database; if all of them have completed
 * successfully, report success, or else report failure.
 *
 * The primitives you need, `go` and `promised` and `text_stream`, are attached in the next file and are
 * so simple that it's probably just as easy to embed them in a target application rather than to make a
 * canonical NPM repository that you'd normally import.
 *
 * Anyway, the point is that you can read this as a wholly synchronous process even though every `yield`
 * offers up the single JavaScript process to deal with some other logic while the given coroutine is
 * waiting on data.
 *
 * Some things to think about: how can we create a go-style channel? And, how can we create a lazy iterator
 * which does I/O in the background, so that we can just write `for (let row of db_query) {` and have it not
 * fetch all the rows at once?
 */
// probably more of these should be in some lib.js file, but whatever...
const {go, promised, sleep, text_stream} = require('./syncify.js'),
    // wrap common methods in a promise API:
    mysql = require('mysql'),
    make_db = config => promised(mysql.createConnection, config),
    fs = require('fs'),
    text_file = path => promised(fs.readFile, path, {encoding: 'utf8'}),
    http = require('http'),
    http_get = url => promised(http.get, url).then(text_stream),
    // NOTE: these must stay part of the same `const` chain. The original ended the previous
    // line with `;`, which made the following assignments implicit globals — a ReferenceError
    // under "use strict".
    processor = require('./processor.js'),
    coroutine_id = (() => {
      let i = 1, host = require('os').hostname(), pid = process.pid;
      return () => `${host}/${pid}(${i++})`;
    })(),
    pad2 = digit => ('0' + digit).slice(-2),
    localeTime = d => `${d.getFullYear()}-${pad2(d.getMonth() + 1)}-${pad2(d.getDate())} ${pad2(d.getHours())}:${pad2(d.getMinutes())}:${pad2(d.getSeconds())}`,
    // the original `log` only *built* the string and discarded it; actually write it out:
    log = x => console.log(`[${localeTime(new Date())}] ${x}`);
// note that this whole function is essentially its own coroutine; when some promise is off doing
// async stuff another go() can be using this same process...
/**
 * Work-queue processor coroutine: reserves up to 100 queued items of the given
 * mode for this worker, then for each reserved item preprocesses all of its
 * data rows concurrently (caching partial results in work_results_partial),
 * and finally records overall success or failure on the work_queue row.
 *
 * @param {string} config_file path of a JSON config ({database, http_api, preprocess, processor})
 * @param {string} mode        work_queue.mode value this worker handles
 */
function* coroutine(config_file, mode) {
  // yield expressions can sit inside a JSON.parse(), and can be chained into each other...
  const config = JSON.parse(yield text_file(config_file)),
      db = yield make_db(config.database),
      query = (statement, ...args) => promised(db, db.query, statement, args),
      co_id = coroutine_id();
  // Node.js will hang as long as that db connection is open, which can happen in async code
  // if any exception breaks any callback function anywhere. Guarantee db.end() in a finally:
  try {
    // Atomically reserve some rows in the database using our coroutine ID. Use `?`
    // placeholders throughout so the driver escapes every value (the original
    // interpolated values straight into the SQL text, and referenced the
    // nonexistent `$co_id`):
    yield query(`
        UPDATE work_queue
           SET status = 'running', processor_id = ?
         WHERE status = 'queued' AND mode = ?
         LIMIT 100
      `, co_id, mode);
    // fetch the IDs we just reserved — matching on our own id, not on '' as before —
    // to keep this part lighter on memory than streaming whole rows.
    let work_to_do = yield query(`SELECT item_id FROM work_queue WHERE processor_id = ?`, co_id);
    // One work_queue item at a time; call it our current `project`...
    for (let p = 0; p < work_to_do.length; p++) {
      log(`${co_id}: Now beginning project ${p+1}/${work_to_do.length}...`);
      // query() resolves to an array of rows; destructure the single reserved row.
      let [project] = yield query(`SELECT * FROM work_queue WHERE item_id = ?`, work_to_do[p].item_id);
      // concurrent processes tend to start waiting on the same things; a random wait
      // puts some entropy back into their access patterns.
      yield sleep(Math.floor(500 * Math.random()));
      // Fetch a bunch of pieces of data for each work item, process each one, then
      // combine them; on failure record partial results for later analysis.
      let resultIDs = [];
      try {
        let data = yield query(`SELECT * FROM work_data WHERE item_id = ? ORDER BY processor_order`, project.item_id);
        // Preprocess every row concurrently via go(). IMPORTANT: the combined
        // Promise.all must be *yielded* — the original assigned the pending Promise
        // and then called .filter() on it below.
        let preprocessed_results = yield Promise.all(data.map(row => {
          let preprocess_this_row = go(function* () {
            // check if we've already done this work...
            let already_done = yield query(`
                SELECT partial_id, result
                  FROM work_results_partial
                 WHERE item_id = ? AND data_id = ?`, project.item_id, row.data_id);
            if (already_done.length > 0) {
              // go() threads a function*'s return as its Promise-resolution...
              resultIDs.push(already_done[0].partial_id);
              return JSON.parse(already_done[0].result);
            }
            // otherwise load the resource file and hit the API, then cache the result.
            yield sleep(Math.floor(500 * Math.random()));
            // do these in parallel...
            let [file, request] = yield Promise.all([
              text_file(row.resource_path),
              http_get(processor.get_url(config.http_api, row.api_request))
            ]).then(x => x.map(JSON.parse));
            // hand them to the preprocessor (the original referenced the undefined
            // name `api` here instead of `request`):
            let result = processor.preprocess(project, config.preprocess, file, request);
            let {itemId} = yield query(`
                INSERT INTO work_results_partial SET
                    item_id = ?,
                    data_id = ?,
                    api_request_time = NOW(),
                    result = ?
              `, project.item_id, row.data_id, JSON.stringify(result));
            resultIDs.push(itemId);
            log(`${co_id}: New partial result ${itemId}...`);
            return result;
          });
          // settle every promise so that any single error still allows the maximum
          // of preprocessing to occur before we inspect the outcomes.
          return preprocess_this_row.then(
            value => ({success: true, value: value}),
            error => ({success: false, value: error})
          );
        }));
        let total_errors = preprocessed_results.filter(x => !x.success);
        if (total_errors.length > 0) {
          let estr = total_errors.length == 1 ? 'an error' : total_errors.length + ' errors';
          // the Error lives on x.value; the wrapper itself has no .stack
          throw new Error(`Caught ${estr} during preprocessing:` +
            total_errors.map(x => x.value.stack).join('\n-----\n'));
        }
        // all of those succeeded? Hooray: unwrap the {success, value} records and process.
        let final_result = processor.process(project, config.processor,
          preprocessed_results.map(x => x.value));
        // the original UPDATE had no WHERE clause (it would have rewritten every row),
        // was missing the comma after `result = ?`, and set a `pid` column although the
        // reservation above uses `processor_id` — TODO confirm column name against schema.
        yield query(`
            UPDATE work_queue SET
                status = 'done',
                processor_id = NULL,
                finish_time = NOW(),
                result = ?,
                partial_ids = ?
             WHERE item_id = ?`, JSON.stringify(final_result), resultIDs.join(','), project.item_id);
      } catch (err) {
        // something failed — some preprocessing step, the processor, or unknown;
        // record the partial IDs we did manage plus a readable message.
        yield query(`
            UPDATE work_queue SET
                status = 'failed',
                processor_id = NULL,
                partial_ids = ?,
                daemon_message = ?
             WHERE item_id = ?`, resultIDs.join(','), String(err && err.stack || err), project.item_id);
      }
    } // <- for each project
  } finally {
    // close our connection, but do NOT process.exit() here: three coroutines share this
    // process, and the first one to finish would have killed the other two.
    db.end();
  }
}
// launch some parallel processes with this process ID. Make concurrent 'plain' requests to 2 different HTTP servers,
// plus handle any that need extra processing.
const worker_specs = [
  ['config.plain.api1.json', 'plain'],
  ['config.plain.api2.json', 'plain'],
  ['config.extra-processing.json', 'advanced']
];
for (const [config_path, worker_mode] of worker_specs) {
  go(coroutine, config_path, worker_mode);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict";
/* This is a trampoline for a generator function. The idea is a little bit complicated. :)
 *
 * So, we have these Promises which represent asynchronous work which does a thing, and then we have
 * these generator-functions which can be paused to yield something to their surrounding context,
 * and that context can then send a message into the generator, which becomes the value of the yield
 * expression.
 *
 * The idea of `go()` is that it takes a generator as an argument, and whenever that generator
 * yields a promise to it, it waits on the promise before restarting the generator with the result
 * of that promise. We therefore get to restore a synchronous API to an asynchronous process, with
 * the asynchronous calls looking like `var result = yield promise_returning_function();`. You drop
 * down to the trampoline and it bounces you back up into the function when the promise resolves.
 *
 * The result of go() is a Promise that resolves whenever the generator is complete.
 */
/**
 * Run a generator function as a coroutine.
 *
 * Each yielded value is awaited (non-thenables are wrapped with Promise.resolve);
 * a fulfilled value is sent back in via iterator.next(), a rejection is thrown
 * back in via iterator.throw() so the generator can try/catch it.
 *
 * @param {GeneratorFunction} gen  the generator function to drive
 * @param {...*} args              arguments applied when instantiating the iterator
 * @returns {Promise} resolves with the generator's return value; rejects with any
 *     exception the generator does not catch.
 */
function go(gen, ...args) {
  // technically the generator is the function* and the iterator is created when you apply it to
  // its arguments; usually we expect no arguments in this use-case:
  let iterator = gen(...args);
  return new Promise(function (accept, reject) {
    // next_action defers the iterator call into a function so that any exception
    // the generator throws lands in our try/catch and rejects the promise.
    const next = next_action => {
      try {
        let result = next_action();
        if (result.done) {
          accept(result.value);
          return;
        }
        // treat non-thenable yields as already-resolved values
        let promise = result.value;
        if (!promise || typeof promise.then !== 'function') {
          promise = Promise.resolve(result.value);
        }
        // A rejection is *thrown into* the generator, giving it a chance to
        // try/catch before we give up on the whole coroutine:
        promise.then(
          value => next(() => iterator.next(value)),
          error => next(() => iterator.throw(error)));
      } catch (err) {
        // any exception not caught by the generator above:
        reject(err);
      }
    };
    // kick the iterator off.
    next(() => iterator.next());
  });
}
// Turn any Node.js style function into a Promise in 12 lines of code. | |
function promised(self, fn, ...args) { | |
if (typeof self === 'function') { // special shortcut if `this` is not important to the function | |
args = [fn, ...args]; | |
fn = self; | |
self = null; | |
} | |
return new Promise((accept, reject) => { | |
let cb = (err, ...vals) => err? reject(err) : accept(...vals), | |
full_args = [...args, cb]; | |
fn.apply(self, args); | |
}; | |
}; | |
/**
 * Collect an entire readable stream into one UTF-8 string.
 * Resolves with the concatenated text on 'end'; rejects on 'error'.
 */
function text_stream(stream) {
  stream.setEncoding('utf8');
  let collected = '';
  stream.on('data', piece => {
    collected += piece;
  });
  return new Promise((resolve, reject) => {
    stream.on('end', () => resolve(collected));
    stream.on('error', reject);
  });
}
exports.go = go;
exports.promised = promised;
// the main file imports `text_stream`; the original exported an undefined name
// `streamed`, which crashed the module at load time.
exports.text_stream = text_stream;
// pass the resolver itself to setTimeout — the original wrote `acc()`, which
// invoked it immediately, so sleep() resolved without waiting at all.
exports.sleep = (time) => new Promise((accept) => setTimeout(accept, time));
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment