From: Liu Yuan <tailai.ly at taobao.com> Kick block event only if there is no nonblock event. We prefer to handle nonblock event because: 1. Sheep assumes that unblock() and notify() are a transaction, so we can only kick next block event after sd_notify_handler() is called 2. We should process leave/join event as soon as possible. - use zk_node for block event Signed-off-by: Liu Yuan <tailai.ly at taobao.com> --- sheep/cluster/zookeeper.c | 106 +++++++++++++++++++++++---------------------- 1 file changed, 54 insertions(+), 52 deletions(-) diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c index fe4093d..336c827 100644 --- a/sheep/cluster/zookeeper.c +++ b/sheep/cluster/zookeeper.c @@ -49,13 +49,13 @@ enum zk_event_type { }; struct zk_node { + struct list_head list; struct rb_node rb; struct sd_node node; + bool callbacked; }; struct zk_event { - struct list_head list; - bool callbacked; enum zk_event_type type; struct zk_node sender; enum cluster_join_result join_result; @@ -66,7 +66,7 @@ struct zk_event { static struct sd_node sd_nodes[SD_MAX_NODES]; static size_t nr_sd_nodes; struct rb_root zk_node_root = RB_ROOT; -static LIST_HEAD(zk_block_event_list); +static LIST_HEAD(zk_block_list); static bool joined; static struct zk_node *zk_tree_insert(struct zk_node *new) @@ -214,9 +214,9 @@ static bool zk_queue_peek(void) rc = zk_node_exists(path); if (rc == ZOK) - return false; + return true; - return true; + return false; } static void zk_queue_push(struct zk_event *ev) @@ -264,29 +264,17 @@ static int zk_queue_push_back(struct zk_event *ev) return 0; } -static int zk_queue_pop(struct zk_event *ev) +static void zk_queue_pop_advance(struct zk_event *ev) { int len; char path[256]; - if (zk_queue_peek()) - return -1; - len = sizeof(*ev); sprintf(path, QUEUE_ZNODE "/%010"PRId32, queue_pos); assert(zk_get_data(path, ev, &len) == ZOK); dprintf("%s, type:%d, len:%d, pos:%"PRId32"\n", path, ev->type, len, queue_pos); - - /* watch next */ queue_pos++; -
sprintf(path, QUEUE_ZNODE "/%010"PRId32, queue_pos); - /* If not joined, we must wait for join response. No kick. */ - if (zk_node_exists(path) == ZOK && joined) - /* Someone has created this node, go kick event handler */ - eventfd_write(efd, 1); - - return 0; } static int zk_member_empty(void) @@ -550,32 +538,27 @@ static void zk_handle_join_response(struct zk_event *ev) ev->join_result, ev->buf); } -/* When block event is deleted from list, we should call this function */ -static void kick_next_block_event(void) +static void kick_block_event(void) { - struct zk_event *zke; + struct zk_node *block; - if (list_empty(&zk_block_event_list)) + if (list_empty(&zk_block_list)) return; - zke = list_first_entry(&zk_block_event_list, typeof(*zke), list); - if (!zke->callbacked) - zke->callbacked = sd_block_handler(&zke->sender.node); + block = list_first_entry(&zk_block_list, typeof(*block), list); + if (!block->callbacked) + block->callbacked = sd_block_handler(&block->node); } -static bool block_event_list_del(struct zk_node *n) +static void block_event_list_del(struct zk_node *n) { - struct zk_event *ev, *t; - bool ret = false; + struct zk_node *ev, *t; - list_for_each_entry_safe(ev, t, &zk_block_event_list, list) { - if (node_eq(&ev->sender.node, &n->node)) { + list_for_each_entry_safe(ev, t, &zk_block_list, list) { + if (node_eq(&ev->node, &n->node)) { list_del(&ev->list); free(ev); - ret = true; } } - - return ret; } static void zk_handle_leave(struct zk_event *ev) @@ -587,8 +570,7 @@ static void zk_handle_leave(struct zk_event *ev) node_to_str(&ev->sender.node)); return; } - if (block_event_list_del(n)) - kick_next_block_event(); + block_event_list_del(n); zk_tree_del(n); build_node_list(); sd_leave_handler(&ev->sender.node, sd_nodes, nr_sd_nodes); @@ -596,29 +578,30 @@ static void zk_handle_leave(struct zk_event *ev) static void zk_handle_block(struct zk_event *ev) { - struct zk_event *zke = xzalloc(sizeof(*zke)); + struct zk_node *block = xmalloc(sizeof(*block)); 
dprintf("BLOCK\n"); - *zke = *ev; - zke->callbacked = false; - list_add_tail(&zke->list, &zk_block_event_list); - zke = list_first_entry(&zk_block_event_list, typeof(*zke), list); - if (!zke->callbacked) - zke->callbacked = sd_block_handler(&zke->sender.node); + block->node = ev->sender.node; + block->callbacked = false; + list_add_tail(&block->list, &zk_block_list); + block = list_first_entry(&zk_block_list, typeof(*block), list); + if (!block->callbacked) + block->callbacked = sd_block_handler(&block->node); } static void zk_handle_unblock(struct zk_event *ev) { - struct zk_event *zke; + struct zk_node *block; dprintf("UNBLOCK\n"); - zke = list_first_entry(&zk_block_event_list, typeof(*zke), list); - if (zke->callbacked) - add_event(EVENT_NOTIFY, &zke->sender, ev->buf, ev->buf_len); + if (list_empty(&zk_block_list)) + return; + block = list_first_entry(&zk_block_list, typeof(*block), list); + if (block->callbacked) + add_event(EVENT_NOTIFY, block, ev->buf, ev->buf_len); - list_del(&zke->list); - free(zke); - kick_next_block_event(); + list_del(&block->list); + free(block); } static void zk_handle_notify(struct zk_event *ev) @@ -643,6 +626,7 @@ static void zk_event_handler(int listen_fd, int events, void *data) eventfd_t value; struct zk_event ev; + dprintf("%d, %d\n", events, queue_pos); if (events & EPOLLHUP) { eprintf("zookeeper driver received EPOLLHUP event, exiting.\n"); log_close(); @@ -654,13 +638,31 @@ static void zk_event_handler(int listen_fd, int events, void *data) return; } - if (zk_queue_pop(&ev) < 0) - return; + if (!zk_queue_peek()) + goto kick_block_event; + zk_queue_pop_advance(&ev); if (ev.type < zk_max_event_handlers && zk_event_handlers[ev.type]) zk_event_handlers[ev.type](&ev); else - eprintf("unhandled type %d\n", ev.type); + panic("unhandled type %d\n", ev.type); + + /* Someone has created next event, go kick event handler. 
*/ + if (zk_queue_peek() && joined) { + eventfd_write(efd, 1); + return; + } + +kick_block_event: + /* + * Kick block event only if there is no nonblock event. We prefer to + * handle nonblock event because: + * + * 1. Sheep assumes that unblock() and notify() are a transaction, so we + * can only kick next block event after sd_notify_handler() is called + * 2. We should process leave/join event as soon as possible. + */ + kick_block_event(); } static int zk_init(const char *option) -- 1.7.9.5 |