[sheepdog] [PATCH v3 5/5] zookeeper: handle session timeout for all zookeeper operations

Kai Zhang kyle at zelin.io
Mon Jun 17 14:28:46 CEST 2013


The idea is: when any of the zk_* APIs returns ZINVALIDSTATE, it means the
connection and session to zookeeper have been lost.
At this point, callers of the zk_* APIs should do nothing further and simply
return control as soon as possible.
Another thread will then be responsible for cleaning up the in-memory state,
re-connecting to zookeeper and re-sending the join request.

Signed-off-by: Kai Zhang <kyle at zelin.io>
---
 sheep/cluster/zookeeper.c |  245 ++++++++++++++++++++++++++++++---------------
 1 file changed, 163 insertions(+), 82 deletions(-)

diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index ca113dc..eec1e2e 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -33,8 +33,7 @@
 
 /* iterate child znodes */
 #define FOR_EACH_ZNODE(parent, path, strs)			       \
-	for (zk_get_children(parent, strs),			       \
-		     (strs)->data += (strs)->count;		       \
+	for ((strs)->data += (strs)->count;			       \
 	     (strs)->count-- ?					       \
 		     snprintf(path, sizeof(path), "%s/%s", parent,     \
 			      *--(strs)->data) : (free((strs)->data), 0); \
@@ -76,6 +75,7 @@ static LIST_HEAD(zk_block_list);
 static uatomic_bool is_master;
 static uatomic_bool stop;
 static bool first_push = true;
+static uint64_t zk_flying_ops;
 
 static void zk_compete_master(void);
 
@@ -140,28 +140,39 @@ static inline struct zk_node *zk_tree_search(const struct node_id *nid)
 static zhandle_t *zhandle;
 static struct zk_node this_node;
 
+#define check_zk_rc(rc, path)						\
+	if (rc != ZOK && rc != ZNONODE && rc != ZNODEEXISTS &&		\
+	    rc != ZINVALIDSTATE)					\
+		panic("failed, path:%s, %s", path, zerror(rc));
+
 static inline ZOOAPI int zk_delete_node(const char *path, int version)
 {
 	int rc;
+	uatomic_inc(&zk_flying_ops);
 	do {
 		rc = zoo_delete(zhandle, path, version);
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
-	if (rc != ZOK)
-		sd_eprintf("failed, path:%s, %s", path, zerror(rc));
+	uatomic_dec(&zk_flying_ops);
+	check_zk_rc(rc, path);
+
 	return rc;
 }
 
-static inline ZOOAPI void
+static inline ZOOAPI int
 zk_init_node(const char *path)
 {
 	int rc;
+	uatomic_inc(&zk_flying_ops);
 	do {
 		rc = zoo_create(zhandle, path, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0,
 				NULL, 0);
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
+	uatomic_dec(&zk_flying_ops);
+	check_zk_rc(rc, path);
 
-	if (rc != ZOK && rc != ZNODEEXISTS)
-		panic("failed, path:%s, %s", path, zerror(rc));
+	if (rc == ZNODEEXISTS)
+		rc = ZOK;
+	return rc;
 }
 
 static inline ZOOAPI int
@@ -170,12 +181,14 @@ zk_create_node(const char *path, const char *value, int valuelen,
 	       int path_buffer_len)
 {
 	int rc;
+	uatomic_inc(&zk_flying_ops);
 	do {
 		rc = zoo_create(zhandle, path, value, valuelen, acl,
 				flags, path_buffer, path_buffer_len);
-		if (rc != ZOK && rc != ZNODEEXISTS)
-			sd_eprintf("failed, path:%s, %s", path, zerror(rc));
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
+	uatomic_dec(&zk_flying_ops);
+	check_zk_rc(rc, path);
+
 	return rc;
 }
 
@@ -191,11 +204,14 @@ zk_create_seq_node(const char *path, const char *value, int valuelen,
 		   char *path_buffer, int path_buffer_len)
 {
 	int rc;
+	uatomic_inc(&zk_flying_ops);
 	rc = zoo_create(zhandle, path, value, valuelen, &ZOO_OPEN_ACL_UNSAFE,
 			ZOO_SEQUENCE, path_buffer, path_buffer_len);
+	uatomic_dec(&zk_flying_ops);
+	check_zk_rc(rc, path);
+
 	if (rc != ZOK)
 		sd_iprintf("failed, path:%s, %s", path, zerror(rc));
-
 	return rc;
 }
 
@@ -203,12 +219,14 @@ static inline ZOOAPI int zk_get_data(const char *path, void *buffer,
 				     int *buffer_len)
 {
 	int rc;
+	uatomic_inc(&zk_flying_ops);
 	do {
 		rc = zoo_get(zhandle, path, 1, (char *)buffer,
 			     buffer_len, NULL);
-		if (rc != ZOK)
-			sd_eprintf("failed, path:%s, %s", path, zerror(rc));
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
+	uatomic_dec(&zk_flying_ops);
+	check_zk_rc(rc, path);
+
 	return rc;
 }
 
@@ -216,42 +234,48 @@ static inline ZOOAPI int
 zk_set_data(const char *path, const char *buffer, int buflen, int version)
 {
 	int rc;
+	uatomic_inc(&zk_flying_ops);
 	do {
 		rc = zoo_set(zhandle, path, buffer, buflen, version);
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
-	if (rc != ZOK)
-		panic("failed, path:%s, %s", path, zerror(rc));
+	uatomic_dec(&zk_flying_ops);
+	check_zk_rc(rc, path);
+
 	return rc;
 }
 
 static inline ZOOAPI int zk_node_exists(const char *path)
 {
 	int rc;
+	uatomic_inc(&zk_flying_ops);
 	do {
 		rc = zoo_exists(zhandle, path, 1, NULL);
-		if (rc != ZOK && rc != ZNONODE)
-			sd_eprintf("failed, path:%s, %s", path, zerror(rc));
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
+	uatomic_dec(&zk_flying_ops);
+	check_zk_rc(rc, path);
 
 	return rc;
 }
 
-static inline ZOOAPI void zk_get_children(const char *path,
-					  struct String_vector *strings)
+static inline ZOOAPI int zk_get_children(const char *path,
+					 struct String_vector *strings)
 {
 	int rc;
+	uatomic_inc(&zk_flying_ops);
 	do {
 		rc = zoo_get_children(zhandle, path, 1, strings);
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
-	if (rc != ZOK)
-		panic("failed, path:%s, %s", path, zerror(rc));
+	uatomic_dec(&zk_flying_ops);
+	check_zk_rc(rc, path);
+
+	return rc;
 }
 
 /* ZooKeeper-based queue give us an totally ordered events */
 static int efd;
 static int32_t queue_pos;
 
-static bool zk_queue_peek(void)
+static int zk_queue_peek(bool *peek)
 {
 	int rc;
 	char path[MAX_NODE_STR_LEN];
@@ -260,13 +284,19 @@ static bool zk_queue_peek(void)
 
 	rc = zk_node_exists(path);
 	if (rc == ZOK)
-		return true;
+		*peek = true;
+	else if (rc == ZNONODE) {
+		rc = ZOK;
+		*peek = false;
+	} else
+		sd_eprintf("failed, %s", zerror(rc));
 
-	return false;
+	return rc;
 }
 
 /* return true if there is a node with 'id' in the queue. */
-static bool zk_find_seq_node(uint64_t id, char *seq_path, int seq_path_len)
+static int zk_find_seq_node(uint64_t id, char *seq_path, int seq_path_len,
+			    bool *found)
 {
 	int rc, len;
 
@@ -281,15 +311,17 @@ static bool zk_find_seq_node(uint64_t id, char *seq_path, int seq_path_len)
 			if (ev.id == id) {
 				sd_dprintf("id %"PRIx64" is found in %s", id,
 					   seq_path);
-				return true;
+				*found = true;
+				return ZOK;
 			}
 			break;
 		case ZNONODE:
 			sd_dprintf("id %"PRIx64" is not found", id);
-			return false;
+			*found = false;
+			return ZOK;
 		default:
-			panic("failed, %s", zerror(rc));
-			break;
+			sd_eprintf("failed, %s", zerror(rc));
+			return rc;
 		}
 	}
 }
@@ -298,6 +330,7 @@ static void zk_queue_push(struct zk_event *ev)
 {
 	int rc, len;
 	char path[MAX_NODE_STR_LEN], buf[MAX_NODE_STR_LEN];
+	bool found;
 
 	len = offsetof(typeof(*ev), buf) + ev->buf_len;
 	snprintf(path, sizeof(path), "%s/", QUEUE_ZNODE);
@@ -309,12 +342,17 @@ again:
 		break;
 	case ZOPERATIONTIMEOUT:
 	case ZCONNECTIONLOSS:
-		if (!zk_find_seq_node(ev->id, buf, sizeof(buf)))
-			/* retry if seq_node was not created */
-			goto again;
-		break;
+		if (zk_find_seq_node(ev->id, buf, sizeof(buf), &found) == ZOK) {
+			if (found)
+				break;
+			else
+				/* retry if seq_node was not created */
+				goto again;
+		}
+		/* fall through */
 	default:
-		panic("failed, path:%s, %s", path, zerror(rc));
+		sd_eprintf("failed, path:%s, %s", path, zerror(rc));
+		return;
 	}
 	if (first_push) {
 		int32_t seq;
@@ -338,7 +376,7 @@ static inline void *zk_event_sd_nodes(struct zk_event *ev)
 static void push_join_response(struct zk_event *ev)
 {
 	char path[MAX_NODE_STR_LEN];
-	int len;
+	int len, rc;
 
 	ev->type = EVENT_JOIN_RESPONSE;
 	ev->nr_nodes = nr_sd_nodes;
@@ -348,9 +386,12 @@ static void push_join_response(struct zk_event *ev)
 
 	len = offsetof(typeof(*ev), buf) + ev->buf_len;
 	snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos);
-	zk_set_data(path, (char *)ev, len, -1);
-	sd_dprintf("update path:%s, queue_pos:%010"PRId32", len:%d",
-		   path, queue_pos, len);
+	rc = zk_set_data(path, (char *)ev, len, -1);
+	if (rc == ZOK)
+		sd_dprintf("update path:%s, queue_pos:%010"PRId32", len:%d",
+			   path, queue_pos, len);
+	else
+		sd_eprintf("failed, %s", zerror(rc));
 }
 
 static void zk_queue_pop_advance(struct zk_event *ev)
@@ -361,19 +402,12 @@ static void zk_queue_pop_advance(struct zk_event *ev)
 	len = sizeof(*ev);
 	snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos);
 	rc = zk_get_data(path, ev, &len);
-	if (rc != ZOK)
-		panic("failed to get data from %s, %s", path, zerror(rc));
-	sd_dprintf("%s, type:%d, len:%d, pos:%"PRId32, path, ev->type, len,
-		   queue_pos);
-	queue_pos++;
-}
-
-static int zk_member_empty(void)
-{
-	struct String_vector strs;
-
-	zk_get_children(MEMBER_ZNODE, &strs);
-	return (strs.count == 0);
+	if (rc == ZOK) {
+		sd_dprintf("%s, type:%d, len:%d, pos:%"PRId32, path, ev->type,
+			   len, queue_pos);
+		queue_pos++;
+	} else
+		sd_eprintf("failed, path %s, %s", path, zerror(rc));
 }
 
 static inline void zk_tree_add(struct zk_node *node)
@@ -434,17 +468,25 @@ static inline void build_node_list(void)
 	sd_dprintf("nr_sd_nodes:%zu", nr_sd_nodes);
 }
 
-static inline int zk_master_create(void)
-{
-	return zk_create_node(MASTER_ZNONE, "", 0, &ZOO_OPEN_ACL_UNSAFE,
-			      ZOO_EPHEMERAL, NULL, 0);
-}
-
-static void zk_queue_init(void)
+static int zk_queue_init(void)
 {
-	zk_init_node(BASE_ZNODE);
-	zk_init_node(QUEUE_ZNODE);
-	zk_init_node(MEMBER_ZNODE);
+	int rc;
+	rc = zk_init_node(BASE_ZNODE);
+	if (rc != ZOK) {
+		sd_eprintf("failed, path %s, %s", BASE_ZNODE, zerror(rc));
+		return rc;
+	}
+	rc = zk_init_node(QUEUE_ZNODE);
+	if (rc != ZOK) {
+		sd_eprintf("failed, path %s, %s", QUEUE_ZNODE, zerror(rc));
+		return rc;
+	}
+	rc = zk_init_node(MEMBER_ZNODE);
+	if (rc != ZOK) {
+		sd_eprintf("failed, path %s, %s", MEMBER_ZNODE, zerror(rc));
+		return rc;
+	}
+	return ZOK;
 }
 
 /* Calculate a unique 64 bit integer from this_node and the sequence number. */
@@ -567,8 +609,11 @@ again:
 			goto out;
 		else if (rc == ZNONODE)
 			goto again;
-		else
-			panic("failed, path:%s, %s", master_path, zerror(rc));
+		else {
+			sd_eprintf("failed, path:%s, %s", master_path,
+				   zerror(rc));
+			return;
+		}
 	} else if (rc == ZNONODE) {
 		/* compete for master */
 		rc = zk_create_node(MASTER_ZNONE, node_to_str(&this_node.node),
@@ -582,10 +627,15 @@ again:
 			return;
 		} else if (rc == ZNODEEXISTS)
 			goto again;
-		else
-			panic("failed, path:%s, %s", MASTER_ZNONE, zerror(rc));
-	} else
-		panic("failed, path:%s, %s", MASTER_ZNONE, zerror(rc));
+		else {
+			sd_eprintf("failed, path:%s, %s", MASTER_ZNONE,
+				   zerror(rc));
+			return;
+		}
+	} else {
+		sd_eprintf("failed, path:%s, %s", MASTER_ZNONE, zerror(rc));
+		return;
+	}
 out:
 	sd_iprintf("lost");
 }
@@ -667,16 +717,21 @@ static void zk_handle_join_request(struct zk_event *ev)
 static void watch_all_nodes(void)
 {
 	struct String_vector strs;
-	struct zk_node znode;
 	char path[MAX_NODE_STR_LEN];
-	int len = sizeof(znode);
+	int rc;
 
-	if (zk_member_empty())
-		return;
+	rc = zk_get_children(MEMBER_ZNODE, &strs);
+	if (rc != ZOK)
+		goto error;
 
 	FOR_EACH_ZNODE(MEMBER_ZNODE, path, &strs) {
-		zk_get_data(path, &znode, &len);
+		rc = zk_node_exists(path);
+		if (rc != ZOK)
+			goto error;
 	}
+	return;
+error:
+	sd_eprintf("failed, %s", zerror(rc));
 }
 
 static void init_node_list(struct zk_event *ev)
@@ -699,6 +754,7 @@ static void init_node_list(struct zk_event *ev)
 static void zk_handle_join_response(struct zk_event *ev)
 {
 	char path[MAX_NODE_STR_LEN];
+	int rc;
 
 	sd_dprintf("JOIN RESPONSE");
 	if (node_eq(&ev->sender.node, &this_node.node))
@@ -722,9 +778,15 @@ static void zk_handle_join_response(struct zk_event *ev)
 			 node_to_str(&ev->sender.node));
 		if (node_eq(&ev->sender.node, &this_node.node)) {
 			sd_dprintf("create path:%s", path);
-			zk_create_node(path, (char *)zoo_client_id(zhandle),
-				       sizeof(clientid_t), &ZOO_OPEN_ACL_UNSAFE,
-				       ZOO_EPHEMERAL, NULL, 0);
+			rc = zk_create_node(path,
+					    (char *)zoo_client_id(zhandle),
+					    sizeof(clientid_t),
+					    &ZOO_OPEN_ACL_UNSAFE,
+					    ZOO_EPHEMERAL, NULL, 0);
+			if (rc != ZOK) {
+				sd_eprintf("failed, %s", zerror(rc));
+				return;
+			}
 		} else
 			zk_node_exists(path);
 
@@ -825,6 +887,7 @@ static void zk_event_handler(int listen_fd, int events, void *data)
 {
 	eventfd_t value;
 	struct zk_event ev;
+	bool peek;
 
 	sd_dprintf("%d, %d", events, queue_pos);
 	if (events & EPOLLHUP) {
@@ -840,7 +903,15 @@ static void zk_event_handler(int listen_fd, int events, void *data)
 	}
 
 	if (zoo_state(zhandle) == ZOO_EXPIRED_SESSION_STATE) {
-		sd_eprintf("detect a session timeout. reconnecting...");
+		sd_eprintf("detect a session timeout.");
+		while (uatomic_read(&zk_flying_ops) != 0) {
+			sd_eprintf("waiting for flying zookeeper operations"
+				   " %" PRIu64,
+				   zk_flying_ops);
+			sleep(1);
+		}
+
+		sd_eprintf("start to reconnect...");
 		/* clean memory states */
 		close(efd);
 		zk_tree_destroy();
@@ -860,8 +931,11 @@ static void zk_event_handler(int listen_fd, int events, void *data)
 		return;
 	}
 
-	if (!zk_queue_peek())
-		goto kick_block_event;
+	if (zk_queue_peek(&peek) == ZOK) {
+		if (!peek)
+			goto kick_block_event;
+	} else
+		return;
 
 	zk_queue_pop_advance(&ev);
 	if (ev.type < zk_max_event_handlers && zk_event_handlers[ev.type])
@@ -869,11 +943,17 @@ static void zk_event_handler(int listen_fd, int events, void *data)
 	else
 		panic("unhandled type %d", ev.type);
 
-	 /* Someone has created next event, go kick event handler. */
-	if (zk_queue_peek()) {
-		eventfd_write(efd, 1);
+	if (zk_queue_peek(&peek) == ZOK) {
+		if (peek) {
+			/*
+			 * Someone has created next event,
+			 * go kick event handler.
+			 */
+			eventfd_write(efd, 1);
+			return;
+		}
+	} else
 		return;
-	}
 
 kick_block_event:
 	/*
@@ -917,7 +997,8 @@ static int zk_init(const char *option)
 
 	uatomic_set_false(&stop);
 	uatomic_set_false(&is_master);
-	zk_queue_init();
+	if (zk_queue_init() != ZOK)
+		return -1;
 
 	efd = eventfd(0, EFD_NONBLOCK);
 	if (efd < 0) {
-- 
1.7.9.5




More information about the sheepdog mailing list