Created
February 26, 2014 16:33
-
-
Save mnunberg/9233135 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "readbuf.h" | |
struct my_bufinfo { | |
slist_node slnode; /** Node in send queue */ | |
char *buf; /** Buffer for current segment */ | |
size_t nbuf; /** Size of buffer */ | |
size_t offset; /** Offset of buffer */ | |
rdb_SEGMENT *segment; /** Segment for buffer */ | |
}; | |
struct my_client { | |
int outfd; /** Downstream socket */ | |
slist_root sendq; /** Queue of buffer fragments to send */ | |
}; | |
/** | |
* This callback is invoked for each chunk of data received from the network | |
*/ | |
static void | |
packet_read_callback(void *cookie, | |
const char *buf, | |
size_t ncur, | |
size_t nremain, | |
size_t ntotal, | |
rdb_SEGMENT *seg) | |
{ | |
struct my_client *client = cookie; | |
struct my_bufinfo *bi, *last_bi; | |
ssize_t nw; | |
size_t offset = 0; | |
/** | |
* If the send queue is empty then there is nothing in the send queue. | |
* We can immediately start sending data | |
*/ | |
if (SLIST_IS_EMPTY(&client->sendq)) { | |
/** Try to send immediately */ | |
nw = send(client->outfd, buf, ncur, 0); | |
assert(nw != -1); | |
offset = nw; | |
if (nw == ncur) { | |
/** If the data is fully flushed we don't obtain a refcount */ | |
return; | |
} | |
last_bi = NULL; | |
} else { | |
/** | |
* Use the last item in our own send queue and see if we can | |
* collapse the buffers | |
*/ | |
last_bi = SLIST_ITEM(client->sendq.last, my_bufinfo, slnode); | |
} | |
/** | |
* We _could_ make an rdb_collapse_segment but that would involve | |
* forcing the user to pass a pointer-to-integer and thus forcing a | |
* specific integer width, rather than using coercion on simple variables. | |
* | |
* We only need to actually maintain a separate region/IOV if the current | |
* segment begins on a different offset than the previous segment. | |
* | |
* The user _may_ further optimize by buffering this in a different packet | |
* structure and then reordering this within the primary send queue (or, | |
* just set a flag to receive the full packet). | |
*/ | |
if (last_bi && last_bi->segment == seg) { | |
last_bi->nbuf += ncur; | |
} else { | |
/** New buffer region. This should not happen very often */ | |
bi = calloc(1, sizeof *bi); | |
bi->buf = buf + offset; | |
bi->nbuf = ncur - offset; | |
bi->segment = seg; | |
slist_append(&client->sendq, &bi->slnode); | |
rdb_ref(seg); | |
} | |
schedule_async_write(client); | |
} | |
static void | |
full_packet_callback(void *cookie, | |
rdb_SEGMENT *segments, | |
struct lcb_iovec_st *iov, | |
int nitems) | |
{ | |
/** This can be done as well.. */ | |
} | |
static void | |
async_write(struct my_client *ctx) | |
{ | |
struct iovec iov[10]; | |
slist_iterator iter; | |
int ii = 0; | |
/** | |
* Iterate through all the items in the send queue until either no | |
* items remain or the IOV limit has been exceeded. | |
* Note that if efficiency over iteration is a concern the user can | |
* | |
* just perform a normal send on the first segment etc. etc. | |
*/ | |
SLIST_ITERFOR(&iter, &ctx->sendq) { | |
my_bufinfo *curbuf = SLIST_ITEM(iter.cur, my_bufinfo, slnode); | |
iov[ii].iov_base = curbuf->buf; | |
iov[ii].iov_len = curbuf->nbuf; | |
if (++ii == 10) { | |
break; | |
} | |
} | |
struct msghdr mh; | |
mh.msg_iov = iov; | |
mh.msg_iovlen = ii; | |
ssize_t nw = sendmsg(ctx->outfd, &mh, 0); | |
write_complete(ctx, nw); | |
} | |
static void | |
write_complete(struct my_client *ctx, size_t nw) | |
{ | |
/** | |
* Once a specific amount of data has been flushed we can start releasing | |
* some buffers over to libcouchbase (or the allocator) again. | |
*/ | |
slist_iterator iter; | |
SLIST_ITERFOR(&iter, &ctx->sendq) { | |
my_bufinfo *curbuf = SLIST_ITEM(iter.cur, my_bufinfo, slnode); | |
size_t to_chop = min(nw, curbuf->nbuf); | |
curbuf->nbuf -= to_chop; | |
curbuf->buf += to_chop; | |
if (curbuf->nbuf == 0) { | |
rdb_unref(curbuf->segment); | |
slist_iter_delete(&ctx->sendq, &iter); | |
} | |
nw -= to_chop; | |
if (!nw) { | |
break; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment