Skip to content

Instantly share code, notes, and snippets.

@nyorain
Last active May 10, 2021 09:09
Show Gist options
  • Save nyorain/e4e835a7334d990f083c6d88a8066364 to your computer and use it in GitHub Desktop.
Save nyorain/e4e835a7334d990f083c6d88a8066364 to your computer and use it in GitHub Desktop.
Small game-orientated UDP protocol, C++17
#pragma once
#include <kyo/message.hpp>
#include <asio/buffer.hpp>
#include <nytl/span.hpp>
#include <cstdint>
#include <cstddef>
#include <vector>
#include <memory>
#include <bitset>
#include <chrono>
#include <functional>
namespace kyo {
/// Magic package numbers.
namespace magic {
/// Magic numbers with which the package starts.
/// HeaderMagic indicates that this is the first fragment of a package an includes
/// a message header. FragmentMagic indicates that this is a later part of a
/// fragment and only has a FragmentHeader.
static constexpr auto message = 24375;
static constexpr auto fragment = 55620;
/// The magic numbers to end a package with
/// EndMagic is used for the last fragment of a package while
/// endAnotherMagic indicates that there is at least 1 next fragment.
static constexpr auto end = 42523;
static constexpr auto another = 01532;
}
/// Header of a sent message.
/// The ackBits value represent (bitwise) whether the last packages before ack (the
/// last seen package) has been received as well.
/// Example: The most significant bit in ackBits represents whether the package with
/// the id 'ack - 32' has been received from the other side.
struct MessageHeader {
uint32_t magic = magic::message;
uint32_t seq; // sequence number of this package
uint32_t ack; // latest seen package from other end
uint32_t ackBits; // last received packages, acknowledge bits (relative to ack)
};
/// Header a fragment package.
struct FragmentHeader {
uint32_t magic = magic::fragment;
uint32_t seq; // the sequence number of the related message
uint32_t fragment; // the fragment number
};
/// Represents the various detected errors a MessageHeader can have from
/// the point of view of a ConnectionManager.
enum class MessageHeaderStatus {
valid = 0,
invalidMagic,
seqDiff,
alreadyReceived,
ackDiff,
ackOld,
ackNew
};
/// Returns the name of a MessageHeaderStatus value.
const char* name(MessageHeaderStatus mhs);
/// Manages sequence numbers and acknowledgements of sequences
/// for an udp connection. Used to build and process MessageHeaders.
/// For correct behaviour, all messages send to the other end must include
/// a header retrieved using the 'nextHeader' functions and the headers
/// of all retrieved messages must be passed to 'processHeader'.
class ConnectionManager {
public:
/// The number of acknowledged packages to store for tracking.
/// Note that if the gap between the last acknowledged sequence numbers
/// in two succeeding packages is larger than the stored ackBits in
/// the message header, all package sequence numbers we don't have any ack information
/// about will be treated as lost (since in this case something went wrong, anyways).
static constexpr auto remoteAckStoreCount = 1024;
/// Setting for the rather heuristic sequence number validation
/// the maximum seq number difference accepted
/// packages in the range [remoteSeq_ - maxSeqDiff, remoteSeq_ + maxSeqDiff] are
/// accepted. Also used to validate acknowledgements and other sequence number
/// related checks. If anywhere is a jump of more than this count, the connection
/// is basically broken.
/// XXX any way to recover from it? any way to not require a hard connection reset?
static constexpr auto maxSeqDiff = 1024;
using Clock = std::chrono::steady_clock;
/// The number of ping times stores.
/// Should not exceed a reasonable amount, this many pings are stored as array.
static constexpr auto pingStoreCount = 5;
public:
/// Generates the message header for the next message.
/// Increases the local sequence number.
MessageHeader nextHeader();
/// Processes an received message header.
/// Returns its status, i.e. if it was valid or its first detected defect.
/// If it was invalid, not changes to local state will be made.
/// Headers are invalid if they have an invalid magic number or are too
/// old.
MessageHeaderStatus processHeader(const MessageHeader& msg);
/// Returns the last used local sequence number.
auto localSeq() const { return localSeq_; }
/// Returns the highest received sequence number from the other side.
/// This indicates the last sequence id we acknowledge when sending message
/// to the other side.
auto remoteSeq() const { return remoteSeq_; }
/// Returns a bit mask indicating which of the last packages sent from remote
/// were received on this end.
auto localAckBits() const { return localAckBits_; }
/// Returns the highest local sequence number acknowledged by the other side.
auto remoteAck() const { return remoteAck_; }
/// Returns whether the package with the given sequence number was acknowledged
/// by the other side. Note that this returns false if the given
/// sequence id is no longer cached, i.e. ackStoreCount behind the
/// last acknowledges sequence number of the other side.
bool acknowledged(uint32_t sequenceNumber) const;
/// Returns the last 'pingStoreCount' ping time (in microseconds, measured by Clock).
/// The ping times (naturally) always include the process delay, i.e.
/// the time needed between receiving a package and sending the ack package (so
/// it is NO fully correct network-ping time).
/// If a pinged package is lost, the ping time will include the time
/// until a newer package is acknowledged.
decltype(auto) pings() const { return pings_; }
protected:
std::uint32_t localSeq_ = 0; // seq number of the last message sent
std::uint32_t remoteSeq_ = 0; // highest seq number of a received message, ack
std::uint32_t localAckBits_ = 0; // which last remote messages were retrieved on this end
std::uint32_t remoteAck_ = 0; // last package that was acknowledged by the other side
// the last sent ping-relevant message and its time
std::array<unsigned int, pingStoreCount> pings_ = {};
std::uint32_t pingSent_ = 0;
Clock::time_point pingTime_ = {};
// which package the other side acknowledged
// remoteAckBits_[i] represents whether package with sequence number
// remoteAck_ - i - 1 was acknowledged by the other side
std::bitset<remoteAckStoreCount> remoteAckBits_ = {};
};
/// The status of a MessageManager for a received package.
enum class PackageStatus {
invalid, // package was invalid in any way
invalidMessage, // package was valid but completed invalid package message data
fragment, // (so-far) valid fragment of a not-yet finished fragmented package
message // package finished a complete fragmented package, was fully valid
};
/// Derivation from ConnectionManager that handles messages to send.
/// Implements the concept of separating critical messages (that have to reach the other
/// side are sent again every package until acknowledged) and non-critical messages (that
/// are only sent once but whose arrival can still be tracked).
/// Note that ConnectionManager is not a virtual class, so an object of this type
/// should be used (and especially not destrued) as a ConnectionManager.
/// It just builds upon its functionality.
class MessageManager : public ConnectionManager {
public:
/// The maximum size of packages in bytes (only the raw buffers).
/// Used to avoid fragmentation or higher package lost rates.
static constexpr auto maxPackageSize = 1200;
/// The function responsible for handling received messages.
/// See the messageHandler function for more information.
/// \param seq The sequence number this message belongs to
/// \param buf The packages message buffer pointing to the start of a message.
/// Must be advanced to the end of the message.
using MessageHandler = std::function<bool(uint32_t seq, RecvBuffer& buf)>;
/// A critical message. Contains data and associated sequence number (i.e. the
/// sequence number it was first sent with).
struct Message {
std::vector<std::byte> data;
uint32_t seq;
};
public:
// --- Sending ---
/// Queues the given non-critical message up for the next frame.
/// Returns the sequence number of the next message.
/// The sequence number can be used to check whether the other side
/// acknowledged the message, by calling acknowledged(sequence number).
/// Note that due to the non-critical nature of the message it might
/// never be received. After some time (after ackStoreCount new sent packages),
/// the sequence id will no longer be tracked.
uint32_t queueMsg(nytl::Span<const std::byte> msg);
/// Queues the given critical message.
/// Returns the sequence number of the next message.
/// The sequence number can be used to check whether the other side has
/// acknowledged the message by checking if remoteAck() is larger
/// than the returned sequence number. Using this approach, one has to
/// care about sequence number wrapping.
uint32_t queueCriticalMsg(nytl::Span<const std::byte> msg);
/// Prepares and returns the next packages to be sent.
/// The returned buffers will remain valid until the next time this function is called.
/// Will remove all acknowledged critical messages before preparing and
/// all queued nonCritical messages after preparing.
const std::vector<asio::const_buffer>& packages();
/// Returns all queued critical messages.
/// \param update If this is true, this calls updates the internal vector, i.e.
/// clears packages that were acknowledged.
/// Note that there may actually be messages left in the queue that
/// were already acknowledged if it is not updated.
const std::vector<Message>& criticalMessages(bool update = false);
const std::vector<Message>& criticalMessages() const { return critical_; }
/// Returns the currently queued non-critical messages.
decltype(auto) nonCriticalMessage() const { return nonCritical_; }
/// Clears all pending critical messages.
/// Returns the moved vector.
std::vector<Message> clearCriticalMessages() { return std::move(critical_); }
// --- Receiving ---
/// Processes the given received package.
/// Returns the status of the message.
/// If it returns PackageStatus::message this package caused
/// a new package (and therefore new messages) to be available and has already
/// called the message handler with success.
/// If it returns PackageStatus::invalid the buffer itself could not be parsed.
/// If it returns PackageStatus::invalidMessage the buffer completed
/// a fragmented package (or was self-contained) but could not be correctly
/// parsed by the messgae handler.
/// If it returns PackageStatus::fragment the package was a valid fragment that
/// did not complete a package.
PackageStatus processPackage(asio::const_buffer buffer);
/// Sets the callback for messages to be processed.
/// This will only be called from within processPackage.
/// The handler will receive a MessageBuffer that points to the
/// beginning of a message. It must advance it behind the message (to the
/// first byte after the end of the message).
/// If the handler throws MsgBufInvalid or returns false or if there is an internal
/// error while processing the package, the whole package is treated
/// as invalid and will not be further processed.
/// Errors other than MsgBufInvalid form the message handler are just propagated
/// out of the processPackage function.
/// Might be set to an empty handler in which case no messages are processed
/// and processPackage will always trigger invalid message return values.
/// Returns the moved old handler. At the beginning is initialized with an no
/// handler.
/// This function must not be called from inside the old message handler.
MessageHandler messageHandler(MessageHandler newHandler);
/// Frees all stored fragments that are older than the given time.
/// Returns the number of discarded fragmented packages.
unsigned int discardFragments(Clock::duration age);
/// Frees all currently unused memory.
void shrink();
protected:
/// Handles the given package data, i.e. dispatches messages to the message handler.
/// The passed ConstMsgBuf points to the beginning of the package data and its
/// size only contains the size of the package data (excluding end magic value).
/// Returns whether the buffer could completely handled.
/// \param lastAck The previously last acknowledged package.
/// When this function is calld the header of this package was already processed
/// so this information is needed
/// \param seqNumber the sequence number of the package
bool handlePackageData(uint32_t lastAck, uint32_t seqNumber, RecvBuffer buffer);
/// Clears critical messages that were acknowledged by the other side from
/// critical_
void updateCriticalMessages();
protected:
std::vector<Message> critical_; // stores all critical messages, sorted
std::vector<std::vector<std::byte>> nonCritical_; // stores all non-critical pending messages
std::vector<std::byte> packageBuffer_; // raw package buffer store
std::vector<asio::const_buffer> buffers_; // buffers sent in last step
MessageHandler messageHandler_ {}; // current message handler, might be empty
/// Contains some currently unused buffers that will be reused the next time
/// a buffer is needed. Separated since msg buffers are usually way smaller
/// than pkg buffers. They may still contain data, must be cleared when popped
std::vector<std::vector<std::byte>> unusedMsgBuffers_;
std::vector<std::vector<std::byte>> unusedPkgBuffers_; // used for fragmented pkgs
/// A fragmented package that is currently being assembled.
/// Contains the packages sequence number as well as all fragments.
/// If a vector in fragments is empty the associated fragment has not yet
/// been received and must be waited on.
/// Fragmented pacakges are discarded after some time if not all fragments
/// have arrived.
struct FragmentedPackage {
Clock::time_point firstSeen; // first encounter of any fragment
MessageHeader header;
std::vector<bool> received; // which fragments where received
std::vector<std::byte> data; // the raw package data (stripped headers and magic)
};
std::vector<FragmentedPackage> fragmented_; // sorted
};
} // namespace kyo
#include <kyo/connection.hpp>
#include <dlg/dlg.hpp>
#include <limits>
#include <cmath>
#include <algorithm>
#include <iostream>
// NOTE: do we need ackbits at all? it currently is only used for validation (?)
namespace kyo {
using namespace dlg::literals;
// utility
const char* name(MessageHeaderStatus mhs)
{
switch(mhs) {
case MessageHeaderStatus::valid: return "valid";
case MessageHeaderStatus::invalidMagic: return "invalidMagic";
case MessageHeaderStatus::ackDiff: return "ackDiff";
case MessageHeaderStatus::seqDiff: return "seqDiff";
case MessageHeaderStatus::ackOld: return "ackOld";
case MessageHeaderStatus::ackNew: return "ackNew";
case MessageHeaderStatus::alreadyReceived: return "alreadyReceived";
default: return "<unknown>";
}
}
// ConnectionManager
MessageHeader ConnectionManager::nextHeader()
{
MessageHeader ret {};
ret.seq = ++localSeq_;
ret.ack = remoteSeq_;
ret.ackBits = localAckBits_;
// send a new ping if there is no active one
if(remoteAck_ - pingSent_ < maxSeqDiff) {
pingSent_ = localSeq_;
pingTime_ = Clock::now();
}
return ret;
}
MessageHeaderStatus ConnectionManager::processHeader(const MessageHeader& msg)
{
static const auto scopeSource = "network::cm::processHeader"_src;
dlg::SourceGuard sourceGuard(scopeSource);
using MHS = MessageHeaderStatus;
// first make sure the header is valid, then change local state
// check magic number
if(msg.magic != magic::message)
return MHS::invalidMagic;
// - remote sequence number ---------------------------------------
// the absolute difference between seq numbers across wrap boundary
// if there is a too large gap, the package is treated as invalid.
auto absSeqDiff = std::min(msg.seq - remoteSeq_, remoteSeq_ - msg.seq);
if(absSeqDiff > maxSeqDiff) {
dlg_info("seqDiff too high, msg.seq: {}, remoteSeq: {}", msg.seq, remoteSeq_);
return MHS::seqDiff;
}
// check if this is the same package as the one referenced by remoteSeq_
if(absSeqDiff == 0) {
dlg_info("seq {} already received(1)", msg.seq);
return MHS::alreadyReceived;
}
// whether to update the remote sequence number
// if this is true this is the newest pacakge ever received from the other side
auto newRemoteSeq = (msg.seq - remoteSeq_) <= maxSeqDiff;
// if it is and old message, it is invalid if it was already received (tracked
// by localAckBits_)
if(!newRemoteSeq && absSeqDiff < 32 && (localAckBits_ & (1 << (absSeqDiff - 1)))) {
dlg_info("seq {} already received(2)", msg.seq);
return MHS::alreadyReceived;
}
// - remote acknowledged seq -----------------------------------------
// package acknowledged is not one of the last maxSeqDiff sent packages
if(localSeq_ - msg.ack > maxSeqDiff) {
dlg_info("ackDiff too high, msg.ack: {}, localSeq: {}", msg.ack, localSeq_);
return MHS::ackDiff;
}
// newest package AND package acknowledged is older than the last acknowledged package
if(newRemoteSeq && msg.ack - remoteAck_ > maxSeqDiff) {
dlg_info("newer package with older ack {}, remoteAck_: {}", msg.ack, remoteAck_);
return MHS::ackOld;
}
// older package AND acknowledged package is newer than the last acknowledged package
if(!newRemoteSeq && remoteAck_ - msg.ack > maxSeqDiff) {
dlg_info("older package with newer ack {}, remoteAck_: {}", msg.ack, remoteAck_);
return MHS::ackNew;
}
// - ping check --------------------------------------------------
if(remoteAck_ - pingSent_ > maxSeqDiff && msg.ack - pingSent_ < maxSeqDiff) {
for(auto i = 0u; i < pingStoreCount - 1; ++i)
pings_[i] = pings_[i + 1];
auto diff = Clock::now() - pingTime_;
auto count = std::chrono::duration_cast<std::chrono::microseconds>(diff).count();
dlg_debug("recieved pong after {} microseconds", count);
pings_.back() = count;
}
// - update local state -------------------------------------
// we know the package is valid and apply its information
if(newRemoteSeq) { // i.e. new message
// ackBits are alwasy relative to the last seen package so we have to shift it
// shift to the left since the most significant bit is the oldest one whose
// bit is no longer needed
localAckBits_ <<= absSeqDiff;
remoteSeq_ = msg.seq;
// make sure to set the localAck bit for the old latest ack (if possible)
if(absSeqDiff < 32)
localAckBits_ |= (1 << (absSeqDiff - 1));
auto remoteAckDiff = msg.ack - remoteAck_;
remoteAckBits_ <<= remoteAckDiff;
remoteAck_ = msg.ack;
// make sure to set the remoteAck bit for the old latest ack (if possible)
if(remoteAckDiff < remoteAckBits_.size())
remoteAckBits_ |= (1 << (remoteAckDiff - 1));
} else { // i.e. old message
// update localAckBits if in range
localAckBits_ |= 1 << (absSeqDiff - 1);
// TODO: update remoteAckBits_ here as well?
// we might get new information from this old package
}
return MHS::valid;
}
bool ConnectionManager::acknowledged(uint32_t sequenceNumber) const
{
auto diff = remoteAck_ - sequenceNumber;
if(diff == 0) return true; // last acknowledged package
if(diff > remoteAckStoreCount) return false; // out of date
return remoteAckBits_[diff - 1];
}
// MessageManager
uint32_t MessageManager::queueMsg(nytl::Span<const std::byte> msg)
{
dlg_assert_debug(!msg.empty(), "network::mm::queueCriticalMsg"_src, "empty message");
// try to find a matching unused msg buffer
// XXX: we could iterate over the vector here and try to find a one
// with a matching size, +memory -performance
std::vector<std::byte> buffer {};
if(!unusedMsgBuffers_.empty()) {
buffer = std::move(unusedMsgBuffers_.back());
unusedMsgBuffers_.pop_back();
}
buffer.clear();
buffer.insert(buffer.end(), msg.begin(), msg.end());
nonCritical_.emplace_back(std::move(buffer));
return localSeq_ + 1;
}
uint32_t MessageManager::queueCriticalMsg(nytl::Span<const std::byte> msg)
{
dlg_assert_debug(!msg.empty(), "network::mm::queueCriticalMsg"_src, "empty message");
// try to find a matching unused msg buffer
// XXX: we could iterate over the vector here and try to find a one
// with a matching size, +memory -performance
Message cmsg {};
if(!unusedMsgBuffers_.empty()) {
cmsg.data = std::move(unusedMsgBuffers_.back());
unusedMsgBuffers_.pop_back();
}
cmsg.data.clear();
cmsg.data.insert(cmsg.data.end(), msg.begin(), msg.end());
// insert the message just at the end
// this automatically assures that the vector will be sorted.
// use localSeq_ + 1 since this critical message will be first sent with
// the next package
cmsg.seq = localSeq_ + 1;
critical_.emplace_back(std::move(cmsg));
return localSeq_ + 1;
}
// - utility -
namespace {
/// Returns a casted reference of 'T' from the given pointer and increases
/// the pointer sizeof(T)
template<typename T>
decltype(auto) nextCast(std::byte*& ptr)
{
auto pos = ptr;
ptr += sizeof(T);
return reinterpret_cast<T&>(*pos);
}
} // anonymous namespace
const std::vector<asio::const_buffer>& MessageManager::packages()
{
updateCriticalMessages();
// clear previous buffers buffer
buffers_.clear();
packageBuffer_.clear();
packageBuffer_.resize(maxPackageSize);
// ptr: points to the first unwritten bytes in packageBuffer_
// fragBegin: points to the begin of the current message
// fragEnd: points to the address in which the end magic value will be written.
auto ptr = packageBuffer_.data();
auto fragBegin = packageBuffer_.data();
auto fragEnd = fragBegin + maxPackageSize - 4;
nextCast<MessageHeader>(ptr) = nextHeader();
// the current fragment part
// note that the first fragment header has part 1
auto fragPart = 0u;
// if there are no messages to send just return the single package filled
// with the message header. Might be useful for acknowleding packages without
// sending additional data
if(critical_.empty() && nonCritical_.empty()) {
nextCast<uint32_t>(ptr) = magic::end; // signal message end
buffers_.push_back(asio::buffer(packageBuffer_.data(), ptr - fragBegin));
return buffers_;
}
// this buffer will hold the indices of packageBuffer_ on which a new fragment starts
// excludes the first fragments start value (0)
// we cannot directly insert into buffers_ since the data in packageBuffers_
// might be reallocated multiple times during this function
std::vector<unsigned int> fragments {};
// initialize the associated sequence number to a value different
// than the first critical package so it will write it
auto currentSeq = localSeq_ + 1;
if(!critical_.empty())
currentSeq = critical_.front().seq + 1;
// function that writes the given data into the message buffer
// makes sure that there is enough space in the current fragment
auto write = [&](const auto* data, auto size) {
auto remaining = size;
while(remaining != 0) {
// create new fragment if we reached its end
// write magic end value
// we always assume that other con
if(ptr == fragEnd) {
// end fragment
nextCast<uint32_t>(ptr) = magic::another;
auto oldSize = packageBuffer_.size();
// push message into buffers
fragments.push_back(oldSize);
// create next message
packageBuffer_.resize(oldSize + maxPackageSize);
fragBegin = ptr = &packageBuffer_[oldSize];
fragEnd = fragBegin + maxPackageSize - 4;
// insert next fragment header
nextCast<FragmentHeader>(ptr) = {magic::fragment, localSeq(), ++fragPart};
}
// write as much as possible
// keep spacing for potential next seq number
// the spacing is only needed in the last fragment we touch
auto size = std::min<unsigned int>(remaining, fragEnd - ptr);
std::memcpy(ptr, data, size);
remaining -= size;
ptr += size;
}
};
// function that determines the byte count for the message group
// with the given sequence number from the given iterator in
// critical. Will also add the size of nonCritical messages
// if they belong into this message group
auto messageGroupSize = [&](uint32_t seq, auto it) {
uint32_t size = 0u;
// iterate over left critical messages
while(it != critical_.end() && it->seq == seq) {
size += it->data.size();
++it;
}
// if all critical messages were included and the sequence
// number matches also add non critical message sizes
// TODO: it == critical_.end() really needed here
if(it == critical_.end() && seq == localSeq_) {
for(auto& msg : nonCritical_)
size += msg.size();
}
return size;
};
// - write all critical messages -
for(auto it = critical_.begin(); it != critical_.end(); ++it) {
auto& msg = *it;
// check if a new message group has to be started
if(msg.seq != currentSeq) {
currentSeq = msg.seq;
auto size = messageGroupSize(msg.seq, it);
write(&msg.seq, sizeof(msg.seq));
write(&size, sizeof(size));
}
// write current critical message
write(msg.data.data(), msg.data.size());
}
// - wirte all non-critical messages -
if(!nonCritical_.empty()) {
// check if the current package group already has the right
// sequence number of if we have to start a new one
// all non-critical messages belong to the localSeq_ seq number
if(currentSeq != localSeq_) {
currentSeq = localSeq_;
auto size = messageGroupSize(localSeq_, critical_.end());
write(&localSeq_, sizeof(localSeq_));
write(&size, sizeof(size));
}
// write all non-critical messages
// we dont need any spacing anymore since there won't be any sequence indicators
// after this
for(auto& msg : nonCritical_)
write(msg.data(), msg.size());
}
// now push the fragments into buffers_
auto prevFrag = 0u;
auto totalSize = 0u;
for(auto frag : fragments) {
buffers_.push_back(asio::buffer(&packageBuffer_[prevFrag], frag - prevFrag));
totalSize += frag - prevFrag;
prevFrag = frag;
}
// End the last fragment and push it into the buffers
// if the last fragment has no data abandon it and write the magic::end to the
// last previous fragment. Note that if the last fragment was the first
// fragment it had a message header (with larger size) and therefore will
// always be added.
if(static_cast<unsigned int>(ptr - fragBegin) == sizeof(FragmentHeader)) {
*reinterpret_cast<uint32_t*>(fragBegin - 4) = magic::end;
} else {
nextCast<uint32_t>(ptr) = magic::end;
buffers_.push_back(asio::buffer(&packageBuffer_[prevFrag], ptr - fragBegin));
totalSize += ptr - fragBegin;
}
// clear non-critical messages
unusedMsgBuffers_.insert(
unusedMsgBuffers_.end(),
std::make_move_iterator(nonCritical_.begin()),
std::make_move_iterator(nonCritical_.end()));
nonCritical_.clear();
dlg_debug("network::mm::packages"_src, "{} packages, total size {}",
buffers_.size(), totalSize);
return buffers_;
}
const std::vector<MessageManager::Message>& MessageManager::criticalMessages(bool update)
{
if(update) updateCriticalMessages();
return critical_;
}
void MessageManager::updateCriticalMessages()
{
// TODO: really use eaxSeqDiff here?
auto it = critical_.begin();
for(; it != critical_.end() && remoteAck_ - it->seq <= maxSeqDiff; ++it);
if(it == critical_.begin())
return;
// clear critical_, move buffers back to unusedMsgBuffers_
unusedMsgBuffers_.reserve(unusedMsgBuffers_.size() + it - critical_.begin());
for(auto i = critical_.begin(); i != it; ++i)
unusedMsgBuffers_.push_back(std::move(i->data));
critical_.erase(critical_.begin(), it);
}
PackageStatus MessageManager::processPackage(asio::const_buffer buffer)
{
static const auto scopeSource = "network::mm::processpkg"_src;
dlg::SourceGuard sourceGuard(scopeSource);
// utility
// the max raw package data size of the first fragment
constexpr auto firstDataSize = maxPackageSize - sizeof(MessageHeader) - 4;
// the max raw package data size of a non-first fragment
constexpr auto fragDataSize = maxPackageSize - sizeof(FragmentHeader) - 4;
// check that it at least has the size of the smaller header and end magic
// for all first checks we only outputs info messages for invalid packages
// since they might simply come from something else and are not really
// an issue
auto size = asio::buffer_size(buffer);
if(size < sizeof(FragmentHeader) + 4) {
dlg_info("invalid pkg: size {} too small", size);
return PackageStatus::invalid;
}
// check magic numbers
auto data = asio::buffer_cast<const std::byte*>(buffer);
auto beginMagic = *reinterpret_cast<const uint32_t*>(data);
auto endMagic = *reinterpret_cast<const uint32_t*>(data + size - 4);
if(endMagic != magic::end && endMagic != magic::another) {
dlg_info("invalid pkg: invalid end magic value ", endMagic);
return PackageStatus::invalid;
}
// fragment handling variables
uint32_t seqid = 0u; // the sequence id the fragment belongs to (if it is an fragment)
uint32_t fragpart = 0u; // the part the fragment has
const MessageHeader* header = nullptr; // potential message header
const std::byte* dataBegin = nullptr; // raw data begin
const std::byte* dataEnd = (data + size) - 4; // raw data end
// check message header or fragment header
if(beginMagic == magic::message) {
header = reinterpret_cast<const MessageHeader*>(data);
if(endMagic == magic::end) {
// we received a single, non-fragmented message, yeay
// handle its header an pass it to handlePackageData
auto lastAck = remoteSeq_;
auto processed = processHeader(*header);
if(processed != MessageHeaderStatus::valid) {
dlg_info("invalid pkg: processing sc message header failed: {}", name(processed));
return PackageStatus::invalid;
}
RecvBuffer msgbuf {};
msgbuf.current = data + sizeof(MessageHeader);
msgbuf.end = data + size - 4; // exclude last magic
return handlePackageData(lastAck, header->seq, msgbuf) ?
PackageStatus::message :
PackageStatus::invalidMessage;
}
// if it was only the first part of the fragmented message we wait with processing
// the header until all fragments part arrive (if they do)
seqid = header->seq;
fragpart = 0u; // first fragment
dataBegin = data + sizeof(MessageHeader);
} else if(beginMagic == magic::fragment) {
auto& header = *reinterpret_cast<const FragmentHeader*>(data);
seqid = header.seq;
fragpart = header.fragment;
dataBegin = data + sizeof(FragmentHeader);
} else {
dlg_info("invalid pkg: Invalid start magic value {}", beginMagic);
return PackageStatus::invalid;
}
// here we know that the package is part of a fragmented pkg
// find the upper bound, i.e. the place it would have in the sorted fragmented_ vector
// or otherwise the place it should be inserted to
FragmentedPackage dummy {};
dummy.header.seq = seqid;
auto fpkg = std::upper_bound(fragmented_.begin(), fragmented_.end(), dummy,
[](const auto& a, const auto& b) { return a.header.seq < b.header.seq; });
// check if fragmented already contains the given package, otherwise create it
// try to use an unused pkg buffer instead of allocating a new one
if(fpkg == fragmented_.end() || fpkg->header.seq != seqid) {
fpkg = fragmented_.emplace(fpkg);
fpkg->firstSeen = Clock::now();
if(!unusedPkgBuffers_.empty()) {
fpkg->data = std::move(unusedPkgBuffers_.back());
fpkg->data.clear();
unusedPkgBuffers_.pop_back();
}
}
// set its header (if it is valid i.e. this package had one)
if(header) fpkg->header = *header;
// the required size of fpkg->received
auto neededSize = fragpart;
if(endMagic == magic::another)
++neededSize;
// resize, store in received
if(fpkg->received.size() < neededSize)
fpkg->received.resize(neededSize, false);
fpkg->received[fragpart] = true;
// make sure fpkg->data is large enough so that we can copy
// our data into it
auto prev = fragpart ? firstDataSize + (fragpart - 1) * fragDataSize : 0u;
auto bufferBegin = prev;
auto bufferEnd = static_cast<unsigned int>(dataEnd - dataBegin) + prev;
if(fpkg->data.size() < bufferEnd)
fpkg->data.resize(neededSize);
// TODO: do we have to make sure the fragment was not already received?
// copy the raw data into the buffer
std::memcpy(&fpkg->data[bufferBegin], dataBegin, dataEnd - dataBegin);
// if the fragmented package is complete now, handle it
if(std::all_of(fpkg->received.begin(), fpkg->received.end(), [](auto b) { return b; })) {
// try to handle its header
auto lastAck = remoteSeq_;
auto processed = processHeader(fpkg->header);
if(processed != MessageHeaderStatus::valid) {
dlg_info("invalid pkg: processing frag message header failed: {}", name(processed));
return PackageStatus::invalid;
}
RecvBuffer buf {};
buf.current = fpkg->data.data();
buf.end = buf.current + fpkg->data.size();
auto ret = handlePackageData(lastAck, fpkg->header.seq, buf) ?
PackageStatus::message :
PackageStatus::invalidMessage;
unusedPkgBuffers_.push_back(std::move(fpkg->data));
fragmented_.erase(fpkg);
return ret;
}
return PackageStatus::fragment;
}
bool MessageManager::handlePackageData(uint32_t lastAck, uint32_t seqNumber, RecvBuffer buffer)
{
((void) seqNumber); // unused; may be used for a future message group spec
const static auto scopeSource = "network::mm::handlePkgData";
dlg::SourceGuard sourceGuard(scopeSource);
// there is no sense in processing if there is no message handler
if(!messageHandler_) {
dlg_warn("no message handler set");
return false;
}
try {
// groupSeq will hold the current sequence number the handled
// messages belong to
// groupSize holds the left size in the current message group
// the package data starts with the sequence number of the first message
// group.
auto groupSeq = 0u;
auto groupSize = 0u;
auto groupFirst = true;
// we read until the entire (!) package data is processed (without any paddings)
// or until something goes wrong
while(buffer.current < buffer.end) {
// TODO: some group seq number validity check
// e.g. check the jump is not too high
// read information forn ext message group
auto nextSeq = next<uint32_t>(buffer);
if(!groupFirst && nextSeq == groupSeq) {
dlg_warn("invalid pkg: two groups with the same seq number {}", groupSeq);
return false;
}
groupFirst = false;
groupSeq = nextSeq;
groupSize = next<uint32_t>(buffer);
// logError(":2 ", groupFirst, groupSeq, groupSize);
// check if given group size would exceed the overall package data size
if(groupSize == 0 || buffer.current + groupSize > buffer.end) {
dlg_warn("invalid pkg: group size {} is too large", groupSize);
return false;
}
// if we have already received this message group, simply skip it
// now the knowledge of the groups size in bytes comes in really handy
if(lastAck - groupSeq < maxSeqDiff) {
dlg_debug("skipping already received group {}", groupSeq);
buffer.current += groupSize;
continue;
}
// handle all messages in the group
auto groupBuffer = RecvBuffer {buffer.current, buffer.current + groupSize};
while(groupBuffer.current < groupBuffer.end) {
try {
auto before = groupBuffer.current;
auto ret = false;
{
static const auto scopeSource = "network::mm::[messageHandler]"_src;
dlg::SourceGuard sourceGuard(scopeSource);
ret = messageHandler_(groupSeq, groupBuffer);
}
// avoid infinite loop, message handler MUST advance buffer
if(before == groupBuffer.current) {
dlg_error("invalid messages handle did not advance buffer");
return false;
}
if(!ret) {
dlg_warn("invalid pkg: handler return false");
return false;
}
} catch(const MsgBufInvalid& err) {
dlg_warn("invalid pkg: messageHandler threw: {}", err.what());
return false;
}
}
// check that the groupBuffer was only advanced in its bounds
if(groupBuffer.current > groupBuffer.end) {
dlg_error("message group buffer advanced too far");
return false;
}
// apply the advance of the message group to the data buffer
buffer.current = groupBuffer.current;
}
} catch(const MsgBufOutOfRange& err) {
dlg_error("invalid pkg size, internal assumptions not met");
return false;
}
// check that buffer was only advanced in bounds
if(buffer.current > buffer.end) {
dlg_error("pkg buffer advanced too far");
return false;
}
return true;
}
MessageManager::MessageHandler MessageManager::messageHandler(MessageHandler newHandler)
{
dlg_debug("network::mm"_src, "setting new message handler");
std::swap(messageHandler_, newHandler);
return newHandler;
}
void MessageManager::shrink()
{
unusedMsgBuffers_ = {};
unusedPkgBuffers_ = {};
buffers_.shrink_to_fit();
packageBuffer_.shrink_to_fit();
nonCritical_.shrink_to_fit();
critical_.shrink_to_fit();
fragmented_.shrink_to_fit();
}
unsigned int MessageManager::discardFragments(Clock::duration age)
{
auto prev = fragmented_.size();
auto now = Clock::now();
std::remove_if(fragmented_.begin(), fragmented_.end(), [&](const auto& pkg) {
return (now - pkg.firstSeen) >= age;
});
auto ret = fragmented_.size() - prev;
dlg_debug("network::mm"_src, "Discarded {} fragmented packages", ret);
return ret;
}
} // namespace kyo

Messages between client and server have the following structure:

Pkg := Header RawData MagicNumber
Header := [MessageHeader] | [FragmentHeader]
RawData := MessageGroup RawData | [empty]
MessageGroup := SeqNumber ByteCount Messages
Messages := Message Messages | Message
Message := MessageType [Message buffer]
SeqNumber; ByteCount; MagicNumber := [uint32_t]
MessageType := [uint8_t]

A MessageGroup contains all sent packages belonging to the same sequence number. The beginning of a message group contains this sequence number as well as the number of bytes the Messages in this group contains. There is at least 1 message in a message group. There may be packages without any MessageGroup.

Packages larger than a certain size are split up in fragments. In this case all fragments except the last have the fixed maximum packages size and the data is plainly split to the fragments. This splitting can occur over any bounding i.e. even uint32_t values may be split up across fragment boundaries.

Current remaining problems:

  • if the message handler advances the RecvBuffer incorrectly, the whole package will be handled incorrectly
    • problem: the message handler will be called with a buffer pointing NOT to the start of a message
    • could be solved by something like a msg-begin magic value (or don't use message groups and but the seq number in front of each msg?)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment