Skip to content

Instantly share code, notes, and snippets.

@geovanisouza92
Last active September 14, 2017 12:35
Show Gist options
  • Save geovanisouza92/5bede23462a153f8ec68f96a6976791d to your computer and use it in GitHub Desktop.
Save geovanisouza92/5bede23462a153f8ec68f96a6976791d to your computer and use it in GitHub Desktop.
A Lambda function that spawns workers to process data faster
const os = require('os')
const cp = require('child_process')
const Rx = require('rxjs')
// https://github.com/coopernurse/node-pool
const pool = require('generic-pool')
// Upper bound on concurrently running worker processes: one per reported CPU.
// Clamped to at least 1 so the pool can always hand out a worker, even if
// os.cpus() reports an empty list (seen on some restricted platforms).
const MAX_WORKERS = Math.max(1, os.cpus().length)
/**
 * Build a generic-pool pool whose resources are forked child processes.
 *
 * Every worker is a fork of this very file, so this script must also be the
 * worker entry point.
 *
 * @returns {object} The pool returned by `pool.createPool`.
 */
const createWorkerPool = () => {
  // Fork a fresh worker process.
  // NOTE: the promise resolves right after fork(); it does not wait for the
  // child to signal that it is "ready".
  const spawnWorker = () =>
    new Promise(resolve => {
      resolve(cp.fork(__filename))
    })

  // A worker is considered usable while its IPC channel is still connected.
  const isConnected = child => Promise.resolve(child.connected)

  // Ask the child to terminate; resolves without waiting for the actual exit.
  const stopWorker = child =>
    new Promise(resolve => {
      child.kill()
      resolve()
    })

  return pool.createPool(
    { create: spawnWorker, validate: isConnected, destroy: stopWorker },
    // Cap the number of workers and check connectivity on every borrow.
    { max: MAX_WORKERS, testOnBorrow: true }
  )
}
/**
 * Produce the stream of data events that the workers will process.
 *
 * @param {*} event - The incoming Lambda event (currently unused).
 * @returns {Observable} A placeholder stream emitting the integers 1 through 10.
 */
const createEvents = event => {
  const firstValue = 1
  const howMany = 10
  return Rx.Observable.range(firstValue, howMany)
}
// Instead of using subprocesses, it can invoke another Lambda function
/**
 * Build a function that sends one piece of data to a pooled worker and
 * returns an Observable of that worker's single reply.
 *
 * The Observable is lazy: no worker is acquired until it is subscribed to.
 * On a reply the worker is released back to the pool first, then the reply is
 * emitted and the Observable completes. If the worker emits 'error', exits
 * before replying, or `send` throws, the worker is destroyed (removed from
 * the pool) and the Observable errors, so a dead worker can neither hang the
 * stream nor stay borrowed forever.
 *
 * @param {object} workers - Pool exposing `acquire()`, `release(worker)` and
 *   `destroy(worker)`.
 * @returns {function(*): Observable} Function mapping `data` to an Observable
 *   of the worker's result.
 */
const dispatchToWorker = workers => data =>
  // Create the Observable when the result is required (delay promise creation)
  Rx.Observable.defer(() =>
    // Acquire the next worker available
    Rx.Observable.fromPromise(workers.acquire()).flatMap(worker =>
      // Custom ad-hoc Observable that talks to the worker and propagates the
      // outcome when available
      Rx.Observable.create(observer => {
        // Only the first outcome (reply, error or exit) counts
        let settled = false

        const detach = () => {
          worker.removeListener('message', onMessage)
          worker.removeListener('error', onFailure)
          worker.removeListener('exit', onExit)
        }

        const onMessage = result => {
          if (settled) return
          settled = true
          detach()
          // Release the worker asap, then emit the result and terminate
          workers.release(worker).then(
            () => {
              observer.next(result)
              observer.complete()
            },
            // A failed release must surface instead of being an unhandled rejection
            err => observer.error(err)
          )
        }

        const onFailure = err => {
          if (settled) return
          settled = true
          detach()
          const report = () => observer.error(err)
          // The worker is in an unknown state: drop it from the pool rather
          // than lending it out again. Report the original error either way.
          new Promise(resolve => resolve(workers.destroy(worker))).then(report, report)
        }

        const onExit = (code, signal) =>
          onFailure(new Error(`Worker exited before replying (code: ${code}, signal: ${signal})`))

        worker.once('message', onMessage)
        worker.once('error', onFailure)
        worker.once('exit', onExit)

        // Send the work for the worker
        try {
          worker.send(data)
        } catch (err) {
          onFailure(err)
        }
        // No teardown is returned on purpose: if the subscriber goes away
        // early, the listeners stay attached so the worker is still handed
        // back to (or removed from) the pool once it replies or dies.
      })
    )
  )
/**
 * Lambda entry point: fan the data events out to a pool of worker processes
 * and reply with every result that arrived in time.
 *
 * `callback` is invoked exactly once, after the pool has been drained and
 * cleared, with either the error or an array of the collected results.
 * Previously it was called once per result, never for an empty stream, and
 * the pool was left running on errors.
 *
 * @param {*} event - Lambda event, forwarded to `createEvents`.
 * @param {*} context - Lambda context (unused).
 * @param {function(?Error, Array=)} callback - Lambda completion callback.
 */
const handler = (event, context, callback) => {
  // Create the child process pool
  const workers = createWorkerPool()

  // Guard so the Lambda callback can never fire twice
  let replied = false
  const finish = (err, results) => {
    if (replied) return
    replied = true
    const respond = shutdownErr => {
      const failure = err || shutdownErr
      if (failure) {
        callback(failure)
      } else {
        callback(null, results)
      }
    }
    // Terminate all workers before replying, on success and on error alike,
    // so no child process outlives the invocation
    workers
      .drain()
      .then(() => workers.clear())
      .then(() => respond(), respond)
  }

  // Start processing (until subscribe, no worker is started)
  createEvents(event)
    .mergeMap(
      dispatchToWorker(workers),
      // No-op resultSelector
      a => a,
      // Restrict how many data events will be processed concurrently
      workers.max
    )
    // Some kind of termination logic ("no more results needed"): stop after 5s
    .takeUntil(Rx.Observable.interval(5000).take(1))
    // Gather everything into one array so there is a single reply
    .toArray()
    .subscribe(results => finish(null, results), err => finish(err))
}
// Worker mode vs. Lambda mode: when this file is the process entry point
// (as it is for a child forked from the pool) rather than a module loaded by
// the Lambda runtime, act as a worker and answer incoming events.
if (require.main === module) {
  const onWorkEvent = event => {
    // TODO: Dispatch event to processing
    const result = null
    process.send(result)
  }
  process.on('message', onWorkEvent)
}
// Public API: the Lambda entry point.
module.exports = { handler: handler }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment