Last active
February 11, 2024 19:34
-
-
Save vvvvalvalval/c6c2d991bdc96cce4c14 to your computer and use it in GitHub Desktop.
Asynchronous map function with clojure.core.async
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(require '[clojure.core.async :as a]) | |
(defn- seq-of-chan
  "Returns a lazy seq of the values taken from the core.async channel c.
  Each element is obtained with a blocking take (a/<!!) at the moment the seq
  is realized; the seq ends at the first nil take, i.e. once c is closed and
  drained."
  [c]
  (lazy-seq
    (let [item (a/<!! c)]
      ;; A nil take means the channel is closed: yield nil to terminate the seq.
      (when-not (nil? item)
        (cons item (seq-of-chan c))))))
(defn map-pipeline-async
  "Map for asynchronous functions, backed by clojure.core.async/pipeline-async.

  Given an asynchronous function af and a seq coll, returns a lazy seq of the
  results of applying af to each element of coll.

  af must be an asynchronous function as described in
  clojure.core.async/pipeline-async: it receives an input value and a result
  channel, puts its result(s) on that channel, and closes it.

  p is the parallelism passed to pipeline-async and is also used as the buffer
  size of the internal input and output channels. It is optional and defaults
  to 200."
  ([af coll]
   (map-pipeline-async af 200 coll))
  ([af p coll]
   (let [in-ch  (a/chan p)
         out-ch (a/chan p)]
     ;; Feed the collection into the input channel (closed once coll is exhausted).
     (a/onto-chan in-ch coll)
     ;; Run af over the inputs, delivering results onto the output channel.
     (a/pipeline-async p out-ch af in-ch)
     ;; Expose the output channel as a lazy seq; realizing it blocks on takes.
     (seq-of-chan out-ch))))
;; -------------------------------------------- | |
;; Example - a web scraper that needs to fetch many pages using asynchronous I/O while still exposing a synchronous interface
;; -------------------------------------------- | |
(require '[org.httpkit.client :as http]) | |
(def pages
  "Some pages that we should fetch"
  ;; Each entry is a map with a single :url key, built from the list below.
  (mapv (fn [address] {:url address})
        ["http://www.google.com"
         "http://www.github.com"
         ;; ...
         "https://news.ycombinator.com/"]))
(def fetched-pages
  "A lazy seq of the (asynchronously) fetched pages"
  (letfn [(fetch-page [{page-url :url :as page} result-ch]
            ;; Asynchronous GET with http-kit; the callback runs when the response arrives.
            (http/get page-url
                      (fn [{html :body :as resp}]
                        ;; Only pages whose body is a string are emitted; any other
                        ;; response yields no output for that page.
                        (when (string? html)
                          (a/>!! result-ch (assoc page :fetched-html html)))
                        ;; Always close the result channel so the pipeline can move on.
                        (a/close! result-ch))))]
    (map-pipeline-async fetch-page 200 pages)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment