Last active
April 12, 2024 13:42
-
-
Save eao197/77bf5850c0f81f71a418b5578fda833c to your computer and use it in GitHub Desktop.
Примеры для лекции про базовые средства поддержки многопоточности в C++
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*** | |
Класс std::thread | |
*/ | |
#include <iostream> | |
#include <thread> | |
void SayHello() | |
{ | |
std::cout << "Hello from thread: " << std::this_thread::get_id() << std::endl; | |
} | |
void SimpleThreadBody() | |
{ | |
SayHello(); | |
} | |
int main() { | |
SayHello(); | |
std::thread thread{SimpleThreadBody}; | |
thread.join(); | |
} | |
// Вызов join для std::thread обязателен. std::jthread появился только в C++20. | |
// Можно сперва создать пустой std::thread, а лишь потом запустить. | |
#include <iostream> | |
#include <thread> | |
bool SomeCondition() { | |
return false; | |
} | |
void SayHello() | |
{ | |
std::cout << "Hello from thread: " << std::this_thread::get_id() << std::endl; | |
} | |
void SimpleThreadBody() | |
{ | |
SayHello(); | |
} | |
int main() { | |
SayHello(); | |
std::thread thread; | |
if(SomeCondition()) { | |
thread = std::thread{SimpleThreadBody}; | |
} | |
if(thread.joinable()) { | |
thread.join(); | |
} | |
} | |
// Конструктор std::thread может получать N аргументов. | |
#include <iostream> | |
#include <thread> | |
void SimpleThreadBody(std::string name, int priority) | |
{ | |
std::cout << "This is thread " << name << " with priority " << priority << std::endl; | |
} | |
int main() { | |
std::thread thread{SimpleThreadBody, "Example", 2}; | |
thread.join(); | |
} | |
// Если в качестве thread-func передают указатель на нестатический метод класса, | |
// то вторым аргументом должен быть указатель this для этого класса. | |
#include <iostream> | |
#include <thread> | |
class SimpleActiveObject { | |
std::thread _thread; | |
public: | |
SimpleActiveObject(std::string name, int priority) { | |
_thread = std::thread{ | |
&SimpleActiveObject::threadBody, this, | |
std::move(name), priority | |
}; | |
} | |
~SimpleActiveObject() { | |
if(_thread.joinable()) { | |
_thread.join(); | |
} | |
} | |
private: | |
void threadBody(std::string name, int priority) { | |
std::cout << "This is thread " << name << " with priority " << priority << std::endl; | |
} | |
}; | |
int main() { | |
SimpleActiveObject aoOne{"one", 0}; | |
SimpleActiveObject aoTwo{"two", 2}; | |
} | |
// Но проще бывает использовать лямбду с захватом контекста. | |
#include <iostream> | |
#include <thread> | |
class SimpleActiveObject { | |
std::thread _thread; | |
public: | |
SimpleActiveObject(std::string name, int priority) { | |
_thread = std::thread{ | |
[=, this]() { | |
threadBody(name, priority); | |
} | |
}; | |
} | |
~SimpleActiveObject() { | |
if(_thread.joinable()) { | |
_thread.join(); | |
} | |
} | |
private: | |
void threadBody(const std::string & name, int priority) { | |
std::cout << "This is thread " << name << " with priority " << priority << std::endl; | |
} | |
}; | |
int main() { | |
SimpleActiveObject aoOne{"one", 0}; | |
SimpleActiveObject aoTwo{"two", 2}; | |
} | |
/*** | |
std::mutex и защита доступа к разделяемым данным | |
*/ | |
#include <iostream> | |
#include <thread> | |
#include <mutex> | |
#include <chrono> | |
using namespace std::chrono_literals; | |
int main() { | |
std::string commonData; | |
std::mutex commonDataLock; | |
std::thread first{ | |
[&commonData, &commonDataLock]() { | |
for(int i = 0; i < 5; ++i) { | |
{ | |
std::lock_guard<std::mutex> lock{commonDataLock}; | |
commonData += "A"; | |
} | |
std::this_thread::sleep_for(10ms); | |
} | |
} | |
}; | |
std::thread second{ | |
[&commonData, &commonDataLock]() { | |
for(int i = 0; i < 5; ++i) { | |
{ | |
std::lock_guard<std::mutex> lock{commonDataLock}; | |
commonData += "B"; | |
} | |
std::this_thread::sleep_for(10ms); | |
} | |
} | |
}; | |
std::thread third{ | |
[&commonData, &commonDataLock]() { | |
for(int i = 0; i < 5; ++i) { | |
{ | |
std::lock_guard<std::mutex> lock{commonDataLock}; | |
commonData += "C"; | |
} | |
std::this_thread::sleep_for(10ms); | |
} | |
} | |
}; | |
first.join(); | |
second.join(); | |
third.join(); | |
std::cout << commonData << std::endl; | |
} | |
// Что будет если сделать так: | |
std::thread first{ | |
[&commonData, &commonDataLock]() { | |
for(int i = 0; i < 5; ++i) { | |
{ | |
std::lock_guard<std::mutex> lock{commonDataLock}; | |
commonData += "A"; | |
std::this_thread::sleep_for(10ms); | |
} | |
} | |
} | |
}; | |
// mutex может быть захвачен только одной нитью. | |
// Если владеющая mutex-ом нить пытается захватить его повторно, то это проблема. | |
// Отдельное внимание на std::lock_guard. | |
// В стандартной библиотеке кроме lock_guard есть еще и unique_lock, и scoped_lock. | |
// Это все RAII-идиома. Попытки захватывать и освобождать mutex-ы без RAII | |
// неизбежно ведут к проблемам. | |
// Кроме std::mutex есть еще и std::recursive_mutex, std::shared_mutex и их timed-аналоги | |
// (std::timed_mutex, std::recursive_timed_mutex, std::shared_timed_mutex). | |
/*** | |
std::scoped_lock и std::lock | |
*/ | |
// Проблемный код. | |
#include <iostream> | |
#include <thread> | |
#include <mutex> | |
void FirstThread(std::mutex & m1, std::mutex & m2) { | |
std::lock_guard<std::mutex> l1{m1}; | |
std::lock_guard<std::mutex> l2{m2}; | |
std::cout << "FirstThread acquired locks" << std::endl; | |
} | |
void SecondThread(std::mutex & m1, std::mutex & m2) { | |
std::lock_guard<std::mutex> l2{m2}; | |
std::lock_guard<std::mutex> l1{m1}; | |
std::cout << "SecondThread acquired locks" << std::endl; | |
} | |
int main() { | |
std::mutex m1, m2; | |
std::thread first{FirstThread, std::ref(m1), std::ref(m2)}; | |
std::thread second{SecondThread, std::ref(m1), std::ref(m2)}; | |
first.join(); | |
second.join(); | |
} | |
// Если нужно захватить несколько мутексов сразу, то это должно делаться в одном и | |
// том же порядке. | |
#include <iostream> | |
#include <thread> | |
#include <mutex> | |
void FirstThread(std::mutex & m1, std::mutex & m2) { | |
std::scoped_lock<std::mutex, std::mutex> l{m1, m2}; | |
std::cout << "FirstThread acquired locks" << std::endl; | |
} | |
void SecondThread(std::mutex & m1, std::mutex & m2) { | |
std::scoped_lock<std::mutex, std::mutex> l{m2, m1}; | |
std::cout << "SecondThread acquired locks" << std::endl; | |
} | |
int main() { | |
std::mutex m1, m2; | |
std::thread first{FirstThread, std::ref(m1), std::ref(m2)}; | |
std::thread second{SecondThread, std::ref(m1), std::ref(m2)}; | |
first.join(); | |
second.join(); | |
} | |
// Можно обратить внимание на использование std::ref чтобы избежать попытки | |
// копирования std::mutex при передаче его аргументом в конструктор std::thread. | |
// Под капотом у scoped_lock лежит функция std::lock: | |
// https://en.cppreference.com/w/cpp/thread/lock | |
// Функция std::lock может использоваться если есть необходимость работать | |
// в более старом стандарте, чем C++17. | |
/*** | |
std::condition_variable | |
*/ | |
// Используется для того, чтобы "разбудить" спящую нить (которая ждет уведомления). | |
// Должен использоваться совместно с std::mutex и std::unique_lock. | |
#include <iostream> | |
#include <thread> | |
#include <mutex> | |
#include <condition_variable> | |
int main() { | |
std::mutex childStartedLock; | |
std::condition_variable childStartedCV; | |
std::unique_lock<std::mutex> lock{childStartedLock}; | |
std::thread child{[&childStartedLock, &childStartedCV]() { | |
std::lock_guard<std::mutex> lock{childStartedLock}; | |
std::cout << "Child started" << std::endl; | |
childStartedCV.notify_one(); | |
}}; | |
childStartedCV.wait(lock); | |
std::cout << "We know that child started" << std::endl; | |
child.join(); | |
} | |
// У condition_variable кроме notify_one есть еще и notify_all. | |
#include <iostream> | |
#include <thread> | |
#include <mutex> | |
#include <condition_variable> | |
#include <chrono> | |
using namespace std::chrono_literals; | |
int main() { | |
std::mutex readyToWorkLock; | |
std::condition_variable readyToWorkCV; | |
bool allThreadsStarted{false}; | |
std::thread first{[&readyToWorkLock, &readyToWorkCV, &allThreadsStarted]() { | |
std::cout << "The first thread just started" << std::endl; | |
std::unique_lock<std::mutex> lock{readyToWorkLock}; | |
if(!allThreadsStarted) | |
readyToWorkCV.wait(lock); | |
std::cout << "The first thread is ready to work" << std::endl; | |
}}; | |
std::thread second{[&readyToWorkLock, &readyToWorkCV, &allThreadsStarted]() { | |
std::cout << "The second thread just started" << std::endl; | |
std::unique_lock<std::mutex> lock{readyToWorkLock}; | |
if(!allThreadsStarted) | |
readyToWorkCV.wait(lock); | |
std::cout << "The second thread is ready to work" << std::endl; | |
}}; | |
std::this_thread::sleep_for(250ms); | |
{ | |
std::lock_guard<std::mutex> lock{readyToWorkLock}; | |
allThreadsStarted = true; | |
readyToWorkCV.notify_all(); | |
} | |
first.join(); | |
second.join(); | |
} | |
// Спонтанные (внезапные) пробуждения и защита от них. | |
readyToWorkCV.wait(lock, [&allThreadsStarted]{ return allThreadsStarted; }); | |
/*** | |
std::promise и std::future | |
*/ | |
#include <iostream> | |
#include <thread> | |
#include <future> | |
int main() { | |
std::promise<void> childStarted; | |
std::thread child{[&childStarted]() { | |
std::cout << "Child started" << std::endl; | |
childStarted.set_value(); | |
}}; | |
childStarted.get_future().get(); | |
std::cout << "We know that child started" << std::endl; | |
child.join(); | |
} | |
// Может использоваться для получения значений разных типов. | |
#include <iostream> | |
#include <thread> | |
#include <future> | |
int main() { | |
std::promise<std::string> aggregateResult; | |
std::promise<int> firstChildResult; | |
std::promise<long> secondChildResult; | |
std::thread aggregate{ | |
[&aggregateResult, | |
firstResult = firstChildResult.get_future(), | |
secondResult = secondChildResult.get_future()]() mutable { | |
aggregateResult.set_value( | |
std::to_string(firstResult.get()) + ":" + std::to_string(secondResult.get())); | |
} | |
}; | |
std::thread first{ | |
[&firstChildResult]() { | |
firstChildResult.set_value(42); | |
} | |
}; | |
std::thread second{ | |
[&secondChildResult]() { | |
secondChildResult.set_value(100500); | |
} | |
}; | |
std::cout << "The result is: " << aggregateResult.get_future().get() << std::endl; | |
aggregate.join(); | |
first.join(); | |
second.join(); | |
} | |
/*** | |
atomic-и | |
*/ | |
// Продвинутая тема для экспертов. | |
// Для применения atomic-ов для более-менее сложных сценариев требуется | |
// хорошее понимание т.н. memory_order | |
// (https://en.cppreference.com/w/cpp/atomic/memory_order) | |
// Но могут использоваться в простых ситуациях. | |
// | |
// Например, атомарный счетчик для генерации последовательно возрастающих | |
// "сквозных" идентификаторов в разных нитях. | |
#include <iostream> | |
#include <thread> | |
#include <atomic> | |
#include <chrono> | |
using namespace std::chrono_literals; | |
void ThreadFunc(std::atomic<int> & idCounter) { | |
for(int i = 0; i < 10; ++i) { | |
const auto nextId = ++idCounter; | |
std::cout << "Processing ID: " << nextId << std::endl; | |
std::this_thread::sleep_for(5ms); | |
} | |
} | |
int main() { | |
std::atomic<int> idCounter{0}; | |
std::thread first{ThreadFunc, std::ref(idCounter)}; | |
std::thread second{ThreadFunc, std::ref(idCounter)}; | |
std::thread third{ThreadFunc, std::ref(idCounter)}; | |
first.join(); | |
second.join(); | |
third.join(); | |
} | |
// Иногда atomic-и удобно использовать для "статуса", обновление и чтение которого | |
// не требует дополнительных mutex-ов. | |
#include <iostream> | |
#include <thread> | |
#include <atomic> | |
#include <chrono> | |
using namespace std::chrono_literals; | |
enum class ProcessingStatus : int { | |
NotStarted, | |
FirstPhase, | |
SecondPhase, | |
Completed | |
}; | |
int main() { | |
std::atomic<ProcessingStatus> status{ProcessingStatus::NotStarted}; | |
std::thread first{[&status]() { | |
status = ProcessingStatus::FirstPhase; | |
std::this_thread::sleep_for(30ms); | |
status = ProcessingStatus::SecondPhase; | |
std::this_thread::sleep_for(40ms); | |
status = ProcessingStatus::Completed; | |
}}; | |
for(;;) { | |
const auto currentStatus = status.load(std::memory_order_acquire); | |
switch(currentStatus) { | |
case ProcessingStatus::NotStarted: std::cout << "NotStarted" << std::endl; break; | |
case ProcessingStatus::FirstPhase: std::cout << "FirstPhase" << std::endl; break; | |
case ProcessingStatus::SecondPhase: std::cout << "SecondPhase" << std::endl; break; | |
case ProcessingStatus::Completed: std::cout << "Completed" << std::endl; break; | |
} | |
if(ProcessingStatus::Completed == currentStatus) | |
break; | |
std::this_thread::sleep_for(15ms); | |
} | |
first.join(); | |
} | |
// Нужно обратить внимание на std::memory_order_acquire при вызове load. | |
// Когда atomic-у присваивается значение, то используется | |
// std::memory_order_seq_cst (самая надежная, но и самая ресурсоемкая опция). |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment