[sheepdog] [PATCH] sheep, dog: add speed throttling of auto-recovery

FUKUDA Yasuhito fukuda.yasuhito at po.ntts.co.jp
Wed Dec 17 09:32:49 CET 2014


Currently, auto-recovery consumes as much of a sheepdog node's resources as it can.

This patch therefore adds speed throttling for auto-recovery.
Throttling reduces the resources consumed during auto-recovery.

This patch adds new options to the sheep and dog commands: "max" (the maximum
number of objects recovered per interval) and "interval" (in milliseconds).

See the following examples.

ex) sheep -R max=5,interval=1000 /var/lib/sheepdog
    dog node recovery set-throttle 5 1000
    dog node recovery get-throttle

Signed-off-by: Yasuhito Fukuda <fukuda.yasuhito at po.ntts.co.jp>
---
 dog/node.c               |  146 +++++++++++++++++++++++++++++++++++++++++++++-
 include/internal_proto.h |    2 +
 include/sheepdog_proto.h |    6 ++
 sheep/ops.c              |   38 ++++++++++++
 sheep/recovery.c         |  139 ++++++++++++++++++++++++++++++++++++++++++-
 sheep/sheep.c            |   41 +++++++++++++
 sheep/sheep_priv.h       |    4 +
 7 files changed, 370 insertions(+), 6 deletions(-)

diff --git a/dog/node.c b/dog/node.c
index a4e9142..d4c8fe7 100644
--- a/dog/node.c
+++ b/dog/node.c
@@ -183,7 +183,7 @@ static int node_recovery_progress(void)
 	return result < 0 ? EXIT_SYSFAIL : EXIT_SUCCESS;
 }
 
-static int node_recovery(int argc, char **argv)
+static int node_recovery_info(int argc, char **argv)
 {
 	struct sd_node *n;
 	int ret, i = 0;
@@ -235,6 +235,120 @@ static int node_recovery(int argc, char **argv)
 	return EXIT_SUCCESS;
 }
 
+/*
+ * Parse str as a plain decimal number in the range [0, limit].
+ * Returns true and stores the value in *out only if the whole string is
+ * made of digits and fits; a leading sign or blank, which strtoull()
+ * would silently accept, is rejected.
+ */
+static bool parse_throttle_arg(const char *str, uint64_t limit, uint64_t *out)
+{
+	unsigned long long val;
+	char *end;
+
+	if (str[0] < '0' || str[0] > '9')
+		return false;
+	errno = 0; /* strtoull() reports overflow only through errno */
+	val = strtoull(str, &end, 10);
+	if (errno != 0 || *end != '\0' || val > limit)
+		return false;
+	*out = val;
+	return true;
+}
+
+/*
+ * 'dog node recovery set-throttle <max> <interval>': send new recovery
+ * throttling parameters to the node addressed by sd_nid.
+ *
+ * <max> and <interval> must be both zero or both non-zero.  Invalid
+ * arguments terminate the program with EXIT_USAGE.
+ * Returns EXIT_SUCCESS on success, EXIT_SYSFAIL if the request could not
+ * be executed and EXIT_FAILURE if the node answered with an error.
+ */
+static int node_recovery_set(int argc, char **argv)
+{
+	struct recovery_throttling rthrottling = { 0 };
+	struct sd_req req;
+	struct sd_rsp *rsp = (struct sd_rsp *)&req;
+	uint64_t max, interval;
+	int ret;
+
+	if (!argv[optind] || !argv[optind + 1]) {
+		sd_err("Both <max> and <interval> must be specified");
+		exit(EXIT_USAGE);
+	}
+
+	if (!parse_throttle_arg(argv[optind], UINT32_MAX, &max)) {
+		sd_err("Invalid max (%s)", argv[optind]);
+		exit(EXIT_USAGE);
+	}
+
+	optind++;
+
+	if (!parse_throttle_arg(argv[optind], UINT64_MAX, &interval)) {
+		sd_err("Invalid interval (%s)", argv[optind]);
+		exit(EXIT_USAGE);
+	}
+
+	rthrottling.max_exec_count = (uint32_t)max;
+	rthrottling.queue_work_interval = interval;
+	/* throttling needs both values; one without the other is an error */
+	if ((max == 0) != (interval == 0)) {
+		sd_err("Invalid interval max (%"PRIu32"), interval (%"PRIu64")",
+		rthrottling.max_exec_count, rthrottling.queue_work_interval);
+		exit(EXIT_USAGE);
+	}
+
+	sd_init_req(&req, SD_OP_SET_RECOVERY);
+	req.flags = SD_FLAG_CMD_WRITE;
+	req.data_length = sizeof(rthrottling);
+	ret = dog_exec_req(&sd_nid, &req, &rthrottling);
+	if (ret < 0 || rsp->result != SD_RES_SUCCESS) {
+		sd_err("Failed to execute request");
+		return ret < 0 ? EXIT_SYSFAIL : EXIT_FAILURE;
+	}
+
+	return EXIT_SUCCESS;
+}
+
+/*
+ * 'dog node recovery get-throttle': fetch the recovery throttling
+ * parameters from the node addressed by sd_nid and print them.
+ *
+ * argc and argv are unused.
+ *
+ * Returns EXIT_SUCCESS on success, EXIT_SYSFAIL if the request could not
+ * be executed and EXIT_FAILURE if the node answered with an error.
+ */
+static int node_recovery_get(int argc, char **argv)
+{
+	/* zeroed so that a short reply never prints indeterminate values */
+	struct recovery_throttling rthrottling = { 0 };
+	struct sd_req req;
+	struct sd_rsp *rsp = (struct sd_rsp *)&req;
+	int ret;
+
+	sd_init_req(&req, SD_OP_GET_RECOVERY);
+	req.data_length = sizeof(rthrottling);
+
+	ret = dog_exec_req(&sd_nid, &req, &rthrottling);
+	if (ret < 0) {
+		/* req may not hold a response when the exchange failed */
+		sd_err("Failed to execute request");
+		return EXIT_SYSFAIL;
+	}
+
+	if (rsp->result != SD_RES_SUCCESS) {
+		sd_err("Failed to execute request");
+		return EXIT_FAILURE;
+	}
+
+	sd_info("max (%"PRIu32"), interval (%"PRIu64")",
+	 rthrottling.max_exec_count, rthrottling.queue_work_interval);
+
+	return EXIT_SUCCESS;
+}
+
 static struct sd_node *idx_to_node(struct rb_root *nroot, int idx)
 {
 	struct sd_node *n = rb_entry(rb_first(nroot), struct sd_node, rb);
@@ -538,6 +652,31 @@ static struct sd_option node_options[] = {
 	{ 0, NULL, false, NULL },
 };
 
+static struct subcommand node_recovery_cmd[] = { /* 'recovery' subcommands */
+	{"info", NULL, "aphPrT", "show recovery information of nodes (default)",
+	 NULL, CMD_NEED_NODELIST, node_recovery_info, node_options},
+	{"set-throttle", "<max> <interval>", NULL, "set new throttling", NULL,
+	 CMD_NEED_ARG|CMD_NEED_NODELIST, node_recovery_set, node_options},
+	{"get-throttle", NULL, NULL, "get current throttling", NULL,
+	 CMD_NEED_NODELIST, node_recovery_get, node_options},
+	{NULL}, /* terminating entry */
+};
+
+/* Dispatch 'dog node recovery'; with no subcommand, behave like "info". */
+static int node_recovery(int argc, char **argv)
+{
+	if (argc != optind)
+		return do_generic_subcommand(node_recovery_cmd, argc, argv);
+
+	/* "info" is registered with CMD_NEED_NODELIST; load the list first */
+	if (update_node_list(SD_MAX_NODES) < 0) {
+		sd_err("Failed to get node list");
+		exit(EXIT_SYSFAIL);
+	}
+
+	return node_recovery_info(argc, argv);
+}
+
 static int node_log_level_set(int argc, char **argv)
 {
 	int ret = 0;
@@ -632,8 +771,9 @@ static struct subcommand node_cmd[] = {
 	 CMD_NEED_NODELIST, node_list},
 	{"info", NULL, "aprhT", "show information about each node", NULL,
 	 CMD_NEED_NODELIST, node_info},
-	{"recovery", NULL, "aphPrT", "show recovery information of nodes", NULL,
-	 CMD_NEED_NODELIST, node_recovery, node_options},
+	{"recovery", "<max> <interval>", "aphPrT",
+	 "show recovery information or set/get recovery speed throttling of nodes",
+	 node_recovery_cmd, 0, node_recovery, node_options},
 	{"md", "[disks]", "aprAfhT", "See 'dog node md' for more information",
 	 node_md_cmd, CMD_NEED_ARG, node_md, node_options},
 	{"stat", NULL, "aprwhT", "show stat information about the node", NULL,
diff --git a/include/internal_proto.h b/include/internal_proto.h
index 3f5d77f..f6ba18e 100644
--- a/include/internal_proto.h
+++ b/include/internal_proto.h
@@ -111,6 +111,8 @@
 #define SD_OP_VDI_STATE_SNAPSHOT_CTL  0xC7
 #define SD_OP_INODE_COHERENCE 0xC8
 #define SD_OP_READ_DEL_VDIS  0xC9
+#define SD_OP_GET_RECOVERY      0xCA
+#define SD_OP_SET_RECOVERY      0xCB
 
 /* internal flags for hdr.flags, must be above 0x80 */
 #define SD_FLAG_CMD_RECOVERY 0x0080
diff --git a/include/sheepdog_proto.h b/include/sheepdog_proto.h
index 4f0c48c..5f6d157 100644
--- a/include/sheepdog_proto.h
+++ b/include/sheepdog_proto.h
@@ -272,6 +272,12 @@ struct generation_reference {
 	int32_t count;
 };
 
+struct recovery_throttling {
+	uint32_t max_exec_count;      /* max objects recovered per interval */
+	uint64_t queue_work_interval; /* interval length in milliseconds */
+	bool throttling;              /* on when both values are non-zero */
+}; /* NOTE(review): copied raw into request data; confirm layout/padding */
+
 struct sd_inode {
 	char name[SD_MAX_VDI_LEN];
 	char tag[SD_MAX_VDI_TAG_LEN];
diff --git a/sheep/ops.c b/sheep/ops.c
index e4daca2..2b4a769 100644
--- a/sheep/ops.c
+++ b/sheep/ops.c
@@ -1444,6 +1444,30 @@ static int cluster_inode_coherence(const struct sd_req *req,
 			       !!req->inode_coherence.validate, &sender->nid);
 }
 
+/* GET_RECOVERY: copy the current throttling settings into the reply. */
+/* NOTE(review): assumes req->data can hold the struct - confirm. */
+static int local_get_recovery(struct request *req)
+{
+	const struct recovery_throttling cur = get_recovery();
+
+	memcpy(req->data, &cur, sizeof(cur));
+	req->rp.data_length = sizeof(cur);
+	return SD_RES_SUCCESS;
+}
+
+/*
+ * SET_RECOVERY handler: apply the throttling settings carried in the
+ * request data.  NOTE(review): the request length is not checked here.
+ */
+static int local_set_recovery(struct request *req)
+{
+	struct recovery_throttling rthrottling;
+
+	memcpy(&rthrottling, req->data, sizeof(rthrottling));
+	set_recovery(&rthrottling);
+	return SD_RES_SUCCESS;
+}
+
 static struct sd_op_template sd_ops[] = {
 
 	/* cluster operations */
@@ -1891,6 +1915,20 @@ static struct sd_op_template sd_ops[] = {
 		.type = SD_OP_TYPE_PEER,
 		.process_work = peer_decref_object,
 	},
+
+	[SD_OP_GET_RECOVERY] = {
+		.name = "GET_RECOVERY",
+		.type = SD_OP_TYPE_LOCAL,
+		.force = true,
+		.process_work = local_get_recovery,
+	},
+
+	[SD_OP_SET_RECOVERY] = {
+		.name = "SET_RECOVERY",
+		.type = SD_OP_TYPE_LOCAL,
+		.force = true,
+		.process_work = local_set_recovery,
+	},
 };
 
 const struct sd_op_template *get_sd_op(uint8_t opcode)
diff --git a/sheep/recovery.c b/sheep/recovery.c
index 85dad21..325122b 100644
--- a/sheep/recovery.c
+++ b/sheep/recovery.c
@@ -78,6 +78,15 @@ struct recovery_info {
 	struct sd_mutex vinfo_lock;
 
 	struct sd_node *excluded;
+
+	uint32_t max_exec_count;
+	uint64_t queue_work_interval;
+	bool throttling;
+};
+
+struct recovery_timer {
+	void (*callback)(void *); /* called when the timer fires */
+	void *data;               /* argument handed to callback */
+};
 
 static struct recovery_info *next_rinfo;
@@ -900,6 +909,91 @@ void resume_suspended_recovery(void)
 	}
 }
 
+static void recovery_timer_handler(int fd, int events, void *data)
+{
+	struct recovery_timer *timer = data;
+	uint64_t expirations;
+
+	if (read(fd, &expirations, sizeof(expirations)) >= 0) { /* expired */
+		timer->callback(timer->data);
+		unregister_event(fd);
+		close(fd); /* one-shot timer: release it */
+	}
+}
+
+/*
+ * Arm a one-shot timer that runs t->callback(t->data) after mseconds ms.
+ * A zero delay is armed as 1ns because an all-zero it_value would disarm
+ * the timer.  On failure the error is logged and the fd is closed.
+ */
+static void add_recovery_timer(struct recovery_timer *t, unsigned int mseconds)
+{
+	struct itimerspec it = {
+		.it_value.tv_sec = mseconds / 1000,
+		.it_value.tv_nsec = mseconds ? (mseconds % 1000) * 1000000 : 1,
+	};
+	int tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
+
+	if (tfd < 0) {
+		sd_err("timerfd_create: %m");
+	} else if (timerfd_settime(tfd, 0, &it, NULL) < 0) {
+		sd_err("timerfd_settime: %m");
+		close(tfd);
+	} else if (register_event(tfd, recovery_timer_handler, t) < 0) {
+		sd_err("failed to register timer fd");
+		close(tfd);
+	}
+}
+
+/*
+ * Throttled-recovery timer callback: queue about max_exec_count /
+ * nr_threads objects, re-sync the settings and re-arm the timer.
+ */
+static void recover_next_object_delay(void *arg)
+{
+	struct recovery_info *rinfo = main_thread_get(current_rinfo);
+	uint32_t nr_threads = md_nr_disks() * 2;
+	double thread_unit_exec, mod;
+
+	if (!rinfo)
+		return;
+
+	thread_unit_exec = (double) rinfo->max_exec_count / nr_threads;
+	mod = rinfo->max_exec_count % nr_threads;
+
+	/* round the fractional share up with probability mod / nr_threads */
+	if (rinfo->max_exec_count <= nr_threads || mod != 0) {
+		if (rand() % 100 + 1 <= (mod / nr_threads) * 100)
+			thread_unit_exec = ceil(thread_unit_exec);
+		else
+			thread_unit_exec = floor(thread_unit_exec);
+	}
+
+	for (int i = 0; i < thread_unit_exec; i++) {
+		rinfo = main_thread_get(current_rinfo);
+		if (!rinfo)
+			return;
+		if (rinfo->next - rinfo->done > rinfo->max_exec_count)
+			break;
+
+		recover_next_object(rinfo);
+	}
+
+	/* resync every time so changed max/interval values take effect */
+	rinfo->max_exec_count = sys->rthrottling.max_exec_count;
+	rinfo->queue_work_interval = sys->rthrottling.queue_work_interval;
+	rinfo->throttling = sys->rthrottling.throttling;
+
+	if (rinfo->throttling) {
+		static struct recovery_timer rt = {
+			.callback = recover_next_object_delay,
+			.data = &rt,
+		};
+		add_recovery_timer(&rt, rinfo->queue_work_interval);
+	} else
+		recover_next_object(rinfo);
+}
+
 static void recover_object_main(struct work *work)
 {
 	struct recovery_work *rw = container_of(work, struct recovery_work,
@@ -935,7 +1029,16 @@ static void recover_object_main(struct work *work)
 	if (rinfo->done >= rinfo->count)
 		goto finish_recovery;
 
-	recover_next_object(rinfo);
+	if (!rinfo->throttling && !sys->rthrottling.throttling)
+		recover_next_object(rinfo);
+	else if (!rinfo->throttling && sys->rthrottling.throttling) {
+		static struct recovery_timer rt = {
+			.callback = recover_next_object_delay,
+			.data = &rt,
+		};
+		add_recovery_timer(&rt, sys->rthrottling.queue_work_interval);
+	}
+
 	free_recovery_obj_work(row);
 	return;
 finish_recovery:
@@ -982,8 +1085,17 @@ static void finish_object_list(struct work *work)
 		return;
 	}
 
-	for (uint32_t i = 0; i < nr_threads; i++)
-		recover_next_object(rinfo);
+	for (uint32_t i = 0; i < nr_threads; i++) {
+		if (rinfo->throttling) {
+			static struct recovery_timer rt = {
+				.callback = recover_next_object_delay,
+				.data = &rt,
+			};
+			add_recovery_timer(&rt, rinfo->queue_work_interval);
+		} else
+			recover_next_object(rinfo);
+	}
+
 	return;
 }
 
@@ -1143,6 +1255,9 @@ int start_recovery(struct vnode_info *cur_vinfo, struct vnode_info *old_vinfo,
 	rinfo->max_epoch = sys->cinfo.epoch;
 	rinfo->vinfo_array = xzalloc(sizeof(struct vnode_info *) *
 				     rinfo->max_epoch);
+	rinfo->max_exec_count = sys->rthrottling.max_exec_count;
+	rinfo->queue_work_interval = sys->rthrottling.queue_work_interval;
+	rinfo->throttling = sys->rthrottling.throttling;
 	sd_init_mutex(&rinfo->vinfo_lock);
 	if (epoch_lifted)
 		rinfo->notify_complete = true; /* Reweight or node recovery */
@@ -1236,3 +1351,21 @@ void get_recovery_state(struct recovery_state *state)
 	state->nr_finished = rinfo->done;
 	state->nr_total = rinfo->count;
 }
+
+/* Store new throttling settings; enabled only if both are non-zero. */
+void set_recovery(struct recovery_throttling *rthrottling)
+{
+	const uint32_t max = rthrottling->max_exec_count;
+	const uint64_t interval = rthrottling->queue_work_interval;
+
+	sys->rthrottling.max_exec_count = max;
+	sys->rthrottling.queue_work_interval = interval;
+	/* the flag carried in *rthrottling is ignored and recomputed here */
+	sys->rthrottling.throttling = max > 0 && interval > 0;
+}
+
+struct recovery_throttling get_recovery(void)
+{
+	return sys->rthrottling; /* returned by value: a copy of the settings */
+}
+
diff --git a/sheep/sheep.c b/sheep/sheep.c
index ef45a33..9fc7610 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -115,6 +115,12 @@ static const char log_help[] =
 "  syslog             syslog of the system\n"
 "  stdout             standard output\n";
 
+static const char recovery_help[] = /* help text of the -R option */
+"Available arguments:\n"
+"\tmax=: object recovery process maximum count of each interval\n"
+"\tinterval=: object recovery interval time (millisec)\n"
+"Example:\n\t$ sheep -R max=50,interval=1000 ...\n";
+
 static struct sd_option sheep_options[] = {
 	{'b', "bindaddr", true, "specify IP address of interface to listen on",
 	 bind_help},
@@ -137,6 +143,8 @@ static struct sd_option sheep_options[] = {
 	{'P', "pidfile", true, "create a pid file"},
 	{'r', "http", true, "enable http service. (default: disabled)",
 	 http_help},
+	{'R', "recovery", true, "specify the recovery speed throttling",
+	 recovery_help},
 	{'u', "upgrade", false, "upgrade to the latest data layout"},
 	{'v', "version", false, "show the version"},
 	{'w', "cache", true, "enable object cache", cache_help},
@@ -424,6 +432,26 @@ static struct option_parser journal_parsers[] = {
 	{ NULL, NULL },
 };
 
+static uint32_t max_exec_count;
+static uint64_t queue_work_interval;
+static int max_exec_count_parser(const char *s)
+{
+	max_exec_count = strtol(s, NULL, 10);
+	return 0; /* NOTE(review): never fails; input is not validated */
+}
+
+static int queue_work_interval_parser(const char *s)
+{
+	queue_work_interval = strtol(s, NULL, 10);
+	return 0; /* NOTE(review): never fails; input is not validated */
+}
+
+static struct option_parser recovery_parsers[] = { /* keys accepted by -R */
+	{ "max=", max_exec_count_parser },
+	{ "interval=", queue_work_interval_parser },
+	{ NULL, NULL },
+};
+
 static size_t get_nr_nodes(void)
 {
 	struct vnode_info *vinfo;
@@ -633,6 +661,10 @@ int main(int argc, char **argv)
 
 	sys->node_status = SD_NODE_STATUS_INITIALIZATION;
 
+	sys->rthrottling.max_exec_count = 0;
+	sys->rthrottling.queue_work_interval = 0;
+	sys->rthrottling.throttling = false;
+
 	install_crash_handler(crash_handler);
 	signal(SIGPIPE, SIG_IGN);
 
@@ -751,6 +783,15 @@ int main(int argc, char **argv)
 		case 'h':
 			usage(0);
 			break;
+		case 'R':
+			if (option_parse(optarg, ",", recovery_parsers) < 0)
+				exit(1);
+			sys->rthrottling.max_exec_count = max_exec_count;
+			sys->rthrottling.queue_work_interval
+						 = queue_work_interval;
+			if (max_exec_count > 0 && queue_work_interval > 0)
+				sys->rthrottling.throttling = true;
+			break;
 		case 'v':
 			fprintf(stdout, "Sheepdog daemon version %s\n",
 				PACKAGE_VERSION);
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 4ac08f8..170e8ff 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -143,6 +143,8 @@ struct system_info {
 	bool gateway_only;
 	bool nosync;
 
+	struct recovery_throttling rthrottling;
+
 	struct work_queue *net_wqueue;
 	struct work_queue *gateway_wqueue;
 	struct work_queue *io_wqueue;
@@ -428,6 +430,8 @@ int start_recovery(struct vnode_info *cur_vinfo, struct vnode_info *, bool,
 bool oid_in_recovery(uint64_t oid);
 bool node_in_recovery(void);
 void get_recovery_state(struct recovery_state *state);
+void set_recovery(struct recovery_throttling *rthrottling);
+struct recovery_throttling get_recovery(void);
 
 int read_backend_object(uint64_t oid, char *data, unsigned int datalen,
 		       uint64_t offset);
-- 
1.7.1



-- 
NTTソフトウェア株式会社
クラウド事業部 第一事業ユニット(C一BU)
福田康人(FUKUDA Yasuhito)
E-mail:fukuda.yasuhito at po.ntts.co.jp
〒220-0012 横浜市西区みなとみらい4-4-5
横浜アイマークプレイス13階
TEL:045-212-7393/FAX:045-662-7856





More information about the sheepdog mailing list