;; Last active January 23, 2020 03:15
(ns sunshine.disk-read
  "Counts the newline bytes in a large file by reading it in fixed-size
  chunks and fanning the chunks out to core.async consumers."
  (:require [clojure.core.async :as async :refer [>! <! go-loop chan close! <!!]])
  (:import (java.io BufferedReader FileReader FileInputStream BufferedInputStream InputStreamReader)))
(def ten-gb-filename
  "Name of the file whose newline bytes are counted."
  "ten.json")

(def one-meg
  "Size in bytes of each read buffer: 2^20 (one mebibyte)."
  (bit-shift-left 1 20))
(defn ^java.io.FileInputStream input-stream
  "Opens the file named `fname` and returns a new java.io.FileInputStream
  over it. The caller owns the stream and must close it (e.g. via
  `with-open`)."
  [^String fname]
  (new java.io.FileInputStream fname))
(defn count-newlines
  "Returns how many bytes in `barray` equal 10 (ASCII line feed).
  The optional `n` restricts the scan to the first `n` bytes, clamped to
  the range [0, (alength barray)]. This lets a caller count a partially
  filled read buffer without copying it; a -1 EOF result from `.read`
  passed as `n` counts as zero bytes."
  ([^bytes barray]
   (count-newlines barray (alength barray)))
  ([^bytes barray n]
   ;; Clamp so an oversized n cannot index past the end of the array.
   (let [limit (min (long n) (alength barray))]
     (loop [i        0
            newlines 0]
       (cond
         (>= i limit)            newlines
         (== 10 (aget barray i)) (recur (inc i) (inc newlines))
         :else                   (recur (inc i) newlines))))))
;; Top-level script that counts the newline bytes in `ten-gb-filename`.
;; A dedicated producer thread reads the file in `one-meg` chunks and puts
;; each chunk on a bounded channel. Four go-loop consumers take chunks, count
;; their newlines and return their subtotals. The value of this form is the
;; total newline count.
(with-open [file-stream (input-stream ten-gb-filename)]
  (let [channel  (chan 500)
        ;; `doall` starts all four consumers right away, instead of relying
        ;; on the lazy `for` being realized (in chunks) by `(map <!! ...)`.
        counters (doall
                  (for [_ (range 4)]
                    (go-loop [newline-count 0]
                      (if-let [barray (<! channel)]
                        (recur (+ newline-count (count-newlines barray)))
                        ;; nil: the channel has been closed and drained.
                        newline-count))))
        ;; .read blocks on disk I/O, so the producer gets its own thread via
        ;; `async/thread`; go blocks share a small fixed pool and should not
        ;; block. The thread yields nil on success, or the Throwable that
        ;; stopped it.
        producer (async/thread
                   (try
                     (loop []
                       (let [barray     (byte-array one-meg)
                             bytes-read (.read file-stream barray)]
                         ;; .read returns -1 on EOF; that empty buffer is not queued.
                         (when (pos? bytes-read)
                           ;; This put blocks while 500 chunks (~500 MB) are
                           ;; already waiting, so the heap is not flooded.
                           (async/>!! channel barray)
                           (recur))))
                     (catch Throwable t t)
                     (finally
                       ;; Close even on failure, so the consumers finish and
                       ;; the `<!!` calls below cannot hang forever.
                       (close! channel))))
        total    (reduce + (map <!! counters))]
    ;; Surface a read failure instead of returning a silently partial count.
    (when-let [err (<!! producer)]
      (throw err))
    total))
