[sheepdog] [PATCH 2/4] sheep: rejoin cluster after a zookeeper session timeout
Kai Zhang
kyle at zelin.io
Fri Jun 14 03:46:36 CEST 2013
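Add sd_reconnect_handler() to group.c so that sheep can rejoin the cluster after a ZooKeeper session expires: the zookeeper driver detects the expired session, resets its in-memory state, and calls the handler, which re-initializes the cluster driver and sends a new join request.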
Signed-off-by: Kai Zhang <kyle at zelin.io>
---
sheep/cluster.h | 1 +
sheep/cluster/zookeeper.c | 32 +++++++++++++++++++++++++++++---
sheep/group.c | 12 ++++++++++++
3 files changed, 42 insertions(+), 3 deletions(-)
diff --git a/sheep/cluster.h b/sheep/cluster.h
index a19a10b..80a701b 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -156,6 +156,7 @@ void sd_leave_handler(const struct sd_node *left, const struct sd_node *members,
size_t nr_members);
void sd_notify_handler(const struct sd_node *sender, void *msg, size_t msg_len);
bool sd_block_handler(const struct sd_node *sender);
+int sd_reconnect_handler(void);
enum cluster_join_result sd_check_join_cb(const struct sd_node *joining,
void *opaque);
void recalculate_vnodes(struct sd_node *nodes, int nr_nodes);
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 09b14c8..a39b427 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -75,6 +75,7 @@ static pthread_rwlock_t zk_tree_lock = PTHREAD_RWLOCK_INITIALIZER;
static LIST_HEAD(zk_block_list);
static uatomic_bool is_master;
static uatomic_bool stop;
+static bool first_push = true;
static void zk_compete_master(void);
@@ -295,7 +296,6 @@ static bool zk_find_seq_node(uint64_t id, char *seq_path, int seq_path_len)
static void zk_queue_push(struct zk_event *ev)
{
- static bool first_push = true;
int rc, len;
char path[MAX_NODE_STR_LEN], buf[MAX_NODE_STR_LEN];
@@ -481,6 +481,11 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
char str[MAX_NODE_STR_LEN], *p;
int ret;
+ if (type == ZOO_SESSION_EVENT && state == ZOO_EXPIRED_SESSION_STATE) {
+ eventfd_write(efd, 1);
+ return;
+ }
+
/* CREATED_EVENT 1, DELETED_EVENT 2, CHANGED_EVENT 3, CHILD_EVENT 4 */
sd_dprintf("path:%s, type:%d", path, type);
if (type == ZOO_CREATED_EVENT || type == ZOO_CHANGED_EVENT) {
@@ -705,8 +710,8 @@ static void zk_handle_join_response(struct zk_event *ev)
node_to_str(&ev->sender.node));
if (node_eq(&ev->sender.node, &this_node.node)) {
sd_dprintf("create path:%s", path);
- zk_create_node(path, (char *)&ev->sender,
- sizeof(ev->sender), &ZOO_OPEN_ACL_UNSAFE,
+ zk_create_node(path, (char *)zoo_client_id(zhandle),
+ sizeof(clientid_t), &ZOO_OPEN_ACL_UNSAFE,
ZOO_EPHEMERAL, NULL, 0);
} else
zk_node_exists(path);
@@ -822,6 +827,27 @@ static void zk_event_handler(int listen_fd, int events, void *data)
return;
}
+ if (zoo_state(zhandle) == ZOO_EXPIRED_SESSION_STATE) {
+ sd_eprintf("detect a session timeout. reconnecting...");
+ /* clean memory states */
+ close(efd);
+ zk_tree_destroy();
+ INIT_RB_ROOT(&zk_node_root);
+ INIT_LIST_HEAD(&zk_block_list);
+ nr_sd_nodes = 0;
+ first_push = true;
+ memset(sd_nodes, 0, sizeof(struct sd_node) * SD_MAX_NODES);
+
+ /* call sd_reconnect_handler to reconnect */
+ while (sd_reconnect_handler()) {
+ sd_eprintf("failed to reconnect. sleep and retry...");
+ sleep(1);
+ }
+ sd_iprintf("reconnected");
+ eventfd_write(efd, 1);
+ return;
+ }
+
if (!zk_queue_peek())
goto kick_block_event;
diff --git a/sheep/group.c b/sheep/group.c
index f74ef10..8584b69 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -1105,6 +1105,18 @@ static int send_join_request(struct sd_node *ent)
return ret;
}
+int sd_reconnect_handler(void)
+{
+ sys->status = SD_STATUS_WAIT_FOR_JOIN;
+ sys->join_finished = false;
+ if (sys->cdrv->init(sys->cdrv_option) != 0)
+ return -1;
+ if (send_join_request(&sys->this_node) != 0)
+ return -1;
+
+ return 0;
+}
+
void sd_join_handler(const struct sd_node *joined,
const struct sd_node *members,
size_t nr_members, enum cluster_join_result result,
--
1.7.1
More information about the sheepdog
mailing list