Skip to content

Instantly share code, notes, and snippets.

@ivanstepanovftw
Created April 24, 2018 19:46
Show Gist options
  • Save ivanstepanovftw/d7afd70a29ac3ec649ce6387fb73bce5 to your computer and use it in GitHub Desktop.
Save ivanstepanovftw/d7afd70a29ac3ec649ce6387fb73bce5 to your computer and use it in GitHub Desktop.
# Build script for the threaded_handler I/O benchmark (single source: main.cc).
cmake_minimum_required(VERSION 3.9)
project(threaded_handler)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

# Fail at configure time when a dependency is missing instead of at link time.
find_package(Threads REQUIRED)
find_package(Boost REQUIRED COMPONENTS iostreams chrono thread system)

add_executable(threaded_handler
        main.cc
        )

# Imported targets carry their own include directories and link flags, so no
# directory-wide include_directories()/link_directories() or raw -l flags are
# needed.
target_link_libraries(threaded_handler
        Threads::Threads
        Boost::iostreams
        Boost::chrono
        Boost::thread
        Boost::system
        )
/* Based on: https://github.com/lemire/Code-used-on-Daniel-Lemire-s-blog/blob/master/2012/06/26/ioaccess.cpp
* Extended with a bad example of using stride: https://jvns.ca/blog/2014/05/12/computers-are-fast/
*/
// Build-time switches:
//   USE_STRIDE - selects the strided variant of compute() (see #ifndef USE_STRIDE).
//   SKIP       - compiles out the tests guarded by #ifndef SKIP in main().
#define USE_STRIDE
#define SKIP

// CHECK(x): if `x` evaluates to false, print "file:line: failure at: <x>" to
// stderr and terminate the process immediately with exit status 1.
// Wrapped in do { } while(0) so that `CHECK(x);` is exactly one statement and
// can be used safely in an unbraced if/else.
#define CHECK(x) do { if(!(x)) { \
fprintf(stderr, "%s:%i: failure at: %s\n", __FILE__, __LINE__, #x); \
_exit(1); } } while(0)
// C / POSIX system headers
#include <sys/param.h>
#include <sys/types.h>
#include <sys/mman.h>
#include <sys/resource.h>
#include <sys/stat.h>
#include <sys/uio.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
// C++ standard library
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <fstream>
#include <functional>
#include <iomanip>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
// Third-party
#include <zconf.h>
#define BOOST_BIND_NO_PLACEHOLDERS
#include <boost/iostreams/device/mapped_file.hpp>
#include <boost/chrono.hpp>
#include <boost/thread.hpp>
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/scoped_ptr.hpp>
// Element type stored in the data file, and the type of the accumulated sum.
using TYPE = size_t;
using ANSWER_TYPE = size_t;

// Scratch file that main() generates, reads back and removes.
const char *filename = "/tmp/DELME.BIN";

// Upper bound of one region, in bytes and in TYPE elements.
constexpr size_t max_region_size = 4 * 1024 * 1024; // 4 MiB
constexpr size_t max_region_length = max_region_size / sizeof(TYPE);

constexpr size_t measures_overall = 1; // outer repetitions in main(); the file is regenerated each time
constexpr size_t measures_each = 1;    // calls of one test inside a single timing window
constexpr size_t regions_count = 100;  // number of regions main() asks generate_file() to write
constexpr size_t stride_size = 2;      // step used by the strided compute()
void generate_file(const char *file_path, const size_t regions_count, const size_t region_length)
{
using namespace std;
clog<<"file_path: "<<file_path<<", regions_count: "<<regions_count<<", region_length: "<<region_length<<endl;
FILE *fp = fopen(file_path, "wb+");
CHECK(fp != NULL);
TYPE buffer[region_length];
for(size_t i = 0; i < region_length; i++)
buffer[i] = static_cast<TYPE>(i);
for(size_t r = 0; r < regions_count; r++) {
srand(unsigned(time(NULL)));
size_t rnd = region_length - (rand() % 10 + 1);
CHECK(fwrite((void *)&rnd, sizeof(size_t), 1, fp) == 1);
CHECK(fwrite((void *)buffer, sizeof(TYPE), rnd, fp) == rnd);
}
fflush(fp);
fclose(fp);
}
#ifndef USE_STRIDE
/// Returns the sum of buffer[0 .. region_length), visiting the elements in
/// ascending order.
ANSWER_TYPE compute(TYPE *buffer, size_t region_length)
{
    ANSWER_TYPE total = 0;
    TYPE *const last = buffer + region_length;
    for(TYPE *cursor = buffer; cursor != last; ++cursor)
        total += *cursor;
    return total;
}
#else
/// Returns the sum of buffer[0 .. region_length), visiting the elements in
/// `stride_size` interleaved passes: pass k reads indices k, k + stride_size,
/// k + 2 * stride_size, ... (the original note describes this as the
/// "stride with cache misses" variant).
ANSWER_TYPE compute(TYPE *buffer, size_t region_length)
{
    ANSWER_TYPE total = 0;
    size_t phase = 0;
    while (phase < stride_size) {
        size_t index = phase;
        while (index < region_length) {
            total += buffer[index];
            index += stride_size;
        }
        ++phase;
    }
    return total;
}
#endif
/// Prints one report line to std::cout:
/// "<name padded to 28 columns>done in: <seconds> sec, result: <value>."
/// The stream manipulators used here (left, fixed, precision 8, showbase, fill)
/// stay set on std::cout after the call.
void log_result(const char *function_name, double done_in, ANSWER_TYPE result)
{
    std::ostream &out = std::cout;

    // Test label, left-aligned and space-padded to a fixed width.
    out << std::left << std::setw(28) << std::setfill(' ') << function_name;

    // Elapsed time in fixed notation with 8 fractional digits.
    out << "done in: " << std::fixed << std::setprecision(8) << done_in;

    // Accumulated checksum; std::endl also flushes the stream.
    out << " sec, result: " << std::showbase << result << "." << std::endl;
}
/// Reads `regions_count` records from `file_path` with stdio (fread) and
/// returns the sum of compute() over every region.
///
/// @param file_path      file produced by generate_file().
/// @param regions_count  number of records to read.
/// @param vbuf           when non-zero, size in bytes of the fully buffered
///                       stdio buffer requested through setvbuf().
///
/// Any I/O failure, or a record longer than max_region_length, terminates the
/// process through CHECK.
ANSWER_TYPE test_fread(const char *file_path, size_t regions_count, size_t vbuf = 0)
{
    FILE *fp = fopen(file_path, "rb");
    CHECK(fp != NULL);
    if (vbuf != 0) {
        CHECK(setvbuf(fp, NULL, _IOFBF, vbuf) == 0); // large buffer
    }

    // Uninitialised heap buffer, released automatically on return.
    std::unique_ptr<TYPE[]> buffer(new TYPE[max_region_length]);

    ANSWER_TYPE answer = 0;
    for(size_t r = 0; r < regions_count; r++) {
        size_t region_length = 0;
        CHECK(fread(&region_length, sizeof(size_t), 1, fp) == 1);
        // The length comes from the file: never read more than the buffer holds.
        CHECK(region_length <= max_region_length);
        CHECK(fread(buffer.get(), sizeof(TYPE), region_length, fp) == region_length);
        answer += compute(buffer.get(), region_length);
    }

    fclose(fp);
    return answer;
}
/// Reads exactly `size` bytes from `fd` into `dst`, continuing after short
/// reads and retrying when read() is interrupted (EINTR).
/// Returns false on any other error or when end-of-file arrives first.
static bool read_exact(int fd, void *dst, size_t size)
{
    char *out = static_cast<char *>(dst);
    while (size > 0) {
        const ssize_t got = read(fd, out, size);
        if (got < 0) {
            if (errno == EINTR)
                continue;
            return false;
        }
        if (got == 0)
            return false; // end of file before `size` bytes were available
        out += got;
        size -= static_cast<size_t>(got);
    }
    return true;
}

/// Reads `regions_count` records from `file_path` with POSIX read() and
/// returns the sum of compute() over every region.
///
/// Any I/O failure, or a record longer than max_region_length, terminates the
/// process through CHECK.
ANSWER_TYPE test_read(const char *file_path, size_t regions_count)
{
    const int fd = open(file_path, O_RDONLY);
    CHECK(fd != -1);

    // Uninitialised heap buffer, released automatically on return.
    std::unique_ptr<TYPE[]> buffer(new TYPE[max_region_length]);

    ANSWER_TYPE answer = 0;
    for(size_t r = 0; r < regions_count; r++) {
        size_t region_length = 0;
        CHECK(read_exact(fd, &region_length, sizeof(size_t)));
        // The length comes from the file: never read more than the buffer holds.
        CHECK(region_length <= max_region_length);
        CHECK(read_exact(fd, buffer.get(), region_length * sizeof(TYPE)));
        answer += compute(buffer.get(), region_length);
    }

    close(fd);
    return answer;
}
/// Maps the whole of `file_path` read-only with mmap() and returns the sum of
/// compute() over every record found in the mapping.
///
/// @param file_path      file produced by generate_file().
/// @param regions_count  not used: the walk is bounded by the file size.
/// @param advise         when true, issue MADV_SEQUENTIAL and MADV_WILLNEED.
/// @param shared         MAP_SHARED when true, MAP_PRIVATE otherwise.
///
/// Any system-call failure, or a record that does not fit inside the mapping,
/// terminates the process through CHECK.
ANSWER_TYPE test_mmap(const char *file_path, size_t regions_count, bool advise, bool shared)
{
    (void)regions_count;

    const int fd = open(file_path, O_RDONLY);
    CHECK(fd != -1);
    struct stat s{};
    CHECK(fstat(fd, &s) == 0);
    const size_t file_size = static_cast<size_t>(s.st_size);

    int flags = MAP_FILE | (shared ? MAP_SHARED : MAP_PRIVATE);
#ifdef __linux__
    flags |= MAP_POPULATE;
#endif
    void *const mapping = mmap(NULL, file_size, PROT_READ, flags, fd, 0);
    CHECK(mapping != MAP_FAILED);

    if (advise) {
        // The advice argument is a single value, not a bit mask: OR-ing the two
        // constants yields a different advice value (on Linux 2 | 3 == 3, i.e.
        // only MADV_WILLNEED), so each hint needs its own call.
        CHECK(madvise(mapping, file_size, MADV_SEQUENTIAL) == 0);
        CHECK(madvise(mapping, file_size, MADV_WILLNEED) == 0);
    }
    close(fd);

    ANSWER_TYPE answer = 0;
    char *cursor = static_cast<char *>(mapping);
    char *const map_end = cursor + file_size; // [cursor, map_end)
    while (cursor < map_end) {
        // A record header must lie entirely inside the mapping.
        CHECK(static_cast<size_t>(map_end - cursor) >= sizeof(size_t));
        size_t region_length;
        memcpy(&region_length, cursor, sizeof(size_t));
        cursor += sizeof(size_t);
        // ... and so must its payload.
        CHECK(region_length <= static_cast<size_t>(map_end - cursor) / sizeof(TYPE));
        answer += compute(reinterpret_cast<TYPE *>(cursor), region_length);
        cursor += region_length * sizeof(TYPE);
    }

    munmap(mapping, file_size);
    return answer;
}
/// Reads `regions_count` records from `file_path` through std::ifstream and
/// returns the sum of compute() over every region.
///
/// A failed or short read, or a record longer than max_region_length,
/// terminates the process through CHECK.
ANSWER_TYPE test_ifstream(const char *file_path, size_t regions_count)
{
    std::ifstream in(file_path, std::ios::binary);
    CHECK(in.is_open());
    CHECK(in.good()); // good() already implies !fail(), !bad() and !eof()

    // Uninitialised heap buffer, released automatically on return.
    std::unique_ptr<TYPE[]> buffer(new TYPE[max_region_length]);

    ANSWER_TYPE answer = 0;
    for(size_t r = 0; r < regions_count; r++) {
        size_t region_length = 0;
        in.read(reinterpret_cast<char *>(&region_length), sizeof(size_t));
        CHECK(!in.fail());
        // The length comes from the file: never read more than the buffer holds.
        CHECK(region_length <= max_region_length);
        // The payload consists of TYPE elements, so size it with sizeof(TYPE).
        in.read(reinterpret_cast<char *>(buffer.get()),
                static_cast<std::streamsize>(region_length * sizeof(TYPE)));
        CHECK(!in.fail());
        answer += compute(buffer.get(), region_length);
    }
    return answer; // the stream is closed by its destructor
}
/// Maps `file_path` with boost::iostreams::mapped_file and returns the sum of
/// compute() applied in place to every record of the mapping.
/// `regions_count` is not used: the walk stops at the end of the mapping.
ANSWER_TYPE test_mapped_file(const char *file_path, size_t regions_count)
{
    boost::iostreams::mapped_file mf(file_path);
    CHECK(mf.is_open());

    char *const first = mf.data();
    char *const last = first + size_t(mf.size()); // [first, last)

    ANSWER_TYPE total = 0;
    for(char *cursor = first; cursor < last; ) {
        // Record header: element count stored as a size_t.
        size_t count;
        memcpy(&count, cursor, sizeof(size_t));

        // The payload starts right after the header and is summed in place.
        TYPE *const values = reinterpret_cast<TYPE *>(cursor + sizeof(size_t));
        total += compute(values, count);

        // Step over the payload to the next header.
        cursor = reinterpret_cast<char *>(values + count);
    }
    return total;
}
namespace with_copy
{
/// Maps `file_path` with boost::iostreams::mapped_file, copies each record's
/// payload into a heap buffer and returns the sum of compute() over the copies.
/// `regions_count` is not used: the walk stops at the end of the mapping.
///
/// A record that is longer than max_region_length or that does not fit inside
/// the mapping terminates the process through CHECK.
ANSWER_TYPE test_mapped_file(const char *file_path, size_t regions_count)
{
    (void)regions_count;
    namespace bio = boost::iostreams;

    bio::mapped_file mf(file_path);
    CHECK(mf.is_open());

    // Uninitialised heap buffer, released automatically on return.
    std::unique_ptr<TYPE[]> buffer(new TYPE[max_region_length]);

    char *map = mf.data();
    char *const map_end = map + size_t(mf.size()); // [map, map_end)

    ANSWER_TYPE answer = 0;
    while (map < map_end) {
        // A record header must lie entirely inside the mapping.
        CHECK(size_t(map_end - map) >= sizeof(size_t));
        size_t region_length;
        memcpy(&region_length, map, sizeof(size_t));
        map += sizeof(size_t);

        // The length comes from the file: it must fit both the copy buffer
        // and the remaining part of the mapping.
        CHECK(region_length <= max_region_length);
        const size_t region_size = region_length*sizeof(TYPE);
        CHECK(region_size <= size_t(map_end - map));

        memcpy(buffer.get(), map, region_size);
        answer += compute(buffer.get(), region_length);
        map += region_size;
    }
    return answer;
}
}
namespace thread_group
{
ANSWER_TYPE answer;
boost::mutex getter_mutex;
boost::mutex putter_mutex;
void call(char *map, size_t region_length)
{
TYPE buffer[region_length];
{
boost::mutex::scoped_lock getter_lock(getter_mutex);
// std::clog<<"copying: map: "<<&map<<std::endl;
memcpy(buffer, map, region_length*sizeof(TYPE));
}
ANSWER_TYPE ans = compute(buffer, region_length);
{
boost::mutex::scoped_lock putter_lock(putter_mutex);
answer += ans;
}
}
ANSWER_TYPE test_mapped_file(const char *file_path, size_t regions_count)
{
namespace bio = boost::iostreams;
bio::mapped_file mf(file_path);
CHECK(mf.is_open());
char *map = mf.data();
char *map_end = map + size_t(mf.size()); // [map_begin, map_end)
size_t region_length;
answer = 0;
boost::asio::io_service io_service;
boost::thread_group threads;
{
boost::scoped_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(io_service));
for(size_t t = 0; t < boost::thread::hardware_concurrency(); t++) {
threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service));
}
{
boost::mutex::scoped_lock getter_lock(getter_mutex);
while (map < map_end) {
memcpy(&region_length, map, sizeof(size_t));
map += sizeof(size_t);
io_service.post(boost::bind(call, map, region_length));
map += region_length*sizeof(TYPE);
}
}
}
threads.join_all();
return answer;
}
}
namespace task_queue
{
using namespace std;
ANSWER_TYPE answer;
std::mutex getter_mutex;
std::mutex putter_mutex;
void call(TYPE *buffer, char *map, size_t region_length)
{
{
std::scoped_lock<std::mutex> lock{getter_mutex};
// std::clog<<"copying: map: "<<&map<<std::endl;
memcpy(buffer, map, region_length*sizeof(TYPE));
}
ANSWER_TYPE ans = compute(buffer, region_length);
{
std::scoped_lock<std::mutex> lock{putter_mutex};
answer += ans;
}
}
class task_queue_t
{
public:
typedef function<void(TYPE *)> task_t;
vector<thread> pool_m;
deque<task_t> deque_m;
condition_variable condition_m;
mutex mutex_m;
atomic<bool> done_m{false};
task_queue_t(size_t region_length, size_t pool_size = thread::hardware_concurrency())
{
pool_m.reserve(pool_size);
for(size_t i = 0; i < pool_size; i++)
pool_m.emplace_back(bind(&task_queue_t::worker, this, region_length, i));
}
~task_queue_t()
{
join_all();
}
void join_all()
{
unique_lock<mutex> lock{mutex_m};
lock.unlock();
if (done_m.exchange(true))
return;
condition_m.notify_all();
for(auto &thread : pool_m) {
thread.join();
}
}
template<typename F>
void push(F &&function)
{
unique_lock<mutex> lock{mutex_m};
deque_m.emplace_back(forward<F>(function));
condition_m.notify_one();
}
private:
void worker(size_t region_length, size_t i)
{
task_t task;
TYPE *buffer = new TYPE[region_length];
while (true) {
unique_lock<mutex> lock{mutex_m};
condition_m.wait(lock, [=]() { return done_m || !deque_m.empty(); });
if (deque_m.empty())
break;
task = deque_m.front();
deque_m.pop_front();
lock.unlock();
task(buffer);
}
delete[] buffer;
}
task_queue_t(const task_queue_t &) = delete;
task_queue_t(task_queue_t &&) = delete;
task_queue_t &operator=(const task_queue_t &) = delete;
task_queue_t &operator=(task_queue_t &&) = delete;
};
ANSWER_TYPE test_mapped_file(const char *file_path, size_t regions_count)
{
using namespace std::placeholders;
namespace bio = boost::iostreams;
bio::mapped_file mf(file_path);
CHECK(mf.is_open());
char *map = mf.data();
char *map_end = map + size_t(mf.size()); // [map_begin, map_end)
size_t region_length;
answer = 0;
task_queue_t pool(max_region_length);
while (map < map_end) {
memcpy(&region_length, map, sizeof(size_t));
map += sizeof(size_t);
const size_t region_size = region_length*sizeof(TYPE);
pool.push(bind(call, _1, map, region_length));
map += region_size;
}
pool.join_all();
return answer;
}
}
// HEX(h): stream fragment that prints `h` in hexadecimal with a base prefix and
// then switches the stream back to decimal; `showbase` stays enabled afterwards.
// It expands to the unqualified names hex/showbase/dec, so those names must be
// visible at the point of use. In the code shown here it is referenced only
// from the commented-out just_threads section.
#define HEX(h) hex<<showbase<<(h)<<dec
/*
namespace just_threads
{
using namespace std;
ANSWER_TYPE answer;
std::mutex getter_mutex;
std::mutex putter_mutex;
/// [map, map_r_end)
void call(char *map, char *region_end, char *map_end) {
TYPE buffer[max_region_length];
size_t region_size = MIN(region_end - map, map_end - map);
size_t region_length = region_size/sizeof(TYPE);
ANSWER_TYPE ans;
if (region_end > 0) {
{
scoped_lock<mutex> lock{getter_mutex};
cout<<"memcpy(buffer, "<<HEX(&map)<<", "<<region_size<<")"<<endl;
memcpy(buffer, map, region_size);
map += region_size;
}
ans = compute(reinterpret_cast<TYPE *>(buffer), region_length);
{
scoped_lock<mutex> lock{putter_mutex};
answer += ans;
}
}
char *mm;
while(map < map_end) {
{
scoped_lock<mutex> lock{getter_mutex};
memcpy(&region_length, map, sizeof(size_t));
map += sizeof(size_t);
region_size = region_length * sizeof(TYPE);
mm = map + region_size;
if (mm < map_end) {
memcpy(buffer, map, region_size);
map = mm;
} else {
region_size = map_end - map;
region_length = region_size/sizeof(TYPE);
memcpy(buffer, map, region_size);
map += region_size;
clog<<"once, map: "<<HEX(&map)<<endl;
}
}
ans = compute(reinterpret_cast<TYPE *>(buffer), region_length);
{
scoped_lock<mutex> lock{putter_mutex};
answer += ans;
}
}
}
ANSWER_TYPE test_mapped_file(const char *file_path, size_t regions_count)
{
using namespace std;
namespace bio = boost::iostreams;
bio::mapped_file mf(file_path);
CHECK(mf.is_open());
char *map = mf.data();
char *map_begin = map;
char *map_end = map + size_t(mf.size()); // [map_begin, map_end)
answer = 0;
const size_t hwc = thread::hardware_concurrency();
size_t i = 0;
size_t region_size;
size_t region_length;
// size_t per_thread = (mf.size()-regions_count*sizeof(size_t))/hwc;
size_t per_thread = mf.size()/hwc;
size_t cur = 0;
size_t cur_end;
size_t to_send;
vector<thread> threads(hwc);
struct reg_t {
char *begin = nullptr;
char *end = nullptr;
size_t len = 0;
size_t size = 0;
size_t off = 0;
size_t off_end = 0;
size_t off_size = 0;
};
vector<reg_t> regions;
size_t off = 0;
size_t all = 0;
while (map < map_end) {
memcpy(&region_length, map, sizeof(size_t));
map += sizeof(size_t);
off += sizeof(size_t);
region_size = region_length * sizeof(TYPE);
regions.push_back(reg_t{map, map+region_size, region_length, region_size, off, off+region_size, all});
off += region_size;
all += region_size;
map += region_size;
}
clog<<"found: "<<regions.size()<<endl;
for(size_t r = 0; r < regions.size(); r++) {
const auto& a = regions[r];
clog<<"r: "<<r
<<", begin: "<<&a.begin
<<", end: "<<&a.end
<<", len: "<<a.len
<<", size: "<<a.size
<<", off: "<<a.off
<<", off_end: "<<a.off_end
<<", off_size: "<<a.off_size
<<endl;
}
clog<<"map_begin: "<<HEX(&map_begin)<<", map_end: "<<HEX(&map_end)<<endl;
size_t reg_size = 0;
size_t r = 0;
for(size_t t = 0; t < hwc; t++) {
clog<<"t: "<<t<<endl;
clog<<"(t+1)*per_thread: "<<(t+1)*per_thread<<endl;
for( ; r < regions.size(); ) {
const reg_t& reg = regions[r];
clog<<"\tr: "<<r<<endl;
if (reg.off_end < (t+1) * per_thread) {
clog<<"\t\treg.off_end < (t+1)*per_thread (skip region)"<<endl;
r++;
continue;
} else {
clog<<"\t\treg.off_end >= (t+1)*per_thread"<<endl;
clog<<"\t\treg_size == 0?"<<(reg_size == 0)<<endl;
clog<<"\t\tmap_begin + reg_size: "<<HEX(map_begin + reg_size)<<endl;
threads[t] = thread(call,
map_begin + t * per_thread,
(reg_size == 0)? nullptr : map_begin + reg_size,
map_begin + (t+1)*per_thread);
reg_size = (reg.off_size + reg.size) - (t+1)*per_thread;
clog<<"\t\treg_size: "<<reg_size<<" (next thread)"<<endl;
break;
}
}
}
for(auto& t : threads)
t.join();
return answer;
}
}*/
int main()
{
using namespace std;
using namespace std::chrono;
ANSWER_TYPE result = 0;
high_resolution_clock::time_point t1;
high_resolution_clock::time_point t2;
for(size_t m = 0; m < measures_overall; m++) {
generate_file(filename, regions_count, max_region_length);
cout<<endl;
#ifndef SKIP
result = 0;
t1 = high_resolution_clock::now();
for(size_t n = 0; n < measures_each; n++)
result += test_fread(filename, regions_count);
t2 = high_resolution_clock::now();
log_result("fread", duration_cast<duration<double>>(t2 - t1).count(), result);
result = 0;
t1 = high_resolution_clock::now();
for(size_t n = 0; n < measures_each; n++)
result += test_fread(filename, regions_count, 4 * 1024); // 4 KiB
t2 = high_resolution_clock::now();
log_result("fread with 4 KiB buffer", duration_cast<duration<double>>(t2 - t1).count(), result);
result = 0;
t1 = high_resolution_clock::now();
for(size_t n = 0; n < measures_each; n++)
result += test_fread(filename, regions_count, 32 * 1024 * 1024); // 32 MiB
t2 = high_resolution_clock::now();
log_result("fread with 32 MiB buffer", duration_cast<duration<double>>(t2 - t1).count(), result);
result = 0;
t1 = high_resolution_clock::now();
for(size_t n = 0; n < measures_each; n++)
result += test_read(filename, regions_count);
t2 = high_resolution_clock::now();
log_result("read", duration_cast<duration<double>>(t2 - t1).count(), result);
result = 0;
t1 = high_resolution_clock::now();
for(size_t n = 0; n < measures_each; n++)
result += test_mmap(filename, regions_count, false, false);
t2 = high_resolution_clock::now();
log_result("mmap", duration_cast<duration<double>>(t2 - t1).count(), result);
result = 0;
t1 = high_resolution_clock::now();
for(size_t n = 0; n < measures_each; n++)
result += test_mmap(filename, regions_count, true, false);
t2 = high_resolution_clock::now();
log_result("mmap fancy", duration_cast<duration<double>>(t2 - t1).count(), result);
result = 0;
t1 = high_resolution_clock::now();
for(size_t n = 0; n < measures_each; n++)
result += test_mmap(filename, regions_count, false, true);
t2 = high_resolution_clock::now();
log_result("mmap shared", duration_cast<duration<double>>(t2 - t1).count(), result);
result = 0;
t1 = high_resolution_clock::now();
for(size_t n = 0; n < measures_each; n++)
result += test_mmap(filename, regions_count, true, true);
t2 = high_resolution_clock::now();
log_result("mmap fancy shared", duration_cast<duration<double>>(t2 - t1).count(), result);
result = 0;
t1 = high_resolution_clock::now();
for(size_t n = 0; n < measures_each; n++)
result += test_ifstream(filename, regions_count);
t2 = high_resolution_clock::now();
log_result("ifstream", duration_cast<duration<double>>(t2 - t1).count(), result);
#endif
result = 0;
t1 = high_resolution_clock::now();
for(size_t n = 0; n < measures_each; n++)
result += test_mapped_file(filename, regions_count);
t2 = high_resolution_clock::now();
log_result("memory_mapped", duration_cast<duration<double>>(t2 - t1).count(), result);
result = 0;
t1 = high_resolution_clock::now();
for(size_t n = 0; n < measures_each; n++)
result += with_copy::test_mapped_file(filename, regions_count);
t2 = high_resolution_clock::now();
log_result("memory_mapped with_copy", duration_cast<duration<double>>(t2 - t1).count(), result);
result = 0;
t1 = high_resolution_clock::now();
for(size_t n = 0; n < measures_each; n++)
result += thread_group::test_mapped_file(filename, regions_count);
t2 = high_resolution_clock::now();
log_result("memory_mapped thread_group", duration_cast<duration<double>>(t2 - t1).count(), result);
result = 0;
t1 = high_resolution_clock::now();
for(size_t n = 0; n < measures_each; n++)
result += task_queue::test_mapped_file(filename, regions_count);
t2 = high_resolution_clock::now();
log_result("memory_mapped task_queue", duration_cast<duration<double>>(t2 - t1).count(), result);
/*
result = 0;
t1 = high_resolution_clock::now();
for(size_t n = 0; n < measures_each; n++)
result += just_threads::test_mapped_file(filename, regions_count);
t2 = high_resolution_clock::now();
log_result("memory_mapped just_threads", duration_cast<duration<double>>(t2 - t1).count(), result);
*/
remove(filename);
}
}
/*
* DEBUG RESULTS
memory_mapped done in: 0.28913657 sec, result: 13743502134000.
memory_mapped with_copy done in: 0.37507899 sec, result: 13743502134000.
memory_mapped thread_group done in: 0.27466311 sec, result: 13743502134000.
memory_mapped task_queue done in: 0.29531609 sec, result: 13743502134000.
* RELEASE RESULTS
TODO */
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment