Created
February 24, 2018 22:30
-
-
Save arrieta/18b6ecc630d55682e1c362b59bacb957 to your computer and use it in GitHub Desktop.
Simple pattern for thread-safe, read-only access to an existing, fixed, contiguous memory slice.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
A simple pattern for thread-safe, read-only access to an existing, fixed, | |
contiguous memory slice. | |
This pattern is useful when simple chunking would be undesirable. For example: | |
A job may consist of three hard (H) tasks, each lasting two seconds, and three | |
easy (E) tasks, each lasting one second. If we simply pass the first three to | |
one thread, and the last three to a second thread, the second thread will | |
finish after three seconds, and leave the first thread to finish six seconds | |
worth of work. (total runtime: six seconds). The proposed approach would take | |
two seconds for the first two task, two seconds for tasks three to five, and | |
one second for the last task (total runtime: five seconds). | |
Job: [H1, H2, H3, E1, E2, E3] | |
First Approach (pre-fixed alterantive) | |
Seconds: 0 ----- 1 ----- 2 ----- 3 ----- 4 ----- 5 ----- 6 | |
Thread1: |--- TASK H1 ---|--- TASK H2 ---|--- TASK H3 ---| | |
Thread2: |TASK E1|TASK E2|TASK E3|******** WASTE ********| | |
Proposed Approach (one of several equivalent alternatives) | |
Seconds: 0 ----- 1 ----- 2 ----- 3 ----- 4 ----- 5 ----- 6 | |
Thread1: |--- TASK H1 ---|--- TASK H3 ---|TASK E3| | |
Thread2: |--- TASK H2 ---|TASK E1|TASK E2| WASTE | | |
An alternative equivalent to the proposed approach is to provide an | |
`std::atomic<size_t>` to serve as index, and a container size. This would be | |
useful when dealing with non-contiguous storage (but could result in O(n) | |
container access). That approach is no more efficient than the suggested one | |
because we also relay on a single atomic variable. | |
(C) 2018 Nabla Zero Labs | |
MIT License | |
*/ | |
#include <atomic> | |
#include <chrono> | |
#include <iostream> | |
#include <mutex> | |
#include <thread> | |
#include <vector> | |
std::mutex kSTDOUT_MUTEX; | |
template <typename... Args> | |
void log(Args&&... args) { | |
std::lock_guard<std::mutex> lock(kSTDOUT_MUTEX); | |
((std::cout << args << " "), ...) << std::endl; | |
} | |
// A "Job" simply references a slab of contiguous memory that needs to be | |
// processed by multiple threads in a read-only manner. | |
struct Job { | |
std::atomic<const int*> beg; // this will be incremented, but cannot modify | |
// the underlying value | |
const int* const end; // the sentinel cannot be modified | |
}; | |
struct Worker { | |
Job& job; // the "job slab" common to all workers | |
Worker(Job& job) : job{job} {} | |
void operator()() { | |
log(std::this_thread::get_id(), "starting main loop"); | |
while (job.beg != job.end) { // atomic compare | |
process(job.beg++); // atomic post-increment | |
} | |
log(std::this_thread::get_id(), "finished main loop"); | |
} | |
// do something with the data, but don't modify it. | |
void process(const int* job) { | |
log("\t", std::this_thread::get_id(), "processing job #", *job); | |
std::this_thread::sleep_for(std::chrono::seconds(2)); | |
log("\t", std::this_thread::get_id(), "finished job #", *job); | |
} | |
}; | |
int main() { | |
std::vector<int> data{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; // sample slab of work | |
Job job{data.data(), std::next(data.data(), data.size())}; | |
std::vector<std::thread> workers; | |
for (int n = 0; n < 4; ++n) { | |
workers.emplace_back(Worker(job)); | |
} | |
for (auto&& worker : workers) { | |
worker.join(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Sample run