[sheepdog] [PATCH 7/8] zookeeper: pass membership in event for join request
Liu Yuan
namei.unix at gmail.com
Sun Dec 23 16:26:31 CET 2012
From: Liu Yuan <tailai.ly at taobao.com>
We shouldn't simply get the membership from zookeeper, because we might lose
some events when multiple events happen. For example, when one node leaves while
another joins, we might see a single event instead of the two events that
sheep expects.
Passing the membership in the join event solves this.
Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
sheep/cluster.h | 2 +-
sheep/cluster/zookeeper.c | 119 ++++++++++++++++++++++++++++++---------------
2 files changed, 80 insertions(+), 41 deletions(-)
diff --git a/sheep/cluster.h b/sheep/cluster.h
index 2679c19..d1e711d 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -24,7 +24,7 @@
#include "logger.h"
/* maximum payload size sent in ->notify and ->unblock */
-#define SD_MAX_EVENT_BUF_SIZE (64 * 1024)
+#define SD_MAX_EVENT_BUF_SIZE (128 * 1024) /* 128k */
enum cluster_join_result {
CJ_RES_SUCCESS, /* Success */
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 336c827..bf91ea3 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -59,6 +59,8 @@ struct zk_event {
enum zk_event_type type;
struct zk_node sender;
enum cluster_join_result join_result;
+ size_t msg_len;
+ size_t nr_nodes;
size_t buf_len;
uint8_t buf[SD_MAX_EVENT_BUF_SIZE];
};
@@ -153,6 +155,8 @@ zk_create_node(const char *path, const char *value, int valuelen,
do {
rc = zoo_create(zhandle, path, value, valuelen, acl,
flags, path_buffer, path_buffer_len);
+ if (rc != ZOK && rc != ZNODEEXISTS)
+ eprintf("failed, path:%s, rc:%d\n", path, rc);
} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
return rc;
}
@@ -164,6 +168,8 @@ static inline ZOOAPI int zk_get_data(const char *path, void *buffer,
do {
rc = zoo_get(zhandle, path, 1, (char *)buffer,
buffer_len, NULL);
+ if (rc != ZOK)
+ eprintf("failed, path:%s, rc:%d\n", path, rc);
} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
return rc;
}
@@ -185,6 +191,8 @@ static inline ZOOAPI int zk_node_exists(const char *path)
int rc;
do {
rc = zoo_exists(zhandle, path, 1, NULL);
+ if (rc != ZOK && rc != ZNONODE)
+ eprintf("failed, path:%s, rc:%d\n", path, rc);
} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
return rc;
@@ -239,20 +247,25 @@ static void zk_queue_push(struct zk_event *ev)
first_push = false;
}
- if (ret == ZOK)
- dprintf("create:%s, queue_pos:%010"PRId32", len:%d\n",
- buf, queue_pos, len);
+ dprintf("create path:%s, queue_pos:%010"PRId32", len:%d, ret %d\n",
+ buf, queue_pos, len, ret);
}
-/*
- * Change the event in place and expect the dedicated handler to be called
- * via zk_watcher which wakes up one of the zk_event_handlers.
- */
-static int zk_queue_push_back(struct zk_event *ev)
+static inline void *zk_event_sd_nodes(struct zk_event *ev)
+{
+ return (char *)ev->buf + ev->msg_len;
+}
+
+/* Change the join event in place and piggyback the nodes information. */
+static int push_join_response(struct zk_event *ev)
{
- int len;
char path[256];
+ int len;
+ ev->type = EVENT_JOIN_RESPONSE;
+ ev->nr_nodes = nr_sd_nodes;
+ memcpy(zk_event_sd_nodes(ev), sd_nodes,
+ nr_sd_nodes * sizeof(struct sd_node));
queue_pos--;
len = (char *)(ev->buf) - (char *)ev + ev->buf_len;
@@ -362,33 +375,6 @@ static void zk_queue_init(void)
zk_init_node(MEMBER_ZNODE);
}
-static void zk_member_init(void)
-{
- int rc, len;
- struct String_vector strs;
- struct zk_node znode;
- char path[256];
-
- if (zk_member_empty())
- return;
-
- FOR_EACH_ZNODE(MEMBER_ZNODE, path, &strs) {
- len = sizeof(znode);
- rc = zk_get_data(path, &znode, &len);
- if (rc != ZOK)
- continue;
-
- switch (rc) {
- case ZOK:
- zk_tree_add(&znode);
- case ZNONODE:
- break;
- default:
- panic("zk_get_data failed:%s, rc:%d\n", path, rc);
- }
- }
-}
-
static int add_event(enum zk_event_type type, struct zk_node *znode, void *buf,
size_t buf_len)
{
@@ -431,6 +417,26 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
}
+/*
+ * We reserve enough space to piggyback the nodes information on the join
+ * response message so that every node can see the same membership view.
+ */
+static int add_join_event(void *msg, size_t msg_len)
+{
+ struct zk_event ev;
+ size_t len = msg_len + sizeof(struct sd_node) * SD_MAX_NODES;
+
+ assert(len <= SD_MAX_EVENT_BUF_SIZE);
+ ev.type = EVENT_JOIN_REQUEST;
+ ev.sender = this_node;
+ ev.msg_len = msg_len;
+ ev.buf_len = len;
+ if (msg)
+ memcpy(ev.buf, msg, msg_len);
+ zk_queue_push(&ev);
+ return 0;
+}
+
static int zk_join(const struct sd_node *myself,
void *opaque, size_t opaque_len)
{
@@ -450,7 +456,7 @@ static int zk_join(const struct sd_node *myself,
while (zk_member_empty() && zk_master_create() != ZOK)
;/* wait */
- return add_event(EVENT_JOIN_REQUEST, &this_node, opaque, opaque_len);
+ return add_join_event(opaque, opaque_len);
}
static int zk_leave(void)
@@ -483,10 +489,10 @@ static void zk_handle_join_request(struct zk_event *ev)
queue_pos--;
return;
}
+
res = sd_check_join_cb(&ev->sender.node, ev->buf);
ev->join_result = res;
- ev->type = EVENT_JOIN_RESPONSE;
- zk_queue_push_back(ev);
+ push_join_response(ev);
if (res == CJ_RES_MASTER_TRANSFER) {
eprintf("failed to join sheepdog cluster: "
"please retry when master is up\n");
@@ -496,13 +502,46 @@ static void zk_handle_join_request(struct zk_event *ev)
dprintf("I'm the master now\n");
}
+static void watch_all_nodes(void)
+{
+ struct String_vector strs;
+ struct zk_node znode;
+ char path[256];
+ int len = sizeof(znode);
+
+ if (zk_member_empty())
+ return;
+
+ FOR_EACH_ZNODE(MEMBER_ZNODE, path, &strs) {
+ zk_get_data(path, &znode, &len);
+ }
+}
+
+static void init_node_list(struct zk_event *ev)
+{
+ uint8_t *p = zk_event_sd_nodes(ev);
+ size_t node_nr = ev->nr_nodes;
+ int i;
+
+ dprintf("%zu\n", node_nr);
+ for (i = 0; i < node_nr; i++) {
+ struct zk_node zk;
+ mempcpy(&zk.node, p, sizeof(struct sd_node));
+ zk_tree_add(&zk);
+ p += sizeof(struct sd_node);
+ }
+
+ watch_all_nodes();
+}
+
static void zk_handle_join_response(struct zk_event *ev)
{
char path[256];
dprintf("JOIN RESPONSE\n");
if (node_eq(&ev->sender.node, &this_node.node))
- zk_member_init();
+ /* newly joined node */
+ init_node_list(ev);
if (ev->join_result == CJ_RES_MASTER_TRANSFER)
/*
--
1.7.9.5
More information about the sheepdog
mailing list