[Sheepdog] [zookeeper][PATCH v2 08/11] Add code to handle sequence number overflow

Yunkai Zhang yunkai.me at gmail.com
Thu Apr 26 17:21:27 CEST 2012


From: Yunkai Zhang <qiushu.zyk at taobao.com>

ZooKeeper sequence numbers are signed 32-bit integers, so their range
is [-2^31, 2^31-1]. We store the current one in the queue_pos variable
and use queue_pos++ to refer to the next znode in /sheepdog/queue.

When sheep creates the first znode in /sheepdog/queue, ZooKeeper
initializes the sequence number to 0, and each new znode increments
it by 1. Eventually the value overflows past 2^31-1 and wraps around
to a negative number, so the sequence number behaves like a circular
counter.

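This patch handles that wraparound. queue_pos and the related seq
fields become int32_t. Because a negative queue_pos can now be a valid
queue position, zk_queue_push() detects the first push with an explicit
first_push flag instead of testing queue_pos < 0.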

Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com>
---
 sheep/cluster/zookeeper.c |   36 ++++++++++++++++++++----------------
 1 files changed, 20 insertions(+), 16 deletions(-)

diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index d90e27f..a18f9a0 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -65,7 +65,7 @@ enum zk_event_type {
 };
 
 struct zk_node {
-	int seq;
+	int32_t seq;
 	int joined;
 	clientid_t clientid;
 	struct sd_node node;
@@ -206,7 +206,7 @@ static void zk_unlock(zhandle_t *zh)
 /* ZooKeeper-based queue */
 
 static int efd;
-static int queue_pos;
+static int32_t queue_pos;
 
 static int zk_queue_empty(zhandle_t *zh)
 {
@@ -222,9 +222,11 @@ static int zk_queue_empty(zhandle_t *zh)
 	return 1;
 }
 
-static int zk_queue_push(zhandle_t *zh, struct zk_event *ev)
+static int32_t zk_queue_push(zhandle_t *zh, struct zk_event *ev)
 {
-	int rc, seq, len;
+	static int first_push = 1;
+	int32_t seq;
+	int rc, len;
 	char path[256], buf[256];
 	eventfd_t value = 1;
 
@@ -232,14 +234,14 @@ static int zk_queue_push(zhandle_t *zh, struct zk_event *ev)
 	sprintf(path, "%s/", QUEUE_ZNODE);
 	rc = zk_create(zh, path, (char *)ev, len,
 		&ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE, buf, sizeof(buf));
-	dprintf("create path:%s, nr_nodes:%ld, queue_pos:%d, len:%d, rc:%d\n", buf, nr_zk_nodes, queue_pos, len, rc);
+	dprintf("create path:%s, nr_nodes:%ld, queue_pos:%010d, len:%d, rc:%d\n", buf, nr_zk_nodes, queue_pos, len, rc);
 	if (rc != ZOK)
 		panic("failed to zk_create path:%s, rc:%d\n", path, rc);
 
-	sscanf(buf, QUEUE_ZNODE "/%010d", &seq);
-	dprintf("path:%s, seq:%d\n", buf, seq);
+	sscanf(buf, QUEUE_ZNODE "/%d", &seq);
+	dprintf("path:%s, seq:%010d\n", buf, seq);
 
-	if (queue_pos < 0) {
+	if (first_push) {
 
 		/* the first pushed data should be EVENT_IGNORE */
 		assert(ev->type == EVENT_IGNORE);
@@ -248,6 +250,8 @@ static int zk_queue_push(zhandle_t *zh, struct zk_event *ev)
 		/* manual notify */
 		dprintf("write event to efd:%d\n", efd);
 		eventfd_write(efd, value);
+
+		first_push = 0;
 	}
 
 	return seq;
@@ -261,14 +265,14 @@ static int zk_queue_push_back(zhandle_t *zh, struct zk_event *ev)
 
 	queue_pos--;
 
-	dprintf("queue_pos:%d\n", queue_pos);
+	dprintf("queue_pos:%010d\n", queue_pos);
 
 	if (ev) {
 		/* update the last popped data */
 		len = (char *)(ev->buf) - (char *)ev + ev->buf_len;
 		sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
 		rc = zk_set(zh, path, (char *)ev, len, -1);
-		dprintf("update path:%s, queue_pos:%d, len:%d, rc:%d\n", path, queue_pos, len, rc);
+		dprintf("update path:%s, queue_pos:%010d, len:%d, rc:%d\n", path, queue_pos, len, rc);
 		if (rc != ZOK)
 			panic("failed to zk_set path:%s, rc:%d\n", path, rc);
 
@@ -298,7 +302,7 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 		sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
 		rc = zk_get(zh, path, 1, (char *)ev, &len, NULL);
 		if (rc == ZOK && node_cmp(&ev->sender.node, &lev->sender.node) == 0 && ev->blocked) {
-			dprintf("this queue_pos:%d have blocked whole cluster, ignore it\n", queue_pos);
+			dprintf("this queue_pos:%010d have blocked whole cluster, ignore it\n", queue_pos);
 			queue_pos++;
 
 			/* watch next data */
@@ -364,7 +368,7 @@ out:
 
 static int zk_queue_seq(zhandle_t *zh)
 {
-	int seq;
+	int32_t seq;
 	struct zk_event ev;
 
 	memset(&ev, 0, sizeof(ev));
@@ -393,7 +397,7 @@ static void sort_zk_nodes(struct zk_node *znodes, size_t nr_nodes)
 	int i, j, k;
 	struct idxs {
 		int idx;
-		int seq;
+		int32_t seq;
 	} idxs[SD_MAX_NODES], t;
 	struct zk_node N[SD_MAX_NODES];
 
@@ -403,7 +407,7 @@ static void sort_zk_nodes(struct zk_node *znodes, size_t nr_nodes)
 	for (i=0; i<nr_nodes; i++) {
 		idxs[i].idx = i;
 		idxs[i].seq = znodes[i].seq;
-		dprintf("zk_nodes[%d], seq:%d, value:%s\n",
+		dprintf("zk_nodes[%d], seq:%010d, value:%s\n",
 			i, znodes[i].seq, node_to_str(&znodes[i].node));
 	}
 
@@ -425,7 +429,7 @@ static void sort_zk_nodes(struct zk_node *znodes, size_t nr_nodes)
 
 	for (i=0; i<nr_nodes; i++) {
 		N[i] = znodes[idxs[i].idx];
-		dprintf("N[%d], seq:%d, value:%s\n",
+		dprintf("N[%d], seq:%010d, value:%s\n",
 			i, znodes[idxs[i].idx].seq, node_to_str(&N[i].node));
 	}
 	memcpy(zk_nodes, N, nr_nodes*sizeof(*zk_nodes));
@@ -743,7 +747,7 @@ static int zk_join(struct sd_node *myself,
 	assert(cid != NULL);
 	this_node.clientid = *cid;
 
-	dprintf("this_seq:%d, clientid:%ld\n", this_node.seq, cid->client_id);
+	dprintf("this_seq:%010d, clientid:%ld\n", this_node.seq, cid->client_id);
 
 	sprintf(path, MEMBER_ZNODE "/%s", node_to_str(myself));
 	dprintf("try to create member path:%s\n", path);
-- 
1.7.7.6




More information about the sheepdog mailing list