[Sheepdog] [PATCH v2 2/2] sheep: abstract out store IO interface
Liu Yuan
namei.unix at gmail.com
Wed Nov 16 13:59:20 CET 2011
From: Liu Yuan <tailai.ly at taobao.com>
We need to abstract out the store IO interface so that it can be adapted to
other IO stores, such as the upcoming 'Farm' store.
The open/read/write/close interface is cumbersome for a common kv-store to
work with, but it requires the smallest changes to the current sheep store
code. It is not elegant, but it works as a kludge.
We can refine it later into a more common get/put/delete interface.
Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
sheep/sheep_priv.h | 11 +++
sheep/store.c | 216 +++++++++++++++++++++++++++++++++------------------
2 files changed, 151 insertions(+), 76 deletions(-)
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index ee4deb4..56b9a99 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -156,6 +156,17 @@ struct cluster_info {
struct work_queue *recovery_wqueue;
};
+struct store_driver { /* pluggable object IO backend, installed with register_store_driver() */
+ const char *driver_name; /* printed when the driver is registered */
+ int (*init)(char *path); /* called once from init_store() with obj_path; non-zero aborts init */
+ int (*open)(uint64_t oid, void *priv, int create); /* returns SD_RES_*; *priv: epoch in, driver handle out; create != 0 asks for a new object */
+ ssize_t (*write)(uint64_t oid, void *priv, void *buf, int len, off_t offset); /* bytes written; callers treat != len as failure */
+ ssize_t (*read)(uint64_t oid, void *priv, void *buf, int len, off_t offset); /* bytes read; callers treat < 0 as failure */
+ int (*close)(uint64_t oid, void *priv); /* returns SD_RES_*; releases the handle set up by open() */
+};
+
+extern void register_store_driver(struct store_driver *); /* replaces the active driver; register before init_store() */
+
extern struct cluster_info *sys;
int create_listen_port(int port, void *data);
diff --git a/sheep/store.c b/sheep/store.c
index 3a04c34..a6d865f 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -24,6 +24,8 @@
#include <time.h>
#include "sheep_priv.h"
+#include "strbuf.h"
+#include "util.h"
struct sheepdog_config {
uint64_t ctime;
@@ -41,6 +43,92 @@ static char *config_path;
static mode_t def_dmode = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IWGRP | S_IXGRP;
static mode_t def_fmode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
+/* open(2) flags used for every object file handled by the default driver */
+static int def_store_flags = O_DSYNC | O_RDWR;
+
+/*
+ * The default driver keeps one plain file per object under obj_path, so
+ * there is nothing to prepare here.
+ */
+static int default_store_init(char *path)
+{
+ eprintf("Use default store driver\n");
+ return 0;
+}
+
+/*
+ * Open the file backing @oid.
+ *
+ * @priv points to an int holding the epoch whose directory contains the
+ * object. On success it is overwritten with the file descriptor, which the
+ * read/write/close callbacks below expect; on failure it is left untouched.
+ * @create: non-zero to create the file (truncating any existing one).
+ *
+ * Returns SD_RES_SUCCESS, SD_RES_NO_OBJ if the object file does not exist,
+ * SD_RES_EIO if obj_path itself cannot be stat()ed, or SD_RES_UNKNOWN for
+ * any other open(2) failure.
+ */
+static int default_store_open(uint64_t oid, void *priv, int create)
+{
+ struct strbuf path = STRBUF_INIT;
+ int ret, err;
+ int flags = def_store_flags;
+ int epoch = *(int *)priv;
+
+ if (sys->use_directio && is_data_obj(oid))
+ flags |= O_DIRECT;
+
+ if (create)
+ flags |= O_CREAT | O_TRUNC;
+
+ /* the epoch travels as an int; %08u wants an unsigned int */
+ strbuf_addf(&path, "%s%08u/%016" PRIx64, obj_path, (unsigned)epoch, oid);
+
+ ret = open(path.buf, flags, def_fmode);
+ if (ret < 0) {
+ /* save errno before logging, which may overwrite it */
+ err = errno;
+ eprintf("failed to open %s: %m\n", path.buf);
+ if (err == ENOENT) {
+ struct stat s;
+
+ ret = SD_RES_NO_OBJ;
+ if (stat(obj_path, &s) < 0) {
+ /* store directory is corrupted */
+ eprintf("corrupted\n");
+ ret = SD_RES_EIO;
+ }
+ } else
+ ret = SD_RES_UNKNOWN;
+ goto out;
+ }
+
+ *(int *)priv = ret;
+ ret = SD_RES_SUCCESS;
+out:
+ strbuf_release(&path);
+ return ret;
+}
+
+/* Write @len bytes at @offset through the fd that open() stored in *priv. */
+static ssize_t default_store_write(uint64_t oid, void *priv, void *buf, int len, off_t offset)
+{
+ int fd = *(int *)priv;
+
+ return xpwrite(fd, buf, len, offset);
+}
+
+/* Read @len bytes at @offset through the fd that open() stored in *priv. */
+static ssize_t default_store_read(uint64_t oid, void *priv, void *buf, int len, off_t offset)
+{
+ int fd = *(int *)priv;
+
+ return xpread(fd, buf, len, offset);
+}
+
+/* Close the fd that open() stored in *priv; SD_RES_EIO if close(2) fails. */
+static int default_store_close(uint64_t oid, void *priv)
+{
+ int fd = *(int *)priv;
+
+ if (close(fd) < 0)
+ return SD_RES_EIO;
+
+ return SD_RES_SUCCESS;
+}
+
+/*
+ * The active store driver. It starts out as the built-in file-per-object
+ * driver and may be replaced by register_store_driver().
+ */
+static struct store_driver store = {
+ .driver_name = "default",
+ .init = default_store_init,
+ .open = default_store_open,
+ .write = default_store_write,
+ .read = default_store_read,
+ .close = default_store_close
+};
+
+/*
+ * Replace the active store driver with a copy of @driver.
+ *
+ * The store code calls every callback unconditionally, so a driver that is
+ * missing any of them is rejected and the current driver is kept.
+ * Register before init_store() so that init_store() runs the new driver's
+ * init().
+ */
+void register_store_driver(struct store_driver *driver)
+{
+ if (!driver || !driver->driver_name || !driver->init || !driver->open ||
+ !driver->write || !driver->read || !driver->close) {
+ eprintf("incomplete store driver, keep using %s\n", store.driver_name);
+ return;
+ }
+
+ store = *driver;
+ eprintf("Register %s store driver\n", store.driver_name);
+}
+
static int obj_cmp(const void *oid1, const void *oid2)
{
const uint64_t hval1 = fnv_64a_buf((void *)oid1, sizeof(uint64_t), FNV1A_64_INIT);
@@ -158,17 +246,17 @@ out:
return res;
}
-static int ob_open(uint32_t epoch, uint64_t oid, int flags, int *ret);
-
static int read_from_one(struct request *req, uint32_t epoch, uint64_t oid,
unsigned *ori_rlen, void *buf, uint64_t offset)
{
- int i, n, nr, fd, ret;
+ int i, n, nr, ret;
unsigned wlen, rlen;
char name[128];
struct sheepdog_vnode_list_entry *e;
struct sd_obj_req hdr;
struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
+ int opaque = epoch;
+ int fd;
e = req->entry;
nr = req->nr_vnodes;
@@ -179,15 +267,18 @@ static int read_from_one(struct request *req, uint32_t epoch, uint64_t oid,
addr_to_str(name, sizeof(name), e[n].addr, 0);
if (is_myself(e[n].addr, e[n].port)) {
- fd = ob_open(epoch, oid, 0, &ret);
- if (fd < 0 || ret != 0)
+ /* open() replaces opaque with a driver handle, so reset it on every try */
+ opaque = epoch;
+ ret = store.open(oid, &opaque, 0);
+ if (ret != SD_RES_SUCCESS)
continue;
- ret = pread64(fd, buf, *ori_rlen, offset);
+ ret = store.read(oid, &opaque, buf, *ori_rlen, offset);
+ store.close(oid, &opaque);
if (ret < 0)
continue;
*ori_rlen = ret;
ret = 0;
goto out;
}
@@ -430,37 +519,6 @@ out:
return ret;
}
-static int ob_open(uint32_t epoch, uint64_t oid, int flags, int *ret)
-{
- char path[1024];
- int fd;
-
- flags |= O_DSYNC | O_RDWR;
- if (sys->use_directio && is_data_obj(oid))
- flags |= O_DIRECT;
-
- snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, obj_path, epoch, oid);
-
- fd = open(path, flags, def_fmode);
- if (fd < 0) {
- eprintf("failed to open %s: %m\n", path);
- if (errno == ENOENT) {
- struct stat s;
-
- *ret = SD_RES_NO_OBJ;
- if (stat(obj_path, &s) < 0) {
- /* store directory is corrupted */
- eprintf("corrupted\n");
- *ret = SD_RES_EIO;
- }
- } else
- *ret = SD_RES_UNKNOWN;
- } else
- *ret = 0;
-
- return fd;
-}
-
int update_epoch_store(uint32_t epoch)
{
char new[1024];
@@ -597,13 +655,13 @@ static int store_read_obj(struct request *req, uint32_t epoch)
struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&req->rp;
int ret = SD_RES_SUCCESS;
- int fd = -1;
+ int opaque = epoch; /* epoch for store.open(); holds the driver handle afterwards */
- fd = ob_open(epoch, hdr->oid, 0, &ret);
- if (fd < 0)
+ ret = store.open(hdr->oid, &opaque, 0);
+ if (ret != SD_RES_SUCCESS)
return ret;
- ret = pread64(fd, req->data, hdr->data_length, hdr->offset);
+ ret = store.read(hdr->oid, &opaque, req->data, hdr->data_length, hdr->offset); /* NOTE(review): the %m below assumes the driver sets errno on failure */
if (ret < 0) {
eprintf("%m\n");
ret = SD_RES_EIO;
@@ -615,15 +673,16 @@ static int store_read_obj(struct request *req, uint32_t epoch)
ret = SD_RES_SUCCESS;
out:
- close(fd);
+ store.close(hdr->oid, &opaque); /* close status is ignored; the read status is returned */
return ret;
}
-static int store_write_obj_fd(int fd, struct request *req, uint32_t epoch)
+static int store_write_obj_fd(void *opaque, struct request *req, uint32_t epoch) /* opaque: handle filled in by store.open() */
{
struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
uint64_t oid = hdr->oid;
int ret = SD_RES_SUCCESS;
+ int fd = *(int *)opaque; /* NOTE(review): assumes the handle is an fd (true only for the default driver); ftruncate below bypasses the driver */
if (hdr->flags & SD_FLAG_CMD_TRUNCATE) {
ret = ftruncate(fd, hdr->offset + hdr->data_length);
@@ -643,7 +702,7 @@ static int store_write_obj_fd(int fd, struct request *req, uint32_t epoch)
if (ret)
return ret;
}
- ret = pwrite64(fd, req->data, hdr->data_length, hdr->offset);
+ ret = store.write(oid, opaque, req->data, hdr->data_length, hdr->offset); /* NOTE(review): the ENOSPC check below relies on the driver setting errno */
if (ret != hdr->data_length) {
if (errno == ENOSPC)
return SD_RES_NO_SPACE;
@@ -657,24 +716,24 @@ static int store_write_obj_fd(int fd, struct request *req, uint32_t epoch)
static int store_write_obj(struct request *req, uint32_t epoch)
{
struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
- int ret = SD_RES_SUCCESS;
- int fd = -1;
+ int ret;
+ int opaque = epoch; /* epoch in; driver handle after store.open() */
- fd = ob_open(epoch, hdr->oid, 0, &ret);
- if (fd < 0)
+ ret = store.open(hdr->oid, &opaque, 0); /* create = 0: the object must already exist */
+ if (ret != SD_RES_SUCCESS)
return ret;
- ret = store_write_obj_fd(fd, req, epoch);
+ ret = store_write_obj_fd(&opaque, req, epoch);
- close(fd);
+ store.close(hdr->oid, &opaque); /* NOTE(review): close status is not checked */
return ret;
}
static int store_create_and_write_obj_cow(struct request *req, uint32_t epoch)
{
struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
- int ret = SD_RES_SUCCESS;
- int fd = -1;
+ int ret;
+ int opaque = epoch; /* epoch in; driver handle after store.open() */
char *buf = NULL;
if (!hdr->copies) {
@@ -682,8 +741,8 @@ static int store_create_and_write_obj_cow(struct request *req, uint32_t epoch)
return SD_RES_INVALID_PARMS;
}
- fd = ob_open(epoch, hdr->oid, O_CREAT|O_TRUNC, &ret);
- if (fd < 0)
+ ret = store.open(hdr->oid, &opaque, 1); /* create = 1 replaces the old O_CREAT|O_TRUNC flags */
+ if (ret != SD_RES_SUCCESS)
return ret;
dprintf("%" PRIu64 ", %" PRIx64 "\n", hdr->oid, hdr->cow_oid);
@@ -701,7 +760,7 @@ static int store_create_and_write_obj_cow(struct request *req, uint32_t epoch)
ret = SD_RES_EIO;
goto out;
}
- ret = pwrite64(fd, buf, SD_DATA_OBJ_SIZE, 0);
+ ret = store.write(hdr->oid, &opaque, buf, SD_DATA_OBJ_SIZE, 0); /* write a full SD_DATA_OBJ_SIZE image starting at offset 0 */
if (ret != SD_DATA_OBJ_SIZE) {
if (errno == ENOSPC)
ret = SD_RES_NO_SPACE;
@@ -714,15 +773,15 @@ static int store_create_and_write_obj_cow(struct request *req, uint32_t epoch)
free(buf);
buf = NULL;
- ret = store_write_obj_fd(fd, req, epoch);
+ ret = store_write_obj_fd(&opaque, req, epoch);
out:
if (buf)
free(buf);
- close(fd);
+ store.close(hdr->oid, &opaque); /* NOTE(review): close status is not checked */
return ret;
}
-static int store_write_last_sector(int fd)
+static int store_write_last_sector(uint64_t oid, void *opaque) /* fallback used when fallocate() is unsupported */
{
const int size = SECTOR_SIZE;
char *buf = NULL;
@@ -735,7 +794,7 @@ static int store_write_last_sector(int fd)
}
memset(buf, 0, size);
- ret = pwrite64(fd, buf, size, SD_DATA_OBJ_SIZE - size);
+ ret = store.write(oid, opaque, buf, size, SD_DATA_OBJ_SIZE - size); /* zero the final sector so the object spans SD_DATA_OBJ_SIZE bytes */
free(buf);
if (ret != size) {
@@ -751,34 +810,36 @@ static int store_write_last_sector(int fd)
static int store_create_and_write_obj(struct request *req, uint32_t epoch)
{
struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
- int ret = SD_RES_SUCCESS;
- int fd = -1;
+ int ret;
+ int opaque = epoch; /* epoch in; driver handle after store.open() */
if (!hdr->copies) {
eprintf("the number of copies cannot be zero\n");
return SD_RES_INVALID_PARMS;
}
- fd = ob_open(epoch, hdr->oid, O_CREAT|O_TRUNC, &ret);
- if (fd < 0)
+ ret = store.open(hdr->oid, &opaque, 1);
+ if (ret != SD_RES_SUCCESS)
return ret;
/*
* Preallocate the whole object to get a better filesystem layout.
*/
- ret = fallocate(fd, 0, 0, SD_DATA_OBJ_SIZE);
+ ret = fallocate(opaque, 0, 0, SD_DATA_OBJ_SIZE); /* NOTE(review): treats the handle as an fd; only valid for the default driver */
if (ret < 0) {
if (errno != ENOSYS && errno != EOPNOTSUPP) {
ret = SD_RES_EIO;
goto out;
}
- ret = store_write_last_sector(fd);
+ ret = store_write_last_sector(hdr->oid, &opaque);
+ if (ret != SD_RES_SUCCESS) /* otherwise the write below would mask this failure */
+ goto out;
}
- ret = store_write_obj_fd(fd, req, epoch);
+ ret = store_write_obj_fd(&opaque, req, epoch);
out:
- close(fd);
+ store.close(hdr->oid, &opaque);
return ret;
}
@@ -1489,14 +1548,14 @@ static void recover_one(struct work *work, int idx)
int old_copies, cur_copies;
uint32_t epoch = rw->epoch;
int i, copy_idx = 0, cur_idx = -1;
- int fd;
+ int opaque = epoch; /* epoch in; driver handle after store.open() */
eprintf("%"PRIu32" %"PRIu32", %16"PRIx64"\n", rw->done, rw->count, oid);
- fd = ob_open(epoch, oid, 0, &ret);
- if (fd != -1) {
+ ret = store.open(oid, &opaque, 0); /* create = 0: succeeds only if the object is already stored for this epoch */
+ if (ret == SD_RES_SUCCESS) {
/* the object is already recovered */
- close(fd);
+ store.close(oid, &opaque);
goto out;
}
@@ -1595,7 +1654,8 @@ int is_recoverying_oid(uint64_t oid)
uint64_t hval = fnv_64a_buf(&oid, sizeof(uint64_t), FNV1A_64_INIT);
uint64_t min_hval;
struct recovery_work *rw = recovering_work;
- int ret, fd, i;
+ int ret, i;
+ int opaque = sys->epoch; /* current epoch in; driver handle after store.open() */
if (oid == 0)
return 0;
@@ -1611,10 +1671,10 @@ int is_recoverying_oid(uint64_t oid)
if (rw->state == RW_INIT)
return 1;
- fd = ob_open(sys->epoch, oid, 0, &ret);
- if (fd != -1) {
+ ret = store.open(oid, &opaque, 0); /* create = 0: succeeds only if the object is already stored */
+ if (ret == SD_RES_SUCCESS) {
dprintf("the object %" PRIx64 " is already recoverd\n", oid);
- close(fd);
+ store.close(oid, &opaque);
return 0;
}
@@ -2105,6 +2165,9 @@ int init_store(const char *d)
if (ret)
return ret;
+ /* let the active store driver set itself up under obj_path */
+ ret = store.init(obj_path);
+
return ret;
}
--
1.7.6.1
More information about the sheepdog
mailing list