Skip to content

Instantly share code, notes, and snippets.

@svard
Created April 27, 2014 20:33
Show Gist options
  • Save svard/11355031 to your computer and use it in GitHub Desktop.
Save svard/11355031 to your computer and use it in GitHub Desktop.
Trident redis state using marceline
;; Trident state persisted in redis.
;; Implemented in clojure using marceline lib.
;; Based on https://github.com/kstyrc/trident-redis
(ns state.redis
(:require [marceline.storm.trident :as t]
[clj-redis.client :as redis])
(:import [storm.trident.state.map CachedMap TransactionalMap NonTransactionalMap OpaqueMap SnapshottableMap]))
(defn- get-type-map
[c type]
(case type
:transactional (TransactionalMap/build c)
:non-transactional (NonTransactionalMap/build c)
:opaque (OpaqueMap/build c)
(throw (RuntimeException. "Unknown type map"))))
(defn- get-values-from-hash
[pool hkey keys]
(let [vals (redis/hgetall pool hkey)
str-keys (map #(str (t/first %)) keys)]
(mapv (fn [key]
(when-let [val (get vals key)]
(read-string val)))
str-keys)))
(defn- get-values
[pool keys]
(let [str-keys (map #(str (t/first %)) keys)
vals (apply redis/mget pool str-keys)]
(mapv (fn [val]
(when val
(read-string val)))
vals)))
(defn- put-values-into-hash
[pool hkey keys vals]
(mapv (fn [key val]
(redis/hset pool
hkey
(str (t/first key))
(str val)))
keys
vals))
(defn- put-values
[pool keys vals]
(let [str-keys (map #(str (t/first %)) keys)
key-val (flatten (map #(vector %1 (str %2)) str-keys vals))]
(apply redis/mset pool key-val)))
(t/defbackingmap
redis-state {:params [pool hkey]}
([keys]
(if (< 0 (count keys))
(if hkey
(java.util.ArrayList. (get-values-from-hash pool hkey keys))
(java.util.ArrayList. (get-values pool keys)))
(java.util.ArrayList. [])))
([keys vals]
(when (< 0 (count keys))
(if hkey
(put-values-into-hash pool hkey keys vals)
(put-values pool keys vals)))))
(t/defstatefactory
redis-factory {:params [host & {:keys [hkey port type]
:or {port 6379} :as opts}]}
[conf metrics index part]
(let [jedis-pool (redis/init {:url (str "redis://" host ":" port)})
redis (redis-state jedis-pool hkey)
c (CachedMap. redis 1000)
ms (get-type-map c type)]
(SnapshottableMap. ms (t/values "$REDIS-MAP-STATE-GLOBAL"))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment