[Sheepdog] [zookeeper][PATCH 3/5] Fix bug: zookeeper events can be lost

Yunkai Zhang yunkai.me at gmail.com
Tue Mar 6 19:10:34 CET 2012


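Use the ZooKeeper distributed lock to prevent events from being lost.

Queue push, push-back and pop now all run while holding the lock. After
popping an event, the next queue node is watched with zoo_exists(). If that
node already exists, the watch cannot report its creation, so the event is
signalled manually by writing to the eventfd instead of waiting for a
notification that will never arrive.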

Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com>
---
 sheep/cluster/zookeeper.c |   63 +++++++++++++++++++++++++++++++++++----------
 1 files changed, 49 insertions(+), 14 deletions(-)

diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 23ce4ab..c2592d2 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -72,8 +72,10 @@ static void zk_lock(zhandle_t *zh)
 again:
 	rc = zoo_create(zh, LOCK_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE,
 			ZOO_EPHEMERAL, NULL, 0);
-	if (rc == ZOK)
+	if (rc == ZOK){
+		dprintf("locked\n");
 		return;
+	}
 	else if (rc == ZNODEEXISTS) {
 		dprintf("retry\n");
 		usleep(10000); /* FIXME: use watch notification */
@@ -89,11 +91,14 @@ static void zk_unlock(zhandle_t *zh)
 	rc = zoo_delete(zh, LOCK_ZNODE, -1);
 	if (rc != ZOK)
 		panic("failed to release lock\n");
+
+	dprintf("unlocked\n");
 }
 
 
 /* ZooKeeper-based queue */
 
+static int efd;
 static int queue_pos;
 
 static int zk_queue_empty(zhandle_t *zh)
@@ -114,19 +119,27 @@ static void zk_queue_push(zhandle_t *zh, struct zk_event *ev)
 {
 	int rc;
 	char path[256], buf[256];
+	eventfd_t value = 1;
+
+	zk_lock(zh);
 
 	sprintf(path, "%s/", QUEUE_ZNODE);
 	rc = zoo_create(zh, path, (char *)ev, sizeof(*ev),
 			&ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE, buf, sizeof(buf));
+	assert(rc == ZOK);
 
+	dprintf("create path:%s, queue_pos:%d\n", buf, queue_pos);
 	if (queue_pos < 0) {
 		/* the first pushed data should be EVENT_JOIN */
 		assert(ev->type == EVENT_JOIN);
 		sscanf(buf, QUEUE_ZNODE "/%010d", &queue_pos);
 
-		/* watch */
-		zoo_exists(zh, buf, 1, NULL);
+		/* manual notify */
+		dprintf("write event to efd:%d\n", efd);
+		eventfd_write(efd, value);
 	}
+
+	zk_unlock(zh);
 }
 
 static int zk_queue_push_back(zhandle_t *zh, struct zk_event *ev)
@@ -134,33 +147,57 @@ static int zk_queue_push_back(zhandle_t *zh, struct zk_event *ev)
 	int rc;
 	char path[256];
 
+	zk_lock(zh);
+
 	queue_pos--;
 
 	if (ev) {
 		/* update the last popped data */
 		sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
+		dprintf("update path:%s, queue_pos:%d\n", path, queue_pos);
 		rc = zoo_set(zh, path, (char *)ev, sizeof(*ev), -1);
+		assert(rc == ZOK);
+
 	}
 
+	zk_unlock(zh);
+
 	return 0;
 }
 
 static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 {
-	int rc, len;
+	int rc, len, pos;
 	char path[256];
+	eventfd_t value = 1;
 
 	if (zk_queue_empty(zh))
 		return -1;
 
+	zk_lock(zh);
+
 	sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
 	len = sizeof(*ev);
+	dprintf("read path:%s\n", path);
 	rc = zoo_get(zh, path, 1, (char *)ev, &len, NULL);
+	assert(rc == ZOK);
 
-	/* watch next data */
-	queue_pos++;
-	sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
-	zoo_exists(zh, path, 1, NULL);
+	pos = queue_pos++;
+	do {
+		/* watch next data */
+		pos++;
+		sprintf(path, QUEUE_ZNODE "/%010d", pos);
+		dprintf("watch path:%s\n", path);
+
+		rc = zoo_exists(zh, path, 1, NULL);
+		if (rc == ZOK) {
+			/* we lost this message, manual notify */
+			dprintf("write event to efd:%d\n", efd);
+			eventfd_write(efd, value);
+		}
+	} while (rc == ZOK);
+
+	zk_unlock(zh);
 
 	return 0;
 }
@@ -219,7 +256,6 @@ static void zk_queue_init(zhandle_t *zh)
 /* ZooKeeper driver APIs */
 
 static zhandle_t *zhandle;
-static int efd;
 
 static struct work_queue *zk_block_wq;
 
@@ -262,8 +298,6 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
 	struct sd_node *n;
 	struct zk_event ev;
 
-	zk_lock(zh);
-
 	ev.type = type;
 	ev.sender = *node;
 	ev.buf_len = buf_len;
@@ -272,6 +306,7 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
 		memcpy(ev.buf, buf, buf_len);
 
 	ev.nr_nodes = get_nodes(zh, ev.nodes);
+	dprintf("get_nodes, nr_nodes:%d\n", ev.nr_nodes);
 
 	switch (type) {
 	case EVENT_JOIN:
@@ -296,8 +331,6 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
 
 	zk_queue_push(zh, &ev);
 out:
-	zk_unlock(zh);
-
 	return 0;
 }
 
@@ -447,11 +480,13 @@ static int zk_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg))
 
 static void zk_block(struct work *work)
 {
+	int rc;
 	struct zk_event ev;
 
 	pthread_mutex_lock(&queue_lock);
 
-	zk_queue_pop(zhandle, &ev);
+	rc = zk_queue_pop(zhandle, &ev);
+	assert(rc == 0);
 
 	ev.block_cb(ev.buf);
 	ev.blocked = 0;
-- 
1.7.7.6




More information about the sheepdog mailing list