[Sheepdog] [PATCH UPDATE] sheep: refactor recovery logic

Liu Yuan namei.unix at gmail.com
Wed Dec 21 04:10:31 CET 2011


From: Liu Yuan <tailai.ly at taobao.com>

The current recovery logic is convoluted and hard to understand. This
patch splits it into smaller helpers, which should ease the headache.

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
Changes in this update:
 - remove a redundant memset().
 - correct the rollback of old_copies/cur_copies.

 sheep/store.c |  360 +++++++++++++++++++++++++++++++--------------------------
 1 files changed, 194 insertions(+), 166 deletions(-)

diff --git a/sheep/store.c b/sheep/store.c
index fed74f8..22e5a5a 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -1139,81 +1139,104 @@ static int find_tgt_node(struct sheepdog_vnode_list_entry *old_entry,
 	return -1;
 }
 
-static int __recover_one(struct recovery_work *rw,
-			 struct sheepdog_vnode_list_entry *_old_entry,
-			 int old_nr, int old_copies,
-			 struct sheepdog_vnode_list_entry *_cur_entry,
-			 int cur_nr, int cur_copies, int cur_idx,
-			 int copy_idx, uint32_t epoch, uint32_t tgt_epoch,
-			 uint64_t oid, char *buf, int buf_len)
+/*
+ * Allocate a buffer large enough to hold object @oid: an inode, a VDI
+ * attribute or a data object.  Data objects are allocated with valloc()
+ * (page-aligned), everything else with xmalloc().
+ */
+static void *alloc_buffer_for(uint64_t oid)
+{
+	void *buf = NULL;
+
+	if (is_vdi_obj(oid))
+		buf = xmalloc(sizeof(struct sheepdog_inode));
+	else if (is_vdi_attr_obj(oid))
+		buf = xmalloc(SD_MAX_VDI_ATTR_VALUE_LEN);
+	else if (is_data_obj(oid))
+		buf = valloc(SD_DATA_OBJ_SIZE);
+	else
+		buf = xmalloc(SD_DATA_OBJ_SIZE);
+
+	return buf;
+}
+
+/*
+ * Build the vnode list of @epoch from the local epoch log, falling back to
+ * epoch_log_read_remote() when it cannot be read locally.
+ *
+ * On success returns a buffer of SD_MAX_VNODES entries that the caller must
+ * free, with *nr set to the number of vnodes and *copies to the maximum
+ * number of copies.  Returns NULL if the epoch log cannot be obtained.
+ */
+static void *get_vnodes_from_epoch(int epoch, int *nr, int *copies)
+{
+	int nodes_nr, len = sizeof(struct sheepdog_vnode_list_entry) * SD_MAX_VNODES;
+	struct sheepdog_node_list_entry nodes[SD_MAX_NODES];
+	void *buf = xmalloc(len);
+
+	nodes_nr = epoch_log_read_nr(epoch, (void *)nodes, ARRAY_SIZE(nodes));
+	if (nodes_nr < 0) {
+		/* the remote read works in bytes: pass the buffer size, not ARRAY_SIZE() */
+		nodes_nr = epoch_log_read_remote(epoch, (void *)nodes, sizeof(nodes));
+		if (nodes_nr <= 0) {
+			free(buf);
+			return NULL;
+		}
+		nodes_nr /= sizeof(nodes[0]);
+	}
+	*nr = nodes_to_vnodes(nodes, nodes_nr, buf);
+	*copies = get_max_copies(nodes, nodes_nr);
+
+	return buf;
+}
+
+/*
+ * Recover object @oid from node @entry (the caller passes the node that
+ * should hold it as of @tgt_epoch) and store it under @epoch.  When @entry
+ * is this node, the object is linked from the local store with store.link()
+ * instead of being read over the network.
+ *
+ * Returns 0 on success, 1 if the peer answered with a node-version or
+ * network error and the recovery should be retried, -1 on any other failure.
+ */
+static int recover_object_from_replica(uint64_t oid,
+				       struct sheepdog_vnode_list_entry *entry,
+				       int epoch, int tgt_epoch)
 {
-	struct sheepdog_vnode_list_entry *e;
 	struct sd_obj_req hdr;
 	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
 	char name[128];
 	unsigned wlen = 0, rlen;
 	int fd, ret;
-	struct sheepdog_vnode_list_entry *old_entry, *cur_entry, *next_entry;
-	int next_nr, next_copies;
-	int tgt_idx = -1;
-	int old_idx;
-
-	old_entry = malloc(sizeof(*old_entry) * SD_MAX_VNODES);
-	cur_entry = malloc(sizeof(*cur_entry) * SD_MAX_VNODES);
-	next_entry = malloc(sizeof(*next_entry) * SD_MAX_VNODES);
-	if (!old_entry || !cur_entry || !next_entry) {
-		eprintf("failed to allocate memory\n");
-		goto err;
-	}
-
-	memcpy(old_entry, _old_entry, sizeof(*old_entry) * old_nr);
-	memcpy(cur_entry, _cur_entry, sizeof(*cur_entry) * cur_nr);
-next:
-	dprintf("recover object %"PRIx64" from epoch %"PRIu32"\n", oid, tgt_epoch);
-	old_idx = obj_to_sheep(old_entry, old_nr, oid, 0);
+	void *buf;
 
-	tgt_idx = find_tgt_node(old_entry, old_nr, old_idx, old_copies,
-				cur_entry, cur_nr, cur_idx, cur_copies, copy_idx);
-	if (tgt_idx < 0) {
-		eprintf("cannot find target node %"PRIx64"\n", oid);
-		goto err;
+	buf = alloc_buffer_for(oid);
+	if (!buf) {
+		eprintf("out of memory\n");
+		return -1;
 	}
-	e = old_entry + tgt_idx;
 
-	if (is_myself(e->addr, e->port)) {
+	if (is_myself(entry->addr, entry->port)) {
 		struct siocb iocb = { 0 };
 
 		iocb.epoch = epoch;
 		ret = store.link(oid, &iocb, tgt_epoch);
-		if (ret == SD_RES_SUCCESS)
+		if (ret == SD_RES_SUCCESS) {
+			ret = 0;
+			goto done;
+		} else {
+			ret = -1;
 			goto out;
-
-		if (ret == SD_RES_NO_OBJ) {
-			next_nr = epoch_log_read(tgt_epoch - 1, buf, buf_len);
-			if (next_nr <= 0) {
-				eprintf("no previous epoch: %"PRIu32"\n", tgt_epoch - 1);
-				goto err;
-			}
-			next_nr /= sizeof(struct sheepdog_node_list_entry);
-			next_copies = get_max_copies((struct sheepdog_node_list_entry *)buf,
-						     next_nr);
-			next_nr = nodes_to_vnodes((struct sheepdog_node_list_entry *)buf,
-						  next_nr, next_entry);
-			goto not_found;
 		}
-
-		eprintf("Cannot recover from local store for %"PRIx64"\n", oid);
-		goto err;
 	}
 
-	addr_to_str(name, sizeof(name), e->addr, 0);
-
-	fd = connect_to(name, e->port);
+	addr_to_str(name, sizeof(name), entry->addr, 0);
+	fd = connect_to(name, entry->port);
 	if (fd < 0) {
-		eprintf("failed to connect to %s:%"PRIu32"\n", name, e->port);
-		goto err;
+		eprintf("failed to connect to %s:%"PRIu32"\n", name, entry->port);
+		ret = -1;
+		goto out;
 	}
-
 	if (is_vdi_obj(oid))
 		rlen = sizeof(struct sheepdog_inode);
 	else if (is_vdi_attr_obj(oid))
@@ -1234,8 +1234,10 @@ next:
 	close(fd);
 
 	if (ret != 0) {
-		eprintf("%"PRIu32"\n", rsp->result);
-		goto err;
+		eprintf("failed to read object %"PRIx64" from %s:%"PRIu32", res: %"PRIx32"\n",
+			oid, name, entry->port, rsp->result);
+		ret = -1;
+		goto out;
 	}
 
 	rsp = (struct sd_obj_rsp *)&hdr;
@@ -1245,20 +1246,23 @@ next:
 		int flags = O_DSYNC | O_RDWR | O_CREAT;
 
 		snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, obj_path,
 			 epoch, oid);
 		snprintf(tmp_path, sizeof(tmp_path), "%s%08u/%016" PRIx64 ".tmp",
 			 obj_path, epoch, oid);
 
 		fd = open(tmp_path, flags, def_fmode);
 		if (fd < 0) {
 			eprintf("failed to open %s: %m\n", tmp_path);
-			goto err;
+			ret = -1;
+			goto out;
 		}
 
 		ret = write(fd, buf, rlen);
 		if (ret != rlen) {
 			eprintf("failed to write object\n");
-			goto err;
+			close(fd);
+			ret = -1;
+			goto out;
 		}
 
 		close(fd);
@@ -1267,137 +1270,180 @@ next:
 		ret = rename(tmp_path, path);
 		if (ret < 0) {
 			eprintf("failed to rename %s to %s: %m\n", tmp_path, path);
-			goto err;
+			ret = -1;
+			goto out;
 		}
-		dprintf("recovered oid %"PRIx64" to epoch %"PRIu32"\n", oid, epoch);
+	} else if (rsp->result == SD_RES_NEW_NODE_VER ||
+			rsp->result == SD_RES_OLD_NODE_VER ||
+			rsp->result == SD_RES_NETWORK_ERROR) {
+		dprintf("retrying: %"PRIx32", %"PRIx64"\n", rsp->result, oid);
+		ret = 1;
 		goto out;
-	}
-
-	if (rsp->result == SD_RES_NEW_NODE_VER || rsp->result == SD_RES_OLD_NODE_VER
-	    || rsp->result == SD_RES_NETWORK_ERROR) {
-		eprintf("retrying: %"PRIu32", %"PRIx64"\n", rsp->result, oid);
-		rw->retry = 1;
+	} else {
+		eprintf("failed, res: %"PRIx32"\n", rsp->result);
+		ret = -1;
 		goto out;
 	}
+done:
+	dprintf("recovered oid %"PRIx64" from %d to epoch %d\n", oid, tgt_epoch, epoch);
+out:
+	free(buf);
+	return ret;
+}
 
-	if (rsp->result != SD_RES_NO_OBJ || rsp->data_length == 0) {
-		eprintf("%"PRIu32"\n", rsp->result);
-		goto err;
-	}
-	next_nr = rsp->data_length / sizeof(struct sheepdog_node_list_entry);
-	next_copies = get_max_copies((struct sheepdog_node_list_entry *)buf, next_nr);
-	next_nr = nodes_to_vnodes((struct sheepdog_node_list_entry *)buf,
-				  next_nr, next_entry);
+/*
+ * Step one epoch further back: the current "old" vnode list, count and
+ * copies become "cur", and @new_old becomes "old".
+ */
+static void rollback_old_cur(struct sheepdog_vnode_list_entry *old, int *old_nr, int *old_copies,
+			     struct sheepdog_vnode_list_entry *cur, int *cur_nr, int *cur_copies,
+			     struct sheepdog_vnode_list_entry *new_old, int new_old_nr, int new_old_copies)
+{
+	int nr_old = *old_nr;
+	int copies_old = *old_copies;
+
+	memcpy(cur, old, sizeof(*old) * nr_old);
+	*cur_nr = nr_old;
+	*cur_copies = copies_old;
+	memcpy(old, new_old, sizeof(*new_old) * new_old_nr);
+	*old_nr = new_old_nr;
+	*old_copies = new_old_copies;
+}
 
-not_found:
-	for (copy_idx = 0; copy_idx < old_copies; copy_idx++)
-		if (get_nth_node(old_entry, old_nr, old_idx, copy_idx) == tgt_idx)
-			break;
-	if (copy_idx == old_copies) {
-		eprintf("bug: cannot find the proper copy_idx\n");
+/*
+ * Recover the object by following its track through the epoch history:
+ * try the node that (at least *theoretically*, on the consistent hash ring)
+ * held it in the previous epoch, walking back one epoch at a time.
+ *
+ * Returns 0 on success or when a retry was requested (rw->retry is set),
+ * -1 on failure.
+ */
+static int do_recover_object(struct recovery_work *rw, int copy_idx)
+{
+	struct sheepdog_vnode_list_entry *old, *cur;
+	uint64_t oid = rw->oids[rw->done];
+	int old_nr = rw->old_nr_vnodes, cur_nr = rw->cur_nr_vnodes;
+	int epoch = rw->epoch, tgt_epoch = rw->epoch - 1;
+	struct sheepdog_vnode_list_entry *tgt_entry;
+	int old_idx, cur_idx, tgt_idx, old_copies, cur_copies, ret;
+
+	old = xmalloc(sizeof(*old) * SD_MAX_VNODES);
+	cur = xmalloc(sizeof(*cur) * SD_MAX_VNODES);
+	memcpy(old, rw->old_vnodes, sizeof(*old) * old_nr);
+	memcpy(cur, rw->cur_vnodes, sizeof(*cur) * cur_nr);
+	old_copies = get_max_copies(rw->old_nodes, rw->old_nr_nodes);
+	cur_copies = get_max_copies(rw->cur_nodes, rw->cur_nr_nodes);
+
+again:
+	old_idx = obj_to_sheep(old, old_nr, oid, 0);
+	cur_idx = obj_to_sheep(cur, cur_nr, oid, 0);
+
+	dprintf("try recover object %"PRIx64" from epoch %d\n", oid, tgt_epoch);
+
+	tgt_idx = find_tgt_node(old, old_nr, old_idx, old_copies,
+			cur, cur_nr, cur_idx, cur_copies, copy_idx);
+	if (tgt_idx < 0) {
+		eprintf("cannot find target node %"PRIx64"\n", oid);
+		ret = -1;
 		goto err;
 	}
+	tgt_entry = old + tgt_idx;
 
-	dprintf("%"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32"\n", rsp->result, rsp->data_length, tgt_idx,
-		old_idx, old_nr, copy_idx);
-	memcpy(cur_entry, old_entry, sizeof(*old_entry) * old_nr);
-	cur_copies = old_copies;
-	cur_nr = old_nr;
-	cur_idx = old_idx;
+	ret = recover_object_from_replica(oid, tgt_entry, epoch, tgt_epoch);
+	if (ret < 0) {
+		struct sheepdog_vnode_list_entry *new_old;
+		int new_old_nr, new_old_copies;
 
-	memcpy(old_entry, next_entry, next_nr * sizeof(*next_entry));
-	old_copies = next_copies;
-	old_nr = next_nr;
+		tgt_epoch--;
+		if (tgt_epoch < 1) {
+			eprintf("can not recover oid %"PRIx64"\n", oid);
+			ret = -1;
+			goto err;
+		}
 
-	tgt_epoch--;
-	goto next;
-out:
-	free(old_entry);
-	free(cur_entry);
-	free(next_entry);
-	return 0;
+		new_old = get_vnodes_from_epoch(tgt_epoch, &new_old_nr, &new_old_copies);
+		if (!new_old) {
+			ret = -1;
+			goto err;
+		}
+		rollback_old_cur(old, &old_nr, &old_copies, cur, &cur_nr, &cur_copies,
+				new_old, new_old_nr, new_old_copies);
+		free(new_old);
+		goto again;
+	} else if (ret > 0) {
+		ret = 0;
+		rw->retry = 1;
+	}
 err:
-	free(old_entry);
-	free(cur_entry);
-	free(next_entry);
-	return -1;
+	free(old);
+	free(cur);
+	return ret;
+}
+
+/*
+ * Return the copy index under which this node holds @oid in the current
+ * vnode list, or -1 if it holds no copy of it.  *copy_nr is set to the
+ * maximum number of copies for the current node list.
+ */
+static int get_replica_idx(struct recovery_work *rw, uint64_t oid, int *copy_nr)
+{
+	int i, ret = -1;
+	*copy_nr = get_max_copies(rw->cur_nodes, rw->cur_nr_nodes);
+	for (i = 0; i < *copy_nr; i++) {
+		int n = obj_to_sheep(rw->cur_vnodes, rw->cur_nr_vnodes, oid, i);
+		if (is_myself(rw->cur_vnodes[n].addr, rw->cur_vnodes[n].port)) {
+			ret = i;
+			break;
+		}
+	}
+	return ret;
 }
 
-static void recover_one(struct work *work, int idx)
+/*
+ * Work function: recover rw->oids[rw->done] for rw->epoch unless it is
+ * already present locally.  The copy index owned by this node is tried
+ * first, then every other copy index.
+ */
+static void recover_object(struct work *work, int idx)
 {
 	struct recovery_work *rw = container_of(work, struct recovery_work, work);
-	char *buf = NULL;
-	int ret;
 	uint64_t oid = rw->oids[rw->done];
-	int old_copies, cur_copies;
 	uint32_t epoch = rw->epoch;
-	int i, copy_idx = 0, cur_idx = -1;
-	struct siocb iocb;
+	int i, copy_idx, copy_nr, ret;
+	struct siocb iocb = { 0 };
+
+	if (!sys->nr_sobjs)
+		return;
 
-	eprintf("%"PRIu32" %"PRIu32", %16"PRIx64"\n", rw->done, rw->count, oid);
+	eprintf("done:%"PRIu32" count:%"PRIu32", oid:%"PRIx64"\n", rw->done, rw->count, oid);
 
-	memset(&iocb, 0, sizeof(iocb));
 	iocb.epoch = epoch;
 	ret = store.open(oid, &iocb, 0);
 	if (ret == SD_RES_SUCCESS) {
-		/* the object is already recovered */
 		store.close(oid, &iocb);
-		goto out;
+		dprintf("the object is already recovered\n");
+		return;
 	}
 
-	if (is_vdi_obj(oid))
-		buf = malloc(sizeof(struct sheepdog_inode));
-	else if (is_vdi_attr_obj(oid))
-		buf = malloc(SD_MAX_VDI_ATTR_VALUE_LEN);
-	else if (is_data_obj(oid))
-		buf = valloc(SD_DATA_OBJ_SIZE);
-	else
-		buf = malloc(SD_DATA_OBJ_SIZE);
-
-	if (!sys->nr_sobjs)
-		goto fail;
-
-	cur_idx = obj_to_sheep(rw->cur_vnodes, rw->cur_nr_vnodes, oid, 0);
-
-	old_copies = get_max_copies(rw->old_nodes, rw->old_nr_nodes);
-	cur_copies = get_max_copies(rw->cur_nodes, rw->cur_nr_nodes);
-
-	copy_idx = -1;
-	for (i = 0; i < cur_copies; i++) {
-		int n = obj_to_sheep(rw->cur_vnodes, rw->cur_nr_vnodes, oid, i);
-		if (is_myself(rw->cur_vnodes[n].addr, rw->cur_vnodes[n].port)) {
-			copy_idx = i;
-			break;
-		}
-	}
-	if (copy_idx < 0) {
-		eprintf("bug: copy_idx < 0\n");
-		goto out;
+	copy_idx = get_replica_idx(rw, oid, &copy_nr);
+	if (copy_idx < 0) {
+		eprintf("bug: copy_idx < 0\n");
+		ret = -1;
+		goto err;
 	}
-
-	dprintf("%"PRIu32", %"PRIu32", %"PRIu32"\n", cur_idx, rw->cur_nr_nodes,
-		copy_idx);
-
-	ret = __recover_one(rw, rw->old_vnodes, rw->old_nr_vnodes, old_copies,
-			    rw->cur_vnodes, rw->cur_nr_vnodes, cur_copies,
-			    cur_idx, copy_idx, epoch, epoch - 1, oid,
-			    buf, SD_DATA_OBJ_SIZE);
-	if (ret == 0)
-		goto out;
-
-	for (i = 0; i < cur_copies; i++) {
-		if (i == copy_idx)
-			continue;
-		ret = __recover_one(rw, rw->old_vnodes, rw->old_nr_vnodes, old_copies,
-				    rw->cur_vnodes, rw->cur_nr_vnodes, cur_copies, cur_idx, i,
-				    epoch, epoch - 1, oid, buf, SD_DATA_OBJ_SIZE);
-		if (ret == 0)
-			goto out;
+	ret = do_recover_object(rw, copy_idx);
+	if (ret < 0) {
+		for (i = 0; i < copy_nr; i++) {
+			if (i == copy_idx)
+				continue;
+			ret = do_recover_object(rw, i);
+			if (ret == 0)
+				break;
+		}
 	}
-fail:
-	eprintf("failed to recover object %"PRIx64"\n", oid);
-out:
-	free(buf);
+err:
+	if (ret < 0)
+		eprintf("failed to recover object %"PRIx64"\n", oid);
 }
 
 static struct recovery_work *suspended_recovery_work;
@@ -1520,7 +1548,7 @@ static void recover_done(struct work *work, int idx)
 	}
 
 	if (rw->done < rw->count && !next_rw) {
-		rw->work.fn = recover_one;
+		rw->work.fn = recover_object;	/* recover the next pending oid, rw->oids[rw->done] */
 
 		if (is_access_to_busy_objects(oid)) {
 			suspended_recovery_work = rw;
-- 
1.7.8.rc3




More information about the sheepdog mailing list