[Sheepdog] [PATCH v2] sheep: split recovery from store.c

Liu Yuan namei.unix at gmail.com
Sat Apr 28 06:03:22 CEST 2012


On 04/27/2012 11:34 PM, Christoph Hellwig wrote:

> Signed-off-by: Christoph Hellwig <hch at lst.de>
> 
> ---
>  sheep/Makefile.am |    5 
>  sheep/recovery.c  |  828 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
>  sheep/store.c     |  811 ----------------------------------------------------
>  3 files changed, 831 insertions(+), 813 deletions(-)
> 
> Index: sheepdog/sheep/Makefile.am
> ===================================================================
> --- sheepdog.orig/sheep/Makefile.am	2012-04-26 18:47:06.756621981 +0200
> +++ sheepdog/sheep/Makefile.am	2012-04-27 17:21:06.104083016 +0200
> @@ -24,8 +24,10 @@ INCLUDES		= -I$(top_builddir)/include -I
>  
>  sbin_PROGRAMS		= sheep
>  
> -sheep_SOURCES		= sheep.c group.c sdnet.c store.c vdi.c work.c journal.c ops.c \
> -			  cluster/local.c strbuf.c simple_store.c object_cache.c
> +sheep_SOURCES		= sheep.c group.c sdnet.c store.c vdi.c work.c \
> +			  journal.c ops.c recovery.c cluster/local.c strbuf.c \
> +			  simple_store.c object_cache.c
> +
>  if BUILD_COROSYNC
>  sheep_SOURCES		+= cluster/corosync.c
>  endif
> Index: sheepdog/sheep/recovery.c
> ===================================================================
> --- /dev/null	1970-01-01 00:00:00.000000000 +0000
> +++ sheepdog/sheep/recovery.c	2012-04-27 17:24:34.040088332 +0200
> @@ -0,0 +1,828 @@
> +/*
> + * Copyright (C) 2009-2011 Nippon Telegraph and Telephone Corporation.
> + *
> + * This program is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU General Public License version
> + * 2 as published by the Free Software Foundation.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program. If not, see <http://www.gnu.org/licenses/>.
> + */
> +#include <errno.h>
> +#include <stdio.h>
> +#include <stdlib.h>
> +#include <unistd.h>
> +#include <sys/types.h>
> +
> +#include "sheep_priv.h"
> +#include "strbuf.h"
> +
> +
> +/* Phase of a recovery_work; do_recover_main() moves it from INIT to RUN. */
> +enum rw_state {
> +	RW_INIT,	/* object list is still being built (do_recovery_work) */
> +	RW_RUN,		/* recovering rw->oids[] one object at a time */
> +};
> +
> +/* State of one recovery pass towards epoch @epoch. */
> +struct recovery_work {
> +	enum rw_state state;
> +
> +	uint32_t epoch;		/* epoch being recovered to */
> +	uint32_t done;		/* index in oids[] of the object being recovered */
> +
> +	struct timer timer;	/* delays a retry, see do_recover_main() */
> +	int retry;		/* redo the current step instead of advancing done */
> +	struct work work;	/* queued on sys->recovery_wqueue */
> +
> +	/* oids[done .. done + nr_blocking) were moved forward by is_recoverying_oid() */
> +	int nr_blocking;
> +	int count;		/* number of valid entries in oids[] */
> +	/*
> +	 * objects to recover, kept in obj_cmp() order by merge_objlist()
> +	 * except for the entries moved forward by is_recoverying_oid()
> +	 */
> +	uint64_t *oids;
> +
> +	/* node/vnode lists of epoch - 1 (old) and epoch (cur), filled by init_rw() */
> +	int old_nr_nodes;
> +	struct sd_node old_nodes[SD_MAX_NODES];
> +	int cur_nr_nodes;
> +	struct sd_node cur_nodes[SD_MAX_NODES];
> +	int old_nr_vnodes;
> +	struct sd_vnode old_vnodes[SD_MAX_VNODES];
> +	int cur_nr_vnodes;
> +	struct sd_vnode cur_vnodes[SD_MAX_VNODES];
> +};
> +
> +/* recovery to run once the current one ends; start_recovery() keeps only the newest */
> +static struct recovery_work *next_rw;
> +/* the recovery currently running, NULL when idle */
> +static struct recovery_work *recovering_work;
> +
> +/*
> + * qsort()/bsearch() comparator for object ids.  Ids are ordered by the
> + * FNV-1a hash of their 8-byte value, not by the raw id.
> + */
> +static int obj_cmp(const void *oid1, const void *oid2)
> +{
> +	uint64_t a = fnv_64a_buf((void *)oid1, sizeof(uint64_t), FNV1A_64_INIT);
> +	uint64_t b = fnv_64a_buf((void *)oid2, sizeof(uint64_t), FNV1A_64_INIT);
> +
> +	return (a > b) - (a < b);
> +}
> +
> +/*
> + * contains_node - look up @key among the target nodes of an object
> + *
> + * The target nodes that store the replicated objects are the first
> + * @copies nodes from the @base_idx'th on the consistent hash ring
> + * @entry (@nr entries), as enumerated by get_nth_node().  Nodes are
> + * compared by address and port.
> + *
> + * Returns the index into @entry of the matching node, or -1 if @key is
> + * not one of the target nodes.
> + */
> +static int contains_node(struct sd_vnode *key,
> +			 struct sd_vnode *entry,
> +			 int nr, int base_idx, int copies)
> +{
> +	int n;
> +
> +	for (n = 0; n < copies; n++) {
> +		int pos = get_nth_node(entry, nr, base_idx, n);
> +		struct sd_vnode *candidate = &entry[pos];
> +
> +		if (key->port != candidate->port)
> +			continue;
> +		if (!memcmp(key->addr, candidate->addr, sizeof(key->addr)))
> +			return pos;
> +	}
> +
> +	return -1;
> +}
> +
> +/*
> + * find_tgt_node - find the node from which we should recover objects
> + *
> + * @old_entry, @old_nr, @old_idx, @old_copies: hash ring, base index of the
> + *	object and number of copies in the previous epoch
> + * @cur_entry, @cur_nr, @cur_idx, @cur_copies: the same for the current epoch
> + * @copy_idx: which of the current target nodes (0-based) to find a source for
> + *
> + * This function compares two node lists, the current target nodes and
> + * the previous target nodes, and finds the node from the previous
> + * target nodes which corresponds to the copy_idx'th node of the
> + * current target nodes.  The correspondence is injective and
> + * maximizes the number of nodes which can recover objects locally.
> + *
> + * Returns an index into @old_entry (do_recover_object() uses it as
> + * old + index).
> + *
> + * For example, suppose the number of copies is 5, the consistent
> + * hash ring is {A, B, C, D, E, F}, and the node G is newly added.
> + * The parameters of this function are
> + *   old_entry = {A, B, C, D, E, F},    old_nr = 6, old_idx = 3
> + *   cur_entry = {A, B, C, D, E, F, G}, cur_nr = 7, cur_idx = 3
> + *
> + * In this case:
> + *   the previous target nodes: {D, E, F, A, B}
> + *     (the first 5 nodes from the 3rd node on the previous hash ring)
> + *   the current target nodes : {D, E, F, G, A}
> + *     (the first 5 nodes from the 3rd node on the current hash ring)
> + *
> + * The intended correspondence between copy_idx and source node is as
> + * follows ("position" is the position within the previous target nodes;
> + * the function itself returns the index of that node in @old_entry):
> + * ----------------------------
> + * copy_idx       0  1  2  3  4
> + * src_node       D  E  F  G  A
> + * tgt_node       D  E  F  B  A
> + * position       0  1  2  4  3
> + * ----------------------------
> + *
> + * The nodes D, E, F, and A can recover objects locally, and the
> + * node G recovers from the node B.
> + *
> + * NOTE(review): tracing the loop below on this example (assuming
> + * get_nth_node() walks the ring in order) appears to give A, not B, for
> + * copy_idx 3.  `continue` also advances j, and the skip loop does not
> + * run when i == copy_idx.  A still holds the object, so this affects
> + * the choice of source, not correctness.  TODO confirm.
> + */
> +static int find_tgt_node(struct sd_vnode *old_entry,
> +			 int old_nr, int old_idx, int old_copies,
> +			 struct sd_vnode *cur_entry,
> +			 int cur_nr, int cur_idx, int cur_copies,
> +			 int copy_idx)
> +{
> +	int i, j, idx;
> +
> +	dprintf("%"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32"\n",
> +		old_idx, old_nr, old_copies, cur_idx, cur_nr, cur_copies, copy_idx);
> +
> +	/* If the same node is in the previous target nodes, return its index */
> +	idx = contains_node(cur_entry + get_nth_node(cur_entry, cur_nr, cur_idx, copy_idx),
> +			    old_entry, old_nr, old_idx, old_copies);
> +	if (idx >= 0) {
> +		dprintf("%"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32"\n", idx, copy_idx, cur_idx, cur_nr);
> +		return idx;
> +	}
> +
> +	/*
> +	 * i walks the current target nodes and j the previous ones.  The
> +	 * loop has no exit condition; it always ends in one of the returns.
> +	 */
> +	for (i = 0, j = 0; ; i++, j++) {
> +		if (i < copy_idx) {
> +			/* Skip if the node can recover from its local */
> +			idx = contains_node(cur_entry + get_nth_node(cur_entry, cur_nr, cur_idx, i),
> +					    old_entry, old_nr, old_idx, old_copies);
> +			if (idx >= 0)
> +				continue;
> +
> +			/* Find the next target which needs to recover from remote */
> +			while (j < old_copies &&
> +			       contains_node(old_entry + get_nth_node(old_entry, old_nr, old_idx, j),
> +					     cur_entry, cur_nr, cur_idx, cur_copies) >= 0)
> +				j++;
> +		}
> +		if (j == old_copies) {
> +			/*
> +			 * Cannot find the target because the number of zones
> +			 * is smaller than the number of copies.  We can select
> +			 * any node in this case, so select the first one.
> +			 */
> +			return old_idx;
> +		}
> +
> +		if (i == copy_idx) {
> +			/* Found the target node corresponding to copy_idx */
> +			dprintf("%"PRIu32", %"PRIu32", %"PRIu32"\n",
> +				get_nth_node(old_entry, old_nr, old_idx, j),
> +				copy_idx, (cur_idx + i) % cur_nr);
> +			return get_nth_node(old_entry, old_nr, old_idx, j);
> +		}
> +
> +	}
> +
> +	/* not reached: the loop above only exits through a return */
> +	return -1;
> +}
> +
> +/*
> + * Build the vnode list of @epoch.  The node list is read from the local
> + * epoch log; if that fails it is fetched with epoch_log_read_remote().
> + *
> + * Returns a newly allocated array with room for SD_MAX_VNODES vnodes
> + * (the caller frees it), with *nr set to the number of vnodes and
> + * *copies to get_max_nr_copies_from() of the node list.  Returns NULL
> + * when the remote read yields nothing.
> + */
> +static void *get_vnodes_from_epoch(int epoch, int *nr, int *copies)
> +{
> +	struct sd_node nodes[SD_MAX_NODES];
> +	struct sd_vnode *vnodes = xmalloc(sizeof(struct sd_vnode) * SD_MAX_VNODES);
> +	int nr_nodes = epoch_log_read_nr(epoch, (void *)nodes, sizeof(nodes));
> +
> +	if (nr_nodes < 0) {
> +		/* not in the local log; the remote read reports a byte count */
> +		int len = epoch_log_read_remote(epoch, (void *)nodes, sizeof(nodes));
> +
> +		if (len == 0) {
> +			free(vnodes);
> +			return NULL;
> +		}
> +		nr_nodes = len / sizeof(nodes[0]);
> +	}
> +
> +	*nr = nodes_to_vnodes(nodes, nr_nodes, vnodes);
> +	*copies = get_max_nr_copies_from(nodes, nr_nodes);
> +	return vnodes;
> +}
> +
> +/*
> + * Recover @oid for @epoch from the copy that @entry held in @tgt_epoch.
> + *
> + * If @entry is this node, the object is linked from the local store of
> + * @tgt_epoch (sd_store->link).  Otherwise it is read from the remote node
> + * with SD_OP_READ_OBJ and written locally with sd_store->atomic_put.
> + *
> + * Returns 0 on success, 1 if the peer answered SD_RES_NEW_NODE_VER,
> + * SD_RES_OLD_NODE_VER or SD_RES_NETWORK_ERROR (the caller should retry
> + * later), or -1 on any other failure.
> + */
> +static int recover_object_from_replica(uint64_t oid,
> +				       struct sd_vnode *entry,
> +				       int epoch, int tgt_epoch)
> +{
> +	struct sd_obj_req hdr;
> +	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
> +	char name[128];
> +	unsigned wlen = 0, rlen;
> +	int fd, ret = -1;
> +	void *buf = NULL;
> +	struct siocb iocb = { 0 };
> +
> +	if (is_vdi_obj(oid))
> +		rlen = SD_INODE_SIZE;
> +	else if (is_vdi_attr_obj(oid))
> +		rlen = SD_ATTR_OBJ_SIZE;
> +	else
> +		rlen = SD_DATA_OBJ_SIZE;
> +
> +	if (vnode_is_local(entry)) {
> +		/* The old copy is ours: link it, no data buffer is needed. */
> +		iocb.epoch = epoch;
> +		iocb.length = rlen;
> +		ret = sd_store->link(oid, &iocb, tgt_epoch);
> +		if (ret != SD_RES_SUCCESS) {
> +			ret = -1;
> +			goto out;
> +		}
> +		ret = 0;
> +		goto done;
> +	}
> +
> +	/* Only the remote path transfers data, so allocate the buffer here. */
> +	buf = valloc(rlen);
> +	if (!buf) {
> +		eprintf("%m\n");
> +		goto out;
> +	}
> +
> +	addr_to_str(name, sizeof(name), entry->addr, 0);
> +	fd = connect_to(name, entry->port);
> +	dprintf("%s, %d\n", name, entry->port);
> +	if (fd < 0) {
> +		eprintf("failed to connect to %s:%"PRIu32"\n", name, entry->port);
> +		ret = -1;
> +		goto out;
> +	}
> +
> +	memset(&hdr, 0, sizeof(hdr));
> +	hdr.opcode = SD_OP_READ_OBJ;
> +	hdr.oid = oid;
> +	hdr.epoch = epoch;
> +	hdr.flags = SD_FLAG_CMD_RECOVERY | SD_FLAG_CMD_IO_LOCAL;
> +	hdr.tgt_epoch = tgt_epoch;
> +	hdr.data_length = rlen;
> +
> +	ret = exec_req(fd, (struct sd_req *)&hdr, buf, &wlen, &rlen);
> +
> +	close(fd);
> +
> +	if (ret != 0) {
> +		/*
> +		 * The exchange itself failed, so rsp->result may never have
> +		 * been filled in; report what we were trying to read instead.
> +		 */
> +		eprintf("failed to read object %"PRIx64" from %s:%d\n",
> +			oid, name, entry->port);
> +		ret = -1;
> +		goto out;
> +	}
> +
> +	if (rsp->result == SD_RES_SUCCESS) {
> +		iocb.epoch = epoch;
> +		iocb.length = rlen;
> +		iocb.buf = buf;
> +		ret = sd_store->atomic_put(oid, &iocb);
> +		if (ret != SD_RES_SUCCESS) {
> +			ret = -1;
> +			goto out;
> +		}
> +	} else if (rsp->result == SD_RES_NEW_NODE_VER ||
> +			rsp->result == SD_RES_OLD_NODE_VER ||
> +			rsp->result == SD_RES_NETWORK_ERROR) {
> +		/* transient: the caller marks the work for a later retry */
> +		dprintf("retrying: %"PRIx32", %"PRIx64"\n", rsp->result, oid);
> +		ret = 1;
> +		goto out;
> +	} else {
> +		eprintf("failed, res: %"PRIx32"\n", rsp->result);
> +		ret = -1;
> +		goto out;
> +	}
> +done:
> +	dprintf("recovered oid %"PRIx64" from %d to epoch %d\n", oid, tgt_epoch, epoch);
> +out:
> +	free(buf);
> +	return ret;
> +}
> +
> +/*
> + * Shift the (old, cur) pair one epoch back in history: @cur takes over
> + * the contents of @old, and @old takes over @new_old.  Entry counts and
> + * copy numbers move along with the lists.
> + */
> +static void rollback_old_cur(struct sd_vnode *old, int *old_nr, int *old_copies,
> +			     struct sd_vnode *cur, int *cur_nr, int *cur_copies,
> +			     struct sd_vnode *new_old, int new_old_nr, int new_old_copies)
> +{
> +	/* cur <- old */
> +	*cur_nr = *old_nr;
> +	*cur_copies = *old_copies;
> +	memcpy(cur, old, sizeof(*cur) * *cur_nr);
> +
> +	/* old <- new_old */
> +	*old_nr = new_old_nr;
> +	*old_copies = new_old_copies;
> +	memcpy(old, new_old, sizeof(*old) * new_old_nr);
> +}
> +
> +/*
> + * Recover the object from its track in epoch history.  That is,
> + * the routine will try to recover it from the nodes it has stayed on,
> + * at least *theoretically*, on the consistent hash ring.
> + *
> + * @copy_idx selects which of the current target nodes of
> + * rw->oids[rw->done] is being rebuilt.  Starting with the pair
> + * (rw->epoch - 1, rw->epoch), find_tgt_node() picks the source node.  If
> + * reading from it fails, the pair is rolled back by one epoch and the
> + * lookup is repeated, down to epoch 1.
> + *
> + * Returns 0 on success (also when the peer asked for a retry, in which
> + * case rw->retry is set), or -1 on failure.
> + */
> +static int do_recover_object(struct recovery_work *rw, int copy_idx)
> +{
> +	struct sd_vnode *old, *cur;
> +	uint64_t oid = rw->oids[rw->done];
> +	int old_nr = rw->old_nr_vnodes, cur_nr = rw->cur_nr_vnodes;
> +	int epoch = rw->epoch, tgt_epoch = rw->epoch - 1;
> +	struct sd_vnode *tgt_entry;
> +	int old_idx, cur_idx, tgt_idx, old_copies, cur_copies, ret;
> +
> +	/* private copies: rollback_old_cur() rewrites them on each step back */
> +	old = xmalloc(sizeof(*old) * SD_MAX_VNODES);
> +	cur = xmalloc(sizeof(*cur) * SD_MAX_VNODES);
> +	memcpy(old, rw->old_vnodes, sizeof(*old) * old_nr);
> +	memcpy(cur, rw->cur_vnodes, sizeof(*cur) * cur_nr);
> +	old_copies = get_max_nr_copies_from(rw->old_nodes, rw->old_nr_nodes);
> +	cur_copies = get_max_nr_copies_from(rw->cur_nodes, rw->cur_nr_nodes);
> +
> +again:
> +	old_idx = obj_to_sheep(old, old_nr, oid, 0);
> +	cur_idx = obj_to_sheep(cur, cur_nr, oid, 0);
> +
> +	dprintf("try recover object %"PRIx64" from epoch %"PRIu32"\n", oid, tgt_epoch);
> +
> +	if (cur_copies <= copy_idx) {
> +		eprintf("epoch (%"PRIu32") has less copies (%d) than requested copy_idx: %d\n",
> +		tgt_epoch, cur_copies, copy_idx);
> +		ret = -1;
> +		goto err;
> +	}
> +
> +	tgt_idx = find_tgt_node(old, old_nr, old_idx, old_copies,
> +			cur, cur_nr, cur_idx, cur_copies, copy_idx);
> +	if (tgt_idx < 0) {
> +		eprintf("cannot find target node %"PRIx64"\n", oid);
> +		ret = -1;
> +		goto err;
> +	}
> +	tgt_entry = old + tgt_idx;
> +
> +	ret = recover_object_from_replica(oid, tgt_entry, epoch, tgt_epoch);
> +	if (ret < 0) {
> +		struct sd_vnode *new_old;
> +		int new_old_nr, new_old_copies;
> +
> +		/* look one epoch further back for a node that had the object */
> +		tgt_epoch--;
> +		if (tgt_epoch < 1) {
> +			eprintf("can not recover oid %"PRIx64"\n", oid);
> +			ret = -1;
> +			goto err;
> +		}
> +
> +		new_old = get_vnodes_from_epoch(tgt_epoch, &new_old_nr, &new_old_copies);
> +		if (!new_old) {
> +			ret = -1;
> +			goto err;
> +		}
> +		/* (old, cur) <- (ring of tgt_epoch, previous old) */
> +		rollback_old_cur(old, &old_nr, &old_copies, cur, &cur_nr, &cur_copies,
> +				new_old, new_old_nr, new_old_copies);
> +		free(new_old);
> +		goto again;
> +	} else if (ret > 0) {
> +		/* transient error: do_recover_main() will retry this object */
> +		ret = 0;
> +		rw->retry = 1;
> +	}
> +err:
> +	free(old);
> +	free(cur);
> +	return ret;
> +}
> +
> +/*
> + * Return which copy (0-based) of @oid this node is a target for in the
> + * current epoch, or -1 if it is not a target node for @oid.
> + * *copy_nr receives the number of copies of the current node list.
> + */
> +static int get_replica_idx(struct recovery_work *rw, uint64_t oid, int *copy_nr)
> +{
> +	int idx;
> +
> +	*copy_nr = get_max_nr_copies_from(rw->cur_nodes, rw->cur_nr_nodes);
> +
> +	for (idx = 0; idx < *copy_nr; idx++) {
> +		int n = obj_to_sheep(rw->cur_vnodes, rw->cur_nr_vnodes, oid, idx);
> +
> +		if (vnode_is_local(rw->cur_vnodes + n))
> +			return idx;
> +	}
> +
> +	return -1;
> +}
> +
> +/*
> + * Work function: recover rw->oids[rw->done] into the local store.
> + *
> + * An object that can already be opened for rw->epoch is skipped.  The
> + * copy this node is responsible for is tried first; if that fails, the
> + * other copy indexes are tried in order until one succeeds.
> + */
> +static void recover_object(struct work *work)
> +{
> +	struct recovery_work *rw = container_of(work, struct recovery_work, work);
> +	uint64_t oid = rw->oids[rw->done];
> +	struct siocb iocb = { 0 };
> +	int my_idx, nr_copies, idx, ret;
> +
> +	if (!sys->nr_copies)
> +		return;
> +
> +	eprintf("done:%"PRIu32" count:%"PRIu32", oid:%"PRIx64"\n", rw->done, rw->count, oid);
> +
> +	iocb.epoch = rw->epoch;
> +	if (sd_store->open(oid, &iocb, 0) == SD_RES_SUCCESS) {
> +		sd_store->close(oid, &iocb);
> +		dprintf("the object is already recovered\n");
> +		return;
> +	}
> +
> +	my_idx = get_replica_idx(rw, oid, &nr_copies);
> +	if (my_idx < 0) {
> +		eprintf("failed to recover object %"PRIx64"\n", oid);
> +		return;
> +	}
> +
> +	ret = do_recover_object(rw, my_idx);
> +	if (ret < 0) {
> +		/* fall back to every other copy until one of them works */
> +		for (idx = 0; idx < nr_copies && ret != 0; idx++) {
> +			if (idx != my_idx)
> +				ret = do_recover_object(rw, idx);
> +		}
> +	}
> +
> +	if (ret < 0)
> +		eprintf("failed to recover object %"PRIx64"\n", oid);
> +}
> +
> +/* recovery work parked until resume_recovery_work() requeues it */
> +static struct recovery_work *suspended_recovery_work;
> +
> +/*
> + * Timer callback: requeue the recovery work, unless its next object is
> + * currently being accessed, in which case park it instead.
> + */
> +static void recover_timer(void *data)
> +{
> +	struct recovery_work *rw = data;
> +
> +	if (!is_access_to_busy_objects(rw->oids[rw->done]))
> +		queue_work(sys->recovery_wqueue, &rw->work);
> +	else
> +		suspended_recovery_work = rw;
> +}
> +
> +/*
> + * Requeue the parked recovery work, if there is one, once the object it
> + * is waiting for is no longer being accessed.
> + */
> +void resume_recovery_work(void)
> +{
> +	struct recovery_work *rw = suspended_recovery_work;
> +
> +	if (!rw || is_access_to_busy_objects(rw->oids[rw->done]))
> +		return;
> +
> +	suspended_recovery_work = NULL;
> +	queue_work(sys->recovery_wqueue, &rw->work);
> +}
> +
> +/* Return 1 while a recovery is running on this node, 0 otherwise. */
> +int node_in_recovery(void)
> +{
> +	return recovering_work != NULL;
> +}
> +
> +/*
> + * is_recoverying_oid - tell whether @oid still has to be recovered
> + *
> + * Returns 1 if @oid is (or may be) awaiting recovery, 0 otherwise.
> + * While a recovery for an older epoch is running, or the object list is
> + * still being built (RW_INIT), every object is reported as recovering.
> + * If @oid is found in the not-yet-prioritised part of rw->oids[], it is
> + * moved to the front of that part and rw->nr_blocking is increased, so
> + * it is recovered sooner.
> + */
> +int is_recoverying_oid(uint64_t oid)
> +{
> +	uint64_t hval = fnv_64a_buf(&oid, sizeof(uint64_t), FNV1A_64_INIT);
> +	uint64_t min_hval;
> +	struct recovery_work *rw = recovering_work;
> +	int ret, i, nr_pending;
> +	struct siocb iocb;
> +
> +	if (oid == 0)
> +		return 0;
> +
> +	if (!rw)
> +		return 0; /* there is no thread working for object recovery */
> +
> +	if (before(rw->epoch, sys->epoch))
> +		return 1;
> +
> +	/* rw->oids[] is not filled in yet, so it must not be looked at */
> +	if (rw->state == RW_INIT)
> +		return 1;
> +
> +	memset(&iocb, 0, sizeof(iocb));
> +	iocb.epoch = sys->epoch;
> +	ret = sd_store->open(oid, &iocb, 0);
> +	if (ret == SD_RES_SUCCESS) {
> +		dprintf("the object %" PRIx64 " is already recoverd\n", oid);
> +		sd_store->close(oid, &iocb);
> +		return 0;
> +	}
> +
> +	/* the first 'rw->nr_blocking' objects were already scheduled to be done earlier */
> +	for (i = 0; i < rw->nr_blocking; i++)
> +		if (rw->oids[rw->done + i] == oid)
> +			return 1;
> +
> +	/*
> +	 * oids[done + nr_blocking .. count) is still in obj_cmp() order, so
> +	 * it can be range-checked by hash and binary-searched.  Only do so
> +	 * when that range is non-empty; otherwise oids[done + nr_blocking]
> +	 * is not a valid entry, and the unsigned element count would wrap.
> +	 */
> +	nr_pending = rw->count - (int)rw->done - rw->nr_blocking;
> +	if (nr_pending > 0) {
> +		uint64_t *first = rw->oids + rw->done + rw->nr_blocking;
> +
> +		min_hval = fnv_64a_buf(first, sizeof(uint64_t), FNV1A_64_INIT);
> +		if (min_hval <= hval) {
> +			uint64_t *p;
> +
> +			p = bsearch(&oid, first, nr_pending, sizeof(oid), obj_cmp);
> +			if (p) {
> +				dprintf("recover the object %" PRIx64 " first\n", oid);
> +				if (rw->nr_blocking == 0)
> +					rw->nr_blocking = 1; /* the first oid may be processed now */
> +				/* nr_blocking may have changed: recompute the range start */
> +				if (p > rw->oids + rw->done + rw->nr_blocking) {
> +					/* this object should be recovered earlier */
> +					memmove(rw->oids + rw->done + rw->nr_blocking + 1,
> +						rw->oids + rw->done + rw->nr_blocking,
> +						sizeof(uint64_t) * (p - (rw->oids + rw->done + rw->nr_blocking)));
> +					rw->oids[rw->done + rw->nr_blocking] = oid;
> +					rw->nr_blocking++;
> +				}
> +				return 1;
> +			}
> +		}
> +	}
> +
> +	dprintf("the object %" PRIx64 " is not found\n", oid);
> +	return 0;
> +}
> +
> +/*
> + * Completion handler for every step of a recovery_work.
> + *
> + * It first advances rw->done, unless the step asked for a retry.  It then
> + * does one of the following:
> + *  - re-arms the retry timer (recover_timer);
> + *  - queues recovery of the next object;
> + *  - finishes, when every object is done or a newer recovery (next_rw) is
> + *    waiting.  Finishing frees this work, starts next_rw if there is one
> + *    (otherwise calls sd_store->end_recover), and resumes pending requests.
> + */
> +static void do_recover_main(struct work *work)
> +{
> +	struct recovery_work *rw = container_of(work, struct recovery_work, work);
> +	uint64_t oid;
> +
> +	if (rw->state == RW_INIT)
> +		rw->state = RW_RUN;
> +	else if (!rw->retry) {
> +		rw->done++;
> +		if (rw->nr_blocking > 0)
> +			rw->nr_blocking--;
> +	}
> +
> +	if (rw->retry && !next_rw) {
> +		/* redo the same step later from recover_timer() */
> +		rw->retry = 0;
> +
> +		rw->timer.callback = recover_timer;
> +		rw->timer.data = rw;
> +		add_timer(&rw->timer, 2);
> +		return;
> +	}
> +
> +	if (rw->done < rw->count && !next_rw) {
> +		/*
> +		 * Index oids[] only while rw->done points at a valid entry;
> +		 * once done reaches count it is past the end of the list.
> +		 */
> +		oid = rw->oids[rw->done];
> +		rw->work.fn = recover_object;
> +
> +		if (is_access_to_busy_objects(oid)) {
> +			/* resume_recovery_work() requeues it later */
> +			suspended_recovery_work = rw;
> +			return;
> +		}
> +		resume_pending_requests();
> +		queue_work(sys->recovery_wqueue, &rw->work);
> +		return;
> +	}
> +
> +	dprintf("recovery complete: new epoch %"PRIu32"\n", rw->epoch);
> +	recovering_work = NULL;
> +
> +	sys->recovered_epoch = rw->epoch;
> +
> +	free(rw->oids);
> +	free(rw);
> +
> +	if (next_rw) {
> +		rw = next_rw;
> +		next_rw = NULL;
> +
> +		recovering_work = rw;
> +		queue_work(sys->recovery_wqueue, &rw->work);
> +	} else {
> +		if (sd_store->end_recover) {
> +			struct siocb iocb = { 0 };
> +			iocb.epoch = sys->epoch;
> +			sd_store->end_recover(&iocb);
> +		}
> +	}
> +
> +	resume_pending_requests();
> +}
> +
> +/*
> + * Ask node @e for its object list of epoch @epoch - 1 (SD_OP_GET_OBJ_LIST)
> + * and store it in @buf, which holds at most @buf_size bytes.
> + *
> + * Returns the number of object ids written to @buf, or -1 on a
> + * connection, transport or remote error.
> + */
> +static int request_obj_list(struct sd_node *e, uint32_t epoch,
> +			   uint8_t *buf, size_t buf_size)
> +{
> +	struct sd_list_req hdr;
> +	struct sd_list_rsp *rsp = (struct sd_list_rsp *)&hdr;
> +	unsigned wlen = 0, rlen = buf_size;
> +	char name[128];
> +	int fd, ret;
> +
> +	addr_to_str(name, sizeof(name), e->addr, 0);
> +	dprintf("%s %"PRIu32"\n", name, e->port);
> +
> +	fd = connect_to(name, e->port);
> +	if (fd < 0) {
> +		eprintf("%s %"PRIu32"\n", name, e->port);
> +		return -1;
> +	}
> +
> +	memset(&hdr, 0, sizeof(hdr));
> +	hdr.opcode = SD_OP_GET_OBJ_LIST;
> +	hdr.tgt_epoch = epoch - 1;
> +	hdr.flags = 0;
> +	hdr.data_length = rlen;
> +
> +	ret = exec_req(fd, (struct sd_req *)&hdr, buf, &wlen, &rlen);
> +	close(fd);
> +
> +	if (ret || rsp->result != SD_RES_SUCCESS) {
> +		eprintf("retrying: %"PRIu32", %"PRIu32"\n", ret, rsp->result);
> +		return -1;
> +	}
> +
> +	dprintf("%"PRIu64"\n", rsp->data_length / sizeof(uint64_t));
> +	return rsp->data_length / sizeof(uint64_t);
> +}
> +
> +/*
> + * Append to @list1 each entry of @list2 that is not among the original
> + * @nr_list1 entries of @list1, then re-sort @list1 with obj_cmp().
> + *
> + * @list1 is binary-searched, so it must already be in obj_cmp() order,
> + * and it must have room for up to @nr_list1 + @nr_list2 entries.
> + * Returns the new number of entries in @list1.
> + */
> +int merge_objlist(uint64_t *list1, int nr_list1, uint64_t *list2, int nr_list2)
> +{
> +	int n, nr_orig = nr_list1;
> +
> +	for (n = 0; n < nr_list2; n++) {
> +		uint64_t *candidate = &list2[n];
> +
> +		if (bsearch(candidate, list1, nr_orig, sizeof(*list1), obj_cmp) == NULL)
> +			list1[nr_list1++] = *candidate;
> +	}
> +
> +	qsort(list1, nr_list1, sizeof(*list1), obj_cmp);
> +
> +	return nr_list1;
> +}
> +
> +/*
> + * Filter @list in place, keeping (in their original order) only the
> + * object ids for which this node is one of the target nodes in the
> + * current epoch.  Entries past the returned count are left untouched.
> + * Returns the number of ids kept.
> + */
> +static int screen_obj_list(struct recovery_work *rw,  uint64_t *list, int list_nr)
> +{
> +	struct sd_vnode *vnodes = rw->cur_vnodes;
> +	int nr_vnodes = rw->cur_nr_vnodes;
> +	int nr_copies = get_max_nr_copies_from(rw->cur_nodes, rw->cur_nr_nodes);
> +	int rd, wr = 0;
> +
> +	for (rd = 0; rd < list_nr; rd++) {
> +		int cp;
> +
> +		for (cp = 0; cp < nr_copies; cp++) {
> +			int idx = obj_to_sheep(vnodes, nr_vnodes, list[rd], cp);
> +
> +			if (vnode_is_local(&vnodes[idx])) {
> +				/* wr <= rd, so this never clobbers an unread entry */
> +				list[wr++] = list[rd];
> +				break;
> +			}
> +		}
> +	}
> +
> +	dprintf("%d\n", wr);
> +	return wr;
> +}
> +
> +/* retries of request_obj_list() per node before fill_obj_list() gives up */
> +#define MAX_RETRY_CNT  6
> +
> +/*
> + * Return 1 if @node is missing from the previous epoch's node list
> + * (it joined in rw->epoch), 0 otherwise.
> + */
> +static int newly_joined(struct sd_node *node, struct recovery_work *rw)
> +{
> +	int i;
> +
> +	for (i = 0; i < rw->old_nr_nodes; i++)
> +		if (!node_cmp(node, &rw->old_nodes[i]))
> +			return 0;
> +
> +	return i == rw->old_nr_nodes;
> +}
> +
> +/*
> + * Build rw->oids[].  Every node of the current epoch that was also in the
> + * previous epoch is asked for its object list.  Each list is screened
> + * down to the objects this node must hold and merged into rw->oids[],
> + * which merge_objlist() keeps sorted and free of entries already present.
> + *
> + * A node whose list still cannot be fetched after MAX_RETRY_CNT retries
> + * is skipped, and its objects may be lost.  If a newer recovery (next_rw)
> + * gets queued while retrying, the remaining nodes are not queried.
> + *
> + * Returns 0, or -1 with rw->retry set if the receive buffer cannot be
> + * allocated.
> + *
> + * NOTE(review): rw->oids has a fixed size (1 << 20 bytes, see
> + * start_recovery()) but merge_objlist() does not check it.  A single
> + * list may hold up to SD_DATA_OBJ_SIZE / 8 ids, so confirm the merged
> + * list cannot exceed that capacity.
> + */
> +static int fill_obj_list(struct recovery_work *rw)
> +{
> +	int i;
> +	uint8_t *buf = NULL;
> +	size_t buf_size = SD_DATA_OBJ_SIZE; /* FIXME */
> +	int retry_cnt;
> +	struct sd_node *cur = rw->cur_nodes;
> +	int cur_nr = rw->cur_nr_nodes;
> +
> +	buf = malloc(buf_size);
> +	if (!buf) {
> +		eprintf("out of memory\n");
> +		/* do_recover_main() sees rw->retry and arms a timer to rerun this step */
> +		rw->retry = 1;
> +		return -1;
> +	}
> +	for (i = 0; i < cur_nr; i++) {
> +		int buf_nr;
> +		struct sd_node *node = cur + i;
> +
> +		if (newly_joined(node, rw))
> +			/* new node doesn't have a list file */
> +			continue;
> +
> +		retry_cnt = 0;
> +	retry:
> +		buf_nr = request_obj_list(node, rw->epoch, buf, buf_size);
> +		if (buf_nr < 0) {
> +			retry_cnt++;
> +			if (retry_cnt > MAX_RETRY_CNT) {
> +				/* give up on this node and move on to the next one */
> +				eprintf("failed to get object list\n");
> +				eprintf("some objects may be lost\n");
> +				continue;
> +			} else {
> +				if (next_rw) {
> +					/* do_recover_main() will switch to next_rw */
> +					dprintf("go to the next recovery\n");
> +					break;
> +				}
> +				dprintf("trying to get object list again\n");
> +				sleep(1);
> +				goto retry;
> +			}
> +		}
> +		/* keep only the objects this node must hold, then merge them */
> +		buf_nr = screen_obj_list(rw, (uint64_t *)buf, buf_nr);
> +		if (buf_nr)
> +			rw->count = merge_objlist(rw->oids, rw->count, (uint64_t *)buf, buf_nr);
> +	}
> +
> +	dprintf("%d\n", rw->count);
> +	free(buf);
> +	return 0;
> +}
> +
> +/*
> + * Load the node lists of rw->epoch (cur) and rw->epoch - 1 (old) from the
> + * local epoch log, then expand both into vnode lists.
> + * Returns 0 on success, -1 if either epoch log cannot be read.
> + */
> +static int init_rw(struct recovery_work *rw)
> +{
> +	int cur_epoch = rw->epoch;
> +	int old_epoch = cur_epoch - 1;
> +
> +	rw->cur_nr_nodes = epoch_log_read_nr(cur_epoch, (char *)rw->cur_nodes,
> +					     sizeof(rw->cur_nodes));
> +	if (rw->cur_nr_nodes <= 0) {
> +		eprintf("failed to read epoch log for epoch %"PRIu32"\n", cur_epoch);
> +		return -1;
> +	}
> +
> +	rw->old_nr_nodes = epoch_log_read_nr(old_epoch, (char *)rw->old_nodes,
> +					     sizeof(rw->old_nodes));
> +	if (rw->old_nr_nodes <= 0) {
> +		eprintf("failed to read epoch log for epoch %"PRIu32"\n", old_epoch);
> +		return -1;
> +	}
> +
> +	/* the vnode lists are derived from the node lists read above */
> +	rw->old_nr_vnodes = nodes_to_vnodes(rw->old_nodes, rw->old_nr_nodes, rw->old_vnodes);
> +	rw->cur_nr_vnodes = nodes_to_vnodes(rw->cur_nodes, rw->cur_nr_nodes, rw->cur_vnodes);
> +
> +	return 0;
> +}
> +
> +/*
> + * Work function of the first step of a recovery.  It sets up the node
> + * lists (if not done yet) and builds the list of objects this node must
> + * recover.  On a fatal error rw->count is reset to 0 so no object is
> + * processed.
> + */
> +static void do_recovery_work(struct work *work)
> +{
> +	struct recovery_work *rw = container_of(work, struct recovery_work, work);
> +
> +	dprintf("%u\n", rw->epoch);
> +
> +	if (!sys->nr_copies)
> +		return;
> +
> +	/*
> +	 * Do not go on without both node lists.  If the previous epoch
> +	 * cannot be read, old_nr_nodes is negative and the vnode lists are
> +	 * never built.  newly_joined() would then treat every node as old,
> +	 * and screen_obj_list() would map objects onto an empty vnode ring.
> +	 */
> +	if (rw->cur_nr_nodes == 0 && init_rw(rw) < 0) {
> +		eprintf("fatal recovery error\n");
> +		rw->count = 0;
> +		return;
> +	}
> +
> +	if (fill_obj_list(rw) < 0) {
> +		eprintf("fatal recovery error\n");
> +		rw->count = 0;
> +		return;
> +	}
> +}
> +
> +/*
> + * Start recovery for @epoch.  If a recovery is already running, the new
> + * one is kept as next_rw, replacing (and freeing) any recovery queued
> + * before it, and is started when the running one finishes.
> + *
> + * Returns 0 on success, -1 if memory cannot be allocated.
> + */
> +int start_recovery(uint32_t epoch)
> +{
> +	struct recovery_work *rw;
> +
> +	rw = zalloc(sizeof(struct recovery_work));
> +	if (!rw)
> +		return -1;
> +
> +	rw->state = RW_INIT;
> +	rw->oids = malloc(1 << 20); /* FIXME */
> +	if (!rw->oids) {
> +		/* merge_objlist() and is_recoverying_oid() dereference oids */
> +		eprintf("out of memory\n");
> +		free(rw);
> +		return -1;
> +	}
> +	rw->epoch = epoch;
> +	rw->count = 0;
> +
> +	rw->work.fn = do_recovery_work;
> +	rw->work.done = do_recover_main;
> +
> +	if (sd_store->begin_recover) {
> +		struct siocb iocb = { 0 };
> +		iocb.epoch = epoch;
> +		sd_store->begin_recover(&iocb);
> +	}
> +
> +	if (recovering_work != NULL) {
> +		if (next_rw) {
> +			/* skip the previous epoch recovery */
> +			free(next_rw->oids);
> +			free(next_rw);
> +		}
> +		next_rw = rw;
> +	} else {
> +		recovering_work = rw;
> +		queue_work(sys->recovery_wqueue, &rw->work);
> +	}
> +
> +	return 0;
> +}
> Index: sheepdog/sheep/store.c
> ===================================================================
> --- sheepdog.orig/sheep/store.c	2012-04-27 17:19:46.000000000 +0200
> +++ sheepdog/sheep/store.c	2012-04-27 17:24:03.260087553 +0200
> @@ -16,11 +16,9 @@
>  #include <stdlib.h>
>  #include <unistd.h>
>  #include <poll.h>
> -#include <sys/xattr.h>
>  #include <sys/statvfs.h>
>  #include <sys/types.h>
>  #include <sys/stat.h>
> -#include <fcntl.h>
>  #include <time.h>
>  #include <pthread.h>
>  
> @@ -133,18 +131,6 @@ static int check_and_insert_objlist_cach
>  	return 0;
>  }
>  
> -static int obj_cmp(const void *oid1, const void *oid2)
> -{
> -	const uint64_t hval1 = fnv_64a_buf((void *)oid1, sizeof(uint64_t), FNV1A_64_INIT);
> -	const uint64_t hval2 = fnv_64a_buf((void *)oid2, sizeof(uint64_t), FNV1A_64_INIT);
> -
> -	if (hval1 < hval2)
> -		return -1;
> -	if (hval1 > hval2)
> -		return 1;
> -	return 0;
> -}
> -
>  static void get_store_dir(struct strbuf *buf, int epoch)
>  {
>  	if (!strcmp(sd_store->name, "simple"))
> @@ -1129,803 +1115,6 @@ uint64_t get_cluster_ctime(void)
>  	return ct;
>  }
>  
> -/*
> - * contains_node - checks that the node id is included in the target nodes
> - *
> - * The target nodes to store replicated objects are the first N nodes
> - * from the base_idx'th on the consistent hash ring, where N is the
> - * number of copies of objects.
> - */
> -static int contains_node(struct sd_vnode *key,
> -			 struct sd_vnode *entry,
> -			 int nr, int base_idx, int copies)
> -{
> -	int i;
> -
> -	for (i = 0; i < copies; i++) {
> -		int idx = get_nth_node(entry, nr, base_idx, i);
> -		if (memcmp(key->addr, entry[idx].addr, sizeof(key->addr)) == 0
> -		    && key->port == entry[idx].port)
> -			return idx;
> -	}
> -	return -1;
> -}
> -
> -enum rw_state {
> -	RW_INIT,
> -	RW_RUN,
> -};
> -
> -struct recovery_work {
> -	enum rw_state state;
> -
> -	uint32_t epoch;
> -	uint32_t done;
> -
> -	struct timer timer;
> -	int retry;
> -	struct work work;
> -
> -	int nr_blocking;
> -	int count;
> -	uint64_t *oids;
> -
> -	int old_nr_nodes;
> -	struct sd_node old_nodes[SD_MAX_NODES];
> -	int cur_nr_nodes;
> -	struct sd_node cur_nodes[SD_MAX_NODES];
> -	int old_nr_vnodes;
> -	struct sd_vnode old_vnodes[SD_MAX_VNODES];
> -	int cur_nr_vnodes;
> -	struct sd_vnode cur_vnodes[SD_MAX_VNODES];
> -};
> -
> -static struct recovery_work *next_rw;
> -static struct recovery_work *recovering_work;
> -
> -/*
> - * find_tgt_node - find the node from which we should recover objects
> - *
> - * This function compares two node lists, the current target nodes and
> - * the previous target nodes, and finds the node from the previous
> - * target nodes which corresponds to the copy_idx'th node of the
> - * current target nodes.  The correspondence is injective and
> - * maximizes the number of nodes which can recover objects locally.
> - *
> - * For example, consider the number of redundancy is 5, the consistent
> - * hash ring is {A, B, C, D, E, F}, and the node G is newly added.
> - * The parameters of this function are
> - *   old_entry = {A, B, C, D, E, F},    old_nr = 6, old_idx = 3
> - *   cur_entry = {A, B, C, D, E, F, G}, cur_nr = 7, cur_idx = 3
> - *
> - * In this case:
> - *   the previous target nodes: {D, E, F, A, B}
> - *     (the first 5 nodes from the 3rd node on the previous hash ring)
> - *   the current target nodes : {D, E, F, G, A}
> - *     (the first 5 nodes from the 3rd node on the current hash ring)
> - *
> - * The correspondence between copy_idx and return value are as follows:
> - * ----------------------------
> - * copy_idx       0  1  2  3  4
> - * src_node       D  E  F  G  A
> - * tgt_node       D  E  F  B  A
> - * return value   0  1  2  4  3
> - * ----------------------------
> - *
> - * The node D, E, F, and A can recover objects from local, and the
> - * node G recovers from the node B.
> - */
> -static int find_tgt_node(struct sd_vnode *old_entry,
> -			 int old_nr, int old_idx, int old_copies,
> -			 struct sd_vnode *cur_entry,
> -			 int cur_nr, int cur_idx, int cur_copies,
> -			 int copy_idx)
> -{
> -	int i, j, idx;
> -
> -	dprintf("%"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32"\n",
> -		old_idx, old_nr, old_copies, cur_idx, cur_nr, cur_copies, copy_idx);
> -
> -	/* If the same node is in the previous target nodes, return its index */
> -	idx = contains_node(cur_entry + get_nth_node(cur_entry, cur_nr, cur_idx, copy_idx),
> -			    old_entry, old_nr, old_idx, old_copies);
> -	if (idx >= 0) {
> -		dprintf("%"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32"\n", idx, copy_idx, cur_idx, cur_nr);
> -		return idx;
> -	}
> -
> -	for (i = 0, j = 0; ; i++, j++) {
> -		if (i < copy_idx) {
> -			/* Skip if the node can recover from its local */
> -			idx = contains_node(cur_entry + get_nth_node(cur_entry, cur_nr, cur_idx, i),
> -					    old_entry, old_nr, old_idx, old_copies);
> -			if (idx >= 0)
> -				continue;
> -
> -			/* Find the next target which needs to recover from remote */
> -			while (j < old_copies &&
> -			       contains_node(old_entry + get_nth_node(old_entry, old_nr, old_idx, j),
> -					     cur_entry, cur_nr, cur_idx, cur_copies) >= 0)
> -				j++;
> -		}
> -		if (j == old_copies) {
> -			/*
> -			 * Cannot find the target because the number of zones
> -			 * is smaller than the number of copies.  We can select
> -			 * any node in this case, so select the first one.
> -			 */
> -			return old_idx;
> -		}
> -
> -		if (i == copy_idx) {
> -			/* Found the target node correspoinding to copy_idx */
> -			dprintf("%"PRIu32", %"PRIu32", %"PRIu32"\n",
> -				get_nth_node(old_entry, old_nr, old_idx, j),
> -				copy_idx, (cur_idx + i) % cur_nr);
> -			return get_nth_node(old_entry, old_nr, old_idx, j);
> -		}
> -
> -	}
> -
> -	return -1;
> -}
> -
> -static void *get_vnodes_from_epoch(int epoch, int *nr, int *copies)
> -{
> -	int nodes_nr, len = sizeof(struct sd_vnode) * SD_MAX_VNODES;
> -	struct sd_node nodes[SD_MAX_NODES];
> -	void *buf = xmalloc(len);
> -
> -	nodes_nr = epoch_log_read_nr(epoch, (void *)nodes, sizeof(nodes));
> -	if (nodes_nr < 0) {
> -		nodes_nr = epoch_log_read_remote(epoch, (void *)nodes, sizeof(nodes));
> -		if (nodes_nr == 0) {
> -			free(buf);
> -			return NULL;
> -		}
> -		nodes_nr /= sizeof(nodes[0]);
> -	}
> -	*nr = nodes_to_vnodes(nodes, nodes_nr, buf);
> -	*copies = get_max_nr_copies_from(nodes, nodes_nr);
> -
> -	return buf;
> -}
> -
> -static int recover_object_from_replica(uint64_t oid,
> -				       struct sd_vnode *entry,
> -				       int epoch, int tgt_epoch)
> -{
> -	struct sd_obj_req hdr;
> -	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
> -	char name[128];
> -	unsigned wlen = 0, rlen;
> -	int fd, ret = -1;
> -	void *buf;
> -	struct siocb iocb = { 0 };
> -
> -	if (is_vdi_obj(oid))
> -		rlen = SD_INODE_SIZE;
> -	else if (is_vdi_attr_obj(oid))
> -		rlen = SD_ATTR_OBJ_SIZE;
> -	else
> -		rlen = SD_DATA_OBJ_SIZE;
> -
> -	buf = valloc(rlen);
> -	if (!buf) {
> -		eprintf("%m\n");
> -		goto out;
> -	}
> -
> -	if (vnode_is_local(entry)) {
> -		iocb.epoch = epoch;
> -		iocb.length = rlen;
> -		ret = sd_store->link(oid, &iocb, tgt_epoch);
> -		if (ret == SD_RES_SUCCESS) {
> -			ret = 0;
> -			goto done;
> -		} else {
> -			ret = -1;
> -			goto out;
> -		}
> -	}
> -
> -	addr_to_str(name, sizeof(name), entry->addr, 0);
> -	fd = connect_to(name, entry->port);
> -	dprintf("%s, %d\n", name, entry->port);
> -	if (fd < 0) {
> -		eprintf("failed to connect to %s:%"PRIu32"\n", name, entry->port);
> -		ret = -1;
> -		goto out;
> -	}
> -
> -	memset(&hdr, 0, sizeof(hdr));
> -	hdr.opcode = SD_OP_READ_OBJ;
> -	hdr.oid = oid;
> -	hdr.epoch = epoch;
> -	hdr.flags = SD_FLAG_CMD_RECOVERY | SD_FLAG_CMD_IO_LOCAL;
> -	hdr.tgt_epoch = tgt_epoch;
> -	hdr.data_length = rlen;
> -
> -	ret = exec_req(fd, (struct sd_req *)&hdr, buf, &wlen, &rlen);
> -
> -	close(fd);
> -
> -	if (ret != 0) {
> -		eprintf("res: %"PRIx32"\n", rsp->result);
> -		ret = -1;
> -		goto out;
> -	}
> -
> -	rsp = (struct sd_obj_rsp *)&hdr;
> -
> -	if (rsp->result == SD_RES_SUCCESS) {
> -		iocb.epoch = epoch;
> -		iocb.length = rlen;
> -		iocb.buf = buf;
> -		ret = sd_store->atomic_put(oid, &iocb);
> -		if (ret != SD_RES_SUCCESS) {
> -			ret = -1;
> -			goto out;
> -		}
> -	} else if (rsp->result == SD_RES_NEW_NODE_VER ||
> -			rsp->result == SD_RES_OLD_NODE_VER ||
> -			rsp->result == SD_RES_NETWORK_ERROR) {
> -		dprintf("retrying: %"PRIx32", %"PRIx64"\n", rsp->result, oid);
> -		ret = 1;
> -		goto out;
> -	} else {
> -		eprintf("failed, res: %"PRIx32"\n", rsp->result);
> -		ret = -1;
> -		goto out;
> -	}
> -done:
> -	dprintf("recovered oid %"PRIx64" from %d to epoch %d\n", oid, tgt_epoch, epoch);
> -out:
> -	free(buf);
> -	return ret;
> -}
> -
> -static void rollback_old_cur(struct sd_vnode *old, int *old_nr, int *old_copies,
> -			     struct sd_vnode *cur, int *cur_nr, int *cur_copies,
> -			     struct sd_vnode *new_old, int new_old_nr, int new_old_copies)
> -{
> -	int nr_old = *old_nr;
> -	int copies_old = *old_copies;
> -
> -	memcpy(cur, old, sizeof(*old) * nr_old);
> -	*cur_nr = nr_old;
> -	*cur_copies = copies_old;
> -	memcpy(old, new_old, sizeof(*new_old) * new_old_nr);
> -	*old_nr = new_old_nr;
> -	*old_copies = new_old_copies;
> -}
> -
> -/*
> - * Recover the object from its track in epoch history. That is,
> - * the routine will try to recovery it from the nodes it has stayed,
> - * at least, *theoretically* on consistent hash ring.
> - */
> -static int do_recover_object(struct recovery_work *rw, int copy_idx)
> -{
> -	struct sd_vnode *old, *cur;
> -	uint64_t oid = rw->oids[rw->done];
> -	int old_nr = rw->old_nr_vnodes, cur_nr = rw->cur_nr_vnodes;
> -	int epoch = rw->epoch, tgt_epoch = rw->epoch - 1;
> -	struct sd_vnode *tgt_entry;
> -	int old_idx, cur_idx, tgt_idx, old_copies, cur_copies, ret;
> -
> -	old = xmalloc(sizeof(*old) * SD_MAX_VNODES);
> -	cur = xmalloc(sizeof(*cur) * SD_MAX_VNODES);
> -	memcpy(old, rw->old_vnodes, sizeof(*old) * old_nr);
> -	memcpy(cur, rw->cur_vnodes, sizeof(*cur) * cur_nr);
> -	old_copies = get_max_nr_copies_from(rw->old_nodes, rw->old_nr_nodes);
> -	cur_copies = get_max_nr_copies_from(rw->cur_nodes, rw->cur_nr_nodes);
> -
> -again:
> -	old_idx = obj_to_sheep(old, old_nr, oid, 0);
> -	cur_idx = obj_to_sheep(cur, cur_nr, oid, 0);
> -
> -	dprintf("try recover object %"PRIx64" from epoch %"PRIu32"\n", oid, tgt_epoch);
> -
> -	if (cur_copies <= copy_idx) {
> -		eprintf("epoch (%"PRIu32") has less copies (%d) than requested copy_idx: %d\n",
> -		tgt_epoch, cur_copies, copy_idx);
> -		ret = -1;
> -		goto err;
> -	}
> -
> -	tgt_idx = find_tgt_node(old, old_nr, old_idx, old_copies,
> -			cur, cur_nr, cur_idx, cur_copies, copy_idx);
> -	if (tgt_idx < 0) {
> -		eprintf("cannot find target node %"PRIx64"\n", oid);
> -		ret = -1;
> -		goto err;
> -	}
> -	tgt_entry = old + tgt_idx;
> -
> -	ret = recover_object_from_replica(oid, tgt_entry, epoch, tgt_epoch);
> -	if (ret < 0) {
> -		struct sd_vnode *new_old;
> -		int new_old_nr, new_old_copies;
> -
> -		tgt_epoch--;
> -		if (tgt_epoch < 1) {
> -			eprintf("can not recover oid %"PRIx64"\n", oid);
> -			ret = -1;
> -			goto err;
> -		}
> -
> -		new_old = get_vnodes_from_epoch(tgt_epoch, &new_old_nr, &new_old_copies);
> -		if (!new_old) {
> -			ret = -1;
> -			goto err;
> -		}
> -		rollback_old_cur(old, &old_nr, &old_copies, cur, &cur_nr, &cur_copies,
> -				new_old, new_old_nr, new_old_copies);
> -		free(new_old);
> -		goto again;
> -	} else if (ret > 0) {
> -		ret = 0;
> -		rw->retry = 1;
> -	}
> -err:
> -	free(old);
> -	free(cur);
> -	return ret;
> -}
> -
> -static int get_replica_idx(struct recovery_work *rw, uint64_t oid, int *copy_nr)
> -{
> -	int i, ret = -1;
> -	*copy_nr = get_max_nr_copies_from(rw->cur_nodes, rw->cur_nr_nodes);
> -	for (i = 0; i < *copy_nr; i++) {
> -		int n = obj_to_sheep(rw->cur_vnodes, rw->cur_nr_vnodes, oid, i);
> -		if (vnode_is_local(&rw->cur_vnodes[n])) {
> -			ret = i;
> -			break;
> -		}
> -	}
> -	return ret;
> -}
> -
> -static void recover_object(struct work *work)
> -{
> -	struct recovery_work *rw = container_of(work, struct recovery_work, work);
> -	uint64_t oid = rw->oids[rw->done];
> -	uint32_t epoch = rw->epoch;
> -	int i, copy_idx, copy_nr, ret;
> -	struct siocb iocb = { 0 };
> -
> -	if (!sys->nr_copies)
> -		return;
> -
> -	eprintf("done:%"PRIu32" count:%"PRIu32", oid:%"PRIx64"\n", rw->done, rw->count, oid);
> -
> -	iocb.epoch = epoch;
> -	ret = sd_store->open(oid, &iocb, 0);
> -	if (ret == SD_RES_SUCCESS) {
> -		sd_store->close(oid, &iocb);
> -		dprintf("the object is already recovered\n");
> -		return;
> -	}
> -
> -	copy_idx = get_replica_idx(rw, oid, &copy_nr);
> -	if (copy_idx < 0) {
> -		ret = -1;
> -		goto err;
> -	}
> -	ret = do_recover_object(rw, copy_idx);
> -	if (ret < 0) {
> -		for (i = 0; i < copy_nr; i++) {
> -			if (i == copy_idx)
> -				continue;
> -			ret = do_recover_object(rw, i);
> -			if (ret == 0)
> -				break;
> -		}
> -	}
> -err:
> -	if (ret < 0)
> -		eprintf("failed to recover object %"PRIx64"\n", oid);
> -}
> -
> -static struct recovery_work *suspended_recovery_work;
> -
> -static void recover_timer(void *data)
> -{
> -	struct recovery_work *rw = (struct recovery_work *)data;
> -	uint64_t oid = rw->oids[rw->done];
> -
> -	if (is_access_to_busy_objects(oid)) {
> -		suspended_recovery_work = rw;
> -		return;
> -	}
> -
> -	queue_work(sys->recovery_wqueue, &rw->work);
> -}
> -
> -void resume_recovery_work(void)
> -{
> -	struct recovery_work *rw;
> -	uint64_t oid;
> -
> -	if (!suspended_recovery_work)
> -		return;
> -
> -	rw = suspended_recovery_work;
> -
> -	oid =  rw->oids[rw->done];
> -	if (is_access_to_busy_objects(oid))
> -		return;
> -
> -	suspended_recovery_work = NULL;
> -	queue_work(sys->recovery_wqueue, &rw->work);
> -}
> -
> -int node_in_recovery(void)
> -{
> -	return !!recovering_work;
> -}
> -
> -int is_recoverying_oid(uint64_t oid)
> -{
> -	uint64_t hval = fnv_64a_buf(&oid, sizeof(uint64_t), FNV1A_64_INIT);
> -	uint64_t min_hval;
> -	struct recovery_work *rw = recovering_work;
> -	int ret, i;
> -	struct siocb iocb;
> -
> -	if (oid == 0)
> -		return 0;
> -
> -	if (!rw)
> -		return 0; /* there is no thread working for object recovery */
> -
> -	min_hval = fnv_64a_buf(&rw->oids[rw->done + rw->nr_blocking], sizeof(uint64_t), FNV1A_64_INIT);
> -
> -	if (before(rw->epoch, sys->epoch))
> -		return 1;
> -
> -	if (rw->state == RW_INIT)
> -		return 1;
> -
> -	memset(&iocb, 0, sizeof(iocb));
> -	iocb.epoch = sys->epoch;
> -	ret = sd_store->open(oid, &iocb, 0);
> -	if (ret == SD_RES_SUCCESS) {
> -		dprintf("the object %" PRIx64 " is already recoverd\n", oid);
> -		sd_store->close(oid, &iocb);
> -		return 0;
> -	}
> -
> -	/* the first 'rw->nr_blocking' objects were already scheduled to be done earlier */
> -	for (i = 0; i < rw->nr_blocking; i++)
> -		if (rw->oids[rw->done + i] == oid)
> -			return 1;
> -
> -	if (min_hval <= hval) {
> -		uint64_t *p;
> -		p = bsearch(&oid, rw->oids + rw->done + rw->nr_blocking,
> -			    rw->count - rw->done - rw->nr_blocking, sizeof(oid), obj_cmp);
> -		if (p) {
> -			dprintf("recover the object %" PRIx64 " first\n", oid);
> -			if (rw->nr_blocking == 0)
> -				rw->nr_blocking = 1; /* the first oid may be processed now */
> -			if (p > rw->oids + rw->done + rw->nr_blocking) {
> -				/* this object should be recovered earlier */
> -				memmove(rw->oids + rw->done + rw->nr_blocking + 1,
> -					rw->oids + rw->done + rw->nr_blocking,
> -					sizeof(uint64_t) * (p - (rw->oids + rw->done + rw->nr_blocking)));
> -				rw->oids[rw->done + rw->nr_blocking] = oid;
> -				rw->nr_blocking++;
> -			}
> -			return 1;
> -		}
> -	}
> -
> -	dprintf("the object %" PRIx64 " is not found\n", oid);
> -	return 0;
> -}
> -
> -static void do_recover_main(struct work *work)
> -{
> -	struct recovery_work *rw = container_of(work, struct recovery_work, work);
> -	uint64_t oid;
> -
> -	if (rw->state == RW_INIT)
> -		rw->state = RW_RUN;
> -	else if (!rw->retry) {
> -		rw->done++;
> -		if (rw->nr_blocking > 0)
> -			rw->nr_blocking--;
> -	}
> -
> -	oid = rw->oids[rw->done];
> -
> -	if (rw->retry && !next_rw) {
> -		rw->retry = 0;
> -
> -		rw->timer.callback = recover_timer;
> -		rw->timer.data = rw;
> -		add_timer(&rw->timer, 2);
> -		return;
> -	}
> -
> -	if (rw->done < rw->count && !next_rw) {
> -		rw->work.fn = recover_object;
> -
> -		if (is_access_to_busy_objects(oid)) {
> -			suspended_recovery_work = rw;
> -			return;
> -		}
> -		resume_pending_requests();
> -		queue_work(sys->recovery_wqueue, &rw->work);
> -		return;
> -	}
> -
> -	dprintf("recovery complete: new epoch %"PRIu32"\n", rw->epoch);
> -	recovering_work = NULL;
> -
> -	sys->recovered_epoch = rw->epoch;
> -
> -	free(rw->oids);
> -	free(rw);
> -
> -	if (next_rw) {
> -		rw = next_rw;
> -		next_rw = NULL;
> -
> -		recovering_work = rw;
> -		queue_work(sys->recovery_wqueue, &rw->work);
> -	} else {
> -		if (sd_store->end_recover) {
> -			struct siocb iocb = { 0 };
> -			iocb.epoch = sys->epoch;
> -			sd_store->end_recover(&iocb);
> -		}
> -	}
> -
> -	resume_pending_requests();
> -}
> -
> -static int request_obj_list(struct sd_node *e, uint32_t epoch,
> -			   uint8_t *buf, size_t buf_size)
> -{
> -	int fd, ret;
> -	unsigned wlen, rlen;
> -	char name[128];
> -	struct sd_list_req hdr;
> -	struct sd_list_rsp *rsp;
> -
> -	addr_to_str(name, sizeof(name), e->addr, 0);
> -
> -	dprintf("%s %"PRIu32"\n", name, e->port);
> -
> -	fd = connect_to(name, e->port);
> -	if (fd < 0) {
> -		eprintf("%s %"PRIu32"\n", name, e->port);
> -		return -1;
> -	}
> -
> -	wlen = 0;
> -	rlen = buf_size;
> -
> -	memset(&hdr, 0, sizeof(hdr));
> -	hdr.opcode = SD_OP_GET_OBJ_LIST;
> -	hdr.tgt_epoch = epoch - 1;
> -	hdr.flags = 0;
> -	hdr.data_length = rlen;
> -
> -	ret = exec_req(fd, (struct sd_req *)&hdr, buf, &wlen, &rlen);
> -
> -	close(fd);
> -
> -	rsp = (struct sd_list_rsp *)&hdr;
> -
> -	if (ret || rsp->result != SD_RES_SUCCESS) {
> -		eprintf("retrying: %"PRIu32", %"PRIu32"\n", ret, rsp->result);
> -		return -1;
> -	}
> -
> -	dprintf("%"PRIu64"\n", rsp->data_length / sizeof(uint64_t));
> -
> -	return rsp->data_length / sizeof(uint64_t);
> -}
> -
> -int merge_objlist(uint64_t *list1, int nr_list1, uint64_t *list2, int nr_list2)
> -{
> -	int i;
> -	int old_nr_list1 = nr_list1;
> -
> -	for (i = 0; i < nr_list2; i++) {
> -		if (bsearch(list2 + i, list1, old_nr_list1, sizeof(*list1), obj_cmp))
> -			continue;
> -
> -		list1[nr_list1++] = list2[i];
> -	}
> -
> -	qsort(list1, nr_list1, sizeof(*list1), obj_cmp);
> -
> -	return nr_list1;
> -}
> -
> -static int screen_obj_list(struct recovery_work *rw,  uint64_t *list, int list_nr)
> -{
> -	int ret, i, cp, idx;
> -	struct strbuf buf = STRBUF_INIT;
> -	struct sd_vnode *nodes = rw->cur_vnodes;
> -	int nodes_nr = rw->cur_nr_vnodes;
> -	int nr_objs = get_max_nr_copies_from(rw->cur_nodes, rw->cur_nr_nodes);
> -
> -	for (i = 0; i < list_nr; i++) {
> -		for (cp = 0; cp < nr_objs; cp++) {
> -			idx = obj_to_sheep(nodes, nodes_nr, list[i], cp);
> -			if (vnode_is_local(&nodes[idx]))
> -				break;
> -		}
> -		if (cp == nr_objs)
> -			continue;
> -		strbuf_add(&buf, &list[i], sizeof(uint64_t));
> -	}
> -	memcpy(list, buf.buf, buf.len);
> -
> -	ret = buf.len / sizeof(uint64_t);
> -	dprintf("%d\n", ret);
> -	strbuf_release(&buf);
> -
> -	return ret;
> -}
> -
> -#define MAX_RETRY_CNT  6
> -
> -static int newly_joined(struct sd_node *node, struct recovery_work *rw)
> -{
> -	struct sd_node *old = rw->old_nodes;
> -	int old_nr = rw->old_nr_nodes;
> -	int i;
> -	for (i = 0; i < old_nr; i++)
> -		if (node_cmp(node, old + i) == 0)
> -			break;
> -
> -	if (i == old_nr)
> -		return 1;
> -	return 0;
> -}
> -
> -static int fill_obj_list(struct recovery_work *rw)
> -{
> -	int i;
> -	uint8_t *buf = NULL;
> -	size_t buf_size = SD_DATA_OBJ_SIZE; /* FIXME */
> -	int retry_cnt;
> -	struct sd_node *cur = rw->cur_nodes;
> -	int cur_nr = rw->cur_nr_nodes;
> -
> -	buf = malloc(buf_size);
> -	if (!buf) {
> -		eprintf("out of memory\n");
> -		rw->retry = 1;
> -		return -1;
> -	}
> -	for (i = 0; i < cur_nr; i++) {
> -		int buf_nr;
> -		struct sd_node *node = cur + i;
> -
> -		if (newly_joined(node, rw))
> -			/* new node doesn't have a list file */
> -			continue;
> -
> -		retry_cnt = 0;
> -	retry:
> -		buf_nr = request_obj_list(node, rw->epoch, buf, buf_size);
> -		if (buf_nr < 0) {
> -			retry_cnt++;
> -			if (retry_cnt > MAX_RETRY_CNT) {
> -				eprintf("failed to get object list\n");
> -				eprintf("some objects may be lost\n");
> -				continue;
> -			} else {
> -				if (next_rw) {
> -					dprintf("go to the next recovery\n");
> -					break;
> -				}
> -				dprintf("trying to get object list again\n");
> -				sleep(1);
> -				goto retry;
> -			}
> -		}
> -		buf_nr = screen_obj_list(rw, (uint64_t *)buf, buf_nr);
> -		if (buf_nr)
> -			rw->count = merge_objlist(rw->oids, rw->count, (uint64_t *)buf, buf_nr);
> -	}
> -
> -	dprintf("%d\n", rw->count);
> -	free(buf);
> -	return 0;
> -}
> -
> -/* setup node list and virtual node list */
> -static int init_rw(struct recovery_work *rw)
> -{
> -	int epoch = rw->epoch;
> -
> -	rw->cur_nr_nodes = epoch_log_read_nr(epoch, (char *)rw->cur_nodes,
> -					     sizeof(rw->cur_nodes));
> -	if (rw->cur_nr_nodes <= 0) {
> -		eprintf("failed to read epoch log for epoch %"PRIu32"\n", epoch);
> -		return -1;
> -	}
> -
> -	rw->old_nr_nodes = epoch_log_read_nr(epoch - 1, (char *)rw->old_nodes,
> -					     sizeof(rw->old_nodes));
> -	if (rw->old_nr_nodes <= 0) {
> -		eprintf("failed to read epoch log for epoch %"PRIu32"\n", epoch - 1);
> -		return -1;
> -	}
> -	rw->old_nr_vnodes = nodes_to_vnodes(rw->old_nodes, rw->old_nr_nodes,
> -					    rw->old_vnodes);
> -	rw->cur_nr_vnodes = nodes_to_vnodes(rw->cur_nodes, rw->cur_nr_nodes,
> -					    rw->cur_vnodes);
> -
> -	return 0;
> -}
> -
> -static void do_recovery_work(struct work *work)
> -{
> -	struct recovery_work *rw = container_of(work, struct recovery_work, work);
> -
> -	dprintf("%u\n", rw->epoch);
> -
> -	if (!sys->nr_copies)
> -		return;
> -
> -	if (rw->cur_nr_nodes == 0)
> -		init_rw(rw);
> -
> -	if (fill_obj_list(rw) < 0) {
> -		eprintf("fatal recovery error\n");
> -		rw->count = 0;
> -		return;
> -	}
> -}
> -
> -int start_recovery(uint32_t epoch)
> -{
> -	struct recovery_work *rw;
> -
> -	rw = zalloc(sizeof(struct recovery_work));
> -	if (!rw)
> -		return -1;
> -
> -	rw->state = RW_INIT;
> -	rw->oids = malloc(1 << 20); /* FIXME */
> -	rw->epoch = epoch;
> -	rw->count = 0;
> -
> -	rw->work.fn = do_recovery_work;
> -	rw->work.done = do_recover_main;
> -
> -	if (sd_store->begin_recover) {
> -		struct siocb iocb = { 0 };
> -		iocb.epoch = epoch;
> -		sd_store->begin_recover(&iocb);
> -	}
> -
> -	if (recovering_work != NULL) {
> -		if (next_rw) {
> -			/* skip the previous epoch recovery */
> -			free(next_rw->oids);
> -			free(next_rw);
> -		}
> -		next_rw = rw;
> -	} else {
> -		recovering_work = rw;
> -		queue_work(sys->recovery_wqueue, &rw->work);
> -	}
> -
> -	return 0;
> -}
> -
>  static int init_path(const char *d, int *new)
>  {
>  	int ret, retry = 0;

Applied, thanks,

Yuan



More information about the sheepdog mailing list