[sheepdog] [PATCH v3 03/11] sheep/recovery: guard next_rw with RCU

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Fri Apr 19 10:55:22 CEST 2013


We have to manipulate next_rw atomically because next_rw can be
accessed by both main and worker threads.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 include/util.h   | 13 +++++++++++++
 sheep/recovery.c | 37 +++++++++++++++++++------------------
 2 files changed, 32 insertions(+), 18 deletions(-)

diff --git a/include/util.h b/include/util.h
index dbac48d..1bdaf36 100644
--- a/include/util.h
+++ b/include/util.h
@@ -134,6 +134,19 @@ static inline void uatomic_set_false(uatomic_bool *val)
 	uatomic_set(&val->val, 0);
 }
 
+/*
+ * uatomic_xchg_ptr - uatomic_xchg for pointers
+ *
+ * Swaps the old value stored at location p with new value given by
+ * val.  Returns old value.
+ */
+#define uatomic_xchg_ptr(p, val)			\
+({							\
+	uintptr_t ret;					\
+	ret = uatomic_xchg((uintptr_t *)(p), (val));	\
+	(typeof(*(p)))ret;				\
+})
+
 /* colors */
 #define TEXT_NORMAL         "\033[0m"
 #define TEXT_BOLD           "\033[1m"
diff --git a/sheep/recovery.c b/sheep/recovery.c
index e3a60f4..0be846c 100644
--- a/sheep/recovery.c
+++ b/sheep/recovery.c
@@ -301,15 +301,20 @@ static void free_recovery_work(struct recovery_work *rw)
 	free(rw);
 }
 
-static inline void run_next_rw(struct recovery_work *rw)
+/* Return true if next recovery work is queued. */
+static inline bool run_next_rw(struct recovery_work *rw)
 {
+	struct recovery_work *nrw = uatomic_xchg_ptr(&next_rw, NULL);
+
+	if (nrw == NULL)
+		return false;
+
 	free_recovery_work(rw);
-	rw = next_rw;
-	next_rw = NULL;
-	recovering_work = rw;
+	recovering_work = nrw;
 	wakeup_all_requests();
-	queue_work(sys->recovery_wqueue, &rw->work);
+	queue_work(sys->recovery_wqueue, &nrw->work);
 	sd_dprintf("recovery work is superseded");
+	return true;
 }
 
 static void notify_recovery_completion_work(struct work *work)
@@ -421,10 +426,8 @@ static bool has_scheduled_objects(struct recovery_work *rw)
 
 static void recover_next_object(struct recovery_work *rw)
 {
-	if (next_rw) {
-		run_next_rw(rw);
+	if (run_next_rw(rw))
 		return;
-	}
 
 	if (rw->nr_prio_oids)
 		finish_schedule_oids(rw);
@@ -452,10 +455,8 @@ static void recover_object_main(struct work *work)
 {
 	struct recovery_work *rw = container_of(work, struct recovery_work,
 						work);
-	if (next_rw) {
-		run_next_rw(rw);
+	if (run_next_rw(rw))
 		return;
-	}
 
 	if (rw->stop) {
 		/*
@@ -482,10 +483,10 @@ static void finish_object_list(struct work *work)
 	struct recovery_work *rw = container_of(work, struct recovery_work,
 						work);
 	rw->state = RW_RUN;
-	if (next_rw) {
-		run_next_rw(rw);
+
+	if (run_next_rw(rw))
 		return;
-	}
+
 	if (!rw->count) {
 		finish_recovery(rw);
 		return;
@@ -598,7 +599,7 @@ again:
 		size_t nr_oids;
 		struct sd_node *node = cur + i;
 
-		if (next_rw) {
+		if (uatomic_read(&next_rw)) {
 			sd_dprintf("go to the next recovery");
 			return;
 		}
@@ -648,10 +649,10 @@ int start_recovery(struct vnode_info *cur_vinfo, struct vnode_info *old_vinfo)
 
 	if (recovering_work != NULL) {
 		/* skip the previous epoch recovery */
-		if (next_rw)
-			free_recovery_work(next_rw);
+		struct recovery_work *nrw = uatomic_xchg_ptr(&next_rw, rw);
+		if (nrw)
+			free_recovery_work(nrw);
 		sd_dprintf("recovery skipped");
-		next_rw = rw;
 
 		/*
 		 * This is necesary to invoke run_next_rw when
-- 
1.8.1.3.566.gaa39828




More information about the sheepdog mailing list