From: Liu Yuan <tailai.ly at taobao.com> UPDATE: - address Kazutaka's comments. Thanks! - use stat() for farm_exist - init ret -------------------------------------------- 8< --- refactor struct siocb; this makes the code cleaner. - let sd_store->read/write() use open() internally. - remove sd_store->open()/close() - add sd_store->exist() But for now we don't actually have concurrent requests to the same object, because the upper layer excludes concurrent access via check_request(). We'll remove this constraint in a later patch set. Signed-off-by: Liu Yuan <tailai.ly at taobao.com> --- include/util.h | 5 +- sheep/farm/farm.c | 188 +++++++++++++++++++++++++++----------------------- sheep/object_cache.c | 5 -- sheep/ops.c | 37 ++-------- sheep/recovery.c | 16 +---- sheep/sheep_priv.h | 6 +- 6 files changed, 119 insertions(+), 138 deletions(-) diff --git a/include/util.h b/include/util.h index 1c32954..10b30d7 100644 --- a/include/util.h +++ b/include/util.h @@ -8,6 +8,7 @@ #include "bitops.h" #include "list.h" +#include "logger.h" #define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0])) #define roundup(x, y) ((((x) + ((y) - 1)) / (y)) * (y)) @@ -61,8 +62,10 @@ static inline void *zalloc(size_t size) static inline int xlockf(int fd, int cmd, off_t offset, off_t len) { - if (lseek(fd, offset, SEEK_SET) < 0) + if (lseek(fd, offset, SEEK_SET) < 0) { + eprintf("%m\n"); return -1; + } return lockf(fd, cmd, len); } diff --git a/sheep/farm/farm.c b/sheep/farm/farm.c index 7912cb7..1cd4efc 100644 --- a/sheep/farm/farm.c +++ b/sheep/farm/farm.c @@ -13,9 +13,12 @@ #include <dirent.h> #include <pthread.h> +#include <linux/limits.h> #include "farm.h" #include "sheep_priv.h" +#include "sheepdog_proto.h" +#include "sheep.h" char farm_obj_dir[PATH_MAX]; char farm_dir[PATH_MAX]; @@ -67,15 +70,75 @@ err: return ret; } -static int farm_write(uint64_t oid, struct siocb *iocb) +static int farm_exist(uint64_t oid) { - ssize_t size = xpwrite(iocb->fd, iocb->buf, iocb->length, iocb->offset); + char 
path[PATH_MAX]; + struct stat s; - if (size != iocb->length) - return SD_RES_EIO; + sprintf(path, "%s%016"PRIx64, obj_path, oid); + if (stat(path, &s) < 0) { + if (errno != ENOENT) + eprintf("%m\n"); + return 0; + } + + return 1; +} + +static int err_to_sderr(uint64_t oid, int err) +{ + int ret; + if (err == ENOENT) { + struct stat s; + + if (stat(obj_path, &s) < 0) { + eprintf("corrupted\n"); + ret = SD_RES_EIO; + } else { + dprintf("object %016" PRIx64 " not found locally\n", oid); + ret = SD_RES_NO_OBJ; + } + } else { + eprintf("%m\n"); + ret = SD_RES_UNKNOWN; + } + return ret; +} + +static int farm_write(uint64_t oid, struct siocb *iocb, int create) +{ + int flags = def_open_flags, fd, ret = SD_RES_SUCCESS; + char path[PATH_MAX]; + ssize_t size; + + if (is_vdi_obj(oid)) + flags &= ~O_DIRECT; + + if (create) + flags |= O_CREAT | O_TRUNC; + + sprintf(path, "%s%016"PRIx64, obj_path, oid); + fd = open(path, flags, def_fmode); + if (fd < 0) + return err_to_sderr(oid, errno); + + if (create && !(iocb->flags & SD_FLAG_CMD_COW)) { + ret = prealloc(fd, is_vdi_obj(oid) ? + SD_INODE_SIZE : SD_DATA_OBJ_SIZE); + if (ret != SD_RES_SUCCESS) + goto out; + } + size = xpwrite(fd, iocb->buf, iocb->length, iocb->offset); + if (size != iocb->length) { + eprintf("%m\n"); + ret = SD_RES_EIO; + goto out; + } trunk_update_entry(oid); - return SD_RES_SUCCESS; +out: + close(fd); + return ret; } static int write_last_sector(int fd, uint32_t length) @@ -102,26 +165,6 @@ static int write_last_sector(int fd, uint32_t length) return ret; } -static int err_to_sderr(uint64_t oid, int err) -{ - int ret; - if (err == ENOENT) { - struct stat s; - - if (stat(obj_path, &s) < 0) { - eprintf("corrupted\n"); - ret = SD_RES_EIO; - } else { - dprintf("object %016" PRIx64 " not found locally\n", oid); - ret = SD_RES_NO_OBJ; - } - } else { - eprintf("%m\n"); - ret = SD_RES_UNKNOWN; - } - return ret; -} - /* * Preallocate the whole object to get a better filesystem layout. 
*/ @@ -139,51 +182,6 @@ int prealloc(int fd, uint32_t size) return ret; } -static int farm_open(uint64_t oid, struct siocb *iocb, int create) -{ - struct strbuf buf = STRBUF_INIT; - int ret = SD_RES_SUCCESS, fd; - int flags = def_open_flags; - - if (iocb->epoch < sys->epoch) - goto out; - - if (is_vdi_obj(oid)) - flags &= ~O_DIRECT; - - if (create) - flags |= O_CREAT | O_TRUNC; - - strbuf_addstr(&buf, obj_path); - strbuf_addf(&buf, "%016" PRIx64, oid); - fd = open(buf.buf, flags, def_fmode); - if (fd < 0) { - ret = err_to_sderr(oid, errno); - goto out; - } - iocb->fd = fd; - ret = SD_RES_SUCCESS; - if (!(iocb->flags & SD_FLAG_CMD_COW) && create) { - ret = prealloc(fd, iocb->length); - if (ret != SD_RES_SUCCESS) - close(fd); - } -out: - strbuf_release(&buf); - return ret; -} - -static int farm_close(uint64_t oid, struct siocb *iocb) -{ - if (iocb->epoch < sys->epoch) - return SD_RES_SUCCESS; - - if (close(iocb->fd) < 0) - return SD_RES_EIO; - - return SD_RES_SUCCESS; -} - static int get_trunk_sha1(uint32_t epoch, unsigned char *outsha1, int user) { int i, nr_logs = -1, ret = -1; @@ -346,29 +344,31 @@ static int farm_get_objlist(struct siocb *iocb) } -static void *read_working_object(uint64_t oid, int length) +static void *read_working_object(uint64_t oid, uint64_t offset, + uint32_t length) { void *buf = NULL; char path[PATH_MAX]; - int fd, ret; + int fd; + size_t size; snprintf(path, sizeof(path), "%s%016" PRIx64, obj_path, oid); - fd = open(path, O_RDONLY, def_fmode); + fd = open(path, def_open_flags); if (fd < 0) { dprintf("object %"PRIx64" not found\n", oid); goto out; } - buf = malloc(length); + buf = valloc(length); if (!buf) { eprintf("no memory to allocate buffer.\n"); goto out; } - ret = xread(fd, buf, length); - if (length != ret) { - eprintf("object read error.\n"); + size = xpread(fd, buf, length, offset); + if (length != size) { + eprintf("object read error. 
%m\n"); free(buf); buf = NULL; goto out; @@ -413,11 +413,13 @@ out: static int farm_read(uint64_t oid, struct siocb *iocb) { - int i; + int flags = def_open_flags, fd, ret = SD_RES_SUCCESS; if (iocb->epoch < sys->epoch) { + int i; void *buffer; - buffer = read_working_object(oid, iocb->length); + + buffer = read_working_object(oid, iocb->offset, iocb->length); if (!buffer) { /* Here if read the object from the targeted epoch failed, * we need to read from the later epoch, because at some epoch @@ -437,13 +439,30 @@ static int farm_read(uint64_t oid, struct siocb *iocb) return SD_RES_NO_OBJ; memcpy(iocb->buf, buffer, iocb->length); free(buffer); + + return SD_RES_SUCCESS; } else { - ssize_t size = xpread(iocb->fd, iocb->buf, iocb->length, iocb->offset); + char path[PATH_MAX]; + ssize_t size; + + if (is_vdi_obj(oid)) + flags &= ~O_DIRECT; + + sprintf(path, "%s%016"PRIx64, obj_path, oid); + fd = open(path, flags); + + if (fd < 0) + return err_to_sderr(oid, errno); - if (size != iocb->length) - return SD_RES_EIO; + size = xpread(fd, iocb->buf, iocb->length, iocb->offset); + if (size != iocb->length) { + ret = SD_RES_EIO; + goto out; + } } - return SD_RES_SUCCESS; +out: + close(fd); + return ret; } static int farm_atomic_put(uint64_t oid, struct siocb *iocb) @@ -706,10 +725,9 @@ static int farm_purge_obj(void) struct store_driver farm = { .name = "farm", .init = farm_init, - .open = farm_open, + .exist = farm_exist, .write = farm_write, .read = farm_read, - .close = farm_close, .get_objlist = farm_get_objlist, .link = farm_link, .atomic_put = farm_atomic_put, diff --git a/sheep/object_cache.c b/sheep/object_cache.c index 799cb23..3ca31b5 100644 --- a/sheep/object_cache.c +++ b/sheep/object_cache.c @@ -436,15 +436,10 @@ int object_cache_pull(struct vnode_info *vnode_info, struct object_cache *oc, if (vnode_is_local(v)) { struct siocb iocb = { 0 }; iocb.epoch = sys->epoch; - ret = sd_store->open(oid, &iocb, 0); - if (ret != SD_RES_SUCCESS) - goto pull_remote; - iocb.buf = 
buf; iocb.length = data_length; iocb.offset = 0; ret = sd_store->read(oid, &iocb); - sd_store->close(oid, &iocb); if (ret != SD_RES_SUCCESS) goto pull_remote; /* read succeed */ diff --git a/sheep/ops.c b/sheep/ops.c index 729fb86..a0461c9 100644 --- a/sheep/ops.c +++ b/sheep/ops.c @@ -658,17 +658,12 @@ static int read_copy_from_replica(struct request *req, uint32_t epoch, if (vnode_is_local(v)) { memset(&iocb, 0, sizeof(iocb)); iocb.epoch = epoch; - ret = sd_store->open(oid, &iocb, 0); - if (ret != SD_RES_SUCCESS) - continue; - iocb.buf = buf; iocb.length = SD_DATA_OBJ_SIZE; iocb.offset = 0; ret = sd_store->read(oid, &iocb); if (ret != SD_RES_SUCCESS) continue; - sd_store->close(oid, &iocb); goto out; } @@ -747,10 +742,6 @@ static int store_read_obj(struct request *req) memset(&iocb, 0, sizeof(iocb)); iocb.epoch = epoch; iocb.flags = hdr->flags; - ret = sd_store->open(hdr->obj.oid, &iocb, 0); - if (ret != SD_RES_SUCCESS) - return ret; - iocb.buf = req->data; iocb.length = hdr->data_length; iocb.offset = hdr->obj.offset; @@ -761,12 +752,11 @@ static int store_read_obj(struct request *req) rsp->data_length = hdr->data_length; rsp->obj.copies = sys->nr_copies; out: - sd_store->close(hdr->obj.oid, &iocb); return ret; } static int do_write_obj(struct siocb *iocb, struct sd_req *hdr, uint32_t epoch, - void *data) + void *data, int create) { uint64_t oid = hdr->obj.oid; int ret = SD_RES_SUCCESS; @@ -786,11 +776,11 @@ static int do_write_obj(struct siocb *iocb, struct sd_req *hdr, uint32_t epoch, strbuf_release(&buf); return SD_RES_EIO; } - ret = sd_store->write(oid, iocb); + ret = sd_store->write(oid, iocb, create); jrnl_end(jd); strbuf_release(&buf); } else - ret = sd_store->write(oid, iocb); + ret = sd_store->write(oid, iocb, create); return ret; } @@ -798,21 +788,13 @@ static int do_write_obj(struct siocb *iocb, struct sd_req *hdr, uint32_t epoch, static int store_write_obj(struct request *req) { struct sd_req *hdr = &req->rq; - int ret; uint32_t epoch = hdr->epoch; 
struct siocb iocb; memset(&iocb, 0, sizeof(iocb)); iocb.epoch = epoch; iocb.flags = hdr->flags; - ret = sd_store->open(hdr->obj.oid, &iocb, 0); - if (ret != SD_RES_SUCCESS) - return ret; - - ret = do_write_obj(&iocb, hdr, epoch, req->data); - - sd_store->close(hdr->obj.oid, &iocb); - return ret; + return do_write_obj(&iocb, hdr, epoch, req->data, 0); } static int store_create_and_write_obj(struct request *req) @@ -821,10 +803,10 @@ static int store_create_and_write_obj(struct request *req) struct sd_req cow_hdr; uint32_t epoch = hdr->epoch; uint64_t oid = hdr->obj.oid; - int ret; char *buf = NULL; struct siocb iocb; unsigned data_length; + int ret = SD_RES_SUCCESS; if (is_vdi_obj(oid)) data_length = SD_INODE_SIZE; @@ -837,10 +819,6 @@ static int store_create_and_write_obj(struct request *req) iocb.epoch = epoch; iocb.flags = hdr->flags; iocb.length = data_length; - ret = sd_store->open(oid, &iocb, 1); - if (ret != SD_RES_SUCCESS) - return ret; - if (hdr->flags & SD_FLAG_CMD_COW) { dprintf("%" PRIx64 ", %" PRIx64 "\n", oid, hdr->obj.cow_oid); @@ -863,16 +841,15 @@ static int store_create_and_write_obj(struct request *req) cow_hdr.data_length = SD_DATA_OBJ_SIZE; cow_hdr.obj.offset = 0; - ret = do_write_obj(&iocb, &cow_hdr, epoch, buf); + ret = do_write_obj(&iocb, &cow_hdr, epoch, buf, 1); } else - ret = do_write_obj(&iocb, hdr, epoch, req->data); + ret = do_write_obj(&iocb, hdr, epoch, req->data, 1); if (SD_RES_SUCCESS == ret) check_and_insert_objlist_cache(oid); out: if (buf) free(buf); - sd_store->close(oid, &iocb); return ret; } diff --git a/sheep/recovery.c b/sheep/recovery.c index d73df48..e31f226 100644 --- a/sheep/recovery.c +++ b/sheep/recovery.c @@ -399,19 +399,14 @@ static void recover_object(struct work *work) { struct recovery_work *rw = container_of(work, struct recovery_work, work); uint64_t oid = rw->oids[rw->done]; - uint32_t epoch = rw->epoch; int i, copy_idx, copy_nr, ret; - struct siocb iocb = { 0 }; if (!sys->nr_copies) return; 
eprintf("done:%"PRIu32" count:%"PRIu32", oid:%"PRIx64"\n", rw->done, rw->count, oid); - iocb.epoch = epoch; - ret = sd_store->open(oid, &iocb, 0); - if (ret == SD_RES_SUCCESS) { - sd_store->close(oid, &iocb); + if (sd_store->exist(oid)) { dprintf("the object is already recovered\n"); return; } @@ -479,8 +474,7 @@ int is_recoverying_oid(uint64_t oid) uint64_t hval = fnv_64a_buf(&oid, sizeof(uint64_t), FNV1A_64_INIT); uint64_t min_hval; struct recovery_work *rw = recovering_work; - int ret, i; - struct siocb iocb; + int i; if (oid == 0) return 0; @@ -496,12 +490,8 @@ int is_recoverying_oid(uint64_t oid) if (rw->state == RW_INIT) return 1; - memset(&iocb, 0, sizeof(iocb)); - iocb.epoch = sys->epoch; - ret = sd_store->open(oid, &iocb, 0); - if (ret == SD_RES_SUCCESS) { + if (sd_store->exist(oid)) { dprintf("the object %" PRIx64 " is already recoverd\n", oid); - sd_store->close(oid, &iocb); return 0; } diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h index 6f6a320..c09918a 100644 --- a/sheep/sheep_priv.h +++ b/sheep/sheep_priv.h @@ -155,7 +155,6 @@ struct cluster_info { }; struct siocb { - int fd; uint16_t flags; uint32_t epoch; void *buf; @@ -167,10 +166,9 @@ struct store_driver { struct list_head list; const char *name; int (*init)(char *path); - int (*open)(uint64_t oid, struct siocb *, int create); - int (*write)(uint64_t oid, struct siocb *); + int (*exist)(uint64_t oid); + int (*write)(uint64_t oid, struct siocb *, int create); int (*read)(uint64_t oid, struct siocb *); - int (*close)(uint64_t oid, struct siocb *); int (*format)(struct siocb *); /* Operations in recovery */ int (*get_objlist)(struct siocb *); -- 1.7.10.2 |