[sheepdog] [PATCH 2/6] sheep: use new sockfd cache

Liu Yuan <namei.unix at gmail.com>
Sun Jun 24 14:51:49 CEST 2012


From: Liu Yuan <tailai.ly at taobao.com>

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 sheep/gateway.c |   55 ++++++++++++++++++++++++++++++++-----------------------
 sheep/group.c   |    9 ++++++++-
 2 files changed, 40 insertions(+), 24 deletions(-)

diff --git a/sheep/gateway.c b/sheep/gateway.c
index 42f028a..c3e5f80 100644
--- a/sheep/gateway.c
+++ b/sheep/gateway.c
@@ -58,7 +58,7 @@ read_remote:
 		if (vnode_is_local(v))
 			continue;
 
-		fd = get_sheep_fd(v->addr, v->port, v->node_idx, fwd_hdr.epoch);
+		fd = sheep_get_fd(v);
 		if (fd < 0) {
 			ret = SD_RES_NETWORK_ERROR;
 			continue;
@@ -70,10 +70,11 @@ read_remote:
 		ret = exec_req(fd, &fwd_hdr, req->data, &wlen, &rlen);
 
 		if (ret) { /* network errors */
-			del_sheep_fd(fd);
+			sheep_del_fd(v, fd);
 			ret = SD_RES_NETWORK_ERROR;
 			continue;
 		} else {
+			sheep_put_fd(v, fd);
 			memcpy(&req->rp, rsp, sizeof(*rsp));
 			ret = rsp->result;
 			break;
@@ -82,6 +83,11 @@ read_remote:
 	return ret;
 }
 
+struct write_info {
+	struct pollfd pfds[SD_MAX_REDUNDANCY];
+	struct sd_vnode *vnodes[SD_MAX_REDUNDANCY];
+};
+
 int forward_write_obj_req(struct request *req)
 {
 	int i, fd, ret, pollret;
@@ -93,15 +99,14 @@ int forward_write_obj_req(struct request *req)
 	struct sd_vnode *obj_vnodes[SD_MAX_COPIES];
 	uint64_t oid = req->rq.obj.oid;
 	int nr_copies;
-	struct pollfd pfds[SD_MAX_REDUNDANCY];
-	int nr_fds, local = 0;
+	int nr_fds = 0, local = 0;
+	struct write_info wi;
 
 	dprintf("%"PRIx64"\n", oid);
 
-	nr_fds = 0;
-	memset(pfds, 0, sizeof(pfds));
-	for (i = 0; i < ARRAY_SIZE(pfds); i++)
-		pfds[i].fd = -1;
+	memset(&wi, 0, sizeof(wi));
+	for (i = 0; i < SD_MAX_REDUNDANCY; i++)
+		wi.pfds[i].fd = -1;
 
 	memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr));
 	fwd_hdr.flags |= SD_FLAG_CMD_IO_LOCAL;
@@ -120,24 +125,23 @@ int forward_write_obj_req(struct request *req)
 			continue;
 		}
 
-		fd = get_sheep_fd(v->addr, v->port, v->node_idx, fwd_hdr.epoch);
+		fd = sheep_get_fd(v);
 		if (fd < 0) {
-			eprintf("failed to connect to %s:%"PRIu32"\n", name,
-				v->port);
 			ret = SD_RES_NETWORK_ERROR;
 			goto err;
 		}
 
 		ret = send_req(fd, &fwd_hdr, req->data, &wlen);
 		if (ret) { /* network errors */
-			del_sheep_fd(fd);
+			sheep_del_fd(v, fd);
 			ret = SD_RES_NETWORK_ERROR;
 			dprintf("fail %"PRIu32"\n", ret);
 			goto err;
 		}
 
-		pfds[nr_fds].fd = fd;
-		pfds[nr_fds].events = POLLIN;
+		wi.vnodes[nr_fds] = v;
+		wi.pfds[nr_fds].fd = fd;
+		wi.pfds[nr_fds].events = POLLIN;
 		nr_fds++;
 	}
 
@@ -157,7 +161,7 @@ int forward_write_obj_req(struct request *req)
 
 	ret = SD_RES_SUCCESS;
 again:
-	pollret = poll(pfds, nr_fds, -1);
+	pollret = poll(wi.pfds, nr_fds, -1);
 	if (pollret < 0) {
 		if (errno == EINTR)
 			goto again;
@@ -167,19 +171,20 @@ again:
 	}
 
 	for (i = 0; i < nr_fds; i++) {
-		if (pfds[i].revents & POLLERR || pfds[i].revents & POLLHUP ||
-		    pfds[i].revents & POLLNVAL) {
-			del_sheep_fd(pfds[i].fd);
+		if (wi.pfds[i].revents & POLLERR ||
+		    wi.pfds[i].revents & POLLHUP ||
+		    wi.pfds[i].revents & POLLNVAL) {
+			sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd);
 			ret = SD_RES_NETWORK_ERROR;
 			break;
 		}
 
-		if (!(pfds[i].revents & POLLIN))
+		if (!(wi.pfds[i].revents & POLLIN))
 			continue;
 
-		if (do_read(pfds[i].fd, rsp, sizeof(*rsp))) {
+		if (do_read(wi.pfds[i].fd, rsp, sizeof(*rsp))) {
 			eprintf("failed to read a response: %m\n");
-			del_sheep_fd(pfds[i].fd);
+			sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd);
 			ret = SD_RES_NETWORK_ERROR;
 			break;
 		}
@@ -189,11 +194,15 @@ again:
 			ret = rsp->result;
 		}
 
+		sheep_put_fd(wi.vnodes[i], wi.pfds[i].fd);
 		break;
 	}
 	if (i < nr_fds) {
 		nr_fds--;
-		memmove(pfds + i, pfds + i + 1, sizeof(*pfds) * (nr_fds - i));
+		memmove(wi.pfds + i, wi.pfds + i + 1,
+			sizeof(struct pollfd) * (nr_fds - i));
+		memmove(wi.vnodes + i, wi.vnodes + i + 1,
+			sizeof(struct sd_vnode *) * (nr_fds - i));
 	}
 
 	dprintf("%"PRIx64" %"PRIu32"\n", oid, nr_fds);
@@ -204,7 +213,7 @@ out:
 	return ret;
 err:
 	for (i = 0; i < nr_fds; i++)
-		del_sheep_fd(pfds[i].fd);
+		sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd);
 	return ret;
 }
 
diff --git a/sheep/group.c b/sheep/group.c
index 6721025..ede59ef 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -804,10 +804,13 @@ static void finish_join(struct join_message *msg, struct sd_node *joined,
 		if (sd_store->purge_obj &&
 		    sd_store->purge_obj() != SD_RES_SUCCESS)
 			eprintf("WARN: may have stale objects\n");
+
+	sockfd_cache_add_group(nodes, nr_nodes);
 }
 
 static void update_cluster_info(struct join_message *msg,
-		struct sd_node *joined, struct sd_node *nodes, size_t nr_nodes)
+				struct sd_node *joined, struct sd_node *nodes,
+				size_t nr_nodes)
 {
 	struct vnode_info *old_vnode_info;
 
@@ -868,6 +871,8 @@ static void update_cluster_info(struct join_message *msg,
 		if (current_vnode_info->nr_zones >= sys->nr_copies)
 			sys_stat_set(SD_STATUS_OK);
 	}
+
+	sockfd_cache_add(joined);
 }
 
 /*
@@ -1111,6 +1116,8 @@ void sd_leave_handler(struct sd_node *left, struct sd_node *members,
 		if (current_vnode_info->nr_zones < sys->nr_copies)
 			sys_stat_set(SD_STATUS_HALT);
 	}
+
+	sockfd_cache_del((struct node_id *)left);
 }
 
 int create_cluster(int port, int64_t zone, int nr_vnodes,
-- 
1.7.10.2




More information about the sheepdog mailing list