[sheepdog] [PATCH stable-0.7 5/9] sheep: split create_vdi_obj() into 4 functions

Hitoshi Mitake mitake.hitoshi at lab.ntt.co.jp
Fri Jan 31 06:46:40 CET 2014


From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>

create_vdi_obj() is very complex because it contains four kinds of
operations:

 - create fresh vdi
 - clone vdi
 - snapshot vdi
 - rebase vdi (rollback)

Splitting them apart makes the code much easier to read.

This patch is just a cleanup and doesn't change its logic at all.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
Signed-off-by: Liu Yuan <namei.unix at gmail.com>
---
 sheep/ops.c        |   10 +-
 sheep/sheep_priv.h |    2 +
 sheep/vdi.c        |  376 +++++++++++++++++++++++++++++++++++++++-------------
 3 files changed, 292 insertions(+), 96 deletions(-)

diff --git a/sheep/ops.c b/sheep/ops.c
index 77a5314..75b07f6 100644
--- a/sheep/ops.c
+++ b/sheep/ops.c
@@ -67,6 +67,10 @@ static int cluster_new_vdi(struct request *req)
 	struct sd_rsp *rsp = &req->rp;
 	uint32_t vid;
 	int ret;
+	struct timeval tv;
+
+	gettimeofday(&tv, NULL);
+
 	struct vdi_iocb iocb = {
 		.name = req->data,
 		.data_len = hdr->data_length,
@@ -75,12 +79,16 @@ static int cluster_new_vdi(struct request *req)
 		.create_snapshot = !!hdr->vdi.snapid,
 		.nr_copies = hdr->vdi.copies ? hdr->vdi.copies :
 				sys->cinfo.nr_copies,
+		.time = (uint64_t) tv.tv_sec << 32 | tv.tv_usec * 1000,
 	};
 
 	if (hdr->data_length != SD_MAX_VDI_LEN)
 		return SD_RES_INVALID_PARMS;
 
-	ret = vdi_create(&iocb, &vid);
+	if (iocb.create_snapshot)
+		ret = vdi_snapshot(&iocb, &vid);
+	else
+		ret = vdi_create(&iocb, &vid);
 
 	rsp->vdi.vdi_id = vid;
 	rsp->vdi.copies = iocb.nr_copies;
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 463baf9..c13725d 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -169,6 +169,7 @@ struct vdi_iocb {
 	uint32_t snapid;
 	bool create_snapshot;
 	int nr_copies;
+	uint64_t time;
 };
 
 /* This structure is used to get information from sheepdog. */
@@ -277,6 +278,7 @@ int get_req_copy_number(struct request *req);
 int add_vdi_state(uint32_t vid, int nr_copies, bool snapshot);
 int vdi_exist(uint32_t vid);
 int vdi_create(const struct vdi_iocb *iocb, uint32_t *new_vid);
+int vdi_snapshot(const struct vdi_iocb *iocb, uint32_t *new_vid);
 int vdi_delete(const struct vdi_iocb *iocb, struct request *req);
 int vdi_lookup(const struct vdi_iocb *iocb, struct vdi_info *info);
 void clean_vdi_state(void);
diff --git a/sheep/vdi.c b/sheep/vdi.c
index 3f6328b..33dcabc 100644
--- a/sheep/vdi.c
+++ b/sheep/vdi.c
@@ -214,109 +214,260 @@ out:
 	return ret;
 }
 
-/* TODO: should be performed atomically */
-static int create_vdi_obj(const struct vdi_iocb *iocb, uint32_t new_snapid,
-			  uint32_t new_vid, uint32_t cur_vid)
+static struct sd_inode *alloc_inode(const struct vdi_iocb *iocb,
+				    uint32_t new_snapid, uint32_t new_vid,
+				    uint32_t *data_vdi_id)
 {
-	/* we are not called concurrently */
-	struct sd_inode *new = NULL, *base = NULL, *cur = NULL;
-	struct timeval tv;
-	int ret = SD_RES_NO_MEM;
+	struct sd_inode *new = xzalloc(sizeof(*new));
 	unsigned long block_size = SD_DATA_OBJ_SIZE;
-	const char *name = iocb->name;
 
-	new = xzalloc(sizeof(*new));
-	if (iocb->base_vid)
-		base = xzalloc(sizeof(*base));
+	pstrcpy(new->name, sizeof(new->name), iocb->name);
+	new->vdi_id = new_vid;
+	new->create_time = iocb->time;
+	new->vdi_size = iocb->size;
+	new->copy_policy = 0;
+	new->nr_copies = iocb->nr_copies;
+	new->block_size_shift = find_next_bit(&block_size, BITS_PER_LONG, 0);
+	new->snap_id = new_snapid;
+	new->parent_vdi_id = iocb->base_vid;
+	if (data_vdi_id)
+		memcpy(new->data_vdi_id, data_vdi_id, sizeof(new->data_vdi_id));
 
-	if (iocb->create_snapshot && cur_vid != iocb->base_vid)
-		cur = xzalloc(SD_INODE_HEADER_SIZE);
+	return new;
+}
 
-	if (iocb->base_vid) {
-		ret = read_object(vid_to_vdi_oid(iocb->base_vid), (char *)base,
-				  sizeof(*base), 0);
-		if (ret != SD_RES_SUCCESS) {
-			ret = SD_RES_BASE_VDI_READ;
-			goto out;
-		}
+/* Return the first zeroed index in vdi_id[0..max_idx-1], or -1 if none. */
+static int find_free_idx(uint32_t *vdi_id, size_t max_idx)
+{
+	for (int i = 0; i < max_idx; i++) {
+		if (vdi_id[i] == 0)
+			return i;
 	}
 
-	gettimeofday(&tv, NULL);
+	return -1;
+}
 
-	if (iocb->create_snapshot) {
-		if (cur_vid != iocb->base_vid) {
-			sd_info("tree snapshot %s %" PRIx32 " %" PRIx32, name,
-				cur_vid, iocb->base_vid);
+/* Create a fresh vdi */
+static int create_vdi(const struct vdi_iocb *iocb, uint32_t new_snapid,
+		      uint32_t new_vid)
+{
+	struct sd_inode *new = alloc_inode(iocb, new_snapid, new_vid, NULL);
+	int ret;
 
-			ret = read_object(vid_to_vdi_oid(cur_vid), (char *)cur,
-					  SD_INODE_HEADER_SIZE, 0);
-			if (ret != SD_RES_SUCCESS) {
-				sd_err("failed");
-				ret = SD_RES_BASE_VDI_READ;
-				goto out;
-			}
+	sd_debug("%s: size %" PRIu64 ", new_vid %" PRIx32 ", copies %d, "
+		 "snapid %" PRIu32, iocb->name, iocb->size, new_vid,
+		 iocb->nr_copies, new_snapid);
+
+	ret = write_object(vid_to_vdi_oid(new_vid), (char *)new, sizeof(*new),
+			   0, true);
+	if (ret != SD_RES_SUCCESS)
+		ret = SD_RES_VDI_WRITE;
+
+	free(new);
+	return ret;
+}
+
+/*
+ * Create a clone vdi from the existing snapshot
+ *
+ * This creates a working vdi 'new' based on the snapshot 'base'.  For example:
+ *
+ * [before]
+ *                base
+ *            o----o----o----x
+ *
+ * [after]
+ *                base
+ *            o----o----o----x
+ *                  \
+ *                   x new
+ * x: working vdi
+ * o: snapshot vdi
+ */
+static int clone_vdi(const struct vdi_iocb *iocb, uint32_t new_snapid,
+		     uint32_t new_vid, uint32_t base_vid)
+{
+	struct sd_inode *new = NULL, *base = xzalloc(sizeof(*base));
+	int ret, idx;
 
-			cur->snap_ctime = (uint64_t) tv.tv_sec << 32 | tv.tv_usec * 1000;
-		} else
-			base->snap_ctime = (uint64_t) tv.tv_sec << 32 | tv.tv_usec * 1000;
+	sd_debug("%s: size %" PRIu64 ", vid %" PRIx32 ", base %" PRIx32 ", "
+		 "copies %d, snapid %" PRIu32, iocb->name, iocb->size, new_vid,
+		 base_vid, iocb->nr_copies, new_snapid);
+
+	ret = read_object(vid_to_vdi_oid(base_vid), (char *)base, sizeof(*base),
+			  0);
+	if (ret != SD_RES_SUCCESS) {
+		ret = SD_RES_BASE_VDI_READ;
+		goto out;
 	}
 
-	pstrcpy(new->name, sizeof(new->name), name);
-	new->vdi_id = new_vid;
-	new->create_time = (uint64_t) tv.tv_sec << 32 | tv.tv_usec * 1000;
-	new->vdi_size = iocb->size;
-	new->copy_policy = 0;
-	new->nr_copies = iocb->nr_copies;
-	new->block_size_shift = find_next_bit(&block_size, BITS_PER_LONG, 0);
-	new->snap_id = new_snapid;
+	idx = find_free_idx(base->child_vdi_id, ARRAY_SIZE(base->child_vdi_id));
+	if (idx < 0) {
+		ret = SD_RES_NO_BASE_VDI;
+		goto out;
+	}
 
-	if (iocb->base_vid) {
-		int i;
+	/* TODO: multiple write_object should be performed atomically */
 
-		new->parent_vdi_id = iocb->base_vid;
-		memcpy(new->data_vdi_id, base->data_vdi_id, sizeof(new->data_vdi_id));
+	/* register new_vid in the free child slot of the base vdi */
+	ret = write_object(vid_to_vdi_oid(base_vid), (char *)&new_vid,
+			   sizeof(new_vid),
+			   offsetof(struct sd_inode, child_vdi_id[idx]), false);
+	if (ret != SD_RES_SUCCESS) {
+		ret = SD_RES_BASE_VDI_WRITE;
+		goto out;
+	}
 
-		for (i = 0; i < ARRAY_SIZE(base->child_vdi_id); i++) {
-			if (!base->child_vdi_id[i]) {
-				base->child_vdi_id[i] = new_vid;
-				break;
-			}
-		}
+	/* create a new vdi inheriting data_vdi_id from the base vdi */
+	new = alloc_inode(iocb, new_snapid, new_vid, base->data_vdi_id);
+	ret = write_object(vid_to_vdi_oid(new_vid), (char *)new, sizeof(*new),
+			   0, true);
+	if (ret != SD_RES_SUCCESS)
+		ret = SD_RES_VDI_WRITE;
 
-		if (i == ARRAY_SIZE(base->child_vdi_id)) {
-			ret = SD_RES_NO_BASE_VDI;
-			goto out;
-		}
+out:
+	free(new);
+	free(base);
+	return ret;
+}
+
+/*
+ * Create a snapshot vdi
+ *
+ * This makes the current working vdi 'base' a snapshot, and creates a working
+ * vdi 'new'.  For example:
+ *
+ * [before]
+ *            o----o----o----x base
+ *
+ * [after]
+ *                          base
+ *            o----o----o----o----x new
+ *
+ * x: working vdi
+ * o: snapshot vdi
+ */
+static int snapshot_vdi(const struct vdi_iocb *iocb, uint32_t new_snapid,
+			uint32_t new_vid, uint32_t base_vid)
+{
+	struct sd_inode *new = NULL, *base = xzalloc(sizeof(*base));
+	int ret, idx;
+
+	sd_debug("%s: size %" PRIu64 ", vid %" PRIx32 ", base %" PRIx32 ", "
+		 "copies %d, snapid %" PRIu32, iocb->name, iocb->size, new_vid,
+		 base_vid, iocb->nr_copies, new_snapid);
+
+	ret = read_object(vid_to_vdi_oid(base_vid), (char *)base, sizeof(*base),
+			  0);
+	if (ret != SD_RES_SUCCESS) {
+		ret = SD_RES_BASE_VDI_READ;
+		goto out;
 	}
 
-	if (iocb->create_snapshot && cur_vid != iocb->base_vid) {
-		ret = write_object(vid_to_vdi_oid(cur_vid), (char *)cur,
-				   SD_INODE_HEADER_SIZE, 0, false);
-		if (ret != 0) {
-			sd_err("failed");
-			ret = SD_RES_BASE_VDI_READ;
-			goto out;
-		}
+	idx = find_free_idx(base->child_vdi_id, ARRAY_SIZE(base->child_vdi_id));
+	if (idx < 0) {
+		ret = SD_RES_NO_BASE_VDI;
+		goto out;
 	}
 
-	if (iocb->base_vid) {
-		ret = write_object(vid_to_vdi_oid(iocb->base_vid), (char *)base,
-				   SD_INODE_HEADER_SIZE, 0, false);
-		if (ret != 0) {
-			sd_err("failed");
-			ret = SD_RES_BASE_VDI_WRITE;
-			goto out;
-		}
+	/* TODO: multiple write_object should be performed atomically */
+
+	/* set snap_ctime on base and register new_vid as its child */
+	base->snap_ctime = iocb->time;
+	base->child_vdi_id[idx] = new_vid;
+	ret = write_object(vid_to_vdi_oid(base_vid), (char *)base,
+			   SD_INODE_HEADER_SIZE, 0, false);
+	if (ret != SD_RES_SUCCESS) {
+		ret = SD_RES_BASE_VDI_WRITE;
+		goto out;
 	}
 
+	/* create a new vdi inheriting data_vdi_id from the base vdi */
+	new = alloc_inode(iocb, new_snapid, new_vid, base->data_vdi_id);
 	ret = write_object(vid_to_vdi_oid(new_vid), (char *)new, sizeof(*new),
 			   0, true);
-	if (ret != 0)
+	if (ret != SD_RES_SUCCESS)
+		ret = SD_RES_VDI_WRITE;
+
+out:
+	free(new);
+	free(base);
+	return ret;
+}
+
+/*
+ * Rebase onto another snapshot vdi
+ *
+ * This makes the current working vdi 'cur' a snapshot, and creates a new
+ * working vdi 'new' based on the snapshot 'base'.  We use this operation when
+ * rolling back to the snapshot or writing data to the snapshot.  Here is an
+ * example:
+ *
+ * [before]
+ *                base
+ *            o----o----o----x cur
+ *
+ * [after]
+ *                base
+ *            o----o----o----o cur
+ *                  \
+ *                   x new
+ * x: working vdi
+ * o: snapshot vdi
+ */
+static int rebase_vdi(const struct vdi_iocb *iocb, uint32_t new_snapid,
+		      uint32_t new_vid, uint32_t base_vid, uint32_t cur_vid)
+{
+	struct sd_inode *new = NULL, *base = xzalloc(sizeof(*base));
+	int ret, idx;
+
+	sd_debug("%s: size %" PRIu64 ", vid %" PRIx32 ", base %" PRIx32 ", "
+		 "cur %" PRIx32 ", copies %d, snapid %" PRIu32, iocb->name,
+		 iocb->size, new_vid, base_vid, cur_vid, iocb->nr_copies,
+		 new_snapid);
+
+	ret = read_object(vid_to_vdi_oid(base_vid), (char *)base, sizeof(*base),
+			  0);
+	if (ret != SD_RES_SUCCESS) {
+		ret = SD_RES_BASE_VDI_READ;
+		goto out;
+	}
+
+	idx = find_free_idx(base->child_vdi_id, ARRAY_SIZE(base->child_vdi_id));
+	if (idx < 0) {
+		ret = SD_RES_NO_BASE_VDI;
+		goto out;
+	}
+
+	/* TODO: multiple write_object should be performed atomically */
+
+	/* set snap_ctime on the current working vdi */
+	ret = write_object(vid_to_vdi_oid(cur_vid), (char *)&iocb->time,
+			   sizeof(iocb->time),
+			   offsetof(struct sd_inode, snap_ctime), false);
+	if (ret != SD_RES_SUCCESS) {
+		ret = SD_RES_BASE_VDI_READ;
+		goto out;
+	}
+
+	/* register new_vid in the free child slot of the base vdi */
+	ret = write_object(vid_to_vdi_oid(base_vid), (char *)&new_vid,
+			   sizeof(new_vid),
+			   offsetof(struct sd_inode, child_vdi_id[idx]), false);
+	if (ret != SD_RES_SUCCESS) {
+		ret = SD_RES_BASE_VDI_WRITE;
+		goto out;
+	}
+
+	/* create a new vdi inheriting data_vdi_id from the base vdi */
+	new = alloc_inode(iocb, new_snapid, new_vid, base->data_vdi_id);
+	ret = write_object(vid_to_vdi_oid(new_vid), (char *)new, sizeof(*new),
+			   0, true);
+	if (ret != SD_RES_SUCCESS)
 		ret = SD_RES_VDI_WRITE;
 
 out:
 	free(new);
-	free(cur);
 	free(base);
 	return ret;
 }
@@ -503,36 +654,33 @@ static void vdi_flush(uint32_t vid)
 }
 
 /*
- * There are 3 create operation in SD:
- * 1. fresh create (expect NO_VDI returned from vdi_lookup)
- * 2. snapshot create (expect SUCCESS)
- * 3. rollback create (expect NO_VDI)
+ * This function creates another working vdi with a new name.  The parent of the
+ * newly created vdi is iocb->base_vid.
+ *
+ * There are 2 vdi create operations in SD:
+ * 1. fresh create (base_vid == 0)
+ * 2. clone create (base_vid != 0)
  *
- * Fresh create started with id = 1. Both rollback & snap create started with
- * current working VDI's snap_id + 1. Working VDI always has the highest snapid.
+ * This function expects NO_VDI returned from vdi_lookup().  Fresh create
+ * starts with id = 1 when there is no snapshot with the same name.  Working
+ * VDI always has the highest snapid.
  */
 int vdi_create(const struct vdi_iocb *iocb, uint32_t *new_vid)
 {
-	const char *name = iocb->name;
 	struct vdi_info info = {};
 	int ret;
 
 	ret = vdi_lookup(iocb, &info);
 	switch (ret) {
 	case SD_RES_SUCCESS:
-		if (!iocb->create_snapshot)
-			return SD_RES_VDI_EXIST;
-		if (sys->enable_object_cache)
-			vdi_flush(iocb->base_vid);
-		break;
+		return SD_RES_VDI_EXIST;
 	case SD_RES_NO_VDI:
-		if (iocb->create_snapshot)
-			return ret;
 		break;
 	default:
 		sd_err("%s", sd_strerror(ret));
 		return ret;
 	}
+
 	if (info.snapid == 0)
 		info.snapid = 1;
 	*new_vid = info.free_bit;
@@ -540,13 +688,51 @@ int vdi_create(const struct vdi_iocb *iocb, uint32_t *new_vid)
 	if (ret != SD_RES_SUCCESS)
 		return ret;
 
-	sd_debug("%s %s: size %" PRIu64 ", vid %" PRIx32 ", base %" PRIx32
-		 ", cur %" PRIx32 ", copies %d, snapid %" PRIu32,
-		 iocb->create_snapshot ? "snapshot" : "vdi", name, iocb->size,
-		 *new_vid, iocb->base_vid, info.vid, iocb->nr_copies,
-		 iocb->snapid);
+	if (iocb->base_vid == 0)
+		return create_vdi(iocb, info.snapid, *new_vid);
+	else
+		return clone_vdi(iocb, info.snapid, *new_vid, iocb->base_vid);
+}
+
+/*
+ * This function makes the current working vdi a snapshot, and creates a new
+ * working vdi with the same name.  The parent of the newly created vdi is
+ * iocb->base_vid.
+ *
+ * There are 2 snapshot create operations in SD:
+ * 1. snapshot create (base_vid == current_vid)
+ * 2. rollback create (base_vid != current_vid)
+ *
+ * This function expects SUCCESS returned from vdi_lookup().  Both rollback and
+ * snap create start with current working VDI's snap_id + 1.  Working VDI
+ * always has the highest snapid.
+ */
+int vdi_snapshot(const struct vdi_iocb *iocb, uint32_t *new_vid)
+{
+	struct vdi_info info = {};
+	int ret;
+
+	ret = vdi_lookup(iocb, &info);
+	if (ret == SD_RES_SUCCESS) {
+		if (sys->enable_object_cache)
+			vdi_flush(iocb->base_vid);
+	} else {
+		sd_err("%s", sd_strerror(ret));
+		return ret;
+	}
+
+	assert(info.snapid > 0);
+	*new_vid = info.free_bit;
+	ret = notify_vdi_add(*new_vid, iocb->nr_copies, info.vid);
+	if (ret != SD_RES_SUCCESS)
+		return ret;
 
-	return create_vdi_obj(iocb, info.snapid, *new_vid, info.vid);
+	if (iocb->base_vid == info.vid)
+		return snapshot_vdi(iocb, info.snapid, *new_vid,
+				    iocb->base_vid);
+	else
+		return rebase_vdi(iocb, info.snapid, *new_vid, iocb->base_vid,
+				  info.vid);
 }
 
 static int start_deletion(struct request *req, uint32_t vid);
-- 
1.7.10.4




More information about the sheepdog mailing list