Created
August 8, 2024 07:31
-
-
Save titouancreach/3643aad81f1f34f48787d1d50a20711f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { | |
CreateQueueCommand, | |
DeleteMessageCommand, | |
ReceiveMessageCommand, | |
SendMessageCommand, | |
type CreateQueueCommandInput, | |
} from '@aws-sdk/client-sqs' | |
import { Schema } from '@effect/schema' | |
import { | |
Array, | |
Config, | |
Context, | |
Effect, | |
Layer, | |
Match, | |
Option, | |
pipe, | |
} from 'effect' | |
import { UnknownException } from 'effect/Cause' | |
import { TaggedError } from 'effect/Data' | |
import { logDebug } from 'effect/Effect' | |
import { SqsClient } from './SqsClient' | |
export type { Message as QueueMessage } from '@aws-sdk/client-sqs' | |
export class AwsSignatureDoesNotMatchError extends TaggedError( | |
'AwsSignatureDoesNotMatchError', | |
) { } | |
const make = Effect.gen(function* () { | |
const sqsClient = yield* SqsClient | |
const isProd = yield* Config.string('NODE_ENV').pipe( | |
Config.withDefault('development'), | |
Config.map(env => env === 'production'), | |
) | |
const sendMessage = <A, I>({ | |
queueName, | |
schema, | |
message, | |
}: { queueName: string; schema: Schema.Schema<A, I>; message: A }) => | |
Effect.gen(function* () { | |
const queueUrl = yield* createQueue({ queueName }) | |
const stringMessage = yield* Schema.encode(Schema.parseJson(schema))( | |
message, | |
) | |
return yield* Effect.tryPromise(() => | |
sqsClient.send( | |
new SendMessageCommand({ | |
QueueUrl: queueUrl, | |
MessageBody: stringMessage, | |
}), | |
), | |
) | |
}) | |
const pollNextMessage = ({ queueUrl }: { queueUrl: string }) => { | |
return Effect.gen(function* () { | |
yield* logDebug('start receive message...') | |
const receivedMessages = yield* Effect.tryPromise({ | |
try: () => | |
sqsClient.send( | |
new ReceiveMessageCommand({ | |
QueueUrl: queueUrl, | |
MaxNumberOfMessages: 1, | |
WaitTimeSeconds: 20, | |
}), | |
), | |
catch: (error: unknown) => { | |
return Match.value(error).pipe( | |
Match.when( | |
{ Code: 'SignatureDoesNotMatch' }, | |
() => new AwsSignatureDoesNotMatchError(), | |
), | |
Match.orElse(() => new UnknownException(error)), | |
) | |
}, | |
}) | |
return pipe( | |
receivedMessages.Messages, | |
Option.fromNullable, | |
Option.flatMap(Array.head), | |
) | |
}).pipe(Effect.withLogSpan('pollNextMessage')) | |
} | |
const deleteMessage = ({ | |
receipHandle, | |
queueUrl, | |
}: { receipHandle: string; queueUrl: string }) => { | |
return Effect.gen(function* () { | |
const deletion = yield* Effect.tryPromise(() => | |
sqsClient.send( | |
new DeleteMessageCommand({ | |
QueueUrl: queueUrl, | |
ReceiptHandle: receipHandle, | |
}), | |
), | |
) | |
yield* Effect.logDebug('Message deleted') | |
return deletion | |
}).pipe(Effect.withLogSpan('deleteMessage')) | |
} | |
const createQueue = ({ queueName }: { queueName: string }) => { | |
return Effect.gen(function* () { | |
const resolvedQueueName = `${isProd ? 'Prod_' : ''}${queueName}` | |
const arnDeadLetterQueue = yield* Config.string('ARN_DEAD_LETTER_QUEUE') | |
const redrivePolicySchema = Schema.Struct({ | |
deadLetterTargetArn: Schema.String, | |
maxReceiveCount: Schema.Number, | |
}) | |
const redrivePolicy: Schema.Schema.Type<typeof redrivePolicySchema> = { | |
deadLetterTargetArn: arnDeadLetterQueue, | |
maxReceiveCount: 2, | |
} | |
const encode = Schema.encodeSync(Schema.parseJson(redrivePolicySchema)) | |
const params: CreateQueueCommandInput = { | |
QueueName: resolvedQueueName, | |
Attributes: { | |
RedrivePolicy: encode(redrivePolicy), | |
}, | |
} | |
const createQueueResp = yield* Effect.tryPromise(() => | |
sqsClient.send(new CreateQueueCommand(params)), | |
) | |
yield* Effect.logDebug(`Queue created: [${resolvedQueueName}]`) | |
return yield* Effect.fromNullable(createQueueResp.QueueUrl) | |
}).pipe(Effect.withLogSpan('createQueue')) | |
} | |
return { pollNextMessage, deleteMessage, createQueue, sendMessage } | |
}) | |
export class SqsService extends Context.Tag('SqsService')< | |
SqsService, | |
Effect.Effect.Success<typeof make> | |
>() { | |
static Live = Layer.effect(this, make).pipe(Layer.provide(SqsClient.Live)) | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment