Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save jerrymarino/fd4e4bfb0f8bae97e5c465fe8bbea56b to your computer and use it in GitHub Desktop.

Select an option

Save jerrymarino/fd4e4bfb0f8bae97e5c465fe8bbea56b to your computer and use it in GitHub Desktop.
Example of beast http with "long running" operations and Grand Central Dispatch
/**
* Beast HTTP server example with long running operations and lib dispatch.
*
* Note: Assume that we are calling dispatch_main() after running the server
*/
#include "file_body.hpp"
#include "mime_type.hpp"
#include <beast/http.hpp>
#include <beast/core/handler_helpers.hpp>
#include <beast/core/handler_ptr.hpp>
#include <beast/core/placeholders.hpp>
#include <beast/core/streambuf.hpp>
#include <boost/asio.hpp>
#include <cstddef>
#include <cstdio>
#include <dispatch/dispatch.h>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <sstream>
#include <map>
struct service_context {
std::string secret;
};
namespace beast {
namespace http {
using socket_type = boost::asio::ip::tcp::socket;
using req_type = request<string_body>;
using resp_type = response<file_body>;
/**
* Session is an instance of an HTTP session.
*
* The server will allocate a new instance for each accepted
* request.
*/
class session : public std::enable_shared_from_this<session>
{
int _instance_id;
streambuf _streambuf;
socket_type _socket;
service_context _context;
boost::asio::io_service::strand _strand;
req_type _request;
public:
session(session&&) = default;
session(session const&) = default;
session& operator=(session&&) = delete;
session& operator=(session const&) = delete;
session(socket_type&& sock, service_context ctx)
: _socket(std::move(sock))
, _context(ctx)
, _strand(_socket.get_io_service())
{
static int n = 0;
_instance_id = ++n;
std::cout << "Secret: ";
std::cout << _context.secret;
std::cout << " Id: ";
std::cout << _instance_id;
std::cout << "\n";
}
void
fail(error_code ec, std::string what)
{
//TODO:
}
void start()
{
do_read();
}
std::shared_ptr<session>
detach()
{
return shared_from_this();
}
void do_read()
{
async_read(_socket, _streambuf, _request, _strand.wrap(
std::bind(&session::on_read, shared_from_this(),
asio::placeholders::error)));
}
void on_read(error_code const& ec)
{
if(ec)
return fail(ec, "read");
auto path = _request.url;
// Example of long running response handler.
// Assume we have a dispatch main queue running.
//
// - Detach and retain - it is necessary to keep this alive.
// - Quickly return to prevent from blocking acceptor loop.
// - Perform a long running task.
// - Schedule write for the response body ( should be on some other thread than main in practice ).
auto detached_session = detach();
std::cout << "Do Read: "<< _instance_id << "\n";
std::cout.flush();
// 10 second wait.
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, 10 * NSEC_PER_SEC), dispatch_get_main_queue(), ^{
std::cout << "Enter main: " << detached_session->_instance_id << "\n";
std::cout << detached_session->_request.url;
std::cout.flush();
response<string_body> res;
res.status = 200;
res.version = _request.version;
res.fields.insert("Server", "http_async_server");
res.fields.insert("Content-Type", "text/html");
res.body = "Hello World";
prepare(res);
async_write(_socket, std::move(res),
std::bind(&session::on_write, detached_session,
asio::placeholders::error));
});
}
void on_write(error_code ec)
{
if(ec)
fail(ec, "write");
do_read();
}
};
class http_async_server
{
using endpoint_type = boost::asio::ip::tcp::endpoint;
using address_type = boost::asio::ip::address;
using socket_type = boost::asio::ip::tcp::socket;
std::mutex _shared_mutex;
bool _logging_enabled = true;
boost::asio::io_service _io_service;
boost::asio::ip::tcp::acceptor _acceptor;
socket_type _socket;
std::string _root_path;
std::vector<std::thread> _thread;
public:
http_async_server(endpoint_type const& ep,
std::size_t threads, std::string const& root)
: _acceptor(_io_service)
, _socket(_io_service)
, _root_path(root)
{
_acceptor.open(ep.protocol());
_acceptor.bind(ep);
_acceptor.listen(
boost::asio::socket_base::max_connections);
_acceptor.async_accept(_socket,
std::bind(&http_async_server::on_accept, this,
beast::asio::placeholders::error));
_thread.reserve(threads);
for(std::size_t i = 0; i < threads; ++i)
_thread.emplace_back(
[&] { _io_service.run(); });
}
~http_async_server()
{
error_code ec;
_io_service.dispatch(
[&]{ _acceptor.close(ec); });
for(auto& t : _thread)
t.join();
}
template<class... Args>
void
log(Args const&... args)
{
if(_logging_enabled)
{
std::lock_guard<std::mutex> lock(_shared_mutex);
log_args(args...);
}
}
private:
template<class Stream, class Handler,
bool isRequest, class Body, class Fields>
class write_op
{
struct data
{
bool cont;
Stream& s;
message<isRequest, Body, Fields> m;
data(Handler& handler, Stream& s_,
message<isRequest, Body, Fields>&& _shared_mutex)
: cont(beast_asio_helpers::
is_continuation(handler))
, s(s_)
, m(std::move(_shared_mutex))
{
}
};
handler_ptr<data, Handler> d_;
public:
write_op(write_op&&) = default;
write_op(write_op const&) = default;
template<class DeducedHandler, class... Args>
write_op(DeducedHandler&& h, Stream& s, Args&&... args)
: d_(std::forward<DeducedHandler>(h),
s, std::forward<Args>(args)...)
{
(*this)(error_code{}, false);
}
void
operator()(error_code ec, bool again = true)
{
auto& d = *d_;
d.cont = d.cont || again;
if(! again)
{
beast::http::async_write(d.s, d.m, std::move(*this));
return;
}
d_.invoke(ec);
}
friend
void* asio_handler_allocate(
std::size_t size, write_op* op)
{
return beast_asio_helpers::
allocate(size, op->d_.handler());
}
friend
void asio_handler_deallocate(
void* p, std::size_t size, write_op* op)
{
return beast_asio_helpers::
deallocate(p, size, op->d_.handler());
}
friend
bool asio_handler_is_continuation(write_op* op)
{
return op->d_->cont;
}
template<class Function>
friend
void asio_handler_invoke(Function&& f, write_op* op)
{
return beast_asio_helpers::
invoke(f, op->d_.handler());
}
};
template<class Stream,
bool isRequest, class Body, class Fields,
class DeducedHandler>
static
void
async_write(Stream& stream, message<
isRequest, Body, Fields>&& msg,
DeducedHandler&& handler)
{
write_op<Stream, typename std::decay<DeducedHandler>::type,
isRequest, Body, Fields>{std::forward<DeducedHandler>(
handler), stream, std::move(msg)};
}
void log_args()
{
}
template<class Arg, class... Args>
void
log_args(Arg const& arg, Args const&... args)
{
std::cerr << arg;
log_args(args...);
}
void
fail(error_code ec, std::string what)
{
log(what, ": ", ec.message(), "\n");
}
void
on_accept(error_code ec)
{
if(!_acceptor.is_open())
return;
if(ec)
return fail(ec, "accept");
socket_type sock(std::move(_socket));
_acceptor.async_accept(_socket,
std::bind(&http_async_server::on_accept, this,
asio::placeholders::error));
service_context ctx;
ctx.secret = "Some Secret";
auto new_session = std::make_shared<session>(std::move(sock), ctx);
new_session->start();
}
};
} // http
} // beast
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment