Last active
March 8, 2018 08:17
-
-
Save TimSC/f1ab30d5de112df27c1851d436490fbb to your computer and use it in GitHub Desktop.
Simple msgpack-rpc server, based on Boost ASIO
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//simple msgpack-rpc server, based on Boost ASIO | |
//Based on http://www.boost.org/doc/libs/1_35_0/doc/html/boost_asio/tutorial/tutdaytime3/src.html | |
//msgpack-rpc based on https://github.com/msgpack-rpc/msgpack-rpc/blob/master/spec.md | |
//Build with: g++ -std=c++11 server.cpp -lboost_system -o server | |
#include <iostream> | |
#include <string> | |
#include <boost/bind.hpp> | |
#include <boost/shared_ptr.hpp> | |
#include <boost/enable_shared_from_this.hpp> | |
#include <boost/asio.hpp> | |
#include <msgpack.hpp> //Depends on at least v2 of https://github.com/msgpack/msgpack-c | |
using boost::asio::ip::tcp; | |
using namespace std; | |
const uint32_t SERVER_PORT = 5005; | |
//This is the function we want to make available via RPC | |
int64_t sumfunc(int64_t a, int64_t b) | |
{ | |
return a + b; | |
} | |
void dispatch(tcp::socket &socket, const msgpack::object &deserialized) | |
{ | |
std::cout << "request: " << deserialized << std::endl; | |
if(deserialized.type == msgpack::type::ARRAY | |
and deserialized.via.array.size == 4) //This is manditory for msgpack-rpc format | |
{ | |
//Check we have a valid msgpack-rpc message | |
const msgpack::object &rpc_type = deserialized.via.array.ptr[0]; | |
if(rpc_type.type != msgpack::type::POSITIVE_INTEGER and rpc_type.type != msgpack::type::NEGATIVE_INTEGER) | |
return; | |
const msgpack::object &rpc_msgid = deserialized.via.array.ptr[1]; | |
if(rpc_msgid.type != msgpack::type::POSITIVE_INTEGER and rpc_msgid.type != msgpack::type::NEGATIVE_INTEGER) | |
return; | |
const msgpack::object &rpc_method = deserialized.via.array.ptr[2]; | |
if(rpc_method.type != msgpack::type::STR) | |
return; | |
const msgpack::object &rpc_params = deserialized.via.array.ptr[3]; | |
if(rpc_params.type != msgpack::type::ARRAY) | |
return; | |
string rpc_method_str(rpc_method.via.str.ptr, rpc_method.via.str.size); | |
//cout << "rpc_type " << rpc_type.via.i64 << endl; | |
//cout << "rpc_msgid " << rpc_msgid.via.u64 << endl; | |
//cout << "rpc_method " << rpc_method_str << endl; | |
//cout << "rpc_params array of length " << rpc_params.via.array.size << endl; | |
//Demonstrate simple "sum" function | |
if(rpc_method_str == "sum" and rpc_params.via.array.size == 2) | |
{ | |
const msgpack::object &arg1 = rpc_params.via.array.ptr[0]; | |
const msgpack::object &arg2 = rpc_params.via.array.ptr[1]; | |
int64_t result = sumfunc(arg1.via.i64, arg2.via.i64); | |
//Prepare response | |
msgpack::sbuffer sbuf; | |
msgpack::packer<msgpack::sbuffer> packer(sbuf); | |
packer.pack_array(4); | |
packer.pack_int(1); //Responses have type 1 | |
packer.pack_unsigned_long_long(rpc_msgid.via.u64); | |
packer.pack_nil(); //No error message | |
packer.pack_long_long(result); | |
//Send response (synchronous, for simplicity) | |
string outBuff(sbuf.data(), sbuf.size()); | |
boost::asio::write(socket, boost::asio::buffer(outBuff)); | |
} | |
else | |
{ | |
//Prepare unknown function error response | |
msgpack::sbuffer sbuf; | |
msgpack::packer<msgpack::sbuffer> packer(sbuf); | |
packer.pack_array(4); | |
packer.pack_int(1); //Responses have type 1 | |
packer.pack_unsigned_long_long(rpc_msgid.via.u64); | |
string err = "Unknown method"; | |
packer.pack_str(err.size()); | |
packer.pack_str_body(err.c_str(), err.size()); | |
packer.pack_nil(); //No result data | |
//Send response (synchronous, for simplicity) | |
string outBuff(sbuf.data(), sbuf.size()); | |
boost::asio::write(socket, boost::asio::buffer(outBuff)); | |
} | |
} | |
} | |
class tcp_connection | |
: public boost::enable_shared_from_this<tcp_connection> | |
{ | |
public: | |
typedef boost::shared_ptr<tcp_connection> pointer; | |
static pointer create(boost::asio::io_service& io_service) | |
{ | |
return pointer(new tcp_connection(io_service)); | |
} | |
tcp::socket& socket() | |
{ | |
return socket_; | |
} | |
void start() | |
{ | |
boost::asio::async_read(socket_, input_buffer_, | |
boost::asio::transfer_at_least(1), | |
boost::bind(&tcp_connection::handle_read, shared_from_this(), | |
boost::asio::placeholders::error)); | |
} | |
private: | |
tcp_connection(boost::asio::io_service& io_service) | |
: socket_(io_service) | |
{ | |
unp.reserve_buffer(); | |
} | |
void handle_write(const boost::system::error_code& /*error*/, | |
size_t /*bytes_transferred*/) | |
{ | |
} | |
void handle_read(const boost::system::error_code& ec) | |
{ | |
if (!ec) | |
{ | |
// https://stackoverflow.com/a/3203502/4288232 | |
std::istream is(&input_buffer_); | |
std::string line(std::istreambuf_iterator<char>(is), {}); | |
//Feed data into msgpack unpacker | |
if(line.size() > unp.buffer_capacity()) | |
unp.reserve_buffer(line.size()); | |
memcpy(unp.buffer(), line.data(), line.size()); | |
unp.buffer_consumed(line.size()); | |
//Check if any objects are complete | |
msgpack::object_handle result; | |
while(unp.next(result)) { | |
msgpack::object deserialized(result.get()); | |
dispatch(this->socket_, deserialized); | |
} | |
//Prepare for next read | |
boost::asio::async_read(socket_, input_buffer_, | |
boost::asio::transfer_at_least(1), | |
boost::bind(&tcp_connection::handle_read, shared_from_this(), | |
boost::asio::placeholders::error)); | |
} | |
else | |
{ | |
if(ec == boost::asio::error::eof) | |
std::cout << "Disconnected" << endl; | |
else | |
std::cout << "Error: " << ec << "\n"; | |
} | |
} | |
tcp::socket socket_; | |
boost::asio::streambuf input_buffer_; | |
class msgpack::unpacker unp; | |
}; | |
class tcp_server | |
{ | |
public: | |
tcp_server(boost::asio::io_service& io_service) | |
: acceptor_(io_service, tcp::endpoint(tcp::v4(), SERVER_PORT)) | |
{ | |
start_accept(); | |
} | |
private: | |
void start_accept() | |
{ | |
tcp_connection::pointer new_connection = | |
tcp_connection::create(acceptor_.get_io_service()); | |
acceptor_.async_accept(new_connection->socket(), | |
boost::bind(&tcp_server::handle_accept, this, new_connection, | |
boost::asio::placeholders::error)); | |
} | |
void handle_accept(tcp_connection::pointer new_connection, | |
const boost::system::error_code& error) | |
{ | |
if (!error) | |
{ | |
new_connection->start(); | |
} | |
start_accept(); | |
} | |
tcp::acceptor acceptor_; | |
}; | |
int main() | |
{ | |
try | |
{ | |
boost::asio::io_service io_service; | |
tcp_server server(io_service); | |
io_service.run(); | |
} | |
catch (std::exception& e) | |
{ | |
std::cerr << e.what() << std::endl; | |
return -1; | |
} | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment