[Sheepdog] [PATCH] collie: add asynchronous write support

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Sat Apr 17 16:34:26 CEST 2010


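This patch uses poll() when collie forwards write requests to multiple
nodes. The request is first sent to every remote node that holds a copy,
and the responses are then collected with poll() instead of waiting for
each node in turn.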

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 collie/store.c           |  211 ++++++++++++++++++++++++++++++++++++----------
 include/sheepdog_proto.h |    1 +
 2 files changed, 169 insertions(+), 43 deletions(-)

diff --git a/collie/store.c b/collie/store.c
index 72c079c..0fa711e 100644
--- a/collie/store.c
+++ b/collie/store.c
@@ -15,6 +15,7 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <unistd.h>
+#include <poll.h>
 #include <sys/xattr.h>
 #include <sys/statvfs.h>
 
@@ -323,21 +324,90 @@ static int read_from_other_sheeps(uint64_t oid, char *buf, int copies)
 
 static int store_queue_request_local(struct request *req, char *buf, uint32_t epoch);
 
-static int forward_obj_req(struct request *req, char *buf)
+static int forward_read_obj_req(struct request *req, char *buf)
 {
 	int i, n, nr, fd, ret;
 	unsigned wlen, rlen;
 	char name[128];
 	struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
+	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)hdr;
 	struct sheepdog_node_list_entry *e;
-	struct sd_obj_req hdr2;
-	struct sd_rsp *rsp = (struct sd_rsp *)&hdr2;
 	uint64_t oid = hdr->oid;
 	int copies;
-	uint32_t epoch;
 
 	e = zalloc(SD_MAX_NODES * sizeof(struct sheepdog_node_list_entry));
-again:
+	if (!e)
+		return SD_RES_NO_MEM;
+
+	nr = get_ordered_sd_node_list(e);
+
+	copies = hdr->copies;
+
+	/* temporary hack */
+	if (!copies)
+		copies = sys->nr_sobjs;
+
+	hdr->flags |= SD_FLAG_CMD_FORWARD;
+	hdr->epoch = sys->epoch;
+
+	/* TODO: we can do better; we need to check this first */
+	for (i = 0; i < copies; i++) {
+		n = obj_to_sheep(e, nr, oid, i);
+
+		if (is_myself(&e[n])) {
+			ret = store_queue_request_local(req, buf, sys->epoch);
+			goto out;
+		}
+	}
+
+	n = obj_to_sheep(e, nr, oid, 0);
+
+	addr_to_str(name, sizeof(name), e[n].addr, 0);
+
+	fd = connect_to(name, e[n].port);
+	if (fd < 0) {
+		ret = SD_RES_EIO;
+		goto out;
+	}
+
+	wlen = 0;
+	rlen = hdr->data_length;
+
+	ret = exec_req(fd, (struct sd_req *)hdr, req->data, &wlen, &rlen);
+
+	close(fd);
+
+	if (ret) /* network errors */
+		ret = SD_RES_EIO;
+	else {
+		memcpy(&req->rp, rsp, sizeof(*rsp));
+		ret = rsp->result;
+	}
+
+out:
+	free(e);
+
+	return ret;
+}
+
+static int forward_write_obj_req(struct request *req, char *buf)
+{
+	int i, n, nr, fd, ret;
+	unsigned wlen, rlen;
+	char name[128];
+	struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
+	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&req->rp;
+	struct sheepdog_node_list_entry *e;
+	uint64_t oid = hdr->oid;
+	int copies;
+	struct pollfd pfds[SD_MAX_REDUNDANCY];
+	int done, nr_fds, local = 0;
+
+	dprintf("%lx\n", oid);
+	e = zalloc(SD_MAX_NODES * sizeof(struct sheepdog_node_list_entry));
+	if (!e)
+		return SD_RES_NO_MEM;
+
 	nr = get_ordered_sd_node_list(e);
 
 	copies = hdr->copies;
@@ -346,65 +416,117 @@ again:
 	if (!copies)
 		copies = sys->nr_sobjs;
 
+	nr_fds = 0;
+	done = 0;
+	memset(pfds, 0, sizeof(pfds));
+	for (i = 0; i < ARRAY_SIZE(pfds); i++)
+		pfds[i].fd = -1;
+
+	hdr->flags |= SD_FLAG_CMD_FORWARD;
+	hdr->epoch = sys->epoch;
+
+	wlen = hdr->data_length;
+	rlen = 0;
+
 	for (i = 0; i < copies; i++) {
 		n = obj_to_sheep(e, nr, oid, i);
 
 		addr_to_str(name, sizeof(name), e[n].addr, 0);
 
-		/* TODO: we can do better; we need to chech this first */
 		if (is_myself(&e[n])) {
-			if (hdr->flags & SD_FLAG_CMD_RECOVERY)
-				epoch = hdr->tgt_epoch;
-			else
-				epoch = sys->epoch;
-			ret = store_queue_request_local(req, buf, epoch);
-			memcpy(rsp, &req->rp, sizeof(*rsp));
-			rsp->result = ret;
-			goto done;
+			local = 1;
+			continue;
 		}
 
 		fd = connect_to(name, e[n].port);
-		if (fd < 0)
-			goto again;
-
-		memcpy(&hdr2, hdr, sizeof(hdr2));
+		if (fd < 0) {
+			eprintf("failed to connect to %s:%d\n", name, e[n].port);
+			ret = SD_RES_EIO;
+			goto out;
+		}
 
-		if (hdr->flags & SD_FLAG_CMD_WRITE) {
-			wlen = hdr->data_length;
-			rlen = 0;
-		} else {
-			wlen = 0;
-			rlen = hdr->data_length;
+		ret = send_req(fd, (struct sd_req *)hdr, req->data, &wlen);
+		if (ret) { /* network errors */
+			ret = SD_RES_EIO;
+			dprintf("fail %d\n", ret);
+			goto out;
 		}
 
-		hdr2.flags |= SD_FLAG_CMD_FORWARD;
-		hdr2.epoch = sys->epoch;
+		pfds[nr_fds].fd = fd;
+		pfds[nr_fds].events = POLLIN;
+		nr_fds++;
+	}
 
-		ret = exec_req(fd, (struct sd_req *)&hdr2, req->data, &wlen, &rlen);
+	if (local) {
+		ret = store_queue_request_local(req, buf, sys->epoch);
+		rsp->result = ret;
 
-		close(fd);
+		if (nr_fds == 0) {
+			eprintf("exit %d\n", ret);
+			goto out;
+		}
+
+		if (rsp->result != SD_RES_SUCCESS) {
+			eprintf("fail %d\n", ret);
+			goto out;
+		}
+	}
 
-		if (ret) /* network errors */
+again:
+	ret = poll(pfds, nr_fds, -1);
+
+	if (ret < 0) {
+		if (errno == EINTR)
 			goto again;
 
-	done:
-		if (hdr->flags & SD_FLAG_CMD_WRITE) {
-			if (rsp->result != SD_RES_SUCCESS) {
-				free(e);
-				return rsp->result;
-			}
-		} else {
-			if (rsp->result == SD_RES_SUCCESS) {
-				memcpy(&req->rp, rsp, sizeof(req->rp));
-				free(e);
-				return SD_RES_SUCCESS;
-			}
+		ret = SD_RES_EIO;
+		goto out;
+	}
+
+	for (i = 0; i < nr_fds; i++) {
+		if (pfds[i].fd < 0)
+			continue;
+
+		if (pfds[i].revents & POLLERR || pfds[i].revents & POLLHUP) {
+			ret = SD_RES_EIO;
+			goto out;
 		}
+
+		if (!(pfds[i].revents & POLLIN))
+			continue;
+
+		ret = do_read(pfds[i].fd, rsp, sizeof(*rsp));
+
+		if (ret) {
+			eprintf("failed to get a rsp, %m\n");
+			ret = SD_RES_EIO;
+			goto out;
+		}
+
+		if (rsp->result != SD_RES_SUCCESS) {
+			eprintf("fail %d\n", rsp->result);
+			ret = rsp->result;
+			goto out;
+		}
+
+		done++;
 	}
 
+	dprintf("%lx %d %d\n", oid, nr_fds, done);
+
+	if (done != nr_fds)
+		goto again;
+
+	ret = SD_RES_SUCCESS;
+out:
 	free(e);
 
-	return (hdr->flags & SD_FLAG_CMD_WRITE) ? SD_RES_SUCCESS: rsp->result;
+	for (i = 0; i < ARRAY_SIZE(pfds); i++){
+		if (pfds[i].fd >= 0)
+			close(pfds[i].fd);
+	}
+
+	return ret;
 }
 
 static int check_epoch(struct request *req)
@@ -638,7 +760,10 @@ void store_queue_request(struct work *work, int idx)
 	}
 
 	if (!(hdr->flags & SD_FLAG_CMD_FORWARD)) {
-		ret = forward_obj_req(req, buf);
+		if (hdr->flags & SD_FLAG_CMD_WRITE)
+			ret = forward_write_obj_req(req, buf);
+		else
+			ret = forward_read_obj_req(req, buf);
 		goto out;
 	}
 
diff --git a/include/sheepdog_proto.h b/include/sheepdog_proto.h
index b2c0fb8..c66f8bb 100644
--- a/include/sheepdog_proto.h
+++ b/include/sheepdog_proto.h
@@ -20,6 +20,7 @@
 #define SD_MAX_NODES 1024
 #define SD_MAX_VMS   4096
 #define SD_MAX_VDI_LEN 256
+#define SD_MAX_REDUNDANCY 8
 
 /* -> vmon */
 
-- 
1.5.6.5




More information about the sheepdog mailing list