Last active
August 20, 2020 05:11
-
-
Save mgabeler-lee-6rs/69a9b0e368d061b1935a7aa428e217b6 to your computer and use it in GitHub Desktop.
Demo of applying retry policy to subscription
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2020-08-20T05:08:10.567Z Creating resources for legacy | |
2020-08-20T05:08:10.573Z Creating topic | |
2020-08-20T05:08:18.934Z Getting topic metadata | |
2020-08-20T05:08:19.074Z Updating metadata | |
2020-08-20T05:08:19.608Z Creating subscription | |
2020-08-20T05:08:21.534Z Configuring resources for legacy | |
2020-08-20T05:08:21.668Z No retry policy set, adding it | |
2020-08-20T05:08:22.152Z Trying nack/ack for legacy | |
2020-08-20T05:08:22.153Z Activating subscription | |
2020-08-20T05:08:22.156Z Sending a message | |
2020-08-20T05:08:22.280Z Sent | |
2020-08-20T05:08:23.960Z { delay: 1807 } Got a message | |
2020-08-20T05:08:23.962Z ... Nacking it | |
2020-08-20T05:08:27.345Z { delay: 3385 } Got a message | |
2020-08-20T05:08:27.345Z ... Nacking it | |
2020-08-20T05:08:30.749Z { delay: 3404 } Got a message | |
2020-08-20T05:08:30.749Z ... Nacking it | |
2020-08-20T05:08:34.152Z { delay: 3403 } Got a message | |
2020-08-20T05:08:34.152Z ... Nacking it | |
2020-08-20T05:08:38.556Z { delay: 4404 } Got a message | |
2020-08-20T05:08:38.556Z ... Nacking it | |
2020-08-20T05:08:42.959Z { delay: 4403 } Got a message | |
2020-08-20T05:08:42.960Z ... Nacking it | |
2020-08-20T05:08:48.365Z { delay: 5406 } Got a message | |
2020-08-20T05:08:48.365Z ... Nacking it | |
2020-08-20T05:08:53.770Z { delay: 5405 } Got a message | |
2020-08-20T05:08:53.771Z ... Nacking it | |
2020-08-20T05:09:00.176Z { delay: 6406 } Got a message | |
2020-08-20T05:09:00.176Z ... Nacking it | |
2020-08-20T05:09:07.584Z { delay: 7408 } Got a message | |
2020-08-20T05:09:07.584Z ... Nacking it | |
2020-08-20T05:09:17.993Z { delay: 10409 } Got a message | |
2020-08-20T05:09:17.994Z ... Acking it | |
2020-08-20T05:09:17.995Z { numSeen: 11, nacks: -1, seen: 11, duration: 55715 } Got results | |
2020-08-20T05:09:17.995Z Deactivating subscription | |
2020-08-20T05:09:17.998Z Deleting resources for legacy | |
2020-08-20T05:09:19.225Z Done with legacy | |
2020-08-20T05:09:19.225Z Creating resources for modern | |
2020-08-20T05:09:19.225Z Creating topic | |
2020-08-20T05:09:54.754Z Getting topic metadata | |
2020-08-20T05:09:54.891Z Updating metadata | |
2020-08-20T05:09:55.404Z Creating subscription | |
2020-08-20T05:09:57.676Z Configuring resources for modern | |
2020-08-20T05:09:57.824Z Sub retry policy already set correctly | |
2020-08-20T05:09:57.824Z Trying nack/ack for modern | |
2020-08-20T05:09:57.825Z Activating subscription | |
2020-08-20T05:09:57.825Z Sending a message | |
2020-08-20T05:09:57.947Z Sent | |
2020-08-20T05:09:59.652Z { delay: 1827 } Got a message | |
2020-08-20T05:09:59.653Z ... Nacking it | |
2020-08-20T05:10:03.052Z { delay: 3400 } Got a message | |
2020-08-20T05:10:03.052Z ... Nacking it | |
2020-08-20T05:10:06.457Z { delay: 3405 } Got a message | |
2020-08-20T05:10:06.457Z ... Nacking it | |
2020-08-20T05:10:09.861Z { delay: 3404 } Got a message | |
2020-08-20T05:10:09.861Z ... Nacking it | |
2020-08-20T05:10:13.264Z { delay: 3403 } Got a message | |
2020-08-20T05:10:13.265Z ... Nacking it | |
2020-08-20T05:10:17.668Z { delay: 4404 } Got a message | |
2020-08-20T05:10:17.668Z ... Nacking it | |
2020-08-20T05:10:22.072Z { delay: 4404 } Got a message | |
2020-08-20T05:10:22.072Z ... Nacking it | |
2020-08-20T05:10:27.480Z { delay: 5408 } Got a message | |
2020-08-20T05:10:27.480Z ... Nacking it | |
2020-08-20T05:10:35.887Z { delay: 8407 } Got a message | |
2020-08-20T05:10:35.887Z ... Nacking it | |
2020-08-20T05:10:45.298Z { delay: 9411 } Got a message | |
2020-08-20T05:10:45.298Z ... Nacking it | |
2020-08-20T05:10:53.707Z { delay: 8409 } Got a message | |
2020-08-20T05:10:53.707Z ... Acking it | |
2020-08-20T05:10:53.707Z { numSeen: 11, nacks: -1, seen: 11, duration: 55760 } Got results | |
2020-08-20T05:10:53.707Z Deactivating subscription | |
2020-08-20T05:10:53.708Z Deleting resources for modern | |
2020-08-20T05:10:54.908Z Done with modern |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* eslint-disable no-await-in-loop */ | |
import { PubSub, Topic, Subscription, Message, SubscriptionMetadata } from '@google-cloud/pubsub'; | |
import _ from 'lodash'; | |
/** Names of the environment variables this demo reads its settings from. */
type OptName = 'PUBSUB_PROJECT_ID' | 'TOPIC_NAME' | 'SUB_NAME';

/** Resolved settings: one string value for every {@link OptName}. */
type Opts = { [Name in OptName]: string };
/**
 * Writes a timestamped line to stdout.
 *
 * @param args - Values forwarded to `console.log` after the current `Date`.
 */
function log(...args: unknown[]): void {
  // eslint-disable-next-line no-console
  console.log(new Date(), ...args);
}
/**
 * Writes a timestamped error line to stderr.
 *
 * `err` is typed `unknown` so that values coming straight out of a `catch`
 * clause can be passed without a cast.
 *
 * @param err - The failure to report; printed right after the timestamp.
 * @param args - Extra context values forwarded to `console.error`.
 */
function error(err: unknown, ...args: unknown[]): void {
  // eslint-disable-next-line no-console
  console.error(new Date(), err, ...args);
}
/**
 * Resolves the demo settings from the environment, falling back to built-in
 * defaults for any variable that is not set.
 *
 * Unlike the previous `_.defaults(process.env, …)` form, this only reads
 * `process.env`; the defaults are never written back into the process
 * environment.
 *
 * @returns The project id, topic name and subscription name to use.
 */
function initEnvironment(): Opts {
  const { PUBSUB_PROJECT_ID, TOPIC_NAME, SUB_NAME } = process.env;
  return {
    // `??` only replaces unset variables; an explicitly empty value is kept.
    PUBSUB_PROJECT_ID: PUBSUB_PROJECT_ID ?? 'my-project',
    TOPIC_NAME: TOPIC_NAME ?? 'my-topic',
    SUB_NAME: SUB_NAME ?? 'my-subscription',
  };
}
/**
 * Creates the Pub/Sub client and removes any topic/subscription left behind
 * by an earlier run that did not clean up after itself.
 *
 * The subscription is checked (and deleted) before the topic.
 *
 * @param opts - Supplies the topic and subscription names to look for.
 * @returns The Pub/Sub client.
 */
async function initPubSub(opts: Opts) {
  const pubSub = new PubSub({
    // ...?
  });

  const staleTopic = pubSub.topic(opts.TOPIC_NAME);
  const staleSub = staleTopic.subscription(opts.SUB_NAME);

  const [subExists] = await staleSub.exists();
  if (subExists) {
    log('Removing leftover subscription');
    await staleSub.delete();
  }

  const [topicExists] = await staleTopic.exists();
  if (topicExists) {
    log('Removing leftover topic');
    await staleTopic.delete();
  }

  return pubSub;
}
/**
 * Retry policy applied to the demo subscription: back off between 1 and 60
 * seconds.
 *
 * The `seconds` values are strings on purpose, so this object can be compared
 * directly against what `getMetadata` returns.
 */
const retryPolicy: SubscriptionMetadata['retryPolicy'] = {
  minimumBackoff: { seconds: '1', nanos: 0 },
  maximumBackoff: { seconds: '60', nanos: 0 },
};
/**
 * Creates the demo topic, labels it, and creates a subscription on it.
 *
 * @param opts - Topic and subscription names.
 * @param pubSub - Client used to create the topic.
 * @param mode - `'legacy'` creates the subscription with default settings;
 *   `'modern'` creates it with the retry policy already attached.
 * @returns The created topic and subscription.
 * @throws Error if `mode` is neither `'legacy'` nor `'modern'`.
 */
async function createTopicAndSubscription({
  opts,
  pubSub,
  mode,
}: {
  opts: Opts;
  pubSub: PubSub;
  mode: 'legacy' | 'modern';
}) {
  log('Creating topic');
  // why can't we put the labels metadata in here?
  const [topic] = await pubSub.topic(opts.TOPIC_NAME).create({});

  log('Getting topic metadata');
  const [topicMeta] = await topic.getMetadata();
  const currentLabels = topicMeta.labels || {};
  // be a good citizen and make clear this is a curiosity hack
  if (currentLabels['squad_owner'] !== 'curiosity') {
    log('Updating metadata');
    // keys not present in the update will be left unmodified
    await topic.setMetadata({
      labels: { ...currentLabels, squad_owner: 'curiosity' },
    });
  }

  log('Creating subscription');
  if (mode !== 'legacy' && mode !== 'modern') {
    throw new Error('wtf');
  }
  const labels = { squad_owner: 'curiosity' };
  // 'legacy' gets default settings; 'modern' gets the retry policy up front
  const [sub] = await topic.createSubscription(
    opts.SUB_NAME,
    mode === 'modern' ? { labels, retryPolicy } : { labels },
  );

  return { topic, sub };
}
/**
 * Makes sure the subscription carries the demo retry policy.
 *
 * Reads the current metadata and only calls `setMetadata` when the policy is
 * missing or differs from `retryPolicy`.
 *
 * @param sub - Subscription to inspect and, if needed, update.
 */
async function reconfigureSubscription(sub: Subscription) {
  const [{ retryPolicy: current }] = await sub.getMetadata();

  if (!current) {
    log('No retry policy set, adding it');
  } else if (_.isEqual(current, retryPolicy)) {
    log('Sub retry policy already set correctly');
    return;
  } else {
    log({ retryPolicy: current }, 'Sub retry policy set, but wrong, fixing');
  }

  await sub.setMetadata({ retryPolicy });
}
/**
 * Tears down the demo resources: closes the subscription, drops its message
 * listeners, then deletes the subscription followed by the topic.
 *
 * @param resources - The topic and subscription to remove.
 */
async function deleteTopicAndSubscription(resources: { topic: Topic; sub: Subscription }) {
  const { topic, sub } = resources;

  await sub.close();
  sub.removeAllListeners('message');

  // subscription first, then the topic it belongs to
  await sub.delete();
  await topic.delete();
}
/**
 * Creates a promise together with the functions that settle it, so the
 * promise can be resolved or rejected from outside its executor.
 *
 * @typeParam T - Type of the value the promise resolves with.
 * @returns The promise plus its `resolve` and `reject` functions.
 */
function manualPromise<T>(): {
  promise: Promise<T>;
  resolve: (value: T) => void;
  reject: (err: unknown) => void;
} {
  // Definite-assignment (`!`) is safe: the Promise executor runs synchronously.
  let resolve!: (value: T) => void;
  let reject!: (err: unknown) => void;
  // The explicit <T> keeps the promise from being inferred as Promise<unknown>.
  const promise = new Promise<T>((_res, _rej) => {
    resolve = _res;
    reject = _rej;
  });
  return { promise, resolve, reject };
}
/**
 * Publishes one message and observes how it is redelivered: the first 10
 * deliveries are nacked, the next one is acked. The delay between consecutive
 * deliveries is logged each time.
 *
 * Gives up after 65 seconds. A timeout is logged via `error` and is not
 * rethrown. The message listener is always removed before returning, even
 * when publishing fails.
 *
 * @param topic - Topic the test message is published to.
 * @param sub - Subscription the deliveries are received on.
 */
async function tryNackAck({ topic, sub }: { topic: Topic; sub: Subscription }): Promise<void> {
  let numSeen = 0;
  // deliveries still to be nacked before the final ack
  let nacks = 10;
  const { promise: completion, resolve: allReceived } = manualPromise<number>();
  const { promise: timeout, reject: onTimeout } = manualPromise<number>();
  let lastReceived = Date.now();

  function handler(msg: Message) {
    const now = Date.now();
    log({ delay: now - lastReceived }, 'Got a message');
    lastReceived = now;
    ++numSeen;
    if (nacks > 0) {
      // decrement only when actually nacking, so the counter stops at 0
      --nacks;
      log('... Nacking it');
      msg.nack();
    } else {
      log('... Acking it');
      msg.ack();
      allReceived(numSeen);
    }
  }

  log('Activating subscription');
  sub.on('message', handler);
  try {
    log('Sending a message');
    await topic.publishJSON({});
    log('Sent');

    const start = Date.now();
    const timer = setTimeout(onTimeout, 65_000, new Error('Timed out'));
    try {
      const seen = await Promise.race([completion, timeout]);
      const duration = Date.now() - start;
      log({ numSeen, nacks, seen, duration }, 'Got results');
    } catch (err) {
      const duration = Date.now() - start;
      error(err, { numSeen, nacks, duration }, 'Failed');
    } finally {
      clearTimeout(timer);
    }
  } finally {
    // detach even if publishing threw, so the listener is not left attached
    log('Deactivating subscription');
    sub.removeAllListeners('message');
  }
}
/**
 * Runs the demo once per mode (`legacy`, then `modern`): create the resources,
 * make sure the retry policy is applied, exercise nack/ack redelivery, then
 * delete the resources.
 *
 * Any failure is logged and sets `process.exitCode` to 1.
 */
async function main() {
  const modes = ['legacy', 'modern'] as const;
  try {
    const opts = initEnvironment();
    const pubSub = await initPubSub(opts);

    for (const mode of modes) {
      log(`Creating resources for ${mode}`);
      const resources = await createTopicAndSubscription({ opts, pubSub, mode });

      log(`Configuring resources for ${mode}`);
      await reconfigureSubscription(resources.sub);

      log(`Trying nack/ack for ${mode}`);
      await tryNackAck(resources);

      log(`Deleting resources for ${mode}`);
      await deleteTopicAndSubscription(resources);

      log(`Done with ${mode}`);
    }
  } catch (err) {
    error(err);
    process.exitCode = 1;
  }
}
export = main;

// Run the demo only when this file is executed directly, not when imported.
if (module === require.main) {
  // main() handles its own errors, so the returned promise is not awaited
  void main();
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment