1brc-go-sol
package main

import (
	"bytes"
	"fmt"
	"math"
	"os"
	"os/signal"
	"runtime"
	"runtime/pprof"
	"sort"
	"sync"
	"syscall"
	"unsafe"
)

const TABLE_SIZE = 98317

//go:linkname mmap syscall.mmap
func mmap(addr uintptr, length uintptr, prot int, flags int, fd int, offset int64) (raddr uintptr, err error)

//go:linkname munmap syscall.munmap
func munmap(addr uintptr, length uintptr) (err error)

type FullData struct {
	name string
	data *Data
}

type Data struct {
	Min   float64
	Max   float64
	Sum   float64
	Count int64
}

type _Map struct {
	slots [TABLE_SIZE]*_MapData
}

type _MapData struct {
	key   []byte
	value *Data
	hash  uint32
}

func NewMap() *_Map {
	return &_Map{
		slots: [TABLE_SIZE]*_MapData{},
	}
}
func (mp *_Map) Add(k []byte, v *Data) {
	hash := mp.shash(k)
	slotIdx := hash % TABLE_SIZE
	prober := slotIdx
	for {
		slot := mp.slots[prober]
		if slot == nil {
			// Insert into the probed slot (not the home slot), otherwise a
			// colliding key would overwrite an existing entry.
			mp.slots[prober] = &_MapData{
				key:   k,
				value: v,
				hash:  hash,
			}
			return
		}
		if slot.hash == hash && bytes.Equal(slot.key, k) {
			slot.value = v
			return
		}
		prober = (prober + 1) % TABLE_SIZE
		if prober == slotIdx {
			break
		}
	}
	panic("hash table is full")
}
func (mp *_Map) Get(k []byte) *Data {
	hash := mp.shash(k)
	slotIdx := hash % TABLE_SIZE
	prober := slotIdx
	for {
		slot := mp.slots[prober]
		if slot == nil {
			return nil
		}
		if slot.hash == hash && bytes.Equal(slot.key, k) {
			return slot.value
		}
		prober = (prober + 1) % TABLE_SIZE
		if prober == slotIdx {
			break
		}
	}
	return nil
}

func (mp *_Map) Range(fn func(k []byte, v *Data)) {
	for _, slot := range mp.slots {
		if slot == nil {
			continue
		}
		fn(slot.key, slot.value)
	}
}
// shash is the 32-bit FNV-1a hash of k.
func (mp *_Map) shash(k []byte) uint32 {
	v := uint32(2166136261)
	for i := 0; i < len(k); i++ {
		v ^= uint32(k[i])
		v *= 16777619
	}
	return v
}
func clusteredProcess(file string, mp *_Map) {
	wg := &sync.WaitGroup{}

	cpus := runtime.NumCPU()
	stores := make([]*_Map, cpus)
	for i := range stores {
		stores[i] = NewMap()
	}

	stat, err := os.Stat(file)
	if err != nil {
		panic(err)
	}

	dataSize := int(stat.Size())
	sizePerCPU := dataSize / cpus
	remains := dataSize % cpus

	for i := 0; i < cpus; i++ {
		wg.Add(1)
		go func(i int) {
			size := sizePerCPU
			if i == cpus-1 {
				size += remains
			}

			data := loadFile(file)
			consumeChunk(data, i*sizePerCPU, size, stores[i])
			wg.Done()
		}(i)
	}
	wg.Wait()

	// Merge the per-goroutine results into the final map
	for i := 0; i < cpus; i++ {
		stores[i].Range(func(k []byte, v *Data) {
			data := mp.Get(k)
			if data == nil {
				mp.Add(k, &Data{
					Min:   v.Min,
					Max:   v.Max,
					Sum:   v.Sum,
					Count: v.Count,
				})
				return
			}

			if v.Min < data.Min {
				data.Min = v.Min
			}
			if v.Max > data.Max {
				data.Max = v.Max
			}
			data.Sum += v.Sum
			data.Count += v.Count
		})
	}
}
func consumeChunk(data []byte, chunkOffset, size int, mp *_Map) {
	// Lines that start before this offset belong to this chunk, even if they
	// run past it; lines that start at or after it belong to the next chunk.
	limit := chunkOffset + size

	// 1. Find the start point: skip forward to the first line boundary, since
	// the previous chunk owns any partial line at the front of this one.
	start := chunkOffset
	if chunkOffset != 0 {
		for data[start-1] != '\n' {
			start++
		}
	}

	// 2. Parse the data line by line.
	readptr := start
	for readptr < limit {
		lineStart, lineEnd := readNewLine(data, readptr)
		if lineStart == -1 {
			break
		}
		readptr = lineEnd + 1

		// Process the line without its trailing '\n' (the input is assumed to
		// use Unix line endings).
		_process(data[lineStart:lineEnd], mp)
	}
}
// loadFile memory-maps the file read-only and returns its contents as a byte
// slice backed by the mapping.
func loadFile(name string) []byte {
	file, err := os.Open(name)
	if err != nil {
		panic(err)
	}

	stat, err := file.Stat()
	if err != nil {
		panic(err)
	}

	addr, err := mmap(0, uintptr(stat.Size()), syscall.PROT_READ, syscall.MAP_PRIVATE, int(file.Fd()), 0)
	if err != nil {
		panic(err)
	}

	return unsafe.Slice((*byte)(unsafe.Pointer(addr)), stat.Size())
}

// readNewLine returns the offset it was given and the index of the next '\n'
// at or after it, or (-1, -1) if no newline remains.
func readNewLine(byt []byte, offset int) (int, int) {
	for i := offset; i < len(byt); i++ {
		if byt[i] == '\n' {
			return offset, i
		}
	}
	return -1, -1
}
func _process(byt []byte, mp *_Map) {
	delimLoc := 0
	for i := range byt {
		if byt[i] == ';' {
			delimLoc = i
		}
	}

	place := byt[:delimLoc]
	val := byteToFloat(byt[delimLoc+1:])

	data := mp.Get(place)
	if data == nil {
		mp.Add(place, &Data{
			Min:   val,
			Max:   val,
			Sum:   val,
			Count: 1,
		})
	} else {
		if val < data.Min {
			data.Min = val
		}
		if val > data.Max {
			data.Max = val
		}
		data.Sum += val
		data.Count += 1
	}
}
// byteToFloat parses a decimal such as "-12.3" without going through
// strconv.ParseFloat.
func byteToFloat(byt []byte) float64 {
	isNeg := byt[0] == '-'
	decPos := -1
	num := 0.0

	for i, val := range byt {
		if i == 0 && isNeg {
			continue
		}
		if val == '.' {
			decPos = i
			continue
		}

		digit := val - '0'
		num = (num * 10) + float64(digit)
	}

	if decPos != -1 {
		num /= math.Pow10((len(byt) - 1) - decPos)
	}

	if isNeg {
		return num * -1
	}
	return num
}
func getSortedStore(store *_Map) []*FullData {
	data := make([]*FullData, 0, TABLE_SIZE)
	store.Range(func(key []byte, value *Data) {
		data = append(data, &FullData{
			name: string(key),
			data: value,
		})
	})

	sort.Slice(data, func(i, j int) bool {
		return data[i].name < data[j].name
	})
	return data
}

func printSortedStore(sortedstore []*FullData) {
	fmt.Print("{ ")
	for i, v := range sortedstore {
		fmt.Printf("%s=%.1f/%.1f/%.1f", v.name, v.data.Min, v.data.Sum/float64(v.data.Count), v.data.Max)
		if i != len(sortedstore)-1 {
			fmt.Printf(", ")
		}
	}
	fmt.Print(" }")
}

func setupCPUProfile() {
	if os.Getenv("CPU_PROFILE") == "1" {
		f, err := os.Create("cpu.pprof")
		if err != nil {
			panic(err)
		}
		if err := pprof.StartCPUProfile(f); err != nil {
			panic(err)
		}
	}
}

func stopCPUProfile() {
	if os.Getenv("CPU_PROFILE") == "1" {
		pprof.StopCPUProfile()
	}
}

func exitHooks(fns ...func()) {
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	go func() {
		<-c
		for _, fn := range fns {
			fn()
		}
		os.Exit(1)
	}()
}

func main() {
	setupCPUProfile()
	exitHooks(
		stopCPUProfile,
	)

	mp := NewMap()
	clusteredProcess("./measurements.txt", mp)
	printSortedStore(getSortedStore(mp))

	stopCPUProfile()
}
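Not part of the original gist: a minimal sanity-check sketch, assuming the code above builds as package main on Linux (it links against syscall.mmap via go:linkname). The hypothetical main_test.go below exercises byteToFloat and the open-addressing _Map with go test; building the package and running the resulting binary with CPU_PROFILE=1 set writes cpu.pprof, as wired up in setupCPUProfile.

// main_test.go (hypothetical): quick checks for the custom parser and map.
package main

import (
	"math"
	"testing"
)

func TestByteToFloat(t *testing.T) {
	cases := map[string]float64{
		"12.3":  12.3,
		"-4.5":  -4.5,
		"0.0":   0.0,
		"-99.9": -99.9,
	}
	for in, want := range cases {
		if got := byteToFloat([]byte(in)); math.Abs(got-want) > 1e-9 {
			t.Errorf("byteToFloat(%q) = %v, want %v", in, got, want)
		}
	}
}

func TestMapAddGet(t *testing.T) {
	mp := NewMap()
	mp.Add([]byte("Hamburg"), &Data{Min: 1.0, Max: 2.0, Sum: 3.0, Count: 2})

	if d := mp.Get([]byte("Hamburg")); d == nil || d.Count != 2 {
		t.Fatalf("expected the stored entry for Hamburg, got %+v", d)
	}
	if d := mp.Get([]byte("Oslo")); d != nil {
		t.Fatalf("expected nil for a missing key, got %+v", d)
	}
}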