From: Liu Yuan <tailai.ly at taobao.com> We need to abstract out the store I/O interface so that it can be adapted to other I/O stores, such as the upcoming 'Farm' store. The open/read/write/close interface is cumbersome for a common kv-store to work with, but it requires the smallest changes to the current sheep store code. It sucks but works as a kludge. We can refine it later into a more common get/put/delete interface. Signed-off-by: Liu Yuan <tailai.ly at taobao.com> --- sheep/Makefile.am | 2 +- sheep/sheep_priv.h | 20 +++++ sheep/simple_store.c | 176 +++++++++++++++++++++++++++++++++++++++++++ sheep/store.c | 204 ++++++++++++++++++++----------------------------- 4 files changed, 280 insertions(+), 122 deletions(-) create mode 100644 sheep/simple_store.c diff --git a/sheep/Makefile.am b/sheep/Makefile.am index b7cc459..4220e47 100644 --- a/sheep/Makefile.am +++ b/sheep/Makefile.am @@ -24,7 +24,7 @@ INCLUDES = -I$(top_builddir)/include -I$(top_srcdir)/include $(libcpg_CFLAGS) $ sbin_PROGRAMS = sheep sheep_SOURCES = sheep.c group.c sdnet.c store.c vdi.c work.c journal.c ops.c \ - cluster/local.c util.c strbuf.c + cluster/local.c util.c strbuf.c simple_store.c if BUILD_COROSYNC sheep_SOURCES += cluster/corosync.c endif diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h index cbacab3..ab1a164 100644 --- a/sheep/sheep_priv.h +++ b/sheep/sheep_priv.h @@ -156,6 +156,26 @@ struct cluster_info { struct work_queue *recovery_wqueue; }; +struct store_driver { + const char *driver_name; + int (*init)(char *path); + int (*open)(uint64_t oid, void *priv, int create); + ssize_t (*write)(uint64_t oid, void *priv); + ssize_t (*read)(uint64_t oid, void *priv); + int (*close)(uint64_t oid, void *priv); +}; + +struct siocb { + int fd; + uint16_t flags; + uint32_t epoch; + void *buf; + uint32_t length; + uint64_t offset; +}; + +extern void register_store_driver(struct store_driver *); + extern struct cluster_info *sys; int create_listen_port(int port, void *data); diff --git a/sheep/simple_store.c
b/sheep/simple_store.c new file mode 100644 index 0000000..7945068 --- /dev/null +++ b/sheep/simple_store.c @@ -0,0 +1,176 @@ +/* + * Copyright (C) 2011 Taobao Inc. + * + * Liu Yuan <namei.unix at gmail.com> + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License version + * 2 as published by the Free Software Foundation. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +#include <errno.h> +#include <fcntl.h> +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <sys/statvfs.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> + +#include "sheep_priv.h" +#include "strbuf.h" +#include "util.h" + + +extern char *obj_path; + +extern mode_t def_fmode; + +static int def_store_flags = O_DSYNC | O_RDWR; + +struct store_driver store; + +static int default_store_init(char *path) +{ + eprintf("Use default store driver\n"); + return 0; +} + +static int store_write_last_sector(uint64_t oid, void *opaque) +{ + const int size = SECTOR_SIZE; + char *buf = NULL; + int ret; + struct siocb *iocb = (struct siocb *)opaque; + + buf = valloc(size); + if (!buf) { + eprintf("failed to allocate memory\n"); + return SD_RES_NO_MEM; + } + + memset(buf, 0, size); + iocb->buf = buf; + iocb->length = size; + iocb->offset = SD_DATA_OBJ_SIZE - size; + ret = store.write(oid, iocb); + free(buf); + + if (ret != size) { + if (errno == ENOSPC) + return SD_RES_NO_SPACE; + eprintf("%m\n"); + return SD_RES_EIO; + } + + return SD_RES_SUCCESS; +} + +static int default_store_open(uint64_t oid, void *priv, int create) +{ + struct strbuf path = STRBUF_INIT; + int ret, fd; + int flags = def_store_flags; + struct siocb *iocb = (struct siocb *)priv; + + if (sys->use_directio && is_data_obj(oid)) + flags |= O_DIRECT; + + if (create) + flags |= O_CREAT | O_TRUNC; + + strbuf_addf(&path, "%s%08u/%016" 
PRIx64, obj_path, iocb->epoch, oid); + + ret = open(path.buf, flags, def_fmode); + if (ret < 0) { + eprintf("failed to open %s: %m\n", path.buf); + if (errno == ENOENT) { + struct stat s; + + ret = SD_RES_NO_OBJ; + if (stat(obj_path, &s) < 0) { + /* store directory is corrupted */ + eprintf("corrupted\n"); + ret = SD_RES_EIO; + } + } else + ret = SD_RES_UNKNOWN; + goto out; + } + + fd = ret; + if (!(iocb->flags & SD_FLAG_CMD_COW) && create) { + /* + * Preallocate the whole object to get a better filesystem layout. + */ + ret = fallocate(fd, 0, 0, SD_DATA_OBJ_SIZE); + if (ret < 0) { + if (errno != ENOSYS && errno != EOPNOTSUPP) { + ret = SD_RES_EIO; + close(fd); + goto out; + } + + ret = store_write_last_sector(oid, &fd); + if (ret) + goto out; + } + } + if (iocb->flags & SD_FLAG_CMD_TRUNCATE) { + ret = ftruncate(fd, iocb->offset + iocb->length); + if (ret) { + eprintf("%m\n"); + ret = SD_RES_EIO; + close(fd); + goto out; + } + } + + iocb->fd = fd; + ret = SD_RES_SUCCESS; +out: + strbuf_release(&path); + return ret; +} + +static ssize_t default_store_write(uint64_t oid, void *priv) +{ + struct siocb *iocb = (struct siocb *)priv; + + return xpwrite(iocb->fd, iocb->buf, iocb->length, iocb->offset); +} + +static ssize_t default_store_read(uint64_t oid, void *priv) +{ + struct siocb *iocb = (struct siocb *)priv; + + return xpread(iocb->fd, iocb->buf, iocb->length, iocb->offset); +} + +static int default_store_close(uint64_t oid, void *priv) +{ + struct siocb *iocb = (struct siocb *)priv; + + if (close(iocb->fd) < 0) + return SD_RES_EIO; + + return SD_RES_SUCCESS; +} + +struct store_driver store = { + .driver_name = "default", + .init = default_store_init, + .open = default_store_open, + .write = default_store_write, + .read = default_store_read, + .close = default_store_close +}; + +void register_store_driver(struct store_driver *driver) +{ + store = *driver; + eprintf("Register %s store driver\n", store.driver_name); +} diff --git a/sheep/store.c b/sheep/store.c index 
5844be9..14a6076 100644 --- a/sheep/store.c +++ b/sheep/store.c @@ -24,6 +24,8 @@ #include <time.h> #include "sheep_priv.h" +#include "strbuf.h" +#include "util.h" struct sheepdog_config { uint64_t ctime; @@ -39,7 +41,9 @@ static char *jrnl_path; static char *config_path; static mode_t def_dmode = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IWGRP | S_IXGRP; -static mode_t def_fmode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP; +mode_t def_fmode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP; + +extern struct store_driver store; static int obj_cmp(const void *oid1, const void *oid2) { @@ -158,17 +162,17 @@ out: return res; } -static int ob_open(uint32_t epoch, uint64_t oid, int flags, int *ret); - static int read_from_one(struct request *req, uint32_t epoch, uint64_t oid, unsigned *ori_rlen, void *buf, uint64_t offset) { - int i, n, nr, fd, ret; + int i, n, nr, ret; unsigned wlen, rlen; char name[128]; struct sheepdog_vnode_list_entry *e; struct sd_obj_req hdr; struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr; + struct siocb opaque; + int fd; e = req->entry; nr = req->nr_vnodes; @@ -179,15 +183,21 @@ static int read_from_one(struct request *req, uint32_t epoch, uint64_t oid, addr_to_str(name, sizeof(name), e[n].addr, 0); if (is_myself(e[n].addr, e[n].port)) { - fd = ob_open(epoch, oid, 0, &ret); - if (fd < 0 || ret != 0) + memset(&opaque, 0, sizeof(opaque)); + opaque.epoch = epoch; + ret = store.open(oid, &opaque, 0); + if (ret != SD_RES_SUCCESS) continue; - ret = pread64(fd, buf, *ori_rlen, offset); + opaque.buf = buf; + opaque.length = *ori_rlen; + opaque.offset = offset; + ret = store.read(oid, &opaque); if (ret < 0) continue; *ori_rlen = ret; ret = 0; + store.close(oid, &opaque); goto out; } @@ -430,37 +440,6 @@ out: return ret; } -static int ob_open(uint32_t epoch, uint64_t oid, int flags, int *ret) -{ - char path[1024]; - int fd; - - flags |= O_DSYNC | O_RDWR; - if (sys->use_directio && is_data_obj(oid)) - flags |= O_DIRECT; - - snprintf(path, sizeof(path), "%s%08u/%016" 
PRIx64, obj_path, epoch, oid); - - fd = open(path, flags, def_fmode); - if (fd < 0) { - eprintf("failed to open %s: %m\n", path); - if (errno == ENOENT) { - struct stat s; - - *ret = SD_RES_NO_OBJ; - if (stat(obj_path, &s) < 0) { - /* store directory is corrupted */ - eprintf("corrupted\n"); - *ret = SD_RES_EIO; - } - } else - *ret = SD_RES_UNKNOWN; - } else - *ret = 0; - - return fd; -} - int update_epoch_store(uint32_t epoch) { char new[1024]; @@ -597,13 +576,19 @@ static int store_read_obj(struct request *req, uint32_t epoch) struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq; struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&req->rp; int ret = SD_RES_SUCCESS; - int fd = -1; + struct siocb opaque; - fd = ob_open(epoch, hdr->oid, 0, &ret); - if (fd < 0) + memset(&opaque, 0, sizeof(opaque)); + opaque.epoch = epoch; + opaque.flags = hdr->flags; + ret = store.open(hdr->oid, &opaque, 0); + if (ret != SD_RES_SUCCESS) return ret; - ret = pread64(fd, req->data, hdr->data_length, hdr->offset); + opaque.buf = req->data; + opaque.length = hdr->data_length; + opaque.offset = hdr->offset; + ret = store.read(hdr->oid, &opaque); if (ret < 0) { eprintf("%m\n"); ret = SD_RES_EIO; @@ -615,25 +600,21 @@ static int store_read_obj(struct request *req, uint32_t epoch) ret = SD_RES_SUCCESS; out: - close(fd); + store.close(hdr->oid, &opaque); return ret; } -static int store_write_obj_fd(int fd, struct request *req, uint32_t epoch) +static int do_write_obj(void *opaque, struct request *req, uint32_t epoch) { struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq; uint64_t oid = hdr->oid; int ret = SD_RES_SUCCESS; void *jd = NULL; - if (hdr->flags & SD_FLAG_CMD_TRUNCATE) { - ret = ftruncate(fd, hdr->offset + hdr->data_length); - if (ret) { - eprintf("%m\n"); - return SD_RES_EIO; - } - } - + struct siocb *iocb = (struct siocb *)opaque; + iocb->buf = req->data; + iocb->length = hdr->data_length; + iocb->offset = hdr->offset; if (is_vdi_obj(oid)) { char path[1024]; @@ -643,7 +624,7 @@ 
static int store_write_obj_fd(int fd, struct request *req, uint32_t epoch) hdr->offset, path, jrnl_path); if (!jd) return SD_RES_EIO; - ret = pwrite64(fd, req->data, hdr->data_length, hdr->offset); + ret = store.write(oid, iocb); if (ret != hdr->data_length) { if (errno == ENOSPC) return SD_RES_NO_SPACE; @@ -653,7 +634,7 @@ static int store_write_obj_fd(int fd, struct request *req, uint32_t epoch) } jrnl_end(jd); } else { - ret = pwrite64(fd, req->data, hdr->data_length, hdr->offset); + ret = store.write(oid, iocb); if (ret != hdr->data_length) { if (errno == ENOSPC) return SD_RES_NO_SPACE; @@ -667,33 +648,39 @@ static int store_write_obj_fd(int fd, struct request *req, uint32_t epoch) static int store_write_obj(struct request *req, uint32_t epoch) { struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq; - int ret = SD_RES_SUCCESS; - int fd = -1; + int ret; + struct siocb opaque; - fd = ob_open(epoch, hdr->oid, 0, &ret); - if (fd < 0) + memset(&opaque, 0, sizeof(opaque)); + opaque.epoch = epoch; + opaque.flags = hdr->flags; + ret = store.open(hdr->oid, &opaque, 0); + if (ret != SD_RES_SUCCESS) return ret; - ret = store_write_obj_fd(fd, req, epoch); + ret = do_write_obj(&opaque, req, epoch); - close(fd); + store.close(hdr->oid, &opaque); return ret; } static int store_create_and_write_obj_cow(struct request *req, uint32_t epoch) { struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq; - int ret = SD_RES_SUCCESS; - int fd = -1; + int ret; char *buf = NULL; + struct siocb opaque; if (!hdr->copies) { eprintf("the number of copies cannot be zero\n"); return SD_RES_INVALID_PARMS; } - fd = ob_open(epoch, hdr->oid, O_CREAT|O_TRUNC, &ret); - if (fd < 0) + memset(&opaque, 0, sizeof(opaque)); + opaque.epoch = epoch; + opaque.flags = hdr->flags; + ret = store.open(hdr->oid, &opaque, 1); + if (ret != SD_RES_SUCCESS) return ret; dprintf("%" PRIu64 ", %" PRIx64 "\n", hdr->oid, hdr->cow_oid); @@ -711,7 +698,10 @@ static int store_create_and_write_obj_cow(struct request *req, 
uint32_t epoch) ret = SD_RES_EIO; goto out; } - ret = pwrite64(fd, buf, SD_DATA_OBJ_SIZE, 0); + opaque.buf = buf; + opaque.length = SD_DATA_OBJ_SIZE; + opaque.offset = 0; + ret = store.write(hdr->oid, &opaque); if (ret != SD_DATA_OBJ_SIZE) { if (errno == ENOSPC) ret = SD_RES_NO_SPACE; @@ -724,71 +714,34 @@ static int store_create_and_write_obj_cow(struct request *req, uint32_t epoch) free(buf); buf = NULL; - ret = store_write_obj_fd(fd, req, epoch); + ret = do_write_obj(&opaque, req, epoch); out: if (buf) free(buf); - close(fd); + store.close(hdr->oid, &opaque); return ret; } -static int store_write_last_sector(int fd) -{ - const int size = SECTOR_SIZE; - char *buf = NULL; - int ret; - - buf = valloc(size); - if (!buf) { - eprintf("failed to allocate memory\n"); - return SD_RES_NO_MEM; - } - - memset(buf, 0, size); - ret = pwrite64(fd, buf, size, SD_DATA_OBJ_SIZE - size); - free(buf); - - if (ret != size) { - if (errno == ENOSPC) - return SD_RES_NO_SPACE; - eprintf("%m\n"); - return SD_RES_EIO; - } - - return SD_RES_SUCCESS; -} - static int store_create_and_write_obj(struct request *req, uint32_t epoch) { struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq; - int ret = SD_RES_SUCCESS; - int fd = -1; + int ret; + struct siocb opaque; if (!hdr->copies) { eprintf("the number of copies cannot be zero\n"); return SD_RES_INVALID_PARMS; } - fd = ob_open(epoch, hdr->oid, O_CREAT|O_TRUNC, &ret); - if (fd < 0) + memset(&opaque, 0, sizeof(opaque)); + opaque.epoch = epoch; + opaque.flags = hdr->flags; + ret = store.open(hdr->oid, &opaque, 1); + if (ret != SD_RES_SUCCESS) return ret; - /* - * Preallocate the whole object to get a better filesystem layout. 
- */ - ret = fallocate(fd, 0, 0, SD_DATA_OBJ_SIZE); - if (ret < 0) { - if (errno != ENOSYS && errno != EOPNOTSUPP) { - ret = SD_RES_EIO; - goto out; - } - - ret = store_write_last_sector(fd); - } - - ret = store_write_obj_fd(fd, req, epoch); -out: - close(fd); + ret = do_write_obj(&opaque, req, epoch); + store.close(hdr->oid, &opaque); return ret; } @@ -1502,14 +1455,16 @@ static void recover_one(struct work *work, int idx) int old_copies, cur_copies; uint32_t epoch = rw->epoch; int i, copy_idx = 0, cur_idx = -1; - int fd; + struct siocb opaque; eprintf("%"PRIu32" %"PRIu32", %16"PRIx64"\n", rw->done, rw->count, oid); - fd = ob_open(epoch, oid, 0, &ret); - if (fd != -1) { + memset(&opaque, 0, sizeof(opaque)); + opaque.epoch = epoch; + ret = store.open(oid, &opaque, 0); + if (ret == SD_RES_SUCCESS) { /* the object is already recovered */ - close(fd); + store.close(oid, &opaque); goto out; } @@ -1608,7 +1563,8 @@ int is_recoverying_oid(uint64_t oid) uint64_t hval = fnv_64a_buf(&oid, sizeof(uint64_t), FNV1A_64_INIT); uint64_t min_hval; struct recovery_work *rw = recovering_work; - int ret, fd, i; + int ret, i; + struct siocb opaque; if (oid == 0) return 0; @@ -1624,10 +1580,12 @@ int is_recoverying_oid(uint64_t oid) if (rw->state == RW_INIT) return 1; - fd = ob_open(sys->epoch, oid, 0, &ret); - if (fd != -1) { + memset(&opaque, 0, sizeof(opaque)); + opaque.epoch = sys->epoch; + ret = store.open(oid, &opaque, 0); + if (ret == SD_RES_SUCCESS) { dprintf("the object %" PRIx64 " is already recoverd\n", oid); - close(fd); + store.close(oid, &opaque); return 0; } @@ -2118,6 +2076,10 @@ int init_store(const char *d) if (ret) return ret; + ret = store.init(obj_path); + if (ret) + return ret; + return ret; } -- 1.7.6.1 |