Last active
July 30, 2021 09:47
-
-
Save witalobenicio/436ca69e3fa666447fc5bb4551c7a361 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { | |
ChangeMessageVisibilityCommand, | |
ChangeMessageVisibilityCommandOutput, | |
DeleteMessageCommand, | |
DeleteMessageCommandOutput, | |
Message, | |
ReceiveMessageCommand, | |
SendMessageBatchCommand, | |
SendMessageBatchRequestEntry, | |
SendMessageCommand, | |
SendMessageCommandOutput, | |
SQSClient, | |
SQSClientConfig, | |
} from '@aws-sdk/client-sqs'; | |
import config from '@config'; | |
import { IQueue } from '@managers/queue/types/Queue'; | |
import chunkify from '@core/utils/chunkify'; | |
/**
 * Small wrapper around the AWS SDK v3 SQS client for sending, receiving,
 * releasing and deleting messages on queues described by `IQueue`.
 */
class QueueManager {
  // Client configuration is read from the application config module.
  private awsConfig: SQSClientConfig = {
    region: config.awsRegion,
    credentials: {
      accessKeyId: config.awsAccessKey,
      secretAccessKey: config.awsAccessSecret,
    },
  };

  private sqs: SQSClient = new SQSClient(this.awsConfig);

  // Max number of messages handled per SQS call. Used both as the receive cap
  // and as the chunk size for batch sends (SQS allows at most 10 in either case).
  private readonly batch: number = 10;

  // Max number of seconds that message will 'belong' to a receiver
  private readonly visibilityTimeout: number = 8 * 60;

  /**
   * Sends a single message to a queue.
   *
   * @param body Message payload; must be a non-empty string.
   * @param to Destination queue.
   * @throws Error when `body` or `to` is missing.
   */
  public async send(body: string, to?: IQueue): Promise<void> {
    if (!body) { throw new Error('You need to provide a body'); }
    if (!to) { throw new Error('You need to provide a queue destination'); }
    const command = new SendMessageCommand({
      QueueUrl: to.getUrl(),
      MessageBody: body,
    });
    // Await instead of returning the SDK output so the declared Promise<void> holds.
    await this.sqs.send(command);
  }

  /**
   * Sends many messages to a queue, split into batch requests of at most
   * `batch` entries each. All batch requests are issued concurrently.
   *
   * @param bodies Message payloads; must contain at least one element.
   * @param to Destination queue.
   * @throws Error when `bodies` is empty/missing or `to` is missing.
   */
  public async sendMultiple(bodies: string[], to: IQueue): Promise<void> {
    if (!bodies || !bodies.length) { throw new Error('You need to provide a body'); }
    if (!to) { throw new Error('You need to provide a queue destination'); }
    const queueUrl = to.getUrl();
    const requests: Promise<unknown>[] = [];
    // Separate our bigger array into multiple arrays of batch max size so we don't get an error
    chunkify(bodies, this.batch).forEach(chunk => {
      // SQS requires every batch entry to carry an Id that is unique within
      // its request; the position inside the chunk satisfies that.
      const entries: SendMessageBatchRequestEntry[] = chunk.map((body: string, index: number) => ({
        Id: String(index),
        MessageBody: body,
      }));
      const command = new SendMessageBatchCommand({
        QueueUrl: queueUrl,
        Entries: entries,
      });
      requests.push(this.sqs.send(command));
    });
    await Promise.all(requests);
  }

  /**
   * Makes a received message immediately visible again by setting its
   * visibility timeout to 0.
   *
   * @param receiptHandle Receipt handle of the message.
   * @param from Queue the message was received from.
   * @returns The raw SDK response.
   */
  public async releaseMessage(receiptHandle: string, from: IQueue): Promise<ChangeMessageVisibilityCommandOutput> {
    const command = new ChangeMessageVisibilityCommand({
      QueueUrl: from.getUrl(),
      VisibilityTimeout: 0,
      ReceiptHandle: receiptHandle,
    });
    return this.sqs.send(command);
  }

  /**
   * Receives messages from `from` without waiting. If nothing comes back and
   * `from.getPriorityQueue()` yields a queue, the same procedure is repeated
   * on that queue.
   *
   * @param from Queue to read first.
   * @param quantity Desired number of messages (capped at `batch`).
   * @returns The received messages; an empty array when none were found.
   */
  public async receiveWithPriority(from: IQueue, quantity: number): Promise<Message[]> {
    const messages = await this.receiveFromQueue(from, quantity, 0);
    if (messages.length) { return messages; }
    // Only consult the linked queue when the first one was empty.
    const priorityQueue = from.getPriorityQueue();
    if (priorityQueue) {
      return this.receive(priorityQueue, quantity);
    }
    return messages;
  }

  /**
   * Receives messages from a queue; delegates to `receiveWithPriority`.
   *
   * @param from Queue to read from.
   * @param quantity Desired number of messages (capped at `batch`).
   */
  public async receive(from: IQueue, quantity: number): Promise<Message[]> {
    return this.receiveWithPriority(from, quantity);
  }

  /**
   * Deletes a message from a queue.
   *
   * @param receiptHandle Receipt handle of the message.
   * @param from Queue the message was received from.
   * @returns The raw SDK response.
   */
  public async delete(receiptHandle: string, from: IQueue): Promise<DeleteMessageCommandOutput> {
    const command = new DeleteMessageCommand({
      QueueUrl: from.getUrl(),
      ReceiptHandle: receiptHandle,
    });
    return this.sqs.send(command);
  }

  /**
   * Issues one ReceiveMessage call against a queue.
   *
   * @param from Queue to read from.
   * @param quantity Desired number of messages; `undefined` or anything above
   *   `batch` falls back to `batch`.
   * @param waitTime Value passed as `WaitTimeSeconds`.
   * @returns The received messages; an empty array when the response has none.
   */
  private async receiveFromQueue(from: IQueue, quantity: number, waitTime: number): Promise<Message[]> {
    const usedQuantity = quantity === undefined || quantity > this.batch ? this.batch : quantity;
    const command = new ReceiveMessageCommand({
      QueueUrl: from.getUrl(),
      MaxNumberOfMessages: usedQuantity,
      VisibilityTimeout: this.visibilityTimeout,
      WaitTimeSeconds: waitTime,
    });
    const data = await this.sqs.send(command);
    // The SDK leaves `Messages` undefined when nothing was received.
    return data.Messages || [];
  }
}

export default QueueManager;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment