From: Liu Yuan <tailai.ly at taobao.com> The current recovery logic is elusive and not easy to understand. I hope this work eases the headache. Signed-off-by: Liu Yuan <tailai.ly at taobao.com> --- sheep/store.c | 356 ++++++++++++++++++++++++++++++-------------------------- 1 files changed, 191 insertions(+), 165 deletions(-) diff --git a/sheep/store.c b/sheep/store.c index fed74f8..409840c 100644 --- a/sheep/store.c +++ b/sheep/store.c @@ -1139,81 +1139,80 @@ static int find_tgt_node(struct sheepdog_vnode_list_entry *old_entry, return -1; } -static int __recover_one(struct recovery_work *rw, - struct sheepdog_vnode_list_entry *_old_entry, - int old_nr, int old_copies, - struct sheepdog_vnode_list_entry *_cur_entry, - int cur_nr, int cur_copies, int cur_idx, - int copy_idx, uint32_t epoch, uint32_t tgt_epoch, - uint64_t oid, char *buf, int buf_len) +static void *alloc_buffer_for(uint64_t oid) +{ + void *buf = NULL; + + if (is_vdi_obj(oid)) + buf = xmalloc(sizeof(struct sheepdog_inode)); + else if (is_vdi_attr_obj(oid)) + buf = xmalloc(SD_MAX_VDI_ATTR_VALUE_LEN); + else if (is_data_obj(oid)) + buf = valloc(SD_DATA_OBJ_SIZE); + else + buf = xmalloc(SD_DATA_OBJ_SIZE); + + return buf; +} + +static void *get_vnodes_from_epoch(int epoch, int *nr) +{ + int nodes_nr, len = sizeof(struct sheepdog_vnode_list_entry) * SD_MAX_VNODES; + struct sheepdog_node_list_entry nodes[SD_MAX_NODES]; + void *buf = xmalloc(len); + + nodes_nr = epoch_log_read_nr(epoch, (void *)nodes, ARRAY_SIZE(nodes)); + if (nodes_nr < 0) { + nodes_nr = epoch_log_read_remote(epoch, (void *)nodes, ARRAY_SIZE(nodes)); + if (nodes_nr == 0) { + free(buf); + return NULL; + } + nodes_nr /= sizeof(nodes[0]); + } + *nr = nodes_to_vnodes(nodes, nodes_nr, buf); + + return buf; +} + +static int recover_object_from_replica(uint64_t oid, + struct sheepdog_vnode_list_entry *entry, + int epoch, int tgt_epoch) { - struct sheepdog_vnode_list_entry *e; struct sd_obj_req hdr; struct sd_obj_rsp *rsp = (struct 
sd_obj_rsp *)&hdr; char name[128]; unsigned wlen = 0, rlen; int fd, ret; - struct sheepdog_vnode_list_entry *old_entry, *cur_entry, *next_entry; - int next_nr, next_copies; - int tgt_idx = -1; - int old_idx; - - old_entry = malloc(sizeof(*old_entry) * SD_MAX_VNODES); - cur_entry = malloc(sizeof(*cur_entry) * SD_MAX_VNODES); - next_entry = malloc(sizeof(*next_entry) * SD_MAX_VNODES); - if (!old_entry || !cur_entry || !next_entry) { - eprintf("failed to allocate memory\n"); - goto err; - } - - memcpy(old_entry, _old_entry, sizeof(*old_entry) * old_nr); - memcpy(cur_entry, _cur_entry, sizeof(*cur_entry) * cur_nr); -next: - dprintf("recover object %"PRIx64" from epoch %"PRIu32"\n", oid, tgt_epoch); - old_idx = obj_to_sheep(old_entry, old_nr, oid, 0); + void *buf; - tgt_idx = find_tgt_node(old_entry, old_nr, old_idx, old_copies, - cur_entry, cur_nr, cur_idx, cur_copies, copy_idx); - if (tgt_idx < 0) { - eprintf("cannot find target node %"PRIx64"\n", oid); - goto err; + buf = alloc_buffer_for(oid); + if (!buf) { + eprintf("out of memory\n"); + return -1; } - e = old_entry + tgt_idx; - if (is_myself(e->addr, e->port)) { + if (is_myself(entry->addr, entry->port)) { struct siocb iocb = { 0 }; iocb.epoch = epoch; ret = store.link(oid, &iocb, tgt_epoch); - if (ret == SD_RES_SUCCESS) + if (ret == SD_RES_SUCCESS) { + ret = 0; + goto done; + } else { + ret = -1; goto out; - - if (ret == SD_RES_NO_OBJ) { - next_nr = epoch_log_read(tgt_epoch - 1, buf, buf_len); - if (next_nr <= 0) { - eprintf("no previous epoch: %"PRIu32"\n", tgt_epoch - 1); - goto err; - } - next_nr /= sizeof(struct sheepdog_node_list_entry); - next_copies = get_max_copies((struct sheepdog_node_list_entry *)buf, - next_nr); - next_nr = nodes_to_vnodes((struct sheepdog_node_list_entry *)buf, - next_nr, next_entry); - goto not_found; } - - eprintf("Cannot recover from local store for %"PRIx64"\n", oid); - goto err; } - addr_to_str(name, sizeof(name), e->addr, 0); - - fd = connect_to(name, e->port); + 
addr_to_str(name, sizeof(name), entry->addr, 0); + fd = connect_to(name, entry->port); if (fd < 0) { - eprintf("failed to connect to %s:%"PRIu32"\n", name, e->port); - goto err; + eprintf("failed to connect to %s:%"PRIu32"\n", name, entry->port); + ret = -1; + goto out; } - if (is_vdi_obj(oid)) rlen = sizeof(struct sheepdog_inode); else if (is_vdi_attr_obj(oid)) @@ -1234,8 +1233,9 @@ next: close(fd); if (ret != 0) { - eprintf("%"PRIu32"\n", rsp->result); - goto err; + eprintf("res: %"PRIx32"\n", rsp->result); + ret = -1; + goto out; } rsp = (struct sd_obj_rsp *)&hdr; @@ -1245,20 +1245,22 @@ next: int flags = O_DSYNC | O_RDWR | O_CREAT; snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, obj_path, - epoch, oid); + epoch, oid); snprintf(tmp_path, sizeof(tmp_path), "%s%08u/%016" PRIx64 ".tmp", - obj_path, epoch, oid); + obj_path, epoch, oid); fd = open(tmp_path, flags, def_fmode); if (fd < 0) { eprintf("failed to open %s: %m\n", tmp_path); - goto err; + ret = -1; + goto out; } ret = write(fd, buf, rlen); if (ret != rlen) { eprintf("failed to write object\n"); - goto err; + ret = -1; + goto out; } close(fd); @@ -1267,137 +1269,161 @@ next: ret = rename(tmp_path, path); if (ret < 0) { eprintf("failed to rename %s to %s: %m\n", tmp_path, path); - goto err; + ret = -1; + goto out; } - dprintf("recovered oid %"PRIx64" to epoch %"PRIu32"\n", oid, epoch); + } else if (rsp->result == SD_RES_NEW_NODE_VER || + rsp->result == SD_RES_OLD_NODE_VER || + rsp->result == SD_RES_NETWORK_ERROR) { + dprintf("retrying: %"PRIx32", %"PRIx64"\n", rsp->result, oid); + ret = 1; goto out; - } - - if (rsp->result == SD_RES_NEW_NODE_VER || rsp->result == SD_RES_OLD_NODE_VER - || rsp->result == SD_RES_NETWORK_ERROR) { - eprintf("retrying: %"PRIu32", %"PRIx64"\n", rsp->result, oid); - rw->retry = 1; + } else { + eprintf("failed, res: %"PRIx32"\n", rsp->result); + ret = -1; goto out; } +done: + dprintf("recovered oid %"PRIx64" from %d to epoch %d\n", oid, tgt_epoch, epoch); +out: + free(buf); + 
return ret; +} - if (rsp->result != SD_RES_NO_OBJ || rsp->data_length == 0) { - eprintf("%"PRIu32"\n", rsp->result); - goto err; - } - next_nr = rsp->data_length / sizeof(struct sheepdog_node_list_entry); - next_copies = get_max_copies((struct sheepdog_node_list_entry *)buf, next_nr); - next_nr = nodes_to_vnodes((struct sheepdog_node_list_entry *)buf, - next_nr, next_entry); +static void rollback_old_cur(struct sheepdog_vnode_list_entry *old, int *old_nr, + struct sheepdog_vnode_list_entry *cur, int *cur_nr, + struct sheepdog_vnode_list_entry *new_old, int new_old_nr) +{ + int nr_old = *old_nr; -not_found: - for (copy_idx = 0; copy_idx < old_copies; copy_idx++) - if (get_nth_node(old_entry, old_nr, old_idx, copy_idx) == tgt_idx) - break; - if (copy_idx == old_copies) { - eprintf("bug: cannot find the proper copy_idx\n"); + memcpy(cur, old, sizeof(*old) * nr_old); + *cur_nr = nr_old; + memcpy(old, new_old, sizeof(*new_old) * new_old_nr); + *old_nr = new_old_nr; +} + +/* + * Recover the object from its track in epoch history. That is, + * the routine will try to recover it from the nodes it has stayed on, + * at least *theoretically*, on the consistent hash ring. 
+ */ +static int do_recover_object(struct recovery_work *rw, int copy_idx) +{ + struct sheepdog_vnode_list_entry *old, *cur; + uint64_t oid = rw->oids[rw->done]; + int old_nr = rw->old_nr_vnodes, cur_nr = rw->cur_nr_vnodes; + int epoch = rw->epoch, tgt_epoch = rw->epoch - 1; + struct sheepdog_vnode_list_entry *tgt_entry; + int old_idx, cur_idx, tgt_idx, old_copies, cur_copies, ret; + + old = xmalloc(sizeof(*old) * SD_MAX_VNODES); + cur = xmalloc(sizeof(*cur) * SD_MAX_VNODES); + + memcpy(old, rw->old_vnodes, sizeof(*old) * old_nr); + memcpy(cur, rw->cur_vnodes, sizeof(*cur) * cur_nr); + +again: + old_idx = obj_to_sheep(old, old_nr, oid, 0); + cur_idx = obj_to_sheep(cur, cur_nr, oid, 0); + old_copies = get_max_copies(rw->old_nodes, rw->old_nr_nodes); + cur_copies = get_max_copies(rw->cur_nodes, rw->cur_nr_nodes); + + dprintf("try recover object %"PRIx64" from epoch %"PRIu32"\n", oid, tgt_epoch); + + tgt_idx = find_tgt_node(old, old_nr, old_idx, old_copies, + cur, cur_nr, cur_idx, cur_copies, copy_idx); + if (tgt_idx < 0) { + eprintf("cannot find target node %"PRIx64"\n", oid); + ret = -1; goto err; } + tgt_entry = old + tgt_idx; - dprintf("%"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32"\n", rsp->result, rsp->data_length, tgt_idx, - old_idx, old_nr, copy_idx); - memcpy(cur_entry, old_entry, sizeof(*old_entry) * old_nr); - cur_copies = old_copies; - cur_nr = old_nr; - cur_idx = old_idx; + ret = recover_object_from_replica(oid, tgt_entry, epoch, tgt_epoch); + if (ret < 0) { + struct sheepdog_vnode_list_entry *new_old; + int new_old_nr; - memcpy(old_entry, next_entry, next_nr * sizeof(*next_entry)); - old_copies = next_copies; - old_nr = next_nr; + tgt_epoch--; + if (tgt_epoch < 1) { + eprintf("can not recover oid %"PRIx64"\n", oid); + ret = -1; + goto err; + } - tgt_epoch--; - goto next; -out: - free(old_entry); - free(cur_entry); - free(next_entry); - return 0; + new_old = get_vnodes_from_epoch(tgt_epoch, &new_old_nr); + if (!new_old) { + ret = -1; + 
goto err; + } + rollback_old_cur(old, &old_nr, cur, &cur_nr, + new_old, new_old_nr); + free(new_old); + goto again; + } else if (ret > 0) { + ret = 0; + rw->retry = 1; + } err: - free(old_entry); - free(cur_entry); - free(next_entry); - return -1; + free(old); + free(cur); + return ret; } -static void recover_one(struct work *work, int idx) +static int get_replica_idx(struct recovery_work *rw, uint64_t oid, int *copy_nr) +{ + int i, ret = -1; + *copy_nr = get_max_copies(rw->cur_nodes, rw->cur_nr_nodes); + for (i = 0; i < *copy_nr; i++) { + int n = obj_to_sheep(rw->cur_vnodes, rw->cur_nr_vnodes, oid, i); + if (is_myself(rw->cur_vnodes[n].addr, rw->cur_vnodes[n].port)) { + ret = i; + break; + } + } + return ret; +} + +static void recover_object(struct work *work, int idx) { struct recovery_work *rw = container_of(work, struct recovery_work, work); - char *buf = NULL; - int ret; uint64_t oid = rw->oids[rw->done]; - int old_copies, cur_copies; uint32_t epoch = rw->epoch; - int i, copy_idx = 0, cur_idx = -1; - struct siocb iocb; + int i, copy_idx, copy_nr, ret; + struct siocb iocb = { 0 }; - eprintf("%"PRIu32" %"PRIu32", %16"PRIx64"\n", rw->done, rw->count, oid); + if (!sys->nr_sobjs) + return; + + eprintf("done:%"PRIu32" count:%"PRIu32", oid:%"PRIx64"\n", rw->done, rw->count, oid); memset(&iocb, 0, sizeof(iocb)); iocb.epoch = epoch; ret = store.open(oid, &iocb, 0); if (ret == SD_RES_SUCCESS) { - /* the object is already recovered */ store.close(oid, &iocb); - goto out; + dprintf("the object is already recovered\n"); + return; } - if (is_vdi_obj(oid)) - buf = malloc(sizeof(struct sheepdog_inode)); - else if (is_vdi_attr_obj(oid)) - buf = malloc(SD_MAX_VDI_ATTR_VALUE_LEN); - else if (is_data_obj(oid)) - buf = valloc(SD_DATA_OBJ_SIZE); - else - buf = malloc(SD_DATA_OBJ_SIZE); - - if (!sys->nr_sobjs) - goto fail; - - cur_idx = obj_to_sheep(rw->cur_vnodes, rw->cur_nr_vnodes, oid, 0); - - old_copies = get_max_copies(rw->old_nodes, rw->old_nr_nodes); - cur_copies = 
get_max_copies(rw->cur_nodes, rw->cur_nr_nodes); - - copy_idx = -1; - for (i = 0; i < cur_copies; i++) { - int n = obj_to_sheep(rw->cur_vnodes, rw->cur_nr_vnodes, oid, i); - if (is_myself(rw->cur_vnodes[n].addr, rw->cur_vnodes[n].port)) { - copy_idx = i; - break; - } - } - if (copy_idx < 0) { - eprintf("bug: copy_idx < 0\n"); - goto out; + copy_idx = get_replica_idx(rw, oid, &copy_nr); + if (idx < 0) { + ret = -1; + goto err; } - - dprintf("%"PRIu32", %"PRIu32", %"PRIu32"\n", cur_idx, rw->cur_nr_nodes, - copy_idx); - - ret = __recover_one(rw, rw->old_vnodes, rw->old_nr_vnodes, old_copies, - rw->cur_vnodes, rw->cur_nr_vnodes, cur_copies, - cur_idx, copy_idx, epoch, epoch - 1, oid, - buf, SD_DATA_OBJ_SIZE); - if (ret == 0) - goto out; - - for (i = 0; i < cur_copies; i++) { - if (i == copy_idx) - continue; - ret = __recover_one(rw, rw->old_vnodes, rw->old_nr_vnodes, old_copies, - rw->cur_vnodes, rw->cur_nr_vnodes, cur_copies, cur_idx, i, - epoch, epoch - 1, oid, buf, SD_DATA_OBJ_SIZE); - if (ret == 0) - goto out; + ret = do_recover_object(rw, copy_idx); + if (ret < 0) { + for (i = 0; i < copy_nr; i++) { + if (i == copy_idx) + continue; + ret = do_recover_object(rw, i); + if (ret == 0) + break; + } } -fail: - eprintf("failed to recover object %"PRIx64"\n", oid); -out: - free(buf); +err: + if (ret < 0) + eprintf("failed to recover object %"PRIx64"\n", oid); } static struct recovery_work *suspended_recovery_work; @@ -1520,7 +1546,7 @@ static void recover_done(struct work *work, int idx) } if (rw->done < rw->count && !next_rw) { - rw->work.fn = recover_one; + rw->work.fn = recover_object; if (is_access_to_busy_objects(oid)) { suspended_recovery_work = rw; -- 1.7.8.rc3 |