[sheepdog] [PATCH 3/3] add selectable object_size support of VDI operation

Teruaki Ishizaki ishizaki.teruaki at lab.ntt.co.jp
Tue Dec 9 13:49:24 CET 2014


The data object size was fixed at 4MB and was not selectable.
This patch adds a feature to select the data object size of a VDI.

To use an 8MB data object size, specify the bit shift count with '-z'
(the object size is 2^shift bytes, so a shift of 23 gives 8MB).
e.g.) dog vdi create -z 23 hogehoge 100M

Signed-off-by: Teruaki Ishizaki <ishizaki.teruaki at lab.ntt.co.jp>
---
 dog/common.c                |    7 +-
 dog/dog.h                   |    6 +-
 dog/farm/farm.c             |   17 ++-
 dog/vdi.c                   |  254 ++++++++++++++++++++++++++++++-------------
 include/fec.h               |   12 +-
 include/sheepdog_proto.h    |    7 +-
 lib/fec.c                   |    9 +-
 sheep/gateway.c             |    2 +-
 sheep/group.c               |    3 +-
 sheep/journal.c             |    5 +-
 sheep/object_cache.c        |   27 +++--
 sheep/ops.c                 |   14 ++-
 sheep/plain_store.c         |   17 ++-
 sheep/recovery.c            |    3 +-
 sheep/sheep_priv.h          |    6 +-
 sheep/vdi.c                 |   82 +++++++++++---
 tests/unit/sheep/test_vdi.c |    6 +-
 17 files changed, 336 insertions(+), 141 deletions(-)

diff --git a/dog/common.c b/dog/common.c
index 2d8a173..11011a7 100644
--- a/dog/common.c
+++ b/dog/common.c
@@ -365,7 +365,8 @@ void show_progress(uint64_t done, uint64_t total, bool raw)
 	free(buf);
 }
 
-size_t get_store_objsize(uint8_t copy_policy, uint64_t oid)
+size_t get_store_objsize(uint8_t copy_policy, uint32_t object_size,
+			 uint64_t oid)
 {
 	if (is_vdi_obj(oid))
 		return SD_INODE_SIZE;
@@ -375,9 +376,9 @@ size_t get_store_objsize(uint8_t copy_policy, uint64_t oid)
 		int d;
 
 		ec_policy_to_dp(copy_policy, &d, NULL);
-		return SD_DATA_OBJ_SIZE / d;
+		return object_size / d;
 	}
-	return get_objsize(oid);
+	return get_objsize(oid, object_size);
 }
 
 bool is_erasure_oid(uint64_t oid, uint8_t policy)
diff --git a/dog/dog.h b/dog/dog.h
index 80becc6..d460a0b 100644
--- a/dog/dog.h
+++ b/dog/dog.h
@@ -87,10 +87,12 @@ void confirm(const char *message);
 void work_queue_wait(struct work_queue *q);
 int do_vdi_create(const char *vdiname, int64_t vdi_size,
 		  uint32_t base_vid, uint32_t *vdi_id, bool snapshot,
-		  uint8_t nr_copies, uint8_t copy_policy, uint8_t store_policy);
+		  uint8_t nr_copies, uint8_t copy_policy,
+		  uint8_t store_policy, uint32_t object_size);
 int do_vdi_check(const struct sd_inode *inode);
 void show_progress(uint64_t done, uint64_t total, bool raw);
-size_t get_store_objsize(uint8_t copy_policy, uint64_t oid);
+size_t get_store_objsize(uint8_t copy_policy, uint32_t object_size,
+			 uint64_t oid);
 bool is_erasure_oid(uint64_t oid, uint8_t policy);
 uint8_t parse_copy(const char *str, uint8_t *copy_policy);
 
diff --git a/dog/farm/farm.c b/dog/farm/farm.c
index 9414d42..c5fa40e 100644
--- a/dog/farm/farm.c
+++ b/dog/farm/farm.c
@@ -38,6 +38,7 @@ struct active_vdi_entry {
 	uint8_t  nr_copies;
 	uint8_t copy_policy;
 	uint8_t store_policy;
+	uint32_t object_size;
 };
 
 struct registered_obj_entry {
@@ -77,6 +78,7 @@ static void update_active_vdi_entry(struct active_vdi_entry *vdi,
 	vdi->nr_copies = new->nr_copies;
 	vdi->copy_policy = new->copy_policy;
 	vdi->store_policy = new->store_policy;
+	vdi->object_size = (UINT32_C(1) << new->block_size_shift);
 }
 
 static void add_active_vdi(struct sd_inode *new)
@@ -131,7 +133,8 @@ static int create_active_vdis(void)
 				  vdi->vdi_id, &new_vid,
 				  false, vdi->nr_copies,
 				  vdi->copy_policy,
-				  vdi->store_policy) < 0)
+				  vdi->store_policy,
+				  vdi->object_size) < 0)
 			return -1;
 	}
 	return 0;
@@ -202,7 +205,7 @@ out:
 }
 
 static int notify_vdi_add(uint32_t vdi_id, uint8_t nr_copies,
-			  uint8_t copy_policy)
+			  uint8_t copy_policy, uint32_t object_size)
 {
 	int ret;
 	struct sd_req hdr;
@@ -213,13 +216,14 @@ static int notify_vdi_add(uint32_t vdi_id, uint8_t nr_copies,
 	hdr.vdi_state.new_vid = vdi_id;
 	hdr.vdi_state.copies = nr_copies;
 	hdr.vdi_state.copy_policy = copy_policy;
+	hdr.vdi_state.object_size = object_size;
 	hdr.vdi_state.set_bitmap = true;
 
 	ret = dog_exec_req(&sd_nid, &hdr, buf);
 
 	if (ret < 0)
-		sd_err("Fail to notify vdi add event(%"PRIx32", %d)", vdi_id,
-		       nr_copies);
+		sd_err("Fail to notify vdi add event(%"PRIx32", %d"
+		       ", %"PRIu32")", vdi_id, nr_copies, object_size);
 	if (rsp->result != SD_RES_SUCCESS) {
 		sd_err("%s", sd_strerror(rsp->result));
 		ret = -1;
@@ -261,7 +265,7 @@ static void do_save_object(struct work *work)
 
 	sw = container_of(work, struct snapshot_work, work);
 
-	size = get_objsize(sw->entry.oid);
+	size = get_objsize(sw->entry.oid, sw->entry.object_size);
 	buf = xmalloc(size);
 
 	if (dog_read_object(sw->entry.oid, buf, size, 0, true) < 0)
@@ -413,7 +417,8 @@ static void do_load_object(struct work *work)
 	vid = oid_to_vid(sw->entry.oid);
 	if (register_vdi(vid)) {
 		if (notify_vdi_add(vid, sw->entry.nr_copies,
-				   sw->entry.copy_policy) < 0)
+				   sw->entry.copy_policy,
+				   sw->entry.object_size) < 0)
 			goto error;
 	}
 
diff --git a/dog/vdi.c b/dog/vdi.c
index 5353062..3b0c408 100644
--- a/dog/vdi.c
+++ b/dog/vdi.c
@@ -38,6 +38,8 @@ static struct sd_option vdi_options[] = {
 	{'o', "oid", true, "specify the object id of the tracking object"},
 	{'e', "exist", false, "only check objects exist or not,\n"
 	 "                          neither comparing nor repairing"},
+	{'z', "objsize", true, "specify the bit shift num for"
+			       " data object size"},
 	{ 0, NULL, false, NULL },
 };
 
@@ -49,6 +51,7 @@ static struct vdi_cmd_data {
 	bool delete;
 	bool prealloc;
 	int nr_copies;
+	uint32_t object_size;
 	bool writeback;
 	int from_snapshot_id;
 	char from_snapshot_tag[SD_MAX_VDI_TAG_LEN];
@@ -67,6 +70,7 @@ struct get_vdi_info {
 	uint32_t snapid;
 	uint8_t nr_copies;
 	uint8_t copy_policy;
+	uint32_t object_size;
 };
 
 int dog_bnode_writer(uint64_t oid, void *mem, unsigned int len, uint64_t offset,
@@ -118,6 +122,7 @@ static void print_vdi_list(uint32_t vid, const char *name, const char *tag,
 	struct tm tm;
 	char dbuf[128];
 	struct get_vdi_info *info = data;
+	uint32_t object_size = (UINT32_C(1) << i->block_size_shift);
 
 	if (info && strcmp(name, info->name) != 0)
 		return;
@@ -143,23 +148,24 @@ static void print_vdi_list(uint32_t vid, const char *name, const char *tag,
 				putchar('\\');
 			putchar(*name++);
 		}
-		printf(" %d %s %s %s %s %" PRIx32 " %s %s\n", snapid,
-		       strnumber(i->vdi_size),
-		       strnumber(my_objs * SD_DATA_OBJ_SIZE),
-		       strnumber(cow_objs * SD_DATA_OBJ_SIZE),
+		printf(" %d %s %s %s %s %" PRIx32 " %s %s %" PRIu32 "\n",
+		       snapid, strnumber(i->vdi_size),
+		       strnumber(my_objs * object_size),
+		       strnumber(cow_objs * object_size),
 		       dbuf, vid,
 		       redundancy_scheme(i->nr_copies, i->copy_policy),
-		       i->tag);
+		       i->tag, object_size);
 	} else {
-		printf("%c %-8s %5d %7s %7s %7s %s  %7" PRIx32 " %6s %13s\n",
+		printf("%c %-8s %5d %7s %7s %7s %s  %7" PRIx32
+		       " %6s %13s %7" PRIu32 "\n",
 		       vdi_is_snapshot(i) ? 's' : (is_clone ? 'c' : ' '),
 		       name, snapid,
 		       strnumber(i->vdi_size),
-		       strnumber(my_objs * SD_DATA_OBJ_SIZE),
-		       strnumber(cow_objs * SD_DATA_OBJ_SIZE),
+		       strnumber(my_objs * object_size),
+		       strnumber(cow_objs * object_size),
 		       dbuf, vid,
 		       redundancy_scheme(i->nr_copies, i->copy_policy),
-		       i->tag);
+		       i->tag, object_size);
 	}
 }
 
@@ -282,7 +288,8 @@ static int vdi_list(int argc, char **argv)
 	const char *vdiname = argv[optind];
 
 	if (!raw_output)
-		printf("  Name        Id    Size    Used  Shared    Creation time   VDI id  Copies  Tag\n");
+		printf("  Name        Id    Size    Used  Shared"
+		       "    Creation time   VDI id  Copies  Tag    Obj Size\n");
 
 	if (vdiname) {
 		struct get_vdi_info info;
@@ -396,7 +403,8 @@ int read_vdi_obj(const char *vdiname, int snapid, const char *tag,
 
 int do_vdi_create(const char *vdiname, int64_t vdi_size,
 		  uint32_t base_vid, uint32_t *vdi_id, bool snapshot,
-		  uint8_t nr_copies, uint8_t copy_policy, uint8_t store_policy)
+		  uint8_t nr_copies, uint8_t copy_policy,
+		  uint8_t store_policy, uint32_t object_size)
 {
 	struct sd_req hdr;
 	struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
@@ -416,6 +424,7 @@ int do_vdi_create(const char *vdiname, int64_t vdi_size,
 	hdr.vdi.copies = nr_copies;
 	hdr.vdi.copy_policy = copy_policy;
 	hdr.vdi.store_policy = store_policy;
+	hdr.vdi.object_size = object_size;
 
 	ret = dog_exec_req(&sd_nid, &hdr, buf);
 	if (ret < 0)
@@ -440,6 +449,8 @@ static int vdi_create(int argc, char **argv)
 	uint32_t vid;
 	uint64_t oid;
 	uint32_t idx, max_idx;
+	uint32_t object_size;
+	uint64_t old_max_total_size = 0;
 	struct sd_inode *inode = NULL;
 	int ret;
 
@@ -451,10 +462,34 @@ static int vdi_create(int argc, char **argv)
 	if (ret < 0)
 		return EXIT_USAGE;
 
-	if (size > SD_OLD_MAX_VDI_SIZE && 0 == vdi_cmd_data.store_policy) {
+	if (vdi_cmd_data.object_size)
+		old_max_total_size =
+			vdi_cmd_data.object_size * OLD_MAX_DATA_OBJS;
+	else{
+		struct sd_req hdr;
+		struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
+		struct cluster_info cinfo;
+		sd_init_req(&hdr, SD_OP_CLUSTER_INFO);
+		hdr.data_length = sizeof(cinfo);
+		ret = dog_exec_req(&sd_nid, &hdr, &cinfo);
+		if (ret < 0) {
+			sd_err("Fail to execute request: SD_OP_CLUSTER_INFO");
+			ret = EXIT_FAILURE;
+			goto out;
+		}
+		if (rsp->result != SD_RES_SUCCESS) {
+			sd_err("%s", sd_strerror(rsp->result));
+			ret = EXIT_FAILURE;
+			goto out;
+		}
+		old_max_total_size = cinfo.object_size * OLD_MAX_DATA_OBJS;
+	}
+
+	if (size > old_max_total_size && 0 == vdi_cmd_data.store_policy) {
 		sd_err("VDI size is larger than %s bytes, please use '-y' to "
-		       "create a hyper volume with size up to %s bytes",
-		       strnumber(SD_OLD_MAX_VDI_SIZE),
+		       "create a hyper volume with size up to %s bytes"
+		       " or use '-z' to create larger object size volume",
+		       strnumber(old_max_total_size),
 		       strnumber(SD_MAX_VDI_SIZE));
 		return EXIT_USAGE;
 	}
@@ -466,7 +501,8 @@ static int vdi_create(int argc, char **argv)
 
 	ret = do_vdi_create(vdiname, size, 0, &vid, false,
 			    vdi_cmd_data.nr_copies, vdi_cmd_data.copy_policy,
-			    vdi_cmd_data.store_policy);
+			    vdi_cmd_data.store_policy,
+			    vdi_cmd_data.object_size);
 	if (ret != EXIT_SUCCESS || !vdi_cmd_data.prealloc)
 		goto out;
 
@@ -479,10 +515,11 @@ static int vdi_create(int argc, char **argv)
 		ret = EXIT_FAILURE;
 		goto out;
 	}
-	max_idx = DIV_ROUND_UP(size, SD_DATA_OBJ_SIZE);
+	object_size = (UINT32_C(1) << inode->block_size_shift);
+	max_idx = DIV_ROUND_UP(size, object_size);
 
 	for (idx = 0; idx < max_idx; idx++) {
-		vdi_show_progress(idx * SD_DATA_OBJ_SIZE, inode->vdi_size);
+		vdi_show_progress(idx * object_size, inode->vdi_size);
 		oid = vid_to_data_oid(vid, idx);
 
 		ret = dog_write_object(oid, 0, NULL, 0, 0, 0, inode->nr_copies,
@@ -499,7 +536,7 @@ static int vdi_create(int argc, char **argv)
 			goto out;
 		}
 	}
-	vdi_show_progress(idx * SD_DATA_OBJ_SIZE, inode->vdi_size);
+	vdi_show_progress(idx * object_size, inode->vdi_size);
 	ret = EXIT_SUCCESS;
 
 out:
@@ -559,6 +596,7 @@ static int vdi_snapshot(int argc, char **argv)
 {
 	const char *vdiname = argv[optind++];
 	uint32_t vid, new_vid;
+	uint32_t object_size;
 	int ret;
 	char buf[SD_INODE_HEADER_SIZE];
 	struct sd_inode *inode = (struct sd_inode *)buf;
@@ -662,9 +700,10 @@ static int vdi_snapshot(int argc, char **argv)
 	if (ret != SD_RES_SUCCESS)
 		goto out;
 
+	object_size = (UINT32_C(1) << inode->block_size_shift);
 	ret = do_vdi_create(vdiname, inode->vdi_size, vid, &new_vid, true,
 			    inode->nr_copies, inode->copy_policy,
-			    inode->store_policy);
+			    inode->store_policy, object_size);
 
 	if (ret == EXIT_SUCCESS && verbose) {
 		if (raw_output)
@@ -691,6 +730,7 @@ static int vdi_clone(int argc, char **argv)
 	uint32_t base_vid, new_vid, vdi_id;
 	uint64_t oid;
 	uint32_t idx, max_idx, ret;
+	uint32_t object_size;
 	struct sd_inode *inode = NULL, *new_inode = NULL;
 	char *buf = NULL;
 
@@ -719,9 +759,10 @@ static int vdi_clone(int argc, char **argv)
 	if (vdi_cmd_data.no_share == true)
 		base_vid = 0;
 
+	object_size = (UINT32_C(1) << inode->block_size_shift);
 	ret = do_vdi_create(dst_vdi, inode->vdi_size, base_vid, &new_vid, false,
 			    inode->nr_copies, inode->copy_policy,
-			    inode->store_policy);
+			    inode->store_policy, object_size);
 	if (ret != EXIT_SUCCESS ||
 			(!vdi_cmd_data.prealloc && !vdi_cmd_data.no_share))
 		goto out;
@@ -732,23 +773,23 @@ static int vdi_clone(int argc, char **argv)
 	if (ret != EXIT_SUCCESS)
 		goto out;
 
-	buf = xzalloc(SD_DATA_OBJ_SIZE);
+	buf = xzalloc(object_size);
 	max_idx = count_data_objs(inode);
 
 	for (idx = 0; idx < max_idx; idx++) {
 		size_t size;
 
-		vdi_show_progress(idx * SD_DATA_OBJ_SIZE, inode->vdi_size);
+		vdi_show_progress(idx * object_size, inode->vdi_size);
 		vdi_id = sd_inode_get_vid(inode, idx);
 		if (vdi_id) {
 			oid = vid_to_data_oid(vdi_id, idx);
-			ret = dog_read_object(oid, buf, SD_DATA_OBJ_SIZE, 0,
+			ret = dog_read_object(oid, buf, object_size, 0,
 					      true);
 			if (ret) {
 				ret = EXIT_FAILURE;
 				goto out;
 			}
-			size = SD_DATA_OBJ_SIZE;
+			size = object_size;
 		} else {
 			if (vdi_cmd_data.no_share && !vdi_cmd_data.prealloc)
 				continue;
@@ -772,7 +813,7 @@ static int vdi_clone(int argc, char **argv)
 			goto out;
 		}
 	}
-	vdi_show_progress(idx * SD_DATA_OBJ_SIZE, inode->vdi_size);
+	vdi_show_progress(idx * object_size, inode->vdi_size);
 	ret = EXIT_SUCCESS;
 
 out:
@@ -952,6 +993,7 @@ static int vdi_rollback(int argc, char **argv)
 {
 	const char *vdiname = argv[optind++];
 	uint32_t base_vid, new_vid;
+	uint32_t object_size;
 	int ret;
 	char buf[SD_INODE_HEADER_SIZE];
 	struct sd_inode *inode = (struct sd_inode *)buf;
@@ -977,9 +1019,10 @@ static int vdi_rollback(int argc, char **argv)
 		return EXIT_FAILURE;
 	}
 
+	object_size = (UINT32_C(1) << inode->block_size_shift);
 	ret = do_vdi_create(vdiname, inode->vdi_size, base_vid, &new_vid,
 			     false, vdi_cmd_data.nr_copies, inode->copy_policy,
-			     inode->store_policy);
+			     inode->store_policy, object_size);
 
 	if (ret == EXIT_SUCCESS && verbose) {
 		if (raw_output)
@@ -1494,6 +1537,7 @@ static int vdi_read(int argc, char **argv)
 	struct sd_inode *inode = NULL;
 	uint64_t offset = 0, oid, done = 0, total = (uint64_t) -1;
 	uint32_t vdi_id, idx;
+	uint32_t object_size;
 	unsigned int len;
 	char *buf = NULL;
 
@@ -1509,25 +1553,27 @@ static int vdi_read(int argc, char **argv)
 	}
 
 	inode = malloc(sizeof(*inode));
-	buf = xmalloc(SD_DATA_OBJ_SIZE);
 
 	ret = read_vdi_obj(vdiname, vdi_cmd_data.snapshot_id,
 			   vdi_cmd_data.snapshot_tag, NULL, inode,
 			   SD_INODE_SIZE);
 	if (ret != EXIT_SUCCESS)
-		goto out;
+		goto load_inode_err;
 
 	if (inode->vdi_size < offset) {
 		sd_err("Read offset is beyond the end of the VDI");
 		ret = EXIT_FAILURE;
-		goto out;
+		goto load_inode_err;
 	}
 
+	object_size = (UINT32_C(1) << inode->block_size_shift);
+	buf = xmalloc(object_size);
+
 	total = min(total, inode->vdi_size - offset);
-	idx = offset / SD_DATA_OBJ_SIZE;
-	offset %= SD_DATA_OBJ_SIZE;
+	idx = offset / object_size;
+	offset %= object_size;
 	while (done < total) {
-		len = min(total - done, SD_DATA_OBJ_SIZE - offset);
+		len = min(total - done, object_size - offset);
 		vdi_id = sd_inode_get_vid(inode, idx);
 		if (vdi_id) {
 			oid = vid_to_data_oid(vdi_id, idx);
@@ -1554,8 +1600,9 @@ static int vdi_read(int argc, char **argv)
 	fsync(STDOUT_FILENO);
 	ret = EXIT_SUCCESS;
 out:
-	free(inode);
 	free(buf);
+load_inode_err:
+	free(inode);
 
 	return ret;
 }
@@ -1564,6 +1611,7 @@ static int vdi_write(int argc, char **argv)
 {
 	const char *vdiname = argv[optind++];
 	uint32_t vid, flags, vdi_id, idx;
+	uint32_t object_size;
 	int ret;
 	struct sd_inode *inode = NULL;
 	uint64_t offset = 0, oid, old_oid, done = 0, total = (uint64_t) -1;
@@ -1583,26 +1631,28 @@ static int vdi_write(int argc, char **argv)
 	}
 
 	inode = xmalloc(sizeof(*inode));
-	buf = xmalloc(SD_DATA_OBJ_SIZE);
 
 	ret = read_vdi_obj(vdiname, 0, "", &vid, inode, SD_INODE_SIZE);
 	if (ret != EXIT_SUCCESS)
-		goto out;
+		goto load_inode_err;
 
 	if (inode->vdi_size < offset) {
 		sd_err("Write offset is beyond the end of the VDI");
 		ret = EXIT_FAILURE;
-		goto out;
+		goto load_inode_err;
 	}
 
+	object_size = (UINT32_C(1) << inode->block_size_shift);
+	buf = xmalloc(object_size);
+
 	total = min(total, inode->vdi_size - offset);
-	idx = offset / SD_DATA_OBJ_SIZE;
-	offset %= SD_DATA_OBJ_SIZE;
+	idx = offset / object_size;
+	offset %= object_size;
 	while (done < total) {
 		create = false;
 		old_oid = 0;
 		flags = 0;
-		len = min(total - done, SD_DATA_OBJ_SIZE - offset);
+		len = min(total - done, object_size - offset);
 
 		vdi_id = sd_inode_get_vid(inode, idx);
 		if (!vdi_id)
@@ -1647,7 +1697,7 @@ static int vdi_write(int argc, char **argv)
 		}
 
 		offset += len;
-		if (offset == SD_DATA_OBJ_SIZE) {
+		if (offset == object_size) {
 			offset = 0;
 			idx++;
 		}
@@ -1655,8 +1705,9 @@ static int vdi_write(int argc, char **argv)
 	}
 	ret = EXIT_SUCCESS;
 out:
-	free(inode);
 	free(buf);
+load_inode_err:
+	free(inode);
 
 	return ret;
 }
@@ -1709,6 +1760,7 @@ struct vdi_check_info {
 	uint64_t oid;
 	uint8_t nr_copies;
 	uint8_t copy_policy;
+	uint32_t object_size;
 	uint64_t total;
 	uint64_t *done;
 	int refcnt;
@@ -1721,7 +1773,7 @@ struct vdi_check_info {
 static void free_vdi_check_info(struct vdi_check_info *info)
 {
 	if (info->done) {
-		*info->done += SD_DATA_OBJ_SIZE;
+		*info->done += info->object_size;
 		vdi_show_progress(*info->done, info->total);
 	}
 	free(info);
@@ -1783,6 +1835,7 @@ static void vdi_check_object_work(struct work *work)
 	if (is_erasure_oid(info->oid, info->copy_policy)) {
 		sd_init_req(&hdr, SD_OP_READ_PEER);
 		hdr.data_length = get_store_objsize(info->copy_policy,
+						    info->object_size,
 						    info->oid);
 		hdr.obj.ec_index = vcw->ec_index;
 		hdr.epoch = sd_epoch;
@@ -1856,7 +1909,8 @@ static void check_erasure_object(struct vdi_check_info *info)
 	struct fec *ctx = ec_init(d, dp);
 	int miss_idx[dp], input_idx[dp];
 	uint64_t oid = info->oid;
-	size_t len = get_store_objsize(info->copy_policy, oid);
+	size_t len = get_store_objsize(info->copy_policy,
+				       info->object_size, oid);
 	char *obj = xmalloc(len);
 	uint8_t *input[dp];
 
@@ -1882,7 +1936,8 @@ static void check_erasure_object(struct vdi_check_info *info)
 			uint8_t *ds[d];
 			for (j = 0; j < d; j++)
 				ds[j] = info->vcw[j].buf;
-			ec_decode_buffer(ctx, ds, idx, obj, d + k);
+			ec_decode_buffer(ctx, ds, idx, obj, d + k,
+					 info->object_size);
 			if (memcmp(obj, info->vcw[d + k].buf, len) != 0) {
 				/* TODO repair the inconsistency */
 				sd_err("object %"PRIx64" is inconsistent", oid);
@@ -1900,7 +1955,8 @@ static void check_erasure_object(struct vdi_check_info *info)
 
 			for (i = 0; i < d; i++)
 				ds[i] = input[i];
-			ec_decode_buffer(ctx, ds, input_idx, obj, m);
+			ec_decode_buffer(ctx, ds, input_idx, obj, m,
+					 info->object_size);
 			write_object_to(info->vcw[m].vnode, oid, obj,
 					len, true, info->vcw[m].ec_index);
 			fprintf(stdout, "fixed missing %"PRIx64", "
@@ -2023,6 +2079,7 @@ struct check_arg {
 	uint64_t *done;
 	struct work_queue *wq;
 	int nr_copies;
+	uint32_t object_size;
 };
 
 static void check_cb(struct sd_index *idx, void *arg, int ignore)
@@ -2032,7 +2089,7 @@ static void check_cb(struct sd_index *idx, void *arg, int ignore)
 
 	if (idx->vdi_id) {
 		oid = vid_to_data_oid(idx->vdi_id, idx->idx);
-		*(carg->done) = (uint64_t)idx->idx * SD_DATA_OBJ_SIZE;
+		*(carg->done) = (uint64_t)idx->idx * carg->object_size;
 		vdi_show_progress(*(carg->done), carg->inode->vdi_size);
 		queue_vdi_check_work(carg->inode, oid, NULL, carg->wq,
 				     carg->nr_copies);
@@ -2046,6 +2103,7 @@ int do_vdi_check(const struct sd_inode *inode)
 	uint32_t vid;
 	struct work_queue *wq;
 	int nr_copies = min((int)inode->nr_copies, sd_zones_nr);
+	uint32_t object_size = (UINT32_C(1) << inode->block_size_shift);
 
 	if (0 < inode->copy_policy && sd_zones_nr < (int)inode->nr_copies) {
 		sd_err("ABORT: Not enough active zones for consistency-checking"
@@ -2070,12 +2128,13 @@ int do_vdi_check(const struct sd_inode *inode)
 				queue_vdi_check_work(inode, oid, &done, wq,
 						     nr_copies);
 			} else {
-				done += SD_DATA_OBJ_SIZE;
+				done += object_size;
 				vdi_show_progress(done, inode->vdi_size);
 			}
 		}
 	} else {
-		struct check_arg arg = {inode, &done, wq, nr_copies};
+		struct check_arg arg = {inode, &done, wq, nr_copies,
+					object_size};
 		sd_inode_index_walk(inode, check_cb, &arg);
 		vdi_show_progress(inode->vdi_size, inode->vdi_size);
 	}
@@ -2125,11 +2184,12 @@ struct obj_backup {
 	uint32_t offset;
 	uint32_t length;
 	uint32_t reserved;
-	uint8_t data[SD_DATA_OBJ_SIZE];
+	uint8_t *data;
 };
 
 /* discards redundant area from backup data */
-static void compact_obj_backup(struct obj_backup *backup, uint8_t *from_data)
+static void compact_obj_backup(struct obj_backup *backup, uint8_t *from_data,
+			       uint32_t object_size)
 {
 	uint8_t *p1, *p2;
 
@@ -2142,8 +2202,8 @@ static void compact_obj_backup(struct obj_backup *backup, uint8_t *from_data)
 		backup->length -= SECTOR_SIZE;
 	}
 
-	p1 = backup->data + SD_DATA_OBJ_SIZE - SECTOR_SIZE;
-	p2 = from_data + SD_DATA_OBJ_SIZE - SECTOR_SIZE;
+	p1 = backup->data + object_size - SECTOR_SIZE;
+	p2 = from_data + object_size - SECTOR_SIZE;
 	while (backup->length > 0 && memcmp(p1, p2, SECTOR_SIZE) == 0) {
 		p1 -= SECTOR_SIZE;
 		p2 -= SECTOR_SIZE;
@@ -2152,29 +2212,29 @@ static void compact_obj_backup(struct obj_backup *backup, uint8_t *from_data)
 }
 
 static int get_obj_backup(uint32_t idx, uint32_t from_vid, uint32_t to_vid,
-			  struct obj_backup *backup)
+			  struct obj_backup *backup, uint32_t object_size)
 {
 	int ret;
-	uint8_t *from_data = xzalloc(SD_DATA_OBJ_SIZE);
+	uint8_t *from_data = xzalloc(object_size);
 
 	backup->idx = idx;
 	backup->offset = 0;
-	backup->length = SD_DATA_OBJ_SIZE;
+	backup->length = object_size;
 
 	if (to_vid) {
 		ret = dog_read_object(vid_to_data_oid(to_vid, idx),
-				      backup->data, SD_DATA_OBJ_SIZE, 0, true);
+				      backup->data, object_size, 0, true);
 		if (ret != SD_RES_SUCCESS) {
 			sd_err("Failed to read object %" PRIx32 ", %d", to_vid,
 			       idx);
 			return EXIT_FAILURE;
 		}
 	} else
-		memset(backup->data, 0, SD_DATA_OBJ_SIZE);
+		memset(backup->data, 0, object_size);
 
 	if (from_vid) {
 		ret = dog_read_object(vid_to_data_oid(from_vid, idx), from_data,
-				      SD_DATA_OBJ_SIZE, 0, true);
+				      object_size, 0, true);
 		if (ret != SD_RES_SUCCESS) {
 			sd_err("Failed to read object %" PRIx32 ", %d",
 			       from_vid, idx);
@@ -2182,7 +2242,7 @@ static int get_obj_backup(uint32_t idx, uint32_t from_vid, uint32_t to_vid,
 		}
 	}
 
-	compact_obj_backup(backup, from_data);
+	compact_obj_backup(backup, from_data, object_size);
 
 	free(from_data);
 
@@ -2194,13 +2254,13 @@ static int vdi_backup(int argc, char **argv)
 	const char *vdiname = argv[optind++];
 	int ret = EXIT_SUCCESS;
 	uint32_t idx, nr_objs;
+	uint32_t object_size;
 	struct sd_inode *from_inode = xzalloc(sizeof(*from_inode));
 	struct sd_inode *to_inode = xzalloc(sizeof(*to_inode));
 	struct backup_hdr hdr = {
 		.version = VDI_BACKUP_FORMAT_VERSION,
 		.magic = VDI_BACKUP_MAGIC,
 	};
-	struct obj_backup *backup = xzalloc(sizeof(*backup));
 
 	if ((!vdi_cmd_data.snapshot_id && !vdi_cmd_data.snapshot_tag[0]) ||
 	    (!vdi_cmd_data.from_snapshot_id &&
@@ -2214,21 +2274,25 @@ static int vdi_backup(int argc, char **argv)
 			   vdi_cmd_data.from_snapshot_tag, NULL,
 			   from_inode, SD_INODE_SIZE);
 	if (ret != EXIT_SUCCESS)
-		goto out;
+		goto load_inode_err;
 
 	ret = read_vdi_obj(vdiname, vdi_cmd_data.snapshot_id,
 			   vdi_cmd_data.snapshot_tag, NULL, to_inode,
 			   SD_INODE_SIZE);
 	if (ret != EXIT_SUCCESS)
-		goto out;
+		goto load_inode_err;
 
 	nr_objs = count_data_objs(to_inode);
 
+	struct obj_backup *backup = xzalloc(sizeof(*backup));
+	object_size = (UINT32_C(1) << from_inode->block_size_shift);
+	backup->data = xzalloc(sizeof(uint8_t) * object_size);
+
 	ret = xwrite(STDOUT_FILENO, &hdr, sizeof(hdr));
 	if (ret < 0) {
 		sd_err("failed to write backup header, %m");
 		ret = EXIT_SYSFAIL;
-		goto out;
+		goto error;
 	}
 
 	for (idx = 0; idx < nr_objs; idx++) {
@@ -2238,9 +2302,10 @@ static int vdi_backup(int argc, char **argv)
 		if (to_vid == 0 && from_vid == 0)
 			continue;
 
-		ret = get_obj_backup(idx, from_vid, to_vid, backup);
+		ret = get_obj_backup(idx, from_vid, to_vid,
+				     backup, object_size);
 		if (ret != EXIT_SUCCESS)
-			goto out;
+			goto error;
 
 		if (backup->length == 0)
 			continue;
@@ -2250,14 +2315,14 @@ static int vdi_backup(int argc, char **argv)
 		if (ret < 0) {
 			sd_err("failed to write backup data, %m");
 			ret = EXIT_SYSFAIL;
-			goto out;
+			goto error;
 		}
 		ret = xwrite(STDOUT_FILENO, backup->data + backup->offset,
 			     backup->length);
 		if (ret < 0) {
 			sd_err("failed to write backup data, %m");
 			ret = EXIT_SYSFAIL;
-			goto out;
+			goto error;
 		}
 	}
 
@@ -2269,15 +2334,18 @@ static int vdi_backup(int argc, char **argv)
 	if (ret < 0) {
 		sd_err("failed to write end marker, %m");
 		ret = EXIT_SYSFAIL;
-		goto out;
+		goto error;
 	}
 
 	fsync(STDOUT_FILENO);
 	ret = EXIT_SUCCESS;
-out:
+error:
+	free(backup->data);
+	free(backup);
+load_inode_err:
 	free(from_inode);
 	free(to_inode);
-	free(backup);
+out:
 	return ret;
 }
 
@@ -2310,6 +2378,7 @@ static uint32_t do_restore(const char *vdiname, int snapid, const char *tag)
 {
 	int ret;
 	uint32_t vid;
+	uint32_t object_size;
 	struct backup_hdr hdr;
 	struct obj_backup *backup = xzalloc(sizeof(*backup));
 	struct sd_inode *inode = xzalloc(sizeof(*inode));
@@ -2329,9 +2398,10 @@ static uint32_t do_restore(const char *vdiname, int snapid, const char *tag)
 	if (ret != EXIT_SUCCESS)
 		goto out;
 
+	object_size = (UINT32_C(1) << inode->block_size_shift);
 	ret = do_vdi_create(vdiname, inode->vdi_size, inode->vdi_id, &vid,
 			    false, inode->nr_copies, inode->copy_policy,
-			    inode->store_policy);
+			    inode->store_policy, object_size);
 	if (ret != EXIT_SUCCESS) {
 		sd_err("Failed to read VDI");
 		goto out;
@@ -2435,12 +2505,15 @@ static int vdi_restore(int argc, char **argv)
 out:
 	if (need_current_recovery) {
 		int recovery_ret;
+		uint32_t object_size =
+			(UINT32_C(1) << current_inode->block_size_shift);
 		/* recreate the current vdi object */
 		recovery_ret = do_vdi_create(vdiname, current_inode->vdi_size,
 					     current_inode->parent_vdi_id, NULL,
 					     true, current_inode->nr_copies,
 					     current_inode->copy_policy,
-					     current_inode->store_policy);
+					     current_inode->store_policy,
+					     object_size);
 		if (recovery_ret != EXIT_SUCCESS) {
 			sd_err("failed to resume the current vdi");
 			ret = recovery_ret;
@@ -2563,9 +2636,25 @@ static int vdi_cache_info(int argc, char **argv)
 
 	fprintf(stdout, "Name\tTag\tTotal\tDirty\tClean\n");
 	for (i = 0; i < info.count; i++) {
-		uint64_t total = info.caches[i].total * SD_DATA_OBJ_SIZE,
-			 dirty = info.caches[i].dirty * SD_DATA_OBJ_SIZE,
+		uint32_t object_size;
+		uint32_t vid = info.caches[i].vid;
+		struct sd_inode *inode = NULL;
+		int r;
+
+		r = dog_read_object(vid_to_vdi_oid(vid), inode,
+				    SD_INODE_HEADER_SIZE, 0, true);
+		if (r != EXIT_SUCCESS)
+			return r;
+
+		if (!inode->block_size_shift)
+			return EXIT_FAILURE;
+
+		object_size = (UINT32_C(1) << inode->block_size_shift);
+
+		uint64_t total = info.caches[i].total * object_size,
+			 dirty = info.caches[i].dirty * object_size,
 			 clean = total - dirty;
+
 		char name[SD_MAX_VDI_LEN], tag[SD_MAX_VDI_TAG_LEN];
 
 		ret = vid_to_name_tag(info.caches[i].vid, name, tag);
@@ -2955,7 +3044,7 @@ static struct subcommand vdi_cmd[] = {
 	{"check", "<vdiname>", "seaphT", "check and repair image's consistency",
 	 NULL, CMD_NEED_NODELIST|CMD_NEED_ARG,
 	 vdi_check, vdi_options},
-	{"create", "<vdiname> <size>", "PycaphrvT", "create an image",
+	{"create", "<vdiname> <size>", "PycaphrvzT", "create an image",
 	 NULL, CMD_NEED_NODELIST|CMD_NEED_ARG,
 	 vdi_create, vdi_options},
 	{"snapshot", "<vdiname>", "saphrvT", "create a snapshot",
@@ -3023,6 +3112,7 @@ static struct subcommand vdi_cmd[] = {
 static int vdi_parser(int ch, const char *opt)
 {
 	char *p;
+	uint32_t object_size_shift_bit;
 
 	switch (ch) {
 	case 'P':
@@ -3101,6 +3191,20 @@ static int vdi_parser(int ch, const char *opt)
 	case 'e':
 		vdi_cmd_data.exist = true;
 		break;
+	case 'z':
+		object_size_shift_bit = (uint32_t)atoi(opt);
+		if (object_size_shift_bit > 31) {
+			sd_err("Object Size is limited to 2^31."
+			       " Please set shift bit lower than 31");
+			exit(EXIT_FAILURE);
+		}
+		vdi_cmd_data.object_size =
+				(UINT32_C(1) << object_size_shift_bit);
+		if (!vdi_cmd_data.object_size) {
+			sd_err("Invalid parameter %s", opt);
+			exit(EXIT_FAILURE);
+		}
+		break;
 	}
 
 	return 0;
diff --git a/include/fec.h b/include/fec.h
index 1ae32e4..b3ef8d8 100644
--- a/include/fec.h
+++ b/include/fec.h
@@ -96,12 +96,12 @@ void fec_encode(const struct fec *code,
 		size_t num_block_nums, size_t sz);
 
 void fec_decode_buffer(struct fec *ctx, uint8_t *input[], const int in_idx[],
-		       char *buf, int idx);
+		       char *buf, int idx, uint32_t object_size);
 
 /* for isa-l */
 
 void isa_decode_buffer(struct fec *ctx, uint8_t *input[], const int in_idx[],
-		       char *buf, int idx);
+		       char *buf, int idx, uint32_t object_size);
 
 /*
  * @param inpkts an array of packets (size k); If a primary block, i, is present
@@ -119,7 +119,6 @@ void fec_decode(const struct fec *code,
 
 /* Set data stripe as sector size to make VM happy */
 #define SD_EC_DATA_STRIPE_SIZE (512) /* 512 Byte */
-#define SD_EC_NR_STRIPE_PER_OBJECT (SD_DATA_OBJ_SIZE / SD_EC_DATA_STRIPE_SIZE)
 #define SD_EC_MAX_STRIP (16)
 
 static inline int ec_policy_to_dp(uint8_t policy, int *d, int *p)
@@ -205,11 +204,12 @@ static inline void ec_destroy(struct fec *ctx)
 }
 
 static inline void ec_decode_buffer(struct fec *ctx, uint8_t *input[],
-				    const int in_idx[], char *buf, int idx)
+				    const int in_idx[], char *buf,
+				    int idx, uint32_t object_size)
 {
 	if (cpu_has_ssse3)
-		isa_decode_buffer(ctx, input, in_idx, buf, idx);
+		isa_decode_buffer(ctx, input, in_idx, buf, idx, object_size);
 	else
-		fec_decode_buffer(ctx, input, in_idx, buf, idx);
+		fec_decode_buffer(ctx, input, in_idx, buf, idx, object_size);
 }
 #endif
diff --git a/include/sheepdog_proto.h b/include/sheepdog_proto.h
index cbb65b6..5cdedf5 100644
--- a/include/sheepdog_proto.h
+++ b/include/sheepdog_proto.h
@@ -477,10 +477,11 @@ static inline bool is_data_obj(uint64_t oid)
 
 static inline size_t count_data_objs(const struct sd_inode *inode)
 {
-	return DIV_ROUND_UP(inode->vdi_size, SD_DATA_OBJ_SIZE);
+	return DIV_ROUND_UP(inode->vdi_size,
+			    (UINT32_C(1) << inode->block_size_shift));
 }
 
-static inline size_t get_objsize(uint64_t oid)
+static inline size_t get_objsize(uint64_t oid, uint32_t object_size)
 {
 	if (is_vdi_obj(oid))
 		return SD_INODE_SIZE;
@@ -494,7 +495,7 @@ static inline size_t get_objsize(uint64_t oid)
 	if (is_ledger_object(oid))
 		return SD_LEDGER_OBJ_SIZE;
 
-	return SD_DATA_OBJ_SIZE;
+	return object_size;
 }
 
 static inline uint64_t data_oid_to_idx(uint64_t oid)
diff --git a/lib/fec.c b/lib/fec.c
index c4e7a6f..fb40773 100644
--- a/lib/fec.c
+++ b/lib/fec.c
@@ -696,12 +696,13 @@ out:
 }
 
 void fec_decode_buffer(struct fec *ctx, uint8_t *input[], const int in_idx[],
-		      char *buf, int idx)
+		      char *buf, int idx, uint32_t object_size)
 {
 	int i, j, d = ctx->d;
 	size_t strip_size = SD_EC_DATA_STRIPE_SIZE / d;
+	uint32_t nr_stripe_per_object = object_size / SD_EC_DATA_STRIPE_SIZE;
 
-	for (i = 0; i < SD_EC_NR_STRIPE_PER_OBJECT; i++) {
+	for (i = 0; i < nr_stripe_per_object; i++) {
 		const uint8_t *in[d];
 		uint8_t out[strip_size];
 
@@ -713,9 +714,9 @@ void fec_decode_buffer(struct fec *ctx, uint8_t *input[], const int in_idx[],
 }
 
 void isa_decode_buffer(struct fec *ctx, uint8_t *input[], const int in_idx[],
-		       char *buf, int idx)
+		       char *buf, int idx, uint32_t object_size)
 {
-	int ed = ctx->d, edp = ctx->dp, len = SD_DATA_OBJ_SIZE / ed, i;
+	int ed = ctx->d, edp = ctx->dp, len = object_size / ed, i;
 	unsigned char ec_tbl[ed * edp * 32];
 	unsigned char bm[ed * ed];
 	unsigned char cm[ed];
diff --git a/sheep/gateway.c b/sheep/gateway.c
index 7f7d1d1..408660a 100644
--- a/sheep/gateway.c
+++ b/sheep/gateway.c
@@ -713,7 +713,7 @@ out:
 static int gateway_handle_cow(struct request *req)
 {
 	uint64_t oid = req->rq.obj.oid;
-	size_t len = get_objsize(oid);
+	size_t len = get_objsize(oid, get_vdi_object_size(oid_to_vid(oid)));
 	struct sd_req hdr, *req_hdr = &req->rq;
 	char *buf = xvalloc(len);
 	int ret;
diff --git a/sheep/group.c b/sheep/group.c
index 2b98a9b..e379241 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -510,7 +510,7 @@ retry:
 		if (vs[i].deleted)
 			atomic_set_bit(vs[i].vid, sys->vdi_deleted);
 		add_vdi_state(vs[i].vid, vs[i].nr_copies, vs[i].snapshot,
-			      vs[i].copy_policy);
+			      vs[i].copy_policy, vs[i].object_size);
 	}
 out:
 	free(vs);
@@ -766,6 +766,7 @@ static void cinfo_collection_done(struct work *work)
 		sd_debug("nr_copies: %d", vs->nr_copies);
 		sd_debug("snapshot: %d", vs->snapshot);
 		sd_debug("copy_policy: %d", vs->copy_policy);
+		sd_debug("object_size: %"PRIu32, vs->object_size);
 		sd_debug("lock_state: %x", vs->lock_state);
 		sd_debug("owner: %s",
 			 addr_to_str(vs->lock_owner.addr, vs->lock_owner.port));
diff --git a/sheep/journal.c b/sheep/journal.c
index 5beabdf..4df9a74 100644
--- a/sheep/journal.c
+++ b/sheep/journal.c
@@ -137,6 +137,7 @@ static int replay_journal_entry(struct journal_descriptor *jd)
 {
 	char path[PATH_MAX];
 	ssize_t size;
+	uint32_t object_size = 0;
 	int fd, flags = O_WRONLY, ret = 0;
 	void *buf = NULL;
 	char *p = (char *)jd;
@@ -168,9 +169,9 @@ static int replay_journal_entry(struct journal_descriptor *jd)
 		sd_err("open %m");
 		return -1;
 	}
-
 	if (jd->create) {
-		ret = prealloc(fd, get_objsize(jd->oid));
+		object_size = get_vdi_object_size(oid_to_vid(jd->oid));
+		ret = prealloc(fd, get_objsize(jd->oid, object_size));
 		if (ret < 0)
 			goto out;
 	}
diff --git a/sheep/object_cache.c b/sheep/object_cache.c
index a0da92d..31eb003 100644
--- a/sheep/object_cache.c
+++ b/sheep/object_cache.c
@@ -126,7 +126,8 @@ static inline bool idx_has_vdi_bit(uint64_t idx)
 
 static inline size_t get_cache_block_size(uint64_t oid)
 {
-	size_t bsize = DIV_ROUND_UP(get_objsize(oid),
+	uint32_t object_size = get_vdi_object_size(oid_to_vid(oid));
+	size_t bsize = DIV_ROUND_UP(get_objsize(oid, object_size),
 				    sizeof(uint64_t) * BITS_PER_BYTE);
 
 	return round_up(bsize, BLOCK_SIZE); /* To be FS friendly */
@@ -457,6 +458,7 @@ static int push_cache_object(uint32_t vid, uint64_t idx, uint64_t bmap,
 	void *buf;
 	off_t offset;
 	uint64_t oid = idx_to_oid(vid, idx);
+	uint32_t object_size = get_objsize(oid, get_vdi_object_size(vid));
 	size_t data_length, bsize = get_cache_block_size(oid);
 	int ret = SD_RES_NO_MEM;
 	int first_bit, last_bit;
@@ -473,7 +475,7 @@ static int push_cache_object(uint32_t vid, uint64_t idx, uint64_t bmap,
 		 oid, bsize, bmap, first_bit, last_bit);
 	offset = first_bit * bsize;
 	data_length = min((last_bit - first_bit + 1) * bsize,
-			  get_objsize(oid) - (size_t)offset);
+			  object_size - (size_t)offset);
 
 	buf = xvalloc(data_length);
 	ret = read_cache_object_noupdate(vid, idx, buf, data_length, offset);
@@ -517,6 +519,7 @@ static void do_reclaim_object(struct object_cache *oc)
 	struct object_cache_entry *entry;
 	uint64_t oid;
 	uint32_t cap;
+	uint32_t cache_object_size = get_vdi_object_size(oc->vid) / 1048576;
 
 	write_lock_cache(oc);
 	list_for_each_entry(entry, &oc->lru_head, lru_list) {
@@ -539,7 +542,7 @@ static void do_reclaim_object(struct object_cache *oc)
 		if (remove_cache_object(oc, entry_idx(entry)) != SD_RES_SUCCESS)
 			continue;
 		free_cache_entry(entry);
-		cap = uatomic_sub_return(&gcache.capacity, CACHE_OBJECT_SIZE);
+		cap = uatomic_sub_return(&gcache.capacity, cache_object_size);
 		sd_debug("%"PRIx64" reclaimed. capacity:%"PRId32, oid, cap);
 		if (cap <= HIGH_WATERMARK)
 			break;
@@ -685,13 +688,14 @@ alloc_cache_entry(struct object_cache *oc, uint64_t idx)
 static void add_to_lru_cache(struct object_cache *oc, uint64_t idx, bool create)
 {
 	struct object_cache_entry *entry = alloc_cache_entry(oc, idx);
+	uint32_t cache_object_size = get_vdi_object_size(oc->vid) / 1048576;
 
 	sd_debug("oid %"PRIx64" added", idx_to_oid(oc->vid, idx));
 
 	write_lock_cache(oc);
 	if (unlikely(lru_tree_insert(&oc->lru_tree, entry)))
 		panic("the object already exist");
-	uatomic_add(&gcache.capacity, CACHE_OBJECT_SIZE);
+	uatomic_add(&gcache.capacity, cache_object_size);
 	list_add_tail(&entry->lru_list, &oc->lru_head);
 	oc->total_count++;
 	if (create) {
@@ -736,7 +740,8 @@ static int object_cache_lookup(struct object_cache *oc, uint64_t idx,
 		ret = SD_RES_EIO;
 		goto out;
 	}
-	ret = prealloc(fd, get_objsize(idx_to_oid(oc->vid, idx)));
+	ret = prealloc(fd, get_objsize(idx_to_oid(oc->vid, idx),
+				       get_vdi_object_size(oc->vid)));
 	if (unlikely(ret < 0)) {
 		ret = SD_RES_EIO;
 		goto out_close;
@@ -804,7 +809,7 @@ static int object_cache_pull(struct object_cache *oc, uint64_t idx)
 	struct sd_req hdr;
 	int ret;
 	uint64_t oid = idx_to_oid(oc->vid, idx);
-	uint32_t data_length = get_objsize(oid);
+	uint32_t data_length = get_objsize(oid, get_vdi_object_size(oc->vid));
 	void *buf;
 
 	buf = xvalloc(data_length);
@@ -939,11 +944,14 @@ void object_cache_delete(uint32_t vid)
 	int h = hash(vid);
 	struct object_cache_entry *entry;
 	char path[PATH_MAX];
+	uint32_t cache_object_size;
 
 	cache = find_object_cache(vid, false);
 	if (!cache)
 		return;
 
+	cache_object_size = get_vdi_object_size(cache->vid) / 1048576;
+
 	/* Firstly we free memory */
 	sd_write_lock(&hashtable_lock[h]);
 	hlist_del(&cache->hash);
@@ -952,7 +960,7 @@ void object_cache_delete(uint32_t vid)
 	write_lock_cache(cache);
 	list_for_each_entry(entry, &cache->lru_head, lru_list) {
 		free_cache_entry(entry);
-		uatomic_sub(&gcache.capacity, CACHE_OBJECT_SIZE);
+		uatomic_sub(&gcache.capacity, cache_object_size);
 	}
 	unlock_cache(cache);
 	sd_destroy_rw_lock(&cache->lock);
@@ -1294,6 +1302,7 @@ int object_cache_remove(uint64_t oid)
 	/* Inc the entry refcount to exclude the reclaimer */
 	struct object_cache_entry *entry = oid_to_entry(oid);
 	struct object_cache *oc;
+	uint32_t cache_object_size_mb;
 	int ret;
 
 	if (!entry)
@@ -1305,6 +1314,8 @@ int object_cache_remove(uint64_t oid)
 	while (refcount_read(&entry->refcnt) > 1)
 		usleep(100000); /* Object might be in push */
 
+	cache_object_size_mb = get_vdi_object_size(oc->vid) / 1048576;
+
 	write_lock_cache(oc);
 	/*
 	 * We assume no other thread will inc the refcount of this entry
@@ -1321,7 +1332,7 @@ int object_cache_remove(uint64_t oid)
 	free_cache_entry(entry);
 	unlock_cache(oc);
 
-	uatomic_sub(&gcache.capacity, CACHE_OBJECT_SIZE);
+	uatomic_sub(&gcache.capacity, cache_object_size_mb);
 
 	return SD_RES_SUCCESS;
 }
diff --git a/sheep/ops.c b/sheep/ops.c
index 0c2389a..e5f4c4c 100644
--- a/sheep/ops.c
+++ b/sheep/ops.c
@@ -93,6 +93,7 @@ static int cluster_new_vdi(struct request *req)
 		.copy_policy = hdr->vdi.copy_policy,
 		.store_policy = hdr->vdi.store_policy,
 		.nr_copies = hdr->vdi.copies,
+		.object_size = hdr->vdi.object_size,
 		.time = (uint64_t) tv.tv_sec << 32 | tv.tv_usec * 1000,
 	};
 
@@ -105,6 +106,9 @@ static int cluster_new_vdi(struct request *req)
 	if (iocb.copy_policy)
 		iocb.nr_copies = ec_policy_to_dp(iocb.copy_policy, NULL, NULL);
 
+	if (!hdr->vdi.object_size)
+		iocb.object_size = sys->cinfo.object_size;
+
 	if (hdr->data_length != SD_MAX_VDI_LEN)
 		return SD_RES_INVALID_PARMS;
 
@@ -115,6 +119,7 @@ static int cluster_new_vdi(struct request *req)
 
 	rsp->vdi.vdi_id = vid;
 	rsp->vdi.copies = iocb.nr_copies;
+	rsp->vdi.object_size = iocb.object_size;
 
 	return ret;
 }
@@ -236,6 +241,7 @@ static int cluster_get_vdi_info(struct request *req)
 
 	rsp->vdi.vdi_id = info.vid;
 	rsp->vdi.copies = get_vdi_copy_number(info.vid);
+	rsp->vdi.object_size = get_vdi_object_size(info.vid);
 
 	return ret;
 }
@@ -655,13 +661,14 @@ static int cluster_notify_vdi_add(const struct sd_req *req, struct sd_rsp *rsp,
 		/* make the previous working vdi a snapshot */
 		add_vdi_state(req->vdi_state.old_vid,
 			      get_vdi_copy_number(req->vdi_state.old_vid),
-			      true, req->vdi_state.copy_policy);
+			      true, req->vdi_state.copy_policy,
+			      get_vdi_object_size(req->vdi_state.old_vid));
 
 	if (req->vdi_state.set_bitmap)
 		atomic_set_bit(req->vdi_state.new_vid, sys->vdi_inuse);
 
 	add_vdi_state(req->vdi_state.new_vid, req->vdi_state.copies, false,
-		      req->vdi_state.copy_policy);
+		      req->vdi_state.copy_policy, req->vdi_state.object_size);
 
 	return SD_RES_SUCCESS;
 }
@@ -759,9 +766,10 @@ static int cluster_alter_vdi_copy(const struct sd_req *req, struct sd_rsp *rsp,
 
 	uint32_t vid = req->vdi_state.new_vid;
 	int nr_copies = req->vdi_state.copies;
+	uint32_t object_size = req->vdi_state.object_size;
 	struct vnode_info *vinfo;
 
-	add_vdi_state(vid, nr_copies, false, 0);
+	add_vdi_state(vid, nr_copies, false, 0, object_size);
 
 	vinfo = get_vnode_info();
 	start_recovery(vinfo, vinfo, false);
diff --git a/sheep/plain_store.c b/sheep/plain_store.c
index 1b7b66c..e344189 100644
--- a/sheep/plain_store.c
+++ b/sheep/plain_store.c
@@ -152,7 +152,8 @@ static int default_trim(int fd, uint64_t oid, const struct siocb *iocb,
 
 	if (*poffset + *plen < iocb->offset + iocb->length) {
 		uint64_t end = iocb->offset + iocb->length;
-		if (end == get_objsize(oid))
+		uint32_t object_size = get_vdi_object_size(oid_to_vid(oid));
+		if (end == get_objsize(oid, object_size))
 			/* This is necessary to punch the last block */
 			end = round_up(end, BLOCK_SIZE);
 		sd_debug("discard between %ld, %ld, %" PRIx64, *poffset + *plen,
@@ -267,6 +268,7 @@ int default_cleanup(void)
 static int init_vdi_state(uint64_t oid, const char *wd, uint32_t epoch)
 {
 	int ret;
+	uint32_t object_size;
 	struct sd_inode *inode = xzalloc(SD_INODE_HEADER_SIZE);
 	struct siocb iocb = {
 		.epoch = epoch,
@@ -280,9 +282,9 @@ static int init_vdi_state(uint64_t oid, const char *wd, uint32_t epoch)
 		       "wat %s", oid, epoch, wd);
 		goto out;
 	}
-
+	object_size = (UINT32_C(1) << inode->block_size_shift);
 	add_vdi_state(oid_to_vid(oid), inode->nr_copies,
-		      vdi_is_snapshot(inode), inode->copy_policy);
+		      vdi_is_snapshot(inode), inode->copy_policy, object_size);
 
 	if (inode->name[0] == '\0')
 		atomic_set_bit(oid_to_vid(oid), sys->vdi_deleted);
@@ -402,9 +404,9 @@ size_t get_store_objsize(uint64_t oid)
 		uint8_t policy = get_vdi_copy_policy(oid_to_vid(oid));
 		int d;
 		ec_policy_to_dp(policy, &d, NULL);
-		return SD_DATA_OBJ_SIZE / d;
+		return get_vdi_object_size(oid_to_vid(oid)) / d;
 	}
-	return get_objsize(oid);
+	return get_objsize(oid, get_vdi_object_size(oid_to_vid(oid)));
 }
 
 int default_create_and_write(uint64_t oid, const struct siocb *iocb)
@@ -413,6 +415,7 @@ int default_create_and_write(uint64_t oid, const struct siocb *iocb)
 	int flags = prepare_iocb(oid, iocb, true);
 	int ret, fd;
 	uint32_t len = iocb->length;
+	uint32_t object_size = 0;
 	size_t obj_size;
 	uint64_t offset = iocb->offset;
 
@@ -452,7 +455,9 @@ int default_create_and_write(uint64_t oid, const struct siocb *iocb)
 
 	trim_zero_blocks(iocb->buf, &offset, &len);
 
-	if (offset != 0 || len != get_objsize(oid)) {
+	object_size = get_vdi_object_size(oid_to_vid(oid));
+
+	if (offset != 0 || len != get_objsize(oid, object_size)) {
 		if (is_sparse_object(oid))
 			ret = xftruncate(fd, obj_size);
 		else
diff --git a/sheep/recovery.c b/sheep/recovery.c
index 7874fc9..9bf2d9c 100644
--- a/sheep/recovery.c
+++ b/sheep/recovery.c
@@ -429,6 +429,7 @@ static void *rebuild_erasure_object(uint64_t oid, uint8_t idx,
 	char *lost = xvalloc(len);
 	int i, j;
 	uint8_t policy = get_vdi_copy_policy(oid_to_vid(oid));
+	uint32_t object_size = get_vdi_object_size(oid_to_vid(oid));
 	int ed = 0, edp;
 	edp = ec_policy_to_dp(policy, &ed, NULL);
 	struct fec *ctx = ec_init(ed, edp);
@@ -458,7 +459,7 @@ static void *rebuild_erasure_object(uint64_t oid, uint8_t idx,
 	}
 
 	/* Rebuild the lost replica */
-	ec_decode_buffer(ctx, bufs, idxs, lost, idx);
+	ec_decode_buffer(ctx, bufs, idxs, lost, idx, object_size);
 out:
 	ec_destroy(ctx);
 	for (i = 0; i < ed; i++)
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 5fc6b90..37946d1 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -219,6 +219,7 @@ struct vdi_iocb {
 	uint8_t copy_policy;
 	uint8_t store_policy;
 	uint8_t nr_copies;
+	uint32_t object_size;
 	uint64_t time;
 };
 
@@ -326,9 +327,12 @@ int fill_vdi_state_list(const struct sd_req *hdr,
 bool oid_is_readonly(uint64_t oid);
 int get_vdi_copy_number(uint32_t vid);
 int get_vdi_copy_policy(uint32_t vid);
+uint32_t get_vdi_object_size(uint32_t vid);
 int get_obj_copy_number(uint64_t oid, int nr_zones);
 int get_req_copy_number(struct request *req);
-int add_vdi_state(uint32_t vid, int nr_copies, bool snapshot, uint8_t);
+uint32_t get_req_object_size(struct request *req);
+int add_vdi_state(uint32_t vid, int nr_copies, bool snapshot,
+		  uint8_t, uint32_t object_size);
 int vdi_exist(uint32_t vid);
 int vdi_create(const struct vdi_iocb *iocb, uint32_t *new_vid);
 int vdi_snapshot(const struct vdi_iocb *iocb, uint32_t *new_vid);
diff --git a/sheep/vdi.c b/sheep/vdi.c
index 1c8fb36..95b3230 100644
--- a/sheep/vdi.c
+++ b/sheep/vdi.c
@@ -14,6 +14,7 @@
 struct vdi_state_entry {
 	uint32_t vid;
 	unsigned int nr_copies;
+	uint32_t object_size;
 	bool snapshot;
 	bool deleted;
 	uint8_t copy_policy;
@@ -132,6 +133,23 @@ int get_vdi_copy_policy(uint32_t vid)
 	return entry->copy_policy;
 }
 
+uint32_t get_vdi_object_size(uint32_t vid)
+{
+	struct vdi_state_entry *entry;
+
+	sd_read_lock(&vdi_state_lock);
+	entry = vdi_state_search(&vdi_state_root, vid);
+	sd_rw_unlock(&vdi_state_lock);
+	/* Unknown VDI: fall back to the cluster-wide default object size. */
+	if (!entry) {
+		sd_alert("object size for %" PRIx32 " not found, set %" PRIu32,
+			 vid, sys->cinfo.object_size);
+		return sys->cinfo.object_size;
+	}
+
+	return entry->object_size;
+}
+
 int get_obj_copy_number(uint64_t oid, int nr_zones)
 {
 	return min(get_vdi_copy_number(oid_to_vid(oid)), nr_zones);
@@ -149,7 +167,19 @@ int get_req_copy_number(struct request *req)
 	return nr_copies;
 }
 
-int add_vdi_state(uint32_t vid, int nr_copies, bool snapshot, uint8_t cp)
+uint32_t get_req_object_size(struct request *req)
+{
+	uint32_t object_size;
+	/* NOTE(review): data_length is taken as the object size - confirm */
+	object_size = req->rq.data_length;
+	if (!object_size)
+		object_size = get_vdi_object_size(oid_to_vid(req->rq.obj.oid));
+
+	return object_size;
+}
+
+int add_vdi_state(uint32_t vid, int nr_copies, bool snapshot,
+		  uint8_t cp, uint32_t object_size)
 {
 	struct vdi_state_entry *entry, *old;
 
@@ -158,6 +188,7 @@ int add_vdi_state(uint32_t vid, int nr_copies, bool snapshot, uint8_t cp)
 	entry->nr_copies = nr_copies;
 	entry->snapshot = snapshot;
 	entry->copy_policy = cp;
+	entry->object_size = object_size;
 
 	entry->lock_state = LOCK_STATE_UNLOCKED;
 	memset(&entry->owner, 0, sizeof(struct node_id));
@@ -173,7 +204,8 @@ int add_vdi_state(uint32_t vid, int nr_copies, bool snapshot, uint8_t cp)
 		sd_mutex_unlock(&m);
 	}
 
-	sd_debug("%" PRIx32 ", %d, %d", vid, nr_copies, cp);
+	sd_debug("%" PRIx32 ", %d, %d, %"PRIu32,
+		 vid, nr_copies, cp, object_size);
 
 	sd_write_lock(&vdi_state_lock);
 	old = vdi_state_insert(&vdi_state_root, entry);
@@ -183,6 +215,7 @@ int add_vdi_state(uint32_t vid, int nr_copies, bool snapshot, uint8_t cp)
 		entry->nr_copies = nr_copies;
 		entry->snapshot = snapshot;
 		entry->copy_policy = cp;
+		entry->object_size = object_size;
 	}
 
 	sd_rw_unlock(&vdi_state_lock);
@@ -209,6 +242,7 @@ int fill_vdi_state_list(const struct sd_req *hdr,
 		vs[last].nr_copies = entry->nr_copies;
 		vs[last].snapshot = entry->snapshot;
 		vs[last].copy_policy = entry->copy_policy;
+		vs[last].object_size = entry->object_size;
 		vs[last].lock_state = entry->lock_state;
 		vs[last].lock_owner = entry->owner;
 		vs[last].nr_participants = entry->nr_participants;
@@ -251,6 +285,7 @@ static struct vdi_state *fill_vdi_state_list_with_alloc(int *result_nr)
 		vs[i].snapshot = entry->snapshot;
 		vs[i].deleted = entry->deleted;
 		vs[i].copy_policy = entry->copy_policy;
+		vs[i].object_size = entry->object_size;
 		vs[i].lock_state = entry->lock_state;
 		vs[i].lock_owner = entry->owner;
 		vs[i].nr_participants = entry->nr_participants;
@@ -861,7 +896,7 @@ static struct sd_inode *alloc_inode(const struct vdi_iocb *iocb,
 				    struct generation_reference *gref)
 {
 	struct sd_inode *new = xzalloc(sizeof(*new));
-	unsigned long block_size = SD_DATA_OBJ_SIZE;
+	unsigned long block_size = iocb->object_size;
 
 	pstrcpy(new->name, sizeof(new->name), iocb->name);
 	new->vdi_id = new_vid;
@@ -903,9 +938,10 @@ static int create_vdi(const struct vdi_iocb *iocb, uint32_t new_snapid,
 	int ret;
 
 	sd_debug("%s: size %" PRIu64 ", new_vid %" PRIx32 ", copies %d, "
-		 "snapid %" PRIu32 " copy policy %"PRIu8 "store policy %"PRIu8,
-		 iocb->name, iocb->size, new_vid, iocb->nr_copies, new_snapid,
-		 new->copy_policy, new->store_policy);
+		 "snapid %" PRIu32 " copy policy %"PRIu8 " store policy %"PRIu8
+		 " object_size %"PRIu32, iocb->name, iocb->size, new_vid,
+		 iocb->nr_copies, new_snapid, new->copy_policy,
+		 new->store_policy, iocb->object_size);
 
 	ret = sd_write_object(vid_to_vdi_oid(new_vid), (char *)new,
 			      sizeof(*new), 0, true);
@@ -940,8 +976,9 @@ static int clone_vdi(const struct vdi_iocb *iocb, uint32_t new_snapid,
 	int ret;
 
 	sd_debug("%s: size %" PRIu64 ", vid %" PRIx32 ", base %" PRIx32 ", "
-		 "copies %d, snapid %" PRIu32, iocb->name, iocb->size, new_vid,
-		 base_vid, iocb->nr_copies, new_snapid);
+		 "copies %d, object_size %" PRIu32 ", snapid %" PRIu32,
+		 iocb->name, iocb->size, new_vid, base_vid,
+		 iocb->nr_copies, iocb->object_size, new_snapid);
 
 	ret = sd_read_object(vid_to_vdi_oid(base_vid), (char *)base,
 			     sizeof(*base), 0);
@@ -1002,8 +1039,9 @@ static int snapshot_vdi(const struct vdi_iocb *iocb, uint32_t new_snapid,
 	int ret;
 
 	sd_debug("%s: size %" PRIu64 ", vid %" PRIx32 ", base %" PRIx32 ", "
-		 "copies %d, snapid %" PRIu32, iocb->name, iocb->size, new_vid,
-		 base_vid, iocb->nr_copies, new_snapid);
+		 "copies %d, object_size %"PRIu32 ", snapid %" PRIu32,
+		 iocb->name, iocb->size, new_vid, base_vid,
+		 iocb->nr_copies, iocb->object_size, new_snapid);
 
 	ret = sd_read_object(vid_to_vdi_oid(base_vid), (char *)base,
 			     sizeof(*base), 0);
@@ -1071,8 +1109,9 @@ static int rebase_vdi(const struct vdi_iocb *iocb, uint32_t new_snapid,
 	int ret;
 
 	sd_debug("%s: size %" PRIu64 ", vid %" PRIx32 ", base %" PRIx32 ", "
-		 "cur %" PRIx32 ", copies %d, snapid %" PRIu32, iocb->name,
-		 iocb->size, new_vid, base_vid, cur_vid, iocb->nr_copies,
+		 "cur %" PRIx32 ", copies %d, object_size %"PRIu32
+		 ", snapid %" PRIu32, iocb->name, iocb->size, new_vid,
+		 base_vid, cur_vid, iocb->nr_copies, iocb->object_size,
 		 new_snapid);
 
 	ret = sd_read_object(vid_to_vdi_oid(base_vid), (char *)base,
@@ -1260,7 +1299,7 @@ int vdi_lookup(const struct vdi_iocb *iocb, struct vdi_info *info)
 }
 
 static int notify_vdi_add(uint32_t vdi_id, uint32_t nr_copies, uint32_t old_vid,
-			  uint8_t copy_policy)
+			  uint8_t copy_policy, uint32_t object_size)
 {
 	int ret;
 	struct sd_req hdr;
@@ -1271,11 +1310,13 @@ static int notify_vdi_add(uint32_t vdi_id, uint32_t nr_copies, uint32_t old_vid,
 	hdr.vdi_state.copies = nr_copies;
 	hdr.vdi_state.set_bitmap = false;
 	hdr.vdi_state.copy_policy = copy_policy;
+	hdr.vdi_state.object_size = object_size;
 
 	ret = exec_local_req(&hdr, NULL);
 	if (ret != SD_RES_SUCCESS)
 		sd_err("fail to notify vdi add event(%" PRIx32 ", %d, %" PRIx32
-		       ")", vdi_id, nr_copies, old_vid);
+		       ", %"PRIu32 ")", vdi_id, nr_copies,
+		       old_vid, object_size);
 
 	return ret;
 }
@@ -1326,7 +1367,7 @@ int vdi_create(const struct vdi_iocb *iocb, uint32_t *new_vid)
 		info.snapid = 1;
 	*new_vid = info.free_bit;
 	ret = notify_vdi_add(*new_vid, iocb->nr_copies, info.vid,
-			     iocb->copy_policy);
+			     iocb->copy_policy, iocb->object_size);
 	if (ret != SD_RES_SUCCESS)
 		return ret;
 
@@ -1366,7 +1407,7 @@ int vdi_snapshot(const struct vdi_iocb *iocb, uint32_t *new_vid)
 	assert(info.snapid > 0);
 	*new_vid = info.free_bit;
 	ret = notify_vdi_add(*new_vid, iocb->nr_copies, info.vid,
-			     iocb->copy_policy);
+			     iocb->copy_policy, iocb->object_size);
 	if (ret != SD_RES_SUCCESS)
 		return ret;
 
@@ -1745,6 +1786,15 @@ int sd_create_hyper_volume(const char *name, uint32_t *vdi_id)
 	hdr.vdi.copies = sys->cinfo.nr_copies;
 	hdr.vdi.copy_policy = sys->cinfo.copy_policy;
 	hdr.vdi.store_policy = 1;
+	/* XXX Hypervolume cannot be combined with a custom object size */
+	if (sys->cinfo.object_size != SD_DATA_OBJ_SIZE) {
+		hdr.vdi.object_size = SD_DATA_OBJ_SIZE;
+		sd_warn("Cluster default object size is not"
+			" SD_DATA_OBJ_SIZE(%lu). "
+			"Set VDI object size %lu and create HyperVolume",
+			SD_DATA_OBJ_SIZE, SD_DATA_OBJ_SIZE);
+	}
+
 
 	ret = exec_local_req(&hdr, buf);
 	if (ret != SD_RES_SUCCESS) {
diff --git a/tests/unit/sheep/test_vdi.c b/tests/unit/sheep/test_vdi.c
index 2f8946b..132caf5 100644
--- a/tests/unit/sheep/test_vdi.c
+++ b/tests/unit/sheep/test_vdi.c
@@ -17,9 +17,9 @@
 
 START_TEST(test_vdi)
 {
-	add_vdi_state(1, 1, true, 0);
-	add_vdi_state(2, 1, true, 0);
-	add_vdi_state(3, 2, false, 0);
+	add_vdi_state(1, 1, true, 0, 4194304);
+	add_vdi_state(2, 1, true, 0, 4194304);
+	add_vdi_state(3, 2, false, 0, 4194304);
 
 	ck_assert_int_eq(get_vdi_copy_number(1), 1);
 	ck_assert_int_eq(get_vdi_copy_number(2), 1);
-- 
1.7.1




More information about the sheepdog mailing list