From: Liu Yuan <tailai.ly at taobao.com>

We need to abstract out the store IO interface so that it can be adapted
to other IO stores, such as the upcoming 'Farm' store.

The open/read/write/close interface is cumbersome for a common kv-store to
work with, but it requires the smallest changes to the current sheep store
code. It is not pretty, but it works as a kludge. We can refine it later
into a more common get/put/delete interface.

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 sheep/sheep_priv.h |   16 ++++
 sheep/store.c      |  216 +++++++++++++++++++++++++++++++++------------------
 2 files changed, 156 insertions(+), 76 deletions(-)

diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index ee4deb4..56b9a99 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -156,6 +156,22 @@ struct cluster_info {
 	struct work_queue *recovery_wqueue;
 };
 
+/*
+ * Object store backend. 'priv' points to a caller-owned int that holds the
+ * epoch when open() is called; on success open() overwrites it with the
+ * per-object handle used by read/write/close, on failure it is left as is.
+ */
+struct store_driver {
+	const char *driver_name;
+	int (*init)(char *path);
+	int (*open)(uint64_t oid, void *priv, int create);
+	ssize_t (*write)(uint64_t oid, void *priv, void *buf, int len, off_t offset);
+	ssize_t (*read)(uint64_t oid, void *priv, void *buf, int len, off_t offset);
+	int (*close)(uint64_t oid, void *priv);
+};
+
+extern void register_store_driver(struct store_driver *);
+
 extern struct cluster_info *sys;
 
 int create_listen_port(int port, void *data);
diff --git a/sheep/store.c b/sheep/store.c
index 3a04c34..a6d865f 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -24,6 +24,8 @@
 #include <time.h>
 
 #include "sheep_priv.h"
+#include "strbuf.h"
+#include "util.h"
 
 struct sheepdog_config {
 	uint64_t ctime;
@@ -41,6 +43,92 @@ static char *config_path;
 static mode_t def_dmode = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IWGRP | S_IXGRP;
 static mode_t def_fmode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
 
+static int def_store_flags = O_DSYNC | O_RDWR;
+
+static int default_store_init(char *path)
+{
+	eprintf("Use default store driver\n");
+	return 0;
+}
+
+static int default_store_open(uint64_t oid, void *priv, int create)
+{
+	struct strbuf path = STRBUF_INIT;
+	int ret;
+	int flags = def_store_flags;
+	uint32_t epoch = *(int *)priv;
+
+	if (sys->use_directio && is_data_obj(oid))
+		flags |= O_DIRECT;
+
+	if (create)
+		flags |= O_CREAT | O_TRUNC;
+
+	strbuf_addf(&path, "%s%08u/%016" PRIx64, obj_path, epoch, oid);
+
+	ret = open(path.buf, flags, def_fmode);
+	if (ret < 0) {
+		eprintf("failed to open %s: %m\n", path.buf);
+		if (errno == ENOENT) {
+			struct stat s;
+
+			ret = SD_RES_NO_OBJ;
+			if (stat(obj_path, &s) < 0) {
+				/* store directory is corrupted */
+				eprintf("corrupted\n");
+				ret = SD_RES_EIO;
+			}
+		} else
+			ret = SD_RES_UNKNOWN;
+		goto out;
+	}
+
+	*(int *)priv = ret;
+	ret = SD_RES_SUCCESS;
+out:
+	strbuf_release(&path);
+	return ret;
+}
+
+static ssize_t default_store_write(uint64_t oid, void *priv, void *buf, int len, off_t offset)
+{
+	int fd = *(int *)priv;
+
+	return xpwrite(fd, buf, len, offset);
+}
+
+static ssize_t default_store_read(uint64_t oid, void *priv, void *buf, int len, off_t offset)
+{
+	int fd = *(int *)priv;
+
+	return xpread(fd, buf, len, offset);
+}
+
+static int default_store_close(uint64_t oid, void *priv)
+{
+	int fd = *(int *)priv;
+
+	if (close(fd) < 0)
+		return SD_RES_EIO;
+
+	return SD_RES_SUCCESS;
+}
+
+static struct store_driver store = {
+	.driver_name = "default",
+	.init = default_store_init,
+	.open = default_store_open,
+	.write = default_store_write,
+	.read = default_store_read,
+	.close = default_store_close
+};
+
+void register_store_driver(struct store_driver *driver)
+{
+	store = *driver;
+	eprintf("Register %s store driver\n", store.driver_name);
+}
+
 static int obj_cmp(const void *oid1, const void *oid2)
 {
 	const uint64_t hval1 = fnv_64a_buf((void *)oid1, sizeof(uint64_t), FNV1A_64_INIT);
@@ -158,17 +246,17 @@ out:
 	return res;
 }
 
-static int ob_open(uint32_t epoch, uint64_t oid, int flags, int *ret);
-
 static int read_from_one(struct request *req, uint32_t epoch, uint64_t oid,
 			 unsigned *ori_rlen, void *buf, uint64_t offset)
 {
-	int i, n, nr, fd, ret;
+	int i, n, nr, ret;
 	unsigned wlen, rlen;
 	char name[128];
 	struct sheepdog_vnode_list_entry *e;
 	struct sd_obj_req hdr;
 	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
+	int opaque = epoch;
+	int fd;
 
 	e = req->entry;
 	nr = req->nr_vnodes;
@@ -179,15 +267,18 @@ static int read_from_one(struct request *req, uint32_t epoch, uint64_t oid,
 		addr_to_str(name, sizeof(name), e[n].addr, 0);
 
 		if (is_myself(e[n].addr, e[n].port)) {
-			fd = ob_open(epoch, oid, 0, &ret);
-			if (fd < 0 || ret != 0)
+			/* open() replaces opaque with the handle, so reset it */
+			opaque = epoch;
+			ret = store.open(oid, &opaque, 0);
+			if (ret != SD_RES_SUCCESS)
 				continue;
 
-			ret = pread64(fd, buf, *ori_rlen, offset);
+			ret = store.read(oid, &opaque, buf, *ori_rlen, offset);
+			store.close(oid, &opaque);
 			if (ret < 0)
 				continue;
 			*ori_rlen = ret;
 			ret = 0;
 			goto out;
 		}
 
@@ -430,37 +521,6 @@ out:
 	return ret;
 }
 
-static int ob_open(uint32_t epoch, uint64_t oid, int flags, int *ret)
-{
-	char path[1024];
-	int fd;
-
-	flags |= O_DSYNC | O_RDWR;
-	if (sys->use_directio && is_data_obj(oid))
-		flags |= O_DIRECT;
-
-	snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, obj_path, epoch, oid);
-
-	fd = open(path, flags, def_fmode);
-	if (fd < 0) {
-		eprintf("failed to open %s: %m\n", path);
-		if (errno == ENOENT) {
-			struct stat s;
-
-			*ret = SD_RES_NO_OBJ;
-			if (stat(obj_path, &s) < 0) {
-				/* store directory is corrupted */
-				eprintf("corrupted\n");
-				*ret = SD_RES_EIO;
-			}
-		} else
-			*ret = SD_RES_UNKNOWN;
-	} else
-		*ret = 0;
-
-	return fd;
-}
-
 int update_epoch_store(uint32_t epoch)
 {
 	char new[1024];
@@ -597,13 +657,13 @@ static int store_read_obj(struct request *req, uint32_t epoch)
 	struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
 	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&req->rp;
 	int ret = SD_RES_SUCCESS;
-	int fd = -1;
+	int opaque = epoch;
 
-	fd = ob_open(epoch, hdr->oid, 0, &ret);
-	if (fd < 0)
+	ret = store.open(hdr->oid, &opaque, 0);
+	if (ret != SD_RES_SUCCESS)
 		return ret;
 
-	ret = pread64(fd, req->data, hdr->data_length, hdr->offset);
+	ret = store.read(hdr->oid, &opaque, req->data, hdr->data_length, hdr->offset);
 	if (ret < 0) {
 		eprintf("%m\n");
 		ret = SD_RES_EIO;
@@ -615,15 +675,16 @@ static int store_read_obj(struct request *req, uint32_t epoch)
 	ret = SD_RES_SUCCESS;
 
 out:
-	close(fd);
+	store.close(hdr->oid, &opaque);
 	return ret;
 }
 
-static int store_write_obj_fd(int fd, struct request *req, uint32_t epoch)
+static int store_write_obj_fd(void *opaque, struct request *req, uint32_t epoch)
 {
 	struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
 	uint64_t oid = hdr->oid;
 	int ret = SD_RES_SUCCESS;
+	int fd = *(int *)opaque;
 
 	if (hdr->flags & SD_FLAG_CMD_TRUNCATE) {
 		ret = ftruncate(fd, hdr->offset + hdr->data_length);
@@ -643,7 +704,7 @@ static int store_write_obj_fd(int fd, struct request *req, uint32_t epoch)
 		if (ret)
 			return ret;
 	}
-	ret = pwrite64(fd, req->data, hdr->data_length, hdr->offset);
+	ret = store.write(oid, opaque, req->data, hdr->data_length, hdr->offset);
 	if (ret != hdr->data_length) {
 		if (errno == ENOSPC)
 			return SD_RES_NO_SPACE;
@@ -657,24 +718,24 @@ static int store_write_obj_fd(int fd, struct request *req, uint32_t epoch)
 static int store_write_obj(struct request *req, uint32_t epoch)
 {
 	struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
-	int ret = SD_RES_SUCCESS;
-	int fd = -1;
+	int ret;
+	int opaque = epoch;
 
-	fd = ob_open(epoch, hdr->oid, 0, &ret);
-	if (fd < 0)
+	ret = store.open(hdr->oid, &opaque, 0);
+	if (ret != SD_RES_SUCCESS)
 		return ret;
 
-	ret = store_write_obj_fd(fd, req, epoch);
+	ret = store_write_obj_fd(&opaque, req, epoch);
 
-	close(fd);
+	store.close(hdr->oid, &opaque);
 	return ret;
 }
 
 static int store_create_and_write_obj_cow(struct request *req, uint32_t epoch)
 {
 	struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
-	int ret = SD_RES_SUCCESS;
-	int fd = -1;
+	int ret;
+	int opaque = epoch;
 	char *buf = NULL;
 
 	if (!hdr->copies) {
@@ -682,8 +743,8 @@ static int store_create_and_write_obj_cow(struct request *req, uint32_t epoch)
 		return SD_RES_INVALID_PARMS;
 	}
 
-	fd = ob_open(epoch, hdr->oid, O_CREAT|O_TRUNC, &ret);
-	if (fd < 0)
+	ret = store.open(hdr->oid, &opaque, 1);
+	if (ret != SD_RES_SUCCESS)
 		return ret;
 
 	dprintf("%" PRIu64 ", %" PRIx64 "\n", hdr->oid, hdr->cow_oid);
@@ -701,7 +762,7 @@ static int store_create_and_write_obj_cow(struct request *req, uint32_t epoch)
 		ret = SD_RES_EIO;
 		goto out;
 	}
-	ret = pwrite64(fd, buf, SD_DATA_OBJ_SIZE, 0);
+	ret = store.write(hdr->oid, &opaque, buf, SD_DATA_OBJ_SIZE, 0);
 	if (ret != SD_DATA_OBJ_SIZE) {
 		if (errno == ENOSPC)
 			ret = SD_RES_NO_SPACE;
@@ -714,15 +775,15 @@ static int store_create_and_write_obj_cow(struct request *req, uint32_t epoch)
 	free(buf);
 	buf = NULL;
 
-	ret = store_write_obj_fd(fd, req, epoch);
+	ret = store_write_obj_fd(&opaque, req, epoch);
 out:
 	if (buf)
 		free(buf);
-	close(fd);
+	store.close(hdr->oid, &opaque);
 	return ret;
 }
 
-static int store_write_last_sector(int fd)
+static int store_write_last_sector(uint64_t oid, void *opaque)
 {
 	const int size = SECTOR_SIZE;
 	char *buf = NULL;
@@ -735,7 +796,7 @@ static int store_write_last_sector(int fd)
 	}
 
 	memset(buf, 0, size);
-	ret = pwrite64(fd, buf, size, SD_DATA_OBJ_SIZE - size);
+	ret = store.write(oid, opaque, buf, size, SD_DATA_OBJ_SIZE - size);
 	free(buf);
 
 	if (ret != size) {
@@ -751,34 +812,36 @@ static int store_write_last_sector(int fd)
 static int store_create_and_write_obj(struct request *req, uint32_t epoch)
 {
 	struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
-	int ret = SD_RES_SUCCESS;
-	int fd = -1;
+	int ret;
+	int opaque = epoch;
 
 	if (!hdr->copies) {
 		eprintf("the number of copies cannot be zero\n");
 		return SD_RES_INVALID_PARMS;
 	}
 
-	fd = ob_open(epoch, hdr->oid, O_CREAT|O_TRUNC, &ret);
-	if (fd < 0)
+	ret = store.open(hdr->oid, &opaque, 1);
+	if (ret != SD_RES_SUCCESS)
 		return ret;
 
 	/*
 	 * Preallocate the whole object to get a better filesystem layout.
+	 * NOTE: fallocate() bypasses the store driver and assumes opaque holds
+	 * a file descriptor, which is only true for the default driver.
 	 */
-	ret = fallocate(fd, 0, 0, SD_DATA_OBJ_SIZE);
+	ret = fallocate(opaque, 0, 0, SD_DATA_OBJ_SIZE);
 	if (ret < 0) {
 		if (errno != ENOSYS && errno != EOPNOTSUPP) {
 			ret = SD_RES_EIO;
 			goto out;
 		}
 
-		ret = store_write_last_sector(fd);
+		ret = store_write_last_sector(hdr->oid, &opaque);
 	}
 
-	ret = store_write_obj_fd(fd, req, epoch);
+	ret = store_write_obj_fd(&opaque, req, epoch);
 out:
-	close(fd);
+	store.close(hdr->oid, &opaque);
 	return ret;
 }
 
@@ -1489,14 +1552,14 @@ static void recover_one(struct work *work, int idx)
 	int old_copies, cur_copies;
 	uint32_t epoch = rw->epoch;
 	int i, copy_idx = 0, cur_idx = -1;
-	int fd;
+	int opaque = epoch;
 
 	eprintf("%"PRIu32" %"PRIu32", %16"PRIx64"\n", rw->done, rw->count, oid);
 
-	fd = ob_open(epoch, oid, 0, &ret);
-	if (fd != -1) {
+	ret = store.open(oid, &opaque, 0);
+	if (ret == SD_RES_SUCCESS) {
 		/* the object is already recovered */
-		close(fd);
+		store.close(oid, &opaque);
 		goto out;
 	}
 
@@ -1595,7 +1658,8 @@ int is_recoverying_oid(uint64_t oid)
 	uint64_t hval = fnv_64a_buf(&oid, sizeof(uint64_t), FNV1A_64_INIT);
 	uint64_t min_hval;
 	struct recovery_work *rw = recovering_work;
-	int ret, fd, i;
+	int ret, i;
+	int opaque = sys->epoch;
 
 	if (oid == 0)
 		return 0;
@@ -1611,10 +1675,10 @@ int is_recoverying_oid(uint64_t oid)
 	if (rw->state == RW_INIT)
 		return 1;
 
-	fd = ob_open(sys->epoch, oid, 0, &ret);
-	if (fd != -1) {
+	ret = store.open(oid, &opaque, 0);
+	if (ret == SD_RES_SUCCESS) {
 		dprintf("the object %" PRIx64 " is already recoverd\n", oid);
-		close(fd);
+		store.close(oid, &opaque);
 		return 0;
 	}
 
@@ -2105,6 +2169,6 @@ int init_store(const char *d)
 	if (ret)
 		return ret;
 
-	return ret;
+	return store.init(obj_path);
 }
 
-- 
1.7.6.1