Signed-off-by: Christoph Hellwig <hch at lst.de> --- sheep/Makefile.am | 5 sheep/recovery.c | 828 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ sheep/store.c | 811 ---------------------------------------------------- 3 files changed, 831 insertions(+), 813 deletions(-) Index: sheepdog/sheep/Makefile.am =================================================================== --- sheepdog.orig/sheep/Makefile.am 2012-04-21 08:09:51.987584773 +0200 +++ sheepdog/sheep/Makefile.am 2012-04-26 18:26:54.208590934 +0200 @@ -24,8 +24,9 @@ INCLUDES = -I$(top_builddir)/include -I sbin_PROGRAMS = sheep -sheep_SOURCES = sheep.c group.c sdnet.c store.c vdi.c work.c journal.c ops.c \ - cluster/local.c strbuf.c simple_store.c object_cache.c +sheep_SOURCES = sheep.c group.c sdnet.c store.c vdi.c work.c \ + journal.c ops.c recovery.c cluster/local.c strbuf.c \ + simple_store.c object_cache.c if BUILD_COROSYNC sheep_SOURCES += cluster/corosync.c endif Index: sheepdog/sheep/recovery.c =================================================================== --- /dev/null 1970-01-01 00:00:00.000000000 +0000 +++ sheepdog/sheep/recovery.c 2012-04-26 18:29:05.964594307 +0200 @@ -0,0 +1,828 @@ +/* + * Copyright (C) 2009-2011 Nippon Telegraph and Telephone Corporation. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License version + * 2 as published by the Free Software Foundation. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +#include <errno.h> +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <sys/types.h> + +#include "sheep_priv.h" +#include "strbuf.h" +#include "util.h" + +enum rw_state { + RW_INIT, + RW_RUN, +}; + +struct recovery_work { + enum rw_state state; + + uint32_t epoch; + uint32_t done; + + struct timer timer; + int retry; + struct work work; + + int nr_blocking; + int count; + uint64_t *oids; + + int old_nr_nodes; + struct sd_node old_nodes[SD_MAX_NODES]; + int cur_nr_nodes; + struct sd_node cur_nodes[SD_MAX_NODES]; + int old_nr_vnodes; + struct sd_vnode old_vnodes[SD_MAX_VNODES]; + int cur_nr_vnodes; + struct sd_vnode cur_vnodes[SD_MAX_VNODES]; +}; + +static struct recovery_work *next_rw; +static struct recovery_work *recovering_work; + +static int obj_cmp(const void *oid1, const void *oid2) +{ + const uint64_t hval1 = fnv_64a_buf((void *)oid1, sizeof(uint64_t), FNV1A_64_INIT); + const uint64_t hval2 = fnv_64a_buf((void *)oid2, sizeof(uint64_t), FNV1A_64_INIT); + + if (hval1 < hval2) + return -1; + if (hval1 > hval2) + return 1; + return 0; +} + +/* + * contains_node - checks that the node id is included in the target nodes + * + * The target nodes to store replicated objects are the first N nodes + * from the base_idx'th on the consistent hash ring, where N is the + * number of copies of objects. + */ +static int contains_node(struct sd_vnode *key, + struct sd_vnode *entry, + int nr, int base_idx, int copies) +{ + int i; + + for (i = 0; i < copies; i++) { + int idx = get_nth_node(entry, nr, base_idx, i); + if (memcmp(key->addr, entry[idx].addr, sizeof(key->addr)) == 0 + && key->port == entry[idx].port) + return idx; + } + return -1; +} + +/* + * find_tgt_node - find the node from which we should recover objects + * + * This function compares two node lists, the current target nodes and + * the previous target nodes, and finds the node from the previous + * target nodes which corresponds to the copy_idx'th node of the + * current target nodes. 
The correspondence is injective and + * maximizes the number of nodes which can recover objects locally. + * + * For example, suppose the number of copies is 5, the consistent + * hash ring is {A, B, C, D, E, F}, and the node G is newly added. + * The parameters of this function are + * old_entry = {A, B, C, D, E, F}, old_nr = 6, old_idx = 3 + * cur_entry = {A, B, C, D, E, F, G}, cur_nr = 7, cur_idx = 3 + * + * In this case: + * the previous target nodes: {D, E, F, A, B} + * (the first 5 nodes from the 3rd node on the previous hash ring) + * the current target nodes : {D, E, F, G, A} + * (the first 5 nodes from the 3rd node on the current hash ring) + * + * The correspondence between copy_idx and return value is as follows: + * ---------------------------- + * copy_idx 0 1 2 3 4 + * src_node D E F G A + * tgt_node D E F B A + * return value 0 1 2 4 3 + * ---------------------------- + * + * Nodes D, E, F, and A can recover objects locally, and the + * node G recovers from the node B. 
+ */ +static int find_tgt_node(struct sd_vnode *old_entry, + int old_nr, int old_idx, int old_copies, + struct sd_vnode *cur_entry, + int cur_nr, int cur_idx, int cur_copies, + int copy_idx) +{ + int i, j, idx; + + dprintf("%"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32"\n", + old_idx, old_nr, old_copies, cur_idx, cur_nr, cur_copies, copy_idx); + + /* If the same node is in the previous target nodes, return its index */ + idx = contains_node(cur_entry + get_nth_node(cur_entry, cur_nr, cur_idx, copy_idx), + old_entry, old_nr, old_idx, old_copies); + if (idx >= 0) { + dprintf("%"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32"\n", idx, copy_idx, cur_idx, cur_nr); + return idx; + } + + for (i = 0, j = 0; ; i++, j++) { + if (i < copy_idx) { + /* Skip if the node can recover from its local */ + idx = contains_node(cur_entry + get_nth_node(cur_entry, cur_nr, cur_idx, i), + old_entry, old_nr, old_idx, old_copies); + if (idx >= 0) + continue; + + /* Find the next target which needs to recover from remote */ + while (j < old_copies && + contains_node(old_entry + get_nth_node(old_entry, old_nr, old_idx, j), + cur_entry, cur_nr, cur_idx, cur_copies) >= 0) + j++; + } + if (j == old_copies) { + /* + * Cannot find the target because the number of zones + * is smaller than the number of copies. We can select + * any node in this case, so select the first one. 
+ */ + return old_idx; + } + + if (i == copy_idx) { + /* Found the target node correspoinding to copy_idx */ + dprintf("%"PRIu32", %"PRIu32", %"PRIu32"\n", + get_nth_node(old_entry, old_nr, old_idx, j), + copy_idx, (cur_idx + i) % cur_nr); + return get_nth_node(old_entry, old_nr, old_idx, j); + } + + } + + return -1; +} + +static void *get_vnodes_from_epoch(int epoch, int *nr, int *copies) +{ + int nodes_nr, len = sizeof(struct sd_vnode) * SD_MAX_VNODES; + struct sd_node nodes[SD_MAX_NODES]; + void *buf = xmalloc(len); + + nodes_nr = epoch_log_read_nr(epoch, (void *)nodes, sizeof(nodes)); + if (nodes_nr < 0) { + nodes_nr = epoch_log_read_remote(epoch, (void *)nodes, sizeof(nodes)); + if (nodes_nr == 0) { + free(buf); + return NULL; + } + nodes_nr /= sizeof(nodes[0]); + } + *nr = nodes_to_vnodes(nodes, nodes_nr, buf); + *copies = get_max_copies(nodes, nodes_nr); + + return buf; +} + +static int recover_object_from_replica(uint64_t oid, + struct sd_vnode *entry, + int epoch, int tgt_epoch) +{ + struct sd_req hdr; + struct sd_rsp *rsp = (struct sd_rsp *)&hdr; + char name[128]; + unsigned wlen = 0, rlen; + int fd, ret = -1; + void *buf; + struct siocb iocb = { 0 }; + + if (is_vdi_obj(oid)) + rlen = SD_INODE_SIZE; + else if (is_vdi_attr_obj(oid)) + rlen = SD_ATTR_OBJ_SIZE; + else + rlen = SD_DATA_OBJ_SIZE; + + buf = valloc(rlen); + if (!buf) { + eprintf("%m\n"); + goto out; + } + + if (vnode_is_local(entry)) { + iocb.epoch = epoch; + iocb.length = rlen; + ret = sd_store->link(oid, &iocb, tgt_epoch); + if (ret == SD_RES_SUCCESS) { + ret = 0; + goto done; + } else { + ret = -1; + goto out; + } + } + + addr_to_str(name, sizeof(name), entry->addr, 0); + fd = connect_to(name, entry->port); + dprintf("%s, %d\n", name, entry->port); + if (fd < 0) { + eprintf("failed to connect to %s:%"PRIu32"\n", name, entry->port); + ret = -1; + goto out; + } + + memset(&hdr, 0, sizeof(hdr)); + hdr.opcode = SD_OP_READ_OBJ; + hdr.epoch = epoch; + hdr.flags = SD_FLAG_CMD_RECOVERY | 
SD_FLAG_CMD_IO_LOCAL; + hdr.data_length = rlen; + hdr.obj.oid = oid; + hdr.obj.tgt_epoch = tgt_epoch; + + ret = exec_req(fd, &hdr, buf, &wlen, &rlen); + + close(fd); + + if (ret != 0) { + eprintf("res: %"PRIx32"\n", rsp->result); + ret = -1; + goto out; + } + + rsp = (struct sd_rsp *)&hdr; + + if (rsp->result == SD_RES_SUCCESS) { + iocb.epoch = epoch; + iocb.length = rlen; + iocb.buf = buf; + ret = sd_store->atomic_put(oid, &iocb); + if (ret != SD_RES_SUCCESS) { + ret = -1; + goto out; + } + } else if (rsp->result == SD_RES_NEW_NODE_VER || + rsp->result == SD_RES_OLD_NODE_VER || + rsp->result == SD_RES_NETWORK_ERROR) { + dprintf("retrying: %"PRIx32", %"PRIx64"\n", rsp->result, oid); + ret = 1; + goto out; + } else { + eprintf("failed, res: %"PRIx32"\n", rsp->result); + ret = -1; + goto out; + } +done: + dprintf("recovered oid %"PRIx64" from %d to epoch %d\n", oid, tgt_epoch, epoch); +out: + free(buf); + return ret; +} + +static void rollback_old_cur(struct sd_vnode *old, int *old_nr, int *old_copies, + struct sd_vnode *cur, int *cur_nr, int *cur_copies, + struct sd_vnode *new_old, int new_old_nr, int new_old_copies) +{ + int nr_old = *old_nr; + int copies_old = *old_copies; + + memcpy(cur, old, sizeof(*old) * nr_old); + *cur_nr = nr_old; + *cur_copies = copies_old; + memcpy(old, new_old, sizeof(*new_old) * new_old_nr); + *old_nr = new_old_nr; + *old_copies = new_old_copies; +} + +/* + * Recover the object from its track in epoch history. That is, + * the routine will try to recovery it from the nodes it has stayed, + * at least, *theoretically* on consistent hash ring. 
+ */ +static int do_recover_object(struct recovery_work *rw, int copy_idx) +{ + struct sd_vnode *old, *cur; + uint64_t oid = rw->oids[rw->done]; + int old_nr = rw->old_nr_vnodes, cur_nr = rw->cur_nr_vnodes; + int epoch = rw->epoch, tgt_epoch = rw->epoch - 1; + struct sd_vnode *tgt_entry; + int old_idx, cur_idx, tgt_idx, old_copies, cur_copies, ret; + + old = xmalloc(sizeof(*old) * SD_MAX_VNODES); + cur = xmalloc(sizeof(*cur) * SD_MAX_VNODES); + memcpy(old, rw->old_vnodes, sizeof(*old) * old_nr); + memcpy(cur, rw->cur_vnodes, sizeof(*cur) * cur_nr); + old_copies = get_max_copies(rw->old_nodes, rw->old_nr_nodes); + cur_copies = get_max_copies(rw->cur_nodes, rw->cur_nr_nodes); + +again: + old_idx = obj_to_sheep(old, old_nr, oid, 0); + cur_idx = obj_to_sheep(cur, cur_nr, oid, 0); + + dprintf("try recover object %"PRIx64" from epoch %"PRIu32"\n", oid, tgt_epoch); + + if (cur_copies <= copy_idx) { + eprintf("epoch (%"PRIu32") has less copies (%d) than requested copy_idx: %d\n", + tgt_epoch, cur_copies, copy_idx); + ret = -1; + goto err; + } + + tgt_idx = find_tgt_node(old, old_nr, old_idx, old_copies, + cur, cur_nr, cur_idx, cur_copies, copy_idx); + if (tgt_idx < 0) { + eprintf("cannot find target node %"PRIx64"\n", oid); + ret = -1; + goto err; + } + tgt_entry = old + tgt_idx; + + ret = recover_object_from_replica(oid, tgt_entry, epoch, tgt_epoch); + if (ret < 0) { + struct sd_vnode *new_old; + int new_old_nr, new_old_copies; + + tgt_epoch--; + if (tgt_epoch < 1) { + eprintf("can not recover oid %"PRIx64"\n", oid); + ret = -1; + goto err; + } + + new_old = get_vnodes_from_epoch(tgt_epoch, &new_old_nr, &new_old_copies); + if (!new_old) { + ret = -1; + goto err; + } + rollback_old_cur(old, &old_nr, &old_copies, cur, &cur_nr, &cur_copies, + new_old, new_old_nr, new_old_copies); + free(new_old); + goto again; + } else if (ret > 0) { + ret = 0; + rw->retry = 1; + } +err: + free(old); + free(cur); + return ret; +} + +static int get_replica_idx(struct recovery_work *rw, 
uint64_t oid, int *copy_nr) +{ + int i, ret = -1; + *copy_nr = get_max_copies(rw->cur_nodes, rw->cur_nr_nodes); + for (i = 0; i < *copy_nr; i++) { + int n = obj_to_sheep(rw->cur_vnodes, rw->cur_nr_vnodes, oid, i); + if (vnode_is_local(&rw->cur_vnodes[n])) { + ret = i; + break; + } + } + return ret; +} + +static void recover_object(struct work *work) +{ + struct recovery_work *rw = container_of(work, struct recovery_work, work); + uint64_t oid = rw->oids[rw->done]; + uint32_t epoch = rw->epoch; + int i, copy_idx, copy_nr, ret; + struct siocb iocb = { 0 }; + + if (!sys->nr_sobjs) + return; + + eprintf("done:%"PRIu32" count:%"PRIu32", oid:%"PRIx64"\n", rw->done, rw->count, oid); + + iocb.epoch = epoch; + ret = sd_store->open(oid, &iocb, 0); + if (ret == SD_RES_SUCCESS) { + sd_store->close(oid, &iocb); + dprintf("the object is already recovered\n"); + return; + } + + copy_idx = get_replica_idx(rw, oid, &copy_nr); + if (copy_idx < 0) { + ret = -1; + goto err; + } + ret = do_recover_object(rw, copy_idx); + if (ret < 0) { + for (i = 0; i < copy_nr; i++) { + if (i == copy_idx) + continue; + ret = do_recover_object(rw, i); + if (ret == 0) + break; + } + } +err: + if (ret < 0) + eprintf("failed to recover object %"PRIx64"\n", oid); +} + +static struct recovery_work *suspended_recovery_work; + +static void recover_timer(void *data) +{ + struct recovery_work *rw = (struct recovery_work *)data; + uint64_t oid = rw->oids[rw->done]; + + if (is_access_to_busy_objects(oid)) { + suspended_recovery_work = rw; + return; + } + + queue_work(sys->recovery_wqueue, &rw->work); +} + +void resume_recovery_work(void) +{ + struct recovery_work *rw; + uint64_t oid; + + if (!suspended_recovery_work) + return; + + rw = suspended_recovery_work; + + oid = rw->oids[rw->done]; + if (is_access_to_busy_objects(oid)) + return; + + suspended_recovery_work = NULL; + queue_work(sys->recovery_wqueue, &rw->work); +} + +int node_in_recovery(void) +{ + return !!recovering_work; +} + +int 
is_recoverying_oid(uint64_t oid) +{ + uint64_t hval = fnv_64a_buf(&oid, sizeof(uint64_t), FNV1A_64_INIT); + uint64_t min_hval; + struct recovery_work *rw = recovering_work; + int ret, i; + struct siocb iocb; + + if (oid == 0) + return 0; + + if (!rw) + return 0; /* there is no thread working for object recovery */ + + min_hval = fnv_64a_buf(&rw->oids[rw->done + rw->nr_blocking], sizeof(uint64_t), FNV1A_64_INIT); + + if (before(rw->epoch, sys->epoch)) + return 1; + + if (rw->state == RW_INIT) + return 1; + + memset(&iocb, 0, sizeof(iocb)); + iocb.epoch = sys->epoch; + ret = sd_store->open(oid, &iocb, 0); + if (ret == SD_RES_SUCCESS) { + dprintf("the object %" PRIx64 " is already recoverd\n", oid); + sd_store->close(oid, &iocb); + return 0; + } + + /* the first 'rw->nr_blocking' objects were already scheduled to be done earlier */ + for (i = 0; i < rw->nr_blocking; i++) + if (rw->oids[rw->done + i] == oid) + return 1; + + if (min_hval <= hval) { + uint64_t *p; + p = bsearch(&oid, rw->oids + rw->done + rw->nr_blocking, + rw->count - rw->done - rw->nr_blocking, sizeof(oid), obj_cmp); + if (p) { + dprintf("recover the object %" PRIx64 " first\n", oid); + if (rw->nr_blocking == 0) + rw->nr_blocking = 1; /* the first oid may be processed now */ + if (p > rw->oids + rw->done + rw->nr_blocking) { + /* this object should be recovered earlier */ + memmove(rw->oids + rw->done + rw->nr_blocking + 1, + rw->oids + rw->done + rw->nr_blocking, + sizeof(uint64_t) * (p - (rw->oids + rw->done + rw->nr_blocking))); + rw->oids[rw->done + rw->nr_blocking] = oid; + rw->nr_blocking++; + } + return 1; + } + } + + dprintf("the object %" PRIx64 " is not found\n", oid); + return 0; +} + +static void do_recover_main(struct work *work) +{ + struct recovery_work *rw = container_of(work, struct recovery_work, work); + uint64_t oid; + + if (rw->state == RW_INIT) + rw->state = RW_RUN; + else if (!rw->retry) { + rw->done++; + if (rw->nr_blocking > 0) + rw->nr_blocking--; + } + + oid = 
rw->oids[rw->done]; + + if (rw->retry && !next_rw) { + rw->retry = 0; + + rw->timer.callback = recover_timer; + rw->timer.data = rw; + add_timer(&rw->timer, 2); + return; + } + + if (rw->done < rw->count && !next_rw) { + rw->work.fn = recover_object; + + if (is_access_to_busy_objects(oid)) { + suspended_recovery_work = rw; + return; + } + resume_pending_requests(); + queue_work(sys->recovery_wqueue, &rw->work); + return; + } + + dprintf("recovery complete: new epoch %"PRIu32"\n", rw->epoch); + recovering_work = NULL; + + sys->recovered_epoch = rw->epoch; + + free(rw->oids); + free(rw); + + if (next_rw) { + rw = next_rw; + next_rw = NULL; + + recovering_work = rw; + queue_work(sys->recovery_wqueue, &rw->work); + } else { + if (sd_store->end_recover) { + struct siocb iocb = { 0 }; + iocb.epoch = sys->epoch; + sd_store->end_recover(&iocb); + } + } + + resume_pending_requests(); +} + +static int request_obj_list(struct sd_node *e, uint32_t epoch, + uint8_t *buf, size_t buf_size) +{ + int fd, ret; + unsigned wlen, rlen; + char name[128]; + struct sd_list_req hdr; + struct sd_list_rsp *rsp; + + addr_to_str(name, sizeof(name), e->addr, 0); + + dprintf("%s %"PRIu32"\n", name, e->port); + + fd = connect_to(name, e->port); + if (fd < 0) { + eprintf("%s %"PRIu32"\n", name, e->port); + return -1; + } + + wlen = 0; + rlen = buf_size; + + memset(&hdr, 0, sizeof(hdr)); + hdr.opcode = SD_OP_GET_OBJ_LIST; + hdr.tgt_epoch = epoch - 1; + hdr.flags = 0; + hdr.data_length = rlen; + + ret = exec_req(fd, (struct sd_req *)&hdr, buf, &wlen, &rlen); + + close(fd); + + rsp = (struct sd_list_rsp *)&hdr; + + if (ret || rsp->result != SD_RES_SUCCESS) { + eprintf("retrying: %"PRIu32", %"PRIu32"\n", ret, rsp->result); + return -1; + } + + dprintf("%"PRIu64"\n", rsp->data_length / sizeof(uint64_t)); + + return rsp->data_length / sizeof(uint64_t); +} + +int merge_objlist(uint64_t *list1, int nr_list1, uint64_t *list2, int nr_list2) +{ + int i; + int old_nr_list1 = nr_list1; + + for (i = 0; i < 
nr_list2; i++) { + if (bsearch(list2 + i, list1, old_nr_list1, sizeof(*list1), obj_cmp)) + continue; + + list1[nr_list1++] = list2[i]; + } + + qsort(list1, nr_list1, sizeof(*list1), obj_cmp); + + return nr_list1; +} + +static int screen_obj_list(struct recovery_work *rw, uint64_t *list, int list_nr) +{ + int ret, i, cp, idx; + struct strbuf buf = STRBUF_INIT; + struct sd_vnode *nodes = rw->cur_vnodes; + int nodes_nr = rw->cur_nr_vnodes; + int nr_objs = get_max_copies(rw->cur_nodes, rw->cur_nr_nodes); + + for (i = 0; i < list_nr; i++) { + for (cp = 0; cp < nr_objs; cp++) { + idx = obj_to_sheep(nodes, nodes_nr, list[i], cp); + if (vnode_is_local(&nodes[idx])) + break; + } + if (cp == nr_objs) + continue; + strbuf_add(&buf, &list[i], sizeof(uint64_t)); + } + memcpy(list, buf.buf, buf.len); + + ret = buf.len / sizeof(uint64_t); + dprintf("%d\n", ret); + strbuf_release(&buf); + + return ret; +} + +#define MAX_RETRY_CNT 6 + +static int newly_joined(struct sd_node *node, struct recovery_work *rw) +{ + struct sd_node *old = rw->old_nodes; + int old_nr = rw->old_nr_nodes; + int i; + for (i = 0; i < old_nr; i++) + if (node_cmp(node, old + i) == 0) + break; + + if (i == old_nr) + return 1; + return 0; +} + +static int fill_obj_list(struct recovery_work *rw) +{ + int i; + uint8_t *buf = NULL; + size_t buf_size = SD_DATA_OBJ_SIZE; /* FIXME */ + int retry_cnt; + struct sd_node *cur = rw->cur_nodes; + int cur_nr = rw->cur_nr_nodes; + + buf = malloc(buf_size); + if (!buf) { + eprintf("out of memory\n"); + rw->retry = 1; + return -1; + } + for (i = 0; i < cur_nr; i++) { + int buf_nr; + struct sd_node *node = cur + i; + + if (newly_joined(node, rw)) + /* new node doesn't have a list file */ + continue; + + retry_cnt = 0; + retry: + buf_nr = request_obj_list(node, rw->epoch, buf, buf_size); + if (buf_nr < 0) { + retry_cnt++; + if (retry_cnt > MAX_RETRY_CNT) { + eprintf("failed to get object list\n"); + eprintf("some objects may be lost\n"); + continue; + } else { + if (next_rw) { + 
dprintf("go to the next recovery\n"); + break; + } + dprintf("trying to get object list again\n"); + sleep(1); + goto retry; + } + } + buf_nr = screen_obj_list(rw, (uint64_t *)buf, buf_nr); + if (buf_nr) + rw->count = merge_objlist(rw->oids, rw->count, (uint64_t *)buf, buf_nr); + } + + dprintf("%d\n", rw->count); + free(buf); + return 0; +} + +/* setup node list and virtual node list */ +static int init_rw(struct recovery_work *rw) +{ + int epoch = rw->epoch; + + rw->cur_nr_nodes = epoch_log_read_nr(epoch, (char *)rw->cur_nodes, + sizeof(rw->cur_nodes)); + if (rw->cur_nr_nodes <= 0) { + eprintf("failed to read epoch log for epoch %"PRIu32"\n", epoch); + return -1; + } + + rw->old_nr_nodes = epoch_log_read_nr(epoch - 1, (char *)rw->old_nodes, + sizeof(rw->old_nodes)); + if (rw->old_nr_nodes <= 0) { + eprintf("failed to read epoch log for epoch %"PRIu32"\n", epoch - 1); + return -1; + } + rw->old_nr_vnodes = nodes_to_vnodes(rw->old_nodes, rw->old_nr_nodes, + rw->old_vnodes); + rw->cur_nr_vnodes = nodes_to_vnodes(rw->cur_nodes, rw->cur_nr_nodes, + rw->cur_vnodes); + + return 0; +} + +static void do_recovery_work(struct work *work) +{ + struct recovery_work *rw = container_of(work, struct recovery_work, work); + + dprintf("%u\n", rw->epoch); + + if (!sys->nr_sobjs) + return; + + if (rw->cur_nr_nodes == 0) + init_rw(rw); + + if (fill_obj_list(rw) < 0) { + eprintf("fatal recovery error\n"); + rw->count = 0; + return; + } +} + +int start_recovery(uint32_t epoch) +{ + struct recovery_work *rw; + + rw = zalloc(sizeof(struct recovery_work)); + if (!rw) + return -1; + + rw->state = RW_INIT; + rw->oids = malloc(1 << 20); /* FIXME */ + rw->epoch = epoch; + rw->count = 0; + + rw->work.fn = do_recovery_work; + rw->work.done = do_recover_main; + + if (sd_store->begin_recover) { + struct siocb iocb = { 0 }; + iocb.epoch = epoch; + sd_store->begin_recover(&iocb); + } + + if (recovering_work != NULL) { + if (next_rw) { + /* skip the previous epoch recovery */ + free(next_rw->oids); + 
free(next_rw); + } + next_rw = rw; + } else { + recovering_work = rw; + queue_work(sys->recovery_wqueue, &rw->work); + } + + return 0; +} Index: sheepdog/sheep/store.c =================================================================== --- sheepdog.orig/sheep/store.c 2012-04-26 18:21:22.724582445 +0200 +++ sheepdog/sheep/store.c 2012-04-26 18:29:32.544595023 +0200 @@ -16,12 +16,10 @@ #include <stdlib.h> #include <unistd.h> #include <poll.h> -#include <sys/xattr.h> #include <sys/statvfs.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> -#include <time.h> #include <pthread.h> #include "sheep_priv.h" @@ -133,18 +131,6 @@ static int check_and_insert_objlist_cach return 0; } -static int obj_cmp(const void *oid1, const void *oid2) -{ - const uint64_t hval1 = fnv_64a_buf((void *)oid1, sizeof(uint64_t), FNV1A_64_INIT); - const uint64_t hval2 = fnv_64a_buf((void *)oid2, sizeof(uint64_t), FNV1A_64_INIT); - - if (hval1 < hval2) - return -1; - if (hval1 > hval2) - return 1; - return 0; -} - static void get_store_dir(struct strbuf *buf, int epoch) { if (!strcmp(sd_store->name, "simple")) @@ -1162,803 +1148,6 @@ int get_max_copies(struct sd_node *entri return min(sys->nr_sobjs, nr_zones); } -/* - * contains_node - checks that the node id is included in the target nodes - * - * The target nodes to store replicated objects are the first N nodes - * from the base_idx'th on the consistent hash ring, where N is the - * number of copies of objects. 
- */ -static int contains_node(struct sd_vnode *key, - struct sd_vnode *entry, - int nr, int base_idx, int copies) -{ - int i; - - for (i = 0; i < copies; i++) { - int idx = get_nth_node(entry, nr, base_idx, i); - if (memcmp(key->addr, entry[idx].addr, sizeof(key->addr)) == 0 - && key->port == entry[idx].port) - return idx; - } - return -1; -} - -enum rw_state { - RW_INIT, - RW_RUN, -}; - -struct recovery_work { - enum rw_state state; - - uint32_t epoch; - uint32_t done; - - struct timer timer; - int retry; - struct work work; - - int nr_blocking; - int count; - uint64_t *oids; - - int old_nr_nodes; - struct sd_node old_nodes[SD_MAX_NODES]; - int cur_nr_nodes; - struct sd_node cur_nodes[SD_MAX_NODES]; - int old_nr_vnodes; - struct sd_vnode old_vnodes[SD_MAX_VNODES]; - int cur_nr_vnodes; - struct sd_vnode cur_vnodes[SD_MAX_VNODES]; -}; - -static struct recovery_work *next_rw; -static struct recovery_work *recovering_work; - -/* - * find_tgt_node - find the node from which we should recover objects - * - * This function compares two node lists, the current target nodes and - * the previous target nodes, and finds the node from the previous - * target nodes which corresponds to the copy_idx'th node of the - * current target nodes. The correspondence is injective and - * maximizes the number of nodes which can recover objects locally. - * - * For example, consider the number of redundancy is 5, the consistent - * hash ring is {A, B, C, D, E, F}, and the node G is newly added. 
- * The parameters of this function are - * old_entry = {A, B, C, D, E, F}, old_nr = 6, old_idx = 3 - * cur_entry = {A, B, C, D, E, F, G}, cur_nr = 7, cur_idx = 3 - * - * In this case: - * the previous target nodes: {D, E, F, A, B} - * (the first 5 nodes from the 3rd node on the previous hash ring) - * the current target nodes : {D, E, F, G, A} - * (the first 5 nodes from the 3rd node on the current hash ring) - * - * The correspondence between copy_idx and return value are as follows: - * ---------------------------- - * copy_idx 0 1 2 3 4 - * src_node D E F G A - * tgt_node D E F B A - * return value 0 1 2 4 3 - * ---------------------------- - * - * The node D, E, F, and A can recover objects from local, and the - * node G recovers from the node B. - */ -static int find_tgt_node(struct sd_vnode *old_entry, - int old_nr, int old_idx, int old_copies, - struct sd_vnode *cur_entry, - int cur_nr, int cur_idx, int cur_copies, - int copy_idx) -{ - int i, j, idx; - - dprintf("%"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32"\n", - old_idx, old_nr, old_copies, cur_idx, cur_nr, cur_copies, copy_idx); - - /* If the same node is in the previous target nodes, return its index */ - idx = contains_node(cur_entry + get_nth_node(cur_entry, cur_nr, cur_idx, copy_idx), - old_entry, old_nr, old_idx, old_copies); - if (idx >= 0) { - dprintf("%"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32"\n", idx, copy_idx, cur_idx, cur_nr); - return idx; - } - - for (i = 0, j = 0; ; i++, j++) { - if (i < copy_idx) { - /* Skip if the node can recover from its local */ - idx = contains_node(cur_entry + get_nth_node(cur_entry, cur_nr, cur_idx, i), - old_entry, old_nr, old_idx, old_copies); - if (idx >= 0) - continue; - - /* Find the next target which needs to recover from remote */ - while (j < old_copies && - contains_node(old_entry + get_nth_node(old_entry, old_nr, old_idx, j), - cur_entry, cur_nr, cur_idx, cur_copies) >= 0) - j++; - } - if (j == old_copies) { - /* - * Cannot 
find the target because the number of zones - * is smaller than the number of copies. We can select - * any node in this case, so select the first one. - */ - return old_idx; - } - - if (i == copy_idx) { - /* Found the target node correspoinding to copy_idx */ - dprintf("%"PRIu32", %"PRIu32", %"PRIu32"\n", - get_nth_node(old_entry, old_nr, old_idx, j), - copy_idx, (cur_idx + i) % cur_nr); - return get_nth_node(old_entry, old_nr, old_idx, j); - } - - } - - return -1; -} - -static void *get_vnodes_from_epoch(int epoch, int *nr, int *copies) -{ - int nodes_nr, len = sizeof(struct sd_vnode) * SD_MAX_VNODES; - struct sd_node nodes[SD_MAX_NODES]; - void *buf = xmalloc(len); - - nodes_nr = epoch_log_read_nr(epoch, (void *)nodes, sizeof(nodes)); - if (nodes_nr < 0) { - nodes_nr = epoch_log_read_remote(epoch, (void *)nodes, sizeof(nodes)); - if (nodes_nr == 0) { - free(buf); - return NULL; - } - nodes_nr /= sizeof(nodes[0]); - } - *nr = nodes_to_vnodes(nodes, nodes_nr, buf); - *copies = get_max_copies(nodes, nodes_nr); - - return buf; -} - -static int recover_object_from_replica(uint64_t oid, - struct sd_vnode *entry, - int epoch, int tgt_epoch) -{ - struct sd_req hdr; - struct sd_rsp *rsp = (struct sd_rsp *)&hdr; - char name[128]; - unsigned wlen = 0, rlen; - int fd, ret = -1; - void *buf; - struct siocb iocb = { 0 }; - - if (is_vdi_obj(oid)) - rlen = SD_INODE_SIZE; - else if (is_vdi_attr_obj(oid)) - rlen = SD_ATTR_OBJ_SIZE; - else - rlen = SD_DATA_OBJ_SIZE; - - buf = valloc(rlen); - if (!buf) { - eprintf("%m\n"); - goto out; - } - - if (vnode_is_local(entry)) { - iocb.epoch = epoch; - iocb.length = rlen; - ret = sd_store->link(oid, &iocb, tgt_epoch); - if (ret == SD_RES_SUCCESS) { - ret = 0; - goto done; - } else { - ret = -1; - goto out; - } - } - - addr_to_str(name, sizeof(name), entry->addr, 0); - fd = connect_to(name, entry->port); - dprintf("%s, %d\n", name, entry->port); - if (fd < 0) { - eprintf("failed to connect to %s:%"PRIu32"\n", name, entry->port); - ret = -1; 
- goto out; - } - - memset(&hdr, 0, sizeof(hdr)); - hdr.opcode = SD_OP_READ_OBJ; - hdr.epoch = epoch; - hdr.flags = SD_FLAG_CMD_RECOVERY | SD_FLAG_CMD_IO_LOCAL; - hdr.data_length = rlen; - hdr.obj.oid = oid; - hdr.obj.tgt_epoch = tgt_epoch; - - ret = exec_req(fd, &hdr, buf, &wlen, &rlen); - - close(fd); - - if (ret != 0) { - eprintf("res: %"PRIx32"\n", rsp->result); - ret = -1; - goto out; - } - - rsp = (struct sd_rsp *)&hdr; - - if (rsp->result == SD_RES_SUCCESS) { - iocb.epoch = epoch; - iocb.length = rlen; - iocb.buf = buf; - ret = sd_store->atomic_put(oid, &iocb); - if (ret != SD_RES_SUCCESS) { - ret = -1; - goto out; - } - } else if (rsp->result == SD_RES_NEW_NODE_VER || - rsp->result == SD_RES_OLD_NODE_VER || - rsp->result == SD_RES_NETWORK_ERROR) { - dprintf("retrying: %"PRIx32", %"PRIx64"\n", rsp->result, oid); - ret = 1; - goto out; - } else { - eprintf("failed, res: %"PRIx32"\n", rsp->result); - ret = -1; - goto out; - } -done: - dprintf("recovered oid %"PRIx64" from %d to epoch %d\n", oid, tgt_epoch, epoch); -out: - free(buf); - return ret; -} - -static void rollback_old_cur(struct sd_vnode *old, int *old_nr, int *old_copies, - struct sd_vnode *cur, int *cur_nr, int *cur_copies, - struct sd_vnode *new_old, int new_old_nr, int new_old_copies) -{ - int nr_old = *old_nr; - int copies_old = *old_copies; - - memcpy(cur, old, sizeof(*old) * nr_old); - *cur_nr = nr_old; - *cur_copies = copies_old; - memcpy(old, new_old, sizeof(*new_old) * new_old_nr); - *old_nr = new_old_nr; - *old_copies = new_old_copies; -} - -/* - * Recover the object from its track in epoch history. That is, - * the routine will try to recovery it from the nodes it has stayed, - * at least, *theoretically* on consistent hash ring. 
- */ -static int do_recover_object(struct recovery_work *rw, int copy_idx) -{ - struct sd_vnode *old, *cur; - uint64_t oid = rw->oids[rw->done]; - int old_nr = rw->old_nr_vnodes, cur_nr = rw->cur_nr_vnodes; - int epoch = rw->epoch, tgt_epoch = rw->epoch - 1; - struct sd_vnode *tgt_entry; - int old_idx, cur_idx, tgt_idx, old_copies, cur_copies, ret; - - old = xmalloc(sizeof(*old) * SD_MAX_VNODES); - cur = xmalloc(sizeof(*cur) * SD_MAX_VNODES); - memcpy(old, rw->old_vnodes, sizeof(*old) * old_nr); - memcpy(cur, rw->cur_vnodes, sizeof(*cur) * cur_nr); - old_copies = get_max_copies(rw->old_nodes, rw->old_nr_nodes); - cur_copies = get_max_copies(rw->cur_nodes, rw->cur_nr_nodes); - -again: - old_idx = obj_to_sheep(old, old_nr, oid, 0); - cur_idx = obj_to_sheep(cur, cur_nr, oid, 0); - - dprintf("try recover object %"PRIx64" from epoch %"PRIu32"\n", oid, tgt_epoch); - - if (cur_copies <= copy_idx) { - eprintf("epoch (%"PRIu32") has less copies (%d) than requested copy_idx: %d\n", - tgt_epoch, cur_copies, copy_idx); - ret = -1; - goto err; - } - - tgt_idx = find_tgt_node(old, old_nr, old_idx, old_copies, - cur, cur_nr, cur_idx, cur_copies, copy_idx); - if (tgt_idx < 0) { - eprintf("cannot find target node %"PRIx64"\n", oid); - ret = -1; - goto err; - } - tgt_entry = old + tgt_idx; - - ret = recover_object_from_replica(oid, tgt_entry, epoch, tgt_epoch); - if (ret < 0) { - struct sd_vnode *new_old; - int new_old_nr, new_old_copies; - - tgt_epoch--; - if (tgt_epoch < 1) { - eprintf("can not recover oid %"PRIx64"\n", oid); - ret = -1; - goto err; - } - - new_old = get_vnodes_from_epoch(tgt_epoch, &new_old_nr, &new_old_copies); - if (!new_old) { - ret = -1; - goto err; - } - rollback_old_cur(old, &old_nr, &old_copies, cur, &cur_nr, &cur_copies, - new_old, new_old_nr, new_old_copies); - free(new_old); - goto again; - } else if (ret > 0) { - ret = 0; - rw->retry = 1; - } -err: - free(old); - free(cur); - return ret; -} - -static int get_replica_idx(struct recovery_work *rw, 
uint64_t oid, int *copy_nr) -{ - int i, ret = -1; - *copy_nr = get_max_copies(rw->cur_nodes, rw->cur_nr_nodes); - for (i = 0; i < *copy_nr; i++) { - int n = obj_to_sheep(rw->cur_vnodes, rw->cur_nr_vnodes, oid, i); - if (vnode_is_local(&rw->cur_vnodes[n])) { - ret = i; - break; - } - } - return ret; -} - -static void recover_object(struct work *work) -{ - struct recovery_work *rw = container_of(work, struct recovery_work, work); - uint64_t oid = rw->oids[rw->done]; - uint32_t epoch = rw->epoch; - int i, copy_idx, copy_nr, ret; - struct siocb iocb = { 0 }; - - if (!sys->nr_sobjs) - return; - - eprintf("done:%"PRIu32" count:%"PRIu32", oid:%"PRIx64"\n", rw->done, rw->count, oid); - - iocb.epoch = epoch; - ret = sd_store->open(oid, &iocb, 0); - if (ret == SD_RES_SUCCESS) { - sd_store->close(oid, &iocb); - dprintf("the object is already recovered\n"); - return; - } - - copy_idx = get_replica_idx(rw, oid, ©_nr); - if (copy_idx < 0) { - ret = -1; - goto err; - } - ret = do_recover_object(rw, copy_idx); - if (ret < 0) { - for (i = 0; i < copy_nr; i++) { - if (i == copy_idx) - continue; - ret = do_recover_object(rw, i); - if (ret == 0) - break; - } - } -err: - if (ret < 0) - eprintf("failed to recover object %"PRIx64"\n", oid); -} - -static struct recovery_work *suspended_recovery_work; - -static void recover_timer(void *data) -{ - struct recovery_work *rw = (struct recovery_work *)data; - uint64_t oid = rw->oids[rw->done]; - - if (is_access_to_busy_objects(oid)) { - suspended_recovery_work = rw; - return; - } - - queue_work(sys->recovery_wqueue, &rw->work); -} - -void resume_recovery_work(void) -{ - struct recovery_work *rw; - uint64_t oid; - - if (!suspended_recovery_work) - return; - - rw = suspended_recovery_work; - - oid = rw->oids[rw->done]; - if (is_access_to_busy_objects(oid)) - return; - - suspended_recovery_work = NULL; - queue_work(sys->recovery_wqueue, &rw->work); -} - -int node_in_recovery(void) -{ - return !!recovering_work; -} - -int 
is_recoverying_oid(uint64_t oid) -{ - uint64_t hval = fnv_64a_buf(&oid, sizeof(uint64_t), FNV1A_64_INIT); - uint64_t min_hval; - struct recovery_work *rw = recovering_work; - int ret, i; - struct siocb iocb; - - if (oid == 0) - return 0; - - if (!rw) - return 0; /* there is no thread working for object recovery */ - - min_hval = fnv_64a_buf(&rw->oids[rw->done + rw->nr_blocking], sizeof(uint64_t), FNV1A_64_INIT); - - if (before(rw->epoch, sys->epoch)) - return 1; - - if (rw->state == RW_INIT) - return 1; - - memset(&iocb, 0, sizeof(iocb)); - iocb.epoch = sys->epoch; - ret = sd_store->open(oid, &iocb, 0); - if (ret == SD_RES_SUCCESS) { - dprintf("the object %" PRIx64 " is already recoverd\n", oid); - sd_store->close(oid, &iocb); - return 0; - } - - /* the first 'rw->nr_blocking' objects were already scheduled to be done earlier */ - for (i = 0; i < rw->nr_blocking; i++) - if (rw->oids[rw->done + i] == oid) - return 1; - - if (min_hval <= hval) { - uint64_t *p; - p = bsearch(&oid, rw->oids + rw->done + rw->nr_blocking, - rw->count - rw->done - rw->nr_blocking, sizeof(oid), obj_cmp); - if (p) { - dprintf("recover the object %" PRIx64 " first\n", oid); - if (rw->nr_blocking == 0) - rw->nr_blocking = 1; /* the first oid may be processed now */ - if (p > rw->oids + rw->done + rw->nr_blocking) { - /* this object should be recovered earlier */ - memmove(rw->oids + rw->done + rw->nr_blocking + 1, - rw->oids + rw->done + rw->nr_blocking, - sizeof(uint64_t) * (p - (rw->oids + rw->done + rw->nr_blocking))); - rw->oids[rw->done + rw->nr_blocking] = oid; - rw->nr_blocking++; - } - return 1; - } - } - - dprintf("the object %" PRIx64 " is not found\n", oid); - return 0; -} - -static void do_recover_main(struct work *work) -{ - struct recovery_work *rw = container_of(work, struct recovery_work, work); - uint64_t oid; - - if (rw->state == RW_INIT) - rw->state = RW_RUN; - else if (!rw->retry) { - rw->done++; - if (rw->nr_blocking > 0) - rw->nr_blocking--; - } - - oid = 
rw->oids[rw->done]; - - if (rw->retry && !next_rw) { - rw->retry = 0; - - rw->timer.callback = recover_timer; - rw->timer.data = rw; - add_timer(&rw->timer, 2); - return; - } - - if (rw->done < rw->count && !next_rw) { - rw->work.fn = recover_object; - - if (is_access_to_busy_objects(oid)) { - suspended_recovery_work = rw; - return; - } - resume_pending_requests(); - queue_work(sys->recovery_wqueue, &rw->work); - return; - } - - dprintf("recovery complete: new epoch %"PRIu32"\n", rw->epoch); - recovering_work = NULL; - - sys->recovered_epoch = rw->epoch; - - free(rw->oids); - free(rw); - - if (next_rw) { - rw = next_rw; - next_rw = NULL; - - recovering_work = rw; - queue_work(sys->recovery_wqueue, &rw->work); - } else { - if (sd_store->end_recover) { - struct siocb iocb = { 0 }; - iocb.epoch = sys->epoch; - sd_store->end_recover(&iocb); - } - } - - resume_pending_requests(); -} - -static int request_obj_list(struct sd_node *e, uint32_t epoch, - uint8_t *buf, size_t buf_size) -{ - int fd, ret; - unsigned wlen, rlen; - char name[128]; - struct sd_list_req hdr; - struct sd_list_rsp *rsp; - - addr_to_str(name, sizeof(name), e->addr, 0); - - dprintf("%s %"PRIu32"\n", name, e->port); - - fd = connect_to(name, e->port); - if (fd < 0) { - eprintf("%s %"PRIu32"\n", name, e->port); - return -1; - } - - wlen = 0; - rlen = buf_size; - - memset(&hdr, 0, sizeof(hdr)); - hdr.opcode = SD_OP_GET_OBJ_LIST; - hdr.tgt_epoch = epoch - 1; - hdr.flags = 0; - hdr.data_length = rlen; - - ret = exec_req(fd, (struct sd_req *)&hdr, buf, &wlen, &rlen); - - close(fd); - - rsp = (struct sd_list_rsp *)&hdr; - - if (ret || rsp->result != SD_RES_SUCCESS) { - eprintf("retrying: %"PRIu32", %"PRIu32"\n", ret, rsp->result); - return -1; - } - - dprintf("%"PRIu64"\n", rsp->data_length / sizeof(uint64_t)); - - return rsp->data_length / sizeof(uint64_t); -} - -int merge_objlist(uint64_t *list1, int nr_list1, uint64_t *list2, int nr_list2) -{ - int i; - int old_nr_list1 = nr_list1; - - for (i = 0; i < 
nr_list2; i++) { - if (bsearch(list2 + i, list1, old_nr_list1, sizeof(*list1), obj_cmp)) - continue; - - list1[nr_list1++] = list2[i]; - } - - qsort(list1, nr_list1, sizeof(*list1), obj_cmp); - - return nr_list1; -} - -static int screen_obj_list(struct recovery_work *rw, uint64_t *list, int list_nr) -{ - int ret, i, cp, idx; - struct strbuf buf = STRBUF_INIT; - struct sd_vnode *nodes = rw->cur_vnodes; - int nodes_nr = rw->cur_nr_vnodes; - int nr_objs = get_max_copies(rw->cur_nodes, rw->cur_nr_nodes); - - for (i = 0; i < list_nr; i++) { - for (cp = 0; cp < nr_objs; cp++) { - idx = obj_to_sheep(nodes, nodes_nr, list[i], cp); - if (vnode_is_local(&nodes[idx])) - break; - } - if (cp == nr_objs) - continue; - strbuf_add(&buf, &list[i], sizeof(uint64_t)); - } - memcpy(list, buf.buf, buf.len); - - ret = buf.len / sizeof(uint64_t); - dprintf("%d\n", ret); - strbuf_release(&buf); - - return ret; -} - -#define MAX_RETRY_CNT 6 - -static int newly_joined(struct sd_node *node, struct recovery_work *rw) -{ - struct sd_node *old = rw->old_nodes; - int old_nr = rw->old_nr_nodes; - int i; - for (i = 0; i < old_nr; i++) - if (node_cmp(node, old + i) == 0) - break; - - if (i == old_nr) - return 1; - return 0; -} - -static int fill_obj_list(struct recovery_work *rw) -{ - int i; - uint8_t *buf = NULL; - size_t buf_size = SD_DATA_OBJ_SIZE; /* FIXME */ - int retry_cnt; - struct sd_node *cur = rw->cur_nodes; - int cur_nr = rw->cur_nr_nodes; - - buf = malloc(buf_size); - if (!buf) { - eprintf("out of memory\n"); - rw->retry = 1; - return -1; - } - for (i = 0; i < cur_nr; i++) { - int buf_nr; - struct sd_node *node = cur + i; - - if (newly_joined(node, rw)) - /* new node doesn't have a list file */ - continue; - - retry_cnt = 0; - retry: - buf_nr = request_obj_list(node, rw->epoch, buf, buf_size); - if (buf_nr < 0) { - retry_cnt++; - if (retry_cnt > MAX_RETRY_CNT) { - eprintf("failed to get object list\n"); - eprintf("some objects may be lost\n"); - continue; - } else { - if (next_rw) { - 
dprintf("go to the next recovery\n"); - break; - } - dprintf("trying to get object list again\n"); - sleep(1); - goto retry; - } - } - buf_nr = screen_obj_list(rw, (uint64_t *)buf, buf_nr); - if (buf_nr) - rw->count = merge_objlist(rw->oids, rw->count, (uint64_t *)buf, buf_nr); - } - - dprintf("%d\n", rw->count); - free(buf); - return 0; -} - -/* setup node list and virtual node list */ -static int init_rw(struct recovery_work *rw) -{ - int epoch = rw->epoch; - - rw->cur_nr_nodes = epoch_log_read_nr(epoch, (char *)rw->cur_nodes, - sizeof(rw->cur_nodes)); - if (rw->cur_nr_nodes <= 0) { - eprintf("failed to read epoch log for epoch %"PRIu32"\n", epoch); - return -1; - } - - rw->old_nr_nodes = epoch_log_read_nr(epoch - 1, (char *)rw->old_nodes, - sizeof(rw->old_nodes)); - if (rw->old_nr_nodes <= 0) { - eprintf("failed to read epoch log for epoch %"PRIu32"\n", epoch - 1); - return -1; - } - rw->old_nr_vnodes = nodes_to_vnodes(rw->old_nodes, rw->old_nr_nodes, - rw->old_vnodes); - rw->cur_nr_vnodes = nodes_to_vnodes(rw->cur_nodes, rw->cur_nr_nodes, - rw->cur_vnodes); - - return 0; -} - -static void do_recovery_work(struct work *work) -{ - struct recovery_work *rw = container_of(work, struct recovery_work, work); - - dprintf("%u\n", rw->epoch); - - if (!sys->nr_sobjs) - return; - - if (rw->cur_nr_nodes == 0) - init_rw(rw); - - if (fill_obj_list(rw) < 0) { - eprintf("fatal recovery error\n"); - rw->count = 0; - return; - } -} - -int start_recovery(uint32_t epoch) -{ - struct recovery_work *rw; - - rw = zalloc(sizeof(struct recovery_work)); - if (!rw) - return -1; - - rw->state = RW_INIT; - rw->oids = malloc(1 << 20); /* FIXME */ - rw->epoch = epoch; - rw->count = 0; - - rw->work.fn = do_recovery_work; - rw->work.done = do_recover_main; - - if (sd_store->begin_recover) { - struct siocb iocb = { 0 }; - iocb.epoch = epoch; - sd_store->begin_recover(&iocb); - } - - if (recovering_work != NULL) { - if (next_rw) { - /* skip the previous epoch recovery */ - free(next_rw->oids); - 
free(next_rw); - } - next_rw = rw; - } else { - recovering_work = rw; - queue_work(sys->recovery_wqueue, &rw->work); - } - - return 0; -} - static int init_path(const char *d, int *new) { int ret, retry = 0; |