[sheepdog] [PATCH] zookeeper & tools/zk_control: add delete subcommand to remove queue znodes

Liu Yuan namei.unix at gmail.com
Mon Mar 9 04:28:02 CET 2015


On Mon, Mar 09, 2015 at 11:12:54AM +0800, Meng Lingkun wrote:
> From: Meng Lingkun <menglingkun at cmss.chinamobile.com>
> 
> The ZooKeeper cluster driver never deletes queue znodes after consuming
> them. This makes ZooKeeper use much more memory, and it causes trouble
> when a ZooKeeper client reconnects to the ZooKeeper server.
> Reference: https://issues.apache.org/jira/browse/ZOOKEEPER-706
> The new delete subcommand is a better approach than the purge subcommand:
> 1. each sheep publishes its queue_pos to the ZooKeeper server;
> 2. zk_control finds the minimum queue_pos over all sheep and removes
>    the queue znodes whose seq is below that minimum;
> 3. zk_control can be run periodically from cron to bound the number of
>    queue znodes.
> 

Please split this into two patches: one for zookeeper and one for
zk_control.

Yuan

> Signed-off-by: Meng Lingkun <menglingkun at cmss.chinamobile.com>
> ---
>  sheep/cluster/zookeeper.c | 32 +++++++++++++++++
>  tools/zk_control.c        | 88 ++++++++++++++++++++++++++++++++++++++++++++++-
>  2 files changed, 119 insertions(+), 1 deletion(-)
> 
> diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
> index bf94871..b346592 100644
> --- a/sheep/cluster/zookeeper.c
> +++ b/sheep/cluster/zookeeper.c
> @@ -32,6 +32,7 @@
>  #define MEMBER_ZNODE "/member"
>  #define MASTER_ZNODE "/master"
>  #define LOCK_ZNODE "/lock"
> +#define QUEUE_POS_ZNODE "/queue_pos"
>  
>  static int zk_timeout = SESSION_TIMEOUT;
>  static int my_master_seq;
> @@ -452,6 +453,7 @@ static void lock_table_remove_znodes(void)
>  /* ZooKeeper-based queue give us an totally ordered events */
>  static int efd;
>  static int32_t queue_pos;
> +#define QUEUE_DEL_BATCH 1000
>  
>  static int zk_queue_peek(bool *peek)
>  {
> @@ -580,6 +582,7 @@ static int zk_queue_pop_advance(struct zk_event *ev)
>  {
>  	int len;
>  	char path[MAX_NODE_STR_LEN];
> +	char queue_pos_path[MAX_NODE_STR_LEN];
>  
>  	len = sizeof(*ev);
>  	snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos);
> @@ -587,6 +590,16 @@ static int zk_queue_pop_advance(struct zk_event *ev)
>  	RETURN_IF_ERROR(zk_get_data(path, ev, &len), "path %s", path);
>  	sd_debug("%s, type:%d, len:%d, pos:%" PRId32, path, ev->type, len,
>  		 queue_pos);
> +
> +	if (queue_pos % QUEUE_DEL_BATCH == 0 && ev->type != EVENT_JOIN) {
> +		snprintf(queue_pos_path, sizeof(queue_pos_path),
> +			QUEUE_POS_ZNODE"/%s", node_to_str(&this_node.node));
> +		RETURN_IF_ERROR(zk_set_data(queue_pos_path, (char *)&queue_pos,
> +			sizeof(queue_pos), -1), "");
> +		sd_debug("update queue pos %s to pos %" PRId32,
> +			queue_pos_path, queue_pos);
> +	}
> +
>  	queue_pos++;
>  	return ZOK;
>  }
> @@ -644,6 +657,8 @@ static int zk_queue_init(void)
>  	RETURN_IF_ERROR(zk_init_node(MASTER_ZNODE), "path %s", MASTER_ZNODE);
>  	RETURN_IF_ERROR(zk_init_node(QUEUE_ZNODE), "path %s", QUEUE_ZNODE);
>  	RETURN_IF_ERROR(zk_init_node(MEMBER_ZNODE), "path %s", MEMBER_ZNODE);
> +	RETURN_IF_ERROR(zk_init_node(QUEUE_POS_ZNODE), "path %s",
> +		QUEUE_POS_ZNODE);
>  	return ZOK;
>  }
>  
> @@ -749,6 +764,12 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
>  			return;
>  		}
>  
> +		ret = sscanf(path, QUEUE_ZNODE "/%s", str);
> +		if (ret == 1) {
> +			sd_debug("deleted queue %s", str);
> +			return;
> +		}
> +
>  		ret = sscanf(path, MEMBER_ZNODE "/%s", str);
>  		if (ret != 1)
>  			return;
> @@ -1085,6 +1106,8 @@ static void init_node_list(struct zk_event *ev)
>  static void zk_handle_accept(struct zk_event *ev)
>  {
>  	char path[MAX_NODE_STR_LEN];
> +	char queue_pos_path[MAX_NODE_STR_LEN];
> +	uint32_t pos = -1;
>  	int rc;
>  
>  	sd_debug("ACCEPT");
> @@ -1096,6 +1119,8 @@ static void zk_handle_accept(struct zk_event *ev)
>  
>  	snprintf(path, sizeof(path), MEMBER_ZNODE"/%s",
>  		 node_to_str(&ev->sender.node));
> +	snprintf(queue_pos_path, sizeof(queue_pos_path), QUEUE_POS_ZNODE"/%s",
> +		node_to_str(&ev->sender.node));
>  	if (node_eq(&ev->sender.node, &this_node.node)) {
>  		joined = true;
>  		sd_debug("create path:%s", path);
> @@ -1105,6 +1130,13 @@ static void zk_handle_accept(struct zk_event *ev)
>  				    &ZOO_OPEN_ACL_UNSAFE,
>  				    ZOO_EPHEMERAL, NULL, 0);
>  		RETURN_VOID_IF_ERROR(rc, "");
> +		sd_debug("create path:%s", queue_pos_path);
> +		rc = zk_create_node(queue_pos_path,
> +				    (char *)&pos,
> +				    sizeof(pos),
> +				    &ZOO_OPEN_ACL_UNSAFE,
> +				    ZOO_EPHEMERAL, NULL, 0);
> +		RETURN_VOID_IF_ERROR(rc, "");
>  	} else
>  		zk_node_exists(path);
>  
> diff --git a/tools/zk_control.c b/tools/zk_control.c
> index 3313bef..15c2dac 100644
> --- a/tools/zk_control.c
> +++ b/tools/zk_control.c
> @@ -21,6 +21,7 @@
>  #include "internal_proto.h"
>  
>  #define QUEUE_ZNODE "/sheepdog/queue"
> +#define QUEUE_POS_ZNODE "/sheepdog/queue_pos"
>  #define MIN_THRESHOLD 86400
>  
>  #define FOR_EACH_ZNODE(parent, path, strs)			       \
> @@ -369,6 +370,90 @@ err:
>  	return -1;
>  }
>  
> +/*
> + * Remove the queue znodes consumed by every sheep, i.e. those whose seq is
> + * below the smallest queue_pos published under QUEUE_POS_ZNODE.
> + */
> +static int do_delete(int argc, char **argv)
> +{
> +	struct String_vector strs;
> +	int rc, len, deleted = 0, pending = 0;
> +	int32_t pos, min_pos = INT32_MAX;
> +	char path[256];
> +	struct Stat stat;
> +
> +	if (argc != 2) {
> +		fprintf(stderr, "remove queue, no more arguments\n");
> +		return -1;
> +	}
> +
> +	rc = zk_get_children(QUEUE_POS_ZNODE, &strs);
> +	if (rc != ZOK)
> +		goto err;
> +
> +	FOR_EACH_ZNODE(QUEUE_POS_ZNODE, path, &strs) {
> +		len = sizeof(pos);
> +		rc = zk_get_data(path, &pos, &len, &stat);
> +		if (rc != ZOK) {
> +			fprintf(stderr, "failed to get data "
> +				"%s, %s\n",
> +				path, zerror(rc));
> +			goto err;
> +		}
> +
> +		/*
> +		 * A sheep that has not published a position (-1) may still
> +		 * need any queue node, so it must block deletion entirely.
> +		 */
> +		if (len != sizeof(pos) || pos == -1)
> +			pending++;
> +		else if (pos < min_pos)
> +			min_pos = pos;
> +	}
> +
> +	if (pending > 0) {
> +		fprintf(stdout, "%d sheep have no valid queue pos yet, "
> +			"no queue nodes deleted\n", pending);
> +		return 0;
> +	}
> +
> +	if (min_pos == INT32_MAX) {
> +		fprintf(stdout, "no queue nodes to be deleted\n");
> +		return 0;
> +	}
> +	fprintf(stdout, "queue nodes seq < %d will be deleted\n", min_pos);
> +
> +	rc = zk_get_children(QUEUE_ZNODE, &strs);
> +	if (rc != ZOK)
> +		goto err;
> +	fprintf(stdout, "There are %d znode in queue\n", strs.count);
> +
> +	FOR_EACH_ZNODE(QUEUE_ZNODE, path, &strs) {
> +		/* skip nodes at/after min_pos and non-numeric names */
> +		if (sscanf(path, QUEUE_ZNODE "/%"SCNd32, &pos) != 1 ||
> +		    pos >= min_pos)
> +			continue;
> +
> +		rc = zk_delete_node(path);
> +		if (rc != ZOK) {
> +			fprintf(stderr, "failed to delete "
> +				"%s, %s\n",
> +				path, zerror(rc));
> +			goto err;
> +		}
> +
> +		if (++deleted % 100 == 0)
> +			fprintf(stdout, "%d queue nodes are deleted\n",
> +				deleted);
> +	}
> +
> +	fprintf(stdout, "completed. %d queue nodes are deleted\n", deleted);
> +	return 0;
> +err:
> +	fprintf(stderr, "failed to delete %s, %s\n", QUEUE_ZNODE, zerror(rc));
> +	return -1;
> +}
> +
>  static struct control_handler {
>  	const char *name;
>  	int (*execute)(int, char **);
> @@ -377,7 +462,8 @@ static struct control_handler {
>  	{ "kill", do_kill, "Kill the session" },
>  	{ "remove", do_remove, "Remove the node recursively" },
>  	{ "list", do_list, "List the data in queue node" },
> -	{ "purge", do_purge, "Remove the data in queue node" },
> +	{ "purge", do_purge, "Remove the data in queue node by time" },
> +	{ "delete", do_delete, "Remove the data in queue node not used" },
>  	{ NULL, NULL, NULL },
>  };
>  
> -- 
> 1.9.1
> 
> 
> -- 
> sheepdog mailing list
> sheepdog at lists.wpkg.org
> https://lists.wpkg.org/mailman/listinfo/sheepdog



More information about the sheepdog mailing list