Created
May 21, 2024 19:35
-
-
Save scc-tw/b62b180771e0af2f5c3c6055e11594cf to your computer and use it in GitHub Desktop.
Efficient Resource Management and Evaluation of Large-scale Node Processing in C++
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <chrono>
#include <cstddef>
#include <iostream>
#include <memory>
#include <stack>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
// Capacities of the resource pool (kept in flat vectors for data locality).
// They are used as container sizes and as bounds for size_t loop counters, so
// they are typed as std::size_t rather than the int that `auto` would deduce.
constexpr std::size_t file_max_number = 20'000'000;   // number of file entries / file nodes
constexpr std::size_t process_max_number = 1'000'000; // number of process entries / process nodes
struct ResourcePool | |
{ | |
std::vector<std::string> fileNamePool{file_max_number}; | |
std::vector<std::string> fileHashPool{file_max_number}; | |
std::vector<std::string> processNamePool{process_max_number}; | |
std::vector<std::string> processTcpPool{process_max_number}; | |
}; | |
ResourcePool resourcePool; | |
// Tag reported by ScanExpression::getType() so callers can tell node kinds
// apart without RTTI. The numeric values are the ones the compiler would
// assign implicitly; they are only spelled out for readability.
enum class NodeType
{
    File = 0,      // reported by FileNode
    Process = 1,   // reported by ProcessNode
    Combine = 2,   // reported by CombineProcessor
    Processor = 3, // reported by ScanProcessor
    Unknown = 4    // default from ScanExpression::getType()
};
struct ScanExpression | |
{ | |
virtual ~ScanExpression() = default; | |
virtual void evaluate() = 0; | |
virtual int isLeaf() const { return 0; } | |
virtual NodeType getType() const { return NodeType::Unknown; } | |
}; | |
struct ScanNode : public ScanExpression | |
{ | |
bool evaluated = false; | |
virtual void process() const = 0; | |
__always_inline void evaluate() override | |
{ | |
if (!evaluated) | |
{ | |
process(); | |
evaluated = true; | |
} | |
} | |
}; | |
// Forward declarations of the concrete node types defined below:
// the two leaf kinds first, then the two processor kinds.
struct FileNode;
struct ProcessNode;
struct ScanProcessor;
struct CombineProcessor;
struct FileNode : public ScanNode | |
{ | |
size_t filenameIndex; | |
FileNode(size_t index) : filenameIndex(index) {} | |
FileNode(FileNode &&) = default; | |
FileNode &operator=(FileNode &&) = default; | |
__always_inline void process() const override | |
{ | |
std::clog << "Processing file: " << resourcePool.fileNamePool[filenameIndex] << "\n"; | |
} | |
NodeType getType() const override { return NodeType::File; } | |
int isLeaf() const override { return 1; } | |
}; | |
struct ProcessNode : public ScanNode | |
{ | |
size_t processNameIndex; | |
ProcessNode(size_t index) : processNameIndex(index) {} | |
ProcessNode(ProcessNode &&) = default; | |
ProcessNode &operator=(ProcessNode &&) = default; | |
__always_inline void process() const override | |
{ | |
std::clog << "Executing process: " << resourcePool.processNamePool[processNameIndex] << "\n"; | |
} | |
NodeType getType() const override { return NodeType::Process; } | |
int isLeaf() const override { return 1; } | |
}; | |
struct ScanProcessor : public ScanNode | |
{ | |
std::shared_ptr<ScanNode> lhs; | |
std::shared_ptr<ScanNode> rhs; | |
template <typename Left, typename Right> | |
ScanProcessor(std::shared_ptr<Left> left, std::shared_ptr<Right> right) | |
: lhs(std::static_pointer_cast<ScanNode>(left)), rhs(std::static_pointer_cast<ScanNode>(right)) | |
{ | |
static_assert(std::is_base_of<ScanNode, Left>::value, "Left must derive from ScanNode"); | |
static_assert(std::is_base_of<ScanNode, Right>::value, "Right must derive from ScanNode"); | |
} | |
ScanProcessor(ScanProcessor &&) = default; | |
ScanProcessor &operator=(ScanProcessor &&) = default; | |
__always_inline void evaluate() override | |
{ | |
std::stack<std::weak_ptr<ScanNode>> eval_seq_stack; | |
std::vector<std::weak_ptr<ScanNode>> eval_seq_queue; | |
eval_seq_stack.push(lhs); | |
eval_seq_stack.push(rhs); | |
while (!eval_seq_stack.empty()) | |
{ | |
auto node = eval_seq_stack.top(); | |
eval_seq_stack.pop(); | |
auto node_ptr = node.lock(); | |
if (!node_ptr->isLeaf()) | |
{ | |
auto processor = std::static_pointer_cast<ScanProcessor>(node_ptr); | |
eval_seq_stack.push(processor->rhs); | |
eval_seq_stack.push(processor->lhs); | |
} | |
else | |
{ | |
eval_seq_queue.push_back(node); | |
} | |
} | |
std::clog << "Evaluating nodes" << "\n"; | |
std::clog << "Node size: " << eval_seq_queue.size() << "\n"; | |
for (auto it = eval_seq_queue.rbegin(); it != eval_seq_queue.rend(); ++it) | |
{ | |
auto node = it->lock(); | |
if (node) | |
node->evaluate(); | |
} | |
std::clog << "Finished evaluating nodes" << "\n"; | |
} | |
NodeType getType() const override { return NodeType::Processor; } | |
}; | |
struct CombineProcessor : public ScanProcessor | |
{ | |
template <typename Left, typename Right> | |
CombineProcessor(std::shared_ptr<Left> left, std::shared_ptr<Right> right) | |
: ScanProcessor(left, right) | |
{ | |
static_assert(std::is_base_of<ScanNode, Left>::value, "Left must derive from ScanNode"); | |
static_assert(std::is_base_of<ScanNode, Right>::value, "Right must derive from ScanNode"); | |
} | |
CombineProcessor(CombineProcessor &&) = default; | |
CombineProcessor &operator=(CombineProcessor &&) = default; | |
__always_inline void process() const override | |
{ | |
std::clog << "Combining results from nodes" << "\n"; | |
} | |
NodeType getType() const override { return NodeType::Combine; } | |
}; | |
// Creates a Node owned by a std::shared_ptr, perfectly forwarding `args` to
// Node's constructor (a thin wrapper around std::make_shared).
//
// The first template parameter used to be called `NodeType`, which hid the
// `NodeType` enum inside this function; it is now `Node`. Callers are not
// affected because they pass the type positionally: make_node<FileNode>(i).
template <typename Node, typename... Args>
std::shared_ptr<Node> make_node(Args &&...args)
{
    return std::make_shared<Node>(std::forward<Args>(args)...);
}
int main() | |
{ | |
// Initialize resource pool data | |
for (size_t i = 0; i < file_max_number; ++i) | |
{ | |
resourcePool.fileNamePool[i] = "file" + std::to_string(i) + ".txt"; | |
resourcePool.fileHashPool[i] = "hash" + std::to_string(i); | |
} | |
for (size_t i = 0; i < process_max_number; ++i) | |
{ | |
resourcePool.processNamePool[i] = "Process " + std::to_string(i); | |
resourcePool.processTcpPool[i] = "TCP " + std::to_string(i); | |
} | |
auto FilePool = std::vector<std::shared_ptr<FileNode>>(file_max_number); | |
for (size_t i = 0; i < file_max_number; ++i) | |
{ | |
FilePool[i] = make_node<FileNode>(i); | |
} | |
auto ProcessPool = std::vector<std::shared_ptr<ProcessNode>>(process_max_number); | |
for (size_t i = 0; i < process_max_number; ++i) | |
{ | |
ProcessPool[i] = make_node<ProcessNode>(i); | |
} | |
auto combinedNodes = make_node<CombineProcessor>(FilePool[0], FilePool[1]); | |
for (size_t i = 2; i < file_max_number; ++i) | |
{ | |
combinedNodes = make_node<CombineProcessor>(combinedNodes, FilePool[i]); | |
} | |
for (size_t i = 0; i < process_max_number; ++i) | |
{ | |
combinedNodes = make_node<CombineProcessor>(combinedNodes, ProcessPool[i]); | |
} | |
auto start = std::chrono::high_resolution_clock::now(); | |
combinedNodes->evaluate(); | |
auto end = std::chrono::high_resolution_clock::now(); | |
std::chrono::duration<double> elapsed = end - start; | |
std::cout << "For " << file_max_number << " files and " << process_max_number << " processes, elapsed time: " << elapsed.count() << "s\n"; | |
return 0; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
ulimit -s 1048576 | |
➜ test ulimit -a | |
-t: cpu time (seconds) unlimited | |
-f: file size (blocks) unlimited | |
-d: data seg size (kbytes) unlimited | |
-s: stack size (kbytes) 1048576 | |
-c: core file size (blocks) unlimited | |
-m: resident set size (kbytes) unlimited | |
-u: processes 127280 | |
-n: file descriptors 1024 | |
-l: locked-in-memory size (kbytes) 8192 | |
-v: address space (kbytes) unlimited | |
-x: file locks unlimited | |
-i: pending signals 127280 | |
-q: bytes in POSIX msg queues 819200 | |
-e: max nice 0 | |
-r: max rt priority 0 | |
-N 15: rt cpu time (microseconds) unlimited |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment