Last active
June 18, 2025 04:45
-
-
Save Jackarain/3cdc0311ea98fec186142238ad9fe9c8 to your computer and use it in GitHub Desktop.
Json rpc 实现
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// jsonrpc.hpp | |
// ~~~~~~~~~~~ | |
// | |
// Copyright (c) 2023 Jack (jack dot wgm at gmail dot com) | |
// | |
// Distributed under the Boost Software License, Version 1.0. (See accompanying | |
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
// | |
#ifndef INCLUDE__2023_10_18__JSONRPC_HPP | |
#define INCLUDE__2023_10_18__JSONRPC_HPP | |
#include <atomic> | |
#include <functional> | |
#include <memory> | |
#include <utility> | |
#include <mutex> | |
#include <vector> | |
#include <deque> | |
#include <type_traits> | |
#include <unordered_map> | |
#include <boost/system/error_code.hpp> | |
#include <boost/asio/co_spawn.hpp> | |
#include <boost/asio/awaitable.hpp> | |
#include <boost/asio/use_awaitable.hpp> | |
#include <boost/beast/core.hpp> | |
#include <boost/beast/websocket.hpp> | |
#include <boost/beast/websocket/stream.hpp> | |
#include <boost/json/value.hpp> | |
#include <boost/json/object.hpp> | |
#include <boost/json/parse.hpp> | |
#include <boost/json/serialize.hpp> | |
namespace jsonrpc | |
{ | |
namespace beast = boost::beast; | |
namespace net = boost::asio; | |
namespace json = boost::json; | |
namespace detail | |
{ | |
// 这里可以放一些私有的辅助函数或类型定义 | |
class rpc_operation | |
{ | |
public: | |
virtual ~rpc_operation() = default; | |
virtual void operator()(const boost::system::error_code &) = 0; | |
virtual json::object &result() = 0; | |
}; | |
template <class Handler, class ExecutorType> | |
class rpc_call_op : public rpc_operation | |
{ | |
public: | |
rpc_call_op(Handler &&h, ExecutorType executor) | |
: handler_(std::forward<Handler>(h)) | |
, executor_(executor) | |
{ | |
} | |
rpc_call_op(const rpc_call_op &other) | |
: handler_(std::forward<Handler>(other.handler_)) | |
, executor_(other.executor_) | |
, data_(other.data_) | |
{ | |
} | |
rpc_call_op(rpc_call_op &&other) noexcept | |
: handler_(std::forward<Handler>(other.handler_)) | |
, executor_(other.executor_) | |
, data_(other.data_) | |
{ | |
} | |
void operator()(const boost::system::error_code &ec) override | |
{ | |
// 使用 net::dispatch 将结果发送到指定的执行器上 | |
// 这样可以确保在正确的线程或上下文中调用处理程序 | |
net::dispatch( | |
executor_, | |
[handler = std::move(handler_), data = std::move(data_), ec]() mutable | |
{ | |
handler(ec, data); | |
}); | |
} | |
json::object &result() override | |
{ | |
return data_; | |
} | |
private: | |
Handler handler_; | |
ExecutorType executor_; | |
json::object data_; | |
}; | |
using call_op_ptr = std::unique_ptr<rpc_operation>; | |
} | |
template <class StreamType> | |
class jsonrpc_session | |
{ | |
// c++11 noncopyable. | |
jsonrpc_session(const jsonrpc_session &) = delete; | |
jsonrpc_session &operator=(const jsonrpc_session &) = delete; | |
public: | |
using stream_type = StreamType; | |
using next_layer_type = std::remove_reference_t<stream_type>; | |
using executor_type = next_layer_type::executor_type; | |
using call_op_ptr = detail::call_op_ptr; | |
using write_context = std::unique_ptr<std::string>; | |
using write_message_queue = std::deque<write_context>; | |
friend class initiate_async_call; | |
////////////////////////////////////////////////////////////////////////// | |
// 构造函数, 接受一个 WebSocket 对象. | |
jsonrpc_session(stream_type ws) | |
: stream_(std::move(ws)) | |
{ | |
} | |
jsonrpc_session(jsonrpc_session &&rhs) noexcept | |
: stream_(std::move(rhs.stream_)) | |
{ | |
} | |
jsonrpc_session &operator=(jsonrpc_session &&rhs) noexcept | |
{ | |
if (this != &rhs) | |
stream_ = std::move(rhs.stream_); | |
return *this; | |
} | |
~jsonrpc_session() noexcept | |
{ | |
// 确保在析构时停止服务. | |
if (running_) | |
stop(); | |
} | |
////////////////////////////////////////////////////////////////////////// | |
// 启动服务, 开始接收 WebSocket 消息, 如果服务已经在运行, 则什么都不做. | |
void start() | |
{ | |
if (running_) | |
{ | |
BOOST_ASSERT(false && "already running"); | |
return; | |
} | |
running_ = true; | |
net::co_spawn(stream_.get_executor(), [this]() mutable -> net::awaitable<void> | |
{ | |
co_await run(); | |
co_return; | |
}, net::detached); | |
} | |
// 停止服务, 关闭 WebSocket 连接, 如果服务没有运行, 则什么都不做. | |
// 注意: 调用此函数后, 不能再调用 start() 启动服务, 如果需要重新 | |
// 启动服务, 请创建一个新的 jsonrpc_session 实例并调用 start() | |
// 方法. | |
void stop() | |
{ | |
if (!running_) | |
{ | |
BOOST_ASSERT(false && "not running"); | |
return; | |
} | |
running_ = false; | |
boost::system::error_code ec; | |
stream_.close(beast::websocket::close_code::normal, ec); | |
} | |
////////////////////////////////////////////////////////////////////////// | |
class initiate_async_call | |
{ | |
public: | |
using executor_type = jsonrpc_session::executor_type; | |
explicit initiate_async_call(jsonrpc_session* self) | |
: self_(self) | |
{ | |
} | |
executor_type get_executor() const noexcept | |
{ | |
return self_->get_executor(); | |
} | |
template <typename CallHandler> | |
void operator()(CallHandler&& handler, | |
const std::string& method, const json::value& params) const | |
{ | |
auto executor = net::get_associated_executor(handler); | |
using handler_executor_type = std::decay_t<decltype(executor)>; | |
using rpc_call_op_type = detail::rpc_call_op<CallHandler, handler_executor_type>; | |
auto op = std::make_unique<rpc_call_op_type>( | |
std::forward<CallHandler>(handler), executor); | |
json::object data; | |
data["jsonrpc"] = "2.0"; | |
data["method"] = method; | |
data["params"] = params; | |
if (self_->id_recycle_.empty()) | |
{ | |
auto session_id = static_cast<int>(self_->call_ops_.size()); | |
data["id"] = session_id; | |
self_->call_ops_.emplace_back(std::move(op)); | |
} | |
else | |
{ | |
auto session_id = self_->id_recycle_.back(); | |
self_->id_recycle_.pop_back(); | |
data["id"] = session_id; | |
self_->call_ops_[session_id] = std::move(op); | |
} | |
// 发送 JSON 请求数据 | |
auto context = std::make_unique<std::string>(json::serialize(data)); | |
self_->write_message(std::move(context)); | |
} | |
private: | |
jsonrpc_session* self_; | |
}; | |
// 异步发送 JSONRPC 请求, 返回一个 JSON 对象作为响应. | |
// 参数 params 代表要发送的请求数据的 JSONRPC 的 params 字段, | |
// 该参数可以是一个 JSON 对象或数组, 须满足 JSONRPC 规范的要求. | |
// method 参数代表要调用的远程方法名. | |
template<BOOST_ASIO_COMPLETION_TOKEN_FOR( | |
void(boost::system::error_code, json::value)) | |
CallToken = net::default_completion_token_t<executor_type>> | |
auto async_call(const std::string& method, const json::value& params, | |
CallToken&& token = net::default_completion_token_t<executor_type>()) -> | |
decltype( | |
net::async_initiate<CallToken, | |
void(boost::system::error_code, json::value)>( | |
std::declval<initiate_async_call>(), token, method, params)) | |
{ | |
return net::async_initiate<CallToken, | |
void(boost::system::error_code, json::value)>( | |
initiate_async_call(this), token, method, params); | |
} | |
// 回复 JSONRPC 请求, 该函数接受一个 JSON 对象作为参数代表响应数据, | |
// 以及一个字符串 id 代表请求的 ID. 如果 error 参数为 true, 则表示 | |
// 这是一个错误 error 响应, 否则表示正常 result 响应. | |
void reply(json::object response, const std::string& id, bool error = false) | |
{ | |
json::object data; | |
data["jsonrpc"] = "2.0"; | |
data["id"] = id; | |
if (error) | |
data["error"] = response; | |
else | |
data["result"] = response; | |
// 将响应数据序列化为 JSON 字符串并发送 | |
auto context = std::make_unique<std::string>( | |
json::serialize(response, json::storage_ptr{})); | |
write_message(std::move(context)); | |
} | |
// 当接收到对应的 JSON-RPC 方法调用时会调用该函数, 方法名是一个 | |
// 字符串, 代表远程方法的名称, handler 是一个函数对象, | |
// 接受一个 json::object 作为参数, 代表接收到的请求消息. | |
void bind_method( | |
std::string_view method_name, | |
std::function<void(json::object)> handler) | |
{ | |
if (method_name.empty() || !handler) | |
{ | |
BOOST_ASSERT(false && "method name or handler is invalid"); | |
return; | |
} | |
remote_methods_[std::string(method_name)] = std::move(handler); | |
} | |
// 设置请求回调函数, 当接收到请求消息时会调用该函数. | |
// 请求消息在 JSONRPC 中是指包含 id 字段的 json 对象. | |
// 回调函数的参数是一个 json::object, 代表接收到的请求消息. | |
// 如果传入的回调函数为空, 则清除之前设置的回调函数. | |
void default_method_callback(std::function<void(json::object)> cb) | |
{ | |
method_cb_ = cb; | |
} | |
// 清除请求回调函数, 之后接收到请求消息时不会调用任何函数. | |
void default_method_callback() | |
{ | |
method_cb_ = {}; | |
} | |
// 设置通知回调函数, 当接收到通知消息时会调用该函数. | |
// 通知消息在 JSONRPC 中是指没有 id 字段的 json 对象. | |
// 如果传入的回调函数为空, 则清除之前设置的回调函数. | |
// 回调函数的参数是一个 json::object, 代表接收到的通知消息. | |
void notify_callback(std::function<void(json::object)> cb) | |
{ | |
notify_cb_ = cb; | |
} | |
// 清除通知回调函数, 之后接收到通知消息时不会调用任何函数. | |
void notify_callback() | |
{ | |
notify_cb_ = {}; | |
} | |
// 设置错误回调函数, 当接收到无法执行 JSON 解析的错误消息时会调用该函数. | |
// 回调函数的参数是接收到的消息数据. | |
void error_callback(std::function<void(std::string_view)> cb) | |
{ | |
error_cb_ = cb; | |
} | |
// 清除错误回调函数, 之后接收到错误消息时不会调用任何函数. | |
void error_callback() | |
{ | |
error_cb_ = {}; | |
} | |
// 获取当前 jsonrpc_session 的执行器, 该执行器可以用于在协程中调度任务. | |
net::any_io_executor get_executor() const noexcept | |
{ | |
return stream_.get_executor(); | |
} | |
////////////////////////////////////////////////////////////////////////// | |
private: | |
// 运行服务的协程, 负责接收 WebSocket 消息并解析为 JSON 对象, 然 | |
// 后通过创建一个新的协程来处理接收到的 JSON 对象. | |
net::awaitable<void> run() | |
{ | |
try | |
{ | |
boost::system::error_code ec; | |
beast::flat_buffer buf; | |
auto executor = co_await net::this_coro::executor; | |
while (running_) | |
{ | |
auto bytes = co_await stream_.async_read(buf, net::use_awaitable); | |
json::value jv = json::parse( | |
beast::buffers_range(buf), | |
ec, | |
json::storage_ptr{}, | |
{64, json::number_precision::imprecise, true, true, true}); | |
if (ec) | |
{ | |
// 解析失败, 可能是因为接收到的消息不是有效的 JSON, 忽略该消息 | |
// 并继续等待下一个消息. | |
if (error_cb_) | |
error_cb_(std::string_view(buf.data().data(), buf.size())); | |
else | |
BOOST_ASSERT(false && "parse json failed"); | |
buf.consume(bytes); | |
continue; | |
} | |
buf.consume(bytes); | |
auto obj = jv.as_object(); | |
net::co_spawn(executor, [obj = std::move(obj)]() mutable -> net::awaitable<void> | |
{ | |
co_await dispath(std::move(obj)); | |
co_return; | |
}, net::detached); | |
} | |
} | |
catch (const std::exception &) | |
{ | |
// 捕获异常并调用错误回调函数 | |
if (error_cb_) | |
error_cb_("exception occurred while running jsonrpc session"); | |
else | |
BOOST_ASSERT(false && "exception occurred while running jsonrpc session"); | |
} | |
} | |
net::awaitable<void> dispath(json::object obj) | |
{ | |
auto try_id = obj.try_at("id"); | |
if (!try_id.has_value()) | |
{ | |
// 这是一个通知消息,回调通知处理函数 | |
if (notify_cb_) | |
notify_cb_(std::move(obj)); | |
co_return; | |
} | |
// 这是一个请求或响应消息,检查 id 字段 | |
auto id = try_id->value(); | |
if (!id.is_string() && !id.is_number()) | |
{ | |
// id 字段不是字符串或数字,忽略该消息 | |
BOOST_ASSERT(false && "id must be string or number"); | |
co_return; | |
} | |
if (obj.if_contains("result") || obj.if_contains("error")) | |
{ | |
// 包含 result 或 error 的 json 对象说明当前是作为调用者身份 | |
// 向远端发起 RPC 请求的回应. | |
if (call_ops_.empty()) | |
{ | |
// 如果没有正在进行的调用操作,忽略该消息 | |
BOOST_ASSERT(false && "no call operation in progress"); | |
co_return; | |
} | |
int session_id = -1; | |
if (id.is_string()) | |
{ | |
// 尝试将字符串 id 转换为数字 | |
try | |
{ | |
session_id = std::stoi(id.as_string()); | |
} | |
catch (const std::exception &) | |
{ | |
// 转换失败,忽略该消息 | |
if (error_cb_) | |
error_cb_("invalid id format"); | |
else | |
BOOST_ASSERT(false && "invalid id format"); | |
co_return; | |
} | |
} | |
co_await handle_call(std::move(obj), session_id); | |
co_return; | |
} | |
else if (obj.if_contains("method")) | |
{ | |
if (!obj["method"].is_string()) | |
{ | |
// method 字段不是字符串,忽略该消息 | |
if (error_cb_) | |
error_cb_("method must be string"); | |
else | |
BOOST_ASSERT(false && "method must be string"); | |
co_return; | |
} | |
// 包含 method 字段的 json 对象说明当前是作为服务端身份 | |
std::string method = obj["method"].as_string(); | |
co_await handle_method(std::move(obj), method); | |
// 处理方法调用消息 | |
co_return; | |
} | |
else | |
{ | |
// 既不是请求也不是响应,忽略该消息 | |
BOOST_ASSERT(false && "not a request or response"); | |
} | |
co_return; | |
} | |
net::awaitable<void> handle_call(json::object obj, int session_id) | |
{ | |
// 查找是否有对应的调用操作 | |
std::lock_guard<std::mutex> lock(call_op_mutex_); | |
if (session_id < 0 || session_id >= static_cast<int>(call_ops_.size())) | |
{ | |
// id 不在有效范围内,忽略该消息 | |
if (error_cb_) | |
error_cb_("invalid session id"); | |
else | |
BOOST_ASSERT(false && "invalid session id"); | |
co_return; | |
} | |
// 获取对应的调用操作 | |
call_op_ptr handler = std::move(call_ops_[session_id]); | |
// 回收 RPC 调用操作的 id | |
id_recycle_.push_back(session_id); | |
BOOST_ASSERT(handler && "call op is nullptr!"); // for debug, call_ops_[session_id].reset(); // 清除对应的调用操作 | |
if (handler) | |
{ | |
// 调用操作存在,执行它 | |
handler->result() = std::move(obj); | |
(*handler)(boost::system::error_code{}); | |
} | |
else | |
{ | |
// 没有找到对应的调用操作,忽略该消息 | |
if (error_cb_) | |
error_cb_("no call operation found for id"); | |
else | |
BOOST_ASSERT(false && "no call operation found for id"); | |
} | |
co_return; | |
} | |
net::awaitable<void> handle_method(json::object obj, std::string_view method_name) | |
{ | |
// 检查是否有对应的远程方法 | |
auto it = remote_methods_.find(method_name); | |
if (it != remote_methods_.end()) | |
{ | |
// 找到对应的远程方法,调用它 | |
it->second(std::move(obj)); | |
} | |
else if (method_cb_) | |
{ | |
// 如果没有找到对应的远程方法,调用默认的 method 回调函数 | |
method_cb_(std::move(obj)); | |
} | |
else | |
{ | |
// 没有设置 method 回调函数,忽略该消息 | |
BOOST_ASSERT(false && "no method callback set"); | |
} | |
co_return; | |
} | |
// 异步写入消息到 WebSocket, 该函数接受一个 write_context, 该上下文包含要写入的消息数据. | |
// 如果当前没有正在进行的写入操作, 则直接发送消息, 否则将消息添加到写入队列中. | |
// 注意: 该函数是线程安全的, 可以在任何线程中调用. | |
void write_message(write_context context) | |
{ | |
if (!context || context->empty()) | |
{ | |
BOOST_ASSERT(false && "context is empty"); | |
return; | |
} | |
net::dispatch(stream_.get_executor(), | |
[this, context = std::move(context)]() mutable | |
{ | |
bool write_in_progress = !write_msgs_.empty(); | |
if (write_in_progress) | |
{ | |
// 如果有正在进行的写入操作,将消息添加到队列中 | |
write_msgs_.emplace_back(std::move(context)); | |
return; | |
} | |
// 直接调用协程来处理写入消息 | |
net::co_spawn(stream_.get_executor(), | |
[this]() mutable -> net::awaitable<void> | |
{ | |
co_await write_messages(); | |
co_return; | |
}, net::detached); | |
}); | |
} | |
// 处理 WebSocket 写入消息的协程 | |
net::awaitable<void> write_messages() | |
{ | |
try | |
{ | |
while (!write_msgs_.empty()) | |
{ | |
auto msg = std::move(write_msgs_.front()); | |
write_msgs_.pop_front(); | |
// 发送消息 | |
co_await stream_.async_write(net::buffer(*msg), net::use_awaitable); | |
} | |
} | |
catch(const std::exception& e) | |
{ | |
error_cb_(std::string_view(e.what())); | |
} | |
} | |
private: | |
// Stream 对象, 用于与远程服务进行通信. | |
stream_type stream_; | |
std::atomic<bool> running_{false}; | |
// 回调函数, 用于处理请求、通知消息和错误消息 | |
std::function<void(json::object)> method_cb_; | |
std::function<void(json::object)> notify_cb_; | |
std::function<void(std::string_view)> error_cb_; | |
std::unordered_map<std::string, | |
std::function<void(json::object)>> remote_methods_; | |
std::mutex call_op_mutex_; | |
std::vector<int> id_recycle_; | |
std::vector<call_op_ptr> call_ops_; | |
write_message_queue write_msgs_; | |
}; | |
using WsSession = jsonrpc_session<beast::websocket::stream<beast::tcp_stream>>; | |
} | |
#endif // INCLUDE__2023_10_18__JSONRPC_HPP |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment