Currently, sheepdog cannot handle a node failure that occurs while recovery is in progress, so a double node failure at once brings the whole system down. This patch adds support for handling that case. Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp> --- collie/collie.h | 2 +- collie/group.c | 10 +- collie/store.c | 684 +++++++++++++++++++++++++++++++--------------- include/sheepdog_proto.h | 44 +++- 4 files changed, 502 insertions(+), 238 deletions(-) diff --git a/collie/collie.h b/collie/collie.h index 3b17ee7..fac6809 100644 --- a/collie/collie.h +++ b/collie/collie.h @@ -122,7 +122,7 @@ int remove_epoch(int epoch); int set_cluster_ctime(uint64_t ctime); uint64_t get_cluster_ctime(void); -int start_recovery(uint32_t epoch, int add); +int start_recovery(uint32_t epoch); static inline int is_myself(struct sheepdog_node_list_entry *e) { diff --git a/collie/group.c b/collie/group.c index a2d7425..d4b5449 100644 --- a/collie/group.c +++ b/collie/group.c @@ -768,17 +768,15 @@ static void __sd_deliver(struct work *work, int idx) static void __sd_deliver_done(struct work *work, int idx) { struct work_deliver *w = container_of(work, struct work_deliver, work); -/* struct message_header *m = w->msg; */ -/* struct cluster_info *ci = w->ci; */ + struct message_header *m = w->msg; /* * FIXME: we want to recover only after all nodes are fully * synchronized */ - /* disabled for now */ -/* if (m->done && m->op == SD_MSG_JOIN) */ -/* start_recovery(ci, ci->epoch, 1); */ + if (m->done && m->op == SD_MSG_JOIN && sys->epoch >= 2) + start_recovery(sys->epoch); free(w->msg); free(w); @@ -904,7 +902,7 @@ static void __sd_confch_done(struct work *work, int idx) if (w->left_list_entries) { if (w->left_list_entries > 1) eprintf("we can't handle %Zd\n", w->left_list_entries); - start_recovery(sys->epoch, 0); + start_recovery(sys->epoch); } free(w->member_list); diff --git a/collie/store.c b/collie/store.c index 9b5584f..b35bbf5 100644 --- a/collie/store.c +++ b/collie/store.c @@ -76,23 +76,66 @@ static int 
stat_sheep(uint64_t *store_size, uint64_t *store_free, uint32_t epoch return SD_RES_SUCCESS; } -static int get_obj_list(struct request *req) +static int is_obj_in_range(uint64_t oid, uint64_t start, uint64_t end) +{ + uint64_t hval = fnv_64a_buf(&oid, sizeof(oid), FNV1A_64_INIT); + + if (start < end) + return (start < hval && hval <= end); + else + return (start < hval || hval <= end); +} + +static int get_obj_list(struct request *req, char *buf, int buf_len) { DIR *dir; struct dirent *d; - struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq; - struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&req->rp; + struct sd_list_req *hdr = (struct sd_list_req *)&req->rq; + struct sd_list_rsp *rsp = (struct sd_list_rsp *)&req->rp; uint64_t oid; - uint64_t oid_hash; - uint64_t start_hash = hdr->oid; - uint64_t end_hash = hdr->cow_oid; + uint64_t start_hash = hdr->start; + uint64_t end_hash = hdr->end; + uint32_t epoch = hdr->tgt_epoch; char path[1024]; uint64_t *p = (uint64_t *)req->data; int nr = 0; + uint64_t *objlist = NULL; + int obj_nr = 0, fd, i; + struct sheepdog_node_list_entry *e; + int e_nr; + int idx; + int res = SD_RES_SUCCESS; + + if (epoch == 1) + goto local; + + snprintf(path, sizeof(path), "%s%08u/list", obj_path, epoch); + + fd = open(path, O_RDONLY); + if (fd < 0) { + eprintf("failed to open %s, %s\n", path, strerror(errno)); + close(fd); + return SD_RES_EIO; + } + obj_nr = read(fd, buf, buf_len); + obj_nr /= sizeof(uint64_t); + objlist = (uint64_t *)buf; + for (i = 0; i < obj_nr; i++) { + if (is_obj_in_range(objlist[i], start_hash, end_hash)) { + dprintf("%u, %016lx, %016lx %016lx\n", epoch, + objlist[i], start_hash, end_hash); + p[nr++] = objlist[i]; + } - snprintf(path, sizeof(path), "%s%08u/", obj_path, hdr->obj_ver); + if (nr * sizeof(uint64_t) >= hdr->data_length) + break; + } + close(fd); - dprintf("%d\n", sys->this_node.port); +local: + snprintf(path, sizeof(path), "%s%08u/", obj_path, hdr->tgt_epoch); + + dprintf("%d, %s\n", sys->this_node.port, 
path); dir = opendir(path); if (!dir) { @@ -101,38 +144,66 @@ static int get_obj_list(struct request *req) } while ((d = readdir(dir))) { - int got = 0; - if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, "..")) continue; oid = strtoull(d->d_name, NULL, 16); - oid_hash = fnv_64a_buf(&oid, sizeof(oid), FNV1A_64_INIT); + if (oid == 0) + continue; + + for (i = 0; i < obj_nr; i++) + if (objlist[i] == oid) + break; + if (i < obj_nr) + continue; - if ((nr + 1) * sizeof(uint64_t) > hdr->data_length) + if (is_obj_in_range(oid, start_hash, end_hash)) { + dprintf("%u, %016lx, %016lx %016lx\n", epoch, + oid, start_hash, end_hash); + p[nr++] = oid; + } + + if (nr * sizeof(uint64_t) >= hdr->data_length) break; + } - if (start_hash < end_hash) { - if (oid_hash >= start_hash && oid_hash < end_hash) - got = 1; - } else - if (end_hash <= oid_hash || oid_hash < start_hash) - got = 1; + eprintf("nr = %d\n", nr); + rsp->data_length = nr * sizeof(uint64_t); - dprintf("%d, %u, %016lx, %016lx, %016lx %016lx\n", got, hdr->obj_ver, - oid, oid_hash, start_hash, end_hash); + e_nr = epoch_log_read(epoch, buf, buf_len); + e_nr /= sizeof(*e); + e = (struct sheepdog_node_list_entry *)buf; - if (got) { - *(p + nr) = oid; - nr++; - } + if (e_nr <= sys->nr_sobjs) { + rsp->next = end_hash; + goto out; } - rsp->data_length = nr * 8; + for (idx = 0; idx < e_nr; idx++) { + if (e[idx].id == sys->this_node.id) + break; + } + if (idx != e_nr) { + uint64_t hval = e[idx % e_nr].id; + rsp->next = end_hash; + + if (start_hash < end_hash) { + if (start_hash < hval && hval <= end_hash) + rsp->next = hval; + } else + if (start_hash < hval || hval <= end_hash) + rsp->next = hval; + + dprintf("%u, %016lx, %016lx %016lx\n", epoch, hval, + start_hash, end_hash); + } else + res = SD_RES_SYSTEM_ERROR; + +out: closedir(dir); - return SD_RES_SUCCESS; + return res; } static int read_from_one(uint64_t oid, @@ -225,6 +296,7 @@ static int forward_obj_req(struct request *req, char *buf) struct sd_rsp *rsp = (struct sd_rsp 
*)&hdr2; uint64_t oid = hdr->oid; int copies; + uint32_t epoch; e = zalloc(SD_MAX_NODES * sizeof(struct sheepdog_node_list_entry)); again: @@ -243,7 +315,11 @@ again: /* TODO: we can do better; we need to chech this first */ if (is_myself(&e[n])) { - ret = store_queue_request_local(req, buf, sys->epoch); + if (hdr->flags & SD_FLAG_CMD_RECOVERY) + epoch = hdr->tgt_epoch; + else + epoch = sys->epoch; + ret = store_queue_request_local(req, buf, epoch); memcpy(rsp, &req->rp, sizeof(*rsp)); rsp->result = ret; goto done; @@ -334,63 +410,13 @@ static int ob_open(uint32_t epoch, uint64_t oid, int aflags, int *ret) return fd; } -static int is_my_obj(uint64_t oid, int copies) -{ - int i, n, nr; - struct sheepdog_node_list_entry e[SD_MAX_NODES]; - - nr = build_node_list(&sys->sd_node_list, e); - - for (i = 0; i < copies; i++) { - n = obj_to_sheep(e, nr, oid, i); - if (is_myself(&e[n])) - return 1; - } - - return 0; -} - int update_epoch_store(uint32_t epoch) { - int ret; - char new[1024], old[1024]; - struct stat s; - DIR *dir; - struct dirent *d; - uint64_t oid; + char new[1024]; snprintf(new, sizeof(new), "%s%08u/", obj_path, epoch); mkdir(new, def_dmode); - snprintf(old, sizeof(old), "%s%08u/", obj_path, epoch - 1); - - ret = stat(old, &s); - if (ret) - return 0; - - dir = opendir(old); - if (!dir) { - eprintf("%s, %s, %m\n", old, new); - return 1; - } - - while ((d = readdir(dir))) { - if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, "..")) - continue; - - oid = strtoull(d->d_name, NULL, 16); - /* TODO: use proper object coipes */ - if (is_my_obj(oid, sys->nr_sobjs)) { - snprintf(new, sizeof(new), "%s%08u/%s", obj_path, epoch, - d->d_name); - snprintf(old, sizeof(old), "%s%08u/%s", obj_path, epoch - 1, - d->d_name); - link(old, new); - } - } - - closedir(dir); - return 0; } @@ -517,6 +543,13 @@ out: if (fd != -1) close(fd); + if (ret == SD_RES_NO_OBJ && hdr->flags & SD_FLAG_CMD_RECOVERY) { + int len = epoch_log_read(epoch - 1, req->data, hdr->data_length); + if (len < 0) 
+ len = 0; + rsp->data_length = len; + } + return ret; } @@ -547,13 +580,16 @@ void store_queue_request(struct work *work, int idx) goto out; } + if (hdr->flags & SD_FLAG_CMD_RECOVERY) + epoch = hdr->tgt_epoch; + if (opcode == SD_OP_STAT_SHEEP) { ret = stat_sheep(&nrsp->store_size, &nrsp->store_free, epoch); goto out; } if (opcode == SD_OP_GET_OBJ_LIST) { - ret = get_obj_list(req); + ret = get_obj_list(req, buf, SD_DATA_OBJ_SIZE); goto out; } @@ -731,19 +767,26 @@ static int node_distance(int my, int her, int nr) return (my + nr - her) % nr; } -static int node_from_distance(int my, int dist, int nr) +static int contains_node(uint64_t id, struct sheepdog_node_list_entry *entry, + int nr, int base_idx) { - return (my + nr - dist) % nr; + int i; + + for (i = 0; i < sys->nr_sobjs; i++) { + if (entry[(base_idx + i) % nr].id == id) + return (base_idx + i) % nr; + } + return -1; } struct recovery_work { uint32_t epoch; uint32_t done; - uint32_t iteration; - struct sheepdog_node_list_entry e; + struct timer timer; + int retry; struct work work; struct list_head rw_siblings; @@ -754,84 +797,272 @@ struct recovery_work { static LIST_HEAD(recovery_work_list); static int recovering; -static void recover_one(struct work *work, int idx) +static int find_tgt_node(struct sheepdog_node_list_entry *old_entry, int old_nr, int old_idx, + struct sheepdog_node_list_entry *cur_entry, int cur_nr, int cur_idx, + int copy_idx) { - struct recovery_work *rw = container_of(work, struct recovery_work, work); - struct sheepdog_node_list_entry *e = &rw->e; + int i, idx; + + dprintf("%d, %d, %d, %d, %d\n", old_idx, old_nr, cur_idx, cur_nr, copy_idx); + + if (copy_idx < cur_nr) { + idx = contains_node(cur_entry[(cur_idx + copy_idx) % cur_nr].id, + old_entry, old_nr, old_idx); + if (idx >= 0) { + dprintf("%d, %d, %d, %d\n", idx, copy_idx, cur_idx, cur_nr); + return idx; + } + } + + for (i = 0; ; i++) { + if (i < cur_nr) { + idx = contains_node(cur_entry[(cur_idx + i) % cur_nr].id, + old_entry, 
old_nr, old_idx); + if (idx >= 0) + continue; + + while (contains_node(old_entry[old_idx].id, cur_entry, cur_nr, cur_idx) >= 0) + old_idx = (old_idx + 1) % old_nr; + + } + if (i == copy_idx) { + dprintf("%d, %d, %d, %d\n", old_idx, copy_idx, cur_idx, cur_nr); + return old_idx; + } + + old_idx = (old_idx + 1) % old_nr; + } + return -1; +} + +static int __recover_one(struct recovery_work *rw, + struct sheepdog_node_list_entry *_old_entry, int old_nr, + struct sheepdog_node_list_entry *_cur_entry, int cur_nr, int cur_idx, + int copy_idx, uint32_t epoch, uint32_t tgt_epoch, + uint64_t oid, char *buf, int buf_len) +{ + struct sheepdog_node_list_entry *e; struct sd_obj_req hdr; - struct sd_obj_rsp *rsp; + struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr; char name[128]; - char *buf = zero_block + idx * SD_DATA_OBJ_SIZE; unsigned wlen = 0, rlen = SD_DATA_OBJ_SIZE; int fd, ret; - uint64_t oid = *(((uint64_t *)rw->buf) + rw->done); + struct sheepdog_node_list_entry old_entry[SD_MAX_NODES], + cur_entry[SD_MAX_NODES], *next_entry; + int next_nr; + int tgt_idx = -1; + int old_idx; + + memcpy(old_entry, _old_entry, sizeof(*old_entry) * old_nr); + memcpy(cur_entry, _cur_entry, sizeof(*cur_entry) * cur_nr); +next: + dprintf("recover obj %lx from epoch %d\n", oid, tgt_epoch); + old_idx = obj_to_sheep(old_entry, old_nr, oid, 0); + + tgt_idx = find_tgt_node(old_entry, old_nr, old_idx, cur_entry, cur_nr, cur_idx, copy_idx); + if (tgt_idx < 0) { + eprintf("cannot find target node, %lx\n", oid); + return -1; + } + e = old_entry + tgt_idx; - eprintf("%d %d, %16lx\n", rw->done, rw->count, oid); + if (e->id == sys->this_node.id) { + char old[PATH_MAX], new[PATH_MAX]; + + snprintf(old, sizeof(old), "%s%08u/%016" PRIx64, obj_path, + tgt_epoch, oid); + snprintf(new, sizeof(new), "%s%08u/%016" PRIx64, obj_path, + epoch, oid); + dprintf("link from %s to %s\n", old, new); + if (link(old, new) == 0) + return 0; + + if (errno == ENOENT) { + next_nr = epoch_log_read(tgt_epoch, buf, buf_len); + 
if (next_nr <= 0) { + eprintf("no previous epoch, %d\n", tgt_epoch); + return -1; + } + next_entry = (struct sheepdog_node_list_entry *)buf; + next_nr /= sizeof(*next_entry); + goto not_found; + } + + eprintf("cannot recover from local, %s, %s\n", old, new); + return -1; + } addr_to_str(name, sizeof(name), e->addr, 0); fd = connect_to(name, e->port); if (fd < 0) { - eprintf("%s %d\n", name, e->port); - return; + eprintf("failed to connect to %s:%d\n", name, e->port); + return -1; } memset(&hdr, 0, sizeof(hdr)); hdr.opcode = SD_OP_READ_OBJ; hdr.oid = oid; hdr.epoch = sys->epoch; - hdr.flags = 0; + hdr.flags = SD_FLAG_CMD_RECOVERY | SD_FLAG_CMD_FORWARD; + hdr.tgt_epoch = tgt_epoch; hdr.data_length = rlen; ret = exec_req(fd, (struct sd_req *)&hdr, buf, &wlen, &rlen); close(fd); + if (ret < 0) { + eprintf("%d\n", rsp->result); + return -1; + } + rsp = (struct sd_obj_rsp *)&hdr; - if (rsp->result != SD_RES_SUCCESS) { + if (rsp->result == SD_RES_SUCCESS) { + fd = ob_open(epoch, oid, O_CREAT, &ret); + ret = write(fd, buf, SD_DATA_OBJ_SIZE); + if (ret != SD_DATA_OBJ_SIZE) { + eprintf("failed to write object\n"); + return -1; + } + + ret = fsetxattr(fd, ANAME_COPIES, &rsp->copies, + sizeof(rsp->copies), 0); + if (ret) { + eprintf("couldn't set xattr\n"); + return -1; + } + + close(fd); + dprintf("recovered oid %lx to epoch %d\n", oid, epoch); + return 0; + } + + if (rsp->result == SD_RES_NEW_NODE_VER || rsp->result == SD_RES_OLD_NODE_VER) { + eprintf("try again, %d, %lx\n", rsp->result, oid); + rw->retry = 1; + return 0; + } + + if (rsp->result != SD_RES_NO_OBJ || rsp->data_length == 0) { eprintf("%d\n", rsp->result); + return -1; + } + next_entry = (struct sheepdog_node_list_entry *)buf; + next_nr = rsp->data_length / sizeof(*old_entry); + +not_found: + copy_idx = node_distance(tgt_idx, old_idx, old_nr); + dprintf("%d, %d, %d, %d, %d, %d\n", rsp->result, rsp->data_length, tgt_idx, + old_idx, old_nr, copy_idx); + + memcpy(cur_entry, old_entry, sizeof(*old_entry) * old_nr); 
+ cur_nr = old_nr; + cur_idx = old_idx; + + memcpy(old_entry, next_entry, next_nr * sizeof(*next_entry)); + old_nr = next_nr; + + tgt_epoch--; + goto next; +} + +static void recover_one(struct work *work, int idx) +{ + struct recovery_work *rw = container_of(work, struct recovery_work, work); + char *buf = zero_block + idx * SD_DATA_OBJ_SIZE; + int ret; + uint64_t oid = *(((uint64_t *)rw->buf) + rw->done); + struct sheepdog_node_list_entry old_entry[SD_MAX_NODES], + cur_entry[SD_MAX_NODES]; + int old_nr, cur_nr; + uint32_t epoch = rw->epoch; + int i, my_idx = -1, copy_idx, cur_idx = -1; + + eprintf("%d %d, %16lx\n", rw->done, rw->count, oid); + + cur_nr = epoch_log_read(epoch, (char *)cur_entry, sizeof(cur_entry)); + if (cur_nr <= 0) { + eprintf("failed to read current epoch, %d\n", epoch); return; } + cur_nr /= sizeof(struct sheepdog_node_list_entry); + + old_nr = epoch_log_read(epoch - 1, (char *)old_entry, sizeof(old_entry)); + if (old_nr <= 0) { + eprintf("failed to read previous epoch, %d\n", epoch - 1); + goto fail; + } + old_nr /= sizeof(struct sheepdog_node_list_entry); - fd = ob_open(rw->epoch, oid, O_CREAT, &ret); - write(fd, buf, SD_DATA_OBJ_SIZE); + if (!sys->nr_sobjs) + goto fail; + + cur_idx = obj_to_sheep(cur_entry, cur_nr, oid, 0); + + for (i = 0; i < cur_nr; i++) { + if (cur_entry[i].id == sys->this_node.id) { + my_idx = i; + break; + } + } + copy_idx = node_distance(my_idx, cur_idx, cur_nr); + dprintf("%d, %d, %d, %d\n", my_idx, cur_idx, cur_nr, copy_idx); + + ret = __recover_one(rw, old_entry, old_nr, cur_entry, cur_nr, cur_idx, + copy_idx, epoch, epoch - 1, oid, buf, SD_DATA_OBJ_SIZE); + if (ret == 0) + return; + + for (i = 0; i < sys->nr_sobjs; i++) { + if (i == copy_idx) + continue; + ret = __recover_one(rw, old_entry, old_nr, + cur_entry, cur_nr, cur_idx, i, + epoch, epoch - 1, oid, buf, SD_DATA_OBJ_SIZE); + if (ret == 0) + return; + } +fail: + eprintf("failed to recover object %lx\n", oid); } static void __start_recovery(struct work *work, 
int idx); static void __start_recovery_done(struct work *work, int idx); +static void recover_one_timer(void *data) +{ + struct recovery_work *rw = (struct recovery_work *)data; + queue_work(dobj_queue, &rw->work); +} + static void recover_one_done(struct work *work, int idx) { struct recovery_work *rw = container_of(work, struct recovery_work, work); - if (rw->done < rw->count) { - rw->done++; - queue_work(dobj_queue, &rw->work); + if (rw->retry) { + rw->retry = 0; + + rw->timer.callback = recover_one_timer; + rw->timer.data = rw; + add_timer(&rw->timer, 2); return; } - if (rw->iteration) { - if (++rw->iteration <= sys->nr_sobjs) { - free(rw->buf); + rw->done++; - rw->done = 0; - - rw->work.fn = __start_recovery; - rw->work.done = __start_recovery_done; - - queue_work(dobj_queue, &rw->work); - - return; - } + if (rw->done < rw->count && rw->rw_siblings.next == &recovery_work_list) { + queue_work(dobj_queue, &rw->work); + return; } + dprintf("recovery done, %d\n", rw->epoch); recovering--; list_del(&rw->rw_siblings); - if (rw->buf) - free(rw->buf); + free(rw->buf); free(rw); if (!list_empty(&recovery_work_list)) { @@ -843,16 +1074,16 @@ static void recover_one_done(struct work *work, int idx) } } -static int fill_obj_list(struct recovery_work *rw, - struct sheepdog_node_list_entry *e, - uint64_t start_hash, uint64_t end_hash) +static int __fill_obj_list(struct recovery_work *rw, + struct sheepdog_node_list_entry *e, + uint64_t start_hash, uint64_t end_hash, uint64_t *done_hash) { int fd, ret; uint32_t epoch = rw->epoch; unsigned wlen, rlen; char name[128]; - struct sd_obj_req hdr; - struct sd_obj_rsp *rsp; + struct sd_list_req hdr; + struct sd_list_rsp *rsp; addr_to_str(name, sizeof(name), e->addr, 0); @@ -870,34 +1101,66 @@ static int fill_obj_list(struct recovery_work *rw, memset(&hdr, 0, sizeof(hdr)); hdr.opcode = SD_OP_GET_OBJ_LIST; hdr.epoch = sys->epoch; - hdr.oid = start_hash; - hdr.cow_oid = end_hash; - hdr.obj_ver = epoch - 1; + hdr.start = start_hash; + 
hdr.end = end_hash; + hdr.tgt_epoch = epoch - 1; hdr.flags = 0; hdr.data_length = rlen; - dprintf("%016lx, %016lx\n", hdr.oid, hdr.cow_oid); + dprintf("%016lx, %016lx\n", hdr.start, hdr.end); - rw->buf = malloc(rlen); memcpy(&rw->e, e, sizeof(rw->e)); - ret = exec_req(fd, (struct sd_req *)&hdr, rw->buf, &wlen, &rlen); + ret = exec_req(fd, (struct sd_req *)&hdr, rw->buf + rw->count * sizeof(uint64_t), &wlen, &rlen); close(fd); - rsp = (struct sd_obj_rsp *)&hdr; + rsp = (struct sd_list_rsp *)&hdr; if (rsp->result != SD_RES_SUCCESS) { - eprintf("%d\n", rsp->result); - return -1; + rw->retry = 1; + *done_hash = end_hash; + eprintf("try again, %d\n", rsp->result); + return 0; } dprintf("%d\n", rsp->data_length); if (rsp->data_length) - rw->count = rsp->data_length / sizeof(uint64_t); - else - rw->count = 0; + rw->count += rsp->data_length / sizeof(uint64_t); + + *done_hash = rsp->next; + + return 0; +} + +static int fill_obj_list(struct recovery_work *rw, + struct sheepdog_node_list_entry *old_entry, int old_nr, + struct sheepdog_node_list_entry *cur_entry, int cur_nr, + uint64_t start_hval, uint64_t end_hval) +{ + int i, idx, old_idx, cur_idx; + uint64_t hval, done_hval = end_hval; + + hval = start_hval; +again: + old_idx = hval_to_sheep(old_entry, old_nr, hval + 1, 0); + cur_idx = hval_to_sheep(cur_entry, cur_nr, hval + 1, 0); + + for (i = 0; i < sys->nr_sobjs; i++) { + idx = find_tgt_node(old_entry, old_nr, old_idx, cur_entry, cur_nr, cur_idx, i); + dprintf("%d, %d\n", idx, i); + if (__fill_obj_list(rw, old_entry + idx, hval, end_hval, &done_hval) == 0) + break; + } + if (i == sys->nr_sobjs) + return -1; + + if (done_hval != end_hval) { + dprintf("%lx, %lx\n", done_hval, end_hval); + hval = done_hval; + goto again; + } return 0; } @@ -909,153 +1172,122 @@ static void __start_recovery(struct work *work, int idx) struct sheepdog_node_list_entry old_entry[SD_MAX_NODES], cur_entry[SD_MAX_NODES]; int old_nr, cur_nr; - int my_idx = -1, ch_idx = -1; - int i, j, n; + int 
my_idx = -1; + int i, fd; uint64_t start_hash, end_hash; + char path[PATH_MAX]; dprintf("%u\n", epoch); cur_nr = epoch_log_read(epoch, (char *)cur_entry, sizeof(cur_entry)); - if (cur_nr <= 0) + if (cur_nr <= 0) { + eprintf("failed to read epoch log, %d\n", epoch); goto fail; + } cur_nr /= sizeof(struct sheepdog_node_list_entry); old_nr = epoch_log_read(epoch - 1, (char *)old_entry, sizeof(old_entry)); - if (old_nr <= 0) + if (old_nr <= 0) { + eprintf("failed to read epoch log, %d\n", epoch - 1); goto fail; + } old_nr /= sizeof(struct sheepdog_node_list_entry); - if (!sys->nr_sobjs || cur_nr < sys->nr_sobjs || old_nr < sys->nr_sobjs) + if (!sys->nr_sobjs) goto fail; - if (cur_nr < old_nr) { - for (i = 0; i < old_nr; i++) { - if (is_myself(&old_entry[i])) { - my_idx = i; - break; - } - } - - dprintf("%u %u %u, %d\n", cur_nr, old_nr, epoch, my_idx); - - for (i = 0; i < old_nr; i++) { - for (j = 0; j < cur_nr; j++) { - if (old_entry[i].id == cur_entry[j].id) - break; - } - - if (j == cur_nr) - ch_idx = i; - } - - dprintf("%u %u %u\n", my_idx, ch_idx, - node_distance(my_idx, ch_idx, old_nr)); - - if (node_distance(my_idx, ch_idx, old_nr) > sys->nr_sobjs) - return; - - n = node_from_distance(my_idx, sys->nr_sobjs, old_nr); - - dprintf("%d %d\n", n, sys->nr_sobjs); - - start_hash = old_entry[(n - 1 + old_nr) % old_nr].id; - end_hash = old_entry[n].id; - - /* FIXME */ - if (node_distance(my_idx, ch_idx, old_nr) == sys->nr_sobjs) { - n++; - n %= old_nr; - } - - fill_obj_list(rw, old_entry + n, start_hash, end_hash); - } else { - for (i = 0; i < cur_nr; i++) { - if (is_myself(&cur_entry[i])) { - my_idx = i; - break; - } + for (i = 0; i < cur_nr; i++) { + if (cur_entry[i].id == sys->this_node.id) { + my_idx = i; + break; } + } + start_hash = cur_entry[(my_idx - sys->nr_sobjs + cur_nr) % cur_nr].id; + end_hash = cur_entry[my_idx].id; - dprintf("%u %u %u, %d\n", cur_nr, old_nr, epoch, my_idx); - - if (my_idx == -1) - return; - - n = node_from_distance(my_idx, rw->iteration, 
cur_nr); - start_hash = cur_entry[n].id; - end_hash = cur_entry[(n + 1 + cur_nr) % cur_nr].id; - - if (rw->iteration == 1) - n = (my_idx + 1 + cur_nr) % cur_nr; - else - n = (n + 1 + cur_nr) % cur_nr; - - dprintf("%u %u %u\n", my_idx, n, rw->iteration); + dprintf("fill obj list (from 0x%lx to 0x%lx)\n", start_hash, end_hash); + if (fill_obj_list(rw, old_entry, old_nr, cur_entry, cur_nr, + start_hash, end_hash) != 0) { + eprintf("fatal recovery error\n"); + goto fail; + } - start_hash = cur_entry[n].id; - end_hash = cur_entry[(n - 1 + cur_nr) % cur_nr].id; + snprintf(path, sizeof(path), "%s%08u/list", obj_path, epoch); + dprintf("write object list file to %s\n", path); - fill_obj_list(rw, cur_entry + n, start_hash, end_hash); + fd = open(path, O_RDWR | O_CREAT, def_fmode); + if (fd < 0) { + eprintf("failed to open %s, %s\n", path, strerror(errno)); + goto fail; } + write(fd, rw->buf, sizeof(uint64_t) * rw->count); + close(fd); return; - fail: rw->count = 0; - rw->iteration = 0; return; } +static void start_recovery_timer(void *data) +{ + struct recovery_work *rw = (struct recovery_work *)data; + queue_work(dobj_queue, &rw->work); +} + static void __start_recovery_done(struct work *work, int idx) { struct recovery_work *rw = container_of(work, struct recovery_work, work); - if (!rw->count) { - if (rw->iteration) { - if (++rw->iteration <= sys->nr_sobjs) { - free(rw->buf); + if (rw->retry) { + rw->retry = 0; - rw->work.fn = __start_recovery; - rw->work.done = __start_recovery_done; + rw->timer.callback = start_recovery_timer; + rw->timer.data = rw; + add_timer(&rw->timer, 1); + return; + } - queue_work(dobj_queue, &rw->work); + if (rw->count && rw->rw_siblings.next == &recovery_work_list) { + rw->work.fn = recover_one; + rw->work.done = recover_one_done; - return; - } - } + /* TODO: we should avoid races with qemu I/Os */ + /* rw->work.attr = WORK_ORDERED; */ - free(rw->buf); - free(rw); + queue_work(dobj_queue, &rw->work); return; } - rw->work.fn = recover_one; - 
rw->work.done = recover_one_done; + dprintf("recovery done, %d\n", rw->epoch); + recovering--; - /* TODO: we should avoid races with qemu I/Os */ -/* rw->work.attr = WORK_ORDERED; */ + list_del(&rw->rw_siblings); - queue_work(dobj_queue, &rw->work); + free(rw->buf); + free(rw); + + if (!list_empty(&recovery_work_list)) { + rw = list_first_entry(&recovery_work_list, + struct recovery_work, rw_siblings); + + recovering++; + queue_work(dobj_queue, &rw->work); + } } -int start_recovery(uint32_t epoch, int add) +int start_recovery(uint32_t epoch) { struct recovery_work *rw; - /* disable for now */ - if (add) - return 0; - rw = zalloc(sizeof(struct recovery_work)); if (!rw) return -1; + rw->buf = malloc(1 << 20); /* FIXME */ rw->epoch = epoch; rw->count = 0; - if (add) - rw->iteration = 1; - rw->work.fn = __start_recovery; rw->work.done = __start_recovery_done; diff --git a/include/sheepdog_proto.h b/include/sheepdog_proto.h index 24f06ae..a41eb50 100644 --- a/include/sheepdog_proto.h +++ b/include/sheepdog_proto.h @@ -52,6 +52,7 @@ #define SD_FLAG_CMD_WRITE 0x01 #define SD_FLAG_CMD_COW 0x02 #define SD_FLAG_CMD_FORWARD 0x04 +#define SD_FLAG_CMD_RECOVERY 0x08 #define SD_STATUS_OK 0x00 #define SD_STATUS_STARTUP 0x01 @@ -169,7 +170,7 @@ struct sd_obj_req { uint64_t oid; uint64_t cow_oid; uint32_t copies; - uint32_t obj_ver; + uint32_t tgt_epoch; uint64_t offset; }; @@ -186,6 +187,31 @@ struct sd_obj_rsp { uint32_t pad[5]; }; +struct sd_list_req { + uint8_t proto_ver; + uint8_t opcode; + uint16_t flags; + uint32_t epoch; + uint32_t id; + uint32_t data_length; + uint64_t start; + uint64_t end; + uint32_t tgt_epoch; + uint32_t pad[3]; +}; + +struct sd_list_rsp { + uint8_t proto_ver; + uint8_t opcode; + uint16_t flags; + uint32_t epoch; + uint32_t id; + uint32_t data_length; + uint32_t result; + uint64_t next; + uint32_t pad[5]; +}; + struct sd_vdi_req { uint8_t proto_ver; uint8_t opcode; @@ -281,17 +307,17 @@ static inline uint64_t fnv_64a_buf(void *buf, size_t len, uint64_t 
hval) return hval; } -static inline int obj_to_sheep(struct sheepdog_node_list_entry *entries, - int nr_entries, uint64_t oid, int idx) +static inline int hval_to_sheep(struct sheepdog_node_list_entry *entries, + int nr_entries, uint64_t id, int idx) { - uint64_t id; int i; struct sheepdog_node_list_entry *e = entries, *n; - id = fnv_64a_buf(&oid, sizeof(oid), FNV1A_64_INIT); + printf("%lx\n", id); for (i = 0; i < nr_entries - 1; i++, e++) { n = e + 1; + printf("%d, %lx, %lx, %lx\n", i, e->id, n->id, id); if (id > e->id && id <= n->id) break; } @@ -299,6 +325,14 @@ static inline int obj_to_sheep(struct sheepdog_node_list_entry *entries, return (i + 1 + idx) % nr_entries; } +static inline int obj_to_sheep(struct sheepdog_node_list_entry *entries, + int nr_entries, uint64_t oid, int idx) +{ + uint64_t id = fnv_64a_buf(&oid, sizeof(oid), FNV1A_64_INIT); + + return hval_to_sheep(entries, nr_entries, id, idx); +} + static inline void print_node_list_entry(struct sheepdog_node_list_entry *e, char *str, size_t size) { -- 1.5.6.5 |