[Sheepdog] [zookeeper][PATCH v2 08/11] Add code to handle sequence number overflow
Yunkai Zhang
yunkai.me at gmail.com
Thu Apr 26 17:21:27 CEST 2012
From: Yunkai Zhang <qiushu.zyk at taobao.com>
The range of sequence numbers in ZooKeeper is [-2^31, 2^31-1].
We store the current one in the queue_pos variable, and use queue_pos++
to refer to the next znode in /sheepdog/queue.
When sheep creates the first znode in /sheepdog/queue, ZooKeeper
initializes the sequence number to 0. Each time we create a new
znode, its value is incremented by 1, so it will eventually overflow
and become negative. In effect, its value wraps around like a circle.
This patch tries to handle this problem.
Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com>
---
sheep/cluster/zookeeper.c | 36 ++++++++++++++++++++----------------
1 files changed, 20 insertions(+), 16 deletions(-)
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index d90e27f..a18f9a0 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -65,7 +65,7 @@ enum zk_event_type {
};
struct zk_node {
- int seq;
+ int32_t seq;
int joined;
clientid_t clientid;
struct sd_node node;
@@ -206,7 +206,7 @@ static void zk_unlock(zhandle_t *zh)
/* ZooKeeper-based queue */
static int efd;
-static int queue_pos;
+static int32_t queue_pos;
static int zk_queue_empty(zhandle_t *zh)
{
@@ -222,9 +222,11 @@ static int zk_queue_empty(zhandle_t *zh)
return 1;
}
-static int zk_queue_push(zhandle_t *zh, struct zk_event *ev)
+static int32_t zk_queue_push(zhandle_t *zh, struct zk_event *ev)
{
- int rc, seq, len;
+ static int first_push = 1;
+ int32_t seq;
+ int rc, len;
char path[256], buf[256];
eventfd_t value = 1;
@@ -232,14 +234,14 @@ static int zk_queue_push(zhandle_t *zh, struct zk_event *ev)
sprintf(path, "%s/", QUEUE_ZNODE);
rc = zk_create(zh, path, (char *)ev, len,
&ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE, buf, sizeof(buf));
- dprintf("create path:%s, nr_nodes:%ld, queue_pos:%d, len:%d, rc:%d\n", buf, nr_zk_nodes, queue_pos, len, rc);
+ dprintf("create path:%s, nr_nodes:%ld, queue_pos:%010d, len:%d, rc:%d\n", buf, nr_zk_nodes, queue_pos, len, rc);
if (rc != ZOK)
panic("failed to zk_create path:%s, rc:%d\n", path, rc);
- sscanf(buf, QUEUE_ZNODE "/%010d", &seq);
- dprintf("path:%s, seq:%d\n", buf, seq);
+ sscanf(buf, QUEUE_ZNODE "/%d", &seq);
+ dprintf("path:%s, seq:%010d\n", buf, seq);
- if (queue_pos < 0) {
+ if (first_push) {
/* the first pushed data should be EVENT_IGNORE */
assert(ev->type == EVENT_IGNORE);
@@ -248,6 +250,8 @@ static int zk_queue_push(zhandle_t *zh, struct zk_event *ev)
/* manual notify */
dprintf("write event to efd:%d\n", efd);
eventfd_write(efd, value);
+
+ first_push = 0;
}
return seq;
@@ -261,14 +265,14 @@ static int zk_queue_push_back(zhandle_t *zh, struct zk_event *ev)
queue_pos--;
- dprintf("queue_pos:%d\n", queue_pos);
+ dprintf("queue_pos:%010d\n", queue_pos);
if (ev) {
/* update the last popped data */
len = (char *)(ev->buf) - (char *)ev + ev->buf_len;
sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
rc = zk_set(zh, path, (char *)ev, len, -1);
- dprintf("update path:%s, queue_pos:%d, len:%d, rc:%d\n", path, queue_pos, len, rc);
+ dprintf("update path:%s, queue_pos:%010d, len:%d, rc:%d\n", path, queue_pos, len, rc);
if (rc != ZOK)
panic("failed to zk_set path:%s, rc:%d\n", path, rc);
@@ -298,7 +302,7 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
rc = zk_get(zh, path, 1, (char *)ev, &len, NULL);
if (rc == ZOK && node_cmp(&ev->sender.node, &lev->sender.node) == 0 && ev->blocked) {
- dprintf("this queue_pos:%d have blocked whole cluster, ignore it\n", queue_pos);
+ dprintf("this queue_pos:%010d have blocked whole cluster, ignore it\n", queue_pos);
queue_pos++;
/* watch next data */
@@ -364,7 +368,7 @@ out:
static int zk_queue_seq(zhandle_t *zh)
{
- int seq;
+ int32_t seq;
struct zk_event ev;
memset(&ev, 0, sizeof(ev));
@@ -393,7 +397,7 @@ static void sort_zk_nodes(struct zk_node *znodes, size_t nr_nodes)
int i, j, k;
struct idxs {
int idx;
- int seq;
+ int32_t seq;
} idxs[SD_MAX_NODES], t;
struct zk_node N[SD_MAX_NODES];
@@ -403,7 +407,7 @@ static void sort_zk_nodes(struct zk_node *znodes, size_t nr_nodes)
for (i=0; i<nr_nodes; i++) {
idxs[i].idx = i;
idxs[i].seq = znodes[i].seq;
- dprintf("zk_nodes[%d], seq:%d, value:%s\n",
+ dprintf("zk_nodes[%d], seq:%010d, value:%s\n",
i, znodes[i].seq, node_to_str(&znodes[i].node));
}
@@ -425,7 +429,7 @@ static void sort_zk_nodes(struct zk_node *znodes, size_t nr_nodes)
for (i=0; i<nr_nodes; i++) {
N[i] = znodes[idxs[i].idx];
- dprintf("N[%d], seq:%d, value:%s\n",
+ dprintf("N[%d], seq:%010d, value:%s\n",
i, znodes[idxs[i].idx].seq, node_to_str(&N[i].node));
}
memcpy(zk_nodes, N, nr_nodes*sizeof(*zk_nodes));
@@ -743,7 +747,7 @@ static int zk_join(struct sd_node *myself,
assert(cid != NULL);
this_node.clientid = *cid;
- dprintf("this_seq:%d, clientid:%ld\n", this_node.seq, cid->client_id);
+ dprintf("this_seq:%010d, clientid:%ld\n", this_node.seq, cid->client_id);
sprintf(path, MEMBER_ZNODE "/%s", node_to_str(myself));
dprintf("try to create member path:%s\n", path);
--
1.7.7.6
More information about the sheepdog
mailing list