Skip to content

Instantly share code, notes, and snippets.

@titouancreach
Created August 8, 2024 07:31
Show Gist options
  • Save titouancreach/3643aad81f1f34f48787d1d50a20711f to your computer and use it in GitHub Desktop.
Save titouancreach/3643aad81f1f34f48787d1d50a20711f to your computer and use it in GitHub Desktop.
import {
CreateQueueCommand,
DeleteMessageCommand,
ReceiveMessageCommand,
SendMessageCommand,
type CreateQueueCommandInput,
} from '@aws-sdk/client-sqs'
import { Schema } from '@effect/schema'
import {
Array,
Config,
Context,
Effect,
Layer,
Match,
Option,
pipe,
} from 'effect'
import { UnknownException } from 'effect/Cause'
import { TaggedError } from 'effect/Data'
import { logDebug } from 'effect/Effect'
import { SqsClient } from './SqsClient'
export type { Message as QueueMessage } from '@aws-sdk/client-sqs'
/**
 * Tagged failure carrying the `_tag` `'AwsSignatureDoesNotMatchError'`.
 *
 * In this file it is produced by `pollNextMessage` when the rejected value of
 * the receive call matches `{ Code: 'SignatureDoesNotMatch' }`.
 */
export class AwsSignatureDoesNotMatchError extends TaggedError(
  'AwsSignatureDoesNotMatchError',
) {}
/**
 * Builds the SQS service implementation.
 *
 * Requires `SqsClient` from the environment and reads `NODE_ENV` (defaulting to
 * `'development'`) once, at construction time, to decide whether queue names
 * get the `Prod_` prefix. Returns the four operations
 * `{ pollNextMessage, deleteMessage, createQueue, sendMessage }`.
 */
const make = Effect.gen(function* () {
  const sqsClient = yield* SqsClient

  const nodeEnv = yield* Config.string('NODE_ENV').pipe(
    Config.withDefault('development'),
  )
  const isProd = nodeEnv === 'production'

  // Shape of the JSON document sent as the queue's `RedrivePolicy` attribute.
  const redrivePolicySchema = Schema.Struct({
    deadLetterTargetArn: Schema.String,
    maxReceiveCount: Schema.Number,
  })
  // Serialises a redrive policy object to its JSON string form.
  const encodeRedrivePolicy = Schema.encodeSync(
    Schema.parseJson(redrivePolicySchema),
  )

  // Maps a rejected receive call to a typed failure: a value matching
  // `{ Code: 'SignatureDoesNotMatch' }` becomes AwsSignatureDoesNotMatchError,
  // anything else is wrapped in UnknownException.
  const toReceiveError = (cause: unknown) =>
    Match.value(cause).pipe(
      Match.when(
        { Code: 'SignatureDoesNotMatch' },
        () => new AwsSignatureDoesNotMatchError(),
      ),
      Match.orElse(() => new UnknownException(cause)),
    )

  /**
   * Sends a `CreateQueueCommand` for `queueName` (prefixed with `Prod_` when
   * `NODE_ENV` is `'production'`) with a redrive policy that targets the ARN
   * read from the `ARN_DEAD_LETTER_QUEUE` config and uses `maxReceiveCount: 2`.
   *
   * @returns the `QueueUrl` from the response; fails when the response
   * carries no `QueueUrl`.
   */
  const createQueue = ({ queueName }: { queueName: string }) =>
    Effect.gen(function* () {
      const prefix = isProd ? 'Prod_' : ''
      const fullQueueName = `${prefix}${queueName}`

      // Read per call, so a missing ARN fails this effect rather than `make`.
      const deadLetterArn = yield* Config.string('ARN_DEAD_LETTER_QUEUE')

      const input: CreateQueueCommandInput = {
        QueueName: fullQueueName,
        Attributes: {
          RedrivePolicy: encodeRedrivePolicy({
            deadLetterTargetArn: deadLetterArn,
            maxReceiveCount: 2,
          }),
        },
      }

      const response = yield* Effect.tryPromise(() =>
        sqsClient.send(new CreateQueueCommand(input)),
      )
      yield* Effect.logDebug(`Queue created: [${fullQueueName}]`)

      return yield* Effect.fromNullable(response.QueueUrl)
    }).pipe(Effect.withLogSpan('createQueue'))

  /**
   * Encodes `message` to a JSON string through `schema` and sends it to the
   * queue named `queueName`. The queue URL is resolved by calling
   * `createQueue` on every send.
   *
   * @returns the result of the `SendMessageCommand`.
   */
  const sendMessage = <A, I>(args: {
    queueName: string
    schema: Schema.Schema<A, I>
    message: A
  }) =>
    Effect.gen(function* () {
      const url = yield* createQueue({ queueName: args.queueName })
      const toJsonString = Schema.encode(Schema.parseJson(args.schema))
      const body = yield* toJsonString(args.message)

      return yield* Effect.tryPromise(() =>
        sqsClient.send(
          new SendMessageCommand({ QueueUrl: url, MessageBody: body }),
        ),
      )
    })

  /**
   * Issues one `ReceiveMessageCommand` against `queueUrl` with
   * `MaxNumberOfMessages: 1` and `WaitTimeSeconds: 20`.
   *
   * @returns `Option.some` of the first received message, or `Option.none`
   * when the response has no `Messages` or an empty list.
   */
  const pollNextMessage = ({ queueUrl }: { queueUrl: string }) =>
    Effect.gen(function* () {
      yield* logDebug('start receive message...')

      const response = yield* Effect.tryPromise({
        try: () =>
          sqsClient.send(
            new ReceiveMessageCommand({
              QueueUrl: queueUrl,
              MaxNumberOfMessages: 1,
              WaitTimeSeconds: 20,
            }),
          ),
        catch: toReceiveError,
      })

      const batch = Option.fromNullable(response.Messages)
      return Option.flatMap(batch, Array.head)
    }).pipe(Effect.withLogSpan('pollNextMessage'))

  /**
   * Sends a `DeleteMessageCommand` for the given receipt handle on `queueUrl`,
   * logs at debug level once it succeeds, and yields the command's result.
   */
  const deleteMessage = ({
    receipHandle,
    queueUrl,
  }: { receipHandle: string; queueUrl: string }) =>
    Effect.tryPromise(() =>
      sqsClient.send(
        new DeleteMessageCommand({
          QueueUrl: queueUrl,
          ReceiptHandle: receipHandle,
        }),
      ),
    ).pipe(
      // `tap` runs the log for its effect only and keeps the deletion result.
      Effect.tap(() => Effect.logDebug('Message deleted')),
      Effect.withLogSpan('deleteMessage'),
    )

  return { pollNextMessage, deleteMessage, createQueue, sendMessage }
})
/**
 * Context tag for the SQS service. The service shape is whatever `make`
 * succeeds with, so it tracks the factory's returned operations automatically.
 */
export class SqsService extends Context.Tag('SqsService')<
  SqsService,
  Effect.Effect.Success<typeof make>
>() {
  /** Layer that builds the service via `make`, with `SqsClient.Live` provided to it. */
  static Live = pipe(
    Layer.effect(this, make),
    Layer.provide(SqsClient.Live),
  )
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment