[sheepdog] [PATCH 1/4 v3] sheep: rename files for store_driver
Liu Yuan
namei.unix at gmail.com
Thu Apr 23 07:58:09 CEST 2015
On Fri, Mar 20, 2015 at 06:39:02PM +0900, Saeki Masaki wrote:
> This change is a preparation patch for adding a store_driver.
> It puts the files together in a new folder.
>
Hi Masaki,
I'm going to apply this patch set, but some patches like 1/4 are encoded in
base64, and my git can't apply them. Could you please resend an applicable one?
Thanks,
Yuan
> Signed-off-by: Masaki Saeki <saeki.masaki at po.ntts.co.jp>
> ---
> sheep/Makefile.am | 6 +-
> sheep/md.c | 878 ---------------------------------------------
> sheep/plain_store.c | 759 ---------------------------------------
> sheep/store.c | 511 --------------------------
> sheep/store/common.c | 511 ++++++++++++++++++++++++++
> sheep/store/md.c | 878 +++++++++++++++++++++++++++++++++++++++++++++
> sheep/store/plain_store.c | 759 +++++++++++++++++++++++++++++++++++++++
> 7 files changed, 2152 insertions(+), 2150 deletions(-)
> delete mode 100644 sheep/md.c
> delete mode 100644 sheep/plain_store.c
> delete mode 100644 sheep/store.c
> create mode 100644 sheep/store/common.c
> create mode 100644 sheep/store/md.c
> create mode 100644 sheep/store/plain_store.c
>
> diff --git a/sheep/Makefile.am b/sheep/Makefile.am
> index 7a08838..3ddd761 100644
> --- a/sheep/Makefile.am
> +++ b/sheep/Makefile.am
> @@ -24,10 +24,12 @@ AM_CPPFLAGS = -I$(top_builddir)/include -I$(top_srcdir)/include \
> sbin_PROGRAMS = sheep
> -sheep_SOURCES = sheep.c group.c request.c gateway.c store.c vdi.c \
> +sheep_SOURCES = sheep.c group.c request.c gateway.c vdi.c \
> journal.c ops.c recovery.c cluster/local.c \
> object_cache.c object_list_cache.c \
> - plain_store.c config.c migrate.c md.c
> + store/common.c store/md.c \
> + store/plain_store.c \
> + config.c migrate.c
> if BUILD_HTTP
> sheep_SOURCES += http/http.c http/kv.c http/s3.c http/swift.c \
> diff --git a/sheep/md.c b/sheep/md.c
> deleted file mode 100644
> index 87ab759..0000000
> --- a/sheep/md.c
> +++ /dev/null
> @@ -1,878 +0,0 @@
> -/*
> - * Copyright (C) 2013 Taobao Inc.
> - *
> - * Liu Yuan <namei.unix at gmail.com>
> - *
> - * This program is free software; you can redistribute it and/or
> - * modify it under the terms of the GNU General Public License version
> - * 2 as published by the Free Software Foundation.
> - *
> - * You should have received a copy of the GNU General Public License
> - * along with this program. If not, see <http://www.gnu.org/licenses/>.
> - */
> -
> -#include "sheep_priv.h"
> -
> -#define MD_VDISK_SIZE ((uint64_t)1*1024*1024*1024) /* 1G */
> -
> -#define NONE_EXIST_PATH "/all/disks/are/broken/,ps/əʌo7/!"
> -
> -struct md md = {
> - .vroot = RB_ROOT,
> - .root = RB_ROOT,
> - .lock = SD_RW_LOCK_INITIALIZER,
> -};
> -
> -static inline uint32_t nr_online_disks(void)
> -{
> - uint32_t nr;
> -
> - sd_read_lock(&md.lock);
> - nr = md.nr_disks;
> - sd_rw_unlock(&md.lock);
> -
> - return nr;
> -}
> -
> -static inline int vdisk_number(const struct disk *disk)
> -{
> - return DIV_ROUND_UP(disk->space, MD_VDISK_SIZE);
> -}
> -
> -static int disk_cmp(const struct disk *d1, const struct disk *d2)
> -{
> - return strcmp(d1->path, d2->path);
> -}
> -
> -static int vdisk_cmp(const struct vdisk *d1, const struct vdisk *d2)
> -{
> - return intcmp(d1->hash, d2->hash);
> -}
> -
> -static struct vdisk *vdisk_insert(struct vdisk *new)
> -{
> - return rb_insert(&md.vroot, new, rb, vdisk_cmp);
> -}
> -
> -/* If v1_hash < hval <= v2_hash, then oid is resident in v2 */
> -static struct vdisk *hval_to_vdisk(uint64_t hval)
> -{
> - struct vdisk dummy = { .hash = hval };
> -
> - return rb_nsearch(&md.vroot, &dummy, rb, vdisk_cmp);
> -}
> -
> -static struct vdisk *oid_to_vdisk(uint64_t oid)
> -{
> - return hval_to_vdisk(sd_hash_oid(oid));
> -}
> -
> -static void create_vdisks(const struct disk *disk)
> -{
> - uint64_t hval = sd_hash(disk->path, strlen(disk->path));
> - const struct sd_node *n = &sys->this_node;
> - uint64_t node_hval;
> - int nr;
> -
> - if (is_cluster_diskmode(&sys->cinfo)) {
> - node_hval = sd_hash(&n->nid, offsetof(typeof(n->nid), io_addr));
> - hval = fnv_64a_64(node_hval, hval);
> - nr = DIV_ROUND_UP(disk->space, WEIGHT_MIN);
> - if (0 == n->nid.port)
> - return;
> - } else
> - nr = vdisk_number(disk);
> -
> - for (int i = 0; i < nr; i++) {
> - struct vdisk *v = xmalloc(sizeof(*v));
> -
> - hval = sd_hash_next(hval);
> - v->hash = hval;
> - v->disk = disk;
> - if (unlikely(vdisk_insert(v)))
> - panic("vdisk hash collison");
> - }
> -}
> -
> -static inline void vdisk_free(struct vdisk *v)
> -{
> - rb_erase(&v->rb, &md.vroot);
> - free(v);
> -}
> -
> -static void remove_vdisks(const struct disk *disk)
> -{
> - uint64_t hval = sd_hash(disk->path, strlen(disk->path));
> - const struct sd_node *n = &sys->this_node;
> - uint64_t node_hval;
> - int nr;
> -
> - if (is_cluster_diskmode(&sys->cinfo)) {
> - node_hval = sd_hash(&n->nid, offsetof(typeof(n->nid), io_addr));
> - hval = fnv_64a_64(node_hval, hval);
> - nr = DIV_ROUND_UP(disk->space, WEIGHT_MIN);
> - } else
> - nr = vdisk_number(disk);
> -
> - for (int i = 0; i < nr; i++) {
> - struct vdisk *v;
> -
> - hval = sd_hash_next(hval);
> - v = hval_to_vdisk(hval);
> - assert(v->hash == hval);
> -
> - vdisk_free(v);
> - }
> -}
> -
> -static inline void trim_last_slash(char *path)
> -{
> - assert(path[0]);
> - while (path[strlen(path) - 1] == '/')
> - path[strlen(path) - 1] = '\0';
> -}
> -
> -static struct disk *path_to_disk(const char *path)
> -{
> - struct disk key = {};
> -
> - pstrcpy(key.path, sizeof(key.path), path);
> - trim_last_slash(key.path);
> -
> - return rb_search(&md.root, &key, rb, disk_cmp);
> -}
> -
> -size_t get_store_objsize(uint64_t oid)
> -{
> - if (is_erasure_oid(oid)) {
> - uint8_t policy = get_vdi_copy_policy(oid_to_vid(oid));
> - int d;
> - ec_policy_to_dp(policy, &d, NULL);
> - return get_vdi_object_size(oid_to_vid(oid)) / d;
> - }
> - return get_objsize(oid, get_vdi_object_size(oid_to_vid(oid)));
> -}
> -
> -static int get_total_object_size(uint64_t oid, const char *wd, uint32_t epoch,
> - uint8_t ec_index, struct vnode_info *vinfo,
> - void *total)
> -{
> - uint64_t *t = total;
> - struct stat s;
> - char path[PATH_MAX];
> -
> - snprintf(path, PATH_MAX, "%s/%016" PRIx64, wd, oid);
> - if (stat(path, &s) == 0)
> - *t += s.st_blocks * SECTOR_SIZE;
> - else
> - *t += get_store_objsize(oid);
> -
> - return SD_RES_SUCCESS;
> -}
> -
> -static int64_t find_string_integer(const char *str, const char *delimiter)
> -{
> - char *pos = strstr(str, delimiter), *p;
> - int64_t ret;
> -
> - ret = strtoll(pos + 1, &p, 10);
> - if (ret == LLONG_MAX || p == pos + 1) {
> - sd_err("%s strtoul failed, delimiter %s, %m", str, delimiter);
> - return -1;
> - }
> -
> - return ret;
> -}
> -
> -/* If cleanup is true, temporary objects will be removed */
> -static int for_each_object_in_path(const char *path,
> - int (*func)(uint64_t, const char *, uint32_t,
> - uint8_t, struct vnode_info *,
> - void *),
> - bool cleanup, struct vnode_info *vinfo,
> - void *arg)
> -{
> - DIR *dir;
> - struct dirent *d;
> - uint64_t oid;
> - int ret = SD_RES_SUCCESS;
> - char file_name[PATH_MAX];
> -
> - dir = opendir(path);
> - if (unlikely(!dir)) {
> - sd_err("failed to open %s, %m", path);
> - return SD_RES_EIO;
> - }
> -
> - while ((d = readdir(dir))) {
> - uint32_t epoch = 0;
> - uint8_t ec_index = SD_MAX_COPIES;
> -
> - /* skip ".", ".." and ".stale" */
> - if (unlikely(!strncmp(d->d_name, ".", 1)))
> - continue;
> -
> - sd_debug("%s, %s", path, d->d_name);
> - oid = strtoull(d->d_name, NULL, 16);
> - if (oid == 0 || oid == ULLONG_MAX)
> - continue;
> -
> - /* don't call callback against temporary objects */
> - if (is_tmp_dentry(d->d_name)) {
> - if (cleanup) {
> - snprintf(file_name, sizeof(file_name),
> - "%s/%s", path, d->d_name);
> - sd_debug("remove tmp object %s", file_name);
> - if (unlink(file_name) < 0)
> - sd_err("failed to unlink %s: %m",
> - file_name);
> - }
> - continue;
> - }
> -
> - if (is_stale_dentry(d->d_name)) {
> - epoch = find_string_integer(d->d_name, ".");
> - if (epoch < 0)
> - continue;
> - }
> -
> - if (is_ec_dentry(d->d_name)) {
> - ec_index = find_string_integer(d->d_name, "_");
> - if (ec_index < 0)
> - continue;
> - }
> -
> - ret = func(oid, path, epoch, ec_index, vinfo, arg);
> - if (ret != SD_RES_SUCCESS)
> - break;
> - }
> - closedir(dir);
> - return ret;
> -}
> -
> -static uint64_t get_path_free_size(const char *path, uint64_t *used)
> -{
> - struct statvfs fs;
> - uint64_t size;
> -
> - if (statvfs(path, &fs) < 0) {
> - sd_err("get disk %s space failed %m", path);
> - return 0;
> - }
> - size = (int64_t)fs.f_frsize * fs.f_bavail;
> -
> - if (!used)
> - goto out;
> - if (for_each_object_in_path(path, get_total_object_size, false,
> - NULL, used)
> - != SD_RES_SUCCESS)
> - return 0;
> -out:
> - return size;
> -}
> -
> -/*
> - * If path is broken during initialization or not support xattr return 0. We can
> - * safely use 0 to represent failure case because 0 space path can be
> - * considered as broken path.
> - */
> -static uint64_t init_path_space(const char *path, bool purge)
> -{
> - uint64_t size;
> - char stale[PATH_MAX];
> -
> - if (!is_xattr_enabled(path)) {
> - sd_warn("multi-disk support need xattr feature for path: %s",
> - path);
> - goto broken_path;
> - }
> -
> - if (purge && purge_directory(path) < 0)
> - sd_err("failed to purge %s", path);
> -
> - snprintf(stale, PATH_MAX, "%s/.stale", path);
> - if (xmkdir(stale, sd_def_dmode) < 0) {
> - sd_err("can't mkdir for %s, %m", stale);
> - goto broken_path;
> - }
> -
> -#define MDNAME "user.md.size"
> -#define MDSIZE sizeof(uint64_t)
> - if (getxattr(path, MDNAME, &size, MDSIZE) < 0) {
> - if (errno == ENODATA) {
> - goto create;
> - } else {
> - sd_err("%s, %m", path);
> - goto broken_path;
> - }
> - }
> -
> - return size;
> -create:
> - size = get_path_free_size(path, NULL);
> - if (!size)
> - goto broken_path;
> - if (setxattr(path, MDNAME, &size, MDSIZE, 0) < 0) {
> - sd_err("%s, %m", path);
> - goto broken_path;
> - }
> - return size;
> -broken_path:
> - return 0;
> -}
> -
> -/* We don't need lock at init stage */
> -bool md_add_disk(const char *path, bool purge)
> -{
> - struct disk *new;
> -
> - if (path_to_disk(path)) {
> - sd_err("duplicate path %s", path);
> - return false;
> - }
> -
> - if (xmkdir(path, sd_def_dmode) < 0) {
> - sd_err("can't mkdir for %s, %m", path);
> - return false;
> - }
> -
> - new = xmalloc(sizeof(*new));
> - pstrcpy(new->path, PATH_MAX, path);
> - trim_last_slash(new->path);
> - new->space = init_path_space(new->path, purge);
> - if (!new->space) {
> - free(new);
> - return false;
> - }
> -
> - create_vdisks(new);
> - rb_insert(&md.root, new, rb, disk_cmp);
> - md.space += new->space;
> - md.nr_disks++;
> -
> - sd_info("%s, vdisk nr %d, total disk %d", new->path, vdisk_number(new),
> - md.nr_disks);
> - return true;
> -}
> -
> -static inline void md_remove_disk(struct disk *disk)
> -{
> - sd_info("%s from multi-disk array", disk->path);
> - rb_erase(&disk->rb, &md.root);
> - md.nr_disks--;
> - remove_vdisks(disk);
> - free(disk);
> -}
> -
> -uint64_t md_init_space(void)
> -{
> - return md.space;
> -}
> -
> -static const char *md_get_object_dir_nolock(uint64_t oid)
> -{
> - const struct vdisk *vd;
> -
> - if (unlikely(md.nr_disks == 0))
> - return NONE_EXIST_PATH; /* To generate EIO */
> -
> - vd = oid_to_vdisk(oid);
> - return vd->disk->path;
> -}
> -
> -const char *md_get_object_dir(uint64_t oid)
> -{
> - const char *p;
> -
> - sd_read_lock(&md.lock);
> - p = md_get_object_dir_nolock(oid);
> - sd_rw_unlock(&md.lock);
> -
> - return p;
> -}
> -
> -struct process_path_arg {
> - const char *path;
> - struct vnode_info *vinfo;
> - int (*func)(uint64_t oid, const char *, uint32_t, uint8_t,
> - struct vnode_info *, void *arg);
> - bool cleanup;
> - void *opaque;
> - int result;
> -};
> -
> -static void *thread_process_path(void *arg)
> -{
> - int ret;
> - struct process_path_arg *parg = (struct process_path_arg *)arg;
> -
> - ret = for_each_object_in_path(parg->path, parg->func, parg->cleanup,
> - parg->vinfo, parg->opaque);
> - if (ret != SD_RES_SUCCESS)
> - parg->result = ret;
> -
> - return arg;
> -}
> -
> -main_fn int for_each_object_in_wd(int (*func)(uint64_t oid, const char *path,
> - uint32_t epoch, uint8_t ec_index,
> - struct vnode_info *vinfo, void *arg),
> - bool cleanup, void *arg)
> -{
> - int ret = SD_RES_SUCCESS;
> - const struct disk *disk;
> - struct process_path_arg *thread_args, *path_arg;
> - struct vnode_info *vinfo;
> - void *ret_arg;
> - sd_thread_t *thread_array;
> - int nr_thread = 0, idx = 0;
> -
> - sd_read_lock(&md.lock);
> -
> - rb_for_each_entry(disk, &md.root, rb) {
> - nr_thread++;
> - }
> -
> - thread_args = xmalloc(nr_thread * sizeof(struct process_path_arg));
> - thread_array = xmalloc(nr_thread * sizeof(sd_thread_t));
> -
> - vinfo = get_vnode_info();
> -
> - rb_for_each_entry(disk, &md.root, rb) {
> - thread_args[idx].path = disk->path;
> - thread_args[idx].vinfo = vinfo;
> - thread_args[idx].func = func;
> - thread_args[idx].cleanup = cleanup;
> - thread_args[idx].opaque = arg;
> - thread_args[idx].result = SD_RES_SUCCESS;
> - ret = sd_thread_create_with_idx("foreach wd",
> - thread_array + idx,
> - thread_process_path,
> - (void *)(thread_args + idx));
> - if (ret) {
> - /*
> - * If we can't create enough threads to process
> - * files, the data-consistent will be broken if
> - * we continued.
> - */
> - panic("Failed to create thread for path %s",
> - disk->path);
> - }
> - idx++;
> - }
> -
> - sd_debug("Create %d threads for all path", nr_thread);
> - /* wait for all threads to exit */
> - for (idx = 0; idx < nr_thread; idx++) {
> - ret = sd_thread_join(thread_array[idx], &ret_arg);
> - if (ret)
> - sd_err("Failed to join thread");
> - if (ret_arg) {
> - path_arg = (struct process_path_arg *)ret_arg;
> - if (path_arg->result != SD_RES_SUCCESS)
> - sd_err("%s, %s", path_arg->path,
> - sd_strerror(path_arg->result));
> - }
> - }
> -
> - put_vnode_info(vinfo);
> - sd_rw_unlock(&md.lock);
> -
> - free(thread_args);
> - free(thread_array);
> - return ret;
> -}
> -
> -int for_each_object_in_stale(int (*func)(uint64_t oid, const char *path,
> - uint32_t epoch, uint8_t,
> - struct vnode_info *, void *arg),
> - void *arg)
> -{
> - int ret = SD_RES_SUCCESS;
> - char path[PATH_MAX];
> - const struct disk *disk;
> -
> - sd_read_lock(&md.lock);
> - rb_for_each_entry(disk, &md.root, rb) {
> - snprintf(path, sizeof(path), "%s/.stale", disk->path);
> - ret = for_each_object_in_path(path, func, false, NULL, arg);
> - if (ret != SD_RES_SUCCESS)
> - break;
> - }
> - sd_rw_unlock(&md.lock);
> - return ret;
> -}
> -
> -
> -int for_each_obj_path(int (*func)(const char *path))
> -{
> - int ret = SD_RES_SUCCESS;
> - const struct disk *disk;
> -
> - sd_read_lock(&md.lock);
> - rb_for_each_entry(disk, &md.root, rb) {
> - ret = func(disk->path);
> - if (ret != SD_RES_SUCCESS)
> - break;
> - }
> - sd_rw_unlock(&md.lock);
> - return ret;
> -}
> -
> -struct md_work {
> - struct work work;
> - char path[PATH_MAX];
> -};
> -
> -static inline void kick_recover(void)
> -{
> - struct vnode_info *vinfo = get_vnode_info();
> -
> - if (is_cluster_diskmode(&sys->cinfo))
> - sys->cdrv->update_node(&sys->this_node);
> - else {
> - start_recovery(vinfo, vinfo, false);
> - put_vnode_info(vinfo);
> - }
> -}
> -
> -static void md_do_recover(struct work *work)
> -{
> - struct md_work *mw = container_of(work, struct md_work, work);
> - struct disk *disk;
> - int nr = 0;
> -
> - sd_write_lock(&md.lock);
> - disk = path_to_disk(mw->path);
> - if (!disk)
> - /* Just ignore the duplicate EIO of the same path */
> - goto out;
> - md_remove_disk(disk);
> - nr = md.nr_disks;
> -out:
> - sd_rw_unlock(&md.lock);
> -
> - if (disk) {
> - if (nr > 0) {
> - update_node_disks();
> - kick_recover();
> - } else {
> - leave_cluster();
> - }
> - }
> -
> - free(mw);
> -}
> -
> -int md_handle_eio(const char *fault_path)
> -{
> - struct md_work *mw;
> -
> - if (nr_online_disks() == 0)
> - return SD_RES_EIO;
> -
> - mw = xzalloc(sizeof(*mw));
> - mw->work.done = md_do_recover;
> - pstrcpy(mw->path, PATH_MAX, fault_path);
> - queue_work(sys->md_wqueue, &mw->work);
> -
> - /* Fool the requester to retry */
> - return SD_RES_NETWORK_ERROR;
> -}
> -
> -static inline bool md_access(const char *path)
> -{
> - if (access(path, R_OK | W_OK) < 0) {
> - if (unlikely(errno != ENOENT))
> - sd_err("failed to check %s, %m", path);
> - return false;
> - }
> -
> - return true;
> -}
> -
> -static int get_old_new_path(uint64_t oid, uint32_t epoch, uint8_t ec_index,
> - const char *path, char *old, char *new)
> -{
> - if (!epoch) {
> - if (!is_erasure_oid(oid)) {
> - snprintf(old, PATH_MAX, "%s/%016" PRIx64, path, oid);
> - snprintf(new, PATH_MAX, "%s/%016" PRIx64,
> - md_get_object_dir_nolock(oid), oid);
> - } else {
> - snprintf(old, PATH_MAX, "%s/%016" PRIx64"_%d", path,
> - oid, ec_index);
> - snprintf(new, PATH_MAX, "%s/%016" PRIx64"_%d",
> - md_get_object_dir_nolock(oid), oid, ec_index);
> - }
> - } else {
> - if (!is_erasure_oid(oid)) {
> - snprintf(old, PATH_MAX,
> - "%s/.stale/%016"PRIx64".%"PRIu32, path,
> - oid, epoch);
> - snprintf(new, PATH_MAX,
> - "%s/.stale/%016"PRIx64".%"PRIu32,
> - md_get_object_dir_nolock(oid), oid, epoch);
> - } else {
> - snprintf(old, PATH_MAX,
> - "%s/.stale/%016"PRIx64"_%d.%"PRIu32, path,
> - oid, ec_index, epoch);
> - snprintf(new, PATH_MAX,
> - "%s/.stale/%016"PRIx64"_%d.%"PRIu32,
> - md_get_object_dir_nolock(oid),
> - oid, ec_index, epoch);
> - }
> - }
> -
> - if (!md_access(old))
> - return -1;
> -
> - return 0;
> -}
> -
> -static int md_move_object(uint64_t oid, const char *old, const char *new)
> -{
> - struct strbuf buf = STRBUF_INIT;
> - int fd, ret = -1;
> - size_t sz = get_store_objsize(oid);
> -
> - fd = open(old, O_RDONLY);
> - if (fd < 0) {
> - sd_err("failed to open %s", old);
> - goto out;
> - }
> -
> - ret = strbuf_read(&buf, fd, sz);
> - if (ret != sz) {
> - sd_err("failed to read %s, size %zu, %d, %m", old, sz, ret);
> - ret = -1;
> - goto out_close;
> - }
> -
> - if (atomic_create_and_write(new, buf.buf, buf.len, false) < 0) {
> - if (errno != EEXIST) {
> - sd_err("failed to create %s", new);
> - ret = -1;
> - goto out_close;
> - }
> - }
> - unlink(old);
> - ret = 0;
> -out_close:
> - close(fd);
> -out:
> - strbuf_release(&buf);
> - return ret;
> -}
> -
> -static int md_check_and_move(uint64_t oid, uint32_t epoch, uint8_t ec_index,
> - const char *path)
> -{
> - char old[PATH_MAX], new[PATH_MAX];
> -
> - if (get_old_new_path(oid, epoch, ec_index, path, old, new) < 0)
> - return SD_RES_EIO;
> - /*
> - * Recovery thread and main thread might try to recover the same object.
> - * Either one succeeds, the other will fail and proceed and end up
> - * trying to move the object to where it is already in place, in this
> - * case we simply return.
> - */
> - if (!strcmp(old, new))
> - return SD_RES_SUCCESS;
> -
> - /* We can't use rename(2) across device */
> - if (md_move_object(oid, old, new) < 0) {
> - sd_err("move old %s to new %s failed", old, new);
> - return SD_RES_EIO;
> - }
> -
> - sd_debug("from %s to %s", old, new);
> - return SD_RES_SUCCESS;
> -}
> -
> -static int scan_wd(uint64_t oid, uint32_t epoch, uint8_t ec_index)
> -{
> - int ret = SD_RES_EIO;
> - const struct disk *disk;
> -
> - sd_read_lock(&md.lock);
> - rb_for_each_entry(disk, &md.root, rb) {
> - ret = md_check_and_move(oid, epoch, ec_index, disk->path);
> - if (ret == SD_RES_SUCCESS)
> - break;
> - }
> - sd_rw_unlock(&md.lock);
> - return ret;
> -}
> -
> -bool md_exist(uint64_t oid, uint8_t ec_index, char *path)
> -{
> - if (md_access(path))
> - return true;
> - /*
> - * We have to iterate the WD because we don't have epoch-like history
> - * track to locate the objects for multiple disk failure. Simply do
> - * hard iteration simplify the code a lot.
> - */
> - if (scan_wd(oid, 0, ec_index) == SD_RES_SUCCESS)
> - return true;
> -
> - return false;
> -}
> -
> -int md_get_stale_path(uint64_t oid, uint32_t epoch, uint8_t ec_index,
> - char *path)
> -{
> - if (unlikely(!epoch))
> - panic("invalid 0 epoch");
> -
> - if (is_erasure_oid(oid)) {
> - if (unlikely(ec_index >= SD_MAX_COPIES))
> - panic("invalid ec index %d", ec_index);
> -
> - snprintf(path, PATH_MAX, "%s/.stale/%016"PRIx64"_%d.%"PRIu32,
> - md_get_object_dir(oid), oid, ec_index, epoch);
> - } else
> - snprintf(path, PATH_MAX, "%s/.stale/%016"PRIx64".%"PRIu32,
> - md_get_object_dir(oid), oid, epoch);
> -
> - if (md_access(path))
> - return SD_RES_SUCCESS;
> -
> - if (scan_wd(oid, epoch, ec_index) == SD_RES_SUCCESS)
> - return SD_RES_SUCCESS;
> -
> - return SD_RES_NO_OBJ;
> -}
> -
> -uint32_t md_get_info(struct sd_md_info *info)
> -{
> - uint32_t ret = sizeof(*info);
> - const struct disk *disk;
> - int i = 0;
> -
> - memset(info, 0, ret);
> - sd_read_lock(&md.lock);
> - rb_for_each_entry(disk, &md.root, rb) {
> - info->disk[i].idx = i;
> - pstrcpy(info->disk[i].path, PATH_MAX, disk->path);
> - /* FIXME: better handling failure case. */
> - info->disk[i].free = get_path_free_size(info->disk[i].path,
> - &info->disk[i].used);
> - i++;
> - }
> - info->nr = md.nr_disks;
> - sd_rw_unlock(&md.lock);
> - return ret;
> -}
> -
> -static inline void md_del_disk(const char *path)
> -{
> - struct disk *disk = path_to_disk(path);
> -
> - if (!disk) {
> - sd_err("invalid path %s", path);
> - return;
> - }
> - md_remove_disk(disk);
> -}
> -
> -#ifdef HAVE_DISKVNODES
> -void update_node_disks(void)
> -{
> - const struct disk *disk;
> - int i = 0;
> - bool rb_empty = false;
> -
> - if (!sys)
> - return;
> -
> - memset(sys->this_node.disks, 0, sizeof(struct disk_info) * DISK_MAX);
> - sd_read_lock(&md.lock);
> - rb_for_each_entry(disk, &md.root, rb) {
> - sys->this_node.disks[i].disk_id =
> - sd_hash(disk->path, strlen(disk->path));
> - sys->this_node.disks[i].disk_space = disk->space;
> - i++;
> - }
> - sd_rw_unlock(&md.lock);
> -
> - if (RB_EMPTY_ROOT(&md.vroot))
> - rb_empty = true;
> - sd_write_lock(&md.lock);
> - rb_for_each_entry(disk, &md.root, rb) {
> - if (!rb_empty)
> - remove_vdisks(disk);
> - create_vdisks(disk);
> - }
> - sd_rw_unlock(&md.lock);
> -}
> -#else
> -void update_node_disks(void)
> -{
> -}
> -#endif
> -
> -static int do_plug_unplug(char *disks, bool plug)
> -{
> - const char *path;
> - int old_nr, ret = SD_RES_UNKNOWN;
> -
> - sd_write_lock(&md.lock);
> - old_nr = md.nr_disks;
> - path = strtok(disks, ",");
> - do {
> - if (plug) {
> - if (!md_add_disk(path, true))
> - sd_err("failed to add %s", path);
> - } else {
> - md_del_disk(path);
> - }
> - } while ((path = strtok(NULL, ",")));
> -
> - /* If no disks change, bail out */
> - if (old_nr == md.nr_disks)
> - goto out;
> -
> - ret = SD_RES_SUCCESS;
> -out:
> - sd_rw_unlock(&md.lock);
> -
> - if (ret == SD_RES_SUCCESS) {
> - update_node_disks();
> - kick_recover();
> - }
> -
> - return ret;
> -}
> -
> -int md_plug_disks(char *disks)
> -{
> - return do_plug_unplug(disks, true);
> -}
> -
> -int md_unplug_disks(char *disks)
> -{
> - return do_plug_unplug(disks, false);
> -}
> -
> -uint64_t md_get_size(uint64_t *used)
> -{
> - uint64_t fsize = 0;
> - const struct disk *disk;
> -
> - *used = 0;
> - sd_read_lock(&md.lock);
> - rb_for_each_entry(disk, &md.root, rb) {
> - fsize += get_path_free_size(disk->path, used);
> - }
> - sd_rw_unlock(&md.lock);
> -
> - return fsize + *used;
> -}
> -
> -uint32_t md_nr_disks(void)
> -{
> - return nr_online_disks();
> -}
> diff --git a/sheep/plain_store.c b/sheep/plain_store.c
> deleted file mode 100644
> index 4c19832..0000000
> --- a/sheep/plain_store.c
> +++ /dev/null
> @@ -1,759 +0,0 @@
> -/*
> - * Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
> - *
> - * This program is free software; you can redistribute it and/or
> - * modify it under the terms of the GNU General Public License version
> - * 2 as published by the Free Software Foundation.
> - *
> - * You should have received a copy of the GNU General Public License
> - * along with this program. If not, see <http://www.gnu.org/licenses/>.
> - */
> -
> -#include <libgen.h>
> -#include <linux/falloc.h>
> -
> -#include "sheep_priv.h"
> -
> -#ifndef FALLOC_FL_PUNCH_HOLE
> -#define FALLOC_FL_PUNCH_HOLE 0x02
> -#endif
> -
> -#define sector_algined(x) ({ ((x) & (SECTOR_SIZE - 1)) == 0; })
> -
> -static inline bool iocb_is_aligned(const struct siocb *iocb)
> -{
> - return sector_algined(iocb->offset) && sector_algined(iocb->length);
> -}
> -
> -static int prepare_iocb(uint64_t oid, const struct siocb *iocb, bool create)
> -{
> - int syncflag = create ? O_SYNC : O_DSYNC;
> - int flags = syncflag | O_RDWR;
> -
> - if (uatomic_is_true(&sys->use_journal) || sys->nosync == true)
> - flags &= ~syncflag;
> -
> - if (sys->backend_dio && iocb_is_aligned(iocb)) {
> - if (!is_aligned_to_pagesize(iocb->buf))
> - panic("Memory isn't aligned to pagesize %p", iocb->buf);
> - flags |= O_DIRECT;
> - }
> -
> - if (create)
> - flags |= O_CREAT | O_EXCL;
> -
> - return flags;
> -}
> -
> -static int get_store_path(uint64_t oid, uint8_t ec_index, char *path)
> -{
> - if (is_erasure_oid(oid)) {
> - if (unlikely(ec_index >= SD_MAX_COPIES))
> - panic("invalid ec_index %d", ec_index);
> - return snprintf(path, PATH_MAX, "%s/%016"PRIx64"_%d",
> - md_get_object_dir(oid), oid, ec_index);
> - }
> -
> - return snprintf(path, PATH_MAX, "%s/%016" PRIx64,
> - md_get_object_dir(oid), oid);
> -}
> -
> -static int get_store_tmp_path(uint64_t oid, uint8_t ec_index, char *path)
> -{
> - if (is_erasure_oid(oid)) {
> - if (unlikely(ec_index >= SD_MAX_COPIES))
> - panic("invalid ec_index %d", ec_index);
> - return snprintf(path, PATH_MAX, "%s/%016"PRIx64"_%d.tmp",
> - md_get_object_dir(oid), oid, ec_index);
> - }
> -
> - return snprintf(path, PATH_MAX, "%s/%016" PRIx64".tmp",
> - md_get_object_dir(oid), oid);
> -}
> -
> -static int get_store_stale_path(uint64_t oid, uint32_t epoch, uint8_t ec_index,
> - char *path)
> -{
> - return md_get_stale_path(oid, epoch, ec_index, path);
> -}
> -
> -/*
> - * Check if oid is in this nodes (if oid is in the wrong place, it will be moved
> - * to the correct one after this call in a MD setup.
> - */
> -bool default_exist(uint64_t oid, uint8_t ec_index)
> -{
> - char path[PATH_MAX];
> -
> - get_store_path(oid, ec_index, path);
> -
> - return md_exist(oid, ec_index, path);
> -}
> -
> -static int err_to_sderr(const char *path, uint64_t oid, int err)
> -{
> - struct stat s;
> - char p[PATH_MAX], *dir;
> -
> - /* Use a temporary buffer since dirname() may modify its argument. */
> - pstrcpy(p, sizeof(p), path);
> - dir = dirname(p);
> -
> - sd_debug("%s", path);
> - switch (err) {
> - case ENOENT:
> - if (stat(dir, &s) < 0) {
> - sd_err("%s corrupted", dir);
> - return md_handle_eio(dir);
> - }
> - sd_debug("object %016" PRIx64 " not found locally", oid);
> - return SD_RES_NO_OBJ;
> - case ENOSPC:
> - /* TODO: stop automatic recovery */
> - sd_err("diskfull, oid=%"PRIx64, oid);
> - return SD_RES_NO_SPACE;
> - case EMFILE:
> - case ENFILE:
> - case EINTR:
> - case EAGAIN:
> - case EEXIST:
> - sd_err("%m, oid=%"PRIx64, oid);
> - /* make gateway try again */
> - return SD_RES_NETWORK_ERROR;
> - default:
> - sd_err("oid=%"PRIx64", %m", oid);
> - return md_handle_eio(dir);
> - }
> -}
> -
> -static int discard(int fd, uint64_t start, uint32_t end)
> -{
> - int ret = xfallocate(fd, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE,
> - start, end - start);
> - if (ret < 0) {
> - if (errno == ENOSYS || errno == EOPNOTSUPP)
> - sd_info("FALLOC_FL_PUNCH_HOLE is not supported "
> - "on this filesystem");
> - else
> - sd_err("failed to discard object, %m");
> - }
> -
> - return ret;
> -}
> -
> -/* Trim zero blocks of the beginning and end of the object. */
> -static int default_trim(int fd, uint64_t oid, const struct siocb *iocb,
> - uint64_t *poffset, uint32_t *plen)
> -{
> - trim_zero_blocks(iocb->buf, poffset, plen);
> -
> - if (iocb->offset < *poffset) {
> - sd_debug("discard between %d, %ld, %" PRIx64, iocb->offset,
> - *poffset, oid);
> -
> - if (discard(fd, iocb->offset, *poffset) < 0)
> - return -1;
> - }
> -
> - if (*poffset + *plen < iocb->offset + iocb->length) {
> - uint64_t end = iocb->offset + iocb->length;
> - uint32_t object_size = get_vdi_object_size(oid_to_vid(oid));
> - if (end == get_objsize(oid, object_size))
> - /* This is necessary to punch the last block */
> - end = round_up(end, BLOCK_SIZE);
> - sd_debug("discard between %ld, %ld, %" PRIx64, *poffset + *plen,
> - end, oid);
> -
> - if (discard(fd, *poffset + *plen, end) < 0)
> - return -1;
> - }
> -
> - return 0;
> -}
> -
> -int default_write(uint64_t oid, const struct siocb *iocb)
> -{
> - int flags = prepare_iocb(oid, iocb, false), fd,
> - ret = SD_RES_SUCCESS;
> - char path[PATH_MAX];
> - ssize_t size;
> - uint32_t len = iocb->length;
> - uint64_t offset = iocb->offset;
> - static bool trim_is_supported = true;
> -
> - if (iocb->epoch < sys_epoch()) {
> - sd_debug("%"PRIu32" sys %"PRIu32, iocb->epoch, sys_epoch());
> - return SD_RES_OLD_NODE_VER;
> - }
> -
> - if (uatomic_is_true(&sys->use_journal) &&
> - unlikely(journal_write_store(oid, iocb->buf, iocb->length,
> - iocb->offset, false))
> - != SD_RES_SUCCESS) {
> - sd_err("turn off journaling");
> - uatomic_set_false(&sys->use_journal);
> - flags |= O_DSYNC;
> - sync();
> - }
> -
> - get_store_path(oid, iocb->ec_index, path);
> -
> - /*
> - * Make sure oid is in the right place because oid might be misplaced
> - * in a wrong place, due to 'shutdown/restart with less/more disks' or
> - * any bugs. We need call err_to_sderr() to return EIO if disk is broken
> - */
> - if (!default_exist(oid, iocb->ec_index))
> - return err_to_sderr(path, oid, ENOENT);
> -
> - fd = open(path, flags, sd_def_fmode);
> - if (unlikely(fd < 0))
> - return err_to_sderr(path, oid, errno);
> -
> - if (trim_is_supported && is_sparse_object(oid)) {
> - if (default_trim(fd, oid, iocb, &offset, &len) < 0) {
> - trim_is_supported = false;
> - offset = iocb->offset;
> - len = iocb->length;
> - }
> - }
> -
> - size = xpwrite(fd, iocb->buf, len, offset);
> - if (unlikely(size != len)) {
> - sd_err("failed to write object %"PRIx64", path=%s, offset=%"
> - PRId32", size=%"PRId32", result=%zd, %m", oid, path,
> - iocb->offset, iocb->length, size);
> - ret = err_to_sderr(path, oid, errno);
> - goto out;
> - }
> -out:
> - close(fd);
> - return ret;
> -}
> -
> -static int make_stale_dir(const char *path)
> -{
> - char p[PATH_MAX];
> -
> - snprintf(p, PATH_MAX, "%s/.stale", path);
> - if (xmkdir(p, sd_def_dmode) < 0) {
> - sd_err("%s failed, %m", p);
> - return SD_RES_EIO;
> - }
> - return SD_RES_SUCCESS;
> -}
> -
> -static int purge_dir(const char *path)
> -{
> - if (purge_directory(path) < 0)
> - return SD_RES_EIO;
> -
> - return SD_RES_SUCCESS;
> -}
> -
> -static int purge_stale_dir(const char *path)
> -{
> - char p[PATH_MAX];
> -
> - snprintf(p, PATH_MAX, "%s/.stale", path);
> -
> - if (purge_directory_async(p) < 0)
> - return SD_RES_EIO;
> -
> - return SD_RES_SUCCESS;
> -}
> -
> -int default_cleanup(void)
> -{
> - int ret;
> -
> - ret = for_each_obj_path(purge_stale_dir);
> - if (ret != SD_RES_SUCCESS)
> - return ret;
> -
> - return SD_RES_SUCCESS;
> -}
> -
> -static int init_vdi_state(uint64_t oid, const char *wd, uint32_t epoch)
> -{
> - int ret;
> - struct sd_inode *inode = xzalloc(SD_INODE_HEADER_SIZE);
> - struct siocb iocb = {
> - .epoch = epoch,
> - .buf = inode,
> - .length = SD_INODE_HEADER_SIZE,
> - };
> -
> - ret = default_read(oid, &iocb);
> - if (ret != SD_RES_SUCCESS) {
> - sd_err("failed to read inode header %" PRIx64 " %" PRId32
> - "wat %s", oid, epoch, wd);
> - goto out;
> - }
> - add_vdi_state_unordered(oid_to_vid(oid), inode->nr_copies,
> - vdi_is_snapshot(inode), inode->copy_policy,
> - inode->block_size_shift, inode->parent_vdi_id);
> -
> - if (inode->name[0] == '\0')
> - atomic_set_bit(oid_to_vid(oid), sys->vdi_deleted);
> -
> - atomic_set_bit(oid_to_vid(oid), sys->vdi_inuse);
> -
> - ret = SD_RES_SUCCESS;
> -out:
> - free(inode);
> - return ret;
> -}
> -
> -static int init_objlist_and_vdi_bitmap(uint64_t oid, const char *wd,
> - uint32_t epoch, uint8_t ec_index,
> - struct vnode_info *vinfo,
> - void *arg)
> -{
> - int ret;
> - objlist_cache_insert(oid);
> -
> - if (is_vdi_obj(oid)) {
> - sd_debug("found the VDI object %" PRIx64" epoch %"PRIu32
> - " at %s", oid, epoch, wd);
> - ret = init_vdi_state(oid, wd, epoch);
> - if (ret != SD_RES_SUCCESS)
> - return ret;
> - }
> - return SD_RES_SUCCESS;
> -}
> -
> -int default_init(void)
> -{
> - int ret;
> -
> - sd_debug("use plain store driver");
> - ret = for_each_obj_path(make_stale_dir);
> - if (ret != SD_RES_SUCCESS)
> - return ret;
> -
> - for_each_object_in_stale(init_objlist_and_vdi_bitmap, NULL);
> -
> - return for_each_object_in_wd(init_objlist_and_vdi_bitmap, true, NULL);
> -}
> -
> -static int default_read_from_path(uint64_t oid, const char *path,
> - const struct siocb *iocb)
> -{
> - int flags = prepare_iocb(oid, iocb, false), fd,
> - ret = SD_RES_SUCCESS;
> - ssize_t size;
> -
> - /*
> - * Make sure oid is in the right place because oid might be misplaced
> - * in a wrong place, due to 'shutdown/restart with less disks' or any
> - * bugs. We need call err_to_sderr() to return EIO if disk is broken.
> - *
> - * For stale path, get_store_stale_path already does default_exist job.
> - */
> - if (!is_stale_path(path) && !default_exist(oid, iocb->ec_index))
> - return err_to_sderr(path, oid, ENOENT);
> -
> - fd = open(path, flags);
> - if (fd < 0)
> - return err_to_sderr(path, oid, errno);
> -
> - size = xpread(fd, iocb->buf, iocb->length, iocb->offset);
> - if (size < 0) {
> - sd_err("failed to read object %"PRIx64", path=%s, offset=%"
> - PRId32", size=%"PRId32", result=%zd, %m", oid, path,
> - iocb->offset, iocb->length, size);
> - ret = err_to_sderr(path, oid, errno);
> - }
> - close(fd);
> - return ret;
> -}
> -
> -int default_read(uint64_t oid, const struct siocb *iocb)
> -{
> - int ret;
> - char path[PATH_MAX];
> -
> - get_store_path(oid, iocb->ec_index, path);
> - ret = default_read_from_path(oid, path, iocb);
> -
> - /*
> - * If the request is against the older epoch, try to read from
> - * the stale directory
> - */
> - if (ret == SD_RES_NO_OBJ && iocb->epoch > 0 &&
> - iocb->epoch < sys_epoch()) {
> - get_store_stale_path(oid, iocb->epoch, iocb->ec_index, path);
> - ret = default_read_from_path(oid, path, iocb);
> - }
> -
> - return ret;
> -}
> -
> -int default_create_and_write(uint64_t oid, const struct siocb *iocb)
> -{
> - char path[PATH_MAX], tmp_path[PATH_MAX], *dir;
> - int flags = prepare_iocb(oid, iocb, true);
> - int ret, fd;
> - uint32_t len = iocb->length;
> - uint32_t object_size = 0;
> - size_t obj_size;
> - uint64_t offset = iocb->offset;
> -
> - sd_debug("%"PRIx64, oid);
> - get_store_path(oid, iocb->ec_index, path);
> - get_store_tmp_path(oid, iocb->ec_index, tmp_path);
> -
> - if (uatomic_is_true(&sys->use_journal) &&
> - journal_write_store(oid, iocb->buf, iocb->length,
> - iocb->offset, true)
> - != SD_RES_SUCCESS) {
> - sd_err("turn off journaling");
> - uatomic_set_false(&sys->use_journal);
> - flags |= O_SYNC;
> - sync();
> - }
> -
> - fd = open(tmp_path, flags, sd_def_fmode);
> - if (fd < 0) {
> - if (errno == EEXIST) {
> - /*
> - * This happens if node membership changes during object
> - * creation; while gateway retries a CREATE request,
> - * recovery process could also recover the object at the
> - * same time. They should try to write the same date,
> - * so it is okay to simply return success here.
> - */
> - sd_debug("%s exists", tmp_path);
> - return SD_RES_SUCCESS;
> - }
> -
> - sd_err("failed to open %s: %m", tmp_path);
> - return err_to_sderr(path, oid, errno);
> - }
> -
> - obj_size = get_store_objsize(oid);
> -
> - trim_zero_blocks(iocb->buf, &offset, &len);
> -
> - object_size = get_vdi_object_size(oid_to_vid(oid));
> -
> - if (offset != 0 || len != get_objsize(oid, object_size)) {
> - if (is_sparse_object(oid))
> - ret = xftruncate(fd, obj_size);
> - else
> - ret = prealloc(fd, obj_size);
> - if (ret < 0) {
> - ret = err_to_sderr(path, oid, errno);
> - goto out;
> - }
> - }
> -
> - ret = xpwrite(fd, iocb->buf, len, offset);
> - if (ret != len) {
> - sd_err("failed to write object. %m");
> - ret = err_to_sderr(path, oid, errno);
> - goto out;
> - }
> -
> - ret = rename(tmp_path, path);
> - if (ret < 0) {
> - sd_err("failed to rename %s to %s: %m", tmp_path, path);
> - ret = err_to_sderr(path, oid, errno);
> - goto out;
> - }
> -
> - close(fd);
> -
> - if (uatomic_is_true(&sys->use_journal) || sys->nosync == true) {
> - objlist_cache_insert(oid);
> - return SD_RES_SUCCESS;
> - }
> -
> - pstrcpy(tmp_path, sizeof(tmp_path), path);
> - dir = dirname(tmp_path);
> - fd = open(dir, O_DIRECTORY | O_RDONLY);
> - if (fd < 0) {
> - sd_err("failed to open directory %s: %m", dir);
> - return err_to_sderr(path, oid, errno);
> - }
> -
> - if (fsync(fd) != 0) {
> - sd_err("failed to write directory %s: %m", dir);
> - ret = err_to_sderr(path, oid, errno);
> - close(fd);
> - if (unlink(path) != 0)
> - sd_err("failed to unlink %s: %m", path);
> - return ret;
> - }
> - close(fd);
> - objlist_cache_insert(oid);
> - return SD_RES_SUCCESS;
> -
> -out:
> - if (unlink(tmp_path) != 0)
> - sd_err("failed to unlink %s: %m", tmp_path);
> - close(fd);
> - return ret;
> -}
> -
> -int default_link(uint64_t oid, uint32_t tgt_epoch)
> -{
> - char path[PATH_MAX], stale_path[PATH_MAX];
> -
> - sd_debug("try link %"PRIx64" from snapshot with epoch %d", oid,
> - tgt_epoch);
> -
> - snprintf(path, PATH_MAX, "%s/%016"PRIx64, md_get_object_dir(oid), oid);
> - get_store_stale_path(oid, tgt_epoch, 0, stale_path);
> -
> - if (link(stale_path, path) < 0) {
> - /*
> - * Recovery thread and main thread might try to recover the
> - * same object and we might get EEXIST in such case.
> - */
> - if (errno == EEXIST)
> - goto out;
> -
> - sd_debug("failed to link from %s to %s, %m", stale_path, path);
> - return err_to_sderr(path, oid, errno);
> - }
> -out:
> - return SD_RES_SUCCESS;
> -}
> -
> -/*
> - * For replicated object, if any of the replica belongs to this node, we
> - * consider it not stale.
> - *
> - * For erasure coded object, since every copy is unique and if it migrates to
> - * other node(index gets changed even it has some other copy belongs to it)
> - * because of hash ring changes, we consider it stale.
> - */
> -static bool oid_stale(uint64_t oid, int ec_index, struct vnode_info *vinfo)
> -{
> - uint32_t i, nr_copies;
> - const struct sd_vnode *v;
> - bool ret = true;
> - const struct sd_vnode *obj_vnodes[SD_MAX_COPIES];
> -
> - nr_copies = get_obj_copy_number(oid, vinfo->nr_zones);
> - oid_to_vnodes(oid, &vinfo->vroot, nr_copies, obj_vnodes);
> - for (i = 0; i < nr_copies; i++) {
> - v = obj_vnodes[i];
> - if (vnode_is_local(v)) {
> - if (ec_index < SD_MAX_COPIES) {
> - if (i == ec_index)
> - ret = false;
> - } else {
> - ret = false;
> - }
> - break;
> - }
> - }
> -
> - return ret;
> -}
> -
> -static int move_object_to_stale_dir(uint64_t oid, const char *wd,
> - uint32_t epoch, uint8_t ec_index,
> - struct vnode_info *vinfo, void *arg)
> -{
> - char path[PATH_MAX], stale_path[PATH_MAX];
> - uint32_t tgt_epoch = *(uint32_t *)arg;
> -
> - /* ec_index from md.c is reliable so we can directly use it */
> - if (ec_index < SD_MAX_COPIES) {
> - snprintf(path, PATH_MAX, "%s/%016"PRIx64"_%d",
> - md_get_object_dir(oid), oid, ec_index);
> - snprintf(stale_path, PATH_MAX,
> - "%s/.stale/%016"PRIx64"_%d.%"PRIu32,
> - md_get_object_dir(oid), oid, ec_index, tgt_epoch);
> - } else {
> - snprintf(path, PATH_MAX, "%s/%016" PRIx64,
> - md_get_object_dir(oid), oid);
> - snprintf(stale_path, PATH_MAX, "%s/.stale/%016"PRIx64".%"PRIu32,
> - md_get_object_dir(oid), oid, tgt_epoch);
> - }
> -
> - if (unlikely(rename(path, stale_path)) < 0) {
> - sd_err("failed to move stale object %" PRIX64 " to %s, %m", oid,
> - path);
> - return SD_RES_EIO;
> - }
> -
> - sd_debug("moved object %"PRIx64, oid);
> - return SD_RES_SUCCESS;
> -}
> -
> -static int check_stale_objects(uint64_t oid, const char *wd, uint32_t epoch,
> - uint8_t ec_index, struct vnode_info *vinfo,
> - void *arg)
> -{
> - if (oid_stale(oid, ec_index, vinfo))
> - return move_object_to_stale_dir(oid, wd, 0, ec_index,
> - NULL, arg);
> -
> - return SD_RES_SUCCESS;
> -}
> -
> -int default_update_epoch(uint32_t epoch)
> -{
> - assert(epoch);
> - return for_each_object_in_wd(check_stale_objects, false, &epoch);
> -}
> -
> -int default_format(void)
> -{
> - unsigned ret;
> -
> - sd_debug("try get a clean store");
> - ret = for_each_obj_path(purge_dir);
> - if (ret != SD_RES_SUCCESS)
> - return ret;
> -
> - if (sys->enable_object_cache)
> - object_cache_format();
> -
> - return SD_RES_SUCCESS;
> -}
> -
> -int default_remove_object(uint64_t oid, uint8_t ec_index)
> -{
> - char path[PATH_MAX];
> -
> - if (uatomic_is_true(&sys->use_journal))
> - journal_remove_object(oid);
> -
> - get_store_path(oid, ec_index, path);
> -
> - if (unlink(path) < 0) {
> - if (errno == ENOENT)
> - return SD_RES_NO_OBJ;
> -
> - sd_err("failed, %s, %m", path);
> - return SD_RES_EIO;
> - }
> -
> - return SD_RES_SUCCESS;
> -}
> -
> -#define SHA1NAME "user.obj.sha1"
> -
> -static int get_object_sha1(const char *path, uint8_t *sha1)
> -{
> - if (getxattr(path, SHA1NAME, sha1, SHA1_DIGEST_SIZE)
> - != SHA1_DIGEST_SIZE) {
> - if (errno == ENODATA)
> - sd_debug("sha1 is not cached yet, %s", path);
> - else
> - sd_err("fail to get xattr, %s", path);
> - return -1;
> - }
> -
> - return 0;
> -}
> -
> -static int set_object_sha1(const char *path, const uint8_t *sha1)
> -{
> - int ret;
> -
> - ret = setxattr(path, SHA1NAME, sha1, SHA1_DIGEST_SIZE, 0);
> - if (ret < 0)
> - sd_err("fail to set sha1, %s", path);
> -
> - return ret;
> -}
> -
> -static int get_object_path(uint64_t oid, uint32_t epoch, char *path,
> - size_t size)
> -{
> - if (default_exist(oid, 0)) {
> - snprintf(path, PATH_MAX, "%s/%016"PRIx64,
> - md_get_object_dir(oid), oid);
> - } else {
> - get_store_stale_path(oid, epoch, 0, path);
> - if (access(path, F_OK) < 0) {
> - if (errno == ENOENT)
> - return SD_RES_NO_OBJ;
> - return SD_RES_EIO;
> - }
> -
> - }
> -
> - return SD_RES_SUCCESS;
> -}
> -
> -int default_get_hash(uint64_t oid, uint32_t epoch, uint8_t *sha1)
> -{
> - int ret;
> - void *buf;
> - struct siocb iocb = {};
> - uint32_t length;
> - bool is_readonly_obj = oid_is_readonly(oid);
> - char path[PATH_MAX];
> -
> - ret = get_object_path(oid, epoch, path, sizeof(path));
> - if (ret != SD_RES_SUCCESS)
> - return ret;
> -
> - if (is_readonly_obj) {
> - if (get_object_sha1(path, sha1) == 0) {
> - sd_debug("use cached sha1 digest %s",
> - sha1_to_hex(sha1));
> - return SD_RES_SUCCESS;
> - }
> - }
> -
> - length = get_store_objsize(oid);
> - buf = valloc(length);
> - if (buf == NULL)
> - return SD_RES_NO_MEM;
> -
> - iocb.epoch = epoch;
> - iocb.buf = buf;
> - iocb.length = length;
> -
> - ret = default_read_from_path(oid, path, &iocb);
> - if (ret != SD_RES_SUCCESS) {
> - free(buf);
> - return ret;
> - }
> -
> - get_buffer_sha1(buf, length, sha1);
> - free(buf);
> -
> - sd_debug("the message digest of %"PRIx64" at epoch %d is %s", oid,
> - epoch, sha1_to_hex(sha1));
> -
> - if (is_readonly_obj)
> - set_object_sha1(path, sha1);
> -
> - return ret;
> -}
> -
> -int default_purge_obj(void)
> -{
> - uint32_t tgt_epoch = get_latest_epoch();
> -
> - return for_each_object_in_wd(move_object_to_stale_dir, true,
> - &tgt_epoch);
> -}
> -
> -static struct store_driver plain_store = {
> - .name = "plain",
> - .init = default_init,
> - .exist = default_exist,
> - .create_and_write = default_create_and_write,
> - .write = default_write,
> - .read = default_read,
> - .link = default_link,
> - .update_epoch = default_update_epoch,
> - .cleanup = default_cleanup,
> - .format = default_format,
> - .remove_object = default_remove_object,
> - .get_hash = default_get_hash,
> - .purge_obj = default_purge_obj,
> -};
> -
> -add_store_driver(plain_store);
> diff --git a/sheep/store.c b/sheep/store.c
> deleted file mode 100644
> index 8843fb8..0000000
> --- a/sheep/store.c
> +++ /dev/null
> @@ -1,511 +0,0 @@
> -/*
> - * Copyright (C) 2009-2011 Nippon Telegraph and Telephone Corporation.
> - *
> - * This program is free software; you can redistribute it and/or
> - * modify it under the terms of the GNU General Public License version
> - * 2 as published by the Free Software Foundation.
> - *
> - * You should have received a copy of the GNU General Public License
> - * along with this program. If not, see <http://www.gnu.org/licenses/>.
> - */
> -
> -#include "sheep_priv.h"
> -
> -char *obj_path;
> -char *epoch_path;
> -
> -struct store_driver *sd_store;
> -LIST_HEAD(store_drivers);
> -
> -int update_epoch_log(uint32_t epoch, struct sd_node *nodes, size_t nr_nodes)
> -{
> - int ret, len, nodes_len;
> - time_t t;
> - char path[PATH_MAX], *buf;
> -
> - sd_debug("update epoch: %d, %zu", epoch, nr_nodes);
> -
> - /* Piggyback the epoch creation time for 'dog cluster info' */
> - time(&t);
> - nodes_len = nr_nodes * sizeof(struct sd_node);
> - len = nodes_len + sizeof(time_t);
> - buf = xmalloc(len);
> - memcpy(buf, nodes, nodes_len);
> - memcpy(buf + nodes_len, &t, sizeof(time_t));
> -
> - /*
> - * rb field is unused in epoch file, zero-filling it
> - * is good for epoch file recovery because it is unified
> - */
> - for (int i = 0; i < nr_nodes; i++)
> - memset(buf + i * sizeof(struct sd_node)
> - + offsetof(struct sd_node, rb),
> - 0, sizeof(struct rb_node));
> -
> - snprintf(path, sizeof(path), "%s%08u", epoch_path, epoch);
> -
> - ret = atomic_create_and_write(path, buf, len, true);
> -
> - free(buf);
> - return ret;
> -}
> -
> -static int do_epoch_log_read(uint32_t epoch, struct sd_node *nodes, int len,
> - int *nr_nodes, time_t *timestamp)
> -{
> - int fd, ret, buf_len;
> - char path[PATH_MAX];
> - struct stat epoch_stat;
> -
> - snprintf(path, sizeof(path), "%s%08u", epoch_path, epoch);
> - fd = open(path, O_RDONLY);
> - if (fd < 0) {
> - sd_debug("failed to open epoch %"PRIu32" log, %m", epoch);
> - goto err;
> - }
> -
> - memset(&epoch_stat, 0, sizeof(epoch_stat));
> - ret = fstat(fd, &epoch_stat);
> - if (ret < 0) {
> - sd_err("failed to stat epoch %"PRIu32" log, %m", epoch);
> - goto err;
> - }
> -
> - buf_len = epoch_stat.st_size - sizeof(*timestamp);
> - if (buf_len < 0) {
> - sd_err("invalid epoch %"PRIu32" log", epoch);
> - goto err;
> - }
> - if (len < buf_len) {
> - close(fd);
> - return SD_RES_BUFFER_SMALL;
> - }
> -
> - ret = xread(fd, nodes, buf_len);
> - if (ret < 0) {
> - sd_err("failed to read epoch %"PRIu32" log, %m", epoch);
> - goto err;
> - }
> -
> - /* Broken epoch, just ignore */
> - if (ret % sizeof(struct sd_node) != 0) {
> - sd_err("invalid epoch %"PRIu32" log", epoch);
> - goto err;
> - }
> -
> - *nr_nodes = ret / sizeof(struct sd_node);
> -
> - if (timestamp) {
> - ret = xread(fd, timestamp, sizeof(*timestamp));
> - if (ret != sizeof(*timestamp)) {
> - sd_err("invalid epoch %"PRIu32" log", epoch);
> - goto err;
> - }
> - }
> -
> - close(fd);
> - return SD_RES_SUCCESS;
> -err:
> - if (fd >= 0)
> - close(fd);
> - return SD_RES_NO_TAG;
> -}
> -
> -int epoch_log_read(uint32_t epoch, struct sd_node *nodes,
> - int len, int *nr_nodes)
> -{
> - return do_epoch_log_read(epoch, nodes, len, nr_nodes, NULL);
> -}
> -
> -int epoch_log_read_with_timestamp(uint32_t epoch, struct sd_node *nodes,
> - int len, int *nr_nodes, time_t *timestamp)
> -{
> - return do_epoch_log_read(epoch, nodes, len, nr_nodes, timestamp);
> -}
> -
> -uint32_t get_latest_epoch(void)
> -{
> - DIR *dir;
> - struct dirent *d;
> - uint32_t e, epoch = 0;
> - char *p;
> -
> - dir = opendir(epoch_path);
> - if (!dir)
> - panic("failed to get the latest epoch: %m");
> -
> - while ((d = readdir(dir))) {
> - e = strtol(d->d_name, &p, 10);
> - if (d->d_name == p)
> - continue;
> -
> - if (strlen(d->d_name) != 8)
> - continue;
> -
> - if (e > epoch)
> - epoch = e;
> - }
> - closedir(dir);
> -
> - return epoch;
> -}
> -
> -int lock_base_dir(const char *d)
> -{
> -#define LOCK_PATH "/lock"
> - char *lock_path;
> - int ret = 0;
> - int fd, len = strlen(d) + strlen(LOCK_PATH) + 1;
> -
> - lock_path = xzalloc(len);
> - snprintf(lock_path, len, "%s" LOCK_PATH, d);
> -
> - fd = open(lock_path, O_WRONLY|O_CREAT, sd_def_fmode);
> - if (fd < 0) {
> - sd_err("failed to open lock file %s (%m)", lock_path);
> - ret = -1;
> - goto out;
> - }
> -
> - if (lockf(fd, F_TLOCK, 1) < 0) {
> - if (errno == EACCES || errno == EAGAIN)
> - sd_err("another sheep daemon is using %s", d);
> - else
> - sd_err("unable to get base dir lock (%m)");
> - ret = -1;
> - goto out;
> - }
> -
> -out:
> - free(lock_path);
> - return ret;
> -}
> -
> -int init_base_path(const char *d)
> -{
> - if (xmkdir(d, sd_def_dmode) < 0) {
> - sd_err("cannot create the directory %s (%m)", d);
> - return -1;
> - }
> -
> - return 0;
> -}
> -
> -static inline int check_path_len(const char *path)
> -{
> - int len = strlen(path);
> - if (len > PATH_MAX) {
> - sd_err("insanely long object directory %s", path);
> - return -1;
> - }
> -
> - return 0;
> -}
> -
> -static int is_meta_store(const char *path)
> -{
> - char conf[PATH_MAX];
> - char epoch[PATH_MAX];
> -
> - snprintf(conf, PATH_MAX, "%s/config", path);
> - snprintf(epoch, PATH_MAX, "%s/epoch", path);
> - if (!access(conf, R_OK) && !access(epoch, R_OK))
> - return true;
> -
> - return false;
> -}
> -
> -static int init_obj_path(const char *base_path, char *argp)
> -{
> - char *p;
> - int len;
> -
> - if (check_path_len(base_path) < 0)
> - return -1;
> -
> -#define OBJ_PATH "/obj"
> - len = strlen(base_path) + strlen(OBJ_PATH) + 1;
> - obj_path = xzalloc(len);
> - snprintf(obj_path, len, "%s" OBJ_PATH, base_path);
> -
> - /* Eat up the first component */
> - strtok(argp, ",");
> - p = strtok(NULL, ",");
> - if (!p) {
> - /*
> - * If We have only one path, meta-store and object-store share
> - * it. This is helpful to upgrade old sheep cluster to
> - * the MD-enabled.
> - */
> - md_add_disk(obj_path, false);
> - } else {
> - do {
> - if (is_meta_store(p)) {
> - sd_err("%s is meta-store, abort", p);
> - return -1;
> - }
> - md_add_disk(p, false);
> - } while ((p = strtok(NULL, ",")));
> - }
> -
> - if (md_nr_disks() <= 0) {
> - sd_err("There isn't any available disk!");
> - return -1;
> - }
> -
> - return xmkdir(obj_path, sd_def_dmode);
> -}
> -
> -static int init_epoch_path(const char *base_path)
> -{
> -#define EPOCH_PATH "/epoch/"
> - int len = strlen(base_path) + strlen(EPOCH_PATH) + 1;
> - epoch_path = xzalloc(len);
> - snprintf(epoch_path, len, "%s" EPOCH_PATH, base_path);
> -
> - return xmkdir(epoch_path, sd_def_dmode);
> -}
> -
> -/*
> - * If the node is gateway, this function only finds the store driver.
> - * Otherwise, this function initializes the backend store
> - */
> -int init_store_driver(bool is_gateway)
> -{
> - char driver_name[STORE_LEN], *p;
> -
> - pstrcpy(driver_name, sizeof(driver_name), (char *)sys->cinfo.store);
> -
> - p = memchr(driver_name, '\0', STORE_LEN);
> - if (!p) {
> - /*
> - * If the driver name is not NUL terminated we are in deep
> - * trouble, let's get out here.
> - */
> - sd_debug("store name not NUL terminated");
> - return SD_RES_NO_STORE;
> - }
> -
> - /*
> - * The store file might not exist in case this is a new sheep that
> - * never joined a cluster before.
> - */
> - if (p == driver_name)
> - return 0;
> -
> - sd_store = find_store_driver(driver_name);
> - if (!sd_store) {
> - sd_debug("store %s not found", driver_name);
> - return SD_RES_NO_STORE;
> - }
> -
> - if (is_gateway)
> - return SD_RES_SUCCESS;
> -
> - return sd_store->init();
> -}
> -
> -int init_disk_space(const char *base_path)
> -{
> - int ret = SD_RES_SUCCESS;
> - uint64_t space_size = 0, mds;
> - struct statvfs fs;
> -
> - if (sys->gateway_only)
> - goto out;
> -
> - /* We need to init md even we don't need to update sapce */
> - mds = md_init_space();
> -
> - /* If it is restarted */
> - ret = get_node_space(&space_size);
> - if (space_size != 0) {
> - sys->disk_space = space_size;
> - goto out;
> - }
> -
> - /* User has specified the space at startup */
> - if (sys->disk_space) {
> - ret = set_node_space(sys->disk_space);
> - goto out;
> - }
> -
> - if (mds) {
> - sys->disk_space = mds;
> - } else {
> - ret = statvfs(base_path, &fs);
> - if (ret < 0) {
> - sd_debug("get disk space failed %m");
> - ret = SD_RES_EIO;
> - goto out;
> - }
> - sys->disk_space = (uint64_t)fs.f_frsize * fs.f_bavail;
> - }
> -
> - ret = set_node_space(sys->disk_space);
> -out:
> - sd_debug("disk free space is %" PRIu64, sys->disk_space);
> - return ret;
> -}
> -
> -/* Initialize all the global pathnames used internally */
> -int init_global_pathnames(const char *d, char *argp)
> -{
> - int ret;
> -
> - ret = init_obj_path(d, argp);
> - if (ret)
> - return ret;
> -
> - ret = init_epoch_path(d);
> - if (ret)
> - return ret;
> -
> - init_config_path(d);
> -
> - return 0;
> -}
> -
> -/* Write data to both local object cache (if enabled) and backends */
> -int sd_write_object(uint64_t oid, char *data, unsigned int datalen,
> - uint64_t offset, bool create)
> -{
> - struct sd_req hdr;
> - int ret;
> -
> - if (sys->enable_object_cache && object_is_cached(oid)) {
> - ret = object_cache_write(oid, data, datalen, offset,
> - create);
> - if (ret == SD_RES_NO_CACHE)
> - goto forward_write;
> -
> - if (ret != 0) {
> - sd_err("write cache failed %" PRIx64 " %" PRIx32, oid,
> - ret);
> - return ret;
> - }
> - }
> -
> -forward_write:
> - if (create)
> - sd_init_req(&hdr, SD_OP_CREATE_AND_WRITE_OBJ);
> - else
> - sd_init_req(&hdr, SD_OP_WRITE_OBJ);
> - hdr.flags = SD_FLAG_CMD_WRITE;
> - hdr.data_length = datalen;
> -
> - hdr.obj.oid = oid;
> - hdr.obj.offset = offset;
> -
> - ret = exec_local_req(&hdr, data);
> - if (ret != SD_RES_SUCCESS)
> - sd_err("failed to write object %" PRIx64 ", %s", oid,
> - sd_strerror(ret));
> -
> - return ret;
> -}
> -
> -int read_backend_object(uint64_t oid, char *data, unsigned int datalen,
> - uint64_t offset)
> -{
> - struct sd_req hdr;
> - int ret;
> -
> - sd_init_req(&hdr, SD_OP_READ_OBJ);
> - hdr.data_length = datalen;
> - hdr.obj.oid = oid;
> - hdr.obj.offset = offset;
> -
> - ret = exec_local_req(&hdr, data);
> - if (ret != SD_RES_SUCCESS)
> - sd_err("failed to read object %" PRIx64 ", %s", oid,
> - sd_strerror(ret));
> - return ret;
> -}
> -
> -/*
> - * Read data firstly from local object cache(if enabled), if fail,
> - * try read backends
> - */
> -int sd_read_object(uint64_t oid, char *data, unsigned int datalen,
> - uint64_t offset)
> -{
> - int ret;
> -
> - if (sys->enable_object_cache && object_is_cached(oid)) {
> - ret = object_cache_read(oid, data, datalen, offset);
> - if (ret != SD_RES_SUCCESS) {
> - sd_err("try forward read %" PRIx64 " %s", oid,
> - sd_strerror(ret));
> - goto forward_read;
> - }
> - return ret;
> - }
> -
> -forward_read:
> - return read_backend_object(oid, data, datalen, offset);
> -}
> -
> -int sd_remove_object(uint64_t oid)
> -{
> - struct sd_req hdr;
> - int ret;
> -
> - if (sys->enable_object_cache && object_is_cached(oid)) {
> - ret = object_cache_remove(oid);
> - if (ret != SD_RES_SUCCESS)
> - return ret;
> - }
> -
> - sd_init_req(&hdr, SD_OP_REMOVE_OBJ);
> - hdr.obj.oid = oid;
> -
> - ret = exec_local_req(&hdr, NULL);
> - if (ret != SD_RES_SUCCESS)
> - sd_err("failed to remove object %" PRIx64 ", %s", oid,
> - sd_strerror(ret));
> -
> - return ret;
> -}
> -
> -int sd_discard_object(uint64_t oid)
> -{
> - int ret;
> - struct sd_req hdr;
> -
> - sd_init_req(&hdr, SD_OP_DISCARD_OBJ);
> - hdr.obj.oid = oid;
> -
> - ret = exec_local_req(&hdr, NULL);
> - if (ret != SD_RES_SUCCESS)
> - sd_err("Failed to discard data obj %"PRIu64" %s", oid,
> - sd_strerror(ret));
> -
> - return ret;
> -}
> -
> -int sd_dec_object_refcnt(uint64_t data_oid, uint32_t generation,
> - uint32_t refcnt)
> -{
> - struct sd_req hdr;
> - int ret;
> - uint64_t ledger_oid = data_oid_to_ledger_oid(data_oid);
> -
> - sd_debug("%"PRIx64", %" PRId32 ", %" PRId32,
> - data_oid, generation, refcnt);
> -
> - if (generation == 0 && refcnt == 0)
> - return sd_remove_object(data_oid);
> -
> - sd_init_req(&hdr, SD_OP_DECREF_OBJ);
> - hdr.ref.oid = ledger_oid;
> - hdr.ref.generation = generation;
> - hdr.ref.count = refcnt;
> -
> - ret = exec_local_req(&hdr, NULL);
> - if (ret != SD_RES_SUCCESS)
> - sd_err("failed to decrement reference %" PRIx64 ", %s",
> - ledger_oid, sd_strerror(ret));
> -
> - return ret;
> -}
> diff --git a/sheep/store/common.c b/sheep/store/common.c
> new file mode 100644
> index 0000000..8843fb8
> --- /dev/null
> +++ b/sheep/store/common.c
> @@ -0,0 +1,511 @@
> +/*
> + * Copyright (C) 2009-2011 Nippon Telegraph and Telephone Corporation.
> + *
> + * This program is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU General Public License version
> + * 2 as published by the Free Software Foundation.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program. If not, see <http://www.gnu.org/licenses/>.
> + */
> +
> +#include "sheep_priv.h"
> +
> +char *obj_path;
> +char *epoch_path;
> +
> +struct store_driver *sd_store;
> +LIST_HEAD(store_drivers);
> +
> +int update_epoch_log(uint32_t epoch, struct sd_node *nodes, size_t nr_nodes)
> +{
> + int ret, len, nodes_len;
> + time_t t;
> + char path[PATH_MAX], *buf;
> +
> + sd_debug("update epoch: %d, %zu", epoch, nr_nodes);
> +
> + /* Piggyback the epoch creation time for 'dog cluster info' */
> + time(&t);
> + nodes_len = nr_nodes * sizeof(struct sd_node);
> + len = nodes_len + sizeof(time_t);
> + buf = xmalloc(len);
> + memcpy(buf, nodes, nodes_len);
> + memcpy(buf + nodes_len, &t, sizeof(time_t));
> +
> + /*
> +	 * The rb field is unused in the epoch file; zero-filling it keeps
> +	 * epoch files uniform, which helps epoch file recovery
> + */
> + for (int i = 0; i < nr_nodes; i++)
> + memset(buf + i * sizeof(struct sd_node)
> + + offsetof(struct sd_node, rb),
> + 0, sizeof(struct rb_node));
> +
> + snprintf(path, sizeof(path), "%s%08u", epoch_path, epoch);
> +
> + ret = atomic_create_and_write(path, buf, len, true);
> +
> + free(buf);
> + return ret;
> +}
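
A note in passing, purely restating what the code above writes (not part of
the patch): the on-disk epoch file is just the flat node array followed by
the creation time,

    | struct sd_node x nr_nodes (rb zero-filled) | time_t created |

so nr_nodes is not stored anywhere; do_epoch_log_read() below recovers it
from the file size.
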
> +
> +static int do_epoch_log_read(uint32_t epoch, struct sd_node *nodes, int len,
> + int *nr_nodes, time_t *timestamp)
> +{
> + int fd, ret, buf_len;
> + char path[PATH_MAX];
> + struct stat epoch_stat;
> +
> + snprintf(path, sizeof(path), "%s%08u", epoch_path, epoch);
> + fd = open(path, O_RDONLY);
> + if (fd < 0) {
> + sd_debug("failed to open epoch %"PRIu32" log, %m", epoch);
> + goto err;
> + }
> +
> + memset(&epoch_stat, 0, sizeof(epoch_stat));
> + ret = fstat(fd, &epoch_stat);
> + if (ret < 0) {
> + sd_err("failed to stat epoch %"PRIu32" log, %m", epoch);
> + goto err;
> + }
> +
> + buf_len = epoch_stat.st_size - sizeof(*timestamp);
> + if (buf_len < 0) {
> + sd_err("invalid epoch %"PRIu32" log", epoch);
> + goto err;
> + }
> + if (len < buf_len) {
> + close(fd);
> + return SD_RES_BUFFER_SMALL;
> + }
> +
> + ret = xread(fd, nodes, buf_len);
> + if (ret < 0) {
> + sd_err("failed to read epoch %"PRIu32" log, %m", epoch);
> + goto err;
> + }
> +
> + /* Broken epoch, just ignore */
> + if (ret % sizeof(struct sd_node) != 0) {
> + sd_err("invalid epoch %"PRIu32" log", epoch);
> + goto err;
> + }
> +
> + *nr_nodes = ret / sizeof(struct sd_node);
> +
> + if (timestamp) {
> + ret = xread(fd, timestamp, sizeof(*timestamp));
> + if (ret != sizeof(*timestamp)) {
> + sd_err("invalid epoch %"PRIu32" log", epoch);
> + goto err;
> + }
> + }
> +
> + close(fd);
> + return SD_RES_SUCCESS;
> +err:
> + if (fd >= 0)
> + close(fd);
> + return SD_RES_NO_TAG;
> +}
> +
> +int epoch_log_read(uint32_t epoch, struct sd_node *nodes,
> + int len, int *nr_nodes)
> +{
> + return do_epoch_log_read(epoch, nodes, len, nr_nodes, NULL);
> +}
> +
> +int epoch_log_read_with_timestamp(uint32_t epoch, struct sd_node *nodes,
> + int len, int *nr_nodes, time_t *timestamp)
> +{
> + return do_epoch_log_read(epoch, nodes, len, nr_nodes, timestamp);
> +}
> +
> +uint32_t get_latest_epoch(void)
> +{
> + DIR *dir;
> + struct dirent *d;
> + uint32_t e, epoch = 0;
> + char *p;
> +
> + dir = opendir(epoch_path);
> + if (!dir)
> + panic("failed to get the latest epoch: %m");
> +
> + while ((d = readdir(dir))) {
> + e = strtol(d->d_name, &p, 10);
> + if (d->d_name == p)
> + continue;
> +
> + if (strlen(d->d_name) != 8)
> + continue;
> +
> + if (e > epoch)
> + epoch = e;
> + }
> + closedir(dir);
> +
> + return epoch;
> +}
> +
> +int lock_base_dir(const char *d)
> +{
> +#define LOCK_PATH "/lock"
> + char *lock_path;
> + int ret = 0;
> + int fd, len = strlen(d) + strlen(LOCK_PATH) + 1;
> +
> + lock_path = xzalloc(len);
> + snprintf(lock_path, len, "%s" LOCK_PATH, d);
> +
> + fd = open(lock_path, O_WRONLY|O_CREAT, sd_def_fmode);
> + if (fd < 0) {
> + sd_err("failed to open lock file %s (%m)", lock_path);
> + ret = -1;
> + goto out;
> + }
> +
> + if (lockf(fd, F_TLOCK, 1) < 0) {
> + if (errno == EACCES || errno == EAGAIN)
> + sd_err("another sheep daemon is using %s", d);
> + else
> + sd_err("unable to get base dir lock (%m)");
> + ret = -1;
> + goto out;
> + }
> +
> +out:
> + free(lock_path);
> + return ret;
> +}
> +
> +int init_base_path(const char *d)
> +{
> + if (xmkdir(d, sd_def_dmode) < 0) {
> + sd_err("cannot create the directory %s (%m)", d);
> + return -1;
> + }
> +
> + return 0;
> +}
> +
> +static inline int check_path_len(const char *path)
> +{
> + int len = strlen(path);
> + if (len > PATH_MAX) {
> + sd_err("insanely long object directory %s", path);
> + return -1;
> + }
> +
> + return 0;
> +}
> +
> +static int is_meta_store(const char *path)
> +{
> + char conf[PATH_MAX];
> + char epoch[PATH_MAX];
> +
> + snprintf(conf, PATH_MAX, "%s/config", path);
> + snprintf(epoch, PATH_MAX, "%s/epoch", path);
> + if (!access(conf, R_OK) && !access(epoch, R_OK))
> + return true;
> +
> + return false;
> +}
> +
> +static int init_obj_path(const char *base_path, char *argp)
> +{
> + char *p;
> + int len;
> +
> + if (check_path_len(base_path) < 0)
> + return -1;
> +
> +#define OBJ_PATH "/obj"
> + len = strlen(base_path) + strlen(OBJ_PATH) + 1;
> + obj_path = xzalloc(len);
> + snprintf(obj_path, len, "%s" OBJ_PATH, base_path);
> +
> + /* Eat up the first component */
> + strtok(argp, ",");
> + p = strtok(NULL, ",");
> + if (!p) {
> + /*
> +		 * If we have only one path, the meta-store and object-store
> +		 * share it. This is helpful when upgrading an old sheep
> +		 * cluster to an MD-enabled one.
> + */
> + md_add_disk(obj_path, false);
> + } else {
> + do {
> + if (is_meta_store(p)) {
> + sd_err("%s is meta-store, abort", p);
> + return -1;
> + }
> + md_add_disk(p, false);
> + } while ((p = strtok(NULL, ",")));
> + }
> +
> + if (md_nr_disks() <= 0) {
> + sd_err("There isn't any available disk!");
> + return -1;
> + }
> +
> + return xmkdir(obj_path, sd_def_dmode);
> +}
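
To make the argp handling above concrete, here is a minimal sketch of how
the comma-separated disk list is split; it only mirrors the strtok() calls
above, and the path names are made up for illustration:

    #include <stdio.h>
    #include <string.h>

    int main(void)
    {
            char argp[] = "/var/lib/sheepdog,/disk1,/disk2";

            strtok(argp, ",");  /* first component: the base path, already consumed */
            for (char *p = strtok(NULL, ","); p; p = strtok(NULL, ","))
                    printf("md disk: %s\n", p);  /* prints /disk1, then /disk2 */
            return 0;
    }

With a single component, the !p branch above runs and obj_path itself is
registered as the only MD disk.
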
> +
> +static int init_epoch_path(const char *base_path)
> +{
> +#define EPOCH_PATH "/epoch/"
> + int len = strlen(base_path) + strlen(EPOCH_PATH) + 1;
> + epoch_path = xzalloc(len);
> + snprintf(epoch_path, len, "%s" EPOCH_PATH, base_path);
> +
> + return xmkdir(epoch_path, sd_def_dmode);
> +}
> +
> +/*
> + * If the node is a gateway, this function only finds the store driver.
> + * Otherwise, it also initializes the backend store.
> + */
> +int init_store_driver(bool is_gateway)
> +{
> + char driver_name[STORE_LEN], *p;
> +
> + pstrcpy(driver_name, sizeof(driver_name), (char *)sys->cinfo.store);
> +
> + p = memchr(driver_name, '\0', STORE_LEN);
> + if (!p) {
> + /*
> +		 * If the driver name is not NUL terminated we are in deep
> +		 * trouble; let's get out of here.
> + */
> + sd_debug("store name not NUL terminated");
> + return SD_RES_NO_STORE;
> + }
> +
> + /*
> + * The store file might not exist in case this is a new sheep that
> + * never joined a cluster before.
> + */
> + if (p == driver_name)
> + return 0;
> +
> + sd_store = find_store_driver(driver_name);
> + if (!sd_store) {
> + sd_debug("store %s not found", driver_name);
> + return SD_RES_NO_STORE;
> + }
> +
> + if (is_gateway)
> + return SD_RES_SUCCESS;
> +
> + return sd_store->init();
> +}
> +
> +int init_disk_space(const char *base_path)
> +{
> + int ret = SD_RES_SUCCESS;
> + uint64_t space_size = 0, mds;
> + struct statvfs fs;
> +
> + if (sys->gateway_only)
> + goto out;
> +
> +	/* We need to init md even if we don't need to update the space */
> + mds = md_init_space();
> +
> + /* If it is restarted */
> + ret = get_node_space(&space_size);
> + if (space_size != 0) {
> + sys->disk_space = space_size;
> + goto out;
> + }
> +
> + /* User has specified the space at startup */
> + if (sys->disk_space) {
> + ret = set_node_space(sys->disk_space);
> + goto out;
> + }
> +
> + if (mds) {
> + sys->disk_space = mds;
> + } else {
> + ret = statvfs(base_path, &fs);
> + if (ret < 0) {
> + sd_debug("get disk space failed %m");
> + ret = SD_RES_EIO;
> + goto out;
> + }
> + sys->disk_space = (uint64_t)fs.f_frsize * fs.f_bavail;
> + }
> +
> + ret = set_node_space(sys->disk_space);
> +out:
> + sd_debug("disk free space is %" PRIu64, sys->disk_space);
> + return ret;
> +}
> +
> +/* Initialize all the global pathnames used internally */
> +int init_global_pathnames(const char *d, char *argp)
> +{
> + int ret;
> +
> + ret = init_obj_path(d, argp);
> + if (ret)
> + return ret;
> +
> + ret = init_epoch_path(d);
> + if (ret)
> + return ret;
> +
> + init_config_path(d);
> +
> + return 0;
> +}
> +
> +/* Write data to both local object cache (if enabled) and backends */
> +int sd_write_object(uint64_t oid, char *data, unsigned int datalen,
> + uint64_t offset, bool create)
> +{
> + struct sd_req hdr;
> + int ret;
> +
> + if (sys->enable_object_cache && object_is_cached(oid)) {
> + ret = object_cache_write(oid, data, datalen, offset,
> + create);
> + if (ret == SD_RES_NO_CACHE)
> + goto forward_write;
> +
> + if (ret != 0) {
> + sd_err("write cache failed %" PRIx64 " %" PRIx32, oid,
> + ret);
> + return ret;
> + }
> + }
> +
> +forward_write:
> + if (create)
> + sd_init_req(&hdr, SD_OP_CREATE_AND_WRITE_OBJ);
> + else
> + sd_init_req(&hdr, SD_OP_WRITE_OBJ);
> + hdr.flags = SD_FLAG_CMD_WRITE;
> + hdr.data_length = datalen;
> +
> + hdr.obj.oid = oid;
> + hdr.obj.offset = offset;
> +
> + ret = exec_local_req(&hdr, data);
> + if (ret != SD_RES_SUCCESS)
> + sd_err("failed to write object %" PRIx64 ", %s", oid,
> + sd_strerror(ret));
> +
> + return ret;
> +}
> +
> +int read_backend_object(uint64_t oid, char *data, unsigned int datalen,
> + uint64_t offset)
> +{
> + struct sd_req hdr;
> + int ret;
> +
> + sd_init_req(&hdr, SD_OP_READ_OBJ);
> + hdr.data_length = datalen;
> + hdr.obj.oid = oid;
> + hdr.obj.offset = offset;
> +
> + ret = exec_local_req(&hdr, data);
> + if (ret != SD_RES_SUCCESS)
> + sd_err("failed to read object %" PRIx64 ", %s", oid,
> + sd_strerror(ret));
> + return ret;
> +}
> +
> +/*
> + * Read data first from the local object cache (if enabled); if that
> + * fails, try the backends.
> + */
> +int sd_read_object(uint64_t oid, char *data, unsigned int datalen,
> + uint64_t offset)
> +{
> + int ret;
> +
> + if (sys->enable_object_cache && object_is_cached(oid)) {
> + ret = object_cache_read(oid, data, datalen, offset);
> + if (ret != SD_RES_SUCCESS) {
> + sd_err("try forward read %" PRIx64 " %s", oid,
> + sd_strerror(ret));
> + goto forward_read;
> + }
> + return ret;
> + }
> +
> +forward_read:
> + return read_backend_object(oid, data, datalen, offset);
> +}
> +
> +int sd_remove_object(uint64_t oid)
> +{
> + struct sd_req hdr;
> + int ret;
> +
> + if (sys->enable_object_cache && object_is_cached(oid)) {
> + ret = object_cache_remove(oid);
> + if (ret != SD_RES_SUCCESS)
> + return ret;
> + }
> +
> + sd_init_req(&hdr, SD_OP_REMOVE_OBJ);
> + hdr.obj.oid = oid;
> +
> + ret = exec_local_req(&hdr, NULL);
> + if (ret != SD_RES_SUCCESS)
> + sd_err("failed to remove object %" PRIx64 ", %s", oid,
> + sd_strerror(ret));
> +
> + return ret;
> +}
> +
> +int sd_discard_object(uint64_t oid)
> +{
> + int ret;
> + struct sd_req hdr;
> +
> + sd_init_req(&hdr, SD_OP_DISCARD_OBJ);
> + hdr.obj.oid = oid;
> +
> + ret = exec_local_req(&hdr, NULL);
> + if (ret != SD_RES_SUCCESS)
> + sd_err("Failed to discard data obj %"PRIu64" %s", oid,
> + sd_strerror(ret));
> +
> + return ret;
> +}
> +
> +int sd_dec_object_refcnt(uint64_t data_oid, uint32_t generation,
> + uint32_t refcnt)
> +{
> + struct sd_req hdr;
> + int ret;
> + uint64_t ledger_oid = data_oid_to_ledger_oid(data_oid);
> +
> + sd_debug("%"PRIx64", %" PRId32 ", %" PRId32,
> + data_oid, generation, refcnt);
> +
> + if (generation == 0 && refcnt == 0)
> + return sd_remove_object(data_oid);
> +
> + sd_init_req(&hdr, SD_OP_DECREF_OBJ);
> + hdr.ref.oid = ledger_oid;
> + hdr.ref.generation = generation;
> + hdr.ref.count = refcnt;
> +
> + ret = exec_local_req(&hdr, NULL);
> + if (ret != SD_RES_SUCCESS)
> + sd_err("failed to decrement reference %" PRIx64 ", %s",
> + ledger_oid, sd_strerror(ret));
> +
> + return ret;
> +}
> diff --git a/sheep/store/md.c b/sheep/store/md.c
> new file mode 100644
> index 0000000..87ab759
> --- /dev/null
> +++ b/sheep/store/md.c
> @@ -0,0 +1,878 @@
> +/*
> + * Copyright (C) 2013 Taobao Inc.
> + *
> + * Liu Yuan <namei.unix at gmail.com>
> + *
> + * This program is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU General Public License version
> + * 2 as published by the Free Software Foundation.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program. If not, see <http://www.gnu.org/licenses/>.
> + */
> +
> +#include "sheep_priv.h"
> +
> +#define MD_VDISK_SIZE ((uint64_t)1*1024*1024*1024) /* 1G */
> +
> +#define NONE_EXIST_PATH "/all/disks/are/broken/,ps/əʌo7/!"
> +
> +struct md md = {
> + .vroot = RB_ROOT,
> + .root = RB_ROOT,
> + .lock = SD_RW_LOCK_INITIALIZER,
> +};
> +
> +static inline uint32_t nr_online_disks(void)
> +{
> + uint32_t nr;
> +
> + sd_read_lock(&md.lock);
> + nr = md.nr_disks;
> + sd_rw_unlock(&md.lock);
> +
> + return nr;
> +}
> +
> +static inline int vdisk_number(const struct disk *disk)
> +{
> + return DIV_ROUND_UP(disk->space, MD_VDISK_SIZE);
> +}
> +
> +static int disk_cmp(const struct disk *d1, const struct disk *d2)
> +{
> + return strcmp(d1->path, d2->path);
> +}
> +
> +static int vdisk_cmp(const struct vdisk *d1, const struct vdisk *d2)
> +{
> + return intcmp(d1->hash, d2->hash);
> +}
> +
> +static struct vdisk *vdisk_insert(struct vdisk *new)
> +{
> + return rb_insert(&md.vroot, new, rb, vdisk_cmp);
> +}
> +
> +/* If v1_hash < hval <= v2_hash, then oid is resident in v2 */
> +static struct vdisk *hval_to_vdisk(uint64_t hval)
> +{
> + struct vdisk dummy = { .hash = hval };
> +
> + return rb_nsearch(&md.vroot, &dummy, rb, vdisk_cmp);
> +}
> +
> +static struct vdisk *oid_to_vdisk(uint64_t oid)
> +{
> + return hval_to_vdisk(sd_hash_oid(oid));
> +}
> +
> +static void create_vdisks(const struct disk *disk)
> +{
> + uint64_t hval = sd_hash(disk->path, strlen(disk->path));
> + const struct sd_node *n = &sys->this_node;
> + uint64_t node_hval;
> + int nr;
> +
> + if (is_cluster_diskmode(&sys->cinfo)) {
> + node_hval = sd_hash(&n->nid, offsetof(typeof(n->nid), io_addr));
> + hval = fnv_64a_64(node_hval, hval);
> + nr = DIV_ROUND_UP(disk->space, WEIGHT_MIN);
> + if (0 == n->nid.port)
> + return;
> + } else
> + nr = vdisk_number(disk);
> +
> + for (int i = 0; i < nr; i++) {
> + struct vdisk *v = xmalloc(sizeof(*v));
> +
> + hval = sd_hash_next(hval);
> + v->hash = hval;
> + v->disk = disk;
> + if (unlikely(vdisk_insert(v)))
> + panic("vdisk hash collison");
> + }
> +}
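
To put rough numbers on the weighting above (illustrative only, for the
non-diskmode case): with MD_VDISK_SIZE fixed at 1G, a 2 TB disk contributes
DIV_ROUND_UP(2T, 1G) = 2048 vdisks to the hash ring while a 500 GB disk
contributes 500, so objects spread across disks roughly in proportion to
capacity; hval_to_vdisk() then resolves an object to the first vdisk whose
hash is >= sd_hash_oid(oid), per the "v1_hash < hval <= v2_hash" comment
earlier.
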
> +
> +static inline void vdisk_free(struct vdisk *v)
> +{
> + rb_erase(&v->rb, &md.vroot);
> + free(v);
> +}
> +
> +static void remove_vdisks(const struct disk *disk)
> +{
> + uint64_t hval = sd_hash(disk->path, strlen(disk->path));
> + const struct sd_node *n = &sys->this_node;
> + uint64_t node_hval;
> + int nr;
> +
> + if (is_cluster_diskmode(&sys->cinfo)) {
> + node_hval = sd_hash(&n->nid, offsetof(typeof(n->nid), io_addr));
> + hval = fnv_64a_64(node_hval, hval);
> + nr = DIV_ROUND_UP(disk->space, WEIGHT_MIN);
> + } else
> + nr = vdisk_number(disk);
> +
> + for (int i = 0; i < nr; i++) {
> + struct vdisk *v;
> +
> + hval = sd_hash_next(hval);
> + v = hval_to_vdisk(hval);
> + assert(v->hash == hval);
> +
> + vdisk_free(v);
> + }
> +}
> +
> +static inline void trim_last_slash(char *path)
> +{
> + assert(path[0]);
> + while (path[strlen(path) - 1] == '/')
> + path[strlen(path) - 1] = '\0';
> +}
> +
> +static struct disk *path_to_disk(const char *path)
> +{
> + struct disk key = {};
> +
> + pstrcpy(key.path, sizeof(key.path), path);
> + trim_last_slash(key.path);
> +
> + return rb_search(&md.root, &key, rb, disk_cmp);
> +}
> +
> +size_t get_store_objsize(uint64_t oid)
> +{
> + if (is_erasure_oid(oid)) {
> + uint8_t policy = get_vdi_copy_policy(oid_to_vid(oid));
> + int d;
> + ec_policy_to_dp(policy, &d, NULL);
> + return get_vdi_object_size(oid_to_vid(oid)) / d;
> + }
> + return get_objsize(oid, get_vdi_object_size(oid_to_vid(oid)));
> +}
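
A quick worked example for the erasure path of get_store_objsize() (the
numbers are just assumed for illustration): if the copy policy decodes to
d = 4 data strips and the VDI object size is 4 MB, each node stores
4 MB / 4 = 1 MB per object, whereas a replicated object keeps the full
object size returned by get_objsize().
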
> +
> +static int get_total_object_size(uint64_t oid, const char *wd, uint32_t epoch,
> + uint8_t ec_index, struct vnode_info *vinfo,
> + void *total)
> +{
> + uint64_t *t = total;
> + struct stat s;
> + char path[PATH_MAX];
> +
> + snprintf(path, PATH_MAX, "%s/%016" PRIx64, wd, oid);
> + if (stat(path, &s) == 0)
> + *t += s.st_blocks * SECTOR_SIZE;
> + else
> + *t += get_store_objsize(oid);
> +
> + return SD_RES_SUCCESS;
> +}
> +
> +static int64_t find_string_integer(const char *str, const char *delimiter)
> +{
> + char *pos = strstr(str, delimiter), *p;
> + int64_t ret;
> +
> + ret = strtoll(pos + 1, &p, 10);
> + if (ret == LLONG_MAX || p == pos + 1) {
> + sd_err("%s strtoul failed, delimiter %s, %m", str, delimiter);
> + return -1;
> + }
> +
> + return ret;
> +}
> +
> +/* If cleanup is true, temporary objects will be removed */
> +static int for_each_object_in_path(const char *path,
> + int (*func)(uint64_t, const char *, uint32_t,
> + uint8_t, struct vnode_info *,
> + void *),
> + bool cleanup, struct vnode_info *vinfo,
> + void *arg)
> +{
> + DIR *dir;
> + struct dirent *d;
> + uint64_t oid;
> + int ret = SD_RES_SUCCESS;
> + char file_name[PATH_MAX];
> +
> + dir = opendir(path);
> + if (unlikely(!dir)) {
> + sd_err("failed to open %s, %m", path);
> + return SD_RES_EIO;
> + }
> +
> + while ((d = readdir(dir))) {
> + uint32_t epoch = 0;
> + uint8_t ec_index = SD_MAX_COPIES;
> +
> + /* skip ".", ".." and ".stale" */
> + if (unlikely(!strncmp(d->d_name, ".", 1)))
> + continue;
> +
> + sd_debug("%s, %s", path, d->d_name);
> + oid = strtoull(d->d_name, NULL, 16);
> + if (oid == 0 || oid == ULLONG_MAX)
> + continue;
> +
> + /* don't call callback against temporary objects */
> + if (is_tmp_dentry(d->d_name)) {
> + if (cleanup) {
> + snprintf(file_name, sizeof(file_name),
> + "%s/%s", path, d->d_name);
> + sd_debug("remove tmp object %s", file_name);
> + if (unlink(file_name) < 0)
> + sd_err("failed to unlink %s: %m",
> + file_name);
> + }
> + continue;
> + }
> +
> + if (is_stale_dentry(d->d_name)) {
> + epoch = find_string_integer(d->d_name, ".");
> + if (epoch < 0)
> + continue;
> + }
> +
> + if (is_ec_dentry(d->d_name)) {
> + ec_index = find_string_integer(d->d_name, "_");
> + if (ec_index < 0)
> + continue;
> + }
> +
> + ret = func(oid, path, epoch, ec_index, vinfo, arg);
> + if (ret != SD_RES_SUCCESS)
> + break;
> + }
> + closedir(dir);
> + return ret;
> +}
> +
> +static uint64_t get_path_free_size(const char *path, uint64_t *used)
> +{
> + struct statvfs fs;
> + uint64_t size;
> +
> + if (statvfs(path, &fs) < 0) {
> + sd_err("get disk %s space failed %m", path);
> + return 0;
> + }
> + size = (int64_t)fs.f_frsize * fs.f_bavail;
> +
> + if (!used)
> + goto out;
> + if (for_each_object_in_path(path, get_total_object_size, false,
> + NULL, used)
> + != SD_RES_SUCCESS)
> + return 0;
> +out:
> + return size;
> +}
> +
> +/*
> + * If the path is broken during initialization or does not support xattr,
> + * return 0. We can safely use 0 to represent the failure case because a
> + * path with 0 space can be considered broken.
> + */
> +static uint64_t init_path_space(const char *path, bool purge)
> +{
> + uint64_t size;
> + char stale[PATH_MAX];
> +
> + if (!is_xattr_enabled(path)) {
> + sd_warn("multi-disk support need xattr feature for path: %s",
> + path);
> + goto broken_path;
> + }
> +
> + if (purge && purge_directory(path) < 0)
> + sd_err("failed to purge %s", path);
> +
> + snprintf(stale, PATH_MAX, "%s/.stale", path);
> + if (xmkdir(stale, sd_def_dmode) < 0) {
> + sd_err("can't mkdir for %s, %m", stale);
> + goto broken_path;
> + }
> +
> +#define MDNAME "user.md.size"
> +#define MDSIZE sizeof(uint64_t)
> + if (getxattr(path, MDNAME, &size, MDSIZE) < 0) {
> + if (errno == ENODATA) {
> + goto create;
> + } else {
> + sd_err("%s, %m", path);
> + goto broken_path;
> + }
> + }
> +
> + return size;
> +create:
> + size = get_path_free_size(path, NULL);
> + if (!size)
> + goto broken_path;
> + if (setxattr(path, MDNAME, &size, MDSIZE, 0) < 0) {
> + sd_err("%s, %m", path);
> + goto broken_path;
> + }
> + return size;
> +broken_path:
> + return 0;
> +}
> +
> +/* We don't need lock at init stage */
> +bool md_add_disk(const char *path, bool purge)
> +{
> + struct disk *new;
> +
> + if (path_to_disk(path)) {
> + sd_err("duplicate path %s", path);
> + return false;
> + }
> +
> + if (xmkdir(path, sd_def_dmode) < 0) {
> + sd_err("can't mkdir for %s, %m", path);
> + return false;
> + }
> +
> + new = xmalloc(sizeof(*new));
> + pstrcpy(new->path, PATH_MAX, path);
> + trim_last_slash(new->path);
> + new->space = init_path_space(new->path, purge);
> + if (!new->space) {
> + free(new);
> + return false;
> + }
> +
> + create_vdisks(new);
> + rb_insert(&md.root, new, rb, disk_cmp);
> + md.space += new->space;
> + md.nr_disks++;
> +
> + sd_info("%s, vdisk nr %d, total disk %d", new->path, vdisk_number(new),
> + md.nr_disks);
> + return true;
> +}
> +
> +static inline void md_remove_disk(struct disk *disk)
> +{
> + sd_info("%s from multi-disk array", disk->path);
> + rb_erase(&disk->rb, &md.root);
> + md.nr_disks--;
> + remove_vdisks(disk);
> + free(disk);
> +}
> +
> +uint64_t md_init_space(void)
> +{
> + return md.space;
> +}
> +
> +static const char *md_get_object_dir_nolock(uint64_t oid)
> +{
> + const struct vdisk *vd;
> +
> + if (unlikely(md.nr_disks == 0))
> + return NONE_EXIST_PATH; /* To generate EIO */
> +
> + vd = oid_to_vdisk(oid);
> + return vd->disk->path;
> +}
> +
> +const char *md_get_object_dir(uint64_t oid)
> +{
> + const char *p;
> +
> + sd_read_lock(&md.lock);
> + p = md_get_object_dir_nolock(oid);
> + sd_rw_unlock(&md.lock);
> +
> + return p;
> +}
> +
> +struct process_path_arg {
> + const char *path;
> + struct vnode_info *vinfo;
> + int (*func)(uint64_t oid, const char *, uint32_t, uint8_t,
> + struct vnode_info *, void *arg);
> + bool cleanup;
> + void *opaque;
> + int result;
> +};
> +
> +static void *thread_process_path(void *arg)
> +{
> + int ret;
> + struct process_path_arg *parg = (struct process_path_arg *)arg;
> +
> + ret = for_each_object_in_path(parg->path, parg->func, parg->cleanup,
> + parg->vinfo, parg->opaque);
> + if (ret != SD_RES_SUCCESS)
> + parg->result = ret;
> +
> + return arg;
> +}
> +
> +main_fn int for_each_object_in_wd(int (*func)(uint64_t oid, const char *path,
> + uint32_t epoch, uint8_t ec_index,
> + struct vnode_info *vinfo, void *arg),
> + bool cleanup, void *arg)
> +{
> + int ret = SD_RES_SUCCESS;
> + const struct disk *disk;
> + struct process_path_arg *thread_args, *path_arg;
> + struct vnode_info *vinfo;
> + void *ret_arg;
> + sd_thread_t *thread_array;
> + int nr_thread = 0, idx = 0;
> +
> + sd_read_lock(&md.lock);
> +
> + rb_for_each_entry(disk, &md.root, rb) {
> + nr_thread++;
> + }
> +
> + thread_args = xmalloc(nr_thread * sizeof(struct process_path_arg));
> + thread_array = xmalloc(nr_thread * sizeof(sd_thread_t));
> +
> + vinfo = get_vnode_info();
> +
> + rb_for_each_entry(disk, &md.root, rb) {
> + thread_args[idx].path = disk->path;
> + thread_args[idx].vinfo = vinfo;
> + thread_args[idx].func = func;
> + thread_args[idx].cleanup = cleanup;
> + thread_args[idx].opaque = arg;
> + thread_args[idx].result = SD_RES_SUCCESS;
> + ret = sd_thread_create_with_idx("foreach wd",
> + thread_array + idx,
> + thread_process_path,
> + (void *)(thread_args + idx));
> + if (ret) {
> + /*
> +			 * If we can't create enough threads to process the
> +			 * files, data consistency will be broken if we
> +			 * continue.
> + */
> + panic("Failed to create thread for path %s",
> + disk->path);
> + }
> + idx++;
> + }
> +
> + sd_debug("Create %d threads for all path", nr_thread);
> + /* wait for all threads to exit */
> + for (idx = 0; idx < nr_thread; idx++) {
> + ret = sd_thread_join(thread_array[idx], &ret_arg);
> + if (ret)
> + sd_err("Failed to join thread");
> + if (ret_arg) {
> + path_arg = (struct process_path_arg *)ret_arg;
> + if (path_arg->result != SD_RES_SUCCESS)
> + sd_err("%s, %s", path_arg->path,
> + sd_strerror(path_arg->result));
> + }
> + }
> +
> + put_vnode_info(vinfo);
> + sd_rw_unlock(&md.lock);
> +
> + free(thread_args);
> + free(thread_array);
> + return ret;
> +}
> +
> +int for_each_object_in_stale(int (*func)(uint64_t oid, const char *path,
> + uint32_t epoch, uint8_t,
> + struct vnode_info *, void *arg),
> + void *arg)
> +{
> + int ret = SD_RES_SUCCESS;
> + char path[PATH_MAX];
> + const struct disk *disk;
> +
> + sd_read_lock(&md.lock);
> + rb_for_each_entry(disk, &md.root, rb) {
> + snprintf(path, sizeof(path), "%s/.stale", disk->path);
> + ret = for_each_object_in_path(path, func, false, NULL, arg);
> + if (ret != SD_RES_SUCCESS)
> + break;
> + }
> + sd_rw_unlock(&md.lock);
> + return ret;
> +}
> +
> +
> +int for_each_obj_path(int (*func)(const char *path))
> +{
> + int ret = SD_RES_SUCCESS;
> + const struct disk *disk;
> +
> + sd_read_lock(&md.lock);
> + rb_for_each_entry(disk, &md.root, rb) {
> + ret = func(disk->path);
> + if (ret != SD_RES_SUCCESS)
> + break;
> + }
> + sd_rw_unlock(&md.lock);
> + return ret;
> +}
> +
> +struct md_work {
> + struct work work;
> + char path[PATH_MAX];
> +};
> +
> +static inline void kick_recover(void)
> +{
> + struct vnode_info *vinfo = get_vnode_info();
> +
> + if (is_cluster_diskmode(&sys->cinfo))
> + sys->cdrv->update_node(&sys->this_node);
> + else {
> + start_recovery(vinfo, vinfo, false);
> + put_vnode_info(vinfo);
> + }
> +}
> +
> +static void md_do_recover(struct work *work)
> +{
> + struct md_work *mw = container_of(work, struct md_work, work);
> + struct disk *disk;
> + int nr = 0;
> +
> + sd_write_lock(&md.lock);
> + disk = path_to_disk(mw->path);
> + if (!disk)
> + /* Just ignore the duplicate EIO of the same path */
> + goto out;
> + md_remove_disk(disk);
> + nr = md.nr_disks;
> +out:
> + sd_rw_unlock(&md.lock);
> +
> + if (disk) {
> + if (nr > 0) {
> + update_node_disks();
> + kick_recover();
> + } else {
> + leave_cluster();
> + }
> + }
> +
> + free(mw);
> +}
> +
> +int md_handle_eio(const char *fault_path)
> +{
> + struct md_work *mw;
> +
> + if (nr_online_disks() == 0)
> + return SD_RES_EIO;
> +
> + mw = xzalloc(sizeof(*mw));
> + mw->work.done = md_do_recover;
> + pstrcpy(mw->path, PATH_MAX, fault_path);
> + queue_work(sys->md_wqueue, &mw->work);
> +
> + /* Fool the requester to retry */
> + return SD_RES_NETWORK_ERROR;
> +}
> +
> +static inline bool md_access(const char *path)
> +{
> + if (access(path, R_OK | W_OK) < 0) {
> + if (unlikely(errno != ENOENT))
> + sd_err("failed to check %s, %m", path);
> + return false;
> + }
> +
> + return true;
> +}
> +
> +static int get_old_new_path(uint64_t oid, uint32_t epoch, uint8_t ec_index,
> + const char *path, char *old, char *new)
> +{
> + if (!epoch) {
> + if (!is_erasure_oid(oid)) {
> + snprintf(old, PATH_MAX, "%s/%016" PRIx64, path, oid);
> + snprintf(new, PATH_MAX, "%s/%016" PRIx64,
> + md_get_object_dir_nolock(oid), oid);
> + } else {
> + snprintf(old, PATH_MAX, "%s/%016" PRIx64"_%d", path,
> + oid, ec_index);
> + snprintf(new, PATH_MAX, "%s/%016" PRIx64"_%d",
> + md_get_object_dir_nolock(oid), oid, ec_index);
> + }
> + } else {
> + if (!is_erasure_oid(oid)) {
> + snprintf(old, PATH_MAX,
> + "%s/.stale/%016"PRIx64".%"PRIu32, path,
> + oid, epoch);
> + snprintf(new, PATH_MAX,
> + "%s/.stale/%016"PRIx64".%"PRIu32,
> + md_get_object_dir_nolock(oid), oid, epoch);
> + } else {
> + snprintf(old, PATH_MAX,
> + "%s/.stale/%016"PRIx64"_%d.%"PRIu32, path,
> + oid, ec_index, epoch);
> + snprintf(new, PATH_MAX,
> + "%s/.stale/%016"PRIx64"_%d.%"PRIu32,
> + md_get_object_dir_nolock(oid),
> + oid, ec_index, epoch);
> + }
> + }
> +
> + if (!md_access(old))
> + return -1;
> +
> + return 0;
> +}
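
For readers new to the on-disk layout, the formats built above boil down to
the following naming scheme (just a summary of the snprintf() calls, not
part of the patch):

    <disk>/<oid as 16-digit hex>                              replicated, current
    <disk>/<oid as 16-digit hex>_<ec_index>                   erasure coded, current
    <disk>/.stale/<oid as 16-digit hex>.<epoch>               replicated, stale
    <disk>/.stale/<oid as 16-digit hex>_<ec_index>.<epoch>    erasure coded, stale
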
> +
> +static int md_move_object(uint64_t oid, const char *old, const char *new)
> +{
> + struct strbuf buf = STRBUF_INIT;
> + int fd, ret = -1;
> + size_t sz = get_store_objsize(oid);
> +
> + fd = open(old, O_RDONLY);
> + if (fd < 0) {
> + sd_err("failed to open %s", old);
> + goto out;
> + }
> +
> + ret = strbuf_read(&buf, fd, sz);
> + if (ret != sz) {
> + sd_err("failed to read %s, size %zu, %d, %m", old, sz, ret);
> + ret = -1;
> + goto out_close;
> + }
> +
> + if (atomic_create_and_write(new, buf.buf, buf.len, false) < 0) {
> + if (errno != EEXIST) {
> + sd_err("failed to create %s", new);
> + ret = -1;
> + goto out_close;
> + }
> + }
> + unlink(old);
> + ret = 0;
> +out_close:
> + close(fd);
> +out:
> + strbuf_release(&buf);
> + return ret;
> +}
> +
> +static int md_check_and_move(uint64_t oid, uint32_t epoch, uint8_t ec_index,
> + const char *path)
> +{
> + char old[PATH_MAX], new[PATH_MAX];
> +
> + if (get_old_new_path(oid, epoch, ec_index, path, old, new) < 0)
> + return SD_RES_EIO;
> + /*
> +	 * The recovery thread and the main thread might try to recover the same
> +	 * object. If either one succeeds, the other will fail, proceed, and end
> +	 * up trying to move the object to where it is already in place; in this
> +	 * case we simply return.
> + */
> + if (!strcmp(old, new))
> + return SD_RES_SUCCESS;
> +
> +	/* We can't use rename(2) across devices */
> + if (md_move_object(oid, old, new) < 0) {
> + sd_err("move old %s to new %s failed", old, new);
> + return SD_RES_EIO;
> + }
> +
> + sd_debug("from %s to %s", old, new);
> + return SD_RES_SUCCESS;
> +}
> +
> +static int scan_wd(uint64_t oid, uint32_t epoch, uint8_t ec_index)
> +{
> + int ret = SD_RES_EIO;
> + const struct disk *disk;
> +
> + sd_read_lock(&md.lock);
> + rb_for_each_entry(disk, &md.root, rb) {
> + ret = md_check_and_move(oid, epoch, ec_index, disk->path);
> + if (ret == SD_RES_SUCCESS)
> + break;
> + }
> + sd_rw_unlock(&md.lock);
> + return ret;
> +}
> +
> +bool md_exist(uint64_t oid, uint8_t ec_index, char *path)
> +{
> + if (md_access(path))
> + return true;
> + /*
> +	 * We have to iterate over the WD because we don't have an epoch-like
> +	 * history track to locate the objects after multiple disk failures.
> +	 * Simply doing a hard iteration simplifies the code a lot.
> + */
> + if (scan_wd(oid, 0, ec_index) == SD_RES_SUCCESS)
> + return true;
> +
> + return false;
> +}
> +
> +int md_get_stale_path(uint64_t oid, uint32_t epoch, uint8_t ec_index,
> + char *path)
> +{
> + if (unlikely(!epoch))
> + panic("invalid 0 epoch");
> +
> + if (is_erasure_oid(oid)) {
> + if (unlikely(ec_index >= SD_MAX_COPIES))
> + panic("invalid ec index %d", ec_index);
> +
> + snprintf(path, PATH_MAX, "%s/.stale/%016"PRIx64"_%d.%"PRIu32,
> + md_get_object_dir(oid), oid, ec_index, epoch);
> + } else
> + snprintf(path, PATH_MAX, "%s/.stale/%016"PRIx64".%"PRIu32,
> + md_get_object_dir(oid), oid, epoch);
> +
> + if (md_access(path))
> + return SD_RES_SUCCESS;
> +
> + if (scan_wd(oid, epoch, ec_index) == SD_RES_SUCCESS)
> + return SD_RES_SUCCESS;
> +
> + return SD_RES_NO_OBJ;
> +}
> +
> +uint32_t md_get_info(struct sd_md_info *info)
> +{
> + uint32_t ret = sizeof(*info);
> + const struct disk *disk;
> + int i = 0;
> +
> + memset(info, 0, ret);
> + sd_read_lock(&md.lock);
> + rb_for_each_entry(disk, &md.root, rb) {
> + info->disk[i].idx = i;
> + pstrcpy(info->disk[i].path, PATH_MAX, disk->path);
> + /* FIXME: better handling failure case. */
> + info->disk[i].free = get_path_free_size(info->disk[i].path,
> + &info->disk[i].used);
> + i++;
> + }
> + info->nr = md.nr_disks;
> + sd_rw_unlock(&md.lock);
> + return ret;
> +}
> +
> +static inline void md_del_disk(const char *path)
> +{
> + struct disk *disk = path_to_disk(path);
> +
> + if (!disk) {
> + sd_err("invalid path %s", path);
> + return;
> + }
> + md_remove_disk(disk);
> +}
> +
> +#ifdef HAVE_DISKVNODES
> +void update_node_disks(void)
> +{
> + const struct disk *disk;
> + int i = 0;
> + bool rb_empty = false;
> +
> + if (!sys)
> + return;
> +
> + memset(sys->this_node.disks, 0, sizeof(struct disk_info) * DISK_MAX);
> + sd_read_lock(&md.lock);
> + rb_for_each_entry(disk, &md.root, rb) {
> + sys->this_node.disks[i].disk_id =
> + sd_hash(disk->path, strlen(disk->path));
> + sys->this_node.disks[i].disk_space = disk->space;
> + i++;
> + }
> + sd_rw_unlock(&md.lock);
> +
> + if (RB_EMPTY_ROOT(&md.vroot))
> + rb_empty = true;
> + sd_write_lock(&md.lock);
> + rb_for_each_entry(disk, &md.root, rb) {
> + if (!rb_empty)
> + remove_vdisks(disk);
> + create_vdisks(disk);
> + }
> + sd_rw_unlock(&md.lock);
> +}
> +#else
> +void update_node_disks(void)
> +{
> +}
> +#endif
> +
> +static int do_plug_unplug(char *disks, bool plug)
> +{
> + const char *path;
> + int old_nr, ret = SD_RES_UNKNOWN;
> +
> + sd_write_lock(&md.lock);
> + old_nr = md.nr_disks;
> + path = strtok(disks, ",");
> + do {
> + if (plug) {
> + if (!md_add_disk(path, true))
> + sd_err("failed to add %s", path);
> + } else {
> + md_del_disk(path);
> + }
> + } while ((path = strtok(NULL, ",")));
> +
> + /* If no disks change, bail out */
> + if (old_nr == md.nr_disks)
> + goto out;
> +
> + ret = SD_RES_SUCCESS;
> +out:
> + sd_rw_unlock(&md.lock);
> +
> + if (ret == SD_RES_SUCCESS) {
> + update_node_disks();
> + kick_recover();
> + }
> +
> + return ret;
> +}
> +
> +int md_plug_disks(char *disks)
> +{
> + return do_plug_unplug(disks, true);
> +}
> +
> +int md_unplug_disks(char *disks)
> +{
> + return do_plug_unplug(disks, false);
> +}
> +
> +uint64_t md_get_size(uint64_t *used)
> +{
> + uint64_t fsize = 0;
> + const struct disk *disk;
> +
> + *used = 0;
> + sd_read_lock(&md.lock);
> + rb_for_each_entry(disk, &md.root, rb) {
> + fsize += get_path_free_size(disk->path, used);
> + }
> + sd_rw_unlock(&md.lock);
> +
> + return fsize + *used;
> +}
> +
> +uint32_t md_nr_disks(void)
> +{
> + return nr_online_disks();
> +}
> diff --git a/sheep/store/plain_store.c b/sheep/store/plain_store.c
> new file mode 100644
> index 0000000..4c19832
> --- /dev/null
> +++ b/sheep/store/plain_store.c
> @@ -0,0 +1,759 @@
> +/*
> + * Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
> + *
> + * This program is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU General Public License version
> + * 2 as published by the Free Software Foundation.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program. If not, see <http://www.gnu.org/licenses/>.
> + */
> +
> +#include <libgen.h>
> +#include <linux/falloc.h>
> +
> +#include "sheep_priv.h"
> +
> +#ifndef FALLOC_FL_PUNCH_HOLE
> +#define FALLOC_FL_PUNCH_HOLE 0x02
> +#endif
> +
> +#define sector_aligned(x) ({ ((x) & (SECTOR_SIZE - 1)) == 0; })
> +
> +static inline bool iocb_is_aligned(const struct siocb *iocb)
> +{
> +	return sector_aligned(iocb->offset) && sector_aligned(iocb->length);
> +}
> +
> +static int prepare_iocb(uint64_t oid, const struct siocb *iocb, bool create)
> +{
> + int syncflag = create ? O_SYNC : O_DSYNC;
> + int flags = syncflag | O_RDWR;
> +
> + if (uatomic_is_true(&sys->use_journal) || sys->nosync == true)
> + flags &= ~syncflag;
> +
> + if (sys->backend_dio && iocb_is_aligned(iocb)) {
> + if (!is_aligned_to_pagesize(iocb->buf))
> + panic("Memory isn't aligned to pagesize %p", iocb->buf);
> + flags |= O_DIRECT;
> + }
> +
> + if (create)
> + flags |= O_CREAT | O_EXCL;
> +
> + return flags;
> +}
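
Summarizing the flag selection above (only a restatement of the code):

    create, normal               O_RDWR | O_SYNC | O_CREAT | O_EXCL
    overwrite, normal            O_RDWR | O_DSYNC
    journaling on, or nosync     the sync flag is dropped
    backend_dio && aligned iocb  O_DIRECT is added (buffer must be page aligned)
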
> +
> +static int get_store_path(uint64_t oid, uint8_t ec_index, char *path)
> +{
> + if (is_erasure_oid(oid)) {
> + if (unlikely(ec_index >= SD_MAX_COPIES))
> + panic("invalid ec_index %d", ec_index);
> + return snprintf(path, PATH_MAX, "%s/%016"PRIx64"_%d",
> + md_get_object_dir(oid), oid, ec_index);
> + }
> +
> + return snprintf(path, PATH_MAX, "%s/%016" PRIx64,
> + md_get_object_dir(oid), oid);
> +}
> +
> +static int get_store_tmp_path(uint64_t oid, uint8_t ec_index, char *path)
> +{
> + if (is_erasure_oid(oid)) {
> + if (unlikely(ec_index >= SD_MAX_COPIES))
> + panic("invalid ec_index %d", ec_index);
> + return snprintf(path, PATH_MAX, "%s/%016"PRIx64"_%d.tmp",
> + md_get_object_dir(oid), oid, ec_index);
> + }
> +
> + return snprintf(path, PATH_MAX, "%s/%016" PRIx64".tmp",
> + md_get_object_dir(oid), oid);
> +}
> +
> +static int get_store_stale_path(uint64_t oid, uint32_t epoch, uint8_t ec_index,
> + char *path)
> +{
> + return md_get_stale_path(oid, epoch, ec_index, path);
> +}
> +
> +/*
> + * Check if the oid is on this node (if the oid is in the wrong place, it will
> + * be moved to the correct one after this call in an MD setup).
> + */
> +bool default_exist(uint64_t oid, uint8_t ec_index)
> +{
> + char path[PATH_MAX];
> +
> + get_store_path(oid, ec_index, path);
> +
> + return md_exist(oid, ec_index, path);
> +}
> +
> +static int err_to_sderr(const char *path, uint64_t oid, int err)
> +{
> + struct stat s;
> + char p[PATH_MAX], *dir;
> +
> + /* Use a temporary buffer since dirname() may modify its argument. */
> + pstrcpy(p, sizeof(p), path);
> + dir = dirname(p);
> +
> + sd_debug("%s", path);
> + switch (err) {
> + case ENOENT:
> + if (stat(dir, &s) < 0) {
> + sd_err("%s corrupted", dir);
> + return md_handle_eio(dir);
> + }
> + sd_debug("object %016" PRIx64 " not found locally", oid);
> + return SD_RES_NO_OBJ;
> + case ENOSPC:
> + /* TODO: stop automatic recovery */
> + sd_err("diskfull, oid=%"PRIx64, oid);
> + return SD_RES_NO_SPACE;
> + case EMFILE:
> + case ENFILE:
> + case EINTR:
> + case EAGAIN:
> + case EEXIST:
> + sd_err("%m, oid=%"PRIx64, oid);
> + /* make gateway try again */
> + return SD_RES_NETWORK_ERROR;
> + default:
> + sd_err("oid=%"PRIx64", %m", oid);
> + return md_handle_eio(dir);
> + }
> +}
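
Spelled out, the errno mapping above is (again just restating the switch):

    ENOENT, parent dir also gone    md_handle_eio()  (treat the disk as broken)
    ENOENT                          SD_RES_NO_OBJ
    ENOSPC                          SD_RES_NO_SPACE
    EMFILE / ENFILE / EINTR / EAGAIN / EEXIST
                                    SD_RES_NETWORK_ERROR (makes the gateway retry)
    anything else                   md_handle_eio()
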
> +
> +static int discard(int fd, uint64_t start, uint32_t end)
> +{
> + int ret = xfallocate(fd, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE,
> + start, end - start);
> + if (ret < 0) {
> + if (errno == ENOSYS || errno == EOPNOTSUPP)
> + sd_info("FALLOC_FL_PUNCH_HOLE is not supported "
> + "on this filesystem");
> + else
> + sd_err("failed to discard object, %m");
> + }
> +
> + return ret;
> +}
> +
> +/* Trim zero blocks at the beginning and end of the object. */
> +static int default_trim(int fd, uint64_t oid, const struct siocb *iocb,
> + uint64_t *poffset, uint32_t *plen)
> +{
> + trim_zero_blocks(iocb->buf, poffset, plen);
> +
> + if (iocb->offset < *poffset) {
> + sd_debug("discard between %d, %ld, %" PRIx64, iocb->offset,
> + *poffset, oid);
> +
> + if (discard(fd, iocb->offset, *poffset) < 0)
> + return -1;
> + }
> +
> + if (*poffset + *plen < iocb->offset + iocb->length) {
> + uint64_t end = iocb->offset + iocb->length;
> + uint32_t object_size = get_vdi_object_size(oid_to_vid(oid));
> + if (end == get_objsize(oid, object_size))
> + /* This is necessary to punch the last block */
> + end = round_up(end, BLOCK_SIZE);
> + sd_debug("discard between %ld, %ld, %" PRIx64, *poffset + *plen,
> + end, oid);
> +
> + if (discard(fd, *poffset + *plen, end) < 0)
> + return -1;
> + }
> +
> + return 0;
> +}
> +
> +int default_write(uint64_t oid, const struct siocb *iocb)
> +{
> + int flags = prepare_iocb(oid, iocb, false), fd,
> + ret = SD_RES_SUCCESS;
> + char path[PATH_MAX];
> + ssize_t size;
> + uint32_t len = iocb->length;
> + uint64_t offset = iocb->offset;
> + static bool trim_is_supported = true;
> +
> + if (iocb->epoch < sys_epoch()) {
> + sd_debug("%"PRIu32" sys %"PRIu32, iocb->epoch, sys_epoch());
> + return SD_RES_OLD_NODE_VER;
> + }
> +
> + if (uatomic_is_true(&sys->use_journal) &&
> + unlikely(journal_write_store(oid, iocb->buf, iocb->length,
> + iocb->offset, false))
> + != SD_RES_SUCCESS) {
> + sd_err("turn off journaling");
> + uatomic_set_false(&sys->use_journal);
> + flags |= O_DSYNC;
> + sync();
> + }
> +
> + get_store_path(oid, iocb->ec_index, path);
> +
> + /*
> +	 * Make sure the oid is in the right place, because it might be misplaced
> +	 * due to 'shutdown/restart with fewer/more disks' or other bugs. We need
> +	 * to call err_to_sderr() to return EIO if the disk is broken.
> + */
> + if (!default_exist(oid, iocb->ec_index))
> + return err_to_sderr(path, oid, ENOENT);
> +
> + fd = open(path, flags, sd_def_fmode);
> + if (unlikely(fd < 0))
> + return err_to_sderr(path, oid, errno);
> +
> + if (trim_is_supported && is_sparse_object(oid)) {
> + if (default_trim(fd, oid, iocb, &offset, &len) < 0) {
> + trim_is_supported = false;
> + offset = iocb->offset;
> + len = iocb->length;
> + }
> + }
> +
> + size = xpwrite(fd, iocb->buf, len, offset);
> + if (unlikely(size != len)) {
> + sd_err("failed to write object %"PRIx64", path=%s, offset=%"
> + PRId32", size=%"PRId32", result=%zd, %m", oid, path,
> + iocb->offset, iocb->length, size);
> + ret = err_to_sderr(path, oid, errno);
> + goto out;
> + }
> +out:
> + close(fd);
> + return ret;
> +}
> +
> +static int make_stale_dir(const char *path)
> +{
> + char p[PATH_MAX];
> +
> + snprintf(p, PATH_MAX, "%s/.stale", path);
> + if (xmkdir(p, sd_def_dmode) < 0) {
> + sd_err("%s failed, %m", p);
> + return SD_RES_EIO;
> + }
> + return SD_RES_SUCCESS;
> +}
> +
> +static int purge_dir(const char *path)
> +{
> + if (purge_directory(path) < 0)
> + return SD_RES_EIO;
> +
> + return SD_RES_SUCCESS;
> +}
> +
> +static int purge_stale_dir(const char *path)
> +{
> + char p[PATH_MAX];
> +
> + snprintf(p, PATH_MAX, "%s/.stale", path);
> +
> + if (purge_directory_async(p) < 0)
> + return SD_RES_EIO;
> +
> + return SD_RES_SUCCESS;
> +}
> +
> +int default_cleanup(void)
> +{
> + int ret;
> +
> + ret = for_each_obj_path(purge_stale_dir);
> + if (ret != SD_RES_SUCCESS)
> + return ret;
> +
> + return SD_RES_SUCCESS;
> +}
> +
> +static int init_vdi_state(uint64_t oid, const char *wd, uint32_t epoch)
> +{
> + int ret;
> + struct sd_inode *inode = xzalloc(SD_INODE_HEADER_SIZE);
> + struct siocb iocb = {
> + .epoch = epoch,
> + .buf = inode,
> + .length = SD_INODE_HEADER_SIZE,
> + };
> +
> + ret = default_read(oid, &iocb);
> + if (ret != SD_RES_SUCCESS) {
> + sd_err("failed to read inode header %" PRIx64 " %" PRId32
> + "wat %s", oid, epoch, wd);
> + goto out;
> + }
> + add_vdi_state_unordered(oid_to_vid(oid), inode->nr_copies,
> + vdi_is_snapshot(inode), inode->copy_policy,
> + inode->block_size_shift, inode->parent_vdi_id);
> +
> + if (inode->name[0] == '\0')
> + atomic_set_bit(oid_to_vid(oid), sys->vdi_deleted);
> +
> + atomic_set_bit(oid_to_vid(oid), sys->vdi_inuse);
> +
> + ret = SD_RES_SUCCESS;
> +out:
> + free(inode);
> + return ret;
> +}
> +
> +static int init_objlist_and_vdi_bitmap(uint64_t oid, const char *wd,
> + uint32_t epoch, uint8_t ec_index,
> + struct vnode_info *vinfo,
> + void *arg)
> +{
> + int ret;
> + objlist_cache_insert(oid);
> +
> + if (is_vdi_obj(oid)) {
> + sd_debug("found the VDI object %" PRIx64" epoch %"PRIu32
> + " at %s", oid, epoch, wd);
> + ret = init_vdi_state(oid, wd, epoch);
> + if (ret != SD_RES_SUCCESS)
> + return ret;
> + }
> + return SD_RES_SUCCESS;
> +}
> +
> +int default_init(void)
> +{
> + int ret;
> +
> + sd_debug("use plain store driver");
> + ret = for_each_obj_path(make_stale_dir);
> + if (ret != SD_RES_SUCCESS)
> + return ret;
> +
> + for_each_object_in_stale(init_objlist_and_vdi_bitmap, NULL);
> +
> + return for_each_object_in_wd(init_objlist_and_vdi_bitmap, true, NULL);
> +}
> +
> +static int default_read_from_path(uint64_t oid, const char *path,
> + const struct siocb *iocb)
> +{
> + int flags = prepare_iocb(oid, iocb, false), fd,
> + ret = SD_RES_SUCCESS;
> + ssize_t size;
> +
> + /*
> +	 * Make sure the oid is in the right place, because it might be misplaced
> +	 * due to 'shutdown/restart with fewer disks' or other bugs. We need to
> +	 * call err_to_sderr() to return EIO if the disk is broken.
> +	 *
> +	 * For a stale path, get_store_stale_path already does the default_exist job.
> + */
> + if (!is_stale_path(path) && !default_exist(oid, iocb->ec_index))
> + return err_to_sderr(path, oid, ENOENT);
> +
> + fd = open(path, flags);
> + if (fd < 0)
> + return err_to_sderr(path, oid, errno);
> +
> + size = xpread(fd, iocb->buf, iocb->length, iocb->offset);
> + if (size < 0) {
> + sd_err("failed to read object %"PRIx64", path=%s, offset=%"
> + PRId32", size=%"PRId32", result=%zd, %m", oid, path,
> + iocb->offset, iocb->length, size);
> + ret = err_to_sderr(path, oid, errno);
> + }
> + close(fd);
> + return ret;
> +}
> +
> +int default_read(uint64_t oid, const struct siocb *iocb)
> +{
> + int ret;
> + char path[PATH_MAX];
> +
> + get_store_path(oid, iocb->ec_index, path);
> + ret = default_read_from_path(oid, path, iocb);
> +
> + /*
> +	 * If the request is against an older epoch, try to read from
> + * the stale directory
> + */
> + if (ret == SD_RES_NO_OBJ && iocb->epoch > 0 &&
> + iocb->epoch < sys_epoch()) {
> + get_store_stale_path(oid, iocb->epoch, iocb->ec_index, path);
> + ret = default_read_from_path(oid, path, iocb);
> + }
> +
> + return ret;
> +}
> +
> +int default_create_and_write(uint64_t oid, const struct siocb *iocb)
> +{
> + char path[PATH_MAX], tmp_path[PATH_MAX], *dir;
> + int flags = prepare_iocb(oid, iocb, true);
> + int ret, fd;
> + uint32_t len = iocb->length;
> + uint32_t object_size = 0;
> + size_t obj_size;
> + uint64_t offset = iocb->offset;
> +
> + sd_debug("%"PRIx64, oid);
> + get_store_path(oid, iocb->ec_index, path);
> + get_store_tmp_path(oid, iocb->ec_index, tmp_path);
> +
> + if (uatomic_is_true(&sys->use_journal) &&
> + journal_write_store(oid, iocb->buf, iocb->length,
> + iocb->offset, true)
> + != SD_RES_SUCCESS) {
> + sd_err("turn off journaling");
> + uatomic_set_false(&sys->use_journal);
> + flags |= O_SYNC;
> + sync();
> + }
> +
> + fd = open(tmp_path, flags, sd_def_fmode);
> + if (fd < 0) {
> + if (errno == EEXIST) {
> + /*
> + * This happens if node membership changes during object
> +			 * creation; while the gateway retries a CREATE request, the
> +			 * recovery process could also recover the object at the
> +			 * same time. They should be writing the same data,
> + * so it is okay to simply return success here.
> + */
> + sd_debug("%s exists", tmp_path);
> + return SD_RES_SUCCESS;
> + }
> +
> + sd_err("failed to open %s: %m", tmp_path);
> + return err_to_sderr(path, oid, errno);
> + }
> +
> + obj_size = get_store_objsize(oid);
> +
> + trim_zero_blocks(iocb->buf, &offset, &len);
> +
> + object_size = get_vdi_object_size(oid_to_vid(oid));
> +
> + if (offset != 0 || len != get_objsize(oid, object_size)) {
> + if (is_sparse_object(oid))
> + ret = xftruncate(fd, obj_size);
> + else
> + ret = prealloc(fd, obj_size);
> + if (ret < 0) {
> + ret = err_to_sderr(path, oid, errno);
> + goto out;
> + }
> + }
> +
> + ret = xpwrite(fd, iocb->buf, len, offset);
> + if (ret != len) {
> + sd_err("failed to write object. %m");
> + ret = err_to_sderr(path, oid, errno);
> + goto out;
> + }
> +
> + ret = rename(tmp_path, path);
> + if (ret < 0) {
> + sd_err("failed to rename %s to %s: %m", tmp_path, path);
> + ret = err_to_sderr(path, oid, errno);
> + goto out;
> + }
> +
> + close(fd);
> +
> + if (uatomic_is_true(&sys->use_journal) || sys->nosync == true) {
> + objlist_cache_insert(oid);
> + return SD_RES_SUCCESS;
> + }
> +
> + pstrcpy(tmp_path, sizeof(tmp_path), path);
> + dir = dirname(tmp_path);
> + fd = open(dir, O_DIRECTORY | O_RDONLY);
> + if (fd < 0) {
> + sd_err("failed to open directory %s: %m", dir);
> + return err_to_sderr(path, oid, errno);
> + }
> +
> + if (fsync(fd) != 0) {
> + sd_err("failed to write directory %s: %m", dir);
> + ret = err_to_sderr(path, oid, errno);
> + close(fd);
> + if (unlink(path) != 0)
> + sd_err("failed to unlink %s: %m", path);
> + return ret;
> + }
> + close(fd);
> + objlist_cache_insert(oid);
> + return SD_RES_SUCCESS;
> +
> +out:
> + if (unlink(tmp_path) != 0)
> + sd_err("failed to unlink %s: %m", tmp_path);
> + close(fd);
> + return ret;
> +}
> +
> +int default_link(uint64_t oid, uint32_t tgt_epoch)
> +{
> + char path[PATH_MAX], stale_path[PATH_MAX];
> +
> + sd_debug("try link %"PRIx64" from snapshot with epoch %d", oid,
> + tgt_epoch);
> +
> + snprintf(path, PATH_MAX, "%s/%016"PRIx64, md_get_object_dir(oid), oid);
> + get_store_stale_path(oid, tgt_epoch, 0, stale_path);
> +
> + if (link(stale_path, path) < 0) {
> + /*
> +		 * The recovery thread and the main thread might try to recover the
> +		 * same object, and we might get EEXIST in that case.
> + */
> + if (errno == EEXIST)
> + goto out;
> +
> + sd_debug("failed to link from %s to %s, %m", stale_path, path);
> + return err_to_sderr(path, oid, errno);
> + }
> +out:
> + return SD_RES_SUCCESS;
> +}
> +
> +/*
> + * For a replicated object, if any of the replicas belongs to this node, we
> + * consider it not stale.
> + *
> + * For an erasure coded object, every copy is unique, so if it migrates to
> + * another node because of hash ring changes (its index changes even if some
> + * other copy still belongs to this node), we consider it stale.
> + */
> +static bool oid_stale(uint64_t oid, int ec_index, struct vnode_info *vinfo)
> +{
> + uint32_t i, nr_copies;
> + const struct sd_vnode *v;
> + bool ret = true;
> + const struct sd_vnode *obj_vnodes[SD_MAX_COPIES];
> +
> + nr_copies = get_obj_copy_number(oid, vinfo->nr_zones);
> + oid_to_vnodes(oid, &vinfo->vroot, nr_copies, obj_vnodes);
> + for (i = 0; i < nr_copies; i++) {
> + v = obj_vnodes[i];
> + if (vnode_is_local(v)) {
> + if (ec_index < SD_MAX_COPIES) {
> + if (i == ec_index)
> + ret = false;
> + } else {
> + ret = false;
> + }
> + break;
> + }
> + }
> +
> + return ret;
> +}
> +
> +static int move_object_to_stale_dir(uint64_t oid, const char *wd,
> + uint32_t epoch, uint8_t ec_index,
> + struct vnode_info *vinfo, void *arg)
> +{
> + char path[PATH_MAX], stale_path[PATH_MAX];
> + uint32_t tgt_epoch = *(uint32_t *)arg;
> +
> + /* ec_index from md.c is reliable so we can directly use it */
> + if (ec_index < SD_MAX_COPIES) {
> + snprintf(path, PATH_MAX, "%s/%016"PRIx64"_%d",
> + md_get_object_dir(oid), oid, ec_index);
> + snprintf(stale_path, PATH_MAX,
> + "%s/.stale/%016"PRIx64"_%d.%"PRIu32,
> + md_get_object_dir(oid), oid, ec_index, tgt_epoch);
> + } else {
> + snprintf(path, PATH_MAX, "%s/%016" PRIx64,
> + md_get_object_dir(oid), oid);
> + snprintf(stale_path, PATH_MAX, "%s/.stale/%016"PRIx64".%"PRIu32,
> + md_get_object_dir(oid), oid, tgt_epoch);
> + }
> +
> +	if (unlikely(rename(path, stale_path) < 0)) {
> + sd_err("failed to move stale object %" PRIX64 " to %s, %m", oid,
> + path);
> + return SD_RES_EIO;
> + }
> +
> + sd_debug("moved object %"PRIx64, oid);
> + return SD_RES_SUCCESS;
> +}
> +
> +static int check_stale_objects(uint64_t oid, const char *wd, uint32_t epoch,
> + uint8_t ec_index, struct vnode_info *vinfo,
> + void *arg)
> +{
> + if (oid_stale(oid, ec_index, vinfo))
> + return move_object_to_stale_dir(oid, wd, 0, ec_index,
> + NULL, arg);
> +
> + return SD_RES_SUCCESS;
> +}
> +
> +int default_update_epoch(uint32_t epoch)
> +{
> + assert(epoch);
> + return for_each_object_in_wd(check_stale_objects, false, &epoch);
> +}
> +
> +int default_format(void)
> +{
> + unsigned ret;
> +
> + sd_debug("try get a clean store");
> + ret = for_each_obj_path(purge_dir);
> + if (ret != SD_RES_SUCCESS)
> + return ret;
> +
> + if (sys->enable_object_cache)
> + object_cache_format();
> +
> + return SD_RES_SUCCESS;
> +}
> +
> +int default_remove_object(uint64_t oid, uint8_t ec_index)
> +{
> + char path[PATH_MAX];
> +
> + if (uatomic_is_true(&sys->use_journal))
> + journal_remove_object(oid);
> +
> + get_store_path(oid, ec_index, path);
> +
> + if (unlink(path) < 0) {
> + if (errno == ENOENT)
> + return SD_RES_NO_OBJ;
> +
> + sd_err("failed, %s, %m", path);
> + return SD_RES_EIO;
> + }
> +
> + return SD_RES_SUCCESS;
> +}
> +
> +#define SHA1NAME "user.obj.sha1"
> +
> +static int get_object_sha1(const char *path, uint8_t *sha1)
> +{
> + if (getxattr(path, SHA1NAME, sha1, SHA1_DIGEST_SIZE)
> + != SHA1_DIGEST_SIZE) {
> + if (errno == ENODATA)
> + sd_debug("sha1 is not cached yet, %s", path);
> + else
> + sd_err("fail to get xattr, %s", path);
> + return -1;
> + }
> +
> + return 0;
> +}
> +
> +static int set_object_sha1(const char *path, const uint8_t *sha1)
> +{
> + int ret;
> +
> + ret = setxattr(path, SHA1NAME, sha1, SHA1_DIGEST_SIZE, 0);
> + if (ret < 0)
> + sd_err("fail to set sha1, %s", path);
> +
> + return ret;
> +}
> +
> +static int get_object_path(uint64_t oid, uint32_t epoch, char *path,
> + size_t size)
> +{
> + if (default_exist(oid, 0)) {
> + snprintf(path, PATH_MAX, "%s/%016"PRIx64,
> + md_get_object_dir(oid), oid);
> + } else {
> + get_store_stale_path(oid, epoch, 0, path);
> + if (access(path, F_OK) < 0) {
> + if (errno == ENOENT)
> + return SD_RES_NO_OBJ;
> + return SD_RES_EIO;
> + }
> +
> + }
> +
> + return SD_RES_SUCCESS;
> +}
> +
> +int default_get_hash(uint64_t oid, uint32_t epoch, uint8_t *sha1)
> +{
> + int ret;
> + void *buf;
> + struct siocb iocb = {};
> + uint32_t length;
> + bool is_readonly_obj = oid_is_readonly(oid);
> + char path[PATH_MAX];
> +
> + ret = get_object_path(oid, epoch, path, sizeof(path));
> + if (ret != SD_RES_SUCCESS)
> + return ret;
> +
> + if (is_readonly_obj) {
> + if (get_object_sha1(path, sha1) == 0) {
> + sd_debug("use cached sha1 digest %s",
> + sha1_to_hex(sha1));
> + return SD_RES_SUCCESS;
> + }
> + }
> +
> + length = get_store_objsize(oid);
> + buf = valloc(length);
> + if (buf == NULL)
> + return SD_RES_NO_MEM;
> +
> + iocb.epoch = epoch;
> + iocb.buf = buf;
> + iocb.length = length;
> +
> + ret = default_read_from_path(oid, path, &iocb);
> + if (ret != SD_RES_SUCCESS) {
> + free(buf);
> + return ret;
> + }
> +
> + get_buffer_sha1(buf, length, sha1);
> + free(buf);
> +
> + sd_debug("the message digest of %"PRIx64" at epoch %d is %s", oid,
> + epoch, sha1_to_hex(sha1));
> +
> + if (is_readonly_obj)
> + set_object_sha1(path, sha1);
> +
> + return ret;
> +}
> +
> +int default_purge_obj(void)
> +{
> + uint32_t tgt_epoch = get_latest_epoch();
> +
> + return for_each_object_in_wd(move_object_to_stale_dir, true,
> + &tgt_epoch);
> +}
> +
> +static struct store_driver plain_store = {
> + .name = "plain",
> + .init = default_init,
> + .exist = default_exist,
> + .create_and_write = default_create_and_write,
> + .write = default_write,
> + .read = default_read,
> + .link = default_link,
> + .update_epoch = default_update_epoch,
> + .cleanup = default_cleanup,
> + .format = default_format,
> + .remove_object = default_remove_object,
> + .get_hash = default_get_hash,
> + .purge_obj = default_purge_obj,
> +};
> +
> +add_store_driver(plain_store);
> --
> 1.7.1
>
>
>
> --
> sheepdog mailing list
> sheepdog at lists.wpkg.org
> https://lists.wpkg.org/mailman/listinfo/sheepdog