Created
March 20, 2019 04:34
-
-
Save dotnwat/a12394af830c3ccab3d319852256287b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <iostream> | |
#include <sstream> | |
#include <seastar/core/app-template.hh> | |
#include <seastar/core/sharded.hh> | |
#include "kvstore.smf.fb.h" | |
namespace po = boost::program_options; | |
int main(int args, char **argv) | |
{ | |
seastar::app_template app; | |
app.add_options() | |
("ip", po::value<std::string>()->default_value("127.0.0.1"), "server address") | |
("port", po::value<uint16_t>()->default_value(20776), "server port") | |
; | |
return app.run(args, argv, [&app] { | |
auto&& cfg = app.configuration(); | |
auto ip = cfg["ip"].as<std::string>().c_str(); | |
auto port = cfg["port"].as<uint16_t>(); | |
smf::rpc_client_opts opts{}; | |
opts.server_addr = seastar::ipv4_addr{ip, port}; | |
auto client = seastar::make_shared< | |
kvstore::fbs::MemoryNodeClient>(std::move(opts)); | |
seastar::engine().at_exit([client] { return client->stop(); }); | |
return client->connect().then([client] { | |
boost::counting_iterator<int> from(0); | |
boost::counting_iterator<int> to(10); | |
return seastar::do_for_each(from, to, [client](int i) mutable { | |
seastar::sstring key = fmt::format("key.{}", i); | |
seastar::sstring val = fmt::format("val.{}", i); | |
smf::rpc_typed_envelope<kvstore::fbs::PutRequest> req; | |
req.data->key = std::move(key); | |
req.data->value = std::move(val); | |
return client->Put(std::move(req)).then([](auto res) { | |
if (res) { | |
LOG_INFO("recv core={} status={}", | |
seastar::engine().cpu_id(), | |
res.ctx->status()); | |
} | |
return seastar::make_ready_future<>(); | |
}); | |
}); | |
}).then([client] { | |
return client->stop(); | |
}).then([] { | |
return seastar::make_ready_future<int>(0); | |
}); | |
}); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <seastar/core/app-template.hh> | |
#include <seastar/core/sharded.hh> | |
#include <smf/rpc_server.h> | |
#include "kvstore.smf.fb.h" | |
class kvstore_db { | |
public: | |
seastar::future<> put(seastar::sstring key, seastar::sstring value) { | |
LOG_INFO("handling put core={} key={} value={}", | |
seastar::engine().cpu_id(), key, value); | |
store_.insert(std::make_pair(std::move(key), std::move(value))); | |
return seastar::make_ready_future<>(); | |
} | |
seastar::future<> stop() { | |
for (auto kv : store_) { | |
LOG_INFO("{}/{}: {} {}", &store_, | |
seastar::engine().cpu_id(), | |
kv.first, kv.second); | |
} | |
return seastar::make_ready_future<>(); | |
} | |
private: | |
std::unordered_map<seastar::sstring, seastar::sstring> store_; | |
}; | |
class kvstore_service final : public kvstore::fbs::MemoryNode { | |
virtual seastar::future<smf::rpc_typed_envelope<kvstore::fbs::PutResponse>> Put( | |
smf::rpc_recv_typed_context<kvstore::fbs::PutRequest>&& req) final; | |
public: | |
kvstore_service(seastar::sharded<kvstore_db>& store) : | |
store_(store) | |
{} | |
private: | |
seastar::sharded<kvstore_db>& store_; | |
}; | |
/// Handles a Put RPC.
///
/// The key is hashed to pick the owning shard (`hash % smp::count`) and the
/// pair is stored there through kvstore_db::put.
///
/// @param req received request context.
/// @return an envelope with status 200 once the store operation has
///         completed (or immediately when `req` evaluates to false, as
///         before); status 400 when the request carries no key or no value.
///         If the store operation fails, the returned future carries that
///         failure instead of reporting success.
seastar::future<smf::rpc_typed_envelope<kvstore::fbs::PutResponse>>
kvstore_service::Put(smf::rpc_recv_typed_context<kvstore::fbs::PutRequest>&& req)
{
    using response_type = smf::rpc_typed_envelope<kvstore::fbs::PutResponse>;

    // Builds a ready response carrying the given status code.
    auto reply = [](int status) {
        response_type data;
        data.envelope.set_status(status);
        return seastar::make_ready_future<response_type>(std::move(data));
    };

    if (!req) {
        // Nothing to store; keep answering 200 as this handler always did.
        return reply(200);
    }

    // Flatbuffers accessors return nullptr for absent optional fields, so a
    // request without key/value must not be dereferenced. 400 is chosen by
    // analogy with the HTTP-like 200 used for success.
    const auto* key_field = req->key();
    const auto* value_field = req->value();
    if (key_field == nullptr || value_field == nullptr) {
        return reply(400);
    }

    // Copy the payload out once; the copies are moved to the owning shard.
    seastar::sstring key = key_field->str();
    seastar::sstring value = value_field->str();
    LOG_INFO("recv core={} key={} value={}", seastar::engine().cpu_id(),
             key, value);

    const auto cpu_id = std::hash<seastar::sstring>{}(key) % seastar::smp::count;

    // Reply only after the remote put has finished: dropping this future
    // would acknowledge the write before it happened and lose any failure.
    return store_.invoke_on(cpu_id, &kvstore_db::put,
                            std::move(key), std::move(value))
        .then([reply] { return reply(200); });
}
int main(int args, char **argv, char **env) | |
{ | |
seastar::app_template app; | |
{ | |
namespace po = boost::program_options; | |
app.add_options() | |
("ip", po::value<std::string>()->default_value("127.0.0.1"), "bind to") | |
("port", po::value<uint16_t>()->default_value(20776), "listen on") | |
; | |
} | |
seastar::sharded<kvstore_db> store; | |
seastar::sharded<smf::rpc_server> rpc; | |
return app.run_deprecated(args, argv, [&app, &store, &rpc] { | |
seastar::engine().at_exit([&] { return rpc.stop(); }); | |
seastar::engine().at_exit([&] { return store.stop(); }); | |
auto&& cfg = app.configuration(); | |
smf::rpc_server_args args; | |
args.ip = cfg["ip"].as<std::string>().c_str(); | |
args.rpc_port = cfg["port"].as<uint16_t>(); | |
return rpc.start(args).then([&store] { | |
return store.start(); | |
}).then([&rpc, &store] { | |
return rpc.invoke_on_all([&store](smf::rpc_server& server) { | |
server.register_service<kvstore_service>(store); | |
}); | |
}).then([&rpc] { | |
return rpc.invoke_on_all(&smf::rpc_server::start); | |
}); | |
}); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
https://gist.github.com/noahdesu/a12394af830c3ccab3d319852256287b#file-client-cc-L20 - should be
seastar::sstring
https://gist.github.com/noahdesu/a12394af830c3ccab3d319852256287b#file-server-cc-L52 - is great!