[Sheepdog] [PATCH] collie: handle double node failure
MORITA Kazutaka
morita.kazutaka at lab.ntt.co.jp
Mon Apr 5 20:32:26 CEST 2010
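Currently, sheepdog cannot handle a node failure that happens while recovery
is in progress, so a double node failure (a second node failing before
recovery from the first failure has finished) brings the whole system down.
This patch adds support for handling that case.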
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
collie/collie.h | 2 +-
collie/group.c | 10 +-
collie/store.c | 684 +++++++++++++++++++++++++++++++---------------
include/sheepdog_proto.h | 44 +++-
4 files changed, 502 insertions(+), 238 deletions(-)
diff --git a/collie/collie.h b/collie/collie.h
index 3b17ee7..fac6809 100644
--- a/collie/collie.h
+++ b/collie/collie.h
@@ -122,7 +122,7 @@ int remove_epoch(int epoch);
int set_cluster_ctime(uint64_t ctime);
uint64_t get_cluster_ctime(void);
-int start_recovery(uint32_t epoch, int add);
+int start_recovery(uint32_t epoch);
static inline int is_myself(struct sheepdog_node_list_entry *e)
{
diff --git a/collie/group.c b/collie/group.c
index a2d7425..d4b5449 100644
--- a/collie/group.c
+++ b/collie/group.c
@@ -768,17 +768,15 @@ static void __sd_deliver(struct work *work, int idx)
static void __sd_deliver_done(struct work *work, int idx)
{
struct work_deliver *w = container_of(work, struct work_deliver, work);
-/* struct message_header *m = w->msg; */
-/* struct cluster_info *ci = w->ci; */
+ struct message_header *m = w->msg;
/*
* FIXME: we want to recover only after all nodes are fully
* synchronized
*/
- /* disabled for now */
-/* if (m->done && m->op == SD_MSG_JOIN) */
-/* start_recovery(ci, ci->epoch, 1); */
+ if (m->done && m->op == SD_MSG_JOIN && sys->epoch >= 2)
+ start_recovery(sys->epoch);
free(w->msg);
free(w);
@@ -904,7 +902,7 @@ static void __sd_confch_done(struct work *work, int idx)
if (w->left_list_entries) {
if (w->left_list_entries > 1)
eprintf("we can't handle %Zd\n", w->left_list_entries);
- start_recovery(sys->epoch, 0);
+ start_recovery(sys->epoch);
}
free(w->member_list);
diff --git a/collie/store.c b/collie/store.c
index 9b5584f..b35bbf5 100644
--- a/collie/store.c
+++ b/collie/store.c
@@ -76,23 +76,66 @@ static int stat_sheep(uint64_t *store_size, uint64_t *store_free, uint32_t epoch
return SD_RES_SUCCESS;
}
-static int get_obj_list(struct request *req)
+static int is_obj_in_range(uint64_t oid, uint64_t start, uint64_t end)
+{
+ uint64_t hval = fnv_64a_buf(&oid, sizeof(oid), FNV1A_64_INIT);
+
+ if (start < end)
+ return (start < hval && hval <= end);
+ else
+ return (start < hval || hval <= end);
+}
+
+static int get_obj_list(struct request *req, char *buf, int buf_len)
{
DIR *dir;
struct dirent *d;
- struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
- struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&req->rp;
+ struct sd_list_req *hdr = (struct sd_list_req *)&req->rq;
+ struct sd_list_rsp *rsp = (struct sd_list_rsp *)&req->rp;
uint64_t oid;
- uint64_t oid_hash;
- uint64_t start_hash = hdr->oid;
- uint64_t end_hash = hdr->cow_oid;
+ uint64_t start_hash = hdr->start;
+ uint64_t end_hash = hdr->end;
+ uint32_t epoch = hdr->tgt_epoch;
char path[1024];
uint64_t *p = (uint64_t *)req->data;
int nr = 0;
+ uint64_t *objlist = NULL;
+ int obj_nr = 0, fd, i;
+ struct sheepdog_node_list_entry *e;
+ int e_nr;
+ int idx;
+ int res = SD_RES_SUCCESS;
+
+ if (epoch == 1)
+ goto local;
+
+ snprintf(path, sizeof(path), "%s%08u/list", obj_path, epoch);
+
+ fd = open(path, O_RDONLY);
+ if (fd < 0) {
+ eprintf("failed to open %s, %s\n", path, strerror(errno));
+ close(fd);
+ return SD_RES_EIO;
+ }
+ obj_nr = read(fd, buf, buf_len);
+ obj_nr /= sizeof(uint64_t);
+ objlist = (uint64_t *)buf;
+ for (i = 0; i < obj_nr; i++) {
+ if (is_obj_in_range(objlist[i], start_hash, end_hash)) {
+ dprintf("%u, %016lx, %016lx %016lx\n", epoch,
+ objlist[i], start_hash, end_hash);
+ p[nr++] = objlist[i];
+ }
- snprintf(path, sizeof(path), "%s%08u/", obj_path, hdr->obj_ver);
+ if (nr * sizeof(uint64_t) >= hdr->data_length)
+ break;
+ }
+ close(fd);
- dprintf("%d\n", sys->this_node.port);
+local:
+ snprintf(path, sizeof(path), "%s%08u/", obj_path, hdr->tgt_epoch);
+
+ dprintf("%d, %s\n", sys->this_node.port, path);
dir = opendir(path);
if (!dir) {
@@ -101,38 +144,66 @@ static int get_obj_list(struct request *req)
}
while ((d = readdir(dir))) {
- int got = 0;
-
if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, ".."))
continue;
oid = strtoull(d->d_name, NULL, 16);
- oid_hash = fnv_64a_buf(&oid, sizeof(oid), FNV1A_64_INIT);
+ if (oid == 0)
+ continue;
+
+ for (i = 0; i < obj_nr; i++)
+ if (objlist[i] == oid)
+ break;
+ if (i < obj_nr)
+ continue;
- if ((nr + 1) * sizeof(uint64_t) > hdr->data_length)
+ if (is_obj_in_range(oid, start_hash, end_hash)) {
+ dprintf("%u, %016lx, %016lx %016lx\n", epoch,
+ oid, start_hash, end_hash);
+ p[nr++] = oid;
+ }
+
+ if (nr * sizeof(uint64_t) >= hdr->data_length)
break;
+ }
- if (start_hash < end_hash) {
- if (oid_hash >= start_hash && oid_hash < end_hash)
- got = 1;
- } else
- if (end_hash <= oid_hash || oid_hash < start_hash)
- got = 1;
+ eprintf("nr = %d\n", nr);
+ rsp->data_length = nr * sizeof(uint64_t);
- dprintf("%d, %u, %016lx, %016lx, %016lx %016lx\n", got, hdr->obj_ver,
- oid, oid_hash, start_hash, end_hash);
+ e_nr = epoch_log_read(epoch, buf, buf_len);
+ e_nr /= sizeof(*e);
+ e = (struct sheepdog_node_list_entry *)buf;
- if (got) {
- *(p + nr) = oid;
- nr++;
- }
+ if (e_nr <= sys->nr_sobjs) {
+ rsp->next = end_hash;
+ goto out;
}
- rsp->data_length = nr * 8;
+ for (idx = 0; idx < e_nr; idx++) {
+ if (e[idx].id == sys->this_node.id)
+ break;
+ }
+ if (idx != e_nr) {
+ uint64_t hval = e[idx % e_nr].id;
+ rsp->next = end_hash;
+
+ if (start_hash < end_hash) {
+ if (start_hash < hval && hval <= end_hash)
+ rsp->next = hval;
+ } else
+ if (start_hash < hval || hval <= end_hash)
+ rsp->next = hval;
+
+ dprintf("%u, %016lx, %016lx %016lx\n", epoch, hval,
+ start_hash, end_hash);
+ } else
+ res = SD_RES_SYSTEM_ERROR;
+
+out:
closedir(dir);
- return SD_RES_SUCCESS;
+ return res;
}
static int read_from_one(uint64_t oid,
@@ -225,6 +296,7 @@ static int forward_obj_req(struct request *req, char *buf)
struct sd_rsp *rsp = (struct sd_rsp *)&hdr2;
uint64_t oid = hdr->oid;
int copies;
+ uint32_t epoch;
e = zalloc(SD_MAX_NODES * sizeof(struct sheepdog_node_list_entry));
again:
@@ -243,7 +315,11 @@ again:
/* TODO: we can do better; we need to chech this first */
if (is_myself(&e[n])) {
- ret = store_queue_request_local(req, buf, sys->epoch);
+ if (hdr->flags & SD_FLAG_CMD_RECOVERY)
+ epoch = hdr->tgt_epoch;
+ else
+ epoch = sys->epoch;
+ ret = store_queue_request_local(req, buf, epoch);
memcpy(rsp, &req->rp, sizeof(*rsp));
rsp->result = ret;
goto done;
@@ -334,63 +410,13 @@ static int ob_open(uint32_t epoch, uint64_t oid, int aflags, int *ret)
return fd;
}
-static int is_my_obj(uint64_t oid, int copies)
-{
- int i, n, nr;
- struct sheepdog_node_list_entry e[SD_MAX_NODES];
-
- nr = build_node_list(&sys->sd_node_list, e);
-
- for (i = 0; i < copies; i++) {
- n = obj_to_sheep(e, nr, oid, i);
- if (is_myself(&e[n]))
- return 1;
- }
-
- return 0;
-}
-
int update_epoch_store(uint32_t epoch)
{
- int ret;
- char new[1024], old[1024];
- struct stat s;
- DIR *dir;
- struct dirent *d;
- uint64_t oid;
+ char new[1024];
snprintf(new, sizeof(new), "%s%08u/", obj_path, epoch);
mkdir(new, def_dmode);
- snprintf(old, sizeof(old), "%s%08u/", obj_path, epoch - 1);
-
- ret = stat(old, &s);
- if (ret)
- return 0;
-
- dir = opendir(old);
- if (!dir) {
- eprintf("%s, %s, %m\n", old, new);
- return 1;
- }
-
- while ((d = readdir(dir))) {
- if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, ".."))
- continue;
-
- oid = strtoull(d->d_name, NULL, 16);
- /* TODO: use proper object coipes */
- if (is_my_obj(oid, sys->nr_sobjs)) {
- snprintf(new, sizeof(new), "%s%08u/%s", obj_path, epoch,
- d->d_name);
- snprintf(old, sizeof(old), "%s%08u/%s", obj_path, epoch - 1,
- d->d_name);
- link(old, new);
- }
- }
-
- closedir(dir);
-
return 0;
}
@@ -517,6 +543,13 @@ out:
if (fd != -1)
close(fd);
+ if (ret == SD_RES_NO_OBJ && hdr->flags & SD_FLAG_CMD_RECOVERY) {
+ int len = epoch_log_read(epoch - 1, req->data, hdr->data_length);
+ if (len < 0)
+ len = 0;
+ rsp->data_length = len;
+ }
+
return ret;
}
@@ -547,13 +580,16 @@ void store_queue_request(struct work *work, int idx)
goto out;
}
+ if (hdr->flags & SD_FLAG_CMD_RECOVERY)
+ epoch = hdr->tgt_epoch;
+
if (opcode == SD_OP_STAT_SHEEP) {
ret = stat_sheep(&nrsp->store_size, &nrsp->store_free, epoch);
goto out;
}
if (opcode == SD_OP_GET_OBJ_LIST) {
- ret = get_obj_list(req);
+ ret = get_obj_list(req, buf, SD_DATA_OBJ_SIZE);
goto out;
}
@@ -731,19 +767,26 @@ static int node_distance(int my, int her, int nr)
return (my + nr - her) % nr;
}
-static int node_from_distance(int my, int dist, int nr)
+static int contains_node(uint64_t id, struct sheepdog_node_list_entry *entry,
+ int nr, int base_idx)
{
- return (my + nr - dist) % nr;
+ int i;
+
+ for (i = 0; i < sys->nr_sobjs; i++) {
+ if (entry[(base_idx + i) % nr].id == id)
+ return (base_idx + i) % nr;
+ }
+ return -1;
}
struct recovery_work {
uint32_t epoch;
uint32_t done;
- uint32_t iteration;
-
struct sheepdog_node_list_entry e;
+ struct timer timer;
+ int retry;
struct work work;
struct list_head rw_siblings;
@@ -754,84 +797,272 @@ struct recovery_work {
static LIST_HEAD(recovery_work_list);
static int recovering;
-static void recover_one(struct work *work, int idx)
+static int find_tgt_node(struct sheepdog_node_list_entry *old_entry, int old_nr, int old_idx,
+ struct sheepdog_node_list_entry *cur_entry, int cur_nr, int cur_idx,
+ int copy_idx)
{
- struct recovery_work *rw = container_of(work, struct recovery_work, work);
- struct sheepdog_node_list_entry *e = &rw->e;
+ int i, idx;
+
+ dprintf("%d, %d, %d, %d, %d\n", old_idx, old_nr, cur_idx, cur_nr, copy_idx);
+
+ if (copy_idx < cur_nr) {
+ idx = contains_node(cur_entry[(cur_idx + copy_idx) % cur_nr].id,
+ old_entry, old_nr, old_idx);
+ if (idx >= 0) {
+ dprintf("%d, %d, %d, %d\n", idx, copy_idx, cur_idx, cur_nr);
+ return idx;
+ }
+ }
+
+ for (i = 0; ; i++) {
+ if (i < cur_nr) {
+ idx = contains_node(cur_entry[(cur_idx + i) % cur_nr].id,
+ old_entry, old_nr, old_idx);
+ if (idx >= 0)
+ continue;
+
+ while (contains_node(old_entry[old_idx].id, cur_entry, cur_nr, cur_idx) >= 0)
+ old_idx = (old_idx + 1) % old_nr;
+
+ }
+ if (i == copy_idx) {
+ dprintf("%d, %d, %d, %d\n", old_idx, copy_idx, cur_idx, cur_nr);
+ return old_idx;
+ }
+
+ old_idx = (old_idx + 1) % old_nr;
+ }
+ return -1;
+}
+
+static int __recover_one(struct recovery_work *rw,
+ struct sheepdog_node_list_entry *_old_entry, int old_nr,
+ struct sheepdog_node_list_entry *_cur_entry, int cur_nr, int cur_idx,
+ int copy_idx, uint32_t epoch, uint32_t tgt_epoch,
+ uint64_t oid, char *buf, int buf_len)
+{
+ struct sheepdog_node_list_entry *e;
struct sd_obj_req hdr;
- struct sd_obj_rsp *rsp;
+ struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
char name[128];
- char *buf = zero_block + idx * SD_DATA_OBJ_SIZE;
unsigned wlen = 0, rlen = SD_DATA_OBJ_SIZE;
int fd, ret;
- uint64_t oid = *(((uint64_t *)rw->buf) + rw->done);
+ struct sheepdog_node_list_entry old_entry[SD_MAX_NODES],
+ cur_entry[SD_MAX_NODES], *next_entry;
+ int next_nr;
+ int tgt_idx = -1;
+ int old_idx;
+
+ memcpy(old_entry, _old_entry, sizeof(*old_entry) * old_nr);
+ memcpy(cur_entry, _cur_entry, sizeof(*cur_entry) * cur_nr);
+next:
+ dprintf("recover obj %lx from epoch %d\n", oid, tgt_epoch);
+ old_idx = obj_to_sheep(old_entry, old_nr, oid, 0);
+
+ tgt_idx = find_tgt_node(old_entry, old_nr, old_idx, cur_entry, cur_nr, cur_idx, copy_idx);
+ if (tgt_idx < 0) {
+ eprintf("cannot find target node, %lx\n", oid);
+ return -1;
+ }
+ e = old_entry + tgt_idx;
- eprintf("%d %d, %16lx\n", rw->done, rw->count, oid);
+ if (e->id == sys->this_node.id) {
+ char old[PATH_MAX], new[PATH_MAX];
+
+ snprintf(old, sizeof(old), "%s%08u/%016" PRIx64, obj_path,
+ tgt_epoch, oid);
+ snprintf(new, sizeof(new), "%s%08u/%016" PRIx64, obj_path,
+ epoch, oid);
+ dprintf("link from %s to %s\n", old, new);
+ if (link(old, new) == 0)
+ return 0;
+
+ if (errno == ENOENT) {
+ next_nr = epoch_log_read(tgt_epoch, buf, buf_len);
+ if (next_nr <= 0) {
+ eprintf("no previous epoch, %d\n", tgt_epoch);
+ return -1;
+ }
+ next_entry = (struct sheepdog_node_list_entry *)buf;
+ next_nr /= sizeof(*next_entry);
+ goto not_found;
+ }
+
+ eprintf("cannot recover from local, %s, %s\n", old, new);
+ return -1;
+ }
addr_to_str(name, sizeof(name), e->addr, 0);
fd = connect_to(name, e->port);
if (fd < 0) {
- eprintf("%s %d\n", name, e->port);
- return;
+ eprintf("failed to connect to %s:%d\n", name, e->port);
+ return -1;
}
memset(&hdr, 0, sizeof(hdr));
hdr.opcode = SD_OP_READ_OBJ;
hdr.oid = oid;
hdr.epoch = sys->epoch;
- hdr.flags = 0;
+ hdr.flags = SD_FLAG_CMD_RECOVERY | SD_FLAG_CMD_FORWARD;
+ hdr.tgt_epoch = tgt_epoch;
hdr.data_length = rlen;
ret = exec_req(fd, (struct sd_req *)&hdr, buf, &wlen, &rlen);
close(fd);
+ if (ret < 0) {
+ eprintf("%d\n", rsp->result);
+ return -1;
+ }
+
rsp = (struct sd_obj_rsp *)&hdr;
- if (rsp->result != SD_RES_SUCCESS) {
+ if (rsp->result == SD_RES_SUCCESS) {
+ fd = ob_open(epoch, oid, O_CREAT, &ret);
+ ret = write(fd, buf, SD_DATA_OBJ_SIZE);
+ if (ret != SD_DATA_OBJ_SIZE) {
+ eprintf("failed to write object\n");
+ return -1;
+ }
+
+ ret = fsetxattr(fd, ANAME_COPIES, &rsp->copies,
+ sizeof(rsp->copies), 0);
+ if (ret) {
+ eprintf("couldn't set xattr\n");
+ return -1;
+ }
+
+ close(fd);
+ dprintf("recovered oid %lx to epoch %d\n", oid, epoch);
+ return 0;
+ }
+
+ if (rsp->result == SD_RES_NEW_NODE_VER || rsp->result == SD_RES_OLD_NODE_VER) {
+ eprintf("try again, %d, %lx\n", rsp->result, oid);
+ rw->retry = 1;
+ return 0;
+ }
+
+ if (rsp->result != SD_RES_NO_OBJ || rsp->data_length == 0) {
eprintf("%d\n", rsp->result);
+ return -1;
+ }
+ next_entry = (struct sheepdog_node_list_entry *)buf;
+ next_nr = rsp->data_length / sizeof(*old_entry);
+
+not_found:
+ copy_idx = node_distance(tgt_idx, old_idx, old_nr);
+ dprintf("%d, %d, %d, %d, %d, %d\n", rsp->result, rsp->data_length, tgt_idx,
+ old_idx, old_nr, copy_idx);
+
+ memcpy(cur_entry, old_entry, sizeof(*old_entry) * old_nr);
+ cur_nr = old_nr;
+ cur_idx = old_idx;
+
+ memcpy(old_entry, next_entry, next_nr * sizeof(*next_entry));
+ old_nr = next_nr;
+
+ tgt_epoch--;
+ goto next;
+}
+
+static void recover_one(struct work *work, int idx)
+{
+ struct recovery_work *rw = container_of(work, struct recovery_work, work);
+ char *buf = zero_block + idx * SD_DATA_OBJ_SIZE;
+ int ret;
+ uint64_t oid = *(((uint64_t *)rw->buf) + rw->done);
+ struct sheepdog_node_list_entry old_entry[SD_MAX_NODES],
+ cur_entry[SD_MAX_NODES];
+ int old_nr, cur_nr;
+ uint32_t epoch = rw->epoch;
+ int i, my_idx = -1, copy_idx, cur_idx = -1;
+
+ eprintf("%d %d, %16lx\n", rw->done, rw->count, oid);
+
+ cur_nr = epoch_log_read(epoch, (char *)cur_entry, sizeof(cur_entry));
+ if (cur_nr <= 0) {
+ eprintf("failed to read current epoch, %d\n", epoch);
return;
}
+ cur_nr /= sizeof(struct sheepdog_node_list_entry);
+
+ old_nr = epoch_log_read(epoch - 1, (char *)old_entry, sizeof(old_entry));
+ if (old_nr <= 0) {
+ eprintf("failed to read previous epoch, %d\n", epoch - 1);
+ goto fail;
+ }
+ old_nr /= sizeof(struct sheepdog_node_list_entry);
- fd = ob_open(rw->epoch, oid, O_CREAT, &ret);
- write(fd, buf, SD_DATA_OBJ_SIZE);
+ if (!sys->nr_sobjs)
+ goto fail;
+
+ cur_idx = obj_to_sheep(cur_entry, cur_nr, oid, 0);
+
+ for (i = 0; i < cur_nr; i++) {
+ if (cur_entry[i].id == sys->this_node.id) {
+ my_idx = i;
+ break;
+ }
+ }
+ copy_idx = node_distance(my_idx, cur_idx, cur_nr);
+ dprintf("%d, %d, %d, %d\n", my_idx, cur_idx, cur_nr, copy_idx);
+
+ ret = __recover_one(rw, old_entry, old_nr, cur_entry, cur_nr, cur_idx,
+ copy_idx, epoch, epoch - 1, oid, buf, SD_DATA_OBJ_SIZE);
+ if (ret == 0)
+ return;
+
+ for (i = 0; i < sys->nr_sobjs; i++) {
+ if (i == copy_idx)
+ continue;
+ ret = __recover_one(rw, old_entry, old_nr,
+ cur_entry, cur_nr, cur_idx, i,
+ epoch, epoch - 1, oid, buf, SD_DATA_OBJ_SIZE);
+ if (ret == 0)
+ return;
+ }
+fail:
+ eprintf("failed to recover object %lx\n", oid);
}
static void __start_recovery(struct work *work, int idx);
static void __start_recovery_done(struct work *work, int idx);
+static void recover_one_timer(void *data)
+{
+ struct recovery_work *rw = (struct recovery_work *)data;
+ queue_work(dobj_queue, &rw->work);
+}
+
static void recover_one_done(struct work *work, int idx)
{
struct recovery_work *rw = container_of(work, struct recovery_work, work);
- if (rw->done < rw->count) {
- rw->done++;
- queue_work(dobj_queue, &rw->work);
+ if (rw->retry) {
+ rw->retry = 0;
+
+ rw->timer.callback = recover_one_timer;
+ rw->timer.data = rw;
+ add_timer(&rw->timer, 2);
return;
}
- if (rw->iteration) {
- if (++rw->iteration <= sys->nr_sobjs) {
- free(rw->buf);
+ rw->done++;
- rw->done = 0;
-
- rw->work.fn = __start_recovery;
- rw->work.done = __start_recovery_done;
-
- queue_work(dobj_queue, &rw->work);
-
- return;
- }
+ if (rw->done < rw->count && rw->rw_siblings.next == &recovery_work_list) {
+ queue_work(dobj_queue, &rw->work);
+ return;
}
+ dprintf("recovery done, %d\n", rw->epoch);
recovering--;
list_del(&rw->rw_siblings);
- if (rw->buf)
- free(rw->buf);
+ free(rw->buf);
free(rw);
if (!list_empty(&recovery_work_list)) {
@@ -843,16 +1074,16 @@ static void recover_one_done(struct work *work, int idx)
}
}
-static int fill_obj_list(struct recovery_work *rw,
- struct sheepdog_node_list_entry *e,
- uint64_t start_hash, uint64_t end_hash)
+static int __fill_obj_list(struct recovery_work *rw,
+ struct sheepdog_node_list_entry *e,
+ uint64_t start_hash, uint64_t end_hash, uint64_t *done_hash)
{
int fd, ret;
uint32_t epoch = rw->epoch;
unsigned wlen, rlen;
char name[128];
- struct sd_obj_req hdr;
- struct sd_obj_rsp *rsp;
+ struct sd_list_req hdr;
+ struct sd_list_rsp *rsp;
addr_to_str(name, sizeof(name), e->addr, 0);
@@ -870,34 +1101,66 @@ static int fill_obj_list(struct recovery_work *rw,
memset(&hdr, 0, sizeof(hdr));
hdr.opcode = SD_OP_GET_OBJ_LIST;
hdr.epoch = sys->epoch;
- hdr.oid = start_hash;
- hdr.cow_oid = end_hash;
- hdr.obj_ver = epoch - 1;
+ hdr.start = start_hash;
+ hdr.end = end_hash;
+ hdr.tgt_epoch = epoch - 1;
hdr.flags = 0;
hdr.data_length = rlen;
- dprintf("%016lx, %016lx\n", hdr.oid, hdr.cow_oid);
+ dprintf("%016lx, %016lx\n", hdr.start, hdr.end);
- rw->buf = malloc(rlen);
memcpy(&rw->e, e, sizeof(rw->e));
- ret = exec_req(fd, (struct sd_req *)&hdr, rw->buf, &wlen, &rlen);
+ ret = exec_req(fd, (struct sd_req *)&hdr, rw->buf + rw->count * sizeof(uint64_t), &wlen, &rlen);
close(fd);
- rsp = (struct sd_obj_rsp *)&hdr;
+ rsp = (struct sd_list_rsp *)&hdr;
if (rsp->result != SD_RES_SUCCESS) {
- eprintf("%d\n", rsp->result);
- return -1;
+ rw->retry = 1;
+ *done_hash = end_hash;
+ eprintf("try again, %d\n", rsp->result);
+ return 0;
}
dprintf("%d\n", rsp->data_length);
if (rsp->data_length)
- rw->count = rsp->data_length / sizeof(uint64_t);
- else
- rw->count = 0;
+ rw->count += rsp->data_length / sizeof(uint64_t);
+
+ *done_hash = rsp->next;
+
+ return 0;
+}
+
+static int fill_obj_list(struct recovery_work *rw,
+ struct sheepdog_node_list_entry *old_entry, int old_nr,
+ struct sheepdog_node_list_entry *cur_entry, int cur_nr,
+ uint64_t start_hval, uint64_t end_hval)
+{
+ int i, idx, old_idx, cur_idx;
+ uint64_t hval, done_hval = end_hval;
+
+ hval = start_hval;
+again:
+ old_idx = hval_to_sheep(old_entry, old_nr, hval + 1, 0);
+ cur_idx = hval_to_sheep(cur_entry, cur_nr, hval + 1, 0);
+
+ for (i = 0; i < sys->nr_sobjs; i++) {
+ idx = find_tgt_node(old_entry, old_nr, old_idx, cur_entry, cur_nr, cur_idx, i);
+ dprintf("%d, %d\n", idx, i);
+ if (__fill_obj_list(rw, old_entry + idx, hval, end_hval, &done_hval) == 0)
+ break;
+ }
+ if (i == sys->nr_sobjs)
+ return -1;
+
+ if (done_hval != end_hval) {
+ dprintf("%lx, %lx\n", done_hval, end_hval);
+ hval = done_hval;
+ goto again;
+ }
return 0;
}
@@ -909,153 +1172,122 @@ static void __start_recovery(struct work *work, int idx)
struct sheepdog_node_list_entry old_entry[SD_MAX_NODES],
cur_entry[SD_MAX_NODES];
int old_nr, cur_nr;
- int my_idx = -1, ch_idx = -1;
- int i, j, n;
+ int my_idx = -1;
+ int i, fd;
uint64_t start_hash, end_hash;
+ char path[PATH_MAX];
dprintf("%u\n", epoch);
cur_nr = epoch_log_read(epoch, (char *)cur_entry, sizeof(cur_entry));
- if (cur_nr <= 0)
+ if (cur_nr <= 0) {
+ eprintf("failed to read epoch log, %d\n", epoch);
goto fail;
+ }
cur_nr /= sizeof(struct sheepdog_node_list_entry);
old_nr = epoch_log_read(epoch - 1, (char *)old_entry, sizeof(old_entry));
- if (old_nr <= 0)
+ if (old_nr <= 0) {
+ eprintf("failed to read epoch log, %d\n", epoch - 1);
goto fail;
+ }
old_nr /= sizeof(struct sheepdog_node_list_entry);
- if (!sys->nr_sobjs || cur_nr < sys->nr_sobjs || old_nr < sys->nr_sobjs)
+ if (!sys->nr_sobjs)
goto fail;
- if (cur_nr < old_nr) {
- for (i = 0; i < old_nr; i++) {
- if (is_myself(&old_entry[i])) {
- my_idx = i;
- break;
- }
- }
-
- dprintf("%u %u %u, %d\n", cur_nr, old_nr, epoch, my_idx);
-
- for (i = 0; i < old_nr; i++) {
- for (j = 0; j < cur_nr; j++) {
- if (old_entry[i].id == cur_entry[j].id)
- break;
- }
-
- if (j == cur_nr)
- ch_idx = i;
- }
-
- dprintf("%u %u %u\n", my_idx, ch_idx,
- node_distance(my_idx, ch_idx, old_nr));
-
- if (node_distance(my_idx, ch_idx, old_nr) > sys->nr_sobjs)
- return;
-
- n = node_from_distance(my_idx, sys->nr_sobjs, old_nr);
-
- dprintf("%d %d\n", n, sys->nr_sobjs);
-
- start_hash = old_entry[(n - 1 + old_nr) % old_nr].id;
- end_hash = old_entry[n].id;
-
- /* FIXME */
- if (node_distance(my_idx, ch_idx, old_nr) == sys->nr_sobjs) {
- n++;
- n %= old_nr;
- }
-
- fill_obj_list(rw, old_entry + n, start_hash, end_hash);
- } else {
- for (i = 0; i < cur_nr; i++) {
- if (is_myself(&cur_entry[i])) {
- my_idx = i;
- break;
- }
+ for (i = 0; i < cur_nr; i++) {
+ if (cur_entry[i].id == sys->this_node.id) {
+ my_idx = i;
+ break;
}
+ }
+ start_hash = cur_entry[(my_idx - sys->nr_sobjs + cur_nr) % cur_nr].id;
+ end_hash = cur_entry[my_idx].id;
- dprintf("%u %u %u, %d\n", cur_nr, old_nr, epoch, my_idx);
-
- if (my_idx == -1)
- return;
-
- n = node_from_distance(my_idx, rw->iteration, cur_nr);
- start_hash = cur_entry[n].id;
- end_hash = cur_entry[(n + 1 + cur_nr) % cur_nr].id;
-
- if (rw->iteration == 1)
- n = (my_idx + 1 + cur_nr) % cur_nr;
- else
- n = (n + 1 + cur_nr) % cur_nr;
-
- dprintf("%u %u %u\n", my_idx, n, rw->iteration);
+ dprintf("fill obj list (from 0x%lx to 0x%lx)\n", start_hash, end_hash);
+ if (fill_obj_list(rw, old_entry, old_nr, cur_entry, cur_nr,
+ start_hash, end_hash) != 0) {
+ eprintf("fatal recovery error\n");
+ goto fail;
+ }
- start_hash = cur_entry[n].id;
- end_hash = cur_entry[(n - 1 + cur_nr) % cur_nr].id;
+ snprintf(path, sizeof(path), "%s%08u/list", obj_path, epoch);
+ dprintf("write object list file to %s\n", path);
- fill_obj_list(rw, cur_entry + n, start_hash, end_hash);
+ fd = open(path, O_RDWR | O_CREAT, def_fmode);
+ if (fd < 0) {
+ eprintf("failed to open %s, %s\n", path, strerror(errno));
+ goto fail;
}
+ write(fd, rw->buf, sizeof(uint64_t) * rw->count);
+ close(fd);
return;
-
fail:
rw->count = 0;
- rw->iteration = 0;
return;
}
+static void start_recovery_timer(void *data)
+{
+ struct recovery_work *rw = (struct recovery_work *)data;
+ queue_work(dobj_queue, &rw->work);
+}
+
static void __start_recovery_done(struct work *work, int idx)
{
struct recovery_work *rw = container_of(work, struct recovery_work, work);
- if (!rw->count) {
- if (rw->iteration) {
- if (++rw->iteration <= sys->nr_sobjs) {
- free(rw->buf);
+ if (rw->retry) {
+ rw->retry = 0;
- rw->work.fn = __start_recovery;
- rw->work.done = __start_recovery_done;
+ rw->timer.callback = start_recovery_timer;
+ rw->timer.data = rw;
+ add_timer(&rw->timer, 1);
+ return;
+ }
- queue_work(dobj_queue, &rw->work);
+ if (rw->count && rw->rw_siblings.next == &recovery_work_list) {
+ rw->work.fn = recover_one;
+ rw->work.done = recover_one_done;
- return;
- }
- }
+ /* TODO: we should avoid races with qemu I/Os */
+ /* rw->work.attr = WORK_ORDERED; */
- free(rw->buf);
- free(rw);
+ queue_work(dobj_queue, &rw->work);
return;
}
- rw->work.fn = recover_one;
- rw->work.done = recover_one_done;
+ dprintf("recovery done, %d\n", rw->epoch);
+ recovering--;
- /* TODO: we should avoid races with qemu I/Os */
-/* rw->work.attr = WORK_ORDERED; */
+ list_del(&rw->rw_siblings);
- queue_work(dobj_queue, &rw->work);
+ free(rw->buf);
+ free(rw);
+
+ if (!list_empty(&recovery_work_list)) {
+ rw = list_first_entry(&recovery_work_list,
+ struct recovery_work, rw_siblings);
+
+ recovering++;
+ queue_work(dobj_queue, &rw->work);
+ }
}
-int start_recovery(uint32_t epoch, int add)
+int start_recovery(uint32_t epoch)
{
struct recovery_work *rw;
- /* disable for now */
- if (add)
- return 0;
-
rw = zalloc(sizeof(struct recovery_work));
if (!rw)
return -1;
+ rw->buf = malloc(1 << 20); /* FIXME */
rw->epoch = epoch;
rw->count = 0;
- if (add)
- rw->iteration = 1;
-
rw->work.fn = __start_recovery;
rw->work.done = __start_recovery_done;
diff --git a/include/sheepdog_proto.h b/include/sheepdog_proto.h
index 24f06ae..a41eb50 100644
--- a/include/sheepdog_proto.h
+++ b/include/sheepdog_proto.h
@@ -52,6 +52,7 @@
#define SD_FLAG_CMD_WRITE 0x01
#define SD_FLAG_CMD_COW 0x02
#define SD_FLAG_CMD_FORWARD 0x04
+#define SD_FLAG_CMD_RECOVERY 0x08
#define SD_STATUS_OK 0x00
#define SD_STATUS_STARTUP 0x01
@@ -169,7 +170,7 @@ struct sd_obj_req {
uint64_t oid;
uint64_t cow_oid;
uint32_t copies;
- uint32_t obj_ver;
+ uint32_t tgt_epoch;
uint64_t offset;
};
@@ -186,6 +187,31 @@ struct sd_obj_rsp {
uint32_t pad[5];
};
+struct sd_list_req {
+ uint8_t proto_ver;
+ uint8_t opcode;
+ uint16_t flags;
+ uint32_t epoch;
+ uint32_t id;
+ uint32_t data_length;
+ uint64_t start;
+ uint64_t end;
+ uint32_t tgt_epoch;
+ uint32_t pad[3];
+};
+
+struct sd_list_rsp {
+ uint8_t proto_ver;
+ uint8_t opcode;
+ uint16_t flags;
+ uint32_t epoch;
+ uint32_t id;
+ uint32_t data_length;
+ uint32_t result;
+ uint64_t next;
+ uint32_t pad[5];
+};
+
struct sd_vdi_req {
uint8_t proto_ver;
uint8_t opcode;
@@ -281,17 +307,17 @@ static inline uint64_t fnv_64a_buf(void *buf, size_t len, uint64_t hval)
return hval;
}
-static inline int obj_to_sheep(struct sheepdog_node_list_entry *entries,
- int nr_entries, uint64_t oid, int idx)
+static inline int hval_to_sheep(struct sheepdog_node_list_entry *entries,
+ int nr_entries, uint64_t id, int idx)
{
- uint64_t id;
int i;
struct sheepdog_node_list_entry *e = entries, *n;
- id = fnv_64a_buf(&oid, sizeof(oid), FNV1A_64_INIT);
+ printf("%lx\n", id);
for (i = 0; i < nr_entries - 1; i++, e++) {
n = e + 1;
+ printf("%d, %lx, %lx, %lx\n", i, e->id, n->id, id);
if (id > e->id && id <= n->id)
break;
}
@@ -299,6 +325,14 @@ static inline int obj_to_sheep(struct sheepdog_node_list_entry *entries,
return (i + 1 + idx) % nr_entries;
}
+static inline int obj_to_sheep(struct sheepdog_node_list_entry *entries,
+ int nr_entries, uint64_t oid, int idx)
+{
+ uint64_t id = fnv_64a_buf(&oid, sizeof(oid), FNV1A_64_INIT);
+
+ return hval_to_sheep(entries, nr_entries, id, idx);
+}
+
static inline void print_node_list_entry(struct sheepdog_node_list_entry *e,
char *str, size_t size)
{
--
1.5.6.5
More information about the sheepdog mailing list