From: Liu Yuan <tailai.ly at taobao.com>

We shouldn't simply get the membership from zookeeper, because we might lose
some events when multiple events happen. For example, if one node leaves while
another joins, we could see a single event instead of the two events that
sheep expects.

Passing the membership in the join event solves this.

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 sheep/cluster.h           |    2 +-
 sheep/cluster/zookeeper.c |  134 ++++++++++++++++++++++++++++++++++++---------------
 2 files changed, 95 insertions(+), 41 deletions(-)

diff --git a/sheep/cluster.h b/sheep/cluster.h
index 2679c19..d1e711d 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -24,7 +24,7 @@
 #include "logger.h"
 
 /* maximum payload size sent in ->notify and ->unblock */
-#define SD_MAX_EVENT_BUF_SIZE (64 * 1024)
+#define SD_MAX_EVENT_BUF_SIZE (128 * 1024) /* 128k */
 
 enum cluster_join_result {
 	CJ_RES_SUCCESS, /* Success */
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 336c827..bf91ea3 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -59,6 +59,8 @@ struct zk_event {
 	enum zk_event_type type;
 	struct zk_node sender;
 	enum cluster_join_result join_result;
+	size_t msg_len;
+	size_t nr_nodes;
 	size_t buf_len;
 	uint8_t buf[SD_MAX_EVENT_BUF_SIZE];
 };
@@ -153,6 +155,8 @@ zk_create_node(const char *path, const char *value, int valuelen,
 	do {
 		rc = zoo_create(zhandle, path, value, valuelen, acl,
 				flags, path_buffer, path_buffer_len);
+		if (rc != ZOK && rc != ZNODEEXISTS)
+			eprintf("failed, path:%s, rc:%d\n", path, rc);
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
 	return rc;
 }
@@ -164,6 +168,8 @@ static inline ZOOAPI int zk_get_data(const char *path, void *buffer,
 
 	do {
 		rc = zoo_get(zhandle, path, 1, (char *)buffer, buffer_len, NULL);
+		if (rc != ZOK)
+			eprintf("failed, path:%s, rc:%d\n", path, rc);
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
 	return rc;
 }
@@ -185,6 +191,8 @@ static inline ZOOAPI int zk_node_exists(const char *path)
 	int rc;
 	do {
 		rc = zoo_exists(zhandle, path, 1, NULL);
+		if (rc != ZOK && rc != ZNONODE)
+			eprintf("failed, path:%s, rc:%d\n", path, rc);
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
 
 	return rc;
@@ -239,20 +247,25 @@ static void zk_queue_push(struct zk_event *ev)
 		first_push = false;
 	}
 
-	if (ret == ZOK)
-		dprintf("create:%s, queue_pos:%010"PRId32", len:%d\n",
-			buf, queue_pos, len);
+	dprintf("create path:%s, queue_pos:%010"PRId32", len:%d, ret %d\n",
+		buf, queue_pos, len, ret);
 }
 
-/*
- * Change the event in place and expect the dedicated handler to be called
- * via zk_watcher which wakes up one of the zk_event_handlers.
- */
-static int zk_queue_push_back(struct zk_event *ev)
+static inline void *zk_event_sd_nodes(struct zk_event *ev)
+{
+	return (char *)ev->buf + ev->msg_len;
+}
+
+/* Change the join event in place and piggyback the nodes information. */
+static int push_join_response(struct zk_event *ev)
 {
-	int len;
 	char path[256];
+	int len;
 
+	ev->type = EVENT_JOIN_RESPONSE;
+	ev->nr_nodes = nr_sd_nodes;
+	memcpy(zk_event_sd_nodes(ev), sd_nodes,
+	       nr_sd_nodes * sizeof(struct sd_node));
 	queue_pos--;
 
 	len = (char *)(ev->buf) - (char *)ev + ev->buf_len;
@@ -362,33 +375,6 @@ static void zk_queue_init(void)
 	zk_init_node(MEMBER_ZNODE);
 }
 
-static void zk_member_init(void)
-{
-	int rc, len;
-	struct String_vector strs;
-	struct zk_node znode;
-	char path[256];
-
-	if (zk_member_empty())
-		return;
-
-	FOR_EACH_ZNODE(MEMBER_ZNODE, path, &strs) {
-		len = sizeof(znode);
-		rc = zk_get_data(path, &znode, &len);
-		if (rc != ZOK)
-			continue;
-
-		switch (rc) {
-		case ZOK:
-			zk_tree_add(&znode);
-		case ZNONODE:
-			break;
-		default:
-			panic("zk_get_data failed:%s, rc:%d\n", path, rc);
-		}
-	}
-}
-
 static int add_event(enum zk_event_type type, struct zk_node *znode,
 		     void *buf, size_t buf_len)
 {
@@ -431,6 +417,32 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
 
 }
 
+/*
+ * Queue a join request.  Besides msg, the event buffer reserves room for
+ * SD_MAX_NODES sd_node entries so that the node answering the request can
+ * piggyback its node list on the join response, giving every node the
+ * same membership view.
+ */
+static int add_join_event(void *msg, size_t msg_len)
+{
+	struct zk_event ev;
+	size_t len = msg_len + sizeof(struct sd_node) * SD_MAX_NODES;
+
+	if (len > SD_MAX_EVENT_BUF_SIZE)
+		panic("join message too large: %zu\n", msg_len);
+
+	/* the reserved node area is queued too; don't send stack garbage */
+	memset(&ev, 0, sizeof(ev));
+	ev.type = EVENT_JOIN_REQUEST;
+	ev.sender = this_node;
+	ev.msg_len = msg_len;
+	ev.buf_len = len;
+	if (msg)
+		memcpy(ev.buf, msg, msg_len);
+	zk_queue_push(&ev);
+	return 0;
+}
+
 static int zk_join(const struct sd_node *myself,
 		   void *opaque, size_t opaque_len)
 {
@@ -450,7 +462,7 @@ static int zk_join(const struct sd_node *myself,
 	while (zk_member_empty() && zk_master_create() != ZOK)
 		;/* wait */
 
-	return add_event(EVENT_JOIN_REQUEST, &this_node, opaque, opaque_len);
+	return add_join_event(opaque, opaque_len);
 }
 
 static int zk_leave(void)
@@ -483,10 +495,10 @@ static void zk_handle_join_request(struct zk_event *ev)
 		queue_pos--;
 		return;
 	}
+
 	res = sd_check_join_cb(&ev->sender.node, ev->buf);
 	ev->join_result = res;
-	ev->type = EVENT_JOIN_RESPONSE;
-	zk_queue_push_back(ev);
+	push_join_response(ev);
 	if (res == CJ_RES_MASTER_TRANSFER) {
 		eprintf("failed to join sheepdog cluster: "
 			"please retry when master is up\n");
@@ -496,13 +508,55 @@ static void zk_handle_join_request(struct zk_event *ev)
 	dprintf("I'm the master now\n");
 }
 
+static void watch_all_nodes(void)
+{
+	struct String_vector strs;
+	struct zk_node znode;
+	char path[256];
+	int len;
+
+	if (zk_member_empty())
+		return;
+
+	/*
+	 * Read every member znode only to set a watch on it.  zoo_get()
+	 * overwrites len with the size actually read, so reset it each time.
+	 */
+	FOR_EACH_ZNODE(MEMBER_ZNODE, path, &strs) {
+		len = sizeof(znode);
+		zk_get_data(path, &znode, &len);
+	}
+}
+
+/* Add the nodes piggybacked on join response ev to the tree, then watch. */
+static void init_node_list(struct zk_event *ev)
+{
+	uint8_t *p = zk_event_sd_nodes(ev);
+	size_t node_nr = ev->nr_nodes;
+	size_t i;
+
+	dprintf("%zu\n", node_nr);
+	for (i = 0; i < node_nr; i++) {
+		struct zk_node zk;
+
+		/* only the sd_node part is carried; zero the other fields */
+		memset(&zk, 0, sizeof(zk));
+		memcpy(&zk.node, p, sizeof(struct sd_node));
+		zk_tree_add(&zk);
+		p += sizeof(struct sd_node);
+	}
+
+	watch_all_nodes();
+}
+
 static void zk_handle_join_response(struct zk_event *ev)
 {
 	char path[256];
 
 	dprintf("JOIN RESPONSE\n");
 	if (node_eq(&ev->sender.node, &this_node.node))
-		zk_member_init();
+		/* newly joined node */
+		init_node_list(ev);
 
 	if (ev->join_result == CJ_RES_MASTER_TRANSFER)
 		/*
-- 
1.7.9.5