Created
November 16, 2014 23:22
-
-
Save mnunberg/04622aaa32c3d2a74c3d to your computer and use it in GitHub Desktop.
lcb+boost
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <string> | |
#include <vector> | |
#include <array> | |
#include <boost/asio.hpp> | |
#include <boost/chrono.hpp> | |
#include <libcouchbase/couchbase.h> | |
#include <libcouchbase/iops.h> | |
using std::string; | |
using std::vector; | |
using boost::asio::ip::tcp; | |
using boost::system::error_code; | |
using namespace boost::asio; | |
class BoostSocket; | |
class BoostTimer; | |
struct InitialConnection; | |
struct LcbReadHandler; | |
namespace { | |
typedef lcb_io_opt_st _IOPS; | |
typedef lcb_ioE_callback _ECB; | |
typedef lcb_sockdata_st _SD; | |
} | |
class BoostIOPS : _IOPS { | |
public: | |
BoostIOPS(io_service *svc_in); | |
void set_error() { set_error(errno); } | |
void set_error(const error_code& ec) { set_error(ec.value()); } | |
void set_error(int err) { LCB_IOPS_ERRNO(this) = err; } | |
io_service& get_service() { return *svc; } | |
void ref() { refcount++; } | |
void unref() { if (! --refcount) { delete this; } } | |
void run() { | |
is_stopped = false; | |
svc->reset(); | |
svc->run(); | |
is_stopped = true; | |
} | |
void stop() { | |
if (is_stopped) { | |
return; | |
} | |
is_stopped = true; | |
svc->stop(); | |
} | |
private: | |
boost::asio::io_service *svc; | |
boost::asio::io_service svc_s; | |
size_t refcount; | |
bool is_stopped; | |
}; | |
class BoostSocket : public lcb_sockdata_st { | |
public: | |
BoostSocket(BoostIOPS *parent, int domain) : m_Socket(parent->get_service()) { | |
::memset((lcb_sockdata_st*)this, 0, sizeof (lcb_sockdata_st)); | |
m_Parent = parent; | |
rdarg = NULL; | |
rdcb = NULL; | |
refcount = 1; | |
ip::address ipaddr; | |
if (domain == AF_INET) { | |
ipaddr = (ip::address)ip::address_v4(); | |
m_Socket.open(tcp::v4()); | |
} else { | |
ipaddr = (ip::address)ip::address_v6(); | |
m_Socket.open(tcp::v6()); | |
} | |
m_Socket.bind(tcp::endpoint(ipaddr, 0)); | |
socket = m_Socket.native_handle(); | |
m_Parent->ref(); | |
} | |
~BoostSocket() { | |
m_Parent->unref(); | |
} | |
int start_connect(const sockaddr *saddr, lcb_io_connect_cb callback) { | |
ip::address ipaddr; | |
int port; | |
if (m_Socket.local_endpoint().address().is_v4()) { | |
const sockaddr_in *sin = (const sockaddr_in *)saddr; | |
port = sin->sin_port; | |
ipaddr = ip::address_v4(htonl(sin->sin_addr.s_addr)); | |
} else if (m_Socket.local_endpoint().address().is_v6()) { | |
const sockaddr_in6 *sin6 = (const sockaddr_in6 *)saddr; | |
port = sin6->sin6_port; | |
std::array<unsigned char, 16> addrbytes; | |
::memcpy(addrbytes.data(), &sin6->sin6_addr, 16); | |
ipaddr = ip::address_v6(addrbytes, sin6->sin6_scope_id); | |
} else { | |
m_Parent->set_error(ENOTSUP); | |
return -1; | |
} | |
port = htons(port); | |
m_Socket.async_connect(tcp::endpoint(ipaddr, port), InitialConnection(callback, this)); | |
return 0; | |
} | |
int start_read(lcb_IOV *iov, size_t n, void *uarg, lcb_ioC_read2_callback cb) { | |
rdcb = cb; | |
rdarg = uarg; | |
ref(); | |
std::vector<mutable_buffer> bufs; | |
for (size_t ii = 0; ii < n; ii++) { | |
bufs.push_back(mutable_buffer(iov[ii].iov_base, iov[ii].iov_len)); | |
} | |
m_Socket.async_read_some(bufs, LcbReadHandler(this)); | |
return 0; | |
} | |
int start_write(lcb_IOV *iov, size_t niov, void *uarg, lcb_ioC_write2_callback cb) { | |
ref(); | |
std::vector<const_buffer> bufs; | |
for (size_t ii = 0; ii < niov; ii++) { | |
bufs.push_back(const_buffer(iov[ii].iov_base, iov[ii].iov_len)); | |
} | |
async_write(m_Socket, bufs, LcbWriteHandler(this, cb, uarg)); | |
return 0; | |
} | |
int is_closed(int flags) { | |
if (!m_Socket.is_open()) { | |
return LCB_IO_SOCKCHECK_STATUS_CLOSED; | |
} | |
error_code ec; | |
char buf; | |
mutable_buffers_1 dummy(&buf, 1); | |
while (true) { | |
bool was_blocking = m_Socket.non_blocking(); | |
m_Socket.non_blocking(true, ec); | |
if (ec) { | |
return LCB_IO_SOCKCHECK_STATUS_UNKNOWN; | |
} | |
size_t nr = m_Socket.receive(dummy, tcp::socket::message_peek, ec); | |
m_Socket.non_blocking(was_blocking); | |
if (ec) { | |
if (ec == error::would_block) { | |
return LCB_IO_SOCKCHECK_STATUS_OK; | |
} else if (ec == error::interrupted) { | |
continue; | |
} else { | |
return LCB_IO_SOCKCHECK_STATUS_CLOSED; | |
} | |
} else if (nr > 0 && (flags & LCB_IO_SOCKCHECK_PEND_IS_ERROR)) { | |
return LCB_IO_SOCKCHECK_STATUS_CLOSED; | |
} else { | |
return LCB_IO_SOCKCHECK_STATUS_OK; | |
} | |
} | |
return LCB_IO_SOCKCHECK_STATUS_UNKNOWN; | |
} | |
int get_nameinfo(lcb_nameinfo_st *ni) { | |
// Much simpler to just use getsockname! | |
int rv; | |
socklen_t lenp; | |
lenp = sizeof(sockaddr_storage); | |
rv = getsockname(socket, ni->local.name, &lenp); | |
if (rv != 0) { | |
m_Parent->set_error(); | |
return -1; | |
} | |
lenp = sizeof(sockaddr_storage); | |
rv = getpeername(socket, ni->remote.name, &lenp); | |
if (rv != 0) { | |
m_Parent->set_error(); | |
return -1; | |
} | |
*ni->local.len = lenp; | |
*ni->remote.len = lenp; | |
return 0; | |
} | |
void close() { | |
error_code ecdummy; | |
m_Socket.shutdown(socket_base::shutdown_both, ecdummy); | |
m_Socket.close(ecdummy); | |
unref(); | |
} | |
void ref() { refcount++; } | |
void unref() { if (! --refcount) { delete this; } } | |
private: | |
tcp::socket m_Socket; | |
BoostIOPS *m_Parent; | |
lcb_ioC_read2_callback rdcb; | |
void *rdarg; | |
size_t refcount; | |
struct InitialConnection { | |
InitialConnection(lcb_io_connect_cb cb, BoostSocket *s) : callback(cb), sock(s) {} | |
lcb_io_connect_cb callback; | |
BoostSocket *sock; | |
void operator () (const error_code& ec) { | |
int rv = ec ? -1 : 0; | |
if (ec) { sock->m_Parent->set_error(ec); } | |
callback((_SD *)sock, rv); | |
} | |
}; | |
struct LcbReadHandler { | |
BoostSocket *sock; | |
LcbReadHandler(BoostSocket *s) : sock(s) {} | |
void operator() (const error_code& ec, size_t nbytes) { | |
ssize_t val = -1; | |
if (ec) { sock->m_Parent->set_error(ec); } | |
else { val = nbytes; } | |
sock->rdcb(sock, val, sock->rdarg); | |
sock->unref(); | |
} | |
}; | |
struct LcbWriteHandler { | |
BoostSocket *sock; | |
void *arg; | |
lcb_ioC_write2_callback cb; | |
LcbWriteHandler(BoostSocket *s, lcb_ioC_write2_callback callback, void *cbarg) : sock(s), arg(cbarg), cb(callback) {} | |
void operator() (const error_code& ec, size_t) { | |
int val = 0; | |
if (ec) { val = -1; sock->m_Parent->set_error(ec); } | |
cb(sock, val, arg); | |
sock->unref(); | |
} | |
}; | |
}; | |
class BoostTimer { | |
public: | |
struct TimerHandler { | |
BoostTimer *parent; | |
TimerHandler(BoostTimer *tm) : parent(tm) {} | |
void operator() (const error_code& ec) { | |
if (ec) { return; } | |
parent->callback(-1, 0, parent->arg); | |
} | |
}; | |
BoostTimer(BoostIOPS *parent) : m_Timer(parent->get_service()), m_Parent(parent) { | |
callback = NULL; | |
arg = NULL; | |
m_Parent->ref(); | |
} | |
~BoostTimer() { | |
m_Parent->unref(); | |
} | |
void schedule(uint32_t usec, _ECB cb, void *arg) { | |
this->callback = cb; | |
this->arg = arg; | |
m_Timer.expires_from_now(boost::posix_time::microseconds(usec)); | |
m_Timer.async_wait(TimerHandler(this)); | |
} | |
void cancel() { | |
callback = NULL; | |
arg = NULL; | |
m_Timer.cancel(); | |
} | |
private: | |
boost::asio::deadline_timer m_Timer; | |
BoostIOPS *m_Parent; | |
_ECB callback; | |
void *arg; | |
}; | |
static BoostIOPS *getIops(_IOPS* io) { | |
return reinterpret_cast<BoostIOPS *>(io); | |
} | |
extern "C" { | |
static void run_loop(_IOPS* io) { | |
getIops(io)->run(); | |
} | |
static void stop_loop(_IOPS* io) { | |
getIops(io)->stop(); | |
} | |
static void* create_timer(_IOPS* io) { | |
return new BoostTimer(getIops(io)); | |
} | |
static void destroy_timer(_IOPS*, void *timer) { | |
delete (BoostTimer *)timer; | |
} | |
static int schedule_timer(_IOPS*, void *timer, uint32_t us, void *arg, _ECB cb) { | |
((BoostTimer *)timer)->schedule(us, cb, arg); | |
return 0; | |
} | |
static void cancel_timer(_IOPS*, void *timer) { | |
((BoostTimer *)timer)->cancel(); | |
} | |
static _SD *create_socket(_IOPS* io, int domain, int, int) { | |
return new BoostSocket(getIops(io), domain); | |
} | |
static int connect_socket(_IOPS*, _SD* sock, const sockaddr* addr, unsigned, lcb_io_connect_cb cb) { | |
return ((BoostSocket *)sock)->start_connect(addr, cb); | |
} | |
static int get_nameinfo(_IOPS*, _SD *sock, lcb_nameinfo_st *ni) { | |
return ((BoostSocket *)sock)->get_nameinfo(ni); | |
} | |
static int read_socket(_IOPS*, _SD *sock, lcb_IOV *iov, lcb_SIZE niov, void *uarg, lcb_ioC_read2_callback cb) { | |
return ((BoostSocket *)sock)->start_read(iov, niov, uarg, cb); | |
} | |
static int write_socket(_IOPS*, _SD* sock, lcb_IOV *iov, lcb_SIZE niov, void *uarg, lcb_ioC_write2_callback cb) { | |
return ((BoostSocket *)sock)->start_write(iov, niov, uarg, cb); | |
} | |
static unsigned close_socket(_IOPS*, _SD* sock) { | |
((BoostSocket *)sock)->close(); return 0; | |
} | |
static int check_closed(_IOPS*, _SD* sock, int flags) { | |
return ((BoostSocket *)sock)->is_closed(flags); | |
} | |
static void iops_dtor(_IOPS *io) { | |
getIops(io)->unref(); | |
} | |
static void get_procs(int, lcb_loop_procs *loop, lcb_timer_procs *tm, | |
lcb_bsd_procs*, lcb_ev_procs*, lcb_completion_procs* iocp, | |
lcb_iomodel_t *model) | |
{ | |
*model = LCB_IOMODEL_COMPLETION; | |
loop->start = run_loop; | |
loop->stop = stop_loop; | |
tm->create = create_timer; | |
tm->destroy = destroy_timer; | |
tm->cancel = cancel_timer; | |
tm->schedule = schedule_timer; | |
iocp->socket = create_socket; | |
iocp->connect = connect_socket; | |
iocp->nameinfo = get_nameinfo; | |
iocp->read2 = read_socket; | |
iocp->write2 = write_socket; | |
iocp->close = close_socket; | |
iocp->is_closed = check_closed; | |
} | |
} | |
// Constructor | |
BoostIOPS::BoostIOPS(io_service *svc_in) | |
{ | |
::memset((_IOPS*)this, 0, sizeof(_IOPS)); | |
refcount = 1; | |
is_stopped = false; | |
version = 2; | |
v.v2.get_procs = get_procs; | |
destructor = iops_dtor; | |
if (svc_in != NULL) { | |
svc = svc_in; | |
} else { | |
svc = &svc_s; | |
} | |
} | |
// API Call | |
extern "C" { | |
lcb_error_t | |
lcb_create_boost_asio_io_opts(int, _IOPS **io, void *arg) | |
{ | |
*io = (_IOPS *) new BoostIOPS((io_service *)arg); | |
return LCB_SUCCESS; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment