[sheepdog] [PATCH V4 1/2] sheep: refactor gateway_forward_request

Yunkai Zhang yunkai.me at gmail.com
Sun Sep 2 14:59:44 CEST 2012


From: Yunkai Zhang <qiushu.zyk at taobao.com>

- No longer treat local requests specially, so that we can forward local and
  remote requests concurrently, which also makes the code simpler.
- Extract a common low-level function which can forward requests concurrently
  and collect response data from the target nodes. This low-level function is
  useful for callers that need to read the response data.

Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com>
---
 sheep/gateway.c | 120 +++++++++++++++++++++++++++++++++++---------------------
 1 file changed, 75 insertions(+), 45 deletions(-)

diff --git a/sheep/gateway.c b/sheep/gateway.c
index 41d712b..94ec6f9 100644
--- a/sheep/gateway.c
+++ b/sheep/gateway.c
@@ -99,6 +99,7 @@ int gateway_read_obj(struct request *req)
 }
 
 struct write_info_entry {
+	int idx;
 	struct pollfd pfd;
 	struct node_id *nid;
 	struct sockfd *sfd;
@@ -150,10 +151,13 @@ static inline void pfd_info_init(struct write_info *wi, struct pfd_info *pi)
  *
  * Return error code if any one request fails.
  */
-static int wait_forward_request(struct write_info *wi, struct sd_rsp *rsp)
+static int wait_forward_request(struct write_info *wi,
+				struct sd_rsp target_rsps[],
+				void *target_data[])
 {
 	int nr_sent, err_ret = SD_RES_SUCCESS, ret, pollret, i;
-	struct pfd_info pi;;
+	struct pfd_info pi;
+	struct sd_rsp *rsp;
 again:
 	pfd_info_init(wi, &pi);
 	pollret = poll(pi.pfds, pi.nr, -1);
@@ -170,14 +174,18 @@ again:
 			break;
 	if (i < nr_sent) {
 		int re = pi.pfds[i].revents;
+		int idx = wi->ent[i].idx;
 		dprintf("%d, revents %x\n", i, re);
 		if (re & (POLLERR | POLLHUP | POLLNVAL)) {
 			err_ret = SD_RES_NETWORK_ERROR;
+			target_rsps[idx].result = err_ret;
 			finish_one_write_err(wi, i);
 		} else if (re & POLLIN) {
+			rsp = &target_rsps[idx];
 			if (do_read(pi.pfds[i].fd, rsp, sizeof(*rsp))) {
 				eprintf("remote node might have gone away\n");
 				err_ret = SD_RES_NETWORK_ERROR;
+				rsp->result = err_ret;
 				finish_one_write_err(wi, i);
 				goto finish_write;
 			}
@@ -187,6 +195,20 @@ again:
 				eprintf("fail %"PRIx32"\n", ret);
 				err_ret = ret;
 			}
+
+			if (rsp->data_length) {
+				assert(target_data);
+				ret = do_read(pi.pfds[i].fd, target_data[idx],
+					      rsp->data_length);
+				if (ret) {
+					eprintf("failed to read the response data\n");
+					err_ret = SD_RES_NETWORK_ERROR;
+					rsp->result = err_ret;
+					finish_one_write_err(wi, i);
+					goto finish_write;
+				}
+			}
+
 			finish_one_write(wi, i);
 		} else {
 			eprintf("unhandled poll event\n");
@@ -208,9 +230,10 @@ static inline void write_info_init(struct write_info *wi)
 }
 
 static inline void
-write_info_advance(struct write_info *wi, struct sd_vnode *v,
+write_info_advance(struct write_info *wi, struct sd_vnode *v, int idx,
 		   struct sockfd *sfd)
 {
+	wi->ent[wi->nr_sent].idx = idx;
 	wi->ent[wi->nr_sent].nid = &v->nid;
 	wi->ent[wi->nr_sent].pfd.fd = sfd->fd;
 	wi->ent[wi->nr_sent].pfd.events = POLLIN;
@@ -225,70 +248,49 @@ static inline void gateway_init_fwd_hdr(struct sd_req *fwd, struct sd_req *hdr)
 	fwd->proto_ver = SD_SHEEP_PROTO_VER;
 }
 
-static int gateway_forward_request(struct request *req)
+/*
+ * Forward requests concurrently, collect response data sent back
+ * from target nodes and fill them into target_rsps[] and target_data[],
+ * target_data parameter can be set to NULL if no data return.
+ */
+static int forward_request_concurrently(struct sd_req *hdr,
+					void *data, unsigned int *wlen,
+					struct sd_vnode *target_vnodes[],
+					struct sd_rsp target_rsps[],
+					void *target_data[],
+					int nr_targets)
 {
-	int i, err_ret = SD_RES_SUCCESS, ret, local = -1;
-	unsigned wlen;
-	struct sd_rsp *rsp = (struct sd_rsp *)&req->rp;
-	struct sd_vnode *v;
-	struct sd_vnode *obj_vnodes[SD_MAX_COPIES];
-	uint64_t oid = req->rq.obj.oid;
-	int nr_copies;
 	struct write_info wi;
-	struct sd_op_template *op;
-	struct sd_req hdr;
-
-	dprintf("%"PRIx64"\n", oid);
-
-	gateway_init_fwd_hdr(&hdr, &req->rq);
-	op = get_sd_op(hdr.opcode);
+	struct sd_vnode *v;
+	struct sockfd *sfd;
+	int i, ret, err_ret = SD_RES_SUCCESS;
 
 	write_info_init(&wi);
-	wlen = hdr.data_length;
-	nr_copies = get_req_copy_number(req);
-	oid_to_vnodes(req->vinfo->vnodes, req->vinfo->nr_vnodes, oid,
-		      nr_copies, obj_vnodes);
-
-	for (i = 0; i < nr_copies; i++) {
-		struct sockfd *sfd;
 
-		v = obj_vnodes[i];
-		if (vnode_is_local(v)) {
-			local = i;
-			continue;
-		}
+	for (i = 0; i < nr_targets; i++) {
+		v = target_vnodes[i];
 
 		sfd = sheep_get_sockfd(&v->nid);
 		if (!sfd) {
 			err_ret = SD_RES_NETWORK_ERROR;
+			target_rsps[i].result = err_ret;
 			break;
 		}
 
-		ret = send_req(sfd->fd, &hdr, req->data, &wlen);
+		ret = send_req(sfd->fd, hdr, data, wlen);
 		if (ret) {
 			sheep_del_sockfd(&v->nid, sfd);
 			err_ret = SD_RES_NETWORK_ERROR;
+			target_rsps[i].result = err_ret;
 			dprintf("fail %d\n", ret);
 			break;
 		}
-		write_info_advance(&wi, v, sfd);
-	}
-
-	if (local != -1 && err_ret == SD_RES_SUCCESS) {
-		v = obj_vnodes[local];
-
-		assert(op);
-		ret = sheep_do_op_work(op, req);
-
-		if (ret != SD_RES_SUCCESS) {
-			eprintf("fail to write local %"PRIx32"\n", ret);
-			err_ret = ret;
-		}
+		write_info_advance(&wi, v, i, sfd);
 	}
 
 	dprintf("nr_sent %d, err %x\n", wi.nr_sent, err_ret);
 	if (wi.nr_sent > 0) {
-		ret = wait_forward_request(&wi, rsp);
+		ret = wait_forward_request(&wi, target_rsps, target_data);
 		if (ret != SD_RES_SUCCESS)
 			err_ret = ret;
 	}
@@ -296,6 +298,34 @@ static int gateway_forward_request(struct request *req)
 	return err_ret;
 }
 
+/*
+ * Do forward request, not care about response data
+ * sent back from target nodes.
+ */
+static int gateway_forward_request(struct request *req)
+{
+	int nr_copies;
+	unsigned wlen;
+	struct sd_vnode *target_vnodes[SD_MAX_COPIES];
+	struct sd_rsp target_rsps[SD_MAX_COPIES];
+	uint64_t oid = req->rq.obj.oid;
+	struct sd_req hdr;
+
+	dprintf("%"PRIx64"\n", oid);
+
+	gateway_init_fwd_hdr(&hdr, &req->rq);
+
+	wlen = hdr.data_length;
+
+	nr_copies = get_req_copy_number(req);
+	oid_to_vnodes(req->vinfo->vnodes, req->vinfo->nr_vnodes, oid,
+		      nr_copies, target_vnodes);
+
+	return forward_request_concurrently(&hdr, req->data, &wlen,
+					    target_vnodes, target_rsps, NULL,
+					    nr_copies);
+}
+
 int gateway_write_obj(struct request *req)
 {
 	if (sys->enable_write_cache && !req->local && !bypass_object_cache(req))
-- 
1.7.11.4




More information about the sheepdog mailing list