Last active
October 26, 2023 21:10
-
-
Save eigenhombre/b0c9e7c7eccc8f040a0cbd74b9df3e62 to your computer and use it in GitHub Desktop.
AWS Kinesis example in Clojure: feeding and consuming streams using Amazonica
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| (ns example.kinesis | |
| (:require [amazonica.aws.kinesis :as k] | |
| [cheshire.core :as json] | |
| [clj-time.coerce :as tc] | |
| [clj-time.core :as t]) | |
| (:import [java.nio ByteBuffer] | |
| [java.nio.charset Charset])) | |
(def my-stream-name
  "Name of the Kinesis stream that `submit-records` writes to and that the
  REPL examples in this namespace read from."
  "my-neato-stream")

(def cs
  "Charset used to encode record payloads before they are submitted and to
  decode them again when they are read back."
  java.nio.charset.StandardCharsets/UTF_8)

(def outstanding-ids
  "Atom holding a set, initially empty. Nothing in the visible part of this
  namespace reads or updates it."
  (atom #{}))
(defn- submit-records
  "Sends `recs` to the stream named by `my-stream-name`.

  Each record is serialized to JSON and encoded with `cs`; its `:id` is used
  as the partition key. The encoded entries are passed to `k/put-records` in
  batches of at most 500.

  For every batch this prints a two-element vector: the number of entries in
  the response's `:records`, and the response's `:failed-record-count`.
  Failures are only reported that way -- nothing is retried.

  Returns nil."
  [recs]
  (let [->entry (fn [rec]
                  {:data          (.encode cs (json/generate-string rec))
                   :partition-key (:id rec)})
        batches (partition-all 500 (map ->entry recs))]
    (doseq [batch batches]
      (let [response (k/put-records my-stream-name batch)]
        (println [(count (:records response))
                  (:failed-record-count response)])))))
(defn- deserialize-record
  "Decodes one record payload into Clojure data with keyword map keys.

  `m` is the `java.nio.ByteBuffer` that `k/get-records` hands to its
  `:deserializer` callback. The bytes between the buffer's position and its
  limit are decoded with `cs` and the resulting text is parsed as JSON.

  Only the remaining bytes are decoded, rather than the whole backing array,
  so the function also works for buffers that are slices, have a non-zero
  offset, or have no accessible backing array (read-only / direct buffers).
  Decoding happens on a duplicate, so the caller's buffer position is left
  untouched."
  [^ByteBuffer m]
  (let [text (str (.decode ^Charset cs (.duplicate m)))]
    ;; Passing `true` makes Cheshire keywordize map keys while parsing. This
    ;; replaces the former `clojure.walk/keywordize-keys` call, which relied
    ;; on `clojure.walk` being loaded even though this namespace never
    ;; requires it.
    (json/parse-string text true)))
(defn records-for-shard
  "Returns a lazy sequence of the records in one shard of a Kinesis stream.

  Arguments:
    stream-name    - name of the stream.
    iterator-type  - shard iterator type passed to `k/get-shard-iterator`
                     (the examples in this file use `TRIM_HORIZON`).
    shard-id       - id of the shard to read.
    shard-iterator - optional iterator to resume from; when nil or omitted a
                     new iterator is requested for `iterator-type`.

  Every fetch calls `k/get-records` with `deserialize-record` as the
  deserializer and prints one progress line. Fetches happen lazily, as the
  sequence is consumed.

  Reading continues while the response carries a next shard iterator and
  either the batch was non-empty or the response reports that the reader is
  still behind the tip of the shard (`:millis-behind-latest` > 0). If that
  key is absent, reading stops at the first empty batch."
  ([stream-name iterator-type shard-id]
   (records-for-shard stream-name iterator-type shard-id nil))
  ([stream-name iterator-type shard-id shard-iterator]
   (lazy-seq
     (let [si (or shard-iterator
                  (k/get-shard-iterator stream-name
                                        shard-id
                                        iterator-type))
           {:keys [next-shard-iterator records millis-behind-latest]}
           (k/get-records :shard-iterator si
                          :deserializer deserialize-record)]
       (println (format "Got %d records from shard %s"
                        (count records)
                        shard-id))
       (lazy-cat records
                 ;; A nil next iterator must end the sequence. Recursing with
                 ;; nil would make the `or` above request a brand-new iterator
                 ;; and re-read the shard from `iterator-type` again.
                 (when (and next-shard-iterator
                            ;; An empty batch alone is not proof that the
                            ;; shard is exhausted, so keep going while the
                            ;; service says we are behind.
                            (or (seq records)
                                (some-> millis-behind-latest pos?)))
                   (records-for-shard stream-name
                                      iterator-type
                                      shard-id
                                      next-shard-iterator)))))))
| ;; Adapted from | |
| ;; https://stackoverflow.com/questions/37861893/\ | |
| ;; merge-group-by-huge-sequences-lazily-in-clojure: | |
(defn merge-sorted-lazily
  "Merges two sequences that are each already ordered according to
  `compare-fn` (a three-way comparator) into one ordered sequence.

  Only the head of each input is inspected up front; the remainder of the
  merge is wrapped in `lazy-seq`, so infinite inputs are fine. If one input
  is empty the other is returned unchanged. When the two heads compare as
  equal, the element from `seq2` is emitted first."
  [compare-fn seq1 seq2]
  (if (empty? seq1)
    seq2
    (if (empty? seq2)
      seq1
      (let [left       (first seq1)
            right      (first seq2)
            take-left? (neg? (compare-fn left right))]
        (cons (if take-left? left right)
              (lazy-seq
                (if take-left?
                  (merge-sorted-lazily compare-fn (rest seq1) seq2)
                  (merge-sorted-lazily compare-fn seq1 (rest seq2)))))))))
(defn all-kinesis-records
  "Returns one lazy sequence containing the records of every shard listed by
  `k/describe-stream` for `stream-name`, merged in ascending order of each
  record's `[:data :submitted_at]` value.

  `iterator-type` is passed through to `records-for-shard` for each shard.

  The merge only yields a fully sorted result if each shard's own records
  are already ordered by `:submitted_at`; nothing here sorts within a shard.
  Returns an empty sequence when no shards are listed.

  NOTE(review): only the shards present in a single `describe-stream`
  response are read -- confirm whether paging is needed for large streams."
  [stream-name iterator-type]
  (let [submitted-at (fn [rec] (-> rec :data :submitted_at))
        cmp          (fn [a b]
                       (compare (submitted-at a) (submitted-at b)))
        shard-ids    (->> (k/describe-stream stream-name)
                          :stream-description
                          :shards
                          (map :shard-id))
        per-shard    (map (partial records-for-shard stream-name iterator-type)
                          shard-ids)]
    ;; Seed the fold with an empty seq: without an init value, `reduce` on an
    ;; empty shard list would call the merge function with too few arguments
    ;; and throw. Merging `()` with the first shard returns that shard's
    ;; sequence unchanged, so non-empty results are unaffected.
    (reduce (partial merge-sorted-lazily cmp) () per-shard)))
(comment
  ;; REPL scratchpad: none of these forms are evaluated when the namespace
  ;; loads. Evaluate them one at a time against a real stream.

  ;; Builds one sample record with a random UUID string as its id, the
  ;; current time in epoch milliseconds, and a random :x between 1 and 5.
  (defn example-rec []
    {:body         "hi there"
     :id           (.toString (java.util.UUID/randomUUID))
     :submitted_at (tc/to-long (t/now))
     :x            (inc (rand-int 5))})

  ;; Inspect (or, if un-commented, delete) the stream.
  (k/list-streams)
  (k/describe-stream my-stream-name)
  #_(k/delete-stream my-stream-name)

  ;; Generate sample records and push them to the stream.
  (def recs (repeatedly 10000 example-rec))
  (submit-records recs)

  ;; Check sort order and count already submitted records:
  (let [submitted (->> (all-kinesis-records my-stream-name "TRIM_HORIZON")
                       (map (fn [rec] (-> rec :data :submitted_at))))]
    (assert (apply <= submitted))
    (count submitted))
  )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment