From: Yunkai Zhang <qiushu.zyk at taobao.com>

1. Change the pthread lock in zk_dispatch/zk_block so that it does not
   block IO requests between sheep.
2. If the node pointed to by queue_pos was sent by a leaver and has
   blocked the whole cluster, we should ignore it.

Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com>
---
 sheep/cluster/zookeeper.c |   46 +++++++++++++++++++++++++--------------------
 1 files changed, 26 insertions(+), 20 deletions(-)

diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 8daddfc..2d7dd96 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -91,11 +91,12 @@ struct zk_event {
 /* leave event list */
 static LIST_HEAD(zk_levent_list);
+static int zk_notify_blocked;
 
 static struct zk_node zk_nodes[SD_MAX_NODES];
 static size_t nr_zk_nodes;
 
-/* protect queue_start_pos */
-static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
+/* protect zk_notify_blocked */
+static pthread_mutex_t blocked_lock = PTHREAD_MUTEX_INITIALIZER;
 
 /* protect leave event list */
 static pthread_mutex_t leave_lock = PTHREAD_MUTEX_INITIALIZER;
@@ -298,6 +299,7 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 
 		memcpy(ev, lev, sizeof(*ev));
 		free(lev);
+
 		return 0;
 	}
 
@@ -729,14 +731,7 @@ static int zk_join(struct sd_node *myself,
 
 static int zk_leave(void)
 {
-	int rc;
-
-	/* protect zk_levent_list */
-	pthread_mutex_lock(&queue_lock);
-	rc = add_event(zhandle, EVENT_LEAVE, &this_node, NULL, 0, NULL);
-	pthread_mutex_unlock(&queue_lock);
-
-	return rc;
+	return add_event(zhandle, EVENT_LEAVE, &this_node, NULL, 0, NULL);
 }
 
 static int zk_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg))
@@ -749,9 +744,6 @@ static void zk_block(struct work *work)
 	int rc;
 	struct zk_event ev;
 
-	/* get lock only after zk_dispatch finished */
-	pthread_mutex_lock(&queue_lock);
-
 	rc = zk_queue_pop(zhandle, &ev);
 	assert(rc == 0);
 
@@ -760,7 +752,11 @@
 
 	zk_queue_push_back(zhandle, &ev);
 
-	pthread_mutex_unlock(&queue_lock);
+	pthread_mutex_lock(&blocked_lock);
+	dprintf("blocked_lock\n");
+	zk_notify_blocked = 0;
+	pthread_mutex_unlock(&blocked_lock);
+	dprintf("blocked_unlock\n");
 }
 
 static void zk_block_done(struct work *work)
@@ -786,8 +782,15 @@ static int zk_dispatch(void)
 	if (ret < 0)
 		return 0;
 
-	/* protect zk_levent_list/nr_zk_nodes and prevent zk_block working */
-	pthread_mutex_lock(&queue_lock);
+	pthread_mutex_lock(&blocked_lock);
+	dprintf("blocked_lock\n");
+	if (zk_notify_blocked) {
+		pthread_mutex_unlock(&blocked_lock);
+		dprintf("blocked_unlock, return.\n");
+		return 0;
+	}
+	pthread_mutex_unlock(&blocked_lock);
+	dprintf("blocked_unlock, process...\n");
 
 	ret = zk_queue_pop(zhandle, &ev);
 	if (ret < 0)
@@ -795,6 +798,7 @@
 
 	switch (ev.type) {
 	case EVENT_JOIN:
+		dprintf("JOIN EVENT, blocked:%d\n", ev.blocked);
 		if (ev.blocked) {
 			dprintf("one sheep joined[up], nr_nodes:%ld, sender:%s, joined:%d\n",
 				nr_zk_nodes, node_to_str(&ev.sender.node), ev.sender.joined);
@@ -855,6 +859,7 @@
 			ev.join_result, ev.buf);
 		break;
 	case EVENT_LEAVE:
+		dprintf("LEAVE EVENT, blocked:%d\n", ev.blocked);
 		/*reset master if necessary */
 		dprintf("find node:%s\n", node_to_str(&ev.sender.node));
 		n = find_node(zk_nodes, nr_zk_nodes, &ev.sender);
@@ -877,13 +882,15 @@
 
 		sd_leave_handler(&ev.sender.node, entries, nr_zk_nodes);
 		break;
 	case EVENT_NOTIFY:
+		dprintf("NOTIFY, blocked:%d\n", ev.blocked);
 		if (ev.blocked) {
-			if (node_cmp(&ev.sender, &this_node) == 0 && !ev.callbacked) {
-				queue_work(zk_block_wq, &work);
-
+			if (node_cmp(&ev.sender.node, &this_node.node) == 0 && !ev.callbacked) {
 				ev.callbacked = 1;
+				zk_notify_blocked = 1;
 				zk_queue_push_back(zhandle, &ev);
+
+				queue_work(zk_block_wq, &work);
 			} else
 				zk_queue_push_back(zhandle, NULL);
 
@@ -896,7 +903,6 @@
 
 		break;
 	}
 out:
-	pthread_mutex_unlock(&queue_lock);
 	return 0;
 }
-- 
1.7.7.6
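
A note on the pattern (not part of the patch): the sketch below is a
minimal, self-contained illustration of the locking scheme this change
moves to, with made-up names. A mutex guards only the small flag (the
zk_notify_blocked counterpart), the dispatcher returns early while the
flag is set, and the worker clears it once the block callback has run.
Build with: cc -pthread sketch.c

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static int notify_blocked;	/* plays the role of zk_notify_blocked */
static pthread_mutex_t blocked_lock = PTHREAD_MUTEX_INITIALIZER;

/* worker thread, standing in for zk_block(): run the (simulated)
 * block callback, then clear the flag so the dispatcher resumes */
static void *block_worker(void *arg)
{
	(void)arg;
	usleep(100 * 1000);			/* pretend callback work */

	pthread_mutex_lock(&blocked_lock);
	notify_blocked = 0;
	pthread_mutex_unlock(&blocked_lock);
	return NULL;
}

/* standing in for zk_dispatch(): the lock is held only long enough
 * to test the flag, never across event processing */
static void dispatch(void)
{
	pthread_mutex_lock(&blocked_lock);
	if (notify_blocked) {
		pthread_mutex_unlock(&blocked_lock);
		return;				/* early out, queue untouched */
	}
	pthread_mutex_unlock(&blocked_lock);

	printf("processing next event\n");
}

int main(void)
{
	pthread_t t;
	int blocked = 1;

	notify_blocked = 1;	/* as when our own blocked NOTIFY is popped */
	pthread_create(&t, NULL, block_worker, NULL);

	while (blocked) {
		dispatch();	/* returns at once; IO is not stalled */
		pthread_mutex_lock(&blocked_lock);
		blocked = notify_blocked;
		pthread_mutex_unlock(&blocked_lock);
		usleep(10 * 1000);
	}
	pthread_join(t, NULL);

	dispatch();		/* flag cleared, events flow again */
	return 0;
}

Because blocked_lock is held only for the flag check and update, the
dispatcher returns to serving other requests immediately, instead of
sleeping on a big queue_lock for as long as the callback runs.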