Created
April 21, 2018 12:20
-
-
Save ivanstepanovftw/dcc83aede3f2254925ad36d6e48058f1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Build script for the file-I/O benchmark executable (main.cc).
cmake_minimum_required(VERSION 3.9)
project(threaded_handler)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

# Name the compiled Boost components explicitly so that find_package() verifies
# they exist and exposes them as imported targets (include paths and library
# locations travel with the targets; no directory-wide settings are needed).
find_package(Boost REQUIRED COMPONENTS iostreams chrono thread system)
# Portable replacement for a hard-coded -lpthread.
find_package(Threads REQUIRED)

add_executable(threaded_handler
#        thread_pool_example.cc
        main.cc
#        thread_pool.cc
#        thread_pool_another.cc
#        func_arg_reordering.cc
#        mapped_file_example.cc
        )

target_link_libraries(threaded_handler
        PRIVATE
        Threads::Threads
        Boost::iostreams
        Boost::chrono
        Boost::thread
        Boost::system
        )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* Source: https://github.com/lemire/Code-used-on-Daniel-Lemire-s-blog/blob/master/2012/06/26/ioaccess.cpp
 * Extended with a (deliberately) bad example that uses a stride: https://jvns.ca/blog/2014/05/12/computers-are-fast/
 */
#define I_HAVE_NO_BOOST | |
//#define USE_STRIDE | |
#include <fcntl.h> | |
#include <stdio.h> | |
#include <errno.h> | |
#include <string.h> | |
#include <stdlib.h> | |
#include <sys/resource.h> | |
#include <sys/types.h> | |
#include <sys/uio.h> | |
#include <sys/mman.h> | |
#include <unistd.h> | |
#include <iostream> | |
#include <fstream> | |
#include <vector> | |
#include <cassert> | |
#include <chrono> | |
#include <iomanip> | |
#include <mutex> | |
#include <deque> | |
#include <string> | |
#include <thread> | |
#include <condition_variable> | |
#include <atomic> | |
#include <functional> | |
#include <iostream> | |
#include <zconf.h> | |
#include <sys/stat.h> | |
#ifndef I_HAVE_NO_BOOST | |
#define BOOST_BIND_NO_PLACEHOLDERS | |
#include <boost/iostreams/device/mapped_file.hpp> | |
#include <boost/chrono.hpp> | |
#include <boost/thread.hpp> | |
#include <boost/asio.hpp> | |
#include <boost/lexical_cast.hpp> | |
#include <boost/scoped_ptr.hpp> | |
#endif | |
// Element type stored in every region of the benchmark file.
using TYPE = size_t;
// Type of the checksum accumulated over the elements.
using ANSWER_TYPE = size_t;

// Scratch file created before the measurements and removed afterwards.
const char *filename = "DELME.BIN";

// Size of one region payload, in bytes and in elements.
constexpr size_t max_region_size = 4 * 1024 * 1024;  // 4 MiB
constexpr size_t max_region_length = max_region_size / sizeof(TYPE);

// How many times the whole suite is repeated, and how many times each
// individual test is repeated inside one timed measurement.
constexpr size_t measures_overall = 1;
constexpr size_t measures_each = 1;

// Number of regions written to the benchmark file.
constexpr size_t regions_count = 10;

// Step used by the strided variant of compute() (enabled with USE_STRIDE).
constexpr size_t stride_size = 2;
//use 94761707417984 !524287 | |
//not 137438691328 !524287 | |
void generate_file(const char *file_path, const size_t regions_count, const size_t region_length) | |
{ | |
FILE *fp = fopen(file_path, "wb+"); | |
assert(fp != NULL); | |
TYPE buffer[region_length]; | |
for(size_t i = 0; i < region_length; i++) | |
buffer[i] = static_cast<TYPE>(i); | |
for(size_t r = 0; r < regions_count; r++) { | |
assert(fwrite((void *)®ion_length, sizeof(size_t), 1, fp) == 1); | |
assert(fwrite((void *)buffer, sizeof(TYPE), region_length, fp) == region_length); | |
} | |
fflush(fp); | |
fclose(fp); | |
} | |
#ifndef USE_STRIDE
/// Returns the sum of the region_length elements starting at buffer,
/// visiting them once in sequential order.
ANSWER_TYPE compute(TYPE *buffer, size_t region_length)
{
    ANSWER_TYPE total = 0;
    const TYPE *const last = buffer + region_length;
    for (const TYPE *cursor = buffer; cursor != last; ++cursor)
        total += *cursor;
    return total;
}
#else
/// Returns the same sum, but gathered in stride_size interleaved passes:
/// pass j visits elements j, j + stride_size, j + 2 * stride_size, ...
/// (the intentionally cache-unfriendly variant) 👍
ANSWER_TYPE compute(TYPE *buffer, size_t region_length)
{
    ANSWER_TYPE total = 0;
    size_t offset = 0;
    while (offset < stride_size) {
        size_t index = offset;
        while (index < region_length) {
            total += buffer[index];
            index += stride_size;
        }
        ++offset;
    }
    return total;
}
#endif
/// Prints one result line to stdout: the test name left-aligned in a
/// 28-character column, the elapsed seconds with 8 decimals, and the checksum.
void log_result(const char *function_name, double done_in, ANSWER_TYPE result)
{
    std::ostream &out = std::cout;
    out << std::left << std::setw(28) << std::setfill(' ') << function_name;
    out << "done in: " << std::fixed << std::setprecision(8) << done_in;
    out << " sec, result: " << std::showbase << result << "." << std::endl;
}
/**
 * Reads the benchmark file with stdio (fread) and sums every region.
 *
 * @param file_path      file produced by generate_file()
 * @param regions_count  number of regions to read
 * @param vbuf           if non-zero, size in bytes of the fully-buffered
 *                       stdio buffer installed with setvbuf()
 * @return sum of compute() over all regions
 *
 * Any I/O failure, or a region longer than max_region_length, prints a
 * diagnostic to stderr and aborts the process.
 */
ANSWER_TYPE test_fread(const char *file_path, size_t regions_count, size_t vbuf = 0)
{
    // Checks run in every build type; the calls are never wrapped in assert(),
    // which would remove them when NDEBUG is defined.
    const auto require = [](bool ok, const char *what) {
        if (!ok) {
            fprintf(stderr, "test_fread: %s failed\n", what);
            abort();
        }
    };

    FILE *fd = fopen(file_path, "rb");
    require(fd != nullptr, "fopen");
    if (vbuf)
        require(setvbuf(fd, nullptr, _IOFBF, vbuf) == 0, "setvbuf");

    std::vector<TYPE> buffer(max_region_length);
    ANSWER_TYPE answer = 0;
    for (size_t r = 0; r < regions_count; r++) {
        size_t region_length = 0;
        require(fread(&region_length, sizeof(size_t), 1, fd) == 1, "fread(header)");
        // The header comes from the file: never read past the buffer.
        require(region_length <= buffer.size(), "region length check");
        require(fread(buffer.data(), sizeof(TYPE), region_length, fd) == region_length, "fread(data)");
        answer += compute(buffer.data(), region_length);
    }

    fclose(fd);
    return answer;
}
/**
 * Reads the benchmark file with the POSIX read() call and sums every region.
 *
 * @param file_path      file produced by generate_file()
 * @param regions_count  number of regions to read
 * @return sum of compute() over all regions
 *
 * Any I/O failure, premature end of file, or a region longer than
 * max_region_length prints a diagnostic to stderr and aborts the process.
 */
ANSWER_TYPE test_read(const char *file_path, size_t regions_count)
{
    // Checks run in every build type; the calls are never wrapped in assert(),
    // which would remove them when NDEBUG is defined.
    const auto require = [](bool ok, const char *what) {
        if (!ok) {
            fprintf(stderr, "test_read: %s failed\n", what);
            abort();
        }
    };

    // read() may legitimately return fewer bytes than requested, or be
    // interrupted by a signal, so keep reading until the request is complete.
    const auto read_exact = [&require](int fd, void *destination, size_t size) {
        char *out = static_cast<char *>(destination);
        while (size > 0) {
            const ssize_t got = read(fd, out, size);
            if (got < 0 && errno == EINTR)
                continue;
            require(got > 0, "read");  // 0 means unexpected end of file
            out += got;
            size -= static_cast<size_t>(got);
        }
    };

    const int fd = open(file_path, O_RDONLY);
    require(fd != -1, "open");

    std::vector<TYPE> buffer(max_region_length);
    ANSWER_TYPE answer = 0;
    for (size_t r = 0; r < regions_count; r++) {
        size_t region_length = 0;
        read_exact(fd, &region_length, sizeof(size_t));
        // The header comes from the file: never read past the buffer.
        require(region_length <= buffer.size(), "region length check");
        read_exact(fd, buffer.data(), region_length * sizeof(TYPE));
        answer += compute(buffer.data(), region_length);
    }

    close(fd);
    return answer;
}
/**
 * Maps the whole benchmark file with mmap() and sums every region in place.
 *
 * @param file_path      file produced by generate_file()
 * @param regions_count  unused; the mapping is walked until its end
 * @param advise         if true, issue madvise() hints for the mapping
 * @param shared         if true map with MAP_SHARED, otherwise MAP_PRIVATE
 * @return sum of compute() over all regions
 *
 * Prints the file size to stdout. Any system-call failure or a region that
 * does not fit inside the mapping prints a diagnostic to stderr and aborts.
 */
ANSWER_TYPE test_mmap(const char *file_path, size_t regions_count, bool advise, bool shared)
{
    // Checks run in every build type; the calls are never wrapped in assert(),
    // which would remove them when NDEBUG is defined.
    const auto require = [](bool ok, const char *what) {
        if (!ok) {
            fprintf(stderr, "test_mmap: %s failed\n", what);
            abort();
        }
    };

    ANSWER_TYPE answer = 0;
    const int fd = open(file_path, O_RDONLY);
    require(fd != -1, "open");
    struct stat s{};
    require(fstat(fd, &s) == 0, "fstat");
    const size_t file_size = size_t(s.st_size);
    std::cout<<"filesize: "<<file_size<<std::endl;

    int flags = MAP_FILE | (shared ? MAP_SHARED : MAP_PRIVATE);
#ifdef __linux__
    flags |= MAP_POPULATE;
#endif
    void *const mapping = mmap(nullptr, file_size, PROT_READ, flags, fd, 0);
    require(mapping != MAP_FAILED, "mmap");

    if (advise) {
        // madvise() advice values are enumerated constants, not bit flags:
        // OR-ing them yields a different single advice, so each hint needs
        // its own call.
        require(madvise(mapping, file_size, MADV_SEQUENTIAL) == 0, "madvise(MADV_SEQUENTIAL)");
        require(madvise(mapping, file_size, MADV_WILLNEED) == 0, "madvise(MADV_WILLNEED)");
    }
    // The mapping stays valid after the descriptor is closed.
    close(fd);

    char *cursor = static_cast<char *>(mapping);
    char *const map_end = cursor + file_size;  // [mapping, map_end)
    while (cursor < map_end) {
        size_t region_length = 0;
        require(size_t(map_end - cursor) >= sizeof(size_t), "region header bounds check");
        memcpy(&region_length, cursor, sizeof(size_t));
        cursor += sizeof(size_t);
        // The header comes from the file: never walk past the mapping.
        require(region_length <= size_t(map_end - cursor) / sizeof(TYPE), "region data bounds check");
        answer += compute(reinterpret_cast<TYPE *>(cursor), region_length);
        cursor += region_length * sizeof(TYPE);
    }

    munmap(mapping, file_size);
    return answer;
}
/**
 * Reads the benchmark file through std::ifstream and sums every region.
 *
 * @param file_path      file produced by generate_file()
 * @param regions_count  number of regions to read
 * @return sum of compute() over all regions
 *
 * A failed open or read, or a region longer than max_region_length, prints a
 * diagnostic to stderr and aborts the process.
 */
ANSWER_TYPE test_ifstream(const char *file_path, size_t regions_count)
{
    // Checks run in every build type, unlike assert().
    const auto require = [](bool ok, const char *what) {
        if (!ok) {
            fprintf(stderr, "test_ifstream: %s failed\n", what);
            abort();
        }
    };

    std::ifstream in(file_path, std::ios::binary);
    require(in.is_open() && in.good(), "open");

    std::vector<TYPE> buffer(max_region_length);
    ANSWER_TYPE answer = 0;
    for (size_t r = 0; r < regions_count; r++) {
        size_t region_length = 0;
        in.read(reinterpret_cast<char *>(&region_length), sizeof(size_t));
        require(!in.fail(), "read(header)");
        // The header comes from the file: never read past the buffer.
        require(region_length <= buffer.size(), "region length check");
        // The payload consists of TYPE elements, so its size is measured in
        // sizeof(TYPE), not sizeof(size_t).
        in.read(reinterpret_cast<char *>(buffer.data()),
                static_cast<std::streamsize>(region_length * sizeof(TYPE)));
        require(!in.fail(), "read(data)");
        answer += compute(buffer.data(), region_length);
    }
    return answer;
}
#ifndef I_HAVE_NO_BOOST | |
/**
 * Maps the benchmark file with boost::iostreams::mapped_file and sums every
 * region directly from the mapping.
 *
 * @param file_path      file produced by generate_file()
 * @param regions_count  unused; the mapping is walked until its end
 * @return sum of compute() over all regions
 *
 * Emits progress markers ("0", "1", "2", " 3" per region, "4") on stdout.
 * A failed mapping or a region that does not fit inside the mapping prints a
 * diagnostic to stderr and aborts the process.
 */
ANSWER_TYPE test_mapped_file(const char *file_path, size_t regions_count)
{
    namespace bio = boost::iostreams;
    // Checks run in every build type, unlike assert().
    const auto require = [](bool ok, const char *what) {
        if (!ok) {
            fprintf(stderr, "test_mapped_file: %s failed\n", what);
            abort();
        }
    };

    ANSWER_TYPE answer = 0;
    puts("0");
    bio::mapped_file mf(file_path);
    require(mf.is_open(), "mapped_file open");
    puts("1");
    char *map = mf.data();
    char *const map_end = map + size_t(mf.size());  // [map_begin, map_end)
    puts("2");
    while (map < map_end) {
        puts(" 3");
        size_t region_length = 0;
        require(size_t(map_end - map) >= sizeof(size_t), "region header bounds check");
        memcpy(&region_length, map, sizeof(size_t));
        map += sizeof(size_t);
        // The header comes from the file: never walk past the mapping.
        require(region_length <= size_t(map_end - map) / sizeof(TYPE), "region data bounds check");
        answer += compute(reinterpret_cast<TYPE *>(map), region_length);
        map += region_length * sizeof(TYPE);
    }
    puts("4");
    return answer;
}
namespace with_copy | |
{ | |
ANSWER_TYPE test_mapped_file(const char *file_path, size_t regions_count) | |
{ | |
namespace bio = boost::iostreams; | |
ANSWER_TYPE answer = 0; | |
bio::mapped_file mf(file_path); | |
assert(mf.is_open()); | |
TYPE *buffer = new TYPE[max_region_length]; | |
char *map = mf.data(); | |
char *map_end = map + size_t(mf.size()); // [map_begin, map_end) | |
size_t region_length; | |
while (map < map_end) { | |
memcpy(®ion_length, map, sizeof(size_t)); | |
map += sizeof(size_t); | |
const size_t region_size = region_length*sizeof(TYPE); | |
memcpy(buffer, map, region_size); | |
answer += compute(buffer, region_length); | |
map += region_size; | |
} | |
delete[] buffer; | |
return answer; | |
} | |
} | |
namespace thread_group | |
{ | |
ANSWER_TYPE answer; | |
boost::mutex getter_mutex; | |
boost::mutex putter_mutex; | |
void call(char *map, size_t region_length) | |
{ | |
TYPE buffer[region_length]; | |
{ | |
boost::mutex::scoped_lock getter_lock(getter_mutex); | |
// std::clog<<"copying: map: "<<&map<<std::endl; | |
memcpy(buffer, map, region_length*sizeof(TYPE)); | |
} | |
ANSWER_TYPE ans = compute(buffer, region_length); | |
{ | |
boost::mutex::scoped_lock putter_lock(putter_mutex); | |
answer += ans; | |
} | |
} | |
ANSWER_TYPE test_mapped_file(const char *file_path, size_t regions_count) | |
{ | |
namespace bio = boost::iostreams; | |
bio::mapped_file mf(file_path); | |
assert(mf.is_open()); | |
char *map = mf.data(); | |
char *map_end = map + size_t(mf.size()); // [map_begin, map_end) | |
size_t region_length; | |
answer = 0; | |
boost::asio::io_service io_service; | |
boost::thread_group threads; | |
{ | |
boost::scoped_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(io_service)); | |
for(size_t t = 0; t < boost::thread::hardware_concurrency(); t++) { | |
threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service)); | |
} | |
{ | |
boost::mutex::scoped_lock getter_lock(getter_mutex); | |
while (map < map_end) { | |
memcpy(®ion_length, map, sizeof(size_t)); | |
map += sizeof(size_t); | |
io_service.post(boost::bind(call, map, region_length)); | |
map += region_length*sizeof(TYPE); | |
} | |
} | |
} | |
threads.join_all(); | |
return answer; | |
} | |
} | |
namespace task_queue | |
{ | |
using namespace std; | |
ANSWER_TYPE answer; | |
std::mutex getter_mutex; | |
std::mutex putter_mutex; | |
void call(TYPE *buffer, char *map, size_t region_length) | |
{ | |
{ | |
std::scoped_lock<std::mutex> lock{getter_mutex}; | |
// std::clog<<"copying: map: "<<&map<<std::endl; | |
memcpy(buffer, map, region_length*sizeof(TYPE)); | |
} | |
ANSWER_TYPE ans = compute(buffer, region_length); | |
{ | |
std::scoped_lock<std::mutex> lock{putter_mutex}; | |
answer += ans; | |
} | |
} | |
class task_queue_t | |
{ | |
public: | |
typedef function<void(TYPE *)> task_t; | |
vector<thread> pool_m; | |
deque<task_t> deque_m; | |
condition_variable condition_m; | |
mutex mutex_m; | |
atomic<bool> done_m{false}; | |
task_queue_t(size_t region_length, size_t pool_size = thread::hardware_concurrency()) | |
{ | |
pool_m.reserve(pool_size); | |
for(size_t i = 0; i < pool_size; i++) | |
pool_m.emplace_back(bind(&task_queue_t::worker, this, region_length, i)); | |
} | |
~task_queue_t() | |
{ | |
join_all(); | |
} | |
void join_all() | |
{ | |
unique_lock<mutex> lock{mutex_m}; | |
lock.unlock(); | |
if (done_m.exchange(true)) | |
return; | |
condition_m.notify_all(); | |
for(auto &thread : pool_m) { | |
thread.join(); | |
} | |
} | |
template<typename F> | |
void push(F &&function) | |
{ | |
unique_lock<mutex> lock{mutex_m}; | |
deque_m.emplace_back(forward<F>(function)); | |
condition_m.notify_one(); | |
} | |
private: | |
void worker(size_t region_length, size_t i) | |
{ | |
task_t task; | |
TYPE *buffer = new TYPE[region_length]; | |
while (true) { | |
unique_lock<mutex> lock{mutex_m}; | |
condition_m.wait(lock, [=]() { return done_m || !deque_m.empty(); }); | |
if (deque_m.empty()) | |
break; | |
task = deque_m.front(); | |
deque_m.pop_front(); | |
lock.unlock(); | |
task(buffer); | |
} | |
delete[] buffer; | |
} | |
task_queue_t(const task_queue_t &) = delete; | |
task_queue_t(task_queue_t &&) = delete; | |
task_queue_t &operator=(const task_queue_t &) = delete; | |
task_queue_t &operator=(task_queue_t &&) = delete; | |
}; | |
ANSWER_TYPE test_mapped_file(const char *file_path, size_t regions_count) | |
{ | |
using namespace std::placeholders; | |
namespace bio = boost::iostreams; | |
bio::mapped_file mf(file_path); | |
assert(mf.is_open()); | |
char *map = mf.data(); | |
char *map_end = map + size_t(mf.size()); // [map_begin, map_end) | |
size_t region_length; | |
answer = 0; | |
task_queue_t pool(max_region_length); | |
while (map < map_end) { | |
memcpy(®ion_length, map, sizeof(size_t)); | |
map += sizeof(size_t); | |
const size_t region_size = region_length*sizeof(TYPE); | |
pool.push(bind(call, _1, map, region_length)); | |
map += region_size; | |
} | |
pool.join_all(); | |
return answer; | |
} | |
} | |
#endif | |
int main() | |
{ | |
using namespace std; | |
using namespace std::chrono; | |
ANSWER_TYPE result = 0; | |
high_resolution_clock::time_point t1; | |
high_resolution_clock::time_point t2; | |
for(size_t m = 0; m < measures_overall; m++) { | |
generate_file(filename, regions_count, max_region_length); | |
cout<<endl; | |
result = 0; | |
t1 = high_resolution_clock::now(); | |
for(size_t n = 0; n < measures_each; n++) | |
result += test_fread(filename, regions_count); | |
t2 = high_resolution_clock::now(); | |
log_result("fread", duration_cast<duration<double>>(t2 - t1).count(), result); | |
result = 0; | |
t1 = high_resolution_clock::now(); | |
for(size_t n = 0; n < measures_each; n++) | |
result += test_fread(filename, regions_count, 4 * 1024); // 4 KiB | |
t2 = high_resolution_clock::now(); | |
log_result("fread with 4 KiB buffer", duration_cast<duration<double>>(t2 - t1).count(), result); | |
result = 0; | |
t1 = high_resolution_clock::now(); | |
for(size_t n = 0; n < measures_each; n++) | |
result += test_fread(filename, regions_count, 32 * 1024 * 1024); // 32 MiB | |
t2 = high_resolution_clock::now(); | |
log_result("fread with 32 MiB buffer", duration_cast<duration<double>>(t2 - t1).count(), result); | |
result = 0; | |
t1 = high_resolution_clock::now(); | |
for(size_t n = 0; n < measures_each; n++) | |
result += test_read(filename, regions_count); | |
t2 = high_resolution_clock::now(); | |
log_result("read", duration_cast<duration<double>>(t2 - t1).count(), result); | |
result = 0; | |
t1 = high_resolution_clock::now(); | |
for(size_t n = 0; n < measures_each; n++) | |
result += test_mmap(filename, regions_count, false, false); | |
t2 = high_resolution_clock::now(); | |
log_result("mmap", duration_cast<duration<double>>(t2 - t1).count(), result); | |
result = 0; | |
t1 = high_resolution_clock::now(); | |
for(size_t n = 0; n < measures_each; n++) | |
result += test_mmap(filename, regions_count, true, false); | |
t2 = high_resolution_clock::now(); | |
log_result("mmap fancy", duration_cast<duration<double>>(t2 - t1).count(), result); | |
result = 0; | |
t1 = high_resolution_clock::now(); | |
for(size_t n = 0; n < measures_each; n++) | |
result += test_mmap(filename, regions_count, false, true); | |
t2 = high_resolution_clock::now(); | |
log_result("mmap shared", duration_cast<duration<double>>(t2 - t1).count(), result); | |
result = 0; | |
t1 = high_resolution_clock::now(); | |
for(size_t n = 0; n < measures_each; n++) | |
result += test_mmap(filename, regions_count, true, true); | |
t2 = high_resolution_clock::now(); | |
log_result("mmap fancy shared", duration_cast<duration<double>>(t2 - t1).count(), result); | |
result = 0; | |
t1 = high_resolution_clock::now(); | |
for(size_t n = 0; n < measures_each; n++) | |
result += test_ifstream(filename, regions_count); | |
t2 = high_resolution_clock::now(); | |
log_result("ifstream", duration_cast<duration<double>>(t2 - t1).count(), result); | |
#ifndef I_HAVE_NO_BOOST | |
result = 0; | |
t1 = high_resolution_clock::now(); | |
for(size_t n = 0; n < measures_each; n++) | |
result += test_mapped_file(filename, regions_count); | |
t2 = high_resolution_clock::now(); | |
log_result("memory_mapped", duration_cast<duration<double>>(t2 - t1).count(), result); | |
result = 0; | |
t1 = high_resolution_clock::now(); | |
for(size_t n = 0; n < measures_each; n++) | |
result += with_copy::test_mapped_file(filename, regions_count); | |
t2 = high_resolution_clock::now(); | |
log_result("memory_mapped with_copy", duration_cast<duration<double>>(t2 - t1).count(), result); | |
result = 0; | |
t1 = high_resolution_clock::now(); | |
for(size_t n = 0; n < measures_each; n++) | |
result += thread_group::test_mapped_file(filename, regions_count); | |
t2 = high_resolution_clock::now(); | |
log_result("memory_mapped thread_group", duration_cast<duration<double>>(t2 - t1).count(), result); | |
result = 0; | |
t1 = high_resolution_clock::now(); | |
for(size_t n = 0; n < measures_each; n++) | |
result += task_queue::test_mapped_file(filename, regions_count); | |
t2 = high_resolution_clock::now(); | |
log_result("memory_mapped task_queue", duration_cast<duration<double>>(t2 - t1).count(), result); | |
#endif | |
remove(filename); | |
} | |
} | |
/* | |
* DEBUG RESULTS | |
fread done in: 0.42606470 sec, result: 13743869132800. | |
fread with 4 KiB buffer done in: 0.49409973 sec, result: 13743869132800. | |
fread with 32 MiB buffer done in: 0.47028050 sec, result: 13743869132800. | |
read done in: 0.44382183 sec, result: 13743869132800. | |
mmap done in: 0.28163830 sec, result: 13743869132800. | |
mmap fancy done in: 0.27840858 sec, result: 13743869132800. | |
mmap shared done in: 0.30134640 sec, result: 13743869132800. | |
mmap fancy shared done in: 0.30472618 sec, result: 13743869132800. | |
ifstream done in: 0.39622994 sec, result: 13743869132800. | |
memory_mapped done in: 0.27899714 sec, result: 13743869132800. | |
memory_mapped with_copy done in: 0.39397306 sec, result: 13743869132800. | |
memory_mapped thread_group done in: 0.45736181 sec, result: 13743869132800. | |
memory_mapped task_queue done in: 0.33702452 sec, result: 13743869132800. | |
* RELEASE RESULTS */ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment