[sheepdog] [PATCH v3 03/11] sheep/recovery: guard next_rw with RCU
MORITA Kazutaka
morita.kazutaka@lab.ntt.co.jp
Fri Apr 19 10:55:22 CEST 2013
next_rw must be manipulated atomically because it can be accessed
concurrently by the main thread and worker threads.
Signed-off-by: MORITA Kazutaka <morita.kazutaka@lab.ntt.co.jp>
---
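Note: for reference, below is a minimal standalone sketch of the
single-slot handoff that uatomic_xchg_ptr enables. It uses C11
atomic_exchange as a stand-in for liburcu's uatomic_xchg, and the
names (slot, publish, take) are illustrative only; they do not appear
in sheepdog.

#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

struct work {
	int epoch;
};

/* One shared slot, like next_rw: NULL means "no pending work". */
static struct work *_Atomic slot;

/*
 * Producer side (cf. start_recovery): swap the new item in and free
 * whatever it displaces, so a superseded work is freed exactly once.
 */
static void publish(struct work *w)
{
	struct work *old = atomic_exchange(&slot, w);
	if (old)
		free(old);
}

/*
 * Consumer side (cf. run_next_rw): take ownership by swapping NULL
 * in; a concurrent publish() can never hand the same pointer to two
 * takers.
 */
static struct work *take(void)
{
	return atomic_exchange(&slot, NULL);
}

int main(void)
{
	struct work *w = malloc(sizeof(*w));

	w->epoch = 1;
	publish(w);

	struct work *got = take();
	if (got) {
		printf("took work for epoch %d\n", got->epoch);
		free(got);
	}
	if (take() == NULL)
		printf("slot is empty after the handoff\n");
	return 0;
}

Because both sides use an unconditional swap on the same slot, each
pointer is observed by exactly one thread, which is what lets
run_next_rw free a superseded work without any extra locking.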
include/util.h | 13 +++++++++++++
sheep/recovery.c | 37 +++++++++++++++++++------------------
2 files changed, 32 insertions(+), 18 deletions(-)
diff --git a/include/util.h b/include/util.h
index dbac48d..1bdaf36 100644
--- a/include/util.h
+++ b/include/util.h
@@ -134,6 +134,19 @@ static inline void uatomic_set_false(uatomic_bool *val)
uatomic_set(&val->val, 0);
}
+/*
+ * uatomic_xchg_ptr - uatomic_xchg for pointers
+ *
+ * Swaps the old value stored at location p with new value given by
+ * val. Returns old value.
+ */
+#define uatomic_xchg_ptr(p, val) \
+({ \
+ uintptr_t ret; \
+ ret = uatomic_xchg((uintptr_t *)(p), (val)); \
+ (typeof(*(p)))ret; \
+})
+
/* colors */
#define TEXT_NORMAL "\033[0m"
#define TEXT_BOLD "\033[1m"
diff --git a/sheep/recovery.c b/sheep/recovery.c
index e3a60f4..0be846c 100644
--- a/sheep/recovery.c
+++ b/sheep/recovery.c
@@ -301,15 +301,20 @@ static void free_recovery_work(struct recovery_work *rw)
free(rw);
}
-static inline void run_next_rw(struct recovery_work *rw)
+/* Return true if the next recovery work was queued. */
+static inline bool run_next_rw(struct recovery_work *rw)
{
+ struct recovery_work *nrw = uatomic_xchg_ptr(&next_rw, NULL);
+
+ if (nrw == NULL)
+ return false;
+
free_recovery_work(rw);
- rw = next_rw;
- next_rw = NULL;
- recovering_work = rw;
+ recovering_work = nrw;
wakeup_all_requests();
- queue_work(sys->recovery_wqueue, &rw->work);
+ queue_work(sys->recovery_wqueue, &nrw->work);
sd_dprintf("recovery work is superseded");
+ return true;
}
static void notify_recovery_completion_work(struct work *work)
@@ -421,10 +426,8 @@ static bool has_scheduled_objects(struct recovery_work *rw)
static void recover_next_object(struct recovery_work *rw)
{
- if (next_rw) {
- run_next_rw(rw);
+ if (run_next_rw(rw))
return;
- }
if (rw->nr_prio_oids)
finish_schedule_oids(rw);
@@ -452,10 +455,8 @@ static void recover_object_main(struct work *work)
{
struct recovery_work *rw = container_of(work, struct recovery_work,
work);
- if (next_rw) {
- run_next_rw(rw);
+ if (run_next_rw(rw))
return;
- }
if (rw->stop) {
/*
@@ -482,10 +483,10 @@ static void finish_object_list(struct work *work)
struct recovery_work *rw = container_of(work, struct recovery_work,
work);
rw->state = RW_RUN;
- if (next_rw) {
- run_next_rw(rw);
+
+ if (run_next_rw(rw))
return;
- }
+
if (!rw->count) {
finish_recovery(rw);
return;
@@ -598,7 +599,7 @@ again:
size_t nr_oids;
struct sd_node *node = cur + i;
- if (next_rw) {
+ if (uatomic_read(&next_rw)) {
sd_dprintf("go to the next recovery");
return;
}
@@ -648,10 +649,10 @@ int start_recovery(struct vnode_info *cur_vinfo, struct vnode_info *old_vinfo)
if (recovering_work != NULL) {
/* skip the previous epoch recovery */
- if (next_rw)
- free_recovery_work(next_rw);
+ struct recovery_work *nrw = uatomic_xchg_ptr(&next_rw, rw);
+ if (nrw)
+ free_recovery_work(nrw);
sd_dprintf("recovery skipped");
- next_rw = rw;
/*
* This is necessary to invoke run_next_rw when
--
1.8.1.3.566.gaa39828