Skip to content

Instantly share code, notes, and snippets.

@ivanstepanovftw
Last active April 20, 2018 09:18
Show Gist options
  • Save ivanstepanovftw/4e768bde8577fd2a12ae20aa74b72aed to your computer and use it in GitHub Desktop.
Save ivanstepanovftw/4e768bde8577fd2a12ae20aa74b72aed to your computer and use it in GitHub Desktop.
Короче, как правильно сделать метод wait()? Что бы все jobs были выполнены.
#include <mutex>
#include <deque>
#include <string>
#include <thread>
#include <condition_variable>
#include <atomic>
#include <functional>
#include <iostream>
#include <zconf.h>
#define HEX(s) hex<<showbase<<(s)<<dec
using namespace std;
mutex mu_print;
//[1] : https://medium.com/@fosterbrereton/starfighter-in-c-the-task-queue-a074d132e78
//[2] : http://ru.cppreference.com/w/cpp/utility/functional/bind
/** Это обычный Thread Pool, украденый из [1] */
class task_queue_t {
public:
typedef function<void (char *)> task_t;
deque<task_t> deque_m;
vector<thread> pool_m;
condition_variable condition_m;
mutex mutex_m;
atomic<bool> done_m{false};
task_queue_t(size_t buffer_size, size_t pool_size = thread::hardware_concurrency()) {
for(size_t i = 0; i < pool_size; i++) {
pool_m.emplace_back(bind(&task_queue_t::worker, this, buffer_size, i));
}
}
~task_queue_t() {
unique_lock<mutex> lock{mutex_m};
signal_done();
lock.unlock();
for(auto& thread : pool_m) {
thread.join();
}
}
template <typename F>
void push(F&& function) {
unique_lock<mutex> lock{mutex_m};
deque_m.emplace_back(forward<F>(function));
condition_m.notify_one();
}
void signal_done() {
if (done_m.exchange(true))
return;
condition_m.notify_all();
}
void wait() {
/// Если вот так написать, то main() не завершается:
// for(thread &t : this->pool_m) {
// t.join();
// }
/// Если так, то прерываются все потоки и не все TASKS выполняются.
// this->signal_done();
/// Поэтому решил написать свой метод, который хз как реализовать.
/* В конце выполнения должно вывести в консоль:
[...]
cursor: 23, done.
~~~~~~~~~~~~ !! WORKER 3 STOPPED !! ~~~~~~~~~~~~
!! DONE !!
Process finished with exit code 0
*/
}
void worker(size_t buffer_size, size_t i) {
task_t task;
char *buffer = new char[buffer_size];
while (true)
try {
unique_lock<mutex> lock{mutex_m};
condition_m.wait(lock,
[=]() {
return done_m || !empty_unsafe();
});
if (done_m)
break;
if (try_pop_unsafe(task)) {
lock.unlock();
task(buffer);
} else
clog<<"~~~~~~~~~~~~ !! end possibly !! ~~~~~~~~~~~~"<<endl;
} catch (...) {
// Drop it on the floor. Not ideal, but really
// there's nowhere for them to go right now.
scoped_lock<mutex> lock{mu_print};
clog<<"~~~~~~~~~~~~ !! CATCH CALLED !! ~~~~~~~~~~~~"<<endl;
break;
}
{
scoped_lock<mutex> lock{mu_print};
clog<<"~~~~~~~~~~~~ !! WORKER "<<i<<" STOPPED !! ~~~~~~~~~~~~"<<endl;
}
delete [] buffer;
}
bool empty_unsafe() const {
return deque_m.empty();
}
bool try_pop_unsafe(task_t& task) {
if (deque_m.empty())
return false;
task = deque_m.front();
deque_m.pop_front();
return true;
}
private:
task_queue_t(const task_queue_t&) = delete;
task_queue_t(task_queue_t&&) = delete;
task_queue_t& operator=(const task_queue_t&) = delete;
task_queue_t& operator=(task_queue_t&&) = delete;
};
void call(char *buffer, size_t cursor) {
{
scoped_lock<mutex> lock{mu_print};
clog<<"buffer at: "<<HEX(&buffer)<<endl;
clog<<" cursor: "<<cursor<<endl;
}
sleep(3);
{
scoped_lock<mutex> lock{mu_print};
clog<<" cursor: "<<cursor<<", done."<<endl;
}
}
int main() {
using namespace placeholders;
constexpr size_t SIZE_PER_THREAD = 123; //4*1024*1024; //4 MiB
constexpr size_t TASKS = 3*8;
task_queue_t pool(SIZE_PER_THREAD);
{
scoped_lock<mutex> lock{mu_print};
for(size_t t = 0; t < TASKS; t++) {
function<void(char *)> f = bind(call, _1, t);
pool.push(f);
}
clog<<"done pushing "<<TASKS<<" tasks."<<endl;
}
pool.wait();
clog<<"!! DONE !!"<<endl;
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment