Skip to content

Instantly share code, notes, and snippets.

@mnunberg
Created November 16, 2014 23:22
Show Gist options
  • Save mnunberg/04622aaa32c3d2a74c3d to your computer and use it in GitHub Desktop.
Save mnunberg/04622aaa32c3d2a74c3d to your computer and use it in GitHub Desktop.
lcb+boost
#include <string>
#include <vector>
#include <array>
#include <boost/asio.hpp>
#include <boost/chrono.hpp>
#include <libcouchbase/couchbase.h>
#include <libcouchbase/iops.h>
using std::string;
using std::vector;
using boost::asio::ip::tcp;
using boost::system::error_code;
using namespace boost::asio;
class BoostSocket;
class BoostTimer;
struct InitialConnection;
struct LcbReadHandler;
namespace {
typedef lcb_io_opt_st _IOPS;
typedef lcb_ioE_callback _ECB;
typedef lcb_sockdata_st _SD;
}
class BoostIOPS : _IOPS {
public:
BoostIOPS(io_service *svc_in);
void set_error() { set_error(errno); }
void set_error(const error_code& ec) { set_error(ec.value()); }
void set_error(int err) { LCB_IOPS_ERRNO(this) = err; }
io_service& get_service() { return *svc; }
void ref() { refcount++; }
void unref() { if (! --refcount) { delete this; } }
void run() {
is_stopped = false;
svc->reset();
svc->run();
is_stopped = true;
}
void stop() {
if (is_stopped) {
return;
}
is_stopped = true;
svc->stop();
}
private:
boost::asio::io_service *svc;
boost::asio::io_service svc_s;
size_t refcount;
bool is_stopped;
};
class BoostSocket : public lcb_sockdata_st {
public:
BoostSocket(BoostIOPS *parent, int domain) : m_Socket(parent->get_service()) {
::memset((lcb_sockdata_st*)this, 0, sizeof (lcb_sockdata_st));
m_Parent = parent;
rdarg = NULL;
rdcb = NULL;
refcount = 1;
ip::address ipaddr;
if (domain == AF_INET) {
ipaddr = (ip::address)ip::address_v4();
m_Socket.open(tcp::v4());
} else {
ipaddr = (ip::address)ip::address_v6();
m_Socket.open(tcp::v6());
}
m_Socket.bind(tcp::endpoint(ipaddr, 0));
socket = m_Socket.native_handle();
m_Parent->ref();
}
~BoostSocket() {
m_Parent->unref();
}
int start_connect(const sockaddr *saddr, lcb_io_connect_cb callback) {
ip::address ipaddr;
int port;
if (m_Socket.local_endpoint().address().is_v4()) {
const sockaddr_in *sin = (const sockaddr_in *)saddr;
port = sin->sin_port;
ipaddr = ip::address_v4(htonl(sin->sin_addr.s_addr));
} else if (m_Socket.local_endpoint().address().is_v6()) {
const sockaddr_in6 *sin6 = (const sockaddr_in6 *)saddr;
port = sin6->sin6_port;
std::array<unsigned char, 16> addrbytes;
::memcpy(addrbytes.data(), &sin6->sin6_addr, 16);
ipaddr = ip::address_v6(addrbytes, sin6->sin6_scope_id);
} else {
m_Parent->set_error(ENOTSUP);
return -1;
}
port = htons(port);
m_Socket.async_connect(tcp::endpoint(ipaddr, port), InitialConnection(callback, this));
return 0;
}
int start_read(lcb_IOV *iov, size_t n, void *uarg, lcb_ioC_read2_callback cb) {
rdcb = cb;
rdarg = uarg;
ref();
std::vector<mutable_buffer> bufs;
for (size_t ii = 0; ii < n; ii++) {
bufs.push_back(mutable_buffer(iov[ii].iov_base, iov[ii].iov_len));
}
m_Socket.async_read_some(bufs, LcbReadHandler(this));
return 0;
}
int start_write(lcb_IOV *iov, size_t niov, void *uarg, lcb_ioC_write2_callback cb) {
ref();
std::vector<const_buffer> bufs;
for (size_t ii = 0; ii < niov; ii++) {
bufs.push_back(const_buffer(iov[ii].iov_base, iov[ii].iov_len));
}
async_write(m_Socket, bufs, LcbWriteHandler(this, cb, uarg));
return 0;
}
int is_closed(int flags) {
if (!m_Socket.is_open()) {
return LCB_IO_SOCKCHECK_STATUS_CLOSED;
}
error_code ec;
char buf;
mutable_buffers_1 dummy(&buf, 1);
while (true) {
bool was_blocking = m_Socket.non_blocking();
m_Socket.non_blocking(true, ec);
if (ec) {
return LCB_IO_SOCKCHECK_STATUS_UNKNOWN;
}
size_t nr = m_Socket.receive(dummy, tcp::socket::message_peek, ec);
m_Socket.non_blocking(was_blocking);
if (ec) {
if (ec == error::would_block) {
return LCB_IO_SOCKCHECK_STATUS_OK;
} else if (ec == error::interrupted) {
continue;
} else {
return LCB_IO_SOCKCHECK_STATUS_CLOSED;
}
} else if (nr > 0 && (flags & LCB_IO_SOCKCHECK_PEND_IS_ERROR)) {
return LCB_IO_SOCKCHECK_STATUS_CLOSED;
} else {
return LCB_IO_SOCKCHECK_STATUS_OK;
}
}
return LCB_IO_SOCKCHECK_STATUS_UNKNOWN;
}
int get_nameinfo(lcb_nameinfo_st *ni) {
// Much simpler to just use getsockname!
int rv;
socklen_t lenp;
lenp = sizeof(sockaddr_storage);
rv = getsockname(socket, ni->local.name, &lenp);
if (rv != 0) {
m_Parent->set_error();
return -1;
}
lenp = sizeof(sockaddr_storage);
rv = getpeername(socket, ni->remote.name, &lenp);
if (rv != 0) {
m_Parent->set_error();
return -1;
}
*ni->local.len = lenp;
*ni->remote.len = lenp;
return 0;
}
void close() {
error_code ecdummy;
m_Socket.shutdown(socket_base::shutdown_both, ecdummy);
m_Socket.close(ecdummy);
unref();
}
void ref() { refcount++; }
void unref() { if (! --refcount) { delete this; } }
private:
tcp::socket m_Socket;
BoostIOPS *m_Parent;
lcb_ioC_read2_callback rdcb;
void *rdarg;
size_t refcount;
struct InitialConnection {
InitialConnection(lcb_io_connect_cb cb, BoostSocket *s) : callback(cb), sock(s) {}
lcb_io_connect_cb callback;
BoostSocket *sock;
void operator () (const error_code& ec) {
int rv = ec ? -1 : 0;
if (ec) { sock->m_Parent->set_error(ec); }
callback((_SD *)sock, rv);
}
};
struct LcbReadHandler {
BoostSocket *sock;
LcbReadHandler(BoostSocket *s) : sock(s) {}
void operator() (const error_code& ec, size_t nbytes) {
ssize_t val = -1;
if (ec) { sock->m_Parent->set_error(ec); }
else { val = nbytes; }
sock->rdcb(sock, val, sock->rdarg);
sock->unref();
}
};
struct LcbWriteHandler {
BoostSocket *sock;
void *arg;
lcb_ioC_write2_callback cb;
LcbWriteHandler(BoostSocket *s, lcb_ioC_write2_callback callback, void *cbarg) : sock(s), arg(cbarg), cb(callback) {}
void operator() (const error_code& ec, size_t) {
int val = 0;
if (ec) { val = -1; sock->m_Parent->set_error(ec); }
cb(sock, val, arg);
sock->unref();
}
};
};
class BoostTimer {
public:
struct TimerHandler {
BoostTimer *parent;
TimerHandler(BoostTimer *tm) : parent(tm) {}
void operator() (const error_code& ec) {
if (ec) { return; }
parent->callback(-1, 0, parent->arg);
}
};
BoostTimer(BoostIOPS *parent) : m_Timer(parent->get_service()), m_Parent(parent) {
callback = NULL;
arg = NULL;
m_Parent->ref();
}
~BoostTimer() {
m_Parent->unref();
}
void schedule(uint32_t usec, _ECB cb, void *arg) {
this->callback = cb;
this->arg = arg;
m_Timer.expires_from_now(boost::posix_time::microseconds(usec));
m_Timer.async_wait(TimerHandler(this));
}
void cancel() {
callback = NULL;
arg = NULL;
m_Timer.cancel();
}
private:
boost::asio::deadline_timer m_Timer;
BoostIOPS *m_Parent;
_ECB callback;
void *arg;
};
static BoostIOPS *getIops(_IOPS* io) {
return reinterpret_cast<BoostIOPS *>(io);
}
extern "C" {
static void run_loop(_IOPS* io) {
getIops(io)->run();
}
static void stop_loop(_IOPS* io) {
getIops(io)->stop();
}
static void* create_timer(_IOPS* io) {
return new BoostTimer(getIops(io));
}
static void destroy_timer(_IOPS*, void *timer) {
delete (BoostTimer *)timer;
}
static int schedule_timer(_IOPS*, void *timer, uint32_t us, void *arg, _ECB cb) {
((BoostTimer *)timer)->schedule(us, cb, arg);
return 0;
}
static void cancel_timer(_IOPS*, void *timer) {
((BoostTimer *)timer)->cancel();
}
static _SD *create_socket(_IOPS* io, int domain, int, int) {
return new BoostSocket(getIops(io), domain);
}
static int connect_socket(_IOPS*, _SD* sock, const sockaddr* addr, unsigned, lcb_io_connect_cb cb) {
return ((BoostSocket *)sock)->start_connect(addr, cb);
}
static int get_nameinfo(_IOPS*, _SD *sock, lcb_nameinfo_st *ni) {
return ((BoostSocket *)sock)->get_nameinfo(ni);
}
static int read_socket(_IOPS*, _SD *sock, lcb_IOV *iov, lcb_SIZE niov, void *uarg, lcb_ioC_read2_callback cb) {
return ((BoostSocket *)sock)->start_read(iov, niov, uarg, cb);
}
static int write_socket(_IOPS*, _SD* sock, lcb_IOV *iov, lcb_SIZE niov, void *uarg, lcb_ioC_write2_callback cb) {
return ((BoostSocket *)sock)->start_write(iov, niov, uarg, cb);
}
static unsigned close_socket(_IOPS*, _SD* sock) {
((BoostSocket *)sock)->close(); return 0;
}
static int check_closed(_IOPS*, _SD* sock, int flags) {
return ((BoostSocket *)sock)->is_closed(flags);
}
static void iops_dtor(_IOPS *io) {
getIops(io)->unref();
}
static void get_procs(int, lcb_loop_procs *loop, lcb_timer_procs *tm,
lcb_bsd_procs*, lcb_ev_procs*, lcb_completion_procs* iocp,
lcb_iomodel_t *model)
{
*model = LCB_IOMODEL_COMPLETION;
loop->start = run_loop;
loop->stop = stop_loop;
tm->create = create_timer;
tm->destroy = destroy_timer;
tm->cancel = cancel_timer;
tm->schedule = schedule_timer;
iocp->socket = create_socket;
iocp->connect = connect_socket;
iocp->nameinfo = get_nameinfo;
iocp->read2 = read_socket;
iocp->write2 = write_socket;
iocp->close = close_socket;
iocp->is_closed = check_closed;
}
}
// Constructor
BoostIOPS::BoostIOPS(io_service *svc_in)
{
::memset((_IOPS*)this, 0, sizeof(_IOPS));
refcount = 1;
is_stopped = false;
version = 2;
v.v2.get_procs = get_procs;
destructor = iops_dtor;
if (svc_in != NULL) {
svc = svc_in;
} else {
svc = &svc_s;
}
}
// API Call
extern "C" {
lcb_error_t
lcb_create_boost_asio_io_opts(int, _IOPS **io, void *arg)
{
*io = (_IOPS *) new BoostIOPS((io_service *)arg);
return LCB_SUCCESS;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment