[sheepdog] [PATCH 1/2] zk_control: add list and remove queue nodes
Ruoyu
liangry at ucweb.com
Wed Jul 16 10:25:31 CEST 2014
ZooKeeper uses more and more memory in our production environment
because the nodes in /sheepdog/queue are never deleted, even after
their messages have already been handled.
So we need a tool to list them and to remove them periodically.
Signed-off-by: Ruoyu <liangry at ucweb.com>
---
tools/zk_control.c | 239 +++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 239 insertions(+)
diff --git a/tools/zk_control.c b/tools/zk_control.c
index 6b3b136..94c4277 100644
--- a/tools/zk_control.c
+++ b/tools/zk_control.c
@@ -13,6 +13,11 @@
#include <zookeeper/zookeeper.h>
#include <string.h>
+#include <arpa/inet.h>
+
+#include "list.h"
+#include "rbtree.h"
+#include "internal_proto.h"
#define FOR_EACH_ZNODE(parent, path, strs) \
for ((strs)->data += (strs)->count; \
@@ -21,9 +26,98 @@
*--(strs)->data) : (free((strs)->data), 0); \
free(*(strs)->data))
+/*
+ * Kind of event carried by a struct zk_event (its "type" field).
+ * The numeric values are spelled out because the field is read back
+ * from raw znode data; evtype_to_str() turns them into labels.
+ */
+enum zk_event_type {
+	EVENT_JOIN		= 1,
+	EVENT_ACCEPT		= 2,
+	EVENT_LEAVE		= 3,
+	EVENT_BLOCK		= 4,
+	EVENT_UNBLOCK		= 5,
+	EVENT_NOTIFY		= 6,
+	EVENT_UPDATE_NODE	= 7,
+};
+
+/*
+ * Node record embedded in struct zk_event as the event's sender.
+ *
+ * This tool only reads node.nid (address and port) when listing the
+ * queue.  The remaining members are kept so that the struct has the
+ * same layout as the data stored in the queue znodes.
+ * NOTE(review): presumably this must stay identical to the definition
+ * used by the sheep daemon's zookeeper driver -- confirm before
+ * changing any member.
+ */
+struct zk_node {
+ struct list_node list;
+ struct rb_node rb;
+ struct sd_node node;
+ bool callbacked;
+ bool gone;
+};
+
+/* Capacity of the payload area at the end of struct zk_event. */
+#define ZK_MAX_BUF_SIZE (1*1024*1024) /* 1M */
+
+/*
+ * In-memory image of the data read from a /sheepdog/queue znode.
+ *
+ * zk_get_data() copies the raw znode bytes straight into this struct,
+ * so member order and types define how those bytes are interpreted.
+ * NOTE(review): the struct is slightly larger than 1 MiB because of
+ * buf[]; callers currently place it on the stack.
+ */
+struct zk_event {
+ uint64_t id;
+ enum zk_event_type type;
+ struct zk_node sender;
+ size_t msg_len;
+ size_t nr_nodes;
+ size_t buf_len;
+ uint8_t buf[ZK_MAX_BUF_SIZE];
+};
+
static const char *hosts = "127.0.0.1:2181";
static zhandle_t *zk_handle;
+/*
+ * Translate a zk_event_type value into the label printed in the TYPE
+ * column of the queue listing.  Values that have no label, including
+ * 0 and negative numbers, are reported as "UNKNOWN".
+ */
+static const char *evtype_to_str(int type)
+{
+	static const char *const labels[] = {
+		[EVENT_JOIN]		= "JOIN",
+		[EVENT_ACCEPT]		= "ACCEPT",
+		[EVENT_LEAVE]		= "LEAVE",
+		[EVENT_BLOCK]		= "BLOCK",
+		[EVENT_UNBLOCK]		= "UNBLOCK",
+		[EVENT_NOTIFY]		= "NOTIFY",
+		[EVENT_UPDATE_NODE]	= "UPDATE_NODE",
+	};
+	const int nr_labels = sizeof(labels) / sizeof(labels[0]);
+
+	/* slots not named above are NULL, e.g. index 0 */
+	if (type < 0 || type >= nr_labels || labels[type] == NULL)
+		return "UNKNOWN";
+
+	return labels[type];
+}
+
+/*
+ * Format a 16-byte node address and a port as text.
+ *
+ * The address is printed as IPv4 when bytes 0..11 are all zero and
+ * byte 12 is non-zero (the IPv4 address is then taken from bytes
+ * 12..15); otherwise all 16 bytes are printed as IPv6.  ":<port>" is
+ * appended when port is non-zero.
+ *
+ * Returns a pointer to a thread-local static buffer, which is
+ * overwritten by the next call on the same thread.  If the conversion
+ * fails, an error is written to stderr and the address part of the
+ * result is empty.
+ */
+static const char *addr_to_str(const uint8_t *addr, uint16_t port)
+{
+	static __thread char str[HOST_NAME_MAX + 8];
+	int af = AF_INET6;
+	int addr_start_idx = 0;
+
+	/* Find address family type */
+	if (addr[12]) {
+		int oct_no = 0;
+
+		/* count the leading zero bytes, looking at bytes 0..11 only */
+		while (oct_no < 12 && !addr[oct_no])
+			oct_no++;
+		if (oct_no == 12) {
+			af = AF_INET;
+			addr_start_idx = 12;
+		}
+	}
+
+	if (unlikely(inet_ntop(af, addr + addr_start_idx, str,
+			       sizeof(str)) == NULL)) {
+		fprintf(stderr, "failed to convert addr to string, %m\n");
+		/*
+		 * The buffer contents are unspecified after a failure;
+		 * make it a valid empty string before it is used below.
+		 */
+		str[0] = '\0';
+	}
+
+	if (port) {
+		size_t len = strlen(str);
+
+		snprintf(str + len, sizeof(str) - len, ":%d", port);
+	}
+
+	return str;
+}
+
static inline ZOOAPI int zk_delete_node(const char *path)
{
int rc;
@@ -45,6 +139,18 @@ static inline ZOOAPI int zk_get_children(const char *path,
return rc;
}
+/*
+ * Read the data and Stat of the znode at @path through zk_handle.
+ *
+ * @buffer receives the data; @buffer_len is passed to zoo_get() as the
+ * in/out length.  The call is repeated for as long as zoo_get()
+ * reports ZOPERATIONTIMEOUT or ZCONNECTIONLOSS; any other result code
+ * is returned to the caller unchanged.  The watch argument of
+ * zoo_get() is always 1.
+ */
+static inline ZOOAPI int zk_get_data(const char *path, void *buffer,
+				     int *buffer_len, struct Stat *stat)
+{
+	for (;;) {
+		int rc = zoo_get(zk_handle, path, 1, (char *)buffer,
+				 buffer_len, stat);
+
+		if (rc != ZOPERATIONTIMEOUT && rc != ZCONNECTIONLOSS)
+			return rc;
+	}
+}
+
static int do_kill(int argc, char **argv)
{
char *path;
@@ -130,6 +236,137 @@ err:
return -1;
}
+/*
+ * "lqueue" handler: print one line for every child of /sheepdog/queue.
+ *
+ * Each line shows the sequence number parsed from the znode name, the
+ * event id, type, sender address, the three length/count fields of the
+ * stored struct zk_event, and the znode's creation and modification
+ * times (local time).  A total is printed at the end.
+ *
+ * argc and argv are unused; they exist to match the handler signature.
+ * Returns 0 on success, -1 if the children cannot be listed or the
+ * data of any znode cannot be read.
+ */
+static int do_list_queue(int argc, char **argv)
+{
+#define QUEUE_ZONE "/sheepdog/queue"
+	struct String_vector strs;
+	int rc, len, listed = 0;
+	char path[256], str_ctime[128], str_mtime[128];
+	time_t t1, t2;
+	struct tm tm_ctime, tm_mtime;
+	struct zk_event ev;
+	struct Stat stat;
+	int32_t seq;
+	struct node_id *nid;
+
+	fprintf(stdout, " QUEUE ID TYPE"
+		" SENDER MSG LEN NR BUF LEN"
+		" CREATE TIME MODIFY TIME\n");
+
+	rc = zk_get_children(QUEUE_ZONE, &strs);
+	if (rc != ZOK)
+		goto err;
+
+	FOR_EACH_ZNODE(QUEUE_ZONE, path, &strs) {
+		/*
+		 * Clear the fixed-size fields before every read so that a
+		 * znode holding fewer bytes than expected cannot show
+		 * values left over from the previous znode (or, on the
+		 * first pass, indeterminate stack contents).
+		 */
+		memset(&ev, 0, sizeof(ev) - sizeof(ev.buf));
+
+		len = sizeof(struct zk_event);
+		rc = zk_get_data(path, &ev, &len, &stat);
+		if (rc != ZOK) {
+			fprintf(stderr, "failed to get data "
+				"%s, %s\n",
+				path, zerror(rc));
+			goto err;
+		}
+
+		/* Stat times are in milliseconds */
+		t1 = stat.ctime / 1000;
+		localtime_r(&t1, &tm_ctime);
+		strftime(str_ctime, sizeof(str_ctime),
+			 "%Y-%m-%d %H:%M:%S", &tm_ctime);
+
+		t2 = stat.mtime / 1000;
+		localtime_r(&t2, &tm_mtime);
+		strftime(str_mtime, sizeof(str_mtime),
+			 "%Y-%m-%d %H:%M:%S", &tm_mtime);
+
+		/* -1 marks a znode name that carries no sequence number */
+		if (sscanf(path, QUEUE_ZONE "/%"SCNd32, &seq) != 1)
+			seq = -1;
+
+		nid = &ev.sender.node.nid;
+		fprintf(stdout, "%010"PRId32" %016"PRIx64
+			" %12s %21s %7zu %4zu %7zu %s %s\n",
+			seq, ev.id, evtype_to_str(ev.type),
+			addr_to_str(nid->addr, nid->port),
+			ev.msg_len, ev.nr_nodes, ev.buf_len,
+			str_ctime, str_mtime);
+		listed++;
+	}
+
+	fprintf(stdout, "\ntotal nodes: %d\n", listed);
+	return 0;
+err:
+	fprintf(stderr, "failed to list %s, %s\n", QUEUE_ZONE, zerror(rc));
+	return -1;
+}
+
+/*
+ * "rqueue" handler: delete old children of /sheepdog/queue.
+ *
+ * argv[2] is the age threshold in seconds and must be a plain decimal
+ * number.  Thresholds below MIN_THRESHOLD are raised to MIN_THRESHOLD.
+ * A child is deleted when its modification time is more than the
+ * threshold before the time at which the command started.  Progress
+ * is reported after every 100 deletions.
+ *
+ * Returns 0 on success, -1 on a usage error or as soon as listing,
+ * reading or deleting a znode fails; each failure prints its own
+ * message.
+ */
+static int do_remove_queue(int argc, char **argv)
+{
+#define MIN_THRESHOLD 86400
+	struct String_vector strs;
+	int rc, len, deleted = 0;
+	long threshold;
+	const char *node = "/sheepdog/queue";
+	char *p, path[256];
+	struct zk_event ev;
+	struct Stat stat;
+	struct timeval tv;
+
+	if (argc != 3) {
+		fprintf(stderr, "remove queue: need specify "
+			"threshold in seconds\n");
+		return -1;
+	}
+
+	/*
+	 * Keep the full strtol() result instead of narrowing it to int,
+	 * and reject input with trailing characters such as "100x".
+	 */
+	threshold = strtol(argv[2], &p, 10);
+	if (p == argv[2] || *p != '\0') {
+		fprintf(stderr, "threshold must be a number\n");
+		return -1;
+	}
+	if (threshold < MIN_THRESHOLD) {
+		threshold = MIN_THRESHOLD;
+		fprintf(stdout, "threshold is less than %d seconds, "
+			"set it to %d\n", MIN_THRESHOLD, MIN_THRESHOLD);
+	}
+
+	gettimeofday(&tv, NULL);
+
+	rc = zk_get_children(node, &strs);
+	if (rc != ZOK) {
+		fprintf(stderr, "failed to list %s, %s\n", node, zerror(rc));
+		return -1;
+	}
+
+	FOR_EACH_ZNODE(node, path, &strs) {
+		/* only the Stat is needed; ev is just the receive buffer */
+		len = sizeof(struct zk_event);
+		rc = zk_get_data(path, &ev, &len, &stat);
+		if (rc != ZOK) {
+			fprintf(stderr, "failed to get data "
+				"%s, %s\n",
+				path, zerror(rc));
+			return -1;
+		}
+
+		/* Stat times are in milliseconds; keep recent znodes */
+		if (stat.mtime / 1000 >= tv.tv_sec - threshold)
+			continue;
+
+		rc = zk_delete_node(path);
+		if (rc != ZOK) {
+			fprintf(stderr, "failed to delete "
+				"%s, %s\n",
+				path, zerror(rc));
+			return -1;
+		}
+
+		deleted++;
+		if (deleted % 100 == 0)
+			fprintf(stdout, "%d queue nodes are deleted\n",
+				deleted);
+	}
+
+	fprintf(stdout, "completed. %d queue nodes are deleted\n", deleted);
+	return 0;
+}
+
static struct control_handler {
const char *name;
int (*execute)(int, char **);
@@ -137,6 +374,8 @@ static struct control_handler {
} handlers[] = {
{ "kill", do_kill, "Kill the session" },
{ "remove", do_remove, "Remove the node recursively" },
+ { "lqueue", do_list_queue, "List the data in queue node" },
+ { "rqueue", do_remove_queue, "Remove the data in queue node" },
{ NULL, NULL, NULL },
};
--
1.8.3.2
More information about the sheepdog
mailing list