[sheepdog] [PATCH v3 09/10] sheep: split struct recovery_work

MORITA Kazutaka morita.kazutaka at gmail.com
Thu May 9 12:15:43 CEST 2013


From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>

Currently, the sheepdog recovery code has a race condition because the worker
thread can access recovery_work members which are not expected to be read
outside of the main thread.  For example, the worker thread can read rw->oids
in do_recover_object() while the main thread reallocates it in
finish_schedule_oids().  The root cause is that the main thread passes
recovering_work to the worker thread even though the variable is not
thread-safe.

This patch splits recovery_work into four types to remove the race condition:

 - recovery_info      : This was recovery_work and can be accessed only
                        in the main thread.
 - recovery_list_work : This is used for preparing object list in the worker
                        thread.
 - recovery_obj_work  : This is used for recovering objects
                        in the worker thread.
 - recovery_work      : This is a super class of recovery_list_work and
                        recovery_obj_work, and is also used for notifying
                        recovery completion in the worker thread.

I think this also improves code readability and makes it easier to make the
current recovery code multi-threaded in the future.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 sheep/recovery.c |  371 +++++++++++++++++++++++++++++++++++-------------------
 1 file changed, 245 insertions(+), 126 deletions(-)

diff --git a/sheep/recovery.c b/sheep/recovery.c
index c044120..fb13e5a 100644
--- a/sheep/recovery.c
+++ b/sheep/recovery.c
@@ -18,21 +18,51 @@
 
 
 enum rw_state {
-	RW_INIT,
-	RW_RUN,
+	RW_PREPARE_LIST, /* the recovery thread is preparing object list */
+	RW_RECOVER_OBJ, /* the thread is recovering objects */
+	RW_NOTIFY_COMPLETION, /* the thread is notifying recovery completion */
 };
 
+/* base structure for the recovery thread */
 struct recovery_work {
+	uint32_t epoch;
+
+	struct vnode_info *old_vinfo;
+	struct vnode_info *cur_vinfo;
+
+	struct work work;
+};
+
+/* for preparing lists */
+struct recovery_list_work {
+	struct recovery_work base;
+
+	int count;
+	uint64_t *oids;
+};
+
+/* for recovering objects */
+struct recovery_obj_work {
+	struct recovery_work base;
+
+	uint64_t oid; /* the object to be recovered */
+	bool stop;
+};
+
+/*
+ * recovery information
+ *
+ * We cannot access the members of this structure outside of the main thread.
+ */
+struct recovery_info {
 	enum rw_state state;
 
 	uint32_t epoch;
 	uint32_t done;
 
-	bool stop;
-	struct work work;
 	/*
 	 * true when automatic recovery is disabled
-	 * and recovery process is suspended
+	 * and no recovery work is running
 	 */
 	bool suspended;
 
@@ -46,8 +76,10 @@ struct recovery_work {
 	struct vnode_info *cur_vinfo;
 };
 
-struct recovery_work *next_rw;
-static main_thread(struct recovery_work *) recovering_work;
+struct recovery_info *next_rinfo;
+static main_thread(struct recovery_info *) current_rinfo;
+
+static void queue_recovery_work(struct recovery_info *rinfo);
 
 /* Dynamically grown list buffer default as 4M (2T storage) */
 #define DEFAULT_LIST_BUFFER_SIZE (UINT64_C(1) << 22)
@@ -164,10 +196,11 @@ out:
  * the routine will try to recovery it from the nodes it has stayed,
  * at least, *theoretically* on consistent hash ring.
  */
-static int do_recover_object(struct recovery_work *rw)
+static int do_recover_object(struct recovery_obj_work *row)
 {
+	struct recovery_work *rw = &row->base;
 	struct vnode_info *old, *cur;
-	uint64_t oid = rw->oids[rw->done];
+	uint64_t oid = row->oid;
 	uint32_t epoch = rw->epoch, tgt_epoch = rw->epoch;
 	int ret;
 	struct vnode_info *new_old;
@@ -185,7 +218,7 @@ again:
 		/* Succeed */
 		break;
 	case SD_RES_OLD_NODE_VER:
-		rw->stop = true;
+		row->stop = true;
 		break;
 	default:
 		/* No luck, roll back to an older configuration and try again */
@@ -217,61 +250,61 @@ static void recover_object_work(struct work *work)
 {
 	struct recovery_work *rw = container_of(work, struct recovery_work,
 						work);
-	uint64_t oid = rw->oids[rw->done];
+	struct recovery_obj_work *row = container_of(rw,
+						     struct recovery_obj_work,
+						     base);
+	uint64_t oid = row->oid;
 	int ret;
 
-	sd_eprintf("done:%"PRIu32" count:%"PRIu32", oid:%"PRIx64, rw->done,
-		   rw->count, oid);
-
 	if (sd_store->exist(oid)) {
 		sd_dprintf("the object is already recovered");
 		return;
 	}
 
-	ret = do_recover_object(rw);
+	ret = do_recover_object(row);
 	if (ret < 0)
 		sd_eprintf("failed to recover object %"PRIx64, oid);
 }
 
 bool node_in_recovery(void)
 {
-	return main_thread_get(recovering_work) != NULL;
+	return main_thread_get(current_rinfo) != NULL;
 }
 
 static inline void prepare_schedule_oid(uint64_t oid)
 {
-	struct recovery_work *rw = main_thread_get(recovering_work);
+	struct recovery_info *rinfo = main_thread_get(current_rinfo);
 	int i;
 
-	for (i = 0; i < rw->nr_prio_oids; i++)
-		if (rw->prio_oids[i] == oid)
+	for (i = 0; i < rinfo->nr_prio_oids; i++)
+		if (rinfo->prio_oids[i] == oid)
 			return;
 	/*
 	 * We need this check because oid might not be recovered.
 	 * Very much unlikely though, but it might happen indeed.
 	 */
-	for (i = 0; i < rw->done; i++)
-		if (rw->oids[i] == oid) {
+	for (i = 0; i < rinfo->done; i++)
+		if (rinfo->oids[i] == oid) {
 			sd_dprintf("%"PRIx64" not recovered, don't schedule it",
 				   oid);
 			return;
 		}
 	/* When recovery is not suspended, oid is currently being recovered */
-	if (!rw->suspended && rw->oids[rw->done] == oid)
+	if (!rinfo->suspended && rinfo->oids[rinfo->done] == oid)
 		return;
 
-	rw->nr_prio_oids++;
-	rw->prio_oids = xrealloc(rw->prio_oids,
-				 rw->nr_prio_oids * sizeof(uint64_t));
-	rw->prio_oids[rw->nr_prio_oids - 1] = oid;
-	sd_dprintf("%"PRIx64" nr_prio_oids %d", oid, rw->nr_prio_oids);
+	rinfo->nr_prio_oids++;
+	rinfo->prio_oids = xrealloc(rinfo->prio_oids,
+				    rinfo->nr_prio_oids * sizeof(uint64_t));
+	rinfo->prio_oids[rinfo->nr_prio_oids - 1] = oid;
+	sd_dprintf("%"PRIx64" nr_prio_oids %d", oid, rinfo->nr_prio_oids);
 
 	resume_suspended_recovery();
 }
 
 bool oid_in_recovery(uint64_t oid)
 {
-	struct recovery_work *rw = main_thread_get(recovering_work);
+	struct recovery_info *rinfo = main_thread_get(current_rinfo);
 	int i;
 
 	if (!node_in_recovery())
@@ -282,11 +315,11 @@ bool oid_in_recovery(uint64_t oid)
 		return false;
 	}
 
-	if (before(rw->epoch, sys->epoch))
+	if (before(rinfo->epoch, sys->epoch))
 		return true;
 
 	/* If we are in preparation of object list, oid is not recovered yet */
-	if (rw->state == RW_INIT)
+	if (rinfo->state == RW_PREPARE_LIST)
 		return true;
 
 	/*
@@ -294,15 +327,15 @@ bool oid_in_recovery(uint64_t oid)
 	 *
 	 * FIXME: do we need more efficient yet complex data structure?
 	 */
-	for (i = rw->done; i < rw->count; i++)
-		if (rw->oids[i] == oid)
+	for (i = rinfo->done; i < rinfo->count; i++)
+		if (rinfo->oids[i] == oid)
 			break;
 
 	/*
 	 * Newly created object after prepare_object_list() might not be
 	 * in the list
 	 */
-	if (i == rw->count) {
+	if (i == rinfo->count) {
 		sd_eprintf("%"PRIx64" is not in the recovery list", oid);
 		return false;
 	}
@@ -315,26 +348,49 @@ static void free_recovery_work(struct recovery_work *rw)
 {
 	put_vnode_info(rw->cur_vinfo);
 	put_vnode_info(rw->old_vinfo);
-	free(rw->oids);
 	free(rw);
 }
 
+static void free_recovery_list_work(struct recovery_list_work *rlw)
+{
+	put_vnode_info(rlw->base.cur_vinfo);
+	put_vnode_info(rlw->base.old_vinfo);
+	free(rlw->oids);
+	free(rlw);
+}
+
+static void free_recovery_obj_work(struct recovery_obj_work *row)
+{
+	put_vnode_info(row->base.cur_vinfo);
+	put_vnode_info(row->base.old_vinfo);
+	free(row);
+}
+
+static void free_recovery_info(struct recovery_info *rinfo)
+{
+	put_vnode_info(rinfo->cur_vinfo);
+	put_vnode_info(rinfo->old_vinfo);
+	free(rinfo->oids);
+	free(rinfo->prio_oids);
+	free(rinfo);
+}
+
 /* Return true if next recovery work is queued. */
-static inline bool run_next_rw(struct recovery_work *rw)
+static inline bool run_next_rw(void)
 {
-	struct recovery_work *nrw = uatomic_xchg_ptr(&next_rw, NULL);
+	struct recovery_info *nrinfo = uatomic_xchg_ptr(&next_rinfo, NULL);
 
-	if (nrw == NULL)
+	if (nrinfo == NULL)
 		return false;
 
-	free_recovery_work(rw);
+	free_recovery_info(main_thread_get(current_rinfo));
 
 	if (sd_store->update_epoch)
-		sd_store->update_epoch(nrw->epoch);
+		sd_store->update_epoch(nrinfo->epoch);
 
-	main_thread_set(recovering_work, nrw);
+	main_thread_set(current_rinfo, nrinfo);
 	wakeup_all_requests();
-	queue_work(sys->recovery_wqueue, &nrw->work);
+	queue_recovery_work(nrinfo);
 	sd_dprintf("recovery work is superseded");
 	return true;
 }
@@ -364,27 +420,28 @@ static void notify_recovery_completion_main(struct work *work)
 	free_recovery_work(rw);
 }
 
-static inline void finish_recovery(struct recovery_work *rw)
+static inline void finish_recovery(struct recovery_info *rinfo)
 {
-	uint32_t recovered_epoch = rw->epoch;
-	main_thread_set(recovering_work, NULL);
+	uint32_t recovered_epoch = rinfo->epoch;
+	main_thread_set(current_rinfo, NULL);
 
 	wakeup_all_requests();
 
+	rinfo->state = RW_NOTIFY_COMPLETION;
+
 	/* notify recovery completion to other nodes */
-	rw->work.fn = notify_recovery_completion_work;
-	rw->work.done = notify_recovery_completion_main;
-	queue_work(sys->recovery_wqueue, &rw->work);
+	queue_recovery_work(rinfo);
+	free_recovery_info(rinfo);
 
 	sd_dprintf("recovery complete: new epoch %"PRIu32, recovered_epoch);
 }
 
-static inline bool oid_in_prio_oids(struct recovery_work *rw, uint64_t oid)
+static inline bool oid_in_prio_oids(struct recovery_info *rinfo, uint64_t oid)
 {
 	int i;
 
-	for (i = 0; i < rw->nr_prio_oids; i++)
-		if (rw->prio_oids[i] == oid)
+	for (i = 0; i < rinfo->nr_prio_oids; i++)
+		if (rinfo->prio_oids[i] == oid)
 			return true;
 	return false;
 }
@@ -397,38 +454,38 @@ static inline bool oid_in_prio_oids(struct recovery_work *rw, uint64_t oid)
  * we just move rw->prio_oids in between:
  *   new_oids = [0..rw->done - 1] + [rw->prio_oids] + [rw->done]
  */
-static inline void finish_schedule_oids(struct recovery_work *rw)
+static inline void finish_schedule_oids(struct recovery_info *rinfo)
 {
-	int i, nr_recovered = rw->done, new_idx;
+	int i, nr_recovered = rinfo->done, new_idx;
 	uint64_t *new_oids;
 
 	/* If I am the last oid, done */
-	if (nr_recovered == rw->count - 1)
+	if (nr_recovered == rinfo->count - 1)
 		goto done;
 
 	new_oids = xmalloc(list_buffer_size);
-	memcpy(new_oids, rw->oids, nr_recovered * sizeof(uint64_t));
-	memcpy(new_oids + nr_recovered, rw->prio_oids,
-	       rw->nr_prio_oids * sizeof(uint64_t));
-	new_idx = nr_recovered + rw->nr_prio_oids;
+	memcpy(new_oids, rinfo->oids, nr_recovered * sizeof(uint64_t));
+	memcpy(new_oids + nr_recovered, rinfo->prio_oids,
+	       rinfo->nr_prio_oids * sizeof(uint64_t));
+	new_idx = nr_recovered + rinfo->nr_prio_oids;
 
-	for (i = rw->done; i < rw->count; i++) {
-		if (oid_in_prio_oids(rw, rw->oids[i]))
+	for (i = rinfo->done; i < rinfo->count; i++) {
+		if (oid_in_prio_oids(rinfo, rinfo->oids[i]))
 			continue;
-		new_oids[new_idx++] = rw->oids[i];
+		new_oids[new_idx++] = rinfo->oids[i];
 	}
 	/* rw->count should eq new_idx, otherwise something is wrong */
 	sd_dprintf("%snr_recovered %d, nr_prio_oids %d, count %d = new %d",
-		   rw->count == new_idx ? "" : "WARN: ", nr_recovered,
-		   rw->nr_prio_oids, rw->count, new_idx);
+		   rinfo->count == new_idx ? "" : "WARN: ", nr_recovered,
+		   rinfo->nr_prio_oids, rinfo->count, new_idx);
 
-	free(rw->oids);
-	rw->oids = new_oids;
+	free(rinfo->oids);
+	rinfo->oids = new_oids;
 done:
-	free(rw->prio_oids);
-	rw->prio_oids = NULL;
-	rw->nr_scheduled_prio_oids += rw->nr_prio_oids;
-	rw->nr_prio_oids = 0;
+	free(rinfo->prio_oids);
+	rinfo->prio_oids = NULL;
+	rinfo->nr_scheduled_prio_oids += rinfo->nr_prio_oids;
+	rinfo->nr_prio_oids = 0;
 }
 
 /*
@@ -438,37 +495,37 @@ done:
  * clients.  Sheep recovers such objects for availability even when
  * automatic object recovery is not enabled.
  */
-static bool has_scheduled_objects(struct recovery_work *rw)
+static bool has_scheduled_objects(struct recovery_info *rinfo)
 {
-	return rw->done < rw->nr_scheduled_prio_oids;
+	return rinfo->done < rinfo->nr_scheduled_prio_oids;
 }
 
-static void recover_next_object(struct recovery_work *rw)
+static void recover_next_object(struct recovery_info *rinfo)
 {
-	if (run_next_rw(rw))
+	if (run_next_rw())
 		return;
 
-	if (rw->nr_prio_oids)
-		finish_schedule_oids(rw);
+	if (rinfo->nr_prio_oids)
+		finish_schedule_oids(rinfo);
 
-	if (sys->disable_recovery && !has_scheduled_objects(rw)) {
+	if (sys->disable_recovery && !has_scheduled_objects(rinfo)) {
 		sd_dprintf("suspended");
-		rw->suspended = true;
+		rinfo->suspended = true;
 		/* suspend until resume_suspended_recovery() is called */
 		return;
 	}
 
 	/* Try recover next object */
-	queue_work(sys->recovery_wqueue, &rw->work);
+	queue_recovery_work(rinfo);
 }
 
 void resume_suspended_recovery(void)
 {
-	struct recovery_work *rw = main_thread_get(recovering_work);
+	struct recovery_info *rinfo = main_thread_get(current_rinfo);
 
-	if (rw && rw->suspended) {
-		rw->suspended = false;
-		recover_next_object(rw);
+	if (rinfo && rinfo->suspended) {
+		rinfo->suspended = false;
+		recover_next_object(rinfo);
 	}
 }
 
@@ -476,10 +533,15 @@ static void recover_object_main(struct work *work)
 {
 	struct recovery_work *rw = container_of(work, struct recovery_work,
 						work);
-	if (run_next_rw(rw))
-		return;
+	struct recovery_obj_work *row = container_of(rw,
+						     struct recovery_obj_work,
+						     base);
+	struct recovery_info *rinfo = main_thread_get(current_rinfo);
 
-	if (rw->stop) {
+	if (run_next_rw())
+		goto out;
+
+	if (row->stop) {
 		/*
 		 * Stop this recovery process and wait for epoch to be
 		 * lifted and flush wait queue to requeue those
@@ -487,34 +549,49 @@ static void recover_object_main(struct work *work)
 		 */
 		wakeup_all_requests();
 		sd_dprintf("recovery is stopped");
-		return;
+		goto out;
 	}
 
-	wakeup_requests_on_oid(rw->oids[rw->done++]);
-	if (rw->done < rw->count) {
-		recover_next_object(rw);
-		return;
+	wakeup_requests_on_oid(row->oid);
+	rinfo->done++;
+
+	sd_eprintf("done:%"PRIu32" count:%"PRIu32", oid:%"PRIx64, rinfo->done,
+		   rinfo->count, row->oid);
+
+	if (rinfo->done < rinfo->count) {
+		recover_next_object(rinfo);
+		goto out;
 	}
 
-	finish_recovery(rw);
+	finish_recovery(rinfo);
+out:
+	free_recovery_obj_work(row);
 }
 
 static void finish_object_list(struct work *work)
 {
 	struct recovery_work *rw = container_of(work, struct recovery_work,
 						work);
-	rw->state = RW_RUN;
-
-	if (run_next_rw(rw))
+	struct recovery_list_work *rlw = container_of(rw,
+						      struct recovery_list_work,
+						      base);
+	struct recovery_info *rinfo = main_thread_get(current_rinfo);
+
+	rinfo->state = RW_RECOVER_OBJ;
+	rinfo->count = rlw->count;
+	rinfo->oids = rlw->oids;
+	rlw->oids = NULL;
+	free_recovery_list_work(rlw);
+
+	if (run_next_rw())
 		return;
 
-	if (!rw->count) {
-		finish_recovery(rw);
+	if (!rinfo->count) {
+		finish_recovery(rinfo);
 		return;
 	}
-	rw->work.fn = recover_object_work;
-	rw->work.done = recover_object_main;
-	recover_next_object(rw);
+
+	recover_next_object(rinfo);
 	return;
 }
 
@@ -556,11 +633,12 @@ retry:
 }
 
 /* Screen out objects that don't belong to this node */
-static void screen_object_list(struct recovery_work *rw,
+static void screen_object_list(struct recovery_list_work *rlw,
 			       uint64_t *oids, size_t nr_oids)
 {
+	struct recovery_work *rw = &rlw->base;
 	const struct sd_vnode *vnodes[SD_MAX_COPIES];
-	int old_count = rw->count;
+	int old_count = rlw->count;
 	int nr_objs;
 	int i, j;
 
@@ -576,21 +654,22 @@ static void screen_object_list(struct recovery_work *rw,
 		for (j = 0; j < nr_objs; j++) {
 			if (!vnode_is_local(vnodes[j]))
 				continue;
-			if (bsearch(&oids[i], rw->oids, old_count,
+			if (bsearch(&oids[i], rlw->oids, old_count,
 				    sizeof(uint64_t), obj_cmp))
 				continue;
 
-			rw->oids[rw->count++] = oids[i];
+			rlw->oids[rlw->count++] = oids[i];
 			/* enlarge the list buffer if full */
-			if (rw->count == list_buffer_size / sizeof(uint64_t)) {
+			if (rlw->count == list_buffer_size / sizeof(uint64_t)) {
 				list_buffer_size *= 2;
-				rw->oids = xrealloc(rw->oids, list_buffer_size);
+				rlw->oids = xrealloc(rlw->oids,
+						     list_buffer_size);
 			}
 			break;
 		}
 	}
 
-	qsort(rw->oids, rw->count, sizeof(uint64_t), obj_cmp);
+	qsort(rlw->oids, rlw->count, sizeof(uint64_t), obj_cmp);
 }
 
 static bool newly_joined(struct sd_node *node, struct recovery_work *rw)
@@ -606,6 +685,9 @@ static void prepare_object_list(struct work *work)
 {
 	struct recovery_work *rw = container_of(work, struct recovery_work,
 						work);
+	struct recovery_list_work *rlw = container_of(rw,
+						      struct recovery_list_work,
+						      base);
 	struct sd_node *cur = rw->cur_vinfo->nodes;
 	int cur_nr = rw->cur_vinfo->nr_nodes;
 	int start = random() % cur_nr, i, end = cur_nr;
@@ -619,7 +701,7 @@ again:
 		size_t nr_oids;
 		struct sd_node *node = cur + i;
 
-		if (uatomic_read(&next_rw)) {
+		if (uatomic_read(&next_rinfo)) {
 			sd_dprintf("go to the next recovery");
 			return;
 		}
@@ -630,7 +712,7 @@ again:
 		oids = fetch_object_list(node, rw->epoch, &nr_oids);
 		if (!oids)
 			continue;
-		screen_object_list(rw, oids, nr_oids);
+		screen_object_list(rlw, oids, nr_oids);
 		free(oids);
 	}
 
@@ -640,7 +722,7 @@ again:
 		goto again;
 	}
 
-	sd_dprintf("%d", rw->count);
+	sd_dprintf("%d", rlw->count);
 }
 
 static inline bool node_is_gateway_only(void)
@@ -650,31 +732,28 @@ static inline bool node_is_gateway_only(void)
 
 int start_recovery(struct vnode_info *cur_vinfo, struct vnode_info *old_vinfo)
 {
-	struct recovery_work *rw;
+	struct recovery_info *rinfo;
 
 	if (node_is_gateway_only())
 		goto out;
 
-	rw = xzalloc(sizeof(struct recovery_work));
-	rw->state = RW_INIT;
-	rw->oids = xmalloc(list_buffer_size);
-	rw->epoch = sys->epoch;
-	rw->count = 0;
-
-	rw->cur_vinfo = grab_vnode_info(cur_vinfo);
-	rw->old_vinfo = grab_vnode_info(old_vinfo);
+	rinfo = xzalloc(sizeof(struct recovery_info));
+	rinfo->state = RW_PREPARE_LIST;
+	rinfo->epoch = sys->epoch;
+	rinfo->count = 0;
 
-	rw->work.fn = prepare_object_list;
-	rw->work.done = finish_object_list;
+	rinfo->cur_vinfo = grab_vnode_info(cur_vinfo);
+	rinfo->old_vinfo = grab_vnode_info(old_vinfo);
 
 	if (sd_store->update_epoch)
-		sd_store->update_epoch(rw->epoch);
+		sd_store->update_epoch(rinfo->epoch);
 
-	if (main_thread_get(recovering_work) != NULL) {
+	if (main_thread_get(current_rinfo) != NULL) {
 		/* skip the previous epoch recovery */
-		struct recovery_work *nrw = uatomic_xchg_ptr(&next_rw, rw);
-		if (nrw)
-			free_recovery_work(nrw);
+		struct recovery_info *nrinfo;
+		nrinfo = uatomic_xchg_ptr(&next_rinfo, rinfo);
+		if (nrinfo)
+			free_recovery_info(nrinfo);
 		sd_dprintf("recovery skipped");
 
 		/*
@@ -683,10 +762,50 @@ int start_recovery(struct vnode_info *cur_vinfo, struct vnode_info *old_vinfo)
 		 */
 		resume_suspended_recovery();
 	} else {
-		main_thread_set(recovering_work, rw);
-		queue_work(sys->recovery_wqueue, &rw->work);
+		main_thread_set(current_rinfo, rinfo);
+		queue_recovery_work(rinfo);
 	}
 out:
 	wakeup_requests_on_epoch();
 	return 0;
 }
+
+static void queue_recovery_work(struct recovery_info *rinfo)
+{
+	struct recovery_work *rw;
+	struct recovery_list_work *rlw;
+	struct recovery_obj_work *row;
+
+	switch (rinfo->state) {
+	case RW_PREPARE_LIST:
+		rlw = xzalloc(sizeof(*rlw));
+		rlw->oids = xmalloc(list_buffer_size);
+
+		rw = &rlw->base;
+		rw->work.fn = prepare_object_list;
+		rw->work.done = finish_object_list;
+		break;
+	case RW_RECOVER_OBJ:
+		row = xzalloc(sizeof(*row));
+		row->oid = rinfo->oids[rinfo->done];
+
+		rw = &row->base;
+		rw->work.fn = recover_object_work;
+		rw->work.done = recover_object_main;
+		break;
+	case RW_NOTIFY_COMPLETION:
+		rw = xzalloc(sizeof(*rw));
+		rw->work.fn = notify_recovery_completion_work;
+		rw->work.done = notify_recovery_completion_main;
+		break;
+	default:
+		panic("unknow recovery state %d", rinfo->state);
+		break;
+	}
+
+	rw->epoch = rinfo->epoch;
+	rw->cur_vinfo = grab_vnode_info(rinfo->cur_vinfo);
+	rw->old_vinfo = grab_vnode_info(rinfo->old_vinfo);
+
+	queue_work(sys->recovery_wqueue, &rw->work);
+}
-- 
1.7.9.5




More information about the sheepdog mailing list