[sheepdog] [PATCH v2 5/5] sheep: add basic recovery support for erasure code
Liu Yuan
namei.unix at gmail.com
Sat Oct 12 14:13:35 CEST 2013
With this patch we can handle a single node event (one node joining or leaving).
The recovery strategy for erasure-coded objects:
- if the object isn't lost, we read it from the stale directory of the target node that is still alive
- if the object is lost, we read enough of the remaining strips and rebuild it with the Reed-Solomon (RS) algorithm
Signed-off-by: Liu Yuan <namei.unix at gmail.com>
---
sheep/ops.c | 1 +
sheep/plain_store.c | 57 ++++++++++-------
sheep/recovery.c | 170 ++++++++++++++++++++++++++++++++++++++++++++++-----
sheep/sheep_priv.h | 7 ++-
4 files changed, 197 insertions(+), 38 deletions(-)
diff --git a/sheep/ops.c b/sheep/ops.c
index 7e12c5c..d754e9b 100644
--- a/sheep/ops.c
+++ b/sheep/ops.c
@@ -918,6 +918,7 @@ int peer_read_obj(struct request *req)
iocb.buf = req->data;
iocb.length = hdr->data_length;
iocb.offset = hdr->obj.offset;
+ iocb.ec_index = hdr->obj.ec_index;
ret = sd_store->read(hdr->obj.oid, &iocb);
if (ret != SD_RES_SUCCESS)
goto out;
diff --git a/sheep/plain_store.c b/sheep/plain_store.c
index fcccaa4..1d7740f 100644
--- a/sheep/plain_store.c
+++ b/sheep/plain_store.c
@@ -15,6 +15,26 @@
#define sector_algined(x) ({ ((x) & (SECTOR_SIZE - 1)) == 0; })
+/*
+ * The erasure index of a stored strip is kept as a single byte (ECSIZE) in
+ * the ECNAME extended attribute of the object file.
+ */
+#define ECNAME "user.ec.index"
+#define ECSIZE sizeof(uint8_t)
+
+/*
+ * Record @idx as the erasure index of the object file at @path.
+ * Returns 0 on success, -1 on failure with errno from setxattr() preserved.
+ */
+static int set_erasure_index(const char *path, uint8_t idx)
+{
+	if (setxattr(path, ECNAME, &idx, ECSIZE, 0) < 0) {
+		int err = errno;
+
+		sd_err("failed to setxattr %s, %m", path);
+		/* logging may touch errno; callers translate it afterwards */
+		errno = err;
+		return -1;
+	}
+	return 0;
+}
+
+/*
+ * Read the erasure index of the object file at @path into @idx.
+ * Returns 0 on success, -1 on failure with errno set (preserved from
+ * getxattr(), or EIO when the attribute exists but is not one byte long).
+ */
+static int get_erasure_index(const char *path, uint8_t *idx)
+{
+	ssize_t len = getxattr(path, ECNAME, idx, ECSIZE);
+
+	if (len < 0) {
+		int err = errno;
+
+		sd_err("failed to getxattr %s, %m", path);
+		/* logging may touch errno; callers translate it afterwards */
+		errno = err;
+		return -1;
+	}
+	/* a zero-length attribute would leave *idx unset */
+	if ((size_t)len != ECSIZE) {
+		sd_err("invalid erasure index attribute on %s", path);
+		errno = EIO;
+		return -1;
+	}
+	return 0;
+}
+
static inline bool iocb_is_aligned(const struct siocb *iocb)
{
return sector_algined(iocb->offset) && sector_algined(iocb->length);
@@ -245,6 +265,21 @@ static int default_read_from_path(uint64_t oid, const char *path,
if (fd < 0)
return err_to_sderr(path, oid, errno);
+	if (is_erasure_oid(oid)) {
+		uint8_t idx;
+
+		if (get_erasure_index(path, &idx) < 0) {
+			/* close() may overwrite errno, so capture it first */
+			int err = errno;
+
+			close(fd);
+			return err_to_sderr(path, oid, err);
+		}
+		/*
+		 * This file holds a different strip than the one requested:
+		 * pretend NO-OBJ so the old object in the stale dir is read.
+		 */
+		if (idx != iocb->ec_index) {
+			sd_debug("ec_index %d != %d", iocb->ec_index, idx);
+			close(fd);
+			return SD_RES_NO_OBJ;
+		}
+	}
+
size = xpread(fd, iocb->buf, iocb->length, iocb->offset);
if (unlikely(size != iocb->length)) {
sd_err("failed to read object %"PRIx64", path=%s, offset=%"
@@ -293,33 +328,13 @@ int prealloc(int fd, uint32_t size)
return 0;
}
-static size_t get_store_objsize(uint64_t oid)
+/*
+ * Size in bytes of @oid as kept by the backend store: SD_EC_OBJECT_SIZE for
+ * erasure-coded objects, get_objsize(oid) otherwise.  Non-static so that the
+ * recovery code can size its read buffers with it.
+ */
+size_t get_store_objsize(uint64_t oid)
{
if (is_erasure_oid(oid))
return SD_EC_OBJECT_SIZE;
return get_objsize(oid);
}
-#define ECNAME "user.ec.index"
-#define ECSIZE sizeof(uint8_t)
-static int set_erasure_index(const char *path, uint8_t idx)
-{
- if (setxattr(path, ECNAME, &idx, ECSIZE, 0) < 0) {
- sd_err("failed to setxattr %s, %m", path);
- return -1;
- }
- return 0;
-}
-
-static int get_erasure_index(const char *path, uint8_t *idx)
-{
- if (getxattr(path, ECNAME, idx, ECSIZE) < 0) {
- sd_err("failed to getxattr %s, %m", path);
- return -1;
- }
- return 0;
-}
-
int default_create_and_write(uint64_t oid, const struct siocb *iocb)
{
char path[PATH_MAX], tmp_path[PATH_MAX];
diff --git a/sheep/recovery.c b/sheep/recovery.c
index 0df3a5a..7e16bba 100644
--- a/sheep/recovery.c
+++ b/sheep/recovery.c
@@ -93,9 +93,42 @@ static inline bool node_is_gateway_only(void)
return sys->this_node.nr_vnodes == 0;
}
-/* recover object from vnode */
+/*
+ * Fetch the whole object @oid (erasure index @idx) from @node with an
+ * SD_OP_READ_PEER recovery request issued at @epoch for target epoch
+ * @tgt_epoch.
+ *
+ * Returns a buffer of get_store_objsize(oid) bytes that the caller must
+ * free(), or NULL if the peer read failed.
+ */
+static void *read_object_from(const struct sd_node *node, uint64_t oid,
+			      uint32_t epoch, uint32_t tgt_epoch, uint8_t idx)
+{
+	struct sd_req hdr;
+	unsigned rlen = get_store_objsize(oid);
+	void *buf = xvalloc(rlen);
+	int ret;
+
+	sd_init_req(&hdr, SD_OP_READ_PEER);
+	hdr.epoch = epoch;
+	hdr.flags = SD_FLAG_CMD_RECOVERY;
+	hdr.data_length = rlen;
+	hdr.obj.oid = oid;
+	hdr.obj.tgt_epoch = tgt_epoch;
+	hdr.obj.ec_index = idx;
+
+	sd_debug("%s, epoch %"PRIu32" tgt %"PRIu32" idx %d", node_to_str(node),
+		 epoch, tgt_epoch, idx);
+	ret = sheep_exec_req(&node->nid, &hdr, buf);
+	if (ret != SD_RES_SUCCESS) {
+		/*
+		 * Not fatal on its own (callers may try other nodes), but keep
+		 * the reason instead of silently dropping it.
+		 */
+		sd_debug("failed to read %"PRIx64" idx %d from %s, %x", oid,
+			 idx, node_to_str(node), ret);
+		free(buf);
+		return NULL;
+	}
+	return buf;
+}
+
+/*
+ * Read the object from the target node and store it on the local node.
+ *
+ * tgt_epoch: the epoch in which the object was stored on the target node
+ * idx: erasure index of the copy to fetch; pass 0 for non-erasure objects.
+ */
static int recover_object_from(struct recovery_obj_work *row,
- const struct sd_node *node, uint32_t tgt_epoch)
+ const struct sd_node *node,
+ uint32_t tgt_epoch,
+ uint8_t idx)
{
uint64_t oid = row->oid;
uint32_t local_epoch = row->local_epoch;
@@ -116,7 +149,7 @@ static int recover_object_from(struct recovery_obj_work *row,
}
/* compare sha1 hash value first */
- if (local_epoch > 0) {
+ if (!is_erasure_oid(oid) && local_epoch > 0) {
sd_init_req(&hdr, SD_OP_GET_HASH);
hdr.obj.oid = oid;
hdr.obj.tgt_epoch = tgt_epoch;
@@ -133,7 +166,7 @@ static int recover_object_from(struct recovery_obj_work *row,
}
}
- rlen = get_objsize(oid);
+ rlen = get_store_objsize(oid);
buf = xvalloc(rlen);
/* recover from remote replica */
@@ -143,6 +176,7 @@ static int recover_object_from(struct recovery_obj_work *row,
hdr.data_length = rlen;
hdr.obj.oid = oid;
hdr.obj.tgt_epoch = tgt_epoch;
+ hdr.obj.ec_index = idx;
ret = sheep_exec_req(&node->nid, &hdr, buf);
if (ret == SD_RES_SUCCESS) {
@@ -150,6 +184,7 @@ static int recover_object_from(struct recovery_obj_work *row,
iocb.length = rsp->data_length;
iocb.offset = rsp->obj.offset;
iocb.buf = buf;
+ iocb.ec_index = idx;
ret = sd_store->create_and_write(oid, &iocb);
}
@@ -202,7 +237,7 @@ static int recover_object_from_replica(struct recovery_obj_work *row,
if (invalid_node(node, row->base.cur_vinfo))
continue;
- ret = recover_object_from(row, node, tgt_epoch);
+ ret = recover_object_from(row, node, tgt_epoch, 0);
switch (ret) {
case SD_RES_SUCCESS:
sd_debug("recovered oid %"PRIx64" from %d to epoch %d",
@@ -237,7 +272,7 @@ static int recover_object_from_replica(struct recovery_obj_work *row,
* the routine will try to recovery it from the nodes it has stayed,
* at least, *theoretically* on consistent hash ring.
*/
-static int do_recover_object(struct recovery_obj_work *row)
+static int recover_replication_object(struct recovery_obj_work *row)
{
struct recovery_work *rw = &row->base;
struct vnode_info *old;
@@ -292,6 +327,111 @@ rollback:
return ret;
}
+/*
+ * Rebuild erasure strip @idx of row->oid and store it locally.
+ *
+ * SD_EC_D other strips are read from the nodes the object was placed on in
+ * the old vnode view (skipping @idx itself and nodes absent from the current
+ * view). The missing strip is decoded stripe by stripe and written with
+ * create_and_write() under erasure index @idx.
+ *
+ * Returns the create_and_write() result, or -1 if fewer than SD_EC_D strips
+ * could be read.
+ */
+static int rebuild_object_from_replica(struct recovery_obj_work *row,
+				       uint32_t tgt_epoch,
+				       const uint8_t idx)
+{
+	struct vnode_info *old = grab_vnode_info(row->base.old_vinfo);
+	const struct sd_node *target_nodes[SD_MAX_NODES];
+	uint8_t *bufs[SD_EC_D] = { 0 };
+	uint64_t oid = row->oid;
+	uint32_t epoch = row->base.epoch;
+	int idxs[SD_EC_D], len = get_store_objsize(oid);
+	struct fec *ctx = ec_init();
+	char *lost = xvalloc(len);
+	struct siocb iocb = { 0 };
+	int i, j, ret = -1;
+
+	/* Collect SD_EC_D surviving strips, skipping the one being rebuilt */
+	oid_to_nodes(oid, &old->vroot, SD_EC_DP, target_nodes);
+	for (i = 0, j = 0; i < SD_EC_DP && j < SD_EC_D; i++) {
+		if (i == idx)
+			continue;
+		/* no point sending requests to nodes that have left */
+		if (invalid_node(target_nodes[i], row->base.cur_vinfo))
+			continue;
+		bufs[j] = read_object_from(target_nodes[i], oid,
+					   epoch, tgt_epoch, i);
+		if (!bufs[j])
+			continue;
+		idxs[j++] = i;
+	}
+	if (j != SD_EC_D) {
+		sd_err("only %d strips of %"PRIx64" readable, can't rebuild idx %d",
+		       j, oid, idx);
+		goto out;
+	}
+
+	/* Rebuild the lost strip one stripe at a time, directly into @lost */
+	for (i = 0; i < SD_EC_NR_STRIPE_PER_OBJECT; i++) {
+		const uint8_t *in[SD_EC_D];
+
+		for (j = 0; j < SD_EC_D; j++)
+			in[j] = bufs[j] + SD_EC_STRIP_SIZE * i;
+		ec_decode(ctx, in, idxs,
+			  (uint8_t *)lost + SD_EC_STRIP_SIZE * i, idx);
+	}
+
+	iocb.epoch = epoch;
+	iocb.length = len;
+	iocb.offset = 0;
+	iocb.buf = lost;
+	iocb.ec_index = idx;
+	ret = sd_store->create_and_write(oid, &iocb);
+out:
+	ec_destroy(ctx);
+	put_vnode_info(old);
+	/* unused slots are still NULL, and free(NULL) is a no-op */
+	for (i = 0; i < SD_EC_D; i++)
+		free(bufs[i]);
+	free(lost);
+	return ret;
+}
+
+/*
+ * Return the erasure index held by the local node for @oid: its position
+ * among the SD_EC_DP nodes that oid_to_nodes() maps @oid to in @vroot.
+ * Panics when the local node is not one of them.
+ */
+static uint8_t local_node_copy_index(struct rb_root *vroot, uint64_t oid)
+{
+	const struct sd_node *nodes[SD_MAX_NODES];
+	int pos;
+
+	oid_to_nodes(oid, vroot, SD_EC_DP, nodes);
+	for (pos = 0; pos < SD_EC_DP; pos++) {
+		if (node_is_local(nodes[pos]))
+			return (uint8_t)pos;
+	}
+
+	panic("can't get valid index for %"PRIx64, oid);
+}
+
+/*
+ * Recover the erasure strip of row->oid that the local node owns in the
+ * current epoch. Its index is the local node's position in the current
+ * placement.
+ *
+ * The node that held that index in the old placement is asked for it first.
+ * If that node has left, or cannot hand the strip over, the strip is rebuilt
+ * from the other strips.
+ */
+static int recover_erasure_object(struct recovery_obj_work *row)
+{
+	struct recovery_work *rw = &row->base;
+	struct vnode_info *old, *cur;
+	uint64_t oid = row->oid;
+	uint32_t tgt_epoch = rw->tgt_epoch;
+	uint8_t idx;
+	const struct sd_node *node;
+	int ret;
+
+	cur = grab_vnode_info(rw->cur_vinfo);
+	old = grab_vnode_info(rw->old_vinfo);
+
+	idx = local_node_copy_index(&cur->vroot, oid);
+	node = oid_to_node(oid, &old->vroot, idx);
+
+	sd_debug("%"PRIx64" idx %d, from epoch %"PRIu32, oid, idx, tgt_epoch);
+
+	if (invalid_node(node, cur)) {
+		ret = rebuild_object_from_replica(row, tgt_epoch, idx);
+	} else {
+		ret = recover_object_from(row, node, tgt_epoch, idx);
+		if (ret != SD_RES_SUCCESS) {
+			/* old holder is alive but its strip is unavailable */
+			sd_debug("%"PRIx64" idx %d not recovered from %s (%x), "
+				 "rebuilding", oid, idx, node_to_str(node), ret);
+			ret = rebuild_object_from_replica(row, tgt_epoch, idx);
+		}
+	}
+
+	put_vnode_info(cur);
+	put_vnode_info(old);
+	return ret;
+}
+
+/* Dispatch recovery of row->oid according to its redundancy scheme. */
+static int do_recover_object(struct recovery_obj_work *row)
+{
+	if (!is_erasure_oid(row->oid))
+		return recover_replication_object(row);
+
+	return recover_erasure_object(row);
+}
+
static void recover_object_work(struct work *work)
{
struct recovery_work *rw = container_of(work, struct recovery_work,
@@ -308,17 +448,19 @@ static void recover_object_work(struct work *work)
}
/* find object in the stale directory */
- for (epoch = sys_epoch() - 1; epoch > 0; epoch--) {
- ret = sd_store->get_hash(oid, epoch, row->local_sha1);
- if (ret == SD_RES_SUCCESS) {
- sd_debug("replica found in local at epoch %d", epoch);
- row->local_epoch = epoch;
- break;
+ if (!is_erasure_oid(oid))
+ for (epoch = sys_epoch() - 1; epoch > 0; epoch--) {
+ ret = sd_store->get_hash(oid, epoch, row->local_sha1);
+ if (ret == SD_RES_SUCCESS) {
+ sd_debug("replica found in local at epoch %d",
+ epoch);
+ row->local_epoch = epoch;
+ break;
+ }
}
- }
ret = do_recover_object(row);
- if (ret < 0)
+ if (ret != 0)
sd_err("failed to recover object %"PRIx64, oid);
}
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 9b83883..99bf676 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -216,6 +216,9 @@ struct store_driver {
int (*cleanup)(void);
};
+/* backend store */
+int peer_read_obj(struct request *req);
+
int default_init(void);
bool default_exist(uint64_t oid);
int default_create_and_write(uint64_t oid, const struct siocb *iocb);
@@ -234,6 +237,7 @@ int for_each_object_in_stale(int (*func)(uint64_t oid, const char *path,
uint32_t epoch, void *arg),
void *arg);
int for_each_obj_path(int (*func)(const char *path));
+size_t get_store_objsize(uint64_t oid);
extern struct list_head store_drivers;
#define add_store_driver(driver) \
@@ -404,9 +408,6 @@ int gateway_create_and_write_obj(struct request *req);
int gateway_remove_obj(struct request *req);
bool is_erasure_oid(uint64_t oid);
-/* backend store */
-int peer_read_obj(struct request *req);
-
/* object_cache */
void object_cache_format(void);
--
1.7.9.5
More information about the sheepdog
mailing list