[sheepdog] [PATCH v6 6/6] zookeeper: handle session timeout for all zookeeper operations
Kai Zhang
kyle at zelin.io
Fri Jun 21 14:34:44 CEST 2013
The idea is: when a zk_* API returns ZINVALIDSTATE, it means the connection
and session to ZooKeeper have been lost.
At this point, callers of the zk_* APIs should do nothing except give up
control (return) as soon as possible.
Another thread is responsible for cleaning up the in-memory state,
reconnecting to ZooKeeper and re-sending the join request.
Signed-off-by: Kai Zhang <kyle at zelin.io>
---
sheep/cluster/zookeeper.c | 348 ++++++++++++++++++++++++++++++---------------
1 file changed, 230 insertions(+), 118 deletions(-)
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index c848df2..bf22bbd 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -33,8 +33,7 @@
/* iterate child znodes */
#define FOR_EACH_ZNODE(parent, path, strs) \
- for (zk_get_children(parent, strs), \
- (strs)->data += (strs)->count; \
+ for ((strs)->data += (strs)->count; \
(strs)->count-- ? \
snprintf(path, sizeof(path), "%s/%s", parent, \
*--(strs)->data) : (free((strs)->data), 0); \
@@ -142,18 +141,33 @@ static inline struct zk_node *zk_tree_search(const struct node_id *nid)
static zhandle_t *zhandle;
static struct zk_node this_node;
+#define check_zk_rc(rc, path) \
+ switch (rc) { \
+ case ZNONODE: \
+ case ZNODEEXISTS: \
+ case ZINVALIDSTATE: \
+ case ZSESSIONEXPIRED: \
+ case ZOPERATIONTIMEOUT: \
+ case ZCONNECTIONLOSS: \
+ sd_eprintf("failed, path:%s, %s", path, zerror(rc)); \
+ case ZOK: \
+ break; \
+ default: \
+ panic("failed, path:%s, %s", path, zerror(rc)); \
+ }
+
static inline ZOOAPI int zk_delete_node(const char *path, int version)
{
int rc;
do {
rc = zoo_delete(zhandle, path, version);
} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
- if (rc != ZOK)
- sd_eprintf("failed, path:%s, %s", path, zerror(rc));
+ check_zk_rc(rc, path);
+
return rc;
}
-static inline ZOOAPI void
+static inline ZOOAPI int
zk_init_node(const char *path)
{
int rc;
@@ -161,9 +175,11 @@ zk_init_node(const char *path)
rc = zoo_create(zhandle, path, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0,
NULL, 0);
} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
+ check_zk_rc(rc, path);
- if (rc != ZOK && rc != ZNODEEXISTS)
- panic("failed, path:%s, %s", path, zerror(rc));
+ if (rc == ZNODEEXISTS)
+ rc = ZOK;
+ return rc;
}
static inline ZOOAPI int
@@ -175,9 +191,9 @@ zk_create_node(const char *path, const char *value, int valuelen,
do {
rc = zoo_create(zhandle, path, value, valuelen, acl,
flags, path_buffer, path_buffer_len);
- if (rc != ZOK && rc != ZNODEEXISTS)
- sd_eprintf("failed, path:%s, %s", path, zerror(rc));
} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
+ check_zk_rc(rc, path);
+
return rc;
}
@@ -198,8 +214,7 @@ zk_create_seq_node(const char *path, const char *value, int valuelen,
flags = flags | ZOO_EPHEMERAL;
rc = zoo_create(zhandle, path, value, valuelen, &ZOO_OPEN_ACL_UNSAFE,
flags, path_buffer, path_buffer_len);
- if (rc != ZOK)
- sd_iprintf("failed, path:%s, %s", path, zerror(rc));
+ check_zk_rc(rc, path);
return rc;
}
@@ -211,9 +226,9 @@ static inline ZOOAPI int zk_get_data(const char *path, void *buffer,
do {
rc = zoo_get(zhandle, path, 1, (char *)buffer,
buffer_len, NULL);
- if (rc != ZOK)
- sd_eprintf("failed, path:%s, %s", path, zerror(rc));
} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
+ check_zk_rc(rc, path);
+
return rc;
}
@@ -224,8 +239,8 @@ zk_set_data(const char *path, const char *buffer, int buflen, int version)
do {
rc = zoo_set(zhandle, path, buffer, buflen, version);
} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
- if (rc != ZOK)
- panic("failed, path:%s, %s", path, zerror(rc));
+ check_zk_rc(rc, path);
+
return rc;
}
@@ -234,29 +249,29 @@ static inline ZOOAPI int zk_node_exists(const char *path)
int rc;
do {
rc = zoo_exists(zhandle, path, 1, NULL);
- if (rc != ZOK && rc != ZNONODE)
- sd_eprintf("failed, path:%s, %s", path, zerror(rc));
} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
+ check_zk_rc(rc, path);
return rc;
}
-static inline ZOOAPI void zk_get_children(const char *path,
- struct String_vector *strings)
+static inline ZOOAPI int zk_get_children(const char *path,
+ struct String_vector *strings)
{
int rc;
do {
rc = zoo_get_children(zhandle, path, 1, strings);
} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
- if (rc != ZOK)
- panic("failed, path:%s, %s", path, zerror(rc));
+ check_zk_rc(rc, path);
+
+ return rc;
}
/* ZooKeeper-based queue give us an totally ordered events */
static int efd;
static int32_t queue_pos;
-static bool zk_queue_peek(void)
+static int zk_queue_peek(bool *peek)
{
int rc;
char path[MAX_NODE_STR_LEN];
@@ -265,13 +280,19 @@ static bool zk_queue_peek(void)
rc = zk_node_exists(path);
if (rc == ZOK)
- return true;
+ *peek = true;
+ else if (rc == ZNONODE) {
+ rc = ZOK;
+ *peek = false;
+ } else
+ sd_eprintf("failed, %s", zerror(rc));
- return false;
+ return rc;
}
/* return true if there is a node with 'id' in the queue. */
-static bool zk_find_seq_node(uint64_t id, char *seq_path, int seq_path_len)
+static int zk_find_seq_node(uint64_t id, char *seq_path, int seq_path_len,
+ bool *found)
{
int rc, len;
@@ -286,23 +307,26 @@ static bool zk_find_seq_node(uint64_t id, char *seq_path, int seq_path_len)
if (ev.id == id) {
sd_dprintf("id %"PRIx64" is found in %s", id,
seq_path);
- return true;
+ *found = true;
+ return ZOK;
}
break;
case ZNONODE:
sd_dprintf("id %"PRIx64" is not found", id);
- return false;
+ *found = false;
+ return ZOK;
default:
- panic("failed, %s", zerror(rc));
- break;
+ sd_eprintf("failed, %s", zerror(rc));
+ return rc;
}
}
}
-static void zk_queue_push(struct zk_event *ev)
+static int zk_queue_push(struct zk_event *ev)
{
int rc, len;
char path[MAX_NODE_STR_LEN], buf[MAX_NODE_STR_LEN];
+ bool found;
len = offsetof(typeof(*ev), buf) + ev->buf_len;
snprintf(path, sizeof(path), "%s/", QUEUE_ZNODE);
@@ -314,12 +338,17 @@ again:
break;
case ZOPERATIONTIMEOUT:
case ZCONNECTIONLOSS:
- if (!zk_find_seq_node(ev->id, buf, sizeof(buf)))
- /* retry if seq_node was not created */
- goto again;
- break;
+ if (zk_find_seq_node(ev->id, buf, sizeof(buf), &found) == ZOK) {
+ if (found)
+ break;
+ else
+ /* retry if seq_node was not created */
+ goto again;
+ }
+ /* fall through */
default:
- panic("failed, path:%s, %s", path, zerror(rc));
+ sd_eprintf("failed, path:%s, %s", path, zerror(rc));
+ return rc;
}
if (first_push) {
int32_t seq;
@@ -332,6 +361,7 @@ again:
sd_dprintf("create path:%s, queue_pos:%010"PRId32", len:%d",
buf, queue_pos, len);
+ return ZOK;
}
static inline void *zk_event_sd_nodes(struct zk_event *ev)
@@ -340,10 +370,10 @@ static inline void *zk_event_sd_nodes(struct zk_event *ev)
}
/* Change the join event in place and piggyback the nodes information. */
-static void push_join_response(struct zk_event *ev)
+static int push_join_response(struct zk_event *ev)
{
char path[MAX_NODE_STR_LEN];
- int len;
+ int len, rc;
ev->type = EVENT_JOIN_RESPONSE;
ev->nr_nodes = nr_sd_nodes;
@@ -353,12 +383,17 @@ static void push_join_response(struct zk_event *ev)
len = offsetof(typeof(*ev), buf) + ev->buf_len;
snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos);
- zk_set_data(path, (char *)ev, len, -1);
- sd_dprintf("update path:%s, queue_pos:%010"PRId32", len:%d",
- path, queue_pos, len);
+ rc = zk_set_data(path, (char *)ev, len, -1);
+ if (rc == ZOK)
+ sd_dprintf("update path:%s, queue_pos:%010"PRId32", len:%d",
+ path, queue_pos, len);
+ else
+ sd_eprintf("failed, %s", zerror(rc));
+
+ return rc;
}
-static void zk_queue_pop_advance(struct zk_event *ev)
+static int zk_queue_pop_advance(struct zk_event *ev)
{
int rc, len;
char path[MAX_NODE_STR_LEN];
@@ -366,19 +401,14 @@ static void zk_queue_pop_advance(struct zk_event *ev)
len = sizeof(*ev);
snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos);
rc = zk_get_data(path, ev, &len);
- if (rc != ZOK)
- panic("failed to get data from %s, %s", path, zerror(rc));
- sd_dprintf("%s, type:%d, len:%d, pos:%"PRId32, path, ev->type, len,
- queue_pos);
- queue_pos++;
-}
-
-static int zk_member_empty(void)
-{
- struct String_vector strs;
+ if (rc == ZOK) {
+ sd_dprintf("%s, type:%d, len:%d, pos:%"PRId32, path, ev->type,
+ len, queue_pos);
+ queue_pos++;
+ } else
+ sd_eprintf("failed, path %s, %s", path, zerror(rc));
- zk_get_children(MEMBER_ZNODE, &strs);
- return (strs.count == 0);
+ return rc;
}
static inline void zk_tree_add(struct zk_node *node)
@@ -439,18 +469,30 @@ static inline void build_node_list(void)
sd_dprintf("nr_sd_nodes:%zu", nr_sd_nodes);
}
-static inline int zk_master_create(void)
-{
- return zk_create_node(MASTER_ZNONE, "", 0, &ZOO_OPEN_ACL_UNSAFE,
- ZOO_EPHEMERAL, NULL, 0);
-}
-
-static void zk_queue_init(void)
+static int zk_queue_init(void)
{
- zk_init_node(BASE_ZNODE);
- zk_init_node(MASTER_ZNONE);
- zk_init_node(QUEUE_ZNODE);
- zk_init_node(MEMBER_ZNODE);
+ int rc;
+ rc = zk_init_node(BASE_ZNODE);
+ if (rc != ZOK) {
+ sd_eprintf("failed, path %s, %s", BASE_ZNODE, zerror(rc));
+ return rc;
+ }
+ rc = zk_init_node(MASTER_ZNONE);
+ if (rc != ZOK) {
+ sd_eprintf("failed, path %s, %s", MASTER_ZNONE, zerror(rc));
+ return rc;
+ }
+ rc = zk_init_node(QUEUE_ZNODE);
+ if (rc != ZOK) {
+ sd_eprintf("failed, path %s, %s", QUEUE_ZNODE, zerror(rc));
+ return rc;
+ }
+ rc = zk_init_node(MEMBER_ZNODE);
+ if (rc != ZOK) {
+ sd_eprintf("failed, path %s, %s", MEMBER_ZNODE, zerror(rc));
+ return rc;
+ }
+ return ZOK;
}
/* Calculate a unique 64 bit integer from this_node and the sequence number. */
@@ -469,6 +511,7 @@ static int add_event(enum zk_event_type type, struct zk_node *znode, void *buf,
size_t buf_len)
{
struct zk_event ev;
+ int rc;
ev.id = get_uniq_id();
ev.type = type;
@@ -476,8 +519,11 @@ static int add_event(enum zk_event_type type, struct zk_node *znode, void *buf,
ev.buf_len = buf_len;
if (buf)
memcpy(ev.buf, buf, buf_len);
- zk_queue_push(&ev);
- return 0;
+ rc = zk_queue_push(&ev);
+ if (rc != ZOK)
+ sd_eprintf("failed, type: %d", type);
+
+ return rc;
}
static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
@@ -524,7 +570,6 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
if (n)
add_event(EVENT_LEAVE, &znode, NULL, 0);
}
-
}
/*
@@ -544,18 +589,23 @@ static int add_join_event(void *msg, size_t msg_len)
ev.buf_len = len;
if (msg)
memcpy(ev.buf, msg, msg_len);
- zk_queue_push(&ev);
- return 0;
+ return zk_queue_push(&ev);
}
-static void zk_get_least_seq(const char *parent, char *least_seq_path,
- int path_len, void *buf, int *buf_len)
+static int zk_get_least_seq(const char *parent, char *least_seq_path,
+ int path_len, void *buf, int *buf_len)
{
char path[MAX_NODE_STR_LEN], *p, *tmp;
struct String_vector strs;
int rc, least_seq = INT_MAX , seq;
while (true) {
+ rc = zk_get_children(parent, &strs);
+ if (rc != ZOK) {
+ sd_eprintf("failed, %s", zerror(rc));
+ return rc;
+ }
+
FOR_EACH_ZNODE(parent, path, &strs) {
p = strrchr(path, '/');
seq = strtol(++p, &tmp, 10);
@@ -569,22 +619,28 @@ static void zk_get_least_seq(const char *parent, char *least_seq_path,
if (rc == ZOK) {
strncpy(least_seq_path, path, path_len);
- break;
+ return ZOK;
} else if (rc == ZNONODE)
continue;
- else
- panic("failed, path:%s, %s", path, zerror(rc));
+ else {
+ sd_eprintf("failed, %s", zerror(rc));
+ return rc;
+ }
}
}
-static void zk_find_master(int *master_seq, char *master_name)
+static int zk_find_master(int *master_seq, char *master_name)
{
int rc, len = MAX_NODE_STR_LEN;
char master_compete_path[MAX_NODE_STR_LEN];
if (*master_seq < 0) {
- zk_get_least_seq(MASTER_ZNONE, master_compete_path,
- MAX_NODE_STR_LEN, master_name, &len);
+ rc = zk_get_least_seq(MASTER_ZNONE, master_compete_path,
+ MAX_NODE_STR_LEN, master_name, &len);
+ if (rc != ZOK) {
+ sd_eprintf("failed");
+ return rc;
+ }
sscanf(master_compete_path, MASTER_ZNONE "/%"PRId32,
master_seq);
} else {
@@ -600,29 +656,35 @@ static void zk_find_master(int *master_seq, char *master_name)
"start to compete master");
(*master_seq)++;
continue;
- } else
- panic("failed, path:%s, %s",
- master_compete_path, zerror(rc));
+ } else {
+ sd_eprintf("failed, %s", zerror(rc));
+ return rc;
+ }
}
}
+
+ return ZOK;
}
/*
* block until last sheep joined
- * return sequence number of last sheep or -1 if no previous sheep
+ * last_sheep returns sequence number of last sheep or -1 if no previous sheep
*/
-static int zk_verify_last_sheep_join(int seq)
+static int zk_verify_last_sheep_join(int seq, int *last_sheep)
{
- int rc, i, len = MAX_NODE_STR_LEN;
+ int rc, len = MAX_NODE_STR_LEN;
char path[MAX_NODE_STR_LEN], name[MAX_NODE_STR_LEN];
- for (i = seq - 1; i >= 0; i--) {
- snprintf(path, MAX_NODE_STR_LEN, MASTER_ZNONE "/%010"PRId32, i);
+ for (*last_sheep = seq - 1; *last_sheep >= 0; (*last_sheep)--) {
+ snprintf(path, MAX_NODE_STR_LEN, MASTER_ZNONE "/%010"PRId32,
+ *last_sheep);
rc = zk_get_data(path, name, &len);
if (rc == ZNONODE)
continue;
- else if (rc != ZOK)
- panic("failed, path:%s, %s", path, zerror(rc));
+ else if (rc != ZOK) {
+ sd_eprintf("failed, %s", zerror(rc));
+ return rc;
+ }
if (!strcmp(name, node_to_str(&this_node.node)))
continue;
@@ -632,12 +694,14 @@ static int zk_verify_last_sheep_join(int seq)
if (rc == ZOK)
break;
else if (rc == ZNONODE) {
- i++;
+ (*last_sheep)++;
continue;
- } else
- panic("failed, path:%s, %s", path, zerror(rc));
+ } else {
+ sd_eprintf("failed, %s", zerror(rc));
+ return rc;
+ }
}
- return i;
+ return ZOK;
}
/*
@@ -646,7 +710,7 @@ static int zk_verify_last_sheep_join(int seq)
*/
static void zk_compete_master(void)
{
- int rc;
+ int rc, last_joined_sheep;
char master_name[MAX_NODE_STR_LEN];
char my_compete_path[MAX_NODE_STR_LEN];
static int master_seq = -1, my_seq;
@@ -672,27 +736,40 @@ static void zk_compete_master(void)
my_compete_path,
MAX_NODE_STR_LEN, true);
} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
-
- if (rc != ZOK)
- panic("failed, %s", zerror(rc));
+ check_zk_rc(rc, MASTER_ZNONE "/");
+ if (rc != ZOK) {
+ sd_eprintf("failed");
+ goto out;
+ }
sd_dprintf("my compete path: %s", my_compete_path);
sscanf(my_compete_path, MASTER_ZNONE "/%"PRId32,
&my_seq);
}
- zk_find_master(&master_seq, master_name);
+ if (zk_find_master(&master_seq, master_name) != ZOK) {
+ sd_eprintf("failed");
+ goto out;
+ }
if (!strcmp(master_name, node_to_str(&this_node.node)))
goto success;
else if (joined)
goto lost;
- else if (zk_verify_last_sheep_join(my_seq) < 0) {
- /* all previous sheep has quit, i'm master */
- master_seq = my_seq;
- goto success;
- } else
- goto lost;
+ else {
+ if (zk_verify_last_sheep_join(my_seq,
+ &last_joined_sheep) != ZOK) {
+ sd_eprintf("failed");
+ goto out;
+ }
+
+ if (last_joined_sheep < 0) {
+ /* all previous sheep has quit, i'm master */
+ master_seq = my_seq;
+ goto success;
+ } else
+ goto lost;
+ }
success:
uatomic_set_true(&is_master);
sd_iprintf("success");
@@ -720,7 +797,11 @@ static int zk_join(const struct sd_node *myself,
}
zk_compete_master();
- return add_join_event(opaque, opaque_len);
+ rc = add_join_event(opaque, opaque_len);
+ if (rc != ZOK)
+ sd_eprintf("failed, %s", zerror(rc));
+
+ return rc;
}
static int zk_leave(void)
@@ -778,16 +859,21 @@ static void zk_handle_join_request(struct zk_event *ev)
static void watch_all_nodes(void)
{
struct String_vector strs;
- struct zk_node znode;
char path[MAX_NODE_STR_LEN];
- int len = sizeof(znode);
+ int rc;
- if (zk_member_empty())
- return;
+ rc = zk_get_children(MEMBER_ZNODE, &strs);
+ if (rc != ZOK)
+ goto error;
FOR_EACH_ZNODE(MEMBER_ZNODE, path, &strs) {
- zk_get_data(path, &znode, &len);
+ rc = zk_node_exists(path);
+ if (rc != ZOK)
+ goto error;
}
+ return;
+error:
+ sd_eprintf("failed, %s", zerror(rc));
}
static void init_node_list(struct zk_event *ev)
@@ -810,6 +896,7 @@ static void init_node_list(struct zk_event *ev)
static void zk_handle_join_response(struct zk_event *ev)
{
char path[MAX_NODE_STR_LEN];
+ int rc;
sd_dprintf("JOIN RESPONSE");
if (node_eq(&ev->sender.node, &this_node.node))
@@ -834,9 +921,15 @@ static void zk_handle_join_response(struct zk_event *ev)
if (node_eq(&ev->sender.node, &this_node.node)) {
joined = true;
sd_dprintf("create path:%s", path);
- zk_create_node(path, (char *)zoo_client_id(zhandle),
- sizeof(clientid_t), &ZOO_OPEN_ACL_UNSAFE,
- ZOO_EPHEMERAL, NULL, 0);
+ rc = zk_create_node(path,
+ (char *)zoo_client_id(zhandle),
+ sizeof(clientid_t),
+ &ZOO_OPEN_ACL_UNSAFE,
+ ZOO_EPHEMERAL, NULL, 0);
+ if (rc != ZOK) {
+ sd_eprintf("failed, %s", zerror(rc));
+ return;
+ }
} else
zk_node_exists(path);
@@ -942,6 +1035,7 @@ static inline void handle_session_expire(void)
INIT_LIST_HEAD(&zk_block_list);
nr_sd_nodes = 0;
first_push = true;
+ joined = false;
memset(sd_nodes, 0, sizeof(struct sd_node) * SD_MAX_NODES);
while (sd_reconnect_handler()) {
@@ -954,6 +1048,7 @@ static void zk_event_handler(int listen_fd, int events, void *data)
{
eventfd_t value;
struct zk_event ev;
+ bool peek;
sd_dprintf("%d, %d", events, queue_pos);
if (events & EPOLLHUP) {
@@ -976,18 +1071,34 @@ static void zk_event_handler(int listen_fd, int events, void *data)
return;
}
- if (!zk_queue_peek())
- goto kick_block_event;
+ if (zk_queue_peek(&peek) == ZOK) {
+ if (!peek)
+ goto kick_block_event;
+ } else {
+ sd_eprintf("failed");
+ return;
+ }
- zk_queue_pop_advance(&ev);
+ if (zk_queue_pop_advance(&ev) != ZOK) {
+ sd_eprintf("failed");
+ return;
+ }
if (ev.type < zk_max_event_handlers && zk_event_handlers[ev.type])
zk_event_handlers[ev.type](&ev);
else
panic("unhandled type %d", ev.type);
- /* Someone has created next event, go kick event handler. */
- if (zk_queue_peek()) {
- eventfd_write(efd, 1);
+ if (zk_queue_peek(&peek) == ZOK) {
+ if (peek) {
+ /*
+ * Someone has created next event,
+ * go kick event handler.
+ */
+ eventfd_write(efd, 1);
+ return;
+ }
+ } else {
+ sd_eprintf("failed");
return;
}
@@ -1033,7 +1144,8 @@ static int zk_init(const char *option)
uatomic_set_false(&stop);
uatomic_set_false(&is_master);
- zk_queue_init();
+ if (zk_queue_init() != ZOK)
+ return -1;
efd = eventfd(0, EFD_NONBLOCK);
if (efd < 0) {
--
1.7.9.5
More information about the sheepdog
mailing list