Created
June 10, 2024 22:06
-
-
Save 0x0L/02f693e74380b59489966b1955bc17d3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <algorithm>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <iostream>
#include <iterator>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <string>
#include <thread>
#include <tuple>
#include <vector>
// #include "ThreadPool.hpp" | |
/// Fixed-size pool of worker threads executing queued tasks in FIFO order.
///
/// Tasks are submitted through enqueue() and picked up by the first idle
/// worker. The destructor signals shutdown, wakes every worker, lets the
/// remaining queued tasks drain, and joins all threads.
class ThreadPool
{
public:
    /// Start `num_threads` workers; each blocks until a task is available
    /// or shutdown is requested.
    explicit ThreadPool(size_t num_threads) : stop(false)
    {
        workers.reserve(num_threads);
        for (size_t i = 0; i < num_threads; i++)
        {
            workers.emplace_back([this]
                                 {
                for (;;)
                {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
                        // Exit only once shutdown was requested AND the queue
                        // is drained, so no submitted task is ever dropped.
                        if (this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    // Run outside the lock so other workers can dequeue.
                    task();
                } });
        }
    }

    // The pool owns threads whose lambdas capture `this`; copying or moving
    // it would leave workers pointing at a dead object.
    ThreadPool(const ThreadPool &) = delete;
    ThreadPool &operator=(const ThreadPool &) = delete;

    /// Request shutdown, wake all workers, and join them. Tasks already in
    /// the queue still run to completion before the workers exit.
    ~ThreadPool()
    {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread &worker : workers)
        {
            worker.join();
        }
    }

    /// Submit a callable (with optional arguments) for asynchronous
    /// execution and return a future for its result.
    /// @throws std::runtime_error if the pool is already shutting down.
    template <class F, class... Args>
    auto enqueue(F &&f, Args &&...args) -> std::future<std::invoke_result_t<F, Args...>>
    {
        using return_type = std::invoke_result_t<F, Args...>;
        // Capture the callable and its arguments in the closure instead of
        // using std::bind: bind stores decayed copies and passes them as
        // lvalues, so it cannot forward move-only arguments (e.g. a
        // std::unique_ptr parameter). std::apply + init-capture can.
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            [fn = std::forward<F>(f),
             tup = std::make_tuple(std::forward<Args>(args)...)]() mutable
            { return std::apply(std::move(fn), std::move(tup)); });
        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            if (stop)
                throw std::runtime_error("enqueue on stopped ThreadPool");
            tasks.emplace([task]()
                          { (*task)(); });
        }
        condition.notify_one();
        return res;
    }

private:
    std::vector<std::thread> workers;        // joined in the destructor
    std::queue<std::function<void()>> tasks; // pending work, FIFO
    std::mutex queue_mutex;                  // guards `tasks` and `stop`
    std::condition_variable condition;       // "task available or stopping"
    bool stop;                               // set once, under queue_mutex
};
std::vector<int> | |
parallel_unique_merge1(const std::vector<std::vector<int>> &sorted_vectors) | |
{ | |
size_t num_threads = std::thread::hardware_concurrency(); | |
std::cout << "Using " << num_threads << " threads." << std::endl; | |
ThreadPool pool(num_threads); | |
std::vector<std::future<std::vector<int>>> futures; | |
for (const auto &vec : sorted_vectors) | |
{ | |
futures.emplace_back(pool.enqueue([vec] | |
{ return vec; })); | |
} | |
while (futures.size() > 1) | |
{ | |
std::vector<std::future<std::vector<int>>> new_futures; | |
for (int i = 0; i < futures.size(); i += 2) | |
{ | |
if (i + 1 < futures.size()) | |
{ | |
auto future1 = std::move(futures[i]); | |
auto future2 = std::move(futures[i + 1]); | |
new_futures.emplace_back( | |
pool.enqueue([future1 = std::move(future1), future2 = std::move(future2)]() mutable | |
{ | |
std::vector<int> vec1 = future1.get(); | |
std::vector<int> vec2 = future2.get(); | |
std::vector<int> merged; | |
std::set_union(vec1.cbegin(), vec1.cend(), vec2.cbegin(), vec2.cend(), std::back_inserter(merged)); | |
return merged; })); | |
} | |
else | |
{ | |
new_futures.push_back(std::move(futures[i])); | |
} | |
} | |
futures = std::move(new_futures); | |
} | |
return futures[0].get(); | |
} | |
/// Recursively merge sorted_vectors[start_idx, end_idx) into `result`
/// by divide-and-conquer, running the left half on a new thread.
///
/// @param sorted_vectors inputs, each individually sorted ascending.
/// @param result output parameter; receives the merged, sorted values
///               (duplicates across the two halves removed by set_union).
/// @param start_idx first input index (inclusive).
/// @param end_idx   one past the last input index (exclusive).
void parallel_merge(const std::vector<std::vector<int>> &sorted_vectors,
                    std::vector<int> &result, size_t start_idx,
                    size_t end_idx)
{
    // Bug fix: an empty range previously split into (start, start) twice
    // and recursed forever; terminate with an empty result instead.
    if (end_idx <= start_idx)
    {
        result.clear();
        return;
    }
    if (end_idx - start_idx == 1)
    {
        result = sorted_vectors[start_idx];
        return;
    }
    size_t mid_idx = start_idx + (end_idx - start_idx) / 2;
    std::vector<int> left_result, right_result;
    // Spawn a thread only for the left half and recurse on the right half
    // in the current thread: same result, half as many threads as the
    // original two-threads-per-node version.
    std::thread left_thread(parallel_merge, std::cref(sorted_vectors),
                            std::ref(left_result), start_idx, mid_idx);
    parallel_merge(sorted_vectors, right_result, mid_idx, end_idx);
    left_thread.join();
    std::set_union(left_result.cbegin(), left_result.cend(),
                   right_result.cbegin(), right_result.cend(),
                   std::back_inserter(result));
}
std::vector<int> | |
parallel_unique_merge2(const std::vector<std::vector<int>> &sorted_vectors) | |
{ | |
std::vector<int> final_result; | |
parallel_merge(sorted_vectors, final_result, 0, sorted_vectors.size()); | |
return final_result; | |
} | |
/// Merge sorted vectors into one sorted, duplicate-free vector via a
/// k-way merge over a min-heap of (value, vector index, position, size).
///
/// Unlike the pairwise set_union variants, this also removes duplicates
/// occurring WITHIN a single input vector: every heap entry equal to the
/// current value is consumed before the value is emitted exactly once.
///
/// @param sorted_vectors inputs, each individually sorted ascending.
/// @return all distinct values, in ascending order.
std::vector<int>
parallel_unique_merge3(const std::vector<std::vector<int>> &sorted_vectors)
{
    // (value, source vector index, position within vector, vector size)
    using T = std::tuple<int, size_t, size_t, size_t>;
    std::priority_queue<T, std::vector<T>, std::greater<T>> priority_queue;
    for (size_t i = 0; i < sorted_vectors.size(); i++)
    {
        auto &v = sorted_vectors[i];
        if (v.empty())
            continue;
        priority_queue.emplace(v[0], i, static_cast<size_t>(0), v.size());
    }
    std::vector<int> unique;
    while (!priority_queue.empty())
    {
        int x = std::get<0>(priority_queue.top());
        unique.push_back(x);
        // Consume every entry equal to x (from any source) and advance
        // each consumed cursor, so x is emitted exactly once.
        while (!priority_queue.empty() && std::get<0>(priority_queue.top()) == x)
        {
            // Copy the top and pop BEFORE pushing its successor: the
            // original held `auto &top` across an emplace() into the same
            // heap, a dangling-reference hazard when the underlying
            // vector reallocates.
            T top = priority_queue.top();
            priority_queue.pop();
            size_t idx = std::get<1>(top);
            size_t new_pos = std::get<2>(top) + 1; // size_t, not int: no narrowing
            size_t size = std::get<3>(top);
            if (new_pos < size)
            {
                priority_queue.emplace(sorted_vectors[idx][new_pos], idx, new_pos, size);
            }
        }
    }
    return unique;
}
int main() | |
{ | |
// std::vector<std::vector<int>> sorted_vectors = {{2, 5, 6, 8}, | |
// {1, 2, 3, 4, 7}}; | |
std::vector<std::vector<int>> sorted_vectors; | |
for (size_t i = 0; i < 200; i++) | |
{ | |
std::vector<int> x(1000000); | |
for (size_t j = 0; j < x.size(); j++) | |
{ | |
x[j] = j; | |
} | |
sorted_vectors.emplace_back(x); | |
} | |
std::cout << "Starting" << std::endl; | |
std::vector<int> output = parallel_unique_merge1(sorted_vectors); | |
// for (const int &x : output) | |
// { | |
// std::cout << x << " "; | |
// } | |
// std::cout << output.size(); | |
// std::cout << std::endl; | |
// std::vector<int> x{ 2, 5, 6 }; | |
// std::vector<int> y{ 1, 2, 3, 4, 7}; | |
// std::vector<int> z; | |
// std::set_union(x.cbegin(), x.cend(), y.cbegin(), y.cend(), | |
// std::back_inserter(z)); std::cout << z.size() << std::endl; | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment