Created
October 14, 2012 15:07
-
-
Save brycelelbach/3888857 to your computer and use it in GitHub Desktop.
Zero Copy with Boost.Asio/Boost.Serialization
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright (c) 2012 Bryce Adelstein-Lelbach | |
// | |
// Distributed under the Boost Software License, Version 1.0. (See accompanying | |
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
// hpx::util::container_device can be found here: | |
// http://raw.github.com/STEllAR-GROUP/hpx/master/hpx/util/container_device.hpp | |
// The portable binary archives used can be found here: | |
// http://raw.github.com/STEllAR-GROUP/hpx/master/hpx/util/portable_binary_oarchive.hpp | |
// http://raw.github.com/STEllAR-GROUP/hpx/master/src/util/portable_binary_oarchive.cpp | |
// http://raw.github.com/STEllAR-GROUP/hpx/master/hpx/util/portable_binary_iarchive.hpp | |
// http://raw.github.com/STEllAR-GROUP/hpx/master/src/util/portable_binary_iarchive.cpp | |
#include <cstddef>
#include <exception>
#include <functional>
#include <type_traits>
#include <utility>
#include <vector>
/// Trait: can a value of type T be put on the wire as its raw in-memory bytes,
/// bypassing Boost.Serialization? By default only arithmetic types qualify.
/// cv- and reference-qualified types get the answer for the underlying type.
/// Every specialization derives from std::integral_constant<bool, ...>, so both
/// ::value and ::type (std::true_type / std::false_type) are always available.
template <typename T, typename enable = void>
struct is_bitwise_serializable : std::is_arithmetic<T>::type { };

template <typename T>
struct is_bitwise_serializable<const T> : is_bitwise_serializable<T> { };

template <typename T>
struct is_bitwise_serializable<T&> : is_bitwise_serializable<T> { };

template <typename T>
struct is_bitwise_serializable<T&&> : is_bitwise_serializable<T> { };

// A std::vector can be sent as one contiguous buffer (boost::asio::buffer) only
// when each element is itself plain bytes. That requires all of the following:
//  - the element type is bitwise serializable;
//  - the element type is trivially copyable. This rules out nested containers
//    such as std::vector<std::vector<int> >, whose elements are headers
//    pointing at storage elsewhere;
//  - the element type is not bool. std::vector<bool> is a packed bit proxy
//    container without contiguous element storage.
//
// This specialization has to be done not just for std::vector, but for other
// special cases of boost::asio::buffer, such as boost::array, std::array,
// perhaps std::valarray too.
template <typename T>
struct is_bitwise_serializable<std::vector<T> >
  : std::integral_constant<bool,
        is_bitwise_serializable<T>::value
     && std::is_trivially_copyable<T>::value
     && !std::is_same<T, bool>::value>
{ };
// We never directly serialize an std::vector; we actually only serialize one
// type (a parcel). Parcels contain a polymorphic object (an action) that has
// all our data in it. Because of this, I believe we can safely do zero-copy
// for std::vector and other non-polymorphic types. On the receiving end, we
// will know how to read the data because the polymorphic type was serialized
// normally through Boost.Serialization.

/// Sending half of the zero-copy protocol.
///
/// async_write() serializes a parcel into a list of buffers and sends them all
/// with a single gathered boost::asio::async_write. The wire layout is:
///   [chunks_]      little-endian 64-bit number of entries in the size list
///   [chunk_sizes_] that many little-endian 64-bit sizes, in serialization
///                  order: the element count of every zero-copy std::vector
///                  and the byte count of every slow-path chunk
///   [payload...]   the data buffers, in serialization order. Single bitwise
///                  values are sent raw and have no size-list entry.
///
/// The object must be owned by a boost::shared_ptr, because shared_from_this()
/// keeps it alive while a write is pending. Only one write may be in flight at
/// a time, since the message state lives in the archive.
struct zero_copy_oarchive : boost::enable_shared_from_this<zero_copy_oarchive>
{
    typedef std::function<
        void(boost::system::error_code const&, std::size_t)
    > handler_type;

    // Boost.Serialization archive concept: this archive saves.
    typedef boost::mpl::true_ is_saving;
    typedef boost::mpl::false_ is_loading;

private:
    boost::asio::ip::tcp::socket socket_;
    handler_type handler_;  ///< Completion callback of the pending async_write.
    bool homogeneity_;      ///< Is it safe to do bitwise serialization? E.g. does
                            ///  the target have the same endianness as us, etc?
    std::vector<boost::asio::const_buffer> message_;
    std::vector<boost::integer::ulittle64_t> chunk_sizes_;
    boost::integer::ulittle64_t chunks_; // chunk_sizes_.size()
    // Backing storage for slow-path chunks. message_ points at each inner
    // vector's heap storage. That storage survives reallocation of the outer
    // vector because the elements are moved, not copied.
    std::vector<std::vector<char> > slow_buffers_;

public:
    zero_copy_oarchive(
        boost::asio::io_service& io,
        bool homogeneity = true
        )
      : socket_(io)
      , homogeneity_(homogeneity)
    {
        chunks_ = 0;
    }

    ~zero_copy_oarchive()
    {
        // Gracefully and portably shutdown the socket. Errors are ignored
        // because a destructor must not throw.
        boost::system::error_code ec;
        socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
        socket_.close(ec);
    }

    /// The socket the archive writes to. Connect it before calling async_write.
    boost::asio::ip::tcp::socket& socket() { return socket_; }

    // Both operators return *this so that chained "ar & a & b" works.
    template <typename T>
    zero_copy_oarchive& operator& (T&& t)
    {
        save(std::forward<T>(t));
        return *this;
    }

    template <typename T>
    zero_copy_oarchive& operator<< (T&& t)
    {
        save(std::forward<T>(t));
        return *this;
    }

    // NOTE: The lifetime of the data we're serializing is controlled, so t
    // going out of scope isn't an issue. The buffers reference t directly, so
    // it must stay valid until the write completes.
    template <typename T>
    void save(T&& t)
    {
        // Wrap the trait in boost::mpl::bool_ so the tag matches the
        // boost::mpl::true_ / false_ overloads below. The trait itself yields a
        // std::integral_constant, which would not convert to those tags.
        typedef boost::mpl::bool_<
            is_bitwise_serializable<T>::type::value
        > predicate_;
        if (homogeneity_)
            save(t, predicate_());
        else
            save(t, boost::mpl::false_());
    }

    // This overload has to be done not just for std::vector, but for other
    // special cases of boost::asio::buffer, such as boost::array, std::array,
    // perhaps std::valarray too.
    // It takes const& so that lvalue vectors pick it; partial ordering prefers
    // it over the generic T const& overload.
    template <typename T>
    void save(std::vector<T> const& t, boost::mpl::true_)
    {
        // Save the element count, so we can know how much to read on the other
        // end. This allows us to do zero copy when reading.
        chunk_sizes_.push_back(t.size());
        message_.push_back(boost::asio::buffer(t));
    }

    template <typename T>
    void save(T const& t, boost::mpl::true_)
    {
        // A single bitwise value: send its raw bytes. Its size is implied by
        // the type, so it gets no size-list entry.
        message_.push_back(boost::asio::buffer(&t, sizeof(t)));
    }

    template <typename T>
    void save(T const& t, boost::mpl::false_)
    {
        slow_buffers_.push_back(std::vector<char>());
        std::vector<char>& slow_buffer = slow_buffers_.back();

        {
            typedef hpx::util::container_device<std::vector<char> > io_device_type;
            boost::iostreams::stream<io_device_type> io(slow_buffer);

            // Serialize t the slow way.
            hpx::util::portable_binary_oarchive archive(io);
            archive << t;
        }
        // The archive and then the stream are destroyed at the end of the scope
        // above. The stream buffers its output and flushes it on close, so
        // slow_buffer's size is only final after this point.

        // Save the size, so we can know how much to read on the other end. This
        // allows us to do zero copy when reading.
        chunk_sizes_.push_back(slow_buffer.size());
        message_.push_back(boost::asio::buffer(slow_buffer));
    }

    /// Asynchronously write a parcel to the socket.
    ///
    /// handler is invoked once, with the error code and byte count reported by
    /// boost::asio::async_write.
    ///
    /// NOTE: parcel should be reference counted. It is copied into the
    /// completion handler to keep its data alive. The buffers built here
    /// reference the data reachable from this local copy, so a plain value type
    /// would leave them dangling once this function returns.
    template <typename Parcel>
    void async_write(Parcel parcel, handler_type const& handler)
    {
        // Discard any state left over from a write that failed part way
        // (e.g. a serialization exception).
        reset();
        handler_ = handler;

        // The first buffer is the number of elements in the list. The second
        // buffer is our list of sizes. We'll fill these in later.
        message_.push_back(boost::asio::buffer(&chunks_, sizeof(chunks_)));
        message_.push_back(boost::asio::const_buffer());

        // FIXME: Not sure if this is the correct way to kick off the
        // serialization call chain.
        boost::serialization::serialize(*this, parcel, 0 /* version */);

        // NOTE: Non-container chunks (e.g. single elements) are not in the size
        // list.
        chunks_ = chunk_sizes_.size();
        message_[1] = boost::asio::buffer(chunk_sizes_);

        boost::asio::async_write(socket_, message_,
            boost::bind(&zero_copy_oarchive::handle_write<Parcel>,
                shared_from_this(),
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred,
                parcel));
    }

    /// Completion handler for async_write. It releases the per-message state
    /// and reports the result to the user's handler. The parcel argument is
    /// otherwise unused; binding it keeps the parcel alive until the write has
    /// finished.
    template <typename Parcel>
    void handle_write(boost::system::error_code const& e, std::size_t bytes,
        Parcel const& /* parcel */)
    {
        reset();

        // Move the handler out before calling it. It may start another
        // async_write, which reassigns handler_ while it is still executing.
        handler_type handler;
        handler.swap(handler_);
        if (handler)
            handler(e, bytes);
    }

private:
    /// Drop all per-message state: buffers, sizes and slow-path storage.
    void reset()
    {
        message_.clear();
        chunk_sizes_.clear();
        chunks_ = 0;
        slow_buffers_.clear();
    }
};
// Note: We must "deserialize" the object BEFORE we read the data, but AFTER
// we have read the sizes. This allows us to do zero-copy, because we know the
// layout of the data structure before we call async_read.

/// Receiving half of the zero-copy protocol; the mirror of zero_copy_oarchive.
///
/// async_read() performs three reads:
///   1. the 64-bit count of entries in the size list;
///   2. the size list itself;
///   3. the payload, gathered directly into the parcel's memory.
/// Between steps 2 and 3 the parcel is walked once (pass 1) to build the list
/// of destination buffers. After step 3 it is walked again (pass 2) to decode
/// the chunks that had to go through Boost.Serialization.
///
/// homogeneity must match the sender's setting. Otherwise the two sides
/// disagree about which values are sent raw. The object must be owned by a
/// boost::shared_ptr (shared_from_this() is used), and only one read may be in
/// flight at a time.
struct zero_copy_iarchive : boost::enable_shared_from_this<zero_copy_iarchive>
{
    typedef std::function<
        void(boost::system::error_code const&, std::size_t)
    > handler_type;

    // Boost.Serialization archive concept: this archive loads.
    typedef boost::mpl::false_ is_saving;
    typedef boost::mpl::true_ is_loading;

private:
    boost::asio::ip::tcp::socket socket_;
    handler_type handler_;  ///< Completion callback of the pending async_read.
    bool homogeneity_;      ///< Is it safe to do bitwise serialization? E.g. does
                            ///  the target have the same endianness as us, etc?
    std::size_t pass_;      ///< 1 = build message_, 2 = decode slow chunks, 0 = idle.
    // Mutable buffers, because async_read writes into them.
    std::vector<boost::asio::mutable_buffer> message_;
    std::vector<boost::integer::ulittle64_t> chunk_sizes_;
    boost::integer::ulittle64_t chunks_; // chunk_sizes_.size()
    std::size_t current_chunk_;          ///< Next unconsumed chunk_sizes_ entry.
    std::vector<std::vector<char> > slow_buffers_;
    std::size_t current_slow_buffer_;    ///< Next slow_buffers_ entry to decode.

public:
    zero_copy_iarchive(
        boost::asio::io_service& io,
        bool homogeneity = true
        )
      : socket_(io)
      , homogeneity_(homogeneity)
      , pass_(0)
      , current_chunk_(0)
      , current_slow_buffer_(0)
    {
        chunks_ = 0;
    }

    ~zero_copy_iarchive()
    {
        // Gracefully and portably shutdown the socket. Errors are ignored
        // because a destructor must not throw.
        boost::system::error_code ec;
        socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
        socket_.close(ec);
    }

    /// The socket the archive reads from. Connect it before calling async_read.
    boost::asio::ip::tcp::socket& socket() { return socket_; }

    // Both operators return *this so that chained "ar & a & b" works.
    template <typename T>
    zero_copy_iarchive& operator& (T& t) { load(t); return *this; }

    template <typename T>
    zero_copy_iarchive& operator>> (T& t) { load(t); return *this; }

    /// Dispatch one field to the current pass. It is only meaningful while
    /// async_read is driving the parcel's serialize() (pass_ is 1 or 2).
    template <typename T>
    void load(T& t)
    {
        // Wrap the trait in boost::mpl::bool_ so the tag matches the
        // boost::mpl::true_ / false_ overloads below.
        typedef boost::mpl::bool_<
            is_bitwise_serializable<T>::type::value
        > predicate_;

        // Pass 1 builds the structure of message_. It is done right before
        // message_ is read.
        if (1 == pass_)
        {
            if (homogeneity_)
                load_pass1(t, predicate_());
            else
                load_pass1(t, boost::mpl::false_());
        }
        // Pass 2 decodes message_ after it has been read.
        else if (2 == pass_)
        {
            if (homogeneity_)
                load_pass2(t, predicate_());
            else
                load_pass2(t, boost::mpl::false_());
        }
        else
            BOOST_ASSERT(false);
    }

    // This overload has to be done not just for std::vector, but for other
    // special cases of boost::asio::buffer, such as boost::array, std::array,
    // perhaps std::valarray too.
    template <typename T>
    void load_pass1(std::vector<T>& t, boost::mpl::true_)
    {
        // Use the size list to figure out how many elements this vector needs,
        // then read straight into its storage.
        t.resize(next_chunk_size());
        message_.push_back(boost::asio::buffer(t));
    }

    template <typename T>
    void load_pass1(T& t, boost::mpl::true_)
    {
        // A single bitwise value is read directly into t.
        message_.push_back(boost::asio::buffer(&t, sizeof(t)));
    }

    template <typename T>
    void load_pass1(T& t, boost::mpl::false_)
    {
        // Use the size list to figure out how large the encoded chunk is. It
        // is read into a scratch buffer and decoded in pass 2.
        slow_buffers_.push_back(std::vector<char>(next_chunk_size()));
        message_.push_back(boost::asio::buffer(slow_buffers_.back()));
    }

    template <typename T>
    void load_pass2(T& t, boost::mpl::true_)
    {
        // No-op: bitwise data (including zero-copy vectors) already landed in
        // place during the read.
    }

    template <typename T>
    void load_pass2(T& t, boost::mpl::false_)
    {
        std::vector<char>& slow_buffer =
            slow_buffers_.at(current_slow_buffer_++);
        typedef hpx::util::container_device<std::vector<char> > io_device_type;
        boost::iostreams::stream<io_device_type> io(slow_buffer);

        // Deserialize t the slow way. This must be the *input* archive.
        hpx::util::portable_binary_iarchive archive(io);
        archive >> t;
    }

    /// Asynchronously read a parcel from the socket.
    ///
    /// handler is invoked exactly once. On failure it receives the Asio error,
    /// or boost::asio::error::invalid_argument if the size header does not
    /// match the parcel's layout or cannot be decoded. The byte count is the
    /// one reported by the read step that completed last (the payload read on
    /// success).
    ///
    /// NOTE(review): like zero_copy_oarchive::async_write, Parcel is presumably
    /// a reference-counted handle. Pass 1 points buffers at data reachable from
    /// a handler-local copy of parcel, which must remain valid until the
    /// payload read completes. TODO confirm.
    template <typename Parcel>
    void async_read(Parcel parcel, handler_type const& handler)
    {
        // Discard any state left over from an earlier read.
        reset();
        handler_ = handler;

        // The first thing we need is the number of elements in the list of
        // chunk sizes.
        boost::asio::async_read(socket_,
            boost::asio::buffer(&chunks_, sizeof(chunks_)),
            boost::bind(&zero_copy_iarchive::handle_read_chunks<Parcel>,
                shared_from_this(),
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred,
                parcel));
    }

    template <typename Parcel>
    void handle_read_chunks(
        boost::system::error_code const& e,
        std::size_t bytes,
        Parcel parcel
        )
    {
        if (e)
        {
            finish(e, bytes);
            return;
        }

        // Now we know how large chunk_sizes_ needs to be. The count comes off
        // the wire, so a bogus value must be reported, not thrown out of the
        // io_service.
        try
        {
            chunk_sizes_.resize(chunks_);
        }
        catch (std::exception const&)
        {
            finish(boost::asio::error::invalid_argument, bytes);
            return;
        }

        // The second thing we need is the list of chunk sizes.
        boost::asio::async_read(socket_,
            boost::asio::buffer(chunk_sizes_),
            boost::bind(&zero_copy_iarchive::handle_read_chunk_sizes<Parcel>,
                shared_from_this(),
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred,
                parcel));
    }

    template <typename Parcel>
    void handle_read_chunk_sizes(boost::system::error_code const& e,
        std::size_t bytes, Parcel parcel)
    {
        if (e)
        {
            finish(e, bytes);
            return;
        }

        // First pass. Create the message structure. Note that this doesn't
        // actually read in anything.
        message_.clear();
        slow_buffers_.clear();
        current_chunk_ = 0;
        current_slow_buffer_ = 0;
        pass_ = 1;
        try
        {
            // FIXME: Not sure if this is the correct way to kick off the
            // serialization call chain.
            boost::serialization::serialize(*this, parcel, 0 /* version */);
        }
        catch (std::exception const&)
        {
            // The size list was too short for the parcel, or a size was too
            // large to allocate.
            finish(boost::asio::error::invalid_argument, bytes);
            return;
        }

        // Every advertised size must have been consumed. Otherwise sender and
        // receiver disagree about the parcel's layout.
        if (current_chunk_ != chunk_sizes_.size())
        {
            finish(boost::asio::error::invalid_argument, bytes);
            return;
        }

        boost::asio::async_read(socket_, message_,
            boost::bind(&zero_copy_iarchive::handle_read_message<Parcel>,
                shared_from_this(),
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred,
                parcel));
    }

    template <typename Parcel>
    void handle_read_message(boost::system::error_code const& e,
        std::size_t bytes, Parcel parcel)
    {
        if (e)
        {
            finish(e, bytes);
            return;
        }

        // Second pass. Do any required deserialization.
        pass_ = 2;
        try
        {
            // FIXME: Not sure if this is the correct way to kick off the
            // serialization call chain.
            boost::serialization::serialize(*this, parcel, 0 /* version */);
        }
        catch (std::exception const&)
        {
            finish(boost::asio::error::invalid_argument, bytes);
            return;
        }

        finish(e, bytes);
    }

private:
    /// Consume the next entry of the size list. at() turns a size list that is
    /// shorter than the parcel needs into an exception instead of an
    /// out-of-bounds read.
    std::size_t next_chunk_size()
    {
        return chunk_sizes_.at(current_chunk_++);
    }

    /// Drop all per-message state and return to the idle pass.
    void reset()
    {
        pass_ = 0;
        message_.clear();
        chunk_sizes_.clear();
        chunks_ = 0;
        current_chunk_ = 0;
        slow_buffers_.clear();
        current_slow_buffer_ = 0;
    }

    /// End the current read and report the result to the user's handler.
    void finish(boost::system::error_code const& e, std::size_t bytes)
    {
        reset();

        // Move the handler out before calling it. It may start another
        // async_read, which reassigns handler_ while it is still executing.
        handler_type handler;
        handler.swap(handler_);
        if (handler)
            handler(e, bytes);
    }
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment