[sheepdog] [PATCH v2 3/3] Add selectable object_size support of VDI operation

Teruaki Ishizaki ishizaki.teruaki at lab.ntt.co.jp
Fri Dec 12 13:48:32 CET 2014


The data object size was fixed at 4MB and was not selectable.
This patch adds a feature to select the data object size of a VDI.

If you want to use an 8MB (2^23 bytes) data object size, set
block_size_shift to 23 with the '-z' option.

Example: dog vdi create -z 23 testvdi 100M

Signed-off-by: Teruaki Ishizaki <ishizaki.teruaki at lab.ntt.co.jp>
---
 dog/common.c                |    9 +-
 dog/dog.h                   |    6 +-
 dog/farm/farm.c             |   18 ++-
 dog/vdi.c                   |  245 ++++++++++++++++++++++++++++++-------------
 include/fec.h               |   12 +-
 include/sheepdog_proto.h    |    7 +-
 lib/fec.c                   |    9 +-
 sheep/gateway.c             |    2 +-
 sheep/group.c               |    3 +-
 sheep/journal.c             |    5 +-
 sheep/object_cache.c        |   27 ++++--
 sheep/ops.c                 |   15 ++-
 sheep/plain_store.c         |   16 ++-
 sheep/recovery.c            |    3 +-
 sheep/sheep_priv.h          |    6 +-
 sheep/vdi.c                 |   92 +++++++++++++---
 tests/unit/sheep/test_vdi.c |    6 +-
 17 files changed, 341 insertions(+), 140 deletions(-)

diff --git a/dog/common.c b/dog/common.c
index 2d8a173..6ff1e19 100644
--- a/dog/common.c
+++ b/dog/common.c
@@ -365,19 +365,22 @@ void show_progress(uint64_t done, uint64_t total, bool raw)
 	free(buf);
 }
 
-size_t get_store_objsize(uint8_t copy_policy, uint64_t oid)
+size_t get_store_objsize(uint8_t copy_policy, uint8_t block_size_shift,
+			 uint64_t oid)
 {
 	if (is_vdi_obj(oid))
 		return SD_INODE_SIZE;
 	if (is_vdi_btree_obj(oid))
 		return SD_INODE_DATA_INDEX_SIZE;
+
+	uint32_t object_size = (UINT32_C(1) << block_size_shift);
 	if (copy_policy != 0) {
 		int d;
 
 		ec_policy_to_dp(copy_policy, &d, NULL);
-		return SD_DATA_OBJ_SIZE / d;
+		return object_size / d;
 	}
-	return get_objsize(oid);
+	return get_objsize(oid, object_size);
 }
 
 bool is_erasure_oid(uint64_t oid, uint8_t policy)
diff --git a/dog/dog.h b/dog/dog.h
index 80becc6..bcf0e6e 100644
--- a/dog/dog.h
+++ b/dog/dog.h
@@ -87,10 +87,12 @@ void confirm(const char *message);
 void work_queue_wait(struct work_queue *q);
 int do_vdi_create(const char *vdiname, int64_t vdi_size,
 		  uint32_t base_vid, uint32_t *vdi_id, bool snapshot,
-		  uint8_t nr_copies, uint8_t copy_policy, uint8_t store_policy);
+		  uint8_t nr_copies, uint8_t copy_policy,
+		  uint8_t store_policy, uint8_t block_size_shift);
 int do_vdi_check(const struct sd_inode *inode);
 void show_progress(uint64_t done, uint64_t total, bool raw);
-size_t get_store_objsize(uint8_t copy_policy, uint64_t oid);
+size_t get_store_objsize(uint8_t copy_policy, uint8_t block_size_shift,
+			 uint64_t oid);
 bool is_erasure_oid(uint64_t oid, uint8_t policy);
 uint8_t parse_copy(const char *str, uint8_t *copy_policy);
 
diff --git a/dog/farm/farm.c b/dog/farm/farm.c
index 5c8ca3b..55bc274 100644
--- a/dog/farm/farm.c
+++ b/dog/farm/farm.c
@@ -38,6 +38,7 @@ struct active_vdi_entry {
 	uint8_t  nr_copies;
 	uint8_t copy_policy;
 	uint8_t store_policy;
+	uint8_t block_size_shift;
 };
 
 struct registered_obj_entry {
@@ -77,6 +78,7 @@ static void update_active_vdi_entry(struct active_vdi_entry *vdi,
 	vdi->nr_copies = new->nr_copies;
 	vdi->copy_policy = new->copy_policy;
 	vdi->store_policy = new->store_policy;
+	vdi->block_size_shift = new->block_size_shift;
 }
 
 static void add_active_vdi(struct sd_inode *new)
@@ -131,7 +133,8 @@ static int create_active_vdis(void)
 				  vdi->vdi_id, &new_vid,
 				  false, vdi->nr_copies,
 				  vdi->copy_policy,
-				  vdi->store_policy) < 0)
+				  vdi->store_policy,
+				  vdi->block_size_shift) < 0)
 			return -1;
 	}
 	return 0;
@@ -202,7 +205,7 @@ out:
 }
 
 static int notify_vdi_add(uint32_t vdi_id, uint8_t nr_copies,
-			  uint8_t copy_policy)
+			  uint8_t copy_policy, uint8_t block_size_shift)
 {
 	int ret;
 	struct sd_req hdr;
@@ -213,13 +216,14 @@ static int notify_vdi_add(uint32_t vdi_id, uint8_t nr_copies,
 	hdr.vdi_state.new_vid = vdi_id;
 	hdr.vdi_state.copies = nr_copies;
 	hdr.vdi_state.copy_policy = copy_policy;
+	hdr.vdi_state.block_size_shift = block_size_shift;
 	hdr.vdi_state.set_bitmap = true;
 
 	ret = dog_exec_req(&sd_nid, &hdr, buf);
 
 	if (ret < 0)
-		sd_err("Fail to notify vdi add event(%"PRIx32", %d)", vdi_id,
-		       nr_copies);
+		sd_err("Fail to notify vdi add event(%"PRIx32", %d"
+		       ", %"PRIu8")", vdi_id, nr_copies, block_size_shift);
 	if (rsp->result != SD_RES_SUCCESS) {
 		sd_err("%s", sd_strerror(rsp->result));
 		ret = -1;
@@ -261,7 +265,8 @@ static void do_save_object(struct work *work)
 
 	sw = container_of(work, struct snapshot_work, work);
 
-	size = get_objsize(sw->entry.oid);
+	size = get_objsize(sw->entry.oid,
+			  (UINT32_C(1) <<  sw->entry.block_size_shift));
 	buf = xmalloc(size);
 
 	if (dog_read_object(sw->entry.oid, buf, size, 0, true) < 0)
@@ -413,7 +418,8 @@ static void do_load_object(struct work *work)
 	vid = oid_to_vid(sw->entry.oid);
 	if (register_vdi(vid)) {
 		if (notify_vdi_add(vid, sw->entry.nr_copies,
-				   sw->entry.copy_policy) < 0)
+				   sw->entry.copy_policy,
+				   sw->entry.block_size_shift) < 0)
 			goto error;
 	}
 
diff --git a/dog/vdi.c b/dog/vdi.c
index 5353062..22d6c83 100644
--- a/dog/vdi.c
+++ b/dog/vdi.c
@@ -38,6 +38,8 @@ static struct sd_option vdi_options[] = {
 	{'o', "oid", true, "specify the object id of the tracking object"},
 	{'e', "exist", false, "only check objects exist or not,\n"
 	 "                          neither comparing nor repairing"},
+	{'z', "block_size_shift", true, "specify the bit shift num for"
+			       " data object size"},
 	{ 0, NULL, false, NULL },
 };
 
@@ -49,6 +51,7 @@ static struct vdi_cmd_data {
 	bool delete;
 	bool prealloc;
 	int nr_copies;
+	uint8_t block_size_shift;
 	bool writeback;
 	int from_snapshot_id;
 	char from_snapshot_tag[SD_MAX_VDI_TAG_LEN];
@@ -67,6 +70,7 @@ struct get_vdi_info {
 	uint32_t snapid;
 	uint8_t nr_copies;
 	uint8_t copy_policy;
+	uint8_t block_size_shift;
 };
 
 int dog_bnode_writer(uint64_t oid, void *mem, unsigned int len, uint64_t offset,
@@ -118,6 +122,7 @@ static void print_vdi_list(uint32_t vid, const char *name, const char *tag,
 	struct tm tm;
 	char dbuf[128];
 	struct get_vdi_info *info = data;
+	uint32_t object_size = (UINT32_C(1) << i->block_size_shift);
 
 	if (info && strcmp(name, info->name) != 0)
 		return;
@@ -143,23 +148,24 @@ static void print_vdi_list(uint32_t vid, const char *name, const char *tag,
 				putchar('\\');
 			putchar(*name++);
 		}
-		printf(" %d %s %s %s %s %" PRIx32 " %s %s\n", snapid,
-		       strnumber(i->vdi_size),
-		       strnumber(my_objs * SD_DATA_OBJ_SIZE),
-		       strnumber(cow_objs * SD_DATA_OBJ_SIZE),
+		printf(" %d %s %s %s %s %" PRIx32 " %s %s %" PRIu8 "\n",
+		       snapid, strnumber(i->vdi_size),
+		       strnumber(my_objs * object_size),
+		       strnumber(cow_objs * object_size),
 		       dbuf, vid,
 		       redundancy_scheme(i->nr_copies, i->copy_policy),
-		       i->tag);
+		       i->tag, i->block_size_shift);
 	} else {
-		printf("%c %-8s %5d %7s %7s %7s %s  %7" PRIx32 " %6s %13s\n",
+		printf("%c %-8s %5d %7s %7s %7s %s  %7" PRIx32
+		       " %6s %13s %3" PRIu8 "\n",
 		       vdi_is_snapshot(i) ? 's' : (is_clone ? 'c' : ' '),
 		       name, snapid,
 		       strnumber(i->vdi_size),
-		       strnumber(my_objs * SD_DATA_OBJ_SIZE),
-		       strnumber(cow_objs * SD_DATA_OBJ_SIZE),
+		       strnumber(my_objs * object_size),
+		       strnumber(cow_objs * object_size),
 		       dbuf, vid,
 		       redundancy_scheme(i->nr_copies, i->copy_policy),
-		       i->tag);
+		       i->tag, i->block_size_shift);
 	}
 }
 
@@ -282,7 +288,9 @@ static int vdi_list(int argc, char **argv)
 	const char *vdiname = argv[optind];
 
 	if (!raw_output)
-		printf("  Name        Id    Size    Used  Shared    Creation time   VDI id  Copies  Tag\n");
+		printf("  Name        Id    Size    Used  Shared"
+		       "    Creation time   VDI id  Copies  Tag"
+		       "   Block Size Shift\n");
 
 	if (vdiname) {
 		struct get_vdi_info info;
@@ -396,7 +404,8 @@ int read_vdi_obj(const char *vdiname, int snapid, const char *tag,
 
 int do_vdi_create(const char *vdiname, int64_t vdi_size,
 		  uint32_t base_vid, uint32_t *vdi_id, bool snapshot,
-		  uint8_t nr_copies, uint8_t copy_policy, uint8_t store_policy)
+		  uint8_t nr_copies, uint8_t copy_policy,
+		  uint8_t store_policy, uint8_t block_size_shift)
 {
 	struct sd_req hdr;
 	struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
@@ -416,6 +425,7 @@ int do_vdi_create(const char *vdiname, int64_t vdi_size,
 	hdr.vdi.copies = nr_copies;
 	hdr.vdi.copy_policy = copy_policy;
 	hdr.vdi.store_policy = store_policy;
+	hdr.vdi.block_size_shift = block_size_shift;
 
 	ret = dog_exec_req(&sd_nid, &hdr, buf);
 	if (ret < 0)
@@ -440,6 +450,8 @@ static int vdi_create(int argc, char **argv)
 	uint32_t vid;
 	uint64_t oid;
 	uint32_t idx, max_idx;
+	uint32_t object_size;
+	uint64_t old_max_total_size = 0;
 	struct sd_inode *inode = NULL;
 	int ret;
 
@@ -451,10 +463,35 @@ static int vdi_create(int argc, char **argv)
 	if (ret < 0)
 		return EXIT_USAGE;
 
-	if (size > SD_OLD_MAX_VDI_SIZE && 0 == vdi_cmd_data.store_policy) {
+	if (vdi_cmd_data.block_size_shift) {
+		object_size = (UINT32_C(1) << vdi_cmd_data.block_size_shift);
+		old_max_total_size = object_size * OLD_MAX_DATA_OBJS;
+	} else {
+		struct sd_req hdr;
+		struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
+		struct cluster_info cinfo;
+		sd_init_req(&hdr, SD_OP_CLUSTER_INFO);
+		hdr.data_length = sizeof(cinfo);
+		ret = dog_exec_req(&sd_nid, &hdr, &cinfo);
+		if (ret < 0) {
+			sd_err("Fail to execute request: SD_OP_CLUSTER_INFO");
+			ret = EXIT_FAILURE;
+			goto out;
+		}
+		if (rsp->result != SD_RES_SUCCESS) {
+			sd_err("%s", sd_strerror(rsp->result));
+			ret = EXIT_FAILURE;
+			goto out;
+		}
+		object_size = (UINT32_C(1) << cinfo.block_size_shift);
+		old_max_total_size = object_size * OLD_MAX_DATA_OBJS;
+	}
+
+	if (size > old_max_total_size && 0 == vdi_cmd_data.store_policy) {
 		sd_err("VDI size is larger than %s bytes, please use '-y' to "
-		       "create a hyper volume with size up to %s bytes",
-		       strnumber(SD_OLD_MAX_VDI_SIZE),
+		       "create a hyper volume with size up to %s bytes"
+		       " or use '-z' to create larger object size volume",
+		       strnumber(old_max_total_size),
 		       strnumber(SD_MAX_VDI_SIZE));
 		return EXIT_USAGE;
 	}
@@ -466,7 +503,8 @@ static int vdi_create(int argc, char **argv)
 
 	ret = do_vdi_create(vdiname, size, 0, &vid, false,
 			    vdi_cmd_data.nr_copies, vdi_cmd_data.copy_policy,
-			    vdi_cmd_data.store_policy);
+			    vdi_cmd_data.store_policy,
+			    vdi_cmd_data.block_size_shift);
 	if (ret != EXIT_SUCCESS || !vdi_cmd_data.prealloc)
 		goto out;
 
@@ -479,10 +517,11 @@ static int vdi_create(int argc, char **argv)
 		ret = EXIT_FAILURE;
 		goto out;
 	}
-	max_idx = DIV_ROUND_UP(size, SD_DATA_OBJ_SIZE);
+	object_size = (UINT32_C(1) << inode->block_size_shift);
+	max_idx = DIV_ROUND_UP(size, object_size);
 
 	for (idx = 0; idx < max_idx; idx++) {
-		vdi_show_progress(idx * SD_DATA_OBJ_SIZE, inode->vdi_size);
+		vdi_show_progress(idx * object_size, inode->vdi_size);
 		oid = vid_to_data_oid(vid, idx);
 
 		ret = dog_write_object(oid, 0, NULL, 0, 0, 0, inode->nr_copies,
@@ -499,7 +538,7 @@ static int vdi_create(int argc, char **argv)
 			goto out;
 		}
 	}
-	vdi_show_progress(idx * SD_DATA_OBJ_SIZE, inode->vdi_size);
+	vdi_show_progress(idx * object_size, inode->vdi_size);
 	ret = EXIT_SUCCESS;
 
 out:
@@ -664,7 +703,7 @@ static int vdi_snapshot(int argc, char **argv)
 
 	ret = do_vdi_create(vdiname, inode->vdi_size, vid, &new_vid, true,
 			    inode->nr_copies, inode->copy_policy,
-			    inode->store_policy);
+			    inode->store_policy, inode->block_size_shift);
 
 	if (ret == EXIT_SUCCESS && verbose) {
 		if (raw_output)
@@ -691,6 +730,7 @@ static int vdi_clone(int argc, char **argv)
 	uint32_t base_vid, new_vid, vdi_id;
 	uint64_t oid;
 	uint32_t idx, max_idx, ret;
+	uint32_t object_size;
 	struct sd_inode *inode = NULL, *new_inode = NULL;
 	char *buf = NULL;
 
@@ -719,9 +759,10 @@ static int vdi_clone(int argc, char **argv)
 	if (vdi_cmd_data.no_share == true)
 		base_vid = 0;
 
+	object_size = (UINT32_C(1) << inode->block_size_shift);
 	ret = do_vdi_create(dst_vdi, inode->vdi_size, base_vid, &new_vid, false,
 			    inode->nr_copies, inode->copy_policy,
-			    inode->store_policy);
+			    inode->store_policy, inode->block_size_shift);
 	if (ret != EXIT_SUCCESS ||
 			(!vdi_cmd_data.prealloc && !vdi_cmd_data.no_share))
 		goto out;
@@ -732,23 +773,23 @@ static int vdi_clone(int argc, char **argv)
 	if (ret != EXIT_SUCCESS)
 		goto out;
 
-	buf = xzalloc(SD_DATA_OBJ_SIZE);
+	buf = xzalloc(object_size);
 	max_idx = count_data_objs(inode);
 
 	for (idx = 0; idx < max_idx; idx++) {
 		size_t size;
 
-		vdi_show_progress(idx * SD_DATA_OBJ_SIZE, inode->vdi_size);
+		vdi_show_progress(idx * object_size, inode->vdi_size);
 		vdi_id = sd_inode_get_vid(inode, idx);
 		if (vdi_id) {
 			oid = vid_to_data_oid(vdi_id, idx);
-			ret = dog_read_object(oid, buf, SD_DATA_OBJ_SIZE, 0,
+			ret = dog_read_object(oid, buf, object_size, 0,
 					      true);
 			if (ret) {
 				ret = EXIT_FAILURE;
 				goto out;
 			}
-			size = SD_DATA_OBJ_SIZE;
+			size = object_size;
 		} else {
 			if (vdi_cmd_data.no_share && !vdi_cmd_data.prealloc)
 				continue;
@@ -772,7 +813,7 @@ static int vdi_clone(int argc, char **argv)
 			goto out;
 		}
 	}
-	vdi_show_progress(idx * SD_DATA_OBJ_SIZE, inode->vdi_size);
+	vdi_show_progress(idx * object_size, inode->vdi_size);
 	ret = EXIT_SUCCESS;
 
 out:
@@ -979,7 +1020,7 @@ static int vdi_rollback(int argc, char **argv)
 
 	ret = do_vdi_create(vdiname, inode->vdi_size, base_vid, &new_vid,
 			     false, vdi_cmd_data.nr_copies, inode->copy_policy,
-			     inode->store_policy);
+			     inode->store_policy, inode->block_size_shift);
 
 	if (ret == EXIT_SUCCESS && verbose) {
 		if (raw_output)
@@ -1494,6 +1535,7 @@ static int vdi_read(int argc, char **argv)
 	struct sd_inode *inode = NULL;
 	uint64_t offset = 0, oid, done = 0, total = (uint64_t) -1;
 	uint32_t vdi_id, idx;
+	uint32_t object_size;
 	unsigned int len;
 	char *buf = NULL;
 
@@ -1509,25 +1551,27 @@ static int vdi_read(int argc, char **argv)
 	}
 
 	inode = malloc(sizeof(*inode));
-	buf = xmalloc(SD_DATA_OBJ_SIZE);
 
 	ret = read_vdi_obj(vdiname, vdi_cmd_data.snapshot_id,
 			   vdi_cmd_data.snapshot_tag, NULL, inode,
 			   SD_INODE_SIZE);
 	if (ret != EXIT_SUCCESS)
-		goto out;
+		goto load_inode_err;
 
 	if (inode->vdi_size < offset) {
 		sd_err("Read offset is beyond the end of the VDI");
 		ret = EXIT_FAILURE;
-		goto out;
+		goto load_inode_err;
 	}
 
+	object_size = (UINT32_C(1) << inode->block_size_shift);
+	buf = xmalloc(object_size);
+
 	total = min(total, inode->vdi_size - offset);
-	idx = offset / SD_DATA_OBJ_SIZE;
-	offset %= SD_DATA_OBJ_SIZE;
+	idx = offset / object_size;
+	offset %= object_size;
 	while (done < total) {
-		len = min(total - done, SD_DATA_OBJ_SIZE - offset);
+		len = min(total - done, object_size - offset);
 		vdi_id = sd_inode_get_vid(inode, idx);
 		if (vdi_id) {
 			oid = vid_to_data_oid(vdi_id, idx);
@@ -1554,8 +1598,9 @@ static int vdi_read(int argc, char **argv)
 	fsync(STDOUT_FILENO);
 	ret = EXIT_SUCCESS;
 out:
-	free(inode);
 	free(buf);
+load_inode_err:
+	free(inode);
 
 	return ret;
 }
@@ -1564,6 +1609,7 @@ static int vdi_write(int argc, char **argv)
 {
 	const char *vdiname = argv[optind++];
 	uint32_t vid, flags, vdi_id, idx;
+	uint32_t object_size;
 	int ret;
 	struct sd_inode *inode = NULL;
 	uint64_t offset = 0, oid, old_oid, done = 0, total = (uint64_t) -1;
@@ -1583,26 +1629,28 @@ static int vdi_write(int argc, char **argv)
 	}
 
 	inode = xmalloc(sizeof(*inode));
-	buf = xmalloc(SD_DATA_OBJ_SIZE);
 
 	ret = read_vdi_obj(vdiname, 0, "", &vid, inode, SD_INODE_SIZE);
 	if (ret != EXIT_SUCCESS)
-		goto out;
+		goto load_inode_err;
 
 	if (inode->vdi_size < offset) {
 		sd_err("Write offset is beyond the end of the VDI");
 		ret = EXIT_FAILURE;
-		goto out;
+		goto load_inode_err;
 	}
 
+	object_size = (UINT32_C(1) << inode->block_size_shift);
+	buf = xmalloc(object_size);
+
 	total = min(total, inode->vdi_size - offset);
-	idx = offset / SD_DATA_OBJ_SIZE;
-	offset %= SD_DATA_OBJ_SIZE;
+	idx = offset / object_size;
+	offset %= object_size;
 	while (done < total) {
 		create = false;
 		old_oid = 0;
 		flags = 0;
-		len = min(total - done, SD_DATA_OBJ_SIZE - offset);
+		len = min(total - done, object_size - offset);
 
 		vdi_id = sd_inode_get_vid(inode, idx);
 		if (!vdi_id)
@@ -1647,7 +1695,7 @@ static int vdi_write(int argc, char **argv)
 		}
 
 		offset += len;
-		if (offset == SD_DATA_OBJ_SIZE) {
+		if (offset == object_size) {
 			offset = 0;
 			idx++;
 		}
@@ -1655,8 +1703,9 @@ static int vdi_write(int argc, char **argv)
 	}
 	ret = EXIT_SUCCESS;
 out:
-	free(inode);
 	free(buf);
+load_inode_err:
+	free(inode);
 
 	return ret;
 }
@@ -1709,6 +1758,7 @@ struct vdi_check_info {
 	uint64_t oid;
 	uint8_t nr_copies;
 	uint8_t copy_policy;
+	uint8_t block_size_shift;
 	uint64_t total;
 	uint64_t *done;
 	int refcnt;
@@ -1720,8 +1770,9 @@ struct vdi_check_info {
 
 static void free_vdi_check_info(struct vdi_check_info *info)
 {
+	uint32_t object_size = (UINT32_C(1) << info->block_size_shift);
 	if (info->done) {
-		*info->done += SD_DATA_OBJ_SIZE;
+		*info->done += object_size;
 		vdi_show_progress(*info->done, info->total);
 	}
 	free(info);
@@ -1783,6 +1834,7 @@ static void vdi_check_object_work(struct work *work)
 	if (is_erasure_oid(info->oid, info->copy_policy)) {
 		sd_init_req(&hdr, SD_OP_READ_PEER);
 		hdr.data_length = get_store_objsize(info->copy_policy,
+						    info->block_size_shift,
 						    info->oid);
 		hdr.obj.ec_index = vcw->ec_index;
 		hdr.epoch = sd_epoch;
@@ -1856,7 +1908,8 @@ static void check_erasure_object(struct vdi_check_info *info)
 	struct fec *ctx = ec_init(d, dp);
 	int miss_idx[dp], input_idx[dp];
 	uint64_t oid = info->oid;
-	size_t len = get_store_objsize(info->copy_policy, oid);
+	size_t len = get_store_objsize(info->copy_policy,
+				       info->block_size_shift, oid);
 	char *obj = xmalloc(len);
 	uint8_t *input[dp];
 
@@ -1882,7 +1935,8 @@ static void check_erasure_object(struct vdi_check_info *info)
 			uint8_t *ds[d];
 			for (j = 0; j < d; j++)
 				ds[j] = info->vcw[j].buf;
-			ec_decode_buffer(ctx, ds, idx, obj, d + k);
+			ec_decode_buffer(ctx, ds, idx, obj, d + k,
+					 info->block_size_shift);
 			if (memcmp(obj, info->vcw[d + k].buf, len) != 0) {
 				/* TODO repair the inconsistency */
 				sd_err("object %"PRIx64" is inconsistent", oid);
@@ -1900,7 +1954,8 @@ static void check_erasure_object(struct vdi_check_info *info)
 
 			for (i = 0; i < d; i++)
 				ds[i] = input[i];
-			ec_decode_buffer(ctx, ds, input_idx, obj, m);
+			ec_decode_buffer(ctx, ds, input_idx, obj, m,
+					 info->block_size_shift);
 			write_object_to(info->vcw[m].vnode, oid, obj,
 					len, true, info->vcw[m].ec_index);
 			fprintf(stdout, "fixed missing %"PRIx64", "
@@ -2029,10 +2084,11 @@ static void check_cb(struct sd_index *idx, void *arg, int ignore)
 {
 	struct check_arg *carg = arg;
 	uint64_t oid;
+	uint32_t object_size = (UINT32_C(1) << carg->inode->block_size_shift);
 
 	if (idx->vdi_id) {
 		oid = vid_to_data_oid(idx->vdi_id, idx->idx);
-		*(carg->done) = (uint64_t)idx->idx * SD_DATA_OBJ_SIZE;
+		*(carg->done) = (uint64_t)idx->idx * object_size;
 		vdi_show_progress(*(carg->done), carg->inode->vdi_size);
 		queue_vdi_check_work(carg->inode, oid, NULL, carg->wq,
 				     carg->nr_copies);
@@ -2046,6 +2102,7 @@ int do_vdi_check(const struct sd_inode *inode)
 	uint32_t vid;
 	struct work_queue *wq;
 	int nr_copies = min((int)inode->nr_copies, sd_zones_nr);
+	uint32_t object_size = (UINT32_C(1) << inode->block_size_shift);
 
 	if (0 < inode->copy_policy && sd_zones_nr < (int)inode->nr_copies) {
 		sd_err("ABORT: Not enough active zones for consistency-checking"
@@ -2070,7 +2127,7 @@ int do_vdi_check(const struct sd_inode *inode)
 				queue_vdi_check_work(inode, oid, &done, wq,
 						     nr_copies);
 			} else {
-				done += SD_DATA_OBJ_SIZE;
+				done += object_size;
 				vdi_show_progress(done, inode->vdi_size);
 			}
 		}
@@ -2125,11 +2182,12 @@ struct obj_backup {
 	uint32_t offset;
 	uint32_t length;
 	uint32_t reserved;
-	uint8_t data[SD_DATA_OBJ_SIZE];
+	uint8_t *data;
 };
 
 /* discards redundant area from backup data */
-static void compact_obj_backup(struct obj_backup *backup, uint8_t *from_data)
+static void compact_obj_backup(struct obj_backup *backup, uint8_t *from_data,
+			       uint32_t object_size)
 {
 	uint8_t *p1, *p2;
 
@@ -2142,8 +2200,8 @@ static void compact_obj_backup(struct obj_backup *backup, uint8_t *from_data)
 		backup->length -= SECTOR_SIZE;
 	}
 
-	p1 = backup->data + SD_DATA_OBJ_SIZE - SECTOR_SIZE;
-	p2 = from_data + SD_DATA_OBJ_SIZE - SECTOR_SIZE;
+	p1 = backup->data + object_size - SECTOR_SIZE;
+	p2 = from_data + object_size - SECTOR_SIZE;
 	while (backup->length > 0 && memcmp(p1, p2, SECTOR_SIZE) == 0) {
 		p1 -= SECTOR_SIZE;
 		p2 -= SECTOR_SIZE;
@@ -2152,29 +2210,29 @@ static void compact_obj_backup(struct obj_backup *backup, uint8_t *from_data)
 }
 
 static int get_obj_backup(uint32_t idx, uint32_t from_vid, uint32_t to_vid,
-			  struct obj_backup *backup)
+			  struct obj_backup *backup, uint32_t object_size)
 {
 	int ret;
-	uint8_t *from_data = xzalloc(SD_DATA_OBJ_SIZE);
+	uint8_t *from_data = xzalloc(object_size);
 
 	backup->idx = idx;
 	backup->offset = 0;
-	backup->length = SD_DATA_OBJ_SIZE;
+	backup->length = object_size;
 
 	if (to_vid) {
 		ret = dog_read_object(vid_to_data_oid(to_vid, idx),
-				      backup->data, SD_DATA_OBJ_SIZE, 0, true);
+				      backup->data, object_size, 0, true);
 		if (ret != SD_RES_SUCCESS) {
 			sd_err("Failed to read object %" PRIx32 ", %d", to_vid,
 			       idx);
 			return EXIT_FAILURE;
 		}
 	} else
-		memset(backup->data, 0, SD_DATA_OBJ_SIZE);
+		memset(backup->data, 0, object_size);
 
 	if (from_vid) {
 		ret = dog_read_object(vid_to_data_oid(from_vid, idx), from_data,
-				      SD_DATA_OBJ_SIZE, 0, true);
+				      object_size, 0, true);
 		if (ret != SD_RES_SUCCESS) {
 			sd_err("Failed to read object %" PRIx32 ", %d",
 			       from_vid, idx);
@@ -2182,7 +2240,7 @@ static int get_obj_backup(uint32_t idx, uint32_t from_vid, uint32_t to_vid,
 		}
 	}
 
-	compact_obj_backup(backup, from_data);
+	compact_obj_backup(backup, from_data, object_size);
 
 	free(from_data);
 
@@ -2194,13 +2252,13 @@ static int vdi_backup(int argc, char **argv)
 	const char *vdiname = argv[optind++];
 	int ret = EXIT_SUCCESS;
 	uint32_t idx, nr_objs;
+	uint32_t object_size;
 	struct sd_inode *from_inode = xzalloc(sizeof(*from_inode));
 	struct sd_inode *to_inode = xzalloc(sizeof(*to_inode));
 	struct backup_hdr hdr = {
 		.version = VDI_BACKUP_FORMAT_VERSION,
 		.magic = VDI_BACKUP_MAGIC,
 	};
-	struct obj_backup *backup = xzalloc(sizeof(*backup));
 
 	if ((!vdi_cmd_data.snapshot_id && !vdi_cmd_data.snapshot_tag[0]) ||
 	    (!vdi_cmd_data.from_snapshot_id &&
@@ -2214,21 +2272,25 @@ static int vdi_backup(int argc, char **argv)
 			   vdi_cmd_data.from_snapshot_tag, NULL,
 			   from_inode, SD_INODE_SIZE);
 	if (ret != EXIT_SUCCESS)
-		goto out;
+		goto load_inode_err;
 
 	ret = read_vdi_obj(vdiname, vdi_cmd_data.snapshot_id,
 			   vdi_cmd_data.snapshot_tag, NULL, to_inode,
 			   SD_INODE_SIZE);
 	if (ret != EXIT_SUCCESS)
-		goto out;
+		goto load_inode_err;
 
 	nr_objs = count_data_objs(to_inode);
 
+	struct obj_backup *backup = xzalloc(sizeof(*backup));
+	object_size = (UINT32_C(1) << from_inode->block_size_shift);
+	backup->data = xzalloc(sizeof(uint8_t) * object_size);
+
 	ret = xwrite(STDOUT_FILENO, &hdr, sizeof(hdr));
 	if (ret < 0) {
 		sd_err("failed to write backup header, %m");
 		ret = EXIT_SYSFAIL;
-		goto out;
+		goto error;
 	}
 
 	for (idx = 0; idx < nr_objs; idx++) {
@@ -2238,9 +2300,10 @@ static int vdi_backup(int argc, char **argv)
 		if (to_vid == 0 && from_vid == 0)
 			continue;
 
-		ret = get_obj_backup(idx, from_vid, to_vid, backup);
+		ret = get_obj_backup(idx, from_vid, to_vid,
+				     backup, object_size);
 		if (ret != EXIT_SUCCESS)
-			goto out;
+			goto error;
 
 		if (backup->length == 0)
 			continue;
@@ -2250,14 +2313,14 @@ static int vdi_backup(int argc, char **argv)
 		if (ret < 0) {
 			sd_err("failed to write backup data, %m");
 			ret = EXIT_SYSFAIL;
-			goto out;
+			goto error;
 		}
 		ret = xwrite(STDOUT_FILENO, backup->data + backup->offset,
 			     backup->length);
 		if (ret < 0) {
 			sd_err("failed to write backup data, %m");
 			ret = EXIT_SYSFAIL;
-			goto out;
+			goto error;
 		}
 	}
 
@@ -2269,15 +2332,18 @@ static int vdi_backup(int argc, char **argv)
 	if (ret < 0) {
 		sd_err("failed to write end marker, %m");
 		ret = EXIT_SYSFAIL;
-		goto out;
+		goto error;
 	}
 
 	fsync(STDOUT_FILENO);
 	ret = EXIT_SUCCESS;
-out:
+error:
+	free(backup->data);
+	free(backup);
+load_inode_err:
 	free(from_inode);
 	free(to_inode);
-	free(backup);
+out:
 	return ret;
 }
 
@@ -2331,7 +2397,7 @@ static uint32_t do_restore(const char *vdiname, int snapid, const char *tag)
 
 	ret = do_vdi_create(vdiname, inode->vdi_size, inode->vdi_id, &vid,
 			    false, inode->nr_copies, inode->copy_policy,
-			    inode->store_policy);
+			    inode->store_policy, inode->block_size_shift);
 	if (ret != EXIT_SUCCESS) {
 		sd_err("Failed to read VDI");
 		goto out;
@@ -2440,7 +2506,8 @@ out:
 					     current_inode->parent_vdi_id, NULL,
 					     true, current_inode->nr_copies,
 					     current_inode->copy_policy,
-					     current_inode->store_policy);
+					     current_inode->store_policy,
+					     current_inode->block_size_shift);
 		if (recovery_ret != EXIT_SUCCESS) {
 			sd_err("failed to resume the current vdi");
 			ret = recovery_ret;
@@ -2563,9 +2630,25 @@ static int vdi_cache_info(int argc, char **argv)
 
 	fprintf(stdout, "Name\tTag\tTotal\tDirty\tClean\n");
 	for (i = 0; i < info.count; i++) {
-		uint64_t total = info.caches[i].total * SD_DATA_OBJ_SIZE,
-			 dirty = info.caches[i].dirty * SD_DATA_OBJ_SIZE,
+		uint32_t object_size;
+		uint32_t vid = info.caches[i].vid;
+		struct sd_inode *inode = NULL;
+		int r;
+
+		r = dog_read_object(vid_to_vdi_oid(vid), inode,
+				    SD_INODE_HEADER_SIZE, 0, true);
+		if (r != EXIT_SUCCESS)
+			return r;
+
+		if (!inode->block_size_shift)
+			return EXIT_FAILURE;
+
+		object_size = (UINT32_C(1) << inode->block_size_shift);
+
+		uint64_t total = info.caches[i].total * object_size,
+			 dirty = info.caches[i].dirty * object_size,
 			 clean = total - dirty;
+
 		char name[SD_MAX_VDI_LEN], tag[SD_MAX_VDI_TAG_LEN];
 
 		ret = vid_to_name_tag(info.caches[i].vid, name, tag);
@@ -2955,7 +3038,7 @@ static struct subcommand vdi_cmd[] = {
 	{"check", "<vdiname>", "seaphT", "check and repair image's consistency",
 	 NULL, CMD_NEED_NODELIST|CMD_NEED_ARG,
 	 vdi_check, vdi_options},
-	{"create", "<vdiname> <size>", "PycaphrvT", "create an image",
+	{"create", "<vdiname> <size>", "PycaphrvzT", "create an image",
 	 NULL, CMD_NEED_NODELIST|CMD_NEED_ARG,
 	 vdi_create, vdi_options},
 	{"snapshot", "<vdiname>", "saphrvT", "create a snapshot",
@@ -3023,6 +3106,7 @@ static struct subcommand vdi_cmd[] = {
 static int vdi_parser(int ch, const char *opt)
 {
 	char *p;
+	uint8_t block_size_shift;
 
 	switch (ch) {
 	case 'P':
@@ -3101,6 +3185,19 @@ static int vdi_parser(int ch, const char *opt)
 	case 'e':
 		vdi_cmd_data.exist = true;
 		break;
+	case 'z':
+		block_size_shift = (uint8_t)atoi(opt);
+		if (block_size_shift > 31) {
+			sd_err("Object Size is limited to 2^31."
+			       " Please set shift bit lower than 31");
+			exit(EXIT_FAILURE);
+		} else if (block_size_shift < 20) {
+			sd_err("Object Size is larger than 2^20."
+			       " Please set shift bit larger than 20");
+			exit(EXIT_FAILURE);
+		}
+		vdi_cmd_data.block_size_shift = block_size_shift;
+		break;
 	}
 
 	return 0;
diff --git a/include/fec.h b/include/fec.h
index 1ae32e4..b3ef8d8 100644
--- a/include/fec.h
+++ b/include/fec.h
@@ -96,12 +96,12 @@ void fec_encode(const struct fec *code,
 		size_t num_block_nums, size_t sz);
 
 void fec_decode_buffer(struct fec *ctx, uint8_t *input[], const int in_idx[],
-		       char *buf, int idx);
+		       char *buf, int idx, uint32_t object_size);
 
 /* for isa-l */
 
 void isa_decode_buffer(struct fec *ctx, uint8_t *input[], const int in_idx[],
-		       char *buf, int idx);
+		       char *buf, int idx, uint32_t object_size);
 
 /*
  * @param inpkts an array of packets (size k); If a primary block, i, is present
@@ -119,7 +119,6 @@ void fec_decode(const struct fec *code,
 
 /* Set data stripe as sector size to make VM happy */
 #define SD_EC_DATA_STRIPE_SIZE (512) /* 512 Byte */
-#define SD_EC_NR_STRIPE_PER_OBJECT (SD_DATA_OBJ_SIZE / SD_EC_DATA_STRIPE_SIZE)
 #define SD_EC_MAX_STRIP (16)
 
 static inline int ec_policy_to_dp(uint8_t policy, int *d, int *p)
@@ -205,11 +204,12 @@ static inline void ec_destroy(struct fec *ctx)
 }
 
 static inline void ec_decode_buffer(struct fec *ctx, uint8_t *input[],
-				    const int in_idx[], char *buf, int idx)
+				    const int in_idx[], char *buf,
+				    int idx, uint32_t object_size)
 {
 	if (cpu_has_ssse3)
-		isa_decode_buffer(ctx, input, in_idx, buf, idx);
+		isa_decode_buffer(ctx, input, in_idx, buf, idx, object_size);
 	else
-		fec_decode_buffer(ctx, input, in_idx, buf, idx);
+		fec_decode_buffer(ctx, input, in_idx, buf, idx, object_size);
 }
 #endif
diff --git a/include/sheepdog_proto.h b/include/sheepdog_proto.h
index 7d5c143..4f0c48c 100644
--- a/include/sheepdog_proto.h
+++ b/include/sheepdog_proto.h
@@ -476,10 +476,11 @@ static inline bool is_data_obj(uint64_t oid)
 
 static inline size_t count_data_objs(const struct sd_inode *inode)
 {
-	return DIV_ROUND_UP(inode->vdi_size, SD_DATA_OBJ_SIZE);
+	return DIV_ROUND_UP(inode->vdi_size,
+			    (UINT32_C(1) << inode->block_size_shift));
 }
 
-static inline size_t get_objsize(uint64_t oid)
+static inline size_t get_objsize(uint64_t oid, uint32_t object_size)
 {
 	if (is_vdi_obj(oid))
 		return SD_INODE_SIZE;
@@ -493,7 +494,7 @@ static inline size_t get_objsize(uint64_t oid)
 	if (is_ledger_object(oid))
 		return SD_LEDGER_OBJ_SIZE;
 
-	return SD_DATA_OBJ_SIZE;
+	return object_size;
 }
 
 static inline uint64_t data_oid_to_idx(uint64_t oid)
diff --git a/lib/fec.c b/lib/fec.c
index c4e7a6f..fb40773 100644
--- a/lib/fec.c
+++ b/lib/fec.c
@@ -696,12 +696,13 @@ out:
 }
 
 void fec_decode_buffer(struct fec *ctx, uint8_t *input[], const int in_idx[],
-		      char *buf, int idx)
+		      char *buf, int idx, uint32_t object_size)
 {
 	int i, j, d = ctx->d;
 	size_t strip_size = SD_EC_DATA_STRIPE_SIZE / d;
+	uint32_t nr_stripe_per_object = object_size / SD_EC_DATA_STRIPE_SIZE;
 
-	for (i = 0; i < SD_EC_NR_STRIPE_PER_OBJECT; i++) {
+	for (i = 0; i < nr_stripe_per_object; i++) {
 		const uint8_t *in[d];
 		uint8_t out[strip_size];
 
@@ -713,9 +714,9 @@ void fec_decode_buffer(struct fec *ctx, uint8_t *input[], const int in_idx[],
 }
 
 void isa_decode_buffer(struct fec *ctx, uint8_t *input[], const int in_idx[],
-		       char *buf, int idx)
+		       char *buf, int idx, uint32_t object_size)
 {
-	int ed = ctx->d, edp = ctx->dp, len = SD_DATA_OBJ_SIZE / ed, i;
+	int ed = ctx->d, edp = ctx->dp, len = object_size / ed, i;
 	unsigned char ec_tbl[ed * edp * 32];
 	unsigned char bm[ed * ed];
 	unsigned char cm[ed];
diff --git a/sheep/gateway.c b/sheep/gateway.c
index 7f7d1d1..408660a 100644
--- a/sheep/gateway.c
+++ b/sheep/gateway.c
@@ -713,7 +713,7 @@ out:
 static int gateway_handle_cow(struct request *req)
 {
 	uint64_t oid = req->rq.obj.oid;
-	size_t len = get_objsize(oid);
+	size_t len = get_objsize(oid, get_vdi_object_size(oid_to_vid(oid)));
 	struct sd_req hdr, *req_hdr = &req->rq;
 	char *buf = xvalloc(len);
 	int ret;
diff --git a/sheep/group.c b/sheep/group.c
index 2b98a9b..095b7c5 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -510,7 +510,7 @@ retry:
 		if (vs[i].deleted)
 			atomic_set_bit(vs[i].vid, sys->vdi_deleted);
 		add_vdi_state(vs[i].vid, vs[i].nr_copies, vs[i].snapshot,
-			      vs[i].copy_policy);
+			      vs[i].copy_policy, vs[i].block_size_shift);
 	}
 out:
 	free(vs);
@@ -766,6 +766,7 @@ static void cinfo_collection_done(struct work *work)
 		sd_debug("nr_copies: %d", vs->nr_copies);
 		sd_debug("snapshot: %d", vs->snapshot);
 		sd_debug("copy_policy: %d", vs->copy_policy);
+		sd_debug("block_size_shift: %"PRIu8, vs->block_size_shift);
 		sd_debug("lock_state: %x", vs->lock_state);
 		sd_debug("owner: %s",
 			 addr_to_str(vs->lock_owner.addr, vs->lock_owner.port));
diff --git a/sheep/journal.c b/sheep/journal.c
index 5beabdf..4df9a74 100644
--- a/sheep/journal.c
+++ b/sheep/journal.c
@@ -137,6 +137,7 @@ static int replay_journal_entry(struct journal_descriptor *jd)
 {
 	char path[PATH_MAX];
 	ssize_t size;
+	uint32_t object_size = 0;
 	int fd, flags = O_WRONLY, ret = 0;
 	void *buf = NULL;
 	char *p = (char *)jd;
@@ -168,9 +169,9 @@ static int replay_journal_entry(struct journal_descriptor *jd)
 		sd_err("open %m");
 		return -1;
 	}
-
 	if (jd->create) {
-		ret = prealloc(fd, get_objsize(jd->oid));
+		object_size = get_vdi_object_size(oid_to_vid(jd->oid));
+		ret = prealloc(fd, get_objsize(jd->oid, object_size));
 		if (ret < 0)
 			goto out;
 	}
diff --git a/sheep/object_cache.c b/sheep/object_cache.c
index a0da92d..3794c19 100644
--- a/sheep/object_cache.c
+++ b/sheep/object_cache.c
@@ -126,7 +126,8 @@ static inline bool idx_has_vdi_bit(uint64_t idx)
 
 static inline size_t get_cache_block_size(uint64_t oid)
 {
-	size_t bsize = DIV_ROUND_UP(get_objsize(oid),
+	uint32_t object_size = get_vdi_object_size(oid_to_vid(oid));
+	size_t bsize = DIV_ROUND_UP(get_objsize(oid, object_size),
 				    sizeof(uint64_t) * BITS_PER_BYTE);
 
 	return round_up(bsize, BLOCK_SIZE); /* To be FS friendly */
@@ -457,6 +458,7 @@ static int push_cache_object(uint32_t vid, uint64_t idx, uint64_t bmap,
 	void *buf;
 	off_t offset;
 	uint64_t oid = idx_to_oid(vid, idx);
+	uint32_t object_size = get_objsize(oid, get_vdi_object_size(vid));
 	size_t data_length, bsize = get_cache_block_size(oid);
 	int ret = SD_RES_NO_MEM;
 	int first_bit, last_bit;
@@ -473,7 +475,7 @@ static int push_cache_object(uint32_t vid, uint64_t idx, uint64_t bmap,
 		 oid, bsize, bmap, first_bit, last_bit);
 	offset = first_bit * bsize;
 	data_length = min((last_bit - first_bit + 1) * bsize,
-			  get_objsize(oid) - (size_t)offset);
+			  object_size - (size_t)offset);
 
 	buf = xvalloc(data_length);
 	ret = read_cache_object_noupdate(vid, idx, buf, data_length, offset);
@@ -517,6 +519,7 @@ static void do_reclaim_object(struct object_cache *oc)
 	struct object_cache_entry *entry;
 	uint64_t oid;
 	uint32_t cap;
+	uint32_t cache_object_size = get_vdi_object_size(oc->vid) / 1048576;
 
 	write_lock_cache(oc);
 	list_for_each_entry(entry, &oc->lru_head, lru_list) {
@@ -539,7 +542,7 @@ static void do_reclaim_object(struct object_cache *oc)
 		if (remove_cache_object(oc, entry_idx(entry)) != SD_RES_SUCCESS)
 			continue;
 		free_cache_entry(entry);
-		cap = uatomic_sub_return(&gcache.capacity, CACHE_OBJECT_SIZE);
+		cap = uatomic_sub_return(&gcache.capacity, cache_object_size);
 		sd_debug("%"PRIx64" reclaimed. capacity:%"PRId32, oid, cap);
 		if (cap <= HIGH_WATERMARK)
 			break;
@@ -685,13 +688,14 @@ alloc_cache_entry(struct object_cache *oc, uint64_t idx)
 static void add_to_lru_cache(struct object_cache *oc, uint64_t idx, bool create)
 {
 	struct object_cache_entry *entry = alloc_cache_entry(oc, idx);
+	uint32_t cache_object_size = get_vdi_object_size(oc->vid) / 1048576;
 
 	sd_debug("oid %"PRIx64" added", idx_to_oid(oc->vid, idx));
 
 	write_lock_cache(oc);
 	if (unlikely(lru_tree_insert(&oc->lru_tree, entry)))
 		panic("the object already exist");
-	uatomic_add(&gcache.capacity, CACHE_OBJECT_SIZE);
+	uatomic_add(&gcache.capacity, cache_object_size);
 	list_add_tail(&entry->lru_list, &oc->lru_head);
 	oc->total_count++;
 	if (create) {
@@ -736,7 +740,8 @@ static int object_cache_lookup(struct object_cache *oc, uint64_t idx,
 		ret = SD_RES_EIO;
 		goto out;
 	}
-	ret = prealloc(fd, get_objsize(idx_to_oid(oc->vid, idx)));
+	ret = prealloc(fd, get_objsize(idx_to_oid(oc->vid, idx),
+				       get_vdi_object_size(oc->vid)));
 	if (unlikely(ret < 0)) {
 		ret = SD_RES_EIO;
 		goto out_close;
@@ -804,7 +809,7 @@ static int object_cache_pull(struct object_cache *oc, uint64_t idx)
 	struct sd_req hdr;
 	int ret;
 	uint64_t oid = idx_to_oid(oc->vid, idx);
-	uint32_t data_length = get_objsize(oid);
+	uint32_t data_length = get_objsize(oid, get_vdi_object_size(oc->vid));
 	void *buf;
 
 	buf = xvalloc(data_length);
@@ -939,11 +944,14 @@ void object_cache_delete(uint32_t vid)
 	int h = hash(vid);
 	struct object_cache_entry *entry;
 	char path[PATH_MAX];
+	uint32_t cache_object_size;
 
 	cache = find_object_cache(vid, false);
 	if (!cache)
 		return;
 
+	cache_object_size = get_vdi_object_size(cache->vid) / 1048576;
+
 	/* Firstly we free memory */
 	sd_write_lock(&hashtable_lock[h]);
 	hlist_del(&cache->hash);
@@ -952,7 +960,7 @@ void object_cache_delete(uint32_t vid)
 	write_lock_cache(cache);
 	list_for_each_entry(entry, &cache->lru_head, lru_list) {
 		free_cache_entry(entry);
-		uatomic_sub(&gcache.capacity, CACHE_OBJECT_SIZE);
+		uatomic_sub(&gcache.capacity, cache_object_size);
 	}
 	unlock_cache(cache);
 	sd_destroy_rw_lock(&cache->lock);
@@ -1294,6 +1302,7 @@ int object_cache_remove(uint64_t oid)
 	/* Inc the entry refcount to exclude the reclaimer */
 	struct object_cache_entry *entry = oid_to_entry(oid);
 	struct object_cache *oc;
+	uint32_t cache_object_size;
 	int ret;
 
 	if (!entry)
@@ -1305,6 +1314,8 @@ int object_cache_remove(uint64_t oid)
 	while (refcount_read(&entry->refcnt) > 1)
 		usleep(100000); /* Object might be in push */
 
+	cache_object_size = get_vdi_object_size(oc->vid) / 1048576;
+
 	write_lock_cache(oc);
 	/*
 	 * We assume no other thread will inc the refcount of this entry
@@ -1321,7 +1332,7 @@ int object_cache_remove(uint64_t oid)
 	free_cache_entry(entry);
 	unlock_cache(oc);
 
-	uatomic_sub(&gcache.capacity, CACHE_OBJECT_SIZE);
+	uatomic_sub(&gcache.capacity, cache_object_size);
 
 	return SD_RES_SUCCESS;
 }
diff --git a/sheep/ops.c b/sheep/ops.c
index c76fc4e..c2f685e 100644
--- a/sheep/ops.c
+++ b/sheep/ops.c
@@ -93,6 +93,7 @@ static int cluster_new_vdi(struct request *req)
 		.copy_policy = hdr->vdi.copy_policy,
 		.store_policy = hdr->vdi.store_policy,
 		.nr_copies = hdr->vdi.copies,
+		.block_size_shift = hdr->vdi.block_size_shift,
 		.time = (uint64_t) tv.tv_sec << 32 | tv.tv_usec * 1000,
 	};
 
@@ -105,6 +106,9 @@ static int cluster_new_vdi(struct request *req)
 	if (iocb.copy_policy)
 		iocb.nr_copies = ec_policy_to_dp(iocb.copy_policy, NULL, NULL);
 
+	if (!hdr->vdi.block_size_shift)
+		iocb.block_size_shift = sys->cinfo.block_size_shift;
+
 	if (hdr->data_length != SD_MAX_VDI_LEN)
 		return SD_RES_INVALID_PARMS;
 
@@ -115,6 +119,7 @@ static int cluster_new_vdi(struct request *req)
 
 	rsp->vdi.vdi_id = vid;
 	rsp->vdi.copies = iocb.nr_copies;
+	rsp->vdi.block_size_shift = iocb.block_size_shift;
 
 	return ret;
 }
@@ -236,6 +241,7 @@ static int cluster_get_vdi_info(struct request *req)
 
 	rsp->vdi.vdi_id = info.vid;
 	rsp->vdi.copies = get_vdi_copy_number(info.vid);
+	rsp->vdi.block_size_shift = get_vdi_block_size_shift(info.vid);
 
 	return ret;
 }
@@ -655,13 +661,15 @@ static int cluster_notify_vdi_add(const struct sd_req *req, struct sd_rsp *rsp,
 		/* make the previous working vdi a snapshot */
 		add_vdi_state(req->vdi_state.old_vid,
 			      get_vdi_copy_number(req->vdi_state.old_vid),
-			      true, req->vdi_state.copy_policy);
+			      true, req->vdi_state.copy_policy,
+			      get_vdi_block_size_shift(req->vdi_state.old_vid));
 
 	if (req->vdi_state.set_bitmap)
 		atomic_set_bit(req->vdi_state.new_vid, sys->vdi_inuse);
 
 	add_vdi_state(req->vdi_state.new_vid, req->vdi_state.copies, false,
-		      req->vdi_state.copy_policy);
+		      req->vdi_state.copy_policy,
+		      req->vdi_state.block_size_shift);
 
 	return SD_RES_SUCCESS;
 }
@@ -759,9 +767,10 @@ static int cluster_alter_vdi_copy(const struct sd_req *req, struct sd_rsp *rsp,
 
 	uint32_t vid = req->vdi_state.new_vid;
 	int nr_copies = req->vdi_state.copies;
+	uint32_t block_size_shift = req->vdi_state.block_size_shift;
 	struct vnode_info *vinfo;
 
-	add_vdi_state(vid, nr_copies, false, 0);
+	add_vdi_state(vid, nr_copies, false, 0, block_size_shift);
 
 	vinfo = get_vnode_info();
 	start_recovery(vinfo, vinfo, false);
diff --git a/sheep/plain_store.c b/sheep/plain_store.c
index 1b7b66c..cb90e31 100644
--- a/sheep/plain_store.c
+++ b/sheep/plain_store.c
@@ -152,7 +152,8 @@ static int default_trim(int fd, uint64_t oid, const struct siocb *iocb,
 
 	if (*poffset + *plen < iocb->offset + iocb->length) {
 		uint64_t end = iocb->offset + iocb->length;
-		if (end == get_objsize(oid))
+		uint32_t object_size = get_vdi_object_size(oid_to_vid(oid));
+		if (end == get_objsize(oid, object_size))
 			/* This is necessary to punch the last block */
 			end = round_up(end, BLOCK_SIZE);
 		sd_debug("discard between %ld, %ld, %" PRIx64, *poffset + *plen,
@@ -280,9 +281,9 @@ static int init_vdi_state(uint64_t oid, const char *wd, uint32_t epoch)
 		       "wat %s", oid, epoch, wd);
 		goto out;
 	}
-
 	add_vdi_state(oid_to_vid(oid), inode->nr_copies,
-		      vdi_is_snapshot(inode), inode->copy_policy);
+		      vdi_is_snapshot(inode), inode->copy_policy,
+		      inode->block_size_shift);
 
 	if (inode->name[0] == '\0')
 		atomic_set_bit(oid_to_vid(oid), sys->vdi_deleted);
@@ -402,9 +403,9 @@ size_t get_store_objsize(uint64_t oid)
 		uint8_t policy = get_vdi_copy_policy(oid_to_vid(oid));
 		int d;
 		ec_policy_to_dp(policy, &d, NULL);
-		return SD_DATA_OBJ_SIZE / d;
+		return get_vdi_object_size(oid_to_vid(oid)) / d;
 	}
-	return get_objsize(oid);
+	return get_objsize(oid, get_vdi_object_size(oid_to_vid(oid)));
 }
 
 int default_create_and_write(uint64_t oid, const struct siocb *iocb)
@@ -413,6 +414,7 @@ int default_create_and_write(uint64_t oid, const struct siocb *iocb)
 	int flags = prepare_iocb(oid, iocb, true);
 	int ret, fd;
 	uint32_t len = iocb->length;
+	uint32_t object_size = 0;
 	size_t obj_size;
 	uint64_t offset = iocb->offset;
 
@@ -452,7 +454,9 @@ int default_create_and_write(uint64_t oid, const struct siocb *iocb)
 
 	trim_zero_blocks(iocb->buf, &offset, &len);
 
-	if (offset != 0 || len != get_objsize(oid)) {
+	object_size = get_vdi_object_size(oid_to_vid(oid));
+
+	if (offset != 0 || len != get_objsize(oid, object_size)) {
 		if (is_sparse_object(oid))
 			ret = xftruncate(fd, obj_size);
 		else
diff --git a/sheep/recovery.c b/sheep/recovery.c
index 7874fc9..9bf2d9c 100644
--- a/sheep/recovery.c
+++ b/sheep/recovery.c
@@ -429,6 +429,7 @@ static void *rebuild_erasure_object(uint64_t oid, uint8_t idx,
 	char *lost = xvalloc(len);
 	int i, j;
 	uint8_t policy = get_vdi_copy_policy(oid_to_vid(oid));
+	uint32_t object_size = get_vdi_object_size(oid_to_vid(oid));
 	int ed = 0, edp;
 	edp = ec_policy_to_dp(policy, &ed, NULL);
 	struct fec *ctx = ec_init(ed, edp);
@@ -458,7 +459,7 @@ static void *rebuild_erasure_object(uint64_t oid, uint8_t idx,
 	}
 
 	/* Rebuild the lost replica */
-	ec_decode_buffer(ctx, bufs, idxs, lost, idx);
+	ec_decode_buffer(ctx, bufs, idxs, lost, idx, object_size);
 out:
 	ec_destroy(ctx);
 	for (i = 0; i < ed; i++)
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 5fc6b90..a724754 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -219,6 +219,7 @@ struct vdi_iocb {
 	uint8_t copy_policy;
 	uint8_t store_policy;
 	uint8_t nr_copies;
+	uint8_t block_size_shift;
 	uint64_t time;
 };
 
@@ -326,9 +327,12 @@ int fill_vdi_state_list(const struct sd_req *hdr,
 bool oid_is_readonly(uint64_t oid);
 int get_vdi_copy_number(uint32_t vid);
 int get_vdi_copy_policy(uint32_t vid);
+uint32_t get_vdi_object_size(uint32_t vid);
+uint8_t get_vdi_block_size_shift(uint32_t vid);
 int get_obj_copy_number(uint64_t oid, int nr_zones);
 int get_req_copy_number(struct request *req);
-int add_vdi_state(uint32_t vid, int nr_copies, bool snapshot, uint8_t);
+int add_vdi_state(uint32_t vid, int nr_copies, bool snapshot,
+		  uint8_t, uint8_t block_size_shift);
 int vdi_exist(uint32_t vid);
 int vdi_create(const struct vdi_iocb *iocb, uint32_t *new_vid);
 int vdi_snapshot(const struct vdi_iocb *iocb, uint32_t *new_vid);
diff --git a/sheep/vdi.c b/sheep/vdi.c
index 1c8fb36..392b860 100644
--- a/sheep/vdi.c
+++ b/sheep/vdi.c
@@ -14,6 +14,7 @@
 struct vdi_state_entry {
 	uint32_t vid;
 	unsigned int nr_copies;
+	uint8_t block_size_shift;
 	bool snapshot;
 	bool deleted;
 	uint8_t copy_policy;
@@ -132,6 +133,44 @@ int get_vdi_copy_policy(uint32_t vid)
 	return entry->copy_policy;
 }
 
+/* Data object size in bytes of @vid; cluster default if @vid is unknown. */
+uint32_t get_vdi_object_size(uint32_t vid)
+{
+	struct vdi_state_entry *state;
+	uint32_t fallback;
+
+	sd_read_lock(&vdi_state_lock);
+	state = vdi_state_search(&vdi_state_root, vid);
+	sd_rw_unlock(&vdi_state_lock);
+
+	if (state)
+		return UINT32_C(1) << state->block_size_shift;
+
+	/* No state entry for this vid: fall back to the cluster setting. */
+	fallback = UINT32_C(1) << sys->cinfo.block_size_shift;
+	sd_alert("object_size for %" PRIx32 " not found, set %" PRIu32,
+		 vid, fallback);
+	return fallback;
+}
+
+/* Return block_size_shift of @vid, or the cluster default if unknown. */
+uint8_t get_vdi_block_size_shift(uint32_t vid)
+{
+	struct vdi_state_entry *state;
+
+	sd_read_lock(&vdi_state_lock);
+	state = vdi_state_search(&vdi_state_root, vid);
+	sd_rw_unlock(&vdi_state_lock);
+
+	if (state)
+		return state->block_size_shift;
+
+	sd_alert("block_size_shift for %" PRIx32
+		 " not found, set %" PRIu8, vid,
+		 sys->cinfo.block_size_shift);
+	return sys->cinfo.block_size_shift;
+}
+
 int get_obj_copy_number(uint64_t oid, int nr_zones)
 {
 	return min(get_vdi_copy_number(oid_to_vid(oid)), nr_zones);
@@ -149,7 +188,8 @@ int get_req_copy_number(struct request *req)
 	return nr_copies;
 }
 
-int add_vdi_state(uint32_t vid, int nr_copies, bool snapshot, uint8_t cp)
+int add_vdi_state(uint32_t vid, int nr_copies, bool snapshot,
+		  uint8_t cp, uint8_t block_size_shift)
 {
 	struct vdi_state_entry *entry, *old;
 
@@ -158,6 +198,7 @@ int add_vdi_state(uint32_t vid, int nr_copies, bool snapshot, uint8_t cp)
 	entry->nr_copies = nr_copies;
 	entry->snapshot = snapshot;
 	entry->copy_policy = cp;
+	entry->block_size_shift = block_size_shift;
 
 	entry->lock_state = LOCK_STATE_UNLOCKED;
 	memset(&entry->owner, 0, sizeof(struct node_id));
@@ -173,7 +214,8 @@ int add_vdi_state(uint32_t vid, int nr_copies, bool snapshot, uint8_t cp)
 		sd_mutex_unlock(&m);
 	}
 
-	sd_debug("%" PRIx32 ", %d, %d", vid, nr_copies, cp);
+	sd_debug("%" PRIx32 ", %d, %d, %"PRIu8,
+		 vid, nr_copies, cp, block_size_shift);
 
 	sd_write_lock(&vdi_state_lock);
 	old = vdi_state_insert(&vdi_state_root, entry);
@@ -183,6 +225,7 @@ int add_vdi_state(uint32_t vid, int nr_copies, bool snapshot, uint8_t cp)
 		entry->nr_copies = nr_copies;
 		entry->snapshot = snapshot;
 		entry->copy_policy = cp;
+		entry->block_size_shift = block_size_shift;
 	}
 
 	sd_rw_unlock(&vdi_state_lock);
@@ -209,6 +252,7 @@ int fill_vdi_state_list(const struct sd_req *hdr,
 		vs[last].nr_copies = entry->nr_copies;
 		vs[last].snapshot = entry->snapshot;
 		vs[last].copy_policy = entry->copy_policy;
+		vs[last].block_size_shift = entry->block_size_shift;
 		vs[last].lock_state = entry->lock_state;
 		vs[last].lock_owner = entry->owner;
 		vs[last].nr_participants = entry->nr_participants;
@@ -251,6 +295,7 @@ static struct vdi_state *fill_vdi_state_list_with_alloc(int *result_nr)
 		vs[i].snapshot = entry->snapshot;
 		vs[i].deleted = entry->deleted;
 		vs[i].copy_policy = entry->copy_policy;
+		vs[i].block_size_shift = entry->block_size_shift;
 		vs[i].lock_state = entry->lock_state;
 		vs[i].lock_owner = entry->owner;
 		vs[i].nr_participants = entry->nr_participants;
@@ -861,7 +906,7 @@ static struct sd_inode *alloc_inode(const struct vdi_iocb *iocb,
 				    struct generation_reference *gref)
 {
 	struct sd_inode *new = xzalloc(sizeof(*new));
-	unsigned long block_size = SD_DATA_OBJ_SIZE;
+	unsigned long block_size = (UINT32_C(1) << iocb->block_size_shift);
 
 	pstrcpy(new->name, sizeof(new->name), iocb->name);
 	new->vdi_id = new_vid;
@@ -903,9 +948,10 @@ static int create_vdi(const struct vdi_iocb *iocb, uint32_t new_snapid,
 	int ret;
 
 	sd_debug("%s: size %" PRIu64 ", new_vid %" PRIx32 ", copies %d, "
-		 "snapid %" PRIu32 " copy policy %"PRIu8 "store policy %"PRIu8,
-		 iocb->name, iocb->size, new_vid, iocb->nr_copies, new_snapid,
-		 new->copy_policy, new->store_policy);
+		 "snapid %" PRIu32 " copy policy %"PRIu8 "store policy %"PRIu8
+		 "block_size_shift %"PRIu8, iocb->name, iocb->size, new_vid,
+		  iocb->nr_copies, new_snapid, new->copy_policy,
+		  new->store_policy, iocb->block_size_shift);
 
 	ret = sd_write_object(vid_to_vdi_oid(new_vid), (char *)new,
 			      sizeof(*new), 0, true);
@@ -940,8 +986,9 @@ static int clone_vdi(const struct vdi_iocb *iocb, uint32_t new_snapid,
 	int ret;
 
 	sd_debug("%s: size %" PRIu64 ", vid %" PRIx32 ", base %" PRIx32 ", "
-		 "copies %d, snapid %" PRIu32, iocb->name, iocb->size, new_vid,
-		 base_vid, iocb->nr_copies, new_snapid);
+		 "copies %d, block_size_shift %" PRIu8 ", snapid %" PRIu32,
+		 iocb->name, iocb->size, new_vid, base_vid,
+		 iocb->nr_copies, iocb->block_size_shift, new_snapid);
 
 	ret = sd_read_object(vid_to_vdi_oid(base_vid), (char *)base,
 			     sizeof(*base), 0);
@@ -1002,8 +1049,9 @@ static int snapshot_vdi(const struct vdi_iocb *iocb, uint32_t new_snapid,
 	int ret;
 
 	sd_debug("%s: size %" PRIu64 ", vid %" PRIx32 ", base %" PRIx32 ", "
-		 "copies %d, snapid %" PRIu32, iocb->name, iocb->size, new_vid,
-		 base_vid, iocb->nr_copies, new_snapid);
+		 "copies %d, block_size_shift %"PRIu8 ", snapid %" PRIu32,
+		 iocb->name, iocb->size, new_vid, base_vid,
+		 iocb->nr_copies, iocb->block_size_shift, new_snapid);
 
 	ret = sd_read_object(vid_to_vdi_oid(base_vid), (char *)base,
 			     sizeof(*base), 0);
@@ -1071,8 +1119,9 @@ static int rebase_vdi(const struct vdi_iocb *iocb, uint32_t new_snapid,
 	int ret;
 
 	sd_debug("%s: size %" PRIu64 ", vid %" PRIx32 ", base %" PRIx32 ", "
-		 "cur %" PRIx32 ", copies %d, snapid %" PRIu32, iocb->name,
-		 iocb->size, new_vid, base_vid, cur_vid, iocb->nr_copies,
+		 "cur %" PRIx32 ", copies %d, block_size_shift %"PRIu8
+		 ", snapid %" PRIu32, iocb->name, iocb->size, new_vid,
+		 base_vid, cur_vid, iocb->nr_copies, iocb->block_size_shift,
 		 new_snapid);
 
 	ret = sd_read_object(vid_to_vdi_oid(base_vid), (char *)base,
@@ -1260,7 +1309,7 @@ int vdi_lookup(const struct vdi_iocb *iocb, struct vdi_info *info)
 }
 
 static int notify_vdi_add(uint32_t vdi_id, uint32_t nr_copies, uint32_t old_vid,
-			  uint8_t copy_policy)
+			  uint8_t copy_policy, uint8_t block_size_shift)
 {
 	int ret;
 	struct sd_req hdr;
@@ -1271,11 +1320,13 @@ static int notify_vdi_add(uint32_t vdi_id, uint32_t nr_copies, uint32_t old_vid,
 	hdr.vdi_state.copies = nr_copies;
 	hdr.vdi_state.set_bitmap = false;
 	hdr.vdi_state.copy_policy = copy_policy;
+	hdr.vdi_state.block_size_shift = block_size_shift;
 
 	ret = exec_local_req(&hdr, NULL);
 	if (ret != SD_RES_SUCCESS)
 		sd_err("fail to notify vdi add event(%" PRIx32 ", %d, %" PRIx32
-		       ")", vdi_id, nr_copies, old_vid);
+		       ", %"PRIu8 ")", vdi_id, nr_copies,
+		       old_vid, block_size_shift);
 
 	return ret;
 }
@@ -1326,7 +1377,7 @@ int vdi_create(const struct vdi_iocb *iocb, uint32_t *new_vid)
 		info.snapid = 1;
 	*new_vid = info.free_bit;
 	ret = notify_vdi_add(*new_vid, iocb->nr_copies, info.vid,
-			     iocb->copy_policy);
+			     iocb->copy_policy, iocb->block_size_shift);
 	if (ret != SD_RES_SUCCESS)
 		return ret;
 
@@ -1366,7 +1417,7 @@ int vdi_snapshot(const struct vdi_iocb *iocb, uint32_t *new_vid)
 	assert(info.snapid > 0);
 	*new_vid = info.free_bit;
 	ret = notify_vdi_add(*new_vid, iocb->nr_copies, info.vid,
-			     iocb->copy_policy);
+			     iocb->copy_policy, iocb->block_size_shift);
 	if (ret != SD_RES_SUCCESS)
 		return ret;
 
@@ -1745,6 +1796,15 @@ int sd_create_hyper_volume(const char *name, uint32_t *vdi_id)
 	hdr.vdi.copies = sys->cinfo.nr_copies;
 	hdr.vdi.copy_policy = sys->cinfo.copy_policy;
 	hdr.vdi.store_policy = 1;
+	/* XXX Cannot use both features, Hypervolume and Change object size */
+	if (sys->cinfo.block_size_shift != SD_DEFAULT_BLOCK_SIZE_SHIFT) {
+		hdr.vdi.block_size_shift = SD_DEFAULT_BLOCK_SIZE_SHIFT;
+		sd_warn("Cluster default object size is not"
+			" SD_DATA_OBJ_SIZE(%d)."
+			"Set VDI object size %d and create HyperVolume",
+			SD_DEFAULT_BLOCK_SIZE_SHIFT,
+			SD_DEFAULT_BLOCK_SIZE_SHIFT);
+	}
 
 	ret = exec_local_req(&hdr, buf);
 	if (ret != SD_RES_SUCCESS) {
diff --git a/tests/unit/sheep/test_vdi.c b/tests/unit/sheep/test_vdi.c
index 2f8946b..c5336db 100644
--- a/tests/unit/sheep/test_vdi.c
+++ b/tests/unit/sheep/test_vdi.c
@@ -17,9 +17,9 @@
 
 START_TEST(test_vdi)
 {
-	add_vdi_state(1, 1, true, 0);
-	add_vdi_state(2, 1, true, 0);
-	add_vdi_state(3, 2, false, 0);
+	add_vdi_state(1, 1, true, 0, 22);
+	add_vdi_state(2, 1, true, 0, 22);
+	add_vdi_state(3, 2, false, 0, 22);
 
 	ck_assert_int_eq(get_vdi_copy_number(1), 1);
 	ck_assert_int_eq(get_vdi_copy_number(2), 1);
-- 
1.7.1




More information about the sheepdog mailing list