Created
April 27, 2014 20:33
-
-
Save svard/11355031 to your computer and use it in GitHub Desktop.
Trident redis state using marceline
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; Trident state persisted in redis. | |
;; Implemented in clojure using marceline lib. | |
;; Based on https://github.com/kstyrc/trident-redis | |
(ns state.redis | |
(:require [marceline.storm.trident :as t] | |
[clj-redis.client :as redis]) | |
(:import [storm.trident.state.map CachedMap TransactionalMap NonTransactionalMap OpaqueMap SnapshottableMap])) | |
(defn- get-type-map | |
[c type] | |
(case type | |
:transactional (TransactionalMap/build c) | |
:non-transactional (NonTransactionalMap/build c) | |
:opaque (OpaqueMap/build c) | |
(throw (RuntimeException. "Unknown type map")))) | |
(defn- get-values-from-hash | |
[pool hkey keys] | |
(let [vals (redis/hgetall pool hkey) | |
str-keys (map #(str (t/first %)) keys)] | |
(mapv (fn [key] | |
(when-let [val (get vals key)] | |
(read-string val))) | |
str-keys))) | |
(defn- get-values | |
[pool keys] | |
(let [str-keys (map #(str (t/first %)) keys) | |
vals (apply redis/mget pool str-keys)] | |
(mapv (fn [val] | |
(when val | |
(read-string val))) | |
vals))) | |
(defn- put-values-into-hash | |
[pool hkey keys vals] | |
(mapv (fn [key val] | |
(redis/hset pool | |
hkey | |
(str (t/first key)) | |
(str val))) | |
keys | |
vals)) | |
(defn- put-values | |
[pool keys vals] | |
(let [str-keys (map #(str (t/first %)) keys) | |
key-val (flatten (map #(vector %1 (str %2)) str-keys vals))] | |
(apply redis/mset pool key-val))) | |
(t/defbackingmap | |
redis-state {:params [pool hkey]} | |
([keys] | |
(if (< 0 (count keys)) | |
(if hkey | |
(java.util.ArrayList. (get-values-from-hash pool hkey keys)) | |
(java.util.ArrayList. (get-values pool keys))) | |
(java.util.ArrayList. []))) | |
([keys vals] | |
(when (< 0 (count keys)) | |
(if hkey | |
(put-values-into-hash pool hkey keys vals) | |
(put-values pool keys vals))))) | |
(t/defstatefactory | |
redis-factory {:params [host & {:keys [hkey port type] | |
:or {port 6379} :as opts}]} | |
[conf metrics index part] | |
(let [jedis-pool (redis/init {:url (str "redis://" host ":" port)}) | |
redis (redis-state jedis-pool hkey) | |
c (CachedMap. redis 1000) | |
ms (get-type-map c type)] | |
(SnapshottableMap. ms (t/values "$REDIS-MAP-STATE-GLOBAL")))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment