Skip to content

Instantly share code, notes, and snippets.

@Quantisan
Last active December 21, 2015 03:08
Show Gist options
  • Save Quantisan/6239540 to your computer and use it in GitHub Desktop.
Save Quantisan/6239540 to your computer and use it in GitHub Desktop.
Cascalog pattern to join each left record with only the most recent preceding right record (nil fields when none exists)
;; TEST
(deftest join-right-recent-test
  ;; u1 has right records at 500, 800 and 1100; only 500 and 800 are earlier
  ;; than the left timestamp 1000, so the 800 record is attached.
  ;; u2 has no right records at all, so its right-hand fields are nil.
  (let [lhs      (memory-source-tap ["?timestamp" "?uscc" "?x"]
                                    [[1000 "u1" "AAA"]
                                     [2000 "u2" "BBB"]])
        rhs      (memory-source-tap ["?timestamp" "?uscc" "?y" "?z"]
                                    [[500  "u1" "a1" "a2"]
                                     [800  "u1" "b1" "b2"]
                                     [1100 "u1" "c1" "c2"]])
        expected [[1000 "u1" "AAA" "b1" "b2"]
                  [2000 "u2" "BBB" nil nil]]]
    (fact
      (join-right-recent lhs rhs ["?uscc"]) => (produces expected))))
(deftest join-right-recent-right-in-middle-test
  ;; The only right record (1100) lies between the two left timestamps:
  ;; it is too late for the 1000 row, but it is the latest earlier record
  ;; for the 2000 row.
  (let [left  (memory-source-tap ["?timestamp" "?uscc" "?x"]
                                 [[1000 "u1" "AAA"]
                                  [2000 "u1" "BBB"]])
        right (memory-source-tap ["?timestamp" "?uscc" "?y" "?z"]
                                 [[1100 "u1" "c1" "c2"]])]
    (fact
      ;; ?y and ?z are separate output fields, so the matched row carries
      ;; "c1" and "c2" as two distinct values (5 fields per row).
      (join-right-recent left right ["?uscc"]) => (produces [[1000 "u1" "AAA" nil nil]
                                                           [2000 "u1" "BBB" "c1" "c2"]]))))
(deftest join-right-recent-multi-anchors-test
  ;; Joining on both ?uscc and ?sup: for (u1, A) the candidates are 500 and
  ;; 1100, and only 500 precedes 1000. The (u1, B) record never matches a
  ;; left row, and (u2, B) has no right records, so it gets nils.
  (let [lhs      (memory-source-tap ["?timestamp" "?uscc" "?sup" "?x"]
                                    [[1000 "u1" "A" "AAA"]
                                     [2000 "u2" "B" "BBB"]])
        rhs      (memory-source-tap ["?timestamp" "?uscc" "?sup" "?y" "?z"]
                                    [[500  "u1" "A" "a1" "a2"]
                                     [800  "u1" "B" "b1" "b2"]
                                     [1100 "u1" "A" "c1" "c2"]])
        anchors  ["?uscc" "?sup"]
        expected [[1000 "u1" "A" "AAA" "a1" "a2"]
                  [2000 "u2" "B" "BBB" nil nil]]]
    (fact
      (join-right-recent lhs rhs anchors) => (produces expected))))
(deftest double-join-right-recent-test
  ;; Chains two joins: first against `mid` on [?uscc ?sup], then against
  ;; `right` on ?uscc. Joining a bound query and threading both joins with
  ;; -> must produce the same rows.
  (let [left         (memory-source-tap ["?timestamp" "?uscc" "?sup" "?x"]
                                        [[1000 "u1" "A" "AAA"]
                                         [2000 "u2" "B" "BBB"]])
        mid          (memory-source-tap ["?timestamp" "?uscc" "?sup" "?y" "?z"]
                                        [[500  "u1" "A" "a1" "a2"]
                                         [800  "u1" "B" "b1" "b2"]
                                         [1100 "u1" "A" "c1" "c2"]])
        right        (memory-source-tap ["?timestamp" "?uscc" "?session"]
                                        [[800 "u1" 800]])
        first-join   (join-right-recent left mid ["?uscc" "?sup"])
        after-first  [[1000 "u1" "A" "AAA" "a1" "a2"]
                      [2000 "u2" "B" "BBB" nil nil]]
        after-second [[1000 "u1" "A" "AAA" "a1" "a2" 800]
                      [2000 "u2" "B" "BBB" nil nil nil]]]
    (fact
      first-join => (produces after-first)
      (join-right-recent first-join right ["?uscc"]) => (produces after-second)
      (-> left
          (join-right-recent mid ["?uscc" "?sup"])
          (join-right-recent right ["?uscc"])) => (produces after-second))))
;; CODE
(defn right-join
  "Build a Cascalog query over `left` and `right` that emits `out-vars`.

  `left` is bound to all of its own output fields; `right` is bound to
  `right-in-vars`, so the two sources join on the logic vars they share.
  Whether unmatched `left` rows survive depends on the var types in
  `right-in-vars`: `join-right-recent` builds them with
  `(f/var-kwd :unground)` for the non-join fields."
  [left right right-in-vars out-vars]
  (let [left-fields (get-out-fields left)]
    (<- out-vars
        (left  :>> left-fields)
        (right :>> right-in-vars))))
(defn latest-recent-tuple
  "Reducing fn used by `latest-recent-tuple-buffer`.

  `state` is the best right-hand tuple kept so far, shaped
  `[ts-right & more]`; the buffer seeds it with nils. `coll` is one joined
  tuple shaped `[ts-left ts-right-new & more-new]`.

  Returns `[ts-right-new & more-new]` as a vector when `ts-right-new` is
  non-nil, strictly less than `ts-left`, and not less than the current
  `ts-right` (or `ts-right` is still nil). Otherwise returns `state`
  unchanged.

  Because it compares against the current best, the result does not depend
  on the order of the input. On input sorted ascending by right timestamp,
  this is the same as keeping the last qualifying tuple."
  [state coll]
  (let [[ts-right] state
        [ts-left ts-right-new & more-new] coll]
    (if (and ts-right-new
             (< ts-right-new ts-left)
             ;; Only move forward in time; `>=` keeps the later of equal
             ;; timestamps, matching last-wins behaviour on sorted input.
             (or (nil? ts-right) (>= ts-right-new ts-right)))
      (vec (cons ts-right-new more-new))
      state)))
(defbufferiterop latest-recent-tuple-buffer
  "Buffer over one group of joined tuples, each shaped
  [?timestamp-left ?timestamp-right & more]. Folds the group with
  `latest-recent-tuple`, starting from an all-nil state, and emits a single
  output tuple [?timestamp-right-recent & more-latest]. The output stays
  all nil when no tuple in the group replaces the starting state."
  [tuples]
  (let [rows  (iterator-seq tuples)
        ;; The output drops the leading left timestamp, so it is one column
        ;; narrower than the input tuples.
        width (dec (count (first rows)))
        blank (repeat width nil)
        best  (reduce latest-recent-tuple blank rows)]
    [(vec best)]))
;; this one could do with some refactoring
(defn gen-right-join-fields
  "Generate all the field-name vectors that `join-right-recent` needs.

  `left-vars` and `right-vars` are the output fields of the two sources;
  `join-fields` are the fields to join on. The right-hand fields that are
  not join fields are the \"extras\". Each extra is also given a copy of its
  name with an \"-all\" suffix.

  Returns a map with :right-in-vars, :joined-out-vars, :joined-in-vars,
  :right-extra-vars-append-all, :right-extra-vars-nullable and
  :right-recent-joined-vars.

  NOTE(review): the names \"?timestamp\" and \"!timestamp\" are hard-coded,
  so both sources are assumed to call their timestamp field ?timestamp."
  [left-vars right-vars join-fields]
  (let [;; Apply f/set-vars-type to `names` within `vars`, using the var
        ;; type that f/var-kwd returns for `kind`.
        retype     (fn [vars kind names]
                     (apply f/set-vars-type vars (f/var-kwd kind) names))
        ;; Same, targeting every var in `vars`.
        retype-all (fn [vars kind]
                     (retype vars kind vars))
        extras              (apply f/remove-fields right-vars join-fields)
        extras-all          (vec (map #(str % "-all") extras))
        extras-unground     (retype-all extras :unground)
        extras-all-nullable (retype-all extras-all :nullable)
        extras-nullable     (retype-all extras :nullable)]
    {:right-in-vars               (retype right-vars :unground extras)
     :joined-out-vars             (apply f/add-fields left-vars extras-unground)
     :joined-in-vars              (apply f/add-fields left-vars extras-all-nullable)
     :right-extra-vars-append-all (f/prepend-fields extras-all-nullable "?timestamp")
     :right-extra-vars-nullable   extras-nullable
     :right-recent-joined-vars    (apply f/add-fields left-vars
                                         (f/remove-field extras-nullable "!timestamp"))}))
(defn join-right-recent
  "Join each tuple of `left` with the most recent earlier tuple of `right`.

  Right tuples are matched on `join-on-fields`. Among the matches, the one
  kept is the tuple whose timestamp is the latest value strictly below the
  left tuple's ?timestamp. Its non-join fields are appended to the left
  tuple; when no right tuple qualifies, those fields are nil. The right
  timestamp itself is dropped from the output.

  NOTE(review): assumes both sources name their timestamp field
  ?timestamp; that name and !timestamp-all are hard-coded here and in
  gen-right-join-fields."
  [left right join-on-fields]
  (let [left-vars (get-out-fields left)
        right-vars (get-out-fields right)
        {:keys [right-in-vars
                joined-out-vars
                joined-in-vars
                right-extra-vars-append-all
                right-extra-vars-nullable
                right-recent-joined-vars]} (gen-right-join-fields left-vars right-vars join-on-fields)
        ;; Pair every left tuple with every right tuple sharing the join fields.
        joined-src (right-join left right right-in-vars joined-out-vars)]
    ;; NOTE(review): the buffer's implicit grouping key is the non-buffer
    ;; output fields (i.e. all of left-vars), so fully duplicate left tuples
    ;; presumably collapse into one output row — confirm if that matters.
    (<- right-recent-joined-vars
        ;; Re-read the joined tuples with the right-hand fields under their
        ;; "-all" names, so the buffer can emit the un-suffixed names.
        (joined-src :>> joined-in-vars)
        ;; Ascending by right timestamp: the last qualifying tuple the buffer
        ;; sees is the most recent one.
        (:sort !timestamp-all)
        (latest-recent-tuple-buffer :<< right-extra-vars-append-all :>> right-extra-vars-nullable))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment