Last active
June 12, 2024 20:38
-
-
Save 0x0L/a4054abd38f25f434ac1f0d2c66d3641 to your computer and use it in GitHub Desktop.
parallel_unique_merge.cpp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np
import os

# Generate N pseudo-random sorted integer series for the merge benchmark:
# each series keeps every value of 0..L/p-1 independently with probability
# p, so each has ~L elements and they heavily overlap.
N = 200
L = 10_000_000
p = 0.2

# int(L / p) keeps np.arange on an integer stop (float stops are
# deprecated and can be off by one); same element count as before.
r = np.arange(int(L / p), dtype=int)
s = [
    r[np.random.rand(r.shape[0]) > 1 - p]
    for _ in range(N)
]

length = np.array([len(x) for x in s])
data = np.concatenate(s)
print(length)
print(data)

# The original crashed with FileNotFoundError when the "series" directory
# did not exist, since ndarray.tofile does not create parent directories.
os.makedirs("series", exist_ok=True)
data.tofile("series/data")
length.tofile("series/length")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// c++ -Wall -Ofast -std=c++20 parallel_unique_merge.cpp -o pum
#include <algorithm>
#include <cstdint>
#include <filesystem>
#include <fstream>
#include <future>
#include <iostream>
#include <memory>
#include <queue>
#include <span>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>
/// Split container `c` into consecutive chunks of at most `k` elements each.
/// The last chunk may be shorter. Returns a container-of-containers of the
/// same template (e.g. vector<int> -> vector<vector<int>>).
/// @param c  input container (elements are copied, input is untouched)
/// @param k  maximum chunk size; must be > 0
/// @throws std::domain_error when k == 0
template <
    typename T,
    typename A,
    template <typename, typename> class C>
auto chunker(const C<T, A> &c, const typename C<T, A>::size_type &k)
{
    // size_type is unsigned, so the original `k <= 0` can only mean `k == 0`
    // (and trips -Wtype-limits); state the real condition directly.
    if (k == 0)
        throw std::domain_error("chunker() requires k > 0");
    using Input = C<T, A>;
    using Output = C<Input, std::allocator<Input>>;
    Output chunks;
    auto cursor = std::begin(c);
    for (auto remaining = c.size(); remaining != 0;)
    {
        auto const take = std::min(remaining, k);
        Input chunk;
        std::copy_n(cursor, take, std::back_inserter(chunk));
        // Move instead of copying the freshly built chunk into the output.
        chunks.push_back(std::move(chunk));
        remaining -= take;
        std::advance(cursor, take);
    }
    return chunks;
}
/// Read an entire binary file into a std::vector<T>.
/// Trailing bytes that do not fill a whole T are dropped (sz / sizeof(T)).
/// @param path  filesystem path to the file
/// @return vector holding the file contents reinterpreted as T
/// @throws std::runtime_error if the file cannot be opened or fully read
///         (the original silently returned a zero-filled buffer on failure)
template <typename T>
auto read_file(std::string path)
{
    std::ifstream ifs(path, std::ios::binary);
    if (!ifs)
        throw std::runtime_error("read_file: cannot open " + path);
    auto sz = std::filesystem::file_size(path);
    std::vector<T> buf(sz / sizeof(T));
    auto const bytes = static_cast<std::streamsize>(buf.size() * sizeof(T));
    if (bytes > 0 &&
        !ifs.read(reinterpret_cast<char *>(buf.data()), bytes))
        throw std::runtime_error("read_file: short read on " + path);
    return buf;
}
template <typename T> | |
auto split_arrays(const std::vector<T> &data, const std::vector<int64_t> &lengths) | |
{ | |
std::vector<std::span<const T>> arr(lengths.size()); | |
size_t pos = 0; | |
for (size_t i = 0; i < lengths.size(); i++) | |
{ | |
arr[i] = std::span(&data[pos], lengths[i]); | |
pos += lengths[i]; | |
} | |
return arr; | |
} | |
struct FrontCompare | |
{ | |
template <typename T> | |
inline bool operator()(const std::span<const T> &x, const std::span<const T> &y) const | |
{ | |
return x.front() > y.front(); | |
} | |
}; | |
template <typename T> | |
auto unique_sorted(const std::vector<std::span<const T>> &inputs) | |
{ | |
std::vector<T> output; | |
using U = std::span<const T>; | |
std::priority_queue<U, std::vector<U>, FrontCompare> priority_queue; | |
for (auto &v : inputs) | |
{ | |
if (!v.empty()) | |
priority_queue.emplace(v); | |
} | |
auto longest = std::max_element( | |
inputs.begin(), inputs.end(), | |
[](const auto &x, const auto &y) | |
{ return x.size() < y.size(); }); | |
output.reserve(longest->size()); | |
while (!priority_queue.empty()) | |
{ | |
auto top = priority_queue.top(); | |
auto val = top.front(); | |
output.push_back(val); | |
while (top.front() == val) | |
{ | |
priority_queue.pop(); | |
if (top.size() > 1) | |
priority_queue.emplace(top.subspan(1)); | |
if (priority_queue.empty()) | |
break; | |
top = priority_queue.top(); | |
} | |
} | |
return output; | |
} | |
template <typename T> | |
auto parallel_unique_sorted(const std::vector<std::span<const T>> &inputs) | |
{ | |
size_t sz = inputs.size(); | |
size_t n_threads = std::min((size_t)std::thread::hardware_concurrency(), sz); | |
size_t chunksize = sz / n_threads; | |
//std::cout << "num_threads " << num_threads << " chunksize " << chunksize << std::endl; | |
auto chunks = chunker(inputs, chunksize); | |
std::vector<std::future<std::vector<T>>> futures; | |
for (auto &c : chunks) | |
futures.emplace_back(std::async(std::launch::async, unique_sorted<T>, std::cref(c))); | |
std::vector<std::vector<T>> outputs; | |
for (auto &f : futures) | |
outputs.emplace_back(f.get()); | |
std::vector<std::span<const T>> as_span; | |
for (auto &x : outputs) | |
as_span.emplace_back(std::span{x}); | |
return unique_sorted(as_span); | |
} | |
int main() | |
{ | |
auto lengths = read_file<int64_t>("series/length"); | |
auto data = read_file<int64_t>("series/data"); | |
// std::vector<int64_t> lengths {4, 5}; | |
// std::vector<int64_t> data {2, 5, 6, 8, 1, 2, 3, 4, 7}; | |
auto arr = split_arrays(data, lengths); | |
auto result = parallel_unique_sorted(arr); | |
std::cout << result.size() << std::endl; | |
for (size_t i = 0; i < 3; i++) | |
{ | |
std::cout << result[i] << ", "; | |
} | |
std::cout << " ... "; | |
for (size_t i = result.size() - 3; i < result.size(); i++) | |
{ | |
std::cout << result[i] << ", "; | |
} | |
std::cout << std::endl; | |
// for (auto z : result) { | |
// std::cout << z << ", "; | |
// } | |
// std::cout << std::endl; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np
import pyarrow as pa

# Benchmark variant: sorted-unique of the flat data using PyArrow's
# ChunkedArray operations.
lengths = np.fromfile("series/length", dtype=int)
data = np.fromfile("series/data", dtype=int)

# Wrap the flat buffer in a one-column table and take its "ts" column.
column = pa.table({"ts": data})["ts"]
result = column.unique().sort().to_numpy()
print(result)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np
import pyarrow as pa
from datafusion import SessionContext, col

# Benchmark variant: sorted-unique of the flat data through DataFusion's
# query engine (distinct + sort on a single-column Arrow table).
lengths = np.fromfile("series/length", dtype=int)
data = np.fromfile("series/data", dtype=int)
# arr = np.split(data, np.cumsum(lengths)[:-1])
# tables = [
#     pa.table({"ts": x})
#     for x in arr
# ]

ctx = SessionContext()
frame = ctx.from_arrow_table(pa.table({"ts": data}))
frame = frame.distinct().sort(col("ts").sort())
result = frame.to_arrow_table()["ts"].to_numpy()
print(len(result))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np

# Baseline benchmark: single-threaded NumPy sorted-unique over the flat data.
lengths = np.fromfile("series/length", dtype=int)
data = np.fromfile("series/data", dtype=int)

# np.unique already returns its result sorted ascending, so the original
# np.sort(np.unique(data)) performed a redundant second sort.
df = np.unique(data)
print(len(df))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np
import pyarrow as pa
import polars as pl

# Benchmark variant: sorted-unique of the flat data using Polars
# (unique + sort on a single-column frame built from an Arrow table).
lengths = np.fromfile("series/length", dtype=int)
data = np.fromfile("series/data", dtype=int)

frame = pl.from_arrow(pa.table({"ts": data}))
frame = frame.unique().sort("ts")
result = frame.to_arrow()["ts"].to_numpy()
print(len(result))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment