Last active
April 20, 2018 09:18
-
-
Save ivanstepanovftw/4e768bde8577fd2a12ae20aa74b72aed to your computer and use it in GitHub Desktop.
Короче, как правильно сделать метод wait(), чтобы все jobs были выполнены?
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <zconf.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
/// Streams `s` in hexadecimal with a base prefix, then switches the stream back to decimal.
#define HEX(s) hex<<showbase<<(s)<<dec
using namespace std;

/// Held around every write to the shared log stream so lines from different
/// threads do not interleave.
mutex mu_print;

//[1] : https://medium.com/@fosterbrereton/starfighter-in-c-the-task-queue-a074d132e78
//[2] : http://ru.cppreference.com/w/cpp/utility/functional/bind

/**
 * A plain thread pool, adapted from [1].
 *
 * Each worker thread owns one scratch buffer of `buffer_size` bytes and hands
 * it to every task it executes, so tasks never share a buffer concurrently.
 *
 * push(), wait() and signal_done() may be called from any thread.
 * The `*_unsafe` helpers require the caller to hold `mutex_m`.
 */
class task_queue_t {
public:
    typedef function<void (char *)> task_t;

    deque<task_t> deque_m;          ///< Pending tasks; guarded by mutex_m.
    vector<thread> pool_m;          ///< Worker threads, joined by the destructor.
    condition_variable condition_m; ///< Wakes workers: a task was pushed or shutdown was requested.
    mutex mutex_m;                  ///< Guards deque_m and active_m.
    atomic<bool> done_m{false};     ///< Set once by signal_done(); workers exit when they see it.
    condition_variable idle_m;      ///< Wakes wait(): the pool drained or shutdown was requested.
    size_t active_m{0};             ///< Number of tasks currently executing; guarded by mutex_m.

    /**
     * Starts the worker threads.
     *
     * @param buffer_size  size in bytes of the scratch buffer each worker owns.
     * @param pool_size    number of workers; 0 is treated as 1.
     */
    task_queue_t(size_t buffer_size, size_t pool_size = thread::hardware_concurrency()) {
        // hardware_concurrency() is allowed to return 0 ("unknown"). A pool
        // without workers would never run anything and wait() would block forever.
        if (pool_size == 0)
            pool_size = 1;
        pool_m.reserve(pool_size);
        try {
            for (size_t i = 0; i < pool_size; i++)
                pool_m.emplace_back(&task_queue_t::worker, this, buffer_size, i);
        } catch (...) {
            // The destructor does not run for a half-constructed object, and
            // destroying a joinable std::thread calls std::terminate.
            stop_and_join();
            throw;
        }
    }

    /**
     * Requests shutdown and joins every worker. Tasks still queued at that
     * point are discarded; call wait() first if they must run.
     */
    ~task_queue_t() {
        stop_and_join();
    }

    /**
     * Enqueues a task and wakes one worker.
     *
     * @param function  any callable convertible to task_t; it receives the
     *                  executing worker's scratch buffer.
     * Tasks pushed after signal_done() are never executed.
     */
    template <typename F>
    void push(F&& function) {
        {
            lock_guard<mutex> lock{mutex_m};
            deque_m.emplace_back(forward<F>(function));
        }
        // Notify after unlocking so the woken worker does not immediately
        // block on the mutex we still hold.
        condition_m.notify_one();
    }

    /**
     * Asks all workers to stop after their current task. Idempotent.
     * Queued tasks that have not started are dropped.
     */
    void signal_done() {
        {
            // The flag must flip under the mutex: otherwise a worker that has
            // just evaluated its wait predicate could miss the notification
            // and sleep forever.
            lock_guard<mutex> lock{mutex_m};
            if (done_m.exchange(true))
                return;
        }
        condition_m.notify_all();
        idle_m.notify_all();
    }

    /**
     * Blocks until every task pushed so far has finished executing, i.e. the
     * queue is empty and no worker is inside a task. The workers stay alive,
     * so the pool can be reused afterwards.
     *
     * Joining the threads here would never return (workers only exit on
     * shutdown), and signal_done() would drop the tasks still queued; hence
     * the dedicated condition variable.
     *
     * If shutdown was requested, returns as soon as the in-flight tasks are
     * finished, since the remaining queued tasks will never run.
     * Must not be called from inside a task: that task counts as in-flight
     * and the call would wait for itself.
     */
    void wait() {
        unique_lock<mutex> lock{mutex_m};
        idle_m.wait(lock, [this] {
            return active_m == 0 && (done_m || deque_m.empty());
        });
    }

    /**
     * Worker thread body: pops and runs tasks until shutdown is signalled.
     *
     * @param buffer_size  size of this worker's scratch buffer in bytes
     *                     (with 0 the pointer passed to tasks may be null).
     * @param i            worker index, used only in the log line.
     */
    void worker(size_t buffer_size, size_t i) {
        vector<char> buffer(buffer_size);
        while (true) {
            task_t task;
            {
                unique_lock<mutex> lock{mutex_m};
                condition_m.wait(lock, [this] {
                    return done_m || !empty_unsafe();
                });
                if (done_m)
                    break;
                if (!try_pop_unsafe(task))
                    continue;
                // Counted while still holding the lock, so wait() can never
                // observe "queue empty, nothing running" between pop and run.
                ++active_m;
            }
            try {
                task(buffer.data());
            } catch (...) {
                // Drop it on the floor. Not ideal, but really there's nowhere
                // for it to go right now. The worker keeps serving: losing a
                // thread per failed task would eventually stall the pool.
                scoped_lock<mutex> lock{mu_print};
                clog<<"~~~~~~~~~~~~ !! CATCH CALLED !! ~~~~~~~~~~~~"<<endl;
            }
            // Destroy the callable (and whatever it captured) before
            // reporting completion to wait().
            task = nullptr;
            {
                lock_guard<mutex> lock{mutex_m};
                --active_m;
                if (active_m == 0 && (done_m || deque_m.empty()))
                    idle_m.notify_all();
            }
        }
        {
            scoped_lock<mutex> lock{mu_print};
            clog<<"~~~~~~~~~~~~ !! WORKER "<<i<<" STOPPED !! ~~~~~~~~~~~~"<<endl;
        }
    }

    /// @return true when no task is queued. Caller must hold mutex_m.
    bool empty_unsafe() const {
        return deque_m.empty();
    }

    /**
     * Moves the oldest queued task into `task`. Caller must hold mutex_m.
     * @return false (leaving `task` untouched) when the queue is empty.
     */
    bool try_pop_unsafe(task_t& task) {
        if (deque_m.empty())
            return false;
        task = std::move(deque_m.front());
        deque_m.pop_front();
        return true;
    }

private:
    /// Signals shutdown and joins every worker that was started.
    void stop_and_join() {
        signal_done();
        for (auto& t : pool_m) {
            if (t.joinable())
                t.join();
        }
    }

    task_queue_t(const task_queue_t&) = delete;
    task_queue_t(task_queue_t&&) = delete;
    task_queue_t& operator=(const task_queue_t&) = delete;
    task_queue_t& operator=(task_queue_t&&) = delete;
};
void call(char *buffer, size_t cursor) { | |
{ | |
scoped_lock<mutex> lock{mu_print}; | |
clog<<"buffer at: "<<HEX(&buffer)<<endl; | |
clog<<" cursor: "<<cursor<<endl; | |
} | |
sleep(3); | |
{ | |
scoped_lock<mutex> lock{mu_print}; | |
clog<<" cursor: "<<cursor<<", done."<<endl; | |
} | |
} | |
int main() { | |
using namespace placeholders; | |
constexpr size_t SIZE_PER_THREAD = 123; //4*1024*1024; //4 MiB | |
constexpr size_t TASKS = 3*8; | |
task_queue_t pool(SIZE_PER_THREAD); | |
{ | |
scoped_lock<mutex> lock{mu_print}; | |
for(size_t t = 0; t < TASKS; t++) { | |
function<void(char *)> f = bind(call, _1, t); | |
pool.push(f); | |
} | |
clog<<"done pushing "<<TASKS<<" tasks."<<endl; | |
} | |
pool.wait(); | |
clog<<"!! DONE !!"<<endl; | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment