Created
May 2, 2017 07:08
-
-
Save jerrymarino/fd4e4bfb0f8bae97e5c465fe8bbea56b to your computer and use it in GitHub Desktop.
Example of beast http with "long running" operations and Grand Central Dispatch
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * Beast HTTP server example with long running operations and lib dispatch. | |
| * | |
| * Note: Assume that we are calling dispatch_main() after running the server | |
| */ | |
| #include "file_body.hpp" | |
| #include "mime_type.hpp" | |
| #include <beast/http.hpp> | |
| #include <beast/core/handler_helpers.hpp> | |
| #include <beast/core/handler_ptr.hpp> | |
| #include <beast/core/placeholders.hpp> | |
| #include <beast/core/streambuf.hpp> | |
| #include <boost/asio.hpp> | |
| #include <cstddef> | |
| #include <cstdio> | |
| #include <dispatch/dispatch.h> | |
| #include <iostream> | |
| #include <memory> | |
| #include <mutex> | |
| #include <thread> | |
| #include <utility> | |
| #include <sstream> | |
| #include <map> | |
| struct service_context { | |
| std::string secret; | |
| }; | |
| namespace beast { | |
| namespace http { | |
| using socket_type = boost::asio::ip::tcp::socket; | |
| using req_type = request<string_body>; | |
| using resp_type = response<file_body>; | |
| /** | |
| * Session is an instance of an HTTP session. | |
| * | |
| * The server will allocate a new instance for each accepted | |
| * request. | |
| */ | |
| class session : public std::enable_shared_from_this<session> | |
| { | |
| int _instance_id; | |
| streambuf _streambuf; | |
| socket_type _socket; | |
| service_context _context; | |
| boost::asio::io_service::strand _strand; | |
| req_type _request; | |
| public: | |
| session(session&&) = default; | |
| session(session const&) = default; | |
| session& operator=(session&&) = delete; | |
| session& operator=(session const&) = delete; | |
| session(socket_type&& sock, service_context ctx) | |
| : _socket(std::move(sock)) | |
| , _context(ctx) | |
| , _strand(_socket.get_io_service()) | |
| { | |
| static int n = 0; | |
| _instance_id = ++n; | |
| std::cout << "Secret: "; | |
| std::cout << _context.secret; | |
| std::cout << " Id: "; | |
| std::cout << _instance_id; | |
| std::cout << "\n"; | |
| } | |
| void | |
| fail(error_code ec, std::string what) | |
| { | |
| //TODO: | |
| } | |
| void start() | |
| { | |
| do_read(); | |
| } | |
| std::shared_ptr<session> | |
| detach() | |
| { | |
| return shared_from_this(); | |
| } | |
| void do_read() | |
| { | |
| async_read(_socket, _streambuf, _request, _strand.wrap( | |
| std::bind(&session::on_read, shared_from_this(), | |
| asio::placeholders::error))); | |
| } | |
| void on_read(error_code const& ec) | |
| { | |
| if(ec) | |
| return fail(ec, "read"); | |
| auto path = _request.url; | |
| // Example of long running response handler. | |
| // Assume we have a dispatch main queue running. | |
| // | |
| // - Detach and retain - it is necessary to keep this alive. | |
| // - Quickly return to prevent from blocking acceptor loop. | |
| // - Perform a long running task. | |
| // - Schedule write for the response body ( should be on some other thread than main in practice ). | |
| auto detached_session = detach(); | |
| std::cout << "Do Read: "<< _instance_id << "\n"; | |
| std::cout.flush(); | |
| // 10 second wait. | |
| dispatch_after(dispatch_time(DISPATCH_TIME_NOW, 10 * NSEC_PER_SEC), dispatch_get_main_queue(), ^{ | |
| std::cout << "Enter main: " << detached_session->_instance_id << "\n"; | |
| std::cout << detached_session->_request.url; | |
| std::cout.flush(); | |
| response<string_body> res; | |
| res.status = 200; | |
| res.version = _request.version; | |
| res.fields.insert("Server", "http_async_server"); | |
| res.fields.insert("Content-Type", "text/html"); | |
| res.body = "Hello World"; | |
| prepare(res); | |
| async_write(_socket, std::move(res), | |
| std::bind(&session::on_write, detached_session, | |
| asio::placeholders::error)); | |
| }); | |
| } | |
| void on_write(error_code ec) | |
| { | |
| if(ec) | |
| fail(ec, "write"); | |
| do_read(); | |
| } | |
| }; | |
| class http_async_server | |
| { | |
| using endpoint_type = boost::asio::ip::tcp::endpoint; | |
| using address_type = boost::asio::ip::address; | |
| using socket_type = boost::asio::ip::tcp::socket; | |
| std::mutex _shared_mutex; | |
| bool _logging_enabled = true; | |
| boost::asio::io_service _io_service; | |
| boost::asio::ip::tcp::acceptor _acceptor; | |
| socket_type _socket; | |
| std::string _root_path; | |
| std::vector<std::thread> _thread; | |
| public: | |
| http_async_server(endpoint_type const& ep, | |
| std::size_t threads, std::string const& root) | |
| : _acceptor(_io_service) | |
| , _socket(_io_service) | |
| , _root_path(root) | |
| { | |
| _acceptor.open(ep.protocol()); | |
| _acceptor.bind(ep); | |
| _acceptor.listen( | |
| boost::asio::socket_base::max_connections); | |
| _acceptor.async_accept(_socket, | |
| std::bind(&http_async_server::on_accept, this, | |
| beast::asio::placeholders::error)); | |
| _thread.reserve(threads); | |
| for(std::size_t i = 0; i < threads; ++i) | |
| _thread.emplace_back( | |
| [&] { _io_service.run(); }); | |
| } | |
| ~http_async_server() | |
| { | |
| error_code ec; | |
| _io_service.dispatch( | |
| [&]{ _acceptor.close(ec); }); | |
| for(auto& t : _thread) | |
| t.join(); | |
| } | |
| template<class... Args> | |
| void | |
| log(Args const&... args) | |
| { | |
| if(_logging_enabled) | |
| { | |
| std::lock_guard<std::mutex> lock(_shared_mutex); | |
| log_args(args...); | |
| } | |
| } | |
| private: | |
| template<class Stream, class Handler, | |
| bool isRequest, class Body, class Fields> | |
| class write_op | |
| { | |
| struct data | |
| { | |
| bool cont; | |
| Stream& s; | |
| message<isRequest, Body, Fields> m; | |
| data(Handler& handler, Stream& s_, | |
| message<isRequest, Body, Fields>&& _shared_mutex) | |
| : cont(beast_asio_helpers:: | |
| is_continuation(handler)) | |
| , s(s_) | |
| , m(std::move(_shared_mutex)) | |
| { | |
| } | |
| }; | |
| handler_ptr<data, Handler> d_; | |
| public: | |
| write_op(write_op&&) = default; | |
| write_op(write_op const&) = default; | |
| template<class DeducedHandler, class... Args> | |
| write_op(DeducedHandler&& h, Stream& s, Args&&... args) | |
| : d_(std::forward<DeducedHandler>(h), | |
| s, std::forward<Args>(args)...) | |
| { | |
| (*this)(error_code{}, false); | |
| } | |
| void | |
| operator()(error_code ec, bool again = true) | |
| { | |
| auto& d = *d_; | |
| d.cont = d.cont || again; | |
| if(! again) | |
| { | |
| beast::http::async_write(d.s, d.m, std::move(*this)); | |
| return; | |
| } | |
| d_.invoke(ec); | |
| } | |
| friend | |
| void* asio_handler_allocate( | |
| std::size_t size, write_op* op) | |
| { | |
| return beast_asio_helpers:: | |
| allocate(size, op->d_.handler()); | |
| } | |
| friend | |
| void asio_handler_deallocate( | |
| void* p, std::size_t size, write_op* op) | |
| { | |
| return beast_asio_helpers:: | |
| deallocate(p, size, op->d_.handler()); | |
| } | |
| friend | |
| bool asio_handler_is_continuation(write_op* op) | |
| { | |
| return op->d_->cont; | |
| } | |
| template<class Function> | |
| friend | |
| void asio_handler_invoke(Function&& f, write_op* op) | |
| { | |
| return beast_asio_helpers:: | |
| invoke(f, op->d_.handler()); | |
| } | |
| }; | |
| template<class Stream, | |
| bool isRequest, class Body, class Fields, | |
| class DeducedHandler> | |
| static | |
| void | |
| async_write(Stream& stream, message< | |
| isRequest, Body, Fields>&& msg, | |
| DeducedHandler&& handler) | |
| { | |
| write_op<Stream, typename std::decay<DeducedHandler>::type, | |
| isRequest, Body, Fields>{std::forward<DeducedHandler>( | |
| handler), stream, std::move(msg)}; | |
| } | |
| void log_args() | |
| { | |
| } | |
| template<class Arg, class... Args> | |
| void | |
| log_args(Arg const& arg, Args const&... args) | |
| { | |
| std::cerr << arg; | |
| log_args(args...); | |
| } | |
| void | |
| fail(error_code ec, std::string what) | |
| { | |
| log(what, ": ", ec.message(), "\n"); | |
| } | |
| void | |
| on_accept(error_code ec) | |
| { | |
| if(!_acceptor.is_open()) | |
| return; | |
| if(ec) | |
| return fail(ec, "accept"); | |
| socket_type sock(std::move(_socket)); | |
| _acceptor.async_accept(_socket, | |
| std::bind(&http_async_server::on_accept, this, | |
| asio::placeholders::error)); | |
| service_context ctx; | |
| ctx.secret = "Some Secret"; | |
| auto new_session = std::make_shared<session>(std::move(sock), ctx); | |
| new_session->start(); | |
| } | |
| }; | |
| } // http | |
| } // beast |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment