//filepath: src/handlers/messages/contract-created.ts
import { validationMiddleware } from '@helloextend/api-utils'

// Handle a single CloudEvent.
function contractCreatedHandler(ev: CloudEvent, ctx: Context) {
  const messageBody = JSON.parse(ev.message.body) // Or whatever the CloudEvents specification says the payload looks like
  const isRetry = ctx.get<number>('event.metadata.retries') > 0
  const lastRetry = ctx.get<Date>('event.metadata.lastTriedAt')
  // Do something
  return CloudEventResponse.ok()
}
// Add validation middleware
const mid = validationMiddleware({
  req: {
    schema: V1Schema
  }
})
const messageHandler = smerf.cloudEvents().use(mid).handler(contractCreatedHandler)
const retryHandler = messageHandler
// const dlqHandler = some other handler with the same
export default {
  handler: messageHandler
}
// Retry logic would be based on the status code returned by the handler (see the sketch after the config below).
// The backoff timing would be customized in the smerf.config.ts file, same with the DLQ settings.
// in smerf.config.ts
```
{
  events: {
    'contracts-created': {
      backoff: [5, 10, 20, 30],
      dlq: true,
      schedule: '*/5 * * * *'
    }
  }
}
```
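// A minimal sketch of the status-code-driven retry idea above; flakyContractHandler is a hypothetical name.
// Assumes CloudEventResponse.failed() (used later in this gist) marks the event as retryable and the adapter
// then applies the backoff/dlq settings from smerf.config.ts; the exact status-to-retry mapping is still open.
function flakyContractHandler(ev: CloudEvent, ctx: Context) {
  try {
    // ...do the real work here...
    return CloudEventResponse.ok()
  } catch (err) {
    // A non-ok response would tell the adapter to schedule the next attempt per backoff: [5, 10, 20, 30]
    // and to route the event to the DLQ once attempts are exhausted (dlq: true).
    return CloudEventResponse.failed()
  }
}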
/////////////////////////////////////////WIP//////////////////////////////////////////////////////////
//filepath: src/handlers/topics/contracts.ts
// For handling an entire topic
function batchHandler(cloudEvents: CloudEvent[], context: Context) {
  for (const cloudEvent of cloudEvents) {
    switch (cloudEvent.type) {
      case "ContractsCreated":
        // Do something
        break
      case "ContractsUpdated":
        // Do something
        context.retrySyncEmitter.emit(cloudEvent) // I think providing these emitters in context would be nice.
        break
    }
  }
  return BatchResponse.ok() // or BatchResponse.failed()
}
export const retryHandler = createRetryHandler<ContractsCreated | ContractsUpdated>((cloudEvent, context) => {
  switch (cloudEvent.type) {
    case "ContractsCreated":
      // Do something
      break
    case "ContractsUpdated":
      // Do something
      break
  }
  return CloudEventResponse.ok() // Or CloudEventResponse.failed()
})
export const dlqHandler = createDLQHandler<ContractsCreated | ContractsUpdated>((cloudEvent, context) => {
  switch (cloudEvent.type) {
    case "ContractsCreated":
      // Do something
      break
    case "ContractsUpdated":
      // Do something
      break
  }
  return CloudEventResponse.ok()
})
// Some configuration can happen in the config
// in smerf.config.ts
```
{
  topics: {
    'contracts': {
      backoff: [5, 10, 20, 30],
      dlq: true,
      // etc.
    }
  }
}
```
So, the intention behind TopicResponse.ok(successfulEvents, failedEvents) is that it returns a successful response to the adapter so it can continue processing, and the adapter decides what to do with the failed events. This is just a high-level interface; most of the complexity would be the adapter's responsibility. Devs specify which events succeeded vs. failed and write retry and DLQ handlers for those events, but the adapter remains responsible for the workflow pattern and the integration with infrastructure components. In the case of Kafka, we could implement whatever pattern you think makes the most sense and covers the most use cases.
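For illustration, a rough sketch of how a batch handler might use that interface, assuming TopicResponse.ok(successfulEvents, failedEvents) takes the two partitions directly (the handleEvent helper and the partitioning are hypothetical):

async function contractsTopicHandler(cloudEvents: CloudEvent[], ctx: Context) {
  const successfulEvents: CloudEvent[] = [];
  const failedEvents: CloudEvent[] = [];
  for (const cloudEvent of cloudEvents) {
    try {
      await handleEvent(cloudEvent, ctx); // hypothetical per-event helper
      successfulEvents.push(cloudEvent);
    } catch {
      failedEvents.push(cloudEvent);
    }
  }
  // The adapter then decides which retry/DLQ workflow the failed events go through.
  return TopicResponse.ok(successfulEvents, failedEvents);
}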
On the handler side, the retry metadata could surface on the event itself; the code would be something like...

function contractCreatedHandler(
  ev: CloudEvent<"contracts-created", V1Schema>,
  ctx: Context
) {
  const messageBody = ev.data;
  const isRetry = (ev.failure?.attemptCount ?? 0) > 0;
  const lastRetry = ev.failure?.timestamp;
  // Do something
  return CloudEventResponse.ok();
}
What is the retryEmitter doing? Is it retrying the send, or sending the event so that its processing is retried?
context.retrySyncEmitter.emit(cloudEvent)
Whether cloudEvents should be an async iterable or an array is another debate... an async iterable would allow for back-pressure modeling (sketched below).
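For example, a rough sketch of the async-iterable shape, assuming the adapter yields events as they arrive from the broker (processEvent is a placeholder for the real work):

async function batchHandler(cloudEvents: AsyncIterable<CloudEvent>, ctx: Context) {
  for await (const cloudEvent of cloudEvents) {
    // The handler pulls the next event only when it is ready, so the adapter can
    // pause fetching from the broker and model back pressure naturally.
    await processEvent(cloudEvent, ctx); // hypothetical per-event processing
  }
  return BatchResponse.ok();
}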
I like the folder pathing strategy separating topics from events.