Skip to content

Instantly share code, notes, and snippets.

@testillano
Created June 3, 2022 20:02
Show Gist options
  • Save testillano/a66283408165ccb6eba844bd6fa6518b to your computer and use it in GitHub Desktop.
Save testillano/a66283408165ccb6eba844bd6fa6518b to your computer and use it in GitHub Desktop.
Extends https://gist.github.com/tatsuhiro-t/ba3f7d72d037027ae47b to handle incoming request data
#include <condition_variable>
#include <deque>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <utility>

#include <nghttp2/asio_http2_server.h>
using namespace nghttp2::asio_http2;
using namespace nghttp2::asio_http2::server;
/// Per-request state handed from the HTTP/2 handler to whoever produces the
/// response.
///
/// The object keeps plain references to the request, the response and the
/// io_service it was constructed with, plus a shared buffer with the request
/// body. `closed` is guarded by `mu`.
///
/// NOTE(review): `req` and `res` are stored as references, so they are
/// presumably owned by the nghttp2 library and stop being valid once the
/// stream is closed -- which would be why `closed` is checked before `res` is
/// touched. Confirm against the nghttp2 asio documentation.
struct Stream : public std::enable_shared_from_this<Stream> {
  /// @param req          request this stream belongs to (stored by reference)
  /// @param res          response to be completed later (stored by reference)
  /// @param io_service   io_service on which the response is written
  /// @param requestBody  buffer holding the request body
  Stream(const request &req, const response &res,
         boost::asio::io_service &io_service,
         std::shared_ptr<std::stringstream> requestBody)
      : io_service(io_service),
        req(req),
        res(res),
        closed(false),
        request_body(requestBody) {}

  /// Posts a task to `io_service` that answers the request with status 200
  /// and a copy of the buffered request body.
  ///
  /// The task does nothing if the stream has been marked closed by the time
  /// it runs. Must be called on an instance owned by a `std::shared_ptr`,
  /// since it relies on `shared_from_this()`.
  void commit_result() {
    // The posted task holds this pointer, keeping the Stream alive until the
    // task has run.
    auto keep_alive = shared_from_this();

    // Mirror the request body as the response payload (example behaviour).
    // The snapshot is taken here, not inside the posted task.
    auto payload = std::make_shared<std::string>(request_body->str());

    io_service.post([keep_alive, payload]() {
      std::lock_guard<std::mutex> guard(keep_alive->mu);
      if (!keep_alive->closed) {
        keep_alive->res.write_head(200);
        keep_alive->res.end(std::move(*payload));
      }
    });
  }

  /// Sets the `closed` flag under the mutex.
  void set_closed(bool f) {
    std::lock_guard<std::mutex> guard(mu);
    closed = f;
  }

  boost::asio::io_service &io_service;
  std::mutex mu;  // protects `closed`
  const request &req;
  const response &res;
  bool closed;
  std::shared_ptr<std::stringstream> request_body;
};
/// Unbounded FIFO of streams, safe to use from several producer and consumer
/// threads at once. All access to `q` happens with `mu` held.
struct Queue {
  /// Appends `st` to the back of the queue and wakes one waiting consumer.
  void push(std::shared_ptr<Stream> st) {
    {
      std::lock_guard<std::mutex> lg(mu);
      // `st` is already our own copy; move it to avoid an extra atomic
      // reference-count increment/decrement.
      q.push_back(std::move(st));
    }
    // Exactly one element was added, so one consumer is enough. Notifying
    // after the lock is released lets the woken thread acquire `mu` at once.
    cv.notify_one();
  }

  /// Blocks until the queue is non-empty, then removes and returns the
  /// element at the front.
  std::shared_ptr<Stream> pop() {
    std::unique_lock<std::mutex> ulk(mu);
    // The predicate form re-checks after every wake-up, so spurious wake-ups
    // and elements taken by another consumer are handled.
    cv.wait(ulk, [this]() { return !q.empty(); });
    auto res = std::move(q.front());
    q.pop_front();
    return res;
  }

  std::mutex mu;
  std::condition_variable cv;
  std::deque<std::shared_ptr<Stream>> q;
};
int main(int argc, char **argv) {
http2 server;
server.num_threads(2);
Queue q;
for (int i = 0; i < 10; ++i) {
auto th = std::thread([&q]() {
for (;;) {
auto st = q.pop();
//sleep(1);
st->commit_result();
}
});
th.detach();
}
server.handle("/", [&q](const request &req, const response &res) {
auto &io_service = res.io_service();
auto requestBody = std::make_shared<std::stringstream>();
auto st = std::make_shared<Stream>(req, res, io_service, requestBody);
req.on_data([&q, requestBody, st, &req, &res, &io_service](const uint8_t* data, std::size_t len)
{
if (len > 0)
{
std::copy(data, data + len, std::ostream_iterator<uint8_t>(*requestBody));
return;
}
q.push(st);
});
res.on_close([st](uint32_t error_code) { st->set_closed(true); });
//q.push(st);
});
boost::system::error_code ec;
if (server.listen_and_serve(ec, "127.0.0.1", "3000")) {
std::cerr << "error: " << ec.message() << std::endl;
}
}
@testillano
Copy link
Author

testillano commented Jun 3, 2022

A crash (core dump) occurs under high load when the client connection is interrupted:
https://gist.github.com/tatsuhiro-t/ba3f7d72d037027ae47b?permalink_comment_id=4188856#gistcomment-4188856

The difference from tatsuhiro's original gist is the use of on_data to receive the request data (lines 90-99).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment