[Sheepdog] [PATCH] sheep: cache socket descriptors

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Mon Oct 25 07:12:55 CEST 2010


This patch reuses socket descriptors when accessing data objects, which
improves latency.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 sheep/sdnet.c      |   94 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 sheep/sheep_priv.h |    3 ++
 sheep/store.c      |   23 +++---------
 3 files changed, 103 insertions(+), 17 deletions(-)

diff --git a/sheep/sdnet.c b/sheep/sdnet.c
index c8cbd87..9ad0bc7 100644
--- a/sheep/sdnet.c
+++ b/sheep/sdnet.c
@@ -13,6 +13,7 @@
 #include <unistd.h>
 #include <netinet/tcp.h>
 #include <sys/epoll.h>
+#include <fcntl.h>
 
 #include "sheep_priv.h"
 
@@ -670,3 +671,96 @@ int remove_object(struct sheepdog_node_list_entry *e,
 
 	return 0;
 }
+
+/* Set O_NONBLOCK on fd; returns < 0 on failure.  fd is left open for caller. */
+static int set_nonblocking(int fd)
+{
+	int ret;
+
+	ret = fcntl(fd, F_GETFL);
+	if (ret < 0) {
+		eprintf("can't fcntl (F_GETFL), %m\n");
+		return ret;
+	}
+
+	ret = fcntl(fd, F_SETFL, ret | O_NONBLOCK);
+	if (ret < 0)
+		eprintf("can't fcntl (O_NONBLOCK), %m\n");
+	return ret;
+}
+
+/* Enable TCP_NODELAY on fd; the setsockopt() return value is passed through. */
+static int set_nodelay(int fd)
+{
+	int on = 1;
+	int ret = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on));
+
+	return ret;
+}
+
+/*
+ * Return a non-blocking, TCP_NODELAY socket connected to e[node_idx] for
+ * worker worker_idx, reusing the fd cached for the current epoch if any.
+ * A newer epoch closes every cached fd.  Returns -1 on failure.  The cache
+ * owns the fd (callers must not close it).  NOTE(review): the static cache
+ * is not locked -- confirm concurrent workers cannot race on it.
+ */
+int get_sheep_fd(struct sheepdog_node_list_entry *e, int node_idx,
+		 uint32_t epoch, int worker_idx)
+{
+	static int cached_fds[NR_WORKER_THREAD][SD_MAX_NODES];
+	static uint32_t cached_epoch = 0;
+	int i, j, fd;
+	char name[INET6_ADDRSTRLEN];
+
+	if (node_idx < 0 || node_idx >= SD_MAX_NODES ||
+	    worker_idx < 0 || worker_idx >= NR_WORKER_THREAD) {
+		eprintf("invalid index, %d %d\n", node_idx, worker_idx);
+		return -1;
+	}
+
+	if (cached_epoch == 0) {
+		/* first call: mark every slot as empty */
+		for (i = 0; i < NR_WORKER_THREAD; i++)
+			for (j = 0; j < SD_MAX_NODES; j++)
+				cached_fds[i][j] = -1;
+		cached_epoch = epoch;
+	}
+
+	if (before(epoch, cached_epoch)) {
+		eprintf("requested epoch is smaller than the previous one, %d %d\n",
+			cached_epoch, epoch);
+		return -1;
+	}
+	if (after(epoch, cached_epoch)) {
+		/* newer epoch requested: drop every fd cached for the old one */
+		for (i = 0; i < NR_WORKER_THREAD; i++) {
+			for (j = 0; j < SD_MAX_NODES; j++) {
+				if (cached_fds[i][j] >= 0)
+					close(cached_fds[i][j]);
+				cached_fds[i][j] = -1;
+			}
+		}
+		cached_epoch = epoch;
+	}
+
+	fd = cached_fds[worker_idx][node_idx];
+	dprintf("%d, %d\n", epoch, fd);
+	if (cached_epoch == epoch && fd >= 0) {
+		dprintf("use a cached fd %d\n", fd);
+		return fd;
+	}
+
+	addr_to_str(name, sizeof(name), e[node_idx].addr, 0);
+	fd = connect_to(name, e[node_idx].port);
+	if (fd < 0)
+		return -1;
+
+	if (set_nonblocking(fd) || set_nodelay(fd)) {
+		eprintf("%m\n");
+		close(fd);
+		return -1;
+	}
+
+	cached_fds[worker_idx][node_idx] = fd;
+	return fd;
+}
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index c66baf4..a8af306 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -190,6 +190,9 @@ int remove_object(struct sheepdog_node_list_entry *e,
 		  int nodes, uint32_t node_version,
 		  uint64_t oid, int nr);
 
+int get_sheep_fd(struct sheepdog_node_list_entry *e, int node_idx,
+		 uint32_t epoch, int worker_idx);
+
 static inline int is_myself(struct sheepdog_node_list_entry *e)
 {
 	return e->id == sys->this_node.id;
diff --git a/sheep/store.c b/sheep/store.c
index 06e35e7..6043054 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -391,11 +391,10 @@ static int read_from_other_sheeps(struct request *req, uint32_t epoch,
 
 static int store_queue_request_local(struct request *req, uint32_t epoch);
 
-static int forward_read_obj_req(struct request *req)
+static int forward_read_obj_req(struct request *req, int idx)
 {
 	int i, n, nr, fd, ret;
 	unsigned wlen, rlen;
-	char name[128];
 	struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
 	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)hdr;
 	struct sheepdog_node_list_entry *e;
@@ -427,9 +426,7 @@ static int forward_read_obj_req(struct request *req)
 
 	n = obj_to_sheep(e, nr, oid, 0);
 
-	addr_to_str(name, sizeof(name), e[n].addr, 0);
-
-	fd = connect_to(name, e[n].port);
+	fd = get_sheep_fd(e, n, hdr->epoch, idx);
 	if (fd < 0) {
 		ret = SD_RES_NETWORK_ERROR;
 		goto out;
@@ -440,8 +437,6 @@ static int forward_read_obj_req(struct request *req)
 
 	ret = exec_req(fd, (struct sd_req *)hdr, req->data, &wlen, &rlen);
 
-	close(fd);
-
 	if (ret) /* network errors */
 		ret = SD_RES_NETWORK_ERROR;
 	else {
@@ -455,7 +450,7 @@ out:
 	return ret;
 }
 
-static int forward_write_obj_req(struct request *req)
+static int forward_write_obj_req(struct request *req, int idx)
 {
 	int i, n, nr, fd, ret;
 	unsigned wlen, rlen;
@@ -501,7 +496,7 @@ static int forward_write_obj_req(struct request *req)
 			continue;
 		}
 
-		fd = connect_to(name, e[n].port);
+		fd = get_sheep_fd(e, n, hdr->epoch, idx);
 		if (fd < 0) {
 			eprintf("failed to connect to %s:%"PRIu32"\n", name, e[n].port);
 			ret = SD_RES_NETWORK_ERROR;
@@ -582,12 +577,6 @@ again:
 
 	ret = SD_RES_SUCCESS;
 out:
-
-	for (i = 0; i < ARRAY_SIZE(pfds); i++){
-		if (pfds[i].fd >= 0)
-			close(pfds[i].fd);
-	}
-
 	hdr->flags &= ~SD_FLAG_CMD_DIRECT;
 
 	return ret;
@@ -831,9 +820,9 @@ void store_queue_request(struct work *work, int idx)
 
 	if (!(hdr->flags & SD_FLAG_CMD_DIRECT)) {
 		if (hdr->flags & SD_FLAG_CMD_WRITE)
-			ret = forward_write_obj_req(req);
+			ret = forward_write_obj_req(req, idx);
 		else
-			ret = forward_read_obj_req(req);
+			ret = forward_read_obj_req(req, idx);
 		goto out;
 	}
 
-- 
1.5.6.5





More information about the sheepdog mailing list