Skip to content

Instantly share code, notes, and snippets.

@titouancreach
Created August 8, 2024 07:32
Show Gist options
  • Save titouancreach/e1ae54cad93f510f3a9fbe596db2b838 to your computer and use it in GitHub Desktop.
Save titouancreach/e1ae54cad93f510f3a9fbe596db2b838 to your computer and use it in GitHub Desktop.
import { Schema } from '@effect/schema'
import { SqsService, type QueueMessage } from '@repo/shared/SqsService'
import { Duration, Effect, Schedule } from 'effect'
const defaultAwsRetryPolicy = Schedule.intersect(
Schedule.exponential(Duration.seconds(1)),
Schedule.recurs(5),
)
export const makeConsumer = <A, I, E, R>({
queueName,
handler,
schema,
}: {
queueName: string
handler: (message: A, rawSqsMessage: QueueMessage) => Effect.Effect<void, E, R>
schema: Schema.Schema<A, I>
}) =>
Effect.gen(function* () {
const sqsService = yield* SqsService
const url = yield* sqsService
.createQueue({ queueName })
.pipe(
Effect.tapError(Effect.logError),
Effect.retry(defaultAwsRetryPolicy),
)
const run = Effect.gen(function* () {
const maybeMessage = yield* sqsService
.pollNextMessage({ queueUrl: url })
.pipe(
Effect.tapError(Effect.logError),
Effect.retry(defaultAwsRetryPolicy),
Effect.orDie,
)
const message = yield* maybeMessage
const decodeFn = Schema.decodeUnknown(Schema.parseJson(schema))
const decodedMessage = yield* decodeFn(message.Body).pipe(
Effect.tapError(Effect.logWarning),
)
yield* handler(decodedMessage, message).pipe(
Effect.tapError(Effect.logError),
)
if (message.ReceiptHandle) {
yield* sqsService.deleteMessage({
queueUrl: url,
receipHandle: message.ReceiptHandle,
})
}
}).pipe(Effect.forever, Effect.retry(Schedule.forever))
return yield* run
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment