[sheepdog] [PATCH v5] sheep/recovery: multi-threading recovery process

Liu Yuan namei.unix at gmail.com
Thu Feb 6 10:18:57 CET 2014


Rationale for multi-threaded recovery:

1. If one node is added, we find that all the VMs on other nodes will get
   noticeably affected until 50% data is transferred to the new node.

2. For node failure, we might not have problems of running VM but the
   recovery process boost will benefit IO operation of VM with less
   chances to be blocked for write and also improve reliability.

3. For disk failure in node, this is similar to adding a node. All
   the data on the broken disk will be recovered on other disks in
   this node. Speedy recoery not only improve data reliability but
   also cause less writing blocking on the lost data.

Our oid scheduling algorithm is intact and simply add multi-threading onto top
of current recovery algorithm with minimal changes.

- we still have ->oids array to denote oids to be recovered
- we start up 2 * nr_disks threads for recovery
- the tricky part is that we need to wait all the running threads to
  completion before start next recovery events for multiple nodes/disks events

This patch passes "./check -g md -md" on my local box

Signed-off-by: Liu Yuan <namei.unix at gmail.com>
---
 v5:
 - remove running_threads_nr 
 v4:
 - fix lfind() in oid_in_recovery() to find if oid in the recovery list correctly
   to pass tests/func/010
 - add comment for run_next_rw() for why we check running_threads_nr > 1.

 sheep/md.c         |  9 ++++--
 sheep/recovery.c   | 91 ++++++++++++++++++++++++++++++++++++++++--------------
 sheep/sheep.c      |  2 +-
 sheep/sheep_priv.h |  1 +
 4 files changed, 77 insertions(+), 26 deletions(-)

diff --git a/sheep/md.c b/sheep/md.c
index 0a2903f..e7e8ec2 100644
--- a/sheep/md.c
+++ b/sheep/md.c
@@ -43,9 +43,9 @@ static struct md md = {
 	.lock = SD_RW_LOCK_INITIALIZER,
 };
 
-static inline int nr_online_disks(void)
+static inline uint32_t nr_online_disks(void)
 {
-	int nr;
+	uint32_t nr;
 
 	sd_read_lock(&md.lock);
 	nr = md.nr_disks;
@@ -683,3 +683,8 @@ uint64_t md_get_size(uint64_t *used)
 
 	return fsize + *used;
 }
+
+uint32_t md_nr_disks(void)
+{
+	return nr_online_disks();
+}
diff --git a/sheep/recovery.c b/sheep/recovery.c
index 50b6c58..cba1e2d 100644
--- a/sheep/recovery.c
+++ b/sheep/recovery.c
@@ -54,6 +54,7 @@ struct recovery_info {
 	uint32_t epoch;
 	uint32_t tgt_epoch;
 	uint64_t done;
+	uint64_t next;
 
 	/*
 	 * true when automatic recovery is disabled
@@ -507,6 +508,8 @@ static int do_recover_object(struct recovery_obj_work *row)
 {
 	uint64_t oid = row->oid;
 
+	sd_debug("try recover object %"PRIx64, oid);
+
 	if (is_erasure_oid(oid))
 		return recover_erasure_object(row);
 	else
@@ -597,13 +600,15 @@ main_fn bool oid_in_recovery(uint64_t oid)
 			return false;
 		}
 
-		if (rinfo->oids[rinfo->done] == oid) {
+		if (xlfind(&oid, rinfo->oids + rinfo->done,
+			   rinfo->next - rinfo->done, oid_cmp)) {
 			if (rinfo->suspended)
 				break;
 			/*
 			 * When recovery is not suspended,
-			 * rinfo->oids[rinfo->done] is currently being recovered
-			 * and no need to call prepare_schedule_oid().
+			 * rinfo->oids[rinfo->done .. rinfo->next) is currently
+			 * being recovered and no need to call
+			 * prepare_schedule_oid().
 			 */
 			return true;
 		}
@@ -613,8 +618,8 @@ main_fn bool oid_in_recovery(uint64_t oid)
 		 *
 		 * FIXME: do we need more efficient yet complex data structure?
 		 */
-		if (xlfind(&oid, rinfo->oids + rinfo->done + 1,
-			   rinfo->count - (rinfo->done + 1), oid_cmp))
+		if (xlfind(&oid, rinfo->oids + rinfo->next,
+			   rinfo->count - rinfo->next + 1, oid_cmp))
 			break;
 
 		/*
@@ -666,12 +671,19 @@ static void free_recovery_info(struct recovery_info *rinfo)
 /* Return true if next recovery work is queued. */
 static inline bool run_next_rw(void)
 {
-	struct recovery_info *nrinfo = uatomic_xchg_ptr(&next_rinfo, NULL);
+	struct recovery_info *nrinfo = uatomic_read(&next_rinfo);
 	struct recovery_info *cur = main_thread_get(current_rinfo);
 
 	if (nrinfo == NULL)
 		return false;
 
+	/* Some objects are still in recovery. */
+	if (cur->done < cur->next) {
+		sd_debug("some threads still running, wait for completion");
+		return true;
+	}
+
+	nrinfo = uatomic_xchg_ptr(&next_rinfo, NULL);
 	/*
 	 * When md recovery supersed the reweight or node recovery, we need to
 	 * notify completion.
@@ -743,14 +755,14 @@ static inline bool oid_in_prio_oids(struct recovery_info *rinfo, uint64_t oid)
 /*
  * Schedule prio_oids to be recovered first in FIFO order
  *
- * rw->done is index of the original next object to be recovered and also the
- * number of objects already recovered.
+ * rw->next is index of the original next object to be recovered and also the
+ * number of objects already recovered and being recovered.
  * we just move rw->prio_oids in between:
- *   new_oids = [0..rw->done - 1] + [rw->prio_oids] + [rw->done]
+ *   new_oids = [0..rw->next - 1] + [rw->prio_oids] + [rw->next]
  */
 static inline void finish_schedule_oids(struct recovery_info *rinfo)
 {
-	uint64_t i, nr_recovered = rinfo->done, new_idx;
+	uint64_t i, nr_recovered = rinfo->next, new_idx;
 	uint64_t *new_oids;
 
 	/* If I am the last oid, done */
@@ -763,7 +775,7 @@ static inline void finish_schedule_oids(struct recovery_info *rinfo)
 	       rinfo->nr_prio_oids * sizeof(uint64_t));
 	new_idx = nr_recovered + rinfo->nr_prio_oids;
 
-	for (i = rinfo->done; i < rinfo->count; i++) {
+	for (i = rinfo->next; i < rinfo->count; i++) {
 		if (oid_in_prio_oids(rinfo, rinfo->oids[i]))
 			continue;
 		new_oids[new_idx++] = rinfo->oids[i];
@@ -810,8 +822,13 @@ static void recover_next_object(struct recovery_info *rinfo)
 		return;
 	}
 
+	/* no more objects to be recovered */
+	if (rinfo->next >= rinfo->count)
+		return;
+
 	/* Try recover next object */
 	queue_recovery_work(rinfo);
+	rinfo->next++;
 }
 
 void resume_suspended_recovery(void)
@@ -833,8 +850,20 @@ static void recover_object_main(struct work *work)
 						     base);
 	struct recovery_info *rinfo = main_thread_get(current_rinfo);
 
-	if (run_next_rw())
-		goto out;
+	/* ->oids[done, next] is out of order since finish order is random */
+	if (rinfo->oids[rinfo->done] != row->oid) {
+		uint64_t *p = xlfind(&row->oid, rinfo->oids + rinfo->done,
+				     rinfo->next - rinfo->done, oid_cmp);
+
+		*p = rinfo->oids[rinfo->done];
+		rinfo->oids[rinfo->done] = row->oid;
+	}
+	rinfo->done++;
+
+	if (run_next_rw()) {
+		free_recovery_obj_work(row);
+		return;
+	}
 
 	if (row->stop) {
 		/*
@@ -843,24 +872,23 @@ static void recover_object_main(struct work *work)
 		 * requests
 		 */
 		rinfo->notify_complete = false;
-		finish_recovery(rinfo);
 		sd_debug("recovery is stopped");
-		goto out;
+		goto finish_recovery;
 	}
 
 	wakeup_requests_on_oid(row->oid);
-	rinfo->done++;
 
 	sd_info("object %"PRIx64" is recovered (%"PRIu64"/%"PRIu64")", row->oid,
 		rinfo->done, rinfo->count);
 
-	if (rinfo->done < rinfo->count) {
-		recover_next_object(rinfo);
-		goto out;
-	}
+	if (rinfo->done >= rinfo->count)
+		goto finish_recovery;
 
+	recover_next_object(rinfo);
+	free_recovery_obj_work(row);
+	return;
+finish_recovery:
 	finish_recovery(rinfo);
-out:
 	free_recovery_obj_work(row);
 }
 
@@ -872,6 +900,22 @@ static void finish_object_list(struct work *work)
 						      struct recovery_list_work,
 						      base);
 	struct recovery_info *rinfo = main_thread_get(current_rinfo);
+	/*
+	 * Rationale for multi-threaded recovery:
+	 * 1. If one node is added, we find that all the VMs on other nodes will
+	 *    get noticeably affected until 50% data is transferred to the new
+	 *    node.
+	 * 2. For node failure, we might not have problems of running VM but the
+	 *    recovery process boost will benefit IO operation of VM with less
+	 *    chances to be blocked for write and also improve reliability.
+	 * 3. For disk failure in node, this is similar to adding a node. All
+	 *    the data on the broken disk will be recovered on other disks in
+	 *    this node. Speedy recoery not only improve data reliability but
+	 *    also cause less writing blocking on the lost data.
+	 *
+	 * We choose md_nr_disks() * 2 threads for recovery, no rationale.
+	 */
+	uint32_t nr_threads = md_nr_disks() * 2;
 
 	rinfo->state = RW_RECOVER_OBJ;
 	rinfo->count = rlw->count;
@@ -887,7 +931,8 @@ static void finish_object_list(struct work *work)
 		return;
 	}
 
-	recover_next_object(rinfo);
+	for (uint32_t i = 0; i < nr_threads; i++)
+		recover_next_object(rinfo);
 	return;
 }
 
@@ -1076,7 +1121,7 @@ static void queue_recovery_work(struct recovery_info *rinfo)
 		break;
 	case RW_RECOVER_OBJ:
 		row = xzalloc(sizeof(*row));
-		row->oid = rinfo->oids[rinfo->done];
+		row->oid = rinfo->oids[rinfo->next];
 
 		rw = &row->base;
 		rw->work.fn = recover_object_work;
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 3cf1378..e18dabf 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -444,7 +444,7 @@ static int create_work_queues(void)
 	sys->net_wqueue = create_work_queue("net", WQ_UNLIMITED);
 	sys->gateway_wqueue = create_work_queue("gway", WQ_UNLIMITED);
 	sys->io_wqueue = create_work_queue("io", WQ_UNLIMITED);
-	sys->recovery_wqueue = create_ordered_work_queue("rw");
+	sys->recovery_wqueue = create_work_queue("rw", WQ_UNLIMITED);
 	sys->deletion_wqueue = create_ordered_work_queue("deletion");
 	sys->block_wqueue = create_ordered_work_queue("block");
 	sys->md_wqueue = create_ordered_work_queue("md");
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index f59dc06..eab784b 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -485,6 +485,7 @@ uint32_t md_get_info(struct sd_md_info *info);
 int md_plug_disks(char *disks);
 int md_unplug_disks(char *disks);
 uint64_t md_get_size(uint64_t *used);
+uint32_t md_nr_disks(void);
 
 /* http.c */
 #ifdef HAVE_HTTP
-- 
1.8.1.2




More information about the sheepdog mailing list