Skip to content

Instantly share code, notes, and snippets.

@TimSC
Last active March 8, 2018 08:17
Show Gist options
  • Save TimSC/f1ab30d5de112df27c1851d436490fbb to your computer and use it in GitHub Desktop.
Save TimSC/f1ab30d5de112df27c1851d436490fbb to your computer and use it in GitHub Desktop.
Simple msgpack-rpc server, based on Boost ASIO
//simple msgpack-rpc server, based on Boost ASIO
//Based on http://www.boost.org/doc/libs/1_35_0/doc/html/boost_asio/tutorial/tutdaytime3/src.html
//msgpack-rpc based on https://github.com/msgpack-rpc/msgpack-rpc/blob/master/spec.md
//Build with: g++ -std=c++11 server.cpp -lboost_system -o server
#include <iostream>
#include <string>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/asio.hpp>
#include <msgpack.hpp> //Depends on at least v2 of https://github.com/msgpack/msgpack-c
using boost::asio::ip::tcp;
using namespace std;
//TCP port number used by the server (see where SERVER_PORT is referenced)
const uint32_t SERVER_PORT = 5005;
//This is the function we want to make available via RPC.
//Returns a + b. The operands come from remote clients, so the addition is
//done in unsigned arithmetic: an out-of-range sum wraps around (two's
//complement) instead of triggering signed-overflow undefined behaviour.
int64_t sumfunc(int64_t a, int64_t b)
{
    const uint64_t wrapped = static_cast<uint64_t>(a) + static_cast<uint64_t>(b);
    return static_cast<int64_t>(wrapped);
}
void dispatch(tcp::socket &socket, const msgpack::object &deserialized)
{
std::cout << "request: " << deserialized << std::endl;
if(deserialized.type == msgpack::type::ARRAY
and deserialized.via.array.size == 4) //This is manditory for msgpack-rpc format
{
//Check we have a valid msgpack-rpc message
const msgpack::object &rpc_type = deserialized.via.array.ptr[0];
if(rpc_type.type != msgpack::type::POSITIVE_INTEGER and rpc_type.type != msgpack::type::NEGATIVE_INTEGER)
return;
const msgpack::object &rpc_msgid = deserialized.via.array.ptr[1];
if(rpc_msgid.type != msgpack::type::POSITIVE_INTEGER and rpc_msgid.type != msgpack::type::NEGATIVE_INTEGER)
return;
const msgpack::object &rpc_method = deserialized.via.array.ptr[2];
if(rpc_method.type != msgpack::type::STR)
return;
const msgpack::object &rpc_params = deserialized.via.array.ptr[3];
if(rpc_params.type != msgpack::type::ARRAY)
return;
string rpc_method_str(rpc_method.via.str.ptr, rpc_method.via.str.size);
//cout << "rpc_type " << rpc_type.via.i64 << endl;
//cout << "rpc_msgid " << rpc_msgid.via.u64 << endl;
//cout << "rpc_method " << rpc_method_str << endl;
//cout << "rpc_params array of length " << rpc_params.via.array.size << endl;
//Demonstrate simple "sum" function
if(rpc_method_str == "sum" and rpc_params.via.array.size == 2)
{
const msgpack::object &arg1 = rpc_params.via.array.ptr[0];
const msgpack::object &arg2 = rpc_params.via.array.ptr[1];
int64_t result = sumfunc(arg1.via.i64, arg2.via.i64);
//Prepare response
msgpack::sbuffer sbuf;
msgpack::packer<msgpack::sbuffer> packer(sbuf);
packer.pack_array(4);
packer.pack_int(1); //Responses have type 1
packer.pack_unsigned_long_long(rpc_msgid.via.u64);
packer.pack_nil(); //No error message
packer.pack_long_long(result);
//Send response (synchronous, for simplicity)
string outBuff(sbuf.data(), sbuf.size());
boost::asio::write(socket, boost::asio::buffer(outBuff));
}
else
{
//Prepare unknown function error response
msgpack::sbuffer sbuf;
msgpack::packer<msgpack::sbuffer> packer(sbuf);
packer.pack_array(4);
packer.pack_int(1); //Responses have type 1
packer.pack_unsigned_long_long(rpc_msgid.via.u64);
string err = "Unknown method";
packer.pack_str(err.size());
packer.pack_str_body(err.c_str(), err.size());
packer.pack_nil(); //No result data
//Send response (synchronous, for simplicity)
string outBuff(sbuf.data(), sbuf.size());
boost::asio::write(socket, boost::asio::buffer(outBuff));
}
}
}
//One client connection. Bytes read from the socket are fed to a streaming
//msgpack unpacker and every complete object is handed to dispatch().
//Instances are created through create() and kept alive by the shared_ptr
//bound into the pending asynchronous read handler.
class tcp_connection
    : public boost::enable_shared_from_this<tcp_connection>
{
public:
    typedef boost::shared_ptr<tcp_connection> pointer;

    //Creates a new, not yet connected, connection object
    static pointer create(boost::asio::io_service& io_service)
    {
        return pointer(new tcp_connection(io_service));
    }

    //Socket to be passed to the acceptor
    tcp::socket& socket()
    {
        return socket_;
    }

    //Starts reading requests; call once the socket has been accepted
    void start()
    {
        start_read();
    }

private:
    explicit tcp_connection(boost::asio::io_service& io_service)
        : socket_(io_service)
    {
    }

    //Queues an asynchronous read directly into the unpacker's free space,
    //which avoids copying the data through intermediate buffers
    void start_read()
    {
        unp.reserve_buffer();
        socket_.async_read_some(
            boost::asio::buffer(unp.buffer(), unp.buffer_capacity()),
            boost::bind(&tcp_connection::handle_read, shared_from_this(),
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
    }

    //Completion handler for start_read()
    void handle_read(const boost::system::error_code& ec,
        size_t bytes_transferred)
    {
        if (ec)
        {
            if(ec == boost::asio::error::eof)
                std::cout << "Disconnected" << endl;
            else
                std::cout << "Error: " << ec << "\n";
            //No further read is queued, so the connection is released
            return;
        }

        //Tell the unpacker how much of its buffer has been filled
        unp.buffer_consumed(bytes_transferred);

        try
        {
            //Check if any objects are complete
            msgpack::object_handle result;
            while(unp.next(result))
                dispatch(this->socket_, result.get());
        }
        catch (const std::exception& e)
        {
            //Malformed input (or a failure while replying) must only drop
            //this client; letting the exception escape would stop the server
            std::cerr << "Closing connection: " << e.what() << std::endl;
            boost::system::error_code ignored;
            socket_.close(ignored);
            return;
        }

        //Prepare for next read
        start_read();
    }

    tcp::socket socket_;
    msgpack::unpacker unp;
};
class tcp_server
{
public:
tcp_server(boost::asio::io_service& io_service)
: acceptor_(io_service, tcp::endpoint(tcp::v4(), SERVER_PORT))
{
start_accept();
}
private:
void start_accept()
{
tcp_connection::pointer new_connection =
tcp_connection::create(acceptor_.get_io_service());
acceptor_.async_accept(new_connection->socket(),
boost::bind(&tcp_server::handle_accept, this, new_connection,
boost::asio::placeholders::error));
}
void handle_accept(tcp_connection::pointer new_connection,
const boost::system::error_code& error)
{
if (!error)
{
new_connection->start();
}
start_accept();
}
tcp::acceptor acceptor_;
};
//Program entry point: runs the server until the io_service has no more work.
//Returns 0 on a clean exit, -1 if an exception reached the top level.
int main()
{
    int exit_status = 0;
    try
    {
        boost::asio::io_service io_service;
        tcp_server server(io_service);

        //Blocks while there are pending asynchronous operations
        io_service.run();
    }
    catch (const std::exception& ex)
    {
        std::cerr << ex.what() << std::endl;
        exit_status = -1;
    }
    return exit_status;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment