Created
March 26, 2014 16:08
-
-
Save rboyd/9786887 to your computer and use it in GitHub Desktop.
dedup using first unique window with core.async
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns cep-dedup | |
(:require [clj-time.coerce :as c] | |
[clojure.core.async :refer :all])) | |
; Something like Esper's std:firstunique view. Within a given time window, ignore any duplicate of a
; message that arrives on a channel other than the one that delivered it first. Subsequent copies
; arriving on that first receiving channel still pass through.
(defn update-sources
  "Drops, for every message body in msgs, the source entries whose :ts equals
  min-time. sources maps (hash body) to a set of {:source .. :ts ..} entries;
  a key whose set becomes empty is removed from the map entirely.
  Returns the updated sources map."
  [sources msgs min-time]
  (let [expired? (fn [entry] (= (:ts entry) min-time))
        prune    (fn [acc msg]
                   (let [k         (hash msg)
                         survivors (set (clojure.core/remove expired? (get acc k)))]
                     (if (seq survivors)
                       (assoc acc k survivors)
                       (dissoc acc k))))]
    ;; reduce is fully qualified because core.async is referred with :all
    ;; in this file and defines its own reduce.
    (clojure.core/reduce prune sources msgs)))
(defn add-msg
  "Records msg, received on channel ch, under a fresh timestamp.

  sources - map of (hash msg) => set of {:source ch :ts ts} entries
  buffer  - map of ts => set of {:body msg} entries (the caller passes a
            sorted map so the oldest timestamp comes first)

  Returns a vector [new-sources new-buffer]. Entries that share a key are
  combined by set union."
  [sources buffer ch msg]
  (let [ts    (java.util.Date.)
        ;; Set union via clojure.core/into: the ns form never requires
        ;; clojure.set, so clojure.set/union only resolved when some other
        ;; namespace happened to load it first. The name is fully qualified
        ;; because core.async (referred with :all) defines its own `into`.
        union (fn [old-entries new-entries]
                (clojure.core/into old-entries new-entries))]
    [(merge-with union sources {(hash msg) #{{:source ch :ts ts}}})
     (merge-with union buffer {ts #{{:body msg}}})]))
(defn dedup
  "Starts a thread that reads from the channels in recv-chs and forwards
  messages to out-ch once they have been buffered for window-timespan
  milliseconds.

  While a message is buffered, an equal message (same hash) arriving on a
  different channel is ignored; equal messages arriving on a channel that
  already delivered it are buffered and forwarded as well.

  A closed input channel is dropped from the set being read. The loop ends
  once every input is closed and the buffer has been flushed. out-ch is not
  closed by this function.

  Returns the channel produced by `thread`."
  [recv-chs out-ch window-timespan]
  (thread
    (loop [open-chs (vec recv-chs) ; inputs that have not been closed yet
           sources  {}             ; hash(message) => #{{:source <ch> :ts <#inst>}, ...}
           buffer   (sorted-map)]  ; <#inst (timestamp)> => #{{:body msg}, ...}
      (let [min-time   (first (keys buffer)) ; oldest buffered timestamp, nil when empty
            ;; Time remaining until the oldest buffered entry leaves the window.
            timeout-ch (when min-time
                         (timeout (- (+ window-timespan (c/to-long min-time))
                                     (c/to-long (java.util.Date.)))))
            ports      (if timeout-ch (conj open-chs timeout-ch) open-chs)]
        ;; No open inputs and nothing buffered: nothing left to wait on.
        (when (seq ports)
          (let [[v ch] (alts!! ports)]
            (cond
              ;; Window expired for the oldest timestamp: emit and forget it.
              ;; This is checked before the nil test because a fired timeout
              ;; channel also yields nil.
              (= ch timeout-ch)
              (let [msgs (get buffer min-time)]
                ;; Puts are asynchronous so a slow consumer does not stall
                ;; this loop.
                (doseq [msg msgs]
                  (go (>! out-ch (:body msg))))
                (recur open-chs
                       ;; remove from sources where hash(msg body) and :ts match
                       (update-sources sources (clojure.core/map :body msgs) min-time)
                       (dissoc buffer min-time)))

              ;; nil from an input means that channel was closed. Stop reading
              ;; it; otherwise alts!! returns immediately forever and nil ends
              ;; up buffered as if it were a message.
              (nil? v)
              (recur (vec (clojure.core/remove #(= % ch) open-chs)) sources buffer)

              :else
              (let [existing (get sources (hash v)) ; #{{:source <ch> :ts <inst>}} or nil
                    [new-sources new-buffer]
                    (if (and existing (not-any? #(= (:source %) ch) existing))
                      [sources buffer]                ; same body, different source: ignore
                      (add-msg sources buffer ch v))] ; new body, or same source: buffer it
                (recur open-chs new-sources new-buffer)))))))))
(comment
  ;; REPL walkthrough: two source channels feeding one output channel
  ;; through dedup with a 500 ms window.
  (def sch1 (chan))
  (def sch2 (chan))
  (def och1 (chan))

  (dedup [sch1 sch2] och1 500)

  ;; Print everything that comes out of the deduplicator.
  (thread
    (loop []
      (println "received" (<!! och1))
      (recur)))

  ;; The same message is sent on both sources, sch1 first.
  (doseq [src [sch1 sch2]]
    (>!! src {:msg :data}))
  )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment