[Sheepdog] [PATCH 3/4] collie: reuse socket descriptors

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Thu Feb 11 18:17:09 CET 2010


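Local ports cannot be reused for new connections immediately after the
previous connections on them are closed (the closed connections linger in
TIME_WAIT), so collie exhausts its local ports under heavy load.

This patch fixes the problem by caching per-node socket descriptors for the
current epoch and reusing them instead of opening a new connection for every
request.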

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 collie/store.c |   61 ++++++++--------------------
 include/net.h  |    1 +
 lib/net.c      |  120 +++++++++++++++++++++++++++++++++++++++++++-------------
 3 files changed, 111 insertions(+), 71 deletions(-)

diff --git a/collie/store.c b/collie/store.c
index 842271c..f37ce8a 100644
--- a/collie/store.c
+++ b/collie/store.c
@@ -140,7 +140,6 @@ static int read_from_one(struct cluster_info *cluster, uint64_t oid,
 {
 	int i, n, nr, fd, ret;
 	unsigned wlen, rlen;
-	char name[128];
 	struct sheepdog_node_list_entry *e;
 	struct sd_obj_req hdr;
 	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
@@ -152,15 +151,11 @@ again:
 	for (i = 0; i < nr; i++) {
 		n = obj_to_sheep(e, nr, oid, i);
 
-		snprintf(name, sizeof(name), "%d.%d.%d.%d",
-			 e[n].addr[12], e[n].addr[13],
-			 e[n].addr[14], e[n].addr[15]);
-
 		/* FIXME: do like store_queue_request_local() */
 		if (e[n].id == cluster->this_node.id)
 			continue;
 
-		fd = connect_to(name, e[n].port);
+		fd = get_collie_fd(e, n, cluster->epoch);
 		if (fd < 0)
 			continue;
 
@@ -177,8 +172,6 @@ again:
 
 		ret = exec_req(fd, (struct sd_req *)&hdr, buf, &wlen, &rlen);
 
-		close(fd);
-
 		if (ret)
 			continue;
 
@@ -223,7 +216,6 @@ static int forward_obj_req(struct cluster_info *cluster, struct request *req,
 {
 	int i, n, nr, fd, ret;
 	unsigned wlen, rlen;
-	char name[128];
 	struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
 	struct sheepdog_node_list_entry *e;
 	struct sd_obj_req hdr2;
@@ -244,10 +236,6 @@ again:
 	for (i = 0; i < copies; i++) {
 		n = obj_to_sheep(e, nr, oid, i);
 
-		snprintf(name, sizeof(name), "%d.%d.%d.%d",
-			 e[n].addr[12], e[n].addr[13],
-			 e[n].addr[14], e[n].addr[15]);
-
 		/* TODO: we can do better; we need to chech this first */
 		if (e[n].id == cluster->this_node.id) {
 			ret = store_queue_request_local(cluster, req, buf, cluster->epoch);
@@ -256,7 +244,7 @@ again:
 			goto done;
 		}
 
-		fd = connect_to(name, e[n].port);
+		fd = get_collie_fd(e, n, cluster->epoch);
 		if (fd < 0)
 			continue;
 
@@ -275,8 +263,6 @@ again:
 
 		ret = exec_req(fd, (struct sd_req *)&hdr2, req->data, &wlen, &rlen);
 
-		close(fd);
-
 		if (ret) /* network errors */
 			goto again;
 
@@ -771,19 +757,16 @@ void so_queue_request(struct work *work, int idx)
 
 		if (!local) {
 			struct sd_so_req hdr2;
-			char name[128];
 			int fd;
 			unsigned wlen, rlen;
 
 			n = obj_to_sheep(e, nr, SD_DIR_OID, 0);
 
-			snprintf(name, sizeof(name), "%d.%d.%d.%d",
-				 e[n].addr[12], e[n].addr[13],
-				 e[n].addr[14], e[n].addr[15]);
+			eprintf("%d.%d.%d.%d %d\n",
+				e[n].addr[12], e[n].addr[13],
+				e[n].addr[14], e[n].addr[15], e[n].port);
 
-			eprintf("%s %d\n", name, e[n].port);
-
-			fd = connect_to(name, e[n].port);
+			fd = get_collie_fd(e, n, cluster->epoch);
 			if (fd < 0) {
 				rsp->result = SD_RES_EIO;
 				goto out;
@@ -802,8 +785,6 @@ void so_queue_request(struct work *work, int idx)
 			ret = exec_req(fd, (struct sd_req *)&hdr2,
 				       req->data, &wlen, &rlen);
 
-			close(fd);
-
 			rsp->result = ((struct sd_rsp *)&hdr2)->result;
 			rsp->data_length = ((struct sd_rsp *)&hdr2)->data_length;
 
@@ -1081,7 +1062,6 @@ static void recover_one(struct work *work, int idx)
 	struct sheepdog_node_list_entry *e = &rw->e;
 	struct sd_obj_req hdr;
 	struct sd_obj_rsp *rsp;
-	char name[128];
 	char *buf = zero_block + idx * SD_DATA_OBJ_SIZE;
 	unsigned wlen = 0, rlen = SD_DATA_OBJ_SIZE;
 	int fd, ret;
@@ -1089,13 +1069,11 @@ static void recover_one(struct work *work, int idx)
 
 	eprintf("%d %d, %16lx\n", rw->done, rw->count, oid);
 
-	snprintf(name, sizeof(name), "%d.%d.%d.%d",
-		 e->addr[12], e->addr[13],
-		 e->addr[14], e->addr[15]);
-
-	fd = connect_to(name, e->port);
+	fd = get_collie_fd(e, 0, 0);
 	if (fd < 0) {
-		eprintf("%s %d\n", name, e->port);
+		eprintf("%d.%d.%d.%d %d\n",
+			e->addr[12], e->addr[13],
+			e->addr[14], e->addr[15], e->port);
 		return;
 	}
 
@@ -1108,8 +1086,6 @@ static void recover_one(struct work *work, int idx)
 
 	ret = exec_req(fd, (struct sd_req *)&hdr, buf, &wlen, &rlen);
 
-	close(fd);
-
 	rsp = (struct sd_obj_rsp *)&hdr;
 
 	if (rsp->result != SD_RES_SUCCESS) {
@@ -1173,19 +1149,18 @@ static int fill_obj_list(struct recovery_work *rw,
 	int fd, ret;
 	uint32_t epoch = rw->epoch;
 	unsigned wlen, rlen;
-	char name[128];
 	struct sd_obj_req hdr;
 	struct sd_obj_rsp *rsp;
 
-	snprintf(name, sizeof(name), "%d.%d.%d.%d",
-		 e->addr[12], e->addr[13],
-		 e->addr[14], e->addr[15]);
+	dprintf("%d.%d.%d.%d %d\n",
+		e->addr[12], e->addr[13],
+		e->addr[14], e->addr[15], e->port);
 
-	dprintf("%s %d\n", name, e->port);
-
-	fd = connect_to(name, e->port);
+	fd = get_collie_fd(e, 0, 0);
 	if (fd < 0) {
-		eprintf("%s %d\n", name, e->port);
+		eprintf("%d.%d.%d.%d %d\n",
+			e->addr[12], e->addr[13],
+			e->addr[14], e->addr[15], e->port);
 		return -1;
 	}
 
@@ -1208,8 +1183,6 @@ static int fill_obj_list(struct recovery_work *rw,
 
 	ret = exec_req(fd, (struct sd_req *)&hdr, rw->buf, &wlen, &rlen);
 
-	close(fd);
-
 	rsp = (struct sd_obj_rsp *)&hdr;
 
 	if (rsp->result != SD_RES_SUCCESS) {
diff --git a/include/net.h b/include/net.h
index 7bf0dbb..4175d03 100644
--- a/include/net.h
+++ b/include/net.h
@@ -32,6 +32,7 @@ int do_read(int sockfd, void *buf, int len);
 int rx(struct connection *conn, enum conn_state next_state);
 int tx(struct connection *conn, enum conn_state next_state, int flags);
 int connect_to(char *name, int port);
+int get_collie_fd(struct sheepdog_node_list_entry *e, int idx, uint32_t epoch);
 int send_req(int sockfd, struct sd_req *hdr, void *data, unsigned int *wlen);
 int exec_req(int sockfd, struct sd_req *hdr, void *data,
 	     unsigned int *wlen, unsigned int *rlen);
diff --git a/lib/net.c b/lib/net.c
index f06ae05..248f555 100644
--- a/lib/net.c
+++ b/lib/net.c
@@ -221,6 +221,121 @@ success:
 	return fd;
 }
 
+/*
+ * Switch fd to non-blocking mode.
+ *
+ * Returns the result of fcntl(F_SETFL), or a negative value (after
+ * logging the reason) on failure.  fd is never closed here: the caller
+ * owns it and must close it on failure.
+ */
+static int set_nonblocking(int fd)
+{
+	int flags, ret;
+
+	flags = fcntl(fd, F_GETFL);
+	if (flags < 0) {
+		eprintf("can't fcntl (F_GETFL), %m\n");
+		return flags;
+	}
+
+	ret = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+	if (ret < 0)
+		eprintf("can't fcntl (O_NONBLOCK), %m\n");
+
+	return ret;
+}
+
+/*
+ * Disable Nagle's algorithm (TCP_NODELAY) on fd.
+ *
+ * Returns the setsockopt() result: 0 on success, -1 with errno set.
+ */
+static int set_nodelay(int fd)
+{
+	int opt = 1;
+
+	return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt));
+}
+
+/*
+ * Return a connected, non-blocking TCP socket to node e[idx], reusing
+ * a cached descriptor when possible.
+ *
+ * Descriptors are cached per node index for a single epoch.  When a
+ * newer epoch is passed in, every cached descriptor is closed first,
+ * since the index -> node mapping may differ between epochs.  A
+ * descriptor returned for an epoch older than the cached one (e.g. 0)
+ * is NOT cached; the caller owns it and must close() it.
+ *
+ * NOTE(review): the callers in this patch never close() the returned
+ * fd, so the uncached descriptors from get_collie_fd(e, 0, 0) in
+ * recover_one() and fill_obj_list() leak.  Cached descriptors are also
+ * kept after exec_req() fails, so a broken connection keeps being
+ * reused until the epoch changes.  The cache is unlocked static state:
+ * confirm no two threads call this (or share a cached fd) concurrently,
+ * and that exec_req() copes with EAGAIN on non-blocking sockets.
+ *
+ * Returns the descriptor, or -1 on error.
+ */
+int get_collie_fd(struct sheepdog_node_list_entry *e, int idx, uint32_t epoch)
+{
+	static int cached_fds[SD_MAX_NODES];
+	static uint32_t cached_epoch = 0;
+	int i, fd;
+	char name[INET6_ADDRSTRLEN];
+
+	/* idx indexes cached_fds[], so reject anything out of range */
+	if (idx < 0 || idx >= (int)ARRAY_SIZE(cached_fds)) {
+		eprintf("invalid node index %d\n", idx);
+		return -1;
+	}
+
+	/* first call: mark every slot empty; cached_epoch never returns to 0 */
+	if (cached_epoch == 0) {
+		for (i = 0; i < ARRAY_SIZE(cached_fds); i++)
+			cached_fds[i] = -1;
+		cached_epoch = 1;
+	}
+
+	dprintf("%u, %u, %d\n", cached_epoch, epoch, cached_fds[idx]);
+
+	/* a newer epoch invalidates every cached connection */
+	if (cached_epoch < epoch) {
+		for (i = 0; i < ARRAY_SIZE(cached_fds); i++) {
+			if (cached_fds[i] >= 0)
+				close(cached_fds[i]);
+			cached_fds[i] = -1;
+		}
+		cached_epoch = epoch;
+	}
+
+	if (cached_epoch == epoch && cached_fds[idx] >= 0) {
+		dprintf("use a cached fd %d\n", cached_fds[idx]);
+		return cached_fds[idx];
+	}
+
+	snprintf(name, sizeof(name), "%d.%d.%d.%d",
+		 e[idx].addr[12], e[idx].addr[13],
+		 e[idx].addr[14], e[idx].addr[15]);
+
+	fd = connect_to(name, e[idx].port);
+	if (fd < 0)
+		return -1;
+
+	/* set_nonblocking() leaves fd open on failure; close it exactly once */
+	if (set_nonblocking(fd) < 0 || set_nodelay(fd) < 0) {
+		eprintf("%m\n");
+		close(fd);
+		return -1;
+	}
+
+	/* cache only current epoch fds */
+	if (cached_epoch == epoch)
+		cached_fds[idx] = fd;
+
+	return fd;
+}
+
 int do_read(int sockfd, void *buf, int len)
 {
 	int ret;
@@ -345,20 +429,17 @@ int write_object(struct sheepdog_node_list_entry *e,
 	struct sd_obj_req hdr;
 	int i, n, fd, ret, success = 0;
 	uint16_t vosts[3];
-	char name[128];
 
 	for (i = 0; i < 1; i++) {
 		unsigned rlen = 0, wlen = datalen;
 
 		n = obj_to_sheep(e, nodes, oid, i);
 
-		snprintf(name, sizeof(name), "%d.%d.%d.%d",
-			 e[n].addr[12], e[n].addr[13],
-			 e[n].addr[14], e[n].addr[15]);
-
-		fd = connect_to(name, e[n].port);
+		fd = get_collie_fd(e, n, node_version);
 		if (fd < 0) {
-			eprintf("can't connect to vost %u, %s\n", vosts[i], name);
+			eprintf("can't connect to vost %u, %d.%d.%d.%d\n",
+				vosts[i], e[n].addr[12], e[n].addr[13],
+				e[n].addr[14], e[n].addr[15]);
 			continue;
 		}
 
@@ -377,9 +458,10 @@ int write_object(struct sheepdog_node_list_entry *e,
 		hdr.offset = offset;
 
 		ret = exec_req(fd, (struct sd_req *)&hdr, data, &wlen, &rlen);
-		close(fd);
 		if (ret)
-			eprintf("can't update vost %u, %s\n", vosts[i], name);
+			eprintf("can't update vost %u, %d.%d.%d.%d\n",
+				vosts[i], e[n].addr[12], e[n].addr[13],
+				e[n].addr[14], e[n].addr[15]);
 		else
 			success++;
 	}
@@ -394,7 +476,6 @@ int read_object(struct sheepdog_node_list_entry *e,
 {
 	struct sd_obj_req hdr;
 	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
-	char name[128];
 	int i = 0, n, fd, ret;
 
 	for (i = 0; i < nr; i++) {
@@ -402,13 +483,7 @@ int read_object(struct sheepdog_node_list_entry *e,
 
 		n = obj_to_sheep(e, nodes, oid, i);
 
-		snprintf(name, sizeof(name), "%d.%d.%d.%d",
-			 e[n].addr[12],
-			 e[n].addr[13],
-			 e[n].addr[14],
-			 e[n].addr[15]);
-
-		fd = connect_to(name, e[n].port);
+		fd = get_collie_fd(e, n, node_version);
 		if (fd < 0)
 			return -1;
 
@@ -422,7 +497,6 @@ int read_object(struct sheepdog_node_list_entry *e,
 		hdr.offset = offset;
 
 		ret = exec_req(fd, (struct sd_req *)&hdr, data, &wlen, &rlen);
-		close(fd);
 
 		if (!ret) {
 			if (rsp->result == SD_RES_SUCCESS)
@@ -439,7 +513,6 @@ int exec_reqs(struct sheepdog_node_list_entry *e,
 	      char *data, unsigned int wdatalen, unsigned int rdatalen, int nr,
 	      int quorum)
 {
-	char name[128];
 	int i = 0, n, fd, ret;
 	int success = 0;
 	struct sd_req tmp;
@@ -452,13 +525,7 @@ int exec_reqs(struct sheepdog_node_list_entry *e,
 
 		n = obj_to_sheep(e, nodes, oid, i);
 
-		snprintf(name, sizeof(name), "%d.%d.%d.%d",
-			 e[n].addr[12],
-			 e[n].addr[13],
-			 e[n].addr[14],
-			 e[n].addr[15]);
-
-		fd = connect_to(name, e[n].port);
+		fd = get_collie_fd(e, n, node_version);
 		if (fd < 0) {
 			((struct sd_rsp *) hdr)->result = SD_RES_EIO;
 			return -1;
@@ -476,7 +543,6 @@ int exec_reqs(struct sheepdog_node_list_entry *e,
 
 		memcpy(&tmp, hdr, sizeof(tmp));
 		ret = exec_req(fd, &tmp, data, &wlen, &rlen);
-		close(fd);
 
 		rsp = (struct sd_rsp *)&tmp;
 
-- 
1.5.6.5




More information about the sheepdog mailing list