Skip to content

Instantly share code, notes, and snippets.

@kitallis
Created March 17, 2025 15:37
Show Gist options
  • Save kitallis/d7711fefd76d5145cab512384303adef to your computer and use it in GitHub Desktop.
Save kitallis/d7711fefd76d5145cab512384303adef to your computer and use it in GitHub Desktop.
(ns byteoffset
(:require [clojure.java.io :as io]
[opencsv-clj.core :as csv]
[com.climate.claypoole :as cp]
[criterium.core :as crit])
(:import (java.io RandomAccessFile
FileInputStream
BufferedInputStream
InputStreamReader
BufferedReader)))
;; Relative path to a CSV file. The functions in this file take their own
;; `file-name` parameter, so this var is only a convenient default to pass in.
(def file-name "resources/m.csv")
;; Classpath lookup of "xs.csv" via io/resource (nil when the resource is not
;; found). Not referenced by the functions visible in this file.
(def data-file (io/resource "xs.csv"))
;; Shared accumulator: `process` assoc's each chunk's parse result into this
;; map, keyed by the id of the thread that parsed the chunk.
(def results (atom {}))
(defn seek-till
  "Positions `file` at `offset`, then consumes bytes up to and including the
  next newline, and returns the resulting file pointer (i.e. the start of the
  following line).

  When `offset` is zero the file is not repositioned or scanned; the current
  file pointer is returned as-is.

  If end of file is reached before a newline is seen, scanning stops there and
  the end-of-file position is returned instead of looping forever."
  [^RandomAccessFile file offset]
  (when-not (zero? offset)
    (.seek file offset)
    (loop [b (.read file)]
      ;; RandomAccessFile.read yields -1 at end of file; without this check a
      ;; trailing chunk with no newline would never terminate.
      (when-not (or (neg? b) (= b (int \newline)))
        (recur (.read file)))))
  (.getFilePointer file))
(defn offsets
  "Returns a vector of up to `split` line-aligned byte offsets into `file`.

  The file of `length` bytes is divided into `split` equal parts; each raw
  boundary floor(i * length / split) is passed through `seek-till`, which
  advances it to the start of the next line (offset 0 is kept as 0).

  The result is realised eagerly, because every element performs a seek/read
  on `file`; a lazy result could be consumed after the caller has closed the
  handle. Boundaries are computed with integer arithmetic so `.seek` always
  receives a whole number rather than a Ratio.

  Returns an empty vector when `length` or `split` is not positive."
  [^RandomAccessFile file length split]
  (if (and (pos? length) (pos? split))
    (mapv (fn [i] (seek-till file (quot (* i length) split)))
          (range split))
    []))
(defn chunk-file
  "Opens `filename` read-only and splits it into byte ranges.

  The line-aligned start offsets come from `offsets`; the file length is
  appended as the final boundary, and consecutive boundaries are paired up.
  Returns a fully realised sequence of (start end) pairs. The file is closed
  before returning."
  [filename split]
  (with-open [raf (RandomAccessFile. filename "r")]
    (let [size (.length raf)
          starts (offsets raf size split)]
      ;; Realise everything while the handle is still open.
      (->> (concat starts (list size))
           (partition 2 1)
           doall))))
(defn eat-lines
  "Returns a lazy seq of lines read from `reader`, stopping once `pending`
  bytes have been consumed or the reader is exhausted.

  `pending` is the byte budget for this range. Each line read reduces it by
  the line's length plus one byte for the line terminator that `.readLine`
  strips. Without that extra byte the budget drains too slowly and the range
  runs on into lines that belong to the next chunk.

  Assumes a single-byte encoding and \"\\n\" line endings, so characters and
  bytes are counted one-for-one.

  The reader is closed when the seq terminates; a seq that is never fully
  consumed leaves the reader open."
  [reader pending]
  (lazy-seq
    (if-let [line (and (pos? pending)
                       (.readLine reader))]
      ;; +1 accounts for the newline byte consumed along with the line.
      (cons line (eat-lines reader (- pending (inc (.length line)))))
      ;; .close returns nil, which also ends the lazy seq.
      (.close reader))))
(defn reader
  "Builds a BufferedReader over `file`, positioned `start-byte` bytes in.

  A FileInputStream is opened on `file` and asked to skip `start-byte` bytes,
  then wrapped in a BufferedInputStream of `buffer-size` bytes and decoded as
  US-ASCII. The caller owns the returned reader and must close it."
  [file buffer-size start-byte]
  (let [raw (FileInputStream. file)]
    ;; Skip on the raw stream before any buffering layer is attached.
    (.skip raw start-byte)
    (let [buffered (BufferedInputStream. raw buffer-size)
          decoded (InputStreamReader. buffered "US-ASCII")]
      (BufferedReader. decoded))))
(defn read-lines-by-range
  "Returns the lazy seq of lines of `file` for the byte range starting at
  `start-byte` and ending at `end-byte`.

  Opens a reader with a 131072-byte (128 KiB) buffer at `start-byte` and hands
  it to `eat-lines` with the range size as the byte budget."
  [file start-byte end-byte]
  (let [rdr (reader file 131072 start-byte)
        span (- end-byte start-byte)]
    (eat-lines rdr span)))
(defn thread-id
  "Returns the id of the thread that evaluates this call."
  []
  (let [current (Thread/currentThread)]
    (.getId current)))
(defn parse-by-line
  "Counts rows and cells for the byte range `frst`..`scnd` of `file-name`.

  Each line from `read-lines-by-range` is parsed on its own with
  `csv/read-csv`; the first row of that result contributes its element count
  to :cells and one to :rows. Returns a map {:cells n :rows m}."
  [file-name frst scnd]
  (let [tally (fn [{:keys [cells rows]} line]
                (let [fields (first (csv/read-csv line))]
                  {:cells (+ (count fields) cells)
                   :rows (inc rows)}))
        lines (read-lines-by-range file-name frst scnd)]
    (reduce tally {:cells 0 :rows 0} lines)))
(defn process
  "Parses `file-name` in parallel and records the per-chunk counts in the
  `results` atom.

  The file is split into `parallelism` byte ranges with `chunk-file`; each
  range is parsed by `parse-by-line` on a claypoole thread pool of
  `parallelism` threads, and the outcome is assoc'ed into `results` under the
  id of the worker thread. Blocks until every chunk has finished and returns
  nil. An exception thrown by a chunk is rethrown when its future is
  dereferenced.

  The pool is always shut down before returning, so repeated calls do not
  accumulate idle worker threads.

  NOTE(review): results are keyed by thread id, so if one worker thread ends
  up parsing two chunks the later result replaces the earlier one. Confirm
  whether callers rely on that keying before changing it."
  [file-name parallelism]
  (let [pool (cp/threadpool parallelism)]
    (try
      (let [futures (mapv
                      (fn [[frst scnd]]
                        (cp/future pool
                          (swap! results
                                 assoc
                                 (thread-id)
                                 (parse-by-line file-name frst scnd))))
                      (chunk-file file-name parallelism))]
        ;; Deref every future so we wait for completion and surface failures.
        (doseq [f futures] @f))
      (finally
        ;; Release the pool's threads even when a chunk fails.
        (cp/shutdown pool)))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment