Created
March 17, 2025 15:37
-
-
Save kitallis/d7711fefd76d5145cab512384303adef to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| (ns byteoffset | |
| (:require [clojure.java.io :as io] | |
| [opencsv-clj.core :as csv] | |
| [com.climate.claypoole :as cp] | |
| [criterium.core :as crit]) | |
| (:import (java.io RandomAccessFile | |
| FileInputStream | |
| BufferedInputStream | |
| InputStreamReader | |
| BufferedReader))) | |
(def file-name
  "Path of a CSV file, relative to the working directory. Note that
  parse-by-line and process take their own `file-name` argument, which
  shadows this var inside them (presumably this is the value to pass to
  process — confirm)."
  "resources/m.csv")

(def data-file
  "Result of looking up xs.csv on the classpath via io/resource,
  evaluated once when the namespace loads."
  (io/resource "xs.csv"))

(def results
  "Atom, initially empty, into which process records parse counts keyed
  by the id of the worker thread."
  (atom {}))
(defn seek-till
  "Positions `file` just past the first newline at or after byte
  `offset` and returns the resulting file pointer. If no newline
  follows, stops at end of file and returns the file length. An
  `offset` of 0 is treated as already being a line start: the pointer
  is moved to 0 and 0 is returned without scanning."
  [^RandomAccessFile file offset]
  (let [offset (long offset)]
    ;; Always seek, even for 0: returning the current pointer for offset 0
    ;; would depend on whatever position earlier calls left behind.
    (.seek file offset)
    (when (pos? offset)
      (loop []
        (let [b (.read file)]
          ;; .read returns -1 at EOF; without that check a last line with
          ;; no trailing newline makes this loop spin forever.
          (when-not (or (== b -1) (== b (int \newline)))
            (recur)))))
    (.getFilePointer file)))
(defn offsets
  "Returns a vector of `split` chunk start offsets into `file`, whose
  size is `length` bytes. The i-th candidate offset is
  floor(i * length / split); each candidate is then adjusted by
  seek-till so chunks start on line boundaries. Returns [] when
  `length` is 0. Throws IllegalArgumentException if `split` is not
  positive.

  Realized eagerly (mapv) because seek-till moves `file`'s pointer; a
  lazy result realized after `file` is closed would fail."
  [^RandomAccessFile file length split]
  (let [length (long length)
        split  (long split)]
    (when-not (pos? split)
      (throw (IllegalArgumentException.
              (str "split must be positive, got " split))))
    (if (pos? length)
      ;; Exact integer arithmetic: same truncated offsets as the previous
      ;; (/ length split) Ratio step, but plain longs go to seek-till.
      (mapv (fn [i] (seek-till file (quot (* i length) split)))
            (range split))
      [])))
(defn chunk-file
  "Opens `filename` read-only, computes `split` chunk starts with
  offsets, appends the file length as the final boundary, and returns a
  fully realized seq of two-element (start end) pairs of consecutive
  boundaries. The file is closed before returning."
  [filename split]
  (with-open [raf (RandomAccessFile. filename "r")]
    (let [file-length (.length raf)
          boundaries  (-> (offsets raf file-length split)
                          (concat [file-length]))]
      ;; Realize while raf is still open.
      (doall (partition 2 1 boundaries)))))
(defn- read-line-counted
  "Reads one line from `rdr`, splitting on newline only (the same byte
  seek-till scans for). Returns [line consumed]:
  - `line` has its newline and any carriage return at its end removed.
  - `consumed` is the number of chars read, including those terminators.
  Returns nil if `rdr` is already at end of stream."
  [^java.io.Reader rdr]
  (let [sb (StringBuilder.)]
    (loop [consumed 0]
      (let [c (.read rdr)]
        (if (or (== c -1) (== c (int \newline)))
          (let [consumed (if (== c -1) consumed (inc consumed))
                len      (.length sb)]
            (when (pos? consumed)
              (when (and (pos? len) (= \return (.charAt sb (dec len))))
                (.setLength sb (dec len)))
              [(.toString sb) consumed]))
          (do (.append sb (char c))
              (recur (inc consumed))))))))

(defn eat-lines
  "Returns a lazy seq of lines read from `reader`. Reading stops once
  `pending` chars have been consumed, or when the stream ends.

  Each line's consumed count includes its terminator (newline and an
  optional preceding carriage return). A reader opened at a line start
  with `pending` set to a line-aligned range length therefore yields
  exactly that range's lines and never reads into the next range.
  Assumes one char per byte (NOTE(review): holds for the US-ASCII
  decoding that `reader` sets up — confirm for other callers).

  Closes `reader` when the seq is exhausted. If the seq is not fully
  consumed, the reader stays open."
  [^java.io.Reader reader pending]
  (lazy-seq
    (if-let [[line consumed] (when (pos? pending)
                               (read-line-counted reader))]
      (cons line (eat-lines reader (- pending consumed)))
      (.close reader))))
(defn reader
  "Opens `file` (anything FileInputStream's constructor accepts, e.g. a
  path string or java.io.File) positioned at byte `start-byte`. Returns
  a BufferedReader that decodes the bytes as US-ASCII through a
  BufferedInputStream of `buffer-size` bytes. The caller must close the
  returned reader; closing it closes the underlying file stream."
  [file buffer-size start-byte]
  (let [in (FileInputStream. file)]
    (try
      ;; Position through the channel: InputStream.skip may skip fewer
      ;; bytes than requested, which would silently misalign the chunk.
      (.position (.getChannel in) (long start-byte))
      (-> in
          (BufferedInputStream. (int buffer-size))
          (InputStreamReader. "US-ASCII")
          (BufferedReader.))
      (catch Throwable t
        ;; Don't leak the file handle if positioning or wrapping fails.
        (.close in)
        (throw t)))))
(defn read-lines-by-range
  "Opens `file` at `start-byte` with `reader` and returns the lazy line
  seq from eat-lines, limited to (- end-byte start-byte) consumed chars.
  `buffer-size` sizes the underlying BufferedInputStream. It defaults to
  131072 bytes (128 KiB), the value previously hard-coded here. The
  reader is closed by eat-lines once the seq is fully consumed."
  ([file start-byte end-byte]
   (read-lines-by-range file start-byte end-byte 131072))
  ([file start-byte end-byte buffer-size]
   (eat-lines (reader file buffer-size start-byte)
              (- end-byte start-byte))))
(defn thread-id
  "Returns the id (Thread.getId) of the thread that calls it."
  []
  (let [current (Thread/currentThread)]
    (.getId current)))
(defn parse-by-line
  "Counts rows and cells in the lines that read-lines-by-range yields for
  `file-name` between `frst` and `scnd`. Returns {:cells n :rows m}.

  Each line is parsed on its own with csv/read-csv:
  - Every line adds 1 to :rows.
  - Only the field count of the line's first parsed record is added to
    :cells.
  NOTE(review): a quoted field containing a newline would be split
  across lines — confirm the input has no multi-line fields."
  [file-name frst scnd]
  (let [tally (fn [{:keys [cells rows]} line]
                (let [fields (first (csv/read-csv line))]
                  {:cells (+ cells (count fields))
                   :rows  (inc rows)}))]
    (reduce tally
            {:cells 0 :rows 0}
            (read-lines-by-range file-name frst scnd))))
(defn process
  "Counts CSV rows and cells in `file-name` in parallel.

  How it works:
  - chunk-file splits the file into `parallelism` line-aligned byte
    ranges.
  - Each range is parsed by parse-by-line on a claypoole pool of
    `parallelism` threads.
  - Each task adds its {:cells :rows} counts into the global `results`
    atom under the id of the thread that ran it. Counts are summed when
    one thread handles several chunks.

  Blocks until every task has been dereferenced; a failed task's
  exception surfaces through that deref. Returns nil. The pool is shut
  down on exit.

  Entries from earlier calls are kept. Reset `results` first if only
  this run's counts are wanted."
  [file-name parallelism]
  (cp/with-shutdown! [pool (cp/threadpool parallelism)]
    (let [tasks (mapv (fn [[frst scnd]]
                        (cp/future pool
                          (let [counts (parse-by-line file-name frst scnd)]
                            ;; Sum rather than assoc: a pool thread can pick up
                            ;; a second chunk, and assoc would drop the first.
                            (swap! results update (thread-id)
                                   (fn [prev] (merge-with + prev counts))))))
                      (chunk-file file-name parallelism))]
      (doseq [t tasks] @t))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment