Created
September 20, 2025 22:59
-
-
Save JamesWP/7f67da5e2852cba3795ebda6210a65c0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #include <cstdint> | |
| #include <optional> | |
| #include <unordered_map> | |
| #include <iostream> | |
| #include <format> | |
| #define WFQ 1 | |
| using QueueId = uint64_t; | |
| using Cost = uint64_t; | |
| using QueueWeight = double; | |
| #ifdef WFQ | |
| using VirtualTime = double; | |
| #endif | |
| #if 0 | |
| // Something to flatten the tree of queues into one flat set | |
| // Literature calls this process of taking a work item and assigning a queue 'classification' | |
| template <typename T> | |
| class QueueRegistry { | |
| public: | |
| using QueueTuple = std::tuple<int, int>; | |
| QueueId get_queue_id_for(const T& t); | |
| private: | |
| std::unordered_map<QueueTuple, QueueId> _queueIds; | |
| QueueId _nextQueue{1000}; | |
| }; | |
| #endif | |
| // Knows only of a flat set of queues | |
| // Knows only of the cost of the next item in each queue | |
| // Knows only the relative weights of each queue | |
| class Scheduler { | |
| public: | |
| void set_queue_next(QueueId id, std::optional<Cost> c); | |
| void set_queue_weight(QueueId id, QueueWeight weight); | |
| // Called after each update to check for new work | |
| // Caller should then dispatch the work, or change the queue if no work available | |
| // TODO: Should this consider the max cost to be sent here? | |
| std::optional<QueueId> get_next_queue() const; | |
| // called to inform the Scheduler of the work in progress/complete | |
| void on_dispatch(QueueId id, Cost cost); | |
| void on_complete(QueueId id, Cost cost); | |
| void set_max_inflight(Cost max); | |
| private: | |
| // Keeps track of the state of each Queue | |
| struct Q { | |
| QueueWeight w; | |
| std::optional<Cost> next; | |
| Cost outstanding; | |
| #ifdef WFQ | |
| VirtualTime virtual_finish_time; // Stores the virtual finish time of the last item dispatched from this queue | |
| #endif | |
| }; | |
| Cost outstanding; // Total outstanding work | |
| Cost max_outstanding; // Max allowed outstanding | |
| #ifdef WFQ | |
| VirtualTime _virtual_time{0.0}; // The global virtual time of the scheduler | |
| #endif | |
| std::unordered_map<QueueId, Q> _queues; | |
| }; | |
| int main() { | |
| Scheduler s; | |
| s.set_queue_weight(1, 2); | |
| s.set_queue_weight(2, 1); | |
| s.set_queue_weight(3, 1); | |
| s.set_queue_next(1, 1); | |
| s.set_queue_next(2, 1); | |
| s.set_queue_next(3, 1); | |
| s.set_max_inflight(2); | |
| auto q = s.get_next_queue(); // get next queue for dispatch | |
| std::cout << std::format("Q {}", q.value()) << std::endl; | |
| s.on_dispatch(q.value(), 1); // tell the scheduler we sent it | |
| s.set_queue_next(q.value(), std::nullopt); // tell the scheduler we have no more work in that queue | |
| auto q2 = s.get_next_queue(); | |
| std::cout << std::format("Q {}", q2.value()) << std::endl; | |
| s.on_dispatch(q2.value(), 1); | |
| s.set_queue_next(q2.value(), std::nullopt); | |
| auto n = s.get_next_queue(); // due to max inflight we should not get asked to dispatch anything | |
| std::cout << std::format("Q {} == false", static_cast<bool>(n)) << std::endl; | |
| s.on_complete(q.value(), 1); // now we tell the scheduler we finished some work | |
| s.on_complete(q2.value(), 1); | |
| auto q3 = s.get_next_queue(); // we should now get told to dispatch something else | |
| std::cout << std::format("Q {}", q3.value()) << std::endl; | |
| s.on_dispatch(q3.value(), 1); | |
| s.set_queue_next(q3.value(), std::nullopt); | |
| auto n2 = s.get_next_queue(); // due to max inflight we should not get asked to dispatch anything | |
| std::cout << std::format("Q {} == false", static_cast<bool>(n2)) << std::endl; | |
| } | |
| #ifdef WFQ | |
| // Sets the cost of the next item in a queue. If optional is empty, | |
| // it means the queue is now empty. | |
| void Scheduler::set_queue_next(QueueId id, std::optional<Cost> c) { | |
| // If the queue doesn't exist, create it with default values. | |
| if (_queues.find(id) == _queues.end()) { | |
| _queues[id] = {1.0, c, 0, _virtual_time}; | |
| } else { | |
| _queues[id].next = c; | |
| } | |
| } | |
| // Sets the weight of a queue. | |
| void Scheduler::set_queue_weight(QueueId id, QueueWeight weight) { | |
| if (_queues.find(id) == _queues.end()) { | |
| _queues[id] = {weight, std::nullopt, 0, _virtual_time}; | |
| } else { | |
| _queues[id].w = weight; | |
| } | |
| } | |
| // Determines the next queue to dispatch from based on the WFQ algorithm. | |
| // The next item to be dispatched is from the queue with the minimum | |
| // virtual finish time that has a pending item. | |
| std::optional<QueueId> Scheduler::get_next_queue() const { | |
| // Return nothing if the total outstanding work has reached its limit | |
| if (outstanding >= max_outstanding) { | |
| return std::nullopt; | |
| } | |
| std::optional<QueueId> best_queue = std::nullopt; | |
| VirtualTime min_finish_time = std::numeric_limits<VirtualTime>::max(); | |
| // Iterate through all queues to find the one with the minimum virtual finish time | |
| for (const auto& pair : _queues) { | |
| const QueueId& current_id = pair.first; | |
| const auto& q = pair.second; | |
| // Check if the queue has an item to dispatch and if dispatching it | |
| // won't exceed the max outstanding cost. | |
| if (q.next.has_value() && (outstanding + q.next.value() <= max_outstanding)) { | |
| // Check if this queue is the best candidate so far | |
| if (q.virtual_finish_time < min_finish_time) { | |
| min_finish_time = q.virtual_finish_time; | |
| best_queue = current_id; | |
| } | |
| } | |
| } | |
| return best_queue; | |
| } | |
| // Called when a work item is dispatched. This updates the total outstanding | |
| // cost, the outstanding cost for the specific queue, and calculates the | |
| // new virtual finish time for the queue based on the WFQ algorithm. | |
| void Scheduler::on_dispatch(QueueId id, Cost cost) { | |
| auto it = _queues.find(id); | |
| if (it != _queues.end()) { | |
| it->second.outstanding += cost; | |
| outstanding += cost; | |
| // The virtual finish time for the dispatched item | |
| VirtualTime new_vft = std::max(_virtual_time, it->second.virtual_finish_time) + (static_cast<VirtualTime>(cost) / it->second.w); | |
| // Update the queue's virtual finish time and the global virtual time | |
| it->second.virtual_finish_time = new_vft; | |
| _virtual_time = new_vft; | |
| } | |
| } | |
| // Called when a work item is completed. This reduces the total outstanding | |
| // cost and the outstanding cost for the specific queue. | |
| void Scheduler::on_complete(QueueId id, Cost cost) { | |
| auto it = _queues.find(id); | |
| if (it != _queues.end()) { | |
| // Ensure costs don't go below zero | |
| if (it->second.outstanding >= cost) { | |
| it->second.outstanding -= cost; | |
| } else { | |
| it->second.outstanding = 0; | |
| } | |
| if (outstanding >= cost) { | |
| outstanding -= cost; | |
| } else { | |
| outstanding = 0; | |
| } | |
| } | |
| } | |
| // Sets the maximum allowed outstanding work. | |
| void Scheduler::set_max_inflight(Cost max) { | |
| max_outstanding = max; | |
| } | |
| #else | |
| // Sets the cost of the next item in a queue. If optional is empty, | |
| // it means the queue is now empty. | |
| void Scheduler::set_queue_next(QueueId id, std::optional<Cost> c) { | |
| if (_queues.find(id) == _queues.end()) { | |
| _queues[id] = {1.0, c, 0}; // Default weight to 1 if not set | |
| } else { | |
| _queues[id].next = c; | |
| } | |
| } | |
| // Sets the weight of a queue. | |
| void Scheduler::set_queue_weight(QueueId id, QueueWeight weight) { | |
| if (_queues.find(id) == _queues.end()) { | |
| _queues[id] = {weight, std::nullopt, 0}; | |
| } else { | |
| _queues[id].w = weight; | |
| } | |
| } | |
| // Determines the next queue to dispatch from. | |
| // The algorithm is simple: | |
| // 1. Check if the total outstanding work is less than the max allowed. | |
| // 2. Iterate through all queues. | |
| // 3. For each queue, check if it has a next work item (cost is not nullopt). | |
| // 4. Return the first queue found with available work. This is a very simple | |
| // round-robin approach, as requested. | |
| // TODO: a proper WFQ scheduler would track and update "virtual finish times" | |
| // to ensure fairness. This simplified model does not. | |
| std::optional<QueueId> Scheduler::get_next_queue() const { | |
| // Return nothing if the total outstanding work has reached its limit | |
| if (outstanding >= max_outstanding) { | |
| return std::nullopt; | |
| } | |
| // Iterate through the queues and find the first one with a pending item. | |
| // This is a simple, non-fair, "dumb" round-robin-like approach. | |
| for (const auto& pair : _queues) { | |
| const auto& q = pair.second; | |
| if (q.next.has_value() && (outstanding + q.next.value() <= max_outstanding)) { | |
| return pair.first; | |
| } | |
| } | |
| // No queue has a pending item | |
| return std::nullopt; | |
| } | |
| // Called when a work item is dispatched. This updates the total outstanding | |
| // cost and the outstanding cost for the specific queue. | |
| void Scheduler::on_dispatch(QueueId id, Cost cost) { | |
| auto it = _queues.find(id); | |
| if (it != _queues.end()) { | |
| it->second.outstanding += cost; | |
| outstanding += cost; | |
| } else { | |
| // Handle case where a dispatch is called for a non-existent queue, | |
| // although this shouldn't happen with the normal workflow. | |
| // It's good practice to log or handle this. | |
| } | |
| } | |
| // Called when a work item is completed. This reduces the total outstanding | |
| // cost and the outstanding cost for the specific queue. | |
| void Scheduler::on_complete(QueueId id, Cost cost) { | |
| auto it = _queues.find(id); | |
| if (it != _queues.end()) { | |
| // Ensure costs don't go below zero | |
| if (it->second.outstanding >= cost) { | |
| it->second.outstanding -= cost; | |
| } else { | |
| it->second.outstanding = 0; | |
| } | |
| if (outstanding >= cost) { | |
| outstanding -= cost; | |
| } else { | |
| outstanding = 0; | |
| } | |
| } | |
| } | |
| // Sets the maximum allowed outstanding work. | |
| void Scheduler::set_max_inflight(Cost max) { | |
| max_outstanding = max; | |
| } | |
| #endif |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment