@kennethkalmer
Created March 18, 2018 10:40
(ns user
  (:require [clojure.core.async :as async :refer [<! <!! go-loop]]))
;; Use an agent to print so we don't clobber the output during async prints
(def printer (agent nil))
(defn report [& s]
  (send printer
        (fn [_]
          (apply println s))))
;; Simple counter
(def cheat (atom 0))
;; Infinite sequence of random ints
(def nums (repeatedly #(rand-int 10)))
;; Fake processing function
(defn process-n
  "Absolutely intensive processing of n, like mining crypto but valuable"
  [n]
  (report "processing" n (swap! cheat inc))
  #_ (<!! (async/timeout (* n 100))) ;; <- Enable this form to lock up the pipeline
  (+ n (rand-int n)))
;; This is a stateful transducer that will only return the unique elements
;; that flowed through. Basically (clojure.core/distinct) in this example
(def xform
  (fn [rf]
    (let [seen (volatile! #{})]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result item]
         (report "xform" item (swap! cheat inc))
         (if (contains? @seen item)
           result
           (do
             (vswap! seen conj item)
             (rf result item))))))))
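
;; A possible sanity check, not part of the original gist: since `xform` is
;; described above as basically `clojure.core/distinct`, the built-in
;; transducer can be tried on a plain collection to see the expected shape of
;; the result. `distinct-sample` is a hypothetical name used only here.
(defn distinct-sample
  "Runs the built-in (distinct) transducer over coll, keeping first-seen order."
  [coll]
  (into [] (distinct) coll))

(comment
  ;; Evaluate at the REPL; duplicates are dropped, first occurrences stay in order.
  (distinct-sample [9 9 0 13 6 6 9 5]) ;; => [9 0 13 6 5]
  )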
;; Make a channel that will accept n's
(def in (async/chan))
;; Output channel that will be populated with the distinct values as they pass through `xform`
(def out (async/chan 1 xform))
;; Use core.async's pipeline to run process-n concurrently over everything put on `in`
(async/pipeline (+ 2 (.availableProcessors (Runtime/getRuntime)))
                out
                (map process-n)
                in)
;; Collect the unique results as they come off the pipeline
(go-loop [results []]
  (if-let [v (<! out)]
    (recur (conj results v))
    (do
      (report "Done!" results))))
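
;; An alternative sketch, not the gist's approach: when the caller is a regular
;; thread (for example the REPL) and can afford to block, `async/into` can
;; gather everything from a channel until it closes. `collect-chan` is a
;; hypothetical helper and uses its own channel, so it does not compete with
;; the go-loop above for values on `out`.
(require '[clojure.core.async :as async])

(defn collect-chan
  "Blocks until ch closes, then returns every value taken from it as a vector.
  Meant for regular threads; avoid calling it inside a go block."
  [ch]
  (async/<!! (async/into [] ch)))

(comment
  ;; `to-chan` puts the collection on a fresh channel and closes it afterwards.
  (collect-chan (async/to-chan [3 1 2])) ;; => [3 1 2]
  )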
;; Do the heavy work of processing 100 n's
(async/onto-chan in (take 100 nums))
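
;; A hedged sketch, not part of the original gist. The commented-out `<!!` in
;; `process-n` blocks whichever worker runs it. core.async also provides
;; `pipeline-blocking`, which is intended for blocking operations and may be
;; the better fit once that form is enabled. Everything below is illustrative:
;; `slow-inc` and `run-blocking-pipeline` are hypothetical names, and the
;; built-in (distinct) stands in for `xform`. Newer core.async releases prefer
;; `onto-chan!` over `onto-chan`.
(require '[clojure.core.async :as async])

(defn slow-inc
  "Stand-in for a blocking step: waits ms milliseconds, then returns (inc n)."
  [ms n]
  (async/<!! (async/timeout ms))
  (inc n))

(defn run-blocking-pipeline
  "Feeds coll through `parallelism` blocking workers and returns the distinct
  results as a vector. Blocks the caller, so call it from a regular thread."
  [parallelism ms coll]
  (let [in  (async/chan)
        out (async/chan 1 (distinct))]
    ;; pipeline-blocking closes `out` once `in` is closed and drained
    (async/pipeline-blocking parallelism out (map #(slow-inc ms %)) in)
    ;; onto-chan closes `in` after the last element has been put
    (async/onto-chan in coll)
    (async/<!! (async/into [] out))))

(comment
  ;; Results keep input order, so the distinct values come out as [2 3 4].
  (run-blocking-pipeline 4 10 [1 1 2 3 2]) ;; => [2 3 4]
  )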

;; Sample output from one run:
processing 1 7
processing 2 1
processing 2 2
processing 4 3
processing 2 4
processing 2 8
processing 0 5
processing 4 6
processing 4 5
processing 5 7
processing 4 6
processing 5 1
processing 9 4
processing 0 3
processing 6 2
processing 3 8
processing 6 9
processing 9 10
processing 9 11
processing 0 12
xform 9 13
xform 9 14
xform 0 15
processing 7 16
xform 13 17
processing 9 18
xform 6 19
processing 8 20
xform 6 21
processing 9 22
xform 9 23
xform 5 24
processing 7 25
xform 10 26
processing 8 27
processing 1 28
xform 12 30
processing 0 29
processing 1 31
xform 12 32
xform 0 33
processing 9 34
processing 2 36
xform 12 35
processing 9 37
xform 15 38
processing 6 39
xform 8 40
xform 9 41
processing 9 42
xform 7 43
xform 15 44
xform 1 45
processing 8 46
xform 0 47
xform 1 48
xform 13 49
xform 3 50
processing 6 51
xform 9 52
xform 8 53
processing 8 54
xform 17 55
xform 10 56
xform 7 57
processing 1 58
xform 14 59
xform 1 60
processing 5 61
xform 8 62
processing 8 63
xform 11 65
processing 9 64
xform 12 66
processing 9 67
processing 8 68
xform 13 69
processing 7 70
xform 9 71
xform 9 72
processing 2 73
xform 2 74
processing 1 75
processing 4 76
xform 1 77
xform 7 78
processing 9 79
processing 7 80
xform 16 81
xform 10 82
processing 4 83
xform 6 84
processing 7 85
xform 7 86
processing 6 87
processing 9 88
xform 8 89
xform 11 90
processing 3 91
processing 0 92
xform 4 93
xform 0 94
processing 4 95
processing 3 96
xform 6 97
xform 3 98
processing 6 99
processing 2 100
xform 7 101
xform 2 102
processing 0 103
xform 0 104
processing 1 105
processing 7 106
xform 1 107
xform 8 108
processing 7 109
xform 8 110
processing 6 111
processing 9 112
xform 6 113
xform 17 114
processing 2 115
processing 4 116
xform 2 117
processing 2 118
xform 6 119
xform 2 120
processing 3 121
processing 0 122
xform 3 123
xform 0 124
processing 6 125
processing 6 126
xform 7 127
processing 8 128
xform 8 129
xform 15 130
processing 0 131
xform 0 132
processing 2 133
processing 7 134
xform 2 135
processing 9 136
xform 13 137
xform 9 138
processing 9 139
xform 14 140
processing 2 141
xform 3 142
processing 6 143
processing 9 144
xform 10 145
xform 10 146
processing 1 147
processing 6 148
processing 7 149
xform 1 150
processing 0 151
xform 8 152
xform 7 153
xform 0 154
processing 7 155
processing 7 156
processing 3 157
xform 8 158
xform 10 159
xform 3 161
processing 9 160
xform 13 162
processing 7 163
processing 1 164
xform 9 165
processing 3 166
xform 1 167
processing 5 168
xform 3 169
processing 3 170
xform 7 171
xform 3 172
processing 0 173
processing 0 174
xform 0 175
xform 0 176
processing 4 177
processing 2 178
xform 7 179
processing 9 180
xform 2 181
processing 5 182
xform 16 183
processing 2 184
xform 5 185
xform 2 186
processing 1 187
xform 1 188
processing 6 189
processing 7 190
processing 4 191
xform 7 192
processing 2 193
processing 5 194
xform 13 195
processing 4 196
xform 6 197
xform 3 198
xform 6 199
xform 6 200
Done! [9 0 13 6 5 10 12 15 8 7 1 3 17 14 11 2 16 4]