Skip to content

Instantly share code, notes, and snippets.

@JacobNinja
Last active April 6, 2022 09:14
Show Gist options
  • Save JacobNinja/5c98496a632e1a466cbf to your computer and use it in GitHub Desktop.
Save JacobNinja/5c98496a632e1a466cbf to your computer and use it in GitHub Desktop.
Clojure core.async pipeline example
(require '[clojure.core.async :as async]
'[clj-http.client :as client]
'[clojure.data.json :as json])
(def concurrency 5)
(let [in (async/chan)
out (async/chan)
request-handler (fn [url out*]
(async/go
(println "Making request:" url)
(let [response (client/get url)
body (json/read-str (:body response))]
(doseq [repo (body "items")]
(async/>! out (repo "clone_url"))))
; Finally close the channel to signal finished processing
(async/close! out*)))]
; Process `in` messages concurrently
(async/pipeline-async concurrency out request-handler in)
; Push URLs to process
(async/go
(doseq [url (for [page (range 10)] (str "https://api.github.com/search/repositories?q=language:clojure&page="
(inc page)))]
(async/>! in url)))
; Print results of processing
(async/go-loop []
(println (async/<! out))
(recur)))
; `in` can be backed by a redis queue
(comment
(async/go-loop []
(if-let [message (pop-redis-queue)]
(async/>! in message)
; Sleep if no messages available
(async/<! (async/timeout 1000)))
(recur)))
@JacobNinja
Copy link
Author

Result:

Making request: https://api.github.com/search/repositories?q=language:clojure&page=1
Making request: https://api.github.com/search/repositories?q=language:clojure&page=3
Making request: https://api.github.com/search/repositories?q=language:clojure&page=4
Making request: https://api.github.com/search/repositories?q=language:clojure&page=2
Making request: https://api.github.com/search/repositories?q=language:clojure&page=5
Making request: https://api.github.com/search/repositories?q=language:clojure&page=6
Making request: https://api.github.com/search/repositories?q=language:clojure&page=7
https://github.com/ckirkendall/enfocus.git
https://github.com/tonsky/rum.git
https://github.com/jkk/honeysql.git
https://github.com/clojure/java.jdbc.git
https://github.com/weavejester/lein-ring.git
https://github.com/ckirkendall/kioo.git
https://github.com/technomancy/slamhound.git
https://github.com/yieldbot/flambo.git
https://github.com/someteam/acha.git
https://github.com/xsc/lein-ancient.git
https://github.com/joshaber/clojurem.git
https://github.com/jamesmacaulay/zelkova.git
https://github.com/ztellman/gloss.git
https://github.com/rplevy/swiss-arrows.git
https://github.com/omcljs/om-cookbook.git
https://github.com/joyofclojure/book-source.git
https://github.com/amalloy/useful.git
https://github.com/liquidz/misaki.git
https://github.com/gigasquid/wonderland-clojure-katas.git
https://github.com/reiddraper/simple-check.git
https://github.com/racehub/om-bootstrap.git
https://github.com/ptaoussanis/clojure-web-server-benchmarks.git
https://github.com/metosin/compojure-api.git
https://github.com/dgrnbrg/spyscope.git
https://github.com/james-henderson/chord.git
https://github.com/clojure/tools.namespace.git
https://github.com/fhd/clostache.git
https://github.com/Factual/skuld.git
https://github.com/damballa/parkour.git
https://github.com/jaycfields/expectations.git
https://github.com/cemerick/piggieback.git
https://github.com/slagyr/speclj.git
https://github.com/benzap/flyer.js.git
https://github.com/Datomic/day-of-datomic.git
https://github.com/jalehman/react-tutorial-om.git
https://github.com/ztellman/rhizome.git
https://github.com/liebke/cljr.git
https://github.com/Raynes/conch.git
https://github.com/riemann/riemann-jvm-profiler.git
https://github.com/weavejester/codox.git
https://github.com/drcode/webfui.git
https://github.com/immutant/immutant.git
https://github.com/magnars/optimus.git
https://github.com/bodil/BODOL.git
https://github.com/arthuredelstein/clooj.git
https://github.com/clojurewerkz/elastisch.git
https://github.com/clojure/test.check.git
https://github.com/frenchy64/typed-clojure.git
https://github.com/lynaghk/cljx.git
https://github.com/ztellman/manifold.git
https://github.com/aphyr/tesser.git
https://github.com/juxt/jig.git
https://github.com/tailrecursion/javelin.git
https://github.com/nuroko/nurokit.git
https://github.com/weavejester/environ.git
https://github.com/mmcgrana/clj-stacktrace.git
https://github.com/JulianBirch/cljs-ajax.git
https://github.com/r0man/sablono.git
https://github.com/weavejester/ragtime.git
https://github.com/ibdknox/jayq.git
https://github.com/fogus/trammel.git
https://github.com/ptaoussanis/nippy.git
https://github.com/cgrand/enliven.git
https://github.com/Datomic/codeq.git
https://github.com/hraberg/deuce.git
https://github.com/Raynes/tentacles.git
https://github.com/mcohen01/amazonica.git
https://github.com/k2nr/ViChrome.git
https://github.com/cemerick/pomegranate.git
https://github.com/gcv/appengine-magic.git
https://github.com/rkneufeld/lein-try.git
https://github.com/yogthos/markdown-clj.git
https://github.com/scgilardi/slingshot.git
https://github.com/brandonbloom/fipp.git
https://github.com/levand/domina.git
https://github.com/rbrush/clara-rules.git
https://github.com/semperos/clj-webdriver.git
https://github.com/ztellman/penumbra.git
https://github.com/magnars/prone.git
https://github.com/Netflix/PigPen.git
https://github.com/liebke/avout.git
https://github.com/MichaelDrogalis/dire.git
https://github.com/aaronc/freactive.git
https://github.com/juxt/bidi.git
https://github.com/aboekhoff/congomongo.git
https://github.com/yogthos/Selmer.git
https://github.com/pyr/cyanite.git
https://github.com/jonromero/music-as-data.git
https://github.com/maryrosecook/islaclj.git
https://github.com/mikera/core.matrix.git
https://github.com/4clojure/4clojure.git
https://github.com/swannodette/logic-tutorial.git
https://github.com/andrewvc/engulf.git
https://github.com/pkamenarsky/atea.git
https://github.com/michaelklishin/monger.git
https://github.com/clojure/core.match.git
https://github.com/funcool/buddy.git
https://github.com/gdeer81/marginalia.git
https://github.com/whamtet/Excel-REPL.git
https://github.com/puniverse/pulsar.git
https://github.com/macourtney/Conjure.git
https://github.com/richhickey/clojure-contrib.git
https://github.com/drewr/postal.git
https://github.com/wit-ai/duckling.git
https://github.com/LightTable/LightTable.git
https://github.com/Prismatic/hiphip.git
https://github.com/clojure-android/lein-droid.git
https://github.com/clojure/clojurescript.git
https://github.com/ztellman/automat.git
https://github.com/ptaoussanis/carmine.git
https://github.com/technomancy/leiningen.git
https://github.com/adamwynne/twitter-api.git
https://github.com/relevance/labrepl.git
https://github.com/jonase/kibit.git
https://github.com/omcljs/om.git
https://github.com/oakes/lein-fruit.git
https://github.com/cemerick/austin.git
https://github.com/mmcgrana/ring.git
https://github.com/overtone/overtone.git
https://github.com/purnam/purnam.git
https://github.com/arcadia-unity/Arcadia.git
https://github.com/cognitect/transit-format.git
https://github.com/weavejester/compojure.git
https://github.com/LauJensen/clojureql.git
https://github.com/tailrecursion/hoplon.git
https://github.com/bhauman/lein-figwheel.git
https://github.com/aphyr/riemann.git
https://github.com/clojure/tools.cli.git
https://github.com/takeoutweight/clojure-scheme.git
https://github.com/noprompt/frak.git
https://github.com/functional-koans/clojure-koans.git
https://github.com/weavejester/cljfmt.git
https://github.com/google/hesokuri.git
https://github.com/Factual/drake.git
https://github.com/swannodette/mori.git
https://github.com/technomancy/robert-hooke.git
https://github.com/puppetlabs/trapperkeeper.git
https://github.com/clojure-liberator/liberator.git
https://github.com/incanter/incanter.git
https://github.com/clojurebook/ClojureProgramming.git
https://github.com/dakrone/clojure-opennlp.git
https://github.com/ztellman/aleph.git
https://github.com/clojure/core.logic.git
https://github.com/nathanmarz/storm-deploy.git
https://github.com/pedestal/app-tutorial.git
https://github.com/quil/quil.git
https://github.com/plexus/chestnut.git
https://github.com/swannodette/enlive-tutorial.git
https://github.com/yogthos/clj-pdf.git
https://github.com/pedestal/pedestal.git
https://github.com/onyx-platform/onyx.git
https://github.com/noir-clojure/lib-noir.git
https://github.com/ztellman/potemkin.git
https://github.com/reagent-project/reagent.git
https://github.com/stuartsierra/component.git
https://github.com/stuarthalloway/programming-clojure.git
https://github.com/Raynes/fs.git
https://github.com/magomimmo/modern-cljs.git
https://github.com/noir-clojure/noir.git
https://github.com/oakes/Nightweb.git
https://github.com/elastic/es2unix.git
https://github.com/clojure-cookbook/clojure-cookbook.git
https://github.com/ptaoussanis/sente.git
https://github.com/nathanmarz/specter.git
https://github.com/cgrand/moustache.git
https://github.com/tonsky/datascript.git
https://github.com/ztellman/lamina.git
https://github.com/circleci/frontend.git
https://github.com/clojure/algo.monads.git
https://github.com/Engelberg/instaparse.git
https://github.com/schani/clojurec.git
https://github.com/jonase/eastwood.git
https://github.com/budu/lobos.git
https://github.com/nathanmarz/cascalog.git
https://github.com/emezeske/lein-cljsbuild.git
https://github.com/gf3/secretary.git
https://github.com/tomjakubowski/weasel.git
https://github.com/frankiesardo/icepick.git
https://github.com/krisajenkins/yesql.git
https://github.com/clojure/tools.nrepl.git
https://github.com/Datomic/simulant.git
https://github.com/weavejester/hiccup.git
https://github.com/dakrone/clj-http.git
https://github.com/Prismatic/om-tools.git
https://github.com/AvisoNovate/pretty.git
https://github.com/Prismatic/plumbing.git
https://github.com/swannodette/lt-cljs-tutorial.git
https://github.com/levand/quiescent.git
https://github.com/kawasima/jagrid.git
https://github.com/cgrand/enlive.git
https://github.com/pallet/pallet.git
https://github.com/LuxLang/lux.git
https://github.com/marick/Midje.git
https://github.com/clojure/core.typed.git
https://github.com/imalooney/t3tr0s.git
https://github.com/Prismatic/schema.git
https://github.com/Prismatic/dommy.git
https://github.com/aphyr/jepsen.git
https://github.com/Day8/re-frame.git
https://github.com/korma/Korma.git
https://github.com/dakrone/cheshire.git
https://github.com/clojure/core.async.git
https://github.com/oakes/play-clj.git
https://github.com/daveray/seesaw.git
https://github.com/venantius/ultra.git
https://github.com/cemerick/friend.git
https://github.com/ptaoussanis/timbre.git
https://github.com/boot-clj/boot.git
https://github.com/oakes/Nightcode.git
Making request: https://github.com/hugoduncan/criterium.githttps://api.github.com/search/repositories?q=language:clojure&page=8

Making request:Making request:  https://api.github.com/search/repositories?q=language:clojure&page=10https://api.github.com/search/repositories?q=language:clojure&page=9

https://github.com/michaelklishin/langohr.git
https://github.com/franks42/clj-ns-browser.git
https://github.com/jochu/swank-clojure.git
https://github.com/hraberg/mimir.git
https://github.com/mtgred/netrunner.git
https://github.com/ninjudd/clojure-protobuf.git
https://github.com/weavejester/clj-aws-s3.git
https://github.com/mjul/docjure.git
https://github.com/rosado/clj-processing.git
https://github.com/brunoV/throttler.git
https://github.com/weavejester/reagi.git
https://github.com/aphyr/timelike.git
https://github.com/stuartsierra/clojure-hadoop.git
https://github.com/sbtourist/nimrod.git
https://github.com/youngnh/parsatron.git
https://github.com/mikera/clisk.git
https://github.com/overtone/at-at.git
https://github.com/PrecursorApp/om-i.git
https://github.com/amitrathore/swarmiji.git
https://github.com/ctford/leipzig.git
https://github.com/clojure/math.combinatorics.git
https://github.com/weavejester/clout.git
https://github.com/richhickey/harmonikit.git
https://github.com/yieldbot/marceline.git
https://github.com/magnars/stasis.git
https://github.com/davidsantiago/hickory.git
https://github.com/weavejester/medley.git
https://github.com/runexec/chp.git
https://github.com/LonoCloud/synthread.git
https://github.com/razum2um/clj-debugger.git
https://github.com/frenchy64/Logic-Starter.git
https://github.com/sunng87/slacker.git
https://github.com/Prismatic/fnhouse.git
https://github.com/clojure/tools.trace.git
https://github.com/stuartsierra/lazytest.git
https://github.com/clojure-emacs/cider-nrepl.git
https://github.com/r0man/cljs-http.git
https://github.com/cemerick/clojurescript.test.git
https://github.com/reagent-project/reagent-cookbook.git
https://github.com/cgrand/parsley.git
https://github.com/swannodette/om-sync.git
https://github.com/owainlewis/falkor.git
https://github.com/clojure/core.cache.git
https://github.com/LightTable/fetch.git
https://github.com/ninjudd/cake.git
https://github.com/heroku/pulse.git
https://github.com/Chouser/clojure-jna.git
https://github.com/tcr/mug.git
https://github.com/kumarshantanu/lein-exec.git
https://github.com/juxt/juxt-accounting.git
https://github.com/xsc/rewrite-clj.git
https://github.com/mmcgrana/fleetdb.git
https://github.com/gtrak/no.disassemble.git
https://github.com/TheClimateCorporation/claypoole.git
https://github.com/someben/skream.git
https://github.com/taylorlapeyre/oj.git
https://github.com/clojure/tools.logging.git
https://github.com/puppetlabs/puppetdb.git
https://github.com/elastic/stream2es.git
https://github.com/overtone/shadertone.git
https://github.com/ato/clojars-web.git
https://github.com/gigasquid/clj-drone.git
https://github.com/halgari/mjolnir.git
https://github.com/sgrove/omchaya.git
https://github.com/ptaoussanis/tower.git
https://github.com/ReadyForZero/babbage.git
https://github.com/ibdknox/crate.git
https://github.com/trptcolin/reply.git
https://github.com/danielsz/system.git
https://github.com/sjl/metrics-clojure.git
https://github.com/ibdknox/pinot.git
https://github.com/clojure/data.json.git
https://github.com/leonardoborges/bouncer.git
https://github.com/killme2008/defun.git
https://github.com/adamtornhill/code-maat.git
https://github.com/funcool/cats.git
https://github.com/zcaudate/vinyasa.git
https://github.com/jamii/strucjure.git
https://github.com/clojure-numerics/expresso.git
https://github.com/clojure-clutch/clutch.git
https://github.com/mattrepl/clj-oauth.git
https://github.com/ztellman/vertigo.git
https://github.com/cch1/http.async.client.git
https://github.com/xeqi/kerodon.git
https://github.com/miner/herbert.git
https://github.com/alanning/meteor-load-test.git
https://github.com/luminus-framework/luminus-template.git
https://github.com/dpapathanasiou/tweet-secret.git
https://github.com/duelinmarkers/clj-record.git
https://github.com/swannodette/hello-cljsc.git

@ieugen
Copy link

ieugen commented Apr 6, 2022

I have tried to implement the code using redis, to try it out and it works once.
Trying to understand where I am going wrong:

(ns ro.ieugen.training.async.pipeline
  "https://gist.github.com/JacobNinja/5c98496a632e1a466cbf"
  (:require [clojure.core.async :as async]
            [clj-http.client :as client]
            [clojure.data.json :as json]
            [taoensso.carmine :as car :refer (wcar)]))

(def concurrency 5)
(def server1-conn {:pool {}
                   :spec {:url "redis://localhost:5672/"}})
(defmacro wcar* [& body] `(car/wcar server1-conn ~@body))

(comment
  (wcar*
   (car/ping)
   (car/set "foo" "bar")
   (car/get "foo")
   (car/lpush "queue" "a" "b" "c" "d"))
  (wcar* (car/lpop "queue"))

  ; `in` can be backed by a redis queue
  (let [in (async/chan)
        out (async/chan)
        request-handler (fn [url out*]
                          (async/go
                            (println "Making request:" url)
                            (let [response (client/get url)
                                  body (json/read-str (:body response))]
                              (doseq [repo (get body "items")]
                                (async/>! out (repo "clone_url")))
                              (async/close! out*))
                            (println "Done!" url)))]
    (async/pipeline-async concurrency out request-handler in)
    ; `in` can be backed by a redis queue
    (async/go-loop []
      (if-let [message (wcar* (car/lpop "queue"))]
        (do
          (println "Got message from queue" message)
          (async/>! in message))
      ; Sleep if no messages available
        (do
          (println "sleeping")
          (async/<! (async/timeout 1000))))
      (recur))
    (async/go-loop
     []
      (println (async/<! out))
      (recur)))
  
  ;; send stuff to redis so they get processed.
  (doseq [url (for [page (range 10)]
                (str "https://api.github.com/search/repositories?q=language:clojure&page=" (inc page)))]
    (wcar* (car/lpush "queue" url)))

  0)

After the first processing, the out channel is closed.
That blocks any other future processing.
I can push to redis multiple times and I would like a continuous process.
Any idea what am I missing?

{:paths ["src"]
 :deps {clj-http/clj-http {:mvn/version "3.12.3"}
        org.clojure/data.json {:mvn/version "2.4.0"}
        org.clojure/clojure {:mvn/version "1.11.1"}
        org.clojure/core.async {:mvn/version "1.5.648"}
        com.taoensso/carmine {:mvn/version "3.1.0"}}
 :aliases
 ;; using :deps in aliases will replace deps. Use :extra-deps if you need all
 {:dev {:extra-deps {org.clojure/test.check {:mvn/version "1.1.1"}}}
  :build {:deps {io.github.clojure/tools.build {:git/tag "v0.8.1" :git/sha "7d40500"}}
          :ns-default build}
  :build-params {:deps {io.github.clojure/tools.build {:git/tag "v0.8.1" :git/sha "7d40500"}}
                 :ns-default build-params}}}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment