[Sheepdog] [zookeeper][PATCH v2 04/11] Fix two bugs:

Yunkai Zhang yunkai.me at gmail.com
Thu Apr 26 17:21:23 CEST 2012


From: Yunkai Zhang <qiushu.zyk at taobao.com>

1. Change the pthread locking in zk_dispatch/zk_block so that
it does not block IO requests between sheep.
2. If the node pointed to by queue_pos was sent by a leaver,
and it has blocked the whole cluster, we should ignore it.

Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com>
---
 sheep/cluster/zookeeper.c |   46 +++++++++++++++++++++++++-------------------
 1 files changed, 26 insertions(+), 20 deletions(-)

diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 8daddfc..2d7dd96 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -91,11 +91,12 @@ struct zk_event {
 /* leave event list */
 static LIST_HEAD(zk_levent_list);
 
+static int zk_notify_blocked;
 static struct zk_node zk_nodes[SD_MAX_NODES];
 static size_t nr_zk_nodes;
 
-/* protect queue_start_pos */
-static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
+/* protect zk_notify_blocked */
+static pthread_mutex_t blocked_lock = PTHREAD_MUTEX_INITIALIZER;
 
 /* protect leave event list */
 static pthread_mutex_t leave_lock = PTHREAD_MUTEX_INITIALIZER;
@@ -298,6 +299,7 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 
 		memcpy(ev, lev, sizeof(*ev));
 		free(lev);
+
 		return 0;
 	}
 
@@ -729,14 +731,7 @@ static int zk_join(struct sd_node *myself,
 
 static int zk_leave(void)
 {
-	int rc;
-
-	/* protect zk_levent_list */
-	pthread_mutex_lock(&queue_lock);
-	rc = add_event(zhandle, EVENT_LEAVE, &this_node, NULL, 0, NULL);
-	pthread_mutex_unlock(&queue_lock);
-
-	return rc;
+	return add_event(zhandle, EVENT_LEAVE, &this_node, NULL, 0, NULL);
 }
 
 static int zk_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg))
@@ -749,9 +744,6 @@ static void zk_block(struct work *work)
 	int rc;
 	struct zk_event ev;
 
-	/* get lock only after zk_dispatch finished */
-	pthread_mutex_lock(&queue_lock);
-
 	rc = zk_queue_pop(zhandle, &ev);
 	assert(rc == 0);
 
@@ -760,7 +752,11 @@ static void zk_block(struct work *work)
 
 	zk_queue_push_back(zhandle, &ev);
 
-	pthread_mutex_unlock(&queue_lock);
+	pthread_mutex_lock(&blocked_lock);
+	dprintf("blocked_lock\n");
+	zk_notify_blocked = 0;
+	pthread_mutex_unlock(&blocked_lock);
+	dprintf("blocked_unlock\n");
 }
 
 static void zk_block_done(struct work *work)
@@ -786,8 +782,15 @@ static int zk_dispatch(void)
 	if (ret < 0)
 		return 0;
 
-	/* protect zk_levent_list/nr_zk_nodes and prevent zk_block working */
-	pthread_mutex_lock(&queue_lock);
+	pthread_mutex_lock(&blocked_lock);
+	dprintf("blocked_lock\n");
+	if (zk_notify_blocked) {
+		pthread_mutex_unlock(&blocked_lock);
+		dprintf("blocked_unlock, return.\n");
+		return 0;
+	}
+	pthread_mutex_unlock(&blocked_lock);
+	dprintf("blocked_unlock, process...\n");
 
 	ret = zk_queue_pop(zhandle, &ev);
 	if (ret < 0)
@@ -795,6 +798,7 @@ static int zk_dispatch(void)
 
 	switch (ev.type) {
 	case EVENT_JOIN:
+		dprintf("JOIN EVENT, blocked:%d\n", ev.blocked);
 		if (ev.blocked) {
 			dprintf("one sheep joined[up], nr_nodes:%ld, sender:%s, joined:%d\n",
 					nr_zk_nodes, node_to_str(&ev.sender.node), ev.sender.joined);
@@ -855,6 +859,7 @@ static int zk_dispatch(void)
 				    ev.join_result, ev.buf);
 		break;
 	case EVENT_LEAVE:
+		dprintf("LEAVE EVENT, blocked:%d\n", ev.blocked);
 		/*reset master if necessary */
 		dprintf("find node:%s\n", node_to_str(&ev.sender.node));
 		n = find_node(zk_nodes, nr_zk_nodes, &ev.sender);
@@ -877,13 +882,15 @@ static int zk_dispatch(void)
 		sd_leave_handler(&ev.sender.node, entries, nr_zk_nodes);
 		break;
 	case EVENT_NOTIFY:
+		dprintf("NOTIFY, blocked:%d\n", ev.blocked);
 		if (ev.blocked) {
-			if (node_cmp(&ev.sender, &this_node) == 0 && !ev.callbacked) {
-				queue_work(zk_block_wq, &work);
-
+			if (node_cmp(&ev.sender.node, &this_node.node) == 0 && !ev.callbacked) {
 				ev.callbacked = 1;
+				zk_notify_blocked = 1;
 
 				zk_queue_push_back(zhandle, &ev);
+
+				queue_work(zk_block_wq, &work);
 			} else
 				zk_queue_push_back(zhandle, NULL);
 
@@ -896,7 +903,6 @@ static int zk_dispatch(void)
 		break;
 	}
 out:
-	pthread_mutex_unlock(&queue_lock);
 	return 0;
 }
 
-- 
1.7.7.6




More information about the sheepdog mailing list