[sheepdog] [PATCH v2 1/2] zk_control: add list and purge queue nodes

Hitoshi Mitake mitake.hitoshi at lab.ntt.co.jp
Tue Jul 22 08:28:22 CEST 2014


At Fri, 18 Jul 2014 15:58:12 +0800,
Ruoyu wrote:
> 
> Zookeeper occupies more and more memory in our production environment
> because the nodes in /sheepdog/queue are never deleted even after
> the messages have already been handled.
> 
> So, we need a tool to list and purge them periodically.
> 
> Signed-off-by: Ruoyu <liangry at ucweb.com>
> ---
>  tools/zk_control.c | 231 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>  1 file changed, 231 insertions(+)

Applied, thanks. BTW, from next time, could you include a brief
description of the differences between versions here?

Thanks,
Hitoshi

> 
> diff --git a/tools/zk_control.c b/tools/zk_control.c
> index 6b3b136..53fd1d0 100644
> --- a/tools/zk_control.c
> +++ b/tools/zk_control.c
> @@ -13,6 +13,14 @@
>  
>  #include <zookeeper/zookeeper.h>
>  #include <string.h>
> +#include <arpa/inet.h>
> +#include <stddef.h>
> +#include <stdlib.h>
> +
> +#include "list.h"
> +#include "rbtree.h"
> +#include "internal_proto.h"
> +
> +/* Parent znode; each child holds a struct zk_event (decoded by "list") */
> +#define QUEUE_ZONE "/sheepdog/queue"
> +/* Smallest age threshold, in seconds (86400 = one day), "purge" accepts */
> +#define MIN_THRESHOLD 86400
>  
>  #define FOR_EACH_ZNODE(parent, path, strs)			       \
>  	for ((strs)->data += (strs)->count;			       \
> @@ -21,9 +29,90 @@
>  			      *--(strs)->data) : (free((strs)->data), 0); \
>  	     free(*(strs)->data))
>  
> +/*
> + * Values stored in struct zk_event::type.  Numbering starts at 1.
> + * NOTE(review): these presumably have to match the values written by the
> + * sheep ZooKeeper cluster driver -- keep both copies in sync.
> + */
> +enum zk_event_type {
> +	EVENT_JOIN        = 1,
> +	EVENT_ACCEPT      = 2,
> +	EVENT_LEAVE       = 3,
> +	EVENT_BLOCK       = 4,
> +	EVENT_UNBLOCK     = 5,
> +	EVENT_NOTIFY      = 6,
> +	EVENT_UPDATE_NODE = 7,
> +};
> +
> +/*
> + * Cluster member record embedded in struct zk_event (as its sender).
> + * This tool only reads node.nid from it; the remaining fields exist so
> + * that struct zk_event has the same layout as the data stored in the
> + * queue znodes.
> + * NOTE(review): presumably copied from the sheep ZooKeeper cluster
> + * driver -- any change there must be mirrored here.
> + */
> +struct zk_node {
> +	struct list_node list;
> +	struct rb_node rb;
> +	struct sd_node node;
> +	bool callbacked;
> +	bool gone;
> +};
> +
> +/* Capacity of the trailing payload buffer of struct zk_event */
> +#define ZK_MAX_BUF_SIZE (1*1024*1024) /* 1M */
> +
> +/*
> + * Event record read from each child of QUEUE_ZONE.  The fixed header
> + * (everything before buf) is decoded by do_list(); buf is not interpreted
> + * by this tool.  sizeof(struct zk_event) exceeds 1 MiB, so avoid placing
> + * it on the stack.
> + * NOTE(review): the layout presumably must be byte-compatible with the
> + * event structure written by sheep's ZooKeeper driver -- verify on change.
> + */
> +struct zk_event {
> +	uint64_t id;
> +	enum zk_event_type type;
> +	struct zk_node sender;
> +	size_t msg_len;
> +	size_t nr_nodes;
> +	size_t buf_len;
> +	uint8_t buf[ZK_MAX_BUF_SIZE];
> +};
> +
>  static const char *hosts = "127.0.0.1:2181";
>  static zhandle_t *zk_handle;
>  
> +/*
> + * Map an event type value to its printable name.  Any value that is not
> + * one of the EVENT_* constants yields "UNKNOWN".
> + */
> +static const char *evtype_to_str(int type)
> +{
> +	static const char *const names[] = {
> +		[EVENT_JOIN]        = "JOIN",
> +		[EVENT_ACCEPT]      = "ACCEPT",
> +		[EVENT_LEAVE]       = "LEAVE",
> +		[EVENT_BLOCK]       = "BLOCK",
> +		[EVENT_UNBLOCK]     = "UNBLOCK",
> +		[EVENT_NOTIFY]      = "NOTIFY",
> +		[EVENT_UPDATE_NODE] = "UPDATE_NODE",
> +	};
> +	const int nr_names = (int)(sizeof(names) / sizeof(names[0]));
> +
> +	/* out of range, or a gap in the table (e.g. index 0) */
> +	if (type < 0 || type >= nr_names || names[type] == NULL)
> +		return "UNKNOWN";
> +
> +	return names[type];
> +}
> +
> +/*
> + * Render the 16-byte address @addr, followed by ":@port" when @port is
> + * non-zero, as text.
> + *
> + * The address is formatted as IPv4 (from bytes 12-15) when bytes 0-11 are
> + * all zero and byte 12 is non-zero; otherwise it is formatted as IPv6.
> + * The returned string lives in a per-thread static buffer that the next
> + * call from the same thread overwrites.
> + */
> +static const char *addr_to_str(const uint8_t *addr, uint16_t port)
> +{
> +	static __thread char str[HOST_NAME_MAX + 8];
> +	int af = AF_INET6;
> +	int addr_start_idx = 0;
> +	int i;
> +
> +	/* Find address family type: zero bytes 0-11 + non-zero byte 12 => v4 */
> +	if (addr[12]) {
> +		for (i = 0; i < 12 && !addr[i]; i++)
> +			;
> +		if (i == 12) {
> +			af = AF_INET;
> +			addr_start_idx = 12;
> +		}
> +	}
> +
> +	if (unlikely(inet_ntop(af, addr + addr_start_idx,
> +			       str, sizeof(str)) == NULL)) {
> +		fprintf(stderr, "failed to convert addr to string, %m\n");
> +		/* don't return whatever a previous call left in the buffer */
> +		snprintf(str, sizeof(str), "unknown");
> +	}
> +
> +	if (port) {
> +		size_t len = strlen(str);
> +		snprintf(str + len, sizeof(str) - len, ":%d", port);
> +	}
> +
> +	return str;
> +}
> +
>  static inline ZOOAPI int zk_delete_node(const char *path)
>  {
>  	int rc;
> @@ -45,6 +134,18 @@ static inline ZOOAPI int zk_get_children(const char *path,
>  	return rc;
>  }
>  
> +/*
> + * Read the data of znode @path into @buffer and its metadata into @stat.
> + *
> + * On entry *buffer_len is the capacity of @buffer; after the call it holds
> + * the length reported by zoo_get().  The call is retried while zoo_get()
> + * returns ZOPERATIONTIMEOUT or ZCONNECTIONLOSS; the capacity is restored
> + * before every attempt.  No watch is registered: this tool only inspects
> + * a snapshot and never consumes watch notifications.
> + */
> +static inline ZOOAPI int zk_get_data(const char *path, void *buffer,
> +				     int *buffer_len, struct Stat *stat)
> +{
> +	const int capacity = *buffer_len;
> +	int rc;
> +
> +	do {
> +		*buffer_len = capacity;
> +		rc = zoo_get(zk_handle, path, 0, (char *)buffer,
> +			     buffer_len, stat);
> +	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
> +
> +	return rc;
> +}
> +
>  static int do_kill(int argc, char **argv)
>  {
>  	char *path;
> @@ -130,6 +231,134 @@ err:
>  	return -1;
>  }
>  
> +/*
> + * Format the ZooKeeper millisecond timestamp @msec as local
> + * "YYYY-MM-DD HH:MM:SS" into @buf, or "-" if it cannot be converted.
> + */
> +static void zk_msec_to_str(int64_t msec, char *buf, size_t size)
> +{
> +	time_t t = msec / 1000;
> +	struct tm tm;
> +
> +	if (localtime_r(&t, &tm) == NULL ||
> +	    strftime(buf, size, "%Y-%m-%d %H:%M:%S", &tm) == 0)
> +		snprintf(buf, size, "-");
> +}
> +
> +/*
> + * "list" command: print one line per child of QUEUE_ZONE with its
> + * sequence number, the decoded struct zk_event header and the znode
> + * create/modify times, followed by the number of nodes printed.
> + *
> + * argc/argv are unused.  Returns 0 on success, -1 on a ZooKeeper or
> + * allocation error.  Nodes that vanish while listing (ZNONODE) or whose
> + * data is too short to hold an event header are skipped.
> + */
> +static int do_list(int argc, char **argv)
> +{
> +	struct String_vector strs;
> +	int rc, len, total = 0;
> +	char path[256], str_ctime[128], str_mtime[128];
> +	struct zk_event *ev;
> +	struct Stat stat;
> +	int32_t seq;
> +	struct node_id *nid;
> +
> +	/* struct zk_event embeds a 1 MiB buffer; keep it off the stack */
> +	ev = malloc(sizeof(*ev));
> +	if (ev == NULL) {
> +		fprintf(stderr, "failed to allocate memory to list %s\n",
> +			QUEUE_ZONE);
> +		return -1;
> +	}
> +
> +	fprintf(stdout, "     QUEUE                ID          TYPE"
> +		"                 SENDER  MSG LEN    NR  BUF LEN"
> +		"          CREATE TIME          MODIFY TIME\n");
> +	rc = zk_get_children(QUEUE_ZONE, &strs);
> +	if (rc != ZOK)
> +		goto err;
> +
> +	FOR_EACH_ZNODE(QUEUE_ZONE, path, &strs) {
> +		len = sizeof(*ev);
> +		rc = zk_get_data(path, ev, &len, &stat);
> +		if (rc == ZNONODE)
> +			continue;	/* removed after the children were listed */
> +		if (rc != ZOK) {
> +			fprintf(stderr, "failed to get data "
> +				"%s, %s\n",
> +				path, zerror(rc));
> +			goto err;
> +		}
> +		/* don't decode header fields that were never received */
> +		if (len < (int)offsetof(struct zk_event, buf)) {
> +			fprintf(stderr, "%s: data too short (%d bytes), "
> +				"skipped\n", path, len);
> +			continue;
> +		}
> +
> +		zk_msec_to_str(stat.ctime, str_ctime, sizeof(str_ctime));
> +		zk_msec_to_str(stat.mtime, str_mtime, sizeof(str_mtime));
> +
> +		if (sscanf(path, QUEUE_ZONE "/%" SCNd32, &seq) != 1)
> +			seq = -1;
> +		nid = &ev->sender.node.nid;
> +		fprintf(stdout, "%010"PRId32"  %016"PRIx64
> +			"  %12s  %21s  %7zu  %4zu  %7zu  %s  %s\n",
> +			seq, ev->id, evtype_to_str(ev->type),
> +			addr_to_str(nid->addr, nid->port),
> +			ev->msg_len, ev->nr_nodes, ev->buf_len,
> +			str_ctime, str_mtime);
> +		total++;
> +	}
> +
> +	free(ev);
> +	fprintf(stdout, "\ntotal nodes: %d\n", total);
> +	return 0;
> +err:
> +	free(ev);
> +	fprintf(stderr, "failed to list %s, %s\n", QUEUE_ZONE, zerror(rc));
> +	return -1;
> +}
> +
> +/*
> + * Fetch only the metadata of znode @path into @stat, without transferring
> + * its data and without setting a watch.  Retried while zoo_exists()
> + * returns ZOPERATIONTIMEOUT or ZCONNECTIONLOSS.
> + */
> +static int zk_stat_node(const char *path, struct Stat *stat)
> +{
> +	int rc;
> +
> +	do {
> +		rc = zoo_exists(zk_handle, path, 0, stat);
> +	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
> +
> +	return rc;
> +}
> +
> +/*
> + * "purge" command: delete every child of QUEUE_ZONE whose last
> + * modification is older than argv[2] seconds.  Thresholds below
> + * MIN_THRESHOLD are raised to MIN_THRESHOLD.
> + *
> + * Nodes that disappear while purging (ZNONODE) are skipped, not treated
> + * as errors: they may have been removed concurrently, or a retried delete
> + * may already have succeeded.  Returns 0 on success, -1 on invalid
> + * arguments or a ZooKeeper error.
> + */
> +static int do_purge(int argc, char **argv)
> +{
> +	struct String_vector strs;
> +	int rc, deleted = 0;
> +	long threshold;
> +	char *p, path[256];
> +	struct Stat stat;
> +	struct timeval tv;
> +
> +	if (argc != 3) {
> +		fprintf(stderr, "purge queue: need to specify "
> +				"threshold in seconds\n");
> +		return -1;
> +	}
> +
> +	/*
> +	 * Kept as long (no int truncation).  An out-of-range value saturates
> +	 * at LONG_MAX (nothing is purged) or LONG_MIN (raised to the minimum).
> +	 */
> +	threshold = strtol(argv[2], &p, 10);
> +	if (p == argv[2] || *p != '\0') {
> +		fprintf(stderr, "threshold must be a number\n");
> +		return -1;
> +	}
> +	if (threshold < MIN_THRESHOLD) {
> +		threshold = MIN_THRESHOLD;
> +		fprintf(stdout, "threshold is less than %d seconds, "
> +			"set it to %d\n", MIN_THRESHOLD, MIN_THRESHOLD);
> +	}
> +
> +	gettimeofday(&tv, NULL);
> +
> +	rc = zk_get_children(QUEUE_ZONE, &strs);
> +	if (rc != ZOK)
> +		goto err;
> +
> +	FOR_EACH_ZNODE(QUEUE_ZONE, path, &strs) {
> +		/* only mtime is needed; don't pull up to 1 MiB of data */
> +		rc = zk_stat_node(path, &stat);
> +		if (rc == ZNONODE)
> +			continue;
> +		if (rc != ZOK) {
> +			fprintf(stderr, "failed to get stat "
> +				"%s, %s\n",
> +				path, zerror(rc));
> +			goto err;
> +		}
> +		if (stat.mtime / 1000 >= tv.tv_sec - threshold)
> +			continue;
> +
> +		rc = zk_delete_node(path);
> +		if (rc == ZNONODE)
> +			continue;
> +		if (rc != ZOK) {
> +			fprintf(stderr, "failed to delete "
> +				"%s, %s\n",
> +				path, zerror(rc));
> +			goto err;
> +		}
> +
> +		deleted++;
> +		if (deleted % 100 == 0)
> +			fprintf(stdout, "%d queue nodes are deleted\n",
> +					deleted);
> +	}
> +
> +	fprintf(stdout, "completed. %d queue nodes are deleted\n", deleted);
> +	return 0;
> +err:
> +	fprintf(stderr, "failed to purge %s, %s\n", QUEUE_ZONE, zerror(rc));
> +	return -1;
> +}
> +
>  static struct control_handler {
>  	const char *name;
>  	int (*execute)(int, char **);
> @@ -137,6 +366,8 @@ static struct control_handler {
>  } handlers[] = {
>  	{ "kill", do_kill, "Kill the session" },
>  	{ "remove", do_remove, "Remove the node recursively" },
> +	{ "list", do_list, "List the data in queue node" },
> +	{ "purge", do_purge, "Remove the data in queue node" },
>  	{ NULL, NULL, NULL },
>  };
>  
> -- 
> 1.8.3.2
> 
> 
> -- 
> sheepdog mailing list
> sheepdog at lists.wpkg.org
> http://lists.wpkg.org/mailman/listinfo/sheepdog



More information about the sheepdog mailing list