-
-
Save corlinp/176a97c58099bca36bcd5679e68f9708 to your computer and use it in GitHub Desktop.
/* | |
Corlin Palmer's Go solution to the 1BRC coding challenge: https://github.com/gunnarmorling/1brc | |
- This solution reads the file sequentially as fast as possible (reader) | |
- It passes off the job of ensuring that each chunk ends with a complete line to another goroutine (lineSupervisor) | |
- The lineSupervisor sends valid chunks to a pool of worker goroutines (worker) which parse the data and calculate the results | |
- The results from the workers are collected in a map and then sorted before printing the final results | |
A fair amount of optimization has been done to reduce memory allocations. | |
Still, it's currently 2X faster than the best Java implementation (on my machine, a Macbook M3 Pro). | |
Script Time (s) | |
calculate_average_corlinp.sh 5.092 | |
calculate_average_warpspeedlabs.sh 8.183 (Go) | |
calculate_average_ddimtirov.sh 10.183 | |
calculate_average_ebarlas.sh 11.625 | |
calculate_average_royvanrijn.sh 11.970 | |
calculate_average_AlexanderYastrebov.sh 14.114 (Go) | |
calculate_average_filiphr.sh 15.038 | |
calculate_average_palmr.sh 15.359 | |
calculate_average_spullara.sh 16.350 | |
calculate_average_seijikun.sh 20.214 | |
calculate_average_padreati.sh 22.460 | |
calculate_average_richardstartin.sh 22.496 | |
calculate_average_bjhara.sh 23.166 | |
calculate_average_criccomini.sh 23.378 | |
calculate_average_truelive.sh 25.741 | |
calculate_average_khmarbaise.sh 41.079 | |
calculate_average_kuduwa-keshavram.sh 45.390 | |
calculate_average_itaske.sh 50.250 | |
calculate_average.sh 162.420 | |
For some fun comparisons, it's even faster than wc -l, but still slower than piping it to /dev/null: | |
cat measurements.txt > /dev/null 3.522 | |
wc -l measurements.txt 11.892 | |
*/ | |
package main | |
import ( | |
"bytes" | |
"fmt" | |
"io" | |
"os" | |
"sort" | |
"sync" | |
) | |
// reader reads raw chunks from the file and sends them to the rawChunks channel. | |
func reader(file *os.File, rawChunks chan<- []byte) { | |
const chunkSize = 256 * 1024 | |
buf := make([]byte, chunkSize) | |
for { | |
bytesRead, err := file.Read(buf) | |
if bytesRead > 0 { | |
chunk := make([]byte, bytesRead) | |
copy(chunk, buf[:bytesRead]) | |
rawChunks <- chunk | |
} | |
if err != nil { | |
if err != io.EOF { | |
fmt.Printf("Error reading file: %v\n", err) | |
} | |
break | |
} | |
} | |
close(rawChunks) | |
} | |
// processChunk processes a single chunk and returns the valid part and the leftover part. | |
func processChunk(chunk, leftover []byte) (validPart, newLeftover []byte) { | |
// Find the first and last newline to determine the valid part of the chunk | |
firstNewline := bytes.Index(chunk, []byte{'\n'}) | |
lastNewline := bytes.LastIndex(chunk, []byte{'\n'}) | |
if firstNewline != -1 { | |
// There's a complete line at the beginning of the chunk | |
validPart = append(leftover, chunk[:firstNewline+1]...) | |
leftover = leftover[:0] // Clear the leftover | |
} else { | |
// No complete line at the start, append the whole chunk to the leftover | |
leftover = append(leftover, chunk...) | |
} | |
if lastNewline != -1 && firstNewline != lastNewline { | |
// There's at least one complete line in this chunk | |
// Include the middle part of the chunk, which contains only complete lines | |
validPart = append(validPart, chunk[firstNewline+1:lastNewline+1]...) | |
// Store the dangling end (if any) in newLeftover for the next chunk | |
newLeftover = append(newLeftover, chunk[lastNewline+1:]...) | |
} else { | |
// No complete lines or only one complete line in this chunk | |
newLeftover = leftover | |
} | |
return validPart, newLeftover | |
} | |
// lineSupervisor takes raw chunks, ensures they end with complete lines, and sends valid chunks to the workers. | |
func lineSupervisor(rawChunks <-chan []byte, validChunks chan<- []byte) { | |
buffer := make([]byte, 0) // Buffer to hold the dangling ends | |
for chunk := range rawChunks { | |
validPart, newBuffer := processChunk(chunk, buffer) | |
if len(validPart) > 0 { | |
validChunks <- validPart | |
} | |
buffer = newBuffer | |
} | |
// Handle any data left in the buffer after all chunks have been processed | |
if len(buffer) > 0 { | |
validChunks <- buffer | |
} | |
close(validChunks) | |
} | |
type CityName [64]byte | |
// [0] = sum of all temperatures | |
// [1] = number of temperatures | |
// [2] = min temperature | |
// [3] = max temperature | |
type CityTemp *[4]int64 | |
// worker parses the lines in a chunk and performs some calculations | |
func worker(validChunks <-chan []byte, results chan<- map[CityName]CityTemp) { | |
cityTemps := make(map[CityName]CityTemp, 512) | |
var city CityName | |
cityLen := 0 | |
var temp int64 | |
var isNegative bool | |
for chunk := range validChunks { | |
for _, b := range chunk { | |
switch b { | |
case ';': // Delimiter between city and temperature | |
cityLen = 64 | |
isNegative = false | |
case '\n': | |
if isNegative { | |
temp = -temp | |
} | |
ct := cityTemps[city] | |
if ct == nil { | |
minMaxTemp := temp | |
if isNegative { | |
minMaxTemp = -minMaxTemp | |
} | |
cityTemps[city] = &[4]int64{temp, 1, minMaxTemp, minMaxTemp} | |
} else { | |
ct[0] += temp | |
ct[1]++ | |
if temp < ct[2] { | |
ct[2] = temp | |
} | |
if temp > ct[3] { | |
ct[3] = temp | |
} | |
} | |
// Reset for the next line | |
city = CityName{} | |
cityLen = 0 | |
temp = 0 | |
isNegative = false | |
case '.': | |
continue // Skip the decimal point, we know all numbers have 1 decimal place | |
case '-': | |
isNegative = true // Next number will be negative | |
default: | |
if cityLen < 44 { | |
city[cityLen] = b | |
cityLen++ | |
} else { | |
// Inline parsing of temperature | |
temp = temp*10 + int64(b-'0') | |
} | |
} | |
} | |
} | |
results <- cityTemps | |
} | |
func main() { | |
if len(os.Args) < 2 { | |
fmt.Println("Error: First arg should be the measurements.txt file path") | |
return | |
} | |
filePath := os.Args[1] | |
file, err := os.Open(filePath) | |
if err != nil { | |
fmt.Printf("Error opening file: %v\n", err) | |
return | |
} | |
defer file.Close() | |
numWorkers := 256 | |
rawChunks := make(chan []byte, 16384) // buffered channel for raw chunks from the file | |
validChunks := make(chan []byte, 16384) // buffered channel for valid chunks ending with a complete line | |
results := make(chan map[CityName]CityTemp, numWorkers) // buffered channel for results from the workers | |
// Start the reader | |
go reader(file, rawChunks) | |
// Start the line supervisor | |
go lineSupervisor(rawChunks, validChunks) | |
var wg sync.WaitGroup | |
// Start worker goroutines | |
for i := 0; i < numWorkers; i++ { | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
worker(validChunks, results) | |
}() | |
} | |
// Collect results | |
go func() { | |
wg.Wait() | |
close(results) | |
}() | |
finalResults := make(map[CityName]CityTemp) | |
for cityTemps := range results { | |
for city, ct := range cityTemps { | |
finalCt := finalResults[city] | |
if finalCt == nil { | |
finalCt = new([4]int64) | |
} | |
finalCt[0] += ct[0] | |
finalCt[1] += ct[1] | |
if ct[2] < finalCt[2] { | |
finalCt[2] = ct[2] | |
} | |
if ct[3] > finalCt[3] { | |
finalCt[3] = ct[3] | |
} | |
finalResults[city] = finalCt | |
} | |
} | |
// Sort results | |
allCities := make([]CityName, 0, len(finalResults)) | |
for city := range finalResults { | |
allCities = append(allCities, city) | |
} | |
sort.Slice(allCities, func(i, j int) bool { | |
return bytes.Compare(allCities[i][:], allCities[j][:]) < 0 | |
}) | |
// Calculate and print the final results | |
fmt.Print("{") | |
for i, city := range allCities { | |
ct := finalResults[city] | |
fmt.Printf("%s=%.1f/%.1f/%.1f", city[:], float64(ct[2])/10, float64(ct[0])/float64(ct[1])/10, float64(ct[3])/10) | |
if i < len(allCities)-1 { | |
fmt.Print(", ") | |
} | |
} | |
fmt.Println("}") | |
} |
Thanks for sharing! I was looking for an implementation that was fast AND importantly, still approachable. Yours perfectly fit the bill. Kudos!
Hey @corlinp! Not sure if you're as comfortable in Rust as in Go, but if by any chance you get to look at this port[1] of your implementation. I tried to make use of all the same "mechanisms" you used in yours but it's a bit slow, like 2 orders of magnitude slow. Would really love to know what makes your Go implementation tick. Thanks!
[1] https://github.com/arhyth/rubric/blob/main/src/main.rs
@arhyth honored that you wanted to replicate my approach! Unfortunately I'm not too familiar with Rust, so might be totally off base here, but you may want to take a look at using thread pools instead of so much async code? Good luck! :)
Hey @corlinp just wanted to say thanks for sharing your approach, learning a lot from the practial application of concurrency primatives here 🙌