[sheepdog] [PATCH v6 1/6] zookeeper: fixed concurrent startup error

Kai Zhang kyle at zelin.io
Fri Jun 21 14:34:39 CEST 2013


The current implementation of the zookeeper driver is racy when multiple sheep
start up concurrently.

Consider the following situation:
1. There is a 3 node cluster: sheep1, sheep2, sheep3.
2. Both sheep2 and sheep3 leave cluster.
3. Both sheep2 and sheep3 start up again after their previous zookeeper
   sessions have timed out.
4. Sheep1 leaves the cluster before sheep2 and sheep3 receive their join
   requests from zookeeper.
5. When sheep2 and sheep3 receive the join requests, both of them assume they
   are the master because zk_member_empty() returns true.

The new implementation follows the standard leader-election recipe that
zookeeper suggests. At startup, each sheep creates an ephemeral sequential
node under MASTER_ZNODE. The sheep owning the node with the smallest sequence
number becomes the master.

However, because of the way joining works in the zookeeper driver, a sheep
can still hang while joining the cluster.
This is because zk_compete_master and add_join_event are two separate
operations that are not performed atomically.
For example, sheep1 may get a smaller sequence number than sheep2 in the
master competition, but a larger sequence number for its join request event.
If the current master, sheep3, quits before handling sheep2's join request,
sheep2 will hang while joining the cluster.

To solve this problem, a sheep sends its join request only after it has
confirmed that a sheep with a smaller sequence number in the master
competition has already joined (if no such sheep is left, it becomes the
master itself).

Signed-off-by: Kai Zhang <kyle at zelin.io>
---
 sheep/cluster/zookeeper.c |  221 ++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 187 insertions(+), 34 deletions(-)

diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 45db10a..2ee1b49 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -33,7 +33,7 @@
 
 /* iterate child znodes */
 #define FOR_EACH_ZNODE(parent, path, strs)			       \
-	for (zk_get_children(parent, strs),		               \
+	for (zk_get_children(parent, strs),			       \
 		     (strs)->data += (strs)->count;		       \
 	     (strs)->count-- ?					       \
 		     snprintf(path, sizeof(path), "%s/%s", parent,     \
@@ -72,7 +72,13 @@ static struct sd_node sd_nodes[SD_MAX_NODES];
 static size_t nr_sd_nodes;
 static struct rb_root zk_node_root = RB_ROOT;
 static pthread_rwlock_t zk_tree_lock = PTHREAD_RWLOCK_INITIALIZER;
+static pthread_rwlock_t zk_compete_master_lock = PTHREAD_RWLOCK_INITIALIZER;
 static LIST_HEAD(zk_block_list);
+static uatomic_bool is_master;
+static uatomic_bool stop;
+static bool joined;
+
+static void zk_compete_master(void);
 
 static struct zk_node *zk_tree_insert(struct zk_node *new)
 {
@@ -183,11 +189,14 @@ zk_create_node(const char *path, const char *value, int valuelen,
  */
 static inline ZOOAPI int
 zk_create_seq_node(const char *path, const char *value, int valuelen,
-		   char *path_buffer, int path_buffer_len)
+		   char *path_buffer, int path_buffer_len, bool ephemeral)
 {
 	int rc;
+	int flags = ZOO_SEQUENCE;
+	if (ephemeral)
+		flags = flags | ZOO_EPHEMERAL;
 	rc = zoo_create(zhandle, path, value, valuelen, &ZOO_OPEN_ACL_UNSAFE,
-			ZOO_SEQUENCE, path_buffer, path_buffer_len);
+			flags, path_buffer, path_buffer_len);
 	if (rc != ZOK)
 		sd_iprintf("failed, path:%s, %s", path, zerror(rc));
 
@@ -298,7 +307,7 @@ static void zk_queue_push(struct zk_event *ev)
 	len = offsetof(typeof(*ev), buf) + ev->buf_len;
 	snprintf(path, sizeof(path), "%s/", QUEUE_ZNODE);
 again:
-	rc = zk_create_seq_node(path, (char *)ev, len, buf, sizeof(buf));
+	rc = zk_create_seq_node(path, (char *)ev, len, buf, sizeof(buf), false);
 	switch (rc) {
 	case ZOK:
 		/* Success */
@@ -436,32 +445,10 @@ static inline int zk_master_create(void)
 			      ZOO_EPHEMERAL, NULL, 0);
 }
 
-static bool is_master(void)
-{
-	struct rb_node *n;
-	struct zk_node *zk = NULL;
-
-	if (!nr_sd_nodes) {
-		if (zk_member_empty())
-			return true;
-		else
-			return false;
-	}
-
-	for (n = rb_first(&zk_node_root); n; n = rb_next(n)) {
-		zk = rb_entry(n, struct zk_node, rb);
-		if (!zk->gone)
-			break;
-	}
-	if (zk && node_eq(&zk->node, &this_node.node))
-		return true;
-
-	return false;
-}
-
 static void zk_queue_init(void)
 {
 	zk_init_node(BASE_ZNODE);
+	zk_init_node(MASTER_ZNONE);
 	zk_init_node(QUEUE_ZNODE);
 	zk_init_node(MEMBER_ZNODE);
 }
@@ -511,6 +498,12 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
 	} else if (type == ZOO_DELETED_EVENT) {
 		struct zk_node *n;
 
+		ret = sscanf(path, MASTER_ZNONE "/%s", str);
+		if (ret == 1) {
+			zk_compete_master();
+			return;
+		}
+
 		ret = sscanf(path, MEMBER_ZNODE "/%s", str);
 		if (ret != 1)
 			return;
@@ -550,6 +543,162 @@ static int add_join_event(void *msg, size_t msg_len)
 	return 0;
 }
 
+static void zk_get_least_seq(const char *parent, char *least_seq_path,
+			     int path_len, void *buf, int *buf_len)
+{
+	char path[MAX_NODE_STR_LEN], *p, *tmp;
+	struct String_vector strs;
+	int rc, least_seq = INT_MAX , seq;
+
+	while (true) {
+		FOR_EACH_ZNODE(parent, path, &strs) {
+			p = strrchr(path, '/');
+			seq = strtol(++p, &tmp, 10);
+			if (seq < least_seq)
+				least_seq = seq;
+		}
+
+		snprintf(path, MAX_NODE_STR_LEN, "%s/%010"PRId32,
+			 parent, least_seq);
+		rc = zk_get_data(path, buf, buf_len);
+
+		if (rc == ZOK) {
+			strncpy(least_seq_path, path, path_len);
+			break;
+		} else if (rc == ZNONODE)
+			continue;
+		else
+			panic("failed, path:%s, %s", path, zerror(rc));
+	}
+}
+
+static void zk_find_master(int *master_seq, char *master_name)
+{
+	int rc, len = MAX_NODE_STR_LEN;
+	char master_compete_path[MAX_NODE_STR_LEN];
+
+	if (*master_seq < 0) {
+		zk_get_least_seq(MASTER_ZNONE, master_compete_path,
+				 MAX_NODE_STR_LEN, master_name, &len);
+		sscanf(master_compete_path, MASTER_ZNONE "/%"PRId32,
+		       master_seq);
+	} else {
+		while(true) {
+			snprintf(master_compete_path, len,
+				 MASTER_ZNONE "/%010"PRId32, *master_seq);
+			rc = zk_get_data(master_compete_path, master_name,
+					 &len);
+			if (rc == ZOK)
+				break;
+			else if (rc == ZNONODE) {
+				sd_iprintf("detect master leave, "
+					   "start to compete master");
+				(*master_seq)++;
+				continue;
+			} else
+				panic("failed, path:%s, %s",
+				      master_compete_path, zerror(rc));
+		}
+	}
+}
+
+/*
+ * block until last sheep joined
+ * return sequence number of last sheep or -1 if no previous sheep
+ */
+static int zk_verify_last_sheep_join(int seq)
+{
+	int rc, i, len = MAX_NODE_STR_LEN;
+	char path[MAX_NODE_STR_LEN], name[MAX_NODE_STR_LEN];
+
+	for (i = seq - 1; i >= 0; i--) {
+		snprintf(path, MAX_NODE_STR_LEN, MASTER_ZNONE "/%010"PRId32, i);
+		rc = zk_get_data(path, name, &len);
+		if (rc == ZNONODE)
+			continue;
+		else if (rc != ZOK)
+			panic("failed, path:%s, %s", path, zerror(rc));
+
+		if (!strcmp(name, node_to_str(&this_node.node)))
+			continue;
+
+		snprintf(path, MAX_NODE_STR_LEN, MEMBER_ZNODE "/%s", name);
+		rc = zk_node_exists(path);
+		if (rc == ZOK)
+			break;
+		else if (rc == ZNONODE) {
+			i++;
+			continue;
+		} else
+			panic("failed, path:%s, %s", path, zerror(rc));
+	}
+	return i;
+}
+
+/*
+ * Create sequential node under MASTER_ZNODE.
+ * Sheep with least sequential number win the competition.
+ */
+static void zk_compete_master(void)
+{
+	int rc;
+	char master_name[MAX_NODE_STR_LEN];
+	char my_compete_path[MAX_NODE_STR_LEN];
+	static int master_seq = -1, my_seq;
+
+	/*
+	 * This is to protect master_seq and my_seq because this function will
+	 * be called by both main thread and zookeeper's event thread.
+	 */
+	pthread_rwlock_wrlock(&zk_compete_master_lock);
+
+	if (uatomic_is_true(&is_master) || uatomic_is_true(&stop))
+		goto out;
+
+	if (!joined) {
+		sd_iprintf("start to compete master for the first time");
+		do {
+			if (uatomic_is_true(&stop))
+				goto out;
+			/* duplicate sequential node has no side-effect */
+			rc = zk_create_seq_node(MASTER_ZNONE "/",
+						node_to_str(&this_node.node),
+						MAX_NODE_STR_LEN,
+						my_compete_path,
+						MAX_NODE_STR_LEN, true);
+		} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
+
+		if (rc != ZOK)
+			panic("failed, %s", zerror(rc));
+
+		sd_dprintf("my compete path: %s", my_compete_path);
+		sscanf(my_compete_path, MASTER_ZNONE "/%"PRId32,
+		       &my_seq);
+	}
+
+	zk_find_master(&master_seq, master_name);
+
+	if (!strcmp(master_name, node_to_str(&this_node.node)))
+		goto success;
+	else if (joined)
+		goto lost;
+	else if (zk_verify_last_sheep_join(my_seq) < 0) {
+		/* all previous sheep has quit, i'm master */
+		master_seq = my_seq;
+		goto success;
+	} else
+		goto lost;
+success:
+	uatomic_set_true(&is_master);
+	sd_iprintf("success");
+	goto out;
+lost:
+	sd_iprintf("lost");
+	goto out;
+out:
+	pthread_rwlock_unlock(&zk_compete_master_lock);
+}
+
 static int zk_join(const struct sd_node *myself,
 		   void *opaque, size_t opaque_len)
 {
@@ -565,18 +714,19 @@ static int zk_join(const struct sd_node *myself,
 		exit(1);
 	}
 
-	/* For concurrent nodes setup, we allow only one to continue */
-	while (zk_member_empty() && zk_master_create() != ZOK)
-		;/* wait */
-
+	zk_compete_master();
 	return add_join_event(opaque, opaque_len);
 }
 
 static int zk_leave(void)
 {
 	char path[PATH_MAX];
+
+	sd_iprintf("leaving from cluster");
+	uatomic_set_true(&stop);
+
 	snprintf(path, sizeof(path), MEMBER_ZNODE"/%s",
-			node_to_str(&this_node.node));
+		 node_to_str(&this_node.node));
 	add_event(EVENT_LEAVE, &this_node, NULL, 0);
 	zk_delete_node(path, -1);
 	return 0;
@@ -602,7 +752,7 @@ static void zk_handle_join_request(struct zk_event *ev)
 	enum cluster_join_result res;
 
 	sd_dprintf("sender: %s", node_to_str(&ev->sender.node));
-	if (!is_master()) {
+	if (!uatomic_is_true(&is_master)) {
 		/* Let's await master acking the join-request */
 		queue_pos--;
 		return;
@@ -614,7 +764,7 @@ static void zk_handle_join_request(struct zk_event *ev)
 	if (res == CJ_RES_MASTER_TRANSFER) {
 		sd_eprintf("failed to join sheepdog cluster: "
 			   "please retry when master is up");
-		add_event(EVENT_LEAVE, &this_node, NULL, 0);
+		zk_leave();
 		exit(1);
 	}
 	sd_dprintf("I'm the master now");
@@ -677,6 +827,7 @@ static void zk_handle_join_response(struct zk_event *ev)
 		snprintf(path, sizeof(path), MEMBER_ZNODE"/%s",
 			 node_to_str(&ev->sender.node));
 		if (node_eq(&ev->sender.node, &this_node.node)) {
+			joined = true;
 			sd_dprintf("create path:%s", path);
 			zk_create_node(path, (char *)&ev->sender,
 				       sizeof(ev->sender), &ZOO_OPEN_ACL_UNSAFE,
@@ -850,6 +1001,8 @@ static int zk_init(const char *option)
 		return -1;
 	}
 
+	uatomic_set_false(&stop);
+	uatomic_set_false(&is_master);
 	zk_queue_init();
 
 	efd = eventfd(0, EFD_NONBLOCK);
-- 
1.7.9.5




More information about the sheepdog mailing list