Last active
April 26, 2022 10:59
-
-
Save pfirsich/e990ab2200721cae1382ed9995d3addd to your computer and use it in GitHub Desktop.
Dmitry Vyukov's thread-safe multiple producer single consumer queue with wait free production ported to C++. I read the source code carefully and added lots of comments explaining what it does (it took me a while to figure it all out).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// g++ -Wall -Wextra -O0 -g -o log_test | |
#include <atomic> | |
#include <optional> | |
// Vyukov MPSC (wait-free multiple producers, single consumer) queue | |
// https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue | |
template <typename T> | |
class MpscQueue { | |
public: | |
MpscQueue() | |
: stub_() | |
, consumeEnd_(&stub_) | |
, produceEnd_(&stub_) | |
{ | |
} | |
void produce(T&& value) | |
{ | |
produce(new Node { std::move(value), nullptr }); | |
} | |
std::optional<T> consume() | |
{ | |
auto node = consumeEnd_.load(); | |
auto next = node->next.load(); | |
// If we are supposed to consume the stub, then the list is either empty (nullopt) | |
// or this is the first time we consume, in which case we just move consumeEnd ahead. | |
if (node == &stub_) { | |
if (!next) { | |
return std::nullopt; | |
} | |
consumeEnd_.store(next); | |
node = next; | |
next = node->next; | |
} | |
if (next) { | |
consumeEnd_.store(next); | |
return unpackNode(node); | |
} | |
// If we don't have a `next` element, `node` should be the last item in the list, | |
// unless a new item was produced since we last loaded consumeEnd. | |
// If there was, we need to try from the start (because there would be a `next`). | |
// Instead of calling consume recursively (dangerous), we just bail and let the caller | |
// retry. | |
// I am fairly sure you could leave this check out completely and it would still work | |
// correctly, but it would be less efficient. | |
if (node != produceEnd_.load()) { | |
return std::nullopt; | |
} | |
// Assuming the check above failed (and we got here), the state of the list should be: | |
// stub -> node (consumeEnd, produceEnd) -> nullptr | |
// Since we have no next item to make the new consumeEnd, we need to put stub_ into the | |
// queue again. | |
stub_.next.store(nullptr); | |
produce(&stub_); | |
// Now we have either attached stub to `node` or to another element other producer threads | |
// might have added in the meantime. | |
// In case we have finished attaching the other element to stub_, but the other producer | |
// thread has not finished attaching `node` to the new element (i.e. it did not set | |
// node->next yet), the below condition (`if (next)`) would be false. | |
// Assuming one other producer thread the list would look like this (next != NULL): | |
// node (consumeEnd) *(-> elem) -> stub (produceEnd) | |
// or this (next is NULL): | |
// node (consumeEnd) -X- elem -> stub (produceEnd) | |
// The latter case is what Vyukov refers to saying that the consumer is blocking (see source link). | |
next = node->next.load(); | |
if (next) { | |
consumeEnd_.store(next); | |
return unpackNode(node); | |
} | |
// If the other thread has not managed to attach the new element to `node` yet, we have no | |
// other choice but to wait for it to finish, so we return nullopt. | |
return std::nullopt; | |
} | |
private: | |
struct Node { | |
T value; | |
// "next" in the order of consumption | |
std::atomic<Node*> next; | |
}; | |
static T unpackNode(Node* node) | |
{ | |
auto value = std::move(node->value); | |
delete node; | |
return value; | |
} | |
void produce(Node* node) | |
{ | |
auto prev = produceEnd_.exchange(node); | |
prev->next.store(node); | |
} | |
// This is not an actual element of the queue, but simply a place to "park" consumeEnd, when | |
// there is nothing to consume. | |
// Sadly this makes default constructability for T a requirement. | |
Node stub_; | |
// Yes, screw "head" and "tail" and everyone doing whatever they please with those words. | |
std::atomic<Node*> consumeEnd_; | |
std::atomic<Node*> produceEnd_; | |
}; | |
#include <cstdlib> | |
#include <iostream> | |
#include <string> | |
#include <thread> | |
#include <vector> | |
#include <unistd.h> | |
// You likely might have to tweak these so you sometimes get longer and shorter (and empty) queues. | |
// Probably you also want to make sure the queue size is not continuously increasing. | |
constexpr int maxNumProduces = 4; | |
constexpr size_t numThreads = 8; | |
MpscQueue<std::string> queue; | |
std::atomic<size_t> size; | |
std::atomic<size_t> producerId { 0 }; | |
void producerThreadFunc() | |
{ | |
auto id = producerId++; | |
size_t line = 0; | |
while (true) { | |
const auto num = std::rand() % maxNumProduces; | |
for (int i = 0; i < num; ++i) { | |
queue.produce("[" + std::to_string(id) + "] log " + std::to_string(line++) + "\n"); | |
size++; | |
} | |
std::cout << "queue size: " << size.load() << std::endl; | |
::usleep(1); | |
} | |
} | |
int main() | |
{ | |
std::vector<std::thread> producerThreads(numThreads); | |
for (auto& thread : producerThreads) { | |
thread = std::thread { producerThreadFunc }; | |
}; | |
while (true) { | |
const auto line = queue.consume(); | |
if (!line) { | |
::usleep(1); | |
continue; | |
} | |
size--; | |
std::cout << *line; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment