Created
April 26, 2024 13:08
-
-
Save eao197/0a7a91e21480a53351d91c6785760819 to your computer and use it in GitHub Desktop.
Возможные реализации thread_pool-ов для задачи рекурсивного обхода подкаталогов
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <condition_variable> | |
#include <functional> | |
#include <iostream> | |
#include <mutex> | |
#include <queue> | |
#include <thread> | |
#include <vector> | |
namespace demo | |
{ | |
class thread_pool_t | |
{ | |
public: | |
// Тип одной заявки на исполнение. | |
using task_t = std::function< void() >; | |
private: | |
// Тип очереди заявок. | |
using task_queue_t = std::queue< task_t >; | |
// Статус пула. | |
enum class status_t | |
{ | |
// Работа еще не начиналась, ждем самую первую заявку. | |
not_started, | |
// Работа начата, но сигнала на принудительное завершение | |
// еще не было. | |
in_progress, | |
// Работа должна быть завершена принудительно. | |
shutdown | |
}; | |
// Тип для попытки извлечения очередной заявки из очереди. | |
struct task_extraction_result_t | |
{ | |
// Заявка для обработки. | |
// | |
// Содержит актуальное значение только если m_should_process == true; | |
task_t m_task; | |
// Если true, то заявку нужно обрабатывать и в m_task лежит | |
// актуальная заявка. | |
bool m_should_process; | |
}; | |
// Размер thread-pool-а. | |
const std::size_t m_pool_size; | |
// Замок для всего thread-pool-а. | |
std::mutex m_pool_lock; | |
// Рабочие нити пула. | |
std::vector< std::thread > m_threads; | |
// Очередь заявок. | |
task_queue_t m_queue; | |
// Условие для ожидания появления заявок в очереди. | |
// | |
// В том числе выставляется и при смене статуса на shutdown. | |
std::condition_variable m_queue_not_empty; | |
// Текущий статус пула. | |
status_t m_status{ status_t::not_started }; | |
// Признак того, что пул завершил свою работу. | |
bool m_completed{ false }; | |
// Условие для ожидания завершения работы пула. | |
// | |
// Взводится когда m_completed принимает true. | |
std::condition_variable m_completion_confirmed; | |
// Сколько нитей реально стартовало. | |
// | |
// Этот счетчик нужен для случая возникновения исключений в методе start(). | |
std::size_t m_actually_started{}; | |
// Сколько нитей сейчас реально свободно. | |
// | |
// Это если работа была начата, все нити свободны, а очередь заявок пуста, | |
// то работу можно завершать. | |
std::size_t m_actually_free{}; | |
// Сколько нитей завершило свою работу. | |
// | |
// Этот счетчик нужен для определения момента, когда можно выставлять m_completed. | |
std::size_t m_actually_completed{}; | |
public: | |
// Инициализирующий конструктор. | |
// | |
// ПРИМЕЧАНИЕ: ожидаем, что pool_size больше нуля, но не проверяем это | |
// специально. | |
thread_pool_t( std::size_t pool_size ) | |
: m_pool_size{ pool_size } | |
{ | |
} | |
~thread_pool_t() | |
{ | |
// Принудительно завершаем работу тех нитей, которые могли | |
// быть запущены. | |
{ | |
std::lock_guard lock{ m_pool_lock }; | |
m_status = status_t::shutdown; | |
m_queue_not_empty.notify_all(); | |
} | |
for( auto & t : m_threads ) | |
// Если нить запущена, то значит она joinable, нет смысла | |
// проверять это отдельно. | |
t.join(); | |
} | |
// Запустить рабочие нити пула. | |
// | |
// ПРИМЕЧАНИЕ: может вызываться только один раз. | |
void | |
start() | |
{ | |
// Запускаем рабочие нити при заблокированном объекте, чтобы | |
// проще было контролировать сколько запустилось, сколько завершило | |
// свою работу. | |
std::lock_guard lock{ m_pool_lock }; | |
// ПРИМЕЧАНИЕ: не заботимся об исключениях из-за невозможности | |
// запустить очередную нить. Если i-я нить не стартует, то все | |
// ранее запущенные нити будут остановлены в деструкторе. | |
m_threads.reserve( m_pool_size ); | |
for( std::size_t i = 0; i < m_pool_size; ++i ) | |
{ | |
m_threads.push_back( std::thread{ [this]() { body(); } } ); | |
++m_actually_started; | |
} | |
} | |
// Добавить еще одну заявку в очередь. | |
void | |
push( task_t task ) | |
{ | |
std::lock_guard lock{ m_pool_lock }; | |
// Дергать condition_variable имеет смысл только если очередь была пуста. | |
const bool was_empty = m_queue.empty(); | |
m_queue.push( std::move(task) ); | |
if( was_empty ) | |
m_queue_not_empty.notify_one(); | |
} | |
// Ожидать завершения работы. | |
void | |
wait_for_completion() | |
{ | |
std::unique_lock lock{ m_pool_lock }; | |
if( !m_completed ) | |
m_completion_confirmed.wait( lock, [this]{ return m_completed; } ); | |
} | |
private: | |
// Основная функция рабочей нити. | |
void | |
body() | |
{ | |
// Для основного цикла нужно обеспечить no exception гарантию, т.к. | |
// даже если там исключение выскочит, то нам все равно нужно | |
// вызвать decrement_started_count. | |
try | |
{ | |
do_main_loop(); | |
} | |
catch(...) | |
{ | |
// Просто все перехватываем. | |
// Ничего не логируем, т.к. при логировании так же могут | |
// возникнуть исключения. | |
} | |
handle_thread_completion(); | |
} | |
void | |
do_main_loop() | |
{ | |
bool should_continue{ true }; | |
do | |
{ | |
const auto extraction_result = try_extract_next_task(); | |
should_continue = extraction_result.m_should_process; | |
if( should_continue ) | |
{ | |
extraction_result.m_task(); | |
} | |
} | |
while( should_continue ); | |
} | |
// ПРИМЕЧАНИЕ: noexcept потому что нет шансов востановиться при | |
// возникновении исключения. | |
void | |
handle_thread_completion() noexcept | |
{ | |
std::lock_guard lock{ m_pool_lock }; | |
++m_actually_completed; | |
if( m_actually_started == m_actually_completed ) | |
{ | |
// Пул завершил свою работу. Если этого момента кто-то ждал, | |
// то нужно об этом сообщить. | |
m_completed = true; | |
m_completion_confirmed.notify_one(); | |
} | |
} | |
// Попробовать достать заявку из очереди. | |
// | |
// Если task_extraction_result_t::m_should_process равен false, | |
// то цикл обработки заявок нужно прекратить. | |
[[nodiscard]] task_extraction_result_t | |
try_extract_next_task() | |
{ | |
// Повторяем попытки достать заявку из очереди до тех пор, | |
// пока заявка в очереди появится, либо пока будет выставлен | |
// признак на завершение работы. | |
bool should_continue = true; | |
// Все действия внутри цикла должны выполняться при захваченном | |
// замке объекта. | |
std::unique_lock lock{ m_pool_lock }; | |
do | |
{ | |
// Одной свободной нитью стало больше. | |
++m_actually_free; | |
// Если работа уже началась, но очередь пуста и все рабочие | |
// нити свободны, то значит новых заявок уже не будет, а значит | |
// и работу нужно завершать. | |
if( m_queue.empty() && status_t::in_progress == m_status | |
&& m_actually_started == m_actually_free ) | |
{ | |
m_status = status_t::shutdown; | |
m_queue_not_empty.notify_all(); | |
} | |
should_continue = (status_t::shutdown != m_status); | |
if( should_continue ) | |
{ | |
if( m_queue.empty() ) | |
{ | |
// Придется дать, пока в очереди что-то появится. | |
// Или если статус поменяется. | |
m_queue_not_empty.wait( lock, | |
[this] { | |
return !m_queue.empty() || | |
status_t::shutdown == m_status; | |
} ); | |
} | |
else | |
{ | |
task_extraction_result_t ret_value{ std::move(m_queue.front()), true }; | |
m_queue.pop(); | |
// Одной свободной нитью стало меньше. | |
--m_actually_free; | |
// Если это самая первая заявка, то нужно указать, | |
// что работа началась. | |
if( status_t::not_started == m_status ) | |
m_status = status_t::in_progress; | |
return ret_value; | |
} | |
} | |
} | |
while( should_continue ); | |
// Раз оказались здесь, значит работу нужно завершать. | |
return { {}, false }; | |
} | |
}; | |
} /* namespace demo */ | |
using namespace demo; | |
void | |
test_task( thread_pool_t & pool, int remaining ) | |
{ | |
if( remaining ) | |
{ | |
pool.push( [&pool, r = remaining - 1]() { | |
test_task( pool, r ); | |
} ); | |
} | |
std::this_thread::sleep_for( std::chrono::milliseconds{20} ); | |
std::cout << "test_task: " << remaining << std::endl; | |
} | |
int | |
main() | |
{ | |
{ | |
thread_pool_t pool{ 4 }; | |
pool.start(); | |
pool.push( [&pool]() { | |
test_task( pool, 10 ); | |
} ); | |
pool.wait_for_completion(); | |
} | |
std::cout << "Completed" << std::endl; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <atomic> | |
#include <condition_variable> | |
#include <functional> | |
#include <future> | |
#include <iostream> | |
#include <mutex> | |
#include <optional> | |
#include <queue> | |
#include <thread> | |
#include <vector> | |
#include <utility> | |
namespace demo | |
{ | |
class thread_pool_t | |
{ | |
public: | |
// Тип задачи, которая будет отдаваться в качестве заявки. | |
using task_t = std::function< void() >; | |
private: | |
// Тип, который хранит информацию, необходимую для завершения работы. | |
struct work_completion_data_t | |
{ | |
// Сколько еще активных заявок. | |
// | |
// ПРИМЕЧАНИЕ: в начале работы содержит нулевое значение. | |
std::atomic< unsigned int > m_live_demands{}; | |
// Промиз, который нужно выставить, чтобы дать сигнал | |
// о том, что работа была завершена. | |
std::promise< void > m_completion_promise; | |
}; | |
// Тип одной заявки. | |
// | |
// ПРИМЕЧАНИЕ: это Moveable, но не Copyable тип. | |
struct demand_t | |
{ | |
// Информация для завершения работы. | |
// | |
// Нулевое значение указывает, что содержимое объекта | |
// перемещено и сам объект активным уже не является. | |
work_completion_data_t * m_work_completion; | |
// Задача для выполнения. | |
// | |
// Может быть пустым объектом, если содержимое demand_t | |
// было перемещено. | |
task_t m_task; | |
friend void | |
swap( demand_t & a, demand_t & b ) noexcept | |
{ | |
using std::swap; | |
swap( a.m_work_completion, b.m_work_completion ); | |
swap( a.m_task, b.m_task ); | |
} | |
demand_t( | |
work_completion_data_t & work_completion, | |
task_t task) | |
: m_work_completion{ std::addressof(work_completion) } | |
, m_task{ std::move(task) } | |
{ | |
// Нужно указать, что активных заявок стало больше. | |
++(m_work_completion->m_live_demands); | |
} | |
~demand_t() | |
{ | |
// Если содержимое объекта не было куда-то перемещено, | |
// то это живой объект, который должен сообщить, что | |
// живых объектов стало меньше. | |
if( m_work_completion ) | |
{ | |
if( 0u == --(m_work_completion->m_live_demands) ) | |
{ | |
// Живых заявок больше нет, можно завершать работу. | |
m_work_completion->m_completion_promise.set_value(); | |
} | |
} | |
} | |
// Копирования быть не должно. | |
demand_t( const demand_t & ) = delete; | |
// А перемещение должно быть. | |
demand_t( demand_t && o ) noexcept | |
: m_work_completion{ | |
std::exchange( o.m_work_completion, nullptr ) } | |
, m_task{ std::exchange( o.m_task, task_t{} ) } | |
{} | |
// Копирования быть не должно. | |
demand_t & | |
operator=( const demand_t & ) = delete; | |
// А перемещение делаем через "make temporary then swap". | |
demand_t & | |
operator=( demand_t && o ) noexcept | |
{ | |
demand_t tmp{ std::move(o) }; | |
swap( tmp, *this ); | |
return *this; | |
} | |
}; | |
// Тип очереди заявок. | |
using demand_queue_t = std::queue< demand_t >; | |
// Размер thread-pool-а. | |
const std::size_t m_pool_size; | |
// Информация для завершения работы. | |
work_completion_data_t m_work_completion; | |
// Замок для всего thread-pool-а. | |
std::mutex m_pool_lock; | |
// Рабочие нити пула. | |
std::vector< std::thread > m_threads; | |
// Очередь заявок. | |
demand_queue_t m_demands; | |
// Условие для ожидания появления заявок в очереди. | |
// | |
// В том числе выставляется и при смене статуса на shutdown. | |
std::condition_variable m_queue_not_empty; | |
// Признак того, что пул должен завершить свою работу. | |
bool m_shutdown_initiated{ false }; | |
public: | |
// Инициализирующий конструктор. | |
// | |
// ПРИМЕЧАНИЕ: ожидаем, что pool_size больше нуля, но не проверяем это | |
// специально. | |
thread_pool_t( std::size_t pool_size ) | |
: m_pool_size{ pool_size } | |
{ | |
} | |
~thread_pool_t() | |
{ | |
// Принудительно завершаем работу тех нитей, которые могли | |
// быть запущены. | |
{ | |
std::lock_guard lock{ m_pool_lock }; | |
m_shutdown_initiated = true; | |
m_queue_not_empty.notify_all(); | |
} | |
for( auto & t : m_threads ) | |
// Если нить запущена, то значит она joinable, нет смысла | |
// проверять это отдельно. | |
t.join(); | |
} | |
// Запустить рабочие нити пула. | |
// | |
// ПРИМЕЧАНИЕ: может вызываться только один раз. | |
void | |
start() | |
{ | |
// Запускаем рабочие нити при заблокированном объекте, чтобы | |
// проще было контролировать сколько запустилось, сколько завершило | |
// свою работу. | |
std::lock_guard lock{ m_pool_lock }; | |
// ПРИМЕЧАНИЕ: не заботимся об исключениях из-за невозможности | |
// запустить очередную нить. Если i-я нить не стартует, то все | |
// ранее запущенные нити будут остановлены в деструкторе. | |
m_threads.reserve( m_pool_size ); | |
for( std::size_t i = 0; i < m_pool_size; ++i ) | |
{ | |
m_threads.push_back( std::thread{ [this]() { body(); } } ); | |
} | |
} | |
// Добавить еще одну заявку в очередь. | |
void | |
push( task_t task ) | |
{ | |
std::lock_guard lock{ m_pool_lock }; | |
// Дергать condition_variable имеет смысл только если очередь была пуста. | |
const bool was_empty = m_demands.empty(); | |
m_demands.push( demand_t{ m_work_completion, std::move(task) } ); | |
if( was_empty ) | |
m_queue_not_empty.notify_one(); | |
} | |
// Ожидать завершения работы. | |
void | |
wait_for_completion() | |
{ | |
m_work_completion.m_completion_promise.get_future().wait(); | |
} | |
private: | |
// Основная функция рабочей нити. | |
void | |
body() | |
{ | |
// Не заморачиваемся на исключения. Если вылетит из do_main_loop, | |
// то все приложение будет убито. | |
do_main_loop(); | |
} | |
void | |
do_main_loop() | |
{ | |
// Повторяем попытки достать заявку из очереди до тех пор, | |
// пока заявка в очереди появится, либо пока будет выставлен | |
// признак на завершение работы. | |
bool should_continue = true; | |
do | |
{ | |
const auto opt_demand = try_extract_demand(); | |
if( opt_demand ) | |
opt_demand->m_task(); | |
else | |
// Нет заявок, значит работа закончена. | |
should_continue = false; | |
} | |
while( should_continue ); | |
} | |
// Возвращает пустой optional если нужно завершить работу. | |
[[nodiscard]] std::optional< demand_t > | |
try_extract_demand() | |
{ | |
std::optional< demand_t > result; | |
// Пытаемся извлечь заявку при захваченном замке объекта. | |
std::unique_lock lock{ m_pool_lock }; | |
if( !m_shutdown_initiated ) | |
{ | |
if( m_demands.empty() ) | |
// Придется поспать. | |
m_queue_not_empty.wait( lock, | |
[this] { | |
return !m_demands.empty() || m_shutdown_initiated; | |
} ); | |
// Еще раз проверим m_shutdown_initiated, т.к. он мог измениться | |
// пока мы спали. | |
if( !m_shutdown_initiated ) | |
{ | |
result = std::move(m_demands.front()); | |
m_demands.pop(); | |
} | |
} | |
return result; | |
} | |
}; | |
} /* namespace demo */ | |
using namespace demo; | |
void | |
test_task( thread_pool_t & pool, int remaining ) | |
{ | |
if( remaining ) | |
{ | |
pool.push( [&pool, r = remaining - 1]() { | |
test_task( pool, r ); | |
} ); | |
} | |
std::this_thread::sleep_for( std::chrono::milliseconds{20} ); | |
std::cout << "test_task: " << remaining << std::endl; | |
} | |
int | |
main() | |
{ | |
{ | |
thread_pool_t pool{ 4 }; | |
pool.start(); | |
pool.push( [&pool]() { | |
test_task( pool, 10 ); | |
} ); | |
// Дожидаемся пока работа будет завершена. | |
pool.wait_for_completion(); | |
// При выходе thread_pool остановит сам себя в деструкторе. | |
} | |
std::cout << "Completed" << std::endl; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment