Created
November 8, 2015 04:23
-
-
Save Crim/8ba12b695b4a0c16e8e6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(^void execute [this ^Tuple tuple]
  ;; Processes one tuple delivered to the acker.
  ;;
  ;; * A tuple on the system tick stream only rotates the pending map.
  ;; * Any other tuple updates the pending entry keyed by the tuple's first
  ;;   value, notifies the spout task when the entry is complete or failed,
  ;;   and is then acked.
  ;;
  ;; `pending` and `output-collector` are rebound locally to the objects
  ;; unwrapped (via .getObject) from the outer holders of the same names.
  (let [^RotatingMap pending (.getObject pending)
        stream-id (.getSourceStreamId tuple)]
    (if (= stream-id Constants/SYSTEM_TICK_STREAM_ID)
      ;; If the stream is the system tick, we rotate the pending map.
      ;; Note that the tick tuple itself is not acked on this branch.
      (.rotate pending)
      ;; Otherwise the tracked id is value 0 of the tuple.
      (let [id (.getValue tuple 0)
            ;; grab our output collector
            ^OutputCollector output-collector (.getObject output-collector)
            ;; grab our current entry from the rotating map using the id
            ;; (nil when the id has not been seen yet)
            curr (.get pending id)
            ;; Compute the new entry based on which stream the tuple came from.
            ;; NOTE(review): condp has no default clause, so an unrecognised
            ;; stream id raises IllegalArgumentException - confirm that only
            ;; these three streams (plus ticks) are ever routed here.
            curr (condp = stream-id
                   ;; Init stream (new tuple): fold in value 1 and remember
                   ;; the spout task taken from value 2.
                   ACKER-INIT-STREAM-ID (-> curr
                                            (update-ack (.getValue tuple 1))
                                            (assoc :spout-task (.getValue tuple 2)))
                   ;; Ack stream: fold value 1 into the current entry.
                   ;; update-ack is defined elsewhere; presumably it XORs the
                   ;; value into :val - verify against its definition.
                   ACKER-ACK-STREAM-ID (update-ack curr (.getValue tuple 1))
                   ;; Fail stream: set the :failed key to true.
                   ACKER-FAIL-STREAM-ID (assoc curr :failed true))]
        ;; Put the entry back into the rotating map;
        ;; this should extend its timeout.
        (.put pending id curr)
        ;; Only report to the spout once we know which task to report to,
        ;; i.e. after the init tuple for this id has been seen.
        (when (and curr (:spout-task curr))
          (cond
            ;; The entry's :val has reached 0:
            ;; tell the spout the tuple is complete.
            (= 0 (:val curr))
            (do
              (.remove pending id)
              (acker-emit-direct output-collector
                                 (:spout-task curr)
                                 ACKER-ACK-STREAM-ID
                                 [id]))

            ;; Else, if the :failed key is set, we fail the tuple.
            (:failed curr)
            (do
              (.remove pending id)
              (acker-emit-direct output-collector
                                 (:spout-task curr)
                                 ACKER-FAIL-STREAM-ID
                                 [id]))))
        ;; Lastly, ack the incoming tuple.
        (.ack output-collector tuple)))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment