From: Liu Yuan <tailai.ly at taobao.com>

We need to abstract out the store IO interface so that other IO stores,
such as the upcoming 'Farm' store, can be plugged in.

An open/read/write/close interface is cumbersome for a generic kv-store
to work with, but it requires the smallest changes to the current sheep
store code. It is a kludge, but it works; we can refine it later into a
more generic get/put/delete interface.

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
Notes:
 - default_store_open() saves errno right after open() fails, because
   eprintf() may overwrite it before the ENOENT check.
 - init_store() now returns the result of store.init() directly.
 - TODO: descriptors obtained from store.open() are still released with
   plain close() in recover_one() and is_recoverying_oid(); they should
   go through store.close() to keep the abstraction consistent.
 - TODO: read_from_one() appears to leak the fd when store.read() fails,
   since the `continue` skips any close of it.

 sheep/sheep_priv.h |   16 ++++++
 sheep/store.c      |  141 +++++++++++++++++++++++++++++++++-----------------
 2 files changed, 109 insertions(+), 48 deletions(-)

diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index ee4deb4..85b670d 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -156,6 +156,22 @@ struct cluster_info {
 	struct work_queue *recovery_wqueue;
 };
 
+/*
+ * Pluggable object store backend.  open() returns a descriptor (negative on
+ * failure, with an SD_RES_* code stored in *ret) that is then passed to
+ * read(), write() and close().
+ */
+struct store_driver {
+	const char *driver_name;
+	int (*init)(char *path);
+	int (*open)(uint32_t epoch, uint64_t oid, int flags, int *ret);
+	ssize_t (*write)(uint64_t oid, int fd, void *buf, int len, off_t offset);
+	ssize_t (*read)(uint64_t oid, int fd, void *buf, int len, off_t offset);
+	int (*close)(int fd);
+};
+
+extern void register_store_driver(struct store_driver *);
+
 extern struct cluster_info *sys;
 
 int create_listen_port(int port, void *data);
diff --git a/sheep/store.c b/sheep/store.c
index 3a04c34..28719da 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -41,6 +41,84 @@ static char *config_path;
 static mode_t def_dmode = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IWGRP | S_IXGRP;
 static mode_t def_fmode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
 
+static int default_store_init(char *path)
+{
+	eprintf("Use default store driver\n");
+	return 0;
+}
+
+/*
+ * Open the object file for @oid in @epoch.  On success the fd is returned
+ * and *ret is set to 0; on failure a negative value is returned and *ret
+ * is set to SD_RES_NO_OBJ, SD_RES_EIO or SD_RES_UNKNOWN.
+ */
+static int default_store_open(uint32_t epoch, uint64_t oid, int flags, int *ret)
+{
+	char path[1024];
+	int fd;
+
+	flags |= O_DSYNC | O_RDWR;
+	if (sys->use_directio && is_data_obj(oid))
+		flags |= O_DIRECT;
+
+	snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, obj_path, epoch, oid);
+
+	fd = open(path, flags, def_fmode);
+	if (fd < 0) {
+		int err = errno; /* eprintf() may clobber errno */
+
+		eprintf("failed to open %s: %m\n", path);
+		if (err == ENOENT) {
+			struct stat s;
+
+			*ret = SD_RES_NO_OBJ;
+			if (stat(obj_path, &s) < 0) {
+				/* store directory is corrupted */
+				eprintf("corrupted\n");
+				*ret = SD_RES_EIO;
+			}
+		} else
+			*ret = SD_RES_UNKNOWN;
+	} else
+		*ret = 0;
+
+	return fd;
+}
+
+static ssize_t default_store_write(uint64_t oid, int fd, void *buf, int len, off_t offset)
+{
+	return pwrite64(fd, buf, len, offset);
+}
+
+static ssize_t default_store_read(uint64_t oid, int fd, void *buf, int len, off_t offset)
+{
+	return pread64(fd, buf, len, offset);
+}
+
+static int default_store_close(int fd)
+{
+	return close(fd);
+}
+
+static struct store_driver store = {
+	.driver_name = "default",
+	.init = default_store_init,
+	.open = default_store_open,
+	.write = default_store_write,
+	.read = default_store_read,
+	.close = default_store_close
+};
+
+/*
+ * Install @driver as the active store backend.  The struct is copied, but
+ * driver_name is kept by pointer and must outlive the registration.
+ */
+void register_store_driver(struct store_driver *driver)
+{
+	store = *driver;
+	eprintf("Register %s store driver\n", store.driver_name);
+}
+
 static int obj_cmp(const void *oid1, const void *oid2)
 {
 	const uint64_t hval1 = fnv_64a_buf((void *)oid1, sizeof(uint64_t), FNV1A_64_INIT);
@@ -158,8 +236,6 @@ out:
 	return res;
 }
 
-static int ob_open(uint32_t epoch, uint64_t oid, int flags, int *ret);
-
 static int read_from_one(struct request *req, uint32_t epoch, uint64_t oid,
 			 unsigned *ori_rlen, void *buf, uint64_t offset)
 {
@@ -179,11 +255,11 @@ static int read_from_one(struct request *req, uint32_t epoch, uint64_t oid,
 		addr_to_str(name, sizeof(name), e[n].addr, 0);
 
 		if (is_myself(e[n].addr, e[n].port)) {
-			fd = ob_open(epoch, oid, 0, &ret);
+			fd = store.open(epoch, oid, 0, &ret);
 			if (fd < 0 || ret != 0)
 				continue;
 
-			ret = pread64(fd, buf, *ori_rlen, offset);
+			ret = store.read(oid, fd, buf, *ori_rlen, offset);
 			if (ret < 0)
 				continue;
 			*ori_rlen = ret;
@@ -430,37 +506,6 @@ out:
 	return ret;
 }
 
-static int ob_open(uint32_t epoch, uint64_t oid, int flags, int *ret)
-{
-	char path[1024];
-	int fd;
-
-	flags |= O_DSYNC | O_RDWR;
-	if (sys->use_directio && is_data_obj(oid))
-		flags |= O_DIRECT;
-
-	snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, obj_path, epoch, oid);
-
-	fd = open(path, flags, def_fmode);
-	if (fd < 0) {
-		eprintf("failed to open %s: %m\n", path);
-		if (errno == ENOENT) {
-			struct stat s;
-
-			*ret = SD_RES_NO_OBJ;
-			if (stat(obj_path, &s) < 0) {
-				/* store directory is corrupted */
-				eprintf("corrupted\n");
-				*ret = SD_RES_EIO;
-			}
-		} else
-			*ret = SD_RES_UNKNOWN;
-	} else
-		*ret = 0;
-
-	return fd;
-}
-
 int update_epoch_store(uint32_t epoch)
 {
 	char new[1024];
@@ -599,11 +644,11 @@ static int store_read_obj(struct request *req, uint32_t epoch)
 	int ret = SD_RES_SUCCESS;
 	int fd = -1;
 
-	fd = ob_open(epoch, hdr->oid, 0, &ret);
+	fd = store.open(epoch, hdr->oid, 0, &ret);
 	if (fd < 0)
 		return ret;
 
-	ret = pread64(fd, req->data, hdr->data_length, hdr->offset);
+	ret = store.read(hdr->oid, fd, req->data, hdr->data_length, hdr->offset);
 	if (ret < 0) {
 		eprintf("%m\n");
 		ret = SD_RES_EIO;
@@ -643,7 +688,7 @@ static int store_write_obj_fd(int fd, struct request *req, uint32_t epoch)
 		if (ret)
 			return ret;
 	}
-	ret = pwrite64(fd, req->data, hdr->data_length, hdr->offset);
+	ret = store.write(oid, fd, req->data, hdr->data_length, hdr->offset);
 	if (ret != hdr->data_length) {
 		if (errno == ENOSPC)
 			return SD_RES_NO_SPACE;
@@ -660,7 +705,7 @@ static int store_write_obj(struct request *req, uint32_t epoch)
 	int ret = SD_RES_SUCCESS;
 	int fd = -1;
 
-	fd = ob_open(epoch, hdr->oid, 0, &ret);
+	fd = store.open(epoch, hdr->oid, 0, &ret);
 	if (fd < 0)
 		return ret;
 
@@ -682,7 +727,7 @@ static int store_create_and_write_obj_cow(struct request *req, uint32_t epoch)
 		return SD_RES_INVALID_PARMS;
 	}
 
-	fd = ob_open(epoch, hdr->oid, O_CREAT|O_TRUNC, &ret);
+	fd = store.open(epoch, hdr->oid, O_CREAT|O_TRUNC, &ret);
 	if (fd < 0)
 		return ret;
 
@@ -701,7 +746,7 @@ static int store_create_and_write_obj_cow(struct request *req, uint32_t epoch)
 		ret = SD_RES_EIO;
 		goto out;
 	}
-	ret = pwrite64(fd, buf, SD_DATA_OBJ_SIZE, 0);
+	ret = store.write(hdr->oid, fd, buf, SD_DATA_OBJ_SIZE, 0);
 	if (ret != SD_DATA_OBJ_SIZE) {
 		if (errno == ENOSPC)
 			ret = SD_RES_NO_SPACE;
@@ -722,7 +767,7 @@ out:
 	return ret;
 }
 
-static int store_write_last_sector(int fd)
+static int store_write_last_sector(uint64_t oid, int fd)
 {
 	const int size = SECTOR_SIZE;
 	char *buf = NULL;
@@ -735,7 +780,7 @@ static int store_write_last_sector(int fd)
 	}
 
 	memset(buf, 0, size);
-	ret = pwrite64(fd, buf, size, SD_DATA_OBJ_SIZE - size);
+	ret = store.write(oid, fd, buf, size, SD_DATA_OBJ_SIZE - size);
 	free(buf);
 
 	if (ret != size) {
@@ -759,7 +804,7 @@ static int store_create_and_write_obj(struct request *req, uint32_t epoch)
 		return SD_RES_INVALID_PARMS;
 	}
 
-	fd = ob_open(epoch, hdr->oid, O_CREAT|O_TRUNC, &ret);
+	fd = store.open(epoch, hdr->oid, O_CREAT|O_TRUNC, &ret);
 	if (fd < 0)
 		return ret;
 
@@ -773,7 +818,7 @@ static int store_create_and_write_obj(struct request *req, uint32_t epoch)
 			goto out;
 		}
 
-		ret = store_write_last_sector(fd);
+		ret = store_write_last_sector(hdr->oid, fd);
 	}
 
 	ret = store_write_obj_fd(fd, req, epoch);
@@ -1493,7 +1538,7 @@ static void recover_one(struct work *work, int idx)
 
 	eprintf("%"PRIu32" %"PRIu32", %16"PRIx64"\n", rw->done, rw->count, oid);
 
-	fd = ob_open(epoch, oid, 0, &ret);
+	fd = store.open(epoch, oid, 0, &ret);
 	if (fd != -1) {
 		/* the object is already recovered */
 		close(fd);
@@ -1611,7 +1656,7 @@ int is_recoverying_oid(uint64_t oid)
 	if (rw->state == RW_INIT)
 		return 1;
 
-	fd = ob_open(sys->epoch, oid, 0, &ret);
+	fd = store.open(sys->epoch, oid, 0, &ret);
 	if (fd != -1) {
 		dprintf("the object %" PRIx64 " is already recoverd\n", oid);
 		close(fd);
@@ -2105,6 +2150,6 @@ int init_store(const char *d)
 	if (ret)
 		return ret;
 
-	return ret;
+	return store.init(obj_path);
 }
 
-- 
1.7.6.1