Created
August 3, 2018 14:40
-
-
Save igrishaev/3ee0d3d014c8d8f93afc1c37034719c9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns project.sqs | |
(:require [cheshire.core :as json]) | |
(:import [com.amazonaws.services.sqs | |
AmazonSQS | |
AmazonSQSClientBuilder] | |
[com.amazonaws.auth | |
BasicAWSCredentials | |
AWSStaticCredentialsProvider] | |
[com.amazonaws.services.sqs.model | |
CreateQueueRequest | |
DeleteMessageBatchResultEntry | |
DeleteMessageBatchResult | |
DeleteMessageResult | |
SendMessageBatchResultEntry | |
BatchResultErrorEntry | |
SendMessageBatchResult | |
SendMessageResult | |
SendMessageBatchRequestEntry | |
SendMessageRequest | |
SendMessageBatchRequest | |
ReceiveMessageResult | |
DeleteMessageRequest | |
DeleteMessageBatchRequest | |
DeleteMessageBatchRequestEntry | |
ReceiveMessageRequest | |
Message]) | |
(:refer-clojure :exclude [send])) | |
(def enumerate (partial map-indexed vector)) | |
;; | |
;; Client | |
;; | |
(defn amazon-sqs | |
[aws-key aws-secret region] | |
(let [creds (new BasicAWSCredentials aws-key aws-secret)] | |
(-> (AmazonSQSClientBuilder/standard) | |
(.withRegion region) | |
(.withCredentials (new AWSStaticCredentialsProvider creds)) | |
(.build)))) | |
;; | |
;; Queue | |
;; | |
;; | |
;; Send | |
;; | |
(defn message-request | |
[queue-url {:keys [group-id deduplication-id body]}] | |
(-> (new SendMessageRequest) | |
(.withQueueUrl queue-url) | |
(.withMessageBody body) | |
(.withMessageGroupId group-id) | |
(.withMessageDeduplicationId deduplication-id))) | |
(defn message-batch-request | |
[queue-url messages] | |
(-> (new SendMessageBatchRequest) | |
(.withQueueUrl queue-url) | |
(.withEntries | |
(for [[index {:keys [group-id deduplication-id body]}] (enumerate messages)] | |
(-> (new SendMessageBatchRequestEntry) | |
(.withId (str index)) | |
(.withMessageBody body) | |
(.withMessageGroupId group-id) | |
(.withMessageDeduplicationId deduplication-id)))))) | |
(defn send-message | |
[client request] | |
(.sendMessage client request)) | |
(defn send-message-batch | |
[client request] | |
(.sendMessageBatch client request)) | |
;; | |
;; Receive | |
;; | |
(defn receive-request | |
[queue-url {:keys [visibility-timeout | |
wait-timeout | |
max-number-of-messages]}] | |
(-> (new ReceiveMessageRequest) | |
(.withQueueUrl queue-url) | |
(.withMaxNumberOfMessages (int max-number-of-messages)) | |
(.withVisibilityTimeout (int visibility-timeout)) | |
(.withWaitTimeSeconds (int wait-timeout)))) | |
(defn receive-message | |
[client request] | |
(.receiveMessage client request)) | |
;; | |
;; Delete | |
;; | |
(defn delete-request | |
[queue-url handle] | |
(new DeleteMessageRequest queue-url handle)) | |
(defn delete-batch-request | |
[queue-url handles] | |
(-> (new DeleteMessageBatchRequest) | |
(.withQueueUrl queue-url) | |
(.withEntries | |
(for [[index handle] (enumerate handles)] | |
(-> (new DeleteMessageBatchRequestEntry) | |
(.withId (str index)) | |
(.withReceiptHandle handle)))))) | |
(defn delete-message | |
[client request] | |
(.deleteMessage client request)) | |
(defn delete-message-batch | |
[client request] | |
(.deleteMessageBatch client request)) | |
;; | |
;; to Clojure | |
;; | |
(defprotocol ToClojure | |
(->clj [obj])) | |
(extend-type Message | |
ToClojure | |
(->clj [obj] | |
{:body (.getBody obj) | |
:body-md5 (.getMD5OfBody obj) | |
:id (.getMessageId obj) | |
:handle (.getReceiptHandle obj)})) | |
(extend-type ReceiveMessageResult | |
ToClojure | |
(->clj [obj] | |
{:messages (mapv ->clj (.getMessages obj))})) | |
(extend-type SendMessageResult | |
ToClojure | |
(->clj [obj] | |
{:id (.getMessageId obj) | |
:seq-number (.getSequenceNumber obj)})) | |
(extend-type SendMessageBatchResult | |
ToClojure | |
(->clj [obj] | |
{:failed (map ->clj (.getFailed obj)) | |
:successful (map ->clj (.getSuccessful obj))})) | |
(extend-type BatchResultErrorEntry | |
ToClojure | |
(->clj [obj] | |
{:id (.getId obj) | |
:message (.getMessage obj) | |
:sender-fault (.getSenderFault obj)})) | |
(extend-type SendMessageBatchResultEntry | |
ToClojure | |
(->clj [obj] | |
{:id (.getId obj) | |
:message-id (.getMessageId obj) | |
:seq-number (.getSequenceNumber obj)})) | |
(extend-type DeleteMessageResult | |
ToClojure | |
(->clj [obj])) | |
(extend-type DeleteMessageBatchResult | |
ToClojure | |
(->clj [obj] | |
{:failed (map ->clj (.getFailed obj)) | |
:successful (map ->clj (.getSuccessful obj))})) | |
(extend-type DeleteMessageBatchResultEntry | |
ToClojure | |
(->clj [obj] | |
{:id (.getId obj)})) | |
;; | |
;; Shortcuts | |
;; | |
(defn uuid [] | |
(str (java.util.UUID/randomUUID))) | |
(defn ->message | |
[group-id data] | |
{:group-id group-id | |
:deduplication-id (-> data hash str) | |
:body (json/generate-string data)}) | |
(defn msg-encode | |
[msg] | |
(update msg :body json/generate-string)) | |
(defn msg-decode | |
[msg] | |
(update msg :body json/parse-string true)) | |
(def map-msg-decode | |
(partial mapv msg-decode)) | |
(defn send | |
[client queue-url message] | |
(let [req (message-request queue-url message)] | |
(->clj (send-message client req)))) | |
(defn send-many | |
[client queue-url messages] | |
(let [req (message-batch-request queue-url messages)] | |
(->clj (send-message-batch client req)))) | |
(def receive-default | |
{:visibility-timeout 30 | |
:max-number-of-messages 10 | |
:wait-timeout 5}) | |
(defn receive | |
[client queue-url & [opt]] | |
(let [opt (merge receive-default opt) | |
req (receive-request queue-url opt) | |
res (receive-message client req) | |
res (->clj res) | |
{:keys [messages]} res] | |
(update res :messages map-msg-decode))) | |
(defn delete | |
[client queue-url handle] | |
(let [req (delete-request queue-url handle)] | |
(->clj (delete-message client req)))) | |
(defn delete-many | |
[client queue-url handles] | |
(let [req (delete-batch-request queue-url handles)] | |
(->clj (delete-message-batch client req)))) | |
(defn queue-url | |
[client queue-name] | |
(-> client (.getQueueUrl queue-name) .getQueueUrl)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment