[sheepdog] [PATCH] sheep: split create_vdi_obj() into 4 functions
MORITA Kazutaka
morita.kazutaka at gmail.com
Thu Aug 22 09:53:48 CEST 2013
From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
create_vdi_obj() is very complex because it contains four kinds of
operations:
- create fresh vdi
- clone vdi
- snapshot vdi
- rebase vdi (rollback)
Splitting them apart makes the code much easier to read.
This patch is just a cleanup and doesn't change its logic at all.
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
sheep/ops.c | 10 +-
sheep/sheep_priv.h | 4 +-
sheep/vdi.c | 356 ++++++++++++++++++++++++++++++++++++++--------------
3 files changed, 277 insertions(+), 93 deletions(-)
diff --git a/sheep/ops.c b/sheep/ops.c
index 4c0f975..76ff611 100644
--- a/sheep/ops.c
+++ b/sheep/ops.c
@@ -67,6 +67,10 @@ static int cluster_new_vdi(struct request *req)
struct sd_rsp *rsp = &req->rp;
uint32_t vid;
int ret;
+ struct timeval tv;
+
+ gettimeofday(&tv, NULL);
+
struct vdi_iocb iocb = {
.name = req->data,
.data_len = hdr->data_length,
@@ -75,12 +79,16 @@ static int cluster_new_vdi(struct request *req)
.create_snapshot = !!hdr->vdi.snapid,
.nr_copies = hdr->vdi.copies ? hdr->vdi.copies :
sys->cinfo.nr_copies,
+ .time = (uint64_t) tv.tv_sec << 32 | tv.tv_usec * 1000,
};
if (hdr->data_length != SD_MAX_VDI_LEN)
return SD_RES_INVALID_PARMS;
- ret = vdi_create(&iocb, &vid);
+ if (iocb.create_snapshot)
+ ret = sd_snapshot_create(&iocb, &vid);
+ else
+ ret = sd_vdi_create(&iocb, &vid);
rsp->vdi.vdi_id = vid;
rsp->vdi.copies = iocb.nr_copies;
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 35c067e..b78afc8 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -171,6 +171,7 @@ struct vdi_iocb {
uint32_t snapid;
bool create_snapshot;
int nr_copies;
+ uint64_t time;
};
struct vdi_info {
@@ -276,7 +277,8 @@ int get_max_copy_number(void);
int get_req_copy_number(struct request *req);
int add_vdi_state(uint32_t vid, int nr_copies, bool snapshot);
int vdi_exist(uint32_t vid);
-int vdi_create(struct vdi_iocb *iocb, uint32_t *new_vid);
+int sd_vdi_create(struct vdi_iocb *iocb, uint32_t *new_vid);
+int sd_snapshot_create(struct vdi_iocb *iocb, uint32_t *new_vid);
int vdi_delete(struct vdi_iocb *iocb, struct request *req);
int vdi_lookup(struct vdi_iocb *iocb, struct vdi_info *info);
void clean_vdi_state(void);
diff --git a/sheep/vdi.c b/sheep/vdi.c
index 4a180cb..c6fc3d6 100644
--- a/sheep/vdi.c
+++ b/sheep/vdi.c
@@ -187,109 +187,236 @@ out:
return ret;
}
-/* TODO: should be performed atomically */
-static int create_vdi_obj(struct vdi_iocb *iocb, uint32_t new_vid,
- uint32_t cur_vid)
+static struct sd_inode *alloc_inode(struct vdi_iocb *iocb, uint32_t new_vid,
+ uint32_t *data_vdi_id)
{
- /* we are not called concurrently */
- struct sd_inode *new = NULL, *base = NULL, *cur = NULL;
- struct timeval tv;
- int ret = SD_RES_NO_MEM;
+ struct sd_inode *new = xzalloc(sizeof(*new));
unsigned long block_size = SD_DATA_OBJ_SIZE;
- const char *name = iocb->name;
- new = xzalloc(sizeof(*new));
- if (iocb->base_vid)
- base = xzalloc(sizeof(*base));
+ pstrcpy(new->name, sizeof(new->name), iocb->name);
+ new->vdi_id = new_vid;
+ new->create_time = iocb->time;
+ new->vdi_size = iocb->size;
+ new->copy_policy = 0;
+ new->nr_copies = iocb->nr_copies;
+ new->block_size_shift = find_next_bit(&block_size, BITS_PER_LONG, 0);
+ new->snap_id = iocb->snapid;
+ new->parent_vdi_id = iocb->base_vid;
+ if (data_vdi_id)
+ memcpy(new->data_vdi_id, data_vdi_id, sizeof(new->data_vdi_id));
- if (iocb->create_snapshot && cur_vid != iocb->base_vid)
- cur = xzalloc(SD_INODE_HEADER_SIZE);
+ return new;
+}
- if (iocb->base_vid) {
- ret = read_object(vid_to_vdi_oid(iocb->base_vid), (char *)base,
- sizeof(*base), 0);
- if (ret != SD_RES_SUCCESS) {
- ret = SD_RES_BASE_VDI_READ;
- goto out;
- }
+/* Find the first zeroed index to be used for a child vid. */
+static int find_free_idx(uint32_t *vdi_id, size_t max_idx)
+{
+ for (int i = 0; i < max_idx; i++) {
+ if (vdi_id[i] == 0)
+ return i;
}
- gettimeofday(&tv, NULL);
+ return -1;
+}
- if (iocb->create_snapshot) {
- if (cur_vid != iocb->base_vid) {
- sd_info("tree snapshot %s %" PRIx32 " %" PRIx32, name,
- cur_vid, iocb->base_vid);
+/* Create a fresh vdi */
+static main_fn int create_vdi(struct vdi_iocb *iocb, uint32_t new_vid)
+{
+ struct sd_inode *new = alloc_inode(iocb, new_vid, NULL);;
+ int ret;
- ret = read_object(vid_to_vdi_oid(cur_vid), (char *)cur,
- SD_INODE_HEADER_SIZE, 0);
- if (ret != SD_RES_SUCCESS) {
- sd_err("failed");
- ret = SD_RES_BASE_VDI_READ;
- goto out;
- }
+ ret = write_object(vid_to_vdi_oid(new_vid), (char *)new, sizeof(*new),
+ 0, true);
+ if (ret != SD_RES_SUCCESS)
+ ret = SD_RES_VDI_WRITE;
+
+ free(new);
+ return ret;
+}
+
+/*
+ * Create a clone vdi from the existing snapshot
+ *
+ * This creates a working vdi 'new' based on the snapshot 'base'. For example:
+ *
+ * [before]
+ * base
+ * o----o----o----x
+ *
+ * [after]
+ * base
+ * o----o----o----x
+ * \
+ * x new
+ * x: working vdi
+ * o: snapshot vdi
+ */
+static main_fn int clone_vdi(struct vdi_iocb *iocb, uint32_t new_vid)
+{
+ struct sd_inode *new = NULL, *base = xzalloc(sizeof(*base));
+ int ret, idx;
- cur->snap_ctime = (uint64_t) tv.tv_sec << 32 | tv.tv_usec * 1000;
- } else
- base->snap_ctime = (uint64_t) tv.tv_sec << 32 | tv.tv_usec * 1000;
+ ret = read_object(vid_to_vdi_oid(iocb->base_vid), (char *)base,
+ sizeof(*base), 0);
+ if (ret != SD_RES_SUCCESS) {
+ ret = SD_RES_BASE_VDI_READ;
+ goto out;
}
- pstrcpy(new->name, sizeof(new->name), name);
- new->vdi_id = new_vid;
- new->create_time = (uint64_t) tv.tv_sec << 32 | tv.tv_usec * 1000;
- new->vdi_size = iocb->size;
- new->copy_policy = 0;
- new->nr_copies = iocb->nr_copies;
- new->block_size_shift = find_next_bit(&block_size, BITS_PER_LONG, 0);
- new->snap_id = iocb->snapid;
+ idx = find_free_idx(base->child_vdi_id, sizeof(base->child_vdi_id));
+ if (idx < 0) {
+ ret = SD_RES_NO_BASE_VDI;
+ goto out;
+ }
- if (iocb->base_vid) {
- int i;
+ /* TODO: multiple write_object should be performed atomically */
+ /* update a base vdi */
+ ret = write_object(vid_to_vdi_oid(iocb->base_vid), (char *)&new_vid,
+ sizeof(new_vid),
+ offsetof(struct sd_inode, child_vdi_id[idx]), false);
+ if (ret != SD_RES_SUCCESS) {
+ ret = SD_RES_BASE_VDI_WRITE;
+ goto out;
+ }
- new->parent_vdi_id = iocb->base_vid;
- memcpy(new->data_vdi_id, base->data_vdi_id, sizeof(new->data_vdi_id));
+ /* create a new vdi */
+ new = alloc_inode(iocb, new_vid, base->data_vdi_id);
+ ret = write_object(vid_to_vdi_oid(new_vid), (char *)new, sizeof(*new),
+ 0, true);
+ if (ret != SD_RES_SUCCESS)
+ ret = SD_RES_VDI_WRITE;
- for (i = 0; i < ARRAY_SIZE(base->child_vdi_id); i++) {
- if (!base->child_vdi_id[i]) {
- base->child_vdi_id[i] = new_vid;
- break;
- }
- }
+out:
+ free(new);
+ free(base);
+ return ret;
+}
- if (i == ARRAY_SIZE(base->child_vdi_id)) {
- ret = SD_RES_NO_BASE_VDI;
- goto out;
- }
+/*
+ * Create a snapshot vdi
+ *
+ * This makes the current working vdi 'base' a snapshot, and create a working
+ * vdi 'new'. For example:
+ *
+ * [before]
+ * o----o----o----x base
+ *
+ * [after]
+ * base
+ * o----o----o----o----x new
+ *
+ * x: working vdi
+ * o: snapshot vdi
+ */
+static main_fn int snapshot_vdi(struct vdi_iocb *iocb, uint32_t new_vid)
+{
+ struct sd_inode *new = NULL, *base = xzalloc(sizeof(*base));
+ int ret, idx;
+
+ ret = read_object(vid_to_vdi_oid(iocb->base_vid), (char *)base,
+ sizeof(*base), 0);
+ if (ret != SD_RES_SUCCESS) {
+ ret = SD_RES_BASE_VDI_READ;
+ goto out;
}
- if (iocb->create_snapshot && cur_vid != iocb->base_vid) {
- ret = write_object(vid_to_vdi_oid(cur_vid), (char *)cur,
- SD_INODE_HEADER_SIZE, 0, false);
- if (ret != 0) {
- sd_err("failed");
- ret = SD_RES_BASE_VDI_READ;
- goto out;
- }
+ idx = find_free_idx(base->child_vdi_id, sizeof(base->child_vdi_id));
+ if (idx < 0) {
+ ret = SD_RES_NO_BASE_VDI;
+ goto out;
}
- if (iocb->base_vid) {
- ret = write_object(vid_to_vdi_oid(iocb->base_vid), (char *)base,
- SD_INODE_HEADER_SIZE, 0, false);
- if (ret != 0) {
- sd_err("failed");
- ret = SD_RES_BASE_VDI_WRITE;
- goto out;
- }
+ /* TODO: multiple write_object should be performed atomically */
+ /* update a base vdi */
+ base->snap_ctime = iocb->time;
+ base->child_vdi_id[idx] = new_vid;
+ ret = write_object(vid_to_vdi_oid(iocb->base_vid), (char *)base,
+ SD_INODE_HEADER_SIZE, 0, false);
+ if (ret != SD_RES_SUCCESS) {
+ ret = SD_RES_BASE_VDI_WRITE;
+ goto out;
}
+ /* create a new vdi */
+ new = alloc_inode(iocb, new_vid, base->data_vdi_id);
ret = write_object(vid_to_vdi_oid(new_vid), (char *)new, sizeof(*new),
0, true);
- if (ret != 0)
+ if (ret != SD_RES_SUCCESS)
+ ret = SD_RES_VDI_WRITE;
+
+out:
+ free(new);
+ free(base);
+ return ret;
+}
+
+/*
+ * Rebase onto another snapshot vdi
+ *
+ * This makes the current working vdi 'base' a snapshot, and create a new
+ * working vdi 'new' based on the snapshot 'base'. We use this operation when
+ * rollbacking to the snapshot or writing data to the snapshot. Here is an
+ * example:
+ *
+ * [before]
+ * base
+ * o----o----o----x cur
+ *
+ * [after]
+ * base
+ * o----o----o----o cur
+ * \
+ * x new
+ * x: working vdi
+ * o: snapshot vdi
+ */
+static main_fn int rebase_vdi(struct vdi_iocb *iocb, uint32_t new_vid,
+ uint32_t cur_vid)
+{
+ struct sd_inode *new = NULL, *base = xzalloc(sizeof(*base));
+ int ret, idx;
+
+ ret = read_object(vid_to_vdi_oid(iocb->base_vid), (char *)base,
+ sizeof(*base), 0);
+ if (ret != SD_RES_SUCCESS) {
+ ret = SD_RES_BASE_VDI_READ;
+ goto out;
+ }
+
+ idx = find_free_idx(base->child_vdi_id, sizeof(base->child_vdi_id));
+ if (idx < 0) {
+ ret = SD_RES_NO_BASE_VDI;
+ goto out;
+ }
+
+ /* TODO: multiple write_object should be performed atomically */
+ /* update current working vdi */
+ ret = write_object(vid_to_vdi_oid(cur_vid), (char *)&iocb->time,
+ sizeof(iocb->time),
+ offsetof(struct sd_inode, snap_ctime), false);
+ if (ret != SD_RES_SUCCESS) {
+ ret = SD_RES_BASE_VDI_READ;
+ goto out;
+ }
+
+ /* update base vdi */
+ ret = write_object(vid_to_vdi_oid(iocb->base_vid), (char *)&new_vid,
+ sizeof(new_vid),
+ offsetof(struct sd_inode, child_vdi_id[idx]), false);
+ if (ret != SD_RES_SUCCESS) {
+ ret = SD_RES_BASE_VDI_WRITE;
+ goto out;
+ }
+
+ /* create a new vdi */
+ new = alloc_inode(iocb, new_vid, base->data_vdi_id);
+ ret = write_object(vid_to_vdi_oid(new_vid), (char *)new, sizeof(*new),
+ 0, true);
+ if (ret != SD_RES_SUCCESS)
ret = SD_RES_VDI_WRITE;
out:
free(new);
- free(cur);
free(base);
return ret;
}
@@ -476,15 +603,18 @@ static void vdi_flush(uint32_t vid)
}
/*
- * There are 3 create operation in SD:
- * 1. fresh create (expect NO_VDI returned from vdi_lookup)
- * 2. snapshot create (expect SUCCESS)
- * 3. rollback create (expect NO_VDI)
+ * This function creates another working vdi with a new name. The parent of the
+ * newly created vdi is iocb->base_vid.
+ *
+ * There are 2 vdi create operation in SD:
+ * 1. fresh create (base_vid == 0)
+ * 2. clone create (base_vid != 0)
*
- * Fresh create started with id = 1. Both rollback & snap create started with
- * current working VDI's snap_id + 1. Working VDI always has the highest snapid.
+ * This function expects NO_VDI returned from vdi_lookup(). Fresh create
+ * started with id = 1 when there are no snapshot with the same name. Working
+ * VDI always has the highest snapid.
*/
-int vdi_create(struct vdi_iocb *iocb, uint32_t *new_vid)
+int sd_vdi_create(struct vdi_iocb *iocb, uint32_t *new_vid)
{
const char *name = iocb->name;
struct vdi_info info = {};
@@ -493,20 +623,15 @@ int vdi_create(struct vdi_iocb *iocb, uint32_t *new_vid)
ret = vdi_lookup(iocb, &info);
switch (ret) {
case SD_RES_SUCCESS:
- if (!iocb->create_snapshot)
- return SD_RES_VDI_EXIST;
- if (sys->enable_object_cache)
- vdi_flush(iocb->base_vid);
- break;
+ return SD_RES_VDI_EXIST;
case SD_RES_NO_VDI:
- if (iocb->create_snapshot)
- return ret;
break;
default:
sd_err("%s", sd_strerror(ret));
return ret;
}
- if (!iocb->snapid)
+
+ if (iocb->snapid == 0)
iocb->snapid = 1;
*new_vid = info.free_bit;
ret = notify_vdi_add(*new_vid, iocb->nr_copies, info.vid);
@@ -519,7 +644,56 @@ int vdi_create(struct vdi_iocb *iocb, uint32_t *new_vid)
*new_vid, iocb->base_vid, info.vid, iocb->nr_copies,
iocb->snapid);
- return create_vdi_obj(iocb, *new_vid, info.vid);
+ if (iocb->base_vid == 0)
+ return create_vdi(iocb, *new_vid);
+ else
+ return clone_vdi(iocb, *new_vid);
+}
+
+/*
+ * This function makes the current working vdi a snapshot, and create a new
+ * working vdi with the same name. The parent of the newly created vdi is
+ * iocb->base_vid.
+ *
+ * There are 2 snapshot create operation in SD:
+ * 1. snapshot create (base_vid == current_vid)
+ * 2. rollback create (base_vid != current_vid)
+ *
+ * This function expects SUCCESS returned from vdi_lookup(). Both rollback and
+ * snap create started with current working VDI's snap_id + 1. Working VDI
+ * always has the highest snapid.
+ */
+int sd_snapshot_create(struct vdi_iocb *iocb, uint32_t *new_vid)
+{
+ const char *name = iocb->name;
+ struct vdi_info info = {};
+ int ret;
+
+ ret = vdi_lookup(iocb, &info);
+ if (ret == SD_RES_SUCCESS) {
+ if (sys->enable_object_cache)
+ vdi_flush(iocb->base_vid);
+ } else {
+ sd_err("%s", sd_strerror(ret));
+ return ret;
+ }
+
+ assert(iocb->snapid > 0);
+ *new_vid = info.free_bit;
+ ret = notify_vdi_add(*new_vid, iocb->nr_copies, info.vid);
+ if (ret != SD_RES_SUCCESS)
+ return ret;
+
+ sd_debug("%s %s: size %" PRIu64 ", vid %" PRIx32 ", base %" PRIx32
+ ", cur %" PRIx32 ", copies %d, snapid %" PRIu32,
+ iocb->create_snapshot ? "snapshot" : "vdi", name, iocb->size,
+ *new_vid, iocb->base_vid, info.vid, iocb->nr_copies,
+ iocb->snapid);
+
+ if (iocb->base_vid == info.vid)
+ return snapshot_vdi(iocb, *new_vid);
+ else
+ return rebase_vdi(iocb, *new_vid, info.vid);
}
static int start_deletion(struct request *req, uint32_t vid);
--
1.7.9.5
More information about the sheepdog
mailing list