[sheepdog] [PATCH] zookeeper & tools/zk_control: add delete subcommand to remove queue znodes

Meng Lingkun starmlk at 163.com
Mon Mar 9 04:12:54 CET 2015


From: Meng Lingkun <menglingkun at cmss.chinamobile.com>

The ZooKeeper cluster driver does not (and cannot) delete a znode in the
queue after using it. This makes ZooKeeper occupy much more memory, and it
causes trouble when a ZooKeeper client reconnects to the ZooKeeper server.
Reference: https://issues.apache.org/jira/browse/ZOOKEEPER-706
The delete subcommand provides a better way than the purge subcommand:
1. each sheep updates its queue_pos on the ZooKeeper server;
2. zk_control finds the minimum of all sheep's queue_pos values and
   removes the queue znodes whose seq < minimum;
3. zk_control can be run from crond to limit the number of queue znodes.

Signed-off-by: Meng Lingkun <menglingkun at cmss.chinamobile.com>
---
 sheep/cluster/zookeeper.c | 32 +++++++++++++++++
 tools/zk_control.c        | 88 ++++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 119 insertions(+), 1 deletion(-)

diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index bf94871..b346592 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -32,6 +32,7 @@
 #define MEMBER_ZNODE "/member"
 #define MASTER_ZNODE "/master"
 #define LOCK_ZNODE "/lock"
+#define QUEUE_POS_ZNODE "/queue_pos"
 
 static int zk_timeout = SESSION_TIMEOUT;
 static int my_master_seq;
@@ -452,6 +453,7 @@ static void lock_table_remove_znodes(void)
 /* ZooKeeper-based queue give us an totally ordered events */
 static int efd;
 static int32_t queue_pos;
+#define QUEUE_DEL_BATCH 1000
 
 static int zk_queue_peek(bool *peek)
 {
@@ -580,6 +582,7 @@ static int zk_queue_pop_advance(struct zk_event *ev)
 {
 	int len;
 	char path[MAX_NODE_STR_LEN];
+	char queue_pos_path[MAX_NODE_STR_LEN];
 
 	len = sizeof(*ev);
 	snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos);
@@ -587,6 +590,16 @@ static int zk_queue_pop_advance(struct zk_event *ev)
 	RETURN_IF_ERROR(zk_get_data(path, ev, &len), "path %s", path);
 	sd_debug("%s, type:%d, len:%d, pos:%" PRId32, path, ev->type, len,
 		 queue_pos);
+
+	if (queue_pos % QUEUE_DEL_BATCH == 0 && ev->type != EVENT_JOIN) {
+		snprintf(queue_pos_path, sizeof(queue_pos_path),
+			QUEUE_POS_ZNODE"/%s", node_to_str(&this_node.node));
+		RETURN_IF_ERROR(zk_set_data(queue_pos_path, (char *)&queue_pos,
+			sizeof(queue_pos), -1), "");
+		sd_debug("update queue pos %s to pos %" PRId32,
+			queue_pos_path, queue_pos);
+	}
+
 	queue_pos++;
 	return ZOK;
 }
@@ -644,6 +657,8 @@ static int zk_queue_init(void)
 	RETURN_IF_ERROR(zk_init_node(MASTER_ZNODE), "path %s", MASTER_ZNODE);
 	RETURN_IF_ERROR(zk_init_node(QUEUE_ZNODE), "path %s", QUEUE_ZNODE);
 	RETURN_IF_ERROR(zk_init_node(MEMBER_ZNODE), "path %s", MEMBER_ZNODE);
+	RETURN_IF_ERROR(zk_init_node(QUEUE_POS_ZNODE), "path %s",
+		QUEUE_POS_ZNODE);
 	return ZOK;
 }
 
@@ -749,6 +764,12 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
 			return;
 		}
 
+		ret = sscanf(path, QUEUE_ZNODE "/%s", str);
+		if (ret == 1) {
+			sd_debug("deleted queue %s", str);
+			return;
+		}
+
 		ret = sscanf(path, MEMBER_ZNODE "/%s", str);
 		if (ret != 1)
 			return;
@@ -1085,6 +1106,8 @@ static void init_node_list(struct zk_event *ev)
 static void zk_handle_accept(struct zk_event *ev)
 {
 	char path[MAX_NODE_STR_LEN];
+	char queue_pos_path[MAX_NODE_STR_LEN];
+	uint32_t pos = -1;
 	int rc;
 
 	sd_debug("ACCEPT");
@@ -1096,6 +1119,8 @@ static void zk_handle_accept(struct zk_event *ev)
 
 	snprintf(path, sizeof(path), MEMBER_ZNODE"/%s",
 		 node_to_str(&ev->sender.node));
+	snprintf(queue_pos_path, sizeof(queue_pos_path), QUEUE_POS_ZNODE"/%s",
+		node_to_str(&ev->sender.node));
 	if (node_eq(&ev->sender.node, &this_node.node)) {
 		joined = true;
 		sd_debug("create path:%s", path);
@@ -1105,6 +1130,13 @@ static void zk_handle_accept(struct zk_event *ev)
 				    &ZOO_OPEN_ACL_UNSAFE,
 				    ZOO_EPHEMERAL, NULL, 0);
 		RETURN_VOID_IF_ERROR(rc, "");
+		sd_debug("create path:%s", queue_pos_path);
+		rc = zk_create_node(queue_pos_path,
+				    (char *)&pos,
+				    sizeof(pos),
+				    &ZOO_OPEN_ACL_UNSAFE,
+				    ZOO_EPHEMERAL, NULL, 0);
+		RETURN_VOID_IF_ERROR(rc, "");
 	} else
 		zk_node_exists(path);
 
diff --git a/tools/zk_control.c b/tools/zk_control.c
index 3313bef..15c2dac 100644
--- a/tools/zk_control.c
+++ b/tools/zk_control.c
@@ -21,6 +21,7 @@
 #include "internal_proto.h"
 
 #define QUEUE_ZNODE "/sheepdog/queue"
+#define QUEUE_POS_ZNODE "/sheepdog/queue_pos"
 #define MIN_THRESHOLD 86400
 
 #define FOR_EACH_ZNODE(parent, path, strs)			       \
@@ -369,6 +370,90 @@ err:
 	return -1;
 }
 
+static int do_delete(int argc, char **argv)
+{
+	struct String_vector strs;
+	int rc, len, deleted = 0;
+	int32_t pos, min_pos = INT32_MAX;
+	char path[256];
+	struct zk_event ev;
+	struct Stat stat;
+	if (argc != 2) {
+		fprintf(stderr, "remove queue, no more arguments\n");
+		return -1;
+	}
+
+	rc = zk_get_children(QUEUE_POS_ZNODE, &strs);
+	switch (rc) {
+	case ZOK:
+		FOR_EACH_ZNODE(QUEUE_POS_ZNODE, path, &strs) {
+			len = sizeof(int32_t);
+			rc = zk_get_data(path, &pos, &len, &stat);
+			if (rc != ZOK) {
+				fprintf(stderr, "failed to get data "
+					"%s, %s\n",
+					path, zerror(rc));
+				goto err;
+			}
+
+			if (pos < min_pos && pos != -1)
+				min_pos = pos;
+		}
+		break;
+	default:
+		goto err;
+	}
+
+	fprintf(stdout, "queue nodes seq < %d will be deleted\n", min_pos);
+
+	if (min_pos == INT32_MAX) {
+		fprintf(stdout, "no queue nodes to be deleted\n");
+		return 0;
+	}
+
+	rc = zk_get_children(QUEUE_ZNODE, &strs);
+	fprintf(stdout, "There are %d znode in queue\n", strs.count);
+	switch (rc) {
+	case ZOK:
+		FOR_EACH_ZNODE(QUEUE_ZNODE, path, &strs) {
+			len = sizeof(struct zk_event);
+			rc = zk_get_data(path, &ev, &len, &stat);
+			if (rc != ZOK) {
+				fprintf(stderr, "failed to get data "
+					"%s, %s\n",
+					path, zerror(rc));
+				goto err;
+			}
+
+			sscanf(path, QUEUE_ZNODE "/%"PRId32, &pos);
+			if (pos >= min_pos)
+				continue;
+
+			rc = zk_delete_node(path);
+			if (rc != ZOK) {
+				fprintf(stderr, "failed to delete "
+					"%s, %s\n",
+					path, zerror(rc));
+				goto err;
+			}
+
+			deleted++;
+			if (deleted % 100 == 0)
+				fprintf(stdout, "%d queue nodes are deleted\n",
+						deleted);
+		}
+		break;
+	default:
+		goto err;
+	}
+
+	fprintf(stdout, "completed. %d queue nodes are deleted\n", deleted);
+	return 0;
+err:
+	fprintf(stderr, "failed to delete %s, %s\n", QUEUE_ZNODE, zerror(rc));
+	return -1;
+}
+
 static struct control_handler {
 	const char *name;
 	int (*execute)(int, char **);
@@ -377,7 +462,8 @@ static struct control_handler {
 	{ "kill", do_kill, "Kill the session" },
 	{ "remove", do_remove, "Remove the node recursively" },
 	{ "list", do_list, "List the data in queue node" },
-	{ "purge", do_purge, "Remove the data in queue node" },
+	{ "purge", do_purge, "Remove the data in queue node by time" },
+	{ "delete", do_delete, "Remove the data in queue node not used" },
 	{ NULL, NULL, NULL },
 };
 
-- 
1.9.1





More information about the sheepdog mailing list