From: Liu Yuan <tailai.ly at taobao.com> - rename __start_recovery to do_recovery_work to state explicitly that it runs in a worker thread - rename recover_done to do_recover_main. Recovery is not really done in most cases when it is called. Signed-off-by: Liu Yuan <tailai.ly at taobao.com> --- sheep/store.c | 142 +++++++++++++++++++++++++++++--------------------------- 1 files changed, 74 insertions(+), 68 deletions(-) diff --git a/sheep/store.c b/sheep/store.c index 8c857f9..d4a231d 100644 --- a/sheep/store.c +++ b/sheep/store.c @@ -1406,7 +1406,7 @@ err: static struct recovery_work *suspended_recovery_work; -static void __start_recovery(struct work *work, int idx); +static void do_recovery_work(struct work *work, int idx); static void recover_timer(void *data) { @@ -1499,7 +1499,7 @@ int is_recoverying_oid(uint64_t oid) return 0; } -static void recover_done(struct work *work, int idx) +static void do_recover_main(struct work *work, int idx) { struct recovery_work *rw = container_of(work, struct recovery_work, work); uint64_t oid; @@ -1554,7 +1554,7 @@ static void recover_done(struct work *work, int idx) resume_pending_requests(); } -static int __fill_obj_list(struct sd_node *e, uint32_t epoch, +static int request_obj_list(struct sd_node *e, uint32_t epoch, uint8_t *buf, size_t buf_size) { int fd, ret; @@ -1615,11 +1615,13 @@ static int merge_objlist(uint64_t *list1, int nr_list1, uint64_t *list2, int nr_ return nr_list1; } -static int screen_obj_list(struct sd_vnode *nodes, int nodes_nr, - uint64_t *list, int list_nr, int nr_objs) +static int screen_obj_list(struct recovery_work *rw, uint64_t *list, int list_nr) { int ret, i, cp, idx; struct strbuf buf = STRBUF_INIT; + struct sd_vnode *nodes = rw->cur_vnodes; + int nodes_nr = rw->cur_nr_vnodes; + int nr_objs = get_max_copies(rw->cur_nodes, rw->cur_nr_nodes); for (i = 0; i < list_nr; i++) { for (cp = 0; cp < nr_objs; cp++) { @@ -1642,35 +1644,47 @@ static int screen_obj_list(struct sd_vnode *nodes, int nodes_nr, #define MAX_RETRY_CNT 6 -static int 
fill_obj_list(struct recovery_work *rw, - struct sd_node *old_entry, int old_nr, - struct sd_node *cur_entry, int cur_nr, - int nr_objs) +static int newly_joined(struct sd_node *node, struct recovery_work *rw) { - int i, j; + struct sd_node *old = rw->old_nodes; + int old_nr = rw->old_nr_nodes; + int i; + for (i = 0; i < old_nr; i++) + if (node_cmp(node, old + i) == 0) + break; + + if (i == old_nr) + return 1; + return 0; +} + +static int fill_obj_list(struct recovery_work *rw) +{ + int i; uint8_t *buf = NULL; size_t buf_size = SD_DATA_OBJ_SIZE; /* FIXME */ int retry_cnt; + struct sd_node *cur = rw->cur_nodes; + int cur_nr = rw->cur_nr_nodes; buf = malloc(buf_size); - if (!buf) - goto fail; - + if (!buf) { + eprintf("out of memory\n"); + rw->retry = 1; + return -1; + } for (i = 0; i < cur_nr; i++) { - int nr; - - for (j = 0; j < old_nr; j++) - if (node_cmp(cur_entry + i, old_entry + j) == 0) - break; + int buf_nr; + struct sd_node *node = cur + i; - if (j == old_nr) - /* cur_entry[i] doesn't have a list file */ + if (newly_joined(node, rw)) + /* new node doesn't have a list file */ continue; retry_cnt = 0; retry: - nr = __fill_obj_list(cur_entry + i, rw->epoch, buf, buf_size); - if (nr < 0) { + buf_nr = request_obj_list(node, rw->epoch, buf, buf_size); + if (buf_nr < 0) { retry_cnt++; if (retry_cnt > MAX_RETRY_CNT) { eprintf("failed to get object list\n"); @@ -1686,67 +1700,59 @@ static int fill_obj_list(struct recovery_work *rw, goto retry; } } - nr = screen_obj_list(rw->cur_vnodes, rw->cur_nr_vnodes, (uint64_t *)buf, - nr, nr_objs); - if (nr) - rw->count = merge_objlist(rw->oids, rw->count, (uint64_t *)buf, nr); + buf_nr = screen_obj_list(rw, (uint64_t *)buf, buf_nr); + if (buf_nr) + rw->count = merge_objlist(rw->oids, rw->count, (uint64_t *)buf, buf_nr); } dprintf("%d\n", rw->count); free(buf); return 0; -fail: - free(buf); - rw->retry = 1; - return -1; } -static void __start_recovery(struct work *work, int idx) +/* setup node list and virtual node list */ 
+static int init_rw(struct recovery_work *rw) { - struct recovery_work *rw = container_of(work, struct recovery_work, work); - uint32_t epoch = rw->epoch; - int nr_objs; + int epoch = rw->epoch; - dprintf("%u\n", epoch); + rw->cur_nr_nodes = epoch_log_read_nr(epoch, (char *)rw->cur_nodes, + sizeof(rw->cur_nodes)); + if (rw->cur_nr_nodes <= 0) { + eprintf("failed to read epoch log for epoch %"PRIu32"\n", epoch); + return -1; + } - if (rw->cur_nr_nodes == 0) { - /* setup node list and virtual node list */ - rw->cur_nr_nodes = epoch_log_read(epoch, (char *)rw->cur_nodes, - sizeof(rw->cur_nodes)); - if (rw->cur_nr_nodes <= 0) { - eprintf("failed to read epoch log for epoch %"PRIu32"\n", epoch); - goto fail; - } - rw->cur_nr_nodes /= sizeof(struct sd_node); + rw->old_nr_nodes = epoch_log_read_nr(epoch - 1, (char *)rw->old_nodes, + sizeof(rw->old_nodes)); + if (rw->old_nr_nodes <= 0) { + eprintf("failed to read epoch log for epoch %"PRIu32"\n", epoch - 1); + return -1; + } + rw->old_nr_vnodes = nodes_to_vnodes(rw->old_nodes, rw->old_nr_nodes, + rw->old_vnodes); + rw->cur_nr_vnodes = nodes_to_vnodes(rw->cur_nodes, rw->cur_nr_nodes, + rw->cur_vnodes); - rw->old_nr_nodes = epoch_log_read(epoch - 1, (char *)rw->old_nodes, - sizeof(rw->old_nodes)); - if (rw->old_nr_nodes <= 0) { - eprintf("failed to read epoch log for epoch %"PRIu32"\n", epoch - 1); - goto fail; - } - rw->old_nr_nodes /= sizeof(struct sd_node); + return 0; +} - rw->old_nr_vnodes = nodes_to_vnodes(rw->old_nodes, rw->old_nr_nodes, - rw->old_vnodes); - rw->cur_nr_vnodes = nodes_to_vnodes(rw->cur_nodes, rw->cur_nr_nodes, - rw->cur_vnodes); - } +static void do_recovery_work(struct work *work, int idx) +{ + struct recovery_work *rw = container_of(work, struct recovery_work, work); + + dprintf("%u\n", rw->epoch); if (!sys->nr_sobjs) - goto fail; - nr_objs = get_max_copies(rw->cur_nodes, rw->cur_nr_nodes); + return; - if (fill_obj_list(rw, rw->old_nodes, rw->old_nr_nodes, rw->cur_nodes, - rw->cur_nr_nodes, nr_objs) 
!= 0) { + if (rw->cur_nr_nodes == 0) + init_rw(rw); + + if (fill_obj_list(rw) < 0) { eprintf("fatal recovery error\n"); - goto fail; + rw->count = 0; + return; } - - return; -fail: - rw->count = 0; - return; } int start_recovery(uint32_t epoch) @@ -1762,8 +1768,8 @@ int start_recovery(uint32_t epoch) rw->epoch = epoch; rw->count = 0; - rw->work.fn = __start_recovery; - rw->work.done = recover_done; + rw->work.fn = do_recovery_work; + rw->work.done = do_recover_main; if (recovering_work != NULL) { if (next_rw) { -- 1.7.8.rc3 |