[sheepdog] [PATCH v2 1/2] zk_control: add list and purge queue nodes
Ruoyu
liangry at ucweb.com
Tue Jul 22 09:03:15 CEST 2014
On 2014年07月22日 14:28, Hitoshi Mitake wrote:
> At Fri, 18 Jul 2014 15:58:12 +0800,
> Ruoyu wrote:
>> ZooKeeper memory usage keeps growing in our production environment
>> because the nodes in /sheepdog/queue are never deleted, even after
>> their messages have been handled.
>>
>> So, we need a tool to list and purge them periodically.
>>
>> Signed-off-by: Ruoyu <liangry at ucweb.com>
>> ---
>> tools/zk_control.c | 231 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>> 1 file changed, 231 insertions(+)
> Applied, thanks. BTW, could you include a brief description of the
> differences between versions here from now on?
No problem, thanks.
>
> Thanks,
> Hitoshi
>
>> diff --git a/tools/zk_control.c b/tools/zk_control.c
>> index 6b3b136..53fd1d0 100644
>> --- a/tools/zk_control.c
>> +++ b/tools/zk_control.c
>> @@ -13,6 +13,14 @@
>>
#include <arpa/inet.h>
#include <inttypes.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

#include <zookeeper/zookeeper.h>

#include "list.h"
#include "rbtree.h"
#include "internal_proto.h"
>> +
/* Parent znode whose children hold the queued struct zk_event records. */
#define QUEUE_ZONE "/sheepdog/queue"

/*
 * Smallest age, in seconds, that "purge" will use (one day); smaller
 * requested thresholds are raised to this value.
 */
#define MIN_THRESHOLD (24 * 60 * 60)
>>
>> #define FOR_EACH_ZNODE(parent, path, strs) \
>> for ((strs)->data += (strs)->count; \
>> @@ -21,9 +29,90 @@
>> *--(strs)->data) : (free((strs)->data), 0); \
>> free(*(strs)->data))
>>
/*
 * Kinds of event stored in the 'type' field of struct zk_event.
 * Numbering starts at 1; the values are spelled out because they are
 * read back from data stored in ZooKeeper.
 */
enum zk_event_type {
	EVENT_JOIN        = 1,
	EVENT_ACCEPT      = 2,
	EVENT_LEAVE       = 3,
	EVENT_BLOCK       = 4,
	EVENT_UNBLOCK     = 5,
	EVENT_NOTIFY      = 6,
	EVENT_UPDATE_NODE = 7,
};
>> +
>> +struct zk_node {
>> + struct list_node list;
>> + struct rb_node rb;
>> + struct sd_node node;
>> + bool callbacked;
>> + bool gone;
>> +};
>> +
>> +#define ZK_MAX_BUF_SIZE (1*1024*1024) /* 1M */
>> +
>> +struct zk_event {
>> + uint64_t id;
>> + enum zk_event_type type;
>> + struct zk_node sender;
>> + size_t msg_len;
>> + size_t nr_nodes;
>> + size_t buf_len;
>> + uint8_t buf[ZK_MAX_BUF_SIZE];
>> +};
>> +
>> static const char *hosts = "127.0.0.1:2181";
>> static zhandle_t *zk_handle;
>>
>> +static const char *evtype_to_str(int type)
>> +{
>> + switch (type) {
>> + case EVENT_JOIN:
>> + return "JOIN";
>> + case EVENT_ACCEPT:
>> + return "ACCEPT";
>> + case EVENT_LEAVE:
>> + return "LEAVE";
>> + case EVENT_BLOCK:
>> + return "BLOCK";
>> + case EVENT_UNBLOCK:
>> + return "UNBLOCK";
>> + case EVENT_NOTIFY:
>> + return "NOTIFY";
>> + case EVENT_UPDATE_NODE:
>> + return "UPDATE_NODE";
>> + default:
>> + return "UNKNOWN";
>> + }
>> +}
>> +
/*
 * Format a 16-byte node address as "ip" or, when @port is non-zero,
 * "ip:port".
 *
 * @addr: 16 bytes.  If the first 12 bytes are zero and byte 12 is
 *        non-zero, the last 4 bytes are formatted as IPv4; every other
 *        address is formatted as IPv6.
 * @port: appended as ":port" when non-zero.
 *
 * Returns a per-thread static buffer that the next call from the same
 * thread overwrites.  If the address cannot be converted an error is
 * printed and the result contains only the ":port" suffix (if any),
 * never a stale address left over from an earlier call.
 */
static const char *addr_to_str(const uint8_t *addr, uint16_t port)
{
	/* Room for the longest inet_ntop() result plus ":65535". */
	static __thread char str[INET6_ADDRSTRLEN + 8];
	int af = AF_INET6;
	int addr_start_idx = 0;

	/* IPv4 is stored as 12 zero bytes followed by the 4 address bytes. */
	if (addr[12]) {
		int i;

		for (i = 0; i < 12 && !addr[i]; i++)
			;
		if (i == 12) {
			af = AF_INET;
			addr_start_idx = 12;
		}
	}

	if (inet_ntop(af, addr + addr_start_idx, str, sizeof(str)) == NULL) {
		fprintf(stderr, "failed to convert addr to string, %m\n");
		/* Don't let a previous call's address leak into this result. */
		str[0] = '\0';
	}

	if (port) {
		size_t len = strlen(str);

		snprintf(str + len, sizeof(str) - len, ":%d", port);
	}

	return str;
}
>> +
>> static inline ZOOAPI int zk_delete_node(const char *path)
>> {
>> int rc;
>> @@ -45,6 +134,18 @@ static inline ZOOAPI int zk_get_children(const char *path,
>> return rc;
>> }
>>
>> +static inline ZOOAPI int zk_get_data(const char *path, void *buffer,
>> + int *buffer_len, struct Stat *stat)
>> +{
>> + int rc;
>> + do {
>> + rc = zoo_get(zk_handle, path, 1, (char *)buffer,
>> + buffer_len, stat);
>> + } while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
>> +
>> + return rc;
>> +}
>> +
>> static int do_kill(int argc, char **argv)
>> {
>> char *path;
>> @@ -130,6 +231,134 @@ err:
>> return -1;
>> }
>>
>> +static int do_list(int argc, char **argv)
>> +{
>> + struct String_vector strs;
>> + int rc, len, total = 0;
>> + char path[256], str_ctime[128], str_mtime[128];
>> + time_t t1, t2;
>> + struct tm tm_ctime, tm_mtime;
>> + struct zk_event ev;
>> + struct Stat stat;
>> + int32_t seq;
>> + struct node_id *nid;
>> +
>> + fprintf(stdout, " QUEUE ID TYPE"
>> + " SENDER MSG LEN NR BUF LEN"
>> + " CREATE TIME MODIFY TIME\n");
>> + rc = zk_get_children(QUEUE_ZONE, &strs);
>> + switch (rc) {
>> + case ZOK:
>> + FOR_EACH_ZNODE(QUEUE_ZONE, path, &strs) {
>> + len = sizeof(struct zk_event);
>> + rc = zk_get_data(path, &ev, &len, &stat);
>> + if (rc != ZOK) {
>> + fprintf(stderr, "failed to get data "
>> + "%s, %s\n",
>> + path, zerror(rc));
>> + goto err;
>> + }
>> +
>> + t1 = stat.ctime / 1000;
>> + localtime_r(&t1, &tm_ctime);
>> + strftime(str_ctime, sizeof(str_ctime),
>> + "%Y-%m-%d %H:%M:%S", &tm_ctime);
>> +
>> + t2 = stat.mtime / 1000;
>> + localtime_r(&t2, &tm_mtime);
>> + strftime(str_mtime, sizeof(str_mtime),
>> + "%Y-%m-%d %H:%M:%S", &tm_mtime);
>> +
>> + sscanf(path, QUEUE_ZONE "/%"PRId32, &seq);
>> + nid = &ev.sender.node.nid;
>> + fprintf(stdout, "%010"PRId32" %016"PRIx64
>> + " %12s %21s %7zd %4zd %7zd %s %s\n",
>> + seq, ev.id, evtype_to_str(ev.type),
>> + addr_to_str(nid->addr, nid->port),
>> + ev.msg_len, ev.nr_nodes, ev.buf_len,
>> + str_ctime, str_mtime);
>> + total++;
>> + }
>> + break;
>> + default:
>> + goto err;
>> + }
>> +
>> + fprintf(stdout, "\ntotal nodes: %d\n", total);
>> + return 0;
>> +err:
>> + fprintf(stderr, "failed to list %s, %s\n", QUEUE_ZONE, zerror(rc));
>> + return -1;
>> +}
>> +
>> +static int do_purge(int argc, char **argv)
>> +{
>> + struct String_vector strs;
>> + int rc, len, threshold, deleted = 0;
>> + char *p, path[256];
>> + struct zk_event ev;
>> + struct Stat stat;
>> + struct timeval tv;
>> +
>> + if (argc != 3) {
>> + fprintf(stderr, "remove queue: need specify "
>> + "threshold in seconds\n");
>> + return -1;
>> + }
>> +
>> + threshold = strtol(argv[2], &p, 10);
>> + if (p == argv[2]) {
>> + fprintf(stderr, "threshold must be a number\n");
>> + return -1;
>> + }
>> + if (threshold < MIN_THRESHOLD) {
>> + threshold = MIN_THRESHOLD;
>> + fprintf(stdout, "threshold is less than %d seconds, "
>> + "set it to %d\n", MIN_THRESHOLD, MIN_THRESHOLD);
>> + }
>> +
>> + gettimeofday(&tv, NULL);
>> +
>> + rc = zk_get_children(QUEUE_ZONE, &strs);
>> + switch (rc) {
>> + case ZOK:
>> + FOR_EACH_ZNODE(QUEUE_ZONE, path, &strs) {
>> + len = sizeof(struct zk_event);
>> + rc = zk_get_data(path, &ev, &len, &stat);
>> + if (rc != ZOK) {
>> + fprintf(stderr, "failed to get data "
>> + "%s, %s\n",
>> + path, zerror(rc));
>> + goto err;
>> + }
>> + if (stat.mtime / 1000 >= tv.tv_sec - threshold)
>> + continue;
>> +
>> + rc = zk_delete_node(path);
>> + if (rc != ZOK) {
>> + fprintf(stderr, "failed to delete "
>> + "%s, %s\n",
>> + path, zerror(rc));
>> + goto err;
>> + }
>> +
>> + deleted++;
>> + if (deleted % 100 == 0)
>> + fprintf(stdout, "%d queue nodes are deleted\n",
>> + deleted);
>> + }
>> + break;
>> + default:
>> + goto err;
>> + }
>> +
>> + fprintf(stdout, "completed. %d queue nodes are deleted\n", deleted);
>> + return 0;
>> +err:
>> + fprintf(stderr, "failed to purge %s, %s\n", QUEUE_ZONE, zerror(rc));
>> + return -1;
>> +}
>> +
>> static struct control_handler {
>> const char *name;
>> int (*execute)(int, char **);
>> @@ -137,6 +366,8 @@ static struct control_handler {
>> } handlers[] = {
>> { "kill", do_kill, "Kill the session" },
>> { "remove", do_remove, "Remove the node recursively" },
>> + { "list", do_list, "List the data in queue node" },
>> + { "purge", do_purge, "Remove the data in queue node" },
>> { NULL, NULL, NULL },
>> };
>>
>> --
>> 1.8.3.2
>>
>>
>> --
>> sheepdog mailing list
>> sheepdog at lists.wpkg.org
>> http://lists.wpkg.org/mailman/listinfo/sheepdog
More information about the sheepdog
mailing list