Last active
June 6, 2018 14:12
-
-
Save raymcdermott/e495d7907ce83a64b049 to your computer and use it in GitHub Desktop.
core.async filtering by time
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns green-eggs.core | |
(:require [clojure.core.async | |
:as a | |
:refer [>! <! >!! <!! go chan buffer close! thread | |
alts! alts!! timeout onto-chan]] | |
[clj-time.core :as t] | |
[clj-time.coerce :refer [from-date]])) | |
;If you were to implement yourself, you could make a `go` that simply pulls from a channel, and adds to another as a | |
;`[timestamp item]` pair, then finally pushes into an unbounded `chan` that has a transducer that filters based on | |
; age of that timestamp | |
(def green-eggs-n-ham | |
["in the rain" | |
"on a train" | |
"in a box" | |
"with a fox" | |
"in a house" | |
"with a mouse" | |
"here or there" | |
"anywhere"]) | |
(defn delaying-timestamper [millis] | |
(let [now (t/now)] | |
(Thread/sleep millis) | |
now)) | |
(defn add-timestamp [now-provider item] | |
(let [now (now-provider)] | |
[now item])) | |
(defn time-stamper | |
[timestamper in] | |
(let [out (chan)] | |
(a/go-loop [] | |
(if-let [item (<! in)] | |
(if-let [enriched (timestamper item)] | |
(>! out enriched))) | |
(recur)) | |
out)) | |
; for testing the window | |
(def add-timestamp-with-delay (partial add-timestamp (partial delaying-timestamper 200))) | |
(def add-timestamp-no-delay (partial add-timestamp t/now)) | |
(defn window [window-seconds window-open elem] | |
(let [[time item] elem | |
window-close (t/plus window-open (t/seconds window-seconds)) | |
window (t/interval window-open window-close)] | |
(if (t/within? window time) | |
item))) | |
(def time-window (partial window 1 (t/now))) | |
(defn window-filter | |
[in] | |
(let [out (chan)] | |
(a/go-loop [] | |
(if-let [item (time-window (<! in))] | |
(>! out item)) | |
(recur)) | |
out)) | |
(defn simple-printer [in] | |
(a/go-loop [] | |
(if-let [item (<! in)] | |
(println item)) | |
(recur))) | |
(def in-chan (chan)) | |
(def standard-channel (time-stamper add-timestamp-with-delay in-chan)) | |
(def windowed (window-filter standard-channel)) | |
(simple-printer windowed) | |
(onto-chan in-chan green-eggs-n-ham) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment