[Sheepdog] [PATCH v5 2/2] sheep: abstract out store IO interface

Liu Yuan namei.unix at gmail.com
Fri Nov 18 06:53:28 CET 2011


From: Liu Yuan <tailai.ly at taobao.com>

We need to abstract out the store IO interface so it can be adapted to other IO
stores, such as the coming 'Farm' store.

The open/read/write/close interface is cumbersome for a common kv-store to work with,
but it requires the smallest changes to the current sheep store code. It
sucks but works as a kludge.

Don't get me wrong: I am not claiming to write a universal interface that will work well
with different kinds of data stores, say, SQL stores, non-SQL stores, unstructured stores,
stores without local backing storage, etc.

I simply am *not*, and I am always at a loss to foresee the future.

This interface is stupid, but simple enough that it costs me the smallest changes to existing code
to let Sheepdog work with the current store implementation and the coming 'Farm' store.

I think those kind people who try to squeeze other useful stores into Sheepdog are in a better
position to cook up a more generic interface in the future.

- Why include length and offset, which many kv stores don't need at all?

Okay, we're trying to support huge data sizes, so we need these to do partial object reads/writes.

- Why 'int fd' instead of a void *opaque for store object handle?

I suppose everything is a file in the UNIX philosophy, so an fd can name everything. I hate type
conversions and frown when I can't cscope what something means within one second.

And last, I am happy to see anybody prove me wrong and replace it with a more capable interface.

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 sheep/Makefile.am    |    2 +-
 sheep/sheep_priv.h   |   21 ++++
 sheep/simple_store.c |  161 +++++++++++++++++++++++++++++++
 sheep/store.c        |  255 +++++++++++++++++++-------------------------------
 4 files changed, 281 insertions(+), 158 deletions(-)
 create mode 100644 sheep/simple_store.c

diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index aa34712..146b421 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -25,7 +25,7 @@ INCLUDES		= -I$(top_builddir)/include -I$(top_srcdir)/include \
 sbin_PROGRAMS		= sheep
 
 sheep_SOURCES		= sheep.c group.c sdnet.c store.c vdi.c work.c journal.c ops.c \
-			  cluster/local.c strbuf.c
+			  cluster/local.c strbuf.c simple_store.c
 if BUILD_COROSYNC
 sheep_SOURCES		+= cluster/corosync.c
 endif
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index cbacab3..99924dd 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -156,6 +156,27 @@ struct cluster_info {
 	struct work_queue *recovery_wqueue;
 };
 
+struct siocb {
+	int fd;
+	uint16_t flags;
+	uint32_t epoch;
+	void *buf;
+	uint32_t length;
+	uint64_t offset;
+	size_t rw_size;
+};
+
+struct store_driver {
+	const char *driver_name;
+	int (*init)(char *path);
+	int (*open)(uint64_t oid, struct siocb *, int create);
+	int (*write)(uint64_t oid, struct siocb *);
+	int (*read)(uint64_t oid, struct siocb *);
+	int (*close)(uint64_t oid, struct siocb *);
+};
+
+extern void register_store_driver(struct store_driver *);
+
 extern struct cluster_info *sys;
 
 int create_listen_port(int port, void *data);
diff --git a/sheep/simple_store.c b/sheep/simple_store.c
new file mode 100644
index 0000000..1e3a8fc
--- /dev/null
+++ b/sheep/simple_store.c
@@ -0,0 +1,161 @@
+/*
+ * Copyright (C) 2011 Taobao Inc.
+ *
+ * Liu Yuan <namei.unix at gmail.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <errno.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/statvfs.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include "sheep_priv.h"
+#include "strbuf.h"
+#include "util.h"
+
+
+extern char *obj_path;
+
+extern mode_t def_fmode;
+
+static int def_store_flags = O_DSYNC | O_RDWR;
+
+struct store_driver store;
+
+static int simple_store_init(char *path)
+{
+	eprintf("Use simple store driver\n");
+	return 0;
+}
+
+static int store_write_last_sector(uint64_t oid, struct siocb *iocb)
+{
+	const int size = SECTOR_SIZE;
+	char *buf = NULL;
+	int ret;
+
+	buf = xzalloc(size);
+	iocb->buf = buf;
+	iocb->length = size;
+	iocb->offset = SD_DATA_OBJ_SIZE - size;
+	ret = store.write(oid, iocb);
+	free(buf);
+
+	return ret;
+}
+
+static int simple_store_open(uint64_t oid, struct siocb *iocb, int create)
+{
+	struct strbuf path = STRBUF_INIT;
+	int ret;
+	int flags = def_store_flags;
+
+	if (sys->use_directio && is_data_obj(oid))
+		flags |= O_DIRECT;
+
+	if (create)
+		flags |= O_CREAT | O_TRUNC;
+
+	strbuf_addf(&path, "%s%08u/%016" PRIx64, obj_path, iocb->epoch, oid);
+
+	ret = open(path.buf, flags, def_fmode);
+	if (ret < 0) {
+		eprintf("failed to open %s: %m\n", path.buf);
+		if (errno == ENOENT) {
+			struct stat s;
+
+			ret = SD_RES_NO_OBJ;
+			if (stat(obj_path, &s) < 0) {
+				/* store directory is corrupted */
+				eprintf("corrupted\n");
+				ret = SD_RES_EIO;
+			}
+		} else
+			ret = SD_RES_UNKNOWN;
+		goto out;
+	}
+
+	iocb->fd = ret;
+	if (!(iocb->flags & SD_FLAG_CMD_COW) && create) {
+		/*
+		 * Preallocate the whole object to get a better filesystem layout.
+		 */
+		ret = fallocate(iocb->fd, 0, 0, SD_DATA_OBJ_SIZE);
+		if (ret < 0) {
+			if (errno != ENOSYS && errno != EOPNOTSUPP) {
+				ret = SD_RES_EIO;
+				close(iocb->fd);
+				goto out;
+			}
+
+			ret = store_write_last_sector(oid, iocb);
+			if (ret) {
+				close(iocb->fd);
+				goto out;
+			}
+		}
+	}
+	if (iocb->flags & SD_FLAG_CMD_TRUNCATE) {
+		ret = ftruncate(iocb->fd, iocb->offset + iocb->length);
+		if (ret) {
+			eprintf("%m\n");
+			ret = SD_RES_EIO;
+			close(iocb->fd);
+			goto out;
+		}
+	}
+
+	ret = SD_RES_SUCCESS;
+out:
+	strbuf_release(&path);
+	return ret;
+}
+
+static int simple_store_write(uint64_t oid, struct siocb *iocb)
+{
+	iocb->rw_size = xpwrite(iocb->fd, iocb->buf, iocb->length, iocb->offset);
+	if (iocb->rw_size < 0)
+		return SD_RES_EIO;
+	return SD_RES_SUCCESS;
+}
+
+static int simple_store_read(uint64_t oid, struct siocb *iocb)
+{
+	iocb->rw_size = xpread(iocb->fd, iocb->buf, iocb->length, iocb->offset);
+	if (iocb->rw_size < 0)
+		return SD_RES_EIO;
+	return SD_RES_SUCCESS;
+}
+
+static int simple_store_close(uint64_t oid, struct siocb *iocb)
+{
+	if (close(iocb->fd) < 0)
+		return SD_RES_EIO;
+	return SD_RES_SUCCESS;
+}
+
+struct store_driver store = {
+	.driver_name = "simple",
+	.init = simple_store_init,
+	.open = simple_store_open,
+	.write = simple_store_write,
+	.read = simple_store_read,
+	.close = simple_store_close
+};
+
+void register_store_driver(struct store_driver *driver)
+{
+	store = *driver;
+	eprintf("Register %s store driver\n", store.driver_name);
+}
diff --git a/sheep/store.c b/sheep/store.c
index c875238..f794785 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -24,6 +24,8 @@
 #include <time.h>
 
 #include "sheep_priv.h"
+#include "strbuf.h"
+#include "util.h"
 
 struct sheepdog_config {
 	uint64_t ctime;
@@ -32,14 +34,16 @@ struct sheepdog_config {
 	uint8_t pad[3];
 };
 
-static char *obj_path;
+char *obj_path;
 static char *epoch_path;
 static char *mnt_path;
 static char *jrnl_path;
 static char *config_path;
 
 static mode_t def_dmode = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IWGRP | S_IXGRP;
-static mode_t def_fmode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
+mode_t def_fmode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
+
+extern struct store_driver store;
 
 static int obj_cmp(const void *oid1, const void *oid2)
 {
@@ -158,17 +162,17 @@ out:
 	return res;
 }
 
-static int ob_open(uint32_t epoch, uint64_t oid, int flags, int *ret);
-
 static int read_from_one(struct request *req, uint32_t epoch, uint64_t oid,
 			 unsigned *ori_rlen, void *buf, uint64_t offset)
 {
-	int i, n, nr, fd, ret;
+	int i, n, nr, ret;
 	unsigned wlen, rlen;
 	char name[128];
 	struct sheepdog_vnode_list_entry *e;
 	struct sd_obj_req hdr;
 	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
+	struct siocb iocb;
+	int fd;
 
 	e = req->entry;
 	nr = req->nr_vnodes;
@@ -179,15 +183,21 @@ static int read_from_one(struct request *req, uint32_t epoch, uint64_t oid,
 		addr_to_str(name, sizeof(name), e[n].addr, 0);
 
 		if (is_myself(e[n].addr, e[n].port)) {
-			fd = ob_open(epoch, oid, 0, &ret);
-			if (fd < 0 || ret != 0)
+			memset(&iocb, 0, sizeof(iocb));
+			iocb.epoch = epoch;
+			ret = store.open(oid, &iocb, 0);
+			if (ret != SD_RES_SUCCESS)
 				continue;
 
-			ret = pread64(fd, buf, *ori_rlen, offset);
-			if (ret < 0)
+			iocb.buf = buf;
+			iocb.length = *ori_rlen;
+			iocb.offset = offset;
+			ret = store.read(oid, &iocb);
+			if (ret != SD_RES_SUCCESS)
 				continue;
-			*ori_rlen = ret;
+			*ori_rlen = iocb.rw_size;
 			ret = 0;
+			store.close(oid, &iocb);
 			goto out;
 		}
 
@@ -429,37 +439,6 @@ out:
 	return ret;
 }
 
-static int ob_open(uint32_t epoch, uint64_t oid, int flags, int *ret)
-{
-	char path[1024];
-	int fd;
-
-	flags |= O_DSYNC | O_RDWR;
-	if (sys->use_directio && is_data_obj(oid))
-		flags |= O_DIRECT;
-
-	snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, obj_path, epoch, oid);
-
-	fd = open(path, flags, def_fmode);
-	if (fd < 0) {
-		eprintf("failed to open %s: %m\n", path);
-		if (errno == ENOENT) {
-			struct stat s;
-
-			*ret = SD_RES_NO_OBJ;
-			if (stat(obj_path, &s) < 0) {
-				/* store directory is corrupted */
-				eprintf("corrupted\n");
-				*ret = SD_RES_EIO;
-			}
-		} else
-			*ret = SD_RES_UNKNOWN;
-	} else
-		*ret = 0;
-
-	return fd;
-}
-
 int update_epoch_store(uint32_t epoch)
 {
 	char new[1024];
@@ -595,44 +574,47 @@ static int store_read_obj(struct request *req, uint32_t epoch)
 {
 	struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
 	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&req->rp;
-	int ret = SD_RES_SUCCESS;
-	int fd = -1;
+	int ret;
+	struct siocb iocb;
 
-	fd = ob_open(epoch, hdr->oid, 0, &ret);
-	if (fd < 0)
+	memset(&iocb, 0, sizeof(iocb));
+	iocb.epoch = epoch;
+	iocb.flags = hdr->flags;
+	ret = store.open(hdr->oid, &iocb, 0);
+	if (ret != SD_RES_SUCCESS)
 		return ret;
 
-	ret = pread64(fd, req->data, hdr->data_length, hdr->offset);
-	if (ret < 0) {
-		eprintf("%m\n");
-		ret = SD_RES_EIO;
+	iocb.buf = req->data;
+	iocb.length = hdr->data_length;
+	iocb.offset = hdr->offset;
+	ret = store.read(hdr->oid, &iocb);
+	if (ret != SD_RES_SUCCESS)
 		goto out;
-	}
 
-	rsp->data_length = ret;
+	rsp->data_length = iocb.rw_size;
 	rsp->copies = sys->nr_sobjs;
-
-	ret = SD_RES_SUCCESS;
 out:
-	close(fd);
+	store.close(hdr->oid, &iocb);
 	return ret;
 }
 
-static int store_write_obj_fd(int fd, struct request *req, uint32_t epoch)
+static inline int write_object_in_full(uint64_t oid, struct siocb *iocb)
+{
+	if (!store.write(oid, iocb) && iocb->rw_size != iocb->length)
+		return SD_RES_EIO;
+	return SD_RES_SUCCESS;
+}
+
+static int do_write_obj(struct siocb *iocb, struct request *req, uint32_t epoch)
 {
 	struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
 	uint64_t oid = hdr->oid;
 	int ret = SD_RES_SUCCESS;
 	void *jd = NULL;
 
-	if (hdr->flags & SD_FLAG_CMD_TRUNCATE) {
-		ret = ftruncate(fd, hdr->offset + hdr->data_length);
-		if (ret) {
-			eprintf("%m\n");
-			return SD_RES_EIO;
-		}
-	}
-
+	iocb->buf = req->data;
+	iocb->length = hdr->data_length;
+	iocb->offset = hdr->offset;
 	if (is_vdi_obj(oid)) {
 		char path[1024];
 
@@ -642,57 +624,50 @@ static int store_write_obj_fd(int fd, struct request *req, uint32_t epoch)
 				   hdr->offset, path, jrnl_path);
 		if (!jd)
 			return SD_RES_EIO;
-		ret = pwrite64(fd, req->data, hdr->data_length, hdr->offset);
-		if (ret != hdr->data_length) {
-			if (errno == ENOSPC)
-				return SD_RES_NO_SPACE;
-			eprintf("%m\n");
-			jrnl_end(jd);
-			return SD_RES_EIO;
-		}
+		ret = write_object_in_full(oid, iocb);
 		jrnl_end(jd);
 	} else {
-		ret = pwrite64(fd, req->data, hdr->data_length, hdr->offset);
-		if (ret != hdr->data_length) {
-			if (errno == ENOSPC)
-				return SD_RES_NO_SPACE;
-			eprintf("%m\n");
-			return SD_RES_EIO;
-		}
+		ret = write_object_in_full(oid, iocb);
 	}
-	return SD_RES_SUCCESS;
+	return ret;
 }
 
 static int store_write_obj(struct request *req, uint32_t epoch)
 {
 	struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
-	int ret = SD_RES_SUCCESS;
-	int fd = -1;
+	int ret;
+	struct siocb iocb;
 
-	fd = ob_open(epoch, hdr->oid, 0, &ret);
-	if (fd < 0)
+	memset(&iocb, 0, sizeof(iocb));
+	iocb.epoch = epoch;
+	iocb.flags = hdr->flags;
+	ret = store.open(hdr->oid, &iocb, 0);
+	if (ret != SD_RES_SUCCESS)
 		return ret;
 
-	ret = store_write_obj_fd(fd, req, epoch);
+	ret = do_write_obj(&iocb, req, epoch);
 
-	close(fd);
+	store.close(hdr->oid, &iocb);
 	return ret;
 }
 
 static int store_create_and_write_obj_cow(struct request *req, uint32_t epoch)
 {
 	struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
-	int ret = SD_RES_SUCCESS;
-	int fd = -1;
+	int ret;
 	char *buf = NULL;
+	struct siocb iocb;
 
 	if (!hdr->copies) {
 		eprintf("the number of copies cannot be zero\n");
 		return SD_RES_INVALID_PARMS;
 	}
 
-	fd = ob_open(epoch, hdr->oid, O_CREAT|O_TRUNC, &ret);
-	if (fd < 0)
+	memset(&iocb, 0, sizeof(iocb));
+	iocb.epoch = epoch;
+	iocb.flags = hdr->flags;
+	ret = store.open(hdr->oid, &iocb, 1);
+	if (ret != SD_RES_SUCCESS)
 		return ret;
 
 	dprintf("%" PRIu64 ", %" PRIx64 "\n", hdr->oid, hdr->cow_oid);
@@ -710,84 +685,41 @@ static int store_create_and_write_obj_cow(struct request *req, uint32_t epoch)
 		ret = SD_RES_EIO;
 		goto out;
 	}
-	ret = pwrite64(fd, buf, SD_DATA_OBJ_SIZE, 0);
-	if (ret != SD_DATA_OBJ_SIZE) {
-		if (errno == ENOSPC)
-			ret = SD_RES_NO_SPACE;
-		else {
-			eprintf("%m\n");
-			ret = SD_RES_EIO;
-		}
+	iocb.buf = buf;
+	iocb.length = SD_DATA_OBJ_SIZE;
+	iocb.offset = 0;
+	ret = store.write(hdr->oid, &iocb);
+	if (ret != SD_RES_SUCCESS) {
 		goto out;
 	}
-	free(buf);
-	buf = NULL;
 
-	ret = store_write_obj_fd(fd, req, epoch);
+	ret = do_write_obj(&iocb, req, epoch);
 out:
-	if (buf)
-		free(buf);
-	close(fd);
-	return ret;
-}
-
-static int store_write_last_sector(int fd)
-{
-	const int size = SECTOR_SIZE;
-	char *buf = NULL;
-	int ret;
-
-	buf = valloc(size);
-	if (!buf) {
-		eprintf("failed to allocate memory\n");
-		return SD_RES_NO_MEM;
-	}
-
-	memset(buf, 0, size);
-	ret = pwrite64(fd, buf, size, SD_DATA_OBJ_SIZE - size);
 	free(buf);
-
-	if (ret != size) {
-		if (errno == ENOSPC)
-			return SD_RES_NO_SPACE;
-		eprintf("%m\n");
-		return SD_RES_EIO;
-	}
-
-	return SD_RES_SUCCESS;
+	store.close(hdr->oid, &iocb);
+	return ret;
 }
 
 static int store_create_and_write_obj(struct request *req, uint32_t epoch)
 {
 	struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
-	int ret = SD_RES_SUCCESS;
-	int fd = -1;
+	int ret;
+	struct siocb iocb;
 
 	if (!hdr->copies) {
 		eprintf("the number of copies cannot be zero\n");
 		return SD_RES_INVALID_PARMS;
 	}
 
-	fd = ob_open(epoch, hdr->oid, O_CREAT|O_TRUNC, &ret);
-	if (fd < 0)
+	memset(&iocb, 0, sizeof(iocb));
+	iocb.epoch = epoch;
+	iocb.flags = hdr->flags;
+	ret = store.open(hdr->oid, &iocb, 1);
+	if (ret != SD_RES_SUCCESS)
 		return ret;
 
-	/*
-	 * Preallocate the whole object to get a better filesystem layout.
-	 */
-	ret = fallocate(fd, 0, 0, SD_DATA_OBJ_SIZE);
-	if (ret < 0) {
-		if (errno != ENOSYS && errno != EOPNOTSUPP) {
-			ret = SD_RES_EIO;
-			goto out;
-		}
-
-		ret = store_write_last_sector(fd);
-	}
-
-	ret = store_write_obj_fd(fd, req, epoch);
-out:
-	close(fd);
+	ret = do_write_obj(&iocb, req, epoch);
+	store.close(hdr->oid, &iocb);
 	return ret;
 }
 
@@ -1500,14 +1432,16 @@ static void recover_one(struct work *work, int idx)
 	int old_copies, cur_copies;
 	uint32_t epoch = rw->epoch;
 	int i, copy_idx = 0, cur_idx = -1;
-	int fd;
+	struct siocb iocb;
 
 	eprintf("%"PRIu32" %"PRIu32", %16"PRIx64"\n", rw->done, rw->count, oid);
 
-	fd = ob_open(epoch, oid, 0, &ret);
-	if (fd != -1) {
+	memset(&iocb, 0, sizeof(iocb));
+	iocb.epoch = epoch;
+	ret = store.open(oid, &iocb, 0);
+	if (ret == SD_RES_SUCCESS) {
 		/* the object is already recovered */
-		close(fd);
+		store.close(oid, &iocb);
 		goto out;
 	}
 
@@ -1606,7 +1540,8 @@ int is_recoverying_oid(uint64_t oid)
 	uint64_t hval = fnv_64a_buf(&oid, sizeof(uint64_t), FNV1A_64_INIT);
 	uint64_t min_hval;
 	struct recovery_work *rw = recovering_work;
-	int ret, fd, i;
+	int ret, i;
+	struct siocb iocb;
 
 	if (oid == 0)
 		return 0;
@@ -1622,10 +1557,12 @@ int is_recoverying_oid(uint64_t oid)
 	if (rw->state == RW_INIT)
 		return 1;
 
-	fd = ob_open(sys->epoch, oid, 0, &ret);
-	if (fd != -1) {
+	memset(&iocb, 0, sizeof(iocb));
+	iocb.epoch = sys->epoch;
+	ret = store.open(oid, &iocb, 0);
+	if (ret == SD_RES_SUCCESS) {
 		dprintf("the object %" PRIx64 " is already recoverd\n", oid);
-		close(fd);
+		store.close(oid, &iocb);
 		return 0;
 	}
 
@@ -2116,6 +2053,10 @@ int init_store(const char *d)
 	if (ret)
 		return ret;
 
+	ret = store.init(obj_path);
+	if (ret)
+		return ret;
+
 	return ret;
 }
 
-- 
1.7.6.1




More information about the sheepdog mailing list