Last active
May 14, 2019 09:47
-
-
Save jthomas/327214b107cc5b5a572eab8199f73e75 to your computer and use it in GitHub Desktop.
Example showing how to handle intermittant action failures for large number of invocations using Redis
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
// What percentage of invocations should randomly fail? | |
const ERROR_RATE = 0.25 | |
// Random delay to results being returned (0 -> 10 seconds) | |
const DELAY_MS = Math.random() * 10000 | |
function should_fail () { | |
return Math.random() < ERROR_RATE | |
} | |
function main(params) { | |
if (should_fail()) throw new Error('failed!') | |
const sum = params.a + params.b | |
return new Promise(resolve => { | |
setTimeout(() => { | |
resolve({ sum }) | |
}, DELAY_MS) | |
}) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const redis = require('redis') | |
const jobs = require('./job_store.js') | |
client = redis.createClient() | |
client.on("error", function (err) { | |
console.log("Error " + err); | |
}) | |
const openwhisk = require('openwhisk'); | |
const options = { | |
apihost: <INSERT_API_HOST>, | |
api_key: <INSERT_API_KEY> | |
} | |
const ow = openwhisk(options) | |
const rand_int = () => Math.floor(Math.random() * 100) | |
;(async () => { | |
// 100 invocations, 1 second between polls, 15 second maximum polling time, 15 maximum retries | |
const max_duration = 1000 * 15 | |
const maximum_retries = 5 | |
const invocations = 10 | |
const action = 'sum' | |
// Generate input data for N invocations. | |
const input = new Array(invocations).fill(null).map(() => { | |
return { a: rand_int(), b: rand_int() } | |
}) | |
// Fire invocation and store activation details in Redis | |
const job_ids = input.map(async params => { | |
const id = await jobs.create(client, action, params, maximum_retries) | |
console.log(`created job (#${id}) for ${action} with params: ${JSON.stringify(params)}`) | |
const expires = (Date.now()) + max_duration | |
const activation = await ow.actions.invoke({name: action, params}) | |
console.log(`invoked action for job (#${id}), expires at: ${expires}`) | |
await jobs.store_activation(client, id, activation.activationId, expires) | |
return id | |
}) | |
const results = await Promise.all(job_ids) | |
console.log('job ids created for invocations', results) | |
})(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
const ACTIVE_JOBS = 'active_activations' | |
const JOB_COUNTER = 'job_counter' | |
// Generate UUID for jobs | |
const next_id = async client => { | |
return new Promise((resolve, reject) => { | |
client.incr(JOB_COUNTER, (err, result) => { | |
if (err) return reject(err) | |
resolve(result) | |
}) | |
}) | |
} | |
// Create job in Redis for invocation. | |
// Store action name, invocation parameters and retries available. | |
// Return jobs id. | |
const create = async (client, action, params, retries) => { | |
return new Promise(async (resolve, reject) => { | |
const id = await next_id(client) | |
const job_key = `${id}_details` | |
const fields = [ | |
'action', action, | |
'params', JSON.stringify(params), | |
'retries', retries | |
] | |
client.hmset(job_key, fields, (err, res) => { | |
if (err) return reject(err) | |
resolve(id) | |
}) | |
}) | |
} | |
// Return list of active invocation jobs | |
const active = async client => { | |
return new Promise((resolve, reject) => { | |
client.smembers(ACTIVE_JOBS, (err, jobs) => { | |
if (err) return reject(err) | |
resolve(jobs) | |
}) | |
}) | |
} | |
// Return in-flight activation for invocation job. | |
// This is the last member of the activation list for a job. | |
const latest_activation = async (client, job) => { | |
const JOB_ACTIVATIONS = `${job}_activations` | |
return new Promise((resolve, reject) => { | |
client.lrange(JOB_ACTIVATIONS, -1, -1, (err, jobs) => { | |
if (err) return reject(err) | |
resolve(jobs[0]) | |
}) | |
}) | |
} | |
// Has max polling time for an activation elapsed? | |
const activation_expired = async (client, job) => { | |
return new Promise((resolve, reject) => { | |
const JOB_FINISH_KEY = `${job}_expiry` | |
client.get(JOB_FINISH_KEY, (err, expiry) => { | |
if (err) return reject(err) | |
const expired = Date.now() > parseInt(expiry, 10) | |
resolve(expired) | |
}) | |
}) | |
} | |
// Retrieve all job details | |
const details = async (client, job) => { | |
return new Promise((resolve, reject) => { | |
const job_key = `${job}_details` | |
client.hgetall(job_key, (err, details) => { | |
if (err) return reject(err) | |
resolve(details) | |
}) | |
}) | |
} | |
// Store new activation details for a job. | |
// All job to active jobs set. Push latest activation to job activation queue. | |
// Update expiry time for job activation polling. | |
const store_activation = async (client, job, activation_id, expiry) => { | |
return new Promise((resolve, reject) => { | |
const multi = client.multi() | |
multi.sadd(ACTIVE_JOBS, job) | |
const JOB_ACTIVATIONS = `${job}_activations` | |
multi.rpush(JOB_ACTIVATIONS, activation_id) | |
const JOB_FINISH_KEY = `${job}_expiry` | |
multi.set(JOB_FINISH_KEY, expiry) | |
multi.exec((err, replies) => { | |
if (err) return reject(err) | |
resolve() | |
}) | |
}) | |
} | |
// Does job have invocation retries remaining? | |
const has_retries = async (client, job) => { | |
return new Promise((resolve, reject) => { | |
const job_key = `${job}_details` | |
client.hget(job_key, 'retries', (err, retries) => { | |
if (err) return reject(err) | |
resolve(parseInt(retries, 10) > 0) | |
}) | |
}) | |
} | |
// Decrement retries available for a job. | |
const used_retry = async (client, job) => { | |
return new Promise((resolve, reject) => { | |
const job_key = `${job}_details` | |
client.hincrby(job_key, 'retries', -1, (err) => { | |
if (err) return reject(err) | |
resolve() | |
}) | |
}) | |
} | |
// Job has finished, store action result and remove job from active set. | |
const has_finished = async (client, job, result = null) => { | |
return new Promise((resolve, reject) => { | |
const multi = client.multi() | |
const job_key = `${job}_details` | |
multi.hset(job_key, 'result', JSON.stringify(result)) | |
multi.srem(ACTIVE_JOBS, job) | |
multi.exec((err, replies) => { | |
if (err) return reject(err) | |
resolve() | |
}) | |
}) | |
} | |
module.exports = { create, active, latest_activation, activation_expired, | |
details, store_activation, has_retries, used_retry, has_finished } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
const redis = require('redis') | |
const jobs = require('./job_store.js') | |
const max_duration = 1000 * 15 | |
const client = redis.createClient() | |
client.on("error", function (err) { | |
console.log("Error " + err); | |
}) | |
// Set up Apache OpenWhisk Client SDK | |
const openwhisk = require('openwhisk'); | |
const options = { | |
apihost: <INSERT_API_HOST>, | |
api_key: <INSERT_API_KEY> | |
} | |
const ow = openwhisk(options) | |
// Set up Apache OpenWhisk Client SDK | |
const openwhisk = require('openwhisk'); | |
const options = { | |
apihost: 'eu-gb.functions.cloud.ibm.com', | |
api_key: '53f1c23a-1022-48c5-9c16-cfd8b229db04:aPHwGYsyopyNUyoIZgNyJP1hJ1zpi9g9uLmSlJpsKoB6aTrmGLi6a8nsO32pFGFB' | |
} | |
const ow = openwhisk(options) | |
// Retrieve activation result for identifier. | |
// Returns null if 404 is returned, otherwise error is thrown. | |
const retrieve_activation = async id => { | |
let result = null | |
try { | |
result = await ow.activations.get({ name: id }) | |
console.log(`activation result (${id}) now available!`) | |
} catch (err) { | |
if (err.statusCode !== 404) { | |
throw err | |
} | |
console.log(`activation result (${id}) is not available yet.`) | |
} | |
return result | |
} | |
// Retry invocation using stored parameters. | |
// If job retries are exhausted, remove active job for this request. | |
// Otherwise, retry invocation and store new activation details | |
const retry_job = async (client, id) => { | |
if (await jobs.has_retries(client, id)) { | |
console.log(`retrying job (#${id})`) | |
const expires = (Date.now()) + max_duration | |
const details = await jobs.details(client, id) | |
const activation = await ow.actions.invoke({name: details.action, params: JSON.parse(details.params)}) | |
console.log(`invoked action for job (#${id}), expires at: ${expires}`) | |
await jobs.store_activation(client, id, activation.activationId, expires) | |
await jobs.used_retry(client, id) | |
} else { | |
console.log(`job (#${id}) has no more retries`) | |
await jobs.has_finished(client, id) | |
} | |
} | |
// Retrieve all in-flight invocations from Redis and check for results. | |
// For each invocation, based on activation result active job might | |
// be either finished, retried or left to complete. | |
const main = async () => { | |
const active_jobs = await jobs.active(client) | |
console.log('retrieved the following active jobs:', active_jobs) | |
for (let job of active_jobs) { | |
const activation_id = await jobs.latest_activation(client, job) | |
console.log(`job (#${job}) latest activation => ${activation_id}`) | |
const result = await retrieve_activation(activation_id) | |
// if result is unavailable and polling deadline reached -> retry job. | |
// if result is unavailable and polling deadline not reached -> do nothing. | |
if (!result) { | |
console.log(`job (#${job}) activation is not available`) | |
if (await jobs.activation_expired(client, job)) { | |
console.log(`job (#${job}) activation has expired`) | |
await retry_job(client, job) | |
} | |
return | |
} | |
console.log(`job (#${job}) activation result is available`) | |
// if result is available and succeeded -> finish job and store result. | |
// if result is available and failed -> retry job | |
if (result.response.success) { | |
console.log(`job (#${job}) activation was successful`) | |
await jobs.has_finished(client, job, result.response.result) | |
} else { | |
console.log(`job (#${job}) activation was not successful`) | |
await retry_job(client, job) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment