Skip to content

Instantly share code, notes, and snippets.

@rhythnic
Last active October 12, 2019 12:16
Show Gist options
  • Save rhythnic/5a05324b448d1663f0860e0b4d10200f to your computer and use it in GitHub Desktop.
Save rhythnic/5a05324b448d1663f0860e0b4d10200f to your computer and use it in GitHub Desktop.
Request/Response on top of pubsub
// *****************************************************************
// Extend pubsub with request-response pattern
// Pubsub instance should conform to this interface:
// https://github.com/apollographql/graphql-subscriptions/blob/master/src/pubsub-engine.ts
// *****************************************************************
// *****************************************************************
// API
// request(topic, args)
// respond(topic, ({ args, pass, request }) => {})
// respond = respond.partialTopic(str)
const uuid = require('uuid')
function isPlainObject (obj) {
return !!obj && typeof obj === 'object' && obj.constructor === 'Object'
}
/**
* Make a unique key corresponding to a request
* @param {string} topic - The topic of the request
* @param {string[]} correlationId - Array of correlation IDs
*/
const mkResponsePromiseMapKey = (topic, correlationId) =>
`${topic}__${correlationId.join('_')}`
/**
* Create an error instance from an object
* @param {Object} obj
* @param {string} obj.message - Error message
*/
const mkResponseError = obj => {
const error = new Error(obj.message)
Object.keys(obj).forEach(x => { error[x] = obj[x] })
return error
}
/**
* Create a handler for response messages
* @param {Object} topic - Topic of the request
* @param {Map} map - Map holding request resolving fns, see responsePromiseMap
*/
const responseHandlerFactory = (topic, map) => msg => {
const mapKey = mkResponsePromiseMapKey(topic, msg.correlationId$)
// If key not in map, then multiple responses received
if (!map.has(mapKey)) {
console.error(`Reduntant response received for "${mapKey}". ${JSON.stringify(msg)}`)
return
}
const { resolve, reject, timerId } = map.get(mapKey)
clearTimeout(timerId)
map.delete(mapKey)
if (msg.error) {
if (isPlainObject(msg.error)) msg.error = mkResponseError(msg.error)
return reject(msg.error)
}
resolve(msg.result)
}
/**
* Add meta to args object
* @param {Object} topic - Topic of the request
* @param {Object} args - Arguments received by the request
*/
function extendArgsWithMeta (topic, args) {
const correlationId$ = args.correlationId$
? [ ...args.correlationId$, uuid.v1() ]
: [ uuid.v1() ]
return {
...args,
correlationId$,
responseTopic$: `${topic},response`,
topic$: topic
}
}
/**
* Initiate a request-response pattern over pubsub
* @param {PubSub} pubsub - Pubsub singleton instance
* @param {string} topic - Topic listened to by cmd handler
* @param {Object} args - Arguments passed to command
*/
function pubsubRequestFactory ({ pubsub, timeout = 3000 }) {
// Holds the resolve and reject functions for responding to requests
const responsePromiseMap = new Map()
// Holds the unsubscribeIds for topic-response subscriptions
const subscriptionMap = new Map()
return function pubsubRequest (topic, args) {
// Return a promise to the caller
return new Promise(async (resolve, reject) => {
if (!args || typeof args !== 'object') {
return reject('pubsub.request paylod must be an object')
}
// Add meta, like correlation ID to args
args = extendArgsWithMeta(topic, args)
// Put promise resolving fns in the Map, so they're available to the response handler
const mapKey = mkResponsePromiseMapKey(topic, args.correlationId$)
// Anticipate no response
const timerId = setTimeout(() => {
const error = new Error(`Request to topic "${topic}" timed out at ${timeout}ms.`)
pubsub.publish(args.responseTopic$, { error, correlationId$: args.correlationId$ })
}, timeout)
responsePromiseMap.set(mapKey, { resolve, reject, timerId })
// subscribe to topic response, if not already subscribed
if (!subscriptionMap.has(topic)) {
const handler = responseHandlerFactory(topic, responsePromiseMap)
const unsubscribeId = await pubsub.subscribe(args.responseTopic$, handler)
subscriptionMap.set(topic, unsubscribeId)
}
// publish message
pubsub.publish(topic, args)
})
}
}
/**
* Register a command handler
* @param {PubSub} pubsub - Pubsub singleton instance
* @param {string} topic - Topic listened to by cmd handler
* @param {Function} handler - Command handler
*/
function pubsubRespondFactory ({ pubsub }, request) {
return function pubsubRespond (topic, handler) {
// Subscribe to topic
return pubsub.subscribe(topic, async ({ correlationId$, responseTopic$, topic$, ...args }) => {
let passInvoked = false
const pass$ = (topic, args) => {
passInvoked = true
pubsub.publish(topic, { ...args, correlationId$, responseTopic$ })
}
const request$ = (topic, args) => request(topic, { ...args, correlationId$ })
try {
const result = await handler({ ...args, pass$, request$, topic$, correlationId$, responseTopic$ })
// If nothing is returned, assume the request was passed
if (passInvoked) return
pubsub.publish(responseTopic$, { result, correlationId$ })
} catch (error) {
pubsub.publish(responseTopic$, { error, correlationId$ })
}
})
}
}
module.exports = function buildPubsubRequestResponse (opts) {
const request = pubsubRequestFactory(opts)
const respond = pubsubRespondFactory(opts, request)
return { request, respond }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment