Skip to content

Instantly share code, notes, and snippets.

@JamesWP
Created September 20, 2025 22:59
Show Gist options
  • Save JamesWP/7f67da5e2852cba3795ebda6210a65c0 to your computer and use it in GitHub Desktop.
Save JamesWP/7f67da5e2852cba3795ebda6210a65c0 to your computer and use it in GitHub Desktop.
#include <algorithm>
#include <cstdint>
#include <format>
#include <iostream>
#include <limits>
#include <optional>
#include <unordered_map>
// Selects which Scheduler implementation is compiled further down. The guards
// use #ifdef, so only whether WFQ is defined matters (its value is ignored);
// remove this line to build the simpler first-match fallback instead.
#define WFQ 1
// Identifier of a queue known to the Scheduler.
using QueueId = uint64_t;
// Cost of a work item; also the unit of the outstanding / max in-flight totals.
using Cost = uint64_t;
// Relative weight of a queue.
using QueueWeight = double;
#ifdef WFQ
// Virtual-time value used by the WFQ implementation, both for per-queue finish
// times and for the scheduler-wide clock.
using VirtualTime = double;
#endif
#if 0
// Something to flatten the tree of queues into one flat set
// Literature calls this process of taking a work item and assigning a queue 'classification'
// NOTE(review): this sketch is compiled out and only declares its interface.
// Before enabling it: <tuple> is not among the includes at the top of this
// file, and the standard library provides no std::hash specialization for
// std::tuple, so the unordered_map member below needs a custom hasher.
template <typename T>
class QueueRegistry {
public:
// Key type of the registry map. NOTE(review): the meaning of the two ints is
// not shown here — confirm with the author.
using QueueTuple = std::tuple<int, int>;
// Declared only (no definition in view); presumably classifies a work item
// and returns the id of the queue it belongs to.
QueueId get_queue_id_for(const T& t);
private:
// Queue ids keyed by QueueTuple.
std::unordered_map<QueueTuple, QueueId> _queueIds;
// Starts at 1000; presumably the next id to hand out — TODO confirm.
QueueId _nextQueue{1000};
};
#endif
// Decides which queue should be served next.
// Knows only of a flat set of queues
// Knows only of the cost of the next item in each queue
// Knows only the relative weights of each queue
class Scheduler {
public:
    // Records the cost of the item at the head of queue `id`; std::nullopt
    // marks the queue as empty. An unknown queue is created with weight 1.0.
    void set_queue_next(QueueId id, std::optional<Cost> c);

    // Sets the relative weight of queue `id`. An unknown queue is created
    // with no pending item.
    void set_queue_weight(QueueId id, QueueWeight weight);

    // Called after each update to check for new work
    // Caller should then dispatch the work, or change the queue if no work available
    // Returns std::nullopt when no queue has a pending item that fits under
    // the in-flight limit.
    // TODO: Should this consider the max cost to be sent here?
    std::optional<QueueId> get_next_queue() const;

    // called to inform the Scheduler of the work in progress/complete
    // Both calls are ignored for a queue id the scheduler does not know.
    void on_dispatch(QueueId id, Cost cost);
    void on_complete(QueueId id, Cost cost);

    // Sets the cap on the total cost that may be in flight at once.
    void set_max_inflight(Cost max);

private:
    // Keeps track of the state of each Queue
    // Every field has a default so a default-constructed Q is never read
    // uninitialized; the struct remains an aggregate, so brace-initializing
    // all fields still works.
    struct Q {
        QueueWeight w{1.0};
        std::optional<Cost> next;  // Cost of the head item; empty when the queue has no work
        Cost outstanding{0};       // Cost dispatched from this queue and not yet completed
#ifdef WFQ
        VirtualTime virtual_finish_time{0.0}; // Stores the virtual finish time of the last item dispatched from this queue
#endif
    };

    // Both totals start at zero so they are never read uninitialized. With a
    // zero limit get_next_queue() returns nothing until set_max_inflight() has
    // been called.
    Cost outstanding{0};     // Total outstanding work
    Cost max_outstanding{0}; // Max allowed outstanding
#ifdef WFQ
    VirtualTime _virtual_time{0.0}; // The global virtual time of the scheduler
#endif
    std::unordered_map<QueueId, Q> _queues;
};
// Small demonstration driver: three queues with one unit-cost item each and an
// in-flight limit of 2. Prints the queue chosen at each step.
int main() {
    Scheduler s;

    // Queue 1 carries twice the weight of queues 2 and 3.
    s.set_queue_weight(1, 2);
    s.set_queue_weight(2, 1);
    s.set_queue_weight(3, 1);

    // Each queue starts with a single pending item of cost 1.
    s.set_queue_next(1, 1);
    s.set_queue_next(2, 1);
    s.set_queue_next(3, 1);

    s.set_max_inflight(2);

    // Asks for the next queue, prints it, tells the scheduler the item was
    // sent, then marks that queue as having no more work. Returns the id used.
    const auto dispatch_one = [&s]() -> QueueId {
        const auto picked = s.get_next_queue();
        std::cout << std::format("Q {}", picked.value()) << std::endl;
        s.on_dispatch(picked.value(), 1);
        s.set_queue_next(picked.value(), std::nullopt);
        return picked.value();
    };

    // Asks for the next queue and prints whether one was offered; the
    // expectation at each call site is that none is.
    const auto report_idle = [&s]() {
        const auto offered = s.get_next_queue();
        std::cout << std::format("Q {} == false", static_cast<bool>(offered)) << std::endl;
    };

    const QueueId first = dispatch_one();
    const QueueId second = dispatch_one();

    // Two units are in flight, which equals the limit: nothing is offered.
    report_idle();

    // Completing both items frees the in-flight budget again.
    s.on_complete(first, 1);
    s.on_complete(second, 1);

    // The one queue that still has a pending item is offered now.
    dispatch_one();

    // All three queues have been marked empty, so nothing is offered.
    report_idle();
}
#ifdef WFQ
// Records the cost of the item at the head of queue `id`. An empty optional
// means the queue currently has nothing to dispatch.
// A queue seen for the first time is registered with weight 1.0, nothing
// outstanding, and a finish time equal to the current global virtual time.
void Scheduler::set_queue_next(QueueId id, std::optional<Cost> c) {
    if (const auto known = _queues.find(id); known != _queues.end()) {
        known->second.next = c;
        return;
    }
    _queues.emplace(id, Q{1.0, c, 0, _virtual_time});
}
// Assigns the relative weight of queue `id`.
// A queue seen for the first time is registered with no pending item, nothing
// outstanding, and a finish time equal to the current global virtual time.
// Note: on_dispatch divides the item cost by this weight, so callers should
// pass a positive value.
void Scheduler::set_queue_weight(QueueId id, QueueWeight weight) {
    if (const auto known = _queues.find(id); known != _queues.end()) {
        known->second.w = weight;
        return;
    }
    _queues.emplace(id, Q{weight, std::nullopt, 0, _virtual_time});
}
// Determines the next queue to dispatch from based on the WFQ algorithm.
// The next item to be dispatched is from the queue with the minimum
// virtual finish time that has a pending item.
std::optional<QueueId> Scheduler::get_next_queue() const {
// Return nothing if the total outstanding work has reached its limit
if (outstanding >= max_outstanding) {
return std::nullopt;
}
std::optional<QueueId> best_queue = std::nullopt;
VirtualTime min_finish_time = std::numeric_limits<VirtualTime>::max();
// Iterate through all queues to find the one with the minimum virtual finish time
for (const auto& pair : _queues) {
const QueueId& current_id = pair.first;
const auto& q = pair.second;
// Check if the queue has an item to dispatch and if dispatching it
// won't exceed the max outstanding cost.
if (q.next.has_value() && (outstanding + q.next.value() <= max_outstanding)) {
// Check if this queue is the best candidate so far
if (q.virtual_finish_time < min_finish_time) {
min_finish_time = q.virtual_finish_time;
best_queue = current_id;
}
}
}
return best_queue;
}
// Called when a work item is dispatched. This updates the total outstanding
// cost, the outstanding cost for the specific queue, and calculates the
// new virtual finish time for the queue.
// Calls for an unknown queue id are ignored.
void Scheduler::on_dispatch(QueueId id, Cost cost) {
    const auto it = _queues.find(id);
    if (it == _queues.end()) {
        return;
    }
    Q& q = it->second;
    q.outstanding += cost;
    outstanding += cost;

    // Start tag of the dispatched item: a queue that was idle resumes at the
    // current virtual time, a backlogged queue continues from its own previous
    // finish time.
    const VirtualTime start = std::max(_virtual_time, q.virtual_finish_time);
    // Finish tag: heavier queues advance more slowly per unit of cost.
    q.virtual_finish_time = start + static_cast<VirtualTime>(cost) / q.w;
    // The global clock advances to the start tag, not the finish tag. If it
    // jumped to the newest finish tag it would always be >= every queue's
    // stored finish time, the max() above would always pick the global clock,
    // and selection would degrade to plain round-robin regardless of weights.
    _virtual_time = start;
}
// Called when a work item is completed. This reduces the total outstanding
// cost and the outstanding cost for the specific queue.
// Calls for an unknown queue id are ignored. If `cost` exceeds what the queue
// has outstanding, only the queue's outstanding amount is released, from both
// counters, so the global total cannot drift below the work other queues
// still have in flight.
void Scheduler::on_complete(QueueId id, Cost cost) {
    const auto it = _queues.find(id);
    if (it == _queues.end()) {
        return;
    }
    Q& q = it->second;
    // Never release more than this queue actually has in flight.
    const Cost released = (cost <= q.outstanding) ? cost : q.outstanding;
    q.outstanding -= released;
    // Clamp defensively so the unsigned total can never wrap below zero.
    outstanding = (released <= outstanding) ? outstanding - released : 0;
}
// Stores the cap on total in-flight cost that get_next_queue() checks against.
void Scheduler::set_max_inflight(Cost max) {
    this->max_outstanding = max;
}
#else
// Records the cost of the item at the head of queue `id`. An empty optional
// means the queue currently has nothing to dispatch.
// A queue seen for the first time is registered with the default weight 1.0
// and nothing outstanding.
void Scheduler::set_queue_next(QueueId id, std::optional<Cost> c) {
    if (const auto known = _queues.find(id); known != _queues.end()) {
        known->second.next = c;
        return;
    }
    _queues.emplace(id, Q{1.0, c, 0});
}
// Assigns the relative weight of queue `id`.
// A queue seen for the first time is registered with no pending item and
// nothing outstanding.
void Scheduler::set_queue_weight(QueueId id, QueueWeight weight) {
    if (const auto known = _queues.find(id); known != _queues.end()) {
        known->second.w = weight;
        return;
    }
    _queues.emplace(id, Q{weight, std::nullopt, 0});
}
// Determines the next queue to dispatch from.
// The algorithm is simple:
// 1. Check if the total outstanding work is less than the max allowed.
// 2. Iterate through all queues.
// 3. For each queue, check if it has a next work item (cost is not nullopt).
// 4. Return the first queue found with available work. This is a very simple
// round-robin approach, as requested.
// TODO: a proper WFQ scheduler would track and update "virtual finish times"
// to ensure fairness. This simplified model does not.
std::optional<QueueId> Scheduler::get_next_queue() const {
// Return nothing if the total outstanding work has reached its limit
if (outstanding >= max_outstanding) {
return std::nullopt;
}
// Iterate through the queues and find the first one with a pending item.
// This is a simple, non-fair, "dumb" round-robin-like approach.
for (const auto& pair : _queues) {
const auto& q = pair.second;
if (q.next.has_value() && (outstanding + q.next.value() <= max_outstanding)) {
return pair.first;
}
}
// No queue has a pending item
return std::nullopt;
}
// Called when a work item is dispatched. Adds `cost` to both the queue's own
// outstanding total and the scheduler-wide outstanding total.
void Scheduler::on_dispatch(QueueId id, Cost cost) {
    const auto entry = _queues.find(id);
    if (entry == _queues.end()) {
        // Dispatch reported for a queue that was never registered. This should
        // not happen in the normal workflow; it is silently ignored here and
        // would be a good place to log.
        return;
    }
    entry->second.outstanding += cost;
    outstanding += cost;
}
// Called when a work item is completed. This reduces the total outstanding
// cost and the outstanding cost for the specific queue.
// Calls for an unknown queue id are ignored. If `cost` exceeds what the queue
// has outstanding, only the queue's outstanding amount is released, from both
// counters, so the global total cannot drift below the work other queues
// still have in flight.
void Scheduler::on_complete(QueueId id, Cost cost) {
    const auto it = _queues.find(id);
    if (it == _queues.end()) {
        return;
    }
    Q& q = it->second;
    // Never release more than this queue actually has in flight.
    const Cost released = (cost <= q.outstanding) ? cost : q.outstanding;
    q.outstanding -= released;
    // Clamp defensively so the unsigned total can never wrap below zero.
    outstanding = (released <= outstanding) ? outstanding - released : 0;
}
// Stores the cap on total in-flight cost that get_next_queue() checks against.
void Scheduler::set_max_inflight(Cost max) {
    this->max_outstanding = max;
}
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment