Last active
December 17, 2019 08:08
-
-
Save alexanderkjeldaas/58b0cd894cf9f8397292657908046634 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// g++ -O3 -g -std=c++17 test.cc -lboost_filesystem -lboost_system -lpthread -lboost_iostreams -lboost_thread | |
#include <algorithm>
#include <bitset>
#include <cstdint>
#include <cstdlib>
#include <exception>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>
// #include <experimental/barrier>
#include <boost/thread/barrier.hpp>
#include <boost/iostreams/device/mapped_file.hpp>
#include <boost/filesystem.hpp>
#define BOOST_DISABLE_ASSERTS 1
#include <boost/assert.hpp>
using namespace std;
// One fixed-width record of the input file, as it is laid out on disk.
//
// main() reinterprets the memory-mapped file as an array of these, so the
// struct must stay exactly 8 bytes with no padding; the static_assert below
// turns an accidental layout change into a compile error.
struct entry {
    char chars[3];    // three letters; main() expects each in 'A'..'Z'
    char numbers[3];  // three digits; main() expects each in '0'..'9'
    char _cr;         // never read by the program; presumably the '\r' of a CRLF line ending
    char _nl;         // never read by the program; presumably the '\n' of a CRLF line ending
};
static_assert(sizeof(entry) == 8,
              "entry must match the 8-byte on-disk record layout");
// Runs `functor` on `nb_threads` threads, handing each thread one slice of
// every dataset, and returns once all threads have finished.
//
// nb_elements: the size of each dataset to be split.
// functor:     called once per thread as functor(range, thread), where
//              range[j] is the half-open interval [first, second) of dataset
//              j assigned to that thread and `thread` is its index in
//              [0, nb_threads).
// nb_threads:  number of threads to start; 0 is treated as 1.
//
// Every dataset is cut into chunks of ceil(size / nb_threads) elements.
// Threads whose chunk would start past the end of a dataset receive the empty
// interval [size, size), so first <= second <= size always holds.
static
void parallel_for(std::vector<std::uintmax_t> nb_elements,
                  std::function<void (const std::vector<std::pair<std::uintmax_t,std::uintmax_t>> range, int thread)> functor,
                  unsigned nb_threads)
{
    // A thread count of zero would divide by zero below and would never run
    // the functor at all; fall back to doing the work on a single thread.
    if (nb_threads == 0) {
        nb_threads = 1;
    }

    // Chunk length per dataset: ceil(total / nb_threads), written so that the
    // rounding cannot overflow for totals close to the maximum value.
    std::vector<std::uintmax_t> batch_size;
    batch_size.reserve(nb_elements.size());
    for (const std::uintmax_t total : nb_elements) {
        batch_size.push_back(total / nb_threads + (total % nb_threads != 0 ? 1 : 0));
    }

    std::vector<std::thread> my_threads;
    my_threads.reserve(nb_threads);

    // Parallel execution
    for (unsigned i = 0; i < nb_threads; ++i) {
        std::vector<std::pair<std::uintmax_t, std::uintmax_t>> range;
        range.reserve(nb_elements.size());
        for (std::size_t j = 0; j < nb_elements.size(); ++j) {
            const std::uintmax_t total = nb_elements[j];
            // Clamp the start as well as the end: with few elements and many
            // threads, i * batch can lie beyond the dataset.
            const std::uintmax_t first = std::min<std::uintmax_t>(i * batch_size[j], total);
            const std::uintmax_t last = first + std::min<std::uintmax_t>(batch_size[j], total - first);
            range.emplace_back(first, last);
        }
        my_threads.emplace_back(functor, std::move(range), static_cast<int>(i));
    }

    // Wait for all threads to finish their task
    for (std::thread &t : my_threads) {
        t.join();
    }
}
// Cap on the number of worker threads (presumably the intended limit for the
// thread count chosen in main()).
constexpr int max_threads = 8;

// Pointer to the first 10x10 plane of a [10][10][10] array of bitsets.
// Indexing p[d0][d1][d2] yields the bitset for one three-digit combination;
// each bitset holds one bit per three-letter combination (26^3 of them).
using thread_local_bitsets = std::bitset<26 * 26 * 26> (*)[10][10];
int main (int argc, const char *argv[]) | |
{ | |
BOOST_ASSERT(argc == 2); | |
const char* path = argv[1]; | |
size_t filesize = boost::filesystem::file_size(path); | |
boost::iostreams::mapped_file_params params; | |
params.path = path; | |
params.length = filesize; | |
params.offset = 0; | |
boost::iostreams::mapped_file_source mf; | |
mf.open(params); | |
struct entry *entries = static_cast<struct entry*>((void*)mf.data()); | |
BOOST_ASSERT(entries != NULL); | |
uintmax_t nb_elements = filesize / sizeof(struct entry); | |
BOOST_ASSERT(nb_elements > 0); | |
unsigned nb_threads = thread::hardware_concurrency(); | |
if (nb_threads == 0 || nb_threads > 8) { | |
nb_threads = 8; | |
} | |
cout << "Running " << nb_threads << " threads\n"; | |
boost::barrier task_barrier(nb_threads); | |
thread_local_bitsets partitions[nb_threads]; | |
parallel_for({nb_elements, 1000}, [&entries, &partitions, &task_barrier, &nb_threads](const vector<pair<uintmax_t, uintmax_t>> range, int tidx) mutable { | |
// Parallel init | |
{ | |
partitions[tidx] = new bitset<26*26*26>[10][10][10]; | |
} | |
task_barrier.count_down_and_wait(); | |
// Parallel parse and set thread-local bitsets. | |
{ | |
auto [start, end] = range[0]; | |
for(int i = start; i < end; ++i) { | |
auto [c0, c1, c2] = entries[i].chars; | |
BOOST_ASSERT(c0 >= 'A' && c0 <= 'Z'); | |
BOOST_ASSERT(c1 >= 'A' && c1 <= 'Z'); | |
BOOST_ASSERT(c2 >= 'A' && c2 <= 'Z'); | |
auto val = (c0 - 'A') * (26*26) + (c1 - 'A') * 26 + c2 - 'A'; | |
BOOST_ASSERT(val < 26*26*26); | |
auto [n0, n1, n2] = entries[i].numbers; | |
BOOST_ASSERT(n0 >= '0' && n0 <= '9'); | |
BOOST_ASSERT(n1 >= '0' && n1 <= '9'); | |
BOOST_ASSERT(n2 >= '0' && n2 <= '9'); | |
bitset<26*26*26> &bits = partitions[tidx][n0-'0'][n1-'0'][n2-'0']; | |
if (bits[val]) { | |
cout << "Found duplicate!\n" << flush; | |
_exit(1); | |
} | |
bits[val] = true; | |
} | |
} | |
task_barrier.count_down_and_wait(); | |
// Parallel join bitsets | |
{ | |
auto [start, end] = range[1]; | |
for (int a = 0; a < 10; a++) { | |
for (int b = 0; b < 10; b++) { | |
for (int c = 0; c < 10; c++) { | |
// Stupid partitioning. Could short-circuit the loops above instead. | |
int stupid = a*100 + b*10 + c; | |
if (stupid < start || stupid >= end) | |
continue; | |
// We're tasked to check if partitions[*][a][b][c] contains a duplicate. | |
bitset<26*26*26> bits; | |
bits.reset(); | |
for (int part = 0; part < nb_threads; part++) { | |
bits |= partitions[part][a][b][c]; | |
} | |
for (int part = 0; part < nb_threads; part++) { | |
bits ^= partitions[part][a][b][c]; | |
} | |
if (bits.any()) { | |
cout << "Found duplicate!\n" << flush; | |
_exit(1); | |
} | |
} | |
} | |
} | |
} | |
}, nb_threads); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment