1. Change the pthread locking in zk_dispatch/zk_block so that it does not block IO requests between sheep. 2. If the node pointed to by queue_pos was sent by a leaver, and it has blocked the whole cluster, we should ignore it. Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com> --- sheep/cluster/zookeeper.c | 66 +++++++++++++++++++++++++++++++------------- 1 files changed, 46 insertions(+), 20 deletions(-) diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c index 16b50e3..527c6e4 100644 --- a/sheep/cluster/zookeeper.c +++ b/sheep/cluster/zookeeper.c @@ -91,12 +91,13 @@ struct zk_event { /* leave event list */ static LIST_HEAD(zk_levent_list); +static int zk_notify_blocked; static struct zk_node zk_nodes[SD_MAX_NODES]; static size_t nr_zk_nodes; static char *zk_option; -/* protect queue_start_pos */ -static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER; +/* protect zk_notify_blocked */ +static pthread_mutex_t blocked_lock = PTHREAD_MUTEX_INITIALIZER; /* protect leave event list */ static pthread_mutex_t leave_lock = PTHREAD_MUTEX_INITIALIZER; @@ -291,8 +292,29 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev) list_del(&lev->list); pthread_mutex_unlock(&leave_lock); + /* if the node pointed to by queue_pos was send by this leaver, + * and it have blocked whole cluster, we should ignore it. 
*/ + len = sizeof(*ev); + sprintf(path, QUEUE_ZNODE "/%010d", queue_pos); + rc = zk_get(zh, path, 1, (char *)ev, &len, NULL); + if (rc == ZOK && node_cmp(&ev->sender.node, &lev->sender.node) == 0 && ev->blocked) { + dprintf("this queue_pos:%d have blocked whole cluster, ignore it\n", queue_pos); + queue_pos++; + + /* watch next data */ + sprintf(path, QUEUE_ZNODE "/%010d", queue_pos); + rc = zk_exists(zh, path, 1, NULL); + dprintf("watch path:%s, exists:%d\n", path, (rc==ZOK)); + if (rc == ZOK) { + /* we lost this message, manual notify */ + dprintf("write event to efd:%d\n", efd); + eventfd_write(efd, value); + } + } + memcpy(ev, lev, sizeof(*ev)); free(lev); + return 0; } @@ -753,14 +775,7 @@ static int zk_join(struct sd_node *myself, static int zk_leave(void) { - int rc; - - /* protect zk_levent_list */ - pthread_mutex_lock(&queue_lock); - rc = add_event(zhandle, EVENT_LEAVE, &this_node, NULL, 0, NULL); - pthread_mutex_unlock(&queue_lock); - - return rc; + return add_event(zhandle, EVENT_LEAVE, &this_node, NULL, 0, NULL); } static int zk_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg)) @@ -773,9 +788,6 @@ static void zk_block(struct work *work) int rc; struct zk_event ev; - /* get lock only after zk_dispatch finished */ - pthread_mutex_lock(&queue_lock); - rc = zk_queue_pop(zhandle, &ev); assert(rc == 0); @@ -784,7 +796,11 @@ static void zk_block(struct work *work) zk_queue_push_back(zhandle, &ev); - pthread_mutex_unlock(&queue_lock); + pthread_mutex_lock(&blocked_lock); + dprintf("blocked_lock\n"); + zk_notify_blocked = 0; + pthread_mutex_unlock(&blocked_lock); + dprintf("blocked_unlock\n"); } static void zk_block_done(struct work *work) @@ -810,8 +826,15 @@ static int zk_dispatch(void) if (ret < 0) return 0; - /* protect zk_levent_list/nr_zk_nodes and prevent zk_block working */ - pthread_mutex_lock(&queue_lock); + pthread_mutex_lock(&blocked_lock); + dprintf("blocked_lock\n"); + if (zk_notify_blocked) { + pthread_mutex_unlock(&blocked_lock); + 
dprintf("blocked_unlock, return.\n"); + return 0; + } + pthread_mutex_unlock(&blocked_lock); + dprintf("blocked_unlock, process...\n"); ret = zk_queue_pop(zhandle, &ev); if (ret < 0) @@ -819,6 +842,7 @@ static int zk_dispatch(void) switch (ev.type) { case EVENT_JOIN: + dprintf("JOIN EVENT, blocked:%d\n", ev.blocked); if (ev.blocked) { dprintf("one sheep joined[up], nr_nodes:%ld, sender:%s, joined:%d\n", nr_zk_nodes, node_to_str(&ev.sender.node), ev.sender.joined); @@ -879,6 +903,7 @@ static int zk_dispatch(void) ev.join_result, ev.buf); break; case EVENT_LEAVE: + dprintf("LEAVE EVENT, blocked:%d\n", ev.blocked); /*reset master if necessary */ dprintf("find node:%s\n", node_to_str(&ev.sender.node)); n = find_node(zk_nodes, nr_zk_nodes, &ev.sender); @@ -901,13 +926,15 @@ static int zk_dispatch(void) zk_hdlrs.leave_handler(&ev.sender.node, entries, nr_zk_nodes); break; case EVENT_NOTIFY: + dprintf("NOTIFY, blocked:%d\n", ev.blocked); if (ev.blocked) { - if (node_cmp(&ev.sender, &this_node) == 0 && !ev.callbacked) { - queue_work(zk_block_wq, &work); - + if (node_cmp(&ev.sender.node, &this_node.node) == 0 && !ev.callbacked) { ev.callbacked = 1; + zk_notify_blocked = 1; zk_queue_push_back(zhandle, &ev); + + queue_work(zk_block_wq, &work); } else zk_queue_push_back(zhandle, NULL); @@ -920,7 +947,6 @@ static int zk_dispatch(void) break; } out: - pthread_mutex_unlock(&queue_lock); return 0; } -- 1.7.7.6 |