[sheepdog] [PATCH 7/8] zookeeper: pass membership in event for join request

Liu Yuan namei.unix at gmail.com
Sun Dec 23 16:26:31 CET 2012


From: Liu Yuan <tailai.ly at taobao.com>

We shouldn't simply read the membership from zookeeper, because we might miss
some events when multiple events happen at once. For example, if one node
leaves while another joins, we could see a single event instead of the two
events that sheep expects.

Passing the membership in the join event solves this.

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 sheep/cluster.h           |    2 +-
 sheep/cluster/zookeeper.c |  119 ++++++++++++++++++++++++++++++---------------
 2 files changed, 80 insertions(+), 41 deletions(-)

diff --git a/sheep/cluster.h b/sheep/cluster.h
index 2679c19..d1e711d 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -24,7 +24,7 @@
 #include "logger.h"
 
 /* maximum payload size sent in ->notify and ->unblock */
-#define SD_MAX_EVENT_BUF_SIZE (64 * 1024)
+#define SD_MAX_EVENT_BUF_SIZE (128 * 1024) /* 128k; see zk add_join_event() */
 
 enum cluster_join_result {
 	CJ_RES_SUCCESS, /* Success */
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 336c827..bf91ea3 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -59,6 +59,8 @@ struct zk_event {
 	enum zk_event_type type;
 	struct zk_node sender;
 	enum cluster_join_result join_result;
+	size_t msg_len;
+	size_t nr_nodes;
 	size_t buf_len;
 	uint8_t buf[SD_MAX_EVENT_BUF_SIZE];
 };
@@ -153,6 +155,8 @@ zk_create_node(const char *path, const char *value, int valuelen,
 	do {
 		rc = zoo_create(zhandle, path, value, valuelen, acl,
 				flags, path_buffer, path_buffer_len);
+		if (rc != ZOK && rc != ZNODEEXISTS)
+			eprintf("failed, path:%s, rc:%d\n", path, rc);
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
 	return rc;
 }
@@ -164,6 +168,8 @@ static inline ZOOAPI int zk_get_data(const char *path, void *buffer,
 	do {
 		rc = zoo_get(zhandle, path, 1, (char *)buffer,
 			     buffer_len, NULL);
+		if (rc != ZOK)
+			eprintf("failed, path:%s, rc:%d\n", path, rc);
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
 	return rc;
 }
@@ -185,6 +191,8 @@ static inline ZOOAPI int zk_node_exists(const char *path)
 	int rc;
 	do {
 		rc = zoo_exists(zhandle, path, 1, NULL);
+		if (rc != ZOK && rc != ZNONODE)
+			eprintf("failed, path:%s, rc:%d\n", path, rc);
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
 
 	return rc;
@@ -239,20 +247,25 @@ static void zk_queue_push(struct zk_event *ev)
 		first_push = false;
 	}
 
-	if (ret == ZOK)
-		dprintf("create:%s, queue_pos:%010"PRId32", len:%d\n",
-			buf, queue_pos, len);
+	dprintf("create path:%s, queue_pos:%010"PRId32", len:%d, ret %d\n",
+		buf, queue_pos, len, ret);
 }
 
-/*
- * Change the event in place and expect the dedicated handler to be called
- * via zk_watcher which wakes up one of the zk_event_handlers.
- */
-static int zk_queue_push_back(struct zk_event *ev)
+static inline void *zk_event_sd_nodes(struct zk_event *ev)
+{
+	return &ev->buf[ev->msg_len]; /* sd_node array follows the msg */
+}
+
+/* Change the join event in place and piggyback the nodes information. */
+static int push_join_response(struct zk_event *ev)
 {
-	int len;
 	char path[256];
+	int len;
 
+	ev->type = EVENT_JOIN_RESPONSE;
+	ev->nr_nodes = nr_sd_nodes;
+	memcpy(zk_event_sd_nodes(ev), sd_nodes,
+	       nr_sd_nodes * sizeof(struct sd_node));
 	queue_pos--;
 
 	len = (char *)(ev->buf) - (char *)ev + ev->buf_len;
@@ -362,33 +375,6 @@ static void zk_queue_init(void)
 	zk_init_node(MEMBER_ZNODE);
 }
 
-static void zk_member_init(void)
-{
-	int rc, len;
-	struct String_vector strs;
-	struct zk_node znode;
-	char path[256];
-
-	if (zk_member_empty())
-		return;
-
-	FOR_EACH_ZNODE(MEMBER_ZNODE, path, &strs) {
-		len = sizeof(znode);
-		rc = zk_get_data(path, &znode, &len);
-		if (rc != ZOK)
-			continue;
-
-		switch (rc) {
-		case ZOK:
-			zk_tree_add(&znode);
-		case ZNONODE:
-			break;
-		default:
-			panic("zk_get_data failed:%s, rc:%d\n", path, rc);
-		}
-	}
-}
-
 static int add_event(enum zk_event_type type, struct zk_node *znode, void *buf,
 		     size_t buf_len)
 {
@@ -431,6 +417,26 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
 
 }
 
+/*
+ * Reserve room for SD_MAX_NODES nodes after msg so the master can piggyback
+ * them on the join response and every node sees the same membership view.
+ */
+static int add_join_event(void *msg, size_t msg_len)
+{
+	/* zero-fill: unset fields and buf bytes must not be stack garbage */
+	struct zk_event ev = { .type = EVENT_JOIN_REQUEST };
+	size_t len = msg_len + sizeof(struct sd_node) * SD_MAX_NODES;
+
+	assert(len <= SD_MAX_EVENT_BUF_SIZE);
+	ev.sender = this_node;
+	ev.msg_len = msg_len;
+	ev.buf_len = len;
+	if (msg)
+		memcpy(ev.buf, msg, msg_len);
+	zk_queue_push(&ev);
+	return 0;
+}
+
 static int zk_join(const struct sd_node *myself,
 		   void *opaque, size_t opaque_len)
 {
@@ -450,7 +456,7 @@ static int zk_join(const struct sd_node *myself,
 	while (zk_member_empty() && zk_master_create() != ZOK)
 		;/* wait */
 
-	return add_event(EVENT_JOIN_REQUEST, &this_node, opaque, opaque_len);
+	return add_join_event(opaque, opaque_len);
 }
 
 static int zk_leave(void)
@@ -483,10 +489,10 @@ static void zk_handle_join_request(struct zk_event *ev)
 		queue_pos--;
 		return;
 	}
+
 	res = sd_check_join_cb(&ev->sender.node, ev->buf);
 	ev->join_result = res;
-	ev->type = EVENT_JOIN_RESPONSE;
-	zk_queue_push_back(ev);
+	push_join_response(ev);
 	if (res == CJ_RES_MASTER_TRANSFER) {
 		eprintf("failed to join sheepdog cluster: "
 			"please retry when master is up\n");
@@ -496,13 +502,46 @@ static void zk_handle_join_request(struct zk_event *ev)
 	dprintf("I'm the master now\n");
 }
 
+/* Re-read each member znode; zk_get_data() passes watch=1 to zoo_get(). */
+static void watch_all_nodes(void)
+{
+	struct String_vector strs;
+	struct zk_node znode;
+	char path[256];
+
+	if (zk_member_empty())
+		return;
+	FOR_EACH_ZNODE(MEMBER_ZNODE, path, &strs) {
+		int len = sizeof(znode); /* in/out: zoo_get() overwrites it */
+		zk_get_data(path, &znode, &len);
+	}
+}
+
+/* Add each node piggybacked on the join response via zk_tree_add(). */
+static void init_node_list(struct zk_event *ev)
+{
+	const uint8_t *p = zk_event_sd_nodes(ev);
+	size_t i, node_nr = ev->nr_nodes;
+
+	assert(node_nr <= SD_MAX_NODES); /* room reserved by add_join_event */
+	dprintf("%zu\n", node_nr);
+	for (i = 0; i < node_nr; i++, p += sizeof(struct sd_node)) {
+		struct zk_node zk;
+		memset(&zk, 0, sizeof(zk)); /* only zk.node is set below */
+		memcpy(&zk.node, p, sizeof(struct sd_node));
+		zk_tree_add(&zk);
+	}
+	watch_all_nodes();
+}
+
 static void zk_handle_join_response(struct zk_event *ev)
 {
 	char path[256];
 
 	dprintf("JOIN RESPONSE\n");
 	if (node_eq(&ev->sender.node, &this_node.node))
-		zk_member_init();
+		/* newly joined node */
+		init_node_list(ev);
 
 	if (ev->join_result == CJ_RES_MASTER_TRANSFER)
 		/*
-- 
1.7.9.5




More information about the sheepdog mailing list