[Sheepdog] [PATCH v2 2/2] sheep: abstract out store IO interface
MORITA Kazutaka
morita.kazutaka at lab.ntt.co.jp
Thu Nov 17 05:31:19 CET 2011
At Wed, 16 Nov 2011 20:59:20 +0800,
Liu Yuan wrote:
>
> From: Liu Yuan <tailai.ly at taobao.com>
>
> We need to abstract out the store IO interface so that it can be adapted to
> other IO stores, such as the upcoming 'Farm' store.
>
> The open/read/write/close interface is cumbersome for a common kv-store to
> work with, but it requires the smallest changes to the current sheep store
> code. It sucks but works as a kludge.
>
> We can refine it later into a more common get/put/delete interface.
>
> Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
> ---
> sheep/sheep_priv.h | 11 +++
> sheep/store.c | 216 +++++++++++++++++++++++++++++++++------------------
> 2 files changed, 151 insertions(+), 76 deletions(-)
>
> diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
> index ee4deb4..56b9a99 100644
> --- a/sheep/sheep_priv.h
> +++ b/sheep/sheep_priv.h
> @@ -156,6 +156,17 @@ struct cluster_info {
> struct work_queue *recovery_wqueue;
> };
>
> +struct store_driver {
> + const char *driver_name;
> + int (*init)(char *path);
> + int (*open)(uint64_t oid, void *priv, int create);
This looks a bit strange. If we get private data back from this function,
shouldn't 'void *priv' be 'void **priv'?
> + ssize_t (*write)(uint64_t oid, void *priv, void *buf, int len, off_t offset);
> + ssize_t (*read)(uint64_t oid, void *priv, void *buf, int len, off_t offset);
> + int (*close)(uint64_t oid, void *priv);
> +};
> +
> +extern void register_store_driver(struct store_driver *);
> +
> extern struct cluster_info *sys;
>
> int create_listen_port(int port, void *data);
> diff --git a/sheep/store.c b/sheep/store.c
> index 3a04c34..a6d865f 100644
> --- a/sheep/store.c
> +++ b/sheep/store.c
> @@ -24,6 +24,8 @@
> #include <time.h>
>
> #include "sheep_priv.h"
> +#include "strbuf.h"
> +#include "util.h"
>
> struct sheepdog_config {
> uint64_t ctime;
> @@ -41,6 +43,92 @@ static char *config_path;
> static mode_t def_dmode = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IWGRP | S_IXGRP;
> static mode_t def_fmode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
>
> +static int def_store_flags = O_DSYNC | O_RDWR;
> +
> +static int default_store_init(char *path)
> +{
> + eprintf("Use default store driver\n");
> + return 0;
> +}
> +
> +static int default_store_open(uint64_t oid, void *priv, int create)
> +{
> + struct strbuf path = STRBUF_INIT;
> + int ret;
> + int flags = def_store_flags;
> + int epoch = *(int *)priv;
> +
> + if (sys->use_directio && is_data_obj(oid))
> + flags |= O_DIRECT;
> +
> + if (create)
> + flags |= O_CREAT | O_TRUNC;
> +
> + strbuf_addf(&path, "%s%08u/%016" PRIx64, obj_path, epoch, oid);
> +
> + ret = open(path.buf, flags, def_fmode);
> + if (ret < 0) {
> + eprintf("failed to open %s: %m\n", path.buf);
> + if (errno == ENOENT) {
> + struct stat s;
> +
> + ret = SD_RES_NO_OBJ;
> + if (stat(obj_path, &s) < 0) {
> + /* store directory is corrupted */
> + eprintf("corrupted\n");
> + ret = SD_RES_EIO;
> + }
> + } else
> + ret = SD_RES_UNKNOWN;
> + goto out;
> + }
> +
> + *(int *)priv = ret;
> + ret = SD_RES_SUCCESS;
> +out:
> + strbuf_release(&path);
> + return ret;
> +}
> +
> +static ssize_t default_store_write(uint64_t oid, void *priv, void *buf, int len, off_t offset)
> +{
> + int fd = *(int *)priv;
Shouldn't this be 'fd = (int)priv;'?
> +
> + return xpwrite(fd, buf, len, offset);
> +}
> +
> +static ssize_t default_store_read(uint64_t oid, void *priv, void *buf, int len, off_t offset)
> +{
> + int fd = *(int *)priv;
> +
> + return xpread(fd, buf, len, offset);
> +}
> +
> +static int default_store_close(uint64_t oid, void *priv)
> +{
> + int fd = *(int *)priv;
> +
> + if (close(fd) < 0)
> + return SD_RES_EIO;
> +
> + return SD_RES_SUCCESS;
> +}
> +
> +static struct store_driver store = {
> + .driver_name = "default",
> + .init = default_store_init,
> + .open = default_store_open,
> + .write = default_store_write,
> + .read = default_store_read,
> + .close = default_store_close
> +};
> +
> +void register_store_driver(struct store_driver *driver)
> +{
> + store = *driver;
> + eprintf("Register %s store driver\n", store.driver_name);
> +}
> +
> static int obj_cmp(const void *oid1, const void *oid2)
> {
> const uint64_t hval1 = fnv_64a_buf((void *)oid1, sizeof(uint64_t), FNV1A_64_INIT);
> @@ -158,17 +246,17 @@ out:
> return res;
> }
>
> -static int ob_open(uint32_t epoch, uint64_t oid, int flags, int *ret);
> -
> static int read_from_one(struct request *req, uint32_t epoch, uint64_t oid,
> unsigned *ori_rlen, void *buf, uint64_t offset)
> {
> - int i, n, nr, fd, ret;
> + int i, n, nr, ret;
> unsigned wlen, rlen;
> char name[128];
> struct sheepdog_vnode_list_entry *e;
> struct sd_obj_req hdr;
> struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
> + int opaque = epoch;
This is still not generic: 'int' should be 'void *'. Also, don't abuse
opaque for other purposes. If you need to pass the epoch to open(),
please add another argument to open().
> + int fd;
>
> e = req->entry;
> nr = req->nr_vnodes;
> @@ -179,15 +267,16 @@ static int read_from_one(struct request *req, uint32_t epoch, uint64_t oid,
> addr_to_str(name, sizeof(name), e[n].addr, 0);
>
> if (is_myself(e[n].addr, e[n].port)) {
> - fd = ob_open(epoch, oid, 0, &ret);
> - if (fd < 0 || ret != 0)
> + ret = store.open(oid, &opaque, 0);
> + if (ret != SD_RES_SUCCESS)
> continue;
>
> - ret = pread64(fd, buf, *ori_rlen, offset);
> + ret = store.read(oid, &opaque, buf, *ori_rlen, offset);
Shouldn't '&opaque' be 'opaque' here?
> if (ret < 0)
> continue;
> *ori_rlen = ret;
> ret = 0;
> + store.close(oid, &opaque);
> goto out;
> }
>
> @@ -430,37 +519,6 @@ out:
> return ret;
> }
>
> -static int ob_open(uint32_t epoch, uint64_t oid, int flags, int *ret)
> -{
> - char path[1024];
> - int fd;
> -
> - flags |= O_DSYNC | O_RDWR;
> - if (sys->use_directio && is_data_obj(oid))
> - flags |= O_DIRECT;
> -
> - snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, obj_path, epoch, oid);
> -
> - fd = open(path, flags, def_fmode);
> - if (fd < 0) {
> - eprintf("failed to open %s: %m\n", path);
> - if (errno == ENOENT) {
> - struct stat s;
> -
> - *ret = SD_RES_NO_OBJ;
> - if (stat(obj_path, &s) < 0) {
> - /* store directory is corrupted */
> - eprintf("corrupted\n");
> - *ret = SD_RES_EIO;
> - }
> - } else
> - *ret = SD_RES_UNKNOWN;
> - } else
> - *ret = 0;
> -
> - return fd;
> -}
> -
> int update_epoch_store(uint32_t epoch)
> {
> char new[1024];
> @@ -597,13 +655,13 @@ static int store_read_obj(struct request *req, uint32_t epoch)
> struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
> struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&req->rp;
> int ret = SD_RES_SUCCESS;
> - int fd = -1;
> + int opaque = epoch;
>
> - fd = ob_open(epoch, hdr->oid, 0, &ret);
> - if (fd < 0)
> + ret = store.open(hdr->oid, &opaque, 0);
> + if (ret != SD_RES_SUCCESS)
> return ret;
>
> - ret = pread64(fd, req->data, hdr->data_length, hdr->offset);
> + ret = store.read(hdr->oid, &opaque, req->data, hdr->data_length, hdr->offset);
> if (ret < 0) {
> eprintf("%m\n");
> ret = SD_RES_EIO;
> @@ -615,15 +673,16 @@ static int store_read_obj(struct request *req, uint32_t epoch)
>
> ret = SD_RES_SUCCESS;
> out:
> - close(fd);
> + store.close(hdr->oid, &opaque);
> return ret;
> }
>
> -static int store_write_obj_fd(int fd, struct request *req, uint32_t epoch)
> +static int store_write_obj_fd(void *opaque, struct request *req, uint32_t epoch)
> {
> struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
> uint64_t oid = hdr->oid;
> int ret = SD_RES_SUCCESS;
> + int fd = *(int *)opaque;
>
> if (hdr->flags & SD_FLAG_CMD_TRUNCATE) {
> ret = ftruncate(fd, hdr->offset + hdr->data_length);
> @@ -643,7 +702,7 @@ static int store_write_obj_fd(int fd, struct request *req, uint32_t epoch)
> if (ret)
> return ret;
> }
> - ret = pwrite64(fd, req->data, hdr->data_length, hdr->offset);
> + ret = store.write(oid, opaque, req->data, hdr->data_length, hdr->offset);
> if (ret != hdr->data_length) {
> if (errno == ENOSPC)
> return SD_RES_NO_SPACE;
> @@ -657,24 +716,24 @@ static int store_write_obj_fd(int fd, struct request *req, uint32_t epoch)
> static int store_write_obj(struct request *req, uint32_t epoch)
> {
> struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
> - int ret = SD_RES_SUCCESS;
> - int fd = -1;
> + int ret;
> + int opaque = epoch;
>
> - fd = ob_open(epoch, hdr->oid, 0, &ret);
> - if (fd < 0)
> + ret = store.open(hdr->oid, &opaque, 0);
> + if (ret != SD_RES_SUCCESS)
> return ret;
>
> - ret = store_write_obj_fd(fd, req, epoch);
> + ret = store_write_obj_fd(&opaque, req, epoch);
>
> - close(fd);
> + store.close(hdr->oid, &opaque);
> return ret;
> }
>
> static int store_create_and_write_obj_cow(struct request *req, uint32_t epoch)
> {
> struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
> - int ret = SD_RES_SUCCESS;
> - int fd = -1;
> + int ret;
> + int opaque = epoch;
> char *buf = NULL;
>
> if (!hdr->copies) {
> @@ -682,8 +741,8 @@ static int store_create_and_write_obj_cow(struct request *req, uint32_t epoch)
> return SD_RES_INVALID_PARMS;
> }
>
> - fd = ob_open(epoch, hdr->oid, O_CREAT|O_TRUNC, &ret);
> - if (fd < 0)
> + ret = store.open(hdr->oid, &opaque, 1);
> + if (ret != SD_RES_SUCCESS)
> return ret;
>
> dprintf("%" PRIu64 ", %" PRIx64 "\n", hdr->oid, hdr->cow_oid);
> @@ -701,7 +760,7 @@ static int store_create_and_write_obj_cow(struct request *req, uint32_t epoch)
> ret = SD_RES_EIO;
> goto out;
> }
> - ret = pwrite64(fd, buf, SD_DATA_OBJ_SIZE, 0);
> + ret = store.write(hdr->oid, &opaque, buf, SD_DATA_OBJ_SIZE, 0);
> if (ret != SD_DATA_OBJ_SIZE) {
> if (errno == ENOSPC)
> ret = SD_RES_NO_SPACE;
> @@ -714,15 +773,15 @@ static int store_create_and_write_obj_cow(struct request *req, uint32_t epoch)
> free(buf);
> buf = NULL;
>
> - ret = store_write_obj_fd(fd, req, epoch);
> + ret = store_write_obj_fd(&opaque, req, epoch);
> out:
> if (buf)
> free(buf);
> - close(fd);
> + store.close(hdr->oid, &opaque);
> return ret;
> }
>
> -static int store_write_last_sector(int fd)
> +static int store_write_last_sector(uint64_t oid, void *opaque)
> {
> const int size = SECTOR_SIZE;
> char *buf = NULL;
> @@ -735,7 +794,7 @@ static int store_write_last_sector(int fd)
> }
>
> memset(buf, 0, size);
> - ret = pwrite64(fd, buf, size, SD_DATA_OBJ_SIZE - size);
> + ret = store.write(oid, opaque, buf, size, SD_DATA_OBJ_SIZE - size);
> free(buf);
>
> if (ret != size) {
> @@ -751,34 +810,34 @@ static int store_write_last_sector(int fd)
> static int store_create_and_write_obj(struct request *req, uint32_t epoch)
> {
> struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
> - int ret = SD_RES_SUCCESS;
> - int fd = -1;
> + int ret;
> + int opaque = epoch;
>
> if (!hdr->copies) {
> eprintf("the number of copies cannot be zero\n");
> return SD_RES_INVALID_PARMS;
> }
>
> - fd = ob_open(epoch, hdr->oid, O_CREAT|O_TRUNC, &ret);
> - if (fd < 0)
> + ret = store.open(hdr->oid, &opaque, 1);
> + if (ret != SD_RES_SUCCESS)
> return ret;
>
> /*
> * Preallocate the whole object to get a better filesystem layout.
> */
> - ret = fallocate(fd, 0, 0, SD_DATA_OBJ_SIZE);
> + ret = fallocate(opaque, 0, 0, SD_DATA_OBJ_SIZE);
> if (ret < 0) {
> if (errno != ENOSYS && errno != EOPNOTSUPP) {
> ret = SD_RES_EIO;
> goto out;
> }
>
> - ret = store_write_last_sector(fd);
> + ret = store_write_last_sector(hdr->oid, &opaque);
> }
> -
> - ret = store_write_obj_fd(fd, req, epoch);
> +
> + ret = store_write_obj_fd(&opaque, req, epoch);
> out:
> - close(fd);
> + store.close(hdr->oid, &opaque);
> return ret;
> }
>
> @@ -1489,14 +1548,14 @@ static void recover_one(struct work *work, int idx)
> int old_copies, cur_copies;
> uint32_t epoch = rw->epoch;
> int i, copy_idx = 0, cur_idx = -1;
> - int fd;
> + int opaque = epoch;
>
> eprintf("%"PRIu32" %"PRIu32", %16"PRIx64"\n", rw->done, rw->count, oid);
>
> - fd = ob_open(epoch, oid, 0, &ret);
> - if (fd != -1) {
> + ret = store.open(oid, &opaque, 0);
> + if (ret == SD_RES_SUCCESS) {
> /* the object is already recovered */
> - close(fd);
> + store.close(oid, &opaque);
> goto out;
> }
>
> @@ -1595,7 +1654,8 @@ int is_recoverying_oid(uint64_t oid)
> uint64_t hval = fnv_64a_buf(&oid, sizeof(uint64_t), FNV1A_64_INIT);
> uint64_t min_hval;
> struct recovery_work *rw = recovering_work;
> - int ret, fd, i;
> + int ret, i;
> + int opaque = sys->epoch;
>
> if (oid == 0)
> return 0;
> @@ -1611,10 +1671,10 @@ int is_recoverying_oid(uint64_t oid)
> if (rw->state == RW_INIT)
> return 1;
>
> - fd = ob_open(sys->epoch, oid, 0, &ret);
> - if (fd != -1) {
> + ret = store.open(oid, &opaque, 0);
> + if (ret == SD_RES_SUCCESS) {
> dprintf("the object %" PRIx64 " is already recoverd\n", oid);
> - close(fd);
> + store.close(oid, &opaque);
> return 0;
> }
>
> @@ -2105,6 +2165,10 @@ int init_store(const char *d)
> if (ret)
> return ret;
>
> + ret = store.init(obj_path);
> + if (ret)
> + return ret;
> +
> return ret;
> }
>
> --
> 1.7.6.1
>
> --
> sheepdog mailing list
> sheepdog at lists.wpkg.org
> http://lists.wpkg.org/mailman/listinfo/sheepdog
More information about the sheepdog mailing list