From: Liu Yuan <tailai.ly at taobao.com> For error conditions, such as operation timeout, the write of a seq node will fail. We can't simply retry the write, because that could result in the same event being written multiple times. Sheep can't handle this situation at all. We must panic on any error while writing a seq node. Signed-off-by: Liu Yuan <tailai.ly at taobao.com> --- sheep/cluster/zookeeper.c | 40 +++++++++++++++++++++++++++------------- 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c index bf91ea3..1b4577b 100644 --- a/sheep/cluster/zookeeper.c +++ b/sheep/cluster/zookeeper.c @@ -23,7 +23,7 @@ #include "util.h" #include "rbtree.h" -#define SESSION_TIMEOUT 10000 /* millisecond */ +#define SESSION_TIMEOUT 20000 /* millisecond */ #define BASE_ZNODE "/sheepdog" #define QUEUE_ZNODE BASE_ZNODE "/queue" @@ -143,7 +143,7 @@ zk_init_node(const char *path) } while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS); if (rc != ZOK && rc != ZNODEEXISTS) - panic("failed, path:%s, rc:%d\n", path, rc); + panic("failed, path:%s, %s\n", path, zerror(rc)); } static inline ZOOAPI int @@ -156,11 +156,27 @@ zk_create_node(const char *path, const char *value, int valuelen, rc = zoo_create(zhandle, path, value, valuelen, acl, flags, path_buffer, path_buffer_len); if (rc != ZOK && rc != ZNODEEXISTS) - eprintf("failed, path:%s, rc:%d\n", path, rc); + eprintf("failed, path:%s, %s\n", path, zerror(rc)); } while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS); return rc; } +/* + * FIXME: ZOPERATIONTIMEOUT or ZCONNECTIONLOSS will cause duplicate seq node of + * the same event that we can't handle at all. Panic out in any error case would + * be the safest option. 
+ */ +static inline ZOOAPI void +zk_create_seq_node(const char *path, const char *value, int valuelen, + char *path_buffer, int path_buffer_len) +{ + int rc; + rc = zoo_create(zhandle, path, value, valuelen, &ZOO_OPEN_ACL_UNSAFE, + ZOO_SEQUENCE, path_buffer, path_buffer_len); + if (rc != ZOK) + panic("failed, path:%s, %s\n", path, zerror(rc)); +} + static inline ZOOAPI int zk_get_data(const char *path, void *buffer, int *buffer_len) { @@ -169,7 +185,7 @@ static inline ZOOAPI int zk_get_data(const char *path, void *buffer, rc = zoo_get(zhandle, path, 1, (char *)buffer, buffer_len, NULL); if (rc != ZOK) - eprintf("failed, path:%s, rc:%d\n", path, rc); + eprintf("failed, path:%s, %s\n", path, zerror(rc)); } while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS); return rc; } @@ -182,7 +198,7 @@ zk_set_data(const char *path, const char *buffer, int buflen, int version) rc = zoo_set(zhandle, path, buffer, buflen, version); } while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS); if (rc != ZOK) - panic("failed, path:%s, rc:%d\n", path, rc); + panic("failed, path:%s, %s\n", path, zerror(rc)); return rc; } @@ -192,7 +208,7 @@ static inline ZOOAPI int zk_node_exists(const char *path) do { rc = zoo_exists(zhandle, path, 1, NULL); if (rc != ZOK && rc != ZNONODE) - eprintf("failed, path:%s, rc:%d\n", path, rc); + eprintf("failed, path:%s, %s\n", path, zerror(rc)); } while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS); return rc; @@ -206,7 +222,7 @@ static inline ZOOAPI void zk_get_children(const char *path, rc = zoo_get_children(zhandle, path, 1, strings); } while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS); if (rc != ZOK) - panic("failed:%s, rc:%d\n", path, rc); + panic("failed, path:%s, %s\n", path, zerror(rc)); } /* ZooKeeper-based queue give us an totally ordered events */ @@ -230,14 +246,12 @@ static bool zk_queue_peek(void) static void zk_queue_push(struct zk_event *ev) { static bool first_push = true; - int len, ret; + int len; char path[256], buf[256]; 
len = (char *)(ev->buf) - (char *)ev + ev->buf_len; sprintf(path, "%s/", QUEUE_ZNODE); - ret = zk_create_node(path, (char *)ev, len, - &ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE, buf, - sizeof(buf)); + zk_create_seq_node(path, (char *)ev, len, buf, sizeof(buf)); if (first_push) { int32_t seq; @@ -247,8 +261,8 @@ static void zk_queue_push(struct zk_event *ev) first_push = false; } - dprintf("create path:%s, queue_pos:%010"PRId32", len:%d, ret %d\n", - buf, queue_pos, len, ret); + dprintf("create path:%s, queue_pos:%010"PRId32", len:%d\n", + buf, queue_pos, len); } static inline void *zk_event_sd_nodes(struct zk_event *ev) -- 1.7.9.5 |