[Sheepdog] [zookeeper][PATCH 5/6] Fix two bugs:

Yunkai Zhang yunkai.me at gmail.com
Thu Apr 5 12:23:05 CEST 2012


 1. Change the pthread locking in zk_dispatch/zk_block so that
    it no longer blocks I/O requests between sheep.
 2. If the node pointed to by queue_pos was sent by a leaving node
    and has blocked the whole cluster, ignore it.

Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com>
---
 sheep/cluster/zookeeper.c |   66 +++++++++++++++++++++++++++++++-------------
 1 files changed, 46 insertions(+), 20 deletions(-)

diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 16b50e3..527c6e4 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -91,12 +91,13 @@ struct zk_event {
 /* leave event list */
 static LIST_HEAD(zk_levent_list);
 
+static int zk_notify_blocked;
 static struct zk_node zk_nodes[SD_MAX_NODES];
 static size_t nr_zk_nodes;
 static char *zk_option;
 
-/* protect queue_start_pos */
-static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
+/* protect zk_notify_blocked */
+static pthread_mutex_t blocked_lock = PTHREAD_MUTEX_INITIALIZER;
 
 /* protect leave event list */
 static pthread_mutex_t leave_lock = PTHREAD_MUTEX_INITIALIZER;
@@ -291,8 +292,29 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 		list_del(&lev->list);
 		pthread_mutex_unlock(&leave_lock);
 
+		/* if the node pointed to by queue_pos was send by this leaver,
+		 * and it have blocked whole cluster, we should ignore it. */
+		len = sizeof(*ev);
+		sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
+		rc = zk_get(zh, path, 1, (char *)ev, &len, NULL);
+		if (rc == ZOK && node_cmp(&ev->sender.node, &lev->sender.node) == 0 && ev->blocked) {
+			dprintf("this queue_pos:%d have blocked whole cluster, ignore it\n", queue_pos);
+			queue_pos++;
+
+			/* watch next data */
+			sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
+			rc = zk_exists(zh, path, 1, NULL);
+			dprintf("watch path:%s, exists:%d\n", path, (rc==ZOK));
+			if (rc == ZOK) {
+				/* we lost this message, manual notify */
+				dprintf("write event to efd:%d\n", efd);
+				eventfd_write(efd, value);
+			}
+		}
+
 		memcpy(ev, lev, sizeof(*ev));
 		free(lev);
+
 		return 0;
 	}
 
@@ -753,14 +775,7 @@ static int zk_join(struct sd_node *myself,
 
 static int zk_leave(void)
 {
-	int rc;
-
-	/* protect zk_levent_list */
-	pthread_mutex_lock(&queue_lock);
-	rc = add_event(zhandle, EVENT_LEAVE, &this_node, NULL, 0, NULL);
-	pthread_mutex_unlock(&queue_lock);
-
-	return rc;
+	return add_event(zhandle, EVENT_LEAVE, &this_node, NULL, 0, NULL);
 }
 
 static int zk_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg))
@@ -773,9 +788,6 @@ static void zk_block(struct work *work)
 	int rc;
 	struct zk_event ev;
 
-	/* get lock only after zk_dispatch finished */
-	pthread_mutex_lock(&queue_lock);
-
 	rc = zk_queue_pop(zhandle, &ev);
 	assert(rc == 0);
 
@@ -784,7 +796,11 @@ static void zk_block(struct work *work)
 
 	zk_queue_push_back(zhandle, &ev);
 
-	pthread_mutex_unlock(&queue_lock);
+	pthread_mutex_lock(&blocked_lock);
+	dprintf("blocked_lock\n");
+	zk_notify_blocked = 0;
+	pthread_mutex_unlock(&blocked_lock);
+	dprintf("blocked_unlock\n");
 }
 
 static void zk_block_done(struct work *work)
@@ -810,8 +826,15 @@ static int zk_dispatch(void)
 	if (ret < 0)
 		return 0;
 
-	/* protect zk_levent_list/nr_zk_nodes and prevent zk_block working */
-	pthread_mutex_lock(&queue_lock);
+	pthread_mutex_lock(&blocked_lock);
+	dprintf("blocked_lock\n");
+	if (zk_notify_blocked) {
+		pthread_mutex_unlock(&blocked_lock);
+		dprintf("blocked_unlock, return.\n");
+		return 0;
+	}
+	pthread_mutex_unlock(&blocked_lock);
+	dprintf("blocked_unlock, process...\n");
 
 	ret = zk_queue_pop(zhandle, &ev);
 	if (ret < 0)
@@ -819,6 +842,7 @@ static int zk_dispatch(void)
 
 	switch (ev.type) {
 	case EVENT_JOIN:
+		dprintf("JOIN EVENT, blocked:%d\n", ev.blocked);
 		if (ev.blocked) {
 			dprintf("one sheep joined[up], nr_nodes:%ld, sender:%s, joined:%d\n",
 					nr_zk_nodes, node_to_str(&ev.sender.node), ev.sender.joined);
@@ -879,6 +903,7 @@ static int zk_dispatch(void)
 				    ev.join_result, ev.buf);
 		break;
 	case EVENT_LEAVE:
+		dprintf("LEAVE EVENT, blocked:%d\n", ev.blocked);
 		/*reset master if necessary */
 		dprintf("find node:%s\n", node_to_str(&ev.sender.node));
 		n = find_node(zk_nodes, nr_zk_nodes, &ev.sender);
@@ -901,13 +926,15 @@ static int zk_dispatch(void)
 		zk_hdlrs.leave_handler(&ev.sender.node, entries, nr_zk_nodes);
 		break;
 	case EVENT_NOTIFY:
+		dprintf("NOTIFY, blocked:%d\n", ev.blocked);
 		if (ev.blocked) {
-			if (node_cmp(&ev.sender, &this_node) == 0 && !ev.callbacked) {
-				queue_work(zk_block_wq, &work);
-
+			if (node_cmp(&ev.sender.node, &this_node.node) == 0 && !ev.callbacked) {
 				ev.callbacked = 1;
+				zk_notify_blocked = 1;
 
 				zk_queue_push_back(zhandle, &ev);
+
+				queue_work(zk_block_wq, &work);
 			} else
 				zk_queue_push_back(zhandle, NULL);
 
@@ -920,7 +947,6 @@ static int zk_dispatch(void)
 		break;
 	}
 out:
-	pthread_mutex_unlock(&queue_lock);
 	return 0;
 }
 
-- 
1.7.7.6




More information about the sheepdog mailing list