[Sheepdog] [PATCH 2/2] sheep: abstract out store IO interface

Liu Yuan namei.unix at gmail.com
Wed Nov 16 09:11:36 CET 2011


From: Liu Yuan <tailai.ly at taobao.com>

We need to abstract out the store IO interface so that it can be adapted to
other IO stores, such as the upcoming 'Farm' store.

The open/read/write/close interface is cumbersome for a generic kv-store to
work with, but it requires the smallest changes to the current sheep store
code. It is not pretty, but it works as a kludge.

We can refine it later into a more common get/put/delete interface.

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 sheep/sheep_priv.h |   11 ++++
 sheep/store.c      |  132 +++++++++++++++++++++++++++++++++------------------
 2 files changed, 96 insertions(+), 47 deletions(-)

diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index ee4deb4..85b670d 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -156,6 +156,32 @@ struct cluster_info {
 	struct work_queue *recovery_wqueue;
 };
 
+/*
+ * Pluggable backend for object IO in sheep/store.c.  The active driver is
+ * chosen with register_store_driver(); store.c routes object open, read and
+ * write through these callbacks.
+ */
+struct store_driver {
+	const char *driver_name;
+	/* called by init_store() with the object directory; nonzero fails it */
+	int (*init)(char *path);
+	/*
+	 * Open object @oid of @epoch; @flags are extra open(2) flags such as
+	 * O_CREAT | O_TRUNC.  Returns a descriptor (negative on failure) and
+	 * stores 0 or an SD_RES_* error code in *ret.
+	 */
+	int (*open)(uint32_t epoch, uint64_t oid, int flags, int *ret);
+	/* pwrite-like: returns bytes written, or negative with errno set */
+	ssize_t (*write)(uint64_t oid, int fd, void *buf, int len, off_t offset);
+	/* pread-like: returns bytes read, or negative with errno set */
+	ssize_t (*read)(uint64_t oid, int fd, void *buf, int len, off_t offset);
+	/* release a descriptor returned by ->open() */
+	int (*close)(int fd);
+};
+
+/* Make a copy of *@driver the active store driver. */
+extern void register_store_driver(struct store_driver *driver);
+
 extern struct cluster_info *sys;
 
 int create_listen_port(int port, void *data);
diff --git a/sheep/store.c b/sheep/store.c
index 3a04c34..28719da 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -41,6 +41,111 @@ static char *config_path;
 static mode_t def_dmode = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IWGRP | S_IXGRP;
 static mode_t def_fmode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
 
+/*
+ * Default store driver: each object is a regular file whose path is built
+ * from obj_path, the epoch and the oid (see default_store_open()), accessed
+ * with plain POSIX file IO.
+ */
+
+static int default_store_init(char *path)
+{
+	eprintf("Use default store driver\n");
+	return 0;
+}
+
+/*
+ * Open object @oid of @epoch.  O_DSYNC | O_RDWR are always added to @flags,
+ * and O_DIRECT too for data objects when sys->use_directio is set.
+ *
+ * Returns the fd, or a negative value on failure.  *ret is set to 0 on
+ * success, SD_RES_NO_OBJ if the object file does not exist, SD_RES_EIO if
+ * obj_path itself cannot be stat()ed, and SD_RES_UNKNOWN otherwise.
+ */
+static int default_store_open(uint32_t epoch, uint64_t oid, int flags, int *ret)
+{
+	char path[1024];
+	int fd, err;
+
+	flags |= O_DSYNC | O_RDWR;
+	if (sys->use_directio && is_data_obj(oid))
+		flags |= O_DIRECT;
+
+	snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, obj_path, epoch, oid);
+
+	fd = open(path, flags, def_fmode);
+	if (fd >= 0) {
+		*ret = 0;
+		return fd;
+	}
+
+	/* save errno first: eprintf() may overwrite it before we classify it */
+	err = errno;
+	eprintf("failed to open %s: %m\n", path);
+
+	if (err == ENOENT) {
+		struct stat s;
+
+		*ret = SD_RES_NO_OBJ;
+		if (stat(obj_path, &s) < 0) {
+			/* store directory is corrupted */
+			eprintf("corrupted\n");
+			*ret = SD_RES_EIO;
+		}
+	} else
+		*ret = SD_RES_UNKNOWN;
+
+	return fd;
+}
+
+/* Write @len bytes at @offset with pwrite64(); @oid is unused here. */
+static ssize_t default_store_write(uint64_t oid, int fd, void *buf, int len, off_t offset)
+{
+	return pwrite64(fd, buf, len, offset);
+}
+
+/* Read up to @len bytes at @offset with pread64(); @oid is unused here. */
+static ssize_t default_store_read(uint64_t oid, int fd, void *buf, int len, off_t offset)
+{
+	return pread64(fd, buf, len, offset);
+}
+
+static int default_store_close(int fd)
+{
+	return close(fd);
+}
+
+/* Active store driver: the default until register_store_driver() is called. */
+static struct store_driver store = {
+	.driver_name = "default",
+	.init = default_store_init,
+	.open = default_store_open,
+	.write = default_store_write,
+	.read = default_store_read,
+	.close = default_store_close
+};
+
+/*
+ * Install a copy of *@driver as the active store driver.
+ *
+ * The store code calls the callbacks without NULL checks, so a driver
+ * lacking a name or any callback is refused and the current one is kept.
+ */
+void register_store_driver(struct store_driver *driver)
+{
+	if (!driver || !driver->driver_name || !driver->init ||
+	    !driver->open || !driver->write || !driver->read ||
+	    !driver->close) {
+		eprintf("invalid store driver, keep %s store driver\n",
+			store.driver_name);
+		return;
+	}
+
+	store = *driver;
+	eprintf("Register %s store driver\n", store.driver_name);
+}
+
 static int obj_cmp(const void *oid1, const void *oid2)
 {
 	const uint64_t hval1 = fnv_64a_buf((void *)oid1, sizeof(uint64_t), FNV1A_64_INIT);
@@ -158,8 +225,6 @@ out:
 	return res;
 }
 
-static int ob_open(uint32_t epoch, uint64_t oid, int flags, int *ret);
-
 static int read_from_one(struct request *req, uint32_t epoch, uint64_t oid,
 			 unsigned *ori_rlen, void *buf, uint64_t offset)
 {
@@ -179,11 +244,11 @@ static int read_from_one(struct request *req, uint32_t epoch, uint64_t oid,
 		addr_to_str(name, sizeof(name), e[n].addr, 0);
 
 		if (is_myself(e[n].addr, e[n].port)) {
-			fd = ob_open(epoch, oid, 0, &ret);
+			fd = store.open(epoch, oid, 0, &ret);
 			if (fd < 0 || ret != 0)
 				continue;
 
-			ret = pread64(fd, buf, *ori_rlen, offset);
+			ret = store.read(oid, fd, buf, *ori_rlen, offset);
 			if (ret < 0)
 				continue;
 			*ori_rlen = ret;
@@ -430,37 +495,6 @@ out:
 	return ret;
 }
 
-static int ob_open(uint32_t epoch, uint64_t oid, int flags, int *ret)
-{
-	char path[1024];
-	int fd;
-
-	flags |= O_DSYNC | O_RDWR;
-	if (sys->use_directio && is_data_obj(oid))
-		flags |= O_DIRECT;
-
-	snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, obj_path, epoch, oid);
-
-	fd = open(path, flags, def_fmode);
-	if (fd < 0) {
-		eprintf("failed to open %s: %m\n", path);
-		if (errno == ENOENT) {
-			struct stat s;
-
-			*ret = SD_RES_NO_OBJ;
-			if (stat(obj_path, &s) < 0) {
-				/* store directory is corrupted */
-				eprintf("corrupted\n");
-				*ret = SD_RES_EIO;
-			}
-		} else
-			*ret = SD_RES_UNKNOWN;
-	} else
-		*ret = 0;
-
-	return fd;
-}
-
 int update_epoch_store(uint32_t epoch)
 {
 	char new[1024];
@@ -599,11 +633,11 @@ static int store_read_obj(struct request *req, uint32_t epoch)
 	int ret = SD_RES_SUCCESS;
 	int fd = -1;
 
-	fd = ob_open(epoch, hdr->oid, 0, &ret);
+	fd = store.open(epoch, hdr->oid, 0, &ret);
 	if (fd < 0)
 		return ret;
 
-	ret = pread64(fd, req->data, hdr->data_length, hdr->offset);
+	ret = store.read(hdr->oid, fd, req->data, hdr->data_length, hdr->offset);
 	if (ret < 0) {
 		eprintf("%m\n");
 		ret = SD_RES_EIO;
@@ -643,7 +677,7 @@ static int store_write_obj_fd(int fd, struct request *req, uint32_t epoch)
 		if (ret)
 			return ret;
 	}
-	ret = pwrite64(fd, req->data, hdr->data_length, hdr->offset);
+	ret = store.write(oid, fd, req->data, hdr->data_length, hdr->offset);
 	if (ret != hdr->data_length) {
 		if (errno == ENOSPC)
 			return SD_RES_NO_SPACE;
@@ -660,7 +694,7 @@ static int store_write_obj(struct request *req, uint32_t epoch)
 	int ret = SD_RES_SUCCESS;
 	int fd = -1;
 
-	fd = ob_open(epoch, hdr->oid, 0, &ret);
+	fd = store.open(epoch, hdr->oid, 0, &ret);
 	if (fd < 0)
 		return ret;
 
@@ -682,7 +716,7 @@ static int store_create_and_write_obj_cow(struct request *req, uint32_t epoch)
 		return SD_RES_INVALID_PARMS;
 	}
 
-	fd = ob_open(epoch, hdr->oid, O_CREAT|O_TRUNC, &ret);
+	fd = store.open(epoch, hdr->oid, O_CREAT|O_TRUNC, &ret);
 	if (fd < 0)
 		return ret;
 
@@ -701,7 +735,7 @@ static int store_create_and_write_obj_cow(struct request *req, uint32_t epoch)
 		ret = SD_RES_EIO;
 		goto out;
 	}
-	ret = pwrite64(fd, buf, SD_DATA_OBJ_SIZE, 0);
+	ret = store.write(hdr->oid, fd, buf, SD_DATA_OBJ_SIZE, 0);
 	if (ret != SD_DATA_OBJ_SIZE) {
 		if (errno == ENOSPC)
 			ret = SD_RES_NO_SPACE;
@@ -722,7 +756,7 @@ out:
 	return ret;
 }
 
-static int store_write_last_sector(int fd)
+static int store_write_last_sector(uint64_t oid, int fd)
 {
 	const int size = SECTOR_SIZE;
 	char *buf = NULL;
@@ -735,7 +769,7 @@ static int store_write_last_sector(int fd)
 	}
 
 	memset(buf, 0, size);
-	ret = pwrite64(fd, buf, size, SD_DATA_OBJ_SIZE - size);
+	ret = store.write(oid, fd, buf, size, SD_DATA_OBJ_SIZE - size);
 	free(buf);
 
 	if (ret != size) {
@@ -759,7 +793,7 @@ static int store_create_and_write_obj(struct request *req, uint32_t epoch)
 		return SD_RES_INVALID_PARMS;
 	}
 
-	fd = ob_open(epoch, hdr->oid, O_CREAT|O_TRUNC, &ret);
+	fd = store.open(epoch, hdr->oid, O_CREAT|O_TRUNC, &ret);
 	if (fd < 0)
 		return ret;
 
@@ -773,7 +807,7 @@ static int store_create_and_write_obj(struct request *req, uint32_t epoch)
 			goto out;
 		}
 
-		ret = store_write_last_sector(fd);
+		ret = store_write_last_sector(hdr->oid, fd);
 	}
 
 	ret = store_write_obj_fd(fd, req, epoch);
@@ -1493,7 +1527,7 @@ static void recover_one(struct work *work, int idx)
 
 	eprintf("%"PRIu32" %"PRIu32", %16"PRIx64"\n", rw->done, rw->count, oid);
 
-	fd = ob_open(epoch, oid, 0, &ret);
+	fd = store.open(epoch, oid, 0, &ret);
 	if (fd != -1) {
 		/* the object is already recovered */
 		close(fd);
@@ -1611,7 +1645,7 @@ int is_recoverying_oid(uint64_t oid)
 	if (rw->state == RW_INIT)
 		return 1;
 
-	fd = ob_open(sys->epoch, oid, 0, &ret);
+	fd = store.open(sys->epoch, oid, 0, &ret);
 	if (fd != -1) {
 		dprintf("the object %" PRIx64 " is already recoverd\n", oid);
 		close(fd);
@@ -2105,6 +2139,10 @@ int init_store(const char *d)
 	if (ret)
 		return ret;
 
+	ret = store.init(obj_path);
+	if (ret)
+		return ret;
+
 	return ret;
 }
 
-- 
1.7.6.1




More information about the sheepdog mailing list