Skip to content

Instantly share code, notes, and snippets.

@daveray
Created January 28, 2014 04:48
Show Gist options
  • Save daveray/8662410 to your computer and use it in GitHub Desktop.
Save daveray/8662410 to your computer and use it in GitHub Desktop.
Map-Reduce Jr. via core.async
(ns mr-jr
(:refer-clojure :exclude [shuffle])
(:require [clojure.string :as string]
[clojure.core.async :as async]))
(defn async-group-by
  "Channel-based analogue of clojure.core/group-by.

  Folds every value x read from ch into a map whose keys are (f x) and whose
  values are vectors of (g x), appended in the order the values arrive.
  Returns the channel produced by async/reduce, which carries that map."
  [f g ch]
  (let [add-item (fn [groups item]
                   (let [group-key (f item)
                         members   (get groups group-key [])]
                     (assoc groups group-key (conj members (g item)))))]
    (async/reduce add-item {} ch)))
(defn mr-shuffle
  "Routes each value taken from input-ch onto one of output-chs.

  The hash function h is applied to every value and must return an integer;
  the value is put on the channel at index (mod (abs (h v)) n), where n is
  (count output-chs). output-chs must be a non-empty vector of channels,
  because it is indexed by calling it as a function.

  When input-ch is closed and drained, every channel in output-chs is closed.
  Returns output-chs immediately; the routing runs in a go block."
  [h output-chs input-ch]
  (let [n (count output-chs)]
    (async/go-loop []
      (let [v (async/<! input-ch)]
        ;; Test for nil explicitly: nil is the only value that signals a
        ;; closed channel, whereas `false` is a legitimate payload.
        (if (nil? v)
          ;; Input exhausted: propagate the close to every output channel.
          (doseq [o output-chs]
            (async/close! o))
          (let [i (mod (Math/abs ^long (h v)) n)]
            (async/>! (output-chs i) v)
            ;; recur is the last form of the loop body (true tail position).
            (recur))))))
  output-chs)
(defn map-reduce
  "Takes a map describing a map-reduce job and returns an output channel that
  emits [key value] pair results.

  Supported options are:

    :input       Channel of [key value] pair inputs.
    :map-fn      Function that takes a [key value] pair and returns a sequence
                 of [key value] pairs.
    :n-mappers   Number of mappers. Defaults to 1.
    :reduce-fn   Function that takes a [key [& values]] pair and returns a
                 sequence of [key value] pairs. Optional: when it is not
                 supplied, the merged mapper output is returned as-is.
    :n-reducers  Number of reducers. Defaults to 1."
  [{:keys [input map-fn n-mappers reduce-fn n-reducers]
    :or   {n-mappers 1 n-reducers 1}}]
  ;; Make n-mappers processes that all read off the input channel. Their
  ;; output, [key value] pairs, is merged back into a single channel.
  (let [mapped-ch (->> (repeatedly n-mappers #(async/mapcat< map-fn input))
                       (vec)
                       (async/merge))]
    (if reduce-fn
      ;; Take mapper output...
      (->> mapped-ch
           ;; Group into {key [values]}
           (async-group-by first second)
           ;; Break the map out into [key [values]] pairs
           (async/mapcat< seq)
           ;; Create n-reducers channels and shuffle pairs to them using
           ;; the hash of each pair's key
           (mr-shuffle (comp hash first)
                       (vec (repeatedly n-reducers #(async/chan))))
           ;; Create reducer processes, one per shuffled channel
           (mapv #(async/mapcat< reduce-fn %))
           ;; Merge reducer output back into a single stream
           (async/merge))
      mapped-ch)))
(defn word-count
  "Counts word occurrences across every regular file whose name ends in
  \".txt\" under directory dir (searched recursively via file-seq).

  Words are maximal runs of non-whitespace characters, lower-cased before
  counting. Each [word count] result is printed with println as it arrives.
  Blocks the calling thread until the result channel closes; returns nil."
  [^String dir]
  (let [txt-file? (fn [^java.io.File f]
                    (and (.isFile f)
                         (.endsWith (.getName f) ".txt")))
        ;; channel of [index file] pairs for input
        input (->> (java.io.File. dir)
                   (file-seq)
                   (filter txt-file?)
                   (map vector (range))
                   (async/to-chan))
        result (map-reduce
                 {:input input
                  ;; re-seq on \S+ yields only real tokens. Splitting on \s+
                  ;; instead produces an empty-string \"word\" for files that
                  ;; are empty or start with whitespace.
                  :map-fn (fn [[_ f]]
                            (->> (re-seq #"\S+" (slurp f))
                                 (map #(vector (string/lower-case %) 1))))
                  :n-mappers 10
                  :reduce-fn (fn [[word counts]]
                               [[word (reduce + counts)]])
                  :n-reducers 3})]
    ; synchronous loop to print results
    (loop []
      (when-let [r (async/<!! result)]
        (println r)
        (recur)))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment