From: Yunkai Zhang <qiushu.zyk at taobao.com> The range of sequence numbers in ZooKeeper is [-2^31, 2^31-1]. We store the sequence number in the queue_pos variable and use queue_pos++ to refer to the next znode in /sheepdog/queue. When sheep creates the first znode in /sheepdog/queue, ZooKeeper initializes the sequence number to 0, and each time we create a new znode, its value is incremented by 1. So it will eventually overflow and become negative. In fact, its value wraps around like a circle: after 2^31-1 comes -2^31. This patch tries to handle this problem. Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com> --- sheep/cluster/zookeeper.c | 36 ++++++++++++++++++++---------------- 1 files changed, 20 insertions(+), 16 deletions(-) diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c index d90e27f..a18f9a0 100644 --- a/sheep/cluster/zookeeper.c +++ b/sheep/cluster/zookeeper.c @@ -65,7 +65,7 @@ enum zk_event_type { }; struct zk_node { - int seq; + int32_t seq; int joined; clientid_t clientid; struct sd_node node; @@ -206,7 +206,7 @@ static void zk_unlock(zhandle_t *zh) /* ZooKeeper-based queue */ static int efd; -static int queue_pos; +static int32_t queue_pos; static int zk_queue_empty(zhandle_t *zh) { @@ -222,9 +222,11 @@ static int zk_queue_empty(zhandle_t *zh) return 1; } -static int zk_queue_push(zhandle_t *zh, struct zk_event *ev) +static int32_t zk_queue_push(zhandle_t *zh, struct zk_event *ev) { - int rc, seq, len; + static int first_push = 1; + int32_t seq; + int rc, len; char path[256], buf[256]; eventfd_t value = 1; @@ -232,14 +234,14 @@ static int zk_queue_push(zhandle_t *zh, struct zk_event *ev) sprintf(path, "%s/", QUEUE_ZNODE); rc = zk_create(zh, path, (char *)ev, len, &ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE, buf, sizeof(buf)); - dprintf("create path:%s, nr_nodes:%ld, queue_pos:%d, len:%d, rc:%d\n", buf, nr_zk_nodes, queue_pos, len, rc); + dprintf("create path:%s, nr_nodes:%ld, queue_pos:%010d, len:%d, rc:%d\n", buf, nr_zk_nodes, queue_pos, len, rc); if (rc != ZOK) panic("failed to zk_create path:%s, 
rc:%d\n", path, rc); - sscanf(buf, QUEUE_ZNODE "/%010d", &seq); - dprintf("path:%s, seq:%d\n", buf, seq); + sscanf(buf, QUEUE_ZNODE "/%d", &seq); + dprintf("path:%s, seq:%010d\n", buf, seq); - if (queue_pos < 0) { + if (first_push) { /* the first pushed data should be EVENT_IGNORE */ assert(ev->type == EVENT_IGNORE); @@ -248,6 +250,8 @@ static int zk_queue_push(zhandle_t *zh, struct zk_event *ev) /* manual notify */ dprintf("write event to efd:%d\n", efd); eventfd_write(efd, value); + + first_push = 0; } return seq; @@ -261,14 +265,14 @@ static int zk_queue_push_back(zhandle_t *zh, struct zk_event *ev) queue_pos--; - dprintf("queue_pos:%d\n", queue_pos); + dprintf("queue_pos:%010d\n", queue_pos); if (ev) { /* update the last popped data */ len = (char *)(ev->buf) - (char *)ev + ev->buf_len; sprintf(path, QUEUE_ZNODE "/%010d", queue_pos); rc = zk_set(zh, path, (char *)ev, len, -1); - dprintf("update path:%s, queue_pos:%d, len:%d, rc:%d\n", path, queue_pos, len, rc); + dprintf("update path:%s, queue_pos:%010d, len:%d, rc:%d\n", path, queue_pos, len, rc); if (rc != ZOK) panic("failed to zk_set path:%s, rc:%d\n", path, rc); @@ -298,7 +302,7 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev) sprintf(path, QUEUE_ZNODE "/%010d", queue_pos); rc = zk_get(zh, path, 1, (char *)ev, &len, NULL); if (rc == ZOK && node_cmp(&ev->sender.node, &lev->sender.node) == 0 && ev->blocked) { - dprintf("this queue_pos:%d have blocked whole cluster, ignore it\n", queue_pos); + dprintf("this queue_pos:%010d have blocked whole cluster, ignore it\n", queue_pos); queue_pos++; /* watch next data */ @@ -364,7 +368,7 @@ out: static int zk_queue_seq(zhandle_t *zh) { - int seq; + int32_t seq; struct zk_event ev; memset(&ev, 0, sizeof(ev)); @@ -393,7 +397,7 @@ static void sort_zk_nodes(struct zk_node *znodes, size_t nr_nodes) int i, j, k; struct idxs { int idx; - int seq; + int32_t seq; } idxs[SD_MAX_NODES], t; struct zk_node N[SD_MAX_NODES]; @@ -403,7 +407,7 @@ static void 
sort_zk_nodes(struct zk_node *znodes, size_t nr_nodes) for (i=0; i<nr_nodes; i++) { idxs[i].idx = i; idxs[i].seq = znodes[i].seq; - dprintf("zk_nodes[%d], seq:%d, value:%s\n", + dprintf("zk_nodes[%d], seq:%010d, value:%s\n", i, znodes[i].seq, node_to_str(&znodes[i].node)); } @@ -425,7 +429,7 @@ static void sort_zk_nodes(struct zk_node *znodes, size_t nr_nodes) for (i=0; i<nr_nodes; i++) { N[i] = znodes[idxs[i].idx]; - dprintf("N[%d], seq:%d, value:%s\n", + dprintf("N[%d], seq:%010d, value:%s\n", i, znodes[idxs[i].idx].seq, node_to_str(&N[i].node)); } memcpy(zk_nodes, N, nr_nodes*sizeof(*zk_nodes)); @@ -743,7 +747,7 @@ static int zk_join(struct sd_node *myself, assert(cid != NULL); this_node.clientid = *cid; - dprintf("this_seq:%d, clientid:%ld\n", this_node.seq, cid->client_id); + dprintf("this_seq:%010d, clientid:%ld\n", this_node.seq, cid->client_id); sprintf(path, MEMBER_ZNODE "/%s", node_to_str(myself)); dprintf("try to create member path:%s\n", path); -- 1.7.7.6 |