At Wed, 16 Nov 2011 20:59:20 +0800, Liu Yuan wrote: > > From: Liu Yuan <tailai.ly at taobao.com> > > We need to abstract out the store IO interface to adapt it to other IO stores > such as the coming 'Farm' store. > > The open/read/write/close interface is cumbersome for a common kv-store to work with, > but it requires the smallest changes to the current sheep store code. It > sucks but works as a kludge. > > We can refine it later to a more common get/put/delete interface. > > Signed-off-by: Liu Yuan <tailai.ly at taobao.com> > --- > sheep/sheep_priv.h | 11 +++ > sheep/store.c | 216 +++++++++++++++++++++++++++++++++------------------ > 2 files changed, 151 insertions(+), 76 deletions(-) > > diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h > index ee4deb4..56b9a99 100644 > --- a/sheep/sheep_priv.h > +++ b/sheep/sheep_priv.h > @@ -156,6 +156,17 @@ struct cluster_info { > struct work_queue *recovery_wqueue; > }; > > +struct store_driver { > + const char *driver_name; > + int (*init)(char *path); > + int (*open)(uint64_t oid, void *priv, int create); This looks a bit strange. If we get private data back from this function, shouldn't 'void *priv' be 'void **priv'? 
> + ssize_t (*write)(uint64_t oid, void *priv, void *buf, int len, off_t offset); > + ssize_t (*read)(uint64_t oid, void *priv, void *buf, int len, off_t offset); > + int (*close)(uint64_t oid, void *priv); > +}; > + > +extern void register_store_driver(struct store_driver *); > + > extern struct cluster_info *sys; > > int create_listen_port(int port, void *data); > diff --git a/sheep/store.c b/sheep/store.c > index 3a04c34..a6d865f 100644 > --- a/sheep/store.c > +++ b/sheep/store.c > @@ -24,6 +24,8 @@ > #include <time.h> > > #include "sheep_priv.h" > +#include "strbuf.h" > +#include "util.h" > > struct sheepdog_config { > uint64_t ctime; > @@ -41,6 +43,92 @@ static char *config_path; > static mode_t def_dmode = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IWGRP | S_IXGRP; > static mode_t def_fmode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP; > > +static int def_store_flags = O_DSYNC | O_RDWR; > + > +static int default_store_init(char *path) > +{ > + eprintf("Use default store driver\n"); > + return 0; > +} > + > +static int default_store_open(uint64_t oid, void *priv, int create) > +{ > + struct strbuf path = STRBUF_INIT; > + int ret; > + int flags = def_store_flags; > + int epoch = *(int *)priv; > + > + if (sys->use_directio && is_data_obj(oid)) > + flags |= O_DIRECT; > + > + if (create) > + flags |= O_CREAT | O_TRUNC; > + > + strbuf_addf(&path, "%s%08u/%016" PRIx64, obj_path, epoch, oid); > + > + ret = open(path.buf, flags, def_fmode); > + if (ret < 0) { > + eprintf("failed to open %s: %m\n", path.buf); > + if (errno == ENOENT) { > + struct stat s; > + > + ret = SD_RES_NO_OBJ; > + if (stat(obj_path, &s) < 0) { > + /* store directory is corrupted */ > + eprintf("corrupted\n"); > + ret = SD_RES_EIO; > + } > + } else > + ret = SD_RES_UNKNOWN; > + goto out; > + } > + > + *(int *)priv = ret; > + ret = SD_RES_SUCCESS; > +out: > + strbuf_release(&path); > + return ret; > +} > + > +static ssize_t default_store_write(uint64_t oid, void *priv, void *buf, int len, off_t offset) > 
+{ > + int fd = *(int *)priv; Should be 'fd = (int)priv;'? > + > + return xpwrite(fd, buf, len, offset); > +} > + > +static ssize_t default_store_read(uint64_t oid, void *priv, void *buf, int len, off_t offset) > +{ > + int fd = *(int *)priv; > + > + return xpread(fd, buf, len, offset); > +} > + > +static int default_store_close(uint64_t oid, void *priv) > +{ > + int fd = *(int *)priv; > + > + if (close(fd) < 0) > + return SD_RES_EIO; > + > + return SD_RES_SUCCESS; > +} > + > +static struct store_driver store = { > + .driver_name = "default", > + .init = default_store_init, > + .open = default_store_open, > + .write = default_store_write, > + .read = default_store_read, > + .close = default_store_close > +}; > + > +void register_store_driver(struct store_driver *driver) > +{ > + store = *driver; > + eprintf("Register %s store driver\n", store.driver_name); > +} > + > static int obj_cmp(const void *oid1, const void *oid2) > { > const uint64_t hval1 = fnv_64a_buf((void *)oid1, sizeof(uint64_t), FNV1A_64_INIT); > @@ -158,17 +246,17 @@ out: > return res; > } > > -static int ob_open(uint32_t epoch, uint64_t oid, int flags, int *ret); > - > static int read_from_one(struct request *req, uint32_t epoch, uint64_t oid, > unsigned *ori_rlen, void *buf, uint64_t offset) > { > - int i, n, nr, fd, ret; > + int i, n, nr, ret; > unsigned wlen, rlen; > char name[128]; > struct sheepdog_vnode_list_entry *e; > struct sd_obj_req hdr; > struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr; > + int opaque = epoch; This is still not generic. 'int' should be 'void *'. And don't abuse opaque for other purposes. If you need to pass epoch to open(), please add another argument to open(). 
> + int fd; > > e = req->entry; > nr = req->nr_vnodes; > @@ -179,15 +267,16 @@ static int read_from_one(struct request *req, uint32_t epoch, uint64_t oid, > addr_to_str(name, sizeof(name), e[n].addr, 0); > > if (is_myself(e[n].addr, e[n].port)) { > - fd = ob_open(epoch, oid, 0, &ret); > - if (fd < 0 || ret != 0) > + ret = store.open(oid, &opaque, 0); > + if (ret != SD_RES_SUCCESS) > continue; > > - ret = pread64(fd, buf, *ori_rlen, offset); > + ret = store.read(oid, &opaque, buf, *ori_rlen, offset); '&opaque' should be 'opaque'? > if (ret < 0) > continue; > *ori_rlen = ret; > ret = 0; > + store.close(oid, &opaque); > goto out; > } > > @@ -430,37 +519,6 @@ out: > return ret; > } > > -static int ob_open(uint32_t epoch, uint64_t oid, int flags, int *ret) > -{ > - char path[1024]; > - int fd; > - > - flags |= O_DSYNC | O_RDWR; > - if (sys->use_directio && is_data_obj(oid)) > - flags |= O_DIRECT; > - > - snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, obj_path, epoch, oid); > - > - fd = open(path, flags, def_fmode); > - if (fd < 0) { > - eprintf("failed to open %s: %m\n", path); > - if (errno == ENOENT) { > - struct stat s; > - > - *ret = SD_RES_NO_OBJ; > - if (stat(obj_path, &s) < 0) { > - /* store directory is corrupted */ > - eprintf("corrupted\n"); > - *ret = SD_RES_EIO; > - } > - } else > - *ret = SD_RES_UNKNOWN; > - } else > - *ret = 0; > - > - return fd; > -} > - > int update_epoch_store(uint32_t epoch) > { > char new[1024]; > @@ -597,13 +655,13 @@ static int store_read_obj(struct request *req, uint32_t epoch) > struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq; > struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&req->rp; > int ret = SD_RES_SUCCESS; > - int fd = -1; > + int opaque = epoch; > > - fd = ob_open(epoch, hdr->oid, 0, &ret); > - if (fd < 0) > + ret = store.open(hdr->oid, &opaque, 0); > + if (ret != SD_RES_SUCCESS) > return ret; > > - ret = pread64(fd, req->data, hdr->data_length, hdr->offset); > + ret = store.read(hdr->oid, &opaque, req->data, 
hdr->data_length, hdr->offset); > if (ret < 0) { > eprintf("%m\n"); > ret = SD_RES_EIO; > @@ -615,15 +673,16 @@ static int store_read_obj(struct request *req, uint32_t epoch) > > ret = SD_RES_SUCCESS; > out: > - close(fd); > + store.close(hdr->oid, &opaque); > return ret; > } > > -static int store_write_obj_fd(int fd, struct request *req, uint32_t epoch) > +static int store_write_obj_fd(void *opaque, struct request *req, uint32_t epoch) > { > struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq; > uint64_t oid = hdr->oid; > int ret = SD_RES_SUCCESS; > + int fd = *(int *)opaque; > > if (hdr->flags & SD_FLAG_CMD_TRUNCATE) { > ret = ftruncate(fd, hdr->offset + hdr->data_length); > @@ -643,7 +702,7 @@ static int store_write_obj_fd(int fd, struct request *req, uint32_t epoch) > if (ret) > return ret; > } > - ret = pwrite64(fd, req->data, hdr->data_length, hdr->offset); > + ret = store.write(oid, opaque, req->data, hdr->data_length, hdr->offset); > if (ret != hdr->data_length) { > if (errno == ENOSPC) > return SD_RES_NO_SPACE; > @@ -657,24 +716,24 @@ static int store_write_obj_fd(int fd, struct request *req, uint32_t epoch) > static int store_write_obj(struct request *req, uint32_t epoch) > { > struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq; > - int ret = SD_RES_SUCCESS; > - int fd = -1; > + int ret; > + int opaque = epoch; > > - fd = ob_open(epoch, hdr->oid, 0, &ret); > - if (fd < 0) > + ret = store.open(hdr->oid, &opaque, 0); > + if (ret != SD_RES_SUCCESS) > return ret; > > - ret = store_write_obj_fd(fd, req, epoch); > + ret = store_write_obj_fd(&opaque, req, epoch); > > - close(fd); > + store.close(hdr->oid, &opaque); > return ret; > } > > static int store_create_and_write_obj_cow(struct request *req, uint32_t epoch) > { > struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq; > - int ret = SD_RES_SUCCESS; > - int fd = -1; > + int ret; > + int opaque = epoch; > char *buf = NULL; > > if (!hdr->copies) { > @@ -682,8 +741,8 @@ static int 
store_create_and_write_obj_cow(struct request *req, uint32_t epoch) > return SD_RES_INVALID_PARMS; > } > > - fd = ob_open(epoch, hdr->oid, O_CREAT|O_TRUNC, &ret); > - if (fd < 0) > + ret = store.open(hdr->oid, &opaque, 1); > + if (ret != SD_RES_SUCCESS) > return ret; > > dprintf("%" PRIu64 ", %" PRIx64 "\n", hdr->oid, hdr->cow_oid); > @@ -701,7 +760,7 @@ static int store_create_and_write_obj_cow(struct request *req, uint32_t epoch) > ret = SD_RES_EIO; > goto out; > } > - ret = pwrite64(fd, buf, SD_DATA_OBJ_SIZE, 0); > + ret = store.write(hdr->oid, &opaque, buf, SD_DATA_OBJ_SIZE, 0); > if (ret != SD_DATA_OBJ_SIZE) { > if (errno == ENOSPC) > ret = SD_RES_NO_SPACE; > @@ -714,15 +773,15 @@ static int store_create_and_write_obj_cow(struct request *req, uint32_t epoch) > free(buf); > buf = NULL; > > - ret = store_write_obj_fd(fd, req, epoch); > + ret = store_write_obj_fd(&opaque, req, epoch); > out: > if (buf) > free(buf); > - close(fd); > + store.close(hdr->oid, &opaque); > return ret; > } > > -static int store_write_last_sector(int fd) > +static int store_write_last_sector(uint64_t oid, void *opaque) > { > const int size = SECTOR_SIZE; > char *buf = NULL; > @@ -735,7 +794,7 @@ static int store_write_last_sector(int fd) > } > > memset(buf, 0, size); > - ret = pwrite64(fd, buf, size, SD_DATA_OBJ_SIZE - size); > + ret = store.write(oid, opaque, buf, size, SD_DATA_OBJ_SIZE - size); > free(buf); > > if (ret != size) { > @@ -751,34 +810,34 @@ static int store_write_last_sector(int fd) > static int store_create_and_write_obj(struct request *req, uint32_t epoch) > { > struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq; > - int ret = SD_RES_SUCCESS; > - int fd = -1; > + int ret; > + int opaque = epoch; > > if (!hdr->copies) { > eprintf("the number of copies cannot be zero\n"); > return SD_RES_INVALID_PARMS; > } > > - fd = ob_open(epoch, hdr->oid, O_CREAT|O_TRUNC, &ret); > - if (fd < 0) > + ret = store.open(hdr->oid, &opaque, 1); > + if (ret != SD_RES_SUCCESS) > return ret; 
> > /* > * Preallocate the whole object to get a better filesystem layout. > */ > - ret = fallocate(fd, 0, 0, SD_DATA_OBJ_SIZE); > + ret = fallocate(opaque, 0, 0, SD_DATA_OBJ_SIZE); > if (ret < 0) { > if (errno != ENOSYS && errno != EOPNOTSUPP) { > ret = SD_RES_EIO; > goto out; > } > > - ret = store_write_last_sector(fd); > + ret = store_write_last_sector(hdr->oid, &opaque); > } > - > - ret = store_write_obj_fd(fd, req, epoch); > + > + ret = store_write_obj_fd(&opaque, req, epoch); > out: > - close(fd); > + store.close(hdr->oid, &opaque); > return ret; > } > > @@ -1489,14 +1548,14 @@ static void recover_one(struct work *work, int idx) > int old_copies, cur_copies; > uint32_t epoch = rw->epoch; > int i, copy_idx = 0, cur_idx = -1; > - int fd; > + int opaque = epoch; > > eprintf("%"PRIu32" %"PRIu32", %16"PRIx64"\n", rw->done, rw->count, oid); > > - fd = ob_open(epoch, oid, 0, &ret); > - if (fd != -1) { > + ret = store.open(oid, &opaque, 0); > + if (ret == SD_RES_SUCCESS) { > /* the object is already recovered */ > - close(fd); > + store.close(oid, &opaque); > goto out; > } > > @@ -1595,7 +1654,8 @@ int is_recoverying_oid(uint64_t oid) > uint64_t hval = fnv_64a_buf(&oid, sizeof(uint64_t), FNV1A_64_INIT); > uint64_t min_hval; > struct recovery_work *rw = recovering_work; > - int ret, fd, i; > + int ret, i; > + int opaque = sys->epoch; > > if (oid == 0) > return 0; > @@ -1611,10 +1671,10 @@ int is_recoverying_oid(uint64_t oid) > if (rw->state == RW_INIT) > return 1; > > - fd = ob_open(sys->epoch, oid, 0, &ret); > - if (fd != -1) { > + ret = store.open(oid, &opaque, 0); > + if (ret == SD_RES_SUCCESS) { > dprintf("the object %" PRIx64 " is already recoverd\n", oid); > - close(fd); > + store.close(oid, &opaque); > return 0; > } > > @@ -2105,6 +2165,10 @@ int init_store(const char *d) > if (ret) > return ret; > > + ret = store.init(obj_path); > + if (ret) > + return ret; > + > return ret; > } > > -- > 1.7.6.1 > > -- > sheepdog mailing list > sheepdog at lists.wpkg.org > 
http://lists.wpkg.org/mailman/listinfo/sheepdog