Concurrent processing of a large CSV file
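
Two files follow. gen.go writes a one-million-record CSV file, and process.go reads it back concurrently: it splits the file into byte ranges aligned to line boundaries, gives each worker its own *os.File and csv.Reader limited to its range, and finally validates that every record was seen exactly once.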
package main

import (
	"encoding/csv"
	"math/rand"
	"os"
	"strconv"
)

// gen.go generates a large CSV file with 1 million records.
const numRecords = 1_000_000

func main() {
	f, err := os.Create("input.csv")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	w := csv.NewWriter(f)
	defer w.Flush()

	// Write the header row.
	header := []string{"ID", "City"}
	if err := w.Write(header); err != nil {
		panic(err)
	}

	// Write numRecords rows, each with a sequential ID and a random city.
	for i := range numRecords {
		city := cities[rand.Intn(len(cities))]
		record := []string{strconv.Itoa(i + 1), city}
		if err := w.Write(record); err != nil {
			panic(err)
		}
	}
}
var cities = []string{
	"New York",
	"Los Angeles",
	"Chicago",
	"Houston",
	"Phoenix",
	"Philadelphia",
	"San Antonio",
	"San Diego",
	"Dallas",
	"San Jose",
	"Fort Worth",
	"Jacksonville",
	"Columbus",
	"Charlotte",
	"San Francisco",
	"Indianapolis",
	"Seattle",
	"Denver",
	"Washington",
	"Boston",
	"Nashville",
	"El Paso",
	"Detroit",
	"Memphis",
	"Portland",
	"Oklahoma City",
	"Las Vegas",
	"Louisville",
}
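
To try it out, generate the input with "go run gen.go" and then process it with "go run process.go". Both files declare package main with their own func main, so run them as individual files rather than building the directory as a package. Note that the code relies on Go 1.22+ semantics: ranging over an integer, and per-iteration loop variables captured by goroutines.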
package main

import (
	"bytes"
	"encoding/csv"
	"errors"
	"fmt"
	"io"
	"os"
	"slices"
	"strconv"
	"sync"
)

// process.go concurrently reads the CSV file generated by gen.go.
const expectedSize = 1_000_000
const maxWorkers = 16
func main() {
	f, err := os.Open("input.csv")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	info, err := f.Stat()
	if err != nil {
		panic(err)
	}
	workerSize := info.Size() / maxWorkers

	// Compute offsets by splitting the file into equal-sized sections and
	// advancing each split point (except the first) to the end of the line it
	// lands on, so that no record straddles two sections.
	offsets := make([]int64, 0, maxWorkers)
	offsets = append(offsets, 0)
	for i := int64(1); i < maxWorkers; i++ {
		offset := i * workerSize
		if _, err := f.Seek(offset, io.SeekStart); err != nil {
			panic(err)
		}
		eol, err := findEOL(f)
		if err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			panic(err)
		}
		// eol is the offset of the '\n' itself, so every section after the
		// first starts on that newline; csv.Reader skips the resulting blank
		// line, so no record is lost or read twice.
		offset += eol
		// Drop duplicate offsets, which occur when a whole section fits
		// inside a single line.
		if offset != offsets[len(offsets)-1] {
			offsets = append(offsets, offset)
		}
	}
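
	// To illustrate the result (hypothetical numbers, not taken from the
	// gist): with a 1,000-byte file and 4 workers, the raw split points
	// would be 0, 250, 500, and 750. If the '\n' terminating the line that
	// contains byte 250 sits at byte 257, the second split point is advanced
	// to 257, and likewise for the remaining sections.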
	numWorkers := len(offsets)
	fmt.Println("numWorkers:", numWorkers)
	type worker struct {
		file   *os.File
		reader *csv.Reader
	}

	// Create a CSV reader for each worker. Each worker gets its own file
	// handle, seeked to the start of its section, and an io.LimitReader caps
	// it at the start of the next section so that sections never overlap.
	workers := make([]worker, numWorkers)
	for i := 0; i < numWorkers; i++ {
		f, err := os.Open("input.csv")
		if err != nil {
			panic(err)
		}
		if _, err := f.Seek(offsets[i], io.SeekStart); err != nil {
			panic(err)
		}
		var end int64
		if i == numWorkers-1 {
			end = info.Size()
		} else {
			end = offsets[i+1]
		}
		workers[i] = worker{
			file:   f,
			reader: csv.NewReader(io.LimitReader(f, end-offsets[i])),
		}
	}
	scanned := make(chan int, numWorkers)

	// Start the workers, each reading its own section of the CSV file.
	var wg sync.WaitGroup
	for _, worker := range workers {
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer worker.file.Close()
			for {
				record, err := worker.reader.Read()
				if err == io.EOF {
					break
				}
				if err != nil {
					panic(err)
				}
				// Only the first section contains the header row.
				if record[0] == "ID" {
					continue
				}
				id, err := strconv.Atoi(record[0])
				if err != nil {
					panic(err)
				}
				scanned <- id
			}
		}()
	}

	// Close the channel once every worker has drained its section.
	go func() {
		wg.Wait()
		close(scanned)
	}()
	// The rest is just validation: collect every ID, sort, and check that
	// the IDs are exactly 1..expectedSize with no gaps or duplicates.
	var ids []int
	for id := range scanned {
		ids = append(ids, id)
	}
	slices.Sort(ids)
	fmt.Println(len(ids))
	if len(ids) != expectedSize {
		panic("unexpected number of records")
	}
	for i := range expectedSize {
		if ids[i] != i+1 {
			panic("missing or duplicate ID")
		}
	}
}
const pageSize = 4096

// findEOL returns the byte offset of the first '\n' in the given reader,
// relative to the reader's current position.
func findEOL(r io.Reader) (int64, error) {
	buf := make([]byte, pageSize)
	var total int64 // bytes consumed so far; Read may return fewer than pageSize
	for {
		n, err := r.Read(buf)
		if idx := bytes.IndexByte(buf[:n], '\n'); idx != -1 {
			return total + int64(idx), nil
		}
		total += int64(n)
		if err != nil && !errors.Is(err, io.EOF) {
			return 0, err
		}
		if n == 0 {
			return 0, io.EOF
		}
	}
}
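
The validation step above funnels every ID through a single channel, which can become the bottleneck once the per-record work is cheap. A common variation is to have each worker aggregate locally and merge the results at the end. Below is a minimal sketch of that idea: it assumes the gist's worker struct is hoisted to package scope and the same two-column ID,City format, and it is an illustration rather than part of the original gist.

// countCities is a hypothetical variant of the worker loop in main: each
// goroutine tallies cities into its own map, and the maps are merged only
// after all workers finish, so no channel or mutex is needed on the hot path.
func countCities(workers []worker) map[string]int {
	counts := make([]map[string]int, len(workers))
	var wg sync.WaitGroup
	for i, w := range workers {
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer w.file.Close()
			local := make(map[string]int)
			for {
				record, err := w.reader.Read()
				if err == io.EOF {
					break
				}
				if err != nil {
					panic(err)
				}
				if record[0] == "ID" { // skip the header row
					continue
				}
				local[record[1]]++
			}
			counts[i] = local // each goroutine writes only its own slot
		}()
	}
	wg.Wait()

	// Merge the per-worker maps; safe because all goroutines have finished.
	total := make(map[string]int)
	for _, m := range counts {
		for city, n := range m {
			total[city] += n
		}
	}
	return total
}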