Created
April 14, 2016 14:11
-
-
Save simgt/60713269f186ca8cef693ff762c66093 to your computer and use it in GitHub Desktop.
Flatbuffers, Boost.Asio, TCP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include <cstdint> | |
#include <cassert> | |
#include <algorithm> | |
#include <cstdlib> | |
#include <cstdio> | |
namespace net { | |
/// Fixed-capacity, length-prefixed message buffer.
///
/// Layout of the internal buffer: HeaderSize bytes of ASCII header followed by
/// up to MaxBodySize bytes of body. The header is the body length written as a
/// decimal number, right-aligned and space-padded (the "%4d" format), so that
/// data()/size() describe one contiguous range covering header and body.
class Message
{
public:
    enum { HeaderSize = 4 };
    enum { MaxBodySize = 512 };

    /// Creates an empty message: body size 0 and a zero-filled buffer.
    Message()
        : data_(),
          body_size_(0)
    {
    }

    /// Creates a message holding a copy of `body` and writes the matching header.
    ///
    /// @param body       first byte of the payload to copy
    /// @param body_size  number of payload bytes; must not exceed MaxBodySize
    ///                   (checked by assert only, so it is a caller precondition)
    Message(const uint8_t* body, std::size_t body_size)
        : data_(),
          body_size_(body_size)
    {
        assert(body_size <= MaxBodySize);
        std::copy(body, body + body_size, this->body());
        encode_header();
    }

    /// Start of the whole buffer (header first, then body).
    const uint8_t* data() const
    {
        return data_;
    }

    uint8_t* data()
    {
        return data_;
    }

    /// Number of meaningful bytes in the buffer: header plus current body.
    std::size_t size() const
    {
        return HeaderSize + body_size_;
    }

    /// Start of the body, i.e. the first byte after the header.
    const uint8_t* body() const
    {
        return data_ + HeaderSize;
    }

    uint8_t* body()
    {
        return data_ + HeaderSize;
    }

    /// Current body length in bytes.
    std::size_t body_size() const
    {
        return body_size_;
    }

    /// Sets the body length; `new_size` must not exceed MaxBodySize (assert).
    /// The header bytes are not rewritten here; call encode_header() for that.
    void body_size(std::size_t new_size)
    {
        assert(new_size <= MaxBodySize);
        body_size_ = new_size;
    }

    /// Parses the header bytes at the start of the buffer into the body size.
    ///
    /// Accepted form: optional leading spaces followed by decimal digits up to
    /// the end of the header, with a value of at most MaxBodySize. Anything
    /// else (empty, signed, non-digit or trailing characters, too large) is
    /// rejected instead of being read as some other length.
    ///
    /// @return true when the header is valid and body_size() was updated;
    ///         false otherwise, in which case body_size() is reset to 0.
    bool decode_header()
    {
        const std::size_t header_len = static_cast<std::size_t>(HeaderSize);

        // Skip the space padding produced by encode_header().
        std::size_t pos = 0;
        while (pos < header_len && data_[pos] == ' ')
        {
            ++pos;
        }

        bool valid = pos < header_len; // at least one digit is required
        std::size_t value = 0;
        for (; valid && pos < header_len; ++pos)
        {
            const uint8_t ch = data_[pos];
            if (ch < '0' || ch > '9')
            {
                valid = false;
            }
            else
            {
                value = value * 10 + static_cast<std::size_t>(ch - '0');
            }
        }

        if (!valid || value > MaxBodySize)
        {
            body_size_ = 0;
            return false;
        }
        body_size_ = value;
        return true;
    }

    /// Writes the current body size into the header bytes of the buffer.
    void encode_header()
    {
        char header[HeaderSize + 1] = "";
        // Bounded write: a size that does not fit in HeaderSize digits can no
        // longer run past the end of `header`.
        std::snprintf(header, sizeof(header), "%4d", static_cast<int>(body_size_));
        std::copy(header, header + HeaderSize, data_);
    }

private:
    uint8_t data_[HeaderSize + MaxBodySize]; // header bytes followed by body bytes
    std::size_t body_size_;                  // number of valid body bytes
};
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace net.fbs; | |
// Struct with three float fields: x, y and z.
struct Vec3 {
x:float;
y:float;
z:float;
}
// Struct with four float fields: w, x, y and z.
// NOTE(review): presumably a quaternion with the scalar part first — confirm.
struct Quat {
w:float;
x:float;
y:float;
z:float;
}
// Table carrying four Vec3 fields.
// NOTE(review): mag/acc/gyr/pos presumably stand for magnetometer,
// accelerometer, gyroscope and position readings — confirm with the producer.
table SensorsReading {
mag:Vec3;
acc:Vec3;
gyr:Vec3;
pos:Vec3;
}
// Table with a Vec3 field `pos` and a Quat field `att`.
// NOTE(review): presumably position and attitude of a filter estimate — confirm.
table FilterState {
pos:Vec3;
att:Quat;
}
// Union of the two payload tables: SensorsReading or FilterState.
union Data { SensorsReading, FilterState }
// Top-level table; wraps exactly one Data union value.
table Root {
data:Data;
}
root_type Root; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "server.hpp" | |
#include <cstdlib> | |
#include <iostream> | |
#include <utility> | |
#include <cstdio> | |
namespace net { | |
// Builds a session around `socket`, which is moved into the session.
// `server` and `recv_msg_handler` are kept by reference, not copied.
Session::Session(tcp::socket socket, Server& server, RecvMsgHandler& recv_msg_handler)
    : socket_(std::move(socket))
    , server_(server)
    , recv_msg_handler_(recv_msg_handler)
{}
void Session::start() | |
{ | |
async_read_header(); | |
} | |
std::string Session::address() const | |
{ | |
const tcp::endpoint remote_ep = socket_.remote_endpoint(); | |
return remote_ep.address().to_string(); | |
} | |
uint16_t Session::port() const | |
{ | |
const tcp::endpoint remote_ep = socket_.remote_endpoint(); | |
return remote_ep.port(); | |
} | |
void Session::async_write(std::shared_ptr<Message> msg) | |
{ | |
auto self(shared_from_this()); | |
boost::asio::async_write(socket_, boost::asio::buffer(msg->data(), msg->size()), | |
[this, self, msg](boost::system::error_code ec, std::size_t /*size*/) | |
{ | |
if (ec) | |
{ | |
server_.remove_session(self); | |
} | |
}); | |
} | |
void Session::async_read_header() | |
{ | |
auto self(shared_from_this()); | |
boost::asio::async_read(socket_, | |
boost::asio::buffer(read_msg_.data(), Message::HeaderSize), | |
[this, self](boost::system::error_code ec, std::size_t size) | |
{ | |
assert(size >= Message::HeaderSize); | |
if (!ec) | |
{ | |
if (read_msg_.decode_header()) | |
{ | |
async_read_body(); | |
} | |
else | |
{ | |
std::cerr << "Wrong header O_o" << std::endl; | |
} | |
} | |
else | |
{ | |
server_.remove_session(self); | |
} | |
}); | |
} | |
void Session::async_read_body() | |
{ | |
auto self(shared_from_this()); | |
boost::asio::async_read(socket_, | |
boost::asio::buffer(read_msg_.body(), read_msg_.body_size()), | |
[this, self](boost::system::error_code ec, std::size_t /*size*/) | |
{ | |
if (!ec) | |
{ | |
recv_msg_handler_(*this, read_msg_.body(), read_msg_.body_size()); | |
async_read_header(); | |
} | |
else | |
{ | |
server_.remove_session(self); | |
} | |
}); | |
} | |
// Sets up an IPv4 acceptor on `port`, the sockets and resolver used for
// accepting and for outgoing connections, stores the receive callback, and
// queues the first asynchronous accept.
Server::Server(boost::asio::io_service& io_service, short port, RecvMsgHandler recv_msg_handler)
    : acceptor_(io_service, tcp::endpoint(tcp::v4(), port))
    , accept_socket_(io_service)
    , resolver_(io_service)
    , resolve_socket_(io_service)
    , recv_msg_handler_(std::move(recv_msg_handler))
{
    this->async_accept();
}
void Server::async_accept() | |
{ | |
acceptor_.async_accept(accept_socket_, | |
[this](boost::system::error_code ec) | |
{ | |
if (!ec) | |
{ | |
add_session(std::move(accept_socket_)); | |
} | |
async_accept(); | |
}); | |
} | |
// Wraps `socket` in a new Session, registers it, logs the peer's address and
// port, and starts the session's receive chain.
void Server::add_session(tcp::socket socket)
{
    const std::shared_ptr<Session> session =
        std::make_shared<Session>(std::move(socket), *this, recv_msg_handler_);
    sessions_.insert(session);
    std::cout << "session " << session->address() << ':' << session->port()
              << " added" << std::endl;
    session->start();
}
void Server::remove_session(std::shared_ptr<Session> s) | |
{ | |
sessions_.erase(s); | |
} | |
void Server::async_write_all(std::shared_ptr<Message> msg) | |
{ | |
for (auto s : sessions_) | |
{ | |
s->async_write(msg); | |
} | |
} | |
void Server::connect_to(const std::string& ip, uint16_t port) | |
{ | |
tcp::resolver::query query(ip, std::to_string(port)); | |
boost::asio::connect(resolve_socket_, resolver_.resolve(query)); | |
add_session(std::move(resolve_socket_)); | |
} | |
void Server::print_status() | |
{ | |
std::cout << sessions_.size() << " sessions" << std::endl; | |
for (auto s : sessions_) | |
{ | |
std::cout << ' ' << s->address() << ':' << s->port() << std::endl; | |
} | |
} | |
} | |
#if 1 | |
// Test code below
#include "schema_generated.h" | |
#include <thread> | |
#include <chrono> | |
using namespace net; | |
// Streams a Vec3 as "x y z" (single spaces, no trailing newline).
std::ostream& operator<< (std::ostream& out, const fbs::Vec3& vec)
{
    return out << vec.x() << " " << vec.y() << " " << vec.z();
}
// Streams a Quat as "w x y z" (single spaces, no trailing newline).
std::ostream& operator<< (std::ostream& out, const fbs::Quat& quat)
{
    return out << quat.w() << " " << quat.x() << " " << quat.y() << " " << quat.z();
}
// Prints the `acc` field of a SensorsReading to standard output.
//
// FlatBuffers table fields are optional: the generated accessor returns a null
// pointer when the field was not written, and a union value pointer can itself
// be null. Both cases are reported as "<missing>" instead of dereferencing a
// null pointer.
void handle_sensors_reading(const fbs::SensorsReading* raw)
{
    const fbs::Vec3* acc = (raw != nullptr) ? raw->acc() : nullptr;
    if (acc == nullptr)
    {
        std::cout << "acc = <missing>" << std::endl;
        return;
    }
    std::cout << "acc = " << *acc << std::endl;
}
// Prints the `att` field of a FilterState to standard output.
//
// FlatBuffers table fields are optional: the generated accessor returns a null
// pointer when the field was not written, and a union value pointer can itself
// be null. Both cases are reported as "<missing>" instead of dereferencing a
// null pointer.
void handle_filter_state(const fbs::FilterState* filter)
{
    const fbs::Quat* att = (filter != nullptr) ? filter->att() : nullptr;
    if (att == nullptr)
    {
        std::cout << "att = <missing>" << std::endl;
        return;
    }
    std::cout << "att = " << *att << std::endl;
}
void handle_receive_msg(const Session& s, const uint8_t* data, size_t size) | |
{ | |
auto verifier = flatbuffers::Verifier(data, size); | |
if (fbs::VerifyRootBuffer(verifier)) | |
{ | |
auto root = fbs::GetRoot(data); | |
std::cout << "Rx " << fbs::EnumNameData(root->data_type()) | |
<< " from " << s.address() << ":" << s.port() << std::endl; | |
switch (root->data_type()) | |
{ | |
case fbs::Data_SensorsReading: | |
handle_sensors_reading(static_cast<const fbs::SensorsReading*>(root->data())); | |
break; | |
case fbs::Data_FilterState: | |
handle_filter_state(static_cast<const fbs::FilterState*>(root->data())); | |
break; | |
default: | |
throw std::runtime_error("Unknown message body type"); | |
} | |
} | |
else | |
{ | |
throw std::runtime_error("Ill-formed message"); | |
} | |
} | |
int main(int argc, char* argv[]) | |
{ | |
try | |
{ | |
if (argc < 2) | |
{ | |
std::cerr << "Usage: " << argv[0] | |
<< " SERVER_PORT [OTHER_IP OTHER_PORT]" << std::endl; | |
return 1; | |
} | |
boost::asio::io_service io_service; | |
using namespace std::placeholders; | |
Server server(io_service, std::atoi(argv[1]), std::bind(handle_receive_msg, _1, _2, _3)); | |
// connect to the other server | |
if (argc == 4) | |
{ | |
server.connect_to(argv[2], std::atoi(argv[3])); | |
} | |
for (size_t i = 10; i > 0; i--) | |
{ | |
io_service.poll(); | |
server.print_status(); | |
// send junk data | |
if (argc == 4) | |
{ | |
{ | |
flatbuffers::FlatBufferBuilder builder; | |
fbs::Vec3 mag(1, 0, 0), acc(0, 0, 9.80665), gyr(1, 1, 1), tri(0, 0, 0); | |
auto data = fbs::CreateSensorsReading(builder, &mag, &acc, &gyr, &tri); | |
auto root = fbs::CreateRoot(builder, fbs::Data_SensorsReading, data.Union()); | |
fbs::FinishRootBuffer(builder, root); | |
auto msg = std::make_shared<Message>(builder.GetBufferPointer(), builder.GetSize()); | |
server.async_write_all(msg); | |
} | |
{ | |
flatbuffers::FlatBufferBuilder builder; | |
fbs::Vec3 pos(1, 2, 3); | |
fbs::Quat att(0, 1, 1, 1); | |
auto data = fbs::CreateFilterState(builder, &pos, &att); | |
auto root = fbs::CreateRoot(builder, fbs::Data_FilterState, data.Union()); | |
fbs::FinishRootBuffer(builder, root); | |
auto msg = std::make_shared<Message>(builder.GetBufferPointer(), builder.GetSize()); | |
server.async_write_all(msg); | |
} | |
} | |
std::this_thread::sleep_for(std::chrono::seconds(2)); | |
} | |
} | |
catch (std::exception& e) | |
{ | |
std::cerr << "Exception: " << e.what() << "\n"; | |
} | |
return 0; | |
} | |
#endif |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include "message.hpp" | |
#include <boost/asio.hpp> | |
#include <memory> | |
#include <set> | |
#include <cstdint> | |
#include <functional> | |
namespace net { | |
using boost::asio::ip::tcp; | |
class Server; | |
class Session; | |
// Callback invoked for each received message body: the session it arrived on,
// a pointer to the body bytes, and the body length.
using RecvMsgHandler = std::function<void(const Session&, const uint8_t*, size_t)>;
class Session : public std::enable_shared_from_this<Session> | |
{ | |
tcp::socket socket_; | |
Server& server_; | |
Message read_msg_; | |
RecvMsgHandler& recv_msg_handler_; | |
void async_read_header(); | |
void async_read_body(); | |
public: | |
Session(tcp::socket socket, Server& server, RecvMsgHandler&); | |
void start(); | |
std::string address() const; | |
uint16_t port() const; | |
void async_write(std::shared_ptr<Message> msg); | |
}; | |
class Server | |
{ | |
tcp::acceptor acceptor_; | |
tcp::socket accept_socket_; | |
tcp::resolver resolver_; | |
tcp::socket resolve_socket_; | |
std::set<std::shared_ptr<Session>> sessions_; | |
RecvMsgHandler recv_msg_handler_; | |
void async_accept(); | |
void add_session(tcp::socket socket); | |
public: | |
Server(boost::asio::io_service& io_service, short port, RecvMsgHandler); | |
void remove_session(std::shared_ptr<Session> s); | |
void async_write_all(std::shared_ptr<Message> msg); | |
void connect_to(const std::string& ip, uint16_t port); | |
void print_status(); | |
}; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment