[sheepdog] [PATCH 1/2] zk_control: add list and remove queue nodes

Ruoyu liangry at ucweb.com
Wed Jul 16 10:25:31 CEST 2014


Zookeeper occupies more and more memory in our production environment
because the nodes in /sheepdog/queue are never deleted even after
the messages were already handled.

So, we need a tool to list and remove them periodically.

Signed-off-by: Ruoyu <liangry at ucweb.com>
---
 tools/zk_control.c | 239 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 239 insertions(+)

diff --git a/tools/zk_control.c b/tools/zk_control.c
index 6b3b136..94c4277 100644
--- a/tools/zk_control.c
+++ b/tools/zk_control.c
@@ -13,6 +13,11 @@
 
 #include <zookeeper/zookeeper.h>
 #include <string.h>
+#include <arpa/inet.h>
+
+#include "list.h"
+#include "rbtree.h"
+#include "internal_proto.h"
 
 #define FOR_EACH_ZNODE(parent, path, strs)			       \
 	for ((strs)->data += (strs)->count;			       \
@@ -21,9 +26,98 @@
 			      *--(strs)->data) : (free((strs)->data), 0); \
 	     free(*(strs)->data))
 
+enum zk_event_type {
+	EVENT_JOIN = 1,
+	EVENT_ACCEPT,
+	EVENT_LEAVE,
+	EVENT_BLOCK,
+	EVENT_UNBLOCK,
+	EVENT_NOTIFY,
+	EVENT_UPDATE_NODE,
+};
+
+struct zk_node {
+	struct list_node list;
+	struct rb_node rb;
+	struct sd_node node;
+	bool callbacked;
+	bool gone;
+};
+
+#define ZK_MAX_BUF_SIZE (1*1024*1024) /* 1M */
+
+struct zk_event {
+	uint64_t id;
+	enum zk_event_type type;
+	struct zk_node sender;
+	size_t msg_len;
+	size_t nr_nodes;
+	size_t buf_len;
+	uint8_t buf[ZK_MAX_BUF_SIZE];
+};
+
 static const char *hosts = "127.0.0.1:2181";
 static zhandle_t *zk_handle;
 
+static const char *evtype_to_str(int type)
+{
+	switch (type) {
+	case EVENT_JOIN:
+		return "JOIN";
+		break;
+	case EVENT_ACCEPT:
+		return "ACCEPT";
+		break;
+	case EVENT_LEAVE:
+		return "LEAVE";
+		break;
+	case EVENT_BLOCK:
+		return "BLOCK";
+		break;
+	case EVENT_UNBLOCK:
+		return "UNBLOCK";
+		break;
+	case EVENT_NOTIFY:
+		return "NOTIFY";
+		break;
+	case EVENT_UPDATE_NODE:
+		return "UPDATE_NODE";
+		break;
+	default:
+		return "UNKNOWN";
+		break;
+	}
+}
+
+static const char *addr_to_str(const uint8_t *addr, uint16_t port)
+{
+	static __thread char str[HOST_NAME_MAX + 8];
+	int af = AF_INET6;
+	int addr_start_idx = 0;
+	const char *ret;
+
+	/* Find address family type */
+	if (addr[12]) {
+		int  oct_no = 0;
+		while (!addr[oct_no] && oct_no++ < 12)
+			;
+		if (oct_no == 12) {
+			af = AF_INET;
+			addr_start_idx = 12;
+		}
+	}
+	ret = inet_ntop(af, addr + addr_start_idx, str, sizeof(str));
+	if (unlikely(ret == NULL))
+		fprintf(stderr, "failed to convert addr to string, %m\n");
+
+	if (port) {
+		int  len = strlen(str);
+		snprintf(str + len, sizeof(str) - len, ":%d", port);
+	}
+
+	return str;
+}
+
 static inline ZOOAPI int zk_delete_node(const char *path)
 {
 	int rc;
@@ -45,6 +139,18 @@ static inline ZOOAPI int zk_get_children(const char *path,
 	return rc;
 }
 
+static inline ZOOAPI int zk_get_data(const char *path, void *buffer,
+				     int *buffer_len, struct Stat *stat)
+{
+	int rc;
+	do {
+		rc = zoo_get(zk_handle, path, 1, (char *)buffer,
+			     buffer_len, stat);
+	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
+
+	return rc;
+}
+
 static int do_kill(int argc, char **argv)
 {
 	char *path;
@@ -130,6 +236,137 @@ err:
 	return -1;
 }
 
+static int do_list_queue(int argc, char **argv)
+{
+#define QUEUE_ZONE "/sheepdog/queue"
+	struct String_vector strs;
+	int rc, len, listed = 0;
+	char path[256], str_ctime[128], str_mtime[128];
+	time_t t1, t2;
+	struct tm tm_ctime, tm_mtime;
+	struct zk_event ev;
+	struct Stat stat;
+	int32_t seq;
+	struct node_id *nid;
+
+	fprintf(stdout, "     QUEUE                ID          TYPE"
+		"                 SENDER  MSG LEN    NR  BUF LEN"
+		"          CREATE TIME          MODIFY TIME\n");
+	rc = zk_get_children(QUEUE_ZONE, &strs);
+	switch (rc) {
+	case ZOK:
+		FOR_EACH_ZNODE(QUEUE_ZONE, path, &strs) {
+			len = sizeof(struct zk_event);
+			rc = zk_get_data(path, &ev, &len, &stat);
+			if (rc != ZOK) {
+				fprintf(stderr, "failed to get data "
+					"%s, %s\n",
+					path, zerror(rc));
+				goto err;
+			}
+
+			t1 = stat.ctime / 1000;
+			localtime_r(&t1, &tm_ctime);
+			strftime(str_ctime, sizeof(str_ctime),
+					"%Y-%m-%d %H:%M:%S", &tm_ctime);
+
+			t2 = stat.mtime / 1000;
+			localtime_r(&t2, &tm_mtime);
+			strftime(str_mtime, sizeof(str_mtime),
+					"%Y-%m-%d %H:%M:%S", &tm_mtime);
+
+			sscanf(path, QUEUE_ZONE "/%"PRId32, &seq);
+			nid = &ev.sender.node.nid;
+			fprintf(stdout, "%010"PRId32"  %016"PRIx64
+				"  %12s  %21s  %7zd  %4zd  %7zd  %s  %s\n",
+				seq, ev.id, evtype_to_str(ev.type),
+				addr_to_str(nid->addr, nid->port),
+				ev.msg_len, ev.nr_nodes, ev.buf_len,
+				str_ctime, str_mtime);
+			listed++;
+		}
+		break;
+	default:
+		goto err;
+	}
+
+	fprintf(stdout, "\ntotal nodes: %d\n", listed);
+	return 0;
+err:
+	fprintf(stderr, "failed to list %s, %s\n", QUEUE_ZONE, zerror(rc));
+	return -1;
+}
+
+static int do_remove_queue(int argc, char **argv)
+{
+#define MIN_THRESHOLD 86400
+	struct String_vector strs;
+	int rc, len, threshold, deleted = 0;
+	const char *node = "/sheepdog/queue";
+	char *p, path[256];
+	struct zk_event ev;
+	struct Stat stat;
+	struct timeval tv;
+
+	if (argc != 3) {
+		fprintf(stderr, "remove queue: need specify "
+				"threshold in seconds\n");
+		return -1;
+	}
+
+	threshold = strtol(argv[2], &p, 10);
+	if (p == argv[2]) {
+		fprintf(stderr, "threshold must be a number\n");
+		return -1;
+	}
+	if (threshold < MIN_THRESHOLD) {
+		threshold = MIN_THRESHOLD;
+		fprintf(stdout, "threshold is less than %d seconds, "
+			"set it to %d\n", MIN_THRESHOLD, MIN_THRESHOLD);
+	}
+
+	gettimeofday(&tv, NULL);
+
+	rc = zk_get_children(node, &strs);
+	switch (rc) {
+	case ZOK:
+		FOR_EACH_ZNODE(node, path, &strs) {
+			len = sizeof(struct zk_event);
+			rc = zk_get_data(path, &ev, &len, &stat);
+			if (rc != ZOK) {
+				fprintf(stderr, "failed to get data "
+					"%s, %s\n",
+					path, zerror(rc));
+				goto err;
+			}
+			if (stat.mtime / 1000 >= tv.tv_sec - threshold)
+				continue;
+
+			rc = zk_delete_node(path);
+			if (rc != ZOK) {
+				fprintf(stderr, "failed to delete "
+					"%s, %s\n",
+					path, zerror(rc));
+				goto err;
+			}
+
+			deleted++;
+			if (deleted % 100 == 0)
+				fprintf(stdout, "%d queue nodes are deleted\n",
+						deleted);
+		}
+		break;
+	default:
+		goto err;
+	}
+
+	fprintf(stdout, "completed. %d queue nodes are deleted\n", deleted);
+	return 0;
+err:
+	fprintf(stderr, "failed to list %s, %s\n", node, zerror(rc));
+	return -1;
+}
+
 static struct control_handler {
 	const char *name;
 	int (*execute)(int, char **);
@@ -137,6 +374,8 @@ static struct control_handler {
 } handlers[] = {
 	{ "kill", do_kill, "Kill the session" },
 	{ "remove", do_remove, "Remove the node recursively" },
+	{ "lqueue", do_list_queue, "List the data in queue node" },
+	{ "rqueue", do_remove_queue, "Remove the data in queue node" },
 	{ NULL, NULL, NULL },
 };
 
-- 
1.8.3.2





More information about the sheepdog mailing list