From: Liu Yuan <tailai.ly at taobao.com>

It is not thread-safe to manipulate rw->oids[] from both the main and
worker threads. Add rw->prio_oids[] and confine all rw->oids[]
manipulation to recover_object_main(), where no recover_object_work is
running concurrently.

This also fixes a nasty buffer overflow in rw->oids[] caused by an
insane memmove, and cleans up the code a bit.

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
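Note below the cut, not part of the commit: here is a minimal,
self-contained sketch of the reordering that finish_schedule_oids()
performs. The merge logic mirrors the patch, but merge_prio(), main()
and the sample oids are made up for illustration. It assumes every
prioritized oid also appears in the not-yet-recovered tail, which the
patch guarantees by checking bsearch() in oid_in_recovery() before
staging.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>

static bool in_prio(const uint64_t *prio, int nr_prio, uint64_t oid)
{
	int i;

	for (i = 0; i < nr_prio; i++)
		if (prio[i] == oid)
			return true;
	return false;
}

/* new = [0 .. done - 1] + [prio, FIFO] + [done .., minus duplicates] */
static uint64_t *merge_prio(const uint64_t *oids, int count, int done,
			    const uint64_t *prio, int nr_prio)
{
	/* prio is a subset of the tail, so count entries suffice */
	uint64_t *new_oids = malloc(count * sizeof(uint64_t));
	int i, idx = done;

	if (!new_oids)
		abort();
	/* keep the already-recovered prefix as-is */
	memcpy(new_oids, oids, done * sizeof(uint64_t));
	/* splice in the prioritized oids, preserving FIFO order */
	memcpy(new_oids + idx, prio, nr_prio * sizeof(uint64_t));
	idx += nr_prio;
	/* append the rest, skipping oids already spliced in */
	for (i = done; i < count; i++)
		if (!in_prio(prio, nr_prio, oids[i]))
			new_oids[idx++] = oids[i];
	return new_oids;
}

int main(void)
{
	uint64_t oids[] = { 1, 2, 3, 4, 5, 6 };	/* sorted recovery list */
	uint64_t prio[] = { 5, 3 };		/* requested out of order */
	uint64_t *merged = merge_prio(oids, 6, 2, prio, 2);
	int i;

	for (i = 0; i < 6; i++)
		printf("%" PRIu64 " ", merged[i]);
	printf("\n");	/* prints: 1 2 5 3 4 6 */
	free(merged);
	return 0;
}

With done = 2 the recovered prefix {1, 2} stays put, {5, 3} jump the
queue in the order they were requested, and {4, 6} keep their sorted
order behind them.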
 sheep/recovery.c | 111 ++++++++++++++++++++++++++++++++++--------------------
 1 file changed, 71 insertions(+), 40 deletions(-)

diff --git a/sheep/recovery.c b/sheep/recovery.c
index 591c5d1..b4e2134 100644
--- a/sheep/recovery.c
+++ b/sheep/recovery.c
@@ -31,9 +31,10 @@ struct recovery_work {
 	int stop;
 	struct work work;
 
-	int nr_blocking;
 	int count;
 	uint64_t *oids;
+	uint64_t *prio_oids;
+	int nr_prio_oids;
 
 	struct vnode_info *old_vnodes;
 	struct vnode_info *cur_vnodes;
@@ -271,45 +272,22 @@ int is_recovery_init(void)
 	return rw->state == RW_INIT;
 }
 
-static inline bool schedule_oid(uint64_t oid)
+static inline void prepare_schedule_oid(uint64_t oid)
 {
-	uint64_t hval, min_hval;
-	int i;
 	struct recovery_work *rw = recovering_work;
+	int i;
 
-	/* Check if the oid is already scheduled in front */
-	for (i = 0; i < rw->nr_blocking; i++)
-		if (rw->oids[rw->done + i] == oid)
-			return true;
+	for (i = 0; i < rw->nr_prio_oids; i++)
+		if (rw->prio_oids[i] == oid)
+			return;
 
-	min_hval = fnv_64a_buf(&rw->oids[rw->done + rw->nr_blocking],
-			       sizeof(uint64_t), FNV1A_64_INIT);
-	hval = fnv_64a_buf(&oid, sizeof(uint64_t), FNV1A_64_INIT);
-
-	if (min_hval <= hval) {
-		uint64_t *p;
-		p = bsearch(&oid, rw->oids + rw->done + rw->nr_blocking,
-			    rw->count - rw->done - rw->nr_blocking, sizeof(oid),
-			    obj_cmp);
-		if (p) {
-			dprintf("recover the object %" PRIx64 " first\n", oid);
-			/* The first oid may be processed now */
-			if (rw->nr_blocking == 0)
-				rw->nr_blocking = 1;
-			/* This oid should be recovered first */
-			if (p > rw->oids + rw->done + rw->nr_blocking) {
-				memmove(rw->oids + rw->done + rw->nr_blocking + 1,
-					rw->oids + rw->done + rw->nr_blocking,
-					sizeof(uint64_t) * (p - (rw->oids + rw->done + rw->nr_blocking)));
-				rw->oids[rw->done + rw->nr_blocking] = oid;
-				rw->nr_blocking++;
-			}
-			return true;
-		}
-	}
+	/* The oid is currently being recovered */
+	if (rw->oids[rw->done] == oid)
+		return;
 
-	dprintf("the object %" PRIx64 " is not found\n", oid);
-	return false;
+	rw->prio_oids = xrealloc(rw->prio_oids, sizeof(uint64_t) * ++rw->nr_prio_oids);
+	rw->prio_oids[rw->nr_prio_oids - 1] = oid;
+	dprintf("%"PRIx64" nr_prio_oids %d\n", oid, rw->nr_prio_oids);
 }
 
 bool oid_in_recovery(uint64_t oid)
@@ -331,7 +309,14 @@ bool oid_in_recovery(uint64_t oid)
 	if (rw->state == RW_INIT)
 		return true;
 
-	return schedule_oid(oid);
+	if (!bsearch(&oid, rw->oids + rw->done, rw->count - rw->done,
+		     sizeof(oid), obj_cmp)) {
+		eprintf("%"PRIx64" is not in the recovery list\n", oid);
+		return false;
+	}
+
+	prepare_schedule_oid(oid);
+	return true;
 }
 
 static void free_recovery_work(struct recovery_work *rw)
@@ -368,6 +353,49 @@ static inline void finish_recovery(struct recovery_work *rw)
 		sys->recovered_epoch);
 }
 
+static inline bool oid_in_prio_oids(struct recovery_work *rw, uint64_t oid)
+{
+	int i;
+
+	for (i = 0; i < rw->nr_prio_oids; i++)
+		if (rw->prio_oids[i] == oid)
+			return true;
+	return false;
+}
+
+/*
+ * Schedule prio_oids to be recovered first, in FIFO order
+ *
+ * rw->done is the index of the next object to be recovered and also the
+ * number of objects already recovered, so we insert rw->prio_oids in
+ * between:
+ * new_oids = [0 .. rw->done - 1] + [rw->prio_oids] + [rw->done ..]
+ */
+static inline void finish_schedule_oids(struct recovery_work *rw)
+{
+	int i, nr_recovered = rw->done, new_idx;
+	uint64_t *new_oids = xmalloc(1 << 20); /* FIXME */
+
+	memmove(new_oids, rw->oids, nr_recovered * sizeof(uint64_t));
+	memmove(new_oids + nr_recovered, rw->prio_oids,
+		rw->nr_prio_oids * sizeof(uint64_t));
+	new_idx = nr_recovered + rw->nr_prio_oids;
+
+	for (i = rw->done; i < rw->count; i++) {
+		if (oid_in_prio_oids(rw, rw->oids[i]))
+			continue;
+		new_oids[new_idx++] = rw->oids[i];
+	}
+	dprintf("nr_recovered %d, nr_prio_oids %d, count %d, new %d\n",
+		nr_recovered, rw->nr_prio_oids, rw->count, new_idx);
+
+	free(rw->prio_oids);
+	free(rw->oids);
+	rw->oids = new_oids;
+	rw->prio_oids = NULL;
+	rw->nr_prio_oids = 0;
+}
+
 static void recover_object_main(struct work *work)
 {
 	struct recovery_work *rw = container_of(work, struct recovery_work,
@@ -388,11 +416,12 @@ static void recover_object_main(struct work *work)
 		return;
 	}
 
-	if (rw->nr_blocking > 0)
-		rw->nr_blocking--;
 	resume_wait_obj_requests(rw->oids[rw->done++]);
 
 	if (rw->done < rw->count) {
+		if (rw->nr_prio_oids)
+			finish_schedule_oids(rw);
+
 		/* Try recover next object */
 		queue_work(sys->recovery_wqueue, &rw->work);
 		return;
@@ -557,11 +586,13 @@ int start_recovery(struct vnode_info *cur_vnodes, struct vnode_info *old_vnodes)
 	struct recovery_work *rw;
 
 	rw = zalloc(sizeof(struct recovery_work));
-	if (!rw)
+	if (!rw) {
+		eprintf("%m\n");
 		return -1;
+	}
 
 	rw->state = RW_INIT;
-	rw->oids = malloc(1 << 20); /* FIXME */
+	rw->oids = xmalloc(1 << 20); /* FIXME */
 	rw->epoch = sys->epoch;
 	rw->count = 0;
 
-- 
1.7.10.2
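P.S. The staging half can be pictured with the cut-down sketch below.
struct prio_stage and stage_prio_oid() are hypothetical stand-ins for
recovery_work and prepare_schedule_oid(), and plain realloc() plus
abort() stands in for xrealloc(), assuming the latter dies on
allocation failure. The detail worth underlining is the size argument:
the array must grow by whole elements, nr * sizeof(uint64_t), never by
nr bytes.

#include <stdint.h>
#include <stdlib.h>

struct prio_stage {
	uint64_t *prio_oids;
	int nr_prio_oids;
};

/* Stage oid at most once; the object already in flight is skipped too */
static void stage_prio_oid(struct prio_stage *s, uint64_t in_flight,
			   uint64_t oid)
{
	uint64_t *p;
	int i;

	for (i = 0; i < s->nr_prio_oids; i++)
		if (s->prio_oids[i] == oid)
			return;
	if (oid == in_flight)
		return;

	/* grow by whole elements, not bytes */
	p = realloc(s->prio_oids, (s->nr_prio_oids + 1) * sizeof(uint64_t));
	if (!p)
		abort();
	s->prio_oids = p;
	s->prio_oids[s->nr_prio_oids++] = oid;
}

The patch confines all writers of both arrays to the main thread, so
this hand-off needs no lock.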