[sheepdog] [PATCH v4 5/5] zookeeper: handle session timeout for all zookeeper operations
Kai Zhang
kyle at zelin.io
Tue Jun 18 08:15:19 CEST 2013
The idea is: when a zk_* API returns ZINVALIDSTATE, it means the connection
and session to zookeeper have been lost.
At this point, callers of zk_* APIs should just do nothing but drop control as
soon as possible.
Another thread will then be responsible for cleaning up the in-memory state,
re-connecting to zookeeper and re-sending the join request.
Signed-off-by: Kai Zhang <kyle at zelin.io>
---
sheep/cluster/zookeeper.c | 255 ++++++++++++++++++++++++++++++---------------
1 file changed, 173 insertions(+), 82 deletions(-)
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index ca113dc..09b9b17 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -33,8 +33,7 @@
/* iterate child znodes */
#define FOR_EACH_ZNODE(parent, path, strs) \
- for (zk_get_children(parent, strs), \
- (strs)->data += (strs)->count; \
+ for ((strs)->data += (strs)->count; \
(strs)->count-- ? \
snprintf(path, sizeof(path), "%s/%s", parent, \
*--(strs)->data) : (free((strs)->data), 0); \
@@ -76,6 +75,7 @@ static LIST_HEAD(zk_block_list);
static uatomic_bool is_master;
static uatomic_bool stop;
static bool first_push = true;
+static uint64_t zk_flying_ops;
static void zk_compete_master(void);
@@ -140,28 +140,49 @@ static inline struct zk_node *zk_tree_search(const struct node_id *nid)
static zhandle_t *zhandle;
static struct zk_node this_node;
+static inline void check_zk_rc(int rc, const char *path) {
+ switch (rc) {
+ case ZOK:
+ case ZNONODE:
+ case ZNODEEXISTS:
+ case ZINVALIDSTATE:
+ case ZSESSIONEXPIRED:
+ case ZOPERATIONTIMEOUT:
+ case ZCONNECTIONLOSS:
+ break;
+ default:
+ panic("failed, path:%s, %s", path, zerror(rc));
+ }
+}
+
static inline ZOOAPI int zk_delete_node(const char *path, int version)
{
int rc;
+ uatomic_inc(&zk_flying_ops);
do {
rc = zoo_delete(zhandle, path, version);
} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
- if (rc != ZOK)
- sd_eprintf("failed, path:%s, %s", path, zerror(rc));
+ uatomic_dec(&zk_flying_ops);
+ check_zk_rc(rc, path);
+
return rc;
}
-static inline ZOOAPI void
+static inline ZOOAPI int
zk_init_node(const char *path)
{
int rc;
+ uatomic_inc(&zk_flying_ops);
do {
rc = zoo_create(zhandle, path, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0,
NULL, 0);
} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
+ uatomic_dec(&zk_flying_ops);
+ check_zk_rc(rc, path);
- if (rc != ZOK && rc != ZNODEEXISTS)
- panic("failed, path:%s, %s", path, zerror(rc));
+ if (rc == ZNODEEXISTS)
+ rc = ZOK;
+ return rc;
}
static inline ZOOAPI int
@@ -170,12 +191,14 @@ zk_create_node(const char *path, const char *value, int valuelen,
int path_buffer_len)
{
int rc;
+ uatomic_inc(&zk_flying_ops);
do {
rc = zoo_create(zhandle, path, value, valuelen, acl,
flags, path_buffer, path_buffer_len);
- if (rc != ZOK && rc != ZNODEEXISTS)
- sd_eprintf("failed, path:%s, %s", path, zerror(rc));
} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
+ uatomic_dec(&zk_flying_ops);
+ check_zk_rc(rc, path);
+
return rc;
}
@@ -191,11 +214,14 @@ zk_create_seq_node(const char *path, const char *value, int valuelen,
char *path_buffer, int path_buffer_len)
{
int rc;
+ uatomic_inc(&zk_flying_ops);
rc = zoo_create(zhandle, path, value, valuelen, &ZOO_OPEN_ACL_UNSAFE,
ZOO_SEQUENCE, path_buffer, path_buffer_len);
+ uatomic_dec(&zk_flying_ops);
+ check_zk_rc(rc, path);
+
if (rc != ZOK)
sd_iprintf("failed, path:%s, %s", path, zerror(rc));
-
return rc;
}
@@ -203,12 +229,14 @@ static inline ZOOAPI int zk_get_data(const char *path, void *buffer,
int *buffer_len)
{
int rc;
+ uatomic_inc(&zk_flying_ops);
do {
rc = zoo_get(zhandle, path, 1, (char *)buffer,
buffer_len, NULL);
- if (rc != ZOK)
- sd_eprintf("failed, path:%s, %s", path, zerror(rc));
} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
+ uatomic_dec(&zk_flying_ops);
+ check_zk_rc(rc, path);
+
return rc;
}
@@ -216,42 +244,48 @@ static inline ZOOAPI int
zk_set_data(const char *path, const char *buffer, int buflen, int version)
{
int rc;
+ uatomic_inc(&zk_flying_ops);
do {
rc = zoo_set(zhandle, path, buffer, buflen, version);
} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
- if (rc != ZOK)
- panic("failed, path:%s, %s", path, zerror(rc));
+ uatomic_dec(&zk_flying_ops);
+ check_zk_rc(rc, path);
+
return rc;
}
static inline ZOOAPI int zk_node_exists(const char *path)
{
int rc;
+ uatomic_inc(&zk_flying_ops);
do {
rc = zoo_exists(zhandle, path, 1, NULL);
- if (rc != ZOK && rc != ZNONODE)
- sd_eprintf("failed, path:%s, %s", path, zerror(rc));
} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
+ uatomic_dec(&zk_flying_ops);
+ check_zk_rc(rc, path);
return rc;
}
-static inline ZOOAPI void zk_get_children(const char *path,
- struct String_vector *strings)
+static inline ZOOAPI int zk_get_children(const char *path,
+ struct String_vector *strings)
{
int rc;
+ uatomic_inc(&zk_flying_ops);
do {
rc = zoo_get_children(zhandle, path, 1, strings);
} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
- if (rc != ZOK)
- panic("failed, path:%s, %s", path, zerror(rc));
+ uatomic_dec(&zk_flying_ops);
+ check_zk_rc(rc, path);
+
+ return rc;
}
/* ZooKeeper-based queue give us an totally ordered events */
static int efd;
static int32_t queue_pos;
-static bool zk_queue_peek(void)
+static int zk_queue_peek(bool *peek)
{
int rc;
char path[MAX_NODE_STR_LEN];
@@ -260,13 +294,19 @@ static bool zk_queue_peek(void)
rc = zk_node_exists(path);
if (rc == ZOK)
- return true;
+ *peek = true;
+ else if (rc == ZNONODE) {
+ rc = ZOK;
+ *peek = false;
+ } else
+ sd_eprintf("failed, %s", zerror(rc));
- return false;
+ return rc;
}
/* return true if there is a node with 'id' in the queue. */
-static bool zk_find_seq_node(uint64_t id, char *seq_path, int seq_path_len)
+static int zk_find_seq_node(uint64_t id, char *seq_path, int seq_path_len,
+ bool *found)
{
int rc, len;
@@ -281,15 +321,17 @@ static bool zk_find_seq_node(uint64_t id, char *seq_path, int seq_path_len)
if (ev.id == id) {
sd_dprintf("id %"PRIx64" is found in %s", id,
seq_path);
- return true;
+ *found = true;
+ return ZOK;
}
break;
case ZNONODE:
sd_dprintf("id %"PRIx64" is not found", id);
- return false;
+ *found = false;
+ return ZOK;
default:
- panic("failed, %s", zerror(rc));
- break;
+ sd_eprintf("failed, %s", zerror(rc));
+ return rc;
}
}
}
@@ -298,6 +340,7 @@ static void zk_queue_push(struct zk_event *ev)
{
int rc, len;
char path[MAX_NODE_STR_LEN], buf[MAX_NODE_STR_LEN];
+ bool found;
len = offsetof(typeof(*ev), buf) + ev->buf_len;
snprintf(path, sizeof(path), "%s/", QUEUE_ZNODE);
@@ -309,12 +352,17 @@ again:
break;
case ZOPERATIONTIMEOUT:
case ZCONNECTIONLOSS:
- if (!zk_find_seq_node(ev->id, buf, sizeof(buf)))
- /* retry if seq_node was not created */
- goto again;
- break;
+ if (zk_find_seq_node(ev->id, buf, sizeof(buf), &found) == ZOK) {
+ if (found)
+ break;
+ else
+ /* retry if seq_node was not created */
+ goto again;
+ }
+ /* fall through */
default:
- panic("failed, path:%s, %s", path, zerror(rc));
+ sd_eprintf("failed, path:%s, %s", path, zerror(rc));
+ return;
}
if (first_push) {
int32_t seq;
@@ -338,7 +386,7 @@ static inline void *zk_event_sd_nodes(struct zk_event *ev)
static void push_join_response(struct zk_event *ev)
{
char path[MAX_NODE_STR_LEN];
- int len;
+ int len, rc;
ev->type = EVENT_JOIN_RESPONSE;
ev->nr_nodes = nr_sd_nodes;
@@ -348,9 +396,12 @@ static void push_join_response(struct zk_event *ev)
len = offsetof(typeof(*ev), buf) + ev->buf_len;
snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos);
- zk_set_data(path, (char *)ev, len, -1);
- sd_dprintf("update path:%s, queue_pos:%010"PRId32", len:%d",
- path, queue_pos, len);
+ rc = zk_set_data(path, (char *)ev, len, -1);
+ if (rc == ZOK)
+ sd_dprintf("update path:%s, queue_pos:%010"PRId32", len:%d",
+ path, queue_pos, len);
+ else
+ sd_eprintf("failed, %s", zerror(rc));
}
static void zk_queue_pop_advance(struct zk_event *ev)
@@ -361,19 +412,12 @@ static void zk_queue_pop_advance(struct zk_event *ev)
len = sizeof(*ev);
snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos);
rc = zk_get_data(path, ev, &len);
- if (rc != ZOK)
- panic("failed to get data from %s, %s", path, zerror(rc));
- sd_dprintf("%s, type:%d, len:%d, pos:%"PRId32, path, ev->type, len,
- queue_pos);
- queue_pos++;
-}
-
-static int zk_member_empty(void)
-{
- struct String_vector strs;
-
- zk_get_children(MEMBER_ZNODE, &strs);
- return (strs.count == 0);
+ if (rc == ZOK) {
+ sd_dprintf("%s, type:%d, len:%d, pos:%"PRId32, path, ev->type,
+ len, queue_pos);
+ queue_pos++;
+ } else
+ sd_eprintf("failed, path %s, %s", path, zerror(rc));
}
static inline void zk_tree_add(struct zk_node *node)
@@ -434,17 +478,25 @@ static inline void build_node_list(void)
sd_dprintf("nr_sd_nodes:%zu", nr_sd_nodes);
}
-static inline int zk_master_create(void)
+static int zk_queue_init(void)
{
- return zk_create_node(MASTER_ZNONE, "", 0, &ZOO_OPEN_ACL_UNSAFE,
- ZOO_EPHEMERAL, NULL, 0);
-}
-
-static void zk_queue_init(void)
-{
- zk_init_node(BASE_ZNODE);
- zk_init_node(QUEUE_ZNODE);
- zk_init_node(MEMBER_ZNODE);
+ int rc;
+ rc = zk_init_node(BASE_ZNODE);
+ if (rc != ZOK) {
+ sd_eprintf("failed, path %s, %s", BASE_ZNODE, zerror(rc));
+ return rc;
+ }
+ rc = zk_init_node(QUEUE_ZNODE);
+ if (rc != ZOK) {
+ sd_eprintf("failed, path %s, %s", QUEUE_ZNODE, zerror(rc));
+ return rc;
+ }
+ rc = zk_init_node(MEMBER_ZNODE);
+ if (rc != ZOK) {
+ sd_eprintf("failed, path %s, %s", MEMBER_ZNODE, zerror(rc));
+ return rc;
+ }
+ return ZOK;
}
/* Calculate a unique 64 bit integer from this_node and the sequence number. */
@@ -567,8 +619,11 @@ again:
goto out;
else if (rc == ZNONODE)
goto again;
- else
- panic("failed, path:%s, %s", master_path, zerror(rc));
+ else {
+ sd_eprintf("failed, path:%s, %s", master_path,
+ zerror(rc));
+ return;
+ }
} else if (rc == ZNONODE) {
/* compete for master */
rc = zk_create_node(MASTER_ZNONE, node_to_str(&this_node.node),
@@ -582,10 +637,15 @@ again:
return;
} else if (rc == ZNODEEXISTS)
goto again;
- else
- panic("failed, path:%s, %s", MASTER_ZNONE, zerror(rc));
- } else
- panic("failed, path:%s, %s", MASTER_ZNONE, zerror(rc));
+ else {
+ sd_eprintf("failed, path:%s, %s", MASTER_ZNONE,
+ zerror(rc));
+ return;
+ }
+ } else {
+ sd_eprintf("failed, path:%s, %s", MASTER_ZNONE, zerror(rc));
+ return;
+ }
out:
sd_iprintf("lost");
}
@@ -667,16 +727,21 @@ static void zk_handle_join_request(struct zk_event *ev)
static void watch_all_nodes(void)
{
struct String_vector strs;
- struct zk_node znode;
char path[MAX_NODE_STR_LEN];
- int len = sizeof(znode);
+ int rc;
- if (zk_member_empty())
- return;
+ rc = zk_get_children(MEMBER_ZNODE, &strs);
+ if (rc != ZOK)
+ goto error;
FOR_EACH_ZNODE(MEMBER_ZNODE, path, &strs) {
- zk_get_data(path, &znode, &len);
+ rc = zk_node_exists(path);
+ if (rc != ZOK)
+ goto error;
}
+ return;
+error:
+ sd_eprintf("failed, %s", zerror(rc));
}
static void init_node_list(struct zk_event *ev)
@@ -699,6 +764,7 @@ static void init_node_list(struct zk_event *ev)
static void zk_handle_join_response(struct zk_event *ev)
{
char path[MAX_NODE_STR_LEN];
+ int rc;
sd_dprintf("JOIN RESPONSE");
if (node_eq(&ev->sender.node, &this_node.node))
@@ -722,9 +788,15 @@ static void zk_handle_join_response(struct zk_event *ev)
node_to_str(&ev->sender.node));
if (node_eq(&ev->sender.node, &this_node.node)) {
sd_dprintf("create path:%s", path);
- zk_create_node(path, (char *)zoo_client_id(zhandle),
- sizeof(clientid_t), &ZOO_OPEN_ACL_UNSAFE,
- ZOO_EPHEMERAL, NULL, 0);
+ rc = zk_create_node(path,
+ (char *)zoo_client_id(zhandle),
+ sizeof(clientid_t),
+ &ZOO_OPEN_ACL_UNSAFE,
+ ZOO_EPHEMERAL, NULL, 0);
+ if (rc != ZOK) {
+ sd_eprintf("failed, %s", zerror(rc));
+ return;
+ }
} else
zk_node_exists(path);
@@ -825,6 +897,7 @@ static void zk_event_handler(int listen_fd, int events, void *data)
{
eventfd_t value;
struct zk_event ev;
+ bool peek;
sd_dprintf("%d, %d", events, queue_pos);
if (events & EPOLLHUP) {
@@ -840,7 +913,15 @@ static void zk_event_handler(int listen_fd, int events, void *data)
}
if (zoo_state(zhandle) == ZOO_EXPIRED_SESSION_STATE) {
- sd_eprintf("detect a session timeout. reconnecting...");
+ sd_eprintf("detect a session timeout.");
+ while (uatomic_read(&zk_flying_ops) != 0) {
+ sd_eprintf("waiting for flying zookeeper operations"
+ " %" PRIu64,
+ zk_flying_ops);
+ sleep(1);
+ }
+
+ sd_eprintf("start to reconnect...");
/* clean memory states */
close(efd);
zk_tree_destroy();
@@ -860,8 +941,11 @@ static void zk_event_handler(int listen_fd, int events, void *data)
return;
}
- if (!zk_queue_peek())
- goto kick_block_event;
+ if (zk_queue_peek(&peek) == ZOK) {
+ if (!peek)
+ goto kick_block_event;
+ } else
+ return;
zk_queue_pop_advance(&ev);
if (ev.type < zk_max_event_handlers && zk_event_handlers[ev.type])
@@ -869,11 +953,17 @@ static void zk_event_handler(int listen_fd, int events, void *data)
else
panic("unhandled type %d", ev.type);
- /* Someone has created next event, go kick event handler. */
- if (zk_queue_peek()) {
- eventfd_write(efd, 1);
+ if (zk_queue_peek(&peek) == ZOK) {
+ if (peek) {
+ /*
+ * Someone has created next event,
+ * go kick event handler.
+ */
+ eventfd_write(efd, 1);
+ return;
+ }
+ } else
return;
- }
kick_block_event:
/*
@@ -917,7 +1007,8 @@ static int zk_init(const char *option)
uatomic_set_false(&stop);
uatomic_set_false(&is_master);
- zk_queue_init();
+ if (zk_queue_init() != ZOK)
+ return -1;
efd = eventfd(0, EFD_NONBLOCK);
if (efd < 0) {
--
1.7.9.5
More information about the sheepdog
mailing list