[sheepdog] [PATCH v5 6/6] zookeeper: handle session timeout for all zookeeper operations

Kai Zhang kyle at zelin.io
Thu Jun 20 21:00:22 CEST 2013


The idea is: when a zk_* API returns ZINVALIDSTATE, the connection and session
to zookeeper have been lost.
At this point, callers of zk_* APIs should do nothing except return control as
soon as possible.
Another thread will be responsible for cleaning up the in-memory state,
reconnecting to zookeeper and re-sending the join request.

Signed-off-by: Kai Zhang <kyle at zelin.io>
---
 sheep/cluster/zookeeper.c |  300 +++++++++++++++++++++++++++++----------------
 1 file changed, 196 insertions(+), 104 deletions(-)

diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index c358581..07aea4c 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -33,8 +33,7 @@
 
 /* iterate child znodes */
 #define FOR_EACH_ZNODE(parent, path, strs)			       \
-	for (zk_get_children(parent, strs),			       \
-		     (strs)->data += (strs)->count;		       \
+	for ((strs)->data += (strs)->count;			       \
 	     (strs)->count-- ?					       \
 		     snprintf(path, sizeof(path), "%s/%s", parent,     \
 			      *--(strs)->data) : (free((strs)->data), 0); \
@@ -141,18 +140,33 @@ static inline struct zk_node *zk_tree_search(const struct node_id *nid)
 static zhandle_t *zhandle;
 static struct zk_node this_node;
 
+#define check_zk_rc(rc, path)						\
+	switch (rc) {							\
+	case ZNONODE:							\
+	case ZNODEEXISTS:						\
+	case ZINVALIDSTATE:						\
+	case ZSESSIONEXPIRED:						\
+	case ZOPERATIONTIMEOUT:						\
+	case ZCONNECTIONLOSS:						\
+		sd_eprintf("failed, path:%s, %s", path, zerror(rc));	\
+	case ZOK:							\
+		break;							\
+	default:							\
+		panic("failed, path:%s, %s", path, zerror(rc));		\
+	}
+
 static inline ZOOAPI int zk_delete_node(const char *path, int version)
 {
 	int rc;
 	do {
 		rc = zoo_delete(zhandle, path, version);
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
-	if (rc != ZOK)
-		sd_eprintf("failed, path:%s, %s", path, zerror(rc));
+	check_zk_rc(rc, path);
+
 	return rc;
 }
 
-static inline ZOOAPI void
+static inline ZOOAPI int
 zk_init_node(const char *path)
 {
 	int rc;
@@ -160,9 +174,11 @@ zk_init_node(const char *path)
 		rc = zoo_create(zhandle, path, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0,
 				NULL, 0);
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
+	check_zk_rc(rc, path);
 
-	if (rc != ZOK && rc != ZNODEEXISTS)
-		panic("failed, path:%s, %s", path, zerror(rc));
+	if (rc == ZNODEEXISTS)
+		rc = ZOK;
+	return rc;
 }
 
 static inline ZOOAPI int
@@ -174,9 +190,9 @@ zk_create_node(const char *path, const char *value, int valuelen,
 	do {
 		rc = zoo_create(zhandle, path, value, valuelen, acl,
 				flags, path_buffer, path_buffer_len);
-		if (rc != ZOK && rc != ZNODEEXISTS)
-			sd_eprintf("failed, path:%s, %s", path, zerror(rc));
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
+	check_zk_rc(rc, path);
+
 	return rc;
 }
 
@@ -197,8 +213,7 @@ zk_create_seq_node(const char *path, const char *value, int valuelen,
 		flags = flags | ZOO_EPHEMERAL;
 	rc = zoo_create(zhandle, path, value, valuelen, &ZOO_OPEN_ACL_UNSAFE,
 			flags, path_buffer, path_buffer_len);
-	if (rc != ZOK)
-		sd_iprintf("failed, path:%s, %s", path, zerror(rc));
+	check_zk_rc(rc, path);
 
 	return rc;
 }
@@ -210,9 +225,9 @@ static inline ZOOAPI int zk_get_data(const char *path, void *buffer,
 	do {
 		rc = zoo_get(zhandle, path, 1, (char *)buffer,
 			     buffer_len, NULL);
-		if (rc != ZOK)
-			sd_eprintf("failed, path:%s, %s", path, zerror(rc));
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
+	check_zk_rc(rc, path);
+
 	return rc;
 }
 
@@ -223,8 +238,8 @@ zk_set_data(const char *path, const char *buffer, int buflen, int version)
 	do {
 		rc = zoo_set(zhandle, path, buffer, buflen, version);
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
-	if (rc != ZOK)
-		panic("failed, path:%s, %s", path, zerror(rc));
+	check_zk_rc(rc, path);
+
 	return rc;
 }
 
@@ -233,29 +248,29 @@ static inline ZOOAPI int zk_node_exists(const char *path)
 	int rc;
 	do {
 		rc = zoo_exists(zhandle, path, 1, NULL);
-		if (rc != ZOK && rc != ZNONODE)
-			sd_eprintf("failed, path:%s, %s", path, zerror(rc));
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
+	check_zk_rc(rc, path);
 
 	return rc;
 }
 
-static inline ZOOAPI void zk_get_children(const char *path,
-					  struct String_vector *strings)
+static inline ZOOAPI int zk_get_children(const char *path,
+					 struct String_vector *strings)
 {
 	int rc;
 	do {
 		rc = zoo_get_children(zhandle, path, 1, strings);
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
-	if (rc != ZOK)
-		panic("failed, path:%s, %s", path, zerror(rc));
+	check_zk_rc(rc, path);
+
+	return rc;
 }
 
 /* ZooKeeper-based queue give us an totally ordered events */
 static int efd;
 static int32_t queue_pos;
 
-static bool zk_queue_peek(void)
+static int zk_queue_peek(bool *peek)
 {
 	int rc;
 	char path[MAX_NODE_STR_LEN];
@@ -264,13 +279,19 @@ static bool zk_queue_peek(void)
 
 	rc = zk_node_exists(path);
 	if (rc == ZOK)
-		return true;
+		*peek = true;
+	else if (rc == ZNONODE) {
+		rc = ZOK;
+		*peek = false;
+	} else
+		sd_eprintf("failed, %s", zerror(rc));
 
-	return false;
+	return rc;
 }
 
 /* return true if there is a node with 'id' in the queue. */
-static bool zk_find_seq_node(uint64_t id, char *seq_path, int seq_path_len)
+static int zk_find_seq_node(uint64_t id, char *seq_path, int seq_path_len,
+			    bool *found)
 {
 	int rc, len;
 
@@ -285,15 +306,17 @@ static bool zk_find_seq_node(uint64_t id, char *seq_path, int seq_path_len)
 			if (ev.id == id) {
 				sd_dprintf("id %"PRIx64" is found in %s", id,
 					   seq_path);
-				return true;
+				*found = true;
+				return ZOK;
 			}
 			break;
 		case ZNONODE:
 			sd_dprintf("id %"PRIx64" is not found", id);
-			return false;
+			*found = false;
+			return ZOK;
 		default:
-			panic("failed, %s", zerror(rc));
-			break;
+			sd_eprintf("failed, %s", zerror(rc));
+			return rc;
 		}
 	}
 }
@@ -302,6 +325,7 @@ static void zk_queue_push(struct zk_event *ev)
 {
 	int rc, len;
 	char path[MAX_NODE_STR_LEN], buf[MAX_NODE_STR_LEN];
+	bool found;
 
 	len = offsetof(typeof(*ev), buf) + ev->buf_len;
 	snprintf(path, sizeof(path), "%s/", QUEUE_ZNODE);
@@ -313,12 +337,17 @@ again:
 		break;
 	case ZOPERATIONTIMEOUT:
 	case ZCONNECTIONLOSS:
-		if (!zk_find_seq_node(ev->id, buf, sizeof(buf)))
-			/* retry if seq_node was not created */
-			goto again;
-		break;
+		if (zk_find_seq_node(ev->id, buf, sizeof(buf), &found) == ZOK) {
+			if (found)
+				break;
+			else
+				/* retry if seq_node was not created */
+				goto again;
+		}
+		/* fall through */
 	default:
-		panic("failed, path:%s, %s", path, zerror(rc));
+		sd_eprintf("failed, path:%s, %s", path, zerror(rc));
+		return;
 	}
 	if (first_push) {
 		int32_t seq;
@@ -342,7 +371,7 @@ static inline void *zk_event_sd_nodes(struct zk_event *ev)
 static void push_join_response(struct zk_event *ev)
 {
 	char path[MAX_NODE_STR_LEN];
-	int len;
+	int len, rc;
 
 	ev->type = EVENT_JOIN_RESPONSE;
 	ev->nr_nodes = nr_sd_nodes;
@@ -352,9 +381,12 @@ static void push_join_response(struct zk_event *ev)
 
 	len = offsetof(typeof(*ev), buf) + ev->buf_len;
 	snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos);
-	zk_set_data(path, (char *)ev, len, -1);
-	sd_dprintf("update path:%s, queue_pos:%010"PRId32", len:%d",
-		   path, queue_pos, len);
+	rc = zk_set_data(path, (char *)ev, len, -1);
+	if (rc == ZOK)
+		sd_dprintf("update path:%s, queue_pos:%010"PRId32", len:%d",
+			   path, queue_pos, len);
+	else
+		sd_eprintf("failed, %s", zerror(rc));
 }
 
 static void zk_queue_pop_advance(struct zk_event *ev)
@@ -365,19 +397,12 @@ static void zk_queue_pop_advance(struct zk_event *ev)
 	len = sizeof(*ev);
 	snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos);
 	rc = zk_get_data(path, ev, &len);
-	if (rc != ZOK)
-		panic("failed to get data from %s, %s", path, zerror(rc));
-	sd_dprintf("%s, type:%d, len:%d, pos:%"PRId32, path, ev->type, len,
-		   queue_pos);
-	queue_pos++;
-}
-
-static int zk_member_empty(void)
-{
-	struct String_vector strs;
-
-	zk_get_children(MEMBER_ZNODE, &strs);
-	return (strs.count == 0);
+	if (rc == ZOK) {
+		sd_dprintf("%s, type:%d, len:%d, pos:%"PRId32, path, ev->type,
+			   len, queue_pos);
+		queue_pos++;
+	} else
+		sd_eprintf("failed, path %s, %s", path, zerror(rc));
 }
 
 static inline void zk_tree_add(struct zk_node *node)
@@ -438,18 +463,30 @@ static inline void build_node_list(void)
 	sd_dprintf("nr_sd_nodes:%zu", nr_sd_nodes);
 }
 
-static inline int zk_master_create(void)
-{
-	return zk_create_node(MASTER_ZNONE, "", 0, &ZOO_OPEN_ACL_UNSAFE,
-			      ZOO_EPHEMERAL, NULL, 0);
-}
-
-static void zk_queue_init(void)
+static int zk_queue_init(void)
 {
-	zk_init_node(BASE_ZNODE);
-	zk_init_node(MASTER_ZNONE);
-	zk_init_node(QUEUE_ZNODE);
-	zk_init_node(MEMBER_ZNODE);
+	int rc;
+	rc = zk_init_node(BASE_ZNODE);
+	if (rc != ZOK) {
+		sd_eprintf("failed, path %s, %s", BASE_ZNODE, zerror(rc));
+		return rc;
+	}
+	rc = zk_init_node(MASTER_ZNONE);
+	if (rc != ZOK) {
+		sd_eprintf("failed, path %s, %s", MASTER_ZNONE, zerror(rc));
+		return rc;
+	}
+	rc = zk_init_node(QUEUE_ZNODE);
+	if (rc != ZOK) {
+		sd_eprintf("failed, path %s, %s", QUEUE_ZNODE, zerror(rc));
+		return rc;
+	}
+	rc = zk_init_node(MEMBER_ZNODE);
+	if (rc != ZOK) {
+		sd_eprintf("failed, path %s, %s", MEMBER_ZNODE, zerror(rc));
+		return rc;
+	}
+	return ZOK;
 }
 
 /* Calculate a unique 64 bit integer from this_node and the sequence number. */
@@ -547,14 +584,20 @@ static int add_join_event(void *msg, size_t msg_len)
 	return 0;
 }
 
-static void zk_get_least_seq(const char *parent, char *least_seq_path,
-			     int path_len, void *buf, int *buf_len)
+static int zk_get_least_seq(const char *parent, char *least_seq_path,
+			    int path_len, void *buf, int *buf_len)
 {
 	char path[MAX_NODE_STR_LEN], *p, *tmp;
 	struct String_vector strs;
 	int rc, least_seq = INT_MAX , seq;
 
 	while (true) {
+		rc = zk_get_children(parent, &strs);
+		if (rc != ZOK) {
+			sd_eprintf("failed, %s", zerror(rc));
+			return rc;
+		}
+
 		FOR_EACH_ZNODE(parent, path, &strs) {
 			p = strrchr(path, '/');
 			seq = strtol(++p, &tmp, 10);
@@ -568,15 +611,17 @@ static void zk_get_least_seq(const char *parent, char *least_seq_path,
 
 		if (rc == ZOK) {
 			strncpy(least_seq_path, path, path_len);
-			break;
+			return ZOK;
 		} else if (rc == ZNONODE)
 			continue;
-		else
-			panic("failed, path:%s, %s", path, zerror(rc));
+		else {
+			sd_eprintf("failed, %s", zerror(rc));
+			return rc;
+		}
 	}
 }
 
-static void zk_find_master(int *master_seq, char *master_name)
+static int zk_find_master(int *master_seq, char *master_name)
 {
 	int rc, len = MAX_NODE_STR_LEN;
 	char master_compete_path[MAX_NODE_STR_LEN];
@@ -599,29 +644,35 @@ static void zk_find_master(int *master_seq, char *master_name)
 				sd_iprintf("detect master leave, "
 					   "start to compete master");
 				continue;
-			} else
-				panic("failed, path:%s, %s",
-				      master_compete_path, zerror(rc));
+			} else {
+				sd_eprintf("failed, %s", zerror(rc));
+				return rc;
+			}
 		}
 	}
+
+	return ZOK;
 }
 
 /*
  * block until last sheep joined
- * return sequence number of last sheep or -1 if no previous sheep
+ * last_sheep returns sequence number of last sheep or -1 if no previous sheep
  */
-static int zk_verify_last_sheep_join(int seq)
+static int zk_verify_last_sheep_join(int seq, int *last_sheep)
 {
-	int rc, i, len = MAX_NODE_STR_LEN;
+	int rc, len = MAX_NODE_STR_LEN;
 	char path[MAX_NODE_STR_LEN], name[MAX_NODE_STR_LEN];
 
-	for (i = seq - 1; i >= 0; i--) {
-		snprintf(path, MAX_NODE_STR_LEN, MASTER_ZNONE "/%010"PRId32, i);
+	for (*last_sheep = seq - 1; *last_sheep >= 0; (*last_sheep)--) {
+		snprintf(path, MAX_NODE_STR_LEN, MASTER_ZNONE "/%010"PRId32,
+			 *last_sheep);
 		rc = zk_get_data(path, name, &len);
 		if (rc == ZNONODE)
 			continue;
-		else if (rc != ZOK)
-			panic("failed, path:%s, %s", path, zerror(rc));
+		else if (rc != ZOK) {
+			sd_eprintf("failed, %s", zerror(rc));
+			return rc;
+		}
 
 		if (!strcmp(name, node_to_str(&this_node.node)))
 			continue;
@@ -631,12 +682,14 @@ static int zk_verify_last_sheep_join(int seq)
 		if (rc == ZOK)
 			break;
 		else if (rc == ZNONODE) {
-			i++;
+			(*last_sheep)++;
 			continue;
-		} else
-			panic("failed, path:%s, %s", path, zerror(rc));
+		} else {
+			sd_eprintf("failed, %s", zerror(rc));
+			return rc;
+		}
 	}
-	return i;
+	return ZOK;
 }
 
 /*
@@ -645,7 +698,7 @@ static int zk_verify_last_sheep_join(int seq)
  */
 static void zk_compete_master(void)
 {
-	int rc;
+	int rc, last_joined_sheep;
 	char master_name[MAX_NODE_STR_LEN];
 	char my_compete_path[MAX_NODE_STR_LEN];
 	static int master_seq = -1, my_seq;
@@ -674,18 +727,29 @@ static void zk_compete_master(void)
 		       &my_seq);
 	}
 
-	zk_find_master(&master_seq, master_name);
+	if (zk_find_master(&master_seq, master_name) != ZOK) {
+		sd_eprintf("failed");
+		return;
+	}
 
 	if (!strcmp(master_name, node_to_str(&this_node.node)))
 		goto success;
 	else if (joined)
 		goto lost;
-	else if (zk_verify_last_sheep_join(my_seq) < 0) {
-		/* all previous sheep has quit, i'm master */
-		master_seq = my_seq;
-		goto success;
-	} else
-		goto lost;
+	else {
+		if (zk_verify_last_sheep_join(my_seq,
+					      &last_joined_sheep) != ZOK) {
+			sd_eprintf("failed");
+			return;
+		}
+
+		if (last_joined_sheep < 0) {
+			/* all previous sheep has quit, i'm master */
+			master_seq = my_seq;
+			goto success;
+		} else
+			goto lost;
+	}
 success:
 	uatomic_set_true(&is_master);
 	sd_iprintf("success");
@@ -770,16 +834,21 @@ static void zk_handle_join_request(struct zk_event *ev)
 static void watch_all_nodes(void)
 {
 	struct String_vector strs;
-	struct zk_node znode;
 	char path[MAX_NODE_STR_LEN];
-	int len = sizeof(znode);
+	int rc;
 
-	if (zk_member_empty())
-		return;
+	rc = zk_get_children(MEMBER_ZNODE, &strs);
+	if (rc != ZOK)
+		goto error;
 
 	FOR_EACH_ZNODE(MEMBER_ZNODE, path, &strs) {
-		zk_get_data(path, &znode, &len);
+		rc = zk_node_exists(path);
+		if (rc != ZOK)
+			goto error;
 	}
+	return;
+error:
+	sd_eprintf("failed, %s", zerror(rc));
 }
 
 static void init_node_list(struct zk_event *ev)
@@ -802,6 +871,7 @@ static void init_node_list(struct zk_event *ev)
 static void zk_handle_join_response(struct zk_event *ev)
 {
 	char path[MAX_NODE_STR_LEN];
+	int rc;
 
 	sd_dprintf("JOIN RESPONSE");
 	if (node_eq(&ev->sender.node, &this_node.node))
@@ -826,9 +896,15 @@ static void zk_handle_join_response(struct zk_event *ev)
 		if (node_eq(&ev->sender.node, &this_node.node)) {
 			joined = true;
 			sd_dprintf("create path:%s", path);
-			zk_create_node(path, (char *)zoo_client_id(zhandle),
-				       sizeof(clientid_t), &ZOO_OPEN_ACL_UNSAFE,
-				       ZOO_EPHEMERAL, NULL, 0);
+			rc = zk_create_node(path,
+					    (char *)zoo_client_id(zhandle),
+					    sizeof(clientid_t),
+					    &ZOO_OPEN_ACL_UNSAFE,
+					    ZOO_EPHEMERAL, NULL, 0);
+			if (rc != ZOK) {
+				sd_eprintf("failed, %s", zerror(rc));
+				return;
+			}
 		} else
 			zk_node_exists(path);
 
@@ -934,6 +1010,7 @@ static inline void handle_session_expire(void)
 	INIT_LIST_HEAD(&zk_block_list);
 	nr_sd_nodes = 0;
 	first_push = true;
+	joined = false;
 	memset(sd_nodes, 0, sizeof(struct sd_node) * SD_MAX_NODES);
 
 	while (sd_reconnect_handler()) {
@@ -946,6 +1023,7 @@ static void zk_event_handler(int listen_fd, int events, void *data)
 {
 	eventfd_t value;
 	struct zk_event ev;
+	bool peek;
 
 	sd_dprintf("%d, %d", events, queue_pos);
 	if (events & EPOLLHUP) {
@@ -968,8 +1046,13 @@ static void zk_event_handler(int listen_fd, int events, void *data)
 		return;
 	}
 
-	if (!zk_queue_peek())
-		goto kick_block_event;
+	if (zk_queue_peek(&peek) == ZOK) {
+		if (!peek)
+			goto kick_block_event;
+	} else {
+		sd_eprintf("failed");
+		return;
+	}
 
 	zk_queue_pop_advance(&ev);
 	if (ev.type < zk_max_event_handlers && zk_event_handlers[ev.type])
@@ -977,9 +1060,17 @@ static void zk_event_handler(int listen_fd, int events, void *data)
 	else
 		panic("unhandled type %d", ev.type);
 
-	 /* Someone has created next event, go kick event handler. */
-	if (zk_queue_peek()) {
-		eventfd_write(efd, 1);
+	if (zk_queue_peek(&peek) == ZOK) {
+		if (peek) {
+			/*
+			 * Someone has created next event,
+			 * go kick event handler.
+			 */
+			eventfd_write(efd, 1);
+			return;
+		}
+	} else {
+		sd_eprintf("failed");
 		return;
 	}
 
@@ -1025,7 +1116,8 @@ static int zk_init(const char *option)
 
 	uatomic_set_false(&stop);
 	uatomic_set_false(&is_master);
-	zk_queue_init();
+	if (zk_queue_init() != ZOK)
+		return -1;
 
 	efd = eventfd(0, EFD_NONBLOCK);
 	if (efd < 0) {
-- 
1.7.9.5




More information about the sheepdog mailing list