On 04/27/2012 11:34 PM, Christoph Hellwig wrote: > Signed-off-by: Christoph Hellwig <hch at lst.de> > > --- > sheep/Makefile.am | 5 > sheep/recovery.c | 828 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ > sheep/store.c | 811 ---------------------------------------------------- > 3 files changed, 831 insertions(+), 813 deletions(-) > > Index: sheepdog/sheep/Makefile.am > =================================================================== > --- sheepdog.orig/sheep/Makefile.am 2012-04-26 18:47:06.756621981 +0200 > +++ sheepdog/sheep/Makefile.am 2012-04-27 17:21:06.104083016 +0200 > @@ -24,8 +24,10 @@ INCLUDES = -I$(top_builddir)/include -I > > sbin_PROGRAMS = sheep > > -sheep_SOURCES = sheep.c group.c sdnet.c store.c vdi.c work.c journal.c ops.c \ > - cluster/local.c strbuf.c simple_store.c object_cache.c > +sheep_SOURCES = sheep.c group.c sdnet.c store.c vdi.c work.c \ > + journal.c ops.c recovery.c cluster/local.c strbuf.c \ > + simple_store.c object_cache.c > + > if BUILD_COROSYNC > sheep_SOURCES += cluster/corosync.c > endif > Index: sheepdog/sheep/recovery.c > =================================================================== > --- /dev/null 1970-01-01 00:00:00.000000000 +0000 > +++ sheepdog/sheep/recovery.c 2012-04-27 17:24:34.040088332 +0200 > @@ -0,0 +1,828 @@ > +/* > + * Copyright (C) 2009-2011 Nippon Telegraph and Telephone Corporation. > + * > + * This program is free software; you can redistribute it and/or > + * modify it under the terms of the GNU General Public License version > + * 2 as published by the Free Software Foundation. > + * > + * You should have received a copy of the GNU General Public License > + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
> + */ > +#include <errno.h> > +#include <stdio.h> > +#include <stdlib.h> > +#include <unistd.h> > +#include <sys/types.h> > + > +#include "sheep_priv.h" > +#include "strbuf.h" > + > + > +enum rw_state { > + RW_INIT, > + RW_RUN, > +}; > + > +struct recovery_work { > + enum rw_state state; > + > + uint32_t epoch; > + uint32_t done; > + > + struct timer timer; > + int retry; > + struct work work; > + > + int nr_blocking; > + int count; > + uint64_t *oids; > + > + int old_nr_nodes; > + struct sd_node old_nodes[SD_MAX_NODES]; > + int cur_nr_nodes; > + struct sd_node cur_nodes[SD_MAX_NODES]; > + int old_nr_vnodes; > + struct sd_vnode old_vnodes[SD_MAX_VNODES]; > + int cur_nr_vnodes; > + struct sd_vnode cur_vnodes[SD_MAX_VNODES]; > +}; > + > +static struct recovery_work *next_rw; > +static struct recovery_work *recovering_work; > + > +static int obj_cmp(const void *oid1, const void *oid2) > +{ > + const uint64_t hval1 = fnv_64a_buf((void *)oid1, sizeof(uint64_t), FNV1A_64_INIT); > + const uint64_t hval2 = fnv_64a_buf((void *)oid2, sizeof(uint64_t), FNV1A_64_INIT); > + > + if (hval1 < hval2) > + return -1; > + if (hval1 > hval2) > + return 1; > + return 0; > +} > + > +/* > + * contains_node - checks that the node id is included in the target nodes > + * > + * The target nodes to store replicated objects are the first N nodes > + * from the base_idx'th on the consistent hash ring, where N is the > + * number of copies of objects. 
> + */ > +static int contains_node(struct sd_vnode *key, > + struct sd_vnode *entry, > + int nr, int base_idx, int copies) > +{ > + int i; > + > + for (i = 0; i < copies; i++) { > + int idx = get_nth_node(entry, nr, base_idx, i); > + if (memcmp(key->addr, entry[idx].addr, sizeof(key->addr)) == 0 > + && key->port == entry[idx].port) > + return idx; > + } > + return -1; > +} > + > +/* > + * find_tgt_node - find the node from which we should recover objects > + * > + * This function compares two node lists, the current target nodes and > + * the previous target nodes, and finds the node from the previous > + * target nodes which corresponds to the copy_idx'th node of the > + * current target nodes. The correspondence is injective and > + * maximizes the number of nodes which can recover objects locally. > + * > + * For example, consider the number of redundancy is 5, the consistent > + * hash ring is {A, B, C, D, E, F}, and the node G is newly added. > + * The parameters of this function are > + * old_entry = {A, B, C, D, E, F}, old_nr = 6, old_idx = 3 > + * cur_entry = {A, B, C, D, E, F, G}, cur_nr = 7, cur_idx = 3 > + * > + * In this case: > + * the previous target nodes: {D, E, F, A, B} > + * (the first 5 nodes from the 3rd node on the previous hash ring) > + * the current target nodes : {D, E, F, G, A} > + * (the first 5 nodes from the 3rd node on the current hash ring) > + * > + * The correspondence between copy_idx and return value are as follows: > + * ---------------------------- > + * copy_idx 0 1 2 3 4 > + * src_node D E F G A > + * tgt_node D E F B A > + * return value 0 1 2 4 3 > + * ---------------------------- > + * > + * The node D, E, F, and A can recover objects from local, and the > + * node G recovers from the node B. 
> + */ > +static int find_tgt_node(struct sd_vnode *old_entry, > + int old_nr, int old_idx, int old_copies, > + struct sd_vnode *cur_entry, > + int cur_nr, int cur_idx, int cur_copies, > + int copy_idx) > +{ > + int i, j, idx; > + > + dprintf("%"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32"\n", > + old_idx, old_nr, old_copies, cur_idx, cur_nr, cur_copies, copy_idx); > + > + /* If the same node is in the previous target nodes, return its index */ > + idx = contains_node(cur_entry + get_nth_node(cur_entry, cur_nr, cur_idx, copy_idx), > + old_entry, old_nr, old_idx, old_copies); > + if (idx >= 0) { > + dprintf("%"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32"\n", idx, copy_idx, cur_idx, cur_nr); > + return idx; > + } > + > + for (i = 0, j = 0; ; i++, j++) { > + if (i < copy_idx) { > + /* Skip if the node can recover from its local */ > + idx = contains_node(cur_entry + get_nth_node(cur_entry, cur_nr, cur_idx, i), > + old_entry, old_nr, old_idx, old_copies); > + if (idx >= 0) > + continue; > + > + /* Find the next target which needs to recover from remote */ > + while (j < old_copies && > + contains_node(old_entry + get_nth_node(old_entry, old_nr, old_idx, j), > + cur_entry, cur_nr, cur_idx, cur_copies) >= 0) > + j++; > + } > + if (j == old_copies) { > + /* > + * Cannot find the target because the number of zones > + * is smaller than the number of copies. We can select > + * any node in this case, so select the first one. 
> + */ > + return old_idx; > + } > + > + if (i == copy_idx) { > + /* Found the target node correspoinding to copy_idx */ > + dprintf("%"PRIu32", %"PRIu32", %"PRIu32"\n", > + get_nth_node(old_entry, old_nr, old_idx, j), > + copy_idx, (cur_idx + i) % cur_nr); > + return get_nth_node(old_entry, old_nr, old_idx, j); > + } > + > + } > + > + return -1; > +} > + > +static void *get_vnodes_from_epoch(int epoch, int *nr, int *copies) > +{ > + int nodes_nr, len = sizeof(struct sd_vnode) * SD_MAX_VNODES; > + struct sd_node nodes[SD_MAX_NODES]; > + void *buf = xmalloc(len); > + > + nodes_nr = epoch_log_read_nr(epoch, (void *)nodes, sizeof(nodes)); > + if (nodes_nr < 0) { > + nodes_nr = epoch_log_read_remote(epoch, (void *)nodes, sizeof(nodes)); > + if (nodes_nr == 0) { > + free(buf); > + return NULL; > + } > + nodes_nr /= sizeof(nodes[0]); > + } > + *nr = nodes_to_vnodes(nodes, nodes_nr, buf); > + *copies = get_max_nr_copies_from(nodes, nodes_nr); > + > + return buf; > +} > + > +static int recover_object_from_replica(uint64_t oid, > + struct sd_vnode *entry, > + int epoch, int tgt_epoch) > +{ > + struct sd_obj_req hdr; > + struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr; > + char name[128]; > + unsigned wlen = 0, rlen; > + int fd, ret = -1; > + void *buf; > + struct siocb iocb = { 0 }; > + > + if (is_vdi_obj(oid)) > + rlen = SD_INODE_SIZE; > + else if (is_vdi_attr_obj(oid)) > + rlen = SD_ATTR_OBJ_SIZE; > + else > + rlen = SD_DATA_OBJ_SIZE; > + > + buf = valloc(rlen); > + if (!buf) { > + eprintf("%m\n"); > + goto out; > + } > + > + if (vnode_is_local(entry)) { > + iocb.epoch = epoch; > + iocb.length = rlen; > + ret = sd_store->link(oid, &iocb, tgt_epoch); > + if (ret == SD_RES_SUCCESS) { > + ret = 0; > + goto done; > + } else { > + ret = -1; > + goto out; > + } > + } > + > + addr_to_str(name, sizeof(name), entry->addr, 0); > + fd = connect_to(name, entry->port); > + dprintf("%s, %d\n", name, entry->port); > + if (fd < 0) { > + eprintf("failed to connect to %s:%"PRIu32"\n", 
name, entry->port); > + ret = -1; > + goto out; > + } > + > + memset(&hdr, 0, sizeof(hdr)); > + hdr.opcode = SD_OP_READ_OBJ; > + hdr.oid = oid; > + hdr.epoch = epoch; > + hdr.flags = SD_FLAG_CMD_RECOVERY | SD_FLAG_CMD_IO_LOCAL; > + hdr.tgt_epoch = tgt_epoch; > + hdr.data_length = rlen; > + > + ret = exec_req(fd, (struct sd_req *)&hdr, buf, &wlen, &rlen); > + > + close(fd); > + > + if (ret != 0) { > + eprintf("res: %"PRIx32"\n", rsp->result); > + ret = -1; > + goto out; > + } > + > + rsp = (struct sd_obj_rsp *)&hdr; > + > + if (rsp->result == SD_RES_SUCCESS) { > + iocb.epoch = epoch; > + iocb.length = rlen; > + iocb.buf = buf; > + ret = sd_store->atomic_put(oid, &iocb); > + if (ret != SD_RES_SUCCESS) { > + ret = -1; > + goto out; > + } > + } else if (rsp->result == SD_RES_NEW_NODE_VER || > + rsp->result == SD_RES_OLD_NODE_VER || > + rsp->result == SD_RES_NETWORK_ERROR) { > + dprintf("retrying: %"PRIx32", %"PRIx64"\n", rsp->result, oid); > + ret = 1; > + goto out; > + } else { > + eprintf("failed, res: %"PRIx32"\n", rsp->result); > + ret = -1; > + goto out; > + } > +done: > + dprintf("recovered oid %"PRIx64" from %d to epoch %d\n", oid, tgt_epoch, epoch); > +out: > + free(buf); > + return ret; > +} > + > +static void rollback_old_cur(struct sd_vnode *old, int *old_nr, int *old_copies, > + struct sd_vnode *cur, int *cur_nr, int *cur_copies, > + struct sd_vnode *new_old, int new_old_nr, int new_old_copies) > +{ > + int nr_old = *old_nr; > + int copies_old = *old_copies; > + > + memcpy(cur, old, sizeof(*old) * nr_old); > + *cur_nr = nr_old; > + *cur_copies = copies_old; > + memcpy(old, new_old, sizeof(*new_old) * new_old_nr); > + *old_nr = new_old_nr; > + *old_copies = new_old_copies; > +} > + > +/* > + * Recover the object from its track in epoch history. That is, > + * the routine will try to recovery it from the nodes it has stayed, > + * at least, *theoretically* on consistent hash ring. 
> + */ > +static int do_recover_object(struct recovery_work *rw, int copy_idx) > +{ > + struct sd_vnode *old, *cur; > + uint64_t oid = rw->oids[rw->done]; > + int old_nr = rw->old_nr_vnodes, cur_nr = rw->cur_nr_vnodes; > + int epoch = rw->epoch, tgt_epoch = rw->epoch - 1; > + struct sd_vnode *tgt_entry; > + int old_idx, cur_idx, tgt_idx, old_copies, cur_copies, ret; > + > + old = xmalloc(sizeof(*old) * SD_MAX_VNODES); > + cur = xmalloc(sizeof(*cur) * SD_MAX_VNODES); > + memcpy(old, rw->old_vnodes, sizeof(*old) * old_nr); > + memcpy(cur, rw->cur_vnodes, sizeof(*cur) * cur_nr); > + old_copies = get_max_nr_copies_from(rw->old_nodes, rw->old_nr_nodes); > + cur_copies = get_max_nr_copies_from(rw->cur_nodes, rw->cur_nr_nodes); > + > +again: > + old_idx = obj_to_sheep(old, old_nr, oid, 0); > + cur_idx = obj_to_sheep(cur, cur_nr, oid, 0); > + > + dprintf("try recover object %"PRIx64" from epoch %"PRIu32"\n", oid, tgt_epoch); > + > + if (cur_copies <= copy_idx) { > + eprintf("epoch (%"PRIu32") has less copies (%d) than requested copy_idx: %d\n", > + tgt_epoch, cur_copies, copy_idx); > + ret = -1; > + goto err; > + } > + > + tgt_idx = find_tgt_node(old, old_nr, old_idx, old_copies, > + cur, cur_nr, cur_idx, cur_copies, copy_idx); > + if (tgt_idx < 0) { > + eprintf("cannot find target node %"PRIx64"\n", oid); > + ret = -1; > + goto err; > + } > + tgt_entry = old + tgt_idx; > + > + ret = recover_object_from_replica(oid, tgt_entry, epoch, tgt_epoch); > + if (ret < 0) { > + struct sd_vnode *new_old; > + int new_old_nr, new_old_copies; > + > + tgt_epoch--; > + if (tgt_epoch < 1) { > + eprintf("can not recover oid %"PRIx64"\n", oid); > + ret = -1; > + goto err; > + } > + > + new_old = get_vnodes_from_epoch(tgt_epoch, &new_old_nr, &new_old_copies); > + if (!new_old) { > + ret = -1; > + goto err; > + } > + rollback_old_cur(old, &old_nr, &old_copies, cur, &cur_nr, &cur_copies, > + new_old, new_old_nr, new_old_copies); > + free(new_old); > + goto again; > + } else if (ret > 0) { > + 
ret = 0; > + rw->retry = 1; > + } > +err: > + free(old); > + free(cur); > + return ret; > +} > + > +static int get_replica_idx(struct recovery_work *rw, uint64_t oid, int *copy_nr) > +{ > + int i, ret = -1; > + *copy_nr = get_max_nr_copies_from(rw->cur_nodes, rw->cur_nr_nodes); > + for (i = 0; i < *copy_nr; i++) { > + int n = obj_to_sheep(rw->cur_vnodes, rw->cur_nr_vnodes, oid, i); > + if (vnode_is_local(&rw->cur_vnodes[n])) { > + ret = i; > + break; > + } > + } > + return ret; > +} > + > +static void recover_object(struct work *work) > +{ > + struct recovery_work *rw = container_of(work, struct recovery_work, work); > + uint64_t oid = rw->oids[rw->done]; > + uint32_t epoch = rw->epoch; > + int i, copy_idx, copy_nr, ret; > + struct siocb iocb = { 0 }; > + > + if (!sys->nr_copies) > + return; > + > + eprintf("done:%"PRIu32" count:%"PRIu32", oid:%"PRIx64"\n", rw->done, rw->count, oid); > + > + iocb.epoch = epoch; > + ret = sd_store->open(oid, &iocb, 0); > + if (ret == SD_RES_SUCCESS) { > + sd_store->close(oid, &iocb); > + dprintf("the object is already recovered\n"); > + return; > + } > + > + copy_idx = get_replica_idx(rw, oid, &copy_nr); > + if (copy_idx < 0) { > + ret = -1; > + goto err; > + } > + ret = do_recover_object(rw, copy_idx); > + if (ret < 0) { > + for (i = 0; i < copy_nr; i++) { > + if (i == copy_idx) > + continue; > + ret = do_recover_object(rw, i); > + if (ret == 0) > + break; > + } > + } > +err: > + if (ret < 0) > + eprintf("failed to recover object %"PRIx64"\n", oid); > +} > + > +static struct recovery_work *suspended_recovery_work; > + > +static void recover_timer(void *data) > +{ > + struct recovery_work *rw = (struct recovery_work *)data; > + uint64_t oid = rw->oids[rw->done]; > + > + if (is_access_to_busy_objects(oid)) { > + suspended_recovery_work = rw; > + return; > + } > + > + queue_work(sys->recovery_wqueue, &rw->work); > +} > + > +void resume_recovery_work(void) > +{ > + struct recovery_work *rw; > + uint64_t oid; > + > + if
(!suspended_recovery_work) > + return; > + > + rw = suspended_recovery_work; > + > + oid = rw->oids[rw->done]; > + if (is_access_to_busy_objects(oid)) > + return; > + > + suspended_recovery_work = NULL; > + queue_work(sys->recovery_wqueue, &rw->work); > +} > + > +int node_in_recovery(void) > +{ > + return !!recovering_work; > +} > + > +int is_recoverying_oid(uint64_t oid) > +{ > + uint64_t hval = fnv_64a_buf(&oid, sizeof(uint64_t), FNV1A_64_INIT); > + uint64_t min_hval; > + struct recovery_work *rw = recovering_work; > + int ret, i; > + struct siocb iocb; > + > + if (oid == 0) > + return 0; > + > + if (!rw) > + return 0; /* there is no thread working for object recovery */ > + > + min_hval = fnv_64a_buf(&rw->oids[rw->done + rw->nr_blocking], sizeof(uint64_t), FNV1A_64_INIT); > + > + if (before(rw->epoch, sys->epoch)) > + return 1; > + > + if (rw->state == RW_INIT) > + return 1; > + > + memset(&iocb, 0, sizeof(iocb)); > + iocb.epoch = sys->epoch; > + ret = sd_store->open(oid, &iocb, 0); > + if (ret == SD_RES_SUCCESS) { > + dprintf("the object %" PRIx64 " is already recoverd\n", oid); > + sd_store->close(oid, &iocb); > + return 0; > + } > + > + /* the first 'rw->nr_blocking' objects were already scheduled to be done earlier */ > + for (i = 0; i < rw->nr_blocking; i++) > + if (rw->oids[rw->done + i] == oid) > + return 1; > + > + if (min_hval <= hval) { > + uint64_t *p; > + p = bsearch(&oid, rw->oids + rw->done + rw->nr_blocking, > + rw->count - rw->done - rw->nr_blocking, sizeof(oid), obj_cmp); > + if (p) { > + dprintf("recover the object %" PRIx64 " first\n", oid); > + if (rw->nr_blocking == 0) > + rw->nr_blocking = 1; /* the first oid may be processed now */ > + if (p > rw->oids + rw->done + rw->nr_blocking) { > + /* this object should be recovered earlier */ > + memmove(rw->oids + rw->done + rw->nr_blocking + 1, > + rw->oids + rw->done + rw->nr_blocking, > + sizeof(uint64_t) * (p - (rw->oids + rw->done + rw->nr_blocking))); > + rw->oids[rw->done + rw->nr_blocking] 
= oid; > + rw->nr_blocking++; > + } > + return 1; > + } > + } > + > + dprintf("the object %" PRIx64 " is not found\n", oid); > + return 0; > +} > + > +static void do_recover_main(struct work *work) > +{ > + struct recovery_work *rw = container_of(work, struct recovery_work, work); > + uint64_t oid; > + > + if (rw->state == RW_INIT) > + rw->state = RW_RUN; > + else if (!rw->retry) { > + rw->done++; > + if (rw->nr_blocking > 0) > + rw->nr_blocking--; > + } > + > + oid = rw->oids[rw->done]; > + > + if (rw->retry && !next_rw) { > + rw->retry = 0; > + > + rw->timer.callback = recover_timer; > + rw->timer.data = rw; > + add_timer(&rw->timer, 2); > + return; > + } > + > + if (rw->done < rw->count && !next_rw) { > + rw->work.fn = recover_object; > + > + if (is_access_to_busy_objects(oid)) { > + suspended_recovery_work = rw; > + return; > + } > + resume_pending_requests(); > + queue_work(sys->recovery_wqueue, &rw->work); > + return; > + } > + > + dprintf("recovery complete: new epoch %"PRIu32"\n", rw->epoch); > + recovering_work = NULL; > + > + sys->recovered_epoch = rw->epoch; > + > + free(rw->oids); > + free(rw); > + > + if (next_rw) { > + rw = next_rw; > + next_rw = NULL; > + > + recovering_work = rw; > + queue_work(sys->recovery_wqueue, &rw->work); > + } else { > + if (sd_store->end_recover) { > + struct siocb iocb = { 0 }; > + iocb.epoch = sys->epoch; > + sd_store->end_recover(&iocb); > + } > + } > + > + resume_pending_requests(); > +} > + > +static int request_obj_list(struct sd_node *e, uint32_t epoch, > + uint8_t *buf, size_t buf_size) > +{ > + int fd, ret; > + unsigned wlen, rlen; > + char name[128]; > + struct sd_list_req hdr; > + struct sd_list_rsp *rsp; > + > + addr_to_str(name, sizeof(name), e->addr, 0); > + > + dprintf("%s %"PRIu32"\n", name, e->port); > + > + fd = connect_to(name, e->port); > + if (fd < 0) { > + eprintf("%s %"PRIu32"\n", name, e->port); > + return -1; > + } > + > + wlen = 0; > + rlen = buf_size; > + > + memset(&hdr, 0, sizeof(hdr)); > + 
hdr.opcode = SD_OP_GET_OBJ_LIST; > + hdr.tgt_epoch = epoch - 1; > + hdr.flags = 0; > + hdr.data_length = rlen; > + > + ret = exec_req(fd, (struct sd_req *)&hdr, buf, &wlen, &rlen); > + > + close(fd); > + > + rsp = (struct sd_list_rsp *)&hdr; > + > + if (ret || rsp->result != SD_RES_SUCCESS) { > + eprintf("retrying: %"PRIu32", %"PRIu32"\n", ret, rsp->result); > + return -1; > + } > + > + dprintf("%"PRIu64"\n", rsp->data_length / sizeof(uint64_t)); > + > + return rsp->data_length / sizeof(uint64_t); > +} > + > +int merge_objlist(uint64_t *list1, int nr_list1, uint64_t *list2, int nr_list2) > +{ > + int i; > + int old_nr_list1 = nr_list1; > + > + for (i = 0; i < nr_list2; i++) { > + if (bsearch(list2 + i, list1, old_nr_list1, sizeof(*list1), obj_cmp)) > + continue; > + > + list1[nr_list1++] = list2[i]; > + } > + > + qsort(list1, nr_list1, sizeof(*list1), obj_cmp); > + > + return nr_list1; > +} > + > +static int screen_obj_list(struct recovery_work *rw, uint64_t *list, int list_nr) > +{ > + int ret, i, cp, idx; > + struct strbuf buf = STRBUF_INIT; > + struct sd_vnode *nodes = rw->cur_vnodes; > + int nodes_nr = rw->cur_nr_vnodes; > + int nr_objs = get_max_nr_copies_from(rw->cur_nodes, rw->cur_nr_nodes); > + > + for (i = 0; i < list_nr; i++) { > + for (cp = 0; cp < nr_objs; cp++) { > + idx = obj_to_sheep(nodes, nodes_nr, list[i], cp); > + if (vnode_is_local(&nodes[idx])) > + break; > + } > + if (cp == nr_objs) > + continue; > + strbuf_add(&buf, &list[i], sizeof(uint64_t)); > + } > + memcpy(list, buf.buf, buf.len); > + > + ret = buf.len / sizeof(uint64_t); > + dprintf("%d\n", ret); > + strbuf_release(&buf); > + > + return ret; > +} > + > +#define MAX_RETRY_CNT 6 > + > +static int newly_joined(struct sd_node *node, struct recovery_work *rw) > +{ > + struct sd_node *old = rw->old_nodes; > + int old_nr = rw->old_nr_nodes; > + int i; > + for (i = 0; i < old_nr; i++) > + if (node_cmp(node, old + i) == 0) > + break; > + > + if (i == old_nr) > + return 1; > + return 0; > +} > + 
> +static int fill_obj_list(struct recovery_work *rw) > +{ > + int i; > + uint8_t *buf = NULL; > + size_t buf_size = SD_DATA_OBJ_SIZE; /* FIXME */ > + int retry_cnt; > + struct sd_node *cur = rw->cur_nodes; > + int cur_nr = rw->cur_nr_nodes; > + > + buf = malloc(buf_size); > + if (!buf) { > + eprintf("out of memory\n"); > + rw->retry = 1; > + return -1; > + } > + for (i = 0; i < cur_nr; i++) { > + int buf_nr; > + struct sd_node *node = cur + i; > + > + if (newly_joined(node, rw)) > + /* new node doesn't have a list file */ > + continue; > + > + retry_cnt = 0; > + retry: > + buf_nr = request_obj_list(node, rw->epoch, buf, buf_size); > + if (buf_nr < 0) { > + retry_cnt++; > + if (retry_cnt > MAX_RETRY_CNT) { > + eprintf("failed to get object list\n"); > + eprintf("some objects may be lost\n"); > + continue; > + } else { > + if (next_rw) { > + dprintf("go to the next recovery\n"); > + break; > + } > + dprintf("trying to get object list again\n"); > + sleep(1); > + goto retry; > + } > + } > + buf_nr = screen_obj_list(rw, (uint64_t *)buf, buf_nr); > + if (buf_nr) > + rw->count = merge_objlist(rw->oids, rw->count, (uint64_t *)buf, buf_nr); > + } > + > + dprintf("%d\n", rw->count); > + free(buf); > + return 0; > +} > + > +/* setup node list and virtual node list */ > +static int init_rw(struct recovery_work *rw) > +{ > + int epoch = rw->epoch; > + > + rw->cur_nr_nodes = epoch_log_read_nr(epoch, (char *)rw->cur_nodes, > + sizeof(rw->cur_nodes)); > + if (rw->cur_nr_nodes <= 0) { > + eprintf("failed to read epoch log for epoch %"PRIu32"\n", epoch); > + return -1; > + } > + > + rw->old_nr_nodes = epoch_log_read_nr(epoch - 1, (char *)rw->old_nodes, > + sizeof(rw->old_nodes)); > + if (rw->old_nr_nodes <= 0) { > + eprintf("failed to read epoch log for epoch %"PRIu32"\n", epoch - 1); > + return -1; > + } > + rw->old_nr_vnodes = nodes_to_vnodes(rw->old_nodes, rw->old_nr_nodes, > + rw->old_vnodes); > + rw->cur_nr_vnodes = nodes_to_vnodes(rw->cur_nodes, rw->cur_nr_nodes, > + 
rw->cur_vnodes); > + > + return 0; > +} > + > +static void do_recovery_work(struct work *work) > +{ > + struct recovery_work *rw = container_of(work, struct recovery_work, work); > + > + dprintf("%u\n", rw->epoch); > + > + if (!sys->nr_copies) > + return; > + > + if (rw->cur_nr_nodes == 0) > + init_rw(rw); > + > + if (fill_obj_list(rw) < 0) { > + eprintf("fatal recovery error\n"); > + rw->count = 0; > + return; > + } > +} > + > +int start_recovery(uint32_t epoch) > +{ > + struct recovery_work *rw; > + > + rw = zalloc(sizeof(struct recovery_work)); > + if (!rw) > + return -1; > + > + rw->state = RW_INIT; > + rw->oids = malloc(1 << 20); /* FIXME */ > + rw->epoch = epoch; > + rw->count = 0; > + > + rw->work.fn = do_recovery_work; > + rw->work.done = do_recover_main; > + > + if (sd_store->begin_recover) { > + struct siocb iocb = { 0 }; > + iocb.epoch = epoch; > + sd_store->begin_recover(&iocb); > + } > + > + if (recovering_work != NULL) { > + if (next_rw) { > + /* skip the previous epoch recovery */ > + free(next_rw->oids); > + free(next_rw); > + } > + next_rw = rw; > + } else { > + recovering_work = rw; > + queue_work(sys->recovery_wqueue, &rw->work); > + } > + > + return 0; > +} > Index: sheepdog/sheep/store.c > =================================================================== > --- sheepdog.orig/sheep/store.c 2012-04-27 17:19:46.000000000 +0200 > +++ sheepdog/sheep/store.c 2012-04-27 17:24:03.260087553 +0200 > @@ -16,11 +16,9 @@ > #include <stdlib.h> > #include <unistd.h> > #include <poll.h> > -#include <sys/xattr.h> > #include <sys/statvfs.h> > #include <sys/types.h> > #include <sys/stat.h> > -#include <fcntl.h> > #include <time.h> > #include <pthread.h> > > @@ -133,18 +131,6 @@ static int check_and_insert_objlist_cach > return 0; > } > > -static int obj_cmp(const void *oid1, const void *oid2) > -{ > - const uint64_t hval1 = fnv_64a_buf((void *)oid1, sizeof(uint64_t), FNV1A_64_INIT); > - const uint64_t hval2 = fnv_64a_buf((void *)oid2, sizeof(uint64_t), 
FNV1A_64_INIT); > - > - if (hval1 < hval2) > - return -1; > - if (hval1 > hval2) > - return 1; > - return 0; > -} > - > static void get_store_dir(struct strbuf *buf, int epoch) > { > if (!strcmp(sd_store->name, "simple")) > @@ -1129,803 +1115,6 @@ uint64_t get_cluster_ctime(void) > return ct; > } > > -/* > - * contains_node - checks that the node id is included in the target nodes > - * > - * The target nodes to store replicated objects are the first N nodes > - * from the base_idx'th on the consistent hash ring, where N is the > - * number of copies of objects. > - */ > -static int contains_node(struct sd_vnode *key, > - struct sd_vnode *entry, > - int nr, int base_idx, int copies) > -{ > - int i; > - > - for (i = 0; i < copies; i++) { > - int idx = get_nth_node(entry, nr, base_idx, i); > - if (memcmp(key->addr, entry[idx].addr, sizeof(key->addr)) == 0 > - && key->port == entry[idx].port) > - return idx; > - } > - return -1; > -} > - > -enum rw_state { > - RW_INIT, > - RW_RUN, > -}; > - > -struct recovery_work { > - enum rw_state state; > - > - uint32_t epoch; > - uint32_t done; > - > - struct timer timer; > - int retry; > - struct work work; > - > - int nr_blocking; > - int count; > - uint64_t *oids; > - > - int old_nr_nodes; > - struct sd_node old_nodes[SD_MAX_NODES]; > - int cur_nr_nodes; > - struct sd_node cur_nodes[SD_MAX_NODES]; > - int old_nr_vnodes; > - struct sd_vnode old_vnodes[SD_MAX_VNODES]; > - int cur_nr_vnodes; > - struct sd_vnode cur_vnodes[SD_MAX_VNODES]; > -}; > - > -static struct recovery_work *next_rw; > -static struct recovery_work *recovering_work; > - > -/* > - * find_tgt_node - find the node from which we should recover objects > - * > - * This function compares two node lists, the current target nodes and > - * the previous target nodes, and finds the node from the previous > - * target nodes which corresponds to the copy_idx'th node of the > - * current target nodes. 
The correspondence is injective and > - * maximizes the number of nodes which can recover objects locally. > - * > - * For example, consider the number of redundancy is 5, the consistent > - * hash ring is {A, B, C, D, E, F}, and the node G is newly added. > - * The parameters of this function are > - * old_entry = {A, B, C, D, E, F}, old_nr = 6, old_idx = 3 > - * cur_entry = {A, B, C, D, E, F, G}, cur_nr = 7, cur_idx = 3 > - * > - * In this case: > - * the previous target nodes: {D, E, F, A, B} > - * (the first 5 nodes from the 3rd node on the previous hash ring) > - * the current target nodes : {D, E, F, G, A} > - * (the first 5 nodes from the 3rd node on the current hash ring) > - * > - * The correspondence between copy_idx and return value are as follows: > - * ---------------------------- > - * copy_idx 0 1 2 3 4 > - * src_node D E F G A > - * tgt_node D E F B A > - * return value 0 1 2 4 3 > - * ---------------------------- > - * > - * The node D, E, F, and A can recover objects from local, and the > - * node G recovers from the node B. 
> - */ > -static int find_tgt_node(struct sd_vnode *old_entry, > - int old_nr, int old_idx, int old_copies, > - struct sd_vnode *cur_entry, > - int cur_nr, int cur_idx, int cur_copies, > - int copy_idx) > -{ > - int i, j, idx; > - > - dprintf("%"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32"\n", > - old_idx, old_nr, old_copies, cur_idx, cur_nr, cur_copies, copy_idx); > - > - /* If the same node is in the previous target nodes, return its index */ > - idx = contains_node(cur_entry + get_nth_node(cur_entry, cur_nr, cur_idx, copy_idx), > - old_entry, old_nr, old_idx, old_copies); > - if (idx >= 0) { > - dprintf("%"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32"\n", idx, copy_idx, cur_idx, cur_nr); > - return idx; > - } > - > - for (i = 0, j = 0; ; i++, j++) { > - if (i < copy_idx) { > - /* Skip if the node can recover from its local */ > - idx = contains_node(cur_entry + get_nth_node(cur_entry, cur_nr, cur_idx, i), > - old_entry, old_nr, old_idx, old_copies); > - if (idx >= 0) > - continue; > - > - /* Find the next target which needs to recover from remote */ > - while (j < old_copies && > - contains_node(old_entry + get_nth_node(old_entry, old_nr, old_idx, j), > - cur_entry, cur_nr, cur_idx, cur_copies) >= 0) > - j++; > - } > - if (j == old_copies) { > - /* > - * Cannot find the target because the number of zones > - * is smaller than the number of copies. We can select > - * any node in this case, so select the first one. 
> - */ > - return old_idx; > - } > - > - if (i == copy_idx) { > - /* Found the target node correspoinding to copy_idx */ > - dprintf("%"PRIu32", %"PRIu32", %"PRIu32"\n", > - get_nth_node(old_entry, old_nr, old_idx, j), > - copy_idx, (cur_idx + i) % cur_nr); > - return get_nth_node(old_entry, old_nr, old_idx, j); > - } > - > - } > - > - return -1; > -} > - > -static void *get_vnodes_from_epoch(int epoch, int *nr, int *copies) > -{ > - int nodes_nr, len = sizeof(struct sd_vnode) * SD_MAX_VNODES; > - struct sd_node nodes[SD_MAX_NODES]; > - void *buf = xmalloc(len); > - > - nodes_nr = epoch_log_read_nr(epoch, (void *)nodes, sizeof(nodes)); > - if (nodes_nr < 0) { > - nodes_nr = epoch_log_read_remote(epoch, (void *)nodes, sizeof(nodes)); > - if (nodes_nr == 0) { > - free(buf); > - return NULL; > - } > - nodes_nr /= sizeof(nodes[0]); > - } > - *nr = nodes_to_vnodes(nodes, nodes_nr, buf); > - *copies = get_max_nr_copies_from(nodes, nodes_nr); > - > - return buf; > -} > - > -static int recover_object_from_replica(uint64_t oid, > - struct sd_vnode *entry, > - int epoch, int tgt_epoch) > -{ > - struct sd_obj_req hdr; > - struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr; > - char name[128]; > - unsigned wlen = 0, rlen; > - int fd, ret = -1; > - void *buf; > - struct siocb iocb = { 0 }; > - > - if (is_vdi_obj(oid)) > - rlen = SD_INODE_SIZE; > - else if (is_vdi_attr_obj(oid)) > - rlen = SD_ATTR_OBJ_SIZE; > - else > - rlen = SD_DATA_OBJ_SIZE; > - > - buf = valloc(rlen); > - if (!buf) { > - eprintf("%m\n"); > - goto out; > - } > - > - if (vnode_is_local(entry)) { > - iocb.epoch = epoch; > - iocb.length = rlen; > - ret = sd_store->link(oid, &iocb, tgt_epoch); > - if (ret == SD_RES_SUCCESS) { > - ret = 0; > - goto done; > - } else { > - ret = -1; > - goto out; > - } > - } > - > - addr_to_str(name, sizeof(name), entry->addr, 0); > - fd = connect_to(name, entry->port); > - dprintf("%s, %d\n", name, entry->port); > - if (fd < 0) { > - eprintf("failed to connect to %s:%"PRIu32"\n", 
name, entry->port); > - ret = -1; > - goto out; > - } > - > - memset(&hdr, 0, sizeof(hdr)); > - hdr.opcode = SD_OP_READ_OBJ; > - hdr.oid = oid; > - hdr.epoch = epoch; > - hdr.flags = SD_FLAG_CMD_RECOVERY | SD_FLAG_CMD_IO_LOCAL; > - hdr.tgt_epoch = tgt_epoch; > - hdr.data_length = rlen; > - > - ret = exec_req(fd, (struct sd_req *)&hdr, buf, &wlen, &rlen); > - > - close(fd); > - > - if (ret != 0) { > - eprintf("res: %"PRIx32"\n", rsp->result); > - ret = -1; > - goto out; > - } > - > - rsp = (struct sd_obj_rsp *)&hdr; > - > - if (rsp->result == SD_RES_SUCCESS) { > - iocb.epoch = epoch; > - iocb.length = rlen; > - iocb.buf = buf; > - ret = sd_store->atomic_put(oid, &iocb); > - if (ret != SD_RES_SUCCESS) { > - ret = -1; > - goto out; > - } > - } else if (rsp->result == SD_RES_NEW_NODE_VER || > - rsp->result == SD_RES_OLD_NODE_VER || > - rsp->result == SD_RES_NETWORK_ERROR) { > - dprintf("retrying: %"PRIx32", %"PRIx64"\n", rsp->result, oid); > - ret = 1; > - goto out; > - } else { > - eprintf("failed, res: %"PRIx32"\n", rsp->result); > - ret = -1; > - goto out; > - } > -done: > - dprintf("recovered oid %"PRIx64" from %d to epoch %d\n", oid, tgt_epoch, epoch); > -out: > - free(buf); > - return ret; > -} > - > -static void rollback_old_cur(struct sd_vnode *old, int *old_nr, int *old_copies, > - struct sd_vnode *cur, int *cur_nr, int *cur_copies, > - struct sd_vnode *new_old, int new_old_nr, int new_old_copies) > -{ > - int nr_old = *old_nr; > - int copies_old = *old_copies; > - > - memcpy(cur, old, sizeof(*old) * nr_old); > - *cur_nr = nr_old; > - *cur_copies = copies_old; > - memcpy(old, new_old, sizeof(*new_old) * new_old_nr); > - *old_nr = new_old_nr; > - *old_copies = new_old_copies; > -} > - > -/* > - * Recover the object from its track in epoch history. That is, > - * the routine will try to recovery it from the nodes it has stayed, > - * at least, *theoretically* on consistent hash ring. 
> - */ > -static int do_recover_object(struct recovery_work *rw, int copy_idx) > -{ > - struct sd_vnode *old, *cur; > - uint64_t oid = rw->oids[rw->done]; > - int old_nr = rw->old_nr_vnodes, cur_nr = rw->cur_nr_vnodes; > - int epoch = rw->epoch, tgt_epoch = rw->epoch - 1; > - struct sd_vnode *tgt_entry; > - int old_idx, cur_idx, tgt_idx, old_copies, cur_copies, ret; > - > - old = xmalloc(sizeof(*old) * SD_MAX_VNODES); > - cur = xmalloc(sizeof(*cur) * SD_MAX_VNODES); > - memcpy(old, rw->old_vnodes, sizeof(*old) * old_nr); > - memcpy(cur, rw->cur_vnodes, sizeof(*cur) * cur_nr); > - old_copies = get_max_nr_copies_from(rw->old_nodes, rw->old_nr_nodes); > - cur_copies = get_max_nr_copies_from(rw->cur_nodes, rw->cur_nr_nodes); > - > -again: > - old_idx = obj_to_sheep(old, old_nr, oid, 0); > - cur_idx = obj_to_sheep(cur, cur_nr, oid, 0); > - > - dprintf("try recover object %"PRIx64" from epoch %"PRIu32"\n", oid, tgt_epoch); > - > - if (cur_copies <= copy_idx) { > - eprintf("epoch (%"PRIu32") has less copies (%d) than requested copy_idx: %d\n", > - tgt_epoch, cur_copies, copy_idx); > - ret = -1; > - goto err; > - } > - > - tgt_idx = find_tgt_node(old, old_nr, old_idx, old_copies, > - cur, cur_nr, cur_idx, cur_copies, copy_idx); > - if (tgt_idx < 0) { > - eprintf("cannot find target node %"PRIx64"\n", oid); > - ret = -1; > - goto err; > - } > - tgt_entry = old + tgt_idx; > - > - ret = recover_object_from_replica(oid, tgt_entry, epoch, tgt_epoch); > - if (ret < 0) { > - struct sd_vnode *new_old; > - int new_old_nr, new_old_copies; > - > - tgt_epoch--; > - if (tgt_epoch < 1) { > - eprintf("can not recover oid %"PRIx64"\n", oid); > - ret = -1; > - goto err; > - } > - > - new_old = get_vnodes_from_epoch(tgt_epoch, &new_old_nr, &new_old_copies); > - if (!new_old) { > - ret = -1; > - goto err; > - } > - rollback_old_cur(old, &old_nr, &old_copies, cur, &cur_nr, &cur_copies, > - new_old, new_old_nr, new_old_copies); > - free(new_old); > - goto again; > - } else if (ret > 0) { > - 
ret = 0; > - rw->retry = 1; > - } > -err: > - free(old); > - free(cur); > - return ret; > -} > - > -static int get_replica_idx(struct recovery_work *rw, uint64_t oid, int *copy_nr) > -{ > - int i, ret = -1; > - *copy_nr = get_max_nr_copies_from(rw->cur_nodes, rw->cur_nr_nodes); > - for (i = 0; i < *copy_nr; i++) { > - int n = obj_to_sheep(rw->cur_vnodes, rw->cur_nr_vnodes, oid, i); > - if (vnode_is_local(&rw->cur_vnodes[n])) { > - ret = i; > - break; > - } > - } > - return ret; > -} > - > -static void recover_object(struct work *work) > -{ > - struct recovery_work *rw = container_of(work, struct recovery_work, work); > - uint64_t oid = rw->oids[rw->done]; > - uint32_t epoch = rw->epoch; > - int i, copy_idx, copy_nr, ret; > - struct siocb iocb = { 0 }; > - > - if (!sys->nr_copies) > - return; > - > - eprintf("done:%"PRIu32" count:%"PRIu32", oid:%"PRIx64"\n", rw->done, rw->count, oid); > - > - iocb.epoch = epoch; > - ret = sd_store->open(oid, &iocb, 0); > - if (ret == SD_RES_SUCCESS) { > - sd_store->close(oid, &iocb); > - dprintf("the object is already recovered\n"); > - return; > - } > - > - copy_idx = get_replica_idx(rw, oid, ©_nr); > - if (copy_idx < 0) { > - ret = -1; > - goto err; > - } > - ret = do_recover_object(rw, copy_idx); > - if (ret < 0) { > - for (i = 0; i < copy_nr; i++) { > - if (i == copy_idx) > - continue; > - ret = do_recover_object(rw, i); > - if (ret == 0) > - break; > - } > - } > -err: > - if (ret < 0) > - eprintf("failed to recover object %"PRIx64"\n", oid); > -} > - > -static struct recovery_work *suspended_recovery_work; > - > -static void recover_timer(void *data) > -{ > - struct recovery_work *rw = (struct recovery_work *)data; > - uint64_t oid = rw->oids[rw->done]; > - > - if (is_access_to_busy_objects(oid)) { > - suspended_recovery_work = rw; > - return; > - } > - > - queue_work(sys->recovery_wqueue, &rw->work); > -} > - > -void resume_recovery_work(void) > -{ > - struct recovery_work *rw; > - uint64_t oid; > - > - if 
(!suspended_recovery_work) > - return; > - > - rw = suspended_recovery_work; > - > - oid = rw->oids[rw->done]; > - if (is_access_to_busy_objects(oid)) > - return; > - > - suspended_recovery_work = NULL; > - queue_work(sys->recovery_wqueue, &rw->work); > -} > - > -int node_in_recovery(void) > -{ > - return !!recovering_work; > -} > - > -int is_recoverying_oid(uint64_t oid) > -{ > - uint64_t hval = fnv_64a_buf(&oid, sizeof(uint64_t), FNV1A_64_INIT); > - uint64_t min_hval; > - struct recovery_work *rw = recovering_work; > - int ret, i; > - struct siocb iocb; > - > - if (oid == 0) > - return 0; > - > - if (!rw) > - return 0; /* there is no thread working for object recovery */ > - > - min_hval = fnv_64a_buf(&rw->oids[rw->done + rw->nr_blocking], sizeof(uint64_t), FNV1A_64_INIT); > - > - if (before(rw->epoch, sys->epoch)) > - return 1; > - > - if (rw->state == RW_INIT) > - return 1; > - > - memset(&iocb, 0, sizeof(iocb)); > - iocb.epoch = sys->epoch; > - ret = sd_store->open(oid, &iocb, 0); > - if (ret == SD_RES_SUCCESS) { > - dprintf("the object %" PRIx64 " is already recoverd\n", oid); > - sd_store->close(oid, &iocb); > - return 0; > - } > - > - /* the first 'rw->nr_blocking' objects were already scheduled to be done earlier */ > - for (i = 0; i < rw->nr_blocking; i++) > - if (rw->oids[rw->done + i] == oid) > - return 1; > - > - if (min_hval <= hval) { > - uint64_t *p; > - p = bsearch(&oid, rw->oids + rw->done + rw->nr_blocking, > - rw->count - rw->done - rw->nr_blocking, sizeof(oid), obj_cmp); > - if (p) { > - dprintf("recover the object %" PRIx64 " first\n", oid); > - if (rw->nr_blocking == 0) > - rw->nr_blocking = 1; /* the first oid may be processed now */ > - if (p > rw->oids + rw->done + rw->nr_blocking) { > - /* this object should be recovered earlier */ > - memmove(rw->oids + rw->done + rw->nr_blocking + 1, > - rw->oids + rw->done + rw->nr_blocking, > - sizeof(uint64_t) * (p - (rw->oids + rw->done + rw->nr_blocking))); > - rw->oids[rw->done + rw->nr_blocking] 
= oid; > - rw->nr_blocking++; > - } > - return 1; > - } > - } > - > - dprintf("the object %" PRIx64 " is not found\n", oid); > - return 0; > -} > - > -static void do_recover_main(struct work *work) > -{ > - struct recovery_work *rw = container_of(work, struct recovery_work, work); > - uint64_t oid; > - > - if (rw->state == RW_INIT) > - rw->state = RW_RUN; > - else if (!rw->retry) { > - rw->done++; > - if (rw->nr_blocking > 0) > - rw->nr_blocking--; > - } > - > - oid = rw->oids[rw->done]; > - > - if (rw->retry && !next_rw) { > - rw->retry = 0; > - > - rw->timer.callback = recover_timer; > - rw->timer.data = rw; > - add_timer(&rw->timer, 2); > - return; > - } > - > - if (rw->done < rw->count && !next_rw) { > - rw->work.fn = recover_object; > - > - if (is_access_to_busy_objects(oid)) { > - suspended_recovery_work = rw; > - return; > - } > - resume_pending_requests(); > - queue_work(sys->recovery_wqueue, &rw->work); > - return; > - } > - > - dprintf("recovery complete: new epoch %"PRIu32"\n", rw->epoch); > - recovering_work = NULL; > - > - sys->recovered_epoch = rw->epoch; > - > - free(rw->oids); > - free(rw); > - > - if (next_rw) { > - rw = next_rw; > - next_rw = NULL; > - > - recovering_work = rw; > - queue_work(sys->recovery_wqueue, &rw->work); > - } else { > - if (sd_store->end_recover) { > - struct siocb iocb = { 0 }; > - iocb.epoch = sys->epoch; > - sd_store->end_recover(&iocb); > - } > - } > - > - resume_pending_requests(); > -} > - > -static int request_obj_list(struct sd_node *e, uint32_t epoch, > - uint8_t *buf, size_t buf_size) > -{ > - int fd, ret; > - unsigned wlen, rlen; > - char name[128]; > - struct sd_list_req hdr; > - struct sd_list_rsp *rsp; > - > - addr_to_str(name, sizeof(name), e->addr, 0); > - > - dprintf("%s %"PRIu32"\n", name, e->port); > - > - fd = connect_to(name, e->port); > - if (fd < 0) { > - eprintf("%s %"PRIu32"\n", name, e->port); > - return -1; > - } > - > - wlen = 0; > - rlen = buf_size; > - > - memset(&hdr, 0, sizeof(hdr)); > - 
hdr.opcode = SD_OP_GET_OBJ_LIST; > - hdr.tgt_epoch = epoch - 1; > - hdr.flags = 0; > - hdr.data_length = rlen; > - > - ret = exec_req(fd, (struct sd_req *)&hdr, buf, &wlen, &rlen); > - > - close(fd); > - > - rsp = (struct sd_list_rsp *)&hdr; > - > - if (ret || rsp->result != SD_RES_SUCCESS) { > - eprintf("retrying: %"PRIu32", %"PRIu32"\n", ret, rsp->result); > - return -1; > - } > - > - dprintf("%"PRIu64"\n", rsp->data_length / sizeof(uint64_t)); > - > - return rsp->data_length / sizeof(uint64_t); > -} > - > -int merge_objlist(uint64_t *list1, int nr_list1, uint64_t *list2, int nr_list2) > -{ > - int i; > - int old_nr_list1 = nr_list1; > - > - for (i = 0; i < nr_list2; i++) { > - if (bsearch(list2 + i, list1, old_nr_list1, sizeof(*list1), obj_cmp)) > - continue; > - > - list1[nr_list1++] = list2[i]; > - } > - > - qsort(list1, nr_list1, sizeof(*list1), obj_cmp); > - > - return nr_list1; > -} > - > -static int screen_obj_list(struct recovery_work *rw, uint64_t *list, int list_nr) > -{ > - int ret, i, cp, idx; > - struct strbuf buf = STRBUF_INIT; > - struct sd_vnode *nodes = rw->cur_vnodes; > - int nodes_nr = rw->cur_nr_vnodes; > - int nr_objs = get_max_nr_copies_from(rw->cur_nodes, rw->cur_nr_nodes); > - > - for (i = 0; i < list_nr; i++) { > - for (cp = 0; cp < nr_objs; cp++) { > - idx = obj_to_sheep(nodes, nodes_nr, list[i], cp); > - if (vnode_is_local(&nodes[idx])) > - break; > - } > - if (cp == nr_objs) > - continue; > - strbuf_add(&buf, &list[i], sizeof(uint64_t)); > - } > - memcpy(list, buf.buf, buf.len); > - > - ret = buf.len / sizeof(uint64_t); > - dprintf("%d\n", ret); > - strbuf_release(&buf); > - > - return ret; > -} > - > -#define MAX_RETRY_CNT 6 > - > -static int newly_joined(struct sd_node *node, struct recovery_work *rw) > -{ > - struct sd_node *old = rw->old_nodes; > - int old_nr = rw->old_nr_nodes; > - int i; > - for (i = 0; i < old_nr; i++) > - if (node_cmp(node, old + i) == 0) > - break; > - > - if (i == old_nr) > - return 1; > - return 0; > -} > - 
> -static int fill_obj_list(struct recovery_work *rw) > -{ > - int i; > - uint8_t *buf = NULL; > - size_t buf_size = SD_DATA_OBJ_SIZE; /* FIXME */ > - int retry_cnt; > - struct sd_node *cur = rw->cur_nodes; > - int cur_nr = rw->cur_nr_nodes; > - > - buf = malloc(buf_size); > - if (!buf) { > - eprintf("out of memory\n"); > - rw->retry = 1; > - return -1; > - } > - for (i = 0; i < cur_nr; i++) { > - int buf_nr; > - struct sd_node *node = cur + i; > - > - if (newly_joined(node, rw)) > - /* new node doesn't have a list file */ > - continue; > - > - retry_cnt = 0; > - retry: > - buf_nr = request_obj_list(node, rw->epoch, buf, buf_size); > - if (buf_nr < 0) { > - retry_cnt++; > - if (retry_cnt > MAX_RETRY_CNT) { > - eprintf("failed to get object list\n"); > - eprintf("some objects may be lost\n"); > - continue; > - } else { > - if (next_rw) { > - dprintf("go to the next recovery\n"); > - break; > - } > - dprintf("trying to get object list again\n"); > - sleep(1); > - goto retry; > - } > - } > - buf_nr = screen_obj_list(rw, (uint64_t *)buf, buf_nr); > - if (buf_nr) > - rw->count = merge_objlist(rw->oids, rw->count, (uint64_t *)buf, buf_nr); > - } > - > - dprintf("%d\n", rw->count); > - free(buf); > - return 0; > -} > - > -/* setup node list and virtual node list */ > -static int init_rw(struct recovery_work *rw) > -{ > - int epoch = rw->epoch; > - > - rw->cur_nr_nodes = epoch_log_read_nr(epoch, (char *)rw->cur_nodes, > - sizeof(rw->cur_nodes)); > - if (rw->cur_nr_nodes <= 0) { > - eprintf("failed to read epoch log for epoch %"PRIu32"\n", epoch); > - return -1; > - } > - > - rw->old_nr_nodes = epoch_log_read_nr(epoch - 1, (char *)rw->old_nodes, > - sizeof(rw->old_nodes)); > - if (rw->old_nr_nodes <= 0) { > - eprintf("failed to read epoch log for epoch %"PRIu32"\n", epoch - 1); > - return -1; > - } > - rw->old_nr_vnodes = nodes_to_vnodes(rw->old_nodes, rw->old_nr_nodes, > - rw->old_vnodes); > - rw->cur_nr_vnodes = nodes_to_vnodes(rw->cur_nodes, rw->cur_nr_nodes, > - 
rw->cur_vnodes); > - > - return 0; > -} > - > -static void do_recovery_work(struct work *work) > -{ > - struct recovery_work *rw = container_of(work, struct recovery_work, work); > - > - dprintf("%u\n", rw->epoch); > - > - if (!sys->nr_copies) > - return; > - > - if (rw->cur_nr_nodes == 0) > - init_rw(rw); > - > - if (fill_obj_list(rw) < 0) { > - eprintf("fatal recovery error\n"); > - rw->count = 0; > - return; > - } > -} > - > -int start_recovery(uint32_t epoch) > -{ > - struct recovery_work *rw; > - > - rw = zalloc(sizeof(struct recovery_work)); > - if (!rw) > - return -1; > - > - rw->state = RW_INIT; > - rw->oids = malloc(1 << 20); /* FIXME */ > - rw->epoch = epoch; > - rw->count = 0; > - > - rw->work.fn = do_recovery_work; > - rw->work.done = do_recover_main; > - > - if (sd_store->begin_recover) { > - struct siocb iocb = { 0 }; > - iocb.epoch = epoch; > - sd_store->begin_recover(&iocb); > - } > - > - if (recovering_work != NULL) { > - if (next_rw) { > - /* skip the previous epoch recovery */ > - free(next_rw->oids); > - free(next_rw); > - } > - next_rw = rw; > - } else { > - recovering_work = rw; > - queue_work(sys->recovery_wqueue, &rw->work); > - } > - > - return 0; > -} > - > static int init_path(const char *d, int *new) > { > int ret, retry = 0; Applied, thanks, Yuan |