From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp> When zoo_create() returns ZOPERATIONTIMEOUT or ZCONNECTIONLOSS, the sequential znode may or may not have been created. This tags each event with a unique id, looks it up in the zookeeper queue, and retries zoo_create() only when the znode was not created. Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp> --- sheep/cluster/zookeeper.c | 78 +++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 71 insertions(+), 7 deletions(-) diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c index d82a486..5480475 100644 --- a/sheep/cluster/zookeeper.c +++ b/sheep/cluster/zookeeper.c @@ -58,6 +58,7 @@ struct zk_node { }; struct zk_event { + uint64_t id; /* matched by zk_find_seq_node() */ enum zk_event_type type; struct zk_node sender; enum cluster_join_result join_result; @@ -174,11 +175,13 @@ zk_create_node(const char *path, const char *value, int valuelen, } /* - * FIXME: ZOPERATIONTIMEOUT or ZCONNECTIONLOSS will cause duplicate seq node of - * the same event that we can't handle at all. Panic out in any error case would - * be the safest option. + * Create a znode after adding a unique monotonically increasing sequence number + * to the path name. + * + * Note that on ZOPERATIONTIMEOUT or ZCONNECTIONLOSS the znode may or may not + * have been created; the caller must check for it and retry only if absent. */ -static inline ZOOAPI void +static inline ZOOAPI int zk_create_seq_node(const char *path, const char *value, int valuelen, char *path_buffer, int path_buffer_len) { @@ -186,7 +189,9 @@ zk_create_seq_node(const char *path, const char *value, int valuelen, rc = zoo_create(zhandle, path, value, valuelen, &ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE, path_buffer, path_buffer_len); if (rc != ZOK) - panic("failed, path:%s, %s", path, zerror(rc)); + sd_iprintf("failed, path:%s, %s", path, zerror(rc)); + + return rc; } static inline ZOOAPI int zk_get_data(const char *path, void *buffer, @@ -255,15 +260,60 @@ static bool zk_queue_peek(void) return false; } +/* return true if there is a node with 'id' in the queue.
*/
+static bool zk_find_seq_node(uint64_t id, char *seq_path, size_t seq_path_len)
+{
+	struct zk_event ev;
+	int rc, len;
+
+	/* when true is returned, seq_path holds the matching znode's path */
+	for (int seq = queue_pos; ; seq++) {
+		snprintf(seq_path, seq_path_len, QUEUE_ZNODE"/%010"PRId32,
+			 seq);
+		do { /* retry the read on transient errors */
+			len = offsetof(typeof(ev), id) + sizeof(ev.id);
+			rc = zk_get_data(seq_path, &ev, &len);
+		} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
+		switch (rc) {
+		case ZOK:
+			if (ev.id == id) {
+				sd_dprintf("id %"PRIx64" is found in %s", id,
+					   seq_path);
+				return true;
+			}
+			break;
+		case ZNONODE:
+			sd_dprintf("id %"PRIx64" is not found", id);
+			return false;
+		default:
+			panic("failed, %s", zerror(rc));
+		}
+	}
+}
+
 static void zk_queue_push(struct zk_event *ev)
 {
 	static bool first_push = true;
-	int len;
+	int rc, len;
 	char path[MAX_NODE_STR_LEN], buf[MAX_NODE_STR_LEN];
 
 	len = (char *)(ev->buf) - (char *)ev + ev->buf_len;
 	snprintf(path, sizeof(path), "%s/", QUEUE_ZNODE);
-	zk_create_seq_node(path, (char *)ev, len, buf, sizeof(buf));
+again:
+	rc = zk_create_seq_node(path, (char *)ev, len, buf, sizeof(buf));
+	switch (rc) {
+	case ZOK:
+		/* Success: zoo_create() stored the new znode's path in buf */
+		break;
+	case ZOPERATIONTIMEOUT:
+	case ZCONNECTIONLOSS:
+		/* the znode may exist anyway; if so buf gets its path */
+		if (!zk_find_seq_node(ev->id, buf, sizeof(buf)))
+			goto again;
+		break;
+	default:
+		panic("failed, path:%s, %s", path, zerror(rc));
+	}
 
 	if (first_push) {
 		int32_t seq;
@@ -418,11 +468,24 @@ static void zk_queue_init(void)
 	zk_init_node(MEMBER_ZNODE);
 }
 
+/* Hash this_node and a per-process counter into a (very likely) unique id.
*/ +static uint64_t get_uniq_id(void) +{ + static int seq; /* shared by all callers, bumped atomically */ + uint64_t id, n = uatomic_add_return(&seq, 1); + + id = fnv_64a_buf(&this_node, sizeof(this_node), FNV1A_64_INIT); + id = fnv_64a_buf(&n, sizeof(n), id); + + return id; /* a hash: collisions are unlikely but possible */ +} + static int add_event(enum zk_event_type type, struct zk_node *znode, void *buf, size_t buf_len) { struct zk_event ev; + ev.id = get_uniq_id(); /* searched for by zk_find_seq_node() */ ev.type = type; ev.sender = *znode; ev.buf_len = buf_len; @@ -478,6 +541,7 @@ static int add_join_event(void *msg, size_t msg_len) size_t len = msg_len + sizeof(struct sd_node) * SD_MAX_NODES; assert(len <= SD_MAX_EVENT_BUF_SIZE); + ev.id = get_uniq_id(); /* searched for by zk_find_seq_node() */ ev.type = EVENT_JOIN_REQUEST; ev.sender = this_node; ev.msg_len = msg_len; -- 1.7.9.5 |