[sheepdog] [PATCH 1/2] zookeeper: update queue_pos to zookeeper-server

Meng Lingkun starmlk at 163.com
Mon Mar 9 05:55:49 CET 2015


From: Meng Lingkun <menglingkun at cmss.chinamobile.com>

When a sheep connects to the ZooKeeper server, create and initialize the
znode queue_pos/$sheep_id. At run time, the zookeeper driver periodically
writes its current queue_pos to that znode on the ZooKeeper server.

Signed-off-by: Meng Lingkun <menglingkun at cmss.chinamobile.com>
---
 sheep/cluster/zookeeper.c | 32 ++++++++++++++++++++++++++++++++
 1 file changed, 32 insertions(+)

diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index bf94871..b346592 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -32,6 +32,7 @@
 #define MEMBER_ZNODE "/member"
 #define MASTER_ZNODE "/master"
 #define LOCK_ZNODE "/lock"
+#define QUEUE_POS_ZNODE "/queue_pos"
 
 static int zk_timeout = SESSION_TIMEOUT;
 static int my_master_seq;
@@ -452,6 +453,7 @@ static void lock_table_remove_znodes(void)
 /* ZooKeeper-based queue give us an totally ordered events */
 static int efd;
 static int32_t queue_pos;
+#define QUEUE_DEL_BATCH 1000
 
 static int zk_queue_peek(bool *peek)
 {
@@ -580,6 +582,7 @@ static int zk_queue_pop_advance(struct zk_event *ev)
 {
 	int len;
 	char path[MAX_NODE_STR_LEN];
+	char queue_pos_path[MAX_NODE_STR_LEN];
 
 	len = sizeof(*ev);
 	snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos);
@@ -587,6 +590,16 @@ static int zk_queue_pop_advance(struct zk_event *ev)
 	RETURN_IF_ERROR(zk_get_data(path, ev, &len), "path %s", path);
 	sd_debug("%s, type:%d, len:%d, pos:%" PRId32, path, ev->type, len,
 		 queue_pos);
+
+	if (queue_pos % QUEUE_DEL_BATCH == 0 && ev->type != EVENT_JOIN) {
+		snprintf(queue_pos_path, sizeof(queue_pos_path),
+			QUEUE_POS_ZNODE"/%s", node_to_str(&this_node.node));
+		RETURN_IF_ERROR(zk_set_data(queue_pos_path, (char *)&queue_pos,
+			sizeof(queue_pos), -1), "");
+		sd_debug("update queue pos %s to pos %" PRId32,
+			queue_pos_path, queue_pos);
+	}
+
 	queue_pos++;
 	return ZOK;
 }
@@ -644,6 +657,8 @@ static int zk_queue_init(void)
 	RETURN_IF_ERROR(zk_init_node(MASTER_ZNODE), "path %s", MASTER_ZNODE);
 	RETURN_IF_ERROR(zk_init_node(QUEUE_ZNODE), "path %s", QUEUE_ZNODE);
 	RETURN_IF_ERROR(zk_init_node(MEMBER_ZNODE), "path %s", MEMBER_ZNODE);
+	RETURN_IF_ERROR(zk_init_node(QUEUE_POS_ZNODE), "path %s",
+		QUEUE_POS_ZNODE);
 	return ZOK;
 }
 
@@ -749,6 +764,12 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
 			return;
 		}
 
+		ret = sscanf(path, QUEUE_ZNODE "/%s", str);
+		if (ret == 1) {
+			sd_debug("deleted queue %s", str);
+			return;
+		}
+
 		ret = sscanf(path, MEMBER_ZNODE "/%s", str);
 		if (ret != 1)
 			return;
@@ -1085,6 +1106,8 @@ static void init_node_list(struct zk_event *ev)
 static void zk_handle_accept(struct zk_event *ev)
 {
 	char path[MAX_NODE_STR_LEN];
+	char queue_pos_path[MAX_NODE_STR_LEN];
+	uint32_t pos = -1;
 	int rc;
 
 	sd_debug("ACCEPT");
@@ -1096,6 +1119,8 @@ static void zk_handle_accept(struct zk_event *ev)
 
 	snprintf(path, sizeof(path), MEMBER_ZNODE"/%s",
 		 node_to_str(&ev->sender.node));
+	snprintf(queue_pos_path, sizeof(queue_pos_path), QUEUE_POS_ZNODE"/%s",
+		node_to_str(&ev->sender.node));
 	if (node_eq(&ev->sender.node, &this_node.node)) {
 		joined = true;
 		sd_debug("create path:%s", path);
@@ -1105,6 +1130,13 @@ static void zk_handle_accept(struct zk_event *ev)
 				    &ZOO_OPEN_ACL_UNSAFE,
 				    ZOO_EPHEMERAL, NULL, 0);
 		RETURN_VOID_IF_ERROR(rc, "");
+		sd_debug("create path:%s", queue_pos_path);
+		rc = zk_create_node(queue_pos_path,
+				    (char *)&pos,
+				    sizeof(pos),
+				    &ZOO_OPEN_ACL_UNSAFE,
+				    ZOO_EPHEMERAL, NULL, 0);
+		RETURN_VOID_IF_ERROR(rc, "");
 	} else
 		zk_node_exists(path);
 
-- 
1.9.1





More information about the sheepdog mailing list