[sheepdog] [PATCH UPDATE] farm: refactor core code

Liu Yuan namei.unix at gmail.com
Sun May 20 15:00:07 CEST 2012


From: Liu Yuan <tailai.ly at taobao.com>

UPDATE:
 - address Kazutaka's comments. Thanks !
  - use stat() for farm_exist
  - init ret
-------------------------------------------- 8< ---

Refactor struct siocb; this makes the code cleaner.

- let sd_store->read/write() use open() internally.
- remove sd_store->open()/close()
- add sd_store->exist()

But for now we don't actually have concurrent requests to the same object,
because the upper layer excludes concurrent access via check_request().

We'll remove this constraint in a later patch set.

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 include/util.h       |    5 +-
 sheep/farm/farm.c    |  188 +++++++++++++++++++++++++++-----------------------
 sheep/object_cache.c |    5 --
 sheep/ops.c          |   37 ++--------
 sheep/recovery.c     |   16 +----
 sheep/sheep_priv.h   |    6 +-
 6 files changed, 119 insertions(+), 138 deletions(-)

diff --git a/include/util.h b/include/util.h
index 1c32954..10b30d7 100644
--- a/include/util.h
+++ b/include/util.h
@@ -8,6 +8,7 @@
 
 #include "bitops.h"
 #include "list.h"
+#include "logger.h"
 
 #define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))
 #define roundup(x, y) ((((x) + ((y) - 1)) / (y)) * (y))
@@ -61,8 +62,10 @@ static inline void *zalloc(size_t size)
 
 static inline int xlockf(int fd, int cmd, off_t offset, off_t len)
 {
-	if (lseek(fd, offset, SEEK_SET) < 0)
+	if (lseek(fd, offset, SEEK_SET) < 0) {
+		eprintf("%m\n");
 		return -1;
+	}
 
 	return lockf(fd, cmd, len);
 }
diff --git a/sheep/farm/farm.c b/sheep/farm/farm.c
index 7912cb7..1cd4efc 100644
--- a/sheep/farm/farm.c
+++ b/sheep/farm/farm.c
@@ -13,9 +13,12 @@
 
 #include <dirent.h>
 #include <pthread.h>
+#include <linux/limits.h>
 
 #include "farm.h"
 #include "sheep_priv.h"
+#include "sheepdog_proto.h"
+#include "sheep.h"
 
 char farm_obj_dir[PATH_MAX];
 char farm_dir[PATH_MAX];
@@ -67,15 +70,75 @@ err:
 	return ret;
 }
 
-static int farm_write(uint64_t oid, struct siocb *iocb)
+static int farm_exist(uint64_t oid)
 {
-	ssize_t size = xpwrite(iocb->fd, iocb->buf, iocb->length, iocb->offset);
+	char path[PATH_MAX];
+	struct stat s;
 
-	if (size != iocb->length)
-		return SD_RES_EIO;
+	sprintf(path, "%s%016"PRIx64, obj_path, oid);
+	if (stat(path, &s) < 0) {
+		if (errno != ENOENT)
+			eprintf("%m\n");
+		return 0;
+	}
+
+	return 1;
+}
+
+static int err_to_sderr(uint64_t oid, int err)
+{
+	int ret;
+	if (err == ENOENT) {
+		struct stat s;
+
+		if (stat(obj_path, &s) < 0) {
+			eprintf("corrupted\n");
+			ret = SD_RES_EIO;
+		} else {
+			dprintf("object %016" PRIx64 " not found locally\n", oid);
+			ret = SD_RES_NO_OBJ;
+		}
+	} else {
+		eprintf("%m\n");
+		ret = SD_RES_UNKNOWN;
+	}
+	return ret;
+}
+
+static int farm_write(uint64_t oid, struct siocb *iocb, int create)
+{
+	int flags = def_open_flags, fd, ret = SD_RES_SUCCESS;
+	char path[PATH_MAX];
+	ssize_t size;
+
+	if (is_vdi_obj(oid))
+		flags &= ~O_DIRECT;
+
+	if (create)
+		flags |= O_CREAT | O_TRUNC;
+
+	sprintf(path, "%s%016"PRIx64, obj_path, oid);
+	fd = open(path, flags, def_fmode);
+	if (fd < 0)
+		return err_to_sderr(oid, errno);
+
+	if (create && !(iocb->flags & SD_FLAG_CMD_COW)) {
+		ret = prealloc(fd, is_vdi_obj(oid) ?
+			       SD_INODE_SIZE : SD_DATA_OBJ_SIZE);
+		if (ret != SD_RES_SUCCESS)
+			goto out;
+	}
+	size = xpwrite(fd, iocb->buf, iocb->length, iocb->offset);
+	if (size != iocb->length) {
+		eprintf("%m\n");
+		ret = SD_RES_EIO;
+		goto out;
+	}
 
 	trunk_update_entry(oid);
-	return SD_RES_SUCCESS;
+out:
+	close(fd);
+	return ret;
 }
 
 static int write_last_sector(int fd, uint32_t length)
@@ -102,26 +165,6 @@ static int write_last_sector(int fd, uint32_t length)
 	return ret;
 }
 
-static int err_to_sderr(uint64_t oid, int err)
-{
-	int ret;
-	if (err == ENOENT) {
-		struct stat s;
-
-		if (stat(obj_path, &s) < 0) {
-			eprintf("corrupted\n");
-			ret = SD_RES_EIO;
-		} else {
-			dprintf("object %016" PRIx64 " not found locally\n", oid);
-			ret = SD_RES_NO_OBJ;
-		}
-	} else {
-		eprintf("%m\n");
-		ret = SD_RES_UNKNOWN;
-	}
-	return ret;
-}
-
 /*
  * Preallocate the whole object to get a better filesystem layout.
  */
@@ -139,51 +182,6 @@ int prealloc(int fd, uint32_t size)
 	return ret;
 }
 
-static int farm_open(uint64_t oid, struct siocb *iocb, int create)
-{
-	struct strbuf buf = STRBUF_INIT;
-	int ret = SD_RES_SUCCESS, fd;
-	int flags = def_open_flags;
-
-	if (iocb->epoch < sys->epoch)
-		goto out;
-
-	if (is_vdi_obj(oid))
-		flags &= ~O_DIRECT;
-
-	if (create)
-		flags |= O_CREAT | O_TRUNC;
-
-	strbuf_addstr(&buf, obj_path);
-	strbuf_addf(&buf, "%016" PRIx64, oid);
-	fd = open(buf.buf, flags, def_fmode);
-	if (fd < 0) {
-		ret = err_to_sderr(oid, errno);
-		goto out;
-	}
-	iocb->fd = fd;
-	ret = SD_RES_SUCCESS;
-	if (!(iocb->flags & SD_FLAG_CMD_COW) && create) {
-		ret = prealloc(fd, iocb->length);
-		if (ret != SD_RES_SUCCESS)
-			close(fd);
-	}
-out:
-	strbuf_release(&buf);
-	return ret;
-}
-
-static int farm_close(uint64_t oid, struct siocb *iocb)
-{
-	if (iocb->epoch < sys->epoch)
-		return SD_RES_SUCCESS;
-
-	if (close(iocb->fd) < 0)
-		return SD_RES_EIO;
-
-	return SD_RES_SUCCESS;
-}
-
 static int get_trunk_sha1(uint32_t epoch, unsigned char *outsha1, int user)
 {
 	int i, nr_logs = -1, ret = -1;
@@ -346,29 +344,31 @@ static int farm_get_objlist(struct siocb *iocb)
 }
 
 
-static void *read_working_object(uint64_t oid, int length)
+static void *read_working_object(uint64_t oid, uint64_t offset,
+				 uint32_t length)
 {
 	void *buf = NULL;
 	char path[PATH_MAX];
-	int fd, ret;
+	int fd;
+	size_t size;
 
 	snprintf(path, sizeof(path), "%s%016" PRIx64, obj_path, oid);
 
-	fd = open(path, O_RDONLY, def_fmode);
+	fd = open(path, def_open_flags);
 	if (fd < 0) {
 		dprintf("object %"PRIx64" not found\n", oid);
 		goto out;
 	}
 
-	buf = malloc(length);
+	buf = valloc(length);
 	if (!buf) {
 		eprintf("no memory to allocate buffer.\n");
 		goto out;
 	}
 
-	ret = xread(fd, buf, length);
-	if (length != ret) {
-		eprintf("object read error.\n");
+	size = xpread(fd, buf, length, offset);
+	if (length != size) {
+		eprintf("object read error. %m\n");
 		free(buf);
 		buf = NULL;
 		goto out;
@@ -413,11 +413,13 @@ out:
 
 static int farm_read(uint64_t oid, struct siocb *iocb)
 {
-	int i;
+	int flags = def_open_flags, fd, ret = SD_RES_SUCCESS;
 
 	if (iocb->epoch < sys->epoch) {
+		int i;
 		void *buffer;
-		buffer = read_working_object(oid, iocb->length);
+
+		buffer = read_working_object(oid, iocb->offset, iocb->length);
 		if (!buffer) {
 			/* Here if read the object from the targeted epoch failed,
 			 * we need to read from the later epoch, because at some epoch
@@ -437,13 +439,30 @@ static int farm_read(uint64_t oid, struct siocb *iocb)
 			return SD_RES_NO_OBJ;
 		memcpy(iocb->buf, buffer, iocb->length);
 		free(buffer);
+
+		return SD_RES_SUCCESS;
 	} else {
-		ssize_t size = xpread(iocb->fd, iocb->buf, iocb->length, iocb->offset);
+		char path[PATH_MAX];
+		ssize_t size;
+
+		if (is_vdi_obj(oid))
+			flags &= ~O_DIRECT;
+
+		sprintf(path, "%s%016"PRIx64, obj_path, oid);
+		fd = open(path, flags);
+
+		if (fd < 0)
+			return err_to_sderr(oid, errno);
 
-		if (size != iocb->length)
-			return SD_RES_EIO;
+		size = xpread(fd, iocb->buf, iocb->length, iocb->offset);
+		if (size != iocb->length) {
+			ret = SD_RES_EIO;
+			goto out;
+		}
 	}
-	return SD_RES_SUCCESS;
+out:
+	close(fd);
+	return ret;
 }
 
 static int farm_atomic_put(uint64_t oid, struct siocb *iocb)
@@ -706,10 +725,9 @@ static int farm_purge_obj(void)
 struct store_driver farm = {
 	.name = "farm",
 	.init = farm_init,
-	.open = farm_open,
+	.exist = farm_exist,
 	.write = farm_write,
 	.read = farm_read,
-	.close = farm_close,
 	.get_objlist = farm_get_objlist,
 	.link = farm_link,
 	.atomic_put = farm_atomic_put,
diff --git a/sheep/object_cache.c b/sheep/object_cache.c
index 799cb23..3ca31b5 100644
--- a/sheep/object_cache.c
+++ b/sheep/object_cache.c
@@ -436,15 +436,10 @@ int object_cache_pull(struct vnode_info *vnode_info, struct object_cache *oc,
 		if (vnode_is_local(v)) {
 			struct siocb iocb = { 0 };
 			iocb.epoch = sys->epoch;
-			ret = sd_store->open(oid, &iocb, 0);
-			if (ret != SD_RES_SUCCESS)
-				goto pull_remote;
-
 			iocb.buf = buf;
 			iocb.length = data_length;
 			iocb.offset = 0;
 			ret = sd_store->read(oid, &iocb);
-			sd_store->close(oid, &iocb);
 			if (ret != SD_RES_SUCCESS)
 				goto pull_remote;
 			/* read succeed */
diff --git a/sheep/ops.c b/sheep/ops.c
index 729fb86..a0461c9 100644
--- a/sheep/ops.c
+++ b/sheep/ops.c
@@ -658,17 +658,12 @@ static int read_copy_from_replica(struct request *req, uint32_t epoch,
 		if (vnode_is_local(v)) {
 			memset(&iocb, 0, sizeof(iocb));
 			iocb.epoch = epoch;
-			ret = sd_store->open(oid, &iocb, 0);
-			if (ret != SD_RES_SUCCESS)
-				continue;
-
 			iocb.buf = buf;
 			iocb.length = SD_DATA_OBJ_SIZE;
 			iocb.offset = 0;
 			ret = sd_store->read(oid, &iocb);
 			if (ret != SD_RES_SUCCESS)
 				continue;
-			sd_store->close(oid, &iocb);
 			goto out;
 		}
 
@@ -747,10 +742,6 @@ static int store_read_obj(struct request *req)
 	memset(&iocb, 0, sizeof(iocb));
 	iocb.epoch = epoch;
 	iocb.flags = hdr->flags;
-	ret = sd_store->open(hdr->obj.oid, &iocb, 0);
-	if (ret != SD_RES_SUCCESS)
-		return ret;
-
 	iocb.buf = req->data;
 	iocb.length = hdr->data_length;
 	iocb.offset = hdr->obj.offset;
@@ -761,12 +752,11 @@ static int store_read_obj(struct request *req)
 	rsp->data_length = hdr->data_length;
 	rsp->obj.copies = sys->nr_copies;
 out:
-	sd_store->close(hdr->obj.oid, &iocb);
 	return ret;
 }
 
 static int do_write_obj(struct siocb *iocb, struct sd_req *hdr, uint32_t epoch,
-		void *data)
+		void *data, int create)
 {
 	uint64_t oid = hdr->obj.oid;
 	int ret = SD_RES_SUCCESS;
@@ -786,11 +776,11 @@ static int do_write_obj(struct siocb *iocb, struct sd_req *hdr, uint32_t epoch,
 			strbuf_release(&buf);
 			return SD_RES_EIO;
 		}
-		ret = sd_store->write(oid, iocb);
+		ret = sd_store->write(oid, iocb, create);
 		jrnl_end(jd);
 		strbuf_release(&buf);
 	} else
-		ret = sd_store->write(oid, iocb);
+		ret = sd_store->write(oid, iocb, create);
 
 	return ret;
 }
@@ -798,21 +788,13 @@ static int do_write_obj(struct siocb *iocb, struct sd_req *hdr, uint32_t epoch,
 static int store_write_obj(struct request *req)
 {
 	struct sd_req *hdr = &req->rq;
-	int ret;
 	uint32_t epoch = hdr->epoch;
 	struct siocb iocb;
 
 	memset(&iocb, 0, sizeof(iocb));
 	iocb.epoch = epoch;
 	iocb.flags = hdr->flags;
-	ret = sd_store->open(hdr->obj.oid, &iocb, 0);
-	if (ret != SD_RES_SUCCESS)
-		return ret;
-
-	ret = do_write_obj(&iocb, hdr, epoch, req->data);
-
-	sd_store->close(hdr->obj.oid, &iocb);
-	return ret;
+	return do_write_obj(&iocb, hdr, epoch, req->data, 0);
 }
 
 static int store_create_and_write_obj(struct request *req)
@@ -821,10 +803,10 @@ static int store_create_and_write_obj(struct request *req)
 	struct sd_req cow_hdr;
 	uint32_t epoch = hdr->epoch;
 	uint64_t oid = hdr->obj.oid;
-	int ret;
 	char *buf = NULL;
 	struct siocb iocb;
 	unsigned data_length;
+	int ret = SD_RES_SUCCESS;
 
 	if (is_vdi_obj(oid))
 		data_length = SD_INODE_SIZE;
@@ -837,10 +819,6 @@ static int store_create_and_write_obj(struct request *req)
 	iocb.epoch = epoch;
 	iocb.flags = hdr->flags;
 	iocb.length = data_length;
-	ret = sd_store->open(oid, &iocb, 1);
-	if (ret != SD_RES_SUCCESS)
-		return ret;
-
 	if (hdr->flags & SD_FLAG_CMD_COW) {
 		dprintf("%" PRIx64 ", %" PRIx64 "\n", oid, hdr->obj.cow_oid);
 
@@ -863,16 +841,15 @@ static int store_create_and_write_obj(struct request *req)
 		cow_hdr.data_length = SD_DATA_OBJ_SIZE;
 		cow_hdr.obj.offset = 0;
 
-		ret = do_write_obj(&iocb, &cow_hdr, epoch, buf);
+		ret = do_write_obj(&iocb, &cow_hdr, epoch, buf, 1);
 	} else
-		ret = do_write_obj(&iocb, hdr, epoch, req->data);
+		ret = do_write_obj(&iocb, hdr, epoch, req->data, 1);
 
 	if (SD_RES_SUCCESS == ret)
 		check_and_insert_objlist_cache(oid);
 out:
 	if (buf)
 		free(buf);
-	sd_store->close(oid, &iocb);
 	return ret;
 }
 
diff --git a/sheep/recovery.c b/sheep/recovery.c
index d73df48..e31f226 100644
--- a/sheep/recovery.c
+++ b/sheep/recovery.c
@@ -399,19 +399,14 @@ static void recover_object(struct work *work)
 {
 	struct recovery_work *rw = container_of(work, struct recovery_work, work);
 	uint64_t oid = rw->oids[rw->done];
-	uint32_t epoch = rw->epoch;
 	int i, copy_idx, copy_nr, ret;
-	struct siocb iocb = { 0 };
 
 	if (!sys->nr_copies)
 		return;
 
 	eprintf("done:%"PRIu32" count:%"PRIu32", oid:%"PRIx64"\n", rw->done, rw->count, oid);
 
-	iocb.epoch = epoch;
-	ret = sd_store->open(oid, &iocb, 0);
-	if (ret == SD_RES_SUCCESS) {
-		sd_store->close(oid, &iocb);
+	if (sd_store->exist(oid)) {
 		dprintf("the object is already recovered\n");
 		return;
 	}
@@ -479,8 +474,7 @@ int is_recoverying_oid(uint64_t oid)
 	uint64_t hval = fnv_64a_buf(&oid, sizeof(uint64_t), FNV1A_64_INIT);
 	uint64_t min_hval;
 	struct recovery_work *rw = recovering_work;
-	int ret, i;
-	struct siocb iocb;
+	int i;
 
 	if (oid == 0)
 		return 0;
@@ -496,12 +490,8 @@ int is_recoverying_oid(uint64_t oid)
 	if (rw->state == RW_INIT)
 		return 1;
 
-	memset(&iocb, 0, sizeof(iocb));
-	iocb.epoch = sys->epoch;
-	ret = sd_store->open(oid, &iocb, 0);
-	if (ret == SD_RES_SUCCESS) {
+	if (sd_store->exist(oid)) {
 		dprintf("the object %" PRIx64 " is already recoverd\n", oid);
-		sd_store->close(oid, &iocb);
 		return 0;
 	}
 
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 6f6a320..c09918a 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -155,7 +155,6 @@ struct cluster_info {
 };
 
 struct siocb {
-	int fd;
 	uint16_t flags;
 	uint32_t epoch;
 	void *buf;
@@ -167,10 +166,9 @@ struct store_driver {
 	struct list_head list;
 	const char *name;
 	int (*init)(char *path);
-	int (*open)(uint64_t oid, struct siocb *, int create);
-	int (*write)(uint64_t oid, struct siocb *);
+	int (*exist)(uint64_t oid);
+	int (*write)(uint64_t oid, struct siocb *, int create);
 	int (*read)(uint64_t oid, struct siocb *);
-	int (*close)(uint64_t oid, struct siocb *);
 	int (*format)(struct siocb *);
 	/* Operations in recovery */
 	int (*get_objlist)(struct siocb *);
-- 
1.7.10.2




More information about the sheepdog mailing list