[sheepdog] [PATCH v2 1/2] zk_control: add list and purge queue nodes

Ruoyu liangry at ucweb.com
Tue Jul 22 09:03:15 CEST 2014


On 2014年07月22日 14:28, Hitoshi Mitake wrote:
> At Fri, 18 Jul 2014 15:58:12 +0800,
> Ruoyu wrote:
>> ZooKeeper occupies more and more memory in our production environment
>> because the nodes in /sheepdog/queue are never deleted, even after
>> the messages have already been handled.
>>
>> So, we need a tool to list and purge them periodically.
>>
>> Signed-off-by: Ruoyu <liangry at ucweb.com>
>> ---
>>   tools/zk_control.c | 231 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>>   1 file changed, 231 insertions(+)
> Applied, thanks. BTW, could you include a brief description of the
> differences between patch versions here from next time on?
No problem, thanks.
>
> Thanks,
> Hitoshi
>
>> diff --git a/tools/zk_control.c b/tools/zk_control.c
>> index 6b3b136..53fd1d0 100644
>> --- a/tools/zk_control.c
>> +++ b/tools/zk_control.c
>> @@ -13,6 +13,14 @@
>>   
>>   #include <zookeeper/zookeeper.h>
>>   #include <string.h>
>> +#include <arpa/inet.h>
>> +
>> +#include "list.h"
>> +#include "rbtree.h"
>> +#include "internal_proto.h"
>> +
>> +#define QUEUE_ZONE "/sheepdog/queue"
>> +#define MIN_THRESHOLD 86400
>>   
>>   #define FOR_EACH_ZNODE(parent, path, strs)			       \
>>   	for ((strs)->data += (strs)->count;			       \
>> @@ -21,9 +29,90 @@
>>   			      *--(strs)->data) : (free((strs)->data), 0); \
>>   	     free(*(strs)->data))
>>   
>> +enum zk_event_type {
>> +	EVENT_JOIN = 1,
>> +	EVENT_ACCEPT,
>> +	EVENT_LEAVE,
>> +	EVENT_BLOCK,
>> +	EVENT_UNBLOCK,
>> +	EVENT_NOTIFY,
>> +	EVENT_UPDATE_NODE,
>> +};
>> +
>> +struct zk_node {
>> +	struct list_node list;
>> +	struct rb_node rb;
>> +	struct sd_node node;
>> +	bool callbacked;
>> +	bool gone;
>> +};
>> +
>> +#define ZK_MAX_BUF_SIZE (1*1024*1024) /* 1M */
>> +
>> +struct zk_event {
>> +	uint64_t id;
>> +	enum zk_event_type type;
>> +	struct zk_node sender;
>> +	size_t msg_len;
>> +	size_t nr_nodes;
>> +	size_t buf_len;
>> +	uint8_t buf[ZK_MAX_BUF_SIZE];
>> +};
>> +
>>   static const char *hosts = "127.0.0.1:2181";
>>   static zhandle_t *zk_handle;
>>   
>> +static const char *evtype_to_str(int type)
>> +{
>> +	switch (type) {
>> +	case EVENT_JOIN:
>> +		return "JOIN";
>> +	case EVENT_ACCEPT:
>> +		return "ACCEPT";
>> +	case EVENT_LEAVE:
>> +		return "LEAVE";
>> +	case EVENT_BLOCK:
>> +		return "BLOCK";
>> +	case EVENT_UNBLOCK:
>> +		return "UNBLOCK";
>> +	case EVENT_NOTIFY:
>> +		return "NOTIFY";
>> +	case EVENT_UPDATE_NODE:
>> +		return "UPDATE_NODE";
>> +	default:
>> +		return "UNKNOWN";
>> +	}
>> +}
>> +
>> +static const char *addr_to_str(const uint8_t *addr, uint16_t port)
>> +{
>> +	static __thread char str[HOST_NAME_MAX + 8];
>> +	int af = AF_INET6;
>> +	int addr_start_idx = 0;
>> +	const char *ret;
>> +
>> +	/* Find address family type */
>> +	if (addr[12]) {
>> +		int  oct_no = 0;
>> +		while (!addr[oct_no] && oct_no++ < 12)
>> +			;
>> +		if (oct_no == 12) {
>> +			af = AF_INET;
>> +			addr_start_idx = 12;
>> +		}
>> +	}
>> +	ret = inet_ntop(af, addr + addr_start_idx, str, sizeof(str));
>> +	if (unlikely(ret == NULL))
>> +		fprintf(stderr, "failed to convert addr to string, %m\n");
>> +
>> +	if (port) {
>> +		int  len = strlen(str);
>> +		snprintf(str + len, sizeof(str) - len, ":%d", port);
>> +	}
>> +
>> +	return str;
>> +}
>> +
>>   static inline ZOOAPI int zk_delete_node(const char *path)
>>   {
>>   	int rc;
>> @@ -45,6 +134,18 @@ static inline ZOOAPI int zk_get_children(const char *path,
>>   	return rc;
>>   }
>>   
>> +static inline ZOOAPI int zk_get_data(const char *path, void *buffer,
>> +				     int *buffer_len, struct Stat *stat)
>> +{
>> +	int rc;
>> +	do {
>> +		rc = zoo_get(zk_handle, path, 1, (char *)buffer,
>> +			     buffer_len, stat);
>> +	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
>> +
>> +	return rc;
>> +}
>> +
>>   static int do_kill(int argc, char **argv)
>>   {
>>   	char *path;
>> @@ -130,6 +231,134 @@ err:
>>   	return -1;
>>   }
>>   
>> +static int do_list(int argc, char **argv)
>> +{
>> +	struct String_vector strs;
>> +	int rc, len, total = 0;
>> +	char path[256], str_ctime[128], str_mtime[128];
>> +	time_t t1, t2;
>> +	struct tm tm_ctime, tm_mtime;
>> +	struct zk_event ev;
>> +	struct Stat stat;
>> +	int32_t seq;
>> +	struct node_id *nid;
>> +
>> +	fprintf(stdout, "     QUEUE                ID          TYPE"
>> +		"                 SENDER  MSG LEN    NR  BUF LEN"
>> +		"          CREATE TIME          MODIFY TIME\n");
>> +	rc = zk_get_children(QUEUE_ZONE, &strs);
>> +	switch (rc) {
>> +	case ZOK:
>> +		FOR_EACH_ZNODE(QUEUE_ZONE, path, &strs) {
>> +			len = sizeof(struct zk_event);
>> +			rc = zk_get_data(path, &ev, &len, &stat);
>> +			if (rc != ZOK) {
>> +				fprintf(stderr, "failed to get data "
>> +					"%s, %s\n",
>> +					path, zerror(rc));
>> +				goto err;
>> +			}
>> +
>> +			t1 = stat.ctime / 1000;
>> +			localtime_r(&t1, &tm_ctime);
>> +			strftime(str_ctime, sizeof(str_ctime),
>> +					"%Y-%m-%d %H:%M:%S", &tm_ctime);
>> +
>> +			t2 = stat.mtime / 1000;
>> +			localtime_r(&t2, &tm_mtime);
>> +			strftime(str_mtime, sizeof(str_mtime),
>> +					"%Y-%m-%d %H:%M:%S", &tm_mtime);
>> +
>> +			sscanf(path, QUEUE_ZONE "/%"PRId32, &seq);
>> +			nid = &ev.sender.node.nid;
>> +			fprintf(stdout, "%010"PRId32"  %016"PRIx64
>> +				"  %12s  %21s  %7zd  %4zd  %7zd  %s  %s\n",
>> +				seq, ev.id, evtype_to_str(ev.type),
>> +				addr_to_str(nid->addr, nid->port),
>> +				ev.msg_len, ev.nr_nodes, ev.buf_len,
>> +				str_ctime, str_mtime);
>> +			total++;
>> +		}
>> +		break;
>> +	default:
>> +		goto err;
>> +	}
>> +
>> +	fprintf(stdout, "\ntotal nodes: %d\n", total);
>> +	return 0;
>> +err:
>> +	fprintf(stderr, "failed to list %s, %s\n", QUEUE_ZONE, zerror(rc));
>> +	return -1;
>> +}
>> +
>> +static int do_purge(int argc, char **argv)
>> +{
>> +	struct String_vector strs;
>> +	int rc, len, threshold, deleted = 0;
>> +	char *p, path[256];
>> +	struct zk_event ev;
>> +	struct Stat stat;
>> +	struct timeval tv;
>> +
>> +	if (argc != 3) {
>> +		fprintf(stderr, "remove queue: need specify "
>> +				"threshold in seconds\n");
>> +		return -1;
>> +	}
>> +
>> +	threshold = strtol(argv[2], &p, 10);
>> +	if (p == argv[2]) {
>> +		fprintf(stderr, "threshold must be a number\n");
>> +		return -1;
>> +	}
>> +	if (threshold < MIN_THRESHOLD) {
>> +		threshold = MIN_THRESHOLD;
>> +		fprintf(stdout, "threshold is less than %d seconds, "
>> +			"set it to %d\n", MIN_THRESHOLD, MIN_THRESHOLD);
>> +	}
>> +
>> +	gettimeofday(&tv, NULL);
>> +
>> +	rc = zk_get_children(QUEUE_ZONE, &strs);
>> +	switch (rc) {
>> +	case ZOK:
>> +		FOR_EACH_ZNODE(QUEUE_ZONE, path, &strs) {
>> +			len = sizeof(struct zk_event);
>> +			rc = zk_get_data(path, &ev, &len, &stat);
>> +			if (rc != ZOK) {
>> +				fprintf(stderr, "failed to get data "
>> +					"%s, %s\n",
>> +					path, zerror(rc));
>> +				goto err;
>> +			}
>> +			if (stat.mtime / 1000 >= tv.tv_sec - threshold)
>> +				continue;
>> +
>> +			rc = zk_delete_node(path);
>> +			if (rc != ZOK) {
>> +				fprintf(stderr, "failed to delete "
>> +					"%s, %s\n",
>> +					path, zerror(rc));
>> +				goto err;
>> +			}
>> +
>> +			deleted++;
>> +			if (deleted % 100 == 0)
>> +				fprintf(stdout, "%d queue nodes are deleted\n",
>> +						deleted);
>> +		}
>> +		break;
>> +	default:
>> +		goto err;
>> +	}
>> +
>> +	fprintf(stdout, "completed. %d queue nodes are deleted\n", deleted);
>> +	return 0;
>> +err:
>> +	fprintf(stderr, "failed to purge %s, %s\n", QUEUE_ZONE, zerror(rc));
>> +	return -1;
>> +}
>> +
>>   static struct control_handler {
>>   	const char *name;
>>   	int (*execute)(int, char **);
>> @@ -137,6 +366,8 @@ static struct control_handler {
>>   } handlers[] = {
>>   	{ "kill", do_kill, "Kill the session" },
>>   	{ "remove", do_remove, "Remove the node recursively" },
>> +	{ "list", do_list, "List the data in queue node" },
>> +	{ "purge", do_purge, "Remove the data in queue node" },
>>   	{ NULL, NULL, NULL },
>>   };
>>   
>> -- 
>> 1.8.3.2
>>
>>
>> -- 
>> sheepdog mailing list
>> sheepdog at lists.wpkg.org
>> http://lists.wpkg.org/mailman/listinfo/sheepdog





More information about the sheepdog mailing list