[Sheepdog] [PATCH] collie: handle double node failure

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Mon Apr 5 20:32:26 CEST 2010


Currently, sheepdog cannot handle a node failure that happens while
recovery is already in progress, so two nodes failing at once bring the
whole system down.

This patch adds support for handling that case.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 collie/collie.h          |    2 +-
 collie/group.c           |   10 +-
 collie/store.c           |  684 +++++++++++++++++++++++++++++++---------------
 include/sheepdog_proto.h |   44 +++-
 4 files changed, 502 insertions(+), 238 deletions(-)

diff --git a/collie/collie.h b/collie/collie.h
index 3b17ee7..fac6809 100644
--- a/collie/collie.h
+++ b/collie/collie.h
@@ -122,7 +122,7 @@ int remove_epoch(int epoch);
 int set_cluster_ctime(uint64_t ctime);
 uint64_t get_cluster_ctime(void);
 
-int start_recovery(uint32_t epoch, int add);
+int start_recovery(uint32_t epoch);
 
 static inline int is_myself(struct sheepdog_node_list_entry *e)
 {
diff --git a/collie/group.c b/collie/group.c
index a2d7425..d4b5449 100644
--- a/collie/group.c
+++ b/collie/group.c
@@ -768,17 +768,15 @@ static void __sd_deliver(struct work *work, int idx)
 static void __sd_deliver_done(struct work *work, int idx)
 {
 	struct work_deliver *w = container_of(work, struct work_deliver, work);
-/* 	struct message_header *m = w->msg; */
-/* 	struct cluster_info *ci = w->ci; */
+	struct message_header *m = w->msg;
 
 	/*
 	 * FIXME: we want to recover only after all nodes are fully
 	 * synchronized
 	 */
 
-	/* disabled for now */
-/* 	if (m->done && m->op == SD_MSG_JOIN) */
-/* 		start_recovery(ci, ci->epoch, 1); */
+	if (m->done && m->op == SD_MSG_JOIN && sys->epoch >= 2)
+		start_recovery(sys->epoch);
 
 	free(w->msg);
 	free(w);
@@ -904,7 +902,7 @@ static void __sd_confch_done(struct work *work, int idx)
 	if (w->left_list_entries) {
 		if (w->left_list_entries > 1)
 			eprintf("we can't handle %Zd\n", w->left_list_entries);
-		start_recovery(sys->epoch, 0);
+		start_recovery(sys->epoch);
 	}
 
 	free(w->member_list);
diff --git a/collie/store.c b/collie/store.c
index 9b5584f..b35bbf5 100644
--- a/collie/store.c
+++ b/collie/store.c
@@ -76,23 +76,66 @@ static int stat_sheep(uint64_t *store_size, uint64_t *store_free, uint32_t epoch
 	return SD_RES_SUCCESS;
 }
 
-static int get_obj_list(struct request *req)
+static int is_obj_in_range(uint64_t oid, uint64_t start, uint64_t end)
+{
+	uint64_t hval = fnv_64a_buf(&oid, sizeof(oid), FNV1A_64_INIT);
+
+	if (start < end)
+		return (start < hval && hval <= end);
+	else
+		return (start < hval || hval <= end);
+}
+
+static int get_obj_list(struct request *req, char *buf, int buf_len)
 {
 	DIR *dir;
 	struct dirent *d;
-	struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
-	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&req->rp;
+	struct sd_list_req *hdr = (struct sd_list_req *)&req->rq;
+	struct sd_list_rsp *rsp = (struct sd_list_rsp *)&req->rp;
 	uint64_t oid;
-	uint64_t oid_hash;
-	uint64_t start_hash = hdr->oid;
-	uint64_t end_hash = hdr->cow_oid;
+	uint64_t start_hash = hdr->start;
+	uint64_t end_hash = hdr->end;
+	uint32_t epoch = hdr->tgt_epoch;
 	char path[1024];
 	uint64_t *p = (uint64_t *)req->data;
 	int nr = 0;
+	uint64_t *objlist = NULL;
+	int obj_nr = 0, fd, i;
+	struct sheepdog_node_list_entry *e;
+	int e_nr;
+	int idx;
+	int res = SD_RES_SUCCESS;
+
+	if (epoch == 1)
+		goto local;
+
+	snprintf(path, sizeof(path), "%s%08u/list", obj_path, epoch);
+
+	fd = open(path, O_RDONLY);
+	if (fd < 0) {
+		eprintf("failed to open %s, %s\n", path, strerror(errno));
+		return SD_RES_EIO;
+	}
+	/* a failed read (-1) must not turn into a bogus entry count */
+	obj_nr = read(fd, buf, buf_len);
+	obj_nr = obj_nr > 0 ? obj_nr / (int)sizeof(uint64_t) : 0;
+	objlist = (uint64_t *)buf;
+	for (i = 0; i < obj_nr; i++) {
+		if (is_obj_in_range(objlist[i], start_hash, end_hash)) {
+			dprintf("%u, %016lx, %016lx %016lx\n", epoch,
+				objlist[i], start_hash, end_hash);
+			p[nr++] = objlist[i];
+		}
 
-	snprintf(path, sizeof(path), "%s%08u/", obj_path, hdr->obj_ver);
+		if (nr * sizeof(uint64_t) >= hdr->data_length)
+			break;
+	}
+	close(fd);
 
-	dprintf("%d\n", sys->this_node.port);
+local:
+	snprintf(path, sizeof(path), "%s%08u/", obj_path, hdr->tgt_epoch);
+
+	dprintf("%d, %s\n", sys->this_node.port, path);
 
 	dir = opendir(path);
 	if (!dir) {
@@ -101,38 +144,66 @@ static int get_obj_list(struct request *req)
 	}
 
 	while ((d = readdir(dir))) {
-		int got = 0;
-
 		if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, ".."))
 			continue;
 
 		oid = strtoull(d->d_name, NULL, 16);
-		oid_hash = fnv_64a_buf(&oid, sizeof(oid), FNV1A_64_INIT);
+		if (oid == 0)
+			continue;
+
+		for (i = 0; i < obj_nr; i++)
+			if (objlist[i] == oid)
+				break;
+		if (i < obj_nr)
+			continue;
 
-		if ((nr + 1) * sizeof(uint64_t) > hdr->data_length)
+		if (is_obj_in_range(oid, start_hash, end_hash)) {
+			dprintf("%u, %016lx, %016lx %016lx\n", epoch,
+				oid, start_hash, end_hash);
+			p[nr++] = oid;
+		}
+
+		if (nr * sizeof(uint64_t) >= hdr->data_length)
 			break;
+	}
 
-		if (start_hash < end_hash) {
-			if (oid_hash >= start_hash && oid_hash < end_hash)
-				got = 1;
-		} else
-			if (end_hash <= oid_hash || oid_hash < start_hash)
-				got = 1;
+	eprintf("nr = %d\n", nr);
+	rsp->data_length = nr * sizeof(uint64_t);
 
-		dprintf("%d, %u, %016lx, %016lx, %016lx %016lx\n", got, hdr->obj_ver,
-			oid, oid_hash, start_hash, end_hash);
+	e_nr = epoch_log_read(epoch, buf, buf_len);
+	e_nr /= sizeof(*e);
+	e = (struct sheepdog_node_list_entry *)buf;
 
-		if (got) {
-			*(p + nr) = oid;
-			nr++;
-		}
+	if (e_nr <= sys->nr_sobjs) {
+		rsp->next = end_hash;
+		goto out;
 	}
 
-	rsp->data_length = nr * 8;
+	for (idx = 0; idx < e_nr; idx++) {
+		if (e[idx].id == sys->this_node.id)
+			break;
+	}
+	if (idx != e_nr) {
+		uint64_t hval = e[idx % e_nr].id;
 
+		rsp->next = end_hash;
+
+		if (start_hash < end_hash) {
+			if (start_hash < hval && hval <= end_hash)
+				rsp->next = hval;
+		} else
+			if (start_hash < hval || hval <= end_hash)
+				rsp->next = hval;
+
+		dprintf("%u, %016lx, %016lx %016lx\n", epoch, hval,
+			start_hash, end_hash);
+	} else
+		res = SD_RES_SYSTEM_ERROR;
+
+out:
 	closedir(dir);
 
-	return SD_RES_SUCCESS;
+	return res;
 }
 
 static int read_from_one(uint64_t oid,
@@ -225,6 +296,7 @@ static int forward_obj_req(struct request *req, char *buf)
 	struct sd_rsp *rsp = (struct sd_rsp *)&hdr2;
 	uint64_t oid = hdr->oid;
 	int copies;
+	uint32_t epoch;
 
 	e = zalloc(SD_MAX_NODES * sizeof(struct sheepdog_node_list_entry));
 again:
@@ -243,7 +315,11 @@ again:
 
 		/* TODO: we can do better; we need to chech this first */
 		if (is_myself(&e[n])) {
-			ret = store_queue_request_local(req, buf, sys->epoch);
+			if (hdr->flags & SD_FLAG_CMD_RECOVERY)
+				epoch = hdr->tgt_epoch;
+			else
+				epoch = sys->epoch;
+			ret = store_queue_request_local(req, buf, epoch);
 			memcpy(rsp, &req->rp, sizeof(*rsp));
 			rsp->result = ret;
 			goto done;
@@ -334,63 +410,13 @@ static int ob_open(uint32_t epoch, uint64_t oid, int aflags, int *ret)
 	return fd;
 }
 
-static int is_my_obj(uint64_t oid, int copies)
-{
-	int i, n, nr;
-	struct sheepdog_node_list_entry e[SD_MAX_NODES];
-
-	nr = build_node_list(&sys->sd_node_list, e);
-
-	for (i = 0; i < copies; i++) {
-		n = obj_to_sheep(e, nr, oid, i);
-		if (is_myself(&e[n]))
-			return 1;
-	}
-
-	return 0;
-}
-
 int update_epoch_store(uint32_t epoch)
 {
-	int ret;
-	char new[1024], old[1024];
-	struct stat s;
-	DIR *dir;
-	struct dirent *d;
-	uint64_t oid;
+	char new[1024];
 
 	snprintf(new, sizeof(new), "%s%08u/", obj_path, epoch);
 	mkdir(new, def_dmode);
 
-	snprintf(old, sizeof(old), "%s%08u/", obj_path, epoch - 1);
-
-	ret = stat(old, &s);
-	if (ret)
-		return 0;
-
-	dir = opendir(old);
-	if (!dir) {
-		eprintf("%s, %s, %m\n", old, new);
-		return 1;
-	}
-
-	while ((d = readdir(dir))) {
-		if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, ".."))
-			continue;
-
-		oid = strtoull(d->d_name, NULL, 16);
-		/* TODO: use proper object coipes */
-		if (is_my_obj(oid, sys->nr_sobjs)) {
-			snprintf(new, sizeof(new), "%s%08u/%s", obj_path, epoch,
-				d->d_name);
-			snprintf(old, sizeof(old), "%s%08u/%s", obj_path, epoch - 1,
-				d->d_name);
-			link(old, new);
-		}
-	}
-
-	closedir(dir);
-
 	return 0;
 }
 
@@ -517,6 +543,13 @@ out:
 	if (fd != -1)
 		close(fd);
 
+	if (ret == SD_RES_NO_OBJ && hdr->flags & SD_FLAG_CMD_RECOVERY) {
+		int len  = epoch_log_read(epoch - 1, req->data, hdr->data_length);
+		if (len < 0)
+			len = 0;
+		rsp->data_length = len;
+	}
+
 	return ret;
 }
 
@@ -547,13 +580,16 @@ void store_queue_request(struct work *work, int idx)
 			goto out;
 	}
 
+	if (hdr->flags & SD_FLAG_CMD_RECOVERY)
+		epoch = hdr->tgt_epoch;
+
 	if (opcode == SD_OP_STAT_SHEEP) {
 		ret = stat_sheep(&nrsp->store_size, &nrsp->store_free, epoch);
 		goto out;
 	}
 
 	if (opcode == SD_OP_GET_OBJ_LIST) {
-		ret = get_obj_list(req);
+		ret = get_obj_list(req, buf, SD_DATA_OBJ_SIZE);
 		goto out;
 	}
 
@@ -731,19 +767,26 @@ static int node_distance(int my, int her, int nr)
 	return (my + nr - her) % nr;
 }
 
-static int node_from_distance(int my, int dist, int nr)
+static int contains_node(uint64_t id, struct sheepdog_node_list_entry *entry,
+			 int nr, int base_idx)
 {
-	return (my + nr - dist) % nr;
+	int i;
+
+	for (i = 0; i < sys->nr_sobjs; i++) {
+		if (entry[(base_idx + i) % nr].id == id)
+			return (base_idx + i) % nr;
+	}
+	return -1;
 }
 
 struct recovery_work {
 	uint32_t epoch;
 	uint32_t done;
 
-	uint32_t iteration;
-
 	struct sheepdog_node_list_entry e;
 
+	struct timer timer;
+	int retry;
 	struct work work;
 	struct list_head rw_siblings;
 
@@ -754,84 +797,272 @@ struct recovery_work {
 static LIST_HEAD(recovery_work_list);
 static int recovering;
 
-static void recover_one(struct work *work, int idx)
+static int find_tgt_node(struct sheepdog_node_list_entry *old_entry, int old_nr, int old_idx,
+			 struct sheepdog_node_list_entry *cur_entry, int cur_nr, int cur_idx,
+			 int copy_idx)
 {
-	struct recovery_work *rw = container_of(work, struct recovery_work, work);
-	struct sheepdog_node_list_entry *e = &rw->e;
+	int i, idx;
+
+	dprintf("%d, %d, %d, %d, %d\n", old_idx, old_nr, cur_idx, cur_nr, copy_idx);
+
+	if (copy_idx < cur_nr) {
+		idx = contains_node(cur_entry[(cur_idx + copy_idx) % cur_nr].id,
+				    old_entry, old_nr, old_idx);
+		if (idx >= 0) {
+			dprintf("%d, %d, %d, %d\n", idx, copy_idx, cur_idx, cur_nr);
+			return idx;
+		}
+	}
+
+	for (i = 0; ; i++) {
+		if (i < cur_nr) {
+			idx = contains_node(cur_entry[(cur_idx + i) % cur_nr].id,
+					    old_entry, old_nr, old_idx);
+			if (idx >= 0)
+				continue;
+
+			while (contains_node(old_entry[old_idx].id, cur_entry, cur_nr, cur_idx) >= 0)
+				old_idx = (old_idx + 1) % old_nr;
+
+		}
+		if (i == copy_idx) {
+			dprintf("%d, %d, %d, %d\n", old_idx, copy_idx, cur_idx, cur_nr);
+			return old_idx;
+		}
+
+		old_idx = (old_idx + 1) % old_nr;
+	}
+	return -1;
+}
+
+static int __recover_one(struct recovery_work *rw,
+			 struct sheepdog_node_list_entry *_old_entry, int old_nr,
+			 struct sheepdog_node_list_entry *_cur_entry, int cur_nr, int cur_idx,
+			 int copy_idx, uint32_t epoch, uint32_t tgt_epoch,
+			 uint64_t oid, char *buf, int buf_len)
+{
+	struct sheepdog_node_list_entry *e;
 	struct sd_obj_req hdr;
-	struct sd_obj_rsp *rsp;
+	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
 	char name[128];
-	char *buf = zero_block + idx * SD_DATA_OBJ_SIZE;
 	unsigned wlen = 0, rlen = SD_DATA_OBJ_SIZE;
 	int fd, ret;
-	uint64_t oid = *(((uint64_t *)rw->buf) + rw->done);
+	struct sheepdog_node_list_entry old_entry[SD_MAX_NODES],
+		cur_entry[SD_MAX_NODES], *next_entry;
+	int next_nr;
+	int tgt_idx = -1;
+	int old_idx;
+
+	memcpy(old_entry, _old_entry, sizeof(*old_entry) * old_nr);
+	memcpy(cur_entry, _cur_entry, sizeof(*cur_entry) * cur_nr);
+next:
+	dprintf("recover obj %lx from epoch %d\n", oid, tgt_epoch);
+	old_idx = obj_to_sheep(old_entry, old_nr, oid, 0);
+
+	tgt_idx = find_tgt_node(old_entry, old_nr, old_idx, cur_entry, cur_nr, cur_idx, copy_idx);
+	if (tgt_idx < 0) {
+		eprintf("cannot find target node, %lx\n", oid);
+		return -1;
+	}
+	e = old_entry + tgt_idx;
 
-	eprintf("%d %d, %16lx\n", rw->done, rw->count, oid);
+	if (e->id == sys->this_node.id) {
+		char old[PATH_MAX], new[PATH_MAX];
+
+		snprintf(old, sizeof(old), "%s%08u/%016" PRIx64, obj_path,
+			 tgt_epoch, oid);
+		snprintf(new, sizeof(new), "%s%08u/%016" PRIx64, obj_path,
+			 epoch, oid);
+		dprintf("link from %s to %s\n", old, new);
+		if (link(old, new) == 0)
+			return 0;
+
+		if (errno == ENOENT) {
+			next_nr = epoch_log_read(tgt_epoch, buf, buf_len);
+			if (next_nr <= 0) {
+				eprintf("no previous epoch, %d\n", tgt_epoch);
+				return -1;
+			}
+			next_entry = (struct sheepdog_node_list_entry *)buf;
+			next_nr /= sizeof(*next_entry);
+			goto not_found;
+		}
+
+		eprintf("cannot recover from local, %s, %s\n", old, new);
+		return -1;
+	}
 
 	addr_to_str(name, sizeof(name), e->addr, 0);
 
 	fd = connect_to(name, e->port);
 	if (fd < 0) {
-		eprintf("%s %d\n", name, e->port);
-		return;
+		eprintf("failed to connect to %s:%d\n", name, e->port);
+		return -1;
 	}
 
 	memset(&hdr, 0, sizeof(hdr));
 	hdr.opcode = SD_OP_READ_OBJ;
 	hdr.oid = oid;
 	hdr.epoch = sys->epoch;
-	hdr.flags = 0;
+	hdr.flags = SD_FLAG_CMD_RECOVERY | SD_FLAG_CMD_FORWARD;
+	hdr.tgt_epoch = tgt_epoch;
 	hdr.data_length = rlen;
 
 	ret = exec_req(fd, (struct sd_req *)&hdr, buf, &wlen, &rlen);
 
 	close(fd);
 
+	if (ret < 0) {
+		eprintf("%d\n", rsp->result);
+		return -1;
+	}
+
 	rsp = (struct sd_obj_rsp *)&hdr;
 
-	if (rsp->result != SD_RES_SUCCESS) {
+	if (rsp->result == SD_RES_SUCCESS) {
+		fd = ob_open(epoch, oid, O_CREAT, &ret);
+		if (fd < 0)
+			return -1;
+		if (write(fd, buf, SD_DATA_OBJ_SIZE) != SD_DATA_OBJ_SIZE) {
+			eprintf("failed to write object\n");
+			close(fd);
+			return -1;
+		}
+		ret = fsetxattr(fd, ANAME_COPIES, &rsp->copies,
+				sizeof(rsp->copies), 0);
+		close(fd);
+		if (ret) {
+			eprintf("couldn't set xattr\n");
+			return -1;
+		}
+		dprintf("recovered oid %lx to epoch %d\n", oid, epoch);
+		return 0;
+	}
+
+	if (rsp->result == SD_RES_NEW_NODE_VER || rsp->result == SD_RES_OLD_NODE_VER) {
+		eprintf("try again, %d, %lx\n", rsp->result, oid);
+		rw->retry = 1;
+		return 0;
+	}
+
+	if (rsp->result != SD_RES_NO_OBJ || rsp->data_length == 0) {
 		eprintf("%d\n", rsp->result);
+		return -1;
+	}
+	next_entry = (struct sheepdog_node_list_entry *)buf;
+	next_nr = rsp->data_length / sizeof(*old_entry);
+
+not_found:
+	copy_idx = node_distance(tgt_idx, old_idx, old_nr);
+	dprintf("%d, %d, %d, %d, %d, %d\n", rsp->result, rsp->data_length, tgt_idx,
+		old_idx, old_nr, copy_idx);
+
+	memcpy(cur_entry, old_entry, sizeof(*old_entry) * old_nr);
+	cur_nr = old_nr;
+	cur_idx = old_idx;
+
+	memcpy(old_entry, next_entry, next_nr * sizeof(*next_entry));
+	old_nr = next_nr;
+
+	tgt_epoch--;
+	goto next;
+}
+
+static void recover_one(struct work *work, int idx)
+{
+	struct recovery_work *rw = container_of(work, struct recovery_work, work);
+	char *buf = zero_block + idx * SD_DATA_OBJ_SIZE;
+	int ret;
+	uint64_t oid = *(((uint64_t *)rw->buf) + rw->done);
+	struct sheepdog_node_list_entry old_entry[SD_MAX_NODES],
+		cur_entry[SD_MAX_NODES];
+	int old_nr, cur_nr;
+	uint32_t epoch = rw->epoch;
+	int i, my_idx = -1, copy_idx, cur_idx = -1;
+
+	eprintf("%d %d, %16lx\n", rw->done, rw->count, oid);
+
+	cur_nr = epoch_log_read(epoch, (char *)cur_entry, sizeof(cur_entry));
+	if (cur_nr <= 0) {
+		eprintf("failed to read current epoch, %d\n", epoch);
 		return;
 	}
+	cur_nr /= sizeof(struct sheepdog_node_list_entry);
+
+	old_nr = epoch_log_read(epoch - 1, (char *)old_entry, sizeof(old_entry));
+	if (old_nr <= 0) {
+		eprintf("failed to read previous epoch, %d\n", epoch - 1);
+		goto fail;
+	}
+	old_nr /= sizeof(struct sheepdog_node_list_entry);
 
-	fd = ob_open(rw->epoch, oid, O_CREAT, &ret);
-	write(fd, buf, SD_DATA_OBJ_SIZE);
+	if (!sys->nr_sobjs)
+		goto fail;
+
+	cur_idx = obj_to_sheep(cur_entry, cur_nr, oid, 0);
+
+	for (i = 0; i < cur_nr; i++) {
+		if (cur_entry[i].id == sys->this_node.id) {
+			my_idx = i;
+			break;
+		}
+	}
+	copy_idx = node_distance(my_idx, cur_idx, cur_nr);
+	dprintf("%d, %d, %d, %d\n", my_idx, cur_idx, cur_nr, copy_idx);
+
+	ret = __recover_one(rw, old_entry, old_nr, cur_entry, cur_nr, cur_idx,
+			    copy_idx, epoch, epoch - 1, oid, buf, SD_DATA_OBJ_SIZE);
+	if (ret == 0)
+		return;
+
+	for (i = 0; i < sys->nr_sobjs; i++) {
+		if (i == copy_idx)
+			continue;
+		ret = __recover_one(rw, old_entry, old_nr,
+				    cur_entry, cur_nr, cur_idx, i,
+				    epoch, epoch - 1, oid, buf, SD_DATA_OBJ_SIZE);
+		if (ret == 0)
+			return;
+	}
+fail:
+	eprintf("failed to recover object %lx\n", oid);
 }
 
 static void __start_recovery(struct work *work, int idx);
 static void __start_recovery_done(struct work *work, int idx);
 
+static void recover_one_timer(void *data)
+{
+	struct recovery_work *rw = (struct recovery_work *)data;
+	queue_work(dobj_queue, &rw->work);
+}
+
 static void recover_one_done(struct work *work, int idx)
 {
 	struct recovery_work *rw = container_of(work, struct recovery_work, work);
 
-	if (rw->done < rw->count) {
-		rw->done++;
-		queue_work(dobj_queue, &rw->work);
+	if (rw->retry) {
+		rw->retry = 0;
+
+		rw->timer.callback = recover_one_timer;
+		rw->timer.data = rw;
+		add_timer(&rw->timer, 2);
 		return;
 	}
 
-	if (rw->iteration) {
-		if (++rw->iteration <= sys->nr_sobjs) {
-			free(rw->buf);
+	rw->done++;
 
-			rw->done = 0;
-
-			rw->work.fn = __start_recovery;
-			rw->work.done = __start_recovery_done;
-
-			queue_work(dobj_queue, &rw->work);
-
-			return;
-		}
+	if (rw->done < rw->count && rw->rw_siblings.next == &recovery_work_list) {
+		queue_work(dobj_queue, &rw->work);
+		return;
 	}
 
+	dprintf("recovery done, %d\n", rw->epoch);
 	recovering--;
 
 	list_del(&rw->rw_siblings);
 
-	if (rw->buf)
-		free(rw->buf);
+	free(rw->buf);
 	free(rw);
 
 	if (!list_empty(&recovery_work_list)) {
@@ -843,16 +1074,16 @@ static void recover_one_done(struct work *work, int idx)
 	}
 }
 
-static int fill_obj_list(struct recovery_work *rw,
-			 struct sheepdog_node_list_entry *e,
-			 uint64_t start_hash, uint64_t end_hash)
+static int __fill_obj_list(struct recovery_work *rw,
+			   struct sheepdog_node_list_entry *e,
+			   uint64_t start_hash, uint64_t end_hash, uint64_t *done_hash)
 {
 	int fd, ret;
 	uint32_t epoch = rw->epoch;
 	unsigned wlen, rlen;
 	char name[128];
-	struct sd_obj_req hdr;
-	struct sd_obj_rsp *rsp;
+	struct sd_list_req hdr;
+	struct sd_list_rsp *rsp;
 
 	addr_to_str(name, sizeof(name), e->addr, 0);
 
@@ -870,34 +1101,66 @@ static int fill_obj_list(struct recovery_work *rw,
 	memset(&hdr, 0, sizeof(hdr));
 	hdr.opcode = SD_OP_GET_OBJ_LIST;
 	hdr.epoch = sys->epoch;
-	hdr.oid = start_hash;
-	hdr.cow_oid = end_hash;
-	hdr.obj_ver = epoch - 1;
+	hdr.start = start_hash;
+	hdr.end = end_hash;
+	hdr.tgt_epoch = epoch - 1;
 	hdr.flags = 0;
 	hdr.data_length = rlen;
 
-	dprintf("%016lx, %016lx\n", hdr.oid, hdr.cow_oid);
+	dprintf("%016lx, %016lx\n", hdr.start, hdr.end);
 
-	rw->buf = malloc(rlen);
 	memcpy(&rw->e, e, sizeof(rw->e));
 
-	ret = exec_req(fd, (struct sd_req *)&hdr, rw->buf, &wlen, &rlen);
+	ret = exec_req(fd, (struct sd_req *)&hdr, rw->buf + rw->count * sizeof(uint64_t), &wlen, &rlen);
 
 	close(fd);
 
-	rsp = (struct sd_obj_rsp *)&hdr;
+	rsp = (struct sd_list_rsp *)&hdr;
 
 	if (rsp->result != SD_RES_SUCCESS) {
-		eprintf("%d\n", rsp->result);
-		return -1;
+		rw->retry = 1;
+		*done_hash = end_hash;
+		eprintf("try again, %d\n", rsp->result);
+		return 0;
 	}
 
 	dprintf("%d\n", rsp->data_length);
 
 	if (rsp->data_length)
-		rw->count = rsp->data_length / sizeof(uint64_t);
-	else
-		rw->count = 0;
+		rw->count += rsp->data_length / sizeof(uint64_t);
+
+	*done_hash = rsp->next;
+
+	return 0;
+}
+
+static int fill_obj_list(struct recovery_work *rw,
+			 struct sheepdog_node_list_entry *old_entry, int old_nr,
+			 struct sheepdog_node_list_entry *cur_entry, int cur_nr,
+			 uint64_t start_hval, uint64_t end_hval)
+{
+	int i, idx, old_idx, cur_idx;
+	uint64_t hval, done_hval = end_hval;
+
+	hval = start_hval;
+again:
+	old_idx = hval_to_sheep(old_entry, old_nr, hval + 1, 0);
+	cur_idx = hval_to_sheep(cur_entry, cur_nr, hval + 1, 0);
+
+	for (i = 0; i < sys->nr_sobjs; i++) {
+		idx = find_tgt_node(old_entry, old_nr, old_idx, cur_entry, cur_nr, cur_idx, i);
+		dprintf("%d, %d\n", idx, i);
+		if (__fill_obj_list(rw, old_entry + idx, hval, end_hval, &done_hval) == 0)
+			break;
+	}
+	if (i == sys->nr_sobjs)
+		return -1;
+
+	if (done_hval != end_hval) {
+		dprintf("%lx, %lx\n", done_hval, end_hval);
+		hval = done_hval;
+		goto again;
+	}
 
 	return 0;
 }
@@ -909,153 +1172,134 @@ static void __start_recovery(struct work *work, int idx)
 	struct sheepdog_node_list_entry old_entry[SD_MAX_NODES],
 		cur_entry[SD_MAX_NODES];
 	int old_nr, cur_nr;
-	int my_idx = -1, ch_idx = -1;
-	int i, j, n;
+	int my_idx = -1;
+	int i, fd, copies;
 	uint64_t start_hash, end_hash;
+	char path[PATH_MAX];
 
 	dprintf("%u\n", epoch);
 
 	cur_nr = epoch_log_read(epoch, (char *)cur_entry, sizeof(cur_entry));
-	if (cur_nr <= 0)
+	if (cur_nr <= 0) {
+		eprintf("failed to read epoch log, %d\n", epoch);
 		goto fail;
+	}
 	cur_nr /= sizeof(struct sheepdog_node_list_entry);
 
 	old_nr = epoch_log_read(epoch - 1, (char *)old_entry, sizeof(old_entry));
-	if (old_nr <= 0)
+	if (old_nr <= 0) {
+		eprintf("failed to read epoch log, %d\n", epoch - 1);
 		goto fail;
+	}
 	old_nr /= sizeof(struct sheepdog_node_list_entry);
 
-	if (!sys->nr_sobjs || cur_nr < sys->nr_sobjs || old_nr < sys->nr_sobjs)
+	if (!sys->nr_sobjs)
 		goto fail;
 
-	if (cur_nr < old_nr) {
-		for (i = 0; i < old_nr; i++) {
-			if (is_myself(&old_entry[i])) {
-				my_idx = i;
-				break;
-			}
-		}
-
-		dprintf("%u %u %u, %d\n", cur_nr, old_nr, epoch, my_idx);
-
-		for (i = 0; i < old_nr; i++) {
-			for (j = 0; j < cur_nr; j++) {
-				if (old_entry[i].id == cur_entry[j].id)
-					break;
-			}
-
-			if (j == cur_nr)
-				ch_idx = i;
-		}
-
-		dprintf("%u %u %u\n", my_idx, ch_idx,
-			node_distance(my_idx, ch_idx, old_nr));
-
-		if (node_distance(my_idx, ch_idx, old_nr) > sys->nr_sobjs)
-			return;
-
-		n = node_from_distance(my_idx, sys->nr_sobjs, old_nr);
-
-		dprintf("%d %d\n", n, sys->nr_sobjs);
-
-		start_hash = old_entry[(n - 1 + old_nr) % old_nr].id;
-		end_hash = old_entry[n].id;
-
-		/* FIXME */
-		if (node_distance(my_idx, ch_idx, old_nr) == sys->nr_sobjs) {
-			n++;
-			n %= old_nr;
-		}
-
-		fill_obj_list(rw, old_entry + n, start_hash, end_hash);
-	} else {
-		for (i = 0; i < cur_nr; i++) {
-			if (is_myself(&cur_entry[i])) {
-				my_idx = i;
-				break;
-			}
+	for (i = 0; i < cur_nr; i++) {
+		if (cur_entry[i].id == sys->this_node.id) {
+			my_idx = i;
+			break;
 		}
+	}
+	if (my_idx < 0) {
+		eprintf("this node is not in epoch %u\n", epoch);
+		goto fail;
+	}
+
+	/*
+	 * Recover objects whose hash lies between the node 'copies' positions
+	 * before us and ourselves.  After a multiple node failure cur_nr can
+	 * drop below nr_sobjs, so clamp to keep the index in range; copies ==
+	 * cur_nr gives start_hash == end_hash, i.e. the whole hash ring.
+	 */
+	copies = sys->nr_sobjs < cur_nr ? sys->nr_sobjs : cur_nr;
+	start_hash = cur_entry[(my_idx - copies + cur_nr) % cur_nr].id;
+	end_hash = cur_entry[my_idx].id;
 
-		dprintf("%u %u %u, %d\n", cur_nr, old_nr, epoch, my_idx);
-
-		if (my_idx == -1)
-			return;
-
-		n = node_from_distance(my_idx, rw->iteration, cur_nr);
-		start_hash = cur_entry[n].id;
-		end_hash = cur_entry[(n + 1 + cur_nr) % cur_nr].id;
-
-		if (rw->iteration == 1)
-			n = (my_idx + 1 + cur_nr) % cur_nr;
-		else
-			n = (n + 1 + cur_nr) % cur_nr;
-
-		dprintf("%u %u %u\n", my_idx, n, rw->iteration);
+	dprintf("fill obj list (from 0x%lx to 0x%lx)\n", start_hash, end_hash);
+	if (fill_obj_list(rw, old_entry, old_nr, cur_entry, cur_nr,
+			  start_hash, end_hash) != 0) {
+		eprintf("fatal recovery error\n");
+		goto fail;
+	}
 
-		start_hash = cur_entry[n].id;
-		end_hash = cur_entry[(n - 1 + cur_nr) % cur_nr].id;
+	snprintf(path, sizeof(path), "%s%08u/list", obj_path, epoch);
+	dprintf("write object list file to %s\n", path);
 
-		fill_obj_list(rw, cur_entry + n, start_hash, end_hash);
+	fd = open(path, O_RDWR | O_CREAT, def_fmode);
+	if (fd < 0) {
+		eprintf("failed to open %s, %s\n", path, strerror(errno));
+		goto fail;
 	}
+	write(fd, rw->buf, sizeof(uint64_t) * rw->count);
+	close(fd);
 
 	return;
-
 fail:
 	rw->count = 0;
-	rw->iteration = 0;
 	return;
 }
 
+static void start_recovery_timer(void *data)
+{
+	struct recovery_work *rw = (struct recovery_work *)data;
+	queue_work(dobj_queue, &rw->work);
+}
+
 static void __start_recovery_done(struct work *work, int idx)
 {
 	struct recovery_work *rw = container_of(work, struct recovery_work, work);
 
-	if (!rw->count) {
-		if (rw->iteration) {
-			if (++rw->iteration <= sys->nr_sobjs) {
-				free(rw->buf);
+	if (rw->retry) {
+		rw->retry = 0;
 
-				rw->work.fn = __start_recovery;
-				rw->work.done = __start_recovery_done;
+		rw->timer.callback = start_recovery_timer;
+		rw->timer.data = rw;
+		add_timer(&rw->timer, 1);
+		return;
+	}
 
-				queue_work(dobj_queue, &rw->work);
+	if (rw->count && rw->rw_siblings.next == &recovery_work_list) {
+		rw->work.fn = recover_one;
+		rw->work.done = recover_one_done;
 
-				return;
-			}
-		}
+		/* TODO: we should avoid races with qemu I/Os */
+		/* rw->work.attr = WORK_ORDERED; */
 
-		free(rw->buf);
-		free(rw);
+		queue_work(dobj_queue, &rw->work);
 		return;
 	}
 
-	rw->work.fn = recover_one;
-	rw->work.done = recover_one_done;
+	dprintf("recovery done, %d\n", rw->epoch);
+	recovering--;
 
-	/* TODO: we should avoid races with qemu I/Os */
-/* 	rw->work.attr = WORK_ORDERED; */
+	list_del(&rw->rw_siblings);
 
-	queue_work(dobj_queue, &rw->work);
+	free(rw->buf);
+	free(rw);
+
+	if (!list_empty(&recovery_work_list)) {
+		rw = list_first_entry(&recovery_work_list,
+				      struct recovery_work, rw_siblings);
+
+		recovering++;
+		queue_work(dobj_queue, &rw->work);
+	}
 }
 
-int start_recovery(uint32_t epoch, int add)
+int start_recovery(uint32_t epoch)
 {
 	struct recovery_work *rw;
 
-	/* disable for now */
-	if (add)
-		return 0;
-
 	rw = zalloc(sizeof(struct recovery_work));
 	if (!rw)
 		return -1;
 
+	rw->buf = malloc(1 << 20); /* FIXME */
 	rw->epoch = epoch;
 	rw->count = 0;
 
-	if (add)
-		rw->iteration = 1;
-
 	rw->work.fn = __start_recovery;
 	rw->work.done = __start_recovery_done;
 
diff --git a/include/sheepdog_proto.h b/include/sheepdog_proto.h
index 24f06ae..a41eb50 100644
--- a/include/sheepdog_proto.h
+++ b/include/sheepdog_proto.h
@@ -52,6 +52,7 @@
 #define SD_FLAG_CMD_WRITE    0x01
 #define SD_FLAG_CMD_COW      0x02
 #define SD_FLAG_CMD_FORWARD  0x04
+#define SD_FLAG_CMD_RECOVERY 0x08
 
 #define SD_STATUS_OK            0x00
 #define SD_STATUS_STARTUP       0x01
@@ -169,7 +170,7 @@ struct sd_obj_req {
 	uint64_t        oid;
 	uint64_t        cow_oid;
 	uint32_t        copies;
-	uint32_t        obj_ver;
+	uint32_t        tgt_epoch;
 	uint64_t        offset;
 };
 
@@ -186,6 +187,31 @@ struct sd_obj_rsp {
 	uint32_t        pad[5];
 };
 
+struct sd_list_req {
+	uint8_t		proto_ver;
+	uint8_t		opcode;
+	uint16_t	flags;
+	uint32_t	epoch;
+	uint32_t        id;
+	uint32_t        data_length;
+	uint64_t        start;
+	uint64_t        end;
+	uint32_t        tgt_epoch;
+	uint32_t        pad[3];
+};
+
+struct sd_list_rsp {
+	uint8_t		proto_ver;
+	uint8_t		opcode;
+	uint16_t	flags;
+	uint32_t	epoch;
+	uint32_t        id;
+	uint32_t        data_length;
+	uint32_t        result;
+	uint64_t        next;
+	uint32_t        pad[5];
+};
+
 struct sd_vdi_req {
 	uint8_t		proto_ver;
 	uint8_t		opcode;
@@ -281,17 +307,19 @@ static inline uint64_t fnv_64a_buf(void *buf, size_t len, uint64_t hval)
 	return hval;
 }
 
-static inline int obj_to_sheep(struct sheepdog_node_list_entry *entries,
-			       int nr_entries, uint64_t oid, int idx)
+/*
+ * Return the index of the node responsible for hash value @id: the first
+ * node whose id is >= @id (wrapping to entries[0]), advanced by @idx
+ * positions around the ring.  Assumes @entries is sorted by id.
+ */
+static inline int hval_to_sheep(struct sheepdog_node_list_entry *entries,
+				int nr_entries, uint64_t id, int idx)
 {
-	uint64_t id;
 	int i;
 	struct sheepdog_node_list_entry *e = entries, *n;
 
-	id = fnv_64a_buf(&oid, sizeof(oid), FNV1A_64_INIT);
-
 	for (i = 0; i < nr_entries - 1; i++, e++) {
 		n = e + 1;
 		if (id > e->id && id <= n->id)
 			break;
 	}
@@ -299,6 +327,14 @@ static inline int obj_to_sheep(struct sheepdog_node_list_entry *entries,
 	return (i + 1 + idx) % nr_entries;
 }
 
+static inline int obj_to_sheep(struct sheepdog_node_list_entry *entries,
+			       int nr_entries, uint64_t oid, int idx)
+{
+	uint64_t id = fnv_64a_buf(&oid, sizeof(oid), FNV1A_64_INIT);
+
+	return hval_to_sheep(entries, nr_entries, id, idx);
+}
+
 static inline void print_node_list_entry(struct sheepdog_node_list_entry *e,
 					 char *str, size_t size)
 {
-- 
1.5.6.5




More information about the sheepdog mailing list