Last active
October 12, 2019 12:16
-
-
Save rhythnic/5a05324b448d1663f0860e0b4d10200f to your computer and use it in GitHub Desktop.
Request/Response on top of pubsub
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// ***************************************************************** | |
// Extend pubsub with request-response pattern | |
// Pubsub instance should conform to this interface: | |
// https://github.com/apollographql/graphql-subscriptions/blob/master/src/pubsub-engine.ts | |
// ***************************************************************** | |
// ***************************************************************** | |
// API | |
// request(topic, args) | |
// respond(topic, ({ args, pass, request }) => {}) | |
// respond = respond.partialTopic(str) | |
const uuid = require('uuid') | |
function isPlainObject (obj) { | |
return !!obj && typeof obj === 'object' && obj.constructor === 'Object' | |
} | |
/** | |
* Make a unique key corresponding to a request | |
* @param {string} topic - The topic of the request | |
* @param {string[]} correlationId - Array of correlation IDs | |
*/ | |
const mkResponsePromiseMapKey = (topic, correlationId) => | |
`${topic}__${correlationId.join('_')}` | |
/** | |
* Create an error instance from an object | |
* @param {Object} obj | |
* @param {string} obj.message - Error message | |
*/ | |
const mkResponseError = obj => { | |
const error = new Error(obj.message) | |
Object.keys(obj).forEach(x => { error[x] = obj[x] }) | |
return error | |
} | |
/** | |
* Create a handler for response messages | |
* @param {Object} topic - Topic of the request | |
* @param {Map} map - Map holding request resolving fns, see responsePromiseMap | |
*/ | |
const responseHandlerFactory = (topic, map) => msg => { | |
const mapKey = mkResponsePromiseMapKey(topic, msg.correlationId$) | |
// If key not in map, then multiple responses received | |
if (!map.has(mapKey)) { | |
console.error(`Reduntant response received for "${mapKey}". ${JSON.stringify(msg)}`) | |
return | |
} | |
const { resolve, reject, timerId } = map.get(mapKey) | |
clearTimeout(timerId) | |
map.delete(mapKey) | |
if (msg.error) { | |
if (isPlainObject(msg.error)) msg.error = mkResponseError(msg.error) | |
return reject(msg.error) | |
} | |
resolve(msg.result) | |
} | |
/** | |
* Add meta to args object | |
* @param {Object} topic - Topic of the request | |
* @param {Object} args - Arguments received by the request | |
*/ | |
function extendArgsWithMeta (topic, args) { | |
const correlationId$ = args.correlationId$ | |
? [ ...args.correlationId$, uuid.v1() ] | |
: [ uuid.v1() ] | |
return { | |
...args, | |
correlationId$, | |
responseTopic$: `${topic},response`, | |
topic$: topic | |
} | |
} | |
/** | |
* Initiate a request-response pattern over pubsub | |
* @param {PubSub} pubsub - Pubsub singleton instance | |
* @param {string} topic - Topic listened to by cmd handler | |
* @param {Object} args - Arguments passed to command | |
*/ | |
function pubsubRequestFactory ({ pubsub, timeout = 3000 }) { | |
// Holds the resolve and reject functions for responding to requests | |
const responsePromiseMap = new Map() | |
// Holds the unsubscribeIds for topic-response subscriptions | |
const subscriptionMap = new Map() | |
return function pubsubRequest (topic, args) { | |
// Return a promise to the caller | |
return new Promise(async (resolve, reject) => { | |
if (!args || typeof args !== 'object') { | |
return reject('pubsub.request paylod must be an object') | |
} | |
// Add meta, like correlation ID to args | |
args = extendArgsWithMeta(topic, args) | |
// Put promise resolving fns in the Map, so they're available to the response handler | |
const mapKey = mkResponsePromiseMapKey(topic, args.correlationId$) | |
// Anticipate no response | |
const timerId = setTimeout(() => { | |
const error = new Error(`Request to topic "${topic}" timed out at ${timeout}ms.`) | |
pubsub.publish(args.responseTopic$, { error, correlationId$: args.correlationId$ }) | |
}, timeout) | |
responsePromiseMap.set(mapKey, { resolve, reject, timerId }) | |
// subscribe to topic response, if not already subscribed | |
if (!subscriptionMap.has(topic)) { | |
const handler = responseHandlerFactory(topic, responsePromiseMap) | |
const unsubscribeId = await pubsub.subscribe(args.responseTopic$, handler) | |
subscriptionMap.set(topic, unsubscribeId) | |
} | |
// publish message | |
pubsub.publish(topic, args) | |
}) | |
} | |
} | |
/** | |
* Register a command handler | |
* @param {PubSub} pubsub - Pubsub singleton instance | |
* @param {string} topic - Topic listened to by cmd handler | |
* @param {Function} handler - Command handler | |
*/ | |
function pubsubRespondFactory ({ pubsub }, request) { | |
return function pubsubRespond (topic, handler) { | |
// Subscribe to topic | |
return pubsub.subscribe(topic, async ({ correlationId$, responseTopic$, topic$, ...args }) => { | |
let passInvoked = false | |
const pass$ = (topic, args) => { | |
passInvoked = true | |
pubsub.publish(topic, { ...args, correlationId$, responseTopic$ }) | |
} | |
const request$ = (topic, args) => request(topic, { ...args, correlationId$ }) | |
try { | |
const result = await handler({ ...args, pass$, request$, topic$, correlationId$, responseTopic$ }) | |
// If nothing is returned, assume the request was passed | |
if (passInvoked) return | |
pubsub.publish(responseTopic$, { result, correlationId$ }) | |
} catch (error) { | |
pubsub.publish(responseTopic$, { error, correlationId$ }) | |
} | |
}) | |
} | |
} | |
module.exports = function buildPubsubRequestResponse (opts) { | |
const request = pubsubRequestFactory(opts) | |
const respond = pubsubRespondFactory(opts, request) | |
return { request, respond } | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment