Skip to content

Instantly share code, notes, and snippets.

@brycelelbach
Created October 14, 2012 15:07
Show Gist options
  • Save brycelelbach/3888857 to your computer and use it in GitHub Desktop.
Save brycelelbach/3888857 to your computer and use it in GitHub Desktop.
Zero Copy with Boost.Asio/Boost.Serialization
// Copyright (c) 2012 Bryce Adelstein-Lelbach
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
// hpx::util::container_device can be found here:
// http://raw.github.com/STEllAR-GROUP/hpx/master/hpx/util/container_device.hpp
// The portable binary archives used can be found here:
// http://raw.github.com/STEllAR-GROUP/hpx/master/hpx/util/portable_binary_oarchive.hpp
// http://raw.github.com/STEllAR-GROUP/hpx/master/src/util/portable_binary_oarchive.cpp
// http://raw.github.com/STEllAR-GROUP/hpx/master/hpx/util/portable_binary_iarchive.hpp
// http://raw.github.com/STEllAR-GROUP/hpx/master/src/util/portable_binary_iarchive.cpp
#include <cstddef>
#include <functional>
#include <type_traits>
#include <utility>
#include <vector>
/// Trait answering: may an object of type T be transferred by copying its raw
/// bytes?
///
/// The primary template says "yes" for arithmetic types and "no" for everything
/// else. It derives from std::true_type / std::false_type, so both the nested
/// `type` and the static `value` member are available.
///
/// @tparam T      The type being queried.
/// @tparam enable Unused by the definitions here; left as a hook for
///                conditionally enabled specializations.
template <typename T, typename enable = void>
struct is_bitwise_serializable : std::is_arithmetic<T>::type { };
/// const-qualification does not change the answer.
template <typename T>
struct is_bitwise_serializable<const T> : is_bitwise_serializable<T> { };
/// An lvalue reference answers for the referenced type.
template <typename T>
struct is_bitwise_serializable<T&> : is_bitwise_serializable<T> { };
/// An rvalue reference answers for the referenced type.
template <typename T>
struct is_bitwise_serializable<T&&> : is_bitwise_serializable<T> { };
// This specialization has to be done not just for std::vector, but for other
// special cases of boost::asio::buffer, such as boost::array, std::array,
// perhaps std::valarray too.
/// A std::vector<T> can be transferred as one contiguous buffer when its
/// elements are bitwise serializable AND trivially copyable. The second
/// condition rejects nested containers such as std::vector<std::vector<int> >,
/// whose elements hold pointers rather than the data itself.
///
/// Deriving from the result (instead of only re-exporting `type`) keeps
/// `value` available, consistent with the primary template.
template <typename T>
struct is_bitwise_serializable<std::vector<T> >
  : std::integral_constant<
        bool,
        is_bitwise_serializable<T>::type::value
            && std::is_trivially_copyable<T>::value
    >
{ };
/// std::vector<bool> is a packed container: it has no contiguous array of
/// bool that could be handed to the network layer, so it is never bitwise
/// serializable even though bool itself is.
template <>
struct is_bitwise_serializable<std::vector<bool> > : std::false_type { };
// We never directly serialize a std::vector; we actually only serialize one
// type (a parcel). Parcels contain a polymorphic object (an action) that has
// all our data in it. Because of this, I believe we can safely do zero-copy
// for std::vector and other non-polymorphic types. On the receiving end, we
// will know how to read the data because the polymorphic type was serialized
// normally through Boost.Serialization.
/// Output archive that gathers the pieces of an object into a list of
/// buffers and sends them over a TCP socket with one boost::asio::async_write.
///
/// Message layout produced by async_write():
///   1. chunks_      - number of entries in the size list
///   2. chunk_sizes_ - one entry per std::vector (element count) or per
///                     slow-path buffer (byte count), in serialization order
///   3. payload      - the data buffers, in serialization order
///
/// Bitwise-serializable data is referenced in place, not copied; everything
/// else is first serialized into an internal buffer ("slow path").
///
/// Instances must be owned by a boost::shared_ptr, because async_write() calls
/// shared_from_this(). Only one async_write() may be outstanding at a time:
/// the buffer lists are shared state that is reset in handle_write().
struct zero_copy_oarchive : boost::enable_shared_from_this<zero_copy_oarchive>
{
    /// Completion callback: receives the error code and byte count that
    /// Boost.Asio reported for the write.
    typedef std::function<
        void(boost::system::error_code const&, std::size_t)
    > handler_type;

private:
    /// Storage for one chunk that had to be serialized the slow way.
    typedef std::vector<char> slow_buffer;

    boost::asio::ip::tcp::socket socket_;
    handler_type handler_; ///< Invoked once from handle_write().
    bool homogeneity_; ///< Is it safe to do bitwise serialization? E.g. does
                       /// the target have the same endianness as us, etc?
    std::vector<boost::asio::const_buffer> message_;
    std::vector<boost::integer::ulittle64_t> chunk_sizes_;
    boost::integer::ulittle64_t chunks_; // chunk_sizes_.size()
    std::vector<slow_buffer> slow_buffers_;

public:
    /// @param io          io_service the socket is bound to.
    /// @param homogeneity true if the peer shares our data representation, in
    ///                    which case bitwise serialization is used where the
    ///                    type allows it.
    zero_copy_oarchive(
        boost::asio::io_service& io,
        bool homogeneity = true
    )
      : socket_(io)
      , homogeneity_(homogeneity)
    {
        chunks_ = 0;
    }

    ~zero_copy_oarchive()
    {
        // Gracefully and portably shutdown the socket. Errors are ignored on
        // purpose: the socket may never have been opened.
        boost::system::error_code ec;
        socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
        socket_.close(ec);
    }

    /// Access to the underlying socket, so that it can be connected before
    /// async_write() is called.
    boost::asio::ip::tcp::socket& socket() { return socket_; }

    /// Archive-style operator; returns *this so calls can be chained.
    template <typename T>
    zero_copy_oarchive& operator& (T&& t)
    {
        save(std::forward<T>(t));
        return *this;
    }

    /// Archive-style operator; returns *this so calls can be chained.
    template <typename T>
    zero_copy_oarchive& operator<< (T&& t)
    {
        save(std::forward<T>(t));
        return *this;
    }

    /// Queue t for transmission, choosing between the zero-copy and the slow
    /// path.
    ///
    /// NOTE: The lifetime of the data we're serializing is controlled, so t
    /// going out of scope isn't an issue. The zero-copy paths only record the
    /// address of t; the data must stay alive and unmodified until the write
    /// has completed.
    template <typename T>
    void save(T&& t)
    {
        // The trait yields a std::integral_constant, while the overloads below
        // are tagged with Boost.MPL booleans, so convert through the value.
        typedef boost::mpl::bool_<
            is_bitwise_serializable<T>::type::value
        > predicate_;

        // t is passed on as an lvalue: every overload below takes a const
        // reference, so lvalue and rvalue arguments are handled identically.
        if (homogeneity_)
            save(t, predicate_());
        else
            save(t, boost::mpl::false_());
    }

    // This overload has to be done not just for std::vector, but for other
    // special cases of boost::asio::buffer, such as boost::array, std::array,
    // perhaps std::valarray too.
    /// Zero-copy path for a vector of bitwise-serializable elements.
    ///
    /// Taking a const reference (rather than std::vector<T>&&, which is not a
    /// forwarding reference) makes this overload win for lvalue vectors too;
    /// otherwise they would be sent as the raw bytes of the vector object.
    template <typename T>
    void save(std::vector<T> const& t, boost::mpl::true_)
    {
        // Save the size, so we can know how much to read on the other end. This
        // allows us to do zero copy when reading. The entry is the element
        // count, which is what the receiver passes to resize().
        chunk_sizes_.push_back(t.size());
        message_.push_back(boost::asio::buffer(t));
    }

    /// Zero-copy path for a single bitwise-serializable object. No size entry
    /// is recorded: the receiver knows sizeof(T).
    template <typename T>
    void save(T const& t, boost::mpl::true_)
    {
        message_.push_back(boost::asio::buffer(&t, sizeof(t)));
    }

    /// Slow path: serialize t into an internal byte buffer with the portable
    /// binary archive and queue that buffer.
    template <typename T>
    void save(T const& t, boost::mpl::false_)
    {
        slow_buffers_.push_back(slow_buffer());
        slow_buffer& storage = slow_buffers_.back();

        {
            typedef hpx::util::container_device<std::vector<char> >
                io_device_type;
            boost::iostreams::stream<io_device_type> io(storage);

            // Serialize t the slow way.
            hpx::util::portable_binary_oarchive archive(io);
            archive & t;

            // Leaving this scope destroys the archive and the stream, which
            // flushes anything still buffered into storage before its size
            // is taken below.
        }

        // Save the size, so we can know how much to read on the other end. This
        // allows us to do zero copy when reading.
        chunk_sizes_.push_back(storage.size());
        message_.push_back(boost::asio::buffer(storage));
    }

    /// Asynchronously write a data structure to the socket.
    ///
    /// NOTE: parcel should be reference counted. A copy is bound into the
    /// completion handler to keep the data alive until the write finishes.
    ///
    /// @param parcel  Object to send; it is serialized through this archive.
    /// @param handler Called once the write has finished or failed.
    template <typename Parcel>
    void async_write(Parcel parcel, handler_type const& handler)
    {
        handler_ = handler;

        // The first buffer is the number of elements in the list. The second
        // buffer is our list of sizes. We'll fill these in later.
        message_.push_back(boost::asio::buffer(&chunks_, sizeof(chunks_)));
        message_.push_back(boost::asio::const_buffer());

        // FIXME: Not sure if this is the correct way to kick off the
        // serialization call chain.
        boost::serialization::serialize(*this, parcel, 0 /* version */);

        // NOTE: Non-container chunks (e.g. single elements) are not in the size
        // list.
        chunks_ = chunk_sizes_.size();
        message_[1] = boost::asio::buffer(chunk_sizes_);

        boost::asio::async_write(socket_, message_,
            boost::bind(&zero_copy_oarchive::handle_write<Parcel>,
                shared_from_this(),
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred,
                parcel));
    }

    /// Completion handler for async_write(): resets the archive and reports
    /// the result to the user's handler.
    ///
    /// @param e      Error code reported for the write.
    /// @param bytes  Number of bytes reported as transferred.
    /// @param parcel The copy bound in async_write(); unused here, it exists
    ///               only to keep the data alive for the duration of the write.
    template <typename Parcel>
    void handle_write(boost::system::error_code const& e, std::size_t bytes,
        Parcel& parcel)
    {
        message_.clear();
        chunk_sizes_.clear();
        chunks_ = 0;
        slow_buffers_.clear();

        // Detach the handler before calling it, so that it may start another
        // async_write() on this archive.
        handler_type handler;
        handler.swap(handler_);
        if (handler)
            handler(e, bytes);
    }
};
// Note: We must "deserialize" the object BEFORE we read the data, but AFTER
// we have read the sizes. This allows us to do zero-copy, because we know the
// layout of the data structure before we call async_read.
/// Input archive that reads a message laid out as
///   1. chunk count, 2. list of chunk sizes, 3. payload
/// from a TCP socket, placing bitwise-serializable data directly into the
/// destination object.
///
/// Reading happens in stages:
///   - async_read() reads the chunk count,
///   - handle_read_chunks() reads the list of chunk sizes,
///   - handle_read_chunk_sizes() runs pass 1 over the parcel to build the list
///     of destination buffers, then reads the payload into them,
///   - handle_read_message() runs pass 2 to deserialize the slow-path chunks
///     and finally invokes the user's handler.
///
/// Instances must be owned by a boost::shared_ptr, because the read chain
/// calls shared_from_this(). Only one async_read() may be outstanding at a
/// time.
struct zero_copy_iarchive : boost::enable_shared_from_this<zero_copy_iarchive>
{
    /// Completion callback: receives the error code and byte count of the
    /// read that finished (or failed) last.
    typedef std::function<
        void(boost::system::error_code const&, std::size_t)
    > handler_type;

private:
    /// Storage for one chunk that has to be deserialized the slow way.
    typedef std::vector<char> slow_buffer;

    boost::asio::ip::tcp::socket socket_;
    handler_type handler_;
    bool homogeneity_; ///< Is it safe to do bitwise serialization? E.g. does
                       /// the target have the same endianness as us, etc?
    std::size_t pass_; ///< 1 or 2 while a pass is running, 0 otherwise.
    // Destination buffers for the payload; they are written to, so they must
    // be mutable buffers.
    std::vector<boost::asio::mutable_buffer> message_;
    std::vector<boost::integer::ulittle64_t> chunk_sizes_;
    boost::integer::ulittle64_t chunks_; // chunk_sizes_.size()
    std::size_t current_chunk_; ///< Next unread entry of chunk_sizes_.
    std::vector<slow_buffer> slow_buffers_;
    std::size_t current_slow_buffer_; ///< Next slow buffer to decode.

    /// Consume and return the next entry of the size list.
    std::size_t next_chunk_size()
    {
        // The sender's list must have one entry per sized chunk we expect.
        BOOST_ASSERT(current_chunk_ < chunk_sizes_.size());
        return chunk_sizes_[current_chunk_++];
    }

    /// Report the outcome to the user's handler, at most once per read.
    void complete(boost::system::error_code const& e, std::size_t bytes)
    {
        // Detach the handler before calling it, so that it may start another
        // async_read() on this archive.
        handler_type handler;
        handler.swap(handler_);
        if (handler)
            handler(e, bytes);
    }

public:
    /// @param io          io_service the socket is bound to.
    /// @param homogeneity true if the peer shares our data representation;
    ///                    must match the setting used by the sender.
    zero_copy_iarchive(
        boost::asio::io_service& io,
        bool homogeneity = true
    )
      : socket_(io)
      , homogeneity_(homogeneity)
      , pass_(0)
      , current_chunk_(0)
      , current_slow_buffer_(0)
    {
        chunks_ = 0;
    }

    ~zero_copy_iarchive()
    {
        // Gracefully and portably shutdown the socket. Errors are ignored on
        // purpose: the socket may never have been opened.
        boost::system::error_code ec;
        socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
        socket_.close(ec);
    }

    /// Access to the underlying socket, so that it can be connected or
    /// accepted into before async_read() is called.
    boost::asio::ip::tcp::socket& socket() { return socket_; }

    /// Archive-style operator; returns *this so calls can be chained.
    template <typename T>
    zero_copy_iarchive& operator& (T& t)
    {
        load(t);
        return *this;
    }

    /// Archive-style operator; returns *this so calls can be chained.
    template <typename T>
    zero_copy_iarchive& operator>> (T& t)
    {
        load(t);
        return *this;
    }

    /// Process t according to the pass that is currently running. Must only
    /// be called while a pass is in progress.
    template <typename T>
    void load(T& t)
    {
        // The trait yields a std::integral_constant, while the overloads below
        // are tagged with Boost.MPL booleans, so convert through the value.
        typedef boost::mpl::bool_<
            is_bitwise_serializable<T>::type::value
        > predicate_;

        // Pass 1 builds the structure of message_. It is done right before
        // message_ is read.
        if (1 == pass_)
        {
            if (homogeneity_)
                load_pass1(t, predicate_());
            else
                load_pass1(t, boost::mpl::false_());
        }
        // Pass 2 decodes message_ after it has been read.
        else if (2 == pass_)
        {
            if (homogeneity_)
                load_pass2(t, predicate_());
            else
                load_pass2(t, boost::mpl::false_());
        }
        else
            BOOST_ASSERT(false);
    }

    // This overload has to be done not just for std::vector, but for other
    // special cases of boost::asio::buffer, such as boost::array, std::array,
    // perhaps std::valarray too.
    /// Pass 1, zero-copy vector: size the vector and read straight into it.
    template <typename T>
    void load_pass1(std::vector<T>& t, boost::mpl::true_)
    {
        // Use the size list to figure out how large this vector needs to be.
        t.resize(next_chunk_size());
        message_.push_back(boost::asio::buffer(t));
    }

    /// Pass 1, zero-copy single object: read straight into its storage.
    template <typename T>
    void load_pass1(T& t, boost::mpl::true_)
    {
        message_.push_back(boost::asio::buffer(&t, sizeof(t)));
    }

    /// Pass 1, slow path: reserve a byte buffer of the announced size; it is
    /// decoded into the object during pass 2.
    template <typename T>
    void load_pass1(T& /* t */, boost::mpl::false_)
    {
        // Use the size list to figure out how large this buffer needs to be.
        slow_buffers_.push_back(slow_buffer(next_chunk_size()));
        message_.push_back(boost::asio::buffer(slow_buffers_.back()));
    }

    // This overload has to be done not just for std::vector, but for other
    // special cases of boost::asio::buffer, such as boost::array, std::array,
    // perhaps std::valarray too.
    /// Pass 2, zero-copy vector: the data is already in place.
    template <typename T>
    void load_pass2(std::vector<T>& /* t */, boost::mpl::true_)
    {
        // No-op.
    }

    /// Pass 2, zero-copy single object: the data is already in place.
    template <typename T>
    void load_pass2(T& /* t */, boost::mpl::true_)
    {
        // No-op.
    }

    /// Pass 2, slow path: decode the next received byte buffer into t.
    template <typename T>
    void load_pass2(T& t, boost::mpl::false_)
    {
        BOOST_ASSERT(current_slow_buffer_ < slow_buffers_.size());
        slow_buffer& storage = slow_buffers_[current_slow_buffer_++];

        typedef hpx::util::container_device<std::vector<char> > io_device_type;
        boost::iostreams::stream<io_device_type> io(storage);

        // Deserialize t the slow way. This must be the *input* archive, the
        // counterpart of the portable_binary_oarchive used by the sender.
        hpx::util::portable_binary_iarchive archive(io);
        archive & t;
    }

    /// Asynchronously read a data structure from the socket.
    ///
    /// NOTE: parcel should be reference counted. It is copied from handler to
    /// handler, and pass 1 records addresses inside the object it refers to,
    /// so all copies must share the same underlying data.
    ///
    /// @param parcel  Object to fill in; it is traversed through this archive.
    /// @param handler Called once the whole message has been read and decoded,
    ///                or as soon as one of the reads fails.
    template <typename Parcel>
    void async_read(Parcel parcel, handler_type const& handler)
    {
        handler_ = handler;

        // Start from a clean slate; leftovers of a previous read would
        // otherwise be mixed into this one.
        pass_ = 0;
        message_.clear();
        chunk_sizes_.clear();
        slow_buffers_.clear();
        current_chunk_ = 0;
        current_slow_buffer_ = 0;

        // The first thing we need is the number of elements in the list of
        // chunk sizes.
        boost::asio::async_read(socket_,
            boost::asio::buffer(&chunks_, sizeof(chunks_)),
            boost::bind(&zero_copy_iarchive::handle_read_chunks<Parcel>,
                shared_from_this(),
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred,
                parcel));
    }

    /// Called when the chunk count has arrived; starts reading the size list.
    template <typename Parcel>
    void handle_read_chunks(
        boost::system::error_code const& e,
        std::size_t bytes,
        Parcel parcel
    )
    {
        if (e)
        {
            complete(e, bytes);
            return;
        }

        // Now we know how large chunk_sizes_ needs to be.
        // NOTE(review): chunks_ comes from the peer and is not bounded here;
        // consider enforcing an upper limit if the peer is not trusted.
        chunk_sizes_.resize(chunks_);

        // The second thing we need is the list of chunk sizes.
        boost::asio::async_read(socket_,
            boost::asio::buffer(chunk_sizes_),
            boost::bind(&zero_copy_iarchive::handle_read_chunk_sizes<Parcel>,
                shared_from_this(),
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred,
                parcel));
    }

    /// Called when the size list has arrived; runs pass 1 and starts reading
    /// the payload.
    template <typename Parcel>
    void handle_read_chunk_sizes(boost::system::error_code const& e,
        std::size_t bytes, Parcel parcel)
    {
        if (e)
        {
            complete(e, bytes);
            return;
        }

        // First pass. Create the message structure. Note that this doesn't
        // actually read in anything.
        // FIXME: Not sure if this is the correct way to kick off the
        // serialization call chain.
        pass_ = 1;
        current_chunk_ = 0;
        boost::serialization::serialize(*this, parcel, 0 /* version */);

        boost::asio::async_read(socket_, message_,
            boost::bind(&zero_copy_iarchive::handle_read_message<Parcel>,
                shared_from_this(),
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred,
                parcel));
    }

    /// Called when the payload has arrived; runs pass 2 and reports the
    /// result to the user's handler.
    template <typename Parcel>
    void handle_read_message(boost::system::error_code const& e,
        std::size_t bytes, Parcel parcel)
    {
        if (e)
        {
            complete(e, bytes);
            return;
        }

        // Second pass. Do any required deserialization.
        // FIXME: Not sure if this is the correct way to kick off the
        // serialization call chain.
        pass_ = 2;
        current_slow_buffer_ = 0;
        boost::serialization::serialize(*this, parcel, 0 /* version */);

        complete(e, bytes);
    }
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment