[sheepdog] [PATCH v2 05/21] sheep: add basic recovery support for erasure code

Liu Yuan namei.unix at gmail.com
Wed Oct 16 07:50:31 CEST 2013


With this patch, recovery of erasure-coded objects works for a single node event (join or leave).

The recovery strategy for an erasure-coded object:

- if the object isn't lost, we read it from the stale dir of the targeted live node
- if the object is lost, we read enough of the other replicas and rebuild it using the RS (Reed-Solomon) algorithm

Signed-off-by: Liu Yuan <namei.unix at gmail.com>
---
 sheep/ops.c         |    1 +
 sheep/plain_store.c |   57 ++++++++++-------
 sheep/recovery.c    |  170 ++++++++++++++++++++++++++++++++++++++++++++++-----
 sheep/sheep_priv.h  |    7 ++-
 4 files changed, 197 insertions(+), 38 deletions(-)

diff --git a/sheep/ops.c b/sheep/ops.c
index 7e12c5c..d754e9b 100644
--- a/sheep/ops.c
+++ b/sheep/ops.c
@@ -918,6 +918,7 @@ int peer_read_obj(struct request *req)
 	iocb.buf = req->data;
 	iocb.length = hdr->data_length;
 	iocb.offset = hdr->obj.offset;
+	iocb.ec_index = hdr->obj.ec_index;
 	ret = sd_store->read(hdr->obj.oid, &iocb);
 	if (ret != SD_RES_SUCCESS)
 		goto out;
diff --git a/sheep/plain_store.c b/sheep/plain_store.c
index 2c9b6ef..15b54fe 100644
--- a/sheep/plain_store.c
+++ b/sheep/plain_store.c
@@ -15,6 +15,26 @@
 
 #define sector_algined(x) ({ ((x) & (SECTOR_SIZE - 1)) == 0; })
 
+#define ECNAME "user.ec.index"
+#define ECSIZE sizeof(uint8_t)
+static int set_erasure_index(const char *path, uint8_t idx)
+{
+	if (setxattr(path, ECNAME, &idx, ECSIZE, 0) < 0) {
+		sd_err("failed to setxattr %s, %m", path);
+		return -1;
+	}
+	return 0;
+}
+
+static int get_erasure_index(const char *path, uint8_t *idx)
+{
+	if (getxattr(path, ECNAME, idx, ECSIZE) < 0) {
+		sd_err("failed to getxattr %s, %m", path);
+		return -1;
+	}
+	return 0;
+}
+
 static inline bool iocb_is_aligned(const struct siocb *iocb)
 {
 	return  sector_algined(iocb->offset) && sector_algined(iocb->length);
@@ -245,6 +265,21 @@ static int default_read_from_path(uint64_t oid, const char *path,
 	if (fd < 0)
 		return err_to_sderr(path, oid, errno);
 
+	if (is_erasure_oid(oid)) {
+		uint8_t idx;
+
+		if (get_erasure_index(path, &idx) < 0) {
+			close(fd);
+			return err_to_sderr(path, oid, errno);
+		}
+		/* We pretend NO-OBJ to read old object in the stale dir */
+		if (idx != iocb->ec_index) {
+			sd_debug("ec_index %d != %d", iocb->ec_index, idx);
+			close(fd);
+			return SD_RES_NO_OBJ;
+		}
+	}
+
 	size = xpread(fd, iocb->buf, iocb->length, iocb->offset);
 	if (unlikely(size != iocb->length)) {
 		sd_err("failed to read object %"PRIx64", path=%s, offset=%"
@@ -293,33 +328,13 @@ int prealloc(int fd, uint32_t size)
 	return 0;
 }
 
-static size_t get_store_objsize(uint64_t oid)
+size_t get_store_objsize(uint64_t oid)
 {
 	if (is_erasure_oid(oid))
 		return SD_EC_OBJECT_SIZE;
 	return get_objsize(oid);
 }
 
-#define ECNAME "user.ec.index"
-#define ECSIZE sizeof(uint8_t)
-static int set_erasure_index(const char *path, uint8_t idx)
-{
-	if (setxattr(path, ECNAME, &idx, ECSIZE, 0) < 0) {
-		sd_err("failed to setxattr %s, %m", path);
-		return -1;
-	}
-	return 0;
-}
-
-static int get_erasure_index(const char *path, uint8_t *idx)
-{
-	if (getxattr(path, ECNAME, idx, ECSIZE) < 0) {
-		sd_err("failed to getxattr %s, %m", path);
-		return -1;
-	}
-	return 0;
-}
-
 int default_create_and_write(uint64_t oid, const struct siocb *iocb)
 {
 	char path[PATH_MAX], tmp_path[PATH_MAX];
diff --git a/sheep/recovery.c b/sheep/recovery.c
index 0df3a5a..7e16bba 100644
--- a/sheep/recovery.c
+++ b/sheep/recovery.c
@@ -93,9 +93,42 @@ static inline bool node_is_gateway_only(void)
 	return sys->this_node.nr_vnodes == 0;
 }
 
-/* recover object from vnode */
+static void *read_object_from(const struct sd_node *node, uint64_t oid,
+			      uint32_t epoch, uint32_t tgt_epoch, uint8_t idx)
+{
+	struct sd_req hdr;
+	unsigned rlen = get_store_objsize(oid);
+	void *buf = xvalloc(rlen);
+	int ret;
+
+	sd_init_req(&hdr, SD_OP_READ_PEER);
+	hdr.epoch = epoch;
+	hdr.flags = SD_FLAG_CMD_RECOVERY;
+	hdr.data_length = rlen;
+	hdr.obj.oid = oid;
+	hdr.obj.tgt_epoch = tgt_epoch;
+	hdr.obj.ec_index = idx;
+
+	sd_debug("%s, epoch %"PRIu32" tgt %"PRIu32" idx %d", node_to_str(node),
+		 epoch, tgt_epoch, idx);
+	ret = sheep_exec_req(&node->nid, &hdr, buf);
+	if (ret != SD_RES_SUCCESS) {
+		free(buf);
+		return NULL;
+	}
+	return buf;
+}
+
+/*
+ * Read object from targeted node and store it in the local node.
+ *
+ * tgt_epoch: the specific epoch that the object has stayed
+ * idx: erasure index. For non-erasure object, pass 0.
+ */
 static int recover_object_from(struct recovery_obj_work *row,
-			       const struct sd_node *node, uint32_t tgt_epoch)
+			       const struct sd_node *node,
+			       uint32_t tgt_epoch,
+			       uint8_t idx)
 {
 	uint64_t oid = row->oid;
 	uint32_t local_epoch = row->local_epoch;
@@ -116,7 +149,7 @@ static int recover_object_from(struct recovery_obj_work *row,
 	}
 
 	/* compare sha1 hash value first */
-	if (local_epoch > 0) {
+	if (!is_erasure_oid(oid) && local_epoch > 0) {
 		sd_init_req(&hdr, SD_OP_GET_HASH);
 		hdr.obj.oid = oid;
 		hdr.obj.tgt_epoch = tgt_epoch;
@@ -133,7 +166,7 @@ static int recover_object_from(struct recovery_obj_work *row,
 		}
 	}
 
-	rlen = get_objsize(oid);
+	rlen = get_store_objsize(oid);
 	buf = xvalloc(rlen);
 
 	/* recover from remote replica */
@@ -143,6 +176,7 @@ static int recover_object_from(struct recovery_obj_work *row,
 	hdr.data_length = rlen;
 	hdr.obj.oid = oid;
 	hdr.obj.tgt_epoch = tgt_epoch;
+	hdr.obj.ec_index = idx;
 
 	ret = sheep_exec_req(&node->nid, &hdr, buf);
 	if (ret == SD_RES_SUCCESS) {
@@ -150,6 +184,7 @@ static int recover_object_from(struct recovery_obj_work *row,
 		iocb.length = rsp->data_length;
 		iocb.offset = rsp->obj.offset;
 		iocb.buf = buf;
+		iocb.ec_index = idx;
 		ret = sd_store->create_and_write(oid, &iocb);
 	}
 
@@ -202,7 +237,7 @@ static int recover_object_from_replica(struct recovery_obj_work *row,
 		if (invalid_node(node, row->base.cur_vinfo))
 			continue;
 
-		ret = recover_object_from(row, node, tgt_epoch);
+		ret = recover_object_from(row, node, tgt_epoch, 0);
 		switch (ret) {
 		case SD_RES_SUCCESS:
 			sd_debug("recovered oid %"PRIx64" from %d to epoch %d",
@@ -237,7 +272,7 @@ static int recover_object_from_replica(struct recovery_obj_work *row,
  * the routine will try to recovery it from the nodes it has stayed,
  * at least, *theoretically* on consistent hash ring.
  */
-static int do_recover_object(struct recovery_obj_work *row)
+static int recover_replication_object(struct recovery_obj_work *row)
 {
 	struct recovery_work *rw = &row->base;
 	struct vnode_info *old;
@@ -292,6 +327,111 @@ rollback:
 	return ret;
 }
 
+static int rebuild_object_from_replica(struct recovery_obj_work *row,
+				       uint32_t tgt_epoch,
+				       const uint8_t idx)
+{
+	struct vnode_info *old = grab_vnode_info(row->base.old_vinfo);
+	const struct sd_node *target_nodes[SD_MAX_NODES];
+	uint8_t *bufs[SD_EC_D] = { 0 };
+	uint64_t oid = row->oid;
+	uint32_t epoch = row->base.epoch;
+	int idxs[SD_EC_D], len = get_store_objsize(oid);
+	struct fec *ctx = ec_init();
+	char *lost = xvalloc(len);
+	struct siocb iocb = { 0 };
+	int i, j, ret = -1;
+
+	/* Prepare replica */
+	oid_to_nodes(oid, &old->vroot, SD_EC_DP, target_nodes);
+	for (i = 0, j = 0; i < SD_EC_DP && j < SD_EC_D; i++) {
+		if (i == idx)
+			continue;
+		bufs[j] = read_object_from(target_nodes[i], oid,
+					   epoch, tgt_epoch, i);
+		if (!bufs[j])
+			continue;
+		idxs[j++] = i;
+	}
+	if (j != SD_EC_D)
+		goto out;
+
+	/* Rebuild the lost replica */
+	for (i = 0; i < SD_EC_NR_STRIPE_PER_OBJECT; i++) {
+		const uint8_t *in[SD_EC_D];
+		uint8_t out[SD_EC_STRIP_SIZE];
+
+		for (j = 0; j < SD_EC_D; j++)
+			in[j] = bufs[j] + SD_EC_STRIP_SIZE * i;
+		ec_decode(ctx, in, idxs, out, idx);
+		memcpy(lost + SD_EC_STRIP_SIZE * i, out, SD_EC_STRIP_SIZE);
+	}
+
+	iocb.epoch = epoch;
+	iocb.length = len;
+	iocb.offset = 0;
+	iocb.buf = lost;
+	iocb.ec_index = idx;
+	ret = sd_store->create_and_write(oid, &iocb);
+out:
+	ec_destroy(ctx);
+	put_vnode_info(old);
+	for (i = 0; i < SD_EC_D; i++)
+		free(bufs[i]);
+	free(lost);
+	return ret;
+}
+
+static uint8_t local_node_copy_index(struct rb_root *vroot, uint64_t oid)
+{
+	const struct sd_node *target_nodes[SD_MAX_NODES];
+	uint8_t idx;
+
+	oid_to_nodes(oid, vroot, SD_EC_DP, target_nodes);
+	for (idx = 0; idx < SD_EC_DP; idx++)
+		if (node_is_local(target_nodes[idx]))
+			return idx;
+	panic("can't get valid index for %"PRIx64, oid);
+}
+
+static int recover_erasure_object(struct recovery_obj_work *row)
+{
+	struct recovery_work *rw = &row->base;
+	struct vnode_info *old, *cur;
+	uint64_t oid = row->oid;
+	uint32_t tgt_epoch = rw->tgt_epoch;
+	uint8_t idx;
+	const struct sd_node *node;
+	int ret;
+
+	cur = grab_vnode_info(rw->cur_vinfo);
+	old = grab_vnode_info(rw->old_vinfo);
+
+	idx = local_node_copy_index(&cur->vroot, oid);
+	node = oid_to_node(oid, &old->vroot, idx);
+
+	sd_debug("%"PRIx64" idx %d, from epoch %"PRIu32, oid, idx, tgt_epoch);
+
+	if (invalid_node(node, cur))
+		ret = rebuild_object_from_replica(row, tgt_epoch, idx);
+	else
+		ret = recover_object_from(row, node, tgt_epoch, idx);
+
+	put_vnode_info(cur);
+	put_vnode_info(old);
+	return ret;
+}
+
+static int do_recover_object(struct recovery_obj_work *row)
+{
+	uint64_t oid = row->oid;
+
+	if (is_erasure_oid(oid))
+		return recover_erasure_object(row);
+	else
+		return recover_replication_object(row);
+}
+
 static void recover_object_work(struct work *work)
 {
 	struct recovery_work *rw = container_of(work, struct recovery_work,
@@ -308,17 +448,19 @@ static void recover_object_work(struct work *work)
 	}
 
 	/* find object in the stale directory */
-	for (epoch = sys_epoch() - 1; epoch > 0; epoch--) {
-		ret = sd_store->get_hash(oid, epoch, row->local_sha1);
-		if (ret == SD_RES_SUCCESS) {
-			sd_debug("replica found in local at epoch %d", epoch);
-			row->local_epoch = epoch;
-			break;
+	if (!is_erasure_oid(oid))
+		for (epoch = sys_epoch() - 1; epoch > 0; epoch--) {
+			ret = sd_store->get_hash(oid, epoch, row->local_sha1);
+			if (ret == SD_RES_SUCCESS) {
+				sd_debug("replica found in local at epoch %d",
+					 epoch);
+				row->local_epoch = epoch;
+				break;
+			}
 		}
-	}
 
 	ret = do_recover_object(row);
-	if (ret < 0)
+	if (ret != 0)
 		sd_err("failed to recover object %"PRIx64, oid);
 }
 
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 9b83883..99bf676 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -216,6 +216,9 @@ struct store_driver {
 	int (*cleanup)(void);
 };
 
+/* backend store */
+int peer_read_obj(struct request *req);
+
 int default_init(void);
 bool default_exist(uint64_t oid);
 int default_create_and_write(uint64_t oid, const struct siocb *iocb);
@@ -234,6 +237,7 @@ int for_each_object_in_stale(int (*func)(uint64_t oid, const char *path,
 					 uint32_t epoch, void *arg),
 			     void *arg);
 int for_each_obj_path(int (*func)(const char *path));
+size_t get_store_objsize(uint64_t oid);
 
 extern struct list_head store_drivers;
 #define add_store_driver(driver)				\
@@ -404,9 +408,6 @@ int gateway_create_and_write_obj(struct request *req);
 int gateway_remove_obj(struct request *req);
 bool is_erasure_oid(uint64_t oid);
 
-/* backend store */
-int peer_read_obj(struct request *req);
-
 /* object_cache */
 
 void object_cache_format(void);
-- 
1.7.9.5




More information about the sheepdog mailing list