Skip to content

Instantly share code, notes, and snippets.

@cartersusi
Last active August 16, 2024 16:07
Show Gist options
  • Save cartersusi/2b2be1d23112fe1d63ddcd347dd7db05 to your computer and use it in GitHub Desktop.
Save cartersusi/2b2be1d23112fe1d63ddcd347dd7db05 to your computer and use it in GitHub Desktop.
Go - One Billion Row Challenge
module 1brc
go 1.22.6
require golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa
package main
import (
"bufio"
"fmt"
"io"
"math"
"os"
"strconv"
"strings"
"sync"
"golang.org/x/exp/mmap"
)
type Station struct {
total float64
count int
min float64
max float64
}
type StationMap map[string]Station
type ChunkReader struct {
data []byte
pos int
}
func New(data []byte) *ChunkReader {
return &ChunkReader{data: data}
}
func (r *ChunkReader) Read(p []byte) (int, error) {
if r.pos >= len(r.data) {
return 0, io.EOF
}
n := copy(p, r.data[r.pos:])
r.pos += n
return n, nil
}
func processLine(line string, stations StationMap) {
floatString := ""
var station string
j := len(line)
floatString = string(line[j-3]) + string(line[j-2]) + string(line[j-1])
j -= 3
if j > 0 && line[j-1] == ';' {
j--
station = line[:j]
} else if j > 2 {
floatString = string(line[j-1]) + floatString
j -= 2
station = line[:j]
}
measurement, _ := strconv.ParseFloat(floatString, 64)
s, ok := stations[station]
if !ok {
s = Station{}
}
s.total += measurement
s.count++
s.min = math.Min(s.min, measurement)
s.max = math.Max(s.max, measurement)
stations[station] = s
}
func processChunk(chunk []byte) StationMap {
stations := make(StationMap)
scanner := bufio.NewScanner(New(chunk))
lineNumber := 1
for scanner.Scan() {
processLine(scanner.Text(), stations)
lineNumber++
}
return stations
}
func readFile(filePath string, chunkSize int) StationMap {
reader, _ := mmap.Open(filePath)
defer reader.Close()
fileInfo, _ := os.Stat(filePath)
fileSize := fileInfo.Size()
var wg sync.WaitGroup
res := make(chan StationMap)
for offset := int64(0); offset < fileSize; {
remainingSize := fileSize - offset
currentChunkSize := int64(chunkSize)
if remainingSize < currentChunkSize {
currentChunkSize = remainingSize
}
largeChunk := make([]byte, currentChunkSize+1024)
n, _ := reader.ReadAt(largeChunk, offset)
actualChunkSize := int64(n)
if idx := strings.LastIndexByte(string(largeChunk[:n]), '\n'); idx >= 0 {
actualChunkSize = int64(idx + 1)
}
chunk := largeChunk[:actualChunkSize]
offset += actualChunkSize
wg.Add(1)
go func(chunk []byte) {
defer wg.Done()
res <- processChunk(chunk)
}(chunk)
}
go func() {
wg.Wait()
close(res)
}()
stations := make(StationMap)
for re := range res {
for station, mes := range re {
s, ok := stations[station]
if !ok {
s = Station{}
}
s.total += mes.total
s.count += mes.count
s.min = math.Min(s.min, mes.min)
s.max = math.Max(s.max, mes.max)
stations[station] = s
}
}
return stations
}
func main() {
fp := "dataset.txt"
if len(os.Args) > 1 {
fp = os.Args[1]
}
chunkSize := 64 * 1024 * 1024
stations := readFile(fp, chunkSize)
f, _ := os.Create("output.txt")
defer f.Close()
f.WriteString("Station,Mean,Min,Max\n")
for station, s := range stations {
_, _ = f.WriteString(fmt.Sprintf("%s,%f,%f,%f\n", station, s.total/float64(s.count), s.min, s.max))
}
fmt.Println("Done")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment