[Sheepdog] [PATCH 3/3] collie: support request forwarding

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Wed Dec 23 19:25:34 CET 2009


TODO:
 - forward directory read/write operations
 - move objects when the node map changes
 - prevent forwarded requests from using too many worker threads

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 collie/store.c           |  116 ++++++++++++++++++++++++++++++++++-----------
 collie/vdi.c             |    9 ++--
 collie/work.h            |    2 +-
 include/sheepdog_proto.h |    1 +
 4 files changed, 94 insertions(+), 34 deletions(-)

diff --git a/collie/store.c b/collie/store.c
index 7c0f6d1..b2cf355 100644
--- a/collie/store.c
+++ b/collie/store.c
@@ -29,7 +29,7 @@ static char *obj_dir;
 static char *mnt_dir;
 static char *zero_block;
 
-static int stat_sheep(uint64_t *store_size, uint64_t *store_free)
+static int stat_sheep(uint64_t *store_size, uint32_t epoch, uint64_t *store_free)
 {
 	struct statvfs vs;
 	int ret;
@@ -43,7 +43,8 @@ static int stat_sheep(uint64_t *store_size, uint64_t *store_free)
 	if (ret)
 		return SD_RES_EIO;
 
-	dir = opendir(obj_dir);
+	snprintf(path, sizeof(path), "%s/obj/%x", obj_dir, epoch);
+	dir = opendir(path);
 	if (!dir)
 		return SD_RES_EIO;
 
@@ -51,7 +52,8 @@ static int stat_sheep(uint64_t *store_size, uint64_t *store_free)
 		if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, ".."))
 			continue;
 
-		snprintf(path, sizeof(path), "%s/%s", obj_dir, d->d_name);
+		snprintf(path, sizeof(path), "%s/obj/%x/%s", obj_dir, epoch,
+			 d->d_name);
 
 		ret = stat(path, &s);
 		if (ret)
@@ -66,27 +68,26 @@ static int stat_sheep(uint64_t *store_size, uint64_t *store_free)
 	return SD_RES_SUCCESS;
 }
 
-static int read_from_others(struct cluster_info *cluster, uint64_t oid,
-			    unsigned *rlen, void *buf, uint64_t offset,
+static int read_from_others(struct sheepdog_node_list_entry *e, int nr,
+			    uint64_t oid, unsigned *rlen, void *buf,
+			    uint64_t offset, uint16_t flags, uint32_t epoch,
 			    int copies)
 {
-	int i, n, nr, fd, ret;
+	int i, n, fd, ret;
 	unsigned wlen;
 	char name[128];
-	struct sheepdog_node_list_entry *e;
 	struct sd_obj_req hdr;
 	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
 
-	e = zalloc(SD_MAX_NODES * sizeof(struct sheepdog_node_list_entry));
 again:
-	nr = build_node_list(&cluster->node_list, e);
-
 	for (i = 0; i < copies; i++) {
 		n = obj_to_sheep(e, nr, oid, i);
 
 		snprintf(name, sizeof(name), "%d.%d.%d.%d",
 			 e[n].addr[12], e[n].addr[13],
 			 e[n].addr[14], e[n].addr[15]);
+		dprintf("read %" PRIx64 " from %s, %d, %d\n", oid, name, i,
+			copies);
 
 		fd = connect_to(name, e[n].port);
 		if (fd < 0)
@@ -96,9 +97,9 @@ again:
 		hdr.opcode = SD_OP_READ_OBJ;
 		hdr.copies = copies;
 		hdr.oid = oid;
-		hdr.epoch = cluster->epoch;
+		hdr.epoch = epoch;
 
-		hdr.flags = 0;
+		hdr.flags = flags;
 		hdr.data_length = *rlen;
 		hdr.offset = offset;
 
@@ -160,15 +161,21 @@ void store_queue_request(struct work *work, int idx)
 	uint32_t opcode = hdr->opcode;
 	uint32_t epoch = cluster->epoch;
 	uint32_t req_epoch = hdr->epoch;
+	uint32_t cow_epoch = req_epoch;
+	uint16_t cow_flags = 0;
 	struct sd_node_rsp *nrsp = (struct sd_node_rsp *)&req->rp;
 	int copies;
 	unsigned rlen;
+	struct sheepdog_node_list_entry e[SD_MAX_NODES];
+	int nr;
 
 	/* use le_to_cpu */
 
-	snprintf(path, sizeof(path), "%s/%" PRIx64, obj_dir, oid);
+	snprintf(path, sizeof(path), "%s/obj/%x/%" PRIx64, obj_dir,
+		 req_epoch, oid);
 
-	dprintf("%d, %x, %s, %u, %u\n", idx, opcode, path, epoch, req_epoch);
+	dprintf("%d, %x, %s, %u, %u, %d\n", idx, opcode, path, epoch, req_epoch,
+		hdr->copies);
 
 	if (list_empty(&cluster->node_list)) {
 		/* we haven't got SD_OP_GET_NODE_LIST response yet. */
@@ -176,12 +183,13 @@ void store_queue_request(struct work *work, int idx)
 		goto out;
 	}
 
-	if (opcode != SD_OP_GET_NODE_LIST) {
+	if (opcode != SD_OP_GET_NODE_LIST && !(hdr->flags & SD_FLAG_CMD_FORWARD)) {
 		ret = check_epoch(cluster, req);
 		if (ret != SD_RES_SUCCESS)
 			goto out;
 	}
 
+retry:
 	switch (opcode) {
 	case SD_OP_CREATE_AND_WRITE_OBJ:
 	case SD_OP_WRITE_OBJ:
@@ -192,12 +200,38 @@ void store_queue_request(struct work *work, int idx)
 
 		fd = open(path, flags, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
 		if (fd < 0) {
-			if (errno == ENOENT)
-				ret = SD_RES_NO_OBJ;
-			else
+			if (errno != ENOENT) {
 				ret = SD_RES_UNKNOWN;
+				goto out;
+			}
 
-			goto out;
+			if (opcode == SD_OP_WRITE_OBJ) {
+				if (!(hdr->flags & SD_FLAG_CMD_COW)) {
+					dprintf("force cow %" PRIx64 " %d\n",
+						oid, req_epoch);
+					opcode = SD_OP_CREATE_AND_WRITE_OBJ;
+					hdr->flags |= SD_FLAG_CMD_COW;
+					hdr->cow_oid = oid;
+					cow_epoch = req_epoch - 1;
+					cow_flags = SD_FLAG_CMD_FORWARD;
+					goto retry;
+				}
+			} else {
+				dprintf("forward request %d\n", opcode);
+				nr = read_node_list(req_epoch - 1, e,
+						    SD_MAX_NODES);
+				if (nr < 0) {
+					ret = SD_RES_EIO;
+					goto out;
+				}
+				ret = read_from_others(e, nr, oid,
+						       &hdr->data_length,
+						       req->data, hdr->offset,
+						       SD_FLAG_CMD_FORWARD,
+						       req_epoch - 1,
+						       hdr->copies);
+				goto out;
+			}
 		}
 
 		if (opcode != SD_OP_CREATE_AND_WRITE_OBJ)
@@ -223,15 +257,23 @@ void store_queue_request(struct work *work, int idx)
 			goto out;
 		}
 
-		if (!is_data_obj(oid))
-			break;
-
+		rlen = SD_DATA_OBJ_SIZE;
 		if (hdr->flags & SD_FLAG_CMD_COW) {
 			dprintf("%" PRIu64 "\n", hdr->cow_oid);
 
-			rlen = SD_DATA_OBJ_SIZE;
-			ret = read_from_others(cluster, hdr->cow_oid, &rlen, buf,
-					       0, hdr->copies);
+			if (epoch == cow_epoch)
+				nr = build_node_list(&cluster->node_list, e);
+			else
+				nr = read_node_list(cow_epoch, e, SD_MAX_NODES);
+
+			if (nr < 0) {
+				ret = SD_RES_EIO;
+				goto out;
+			}
+
+			ret = read_from_others(e, nr, hdr->cow_oid, &rlen, buf,
+					       0, cow_flags, cow_epoch,
+					       hdr->copies);
 
 			if (ret) {
 				ret = 1;
@@ -239,13 +281,15 @@ void store_queue_request(struct work *work, int idx)
 			}
 		} else {
 			dprintf("%" PRIu64 "\n", oid);
-			memset(buf, 0, SD_DATA_OBJ_SIZE);
+			if (rlen > hdr->data_length)
+				rlen = hdr->data_length;
+			memset(buf, 0, rlen);
 		}
 
 		dprintf("%" PRIu64 "\n", oid);
 
-		ret = pwrite64(fd, buf, SD_DATA_OBJ_SIZE, 0);
-		if (ret != SD_DATA_OBJ_SIZE) {
+		ret = pwrite64(fd, buf, rlen, 0);
+		if (ret != rlen) {
 			ret = SD_RES_EIO;
 			goto out;
 		}
@@ -299,7 +343,8 @@ void store_queue_request(struct work *work, int idx)
 		}
 		break;
 	case SD_OP_STAT_SHEEP:
-		ret = stat_sheep(&nrsp->store_size, &nrsp->store_free);
+		ret = stat_sheep(&nrsp->store_size, req_epoch,
+				 &nrsp->store_free);
 		break;
 	}
 out:
@@ -649,6 +694,13 @@ int init_store(char *dir)
 	if (ret && errno != EEXIST)
 		return 1;
 
+	strcpy(path, dir);
+	strcat(path, "/obj");
+	ret = mkdir(path, S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP |
+		    S_IWGRP | S_IXGRP);
+	if (ret && errno != EEXIST)
+		return 1;
+
 	fp = setmntent(MOUNTED, "r");
 	if (!fp)
 		return 1;
@@ -718,5 +770,11 @@ int write_node_list(uint32_t epoch, struct sheepdog_node_list_entry *entries,
 
 	close(fd);
 
+	snprintf(path, sizeof(path), "%s/obj/%x", obj_dir, epoch);
+	ret = mkdir(path, S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP |
+		    S_IWGRP | S_IXGRP);
+	if (ret && errno != EEXIST)
+		return -1;
+
 	return nr_nodes;
 }
diff --git a/collie/vdi.c b/collie/vdi.c
index 31567d0..2a19ad0 100644
--- a/collie/vdi.c
+++ b/collie/vdi.c
@@ -84,7 +84,7 @@ int add_vdi(struct cluster_info *ci, char *name, int len, uint64_t size,
 	    uint64_t *added_oid, uint64_t base_oid, uint32_t tag)
 {
 	struct sheepdog_node_list_entry entries[SD_MAX_NODES];
-	int nr_nodes;
+	int nr_nodes, nr_reqs;
 	uint64_t oid = 0;
 	int ret;
 	int copies;
@@ -100,15 +100,16 @@ int add_vdi(struct cluster_info *ci, char *name, int len, uint64_t size,
 	/* todo */
 /* 	copies = sb->default_nr_copies; */
 	copies = 3;
-	if (copies > nr_nodes)
-		copies = nr_nodes;
+	nr_reqs = copies;
+	if (nr_reqs > nr_nodes)
+		nr_reqs = nr_nodes;
 
 	req.opcode = SD_OP_SO_NEW_VDI;
 	req.copies = copies;
 	req.tag = tag;
 
 	ret = exec_reqs(entries, nr_nodes, ci->epoch,
-			SD_DIR_OID, (struct sd_req *)&req, name, len, 0, copies);
+			SD_DIR_OID, (struct sd_req *)&req, name, len, 0, nr_reqs);
 
 	/* todo: error handling */
 
diff --git a/collie/work.h b/collie/work.h
index 81d8c56..7020991 100644
--- a/collie/work.h
+++ b/collie/work.h
@@ -1,7 +1,7 @@
 #ifndef __WORK_H__
 #define __WORK_H__
 
-#define NR_WORKER_THREAD 4
+#define NR_WORKER_THREAD 16
 
 struct work;
 
diff --git a/include/sheepdog_proto.h b/include/sheepdog_proto.h
index 4bfb4e5..affc332 100644
--- a/include/sheepdog_proto.h
+++ b/include/sheepdog_proto.h
@@ -53,6 +53,7 @@
 
 #define SD_FLAG_CMD_WRITE    0x01
 #define SD_FLAG_CMD_COW      0x02
+#define SD_FLAG_CMD_FORWARD  0x04
 
 #define SD_RES_SUCCESS       0x00 /* Success */
 #define SD_RES_UNKNOWN       0x01 /* Unknown error */
-- 
1.6.5




More information about the sheepdog mailing list