[sheepdog] [PATCH v7 6/7] zookeeper: handle session timeout for all zookeeper operations

Kai Zhang kyle at zelin.io
Mon Jun 24 05:05:26 CEST 2013


The idea is: when a zk_* API returns ZINVALIDSTATE, the connection and
session to zookeeper have been lost.
At this point, callers of zk_* APIs should do nothing but drop control as
soon as possible.
zk_watcher() then wakes up the main thread, which is responsible for
cleaning the in-memory state, re-connecting to zookeeper and re-sending the
join request.

Signed-off-by: Kai Zhang <kyle at zelin.io>
---
 sheep/cluster/zookeeper.c |  363 ++++++++++++++++++++++++++++++---------------
 1 file changed, 245 insertions(+), 118 deletions(-)

diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index fe56a66..b27583f 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -33,8 +33,7 @@
 
 /* iterate child znodes */
 #define FOR_EACH_ZNODE(parent, path, strs)			       \
-	for (zk_get_children(parent, strs),			       \
-		     (strs)->data += (strs)->count;		       \
+	for ((strs)->data += (strs)->count;			       \
 	     (strs)->count-- ?					       \
 		     snprintf(path, sizeof(path), "%s/%s", parent,     \
 			      *--(strs)->data) : (free((strs)->data), 0); \
@@ -142,18 +141,43 @@ static inline struct zk_node *zk_tree_search(const struct node_id *nid)
 static zhandle_t *zhandle;
 static struct zk_node this_node;
 
+/*
+ * Check the return code of a zookeeper call.  ZNONODE and ZNODEEXISTS are
+ * ordinary results that callers interpret themselves, so they are not
+ * logged.  Connection and session errors are logged and handed back to the
+ * caller, which is expected to drop control; any other error is fatal.
+ */
+#define CHECK_ZK_RC(rc, path)						\
+	do {								\
+		switch (rc) {						\
+		case ZOK:						\
+		case ZNONODE:						\
+		case ZNODEEXISTS:					\
+			break;						\
+		case ZINVALIDSTATE:					\
+		case ZSESSIONEXPIRED:					\
+		case ZOPERATIONTIMEOUT:					\
+		case ZCONNECTIONLOSS:					\
+			sd_eprintf("failed, path:%s, %s", path,		\
+				   zerror(rc));				\
+			break;						\
+		default:						\
+			panic("failed, path:%s, %s", path, zerror(rc));	\
+		}							\
+	} while (0)
+
 static inline ZOOAPI int zk_delete_node(const char *path, int version)
 {
 	int rc;
 	do {
 		rc = zoo_delete(zhandle, path, version);
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
-	if (rc != ZOK)
-		sd_eprintf("failed, path:%s, %s", path, zerror(rc));
+	CHECK_ZK_RC(rc, path);
+
 	return rc;
 }
 
-static inline ZOOAPI void
+static inline ZOOAPI int
 zk_init_node(const char *path)
 {
 	int rc;
@@ -161,9 +175,11 @@ zk_init_node(const char *path)
 		rc = zoo_create(zhandle, path, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0,
 				NULL, 0);
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
+	CHECK_ZK_RC(rc, path);
 
-	if (rc != ZOK && rc != ZNODEEXISTS)
-		panic("failed, path:%s, %s", path, zerror(rc));
+	if (rc == ZNODEEXISTS)
+		rc = ZOK;
+	return rc;
 }
 
 static inline ZOOAPI int
@@ -175,9 +191,9 @@ zk_create_node(const char *path, const char *value, int valuelen,
 	do {
 		rc = zoo_create(zhandle, path, value, valuelen, acl,
 				flags, path_buffer, path_buffer_len);
-		if (rc != ZOK && rc != ZNODEEXISTS)
-			sd_eprintf("failed, path:%s, %s", path, zerror(rc));
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
+	CHECK_ZK_RC(rc, path);
+
 	return rc;
 }
 
@@ -198,8 +214,7 @@ zk_create_seq_node(const char *path, const char *value, int valuelen,
 		flags = flags | ZOO_EPHEMERAL;
 	rc = zoo_create(zhandle, path, value, valuelen, &ZOO_OPEN_ACL_UNSAFE,
 			flags, path_buffer, path_buffer_len);
-	if (rc != ZOK)
-		sd_iprintf("failed, path:%s, %s", path, zerror(rc));
+	CHECK_ZK_RC(rc, path);
 
 	return rc;
 }
@@ -211,9 +226,9 @@ static inline ZOOAPI int zk_get_data(const char *path, void *buffer,
 	do {
 		rc = zoo_get(zhandle, path, 1, (char *)buffer,
 			     buffer_len, NULL);
-		if (rc != ZOK)
-			sd_eprintf("failed, path:%s, %s", path, zerror(rc));
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
+	CHECK_ZK_RC(rc, path);
+
 	return rc;
 }
 
@@ -224,8 +239,8 @@ zk_set_data(const char *path, const char *buffer, int buflen, int version)
 	do {
 		rc = zoo_set(zhandle, path, buffer, buflen, version);
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
-	if (rc != ZOK)
-		panic("failed, path:%s, %s", path, zerror(rc));
+	CHECK_ZK_RC(rc, path);
+
 	return rc;
 }
 
@@ -234,29 +249,29 @@ static inline ZOOAPI int zk_node_exists(const char *path)
 	int rc;
 	do {
 		rc = zoo_exists(zhandle, path, 1, NULL);
-		if (rc != ZOK && rc != ZNONODE)
-			sd_eprintf("failed, path:%s, %s", path, zerror(rc));
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
+	CHECK_ZK_RC(rc, path);
 
 	return rc;
 }
 
-static inline ZOOAPI void zk_get_children(const char *path,
-					  struct String_vector *strings)
+static inline ZOOAPI int zk_get_children(const char *path,
+					 struct String_vector *strings)
 {
 	int rc;
 	do {
 		rc = zoo_get_children(zhandle, path, 1, strings);
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
-	if (rc != ZOK)
-		panic("failed, path:%s, %s", path, zerror(rc));
+	CHECK_ZK_RC(rc, path);
+
+	return rc;
 }
 
 /* ZooKeeper-based queue give us an totally ordered events */
 static int efd;
 static int32_t queue_pos;
 
-static bool zk_queue_peek(void)
+static int zk_queue_peek(bool *peek)
 {
 	int rc;
 	char path[MAX_NODE_STR_LEN];
@@ -265,13 +280,19 @@ static bool zk_queue_peek(void)
 
 	rc = zk_node_exists(path);
 	if (rc == ZOK)
-		return true;
+		*peek = true;
+	else if (rc == ZNONODE) {
+		rc = ZOK;
+		*peek = false;
+	} else
+		sd_eprintf("failed, %s", zerror(rc));
 
-	return false;
+	return rc;
 }
 
 /* return true if there is a node with 'id' in the queue. */
-static bool zk_find_seq_node(uint64_t id, char *seq_path, int seq_path_len)
+static int zk_find_seq_node(uint64_t id, char *seq_path, int seq_path_len,
+			    bool *found)
 {
 	int rc, len;
 
@@ -286,23 +307,26 @@ static bool zk_find_seq_node(uint64_t id, char *seq_path, int seq_path_len)
 			if (ev.id == id) {
 				sd_dprintf("id %"PRIx64" is found in %s", id,
 					   seq_path);
-				return true;
+				*found = true;
+				return ZOK;
 			}
 			break;
 		case ZNONODE:
 			sd_dprintf("id %"PRIx64" is not found", id);
-			return false;
+			*found = false;
+			return ZOK;
 		default:
-			panic("failed, %s", zerror(rc));
-			break;
+			sd_eprintf("failed, %s", zerror(rc));
+			return rc;
 		}
 	}
 }
 
-static void zk_queue_push(struct zk_event *ev)
+static int zk_queue_push(struct zk_event *ev)
 {
 	int rc, len;
 	char path[MAX_NODE_STR_LEN], buf[MAX_NODE_STR_LEN];
+	bool found;
 
 	len = offsetof(typeof(*ev), buf) + ev->buf_len;
 	snprintf(path, sizeof(path), "%s/", QUEUE_ZNODE);
@@ -314,12 +338,17 @@ again:
 		break;
 	case ZOPERATIONTIMEOUT:
 	case ZCONNECTIONLOSS:
-		if (!zk_find_seq_node(ev->id, buf, sizeof(buf)))
-			/* retry if seq_node was not created */
-			goto again;
-		break;
+		/* check whether the node got created despite the error */
+		rc = zk_find_seq_node(ev->id, buf, sizeof(buf), &found);
+		if (rc == ZOK && found)
+			break;
+		if (rc == ZOK)
+			goto again;	/* seq_node was not created, retry */
+		/* lookup failed (e.g. ZINVALIDSTATE): report that error */
+		/* fall through */
 	default:
-		panic("failed, path:%s, %s", path, zerror(rc));
+		sd_eprintf("failed, path:%s, %s", path, zerror(rc));
+		return rc;
 	}
 	if (first_push) {
 		int32_t seq;
@@ -332,6 +361,7 @@ again:
 
 	sd_dprintf("create path:%s, queue_pos:%010"PRId32", len:%d",
 		   buf, queue_pos, len);
+	return ZOK;
 }
 
 static inline void *zk_event_sd_nodes(struct zk_event *ev)
@@ -340,10 +370,10 @@ static inline void *zk_event_sd_nodes(struct zk_event *ev)
 }
 
 /* Change the join event in place and piggyback the nodes information. */
-static void push_join_response(struct zk_event *ev)
+static int push_join_response(struct zk_event *ev)
 {
 	char path[MAX_NODE_STR_LEN];
-	int len;
+	int len, rc;
 
 	ev->type = EVENT_JOIN_RESPONSE;
 	ev->nr_nodes = nr_sd_nodes;
@@ -353,12 +383,17 @@ static void push_join_response(struct zk_event *ev)
 
 	len = offsetof(typeof(*ev), buf) + ev->buf_len;
 	snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos);
-	zk_set_data(path, (char *)ev, len, -1);
-	sd_dprintf("update path:%s, queue_pos:%010"PRId32", len:%d",
-		   path, queue_pos, len);
+	rc = zk_set_data(path, (char *)ev, len, -1);
+	if (rc == ZOK)
+		sd_dprintf("update path:%s, queue_pos:%010"PRId32", len:%d",
+			   path, queue_pos, len);
+	else
+		sd_eprintf("failed, %s", zerror(rc));
+
+	return rc;
 }
 
-static void zk_queue_pop_advance(struct zk_event *ev)
+static int zk_queue_pop_advance(struct zk_event *ev)
 {
 	int rc, len;
 	char path[MAX_NODE_STR_LEN];
@@ -366,19 +401,14 @@ static void zk_queue_pop_advance(struct zk_event *ev)
 	len = sizeof(*ev);
 	snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos);
 	rc = zk_get_data(path, ev, &len);
-	if (rc != ZOK)
-		panic("failed to get data from %s, %s", path, zerror(rc));
-	sd_dprintf("%s, type:%d, len:%d, pos:%"PRId32, path, ev->type, len,
-		   queue_pos);
-	queue_pos++;
-}
-
-static int zk_member_empty(void)
-{
-	struct String_vector strs;
+	if (rc == ZOK) {
+		sd_dprintf("%s, type:%d, len:%d, pos:%"PRId32, path, ev->type,
+			   len, queue_pos);
+		queue_pos++;
+	} else
+		sd_eprintf("failed, path %s, %s", path, zerror(rc));
 
-	zk_get_children(MEMBER_ZNODE, &strs);
-	return (strs.count == 0);
+	return rc;
 }
 
 static inline void zk_tree_add(struct zk_node *node)
@@ -439,18 +469,30 @@ static inline void build_node_list(void)
 	sd_dprintf("nr_sd_nodes:%zu", nr_sd_nodes);
 }
 
-static inline int zk_master_create(void)
+/*
+ * Create the persistent znodes used by this driver, tolerating ones that
+ * already exist.  Returns ZOK or the first zookeeper error encountered.
+ */
+static int zk_queue_init(void)
 {
-	return zk_create_node(MASTER_ZNONE, "", 0, &ZOO_OPEN_ACL_UNSAFE,
-			      ZOO_EPHEMERAL, NULL, 0);
-}
-
-static void zk_queue_init(void)
-{
-	zk_init_node(BASE_ZNODE);
-	zk_init_node(MASTER_ZNONE);
-	zk_init_node(QUEUE_ZNODE);
-	zk_init_node(MEMBER_ZNODE);
+	static const char *const paths[] = {
+		BASE_ZNODE,	/* first: presumably the parent of the others */
+		MASTER_ZNONE,
+		QUEUE_ZNODE,
+		MEMBER_ZNODE,
+	};
+	size_t i;
+	int rc;
+
+	for (i = 0; i < ARRAY_SIZE(paths); i++) {
+		rc = zk_init_node(paths[i]);
+		if (rc != ZOK) {
+			sd_eprintf("failed, path %s, %s", paths[i],
+				   zerror(rc));
+			return rc;
+		}
+	}
+	return ZOK;
 }
 
 /* Calculate a unique 64 bit integer from this_node and the sequence number. */
@@ -469,6 +511,7 @@ static int add_event(enum zk_event_type type, struct zk_node *znode, void *buf,
 		     size_t buf_len)
 {
 	struct zk_event ev;
+	int rc;
 
 	ev.id = get_uniq_id();
 	ev.type = type;
@@ -476,8 +519,11 @@ static int add_event(enum zk_event_type type, struct zk_node *znode, void *buf,
 	ev.buf_len = buf_len;
 	if (buf)
 		memcpy(ev.buf, buf, buf_len);
-	zk_queue_push(&ev);
-	return 0;
+	rc = zk_queue_push(&ev);
+	if (rc != ZOK)
+		sd_eprintf("failed, type: %d", type);
+
+	return rc;
 }
 
 static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
@@ -488,6 +534,10 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
 	int ret;
 
 	if (type == ZOO_SESSION_EVENT && state == ZOO_EXPIRED_SESSION_STATE) {
+		/*
+		 * do reconnect in main thread to avoid on-the-fly zookeeper
+		 * operations.
+		 */
 		eventfd_write(efd, 1);
 		return;
 	}
@@ -524,7 +574,6 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
 		if (n)
 			add_event(EVENT_LEAVE, &znode, NULL, 0);
 	}
-
 }
 
 /*
@@ -544,18 +593,23 @@ static int add_join_event(void *msg, size_t msg_len)
 	ev.buf_len = len;
 	if (msg)
 		memcpy(ev.buf, msg, msg_len);
-	zk_queue_push(&ev);
-	return 0;
+	return zk_queue_push(&ev);
 }
 
-static void zk_get_least_seq(const char *parent, char *least_seq_path,
-			     int path_len, void *buf, int *buf_len)
+static int zk_get_least_seq(const char *parent, char *least_seq_path,
+			    int path_len, void *buf, int *buf_len)
 {
 	char path[MAX_NODE_STR_LEN], *p, *tmp;
 	struct String_vector strs;
 	int rc, least_seq = INT_MAX , seq;
 
 	while (true) {
+		rc = zk_get_children(parent, &strs);
+		if (rc != ZOK) {
+			sd_eprintf("failed, %s", zerror(rc));
+			return rc;
+		}
+
 		FOR_EACH_ZNODE(parent, path, &strs) {
 			p = strrchr(path, '/');
 			seq = strtol(++p, &tmp, 10);
@@ -569,22 +623,28 @@ static void zk_get_least_seq(const char *parent, char *least_seq_path,
 
 		if (rc == ZOK) {
 			strncpy(least_seq_path, path, path_len);
-			break;
+			return ZOK;
 		} else if (rc == ZNONODE)
 			continue;
-		else
-			panic("failed, path:%s, %s", path, zerror(rc));
+		else {
+			sd_eprintf("failed, %s", zerror(rc));
+			return rc;
+		}
 	}
 }
 
-static void zk_find_master(int *master_seq, char *master_name)
+static int zk_find_master(int *master_seq, char *master_name)
 {
 	int rc, len = MAX_NODE_STR_LEN;
 	char master_compete_path[MAX_NODE_STR_LEN];
 
 	if (*master_seq < 0) {
-		zk_get_least_seq(MASTER_ZNONE, master_compete_path,
-				 MAX_NODE_STR_LEN, master_name, &len);
+		rc = zk_get_least_seq(MASTER_ZNONE, master_compete_path,
+				      MAX_NODE_STR_LEN, master_name, &len);
+		if (rc != ZOK) {
+			sd_eprintf("failed");
+			return rc;
+		}
 		sscanf(master_compete_path, MASTER_ZNONE "/%"PRId32,
 		       master_seq);
 	} else {
@@ -600,29 +660,35 @@ static void zk_find_master(int *master_seq, char *master_name)
 					   "start to compete master");
 				(*master_seq)++;
 				continue;
-			} else
-				panic("failed, path:%s, %s",
-				      master_compete_path, zerror(rc));
+			} else {
+				sd_eprintf("failed, %s", zerror(rc));
+				return rc;
+			}
 		}
 	}
+
+	return ZOK;
 }
 
 /*
  * block until last sheep joined
- * return sequence number of last sheep or -1 if no previous sheep
+ * last_sheep returns sequence number of last sheep or -1 if no previous sheep
  */
-static int zk_verify_last_sheep_join(int seq)
+static int zk_verify_last_sheep_join(int seq, int *last_sheep)
 {
-	int rc, i, len = MAX_NODE_STR_LEN;
+	int rc, len = MAX_NODE_STR_LEN;
 	char path[MAX_NODE_STR_LEN], name[MAX_NODE_STR_LEN];
 
-	for (i = seq - 1; i >= 0; i--) {
-		snprintf(path, MAX_NODE_STR_LEN, MASTER_ZNONE "/%010"PRId32, i);
+	for (*last_sheep = seq - 1; *last_sheep >= 0; (*last_sheep)--) {
+		snprintf(path, MAX_NODE_STR_LEN, MASTER_ZNONE "/%010"PRId32,
+			 *last_sheep);
 		rc = zk_get_data(path, name, &len);
 		if (rc == ZNONODE)
 			continue;
-		else if (rc != ZOK)
-			panic("failed, path:%s, %s", path, zerror(rc));
+		else if (rc != ZOK) {
+			sd_eprintf("failed, %s", zerror(rc));
+			return rc;
+		}
 
 		if (!strcmp(name, node_to_str(&this_node.node)))
 			continue;
@@ -632,12 +698,14 @@ static int zk_verify_last_sheep_join(int seq)
 		if (rc == ZOK)
 			break;
 		else if (rc == ZNONODE) {
-			i++;
+			(*last_sheep)++;
 			continue;
-		} else
-			panic("failed, path:%s, %s", path, zerror(rc));
+		} else {
+			sd_eprintf("failed, %s", zerror(rc));
+			return rc;
+		}
 	}
-	return i;
+	return ZOK;
 }
 
 /*
@@ -646,7 +714,7 @@ static int zk_verify_last_sheep_join(int seq)
  */
 static void zk_compete_master(void)
 {
-	int rc;
+	int rc, last_joined_sheep;
 	char master_name[MAX_NODE_STR_LEN];
 	char my_compete_path[MAX_NODE_STR_LEN];
 	static int master_seq = -1, my_seq;
@@ -672,29 +740,42 @@ static void zk_compete_master(void)
 						my_compete_path,
 						MAX_NODE_STR_LEN, true);
 		} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
-
-		if (rc != ZOK)
-			panic("failed, %s", zerror(rc));
+		CHECK_ZK_RC(rc, MASTER_ZNONE "/");
+		if (rc != ZOK) {
+			sd_eprintf("failed");
+			goto out_unlock;
+		}
 
 		sd_dprintf("my compete path: %s", my_compete_path);
 		sscanf(my_compete_path, MASTER_ZNONE "/%"PRId32,
 		       &my_seq);
 	}
 
-	zk_find_master(&master_seq, master_name);
+	if (zk_find_master(&master_seq, master_name) != ZOK) {
+		sd_eprintf("failed");
+		goto out_unlock;
+	}
 
 	if (!strcmp(master_name, node_to_str(&this_node.node)))
 		goto success;
 	else if (joined) {
 		sd_iprintf("lost");
 		goto out_unlock;
-	} else if (zk_verify_last_sheep_join(my_seq) < 0) {
-		/* all previous sheep has quit, i'm master */
-		master_seq = my_seq;
-		goto success;
 	} else {
-		sd_iprintf("lost");
-		goto out_unlock;
+		if (zk_verify_last_sheep_join(my_seq,
+					      &last_joined_sheep) != ZOK) {
+			sd_eprintf("failed");
+			goto out_unlock;
+		}
+
+		if (last_joined_sheep < 0) {
+			/* all previous sheep has quit, i'm master */
+			master_seq = my_seq;
+			goto success;
+		} else {
+			sd_iprintf("lost");
+			goto out_unlock;
+		}
 	}
 success:
 	uatomic_set_true(&is_master);
@@ -719,7 +800,11 @@ static int zk_join(const struct sd_node *myself,
 	}
 
 	zk_compete_master();
-	return add_join_event(opaque, opaque_len);
+	rc = add_join_event(opaque, opaque_len);
+	if (rc != ZOK)
+		sd_eprintf("failed, %s", zerror(rc));
+
+	return rc;
 }
 
 static int zk_leave(void)
@@ -777,16 +862,25 @@ static void zk_handle_join_request(struct zk_event *ev)
 static void watch_all_nodes(void)
 {
+	/* Watch every znode under MEMBER_ZNODE via zk_node_exists(). */
 	struct String_vector strs;
-	struct zk_node znode;
 	char path[MAX_NODE_STR_LEN];
-	int len = sizeof(znode);
+	int rc;
 
-	if (zk_member_empty())
-		return;
+	rc = zk_get_children(MEMBER_ZNODE, &strs);
+	if (rc != ZOK)
+		goto error;
 
 	FOR_EACH_ZNODE(MEMBER_ZNODE, path, &strs) {
-		zk_get_data(path, &znode, &len);
+		/* after a failure only drain the loop: its end frees strs */
+		if (rc != ZOK && rc != ZNONODE)
+			continue;
+		/* ZNONODE (member znode already gone) is not an error */
+		rc = zk_node_exists(path);
 	}
+	if (rc == ZOK || rc == ZNONODE)
+		return;
+error:
+	sd_eprintf("failed, %s", zerror(rc));
 }
 
 static void init_node_list(struct zk_event *ev)
@@ -809,6 +899,7 @@ static void init_node_list(struct zk_event *ev)
 static void zk_handle_join_response(struct zk_event *ev)
 {
 	char path[MAX_NODE_STR_LEN];
+	int rc;
 
 	sd_dprintf("JOIN RESPONSE");
 	if (node_eq(&ev->sender.node, &this_node.node))
@@ -833,9 +924,15 @@ static void zk_handle_join_response(struct zk_event *ev)
 		if (node_eq(&ev->sender.node, &this_node.node)) {
 			joined = true;
 			sd_dprintf("create path:%s", path);
-			zk_create_node(path, (char *)zoo_client_id(zhandle),
-				       sizeof(clientid_t), &ZOO_OPEN_ACL_UNSAFE,
-				       ZOO_EPHEMERAL, NULL, 0);
+			rc = zk_create_node(path,
+					    (char *)zoo_client_id(zhandle),
+					    sizeof(clientid_t),
+					    &ZOO_OPEN_ACL_UNSAFE,
+					    ZOO_EPHEMERAL, NULL, 0);
+			if (rc != ZOK) {
+				sd_eprintf("failed, %s", zerror(rc));
+				return;
+			}
 		} else
 			zk_node_exists(path);
 
@@ -932,6 +1029,13 @@ static void (*const zk_event_handlers[])(struct zk_event *ev) = {
 
 static const int zk_max_event_handlers = ARRAY_SIZE(zk_event_handlers);
 
+/*
+ * This method should be done in main thread and triggered when zk_watcher()
+ * receives a session timeout event.
+ * All other zk operations who receive 'ZINVALIDSTATE' return code should drop
+ * control of main thread as soon as possible. So that this method can be
+ * executed and re-establish a new session with zookeeper server.
+ */
 static inline void handle_session_expire(void)
 {
 	/* clean memory states */
@@ -941,6 +1045,7 @@ static inline void handle_session_expire(void)
 	INIT_LIST_HEAD(&zk_block_list);
 	nr_sd_nodes = 0;
 	first_push = true;
+	joined = false;
 	memset(sd_nodes, 0, sizeof(struct sd_node) * SD_MAX_NODES);
 
 	while (sd_reconnect_handler()) {
@@ -953,6 +1058,7 @@ static void zk_event_handler(int listen_fd, int events, void *data)
 {
 	eventfd_t value;
 	struct zk_event ev;
+	bool peek;
 
 	sd_dprintf("%d, %d", events, queue_pos);
 	if (events & EPOLLHUP) {
@@ -975,18 +1081,38 @@ static void zk_event_handler(int listen_fd, int events, void *data)
 		return;
 	}
 
-	if (!zk_queue_peek())
-		goto kick_block_event;
+	switch (zk_queue_peek(&peek)) {
+	case ZOK:
+		if (!peek)
+			goto kick_block_event;
+		break;
+	default:
+		sd_eprintf("failed");
+		return;
+	}
 
-	zk_queue_pop_advance(&ev);
+	if (zk_queue_pop_advance(&ev) != ZOK) {
+		sd_eprintf("failed");
+		return;
+	}
 	if (ev.type < zk_max_event_handlers && zk_event_handlers[ev.type])
 		zk_event_handlers[ev.type](&ev);
 	else
 		panic("unhandled type %d", ev.type);
 
-	 /* Someone has created next event, go kick event handler. */
-	if (zk_queue_peek()) {
-		eventfd_write(efd, 1);
+	switch (zk_queue_peek(&peek)) {
+	case ZOK:
+		if (peek) {
+			/*
+			 * Someone has created next event,
+			 * go kick event handler.
+			 */
+			eventfd_write(efd, 1);
+			return;
+		}
+		break;
+	default:
+		sd_eprintf("failed");
 		return;
 	}
 
@@ -1032,7 +1158,8 @@ static int zk_init(const char *option)
 
 	uatomic_set_false(&stop);
 	uatomic_set_false(&is_master);
-	zk_queue_init();
+	if (zk_queue_init() != ZOK)
+		return -1;
 
 	efd = eventfd(0, EFD_NONBLOCK);
 	if (efd < 0) {
-- 
1.7.9.5




More information about the sheepdog mailing list