/* Gist mnunberg/9233135 (2014-02-26): example consumer of the readbuf.h API */
#include <sys/socket.h>
#include <sys/uio.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include "readbuf.h"
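/*
 * NOTE: the slist_* helpers below are not part of the original gist; the code
 * assumes an intrusive singly-linked list provided by a separate header. This
 * is a minimal sketch of what those helpers might look like, with names and
 * semantics inferred purely from how they are used further down.
 */
typedef struct slist_node_st {
    struct slist_node_st *next;
} slist_node;

typedef struct {
    slist_node *first; /** First node, or NULL if the list is empty */
    slist_node *last;  /** Last node, kept for O(1) append */
} slist_root;

typedef struct {
    slist_node *prev; /** Node before 'cur' (needed for unlinking) */
    slist_node *cur;  /** Node currently being visited */
    slist_node *next; /** Saved successor, so 'cur' may be removed safely */
} slist_iterator;

#define SLIST_IS_EMPTY(list) ((list)->first == NULL)

/** container_of-style accessor: recover the enclosing struct from its node */
#define SLIST_ITEM(node, T, member) \
    ((T *)(void *)((char *)(node) - offsetof(T, member)))

#define SLIST_ITERFOR(iter, list) \
    for (slist_iter_init(list, iter); (iter)->cur; slist_iter_incr(iter))

static void slist_iter_init(const slist_root *list, slist_iterator *iter)
{
    iter->prev = NULL;
    iter->cur = list->first;
    iter->next = iter->cur ? iter->cur->next : NULL;
}

static void slist_iter_incr(slist_iterator *iter)
{
    iter->prev = iter->cur;
    iter->cur = iter->next;
    iter->next = iter->cur ? iter->cur->next : NULL;
}

static void slist_append(slist_root *list, slist_node *node)
{
    node->next = NULL;
    if (list->last) {
        list->last->next = node;
    } else {
        list->first = node;
    }
    list->last = node;
}

/** Unlink the node the iterator points at; iteration continues afterwards */
static void slist_iter_delete(slist_root *list, slist_iterator *iter)
{
    if (iter->prev) {
        iter->prev->next = iter->next;
    } else {
        list->first = iter->next;
    }
    if (list->last == iter->cur) {
        list->last = iter->prev;
    }
    /* Step back so the next increment advances to the saved successor */
    iter->cur = iter->prev;
}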
struct my_bufinfo {
    slist_node slnode;    /** Node in send queue */
    char *buf;            /** Buffer for current segment */
    size_t nbuf;          /** Size of buffer */
    size_t offset;        /** Offset of buffer */
    rdb_SEGMENT *segment; /** Segment for buffer */
};

struct my_client {
    int outfd;        /** Downstream socket */
    slist_root sendq; /** Queue of buffer fragments to send */
};
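
/*
 * Forward declarations so the example compiles in one pass.
 * schedule_async_write() is assumed to be application-provided: it should arm
 * a write-ready watcher in the event loop which eventually invokes
 * async_write(). A trivial stand-in is sketched at the end of this file.
 */
static void async_write(struct my_client *ctx);
static void write_complete(struct my_client *ctx, size_t nw);
static void schedule_async_write(struct my_client *client);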
/**
 * This callback is invoked for each chunk of data received from the network
 */
static void
packet_read_callback(void *cookie,
                     const char *buf,
                     size_t ncur,
                     size_t nremain,
                     size_t ntotal,
                     rdb_SEGMENT *seg)
{
    struct my_client *client = cookie;
    struct my_bufinfo *bi, *last_bi;
    ssize_t nw;
    size_t offset = 0;
    /**
     * If the send queue is empty there is nothing pending ahead of this
     * chunk, so we can try to send the data immediately.
     */
    if (SLIST_IS_EMPTY(&client->sendq)) {
        /** Try to send immediately */
        nw = send(client->outfd, buf, ncur, 0);
        assert(nw != -1);
        offset = (size_t)nw;
        if (offset == ncur) {
            /** If the data is fully flushed we don't obtain a refcount */
            return;
        }
        last_bi = NULL;
    } else {
        /**
         * Use the last item in our own send queue and see if we can
         * collapse the buffers
         */
        last_bi = SLIST_ITEM(client->sendq.last, struct my_bufinfo, slnode);
    }
    /**
     * We _could_ make an rdb_collapse_segment, but that would involve forcing
     * the user to pass a pointer-to-integer and thus forcing a specific
     * integer width, rather than using coercion on simple variables.
     *
     * We only need to actually maintain a separate region/IOV if the current
     * segment begins on a different offset than the previous segment.
     *
     * The user _may_ further optimize by buffering this in a different packet
     * structure and then reordering this within the primary send queue (or,
     * just set a flag to receive the full packet).
     */
    if (last_bi && last_bi->segment == seg) {
        last_bi->nbuf += ncur;
    } else {
        /** New buffer region. This should not happen very often */
        bi = calloc(1, sizeof *bi);
        assert(bi != NULL);
        bi->buf = (char *)buf + offset;
        bi->nbuf = ncur - offset;
        bi->segment = seg;
        slist_append(&client->sendq, &bi->slnode);
        rdb_ref(seg);
    }

    schedule_async_write(client);
}
static void
full_packet_callback(void *cookie,
                     rdb_SEGMENT *segments,
                     struct lcb_iovec_st *iov,
                     int nitems)
{
    /**
     * The same queuing approach can be applied here when a complete packet is
     * delivered at once; left unimplemented in this example.
     */
    (void)cookie; (void)segments; (void)iov; (void)nitems;
}
static void
async_write(struct my_client *ctx)
{
    struct iovec iov[10];
    struct msghdr mh;
    slist_iterator iter;
    int ii = 0;
    ssize_t nw;

    /**
     * Iterate through the items in the send queue until either no items
     * remain or the IOV limit has been reached.
     * Note that if efficiency of iteration is a concern, the user can just
     * perform a normal send on the first segment, etc.
     */
    SLIST_ITERFOR(&iter, &ctx->sendq) {
        struct my_bufinfo *curbuf = SLIST_ITEM(iter.cur, struct my_bufinfo, slnode);
        iov[ii].iov_base = curbuf->buf;
        iov[ii].iov_len = curbuf->nbuf;
        if (++ii == 10) {
            break;
        }
    }

    memset(&mh, 0, sizeof mh);
    mh.msg_iov = iov;
    mh.msg_iovlen = ii;

    nw = sendmsg(ctx->outfd, &mh, 0);
    assert(nw != -1);
    write_complete(ctx, (size_t)nw);
}
static void
write_complete(struct my_client *ctx, size_t nw)
{
    /**
     * Once a specific amount of data has been flushed we can start releasing
     * some buffers back to libcouchbase (or to the allocator) again.
     */
    slist_iterator iter;
    SLIST_ITERFOR(&iter, &ctx->sendq) {
        struct my_bufinfo *curbuf = SLIST_ITEM(iter.cur, struct my_bufinfo, slnode);
        size_t to_chop = nw < curbuf->nbuf ? nw : curbuf->nbuf;

        curbuf->nbuf -= to_chop;
        curbuf->buf += to_chop;
        if (curbuf->nbuf == 0) {
            rdb_unref(curbuf->segment);
            slist_iter_delete(&ctx->sendq, &iter);
            free(curbuf);
        }
        nw -= to_chop;
        if (!nw) {
            break;
        }
    }
}
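
/*
 * Stand-in for schedule_async_write(), included only so this sketch is
 * self-contained: it flushes synchronously. A real application would instead
 * arm a write-ready event in its loop and call async_write() once the socket
 * is actually writable.
 */
static void
schedule_async_write(struct my_client *client)
{
    async_write(client);
}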