Skip to content

Instantly share code, notes, and snippets.

@pr0g
Last active January 14, 2024 18:15
Show Gist options
  • Save pr0g/06a60fba90a18682522825a70f18b362 to your computer and use it in GitHub Desktop.
Save pr0g/06a60fba90a18682522825a70f18b362 to your computer and use it in GitHub Desktop.
#include <marl/defer.h>
#include <marl/event.h>
#include <marl/scheduler.h>
#include <marl/waitgroup.h>
#include <algorithm>
#include <charconv>
#include <fstream>
#include <iostream>
#include <map>
#include <span>
#include <string>
#include <string_view>
#include <utility>
#include <vector>
struct measurement_t {
float min;
float max;
double total;
double count;
};
static std::pair<std::string_view, std::string_view> split_by(
std::string_view string_to_split, std::string_view separator) {
std::pair<std::string_view, std::string_view> split_strs;
for (auto separator_position = string_to_split.find(separator);
separator_position != std::string::npos;
separator_position = string_to_split.find(separator)) {
auto split_str = string_to_split.substr(0, separator_position);
string_to_split = string_to_split.substr(
separator_position + separator.size(), string_to_split.size());
split_strs.first = split_str;
}
auto split_str = string_to_split.substr(0, string_to_split.size());
split_strs.second = split_str;
return split_strs;
}
static size_t preprocess_chunk(std::span<const char> buffer) {
std::string_view data(buffer.data(), buffer.size());
// find the last newline and return how many bytes are left
return data.size() - (data.find_last_of('\n') + 1);
}
static std::map<std::string, measurement_t, std::less<>> process_chunk(
std::span<const char> buffer) {
std::map<std::string, measurement_t, std::less<>> stations;
std::string_view data(buffer.data(), buffer.size());
for (auto next = data.find('\n'); next != std::string_view::npos;
next = data.find('\n')) {
constexpr std::string_view separator = ";";
const auto line = data.substr(0, next);
const auto station_temp = split_by(line, separator);
const auto temp_parts = split_by(station_temp.second, ".");
int32_t temp_whole;
std::from_chars(
temp_parts.first.data(),
temp_parts.first.data() + temp_parts.first.size(), temp_whole);
int32_t temp_fraction;
std::from_chars(
temp_parts.second.data(),
temp_parts.second.data() + temp_parts.second.size(), temp_fraction);
const float temperature = std::copysign(
float((std::abs(temp_whole) * 100) + (temp_fraction * 10)) / 100.0f,
(float)temp_whole);
if (auto it = stations.find(station_temp.first); it != stations.end()) {
it->second.min = std::min(it->second.min, temperature);
it->second.max = std::max(it->second.max, temperature);
it->second.total += temperature;
it->second.count++;
} else {
stations.insert(
{std::string(station_temp.first),
{temperature, temperature, temperature, 1.0}});
}
// move to next line
data.remove_prefix(next + 1);
}
return stations;
}
int main() {
std::ifstream file("measurements.txt", std::ios::binary);
if (!file) {
std::cerr << "File could not be opened!\n";
return 1;
}
marl::Scheduler scheduler(marl::Scheduler::Config::allCores());
scheduler.bind();
defer(scheduler.unbind());
std::vector<std::map<std::string, measurement_t, std::less<>>> stations;
constexpr size_t buffer_size = 1024 * 1024 * 4; // 4MB
std::vector<char> buffer(buffer_size);
size_t offset = 0;
std::map<std::string, measurement_t, std::less<>> all_stations;
marl::WaitGroup wait_group;
marl::mutex mutex;
while (file) {
file.read(buffer.data() + offset, buffer.size() - offset);
const std::streamsize bytes_read_count = file.gcount();
// resize buffer to actual data size
buffer.resize(bytes_read_count + offset);
wait_group.add(1);
offset = preprocess_chunk(buffer);
marl::schedule([buffer, &stations, &wait_group, &mutex] {
defer(wait_group.done());
auto station = process_chunk(buffer);
{
marl::lock lock(mutex);
stations.push_back(std::move(station));
}
});
std::copy(buffer.end() - offset, buffer.end(), buffer.begin());
// resize buffer back to original size for next read
buffer.resize(buffer_size);
}
wait_group.wait();
for (auto& station : stations) {
for (const auto& [s, m] : station) {
if (auto it = all_stations.find(s); it != all_stations.end()) {
it->second.min = std::min(it->second.min, m.min);
it->second.max = std::max(it->second.max, m.max);
it->second.total += m.total;
it->second.count += m.count;
} else {
all_stations.insert({s, m});
}
}
}
const auto rounded = [](const double value) {
return std::copysign(std::round(std::abs(value) * 10) / 10, value);
};
const auto output_element =
[&rounded](std::map<std::string, measurement_t, std::less<>>::iterator it) {
const auto& [station, measurement] = *it;
std::cout << station << '=' << measurement.min << '/'
<< rounded(measurement.total / measurement.count) << '/'
<< measurement.max;
};
std::cout << std::fixed;
std::cout << std::setprecision(1);
std::cout << "{";
auto it = all_stations.begin();
for (; it != std::prev(all_stations.end()); ++it) {
output_element(it);
std::cout << ", ";
}
output_element(it);
std::cout << "}\n";
return 0;
}
cmake_minimum_required(VERSION 3.26)
project(1brc LANGUAGES CXX)
include(FetchContent)
FetchContent_Declare(
marl
GIT_REPOSITORY https://github.com/google/marl.git
GIT_TAG dbf097e43824d4b4ba45d5b696d86e147a4b9b00
)
FetchContent_MakeAvailable(marl)
add_executable(${PROJECT_NAME})
target_sources(${PROJECT_NAME} PRIVATE main.cpp)
target_link_libraries(${PROJECT_NAME} PRIVATE marl)
target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_20)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment