Concurrent processing of a large CSV file
package main

import (
	"encoding/csv"
	"math/rand"
	"os"
	"strconv"
)

const numRecords = 1_000_000

// gen.go generates a large CSV file with 1 million records.
func main() {
	f, err := os.Create("input.csv")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	w := csv.NewWriter(f)
	defer w.Flush()

	// Write the header row.
	header := []string{"ID", "City"}
	if err := w.Write(header); err != nil {
		panic(err)
	}

	// Write the records. Ranging over an int requires Go 1.22+.
	for i := range numRecords {
		city := cities[rand.Intn(len(cities))]
		record := []string{strconv.Itoa(i + 1), city}
		if err := w.Write(record); err != nil {
			panic(err)
		}
	}
}
var cities = []string{
	"New York",
	"Los Angeles",
	"Chicago",
	"Houston",
	"Phoenix",
	"Philadelphia",
	"San Antonio",
	"San Diego",
	"Dallas",
	"San Jose",
	"Fort Worth",
	"Jacksonville",
	"Columbus",
	"Charlotte",
	"San Francisco",
	"Indianapolis",
	"Seattle",
	"Denver",
	"Washington",
	"Boston",
	"Nashville",
	"El Paso",
	"Detroit",
	"Memphis",
	"Portland",
	"Oklahoma City",
	"Las Vegas",
	"Louisville",
}
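For a point of reference before the concurrent version, a single-reader scan of the same file would look something like this minimal sketch (assuming the input.csv produced by gen.go above; it simply counts the records sequentially):

// baseline.go is a minimal sequential sketch for comparison: one csv.Reader scans
// input.csv (as produced by gen.go above) and counts the records.
package main

import (
	"encoding/csv"
	"fmt"
	"io"
	"os"
)

func main() {
	f, err := os.Open("input.csv")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	r := csv.NewReader(f)
	// Skip the header row.
	if _, err := r.Read(); err != nil {
		panic(err)
	}

	count := 0
	for {
		if _, err := r.Read(); err != nil {
			if err == io.EOF {
				break
			}
			panic(err)
		}
		count++
	}
	fmt.Println("records:", count)
}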
package main

import (
	"bytes"
	"encoding/csv"
	"errors"
	"fmt"
	"io"
	"os"
	"slices"
	"strconv"
	"sync"
)

const expectedSize = 1_000_000
const maxWorkers = 16

// process.go concurrently reads the CSV file generated by gen.go.
func main() {
	f, err := os.Open("input.csv")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	info, err := f.Stat()
	if err != nil {
		panic(err)
	}
	workerSize := info.Size() / maxWorkers

	// Compute offsets by splitting the file into equal-sized sections, then advancing
	// each section boundary (except the first) to the start of the next line, so that
	// no worker begins reading mid-record.
	offsets := make([]int64, 0, maxWorkers)
	offsets = append(offsets, 0)
	for i := int64(1); i < maxWorkers; i++ {
		offset := i * workerSize
		if _, err := f.Seek(offset, io.SeekStart); err != nil {
			panic(err)
		}
		eol, err := findEOL(f)
		if err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			panic(err)
		}
		offset += eol
		// Adjacent sections can resolve to the same line boundary; skip duplicates so
		// every worker gets a non-empty range.
		if offset != offsets[len(offsets)-1] {
			offsets = append(offsets, offset)
		}
	}
	numWorkers := len(offsets)
	fmt.Println("numWorkers:", numWorkers)
	type worker struct {
		file   *os.File
		reader *csv.Reader
	}

	// Create a CSV reader for each worker. Every worker opens its own *os.File so the
	// workers never share a file cursor.
	workers := make([]worker, numWorkers)
	for i := 0; i < numWorkers; i++ {
		f, err := os.Open("input.csv")
		if err != nil {
			panic(err)
		}
		if _, err := f.Seek(offsets[i], io.SeekStart); err != nil {
			panic(err)
		}
		var end int64
		if i == numWorkers-1 {
			end = info.Size()
		} else {
			end = offsets[i+1]
		}
		// The LimitReader confines each worker to its own section of the file.
		workers[i] = worker{
			file:   f,
			reader: csv.NewReader(io.LimitReader(f, end-offsets[i])),
		}
	}
	scanned := make(chan int, numWorkers)

	// Start the workers, each reading its own section of the CSV file concurrently.
	// (The loop variable is captured per iteration, which is safe in Go 1.22+.)
	var wg sync.WaitGroup
	for _, worker := range workers {
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer worker.file.Close()
			for {
				record, err := worker.reader.Read()
				if err == io.EOF {
					break
				}
				if err != nil {
					panic(err)
				}
				// Skip the header row, which only the first worker sees.
				if record[0] == "ID" {
					continue
				}
				id, err := strconv.Atoi(record[0])
				if err != nil {
					panic(err)
				}
				scanned <- id
			}
		}()
	}

	// Close the channel once all workers are done, so the collection loop below ends.
	go func() {
		wg.Wait()
		close(scanned)
	}()
	// The rest is just validation: collect the IDs, sort them, and check that exactly
	// the IDs 1..expectedSize were seen.
	var ids []int
	for id := range scanned {
		ids = append(ids, id)
	}
	slices.Sort(ids)
	fmt.Println(len(ids))
	if len(ids) != expectedSize {
		panic("wrong number of records")
	}
	for i := range expectedSize {
		if ids[i] != i+1 {
			panic("missing or duplicate ID")
		}
	}
}
const pageSize = 4096

// findEOL reads from r until the first '\n' and returns the number of bytes up to and
// including it, i.e. the offset of the start of the next line relative to where r began.
func findEOL(r io.Reader) (int64, error) {
	buf := make([]byte, pageSize)
	var total int64
	for {
		n, err := r.Read(buf)
		if err != nil && !errors.Is(err, io.EOF) {
			return 0, err
		}
		if n == 0 {
			return 0, io.EOF
		}
		if idx := bytes.IndexByte(buf[:n], '\n'); idx != -1 {
			// Count the bytes actually read rather than assuming full pages, so a
			// short read doesn't skew the offset.
			return total + int64(idx) + 1, nil
		}
		total += int64(n)
	}
}
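To sanity-check the boundary logic, findEOL can be exercised against an in-memory reader. A minimal test sketch (assuming process.go lives in its own package directory, since it declares package main):

// findeol_test.go: a small check of findEOL, which should return the offset just past
// the first '\n' (i.e. the start of the next record).
package main

import (
	"strings"
	"testing"
)

func TestFindEOL(t *testing.T) {
	// "3,Chicago\n" is 10 bytes, so a worker seeking to this offset would start
	// cleanly at the "4,Houston" record.
	offset, err := findEOL(strings.NewReader("3,Chicago\n4,Houston\n"))
	if err != nil {
		t.Fatal(err)
	}
	if offset != 10 {
		t.Fatalf("got offset %d, want 10", offset)
	}
}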