[sheepdog] [PATCH 8/9] sheep: split struct recovery_work
MORITA Kazutaka
morita.kazutaka at gmail.com
Mon May 6 19:45:55 CEST 2013
From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
Currently, the sheepdog recovery code has a race condition because the worker thread
can access recovery_work members which are not expected to be read outside of
the main thread. For example, the worker thread can read rw->oids in
do_recover_object() while the main thread reallocates it in
finish_schedule_oids(). The root cause is that the main thread passes
recovering_work to the worker thread even though the variable is not thread-safe.
This patch splits recovery_work into four types to remove the race condition:
- recovery_info : This was recovery_work and can be accessed only
in the main thread.
- recovery_list_work : This is used for preparing object list in the worker
thread.
- recovery_obj_work : This is used for recovering objects
in the worker thread.
- recovery_work : This is a super class of recovery_list_work and
recovery_obj_work, and is also used for notifying
recovery completion in the worker thread.
I think this also improves code readability and makes it easier to extend the
current recovery code to be multi-threaded in the future.
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
sheep/recovery.c | 371 +++++++++++++++++++++++++++++++++++-------------------
1 file changed, 245 insertions(+), 126 deletions(-)
diff --git a/sheep/recovery.c b/sheep/recovery.c
index 919f597..8962742 100644
--- a/sheep/recovery.c
+++ b/sheep/recovery.c
@@ -18,21 +18,51 @@
enum rw_state {
- RW_INIT,
- RW_RUN,
+ RW_PREPARE_LIST, /* the recovery thread is preparing object list */
+ RW_RECOVER_OBJ, /* the thread is recovering objects */
+ RW_NOTIFY_COMPLETION, /* the thread is notifying recovery completion */
};
+/* base structure for the recovery thread */
struct recovery_work {
+ uint32_t epoch;
+
+ struct vnode_info *old_vinfo;
+ struct vnode_info *cur_vinfo;
+
+ struct work work;
+};
+
+/* for preparing lists */
+struct recovery_list_work {
+ struct recovery_work base;
+
+ int count;
+ uint64_t *oids;
+};
+
+/* for recovering objects */
+struct recovery_obj_work {
+ struct recovery_work base;
+
+ uint64_t oid; /* the object to be recovered */
+ bool stop;
+};
+
+/*
+ * recovery information
+ *
+ * We cannot access the members of this structure outside of the main thread.
+ */
+struct recovery_info {
enum rw_state state;
uint32_t epoch;
uint32_t done;
- bool stop;
- struct work work;
/*
* true when automatic recovery is disabled
- * and recovery process is suspended
+ * and no recovery work is running
*/
bool suspended;
@@ -46,8 +76,10 @@ struct recovery_work {
struct vnode_info *cur_vinfo;
};
-struct recovery_work *next_rw;
-static thread_unsafe(struct recovery_work *) recovering_work;
+struct recovery_info *next_rinfo;
+static thread_unsafe(struct recovery_info *) current_rinfo;
+
+static void queue_recovery_work(struct recovery_info *rinfo);
/* Dynamically grown list buffer default as 4M (2T storage) */
#define DEFAULT_LIST_BUFFER_SIZE (UINT64_C(1) << 22)
@@ -136,10 +168,11 @@ static bool is_invalid_vnode(const struct sd_vnode *entry,
* the routine will try to recovery it from the nodes it has stayed,
* at least, *theoretically* on consistent hash ring.
*/
-static int do_recover_object(struct recovery_work *rw)
+static int do_recover_object(struct recovery_obj_work *row)
{
+ struct recovery_work *rw = &row->base;
struct vnode_info *old;
- uint64_t oid = rw->oids[rw->done];
+ uint64_t oid = row->oid;
uint32_t epoch = rw->epoch, tgt_epoch = rw->epoch;
int nr_copies, ret, i, start = 0;
@@ -178,7 +211,7 @@ again:
/* Succeed */
break;
} else if (SD_RES_OLD_NODE_VER == ret) {
- rw->stop = true;
+ row->stop = true;
goto err;
} else
ret = -1;
@@ -214,61 +247,61 @@ static void recover_object_work(struct work *work)
{
struct recovery_work *rw = container_of(work, struct recovery_work,
work);
- uint64_t oid = rw->oids[rw->done];
+ struct recovery_obj_work *row = container_of(rw,
+ struct recovery_obj_work,
+ base);
+ uint64_t oid = row->oid;
int ret;
- sd_eprintf("done:%"PRIu32" count:%"PRIu32", oid:%"PRIx64, rw->done,
- rw->count, oid);
-
if (sd_store->exist(oid)) {
sd_dprintf("the object is already recovered");
return;
}
- ret = do_recover_object(rw);
+ ret = do_recover_object(row);
if (ret < 0)
sd_eprintf("failed to recover object %"PRIx64, oid);
}
bool node_in_recovery(void)
{
- return thread_unsafe_get(recovering_work) != NULL;
+ return thread_unsafe_get(current_rinfo) != NULL;
}
static inline void prepare_schedule_oid(uint64_t oid)
{
- struct recovery_work *rw = thread_unsafe_get(recovering_work);
+ struct recovery_info *rinfo = thread_unsafe_get(current_rinfo);
int i;
- for (i = 0; i < rw->nr_prio_oids; i++)
- if (rw->prio_oids[i] == oid)
+ for (i = 0; i < rinfo->nr_prio_oids; i++)
+ if (rinfo->prio_oids[i] == oid)
return;
/*
* We need this check because oid might not be recovered.
* Very much unlikely though, but it might happen indeed.
*/
- for (i = 0; i < rw->done; i++)
- if (rw->oids[i] == oid) {
+ for (i = 0; i < rinfo->done; i++)
+ if (rinfo->oids[i] == oid) {
sd_dprintf("%"PRIx64" not recovered, don't schedule it",
oid);
return;
}
/* When recovery is not suspended, oid is currently being recovered */
- if (!rw->suspended && rw->oids[rw->done] == oid)
+ if (!rinfo->suspended && rinfo->oids[rinfo->done] == oid)
return;
- rw->nr_prio_oids++;
- rw->prio_oids = xrealloc(rw->prio_oids,
- rw->nr_prio_oids * sizeof(uint64_t));
- rw->prio_oids[rw->nr_prio_oids - 1] = oid;
- sd_dprintf("%"PRIx64" nr_prio_oids %d", oid, rw->nr_prio_oids);
+ rinfo->nr_prio_oids++;
+ rinfo->prio_oids = xrealloc(rinfo->prio_oids,
+ rinfo->nr_prio_oids * sizeof(uint64_t));
+ rinfo->prio_oids[rinfo->nr_prio_oids - 1] = oid;
+ sd_dprintf("%"PRIx64" nr_prio_oids %d", oid, rinfo->nr_prio_oids);
resume_suspended_recovery();
}
bool oid_in_recovery(uint64_t oid)
{
- struct recovery_work *rw = thread_unsafe_get(recovering_work);
+ struct recovery_info *rinfo = thread_unsafe_get(current_rinfo);
int i;
if (!node_in_recovery())
@@ -279,11 +312,11 @@ bool oid_in_recovery(uint64_t oid)
return false;
}
- if (before(rw->epoch, sys->epoch))
+ if (before(rinfo->epoch, sys->epoch))
return true;
/* If we are in preparation of object list, oid is not recovered yet */
- if (rw->state == RW_INIT)
+ if (rinfo->state == RW_PREPARE_LIST)
return true;
/*
@@ -291,15 +324,15 @@ bool oid_in_recovery(uint64_t oid)
*
* FIXME: do we need more efficient yet complex data structure?
*/
- for (i = rw->done; i < rw->count; i++)
- if (rw->oids[i] == oid)
+ for (i = rinfo->done; i < rinfo->count; i++)
+ if (rinfo->oids[i] == oid)
break;
/*
* Newly created object after prepare_object_list() might not be
* in the list
*/
- if (i == rw->count) {
+ if (i == rinfo->count) {
sd_eprintf("%"PRIx64" is not in the recovery list", oid);
return false;
}
@@ -312,26 +345,49 @@ static void free_recovery_work(struct recovery_work *rw)
{
put_vnode_info(rw->cur_vinfo);
put_vnode_info(rw->old_vinfo);
- free(rw->oids);
free(rw);
}
+static void free_recovery_list_work(struct recovery_list_work *rlw)
+{
+ put_vnode_info(rlw->base.cur_vinfo);
+ put_vnode_info(rlw->base.old_vinfo);
+ free(rlw->oids);
+ free(rlw);
+}
+
+static void free_recovery_obj_work(struct recovery_obj_work *row)
+{
+ put_vnode_info(row->base.cur_vinfo);
+ put_vnode_info(row->base.old_vinfo);
+ free(row);
+}
+
+static void free_recovery_info(struct recovery_info *rinfo)
+{
+ put_vnode_info(rinfo->cur_vinfo);
+ put_vnode_info(rinfo->old_vinfo);
+ free(rinfo->oids);
+ free(rinfo->prio_oids);
+ free(rinfo);
+}
+
/* Return true if next recovery work is queued. */
-static inline bool run_next_rw(struct recovery_work *rw)
+static inline bool run_next_rw(void)
{
- struct recovery_work *nrw = uatomic_xchg_ptr(&next_rw, NULL);
+ struct recovery_info *nrinfo = uatomic_xchg_ptr(&next_rinfo, NULL);
- if (nrw == NULL)
+ if (nrinfo == NULL)
return false;
- free_recovery_work(rw);
+ free_recovery_info(thread_unsafe_get(current_rinfo));
if (sd_store->update_epoch)
- sd_store->update_epoch(nrw->epoch);
+ sd_store->update_epoch(nrinfo->epoch);
- thread_unsafe_set(recovering_work, nrw);
+ thread_unsafe_set(current_rinfo, nrinfo);
wakeup_all_requests();
- queue_work(sys->recovery_wqueue, &nrw->work);
+ queue_recovery_work(nrinfo);
sd_dprintf("recovery work is superseded");
return true;
}
@@ -361,27 +417,28 @@ static void notify_recovery_completion_main(struct work *work)
free_recovery_work(rw);
}
-static inline void finish_recovery(struct recovery_work *rw)
+static inline void finish_recovery(struct recovery_info *rinfo)
{
- uint32_t recovered_epoch = rw->epoch;
- thread_unsafe_set(recovering_work, NULL);
+ uint32_t recovered_epoch = rinfo->epoch;
+ thread_unsafe_set(current_rinfo, NULL);
wakeup_all_requests();
+ rinfo->state = RW_NOTIFY_COMPLETION;
+
/* notify recovery completion to other nodes */
- rw->work.fn = notify_recovery_completion_work;
- rw->work.done = notify_recovery_completion_main;
- queue_work(sys->recovery_wqueue, &rw->work);
+ queue_recovery_work(rinfo);
+ free_recovery_info(rinfo);
sd_dprintf("recovery complete: new epoch %"PRIu32, recovered_epoch);
}
-static inline bool oid_in_prio_oids(struct recovery_work *rw, uint64_t oid)
+static inline bool oid_in_prio_oids(struct recovery_info *rinfo, uint64_t oid)
{
int i;
- for (i = 0; i < rw->nr_prio_oids; i++)
- if (rw->prio_oids[i] == oid)
+ for (i = 0; i < rinfo->nr_prio_oids; i++)
+ if (rinfo->prio_oids[i] == oid)
return true;
return false;
}
@@ -394,38 +451,38 @@ static inline bool oid_in_prio_oids(struct recovery_work *rw, uint64_t oid)
* we just move rw->prio_oids in between:
* new_oids = [0..rw->done - 1] + [rw->prio_oids] + [rw->done]
*/
-static inline void finish_schedule_oids(struct recovery_work *rw)
+static inline void finish_schedule_oids(struct recovery_info *rinfo)
{
- int i, nr_recovered = rw->done, new_idx;
+ int i, nr_recovered = rinfo->done, new_idx;
uint64_t *new_oids;
/* If I am the last oid, done */
- if (nr_recovered == rw->count - 1)
+ if (nr_recovered == rinfo->count - 1)
goto done;
new_oids = xmalloc(list_buffer_size);
- memcpy(new_oids, rw->oids, nr_recovered * sizeof(uint64_t));
- memcpy(new_oids + nr_recovered, rw->prio_oids,
- rw->nr_prio_oids * sizeof(uint64_t));
- new_idx = nr_recovered + rw->nr_prio_oids;
+ memcpy(new_oids, rinfo->oids, nr_recovered * sizeof(uint64_t));
+ memcpy(new_oids + nr_recovered, rinfo->prio_oids,
+ rinfo->nr_prio_oids * sizeof(uint64_t));
+ new_idx = nr_recovered + rinfo->nr_prio_oids;
- for (i = rw->done; i < rw->count; i++) {
- if (oid_in_prio_oids(rw, rw->oids[i]))
+ for (i = rinfo->done; i < rinfo->count; i++) {
+ if (oid_in_prio_oids(rinfo, rinfo->oids[i]))
continue;
- new_oids[new_idx++] = rw->oids[i];
+ new_oids[new_idx++] = rinfo->oids[i];
}
/* rw->count should eq new_idx, otherwise something is wrong */
sd_dprintf("%snr_recovered %d, nr_prio_oids %d, count %d = new %d",
- rw->count == new_idx ? "" : "WARN: ", nr_recovered,
- rw->nr_prio_oids, rw->count, new_idx);
+ rinfo->count == new_idx ? "" : "WARN: ", nr_recovered,
+ rinfo->nr_prio_oids, rinfo->count, new_idx);
- free(rw->oids);
- rw->oids = new_oids;
+ free(rinfo->oids);
+ rinfo->oids = new_oids;
done:
- free(rw->prio_oids);
- rw->prio_oids = NULL;
- rw->nr_scheduled_prio_oids += rw->nr_prio_oids;
- rw->nr_prio_oids = 0;
+ free(rinfo->prio_oids);
+ rinfo->prio_oids = NULL;
+ rinfo->nr_scheduled_prio_oids += rinfo->nr_prio_oids;
+ rinfo->nr_prio_oids = 0;
}
/*
@@ -435,37 +492,37 @@ done:
* clients. Sheep recovers such objects for availability even when
* automatic object recovery is not enabled.
*/
-static bool has_scheduled_objects(struct recovery_work *rw)
+static bool has_scheduled_objects(struct recovery_info *rinfo)
{
- return rw->done < rw->nr_scheduled_prio_oids;
+ return rinfo->done < rinfo->nr_scheduled_prio_oids;
}
-static void recover_next_object(struct recovery_work *rw)
+static void recover_next_object(struct recovery_info *rinfo)
{
- if (run_next_rw(rw))
+ if (run_next_rw())
return;
- if (rw->nr_prio_oids)
- finish_schedule_oids(rw);
+ if (rinfo->nr_prio_oids)
+ finish_schedule_oids(rinfo);
- if (sys->disable_recovery && !has_scheduled_objects(rw)) {
+ if (sys->disable_recovery && !has_scheduled_objects(rinfo)) {
sd_dprintf("suspended");
- rw->suspended = true;
+ rinfo->suspended = true;
/* suspend until resume_suspended_recovery() is called */
return;
}
/* Try recover next object */
- queue_work(sys->recovery_wqueue, &rw->work);
+ queue_recovery_work(rinfo);
}
void resume_suspended_recovery(void)
{
- struct recovery_work *rw = thread_unsafe_get(recovering_work);
+ struct recovery_info *rinfo = thread_unsafe_get(current_rinfo);
- if (rw && rw->suspended) {
- rw->suspended = false;
- recover_next_object(rw);
+ if (rinfo && rinfo->suspended) {
+ rinfo->suspended = false;
+ recover_next_object(rinfo);
}
}
@@ -473,10 +530,15 @@ static void recover_object_main(struct work *work)
{
struct recovery_work *rw = container_of(work, struct recovery_work,
work);
- if (run_next_rw(rw))
- return;
+ struct recovery_obj_work *row = container_of(rw,
+ struct recovery_obj_work,
+ base);
+ struct recovery_info *rinfo = thread_unsafe_get(current_rinfo);
- if (rw->stop) {
+ if (run_next_rw())
+ goto out;
+
+ if (row->stop) {
/*
* Stop this recovery process and wait for epoch to be
* lifted and flush wait queue to requeue those
@@ -484,34 +546,49 @@ static void recover_object_main(struct work *work)
*/
wakeup_all_requests();
sd_dprintf("recovery is stopped");
- return;
+ goto out;
}
- wakeup_requests_on_oid(rw->oids[rw->done++]);
- if (rw->done < rw->count) {
- recover_next_object(rw);
- return;
+ wakeup_requests_on_oid(row->oid);
+ rinfo->done++;
+
+ sd_eprintf("done:%"PRIu32" count:%"PRIu32", oid:%"PRIx64, rinfo->done,
+ rinfo->count, row->oid);
+
+ if (rinfo->done < rinfo->count) {
+ recover_next_object(rinfo);
+ goto out;
}
- finish_recovery(rw);
+ finish_recovery(rinfo);
+out:
+ free_recovery_obj_work(row);
}
static void finish_object_list(struct work *work)
{
struct recovery_work *rw = container_of(work, struct recovery_work,
work);
- rw->state = RW_RUN;
-
- if (run_next_rw(rw))
+ struct recovery_list_work *rlw = container_of(rw,
+ struct recovery_list_work,
+ base);
+ struct recovery_info *rinfo = thread_unsafe_get(current_rinfo);
+
+ rinfo->state = RW_RECOVER_OBJ;
+ rinfo->count = rlw->count;
+ rinfo->oids = rlw->oids;
+ rlw->oids = NULL;
+ free_recovery_list_work(rlw);
+
+ if (run_next_rw())
return;
- if (!rw->count) {
- finish_recovery(rw);
+ if (!rinfo->count) {
+ finish_recovery(rinfo);
return;
}
- rw->work.fn = recover_object_work;
- rw->work.done = recover_object_main;
- recover_next_object(rw);
+
+ recover_next_object(rinfo);
return;
}
@@ -553,11 +630,12 @@ retry:
}
/* Screen out objects that don't belong to this node */
-static void screen_object_list(struct recovery_work *rw,
+static void screen_object_list(struct recovery_list_work *rlw,
uint64_t *oids, size_t nr_oids)
{
+ struct recovery_work *rw = &rlw->base;
const struct sd_vnode *vnodes[SD_MAX_COPIES];
- int old_count = rw->count;
+ int old_count = rlw->count;
int nr_objs;
int i, j;
@@ -573,21 +651,22 @@ static void screen_object_list(struct recovery_work *rw,
for (j = 0; j < nr_objs; j++) {
if (!vnode_is_local(vnodes[j]))
continue;
- if (bsearch(&oids[i], rw->oids, old_count,
+ if (bsearch(&oids[i], rlw->oids, old_count,
sizeof(uint64_t), obj_cmp))
continue;
- rw->oids[rw->count++] = oids[i];
+ rlw->oids[rlw->count++] = oids[i];
/* enlarge the list buffer if full */
- if (rw->count == list_buffer_size / sizeof(uint64_t)) {
+ if (rlw->count == list_buffer_size / sizeof(uint64_t)) {
list_buffer_size *= 2;
- rw->oids = xrealloc(rw->oids, list_buffer_size);
+ rlw->oids = xrealloc(rlw->oids,
+ list_buffer_size);
}
break;
}
}
- qsort(rw->oids, rw->count, sizeof(uint64_t), obj_cmp);
+ qsort(rlw->oids, rlw->count, sizeof(uint64_t), obj_cmp);
}
static bool newly_joined(struct sd_node *node, struct recovery_work *rw)
@@ -603,6 +682,9 @@ static void prepare_object_list(struct work *work)
{
struct recovery_work *rw = container_of(work, struct recovery_work,
work);
+ struct recovery_list_work *rlw = container_of(rw,
+ struct recovery_list_work,
+ base);
struct sd_node *cur = rw->cur_vinfo->nodes;
int cur_nr = rw->cur_vinfo->nr_nodes;
int start = random() % cur_nr, i, end = cur_nr;
@@ -616,7 +698,7 @@ again:
size_t nr_oids;
struct sd_node *node = cur + i;
- if (uatomic_read(&next_rw)) {
+ if (uatomic_read(&next_rinfo)) {
sd_dprintf("go to the next recovery");
return;
}
@@ -627,7 +709,7 @@ again:
oids = fetch_object_list(node, rw->epoch, &nr_oids);
if (!oids)
continue;
- screen_object_list(rw, oids, nr_oids);
+ screen_object_list(rlw, oids, nr_oids);
free(oids);
}
@@ -637,7 +719,7 @@ again:
goto again;
}
- sd_dprintf("%d", rw->count);
+ sd_dprintf("%d", rlw->count);
}
static inline bool node_is_gateway_only(void)
@@ -647,31 +729,28 @@ static inline bool node_is_gateway_only(void)
int start_recovery(struct vnode_info *cur_vinfo, struct vnode_info *old_vinfo)
{
- struct recovery_work *rw;
+ struct recovery_info *rinfo;
if (node_is_gateway_only())
goto out;
- rw = xzalloc(sizeof(struct recovery_work));
- rw->state = RW_INIT;
- rw->oids = xmalloc(list_buffer_size);
- rw->epoch = sys->epoch;
- rw->count = 0;
-
- rw->cur_vinfo = grab_vnode_info(cur_vinfo);
- rw->old_vinfo = grab_vnode_info(old_vinfo);
+ rinfo = xzalloc(sizeof(struct recovery_info));
+ rinfo->state = RW_PREPARE_LIST;
+ rinfo->epoch = sys->epoch;
+ rinfo->count = 0;
- rw->work.fn = prepare_object_list;
- rw->work.done = finish_object_list;
+ rinfo->cur_vinfo = grab_vnode_info(cur_vinfo);
+ rinfo->old_vinfo = grab_vnode_info(old_vinfo);
if (sd_store->update_epoch)
- sd_store->update_epoch(rw->epoch);
+ sd_store->update_epoch(rinfo->epoch);
- if (thread_unsafe_get(recovering_work) != NULL) {
+ if (thread_unsafe_get(current_rinfo) != NULL) {
/* skip the previous epoch recovery */
- struct recovery_work *nrw = uatomic_xchg_ptr(&next_rw, rw);
- if (nrw)
- free_recovery_work(nrw);
+ struct recovery_info *nrinfo;
+ nrinfo = uatomic_xchg_ptr(&next_rinfo, rinfo);
+ if (nrinfo)
+ free_recovery_info(nrinfo);
sd_dprintf("recovery skipped");
/*
@@ -680,10 +759,50 @@ int start_recovery(struct vnode_info *cur_vinfo, struct vnode_info *old_vinfo)
*/
resume_suspended_recovery();
} else {
- thread_unsafe_set(recovering_work, rw);
- queue_work(sys->recovery_wqueue, &rw->work);
+ thread_unsafe_set(current_rinfo, rinfo);
+ queue_recovery_work(rinfo);
}
out:
wakeup_requests_on_epoch();
return 0;
}
+
+static void queue_recovery_work(struct recovery_info *rinfo)
+{
+ struct recovery_work *rw;
+ struct recovery_list_work *rlw;
+ struct recovery_obj_work *row;
+
+ switch (rinfo->state) {
+ case RW_PREPARE_LIST:
+ rlw = xzalloc(sizeof(*rlw));
+ rlw->oids = xmalloc(list_buffer_size);
+
+ rw = &rlw->base;
+ rw->work.fn = prepare_object_list;
+ rw->work.done = finish_object_list;
+ break;
+ case RW_RECOVER_OBJ:
+ row = xzalloc(sizeof(*row));
+ row->oid = rinfo->oids[rinfo->done];
+
+ rw = &row->base;
+ rw->work.fn = recover_object_work;
+ rw->work.done = recover_object_main;
+ break;
+ case RW_NOTIFY_COMPLETION:
+ rw = xzalloc(sizeof(*rw));
+ rw->work.fn = notify_recovery_completion_work;
+ rw->work.done = notify_recovery_completion_main;
+ break;
+ default:
+ panic("unknow recovery state %d", rinfo->state);
+ break;
+ }
+
+ rw->epoch = rinfo->epoch;
+ rw->cur_vinfo = grab_vnode_info(rinfo->cur_vinfo);
+ rw->old_vinfo = grab_vnode_info(rinfo->old_vinfo);
+
+ queue_work(sys->recovery_wqueue, &rw->work);
+}
--
1.7.9.5
More information about the sheepdog
mailing list