Skip to content

Instantly share code, notes, and snippets.

@simgt
Created April 14, 2016 14:11
Show Gist options
  • Save simgt/60713269f186ca8cef693ff762c66093 to your computer and use it in GitHub Desktop.
Save simgt/60713269f186ca8cef693ff762c66093 to your computer and use it in GitHub Desktop.
Flatbuffers, Boost.Asio, TCP
#pragma once
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <stdexcept>
namespace net {
// Fixed-capacity, length-prefixed message frame.
//
// Layout of the internal buffer: HeaderSize bytes holding the body length as
// a right-aligned, space-padded decimal number (the "%4d" format), followed
// by up to MaxBodySize body bytes. data()/size() describe the whole frame,
// body()/body_size() only the payload.
class Message
{
public:
    enum { HeaderSize = 4 };
    enum { MaxBodySize = 512 };

    // Creates an empty frame: zeroed buffer, body size 0, header encoded, so
    // the HeaderSize bytes exposed by data()/size() are never indeterminate.
    Message()
        : data_(),
          body_size_(0)
    {
        encode_header();
    }

    // Creates a frame holding a copy of `body_size` bytes read from `body`
    // and encodes the matching header.
    //
    // Throws std::length_error if `body_size` exceeds MaxBodySize. The size
    // is validated before anything is copied, so an oversize body can never
    // overrun the buffer, even in builds where assertions are disabled.
    Message(const uint8_t* body, std::size_t body_size)
        : data_(),
          body_size_(checked_body_size(body_size))
    {
        std::copy(body, body + body_size, this->body());
        encode_header();
    }

    // Start of the whole frame (header followed by body), read-only.
    const uint8_t* data() const
    {
        return data_;
    }

    // Start of the whole frame (header followed by body), writable.
    uint8_t* data()
    {
        return data_;
    }

    // Number of valid bytes in the frame: header plus current body.
    std::size_t size() const
    {
        return HeaderSize + body_size_;
    }

    // Start of the body, read-only.
    const uint8_t* body() const
    {
        return data_ + HeaderSize;
    }

    // Start of the body, writable.
    uint8_t* body()
    {
        return data_ + HeaderSize;
    }

    // Current body length in bytes.
    std::size_t body_size() const
    {
        return body_size_;
    }

    // Sets the body length. The header bytes are NOT rewritten; call
    // encode_header() afterwards if the frame is going to be sent.
    //
    // Throws std::length_error if `new_size` exceeds MaxBodySize.
    void body_size(std::size_t new_size)
    {
        body_size_ = checked_body_size(new_size);
    }

    // Parses the header bytes into the body size.
    //
    // Accepted form: optional leading spaces followed by decimal digits that
    // fill the rest of the header (exactly what encode_header() produces;
    // zero-padded values such as "0007" are accepted too). Anything else --
    // no digits at all, a sign, trailing garbage -- is rejected instead of
    // being silently read as a length of 0.
    //
    // Returns true and updates body_size() on success. On a malformed header
    // or a length above MaxBodySize, resets body_size() to 0 and returns false.
    bool decode_header()
    {
        std::size_t parsed = 0;
        bool seen_digit = false;
        for (std::size_t i = 0; i < static_cast<std::size_t>(HeaderSize); ++i)
        {
            const uint8_t c = data_[i];
            if (c == ' ' && !seen_digit)
            {
                continue; // left padding
            }
            if (c < '0' || c > '9')
            {
                body_size_ = 0;
                return false;
            }
            parsed = parsed * 10 + static_cast<std::size_t>(c - '0');
            seen_digit = true;
        }
        if (!seen_digit || parsed > static_cast<std::size_t>(MaxBodySize))
        {
            body_size_ = 0;
            return false;
        }
        body_size_ = parsed;
        return true;
    }

    // Writes the current body size into the header bytes using "%4d".
    void encode_header()
    {
        char header[HeaderSize + 1] = "";
        // Bounded write: cannot run past `header` whatever the value is.
        std::snprintf(header, sizeof(header), "%4d", static_cast<int>(body_size_));
        std::copy(header, header + HeaderSize, data_);
    }

private:
    // Returns `candidate` unchanged if it fits in the body area, otherwise
    // throws std::length_error.
    static std::size_t checked_body_size(std::size_t candidate)
    {
        if (candidate > static_cast<std::size_t>(MaxBodySize))
        {
            throw std::length_error("net::Message: body size exceeds MaxBodySize");
        }
        return candidate;
    }

    uint8_t data_[HeaderSize + MaxBodySize];
    std::size_t body_size_;
};
}
namespace net.fbs;
// Three float components: x, y, z.
// Units and reference frame are not specified in this schema.
struct Vec3 {
x:float;
y:float;
z:float;
}
// Four float components in the order w, x, y, z.
// NOTE(review): presumably a quaternion with w as the scalar part -- confirm
// the convention with the producer of this data.
struct Quat {
w:float;
x:float;
y:float;
z:float;
}
// One set of sensor values, each stored as a Vec3.
// NOTE(review): the field names suggest magnetometer, accelerometer,
// gyroscope and position -- confirm, along with the units.
table SensorsReading {
mag:Vec3;
acc:Vec3;
gyr:Vec3;
pos:Vec3;
}
// Filter output: a Vec3 `pos` and a Quat `att`.
// NOTE(review): presumably position and attitude -- confirm.
table FilterState {
pos:Vec3;
att:Quat;
}
// Payload variants a message can carry: exactly one of the listed tables.
union Data { SensorsReading, FilterState }
// Top-level table of every buffer (declared as the root_type); wraps a
// single Data union value.
table Root {
data:Data;
}
root_type Root;
#include "server.hpp"
#include <cstdlib>
#include <iostream>
#include <utility>
#include <cstdio>
namespace net {
// Builds a session around `socket`, which is moved into the session.
// `server` and `recv_msg_handler` are stored as references, so both must
// outlive the session.
Session::Session(tcp::socket socket, Server& server, RecvMsgHandler& recv_msg_handler)
    : socket_(std::move(socket))
    , server_(server)
    , recv_msg_handler_(recv_msg_handler)
{}
// Starts the receive loop by queuing the first header read.
void Session::start() { async_read_header(); }
std::string Session::address() const
{
const tcp::endpoint remote_ep = socket_.remote_endpoint();
return remote_ep.address().to_string();
}
uint16_t Session::port() const
{
const tcp::endpoint remote_ep = socket_.remote_endpoint();
return remote_ep.port();
}
void Session::async_write(std::shared_ptr<Message> msg)
{
auto self(shared_from_this());
boost::asio::async_write(socket_, boost::asio::buffer(msg->data(), msg->size()),
[this, self, msg](boost::system::error_code ec, std::size_t /*size*/)
{
if (ec)
{
server_.remove_session(self);
}
});
}
// Queues an asynchronous read of exactly Message::HeaderSize bytes into
// read_msg_ and, once they arrive, decodes the header.
//
// - Read error (including the peer closing the connection): the session is
//   removed from the server.
// - Header rejected by Message::decode_header(): the error is logged and the
//   session is removed as well. The byte stream can no longer be split into
//   frames, and leaving the session registered with no read pending would
//   keep it in the server's set forever.
// - Valid header: continues with async_read_body().
void Session::async_read_header()
{
    auto self(shared_from_this());
    boost::asio::async_read(socket_,
        boost::asio::buffer(read_msg_.data(), Message::HeaderSize),
        [this, self](boost::system::error_code ec, std::size_t size)
        {
            // Check the error first: on failure `size` is legitimately
            // smaller than the header, so it must not be asserted on.
            if (ec)
            {
                server_.remove_session(self);
                return;
            }

            // A successful async_read has filled the whole buffer.
            assert(size == static_cast<std::size_t>(Message::HeaderSize));
            static_cast<void>(size); // silence "unused" when NDEBUG is set

            if (!read_msg_.decode_header())
            {
                std::cerr << "Wrong header O_o" << std::endl;
                server_.remove_session(self);
                return;
            }

            async_read_body();
        });
}
void Session::async_read_body()
{
auto self(shared_from_this());
boost::asio::async_read(socket_,
boost::asio::buffer(read_msg_.body(), read_msg_.body_size()),
[this, self](boost::system::error_code ec, std::size_t /*size*/)
{
if (!ec)
{
recv_msg_handler_(*this, read_msg_.body(), read_msg_.body_size());
async_read_header();
}
else
{
server_.remove_session(self);
}
});
}
// Opens an IPv4 acceptor on `port`, stores the receive handler and queues
// the first asynchronous accept. Nothing is accepted until the caller runs
// or polls `io_service`.
Server::Server(boost::asio::io_service& io_service, short port, RecvMsgHandler recv_msg_handler)
    : acceptor_(io_service, tcp::endpoint(tcp::v4(), port))
    , accept_socket_(io_service)
    , resolver_(io_service)
    , resolve_socket_(io_service)
    , recv_msg_handler_(std::move(recv_msg_handler))
{
    async_accept();
}
// Queues one asynchronous accept into accept_socket_. When it completes
// without error the connected socket is moved into a new session. A new
// accept is queued afterwards whether or not the previous one succeeded.
void Server::async_accept()
{
    acceptor_.async_accept(
        accept_socket_,
        [this](boost::system::error_code ec)
        {
            const bool accepted = !ec;
            if (accepted)
            {
                add_session(std::move(accept_socket_));
            }
            async_accept();
        });
}
// Wraps `socket` in a new Session, registers it, logs the peer's address and
// port, and starts the session's receive loop.
void Server::add_session(tcp::socket socket)
{
    const auto session =
        std::make_shared<Session>(std::move(socket), *this, recv_msg_handler_);
    sessions_.insert(session);
    std::cout << "session " << session->address() << ':' << session->port() << " added" << std::endl;
    session->start();
}
// Unregisters `s`; does nothing if it is not currently registered.
void Server::remove_session(std::shared_ptr<Session> s)
{
    const auto position = sessions_.find(s);
    if (position != sessions_.end())
    {
        sessions_.erase(position);
    }
}
void Server::async_write_all(std::shared_ptr<Message> msg)
{
for (auto s : sessions_)
{
s->async_write(msg);
}
}
// Resolves `ip`:`port`, connects resolve_socket_ to it synchronously, and
// hands the connected socket to a new session.
void Server::connect_to(const std::string& ip, uint16_t port)
{
    const std::string service = std::to_string(port);
    tcp::resolver::query query(ip, service);
    boost::asio::connect(resolve_socket_, resolver_.resolve(query));
    add_session(std::move(resolve_socket_));
}
// Prints the number of registered sessions to stdout, followed by one
// " address:port" line per session.
void Server::print_status()
{
    std::cout << sessions_.size() << " sessions" << std::endl;
    for (const auto& session : sessions_)
    {
        std::cout << ' ' << session->address() << ':' << session->port() << std::endl;
    }
}
}
#if 1
// Test code below
#include "schema_generated.h"
#include <thread>
#include <chrono>
using namespace net;
// Streams a Vec3 as "x y z", components separated by single spaces.
std::ostream& operator<< (std::ostream& out, const fbs::Vec3& vec)
{
    return out << vec.x() << " " << vec.y() << " " << vec.z();
}
// Streams a Quat as "w x y z", components separated by single spaces.
std::ostream& operator<< (std::ostream& out, const fbs::Quat& quat)
{
    return out << quat.w() << " " << quat.x() << " " << quat.y() << " " << quat.z();
}
// Prints the `acc` field of a received SensorsReading to stdout.
//
// The buffer comes from the network, and FlatBuffers accessors for table
// fields return a null pointer when the field was not written. A missing
// table or missing `acc` is therefore reported instead of dereferenced.
void handle_sensors_reading(const fbs::SensorsReading* raw)
{
    const auto* acc = raw ? raw->acc() : nullptr;
    if (!acc)
    {
        std::cout << "acc = <missing>" << std::endl;
        return;
    }
    std::cout << "acc = " << *acc << std::endl;
}
// Prints the `att` field of a received FilterState to stdout.
//
// The buffer comes from the network, and FlatBuffers accessors for table
// fields return a null pointer when the field was not written. A missing
// table or missing `att` is therefore reported instead of dereferenced.
void handle_filter_state(const fbs::FilterState* filter)
{
    const auto* att = filter ? filter->att() : nullptr;
    if (!att)
    {
        std::cout << "att = <missing>" << std::endl;
        return;
    }
    std::cout << "att = " << *att << std::endl;
}
void handle_receive_msg(const Session& s, const uint8_t* data, size_t size)
{
auto verifier = flatbuffers::Verifier(data, size);
if (fbs::VerifyRootBuffer(verifier))
{
auto root = fbs::GetRoot(data);
std::cout << "Rx " << fbs::EnumNameData(root->data_type())
<< " from " << s.address() << ":" << s.port() << std::endl;
switch (root->data_type())
{
case fbs::Data_SensorsReading:
handle_sensors_reading(static_cast<const fbs::SensorsReading*>(root->data()));
break;
case fbs::Data_FilterState:
handle_filter_state(static_cast<const fbs::FilterState*>(root->data()));
break;
default:
throw std::runtime_error("Unknown message body type");
}
}
else
{
throw std::runtime_error("Ill-formed message");
}
}
// Test driver.
//
// Usage: PROGRAM SERVER_PORT [OTHER_IP OTHER_PORT]
//
// Starts a Server on SERVER_PORT. When exactly three arguments are given it
// also connects to OTHER_IP:OTHER_PORT. It then runs ten rounds of: poll the
// io_service, print the session list, (only in the three-argument form)
// broadcast one SensorsReading and one FilterState message, sleep 2 seconds.
//
// Returns 0 on normal completion and 1 on a usage error or when an exception
// ends the run, so a failed run is not reported as success to the caller.
int main(int argc, char* argv[])
{
    try
    {
        if (argc < 2)
        {
            std::cerr << "Usage: " << argv[0]
                      << " SERVER_PORT [OTHER_IP OTHER_PORT]" << std::endl;
            return 1;
        }

        boost::asio::io_service io_service;
        using namespace std::placeholders;
        Server server(io_service, std::atoi(argv[1]), std::bind(handle_receive_msg, _1, _2, _3));

        // connect to the other server
        if (argc == 4)
        {
            server.connect_to(argv[2], std::atoi(argv[3]));
        }

        for (size_t i = 10; i > 0; i--)
        {
            io_service.poll();
            server.print_status();

            // send junk data
            if (argc == 4)
            {
                {
                    flatbuffers::FlatBufferBuilder builder;
                    fbs::Vec3 mag(1, 0, 0), acc(0, 0, 9.80665), gyr(1, 1, 1), tri(0, 0, 0);
                    auto data = fbs::CreateSensorsReading(builder, &mag, &acc, &gyr, &tri);
                    auto root = fbs::CreateRoot(builder, fbs::Data_SensorsReading, data.Union());
                    fbs::FinishRootBuffer(builder, root);
                    auto msg = std::make_shared<Message>(builder.GetBufferPointer(), builder.GetSize());
                    server.async_write_all(msg);
                }
                {
                    flatbuffers::FlatBufferBuilder builder;
                    fbs::Vec3 pos(1, 2, 3);
                    fbs::Quat att(0, 1, 1, 1);
                    auto data = fbs::CreateFilterState(builder, &pos, &att);
                    auto root = fbs::CreateRoot(builder, fbs::Data_FilterState, data.Union());
                    fbs::FinishRootBuffer(builder, root);
                    auto msg = std::make_shared<Message>(builder.GetBufferPointer(), builder.GetSize());
                    server.async_write_all(msg);
                }
            }

            std::this_thread::sleep_for(std::chrono::seconds(2));
        }
    }
    catch (const std::exception& e)
    {
        std::cerr << "Exception: " << e.what() << "\n";
        return 1;
    }
    return 0;
}
#endif
#pragma once
#include "message.hpp"
#include <boost/asio.hpp>
#include <memory>
#include <set>
#include <cstdint>
#include <functional>
namespace net {
using boost::asio::ip::tcp;
class Server;
class Session;
// Callback invoked for each received message body: receives the session the
// message arrived on, a pointer to the body bytes and the body length.
typedef std::function<void(const Session&, const uint8_t*, size_t)> RecvMsgHandler;
// One TCP connection to a peer. Reads length-prefixed messages in a loop
// (header, then body) and passes each body to the receive handler.
//
// The owning Server and the receive handler are held by reference, so both
// must outlive the session.
class Session : public std::enable_shared_from_this<Session>
{
public:
    Session(tcp::socket socket, Server& server, RecvMsgHandler& recv_msg_handler);

    // Starts the receive loop.
    void start();

    // Remote peer's address (as text) and port.
    std::string address() const;
    uint16_t port() const;

    // Queues an asynchronous send of `msg`.
    void async_write(std::shared_ptr<Message> msg);

private:
    void async_read_header();
    void async_read_body();

    tcp::socket socket_;
    Server& server_;
    Message read_msg_;                  // buffer for the message being received
    RecvMsgHandler& recv_msg_handler_;
};
// Accepts incoming TCP connections, can also open outgoing ones, and keeps
// the set of live sessions. Every session uses the single receive handler
// stored here.
class Server
{
public:
    Server(boost::asio::io_service& io_service, short port, RecvMsgHandler recv_msg_handler);

    // Unregisters a session.
    void remove_session(std::shared_ptr<Session> s);

    // Queues `msg` for sending on every registered session.
    void async_write_all(std::shared_ptr<Message> msg);

    // Connects to ip:port and registers the connection as a session.
    void connect_to(const std::string& ip, uint16_t port);

    // Prints the session count and each peer's address:port to stdout.
    void print_status();

private:
    void async_accept();
    void add_session(tcp::socket socket);

    tcp::acceptor acceptor_;
    tcp::socket accept_socket_;         // target of the pending accept
    tcp::resolver resolver_;
    tcp::socket resolve_socket_;        // used by connect_to()
    std::set<std::shared_ptr<Session>> sessions_;
    RecvMsgHandler recv_msg_handler_;
};
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment