Created January 28, 2014 04:48
-
-
Save daveray/8662410 to your computer and use it in GitHub Desktop.
Map-Reduce Jr. via core.async
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns mr-jr | |
(:refer-clojure :exclude [shuffle]) | |
(:require [clojure.string :as string] | |
[clojure.core.async :as async])) | |
(defn async-group-by
  "Channel counterpart of clojure.core/group-by.

  f  - key function, applied to each value taken from ch.
  g  - value function; (g value) is what gets stored under the key.
  ch - source channel.

  Folds every value from ch into a map of (f value) -> vector of
  (g value), in the order values were taken. Returns the channel produced
  by async/reduce, which delivers that single accumulated map."
  [f g ch]
  (let [add-entry (fn [groups item]
                    ;; fnil starts a fresh vector the first time a key appears
                    (update-in groups [(f item)] (fnil conj []) (g item)))]
    (async/reduce add-entry {} ch)))
(defn mr-shuffle
  "Routes every value read from input-ch to one of output-chs.

  h          - function applied to each value; must return an integer.
  output-chs - vector of channels (it is invoked as a function with the
               index). Must be non-empty, otherwise (mod _ 0) throws.
  input-ch   - source channel.

  Each value v is put onto (output-chs (mod (h v) n)), where
  n = (count output-chs), from inside a go block. When a nil is taken from
  input-ch (i.e. it is closed and drained), every output channel is closed.
  Returns output-chs immediately, without waiting for the routing to finish."
  [h output-chs input-ch]
  (let [n (count output-chs)]
    (async/go-loop []
      (let [v (async/<! input-ch)]
        ;; Only nil means the channel is closed; false is a legitimate value
        ;; and must be routed like any other (when-let would stop on it).
        (if (nil? v)
          (doseq [o output-chs]
            (async/close! o))
          (do
            ;; Clojure's mod takes the sign of the divisor, so the index is
            ;; already in [0, n) even for negative hashes; no Math/abs needed.
            (async/>! (output-chs (mod (h v) n)) v)
            (recur)))))
    output-chs))
(defn map-reduce
  "Takes a map describing a map-reduce job and returns an output channel that
  emits [key value] pair results.

  Supported options are:

    :input       Channel of [key value] pair inputs.
    :map-fn      Function that takes a [key value] pair and returns a sequence
                 of [key value] pairs. :mapper is accepted as an alias.
    :n-mappers   Number of mapper processes. Defaults to 1.
    :reduce-fn   Function that takes a [key [& values]] pair and returns a
                 sequence of [key value] pairs. :reducer is accepted as an
                 alias. When neither is given, the merged mapper output is
                 returned directly with no grouping or reduction.
    :n-reducers  Number of reducer processes. Defaults to 1.
  "
  [{:keys [input map-fn mapper n-mappers reduce-fn reducer n-reducers]
    :or {n-mappers 1 n-reducers 1}}]
  ;; The docstring historically named these options :mapper/:reducer while the
  ;; code read :map-fn/:reduce-fn; honour both so either spelling works.
  (let [map-fn    (or map-fn mapper)
        reduce-fn (or reduce-fn reducer)
        ;; n-mappers processes all read off the shared input channel; their
        ;; [key value] output is merged back into a single channel.
        mapped-ch (->> (repeatedly n-mappers #(async/mapcat< map-fn input))
                       (vec)
                       (async/merge))]
    (if reduce-fn
      (->> mapped-ch
           ;; Group all mapper output into {key [values]}; this waits for the
           ;; whole map phase (async-group-by reduces over the channel).
           (async-group-by first second)
           ;; Break the map out into [key [values]] pairs
           (async/mapcat< seq)
           ;; Create n-reducers channels and route each key to one of them by
           ;; hash, so all values for a key reach the same reducer
           (mr-shuffle (comp hash first)
                       (vec (repeatedly n-reducers #(async/chan))))
           ;; One reducer process per shuffled channel
           (mapv #(async/mapcat< reduce-fn %))
           ;; Merge reducer output back into a single stream
           (async/merge))
      mapped-ch)))
(defn word-count
  "Counts word occurrences across every regular file under dir (walked with
  file-seq) whose name ends in \".txt\", using map-reduce with 10 mappers and
  3 reducers. A word is a maximal run of non-whitespace characters,
  lower-cased. Prints each [word count] result as it is taken from the result
  channel and returns nil."
  [^String dir]
  (let [txt-file? (fn [^java.io.File f]
                    (and (.isFile f)
                         (.endsWith (.getName f) ".txt")))
        ;; Channel of [index file] pairs as map-reduce input; the index key
        ;; is ignored by the mapper.
        input  (->> (java.io.File. dir)
                    (file-seq)
                    (filter txt-file?)
                    (map vector (range))
                    (async/to-chan))
        result (map-reduce
                {:input      input
                 ;; re-seq #"\S+" only yields non-empty tokens. Splitting on
                 ;; #"\s+" produced a leading "" for text starting with
                 ;; whitespace (and for empty files), counted as a "word".
                 :map-fn     (fn [[_ f]]
                               (->> (re-seq #"\S+" (slurp f))
                                    (map #(vector (string/lower-case %) 1))))
                 :n-mappers  10
                 :reduce-fn  (fn [[word counts]]
                               [[word (reduce + counts)]])
                 :n-reducers 3})]
    ;; Synchronous loop: block on the result channel and print until it
    ;; yields nil (closed).
    (loop []
      (when-let [r (async/<!! result)]
        (println r)
        (recur)))))
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment