@crdrost
Last active June 2, 2016 17:47
In ES6, asynchronous code can already be written in a synchronous style with generators and yield -- no need to wait for ES7's async and await.
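Before diving into the full daemon below, here is a minimal, self-contained sketch of the pattern the gist is built on (`run` here is a hypothetical stand-in for the `go` trampoline defined in syncify.js; only built-in Promises are assumed):

```javascript
// A generator trampoline: each yielded promise pauses the generator,
// and its resolution (or rejection) is sent (or thrown) back in.
function run(gen, ...args) {
  const it = gen(...args);
  return new Promise((accept, reject) => {
    const step = (action) => {
      let result;
      try { result = action(); } catch (err) { return reject(err); }
      if (result.done) return accept(result.value);
      Promise.resolve(result.value).then(
        value => step(() => it.next(value)),
        error => step(() => it.throw(error)));
    };
    step(() => it.next());
  });
}

const delay = ms => new Promise(acc => setTimeout(acc, ms));

run(function* () {
  yield delay(10);                      // looks synchronous, yields the process
  const n = yield Promise.resolve(21);  // resolved value becomes the yield's value
  return n * 2;
}).then(x => console.log(x));           // logs 42
```

The same shape scales up to the database/HTTP workflow below; `go` in syncify.js adds only minor refinements over this sketch.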
"use strict";
/* This is mostly just an example workflow to illustrate how a combination of ES6 generators and promises
* fully allow you to write your asynchronous workflow in a single synchronous-looking block.
*
* Unlike most such demos, which give you a snippet short enough to hide all of the thornier
* problems, this one is approximated from some of my work at IntegriShield: this is all of the real
* stuff that an actual daemon might have to do: manage a queue or three of concurrent workloads; load
* each item of each queue in order; make a bunch of concurrent requests on each item; preprocess each
* of these requests; save preprocessed results to a cache in the database; if all of them have completed
* successfully, report success, or else report failure.
*
* The primitives you need -- `go`, `promised`, `sleep`, and `text_stream` -- are attached in the next
* file and are so simple that it's probably easier to embed them in a target application than to
* publish a canonical npm package to import.
*
* Anyway, the point is that you can read this as a wholly synchronous process even though every `yield`
* offers up the single JavaScript process to deal with some other logic while the given coroutine is
* waiting on data.
*
* Some things to think about: how can we create a go-style channel? And, how can we create a lazy iterator
* which does I/O in the background, so that we can just write `for (let row of db_query) {` and have it not
* fetch all the rows at once?
*/
// probably more of these should be in some lib.js file, but whatever...
const {go, promised, sleep, text_stream} = require('./syncify.js'),
// wrap common methods in a promise API:
mysql = require('mysql'),
make_db = config => { let db = mysql.createConnection(config); return promised(db, db.connect).then(() => db); },
fs = require('fs'),
text_file = path => promised(fs.readFile, path, {encoding: 'utf8'}),
http = require('http'),
// http.get's callback receives the response directly (errors come via the 'error' event),
// so it can't go through promised():
http_get = url => new Promise((accept, reject) => http.get(url, accept).on('error', reject)).then(text_stream),
// probably we need some other stuff like the actual processing logic and globally unique IDs and local logging:
processor = require('./processor.js'),
coroutine_id = (() => {
let i = 1, host = require('os').hostname(), pid = process.pid;
return () => `${host}/${pid}(${i++})`
})(),
pad2 = digit => ('0' + digit).slice(-2),
localeTime = d => `${d.getFullYear()}-${pad2(d.getMonth() + 1)}-${pad2(d.getDate())} ${pad2(d.getHours())}:${pad2(d.getMinutes())}:${pad2(d.getSeconds())}`,
log = x => console.log(`[${localeTime(new Date())}] ${x}`);
// note that this whole function is essentially its own coroutine; when some promise is off doing
// async stuff another go() can be using this same process...
function* coroutine(config_file, mode) {
// yield expressions can sit inside a JSON.parse(), and can be chained into each other...
const config = JSON.parse(yield text_file(config_file)),
db = yield make_db(config.database),
query = (statement, ...args) => promised(db, db.query, statement, args),
co_id = coroutine_id();
// so, Node.js will hang as long as that db connection is open, which can happen in async code
// if any exception breaks any callback function anywhere. Let's just catch this and call db.end():
try {
// Maybe we want to atomically reserve some rows in the database using our coroutine ID, then
// fetch the ones we reserved to process. No problem:
yield query(`
UPDATE work_queue
SET status = 'running', processor_id = ?
WHERE status = 'queued' AND mode = '${mode}'
LIMIT 100
`, co_id);
// the only "not quite there yet" thing from doing things this obvious way is streaming rows:
// instead, we just grab the IDs to keep this part lighter on memory.
let work_to_do = yield query(`SELECT item_id FROM work_queue WHERE status = 'running' AND processor_id = ?`, co_id);
// We've seen try-catch and method calls, how about for-loops? Maybe this coroutine only handles
// one work_queue item at a time. Let's call it our current `project`...
for (let p = 0; p < work_to_do.length; p++) {
log(`${co_id}: Now beginning project ${p+1}/${work_to_do.length}...`);
let [project] = yield query(`SELECT * FROM work_queue WHERE item_id = ${work_to_do[p].item_id}`);
// usually concurrent processes start to wait on the same things, random waits can help
// put entropy back into their access patterns. Let's just define a sleep() promise in
// syncify.js and use it here:
yield sleep(Math.floor(500 * Math.random()));
// OK, then we probably want to fetch a bunch of pieces of data for each work_item, process each
// one, then combine them together. Something like that. But maybe on failures we want to record
// partial results, too, for later analysis. Let's nest some try-catch blocks:
let resultIDs = [];
try {
let data = yield query(`SELECT * FROM work_data WHERE item_id = ${project.item_id} ORDER BY processor_order`);
// OK, now we've got a bunch of rows of data, maybe those also point at other async operations like files,
// or they otherwise need to be preprocessed in a parallelizable step. Let's just do that with another go():
let preprocessed_results = yield Promise.all(data.map(row => {
let preprocess_this_row = go(function* () {
// check if we've already done this work...
let already_done = yield query(`
SELECT partial_id, result
FROM work_results_partial
WHERE item_id = ${project.item_id} AND data_id = ${row.data_id}`);
// we haven't done an if-statement yet, but that's no problem either:
if (already_done.length > 0) {
// go() defined below properly threads a function*'s return as its Promise-resolution...
resultIDs.push(already_done[0].partial_id);
return JSON.parse(already_done[0].result);
}
// otherwise maybe we need to load some file and process it and cache it in work_results_partial...
yield sleep(Math.floor(500 * Math.random()));
// do these in parallel...
let [file, request] = yield Promise.all([
text_file(row.resource_path),
http_get(processor.get_url(config.http_api, row.api_request))
]).then(x => x.map(JSON.parse));
// hand them to the preprocessor
let result = processor.preprocess(project, config.preprocess, file, request);
let {insertId} = yield query(`
INSERT INTO work_results_partial SET
item_id = ${project.item_id},
data_id = ${row.data_id},
api_request_time = NOW(),
result = ?
`, JSON.stringify(result));
resultIDs.push(insertId);
log(`${co_id}: New partial result ${insertId}...`);
return result;
});
// once go() is run we want to make all of these promises succeed, so that any error still allows
// the maximum of preprocessing to occur.
return preprocess_this_row.then(
value => ({success: true, value: value}),
error => ({success: false, value: error})
);
}));
let total_errors = preprocessed_results.filter(x => !x.success);
if (total_errors.length > 0) {
let estr = total_errors.length == 1 ? 'an error' : total_errors.length + ' errors';
throw new Error(`Caught ${estr} during preprocessing:\n` + total_errors.map(x => x.value.stack).join('\n-----\n'));
}
// all of those succeeded? Hooray, we can process the result:
let final_result = processor.process(project, config.processor, preprocessed_results.map(x => x.value));
yield query(`
UPDATE work_queue SET
status = 'done',
processor_id = NULL,
finish_time = NOW(),
result = ?,
partial_ids = '${resultIDs.join(',')}'
WHERE item_id = ${project.item_id}`, JSON.stringify(final_result));
} catch (err) {
// something failed, either some of the preprocessing steps or the processor or I don't know what...
yield query(`
UPDATE work_queue SET
status = 'failed',
processor_id = NULL,
partial_ids = '${resultIDs.join(',')}',
daemon_message = ?
WHERE item_id = ${project.item_id}`, String(err));
}
} // <- for each project
} finally {
// close this coroutine's connection -- but don't process.exit() here, since the
// other go() coroutines launched below share this one Node.js process.
db.end();
}
}
// launch some parallel processes with this process ID. Make concurrent 'plain' requests to 2 different HTTP servers,
// plus handle any that need extra processing.
go(coroutine, 'config.plain.api1.json', 'plain');
go(coroutine, 'config.plain.api2.json', 'plain');
go(coroutine, 'config.extra-processing.json', 'advanced');
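The header comment above asks how to create a go-style channel on top of these primitives. Here is a minimal queue-backed sketch (hypothetical, not part of the gist): a coroutine can `yield ch.take()` to block on the next value, and any code can `ch.put(value)` to wake the oldest waiter.

```javascript
// A minimal promise-based channel: put() either hands its value to the
// oldest pending take(), or queues it; take() either drains the queue
// or returns a promise that a future put() will resolve.
function channel() {
  const values = [], takers = [];
  return {
    put(value) {
      if (takers.length > 0) takers.shift()(value);
      else values.push(value);
    },
    take() {
      if (values.length > 0) return Promise.resolve(values.shift());
      return new Promise(accept => takers.push(accept));
    }
  };
}
```

Inside a `go()` coroutine this reads as `let msg = yield ch.take();`, which also points toward the lazy row-streaming iterator the comment wonders about.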
// syncify.js
"use strict";
/* This is a trampoline for a generator function. The idea is a little bit complicated. :)
*
* So, we have these Promises which represent asynchronous work which does a thing, and then we have
* these generator-functions which can be paused to yield something to their surrounding context,
* and that context can then send a message into the generator, which becomes the value of the yield
* expression.
*
* The idea of `go()` is that it takes a generator as an argument, and whenever that generator
* yields a promise to it, it waits on the promise before restarting the generator with the result
* of that promise. We therefore get to restore a synchronous API to an asynchronous process, with
* the asynchronous calls looking like `var result = yield promise_returning_function();`. You drop
* down to the trampoline and it bounces you back up into the function when the promise resolves.
*
* The result of go() is a Promise that resolves whenever the generator is complete.
*/
function go(gen, ...args) {
// technically the generator is the function* and the iterator is created when you apply it to
// its arguments; usually we expect no arguments in this use-case:
let iterator = gen(...args);
return new Promise(function (accept, reject) {
var sendError, sendValue, next;
next = next_action => {
try {
let result = next_action(); // deferred via a function so that we can catch exceptions.
if (result.done) {
accept(result.value);
} else {
let promise = result.value;
if (!promise || typeof promise.then !== 'function') {
promise = Promise.resolve(result.value);
}
// Here is where you go "Oh my gosh it's so nice dealing with smart people!"
// The inventors of ES6 were smarter than me and realized that occasionally you might
// want to throw an exception into a promise, to give it a chance to try/catch(){} it,
// before handling whatever didn't work out:
promise.then(
p => next(() => iterator.next(p) ),
e => next(() => iterator.throw(e)));
}
} catch (err) {
// this will handle any exceptions not caught by the generator above:
reject(err);
}
}
// kick the iterator off.
next(() => iterator.next());
});
};
// Turn any Node.js style function into a Promise in 12 lines of code.
function promised(self, fn, ...args) {
if (typeof self === 'function') { // special shortcut if `this` is not important to the function
args = [fn, ...args];
fn = self;
self = null;
}
return new Promise((accept, reject) => {
let cb = (err, ...vals) => err ? reject(err) : accept(...vals),
full_args = [...args, cb];
fn.apply(self, full_args); // was `args`, which dropped the callback entirely
});
}
function text_stream(stream) {
stream.setEncoding('utf8');
let buff = '';
stream.on('data', chunk => buff += chunk);
return new Promise((accept, reject) => {
stream.on('end', () => accept(buff));
stream.on('error', e => reject(e));
});
}
exports.go = go;
exports.promised = promised;
exports.text_stream = text_stream;
exports.sleep = (time) => new Promise(acc => setTimeout(acc, time)); // setTimeout(acc(), ...) would resolve immediately
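As a quick sanity check of the `promised` contract -- both the `(self, fn, ...args)` form and the `this`-free shortcut -- here is a standalone usage sketch (the wrapper is restated inline so the snippet runs by itself; `obj.addLater` is a made-up example method):

```javascript
// Inline restatement of `promised` from syncify.js, for a self-contained demo.
function promised(self, fn, ...args) {
  if (typeof self === 'function') {            // shortcut: no `this` needed
    [self, fn, args] = [null, self, [fn, ...args]];
  }
  return new Promise((accept, reject) => {
    fn.apply(self, [...args, (err, val) => err ? reject(err) : accept(val)]);
  });
}

const obj = {
  base: 10,
  addLater(n, cb) { setTimeout(() => cb(null, this.base + n), 0); }
};

// with a `this` binding:
promised(obj, obj.addLater, 5).then(sum => console.log(sum));            // logs 15
// shortcut form; errback errors become rejections:
promised(cb => cb(new Error('boom'))).catch(e => console.log(e.message)); // logs "boom"
```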