Created
March 8, 2017 10:25
-
-
Save tmthrgd/9e2cea5ed61c939b4940f8062dc40193 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/ngx_http_ether_module.c b/ngx_http_ether_module.c | |
index c065b40..82fa67a 100644 | |
--- a/ngx_http_ether_module.c | |
+++ b/ngx_http_ether_module.c | |
@@ -106,6 +106,9 @@ typedef struct { | |
ngx_buf_t tmp_recv; | |
+ ngx_event_t get_ev; | |
+ ngx_queue_t sub_gets; | |
+ | |
ngx_queue_t queue; | |
} ngx_ether_memc_server_st; | |
@@ -127,6 +130,10 @@ typedef struct _ngx_ether_memc_op_st { | |
ngx_buf_t send; | |
ngx_buf_t recv; | |
+ int sub_get_noent; | |
+ ngx_queue_t sub_gets; | |
+ ngx_queue_t get_queue; | |
+ | |
ngx_queue_t recv_queue; | |
ngx_queue_t send_queue; | |
} ngx_ether_memc_op_st; | |
@@ -261,8 +268,12 @@ static ngx_uint_t ngx_ether_find_chash_point(ngx_uint_t npoints, | |
static void ngx_ether_memc_read_handler(ngx_event_t *rev); | |
static void ngx_ether_memc_write_handler(ngx_event_t *wev); | |
+static void ngx_ether_multiget_cleanup_handler(void *data); | |
+static void ngx_ether_multiget_timeout_handler(ngx_event_t *ev); | |
+ | |
static void ngx_ether_memc_default_op_handler(ngx_ether_memc_op_st *op, void *data); | |
static void ngx_ether_memc_event_op_handler(ngx_ether_memc_op_st *op, void *data); | |
+static void ngx_ether_memc_multiget_op_handler(ngx_ether_memc_op_st *op, void *data); | |
static ngx_ether_memc_op_st *ngx_ether_memc_start_operation(const ngx_ether_peer_st *peer, | |
protocol_binary_command cmd, const ngx_keyval_t *kv, void *data); | |
@@ -2057,6 +2068,7 @@ static ngx_int_t ngx_ether_handle_member_resp_body(ngx_connection_t *c, ngx_ethe | |
ngx_connection_t *sc = NULL; | |
ngx_peer_connection_t pc; | |
ngx_ether_memc_op_st *op; | |
+ ngx_pool_cleanup_t *cln; | |
#if NGX_DEBUG | |
ngx_keyval_t kv = {ngx_null_string, ngx_null_string}; | |
#endif /* NGX_DEBUG */ | |
@@ -2276,6 +2288,10 @@ static ngx_int_t ngx_ether_handle_member_resp_body(ngx_connection_t *c, ngx_ethe | |
q = prev_q; | |
} | |
+ if (server->get_ev.timer_set) { | |
+ ngx_del_timer(&server->get_ev); | |
+ } | |
+ | |
if (server->tmp_recv.start) { | |
ngx_pfree(c->pool, server->tmp_recv.start); | |
} | |
@@ -2449,6 +2465,20 @@ static ngx_int_t ngx_ether_handle_member_resp_body(ngx_connection_t *c, ngx_ethe | |
continue; | |
} | |
+ server->get_ev.handler = ngx_ether_multiget_timeout_handler; | |
+ server->get_ev.data = server; | |
+ server->get_ev.log = c->log; | |
+ | |
+ ngx_queue_init(&server->sub_gets); | |
+ | |
+ cln = ngx_pool_cleanup_add(c->pool, 0); | |
+ if (!cln) { | |
+ goto error; | |
+ } | |
+ | |
+ cln->handler = ngx_ether_multiget_cleanup_handler; | |
+ cln->data = &server->get_ev; | |
+ | |
ngx_queue_init(&server->recv_ops); | |
ngx_queue_init(&server->send_ops); | |
@@ -3063,6 +3093,42 @@ static void ngx_ether_memc_write_handler(ngx_event_t *wev) | |
} | |
} | |
+/* | |
+ * Pool cleanup callback for the multiget timer. | |
+ * | |
+ * `data` is the ngx_event_t that was registered with the cleanup entry | |
+ * (may be NULL). If that event still has a timer armed, the timer is | |
+ * removed; otherwise nothing happens. | |
+ */ | |
+static void ngx_ether_multiget_cleanup_handler(void *data) | |
+{ | |
+    ngx_event_t *timer = data; | |
+ | |
+    /* nothing to cancel when there is no event or no armed timer */ | |
+    if (!timer || !timer->timer_set) { | |
+        return; | |
+    } | |
+ | |
+    ngx_del_timer(timer); | |
+} | |
+ | |
+/* | |
+ * Timer handler for a server's multiget event. | |
+ * | |
+ * Starts a GET for the ":ether-multiget-sentinal" key on the server stored | |
+ * in ev->data, installs ngx_ether_memc_multiget_op_handler as that op's | |
+ * handler, and hands the server's pending sub_gets list over to the new op, | |
+ * leaving the server's list empty for the next batch. | |
+ * | |
+ * If the sentinel operation cannot be started, an error is logged and the | |
+ * server's sub_gets list is left untouched. | |
+ */ | |
+static void ngx_ether_multiget_timeout_handler(ngx_event_t *ev) | |
+{ | |
+    ngx_ether_memc_server_st *server; | |
+    ngx_keyval_t kv; | |
+    ngx_ether_memc_op_st *op; | |
+ | |
+    server = ev->data; | |
+ | |
+    ngx_str_set(&kv.key, ":ether-multiget-sentinal"); | |
+    ngx_str_null(&kv.value); | |
+ | |
+    op = ngx_ether_memc_start_operation(server->peer, PROTOCOL_BINARY_CMD_GET, &kv, server); | |
+    if (!op) { | |
+        ngx_log_error(NGX_LOG_ERR, server->peer->log, 0, | |
+            "sending memc multiget sentinal failed"); | |
+        return; | |
+    } | |
+ | |
+    op->handler = ngx_ether_memc_multiget_op_handler; | |
+ | |
+    /* | |
+     * Move every pending get from the server onto the sentinel op. | |
+     * ngx_queue_add() splices correctly even when the source list is | |
+     * empty; a raw struct copy followed by pointer patching would leave | |
+     * op->sub_gets pointing at the server's sentinel in that case. | |
+     */ | |
+    ngx_queue_init(&op->sub_gets); | |
+    ngx_queue_add(&op->sub_gets, &server->sub_gets); | |
+ | |
+    /* the source sentinel is stale after the splice; reset it */ | |
+    ngx_queue_init(&server->sub_gets); | |
+} | |
+ | |
static void ngx_ether_memc_default_op_handler(ngx_ether_memc_op_st *op, void *data) | |
{ | |
if (ngx_ether_memc_complete_operation(op, NULL, NULL) != NGX_AGAIN) { | |
@@ -3077,6 +3143,39 @@ static void ngx_ether_memc_event_op_handler(ngx_ether_memc_op_st *op, void *data | |
ngx_post_event(ev, &ngx_posted_events); | |
} | |
+/* | |
+ * Handler installed on the multiget sentinel op. | |
+ * | |
+ * Walks op->sub_gets. Each listed get that is still linked in the server's | |
+ * recv_ops queue is unlinked from recv_ops, flagged with sub_get_noent, and | |
+ * has its own handler invoked. Gets that are no longer in recv_ops are | |
+ * skipped. | |
+ * | |
+ * `data` is unused. | |
+ */ | |
+static void ngx_ether_memc_multiget_op_handler(ngx_ether_memc_op_st *op, void *data) | |
+{ | |
+    ngx_queue_t *q1, *q2, *next; | |
+    ngx_ether_memc_op_st *get_op, *recv_op; | |
+ | |
+    for (q1 = ngx_queue_head(&op->sub_gets); | |
+        q1 != ngx_queue_sentinel(&op->sub_gets); | |
+        q1 = next) { | |
+        /* | |
+         * Fetch the successor before running any callback: the get's | |
+         * handler may complete and release the op that embeds q1, so | |
+         * q1 must not be dereferenced afterwards. | |
+         */ | |
+        next = ngx_queue_next(q1); | |
+ | |
+        get_op = ngx_queue_data(q1, ngx_ether_memc_op_st, get_queue); | |
+ | |
+        /* look for this get among the server's recv_ops */ | |
+        recv_op = NULL; | |
+        for (q2 = ngx_queue_head(&op->server->recv_ops); | |
+            q2 != ngx_queue_sentinel(&op->server->recv_ops); | |
+            q2 = ngx_queue_next(q2)) { | |
+            recv_op = ngx_queue_data(q2, ngx_ether_memc_op_st, recv_queue); | |
+ | |
+            if (get_op == recv_op) { | |
+                break; | |
+            } | |
+        } | |
+ | |
+        /* not found (or recv_ops empty): nothing to do for this get */ | |
+        if (get_op != recv_op) { | |
+            continue; | |
+        } | |
+ | |
+        ngx_queue_remove(&recv_op->recv_queue); | |
+ | |
+        /* makes ngx_ether_memc_complete_operation report KEY_ENOENT */ | |
+        recv_op->sub_get_noent = 1; | |
+ | |
+        recv_op->handler(recv_op, recv_op->handler_data); | |
+    } | |
+} | |
+ | |
static ngx_ether_memc_op_st *ngx_ether_memc_start_operation(const ngx_ether_peer_st *peer, | |
protocol_binary_command cmd, const ngx_keyval_t *kv, void *in_data) | |
{ | |
@@ -3091,7 +3190,7 @@ static ngx_ether_memc_op_st *ngx_ether_memc_start_operation(const ngx_ether_peer | |
uint32_t id1; | |
ngx_ether_memc_server_st *server; | |
uint32_t hash; | |
- int is_quiet = 0; | |
+ int is_quiet = 0, multiget_sentinal = 0; | |
protocol_binary_request_header *req_hdr; | |
protocol_binary_request_set *reqs; | |
const protocol_binary_request_no_extras *in_req = in_data; | |
@@ -3104,6 +3203,12 @@ static ngx_ether_memc_op_st *ngx_ether_memc_start_operation(const ngx_ether_peer | |
server = in_data; | |
in_req = NULL; | |
+ } else if (cmd == PROTOCOL_BINARY_CMD_GET && ngx_strncmp(kv->key.data, ":ether-multiget-sentinal", kv->key.len) == 0) { | |
+ multiget_sentinal = 1; | |
+ | |
+ server = in_data; | |
+ | |
+ in_req = NULL; | |
} else { | |
if (!peer->memc.npoints) { | |
return NULL; | |
@@ -3203,7 +3308,13 @@ static ngx_ether_memc_op_st *ngx_ether_memc_start_operation(const ngx_ether_peer | |
p += ext_len; | |
req_hdr->request.magic = PROTOCOL_BINARY_REQ; | |
- req_hdr->request.opcode = cmd; | |
+ | |
+ if (cmd == PROTOCOL_BINARY_CMD_GET && !multiget_sentinal) { | |
+ req_hdr->request.opcode = PROTOCOL_BINARY_CMD_GETQ; | |
+ } else { | |
+ req_hdr->request.opcode = cmd; | |
+ } | |
+ | |
req_hdr->request.keylen = htons(kv->key.len); | |
req_hdr->request.extlen = ext_len; | |
req_hdr->request.datatype = PROTOCOL_BINARY_RAW_BYTES; | |
@@ -3255,6 +3366,14 @@ static ngx_ether_memc_op_st *ngx_ether_memc_start_operation(const ngx_ether_peer | |
ngx_queue_insert_tail(&server->send_ops, &op->send_queue); | |
+ if (cmd == PROTOCOL_BINARY_CMD_GET) { | |
+ ngx_queue_insert_tail(&server->sub_gets, &op->get_queue); | |
+ | |
+ if (!server->get_ev.timer_set) { | |
+ ngx_add_timer(&server->get_ev, 100); | |
+ } | |
+ } | |
+ | |
server->c->write->handler(server->c->write); | |
return op; | |
@@ -3282,6 +3401,16 @@ static ngx_int_t ngx_ether_memc_complete_operation(const ngx_ether_memc_op_st *o | |
protocol_binary_response_no_extras *out_res = out_data; | |
protocol_binary_response_get *resg, *out_resg = out_data; | |
+ if (op->sub_get_noent) { | |
+ if (out_res) { | |
+ out_res->message.header.response.status = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT; | |
+ //out_res->message.header.response.cas = 0; | |
+ } | |
+ | |
+ ngx_log_error(NGX_LOG_DEBUG, op->log, 0, "memcached error %hd: ", PROTOCOL_BINARY_RESPONSE_KEY_ENOENT); | |
+ return NGX_ERROR; | |
+ } | |
+ | |
if (!op->recv.start) { | |
/* memc_read_handler has not yet been invoked for this op */ | |
return NGX_AGAIN; | |
@@ -3330,7 +3459,7 @@ static ngx_int_t ngx_ether_memc_complete_operation(const ngx_ether_memc_op_st *o | |
out_res->message.header.response.status = status; | |
out_res->message.header.response.cas = ntohll(res_hdr->response.cas); | |
- if (res_hdr->response.opcode == PROTOCOL_BINARY_CMD_GET) { | |
+ if (res_hdr->response.opcode == PROTOCOL_BINARY_CMD_GETQ) { | |
resg = (protocol_binary_response_get *)res_hdr; | |
out_resg->message.body.flags = ntohl(resg->message.body.flags); | |
} | |
@@ -3346,7 +3475,7 @@ static ngx_int_t ngx_ether_memc_complete_operation(const ngx_ether_memc_op_st *o | |
log_level = NGX_LOG_ERR; | |
- if (res_hdr->response.opcode == PROTOCOL_BINARY_CMD_GET | |
+ if (res_hdr->response.opcode == PROTOCOL_BINARY_CMD_GETQ | |
&& status == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT) { | |
log_level = NGX_LOG_DEBUG; | |
} | |
@@ -3369,6 +3498,11 @@ static void ngx_ether_memc_cleanup_operation(ngx_ether_memc_op_st *op) | |
ngx_queue_remove(&op->send_queue); | |
} | |
+ /*if (ngx_queue_prev(&op->get_queue) | |
+ && ngx_queue_next(ngx_queue_prev(&op->get_queue)) == &op->get_queue) { | |
+ ngx_queue_remove(&op->get_queue); | |
+ }*/ | |
+ | |
if (op->send.start) { | |
ngx_pfree(peer->pool, op->send.start); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment