[sheepdog] [PATCH 1/2] sheep: rename files for store_driver

Saeki Masaki saeki.masaki at po.ntts.co.jp
Wed Mar 18 03:44:50 CET 2015


On 2015/03/18 10:50, Liu Yuan wrote:
> On Tue, Mar 17, 2015 at 06:00:53PM +0900, Saeki Masaki wrote:
>> This change is a preparation patch for add store_driver.
>> Put files together to the new folder.
>>
>> and fix little style problem in md.c
>>
>> Signed-off-by: Masaki Saeki <saeki.masaki at po.ntts.co.jp>
>> ---
>>   sheep/Makefile.am         |    6 +-
>>   sheep/md.c                |  878 ---------------------------------------------
>>   sheep/plain_store.c       |  759 ---------------------------------------
>>   sheep/store.c             |  511 --------------------------
>>   sheep/store/common.c      |  511 ++++++++++++++++++++++++++
>>   sheep/store/md.c          |  878 +++++++++++++++++++++++++++++++++++++++++++++
>>   sheep/store/plain_store.c |  759 +++++++++++++++++++++++++++++++++++++++
>>   7 files changed, 2152 insertions(+), 2150 deletions(-)
>>   delete mode 100644 sheep/md.c
>>   delete mode 100644 sheep/plain_store.c
>>   delete mode 100644 sheep/store.c
>>   create mode 100644 sheep/store/common.c
>>   create mode 100644 sheep/store/md.c
>>   create mode 100644 sheep/store/plain_store.c
>>
>> diff --git a/sheep/Makefile.am b/sheep/Makefile.am
>> index 7a08838..3ddd761 100644
>> --- a/sheep/Makefile.am
>> +++ b/sheep/Makefile.am
>> @@ -24,10 +24,12 @@ AM_CPPFLAGS		= -I$(top_builddir)/include -I$(top_srcdir)/include \
>>
>>   sbin_PROGRAMS		= sheep
>>
>> -sheep_SOURCES		= sheep.c group.c request.c gateway.c store.c vdi.c \
>> +sheep_SOURCES		= sheep.c group.c request.c gateway.c vdi.c \
>>   			  journal.c ops.c recovery.c cluster/local.c \
>>   			  object_cache.c object_list_cache.c \
>> -			  plain_store.c config.c migrate.c md.c
>> +			  store/common.c store/md.c \
>> +			  store/plain_store.c \
>> +			  config.c migrate.c
>>
>>   if BUILD_HTTP
>>   sheep_SOURCES		+= http/http.c http/kv.c http/s3.c http/swift.c \
>> diff --git a/sheep/md.c b/sheep/md.c
>> deleted file mode 100644
>> index c00d7a5..0000000
>> --- a/sheep/md.c
>> +++ /dev/null
>> @@ -1,878 +0,0 @@
>> -/*
>> - * Copyright (C) 2013 Taobao Inc.
>> - *
>> - * Liu Yuan <namei.unix at gmail.com>
>> - *
>> - * This program is free software; you can redistribute it and/or
>> - * modify it under the terms of the GNU General Public License version
>> - * 2 as published by the Free Software Foundation.
>> - *
>> - * You should have received a copy of the GNU General Public License
>> - * along with this program. If not, see <http://www.gnu.org/licenses/>.
>> - */
>> -
>> -#include "sheep_priv.h"
>> -
>> -#define MD_VDISK_SIZE ((uint64_t)1*1024*1024*1024) /* 1G */
>> -
>> -#define NONE_EXIST_PATH "/all/disks/are/broken/,ps/ノ厂経7/!"
>> -
>> -struct md md = {
>> -	.vroot = RB_ROOT,
>> -	.root = RB_ROOT,
>> -	.lock = SD_RW_LOCK_INITIALIZER,
>> -};
>> -
>> -static inline uint32_t nr_online_disks(void)
>> -{
>> -	uint32_t nr;
>> -
>> -	sd_read_lock(&md.lock);
>> -	nr = md.nr_disks;
>> -	sd_rw_unlock(&md.lock);
>> -
>> -	return nr;
>> -}
>> -
>> -static inline int vdisk_number(const struct disk *disk)
>> -{
>> -	return DIV_ROUND_UP(disk->space, MD_VDISK_SIZE);
>> -}
>> -
>> -static int disk_cmp(const struct disk *d1, const struct disk *d2)
>> -{
>> -	return strcmp(d1->path, d2->path);
>> -}
>> -
>> -static int vdisk_cmp(const struct vdisk *d1, const struct vdisk *d2)
>> -{
>> -	return intcmp(d1->hash, d2->hash);
>> -}
>> -
>> -static struct vdisk *vdisk_insert(struct vdisk *new)
>> -{
>> -	return rb_insert(&md.vroot, new, rb, vdisk_cmp);
>> -}
>> -
>> -/* If v1_hash < hval <= v2_hash, then oid is resident in v2 */
>> -static struct vdisk *hval_to_vdisk(uint64_t hval)
>> -{
>> -	struct vdisk dummy = { .hash = hval };
>> -
>> -	return rb_nsearch(&md.vroot, &dummy, rb, vdisk_cmp);
>> -}
>> -
>> -static struct vdisk *oid_to_vdisk(uint64_t oid)
>> -{
>> -	return hval_to_vdisk(sd_hash_oid(oid));
>> -}
>> -
>> -static void create_vdisks(const struct disk *disk)
>> -{
>> -	uint64_t hval = sd_hash(disk->path, strlen(disk->path));
>> -	const struct sd_node *n = &sys->this_node;
>> -	uint64_t node_hval;
>> -	int nr;
>> -
>> -	if (is_cluster_diskmode(&sys->cinfo)) {
>> -		node_hval = sd_hash(&n->nid, offsetof(typeof(n->nid), io_addr));
>> -		hval = fnv_64a_64(node_hval, hval);
>> -		nr = DIV_ROUND_UP(disk->space, WEIGHT_MIN);
>> -		if (0 == n->nid.port)
>> -			return;
>> -	} else
>> -		nr = vdisk_number(disk);
>> -
>> -	for (int i = 0; i < nr; i++) {
>> -		struct vdisk *v = xmalloc(sizeof(*v));
>> -
>> -		hval = sd_hash_next(hval);
>> -		v->hash = hval;
>> -		v->disk = disk;
>> -		if (unlikely(vdisk_insert(v)))
>> -			panic("vdisk hash collison");
>> -	}
>> -}
>> -
>> -static inline void vdisk_free(struct vdisk *v)
>> -{
>> -	rb_erase(&v->rb, &md.vroot);
>> -	free(v);
>> -}
>> -
>> -static void remove_vdisks(const struct disk *disk)
>> -{
>> -	uint64_t hval = sd_hash(disk->path, strlen(disk->path));
>> -	const struct sd_node *n = &sys->this_node;
>> -	uint64_t node_hval;
>> -	int nr;
>> -
>> -	if (is_cluster_diskmode(&sys->cinfo)) {
>> -		node_hval = sd_hash(&n->nid, offsetof(typeof(n->nid), io_addr));
>> -		hval = fnv_64a_64(node_hval, hval);
>> -		nr = DIV_ROUND_UP(disk->space, WEIGHT_MIN);
>> -	} else
>> -		nr = vdisk_number(disk);
>> -
>> -	for (int i = 0; i < nr; i++) {
>> -		struct vdisk *v;
>> -
>> -		hval = sd_hash_next(hval);
>> -		v = hval_to_vdisk(hval);
>> -		assert(v->hash == hval);
>> -
>> -		vdisk_free(v);
>> -	}
>> -}
>> -
>> -static inline void trim_last_slash(char *path)
>> -{
>> -	assert(path[0]);
>> -	while (path[strlen(path) - 1] == '/')
>> -		path[strlen(path) - 1] = '\0';
>> -}
>> -
>> -static struct disk *path_to_disk(const char *path)
>> -{
>> -	struct disk key = {};
>> -
>> -	pstrcpy(key.path, sizeof(key.path), path);
>> -	trim_last_slash(key.path);
>> -
>> -	return rb_search(&md.root, &key, rb, disk_cmp);
>> -}
>> -
>> -size_t get_store_objsize(uint64_t oid)
>> -{
>> -	if (is_erasure_oid(oid)) {
>> -		uint8_t policy = get_vdi_copy_policy(oid_to_vid(oid));
>> -		int d;
>> -		ec_policy_to_dp(policy, &d, NULL);
>> -		return get_vdi_object_size(oid_to_vid(oid)) / d;
>> -	}
>> -	return get_objsize(oid, get_vdi_object_size(oid_to_vid(oid)));
>> -}
>> -
>> -static int get_total_object_size(uint64_t oid, const char *wd, uint32_t epoch,
>> -				 uint8_t ec_index, struct vnode_info *vinfo,
>> -				 void *total)
>> -{
>> -	uint64_t *t = total;
>> -	struct stat s;
>> -	char path[PATH_MAX];
>> -
>> -	snprintf(path, PATH_MAX, "%s/%016" PRIx64, wd, oid);
>> -	if (stat(path, &s) == 0)
>> -		*t += s.st_blocks * SECTOR_SIZE;
>> -	else
>> -		*t += get_store_objsize(oid);
>> -
>> -	return SD_RES_SUCCESS;
>> -}
>> -
>> -static int64_t find_string_integer(const char *str, const char *delimiter)
>> -{
>> -	char *pos = strstr(str, delimiter), *p;
>> -	int64_t ret;
>> -
>> -	ret = strtoll(pos + 1, &p, 10);
>> -	if (ret == LLONG_MAX || p == pos + 1) {
>> -		sd_err("%s strtoul failed, delimiter %s, %m", str, delimiter);
>> -		return -1;
>> -	}
>> -
>> -	return ret;
>> -}
>> -
>> -/* If cleanup is true, temporary objects will be removed */
>> -static int for_each_object_in_path(const char *path,
>> -				   int (*func)(uint64_t, const char *, uint32_t,
>> -					       uint8_t, struct vnode_info *,
>> -					       void *),
>> -				   bool cleanup, struct vnode_info *vinfo,
>> -				   void *arg)
>> -{
>> -	DIR *dir;
>> -	struct dirent *d;
>> -	uint64_t oid;
>> -	int ret = SD_RES_SUCCESS;
>> -	char file_name[PATH_MAX];
>> -
>> -	dir = opendir(path);
>> -	if (unlikely(!dir)) {
>> -		sd_err("failed to open %s, %m", path);
>> -		return SD_RES_EIO;
>> -	}
>> -
>> -	while ((d = readdir(dir))) {
>> -		uint32_t epoch = 0;
>> -		uint8_t ec_index = SD_MAX_COPIES;
>> -
>> -		/* skip ".", ".." and ".stale" */
>> -		if (unlikely(!strncmp(d->d_name, ".", 1)))
>> -			continue;
>> -
>> -		sd_debug("%s, %s", path, d->d_name);
>> -		oid = strtoull(d->d_name, NULL, 16);
>> -		if (oid == 0 || oid == ULLONG_MAX)
>> -			continue;
>> -
>> -		/* don't call callback against temporary objects */
>> -		if (is_tmp_dentry(d->d_name)) {
>> -			if (cleanup) {
>> -				snprintf(file_name, sizeof(file_name),
>> -						"%s/%s", path, d->d_name);
>> -				sd_debug("remove tmp object %s", file_name);
>> -				if (unlink(file_name) < 0)
>> -					sd_err("failed to unlink %s: %m",
>> -							file_name);
>> -			}
>> -			continue;
>> -		}
>> -
>> -		if (is_stale_dentry(d->d_name)) {
>> -			epoch = find_string_integer(d->d_name, ".");
>> -			if (epoch < 0)
>> -				continue;
>> -		}
>> -
>> -		if (is_ec_dentry(d->d_name)) {
>> -			ec_index = find_string_integer(d->d_name, "_");
>> -			if (ec_index < 0)
>> -				continue;
>> -		}
>> -
>> -		ret = func(oid, path, epoch, ec_index, vinfo, arg);
>> -		if (ret != SD_RES_SUCCESS)
>> -			break;
>> -	}
>> -	closedir(dir);
>> -	return ret;
>> -}
>> -
>> -static uint64_t get_path_free_size(const char *path, uint64_t *used)
>> -{
>> -	struct statvfs fs;
>> -	uint64_t size;
>> -
>> -	if (statvfs(path, &fs) < 0) {
>> -		sd_err("get disk %s space failed %m", path);
>> -		return 0;
>> -	}
>> -	size = (int64_t)fs.f_frsize * fs.f_bavail;
>> -
>> -	if (!used)
>> -		goto out;
>> -	if (for_each_object_in_path(path, get_total_object_size, false,
>> -				    NULL, used)
>> -	    != SD_RES_SUCCESS)
>> -		return 0;
>> -out:
>> -	return size;
>> -}
>> -
>> -/*
>> - * If path is broken during initialization or not support xattr return 0. We can
>> - * safely use 0 to represent failure case  because 0 space path can be
>> - * considered as broken path.
>> - */
>> -static uint64_t init_path_space(const char *path, bool purge)
>> -{
>> -	uint64_t size;
>> -	char stale[PATH_MAX];
>> -
>> -	if (!is_xattr_enabled(path)) {
>> -		sd_warn("multi-disk support need xattr feature for path: %s",
>> -			path);
>> -		goto broken_path;
>> -	}
>> -
>> -	if (purge && purge_directory(path) < 0)
>> -		sd_err("failed to purge %s", path);
>> -
>> -	snprintf(stale, PATH_MAX, "%s/.stale", path);
>> -	if (xmkdir(stale, sd_def_dmode) < 0) {
>> -		sd_err("can't mkdir for %s, %m", stale);
>> -		goto broken_path;
>> -	}
>> -
>> -#define MDNAME	"user.md.size"
>> -#define MDSIZE	sizeof(uint64_t)
>> -	if (getxattr(path, MDNAME, &size, MDSIZE) < 0) {
>> -		if (errno == ENODATA) {
>> -			goto create;
>> -		} else {
>> -			sd_err("%s, %m", path);
>> -			goto broken_path;
>> -		}
>> -	}
>> -
>> -	return size;
>> -create:
>> -	size = get_path_free_size(path, NULL);
>> -	if (!size)
>> -		goto broken_path;
>> -	if (setxattr(path, MDNAME, &size, MDSIZE, 0) < 0) {
>> -		sd_err("%s, %m", path);
>> -		goto broken_path;
>> -	}
>> -	return size;
>> -broken_path:
>> -	return 0;
>> -}
>> -
>> -/* We don't need lock at init stage */
>> -bool md_add_disk(const char *path, bool purge)
>> -{
>> -	struct disk *new;
>> -
>> -	if (path_to_disk(path)) {
>> -		sd_err("duplicate path %s", path);
>> -		return false;
>> -	}
>> -
>> -	if (xmkdir(path, sd_def_dmode) < 0) {
>> -		sd_err("can't mkdir for %s, %m", path);
>> -		return false;
>> -	}
>> -
>> -	new = xmalloc(sizeof(*new));
>> -	pstrcpy(new->path, PATH_MAX, path);
>> -	trim_last_slash(new->path);
>> -	new->space = init_path_space(new->path, purge);
>> -	if (!new->space) {
>> -		free(new);
>> -		return false;
>> -	}
>> -
>> -	create_vdisks(new);
>> -	rb_insert(&md.root, new, rb, disk_cmp);
>> -	md.space += new->space;
>> -	md.nr_disks++;
>> -
>> -	sd_info("%s, vdisk nr %d, total disk %d", new->path, vdisk_number(new),
>> -		md.nr_disks);
>> -	return true;
>> -}
>> -
>> -static inline void md_remove_disk(struct disk *disk)
>> -{
>> -	sd_info("%s from multi-disk array", disk->path);
>> -	rb_erase(&disk->rb, &md.root);
>> -	md.nr_disks--;
>> -	remove_vdisks(disk);
>> -	free(disk);
>> -}
>> -
>> -uint64_t md_init_space(void)
>> -{
>> -	return md.space;
>> -}
>> -
>> -static const char *md_get_object_dir_nolock(uint64_t oid)
>> -{
>> -	const struct vdisk *vd;
>> -
>> -	if (unlikely(md.nr_disks == 0))
>> -		return NONE_EXIST_PATH; /* To generate EIO */
>> -
>> -	vd = oid_to_vdisk(oid);
>> -	return vd->disk->path;
>> -}
>> -
>> -const char *md_get_object_dir(uint64_t oid)
>> -{
>> -	const char *p;
>> -
>> -	sd_read_lock(&md.lock);
>> -	p = md_get_object_dir_nolock(oid);
>> -	sd_rw_unlock(&md.lock);
>> -
>> -	return p;
>> -}
>> -
>> -struct process_path_arg {
>> -	const char *path;
>> -	struct vnode_info *vinfo;
>> -	int (*func)(uint64_t oid, const char *, uint32_t, uint8_t,
>> -		    struct vnode_info *, void *arg);
>> -	bool cleanup;
>> -	void *opaque;
>> -	int result;
>> -};
>> -
>> -static void *thread_process_path(void *arg)
>> -{
>> -	int ret;
>> -	struct process_path_arg *parg = (struct process_path_arg *)arg;
>> -
>> -	ret = for_each_object_in_path(parg->path, parg->func, parg->cleanup,
>> -				      parg->vinfo, parg->opaque);
>> -	if (ret != SD_RES_SUCCESS)
>> -		parg->result = ret;
>> -
>> -	return arg;
>> -}
>> -
>> -main_fn int for_each_object_in_wd(int (*func)(uint64_t oid, const char *path,
>> -				      uint32_t epoch, uint8_t ec_index,
>> -				      struct vnode_info *vinfo, void *arg),
>> -				  bool cleanup, void *arg)
>> -{
>> -	int ret = SD_RES_SUCCESS;
>> -	const struct disk *disk;
>> -	struct process_path_arg *thread_args, *path_arg;
>> -	struct vnode_info *vinfo;
>> -	void *ret_arg;
>> -	sd_thread_t *thread_array;
>> -	int nr_thread = 0, idx = 0;
>> -
>> -	sd_read_lock(&md.lock);
>> -
>> -	rb_for_each_entry(disk, &md.root, rb) {
>> -		nr_thread++;
>> -	}
>> -
>> -	thread_args = xmalloc(nr_thread * sizeof(struct process_path_arg));
>> -	thread_array = xmalloc(nr_thread * sizeof(sd_thread_t));
>> -
>> -	vinfo = get_vnode_info();
>> -
>> -	rb_for_each_entry(disk, &md.root, rb) {
>> -		thread_args[idx].path = disk->path;
>> -		thread_args[idx].vinfo = vinfo;
>> -		thread_args[idx].func = func;
>> -		thread_args[idx].cleanup = cleanup;
>> -		thread_args[idx].opaque = arg;
>> -		thread_args[idx].result = SD_RES_SUCCESS;
>> -		ret = sd_thread_create_with_idx("foreach wd",
>> -						thread_array + idx,
>> -						thread_process_path,
>> -						(void *)(thread_args + idx));
>> -		if (ret) {
>> -			/*
>> -			 * If we can't create enough threads to process
>> -			 * files, the data-consistent will be broken if
>> -			 * we continued.
>> -			 */
>> -			panic("Failed to create thread for path %s",
>> -			      disk->path);
>> -		}
>> -		idx++;
>> -	}
>> -
>> -	sd_debug("Create %d threads for all path", nr_thread);
>> -	/* wait for all threads to exit */
>> -	for (idx = 0; idx < nr_thread; idx++) {
>> -		ret = sd_thread_join(thread_array[idx], &ret_arg);
>> -		if (ret)
>> -			sd_err("Failed to join thread");
>> -		if (ret_arg) {
>> -			path_arg = (struct process_path_arg *)ret_arg;
>> -			if (path_arg->result != SD_RES_SUCCESS)
>> -				sd_err("%s, %s", path_arg->path,
>> -				       sd_strerror(path_arg->result));
>> -		}
>> -	}
>> -
>> -	put_vnode_info(vinfo);
>> -	sd_rw_unlock(&md.lock);
>> -
>> -	free(thread_args);
>> -	free(thread_array);
>> -	return ret;
>> -}
>> -
>> -int for_each_object_in_stale(int (*func)(uint64_t oid, const char *path,
>> -					 uint32_t epoch, uint8_t,
>> -					 struct vnode_info *, void *arg),
>> -			     void *arg)
>> -{
>> -	int ret = SD_RES_SUCCESS;
>> -	char path[PATH_MAX];
>> -	const struct disk *disk;
>> -
>> -	sd_read_lock(&md.lock);
>> -	rb_for_each_entry(disk, &md.root, rb) {
>> -		snprintf(path, sizeof(path), "%s/.stale", disk->path);
>> -		ret = for_each_object_in_path(path, func, false, NULL, arg);
>> -		if (ret != SD_RES_SUCCESS)
>> -			break;
>> -	}
>> -	sd_rw_unlock(&md.lock);
>> -	return ret;
>> -}
>> -
>> -
>> -int for_each_obj_path(int (*func)(const char *path))
>> -{
>> -	int ret = SD_RES_SUCCESS;
>> -	const struct disk *disk;
>> -
>> -	sd_read_lock(&md.lock);
>> -	rb_for_each_entry(disk, &md.root, rb) {
>> -		ret = func(disk->path);
>> -		if (ret != SD_RES_SUCCESS)
>> -			break;
>> -	}
>> -	sd_rw_unlock(&md.lock);
>> -	return ret;
>> -}
>> -
>> -struct md_work {
>> -	struct work work;
>> -	char path[PATH_MAX];
>> -};
>> -
>> -static inline void kick_recover(void)
>> -{
>> -	struct vnode_info *vinfo = get_vnode_info();
>> -
>> -	if (is_cluster_diskmode(&sys->cinfo))
>> -		sys->cdrv->update_node(&sys->this_node);
>> -	else {
>> -		start_recovery(vinfo, vinfo, false);
>> -		put_vnode_info(vinfo);
>> -	}
>> -}
>> -
>> -static void md_do_recover(struct work *work)
>> -{
>> -	struct md_work *mw = container_of(work, struct md_work, work);
>> -	struct disk *disk;
>> -	int nr = 0;
>> -
>> -	sd_write_lock(&md.lock);
>> -	disk = path_to_disk(mw->path);
>> -	if (!disk)
>> -		/* Just ignore the duplicate EIO of the same path */
>> -		goto out;
>> -	md_remove_disk(disk);
>> -	nr = md.nr_disks;
>> -out:
>> -	sd_rw_unlock(&md.lock);
>> -
>> -	if (disk) {
>> -		if (nr > 0) {
>> -			update_node_disks();
>> -			kick_recover();
>> -		} else {
>> -			leave_cluster();
>> -		}
>> -	}
>> -
>> -	free(mw);
>> -}
>> -
>> -int md_handle_eio(const char *fault_path)
>> -{
>> -	struct md_work *mw;
>> -
>> -	if (nr_online_disks() == 0)
>> -		return SD_RES_EIO;
>> -
>> -	mw = xzalloc(sizeof(*mw));
>> -	mw->work.done = md_do_recover;
>> -	pstrcpy(mw->path, PATH_MAX, fault_path);
>> -	queue_work(sys->md_wqueue, &mw->work);
>> -
>> -	/* Fool the requester to retry */
>> -	return SD_RES_NETWORK_ERROR;
>> -}
>> -
>> -static inline bool md_access(const char *path)
>> -{
>> -	if (access(path, R_OK | W_OK) < 0) {
>> -		if (unlikely(errno != ENOENT))
>> -			sd_err("failed to check %s, %m", path);
>> -		return false;
>> -	}
>> -
>> -	return true;
>> -}
>> -
>> -static int get_old_new_path(uint64_t oid, uint32_t epoch, uint8_t ec_index,
>> -			    const char *path, char *old, char *new)
>> -{
>> -	if (!epoch) {
>> -		if (!is_erasure_oid(oid)) {
>> -			snprintf(old, PATH_MAX, "%s/%016" PRIx64, path, oid);
>> -			snprintf(new, PATH_MAX, "%s/%016" PRIx64,
>> -				 md_get_object_dir_nolock(oid), oid);
>> -		} else {
>> -			snprintf(old, PATH_MAX, "%s/%016" PRIx64"_%d", path,
>> -				 oid, ec_index);
>> -			snprintf(new, PATH_MAX, "%s/%016" PRIx64"_%d",
>> -				 md_get_object_dir_nolock(oid), oid, ec_index);
>> -		}
>> -	} else {
>> -		if (!is_erasure_oid(oid)) {
>> -			snprintf(old, PATH_MAX,
>> -				 "%s/.stale/%016"PRIx64".%"PRIu32, path,
>> -				 oid, epoch);
>> -			snprintf(new, PATH_MAX,
>> -				 "%s/.stale/%016"PRIx64".%"PRIu32,
>> -				 md_get_object_dir_nolock(oid), oid, epoch);
>> -		} else {
>> -			snprintf(old, PATH_MAX,
>> -				 "%s/.stale/%016"PRIx64"_%d.%"PRIu32, path,
>> -				 oid, ec_index, epoch);
>> -			snprintf(new, PATH_MAX,
>> -				 "%s/.stale/%016"PRIx64"_%d.%"PRIu32,
>> -				 md_get_object_dir_nolock(oid),
>> -				 oid, ec_index ,epoch);
>> -		}
>> -	}
>> -
>> -	if (!md_access(old))
>> -		return -1;
>> -
>> -	return 0;
>> -}
>> -
>> -static int md_move_object(uint64_t oid, const char *old, const char *new)
>> -{
>> -	struct strbuf buf = STRBUF_INIT;
>> -	int fd, ret = -1;
>> -	size_t sz = get_store_objsize(oid);
>> -
>> -	fd = open(old, O_RDONLY);
>> -	if (fd < 0) {
>> -		sd_err("failed to open %s", old);
>> -		goto out;
>> -	}
>> -
>> -	ret = strbuf_read(&buf, fd, sz);
>> -	if (ret != sz) {
>> -		sd_err("failed to read %s, size %zu, %d, %m", old, sz, ret);
>> -		ret = -1;
>> -		goto out_close;
>> -	}
>> -
>> -	if (atomic_create_and_write(new, buf.buf, buf.len, false) < 0) {
>> -		if (errno != EEXIST) {
>> -			sd_err("failed to create %s", new);
>> -			ret = -1;
>> -			goto out_close;
>> -		}
>> -	}
>> -	unlink(old);
>> -	ret = 0;
>> -out_close:
>> -	close(fd);
>> -out:
>> -	strbuf_release(&buf);
>> -	return ret;
>> -}
>> -
>> -static int md_check_and_move(uint64_t oid, uint32_t epoch, uint8_t ec_index,
>> -			     const char *path)
>> -{
>> -	char old[PATH_MAX], new[PATH_MAX];
>> -
>> -	if (get_old_new_path(oid, epoch, ec_index, path, old, new) < 0)
>> -		return SD_RES_EIO;
>> -	/*
>> -	 * Recovery thread and main thread might try to recover the same object.
>> -	 * Either one succeeds, the other will fail and proceed and end up
>> -	 * trying to move the object to where it is already in place, in this
>> -	 * case we simply return.
>> -	 */
>> -	if (!strcmp(old, new))
>> -		return SD_RES_SUCCESS;
>> -
>> -	/* We can't use rename(2) across device */
>> -	if (md_move_object(oid, old, new) < 0) {
>> -		sd_err("move old %s to new %s failed", old, new);
>> -		return SD_RES_EIO;
>> -	}
>> -
>> -	sd_debug("from %s to %s", old, new);
>> -	return SD_RES_SUCCESS;
>> -}
>> -
>> -static int scan_wd(uint64_t oid, uint32_t epoch, uint8_t ec_index)
>> -{
>> -	int ret = SD_RES_EIO;
>> -	const struct disk *disk;
>> -
>> -	sd_read_lock(&md.lock);
>> -	rb_for_each_entry(disk, &md.root, rb) {
>> -		ret = md_check_and_move(oid, epoch, ec_index, disk->path);
>> -		if (ret == SD_RES_SUCCESS)
>> -			break;
>> -	}
>> -	sd_rw_unlock(&md.lock);
>> -	return ret;
>> -}
>> -
>> -bool md_exist(uint64_t oid, uint8_t ec_index, char *path)
>> -{
>> -	if (md_access(path))
>> -		return true;
>> -	/*
>> -	 * We have to iterate the WD because we don't have epoch-like history
>> -	 * track to locate the objects for multiple disk failure. Simply do
>> -	 * hard iteration simplify the code a lot.
>> -	 */
>> -	if (scan_wd(oid, 0, ec_index) == SD_RES_SUCCESS)
>> -		return true;
>> -
>> -	return false;
>> -}
>> -
>> -int md_get_stale_path(uint64_t oid, uint32_t epoch, uint8_t ec_index,
>> -		      char *path)
>> -{
>> -	if (unlikely(!epoch))
>> -		panic("invalid 0 epoch");
>> -
>> -	if (is_erasure_oid(oid)) {
>> -		if (unlikely(ec_index >= SD_MAX_COPIES))
>> -			panic("invalid ec index %d", ec_index);
>> -
>> -		snprintf(path, PATH_MAX, "%s/.stale/%016"PRIx64"_%d.%"PRIu32,
>> -			 md_get_object_dir(oid), oid, ec_index, epoch);
>> -	} else
>> -		snprintf(path, PATH_MAX, "%s/.stale/%016"PRIx64".%"PRIu32,
>> -			 md_get_object_dir(oid), oid, epoch);
>> -
>> -	if (md_access(path))
>> -		return SD_RES_SUCCESS;
>> -
>> -	if (scan_wd(oid, epoch, ec_index) == SD_RES_SUCCESS)
>> -		return SD_RES_SUCCESS;
>> -
>> -	return SD_RES_NO_OBJ;
>> -}
>> -
>> -uint32_t md_get_info(struct sd_md_info *info)
>> -{
>> -	uint32_t ret = sizeof(*info);
>> -	const struct disk *disk;
>> -	int i = 0;
>> -
>> -	memset(info, 0, ret);
>> -	sd_read_lock(&md.lock);
>> -	rb_for_each_entry(disk, &md.root, rb) {
>> -		info->disk[i].idx = i;
>> -		pstrcpy(info->disk[i].path, PATH_MAX, disk->path);
>> -		/* FIXME: better handling failure case. */
>> -		info->disk[i].free = get_path_free_size(info->disk[i].path,
>> -							&info->disk[i].used);
>> -		i++;
>> -	}
>> -	info->nr = md.nr_disks;
>> -	sd_rw_unlock(&md.lock);
>> -	return ret;
>> -}
>> -
>> -static inline void md_del_disk(const char *path)
>> -{
>> -	struct disk *disk = path_to_disk(path);
>> -
>> -	if (!disk) {
>> -		sd_err("invalid path %s", path);
>> -		return;
>> -	}
>> -	md_remove_disk(disk);
>> -}
>> -
>> -#ifdef HAVE_DISKVNODES
>> -void update_node_disks(void)
>> -{
>> -	const struct disk *disk;
>> -	int i = 0;
>> -	bool rb_empty = false;
>> -
>> -	if (!sys)
>> -		return;
>> -
>> -	memset(sys->this_node.disks, 0, sizeof(struct disk_info) * DISK_MAX);
>> -	sd_read_lock(&md.lock);
>> -	rb_for_each_entry(disk, &md.root, rb) {
>> -		sys->this_node.disks[i].disk_id =
>> -			sd_hash(disk->path, strlen(disk->path));
>> -		sys->this_node.disks[i].disk_space = disk->space;
>> -		i++;
>> -	}
>> -	sd_rw_unlock(&md.lock);
>> -
>> -	if (RB_EMPTY_ROOT(&md.vroot))
>> -		rb_empty = true;
>> -	sd_write_lock(&md.lock);
>> -	rb_for_each_entry(disk, &md.root, rb) {
>> -		if (!rb_empty)
>> -			remove_vdisks(disk);
>> -		create_vdisks(disk);
>> -	}
>> -	sd_rw_unlock(&md.lock);
>> -}
>> -#else
>> -void update_node_disks(void)
>> -{
>> -}
>> -#endif
>> -
>> -static int do_plug_unplug(char *disks, bool plug)
>> -{
>> -	const char *path;
>> -	int old_nr, ret = SD_RES_UNKNOWN;
>> -
>> -	sd_write_lock(&md.lock);
>> -	old_nr = md.nr_disks;
>> -	path = strtok(disks, ",");
>> -	do {
>> -		if (plug) {
>> -			if (!md_add_disk(path, true))
>> -				sd_err("failed to add %s", path);
>> -		} else {
>> -			md_del_disk(path);
>> -		}
>> -	} while ((path = strtok(NULL, ",")));
>> -
>> -	/* If no disks change, bail out */
>> -	if (old_nr == md.nr_disks)
>> -		goto out;
>> -
>> -	ret = SD_RES_SUCCESS;
>> -out:
>> -	sd_rw_unlock(&md.lock);
>> -
>> -	if (ret == SD_RES_SUCCESS) {
>> -		update_node_disks();
>> -		kick_recover();
>> -	}
>> -
>> -	return ret;
>> -}
>> -
>> -int md_plug_disks(char *disks)
>> -{
>> -	return do_plug_unplug(disks, true);
>> -}
>> -
>> -int md_unplug_disks(char *disks)
>> -{
>> -	return do_plug_unplug(disks, false);
>> -}
>> -
>> -uint64_t md_get_size(uint64_t *used)
>> -{
>> -	uint64_t fsize = 0;
>> -	const struct disk *disk;
>> -
>> -	*used = 0;
>> -	sd_read_lock(&md.lock);
>> -	rb_for_each_entry(disk, &md.root, rb) {
>> -		fsize += get_path_free_size(disk->path, used);
>> -	}
>> -	sd_rw_unlock(&md.lock);
>> -
>> -	return fsize + *used;
>> -}
>> -
>> -uint32_t md_nr_disks(void)
>> -{
>> -	return nr_online_disks();
>> -}
>> diff --git a/sheep/plain_store.c b/sheep/plain_store.c
>> deleted file mode 100644
>> index 4c19832..0000000
>> --- a/sheep/plain_store.c
>> +++ /dev/null
>> @@ -1,759 +0,0 @@
>> -/*
>> - * Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
>> - *
>> - * This program is free software; you can redistribute it and/or
>> - * modify it under the terms of the GNU General Public License version
>> - * 2 as published by the Free Software Foundation.
>> - *
>> - * You should have received a copy of the GNU General Public License
>> - * along with this program. If not, see <http://www.gnu.org/licenses/>.
>> - */
>> -
>> -#include <libgen.h>
>> -#include <linux/falloc.h>
>> -
>> -#include "sheep_priv.h"
>> -
>> -#ifndef FALLOC_FL_PUNCH_HOLE
>> -#define FALLOC_FL_PUNCH_HOLE 0x02
>> -#endif
>> -
>> -#define sector_algined(x) ({ ((x) & (SECTOR_SIZE - 1)) == 0; })
>> -
>> -static inline bool iocb_is_aligned(const struct siocb *iocb)
>> -{
>> -	return  sector_algined(iocb->offset) && sector_algined(iocb->length);
>> -}
>> -
>> -static int prepare_iocb(uint64_t oid, const struct siocb *iocb, bool create)
>> -{
>> -	int syncflag = create ? O_SYNC : O_DSYNC;
>> -	int flags = syncflag | O_RDWR;
>> -
>> -	if (uatomic_is_true(&sys->use_journal) || sys->nosync == true)
>> -		flags &= ~syncflag;
>> -
>> -	if (sys->backend_dio && iocb_is_aligned(iocb)) {
>> -		if (!is_aligned_to_pagesize(iocb->buf))
>> -			panic("Memory isn't aligned to pagesize %p", iocb->buf);
>> -		flags |= O_DIRECT;
>> -	}
>> -
>> -	if (create)
>> -		flags |= O_CREAT | O_EXCL;
>> -
>> -	return flags;
>> -}
>> -
>> -static int get_store_path(uint64_t oid, uint8_t ec_index, char *path)
>> -{
>> -	if (is_erasure_oid(oid)) {
>> -		if (unlikely(ec_index >= SD_MAX_COPIES))
>> -			panic("invalid ec_index %d", ec_index);
>> -		return snprintf(path, PATH_MAX, "%s/%016"PRIx64"_%d",
>> -				md_get_object_dir(oid), oid, ec_index);
>> -	}
>> -
>> -	return snprintf(path, PATH_MAX, "%s/%016" PRIx64,
>> -			md_get_object_dir(oid), oid);
>> -}
>> -
>> -static int get_store_tmp_path(uint64_t oid, uint8_t ec_index, char *path)
>> -{
>> -	if (is_erasure_oid(oid)) {
>> -		if (unlikely(ec_index >= SD_MAX_COPIES))
>> -			panic("invalid ec_index %d", ec_index);
>> -		return snprintf(path, PATH_MAX, "%s/%016"PRIx64"_%d.tmp",
>> -				md_get_object_dir(oid), oid, ec_index);
>> -	}
>> -
>> -	return snprintf(path, PATH_MAX, "%s/%016" PRIx64".tmp",
>> -			md_get_object_dir(oid), oid);
>> -}
>> -
>> -static int get_store_stale_path(uint64_t oid, uint32_t epoch, uint8_t ec_index,
>> -				char *path)
>> -{
>> -	return md_get_stale_path(oid, epoch, ec_index, path);
>> -}
>> -
>> -/*
>> - * Check if oid is in this nodes (if oid is in the wrong place, it will be moved
>> - * to the correct one after this call in a MD setup.
>> - */
>> -bool default_exist(uint64_t oid, uint8_t ec_index)
>> -{
>> -	char path[PATH_MAX];
>> -
>> -	get_store_path(oid, ec_index, path);
>> -
>> -	return md_exist(oid, ec_index, path);
>> -}
>> -
>> -static int err_to_sderr(const char *path, uint64_t oid, int err)
>> -{
>> -	struct stat s;
>> -	char p[PATH_MAX], *dir;
>> -
>> -	/* Use a temporary buffer since dirname() may modify its argument. */
>> -	pstrcpy(p, sizeof(p), path);
>> -	dir = dirname(p);
>> -
>> -	sd_debug("%s", path);
>> -	switch (err) {
>> -	case ENOENT:
>> -		if (stat(dir, &s) < 0) {
>> -			sd_err("%s corrupted", dir);
>> -			return md_handle_eio(dir);
>> -		}
>> -		sd_debug("object %016" PRIx64 " not found locally", oid);
>> -		return SD_RES_NO_OBJ;
>> -	case ENOSPC:
>> -		/* TODO: stop automatic recovery */
>> -		sd_err("diskfull, oid=%"PRIx64, oid);
>> -		return SD_RES_NO_SPACE;
>> -	case EMFILE:
>> -	case ENFILE:
>> -	case EINTR:
>> -	case EAGAIN:
>> -	case EEXIST:
>> -		sd_err("%m, oid=%"PRIx64, oid);
>> -		/* make gateway try again */
>> -		return SD_RES_NETWORK_ERROR;
>> -	default:
>> -		sd_err("oid=%"PRIx64", %m", oid);
>> -		return md_handle_eio(dir);
>> -	}
>> -}
>> -
>> -static int discard(int fd, uint64_t start, uint32_t end)
>> -{
>> -	int ret = xfallocate(fd, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE,
>> -			     start, end - start);
>> -	if (ret < 0) {
>> -		if (errno == ENOSYS || errno == EOPNOTSUPP)
>> -			sd_info("FALLOC_FL_PUNCH_HOLE is not supported "
>> -				"on this filesystem");
>> -		else
>> -			sd_err("failed to discard object, %m");
>> -	}
>> -
>> -	return ret;
>> -}
>> -
>> -/* Trim zero blocks of the beginning and end of the object. */
>> -static int default_trim(int fd, uint64_t oid, const struct siocb *iocb,
>> -			uint64_t *poffset, uint32_t *plen)
>> -{
>> -	trim_zero_blocks(iocb->buf, poffset, plen);
>> -
>> -	if (iocb->offset < *poffset) {
>> -		sd_debug("discard between %d, %ld, %" PRIx64, iocb->offset,
>> -			 *poffset, oid);
>> -
>> -		if (discard(fd, iocb->offset, *poffset) < 0)
>> -			return -1;
>> -	}
>> -
>> -	if (*poffset + *plen < iocb->offset + iocb->length) {
>> -		uint64_t end = iocb->offset + iocb->length;
>> -		uint32_t object_size = get_vdi_object_size(oid_to_vid(oid));
>> -		if (end == get_objsize(oid, object_size))
>> -			/* This is necessary to punch the last block */
>> -			end = round_up(end, BLOCK_SIZE);
>> -		sd_debug("discard between %ld, %ld, %" PRIx64, *poffset + *plen,
>> -			 end, oid);
>> -
>> -		if (discard(fd, *poffset + *plen, end) < 0)
>> -			return -1;
>> -	}
>> -
>> -	return 0;
>> -}
>> -
>> -int default_write(uint64_t oid, const struct siocb *iocb)
>> -{
>> -	int flags = prepare_iocb(oid, iocb, false), fd,
>> -	    ret = SD_RES_SUCCESS;
>> -	char path[PATH_MAX];
>> -	ssize_t size;
>> -	uint32_t len = iocb->length;
>> -	uint64_t offset = iocb->offset;
>> -	static bool trim_is_supported = true;
>> -
>> -	if (iocb->epoch < sys_epoch()) {
>> -		sd_debug("%"PRIu32" sys %"PRIu32, iocb->epoch, sys_epoch());
>> -		return SD_RES_OLD_NODE_VER;
>> -	}
>> -
>> -	if (uatomic_is_true(&sys->use_journal) &&
>> -	    unlikely(journal_write_store(oid, iocb->buf, iocb->length,
>> -					 iocb->offset, false))
>> -	    != SD_RES_SUCCESS) {
>> -		sd_err("turn off journaling");
>> -		uatomic_set_false(&sys->use_journal);
>> -		flags |= O_DSYNC;
>> -		sync();
>> -	}
>> -
>> -	get_store_path(oid, iocb->ec_index, path);
>> -
>> -	/*
>> -	 * Make sure oid is in the right place because oid might be misplaced
>> -	 * in a wrong place, due to 'shutdown/restart with less/more disks' or
>> -	 * any bugs. We need call err_to_sderr() to return EIO if disk is broken
>> -	 */
>> -	if (!default_exist(oid, iocb->ec_index))
>> -		return err_to_sderr(path, oid, ENOENT);
>> -
>> -	fd = open(path, flags, sd_def_fmode);
>> -	if (unlikely(fd < 0))
>> -		return err_to_sderr(path, oid, errno);
>> -
>> -	if (trim_is_supported && is_sparse_object(oid)) {
>> -		if (default_trim(fd, oid, iocb, &offset, &len) < 0) {
>> -			trim_is_supported = false;
>> -			offset = iocb->offset;
>> -			len = iocb->length;
>> -		}
>> -	}
>> -
>> -	size = xpwrite(fd, iocb->buf, len, offset);
>> -	if (unlikely(size != len)) {
>> -		sd_err("failed to write object %"PRIx64", path=%s, offset=%"
>> -		       PRId32", size=%"PRId32", result=%zd, %m", oid, path,
>> -		       iocb->offset, iocb->length, size);
>> -		ret = err_to_sderr(path, oid, errno);
>> -		goto out;
>> -	}
>> -out:
>> -	close(fd);
>> -	return ret;
>> -}
>> -
>> -static int make_stale_dir(const char *path)
>> -{
>> -	char p[PATH_MAX];
>> -
>> -	snprintf(p, PATH_MAX, "%s/.stale", path);
>> -	if (xmkdir(p, sd_def_dmode) < 0) {
>> -		sd_err("%s failed, %m", p);
>> -		return SD_RES_EIO;
>> -	}
>> -	return SD_RES_SUCCESS;
>> -}
>> -
>> -static int purge_dir(const char *path)
>> -{
>> -	if (purge_directory(path) < 0)
>> -		return SD_RES_EIO;
>> -
>> -	return SD_RES_SUCCESS;
>> -}
>> -
>> -static int purge_stale_dir(const char *path)
>> -{
>> -	char p[PATH_MAX];
>> -
>> -	snprintf(p, PATH_MAX, "%s/.stale", path);
>> -
>> -	if (purge_directory_async(p) < 0)
>> -		return SD_RES_EIO;
>> -
>> -	return SD_RES_SUCCESS;
>> -}
>> -
>> -int default_cleanup(void)
>> -{
>> -	int ret;
>> -
>> -	ret = for_each_obj_path(purge_stale_dir);
>> -	if (ret != SD_RES_SUCCESS)
>> -		return ret;
>> -
>> -	return SD_RES_SUCCESS;
>> -}
>> -
>> -static int init_vdi_state(uint64_t oid, const char *wd, uint32_t epoch)
>> -{
>> -	int ret;
>> -	struct sd_inode *inode = xzalloc(SD_INODE_HEADER_SIZE);
>> -	struct siocb iocb = {
>> -		.epoch = epoch,
>> -		.buf = inode,
>> -		.length = SD_INODE_HEADER_SIZE,
>> -	};
>> -
>> -	ret = default_read(oid, &iocb);
>> -	if (ret != SD_RES_SUCCESS) {
>> -		sd_err("failed to read inode header %" PRIx64 " %" PRId32
>> -		       "wat %s", oid, epoch, wd);
>> -		goto out;
>> -	}
>> -	add_vdi_state_unordered(oid_to_vid(oid), inode->nr_copies,
>> -		      vdi_is_snapshot(inode), inode->copy_policy,
>> -		      inode->block_size_shift, inode->parent_vdi_id);
>> -
>> -	if (inode->name[0] == '\0')
>> -		atomic_set_bit(oid_to_vid(oid), sys->vdi_deleted);
>> -
>> -	atomic_set_bit(oid_to_vid(oid), sys->vdi_inuse);
>> -
>> -	ret = SD_RES_SUCCESS;
>> -out:
>> -	free(inode);
>> -	return ret;
>> -}
>> -
>> -static int init_objlist_and_vdi_bitmap(uint64_t oid, const char *wd,
>> -				       uint32_t epoch, uint8_t ec_index,
>> -				       struct vnode_info *vinfo,
>> -				       void *arg)
>> -{
>> -	int ret;
>> -	objlist_cache_insert(oid);
>> -
>> -	if (is_vdi_obj(oid)) {
>> -		sd_debug("found the VDI object %" PRIx64" epoch %"PRIu32
>> -			 " at %s", oid, epoch, wd);
>> -		ret = init_vdi_state(oid, wd, epoch);
>> -		if (ret != SD_RES_SUCCESS)
>> -			return ret;
>> -	}
>> -	return SD_RES_SUCCESS;
>> -}
>> -
>> -int default_init(void)
>> -{
>> -	int ret;
>> -
>> -	sd_debug("use plain store driver");
>> -	ret = for_each_obj_path(make_stale_dir);
>> -	if (ret != SD_RES_SUCCESS)
>> -		return ret;
>> -
>> -	for_each_object_in_stale(init_objlist_and_vdi_bitmap, NULL);
>> -
>> -	return for_each_object_in_wd(init_objlist_and_vdi_bitmap, true, NULL);
>> -}
>> -
>> -static int default_read_from_path(uint64_t oid, const char *path,
>> -				  const struct siocb *iocb)
>> -{
>> -	int flags = prepare_iocb(oid, iocb, false), fd,
>> -	    ret = SD_RES_SUCCESS;
>> -	ssize_t size;
>> -
>> -	/*
>> -	 * Make sure oid is in the right place because oid might be misplaced
>> -	 * in a wrong place, due to 'shutdown/restart with less disks' or any
>> -	 * bugs. We need call err_to_sderr() to return EIO if disk is broken.
>> -	 *
>> -	 * For stale path, get_store_stale_path already does default_exist job.
>> -	 */
>> -	if (!is_stale_path(path) && !default_exist(oid, iocb->ec_index))
>> -		return err_to_sderr(path, oid, ENOENT);
>> -
>> -	fd = open(path, flags);
>> -	if (fd < 0)
>> -		return err_to_sderr(path, oid, errno);
>> -
>> -	size = xpread(fd, iocb->buf, iocb->length, iocb->offset);
>> -	if (size < 0) {
>> -		sd_err("failed to read object %"PRIx64", path=%s, offset=%"
>> -		       PRId32", size=%"PRId32", result=%zd, %m", oid, path,
>> -		       iocb->offset, iocb->length, size);
>> -		ret = err_to_sderr(path, oid, errno);
>> -	}
>> -	close(fd);
>> -	return ret;
>> -}
>> -
>> -int default_read(uint64_t oid, const struct siocb *iocb)
>> -{
>> -	int ret;
>> -	char path[PATH_MAX];
>> -
>> -	get_store_path(oid, iocb->ec_index, path);
>> -	ret = default_read_from_path(oid, path, iocb);
>> -
>> -	/*
>> -	 * If the request is against the older epoch, try to read from
>> -	 * the stale directory
>> -	 */
>> -	if (ret == SD_RES_NO_OBJ && iocb->epoch > 0 &&
>> -	    iocb->epoch < sys_epoch()) {
>> -		get_store_stale_path(oid, iocb->epoch, iocb->ec_index, path);
>> -		ret = default_read_from_path(oid, path, iocb);
>> -	}
>> -
>> -	return ret;
>> -}
>> -
>> -int default_create_and_write(uint64_t oid, const struct siocb *iocb)
>> -{
>> -	char path[PATH_MAX], tmp_path[PATH_MAX], *dir;
>> -	int flags = prepare_iocb(oid, iocb, true);
>> -	int ret, fd;
>> -	uint32_t len = iocb->length;
>> -	uint32_t object_size = 0;
>> -	size_t obj_size;
>> -	uint64_t offset = iocb->offset;
>> -
>> -	sd_debug("%"PRIx64, oid);
>> -	get_store_path(oid, iocb->ec_index, path);
>> -	get_store_tmp_path(oid, iocb->ec_index, tmp_path);
>> -
>> -	if (uatomic_is_true(&sys->use_journal) &&
>> -	    journal_write_store(oid, iocb->buf, iocb->length,
>> -				iocb->offset, true)
>> -	    != SD_RES_SUCCESS) {
>> -		sd_err("turn off journaling");
>> -		uatomic_set_false(&sys->use_journal);
>> -		flags |= O_SYNC;
>> -		sync();
>> -	}
>> -
>> -	fd = open(tmp_path, flags, sd_def_fmode);
>> -	if (fd < 0) {
>> -		if (errno == EEXIST) {
>> -			/*
>> -			 * This happens if node membership changes during object
>> -			 * creation; while gateway retries a CREATE request,
>> -			 * recovery process could also recover the object at the
>> -			 * same time.  They should try to write the same date,
>> -			 * so it is okay to simply return success here.
>> -			 */
>> -			sd_debug("%s exists", tmp_path);
>> -			return SD_RES_SUCCESS;
>> -		}
>> -
>> -		sd_err("failed to open %s: %m", tmp_path);
>> -		return err_to_sderr(path, oid, errno);
>> -	}
>> -
>> -	obj_size = get_store_objsize(oid);
>> -
>> -	trim_zero_blocks(iocb->buf, &offset, &len);
>> -
>> -	object_size = get_vdi_object_size(oid_to_vid(oid));
>> -
>> -	if (offset != 0 || len != get_objsize(oid, object_size)) {
>> -		if (is_sparse_object(oid))
>> -			ret = xftruncate(fd, obj_size);
>> -		else
>> -			ret = prealloc(fd, obj_size);
>> -		if (ret < 0) {
>> -			ret = err_to_sderr(path, oid, errno);
>> -			goto out;
>> -		}
>> -	}
>> -
>> -	ret = xpwrite(fd, iocb->buf, len, offset);
>> -	if (ret != len) {
>> -		sd_err("failed to write object. %m");
>> -		ret = err_to_sderr(path, oid, errno);
>> -		goto out;
>> -	}
>> -
>> -	ret = rename(tmp_path, path);
>> -	if (ret < 0) {
>> -		sd_err("failed to rename %s to %s: %m", tmp_path, path);
>> -		ret = err_to_sderr(path, oid, errno);
>> -		goto out;
>> -	}
>> -
>> -	close(fd);
>> -
>> -	if (uatomic_is_true(&sys->use_journal) || sys->nosync == true) {
>> -		objlist_cache_insert(oid);
>> -		return SD_RES_SUCCESS;
>> -	}
>> -
>> -	pstrcpy(tmp_path, sizeof(tmp_path), path);
>> -	dir = dirname(tmp_path);
>> -	fd = open(dir, O_DIRECTORY | O_RDONLY);
>> -	if (fd < 0) {
>> -		sd_err("failed to open directory %s: %m", dir);
>> -		return err_to_sderr(path, oid, errno);
>> -	}
>> -
>> -	if (fsync(fd) != 0) {
>> -		sd_err("failed to write directory %s: %m", dir);
>> -		ret = err_to_sderr(path, oid, errno);
>> -		close(fd);
>> -		if (unlink(path) != 0)
>> -			sd_err("failed to unlink %s: %m", path);
>> -		return ret;
>> -	}
>> -	close(fd);
>> -	objlist_cache_insert(oid);
>> -	return SD_RES_SUCCESS;
>> -
>> -out:
>> -	if (unlink(tmp_path) != 0)
>> -		sd_err("failed to unlink %s: %m", tmp_path);
>> -	close(fd);
>> -	return ret;
>> -}
>> -
>> -int default_link(uint64_t oid, uint32_t tgt_epoch)
>> -{
>> -	char path[PATH_MAX], stale_path[PATH_MAX];
>> -
>> -	sd_debug("try link %"PRIx64" from snapshot with epoch %d", oid,
>> -		 tgt_epoch);
>> -
>> -	snprintf(path, PATH_MAX, "%s/%016"PRIx64, md_get_object_dir(oid), oid);
>> -	get_store_stale_path(oid, tgt_epoch, 0, stale_path);
>> -
>> -	if (link(stale_path, path) < 0) {
>> -		/*
>> -		 * Recovery thread and main thread might try to recover the
>> -		 * same object and we might get EEXIST in such case.
>> -		 */
>> -		if (errno == EEXIST)
>> -			goto out;
>> -
>> -		sd_debug("failed to link from %s to %s, %m", stale_path, path);
>> -		return err_to_sderr(path, oid, errno);
>> -	}
>> -out:
>> -	return SD_RES_SUCCESS;
>> -}
>> -
>> -/*
>> - * For replicated object, if any of the replica belongs to this node, we
>> - * consider it not stale.
>> - *
>> - * For erasure coded object, since every copy is unique and if it migrates to
>> - * other node(index gets changed even it has some other copy belongs to it)
>> - * because of hash ring changes, we consider it stale.
>> - */
>> -static bool oid_stale(uint64_t oid, int ec_index, struct vnode_info *vinfo)
>> -{
>> -	uint32_t i, nr_copies;
>> -	const struct sd_vnode *v;
>> -	bool ret = true;
>> -	const struct sd_vnode *obj_vnodes[SD_MAX_COPIES];
>> -
>> -	nr_copies = get_obj_copy_number(oid, vinfo->nr_zones);
>> -	oid_to_vnodes(oid, &vinfo->vroot, nr_copies, obj_vnodes);
>> -	for (i = 0; i < nr_copies; i++) {
>> -		v = obj_vnodes[i];
>> -		if (vnode_is_local(v)) {
>> -			if (ec_index < SD_MAX_COPIES) {
>> -				if (i == ec_index)
>> -					ret = false;
>> -			} else {
>> -				ret = false;
>> -			}
>> -			break;
>> -		}
>> -	}
>> -
>> -	return ret;
>> -}
>> -
>> -static int move_object_to_stale_dir(uint64_t oid, const char *wd,
>> -				    uint32_t epoch, uint8_t ec_index,
>> -				    struct vnode_info *vinfo, void *arg)
>> -{
>> -	char path[PATH_MAX], stale_path[PATH_MAX];
>> -	uint32_t tgt_epoch = *(uint32_t *)arg;
>> -
>> -	/* ec_index from md.c is reliable so we can directly use it */
>> -	if (ec_index < SD_MAX_COPIES) {
>> -		snprintf(path, PATH_MAX, "%s/%016"PRIx64"_%d",
>> -			 md_get_object_dir(oid), oid, ec_index);
>> -		snprintf(stale_path, PATH_MAX,
>> -			 "%s/.stale/%016"PRIx64"_%d.%"PRIu32,
>> -			 md_get_object_dir(oid), oid, ec_index, tgt_epoch);
>> -	} else {
>> -		snprintf(path, PATH_MAX, "%s/%016" PRIx64,
>> -			 md_get_object_dir(oid), oid);
>> -		snprintf(stale_path, PATH_MAX, "%s/.stale/%016"PRIx64".%"PRIu32,
>> -			 md_get_object_dir(oid), oid, tgt_epoch);
>> -	}
>> -
>> -	if (unlikely(rename(path, stale_path)) < 0) {
>> -		sd_err("failed to move stale object %" PRIX64 " to %s, %m", oid,
>> -		       path);
>> -		return SD_RES_EIO;
>> -	}
>> -
>> -	sd_debug("moved object %"PRIx64, oid);
>> -	return SD_RES_SUCCESS;
>> -}
>> -
>> -static int check_stale_objects(uint64_t oid, const char *wd, uint32_t epoch,
>> -			       uint8_t ec_index, struct vnode_info *vinfo,
>> -			       void *arg)
>> -{
>> -	if (oid_stale(oid, ec_index, vinfo))
>> -		return move_object_to_stale_dir(oid, wd, 0, ec_index,
>> -						NULL, arg);
>> -
>> -	return SD_RES_SUCCESS;
>> -}
>> -
>> -int default_update_epoch(uint32_t epoch)
>> -{
>> -	assert(epoch);
>> -	return for_each_object_in_wd(check_stale_objects, false, &epoch);
>> -}
>> -
>> -int default_format(void)
>> -{
>> -	unsigned ret;
>> -
>> -	sd_debug("try get a clean store");
>> -	ret = for_each_obj_path(purge_dir);
>> -	if (ret != SD_RES_SUCCESS)
>> -		return ret;
>> -
>> -	if (sys->enable_object_cache)
>> -		object_cache_format();
>> -
>> -	return SD_RES_SUCCESS;
>> -}
>> -
>> -int default_remove_object(uint64_t oid, uint8_t ec_index)
>> -{
>> -	char path[PATH_MAX];
>> -
>> -	if (uatomic_is_true(&sys->use_journal))
>> -		journal_remove_object(oid);
>> -
>> -	get_store_path(oid, ec_index, path);
>> -
>> -	if (unlink(path) < 0) {
>> -		if (errno == ENOENT)
>> -			return SD_RES_NO_OBJ;
>> -
>> -		sd_err("failed, %s, %m", path);
>> -		return SD_RES_EIO;
>> -	}
>> -
>> -	return SD_RES_SUCCESS;
>> -}
>> -
>> -#define SHA1NAME "user.obj.sha1"
>> -
>> -static int get_object_sha1(const char *path, uint8_t *sha1)
>> -{
>> -	if (getxattr(path, SHA1NAME, sha1, SHA1_DIGEST_SIZE)
>> -	    != SHA1_DIGEST_SIZE) {
>> -		if (errno == ENODATA)
>> -			sd_debug("sha1 is not cached yet, %s", path);
>> -		else
>> -			sd_err("fail to get xattr, %s", path);
>> -		return -1;
>> -	}
>> -
>> -	return 0;
>> -}
>> -
>> -static int set_object_sha1(const char *path, const uint8_t *sha1)
>> -{
>> -	int ret;
>> -
>> -	ret = setxattr(path, SHA1NAME, sha1, SHA1_DIGEST_SIZE, 0);
>> -	if (ret < 0)
>> -		sd_err("fail to set sha1, %s", path);
>> -
>> -	return ret;
>> -}
>> -
>> -static int get_object_path(uint64_t oid, uint32_t epoch, char *path,
>> -			   size_t size)
>> -{
>> -	if (default_exist(oid, 0)) {
>> -		snprintf(path, PATH_MAX, "%s/%016"PRIx64,
>> -			 md_get_object_dir(oid), oid);
>> -	} else {
>> -		get_store_stale_path(oid, epoch, 0, path);
>> -		if (access(path, F_OK) < 0) {
>> -			if (errno == ENOENT)
>> -				return SD_RES_NO_OBJ;
>> -			return SD_RES_EIO;
>> -		}
>> -
>> -	}
>> -
>> -	return SD_RES_SUCCESS;
>> -}
>> -
>> -int default_get_hash(uint64_t oid, uint32_t epoch, uint8_t *sha1)
>> -{
>> -	int ret;
>> -	void *buf;
>> -	struct siocb iocb = {};
>> -	uint32_t length;
>> -	bool is_readonly_obj = oid_is_readonly(oid);
>> -	char path[PATH_MAX];
>> -
>> -	ret = get_object_path(oid, epoch, path, sizeof(path));
>> -	if (ret != SD_RES_SUCCESS)
>> -		return ret;
>> -
>> -	if (is_readonly_obj) {
>> -		if (get_object_sha1(path, sha1) == 0) {
>> -			sd_debug("use cached sha1 digest %s",
>> -				 sha1_to_hex(sha1));
>> -			return SD_RES_SUCCESS;
>> -		}
>> -	}
>> -
>> -	length = get_store_objsize(oid);
>> -	buf = valloc(length);
>> -	if (buf == NULL)
>> -		return SD_RES_NO_MEM;
>> -
>> -	iocb.epoch = epoch;
>> -	iocb.buf = buf;
>> -	iocb.length = length;
>> -
>> -	ret = default_read_from_path(oid, path, &iocb);
>> -	if (ret != SD_RES_SUCCESS) {
>> -		free(buf);
>> -		return ret;
>> -	}
>> -
>> -	get_buffer_sha1(buf, length, sha1);
>> -	free(buf);
>> -
>> -	sd_debug("the message digest of %"PRIx64" at epoch %d is %s", oid,
>> -		 epoch, sha1_to_hex(sha1));
>> -
>> -	if (is_readonly_obj)
>> -		set_object_sha1(path, sha1);
>> -
>> -	return ret;
>> -}
>> -
>> -int default_purge_obj(void)
>> -{
>> -	uint32_t tgt_epoch = get_latest_epoch();
>> -
>> -	return for_each_object_in_wd(move_object_to_stale_dir, true,
>> -				     &tgt_epoch);
>> -}
>> -
>> -static struct store_driver plain_store = {
>> -	.name = "plain",
>> -	.init = default_init,
>> -	.exist = default_exist,
>> -	.create_and_write = default_create_and_write,
>> -	.write = default_write,
>> -	.read = default_read,
>> -	.link = default_link,
>> -	.update_epoch = default_update_epoch,
>> -	.cleanup = default_cleanup,
>> -	.format = default_format,
>> -	.remove_object = default_remove_object,
>> -	.get_hash = default_get_hash,
>> -	.purge_obj = default_purge_obj,
>> -};
>> -
>> -add_store_driver(plain_store);
>> diff --git a/sheep/store.c b/sheep/store.c
>> deleted file mode 100644
>> index 8843fb8..0000000
>> --- a/sheep/store.c
>> +++ /dev/null
>> @@ -1,511 +0,0 @@
>> -/*
>> - * Copyright (C) 2009-2011 Nippon Telegraph and Telephone Corporation.
>> - *
>> - * This program is free software; you can redistribute it and/or
>> - * modify it under the terms of the GNU General Public License version
>> - * 2 as published by the Free Software Foundation.
>> - *
>> - * You should have received a copy of the GNU General Public License
>> - * along with this program. If not, see <http://www.gnu.org/licenses/>.
>> - */
>> -
>> -#include "sheep_priv.h"
>> -
>> -char *obj_path;
>> -char *epoch_path;
>> -
>> -struct store_driver *sd_store;
>> -LIST_HEAD(store_drivers);
>> -
>> -int update_epoch_log(uint32_t epoch, struct sd_node *nodes, size_t nr_nodes)
>> -{
>> -	int ret, len, nodes_len;
>> -	time_t t;
>> -	char path[PATH_MAX], *buf;
>> -
>> -	sd_debug("update epoch: %d, %zu", epoch, nr_nodes);
>> -
>> -	/* Piggyback the epoch creation time for 'dog cluster info' */
>> -	time(&t);
>> -	nodes_len = nr_nodes * sizeof(struct sd_node);
>> -	len = nodes_len + sizeof(time_t);
>> -	buf = xmalloc(len);
>> -	memcpy(buf, nodes, nodes_len);
>> -	memcpy(buf + nodes_len, &t, sizeof(time_t));
>> -
>> -	/*
>> -	 * rb field is unused in epoch file, zero-filling it
>> -	 * is good for epoch file recovery because it is unified
>> -	 */
>> -	for (int i = 0; i < nr_nodes; i++)
>> -		memset(buf + i * sizeof(struct sd_node)
>> -				+ offsetof(struct sd_node, rb),
>> -				0, sizeof(struct rb_node));
>> -
>> -	snprintf(path, sizeof(path), "%s%08u", epoch_path, epoch);
>> -
>> -	ret = atomic_create_and_write(path, buf, len, true);
>> -
>> -	free(buf);
>> -	return ret;
>> -}
>> -
>> -static int do_epoch_log_read(uint32_t epoch, struct sd_node *nodes, int len,
>> -			     int *nr_nodes, time_t *timestamp)
>> -{
>> -	int fd, ret, buf_len;
>> -	char path[PATH_MAX];
>> -	struct stat epoch_stat;
>> -
>> -	snprintf(path, sizeof(path), "%s%08u", epoch_path, epoch);
>> -	fd = open(path, O_RDONLY);
>> -	if (fd < 0) {
>> -		sd_debug("failed to open epoch %"PRIu32" log, %m", epoch);
>> -		goto err;
>> -	}
>> -
>> -	memset(&epoch_stat, 0, sizeof(epoch_stat));
>> -	ret = fstat(fd, &epoch_stat);
>> -	if (ret < 0) {
>> -		sd_err("failed to stat epoch %"PRIu32" log, %m", epoch);
>> -		goto err;
>> -	}
>> -
>> -	buf_len = epoch_stat.st_size - sizeof(*timestamp);
>> -	if (buf_len < 0) {
>> -		sd_err("invalid epoch %"PRIu32" log", epoch);
>> -		goto err;
>> -	}
>> -	if (len < buf_len) {
>> -		close(fd);
>> -		return SD_RES_BUFFER_SMALL;
>> -	}
>> -
>> -	ret = xread(fd, nodes, buf_len);
>> -	if (ret < 0) {
>> -		sd_err("failed to read epoch %"PRIu32" log, %m", epoch);
>> -		goto err;
>> -	}
>> -
>> -	/* Broken epoch, just ignore */
>> -	if (ret % sizeof(struct sd_node) != 0) {
>> -		sd_err("invalid epoch %"PRIu32" log", epoch);
>> -		goto err;
>> -	}
>> -
>> -	*nr_nodes = ret / sizeof(struct sd_node);
>> -
>> -	if (timestamp) {
>> -		ret = xread(fd, timestamp, sizeof(*timestamp));
>> -		if (ret != sizeof(*timestamp)) {
>> -			sd_err("invalid epoch %"PRIu32" log", epoch);
>> -			goto err;
>> -		}
>> -	}
>> -
>> -	close(fd);
>> -	return SD_RES_SUCCESS;
>> -err:
>> -	if (fd >= 0)
>> -		close(fd);
>> -	return SD_RES_NO_TAG;
>> -}
>> -
>> -int epoch_log_read(uint32_t epoch, struct sd_node *nodes,
>> -				int len, int *nr_nodes)
>> -{
>> -	return do_epoch_log_read(epoch, nodes, len, nr_nodes, NULL);
>> -}
>> -
>> -int epoch_log_read_with_timestamp(uint32_t epoch, struct sd_node *nodes,
>> -				int len, int *nr_nodes, time_t *timestamp)
>> -{
>> -	return do_epoch_log_read(epoch, nodes, len, nr_nodes, timestamp);
>> -}
>> -
>> -uint32_t get_latest_epoch(void)
>> -{
>> -	DIR *dir;
>> -	struct dirent *d;
>> -	uint32_t e, epoch = 0;
>> -	char *p;
>> -
>> -	dir = opendir(epoch_path);
>> -	if (!dir)
>> -		panic("failed to get the latest epoch: %m");
>> -
>> -	while ((d = readdir(dir))) {
>> -		e = strtol(d->d_name, &p, 10);
>> -		if (d->d_name == p)
>> -			continue;
>> -
>> -		if (strlen(d->d_name) != 8)
>> -			continue;
>> -
>> -		if (e > epoch)
>> -			epoch = e;
>> -	}
>> -	closedir(dir);
>> -
>> -	return epoch;
>> -}
>> -
>> -int lock_base_dir(const char *d)
>> -{
>> -#define LOCK_PATH "/lock"
>> -	char *lock_path;
>> -	int ret = 0;
>> -	int fd, len = strlen(d) + strlen(LOCK_PATH) + 1;
>> -
>> -	lock_path = xzalloc(len);
>> -	snprintf(lock_path, len, "%s" LOCK_PATH, d);
>> -
>> -	fd = open(lock_path, O_WRONLY|O_CREAT, sd_def_fmode);
>> -	if (fd < 0) {
>> -		sd_err("failed to open lock file %s (%m)", lock_path);
>> -		ret = -1;
>> -		goto out;
>> -	}
>> -
>> -	if (lockf(fd, F_TLOCK, 1) < 0) {
>> -		if (errno == EACCES || errno == EAGAIN)
>> -			sd_err("another sheep daemon is using %s", d);
>> -		else
>> -			sd_err("unable to get base dir lock (%m)");
>> -		ret = -1;
>> -		goto out;
>> -	}
>> -
>> -out:
>> -	free(lock_path);
>> -	return ret;
>> -}
>> -
>> -int init_base_path(const char *d)
>> -{
>> -	if (xmkdir(d, sd_def_dmode) < 0) {
>> -		sd_err("cannot create the directory %s (%m)", d);
>> -		return -1;
>> -	}
>> -
>> -	return 0;
>> -}
>> -
>> -static inline int check_path_len(const char *path)
>> -{
>> -	int len = strlen(path);
>> -	if (len > PATH_MAX) {
>> -		sd_err("insanely long object directory %s", path);
>> -		return -1;
>> -	}
>> -
>> -	return 0;
>> -}
>> -
>> -static int is_meta_store(const char *path)
>> -{
>> -	char conf[PATH_MAX];
>> -	char epoch[PATH_MAX];
>> -
>> -	snprintf(conf, PATH_MAX, "%s/config", path);
>> -	snprintf(epoch, PATH_MAX, "%s/epoch", path);
>> -	if (!access(conf, R_OK) && !access(epoch, R_OK))
>> -		return true;
>> -
>> -	return false;
>> -}
>> -
>> -static int init_obj_path(const char *base_path, char *argp)
>> -{
>> -	char *p;
>> -	int len;
>> -
>> -	if (check_path_len(base_path) < 0)
>> -		return -1;
>> -
>> -#define OBJ_PATH "/obj"
>> -	len = strlen(base_path) + strlen(OBJ_PATH) + 1;
>> -	obj_path = xzalloc(len);
>> -	snprintf(obj_path, len, "%s" OBJ_PATH, base_path);
>> -
>> -	/* Eat up the first component */
>> -	strtok(argp, ",");
>> -	p = strtok(NULL, ",");
>> -	if (!p) {
>> -		/*
>> -		 * If We have only one path, meta-store and object-store share
>> -		 * it. This is helpful to upgrade old sheep cluster to
>> -		 * the MD-enabled.
>> -		 */
>> -		md_add_disk(obj_path, false);
>> -	} else {
>> -		do {
>> -			if (is_meta_store(p)) {
>> -				sd_err("%s is meta-store, abort", p);
>> -				return -1;
>> -			}
>> -			md_add_disk(p, false);
>> -		} while ((p = strtok(NULL, ",")));
>> -	}
>> -
>> -	if (md_nr_disks() <= 0) {
>> -		sd_err("There isn't any available disk!");
>> -		return -1;
>> -	}
>> -
>> -	return xmkdir(obj_path, sd_def_dmode);
>> -}
>> -
>> -static int init_epoch_path(const char *base_path)
>> -{
>> -#define EPOCH_PATH "/epoch/"
>> -	int len = strlen(base_path) + strlen(EPOCH_PATH) + 1;
>> -	epoch_path = xzalloc(len);
>> -	snprintf(epoch_path, len, "%s" EPOCH_PATH, base_path);
>> -
>> -	return xmkdir(epoch_path, sd_def_dmode);
>> -}
>> -
>> -/*
>> - * If the node is gateway, this function only finds the store driver.
>> - * Otherwise, this function initializes the backend store
>> - */
>> -int init_store_driver(bool is_gateway)
>> -{
>> -	char driver_name[STORE_LEN], *p;
>> -
>> -	pstrcpy(driver_name, sizeof(driver_name), (char *)sys->cinfo.store);
>> -
>> -	p = memchr(driver_name, '\0', STORE_LEN);
>> -	if (!p) {
>> -		/*
>> -		 * If the driver name is not NUL terminated we are in deep
>> -		 * trouble, let's get out here.
>> -		 */
>> -		sd_debug("store name not NUL terminated");
>> -		return SD_RES_NO_STORE;
>> -	}
>> -
>> -	/*
>> -	 * The store file might not exist in case this is a new sheep that
>> -	 * never joined a cluster before.
>> -	 */
>> -	if (p == driver_name)
>> -		return 0;
>> -
>> -	sd_store = find_store_driver(driver_name);
>> -	if (!sd_store) {
>> -		sd_debug("store %s not found", driver_name);
>> -		return SD_RES_NO_STORE;
>> -	}
>> -
>> -	if (is_gateway)
>> -		return SD_RES_SUCCESS;
>> -
>> -	return sd_store->init();
>> -}
>> -
>> -int init_disk_space(const char *base_path)
>> -{
>> -	int ret = SD_RES_SUCCESS;
>> -	uint64_t space_size = 0, mds;
>> -	struct statvfs fs;
>> -
>> -	if (sys->gateway_only)
>> -		goto out;
>> -
>> -	/* We need to init md even we don't need to update sapce */
>> -	mds = md_init_space();
>> -
>> -	/* If it is restarted */
>> -	ret = get_node_space(&space_size);
>> -	if (space_size != 0) {
>> -		sys->disk_space = space_size;
>> -		goto out;
>> -	}
>> -
>> -	/* User has specified the space at startup */
>> -	if (sys->disk_space) {
>> -		ret = set_node_space(sys->disk_space);
>> -		goto out;
>> -	}
>> -
>> -	if (mds) {
>> -		sys->disk_space = mds;
>> -	} else {
>> -		ret = statvfs(base_path, &fs);
>> -		if (ret < 0) {
>> -			sd_debug("get disk space failed %m");
>> -			ret = SD_RES_EIO;
>> -			goto out;
>> -		}
>> -		sys->disk_space = (uint64_t)fs.f_frsize * fs.f_bavail;
>> -	}
>> -
>> -	ret = set_node_space(sys->disk_space);
>> -out:
>> -	sd_debug("disk free space is %" PRIu64, sys->disk_space);
>> -	return ret;
>> -}
>> -
>> -/* Initialize all the global pathnames used internally */
>> -int init_global_pathnames(const char *d, char *argp)
>> -{
>> -	int ret;
>> -
>> -	ret = init_obj_path(d, argp);
>> -	if (ret)
>> -		return ret;
>> -
>> -	ret = init_epoch_path(d);
>> -	if (ret)
>> -		return ret;
>> -
>> -	init_config_path(d);
>> -
>> -	return 0;
>> -}
>> -
>> -/* Write data to both local object cache (if enabled) and backends */
>> -int sd_write_object(uint64_t oid, char *data, unsigned int datalen,
>> -		    uint64_t offset, bool create)
>> -{
>> -	struct sd_req hdr;
>> -	int ret;
>> -
>> -	if (sys->enable_object_cache && object_is_cached(oid)) {
>> -		ret = object_cache_write(oid, data, datalen, offset,
>> -					 create);
>> -		if (ret == SD_RES_NO_CACHE)
>> -			goto forward_write;
>> -
>> -		if (ret != 0) {
>> -			sd_err("write cache failed %" PRIx64 " %" PRIx32, oid,
>> -			       ret);
>> -			return ret;
>> -		}
>> -	}
>> -
>> -forward_write:
>> -	if (create)
>> -		sd_init_req(&hdr, SD_OP_CREATE_AND_WRITE_OBJ);
>> -	else
>> -		sd_init_req(&hdr, SD_OP_WRITE_OBJ);
>> -	hdr.flags = SD_FLAG_CMD_WRITE;
>> -	hdr.data_length = datalen;
>> -
>> -	hdr.obj.oid = oid;
>> -	hdr.obj.offset = offset;
>> -
>> -	ret = exec_local_req(&hdr, data);
>> -	if (ret != SD_RES_SUCCESS)
>> -		sd_err("failed to write object %" PRIx64 ", %s", oid,
>> -			   sd_strerror(ret));
>> -
>> -	return ret;
>> -}
>> -
>> -int read_backend_object(uint64_t oid, char *data, unsigned int datalen,
>> -			uint64_t offset)
>> -{
>> -	struct sd_req hdr;
>> -	int ret;
>> -
>> -	sd_init_req(&hdr, SD_OP_READ_OBJ);
>> -	hdr.data_length = datalen;
>> -	hdr.obj.oid = oid;
>> -	hdr.obj.offset = offset;
>> -
>> -	ret = exec_local_req(&hdr, data);
>> -	if (ret != SD_RES_SUCCESS)
>> -		sd_err("failed to read object %" PRIx64 ", %s", oid,
>> -		       sd_strerror(ret));
>> -	return ret;
>> -}
>> -
>> -/*
>> - * Read data firstly from local object cache(if enabled), if fail,
>> - * try read backends
>> - */
>> -int sd_read_object(uint64_t oid, char *data, unsigned int datalen,
>> -		   uint64_t offset)
>> -{
>> -	int ret;
>> -
>> -	if (sys->enable_object_cache && object_is_cached(oid)) {
>> -		ret = object_cache_read(oid, data, datalen, offset);
>> -		if (ret != SD_RES_SUCCESS) {
>> -			sd_err("try forward read %" PRIx64 " %s", oid,
>> -			       sd_strerror(ret));
>> -			goto forward_read;
>> -		}
>> -		return ret;
>> -	}
>> -
>> -forward_read:
>> -	return read_backend_object(oid, data, datalen, offset);
>> -}
>> -
>> -int sd_remove_object(uint64_t oid)
>> -{
>> -	struct sd_req hdr;
>> -	int ret;
>> -
>> -	if (sys->enable_object_cache && object_is_cached(oid)) {
>> -		ret = object_cache_remove(oid);
>> -		if (ret != SD_RES_SUCCESS)
>> -			return ret;
>> -	}
>> -
>> -	sd_init_req(&hdr, SD_OP_REMOVE_OBJ);
>> -	hdr.obj.oid = oid;
>> -
>> -	ret = exec_local_req(&hdr, NULL);
>> -	if (ret != SD_RES_SUCCESS)
>> -		sd_err("failed to remove object %" PRIx64 ", %s", oid,
>> -		       sd_strerror(ret));
>> -
>> -	return ret;
>> -}
>> -
>> -int sd_discard_object(uint64_t oid)
>> -{
>> -	int ret;
>> -	struct sd_req hdr;
>> -
>> -	sd_init_req(&hdr, SD_OP_DISCARD_OBJ);
>> -	hdr.obj.oid = oid;
>> -
>> -	ret = exec_local_req(&hdr, NULL);
>> -	if (ret != SD_RES_SUCCESS)
>> -		sd_err("Failed to discard data obj %"PRIu64" %s", oid,
>> -		       sd_strerror(ret));
>> -
>> -	return ret;
>> -}
>> -
>> -int sd_dec_object_refcnt(uint64_t data_oid, uint32_t generation,
>> -			 uint32_t refcnt)
>> -{
>> -	struct sd_req hdr;
>> -	int ret;
>> -	uint64_t ledger_oid = data_oid_to_ledger_oid(data_oid);
>> -
>> -	sd_debug("%"PRIx64", %" PRId32 ", %" PRId32,
>> -		 data_oid, generation, refcnt);
>> -
>> -	if (generation == 0 && refcnt == 0)
>> -		return sd_remove_object(data_oid);
>> -
>> -	sd_init_req(&hdr, SD_OP_DECREF_OBJ);
>> -	hdr.ref.oid = ledger_oid;
>> -	hdr.ref.generation = generation;
>> -	hdr.ref.count = refcnt;
>> -
>> -	ret = exec_local_req(&hdr, NULL);
>> -	if (ret != SD_RES_SUCCESS)
>> -		sd_err("failed to decrement reference %" PRIx64 ", %s",
>> -		       ledger_oid, sd_strerror(ret));
>> -
>> -	return ret;
>> -}
>> diff --git a/sheep/store/common.c b/sheep/store/common.c
>> new file mode 100644
>> index 0000000..8843fb8
>> --- /dev/null
>> +++ b/sheep/store/common.c
>> @@ -0,0 +1,511 @@
>> +/*
>> + * Copyright (C) 2009-2011 Nippon Telegraph and Telephone Corporation.
>> + *
>> + * This program is free software; you can redistribute it and/or
>> + * modify it under the terms of the GNU General Public License version
>> + * 2 as published by the Free Software Foundation.
>> + *
>> + * You should have received a copy of the GNU General Public License
>> + * along with this program. If not, see <http://www.gnu.org/licenses/>.
>> + */
>> +
>> +#include "sheep_priv.h"
>> +
>> +char *obj_path;
>> +char *epoch_path;
>> +
>> +struct store_driver *sd_store;
>> +LIST_HEAD(store_drivers);
>> +
>> +int update_epoch_log(uint32_t epoch, struct sd_node *nodes, size_t nr_nodes)
>> +{
>> +	int ret, len, nodes_len;
>> +	time_t t;
>> +	char path[PATH_MAX], *buf;
>> +
>> +	sd_debug("update epoch: %d, %zu", epoch, nr_nodes);
>> +
>> +	/* Piggyback the epoch creation time for 'dog cluster info' */
>> +	time(&t);
>> +	nodes_len = nr_nodes * sizeof(struct sd_node);
>> +	len = nodes_len + sizeof(time_t);
>> +	buf = xmalloc(len);
>> +	memcpy(buf, nodes, nodes_len);
>> +	memcpy(buf + nodes_len, &t, sizeof(time_t));
>> +
>> +	/*
>> +	 * rb field is unused in epoch file, zero-filling it
>> +	 * is good for epoch file recovery because it is unified
>> +	 */
>> +	for (int i = 0; i < nr_nodes; i++)
>> +		memset(buf + i * sizeof(struct sd_node)
>> +				+ offsetof(struct sd_node, rb),
>> +				0, sizeof(struct rb_node));
>> +
>> +	snprintf(path, sizeof(path), "%s%08u", epoch_path, epoch);
>> +
>> +	ret = atomic_create_and_write(path, buf, len, true);
>> +
>> +	free(buf);
>> +	return ret;
>> +}
>> +
>> +static int do_epoch_log_read(uint32_t epoch, struct sd_node *nodes, int len,
>> +			     int *nr_nodes, time_t *timestamp)
>> +{
>> +	int fd, ret, buf_len;
>> +	char path[PATH_MAX];
>> +	struct stat epoch_stat;
>> +
>> +	snprintf(path, sizeof(path), "%s%08u", epoch_path, epoch);
>> +	fd = open(path, O_RDONLY);
>> +	if (fd < 0) {
>> +		sd_debug("failed to open epoch %"PRIu32" log, %m", epoch);
>> +		goto err;
>> +	}
>> +
>> +	memset(&epoch_stat, 0, sizeof(epoch_stat));
>> +	ret = fstat(fd, &epoch_stat);
>> +	if (ret < 0) {
>> +		sd_err("failed to stat epoch %"PRIu32" log, %m", epoch);
>> +		goto err;
>> +	}
>> +
>> +	buf_len = epoch_stat.st_size - sizeof(*timestamp);
>> +	if (buf_len < 0) {
>> +		sd_err("invalid epoch %"PRIu32" log", epoch);
>> +		goto err;
>> +	}
>> +	if (len < buf_len) {
>> +		close(fd);
>> +		return SD_RES_BUFFER_SMALL;
>> +	}
>> +
>> +	ret = xread(fd, nodes, buf_len);
>> +	if (ret < 0) {
>> +		sd_err("failed to read epoch %"PRIu32" log, %m", epoch);
>> +		goto err;
>> +	}
>> +
>> +	/* Broken epoch, just ignore */
>> +	if (ret % sizeof(struct sd_node) != 0) {
>> +		sd_err("invalid epoch %"PRIu32" log", epoch);
>> +		goto err;
>> +	}
>> +
>> +	*nr_nodes = ret / sizeof(struct sd_node);
>> +
>> +	if (timestamp) {
>> +		ret = xread(fd, timestamp, sizeof(*timestamp));
>> +		if (ret != sizeof(*timestamp)) {
>> +			sd_err("invalid epoch %"PRIu32" log", epoch);
>> +			goto err;
>> +		}
>> +	}
>> +
>> +	close(fd);
>> +	return SD_RES_SUCCESS;
>> +err:
>> +	if (fd >= 0)
>> +		close(fd);
>> +	return SD_RES_NO_TAG;
>> +}
>> +
>> +int epoch_log_read(uint32_t epoch, struct sd_node *nodes,
>> +				int len, int *nr_nodes)
>> +{
>> +	return do_epoch_log_read(epoch, nodes, len, nr_nodes, NULL);
>> +}
>> +
>> +int epoch_log_read_with_timestamp(uint32_t epoch, struct sd_node *nodes,
>> +				int len, int *nr_nodes, time_t *timestamp)
>> +{
>> +	return do_epoch_log_read(epoch, nodes, len, nr_nodes, timestamp);
>> +}
>> +
>> +uint32_t get_latest_epoch(void)
>> +{
>> +	DIR *dir;
>> +	struct dirent *d;
>> +	uint32_t e, epoch = 0;
>> +	char *p;
>> +
>> +	dir = opendir(epoch_path);
>> +	if (!dir)
>> +		panic("failed to get the latest epoch: %m");
>> +
>> +	while ((d = readdir(dir))) {
>> +		e = strtol(d->d_name, &p, 10);
>> +		if (d->d_name == p)
>> +			continue;
>> +
>> +		if (strlen(d->d_name) != 8)
>> +			continue;
>> +
>> +		if (e > epoch)
>> +			epoch = e;
>> +	}
>> +	closedir(dir);
>> +
>> +	return epoch;
>> +}
>> +
>> +int lock_base_dir(const char *d)
>> +{
>> +#define LOCK_PATH "/lock"
>> +	char *lock_path;
>> +	int ret = 0;
>> +	int fd, len = strlen(d) + strlen(LOCK_PATH) + 1;
>> +
>> +	lock_path = xzalloc(len);
>> +	snprintf(lock_path, len, "%s" LOCK_PATH, d);
>> +
>> +	fd = open(lock_path, O_WRONLY|O_CREAT, sd_def_fmode);
>> +	if (fd < 0) {
>> +		sd_err("failed to open lock file %s (%m)", lock_path);
>> +		ret = -1;
>> +		goto out;
>> +	}
>> +
>> +	if (lockf(fd, F_TLOCK, 1) < 0) {
>> +		if (errno == EACCES || errno == EAGAIN)
>> +			sd_err("another sheep daemon is using %s", d);
>> +		else
>> +			sd_err("unable to get base dir lock (%m)");
>> +		ret = -1;
>> +		goto out;
>> +	}
>> +
>> +out:
>> +	free(lock_path);
>> +	return ret;
>> +}
>> +
>> +int init_base_path(const char *d)
>> +{
>> +	if (xmkdir(d, sd_def_dmode) < 0) {
>> +		sd_err("cannot create the directory %s (%m)", d);
>> +		return -1;
>> +	}
>> +
>> +	return 0;
>> +}
>> +
>> +static inline int check_path_len(const char *path)
>> +{
>> +	int len = strlen(path);
>> +	if (len > PATH_MAX) {
>> +		sd_err("insanely long object directory %s", path);
>> +		return -1;
>> +	}
>> +
>> +	return 0;
>> +}
>> +
>> +static int is_meta_store(const char *path)
>> +{
>> +	char conf[PATH_MAX];
>> +	char epoch[PATH_MAX];
>> +
>> +	snprintf(conf, PATH_MAX, "%s/config", path);
>> +	snprintf(epoch, PATH_MAX, "%s/epoch", path);
>> +	if (!access(conf, R_OK) && !access(epoch, R_OK))
>> +		return true;
>> +
>> +	return false;
>> +}
>> +
>> +static int init_obj_path(const char *base_path, char *argp)
>> +{
>> +	char *p;
>> +	int len;
>> +
>> +	if (check_path_len(base_path) < 0)
>> +		return -1;
>> +
>> +#define OBJ_PATH "/obj"
>> +	len = strlen(base_path) + strlen(OBJ_PATH) + 1;
>> +	obj_path = xzalloc(len);
>> +	snprintf(obj_path, len, "%s" OBJ_PATH, base_path);
>> +
>> +	/* Eat up the first component */
>> +	strtok(argp, ",");
>> +	p = strtok(NULL, ",");
>> +	if (!p) {
>> +		/*
>> +		 * If We have only one path, meta-store and object-store share
>> +		 * it. This is helpful to upgrade old sheep cluster to
>> +		 * the MD-enabled.
>> +		 */
>> +		md_add_disk(obj_path, false);
>> +	} else {
>> +		do {
>> +			if (is_meta_store(p)) {
>> +				sd_err("%s is meta-store, abort", p);
>> +				return -1;
>> +			}
>> +			md_add_disk(p, false);
>> +		} while ((p = strtok(NULL, ",")));
>> +	}
>> +
>> +	if (md_nr_disks() <= 0) {
>> +		sd_err("There isn't any available disk!");
>> +		return -1;
>> +	}
>> +
>> +	return xmkdir(obj_path, sd_def_dmode);
>> +}
>> +
>> +static int init_epoch_path(const char *base_path)
>> +{
>> +#define EPOCH_PATH "/epoch/"
>> +	int len = strlen(base_path) + strlen(EPOCH_PATH) + 1;
>> +	epoch_path = xzalloc(len);
>> +	snprintf(epoch_path, len, "%s" EPOCH_PATH, base_path);
>> +
>> +	return xmkdir(epoch_path, sd_def_dmode);
>> +}
>> +
>> +/*
>> + * If the node is gateway, this function only finds the store driver.
>> + * Otherwise, this function initializes the backend store
>> + */
>> +int init_store_driver(bool is_gateway)
>> +{
>> +	char driver_name[STORE_LEN], *p;
>> +
>> +	pstrcpy(driver_name, sizeof(driver_name), (char *)sys->cinfo.store);
>> +
>> +	p = memchr(driver_name, '\0', STORE_LEN);
>> +	if (!p) {
>> +		/*
>> +		 * If the driver name is not NUL terminated we are in deep
>> +		 * trouble, let's get out here.
>> +		 */
>> +		sd_debug("store name not NUL terminated");
>> +		return SD_RES_NO_STORE;
>> +	}
>> +
>> +	/*
>> +	 * The store file might not exist in case this is a new sheep that
>> +	 * never joined a cluster before.
>> +	 */
>> +	if (p == driver_name)
>> +		return 0;
>> +
>> +	sd_store = find_store_driver(driver_name);
>> +	if (!sd_store) {
>> +		sd_debug("store %s not found", driver_name);
>> +		return SD_RES_NO_STORE;
>> +	}
>> +
>> +	if (is_gateway)
>> +		return SD_RES_SUCCESS;
>> +
>> +	return sd_store->init();
>> +}
>> +
>> +int init_disk_space(const char *base_path)
>> +{
>> +	int ret = SD_RES_SUCCESS;
>> +	uint64_t space_size = 0, mds;
>> +	struct statvfs fs;
>> +
>> +	if (sys->gateway_only)
>> +		goto out;
>> +
>> +	/* We need to init md even we don't need to update sapce */
>> +	mds = md_init_space();
>> +
>> +	/* If it is restarted */
>> +	ret = get_node_space(&space_size);
>> +	if (space_size != 0) {
>> +		sys->disk_space = space_size;
>> +		goto out;
>> +	}
>> +
>> +	/* User has specified the space at startup */
>> +	if (sys->disk_space) {
>> +		ret = set_node_space(sys->disk_space);
>> +		goto out;
>> +	}
>> +
>> +	if (mds) {
>> +		sys->disk_space = mds;
>> +	} else {
>> +		ret = statvfs(base_path, &fs);
>> +		if (ret < 0) {
>> +			sd_debug("get disk space failed %m");
>> +			ret = SD_RES_EIO;
>> +			goto out;
>> +		}
>> +		sys->disk_space = (uint64_t)fs.f_frsize * fs.f_bavail;
>> +	}
>> +
>> +	ret = set_node_space(sys->disk_space);
>> +out:
>> +	sd_debug("disk free space is %" PRIu64, sys->disk_space);
>> +	return ret;
>> +}
>> +
>> +/* Initialize all the global pathnames used internally */
>> +int init_global_pathnames(const char *d, char *argp)
>> +{
>> +	int ret;
>> +
>> +	ret = init_obj_path(d, argp);
>> +	if (ret)
>> +		return ret;
>> +
>> +	ret = init_epoch_path(d);
>> +	if (ret)
>> +		return ret;
>> +
>> +	init_config_path(d);
>> +
>> +	return 0;
>> +}
>> +
>> +/* Write data to both local object cache (if enabled) and backends */
>> +int sd_write_object(uint64_t oid, char *data, unsigned int datalen,
>> +		    uint64_t offset, bool create)
>> +{
>> +	struct sd_req hdr;
>> +	int ret;
>> +
>> +	if (sys->enable_object_cache && object_is_cached(oid)) {
>> +		ret = object_cache_write(oid, data, datalen, offset,
>> +					 create);
>> +		if (ret == SD_RES_NO_CACHE)
>> +			goto forward_write;
>> +
>> +		if (ret != 0) {
>> +			sd_err("write cache failed %" PRIx64 " %" PRIx32, oid,
>> +			       ret);
>> +			return ret;
>> +		}
>> +	}
>> +
>> +forward_write:
>> +	if (create)
>> +		sd_init_req(&hdr, SD_OP_CREATE_AND_WRITE_OBJ);
>> +	else
>> +		sd_init_req(&hdr, SD_OP_WRITE_OBJ);
>> +	hdr.flags = SD_FLAG_CMD_WRITE;
>> +	hdr.data_length = datalen;
>> +
>> +	hdr.obj.oid = oid;
>> +	hdr.obj.offset = offset;
>> +
>> +	ret = exec_local_req(&hdr, data);
>> +	if (ret != SD_RES_SUCCESS)
>> +		sd_err("failed to write object %" PRIx64 ", %s", oid,
>> +			   sd_strerror(ret));
>> +
>> +	return ret;
>> +}
>> +
>> +int read_backend_object(uint64_t oid, char *data, unsigned int datalen,
>> +			uint64_t offset)
>> +{
>> +	struct sd_req hdr;
>> +	int ret;
>> +
>> +	sd_init_req(&hdr, SD_OP_READ_OBJ);
>> +	hdr.data_length = datalen;
>> +	hdr.obj.oid = oid;
>> +	hdr.obj.offset = offset;
>> +
>> +	ret = exec_local_req(&hdr, data);
>> +	if (ret != SD_RES_SUCCESS)
>> +		sd_err("failed to read object %" PRIx64 ", %s", oid,
>> +		       sd_strerror(ret));
>> +	return ret;
>> +}
>> +
>> +/*
>> + * Read data firstly from local object cache(if enabled), if fail,
>> + * try read backends
>> + */
>> +int sd_read_object(uint64_t oid, char *data, unsigned int datalen,
>> +		   uint64_t offset)
>> +{
>> +	int ret;
>> +
>> +	if (sys->enable_object_cache && object_is_cached(oid)) {
>> +		ret = object_cache_read(oid, data, datalen, offset);
>> +		if (ret != SD_RES_SUCCESS) {
>> +			sd_err("try forward read %" PRIx64 " %s", oid,
>> +			       sd_strerror(ret));
>> +			goto forward_read;
>> +		}
>> +		return ret;
>> +	}
>> +
>> +forward_read:
>> +	return read_backend_object(oid, data, datalen, offset);
>> +}
>> +
>> +int sd_remove_object(uint64_t oid)
>> +{
>> +	struct sd_req hdr;
>> +	int ret;
>> +
>> +	if (sys->enable_object_cache && object_is_cached(oid)) {
>> +		ret = object_cache_remove(oid);
>> +		if (ret != SD_RES_SUCCESS)
>> +			return ret;
>> +	}
>> +
>> +	sd_init_req(&hdr, SD_OP_REMOVE_OBJ);
>> +	hdr.obj.oid = oid;
>> +
>> +	ret = exec_local_req(&hdr, NULL);
>> +	if (ret != SD_RES_SUCCESS)
>> +		sd_err("failed to remove object %" PRIx64 ", %s", oid,
>> +		       sd_strerror(ret));
>> +
>> +	return ret;
>> +}
>> +
>> +int sd_discard_object(uint64_t oid)
>> +{
>> +	int ret;
>> +	struct sd_req hdr;
>> +
>> +	sd_init_req(&hdr, SD_OP_DISCARD_OBJ);
>> +	hdr.obj.oid = oid;
>> +
>> +	ret = exec_local_req(&hdr, NULL);
>> +	if (ret != SD_RES_SUCCESS)
>> +		sd_err("Failed to discard data obj %"PRIu64" %s", oid,
>> +		       sd_strerror(ret));
>> +
>> +	return ret;
>> +}
>> +
>> +int sd_dec_object_refcnt(uint64_t data_oid, uint32_t generation,
>> +			 uint32_t refcnt)
>> +{
>> +	struct sd_req hdr;
>> +	int ret;
>> +	uint64_t ledger_oid = data_oid_to_ledger_oid(data_oid);
>> +
>> +	sd_debug("%"PRIx64", %" PRId32 ", %" PRId32,
>> +		 data_oid, generation, refcnt);
>> +
>> +	if (generation == 0 && refcnt == 0)
>> +		return sd_remove_object(data_oid);
>> +
>> +	sd_init_req(&hdr, SD_OP_DECREF_OBJ);
>> +	hdr.ref.oid = ledger_oid;
>> +	hdr.ref.generation = generation;
>> +	hdr.ref.count = refcnt;
>> +
>> +	ret = exec_local_req(&hdr, NULL);
>> +	if (ret != SD_RES_SUCCESS)
>> +		sd_err("failed to decrement reference %" PRIx64 ", %s",
>> +		       ledger_oid, sd_strerror(ret));
>> +
>> +	return ret;
>> +}
>> diff --git a/sheep/store/md.c b/sheep/store/md.c
>> new file mode 100644
>> index 0000000..87ab759
>> --- /dev/null
>> +++ b/sheep/store/md.c
>> @@ -0,0 +1,878 @@
>> +/*
>> + * Copyright (C) 2013 Taobao Inc.
>> + *
>> + * Liu Yuan <namei.unix at gmail.com>
>> + *
>> + * This program is free software; you can redistribute it and/or
>> + * modify it under the terms of the GNU General Public License version
>> + * 2 as published by the Free Software Foundation.
>> + *
>> + * You should have received a copy of the GNU General Public License
>> + * along with this program. If not, see <http://www.gnu.org/licenses/>.
>> + */
>> +
>> +#include "sheep_priv.h"
>> +
>> +#define MD_VDISK_SIZE ((uint64_t)1*1024*1024*1024) /* 1G */
>> +
>> +#define NONE_EXIST_PATH "/all/disks/are/broken/,ps/ノ厂経7/!"
>
> why NONE_EXIST_PATH is changed? And I don't think imbed japanese in to the assic
> code is good idea.

Hmm, I do not have changed intentionally,
and NODE_EXIST_PATH string with no meaning for Japanese.
so I do not know what means the last unicode string

However, I agree to be coded in only ASCII code.

>
> By the way, please use git format-patch to generate a series of patches and send
> them by git send-email to get a chain of patches under a single topic. For now
> your patches are split into different topics on my Mutt client.

Sorry,
I do not be able to git send-mail directly because restriction of my environment.


Thanks, Saeki.

>
> Thanks,
> Yuan
>






More information about the sheepdog mailing list