[sheepdog] [PATCH v3] sheep/recovery: multi-threading recovery process
Liu Yuan
namei.unix at gmail.com
Tue Feb 4 08:14:44 CET 2014
Rationale for multi-threaded recovery:
1. If one node is added, all the VMs on the other nodes get noticeably
affected until 50% of the data has been transferred to the new node.
2. For a node failure, running VMs might not have problems, but boosting
the recovery process benefits VM IO, with fewer chances of writes being
blocked, and also improves reliability.
3. For a disk failure in a node, this is similar to adding a node: all
the data on the broken disk will be recovered on the other disks in
this node. Speedy recovery not only improves data reliability but
also causes less write blocking on the lost data.
The oid scheduling algorithm is kept intact; we simply add multi-threading
on top of the current recovery algorithm, with minimal changes:
- we still have the ->oids array to denote the oids to be recovered
- we start up 2 * nr_disks threads for recovery
- the tricky part is that we need to wait for all the running threads to
complete before starting the next recovery event when multiple node/disk
events arrive (see the standalone sketch after this list)
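To make the scheduling window concrete, below is a minimal, self-contained
sketch of the [done, next) bookkeeping. It is not the sheep code itself:
recover_oid, NR_OIDS, NR_THREADS and the raw pthread plumbing are
illustrative stand-ins (NR_THREADS plays the role of md_nr_disks() * 2,
and the real patch dispatches work from the main thread through the "rw"
work queue instead of raw threads).

/*
 * Standalone sketch (illustrative, not sheep code) of the
 * [done, next) in-flight window: oids[0, done) are recovered,
 * oids[done, next) are being recovered, oids[next, count) wait.
 * Build with: gcc -pthread sketch.c
 */
#include <inttypes.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>

#define NR_OIDS 16
#define NR_THREADS 4	/* stands in for md_nr_disks() * 2 */

static uint64_t oids[NR_OIDS];
static uint64_t done, next;	/* done <= next <= NR_OIDS */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

/* Stand-in for the real per-object recovery work. */
static void recover_oid(uint64_t oid)
{
	usleep(1000 * (oid % 7)); /* completion order becomes random */
}

static void *worker(void *arg)
{
	(void)arg;
	for (;;) {
		uint64_t oid, i;

		pthread_mutex_lock(&lock);
		if (next >= NR_OIDS) {	/* nothing left to dispatch */
			pthread_mutex_unlock(&lock);
			return NULL;
		}
		oid = oids[next++];	/* claim the next oid */
		pthread_mutex_unlock(&lock);

		recover_oid(oid);

		/*
		 * oids[done, next) is out of order because threads finish
		 * randomly, so swap the finished oid into the 'done' slot
		 * before advancing 'done', mirroring the xlfind() + swap
		 * done in recover_object_main() below.
		 */
		pthread_mutex_lock(&lock);
		for (i = done; i < next; i++)
			if (oids[i] == oid) {
				oids[i] = oids[done];
				oids[done] = oid;
				break;
			}
		done++;
		printf("oid %" PRIu64 " recovered (%" PRIu64 "/%d)\n",
		       oid, done, NR_OIDS);
		pthread_mutex_unlock(&lock);
	}
}

int main(void)
{
	pthread_t tids[NR_THREADS];

	for (uint64_t i = 0; i < NR_OIDS; i++)
		oids[i] = i + 1;
	for (int i = 0; i < NR_THREADS; i++)
		pthread_create(&tids[i], NULL, worker, NULL);
	for (int i = 0; i < NR_THREADS; i++)
		pthread_join(tids[i], NULL);
	return 0;
}

In the actual patch the same window is driven from the main thread with no
extra locking: recover_next_object() advances ->next when a work item is
queued, and recover_object_main() advances ->done when one completes.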
This patch passes "./check -g md -md" on my local box
Signed-off-by: Liu Yuan <namei.unix at gmail.com>
---
sheep/md.c | 9 ++++--
sheep/recovery.c | 82 ++++++++++++++++++++++++++++++++++++++++++------------
sheep/sheep.c | 2 +-
sheep/sheep_priv.h | 1 +
4 files changed, 73 insertions(+), 21 deletions(-)
diff --git a/sheep/md.c b/sheep/md.c
index 0a2903f..e7e8ec2 100644
--- a/sheep/md.c
+++ b/sheep/md.c
@@ -43,9 +43,9 @@ static struct md md = {
.lock = SD_RW_LOCK_INITIALIZER,
};
-static inline int nr_online_disks(void)
+static inline uint32_t nr_online_disks(void)
{
- int nr;
+ uint32_t nr;
sd_read_lock(&md.lock);
nr = md.nr_disks;
@@ -683,3 +683,8 @@ uint64_t md_get_size(uint64_t *used)
return fsize + *used;
}
+
+uint32_t md_nr_disks(void)
+{
+ return nr_online_disks();
+}
diff --git a/sheep/recovery.c b/sheep/recovery.c
index 50b6c58..525d93b 100644
--- a/sheep/recovery.c
+++ b/sheep/recovery.c
@@ -54,6 +54,8 @@ struct recovery_info {
uint32_t epoch;
uint32_t tgt_epoch;
uint64_t done;
+ uint64_t next;
+ uint32_t running_threads_nr;
/*
* true when automatic recovery is disabled
@@ -507,6 +509,8 @@ static int do_recover_object(struct recovery_obj_work *row)
{
uint64_t oid = row->oid;
+ sd_debug("try recover object %"PRIx64, oid);
+
if (is_erasure_oid(oid))
return recover_erasure_object(row);
else
@@ -597,7 +601,8 @@ main_fn bool oid_in_recovery(uint64_t oid)
return false;
}
- if (rinfo->oids[rinfo->done] == oid) {
+ if (xlfind(&oid, rinfo->oids + rinfo->done,
+ rinfo->next - rinfo->done, oid_cmp)) {
if (rinfo->suspended)
break;
/*
@@ -666,12 +671,19 @@ static void free_recovery_info(struct recovery_info *rinfo)
/* Return true if next recovery work is queued. */
static inline bool run_next_rw(void)
{
- struct recovery_info *nrinfo = uatomic_xchg_ptr(&next_rinfo, NULL);
+ struct recovery_info *nrinfo = uatomic_read(&next_rinfo);
struct recovery_info *cur = main_thread_get(current_rinfo);
if (nrinfo == NULL)
return false;
+ sd_debug("running threads nr %"PRIu32, cur->running_threads_nr);
+ if (cur->running_threads_nr > 1) {
+ cur->running_threads_nr--;
+ return true;
+ }
+
+ nrinfo = uatomic_xchg_ptr(&next_rinfo, NULL);
/*
* When md recovery supersedes the reweight or node recovery, we need to
* notify completion.
@@ -743,14 +755,14 @@ static inline bool oid_in_prio_oids(struct recovery_info *rinfo, uint64_t oid)
/*
* Schedule prio_oids to be recovered first in FIFO order
*
- * rw->done is index of the original next object to be recovered and also the
- * number of objects already recovered.
+ * rw->next is the index of the next object to be recovered and also the
+ * number of objects already recovered or being recovered.
* we just move rw->prio_oids in between:
- * new_oids = [0..rw->done - 1] + [rw->prio_oids] + [rw->done]
+ * new_oids = [0..rw->next - 1] + [rw->prio_oids] + [rw->next]
*/
static inline void finish_schedule_oids(struct recovery_info *rinfo)
{
- uint64_t i, nr_recovered = rinfo->done, new_idx;
+ uint64_t i, nr_recovered = rinfo->next, new_idx;
uint64_t *new_oids;
/* If I am the last oid, done */
@@ -763,7 +775,7 @@ static inline void finish_schedule_oids(struct recovery_info *rinfo)
rinfo->nr_prio_oids * sizeof(uint64_t));
new_idx = nr_recovered + rinfo->nr_prio_oids;
- for (i = rinfo->done; i < rinfo->count; i++) {
+ for (i = rinfo->next; i < rinfo->count; i++) {
if (oid_in_prio_oids(rinfo, rinfo->oids[i]))
continue;
new_oids[new_idx++] = rinfo->oids[i];
@@ -810,8 +822,14 @@ static void recover_next_object(struct recovery_info *rinfo)
return;
}
+ /* no more objects to be recovered */
+ if (rinfo->next >= rinfo->count)
+ return;
+
/* Try recover next object */
queue_recovery_work(rinfo);
+ rinfo->next++;
+ rinfo->running_threads_nr++;
}
void resume_suspended_recovery(void)
@@ -833,8 +851,10 @@ static void recover_object_main(struct work *work)
base);
struct recovery_info *rinfo = main_thread_get(current_rinfo);
- if (run_next_rw())
- goto out;
+ if (run_next_rw()) {
+ free_recovery_obj_work(row);
+ return;
+ }
if (row->stop) {
/*
@@ -843,24 +863,33 @@ static void recover_object_main(struct work *work)
* requests
*/
rinfo->notify_complete = false;
- finish_recovery(rinfo);
sd_debug("recovery is stopped");
- goto out;
+ goto finish_recovery;
}
wakeup_requests_on_oid(row->oid);
+ /* ->oids[done, next) may be out of order since completion order is random */
+ if (rinfo->oids[rinfo->done] != row->oid) {
+ uint64_t *p = xlfind(&row->oid, rinfo->oids + rinfo->done,
+ rinfo->next - rinfo->done, oid_cmp);
+
+ *p = rinfo->oids[rinfo->done];
+ rinfo->oids[rinfo->done] = row->oid;
+ }
rinfo->done++;
sd_info("object %"PRIx64" is recovered (%"PRIu64"/%"PRIu64")", row->oid,
rinfo->done, rinfo->count);
- if (rinfo->done < rinfo->count) {
- recover_next_object(rinfo);
- goto out;
- }
+ if (rinfo->done >= rinfo->count)
+ goto finish_recovery;
+ recover_next_object(rinfo);
+ rinfo->running_threads_nr--;
+ free_recovery_obj_work(row);
+ return;
+finish_recovery:
finish_recovery(rinfo);
-out:
free_recovery_obj_work(row);
}
@@ -872,6 +901,22 @@ static void finish_object_list(struct work *work)
struct recovery_list_work,
base);
struct recovery_info *rinfo = main_thread_get(current_rinfo);
+ /*
+ * Rationale for multi-threaded recovery:
+ * 1. If one node is added, all the VMs on the other nodes get
+ * noticeably affected until 50% of the data is transferred to the
+ * new node.
+ * 2. For a node failure, running VMs might not have problems, but
+ * boosting the recovery process benefits VM IO, with fewer chances
+ * of writes being blocked, and also improves reliability.
+ * 3. For a disk failure in a node, this is similar to adding a
+ * node: all the data on the broken disk will be recovered on the
+ * other disks in this node. Speedy recovery not only improves data
+ * reliability but also causes less write blocking on the lost data.
+ *
+ * We choose md_nr_disks() * 2 threads for recovery; the factor of 2
+ * is an arbitrary choice.
+ */
+ uint32_t nr_threads = md_nr_disks() * 2;
rinfo->state = RW_RECOVER_OBJ;
rinfo->count = rlw->count;
@@ -887,7 +932,8 @@ static void finish_object_list(struct work *work)
return;
}
- recover_next_object(rinfo);
+ for (uint32_t i = 0; i < nr_threads; i++)
+ recover_next_object(rinfo);
return;
}
@@ -1076,7 +1122,7 @@ static void queue_recovery_work(struct recovery_info *rinfo)
break;
case RW_RECOVER_OBJ:
row = xzalloc(sizeof(*row));
- row->oid = rinfo->oids[rinfo->done];
+ row->oid = rinfo->oids[rinfo->next];
rw = &row->base;
rw->work.fn = recover_object_work;
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 3cf1378..e18dabf 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -444,7 +444,7 @@ static int create_work_queues(void)
sys->net_wqueue = create_work_queue("net", WQ_UNLIMITED);
sys->gateway_wqueue = create_work_queue("gway", WQ_UNLIMITED);
sys->io_wqueue = create_work_queue("io", WQ_UNLIMITED);
- sys->recovery_wqueue = create_ordered_work_queue("rw");
+ sys->recovery_wqueue = create_work_queue("rw", WQ_UNLIMITED);
sys->deletion_wqueue = create_ordered_work_queue("deletion");
sys->block_wqueue = create_ordered_work_queue("block");
sys->md_wqueue = create_ordered_work_queue("md");
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 64bd457..2fc32b3 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -485,6 +485,7 @@ uint32_t md_get_info(struct sd_md_info *info);
int md_plug_disks(char *disks);
int md_unplug_disks(char *disks);
uint64_t md_get_size(uint64_t *used);
+uint32_t md_nr_disks(void);
/* http.c */
#ifdef HAVE_HTTP
--
1.8.1.2