Last active September 14, 2017 12:35
-
-
Save geovanisouza92/5bede23462a153f8ec68f96a6976791d to your computer and use it in GitHub Desktop.
A Lambda function that spawns workers to process data faster
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const os = require('os') | |
const cp = require('child_process') | |
const Rx = require('rxjs') | |
// https://github.com/coopernurse/node-pool | |
const pool = require('generic-pool') | |
// Upper bound on concurrently running worker processes: one per reported CPU.
// os.cpus() may return an empty array when CPU information is unavailable;
// a worker limit of 0 is meaningless, so never go below one worker.
const MAX_WORKERS = Math.max(1, os.cpus().length)
/**
 * Build a generic-pool of forked worker processes.
 *
 * Each worker is a fork of this very file (`__filename`). The
 * `require.main === module` branch near the bottom of the file is what makes
 * the child behave as a worker. The pool holds at most MAX_WORKERS children.
 * `testOnBorrow` asks the pool to run `validate` before handing a child out.
 *
 * @returns {object} the pool returned by generic-pool's `createPool`
 */
const createWorkerPool = () => {
  const factory = {
    create() {
      return new Promise(resolve => {
        // It assumes that this script is the entry point for worker
        const p = cp.fork(__filename)
        // NOTE: Maybe wait for the process be "ready"
        resolve(p)
      })
    },
    // A child whose IPC channel is gone can no longer receive work
    validate(p) {
      return Promise.resolve(p.connected)
    },
    // Resolve only once the child has actually exited. pool.clear() waits on
    // these promises, so it then settles only after the children are gone
    // rather than as soon as the signal has been sent.
    destroy(p) {
      return new Promise(resolve => {
        // 'exit' was already emitted for this child; waiting for it would hang
        if (p.exitCode !== null || p.signalCode !== null) {
          resolve()
          return
        }
        p.once('exit', () => resolve())
        p.kill()
      })
    }
  }
  // Pool options (limit the number of workers and check connectivity)
  const opts = {
    max: MAX_WORKERS,
    testOnBorrow: true
  }
  return pool.createPool(factory, opts)
}
/**
 * Produce the stream of data items that will be handed out to workers.
 *
 * Placeholder: `event` is accepted but not inspected yet. The stream is the
 * fixed sequence of integers 1 through 10.
 */
const createEvents = function (event) {
  return Rx.Observable.range(1, 10)
}
/**
 * Build the per-item dispatcher used by `mergeMap`.
 *
 * Given the worker pool, returns a function that maps one data item to a
 * cold Observable. On subscription, that Observable borrows a worker, sends
 * it the item, and emits the worker's single reply before completing.
 * Nothing happens until it is subscribed.
 *
 * If the subscriber goes away early (for example `takeUntil` fires, or
 * another item errors):
 *  - a worker whose acquire() resolves after cancellation is handed straight
 *    back to the pool unused;
 *  - a worker still busy with this item is destroyed rather than released.
 *    Releasing it mid-task could give it to another item, whose
 *    `once('message')` would then receive this item's stale reply.
 * In both cases the worker does not stay borrowed, so `pool.drain()` is not
 * left waiting on it.
 *
 * Instead of using subprocesses, it can invoke another Lambda function.
 *
 * @param {object} workers - generic-pool pool of forked worker processes
 * @returns {function(*): Rx.Observable} item => Observable of one result
 */
const dispatchToWorker = workers => data =>
  Rx.Observable.create(observer => {
    let worker = null
    let cancelled = false

    // Listener for one result only
    const onMessage = result => {
      const finished = worker
      // Detach first, so the teardown below does not destroy a worker that
      // has already finished and is being returned to the pool
      worker = null
      // Release the worker asap
      workers.release(finished).then(() => {
        // Emit the result and terminate this custom Observable
        observer.next(result)
        observer.complete()
      }, err => observer.error(err))
    }

    // Acquire the next worker available
    workers.acquire().then(acquired => {
      if (cancelled) {
        // Nobody is waiting for a result any more; return the idle worker
        workers.release(acquired)
        return
      }
      worker = acquired
      worker.once('message', onMessage)
      // Send the work for the worker
      worker.send(data)
    }, err => observer.error(err))

    // Teardown: runs on complete, on error, and on early unsubscribe
    return () => {
      cancelled = true
      if (worker !== null) {
        worker.removeListener('message', onMessage)
        workers.destroy(worker)
        worker = null
      }
    }
  })
/**
 * Lambda entry point: fans the items produced by `createEvents` out to a pool
 * of forked worker processes and reports back through `callback`.
 *
 * `callback` is invoked exactly once, in one of three ways:
 *  - with the first worker result;
 *  - with the first error;
 *  - with `(null)` when the stream finishes without producing a result.
 * The worker pool is shut down (drain, then clear) when the stream completes
 * and also when it errors.
 *
 * @param {*} event - Lambda event, forwarded to `createEvents`
 * @param {*} context - Lambda context (unused here)
 * @param {Function} callback - Node-style Lambda callback `(err, result)`
 */
const handler = (event, context, callback) => {
  // Create the child process pool
  const workers = createWorkerPool()

  const obs = createEvents(event)
    .mergeMap(
      dispatchToWorker(workers),
      // No-op resultSelector
      a => a,
      // Restrict how many data events will be processed concurrently
      workers.max
    )
    // Some kind of termination logic ("no more results needed")
    .takeUntil(Rx.Observable.interval(5000).take(1))

  // An invocation produces a single response, so report only once. Later
  // results, a late error, or the completion signal become no-ops.
  let reported = false
  const finish = (err, result) => {
    if (reported) return
    reported = true
    callback(err, result)
  }

  // Terminate all workers: drain() waits for borrowed workers to come back,
  // then clear() destroys the idle ones
  const shutdown = () => workers.drain().then(() => workers.clear())

  const next = result => finish(null, result)
  // Shut down on failure too. Otherwise the forked children are left running
  // after the error has been reported.
  const error = err => {
    shutdown().then(() => finish(err), () => finish(err))
  }
  const complete = () => {
    shutdown().then(() => finish(null), finish)
  }

  // Start processing (until here, no worker was started)
  obs.subscribe(next, error, complete)
}
// Is this process running as subprocess or called by Lambda runtime? | |
if (require.main === module) { | |
// This is a worker process, so, process incoming events | |
process.on('message', event => { | |
// TODO: Dispatch event to processing | |
const result = null | |
process.send(result) | |
}) | |
} | |
module.exports = { | |
handler | |
} |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.