Last active
December 21, 2015 03:08
-
-
Save Quantisan/6239540 to your computer and use it in GitHub Desktop.
Cascalog pattern to right join only the most recent record to left
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; TEST
;; Basic case: "u1" has three right-hand rows (500, 800, 1100). The fact
;; expects the left row stamped 1000 to pick up the 800 row (the newest one
;; that is still older than it), and "u2", which has no right-hand rows at
;; all, to be padded with nils.
(deftest join-right-recent-test
  (let [left-src  (memory-source-tap ["?timestamp" "?uscc" "?x"]
                                     [[1000 "u1" "AAA"]
                                      [2000 "u2" "BBB"]])
        right-src (memory-source-tap ["?timestamp" "?uscc" "?y" "?z"]
                                     [[500 "u1" "a1" "a2"]
                                      [800 "u1" "b1" "b2"]
                                      [1100 "u1" "c1" "c2"]])
        expected  [[1000 "u1" "AAA" "b1" "b2"]
                   [2000 "u2" "BBB" nil nil]]]
    (fact
      (join-right-recent left-src right-src ["?uscc"]) => (produces expected))))
;; The single right-hand row (1100) sits between the two left rows for "u1":
;; the left row at 1000 is older than it and must get nils, while the left row
;; at 2000 must pick it up.
(deftest join-right-recent-right-in-middle-test
  (let [left  (memory-source-tap ["?timestamp" "?uscc" "?x"]
                                 [[1000 "u1" "AAA"]
                                  [2000 "u1" "BBB"]])
        right (memory-source-tap ["?timestamp" "?uscc" "?y" "?z"]
                                 [[1100 "u1" "c1" "c2"]])]
    (fact
      ;; ?y and ?z are separate output columns, so the matched row carries
      ;; "c1" and "c2" as two values (not one fused "c1 c2" string).
      (join-right-recent left right ["?uscc"]) => (produces [[1000 "u1" "AAA" nil nil]
                                                             [2000 "u1" "BBB" "c1" "c2"]]))))
;; Joining on two fields (?uscc and ?sup): only right-hand rows agreeing on
;; both are candidates, so the ("u1", "B") row at 800 must not be attached to
;; the ("u1", "A") left row; the fact expects the 500 row instead.
(deftest join-right-recent-multi-anchors-test
  (let [left-src  (memory-source-tap ["?timestamp" "?uscc" "?sup" "?x"]
                                     [[1000 "u1" "A" "AAA"]
                                      [2000 "u2" "B" "BBB"]])
        right-src (memory-source-tap ["?timestamp" "?uscc" "?sup" "?y" "?z"]
                                     [[500 "u1" "A" "a1" "a2"]
                                      [800 "u1" "B" "b1" "b2"]
                                      [1100 "u1" "A" "c1" "c2"]])
        expected  [[1000 "u1" "A" "AAA" "a1" "a2"]
                   [2000 "u2" "B" "BBB" nil nil]]]
    (fact
      (join-right-recent left-src right-src ["?uscc" "?sup"]) => (produces expected))))
;; Chaining: the result of one join-right-recent is fed as the left side of a
;; second one. Checked three ways: the intermediate result, the second join on
;; that intermediate result, and the same pipeline written with `->`.
(deftest double-join-right-recent-test
  (let [base        (memory-source-tap ["?timestamp" "?uscc" "?sup" "?x"]
                                       [[1000 "u1" "A" "AAA"]
                                        [2000 "u2" "B" "BBB"]])
        middle      (memory-source-tap ["?timestamp" "?uscc" "?sup" "?y" "?z"]
                                       [[500 "u1" "A" "a1" "a2"]
                                        [800 "u1" "B" "b1" "b2"]
                                        [1100 "u1" "A" "c1" "c2"]])
        sessions    (memory-source-tap ["?timestamp" "?uscc" "?session"]
                                       [[800 "u1" 800]])
        once-joined (join-right-recent base middle ["?uscc" "?sup"])
        ;; expected rows after the first join
        after-one   [[1000 "u1" "A" "AAA" "a1" "a2"]
                     [2000 "u2" "B" "BBB" nil nil]]
        ;; expected rows after the second join (one extra ?session column)
        after-two   [[1000 "u1" "A" "AAA" "a1" "a2" 800]
                     [2000 "u2" "B" "BBB" nil nil nil]]]
    (fact
      once-joined => (produces after-one)
      (join-right-recent once-joined sessions ["?uscc"]) => (produces after-two)
      (-> base
          (join-right-recent middle ["?uscc" "?sup"])
          (join-right-recent sessions ["?uscc"])) => (produces after-two))))
;; CODE
(defn right-join
  "Builds a query that emits `out-vars` from two generators: `left`, bound to
  its own output fields (as reported by `get-out-fields`), and `right`, bound
  to `right-in-vars`.

  The two generators join on whatever variable names the two bindings share.
  NOTE(review): whether unmatched left rows survive depends on the variable
  types the caller puts in `right-in-vars` -- confirm against the caller."
  [left right right-in-vars out-vars]
  (let [left-out-fields (get-out-fields left)]
    (<- out-vars
        (left :>> left-out-fields)
        (right :>> right-in-vars))))
(defn latest-recent-tuple
  "Reducing step that keeps the most recent right-hand tuple that is strictly
  older than the left-hand timestamp.

  state - the best candidate so far, shaped [ts-right & more]; a sequence of
          nils means no candidate has been found yet.
  coll  - one joined row, shaped [ts-left ts-right-new & more-new].

  Returns a vector [ts-right-new & more-new] when the row is a better
  candidate, otherwise `state` unchanged. A row qualifies when it has a
  right-hand timestamp that is strictly less than `ts-left` and is not older
  than the current candidate, so the result does not depend on the rows
  arriving sorted; on ties the later row wins."
  [state coll]
  (let [[ts-right] state
        [ts-left ts-right-new] coll]
    (if (and ts-right-new
             (< ts-right-new ts-left)
             ;; never let an older row displace a newer candidate
             (or (nil? ts-right) (>= ts-right-new ts-right)))
      ;; drop the leading left-hand timestamp, keep the right-hand fields
      (vec (rest coll))
      state)))
(defbufferiterop latest-recent-tuple-buffer
  "Takes in [?timestamp-left ?timestamp-right & more], returns [?timestamp-right-recent & more-latest]"
  [tuples]
  (let [rows        (iterator-seq tuples)
        ;; the output is one field narrower than the input: the leading
        ;; left-hand timestamp is not emitted
        out-width   (dec (count (first rows)))
        ;; all-nil tuple, emitted when no row in the group qualifies
        no-match    (repeat out-width nil)
        most-recent (reduce latest-recent-tuple no-match rows)]
    ;; exactly one output tuple per group
    [(into [] most-recent)]))
;; this one could do with some refactoring | |
(defn gen-right-join-fields
  "Generate all the needed var combinations for join-right-recent use.

  left-vars   - output fields of the left generator
  right-vars  - output fields of the right generator
  join-fields - fields the two sides are joined on

  Returns a map with the keys :right-in-vars, :joined-out-vars,
  :joined-in-vars, :right-extra-vars-append-all, :right-extra-vars-nullable
  and :right-recent-joined-vars. The \"extra\" vars are `right-vars` with
  `join-fields` removed; the \"-all\" variants are those same names with
  \"-all\" appended.
  NOTE(review): the exact renaming done by the `f/` helpers (set-vars-type,
  var-kwd, add-fields, ...) is defined outside this file -- confirm there."
  [left-vars right-vars join-fields]
  (let [;; re-type the `targets` inside `vars` to the var kind named by `kind`
        retype              (fn [vars kind targets]
                              (apply f/set-vars-type vars (f/var-kwd kind) targets))
        extras              (apply f/remove-fields right-vars join-fields)
        extras-all          (vec (map #(str % "-all") extras))
        extras-unground     (retype extras :unground extras)
        extras-all-nullable (retype extras-all :nullable extras-all)
        extras-nullable     (retype extras :nullable extras)]
    {:right-in-vars               (retype right-vars :unground extras)
     :joined-out-vars             (apply f/add-fields left-vars extras-unground)
     :joined-in-vars              (apply f/add-fields left-vars extras-all-nullable)
     :right-extra-vars-append-all (f/prepend-fields extras-all-nullable "?timestamp")
     :right-extra-vars-nullable   extras-nullable
     :right-recent-joined-vars    (apply f/add-fields
                                         left-vars
                                         (f/remove-field extras-nullable "!timestamp"))}))
(defn join-right-recent
  "Joins `right` onto `left` over `join-on-fields`, attaching to each left row
  only one right row's extra fields.

  The two generators are first joined with `right-join`; the joined rows are
  then sorted by !timestamp-all and reduced by `latest-recent-tuple-buffer`.
  The output fields are the left fields followed by the right-hand extra
  fields (with the right-hand timestamp removed).
  Both generators are expected to expose a \"?timestamp\" field, since the
  generated field lists and the sort refer to it by name."
  [left right join-on-fields]
  (let [{right-in    :right-in-vars
         joined-out  :joined-out-vars
         joined-in   :joined-in-vars
         buffer-in   :right-extra-vars-append-all
         buffer-out  :right-extra-vars-nullable
         result-vars :right-recent-joined-vars}
        (gen-right-join-fields (get-out-fields left)
                               (get-out-fields right)
                               join-on-fields)
        joined (right-join left right right-in joined-out)]
    (<- result-vars
        (joined :>> joined-in)
        (:sort !timestamp-all)
        (latest-recent-tuple-buffer :<< buffer-in :>> buffer-out))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment