Skip to content

Instantly share code, notes, and snippets.

@dotnwat
Created March 20, 2019 04:34
Show Gist options
  • Save dotnwat/a12394af830c3ccab3d319852256287b to your computer and use it in GitHub Desktop.
Save dotnwat/a12394af830c3ccab3d319852256287b to your computer and use it in GitHub Desktop.
#include <iostream>
#include <sstream>
#include <seastar/core/app-template.hh>
#include <seastar/core/sharded.hh>
#include "kvstore.smf.fb.h"
namespace po = boost::program_options;
int main(int args, char **argv)
{
seastar::app_template app;
app.add_options()
("ip", po::value<std::string>()->default_value("127.0.0.1"), "server address")
("port", po::value<uint16_t>()->default_value(20776), "server port")
;
return app.run(args, argv, [&app] {
auto&& cfg = app.configuration();
auto ip = cfg["ip"].as<std::string>().c_str();
auto port = cfg["port"].as<uint16_t>();
smf::rpc_client_opts opts{};
opts.server_addr = seastar::ipv4_addr{ip, port};
auto client = seastar::make_shared<
kvstore::fbs::MemoryNodeClient>(std::move(opts));
seastar::engine().at_exit([client] { return client->stop(); });
return client->connect().then([client] {
boost::counting_iterator<int> from(0);
boost::counting_iterator<int> to(10);
return seastar::do_for_each(from, to, [client](int i) mutable {
seastar::sstring key = fmt::format("key.{}", i);
seastar::sstring val = fmt::format("val.{}", i);
smf::rpc_typed_envelope<kvstore::fbs::PutRequest> req;
req.data->key = std::move(key);
req.data->value = std::move(val);
return client->Put(std::move(req)).then([](auto res) {
if (res) {
LOG_INFO("recv core={} status={}",
seastar::engine().cpu_id(),
res.ctx->status());
}
return seastar::make_ready_future<>();
});
});
}).then([client] {
return client->stop();
}).then([] {
return seastar::make_ready_future<int>(0);
});
});
}
#include <seastar/core/app-template.hh>
#include <seastar/core/sharded.hh>
#include <smf/rpc_server.h>
#include "kvstore.smf.fb.h"
// In-memory key/value map. The server wraps it in seastar::sharded<>, and
// requests are routed to an instance with sharded<>::invoke_on.
class kvstore_db {
public:
    // Logs the request and stores `value` under `key`.
    //
    // If `key` is already present the existing value is kept (emplace does
    // not overwrite), matching the previous insert-based behaviour.
    // NOTE(review): a "put" usually replaces the value - confirm whether
    // insert_or_assign is what is actually wanted here.
    //
    // Returns an already-resolved future.
    seastar::future<> put(seastar::sstring key, seastar::sstring value) {
        LOG_INFO("handling put core={} key={} value={}",
          seastar::engine().cpu_id(), key, value);
        store_.emplace(std::move(key), std::move(value));
        return seastar::make_ready_future<>();
    }

    // Logs every stored pair, then returns an already-resolved future.
    seastar::future<> stop() {
        // Iterate by const reference: taking each element by value copied
        // both strings of every entry just to log them.
        for (const auto& kv : store_) {
            LOG_INFO("{}/{}: {} {}", &store_,
              seastar::engine().cpu_id(),
              kv.first, kv.second);
        }
        return seastar::make_ready_future<>();
    }

private:
    std::unordered_map<seastar::sstring, seastar::sstring> store_;
};
class kvstore_service final : public kvstore::fbs::MemoryNode {
virtual seastar::future<smf::rpc_typed_envelope<kvstore::fbs::PutResponse>> Put(
smf::rpc_recv_typed_context<kvstore::fbs::PutRequest>&& req) final;
public:
kvstore_service(seastar::sharded<kvstore_db>& store) :
store_(store)
{}
private:
seastar::sharded<kvstore_db>& store_;
};
// Handles a Put request: picks the shard that owns the key (hash of the key
// modulo seastar::smp::count), runs kvstore_db::put on that shard, and only
// then resolves with the reply.
//
// The reply always carries status 200, as before. A request whose context is
// empty, or whose key/value field is absent, is not stored.
// NOTE(review): such requests are still answered with 200 - confirm whether
// an error status would be more appropriate.
//
// Previously the future returned by invoke_on was dropped, so the reply was
// sent before the put had run and any failure on the target shard was lost.
// The future is now chained, so a failure propagates to the caller of Put.
seastar::future<smf::rpc_typed_envelope<kvstore::fbs::PutResponse>>
kvstore_service::Put(smf::rpc_recv_typed_context<kvstore::fbs::PutRequest>&& req)
{
    using response_t = smf::rpc_typed_envelope<kvstore::fbs::PutResponse>;

    auto make_reply = [] {
        response_t data;
        data.envelope.set_status(200);
        return data;
    };

    // The string accessors are checked for null before being dereferenced.
    if (!req || req->key() == nullptr || req->value() == nullptr) {
        return seastar::make_ready_future<response_t>(make_reply());
    }

    // Copy the payload out of the request up front: the continuation below
    // must not touch `req`, and the strings are moved to the owning shard.
    seastar::sstring key = req->key()->str();
    seastar::sstring value = req->value()->str();
    LOG_INFO("recv core={} key={} value={}", seastar::engine().cpu_id(),
      key, value);

    // Computed before the call so `key` is hashed before it is moved from.
    const auto cpu_id = std::hash<seastar::sstring>{}(key) % seastar::smp::count;

    return store_.invoke_on(cpu_id, &kvstore_db::put,
                            std::move(key), std::move(value))
      .then(make_reply);
}
int main(int args, char **argv, char **env)
{
seastar::app_template app;
{
namespace po = boost::program_options;
app.add_options()
("ip", po::value<std::string>()->default_value("127.0.0.1"), "bind to")
("port", po::value<uint16_t>()->default_value(20776), "listen on")
;
}
seastar::sharded<kvstore_db> store;
seastar::sharded<smf::rpc_server> rpc;
return app.run_deprecated(args, argv, [&app, &store, &rpc] {
seastar::engine().at_exit([&] { return rpc.stop(); });
seastar::engine().at_exit([&] { return store.stop(); });
auto&& cfg = app.configuration();
smf::rpc_server_args args;
args.ip = cfg["ip"].as<std::string>().c_str();
args.rpc_port = cfg["port"].as<uint16_t>();
return rpc.start(args).then([&store] {
return store.start();
}).then([&rpc, &store] {
return rpc.invoke_on_all([&store](smf::rpc_server& server) {
server.register_service<kvstore_service>(store);
});
}).then([&rpc] {
return rpc.invoke_on_all(&smf::rpc_server::start);
});
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment