// Source: GitHub Gist 0x0L/02f693e74380b59489966b1955bc17d3 by @0x0L,
// created June 10, 2024.
#include <algorithm>
#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <iterator>
#include <memory>
#include <mutex>
#include <numeric>
#include <queue>
#include <stdexcept>
#include <string>
#include <thread>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>
// #include "ThreadPool.hpp"
/// Fixed-size pool of worker threads that run queued tasks in FIFO order.
///
/// enqueue() wraps a callable in a std::packaged_task and returns its
/// std::future; an exception thrown by the task is stored in that future.
/// The destructor sets the stop flag, lets the workers finish every task that
/// is still queued, and joins them.
///
/// With num_threads == 0 no worker exists, so queued tasks never run and
/// waiting on their futures blocks indefinitely.
class ThreadPool
{
public:
    /// Starts @p num_threads worker threads.
    /// @throws std::system_error if a thread cannot be started; workers that
    ///         were already started are stopped and joined before rethrowing.
    explicit ThreadPool(size_t num_threads)
    {
        try
        {
            workers.reserve(num_threads);
            for (size_t i = 0; i < num_threads; i++)
                workers.emplace_back([this] { worker_loop(); });
        }
        catch (...)
        {
            // Destroying a vector that still holds joinable std::threads calls
            // std::terminate, so shut the partially built pool down first.
            shutdown();
            throw;
        }
    }

    /// Stops accepting work, lets workers drain the queue, and joins them.
    ~ThreadPool() { shutdown(); }

    // Workers capture `this`, so the pool must never be copied or moved.
    ThreadPool(const ThreadPool &) = delete;
    ThreadPool &operator=(const ThreadPool &) = delete;
    ThreadPool(ThreadPool &&) = delete;
    ThreadPool &operator=(ThreadPool &&) = delete;

    /// Queues f(args...) for execution on a worker thread.
    ///
    /// @param f    callable to run; it and @p args are combined with std::bind,
    ///             so the arguments are stored by value and passed as lvalues.
    /// @param args arguments forwarded into the binding.
    /// @return a future that receives the call's result or the exception it throws.
    /// @throws std::runtime_error if the pool is already stopping.
    template <class F, class... Args>
    auto enqueue(F &&f, Args &&...args) -> std::future<std::invoke_result_t<F, Args...>>
    {
        using return_type = std::invoke_result_t<F, Args...>;
        // shared_ptr because std::function requires a copyable callable while
        // std::packaged_task is move-only.
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            if (stop)
                throw std::runtime_error("enqueue on stopped ThreadPool");
            tasks.emplace([task]() { (*task)(); });
        }
        condition.notify_one();
        return res;
    }

private:
    /// Body of every worker: pop and run tasks until stop is set and the
    /// queue is empty.
    void worker_loop()
    {
        for (;;)
        {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(queue_mutex);
                condition.wait(lock, [this] { return stop || !tasks.empty(); });
                if (stop && tasks.empty())
                    return;
                task = std::move(tasks.front());
                tasks.pop();
            }
            // Run outside the lock so other workers can dequeue concurrently.
            task();
        }
    }

    /// Sets the stop flag, wakes all workers and joins every started thread.
    void shutdown()
    {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread &worker : workers)
        {
            if (worker.joinable())
                worker.join();
        }
    }

    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks; // guarded by queue_mutex
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop = false; // guarded by queue_mutex
};
/// Merges sorted vectors into one sorted, duplicate-free vector using a
/// ThreadPool and a pairwise (tournament) reduction.
///
/// Prints "Using N threads." to std::cout before starting.
///
/// @param sorted_vectors inputs, each assumed sorted ascending.
/// @return the union of all inputs, sorted and without repeated values;
///         empty when @p sorted_vectors is empty.
std::vector<int>
parallel_unique_merge1(const std::vector<std::vector<int>> &sorted_vectors)
{
    if (sorted_vectors.empty())
        return {};

    // hardware_concurrency() may return 0 when it cannot be determined; a
    // pool with no workers would never run the tasks and get() would hang.
    size_t num_threads = std::max(1u, std::thread::hardware_concurrency());
    std::cout << "Using " << num_threads << " threads." << std::endl;
    ThreadPool pool(num_threads);

    using Result = std::vector<int>;
    std::vector<std::future<Result>> futures;
    futures.reserve(sorted_vectors.size());
    for (const auto &vec : sorted_vectors)
    {
        // Capture by reference: sorted_vectors outlives the pool, which is
        // joined before this function returns, so no extra copy is needed.
        futures.emplace_back(pool.enqueue([&vec] {
            Result leaf(vec);
            // Drop repeats inside one input; set_union of duplicate-free
            // ranges is then duplicate-free as well.
            leaf.erase(std::unique(leaf.begin(), leaf.end()), leaf.end());
            return leaf;
        }));
    }

    // Pairwise reduction. The pool dequeues in FIFO order and every future a
    // merge task waits on belongs to a task enqueued earlier, so the blocking
    // get() calls inside workers cannot deadlock, even with a single worker.
    while (futures.size() > 1)
    {
        std::vector<std::future<Result>> next;
        next.reserve((futures.size() + 1) / 2);
        for (size_t i = 0; i < futures.size(); i += 2)
        {
            if (i + 1 == futures.size())
            {
                // Odd one out: carry it to the next round unchanged.
                next.push_back(std::move(futures[i]));
                continue;
            }
            next.push_back(pool.enqueue(
                [left = std::move(futures[i]), right = std::move(futures[i + 1])]() mutable {
                    Result a = left.get();
                    Result b = right.get();
                    Result merged;
                    merged.reserve(a.size() + b.size());
                    std::set_union(a.cbegin(), a.cend(), b.cbegin(), b.cend(),
                                   std::back_inserter(merged));
                    return merged;
                }));
        }
        futures = std::move(next);
    }
    return futures.front().get();
}
/// Recursively merges sorted_vectors[start_idx, end_idx) into @p result.
///
/// The index range is split in half. The left half is merged on a new thread
/// (std::async with std::launch::async) while the right half is merged on the
/// calling thread. The two halves are then combined with std::set_union.
///
/// @param sorted_vectors inputs, each assumed sorted ascending.
/// @param result         overwritten with the sorted, duplicate-free union of
///                       the selected inputs; previous contents are discarded.
/// @param start_idx      first input to merge (inclusive).
/// @param end_idx        one past the last input; an empty range yields an
///                       empty result.
/// @throws any exception raised while merging the left half (rethrown via its
///         future), std::system_error if a thread cannot be started, or
///         std::bad_alloc.
void parallel_merge(const std::vector<std::vector<int>> &sorted_vectors,
                    std::vector<int> &result, size_t start_idx,
                    size_t end_idx)
{
    if (end_idx <= start_idx)
    {
        result.clear();
        return;
    }
    if (end_idx - start_idx == 1)
    {
        result = sorted_vectors[start_idx];
        // Drop repeats inside one input so every union below stays unique.
        result.erase(std::unique(result.begin(), result.end()), result.end());
        return;
    }

    const size_t mid_idx = start_idx + (end_idx - start_idx) / 2;
    std::vector<int> left_result;
    std::vector<int> right_result;
    {
        // A future from std::async blocks in its destructor until the task
        // finishes, so the left worker never outlives left_result, even if
        // merging the right half throws.
        auto left = std::async(std::launch::async, [&] {
            parallel_merge(sorted_vectors, left_result, start_idx, mid_idx);
        });
        parallel_merge(sorted_vectors, right_result, mid_idx, end_idx);
        left.get(); // rethrows any exception from the left half
    }

    result.clear();
    result.reserve(left_result.size() + right_result.size());
    std::set_union(left_result.cbegin(), left_result.cend(),
                   right_result.cbegin(), right_result.cend(),
                   std::back_inserter(result));
}
/// Merges sorted vectors by recursive divide-and-conquer, delegating to
/// parallel_merge over the full index range.
///
/// @param sorted_vectors inputs, each assumed sorted ascending.
/// @return the merged result; empty when @p sorted_vectors is empty.
std::vector<int>
parallel_unique_merge2(const std::vector<std::vector<int>> &sorted_vectors)
{
    // Handle empty input here so this function never depends on how
    // parallel_merge treats the empty range [0, 0).
    if (sorted_vectors.empty())
        return {};

    std::vector<int> final_result;
    parallel_merge(sorted_vectors, final_result, 0, sorted_vectors.size());
    return final_result;
}
/// Merges sorted vectors into one sorted, duplicate-free vector with a k-way
/// merge driven by a min-heap that holds at most one cursor per input.
///
/// Despite its name this version runs entirely on the calling thread.
///
/// @param sorted_vectors inputs, each assumed sorted ascending; empty inputs
///                       are skipped.
/// @return every distinct value across the inputs, in ascending order.
std::vector<int>
parallel_unique_merge3(const std::vector<std::vector<int>> &sorted_vectors)
{
    // Heap entry: (current value, index of source vector, position in it).
    using Cursor = std::tuple<int, size_t, size_t>;
    std::priority_queue<Cursor, std::vector<Cursor>, std::greater<Cursor>> heap;
    for (size_t i = 0; i < sorted_vectors.size(); ++i)
    {
        if (!sorted_vectors[i].empty())
            heap.emplace(sorted_vectors[i].front(), i, size_t{0});
    }

    std::vector<int> unique;
    while (!heap.empty())
    {
        const int x = std::get<0>(heap.top());
        unique.push_back(x);
        // Consume every cursor currently at x, including repeats of x inside
        // the same input, so x is emitted exactly once.
        while (!heap.empty() && std::get<0>(heap.top()) == x)
        {
            // Copy out and pop before pushing: top() refers into the heap's
            // storage, and popping first guarantees we remove this cursor.
            const size_t idx = std::get<1>(heap.top());
            const size_t next = std::get<2>(heap.top()) + 1;
            heap.pop();
            if (next < sorted_vectors[idx].size())
                heap.emplace(sorted_vectors[idx][next], idx, next);
        }
    }
    return unique;
}
/// Benchmark driver: builds kNumVectors identical sorted vectors holding
/// 0 .. kVectorLength - 1 and merges them with parallel_unique_merge1.
int main()
{
    constexpr size_t kNumVectors = 200;
    constexpr size_t kVectorLength = 1000000;

    // Build one sorted vector 0, 1, ..., kVectorLength - 1 once and replicate
    // it, instead of refilling and then copying a fresh vector per slot.
    std::vector<int> base(kVectorLength);
    std::iota(base.begin(), base.end(), 0);
    const std::vector<std::vector<int>> sorted_vectors(kNumVectors, base);

    std::cout << "Starting" << std::endl;
    [[maybe_unused]] const std::vector<int> output = parallel_unique_merge1(sorted_vectors);
    return 0;
}
// (end of gist; page footer: "Sign up for free to join this conversation on GitHub.")