[sheepdog] [PATCH] zookeeper & tools/zk_control: add delete subcommand to remove queue znodes
Liu Yuan
namei.unix at gmail.com
Mon Mar 9 04:28:02 CET 2015
On Mon, Mar 09, 2015 at 11:12:54AM +0800, Meng Lingkun wrote:
> From: Meng Lingkun <menglingkun at cmss.chinamobile.com>
>
> The Zookeeper cluster driver can't and doesn't delete a znode in the queue
> after using it. This makes zookeeper occupy much more memory, and it causes
> trouble when a zookeeper client reconnects to the zookeeper server.
> Reference: https://issues.apache.org/jira/browse/ZOOKEEPER-706
> The delete subcommand provides a better way than the purge subcommand:
> 1. each sheep updates its queue_pos on the zk_server;
> 2. zk_control finds the minimum of all sheep's queue_pos values and removes
> the znodes with seq < minimum;
> 3. zk_control can be run from crond to limit the number of queue znodes.
>
Please separate this into two patches: one for zookeeper and one for
zk_control.
Yuan
> Signed-off-by: Meng Lingkun <menglingkun at cmss.chinamobile.com>
> ---
> sheep/cluster/zookeeper.c | 32 +++++++++++++++++
> tools/zk_control.c | 88 ++++++++++++++++++++++++++++++++++++++++++++++-
> 2 files changed, 119 insertions(+), 1 deletion(-)
>
> diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
> index bf94871..b346592 100644
> --- a/sheep/cluster/zookeeper.c
> +++ b/sheep/cluster/zookeeper.c
> @@ -32,6 +32,7 @@
> #define MEMBER_ZNODE "/member"
> #define MASTER_ZNODE "/master"
> #define LOCK_ZNODE "/lock"
> +#define QUEUE_POS_ZNODE "/queue_pos"
>
> static int zk_timeout = SESSION_TIMEOUT;
> static int my_master_seq;
> @@ -452,6 +453,7 @@ static void lock_table_remove_znodes(void)
> /* ZooKeeper-based queue give us an totally ordered events */
> static int efd;
> static int32_t queue_pos;
> +#define QUEUE_DEL_BATCH 1000
>
> static int zk_queue_peek(bool *peek)
> {
> @@ -580,6 +582,7 @@ static int zk_queue_pop_advance(struct zk_event *ev)
> {
> int len;
> char path[MAX_NODE_STR_LEN];
> + char queue_pos_path[MAX_NODE_STR_LEN];
>
> len = sizeof(*ev);
> snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos);
> @@ -587,6 +590,16 @@ static int zk_queue_pop_advance(struct zk_event *ev)
> RETURN_IF_ERROR(zk_get_data(path, ev, &len), "path %s", path);
> sd_debug("%s, type:%d, len:%d, pos:%" PRId32, path, ev->type, len,
> queue_pos);
> +
> + if (queue_pos % QUEUE_DEL_BATCH == 0 && ev->type != EVENT_JOIN) {
> + snprintf(queue_pos_path, sizeof(queue_pos_path),
> + QUEUE_POS_ZNODE"/%s", node_to_str(&this_node.node));
> + RETURN_IF_ERROR(zk_set_data(queue_pos_path, (char *)&queue_pos,
> + sizeof(queue_pos), -1), "");
> + sd_debug("update queue pos %s to pos %" PRId32,
> + queue_pos_path, queue_pos);
> + }
> +
> queue_pos++;
> return ZOK;
> }
> @@ -644,6 +657,8 @@ static int zk_queue_init(void)
> RETURN_IF_ERROR(zk_init_node(MASTER_ZNODE), "path %s", MASTER_ZNODE);
> RETURN_IF_ERROR(zk_init_node(QUEUE_ZNODE), "path %s", QUEUE_ZNODE);
> RETURN_IF_ERROR(zk_init_node(MEMBER_ZNODE), "path %s", MEMBER_ZNODE);
> + RETURN_IF_ERROR(zk_init_node(QUEUE_POS_ZNODE), "path %s",
> + QUEUE_POS_ZNODE);
> return ZOK;
> }
>
> @@ -749,6 +764,12 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
> return;
> }
>
> + ret = sscanf(path, QUEUE_ZNODE "/%s", str);
> + if (ret == 1) {
> + sd_debug("deleted queue %s", str);
> + return;
> + }
> +
> ret = sscanf(path, MEMBER_ZNODE "/%s", str);
> if (ret != 1)
> return;
> @@ -1085,6 +1106,8 @@ static void init_node_list(struct zk_event *ev)
> static void zk_handle_accept(struct zk_event *ev)
> {
> char path[MAX_NODE_STR_LEN];
> + char queue_pos_path[MAX_NODE_STR_LEN];
> + uint32_t pos = -1;
> int rc;
>
> sd_debug("ACCEPT");
> @@ -1096,6 +1119,8 @@ static void zk_handle_accept(struct zk_event *ev)
>
> snprintf(path, sizeof(path), MEMBER_ZNODE"/%s",
> node_to_str(&ev->sender.node));
> + snprintf(queue_pos_path, sizeof(queue_pos_path), QUEUE_POS_ZNODE"/%s",
> + node_to_str(&ev->sender.node));
> if (node_eq(&ev->sender.node, &this_node.node)) {
> joined = true;
> sd_debug("create path:%s", path);
> @@ -1105,6 +1130,13 @@ static void zk_handle_accept(struct zk_event *ev)
> &ZOO_OPEN_ACL_UNSAFE,
> ZOO_EPHEMERAL, NULL, 0);
> RETURN_VOID_IF_ERROR(rc, "");
> + sd_debug("create path:%s", queue_pos_path);
> + rc = zk_create_node(queue_pos_path,
> + (char *)&pos,
> + sizeof(pos),
> + &ZOO_OPEN_ACL_UNSAFE,
> + ZOO_EPHEMERAL, NULL, 0);
> + RETURN_VOID_IF_ERROR(rc, "");
> } else
> zk_node_exists(path);
>
> diff --git a/tools/zk_control.c b/tools/zk_control.c
> index 3313bef..15c2dac 100644
> --- a/tools/zk_control.c
> +++ b/tools/zk_control.c
> @@ -21,6 +21,7 @@
> #include "internal_proto.h"
>
> #define QUEUE_ZNODE "/sheepdog/queue"
> +#define QUEUE_POS_ZNODE "/sheepdog/queue_pos"
> #define MIN_THRESHOLD 86400
>
> #define FOR_EACH_ZNODE(parent, path, strs) \
> @@ -369,6 +370,90 @@ err:
> return -1;
> }
>
> +static int do_delete(int argc, char **argv)
> +{
> + struct String_vector strs;
> + int rc, len, deleted = 0;
> + int32_t pos, min_pos = INT32_MAX;
> + char path[256];
> + struct zk_event ev;
> + struct Stat stat;
> + if (argc != 2) {
> + fprintf(stderr, "remove queue, no more arguments\n");
> + return -1;
> + }
> +
> + rc = zk_get_children(QUEUE_POS_ZNODE, &strs);
> + switch (rc) {
> + case ZOK:
> + FOR_EACH_ZNODE(QUEUE_POS_ZNODE, path, &strs) {
> + len = sizeof(int32_t);
> + rc = zk_get_data(path, &pos, &len, &stat);
> + if (rc != ZOK) {
> + fprintf(stderr, "failed to get data "
> + "%s, %s\n",
> + path, zerror(rc));
> + goto err;
> + }
> +
> + if (pos < min_pos && pos != -1)
> + min_pos = pos;
> + }
> + break;
> + default:
> + goto err;
> + }
> +
> + fprintf(stdout, "queue nodes seq < %d will be deleted\n", min_pos);
> +
> + if (min_pos == INT32_MAX) {
> + fprintf(stdout, "no queue nodes to be deleted\n");
> + return 0;
> + }
> +
> + rc = zk_get_children(QUEUE_ZNODE, &strs);
> + fprintf(stdout, "There are %d znode in queue\n", strs.count);
> + switch (rc) {
> + case ZOK:
> + FOR_EACH_ZNODE(QUEUE_ZNODE, path, &strs) {
> + len = sizeof(struct zk_event);
> + rc = zk_get_data(path, &ev, &len, &stat);
> + if (rc != ZOK) {
> + fprintf(stderr, "failed to get data "
> + "%s, %s\n",
> + path, zerror(rc));
> + goto err;
> + }
> +
> + sscanf(path, QUEUE_ZNODE "/%"PRId32, &pos);
> + if (pos >= min_pos)
> + continue;
> +
> + rc = zk_delete_node(path);
> + if (rc != ZOK) {
> + fprintf(stderr, "failed to delete "
> + "%s, %s\n",
> + path, zerror(rc));
> + goto err;
> + }
> +
> + deleted++;
> + if (deleted % 100 == 0)
> + fprintf(stdout, "%d queue nodes are deleted\n",
> + deleted);
> + }
> + break;
> + default:
> + goto err;
> + }
> +
> + fprintf(stdout, "completed. %d queue nodes are deleted\n", deleted);
> + return 0;
> +err:
> + fprintf(stderr, "failed to delete %s, %s\n", QUEUE_ZNODE, zerror(rc));
> + return -1;
> +}
> +
> static struct control_handler {
> const char *name;
> int (*execute)(int, char **);
> @@ -377,7 +462,8 @@ static struct control_handler {
> { "kill", do_kill, "Kill the session" },
> { "remove", do_remove, "Remove the node recursively" },
> { "list", do_list, "List the data in queue node" },
> - { "purge", do_purge, "Remove the data in queue node" },
> + { "purge", do_purge, "Remove the data in queue node by time" },
> + { "delete", do_delete, "Remove the data in queue node not used" },
> { NULL, NULL, NULL },
> };
>
> --
> 1.9.1
>
>
> --
> sheepdog mailing list
> sheepdog at lists.wpkg.org
> https://lists.wpkg.org/mailman/listinfo/sheepdog
More information about the sheepdog
mailing list