[Sheepdog] [PATCH v3 2/2] sheep: abstract out store IO interface

Liu Yuan namei.unix at gmail.com
Thu Nov 17 09:54:27 CET 2011


From: Liu Yuan <tailai.ly at taobao.com>

We need to abstract out the store IO interface so that it can be adapted to
other IO stores, such as the upcoming 'Farm' store.

The open/read/write/close interface is cumbersome for a common kv-store to
work with, but it requires the smallest changes to the current sheep store
code. It sucks but works as a kludge.

We can refine it later into a more common get/put/delete interface.

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 sheep/Makefile.am    |    2 +-
 sheep/sheep_priv.h   |   20 +++++
 sheep/simple_store.c |  176 +++++++++++++++++++++++++++++++++++++++++++
 sheep/store.c        |  204 ++++++++++++++++++++-----------------------------
 4 files changed, 280 insertions(+), 122 deletions(-)
 create mode 100644 sheep/simple_store.c

diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index b7cc459..4220e47 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -24,7 +24,7 @@ INCLUDES		= -I$(top_builddir)/include -I$(top_srcdir)/include $(libcpg_CFLAGS) $
 sbin_PROGRAMS		= sheep
 
 sheep_SOURCES		= sheep.c group.c sdnet.c store.c vdi.c work.c journal.c ops.c \
-			  cluster/local.c util.c strbuf.c
+			  cluster/local.c util.c strbuf.c simple_store.c
 if BUILD_COROSYNC
 sheep_SOURCES		+= cluster/corosync.c
 endif
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index cbacab3..ab1a164 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -156,6 +156,26 @@ struct cluster_info {
 	struct work_queue *recovery_wqueue;
 };
 
+struct store_driver {			/* object store I/O hooks */
+	const char *driver_name;	/* logged by register_store_driver() */
+	int (*init)(char *path);	/* init_store() calls it with obj_path */
+	int (*open)(uint64_t oid, void *priv, int create); /* SD_RES_* result */
+	ssize_t (*write)(uint64_t oid, void *priv);	/* priv: struct siocb * */
+	ssize_t (*read)(uint64_t oid, void *priv);	/* priv: struct siocb * */
+	int (*close)(uint64_t oid, void *priv);	/* ends an open() */
+};
+
+struct siocb {		/* per-request state handed to store_driver hooks as priv */
+	int fd;			/* set by the default driver's open() */
+	uint16_t flags;		/* callers copy hdr->flags (SD_FLAG_CMD_*) here */
+	uint32_t epoch;		/* selects the epoch directory in open() */
+	void *buf;		/* data buffer for read()/write() */
+	uint32_t length;	/* byte count for read()/write() */
+	uint64_t offset;	/* file offset for read()/write() */
+};
+
+extern void register_store_driver(struct store_driver *);
+
 extern struct cluster_info *sys;
 
 int create_listen_port(int port, void *data);
diff --git a/sheep/simple_store.c b/sheep/simple_store.c
new file mode 100644
index 0000000..7945068
--- /dev/null
+++ b/sheep/simple_store.c
@@ -0,0 +1,246 @@
+/*
+ * Copyright (C) 2011 Taobao Inc.
+ *
+ * Liu Yuan <namei.unix at gmail.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <errno.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/statvfs.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#include "sheep_priv.h"
+#include "strbuf.h"
+#include "util.h"
+
+
+extern char *obj_path;
+
+extern mode_t def_fmode;
+
+static int def_store_flags = O_DSYNC | O_RDWR;
+
+/* Active driver; tentative here so helpers below can call store.write(). */
+struct store_driver store;
+
+/*
+ * init hook of the default driver.  Nothing needs preparing, so it only
+ * logs that the default driver is in use; @path is not used.
+ *
+ * Returns 0.
+ */
+static int default_store_init(char *path)
+{
+	eprintf("Use default store driver\n");
+	return 0;
+}
+
+/*
+ * Write one zero-filled sector at offset SD_DATA_OBJ_SIZE - SECTOR_SIZE
+ * through store.write(), so that the object file ends at SD_DATA_OBJ_SIZE.
+ *
+ * @opaque must be a struct siocb whose fd is already open.  Its buf,
+ * length and offset members are overwritten, so hand in a scratch siocb.
+ *
+ * Returns SD_RES_SUCCESS, SD_RES_NO_MEM, SD_RES_NO_SPACE or SD_RES_EIO.
+ */
+static int store_write_last_sector(uint64_t oid, void *opaque)
+{
+	const int size = SECTOR_SIZE;
+	struct siocb *iocb = (struct siocb *)opaque;
+	char *buf;
+	int ret, err;
+
+	buf = valloc(size);
+	if (!buf) {
+		eprintf("failed to allocate memory\n");
+		return SD_RES_NO_MEM;
+	}
+
+	memset(buf, 0, size);
+	iocb->buf = buf;
+	iocb->length = size;
+	iocb->offset = SD_DATA_OBJ_SIZE - size;
+	ret = store.write(oid, iocb);
+	err = errno;	/* keep the write error; free() may change errno */
+	free(buf);
+	iocb->buf = NULL;	/* do not leave a dangling pointer behind */
+
+	if (ret != size) {
+		if (err == ENOSPC)
+			return SD_RES_NO_SPACE;
+		errno = err;
+		eprintf("%m\n");
+		return SD_RES_EIO;
+	}
+
+	return SD_RES_SUCCESS;
+}
+
+/*
+ * Open the object file "<obj_path><epoch>/<oid>" for @oid, creating and
+ * truncating it first when @create is non-zero.
+ *
+ * @priv is a struct siocb; the caller fills in epoch and flags.  On
+ * success the descriptor is stored in iocb->fd and SD_RES_SUCCESS is
+ * returned.  On failure no descriptor is left open and the result is
+ * SD_RES_NO_OBJ (no such object), SD_RES_EIO, SD_RES_UNKNOWN, or an error
+ * from store_write_last_sector().
+ *
+ * SD_FLAG_CMD_TRUNCATE is deliberately not handled here: the callers in
+ * store.c set offset and length only after open() has returned, so a
+ * truncate at this point would cut the file to zero bytes.  It is done in
+ * default_store_write() instead.
+ */
+static int default_store_open(uint64_t oid, void *priv, int create)
+{
+	struct strbuf path = STRBUF_INIT;
+	struct siocb *iocb = (struct siocb *)priv;
+	int flags = def_store_flags;
+	int fd, ret;
+
+	if (sys->use_directio && is_data_obj(oid))
+		flags |= O_DIRECT;
+
+	if (create)
+		flags |= O_CREAT | O_TRUNC;
+
+	strbuf_addf(&path, "%s%08u/%016" PRIx64, obj_path, iocb->epoch, oid);
+
+	fd = open(path.buf, flags, def_fmode);
+	if (fd < 0) {
+		int err = errno;	/* eprintf() may change errno */
+
+		eprintf("failed to open %s: %m\n", path.buf);
+		if (err == ENOENT) {
+			struct stat s;
+
+			ret = SD_RES_NO_OBJ;
+			if (stat(obj_path, &s) < 0) {
+				/* store directory is corrupted */
+				eprintf("corrupted\n");
+				ret = SD_RES_EIO;
+			}
+		} else
+			ret = SD_RES_UNKNOWN;
+		goto out;
+	}
+
+	if (create && !(iocb->flags & SD_FLAG_CMD_COW)) {
+		/*
+		 * Preallocate the whole object to get a better filesystem layout.
+		 */
+		if (fallocate(fd, 0, 0, SD_DATA_OBJ_SIZE) < 0) {
+			struct siocb scratch;
+
+			if (errno != ENOSYS && errno != EOPNOTSUPP) {
+				ret = SD_RES_EIO;
+				goto out;
+			}
+
+			/*
+			 * fallocate() is unsupported: extend the file by writing
+			 * its last sector.  The helper takes a struct siocb and
+			 * overwrites buf/length/offset, so give it a private one
+			 * instead of the caller's (or a bare int).
+			 */
+			memset(&scratch, 0, sizeof(scratch));
+			scratch.fd = fd;
+			ret = store_write_last_sector(oid, &scratch);
+			if (ret != SD_RES_SUCCESS)
+				goto out;
+		}
+	}
+
+	iocb->fd = fd;
+	ret = SD_RES_SUCCESS;
+out:
+	/* on any failure after a successful open(), do not leak the fd */
+	if (ret != SD_RES_SUCCESS && fd >= 0)
+		close(fd);
+	strbuf_release(&path);
+	return ret;
+}
+
+/*
+ * Hand iocb->buf, iocb->length and iocb->offset to xpwrite() on iocb->fd.
+ *
+ * If iocb->flags has SD_FLAG_CMD_TRUNCATE set, the file is first resized
+ * to offset + length, which are only known once the caller has filled
+ * them in for the write.
+ *
+ * Returns the result of xpwrite(), or -1 with errno set when the resize
+ * fails.
+ */
+static ssize_t default_store_write(uint64_t oid, void *priv)
+{
+	struct siocb *iocb = (struct siocb *)priv;
+
+	if ((iocb->flags & SD_FLAG_CMD_TRUNCATE) &&
+	    ftruncate(iocb->fd, iocb->offset + iocb->length) < 0) {
+		int err = errno;
+
+		eprintf("%m\n");
+		errno = err;	/* callers inspect errno after a short write */
+		return -1;
+	}
+
+	return xpwrite(iocb->fd, iocb->buf, iocb->length, iocb->offset);
+}
+
+/*
+ * Hand iocb->buf, iocb->length and iocb->offset to xpread() on iocb->fd.
+ *
+ * Returns the result of xpread().
+ */
+static ssize_t default_store_read(uint64_t oid, void *priv)
+{
+	struct siocb *iocb = (struct siocb *)priv;
+
+	return xpread(iocb->fd, iocb->buf, iocb->length, iocb->offset);
+}
+
+/*
+ * Close the descriptor stored in iocb->fd by default_store_open().
+ *
+ * Returns SD_RES_SUCCESS, or SD_RES_EIO if close() fails.
+ */
+static int default_store_close(uint64_t oid, void *priv)
+{
+	struct siocb *iocb = (struct siocb *)priv;
+
+	if (close(iocb->fd) < 0)
+		return SD_RES_EIO;
+
+	return SD_RES_SUCCESS;
+}
+
+/* The driver in use; starts out as the default driver defined above. */
+struct store_driver store = {
+	.driver_name = "default",
+	.init = default_store_init,
+	.open = default_store_open,
+	.write = default_store_write,
+	.read = default_store_read,
+	.close = default_store_close
+};
+
+/*
+ * Replace the active driver with a copy of @driver and log its name.
+ */
+void register_store_driver(struct store_driver *driver)
+{
+	store = *driver;
+	eprintf("Register %s store driver\n", store.driver_name);
+}
diff --git a/sheep/store.c b/sheep/store.c
index 5844be9..14a6076 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -24,6 +24,8 @@
 #include <time.h>
 
 #include "sheep_priv.h"
+#include "strbuf.h"
+#include "util.h"
 
 struct sheepdog_config {
 	uint64_t ctime;
@@ -39,7 +41,9 @@ static char *jrnl_path;
 static char *config_path;
 
 static mode_t def_dmode = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IWGRP | S_IXGRP;
-static mode_t def_fmode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
+mode_t def_fmode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
+
+extern struct store_driver store;
 
 static int obj_cmp(const void *oid1, const void *oid2)
 {
@@ -158,17 +162,17 @@ out:
 	return res;
 }
 
-static int ob_open(uint32_t epoch, uint64_t oid, int flags, int *ret);
-
 static int read_from_one(struct request *req, uint32_t epoch, uint64_t oid,
 			 unsigned *ori_rlen, void *buf, uint64_t offset)
 {
-	int i, n, nr, fd, ret;
+	int i, n, nr, ret;
 	unsigned wlen, rlen;
 	char name[128];
 	struct sheepdog_vnode_list_entry *e;
 	struct sd_obj_req hdr;
 	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
+	struct siocb opaque;
+	int fd;
 
 	e = req->entry;
 	nr = req->nr_vnodes;
@@ -179,15 +183,21 @@ static int read_from_one(struct request *req, uint32_t epoch, uint64_t oid,
 		addr_to_str(name, sizeof(name), e[n].addr, 0);
 
 		if (is_myself(e[n].addr, e[n].port)) {
-			fd = ob_open(epoch, oid, 0, &ret);
-			if (fd < 0 || ret != 0)
+			memset(&opaque, 0, sizeof(opaque));
+			opaque.epoch = epoch;
+			ret = store.open(oid, &opaque, 0);
+			if (ret != SD_RES_SUCCESS)
 				continue;
 
-			ret = pread64(fd, buf, *ori_rlen, offset);
+			opaque.buf = buf;
+			opaque.length = *ori_rlen;
+			opaque.offset = offset;
+			ret = store.read(oid, &opaque);
 			if (ret < 0)
 				continue;
 			*ori_rlen = ret;
 			ret = 0;
+			store.close(oid, &opaque);
 			goto out;
 		}
 
@@ -430,37 +440,6 @@ out:
 	return ret;
 }
 
-static int ob_open(uint32_t epoch, uint64_t oid, int flags, int *ret)
-{
-	char path[1024];
-	int fd;
-
-	flags |= O_DSYNC | O_RDWR;
-	if (sys->use_directio && is_data_obj(oid))
-		flags |= O_DIRECT;
-
-	snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, obj_path, epoch, oid);
-
-	fd = open(path, flags, def_fmode);
-	if (fd < 0) {
-		eprintf("failed to open %s: %m\n", path);
-		if (errno == ENOENT) {
-			struct stat s;
-
-			*ret = SD_RES_NO_OBJ;
-			if (stat(obj_path, &s) < 0) {
-				/* store directory is corrupted */
-				eprintf("corrupted\n");
-				*ret = SD_RES_EIO;
-			}
-		} else
-			*ret = SD_RES_UNKNOWN;
-	} else
-		*ret = 0;
-
-	return fd;
-}
-
 int update_epoch_store(uint32_t epoch)
 {
 	char new[1024];
@@ -597,13 +576,19 @@ static int store_read_obj(struct request *req, uint32_t epoch)
 	struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
 	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&req->rp;
 	int ret = SD_RES_SUCCESS;
-	int fd = -1;
+	struct siocb opaque;
 
-	fd = ob_open(epoch, hdr->oid, 0, &ret);
-	if (fd < 0)
+	memset(&opaque, 0, sizeof(opaque));
+	opaque.epoch = epoch;
+	opaque.flags = hdr->flags;
+	ret = store.open(hdr->oid, &opaque, 0);
+	if (ret != SD_RES_SUCCESS)
 		return ret;
 
-	ret = pread64(fd, req->data, hdr->data_length, hdr->offset);
+	opaque.buf = req->data;
+	opaque.length = hdr->data_length;
+	opaque.offset = hdr->offset;
+	ret = store.read(hdr->oid, &opaque);
 	if (ret < 0) {
 		eprintf("%m\n");
 		ret = SD_RES_EIO;
@@ -615,25 +600,21 @@ static int store_read_obj(struct request *req, uint32_t epoch)
 
 	ret = SD_RES_SUCCESS;
 out:
-	close(fd);
+	store.close(hdr->oid, &opaque);
 	return ret;
 }
 
-static int store_write_obj_fd(int fd, struct request *req, uint32_t epoch)
+static int do_write_obj(void *opaque, struct request *req, uint32_t epoch)
 {
 	struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
 	uint64_t oid = hdr->oid;
 	int ret = SD_RES_SUCCESS;
 	void *jd = NULL;
 
-	if (hdr->flags & SD_FLAG_CMD_TRUNCATE) {
-		ret = ftruncate(fd, hdr->offset + hdr->data_length);
-		if (ret) {
-			eprintf("%m\n");
-			return SD_RES_EIO;
-		}
-	}
-
+	struct siocb *iocb = (struct siocb *)opaque;
+	iocb->buf = req->data;
+	iocb->length = hdr->data_length;
+	iocb->offset = hdr->offset;
 	if (is_vdi_obj(oid)) {
 		char path[1024];
 
@@ -643,7 +624,7 @@ static int store_write_obj_fd(int fd, struct request *req, uint32_t epoch)
 				   hdr->offset, path, jrnl_path);
 		if (!jd)
 			return SD_RES_EIO;
-		ret = pwrite64(fd, req->data, hdr->data_length, hdr->offset);
+		ret = store.write(oid, iocb);
 		if (ret != hdr->data_length) {
 			if (errno == ENOSPC)
 				return SD_RES_NO_SPACE;
@@ -653,7 +634,7 @@ static int store_write_obj_fd(int fd, struct request *req, uint32_t epoch)
 		}
 		jrnl_end(jd);
 	} else {
-		ret = pwrite64(fd, req->data, hdr->data_length, hdr->offset);
+		ret = store.write(oid, iocb);
 		if (ret != hdr->data_length) {
 			if (errno == ENOSPC)
 				return SD_RES_NO_SPACE;
@@ -667,33 +648,39 @@ static int store_write_obj_fd(int fd, struct request *req, uint32_t epoch)
 static int store_write_obj(struct request *req, uint32_t epoch)
 {
 	struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
-	int ret = SD_RES_SUCCESS;
-	int fd = -1;
+	int ret;
+	struct siocb opaque;
 
-	fd = ob_open(epoch, hdr->oid, 0, &ret);
-	if (fd < 0)
+	memset(&opaque, 0, sizeof(opaque));
+	opaque.epoch = epoch;
+	opaque.flags = hdr->flags;
+	ret = store.open(hdr->oid, &opaque, 0);
+	if (ret != SD_RES_SUCCESS)
 		return ret;
 
-	ret = store_write_obj_fd(fd, req, epoch);
+	ret = do_write_obj(&opaque, req, epoch);
 
-	close(fd);
+	store.close(hdr->oid, &opaque);
 	return ret;
 }
 
 static int store_create_and_write_obj_cow(struct request *req, uint32_t epoch)
 {
 	struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
-	int ret = SD_RES_SUCCESS;
-	int fd = -1;
+	int ret;
 	char *buf = NULL;
+	struct siocb opaque;
 
 	if (!hdr->copies) {
 		eprintf("the number of copies cannot be zero\n");
 		return SD_RES_INVALID_PARMS;
 	}
 
-	fd = ob_open(epoch, hdr->oid, O_CREAT|O_TRUNC, &ret);
-	if (fd < 0)
+	memset(&opaque, 0, sizeof(opaque));
+	opaque.epoch = epoch;
+	opaque.flags = hdr->flags;
+	ret = store.open(hdr->oid, &opaque, 1);
+	if (ret != SD_RES_SUCCESS)
 		return ret;
 
 	dprintf("%" PRIu64 ", %" PRIx64 "\n", hdr->oid, hdr->cow_oid);
@@ -711,7 +698,10 @@ static int store_create_and_write_obj_cow(struct request *req, uint32_t epoch)
 		ret = SD_RES_EIO;
 		goto out;
 	}
-	ret = pwrite64(fd, buf, SD_DATA_OBJ_SIZE, 0);
+	opaque.buf = buf;
+	opaque.length = SD_DATA_OBJ_SIZE;
+	opaque.offset = 0;
+	ret = store.write(hdr->oid, &opaque);
 	if (ret != SD_DATA_OBJ_SIZE) {
 		if (errno == ENOSPC)
 			ret = SD_RES_NO_SPACE;
@@ -724,71 +714,34 @@ static int store_create_and_write_obj_cow(struct request *req, uint32_t epoch)
 	free(buf);
 	buf = NULL;
 
-	ret = store_write_obj_fd(fd, req, epoch);
+	ret = do_write_obj(&opaque, req, epoch);
 out:
 	if (buf)
 		free(buf);
-	close(fd);
+	store.close(hdr->oid, &opaque);
 	return ret;
 }
 
-static int store_write_last_sector(int fd)
-{
-	const int size = SECTOR_SIZE;
-	char *buf = NULL;
-	int ret;
-
-	buf = valloc(size);
-	if (!buf) {
-		eprintf("failed to allocate memory\n");
-		return SD_RES_NO_MEM;
-	}
-
-	memset(buf, 0, size);
-	ret = pwrite64(fd, buf, size, SD_DATA_OBJ_SIZE - size);
-	free(buf);
-
-	if (ret != size) {
-		if (errno == ENOSPC)
-			return SD_RES_NO_SPACE;
-		eprintf("%m\n");
-		return SD_RES_EIO;
-	}
-
-	return SD_RES_SUCCESS;
-}
-
 static int store_create_and_write_obj(struct request *req, uint32_t epoch)
 {
 	struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
-	int ret = SD_RES_SUCCESS;
-	int fd = -1;
+	int ret;
+	struct siocb opaque;
 
 	if (!hdr->copies) {
 		eprintf("the number of copies cannot be zero\n");
 		return SD_RES_INVALID_PARMS;
 	}
 
-	fd = ob_open(epoch, hdr->oid, O_CREAT|O_TRUNC, &ret);
-	if (fd < 0)
+	memset(&opaque, 0, sizeof(opaque));
+	opaque.epoch = epoch;
+	opaque.flags = hdr->flags;
+	ret = store.open(hdr->oid, &opaque, 1);
+	if (ret != SD_RES_SUCCESS)
 		return ret;
 
-	/*
-	 * Preallocate the whole object to get a better filesystem layout.
-	 */
-	ret = fallocate(fd, 0, 0, SD_DATA_OBJ_SIZE);
-	if (ret < 0) {
-		if (errno != ENOSYS && errno != EOPNOTSUPP) {
-			ret = SD_RES_EIO;
-			goto out;
-		}
-
-		ret = store_write_last_sector(fd);
-	}
-
-	ret = store_write_obj_fd(fd, req, epoch);
-out:
-	close(fd);
+	ret = do_write_obj(&opaque, req, epoch);
+	store.close(hdr->oid, &opaque);
 	return ret;
 }
 
@@ -1502,14 +1455,16 @@ static void recover_one(struct work *work, int idx)
 	int old_copies, cur_copies;
 	uint32_t epoch = rw->epoch;
 	int i, copy_idx = 0, cur_idx = -1;
-	int fd;
+	struct siocb opaque;
 
 	eprintf("%"PRIu32" %"PRIu32", %16"PRIx64"\n", rw->done, rw->count, oid);
 
-	fd = ob_open(epoch, oid, 0, &ret);
-	if (fd != -1) {
+	memset(&opaque, 0, sizeof(opaque));
+	opaque.epoch = epoch;
+	ret = store.open(oid, &opaque, 0);
+	if (ret == SD_RES_SUCCESS) {
 		/* the object is already recovered */
-		close(fd);
+		store.close(oid, &opaque);
 		goto out;
 	}
 
@@ -1608,7 +1563,8 @@ int is_recoverying_oid(uint64_t oid)
 	uint64_t hval = fnv_64a_buf(&oid, sizeof(uint64_t), FNV1A_64_INIT);
 	uint64_t min_hval;
 	struct recovery_work *rw = recovering_work;
-	int ret, fd, i;
+	int ret, i;
+	struct siocb opaque;
 
 	if (oid == 0)
 		return 0;
@@ -1624,10 +1580,12 @@ int is_recoverying_oid(uint64_t oid)
 	if (rw->state == RW_INIT)
 		return 1;
 
-	fd = ob_open(sys->epoch, oid, 0, &ret);
-	if (fd != -1) {
+	memset(&opaque, 0, sizeof(opaque));
+	opaque.epoch = sys->epoch;
+	ret = store.open(oid, &opaque, 0);
+	if (ret == SD_RES_SUCCESS) {
 		dprintf("the object %" PRIx64 " is already recoverd\n", oid);
-		close(fd);
+		store.close(oid, &opaque);
 		return 0;
 	}
 
@@ -2118,6 +2076,10 @@ int init_store(const char *d)
 	if (ret)
 		return ret;
 
+	ret = store.init(obj_path);
+	if (ret)
+		return ret;
+
 	return ret;
 }
 
-- 
1.7.6.1




More information about the sheepdog mailing list