Last active
July 31, 2024 19:01
-
-
Save fabian57/b00dec2c181a39d4648cc8be149990b9 to your computer and use it in GitHub Desktop.
My implementation of the One Billion Row Challenge (1BRC) in Rust.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Run with target-cpu="native" | |
// Commit history at https://git.plobos.xyz/Playground/1brc/src/branch/main/src/main/rust | |
use std::{ | |
fs::File, | |
io::BufReader, | |
thread, | |
}; | |
use std::collections::HashMap; | |
use std::fmt::Display; | |
use std::io::{BufRead, Seek, SeekFrom}; | |
use std::sync::mpsc; | |
use std::time::Instant; | |
/// Initial capacity of every per-thread station map and of the merged map.
/// NOTE(review): presumably chosen to match the maximum number of distinct
/// station names the challenge allows, so the maps never rehash — confirm.
const DEFAULT_HASHMAP_LENGTH: usize = 10_000;
fn main() { | |
const FILE_PATH: &str = "measurements.txt"; | |
let now = Instant::now(); | |
thread::scope(|s| { | |
let mut stations: HashMap<String, StationMeasurements> = HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); | |
let (tx, rx) = mpsc::channel(); | |
let cores = thread::available_parallelism().unwrap().into(); | |
let file = File::open(FILE_PATH).expect("File measurements.txt not found"); | |
let mut reader = BufReader::new(&file); | |
let file_length = reader.seek(SeekFrom::End(0)).unwrap(); | |
let chunk_length = file_length as usize / cores; | |
let mut bounds = Vec::with_capacity(cores + 1); | |
bounds.push(0); | |
for i in 1..cores { | |
let mut reader = BufReader::new(&file); | |
let mut byte_start = chunk_length * i; | |
reader.seek(SeekFrom::Start(byte_start as u64)).expect("could not seek"); | |
let mut line = Vec::with_capacity(108); | |
let line_len = reader.read_until(b'\n', &mut line).expect("could not read bytes"); | |
byte_start += line_len; | |
bounds.push(byte_start as u64); | |
} | |
bounds.push(file_length); | |
for i in 0..cores { | |
let tx = tx.clone(); | |
let mut currposition = *bounds.get(i).unwrap(); | |
let end = *bounds.get(i+1).unwrap(); | |
s.spawn(move || { | |
let file = File::open(FILE_PATH).expect("File measurements.txt not found"); | |
let mut reader = BufReader::new(&file); | |
reader.seek(SeekFrom::Start(currposition)).unwrap(); | |
let mut t_stations: HashMap<String, StationMeasurements> = | |
HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); | |
let mut line = Vec::with_capacity(108); | |
loop { | |
let line_len = reader.read_until(b'\n', &mut line).expect("could not read bytes"); | |
if line_len == 0 { | |
break; | |
} | |
let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); | |
let station = unsafe { String::from_utf8_unchecked(Vec::from(station)) }; | |
let temp = parse_temp(temp.split_last().unwrap().1); | |
let measurements_option = t_stations.get_mut(&station); | |
if let Some(measurements) = measurements_option { | |
measurements.update(temp); | |
} else { | |
let measurements = StationMeasurements { | |
min: temp, | |
max: temp, | |
count: 1, | |
sum: temp, | |
}; | |
t_stations.insert(station, measurements); | |
} | |
currposition += line.len() as u64; | |
if currposition >= end { | |
break; | |
} | |
line.clear(); | |
} | |
let _ = tx.send(t_stations); | |
}); | |
} | |
drop(tx); | |
while let Ok(t_stations) = rx.recv() { | |
for (station, measurements) in t_stations.iter() { | |
let joined_measurements_options = stations.get_mut(station.as_str()); | |
if let Some(joined_measurements) = joined_measurements_options { | |
joined_measurements.merge(measurements); | |
} else { | |
stations.insert(station.to_owned(), *measurements); | |
} | |
} | |
} | |
let mut stations: Vec<String> = stations.iter().map(|(station, measurements)| { | |
let measurements = measurements.to_string(); | |
#[cfg(feature = "json")] | |
{ | |
format!("{{\"{station}\":\"{measurements}\"}}") | |
} | |
#[cfg(not(feature = "json"))] | |
{ | |
format!("{station}={measurements}") | |
} | |
}).collect(); | |
stations.sort(); | |
let stations = stations.join(","); | |
#[cfg(feature = "json")] | |
{ | |
println!("\n\n[{stations}]"); | |
} | |
#[cfg(not(feature = "json"))] | |
{ | |
println!("\n\n{{{stations}}}"); | |
} | |
println!("\n\nTime={} ms", now.elapsed().as_millis()); | |
}); | |
} | |
/// Running temperature statistics for a single station.
///
/// Temperatures are stored as integer tenths of a degree (`12.3` is `123`),
/// so aggregation stays in exact integer arithmetic.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct StationMeasurements {
    /// Lowest reading seen, in tenths of a degree.
    pub min: isize,
    /// Highest reading seen, in tenths of a degree.
    pub max: isize,
    /// Number of readings folded in; must be positive before formatting.
    pub count: isize,
    /// Sum of all readings, in tenths of a degree.
    pub sum: isize,
}

impl StationMeasurements {
    /// Folds one more reading `v` (tenths of a degree) into the statistics.
    pub fn update(&mut self, v: isize) {
        self.min = self.min.min(v);
        self.max = self.max.max(v);
        self.count += 1;
        self.sum += v;
    }

    /// Combines `other` into `self`, as if every reading behind `other` had
    /// been passed to [`update`](Self::update).
    pub fn merge(&mut self, other: &Self) {
        self.min = self.min.min(other.min);
        self.max = self.max.max(other.max);
        self.count += other.count;
        self.sum += other.sum;
    }
}

impl Display for StationMeasurements {
    /// Writes `min/mean/max` in degrees, each with exactly one fractional digit
    /// (`1.0`, not `1`).
    ///
    /// The mean is rounded half-up to the nearest tenth, so ties go toward
    /// positive infinity: `2.5` becomes `3` and `-2.5` becomes `-2`.
    /// NOTE(review): intended to match the challenge's reference rounding —
    /// confirm against its expected outputs.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        /// Writes an integer number of tenths as a decimal with one fractional
        /// digit, e.g. `-5` -> `-0.5`, `0` -> `0.0`, `123` -> `12.3`.
        fn write_tenths(f: &mut std::fmt::Formatter<'_>, tenths: isize) -> std::fmt::Result {
            let sign = if tenths < 0 { "-" } else { "" };
            let magnitude = tenths.unsigned_abs();
            write!(f, "{sign}{}.{}", magnitude / 10, magnitude % 10)
        }

        // floor(sum / count + 1/2), computed exactly. This avoids float
        // tie-to-even formatting and can never produce "-0.0".
        let mean = (2 * self.sum + self.count).div_euclid(2 * self.count);
        write_tenths(f, self.min)?;
        f.write_str("/")?;
        write_tenths(f, mean)?;
        f.write_str("/")?;
        write_tenths(f, self.max)
    }
}
/// Converts an ASCII digit byte to its numeric value.
///
/// The subtraction wraps, so a byte below `b'0'` yields a huge value instead
/// of panicking. Callers are expected to pass only `b'0'..=b'9'`.
#[inline]
const fn get_digit(b: u8) -> u32 {
    const ASCII_ZERO: u32 = b'0' as u32;
    (b as u32).wrapping_sub(ASCII_ZERO)
}

/// Parses a temperature such as `-12.3` or `4.5` into tenths of a degree
/// (`-123`, `45`).
///
/// Accepts an optional leading `-`, then one or two integer digits, one
/// separator byte (skipped, not checked) and exactly one fractional digit.
///
/// # Panics
///
/// Panics if `bytes` is empty or its length matches none of those layouts.
#[inline]
fn parse_temp(bytes: &[u8]) -> isize {
    let is_negative = bytes[0] == b'-';
    // Once the sign is removed, both signs share the same digit layout.
    let unsigned = if is_negative { &bytes[1..] } else { bytes };
    let tenths: u32 = match unsigned {
        &[ones, _, frac] => get_digit(ones) * 10 + get_digit(frac),
        &[tens, ones, _, frac] => {
            get_digit(tens) * 100 + get_digit(ones) * 10 + get_digit(frac)
        }
        _ => panic!("could not parse temp: is_negative = {is_negative}, length = {}", bytes.len()),
    };
    let magnitude = tenths as isize;
    if is_negative {
        -magnitude
    } else {
        magnitude
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment