Skip to content

Instantly share code, notes, and snippets.

@eigenhombre
Last active October 26, 2023 21:10
Show Gist options
  • Select an option

  • Save eigenhombre/b0c9e7c7eccc8f040a0cbd74b9df3e62 to your computer and use it in GitHub Desktop.

Select an option

Save eigenhombre/b0c9e7c7eccc8f040a0cbd74b9df3e62 to your computer and use it in GitHub Desktop.
AWS Kinesis example in Clojure: feeding and consuming streams using Amazonica
(ns example.kinesis
(:require [amazonica.aws.kinesis :as k]
[cheshire.core :as json]
[clj-time.coerce :as tc]
[clj-time.core :as t])
(:import [java.nio ByteBuffer]
[java.nio.charset Charset]))
(def my-stream-name
  "Name of the Kinesis stream this example writes records to and reads them back from."
  "my-neato-stream")

(def cs
  "Charset used both to encode outgoing JSON payloads and to decode incoming record data."
  (Charset/forName "UTF-8"))

;; NOTE(review): nothing in this example appears to read or update this atom;
;; presumably meant for tracking in-flight record ids — confirm or remove.
(def outstanding-ids
  "Atom wrapping a set of ids; starts out empty."
  (atom #{}))
(defn ^:private submit-records
  "Sends every record map in `recs` to `my-stream-name` via PutRecords.

  Each record is serialized to JSON, encoded with `cs` and keyed by its
  `:id` as the partition key. Requests carry at most 500 records each. After
  every request a vector of [result entry count, failed-record count] is
  printed. Failed records are not retried. Returns nil.

  NOTE(review): batching is by count only; PutRecords also caps the total
  request size — confirm records stay small enough."
  [recs]
  (let [->entry (fn [rec]
                  {:data          (.encode cs (json/generate-string rec))
                   :partition-key (:id rec)})]
    (doseq [chunk (partition-all 500 (map ->entry recs))
            :let  [resp (k/put-records my-stream-name chunk)]]
      (println [(count (:records resp))
                (:failed-record-count resp)]))))
(defn ^:private deserialize-record
  "Decodes one Kinesis record payload, the ByteBuffer `m`, with `cs` (UTF-8)
  and parses it as JSON. Map keys in the result are turned into keywords.

  Only the buffer's remaining bytes (position up to limit) are decoded.
  Decoding happens on a duplicate, so the caller's buffer position is not
  advanced."
  [m]
  (let [^ByteBuffer view (.duplicate ^ByteBuffer m)]
    ;; Charset.decode honours position/limit and also works for read-only
    ;; or direct buffers, unlike .array, which exposes the entire backing
    ;; array (or throws when there is none).
    (-> (.decode ^Charset cs view)
        str
        ;; `true` makes cheshire keywordize keys itself, replacing
        ;; clojure.walk/keywordize-keys, whose namespace was never required.
        (json/parse-string true))))
(defn records-for-shard
  "Returns a lazy sequence of the records in shard `shard-id` of
  `stream-name`, decoded with `deserialize-record`.

  The three-argument arity, or a nil `shard-iterator`, requests a new shard
  iterator of type `iterator-type` (e.g. \"TRIM_HORIZON\"). Otherwise reading
  continues from `shard-iterator`. As the sequence is realized, one progress
  line is printed per GetRecords call.

  Reading stops when either of these holds:
  - GetRecords returns no next shard iterator (the shard is closed and fully
    read).
  - A batch comes back empty while the reader is caught up (MillisBehindLatest
    is absent or zero).

  NOTE(review): GetRecords calls are issued back to back with no delay;
  Kinesis throttles GetRecords per shard, so long reads may be throttled."
  ([stream-name iterator-type shard-id]
   (records-for-shard stream-name iterator-type shard-id nil))
  ([stream-name iterator-type shard-id shard-iterator]
   (lazy-seq
    (let [si (or shard-iterator
                 (k/get-shard-iterator stream-name
                                       shard-id
                                       iterator-type))
          {:keys [next-shard-iterator records millis-behind-latest]}
          (k/get-records :shard-iterator si
                         :deserializer deserialize-record)
          ;; Never recurse with a nil iterator: the `or` above would fetch a
          ;; fresh `iterator-type` iterator and re-read the shard from the
          ;; start. An empty batch is not end-of-shard while the reader is
          ;; still behind the tip (positive MillisBehindLatest).
          more? (and next-shard-iterator
                     (or (seq records)
                         (and (number? millis-behind-latest)
                              (pos? millis-behind-latest))))]
      (println (format "Got %d records from shard %s"
                       (count records)
                       shard-id))
      (lazy-cat records
                (when more?
                  (records-for-shard stream-name
                                     iterator-type
                                     shard-id
                                     next-shard-iterator)))))))
;; Adapted from
;; https://stackoverflow.com/questions/37861893/merge-group-by-huge-sequences-lazily-in-clojure
(defn merge-sorted-lazily
  "Lazily merges `seq1` and `seq2` into a single sorted sequence. Both inputs
  must already be sorted according to `compare-fn`, a comparator returning a
  negative, zero or positive number.

  Nothing is realized until the result is consumed, so infinite inputs
  work. On ties the element from `seq1` comes first, which makes the merge
  stable."
  [compare-fn seq1 seq2]
  ;; Wrapping the whole body in lazy-seq keeps the emptiness checks (which
  ;; realize the heads of both inputs) from running at call time.
  (lazy-seq
   (let [s1 (seq seq1)
         s2 (seq seq2)]
     (cond (nil? s1) s2
           (nil? s2) s1
           ;; Take from seq2 only when it is strictly smaller.
           (pos? (compare-fn (first s1) (first s2)))
           (cons (first s2)
                 (merge-sorted-lazily compare-fn s1 (rest s2)))
           :else
           (cons (first s1)
                 (merge-sorted-lazily compare-fn (rest s1) s2))))))
(defn all-kinesis-records
  "Returns a lazy sequence of the records of every shard of `stream-name`.

  Each shard is read with `records-for-shard`, starting at `iterator-type`
  (e.g. \"TRIM_HORIZON\"). The per-shard sequences are merged by the
  `:submitted_at` field of each record's deserialized `:data`. The result is
  only fully sorted if each shard's records are already in `:submitted_at`
  order. If the stream description lists no shards, an empty sequence is
  returned.

  NOTE(review): DescribeStream is paginated (HasMoreShards); only the shards
  in this single response are read — confirm for streams with many shards."
  [stream-name iterator-type]
  (let [submitted-at (comp :submitted_at :data)
        cmp          (fn [a b]
                       (compare (submitted-at a) (submitted-at b)))
        shard-ids    (->> stream-name
                          k/describe-stream
                          :stream-description
                          :shards
                          (map :shard-id))
        per-shard    (map (partial records-for-shard stream-name iterator-type)
                          shard-ids)]
    ;; Seeding with () means an empty shard list yields () instead of
    ;; invoking the merge function with no arguments (an ArityException).
    (reduce (partial merge-sorted-lazily cmp) () per-shard)))
(comment
  (defn example-rec []
    {:body "hi there"
     :id (str (java.util.UUID/randomUUID))
     :submitted_at (tc/to-long (t/now))
     :x (inc (rand-int 5))})

  (k/list-streams)
  (k/describe-stream my-stream-name)
  #_(k/delete-stream my-stream-name)

  ;; Submit 10,000 example records:
  (def recs (repeatedly 10000 example-rec))
  (submit-records recs)

  ;; Check sort order and count already submitted records.
  ;; `(apply <= ())` calls `<=` with no arguments, which throws an
  ;; ArityException, so an empty stream is handled explicitly.
  (let [timestamps
        (map (comp :submitted_at :data)
             (all-kinesis-records my-stream-name "TRIM_HORIZON"))]
    (assert (or (empty? timestamps) (apply <= timestamps)))
    (count timestamps))
  )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment