[sheepdog] [PATCH v2 2/4] sheep: rejoin cluster after a zookeeper session timeout

Kai Zhang kyle at zelin.io
Sat Jun 15 05:22:58 CEST 2013


Add sd_reconnect_handler() to group.c so that sheep can rejoin the cluster after a zookeeper session timeout.

Signed-off-by: Kai Zhang <kyle at zelin.io>
---
 sheep/cluster.h           |    1 +
 sheep/cluster/zookeeper.c |   32 +++++++++++++++++++++++++++++---
 sheep/group.c             |   12 ++++++++++++
 3 files changed, 42 insertions(+), 3 deletions(-)

diff --git a/sheep/cluster.h b/sheep/cluster.h
index a19a10b..80a701b 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -156,6 +156,7 @@ void sd_leave_handler(const struct sd_node *left, const struct sd_node *members,
 		      size_t nr_members);
 void sd_notify_handler(const struct sd_node *sender, void *msg, size_t msg_len);
 bool sd_block_handler(const struct sd_node *sender);
+int sd_reconnect_handler(void);
 enum cluster_join_result sd_check_join_cb(const struct sd_node *joining,
 					  void *opaque);
 void recalculate_vnodes(struct sd_node *nodes, int nr_nodes);
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index d65c3c2..04984c2 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -75,6 +75,7 @@ static pthread_rwlock_t zk_tree_lock = PTHREAD_RWLOCK_INITIALIZER;
 static LIST_HEAD(zk_block_list);
 static uatomic_bool is_master;
 static uatomic_bool stop;
+static bool first_push = true;
 
 static void zk_compete_master(void);
 
@@ -295,7 +296,6 @@ static bool zk_find_seq_node(uint64_t id, char *seq_path, int seq_path_len)
 
 static void zk_queue_push(struct zk_event *ev)
 {
-	static bool first_push = true;
 	int rc, len;
 	char path[MAX_NODE_STR_LEN], buf[MAX_NODE_STR_LEN];
 
@@ -481,6 +481,11 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
 	char str[MAX_NODE_STR_LEN], *p;
 	int ret;
 
+	if (type == ZOO_SESSION_EVENT && state == ZOO_EXPIRED_SESSION_STATE) {
+		eventfd_write(efd, 1);
+		return;
+	}
+
 /* CREATED_EVENT 1, DELETED_EVENT 2, CHANGED_EVENT 3, CHILD_EVENT 4 */
 	sd_dprintf("path:%s, type:%d", path, type);
 	if (type == ZOO_CREATED_EVENT || type == ZOO_CHANGED_EVENT) {
@@ -717,8 +722,8 @@ static void zk_handle_join_response(struct zk_event *ev)
 			 node_to_str(&ev->sender.node));
 		if (node_eq(&ev->sender.node, &this_node.node)) {
 			sd_dprintf("create path:%s", path);
-			zk_create_node(path, (char *)&ev->sender,
-				       sizeof(ev->sender), &ZOO_OPEN_ACL_UNSAFE,
+			zk_create_node(path, (char *)zoo_client_id(zhandle),
+				       sizeof(clientid_t), &ZOO_OPEN_ACL_UNSAFE,
 				       ZOO_EPHEMERAL, NULL, 0);
 		} else
 			zk_node_exists(path);
@@ -834,6 +839,27 @@ static void zk_event_handler(int listen_fd, int events, void *data)
 		return;
 	}
 
+	if (zoo_state(zhandle) == ZOO_EXPIRED_SESSION_STATE) {
+		sd_eprintf("detect a session timeout. reconnecting...");
+		/* clean memory states */
+		close(efd);
+		zk_tree_destroy();
+		INIT_RB_ROOT(&zk_node_root);
+		INIT_LIST_HEAD(&zk_block_list);
+		nr_sd_nodes = 0;
+		first_push = true;
+		memset(sd_nodes, 0, sizeof(struct sd_node) * SD_MAX_NODES);
+
+		/* call sd_reconnect_handler to reconnect */
+		while (sd_reconnect_handler()) {
+			sd_eprintf("failed to reconnect. sleep and retry...");
+			sleep(1);
+		}
+		sd_iprintf("reconnected");
+		eventfd_write(efd, 1);
+		return;
+	}
+
 	if (!zk_queue_peek())
 		goto kick_block_event;
 
diff --git a/sheep/group.c b/sheep/group.c
index f74ef10..8584b69 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -1105,6 +1105,18 @@ static int send_join_request(struct sd_node *ent)
 	return ret;
 }
 
+int sd_reconnect_handler(void)
+{
+	sys->status = SD_STATUS_WAIT_FOR_JOIN;
+	sys->join_finished = false;
+	if (sys->cdrv->init(sys->cdrv_option) != 0)
+		return -1;
+	if (send_join_request(&sys->this_node) != 0)
+		return -1;
+
+	return 0;
+}
+
 void sd_join_handler(const struct sd_node *joined,
 		     const struct sd_node *members,
 		     size_t nr_members, enum cluster_join_result result,
-- 
1.7.9.5




More information about the sheepdog mailing list