[sheepdog] [PATCH 4/6] sheep: refactor forward_write_obj_req()

Liu Yuan namei.unix at gmail.com
Sun Jun 24 14:51:51 CEST 2012


From: Liu Yuan <tailai.ly at taobao.com>


Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 sheep/gateway.c |  210 +++++++++++++++++++++++++++++++------------------------
 1 file changed, 120 insertions(+), 90 deletions(-)

diff --git a/sheep/gateway.c b/sheep/gateway.c
index f126dfb..c45490d 100644
--- a/sheep/gateway.c
+++ b/sheep/gateway.c
@@ -89,29 +89,115 @@ struct write_info {
 	struct pollfd pfds[SD_MAX_REDUNDANCY];
 	struct sd_vnode *vnodes[SD_MAX_REDUNDANCY];
 	int sock_idx[SD_MAX_REDUNDANCY];
+	int nr_sent;
 };
 
+static inline void update_write_info(struct write_info *wi, int pos)
+{
+	dprintf("%d, %d\n", wi->nr_sent, pos);
+	wi->nr_sent--;
+	memmove(wi->pfds + pos, wi->pfds + pos + 1,
+		sizeof(struct pollfd) * (wi->nr_sent - pos));
+	memmove(wi->vnodes + pos, wi->vnodes + pos + 1,
+		sizeof(struct sd_vnode *) * (wi->nr_sent - pos));
+	memmove(wi->sock_idx + pos, wi->sock_idx + pos + 1,
+		sizeof(int) * (wi->nr_sent - pos));
+}
+
+static inline void finish_one_write(struct write_info *wi, int i)
+{
+	sheep_put_fd(wi->vnodes[i], wi->pfds[i].fd,
+		     wi->sock_idx[i]);
+	update_write_info(wi, i);
+}
+
+static inline void finish_one_write_err(struct write_info *wi, int i)
+{
+	sheep_del_fd(wi->vnodes[i], wi->pfds[i].fd,
+		     wi->sock_idx[i]);
+	update_write_info(wi, i);
+}
+
+/*
+ * Wait for all forwarded writes to complete.
+ *
+ * Even if something goes wrong, we have to wait for every forwarded write to
+ * complete in order to avoid interleaved requests.
+ *
+ * Return an error code if any of the requests fails.
+ */
+static int wait_forward_write(struct write_info *wi, struct sd_rsp *rsp)
+{
+	int nr_sent, err_ret = SD_RES_SUCCESS, ret, pollret, i;
+again:
+	pollret = poll(wi->pfds, wi->nr_sent, -1);
+	if (pollret < 0) {
+		if (errno == EINTR)
+			goto again;
+
+		panic("%m\n");
+	}
+
+	nr_sent = wi->nr_sent;
+	for (i = 0; i < nr_sent; i++)
+		if (wi->pfds[i].revents & POLLIN)
+			break;
+	if (i < nr_sent) {
+		int re = wi->pfds[i].revents;
+		dprintf("%d, revents %x\n", i, re);
+		if (re & (POLLERR | POLLHUP | POLLNVAL)) {
+			err_ret = SD_RES_NETWORK_ERROR;
+			finish_one_write_err(wi, i);
+		} else if (re & POLLIN) {
+			if (do_read(wi->pfds[i].fd, rsp, sizeof(*rsp))) {
+				eprintf("remote node might be crashed\n");
+				err_ret = SD_RES_NETWORK_ERROR;
+				finish_one_write_err(wi, i);
+				goto finish_write;
+			}
+
+			ret = rsp->result;
+			if (ret != SD_RES_SUCCESS) {
+				eprintf("fail %"PRIu32"\n", ret);
+				err_ret = ret;
+			}
+			finish_one_write(wi, i);
+		} else {
+			eprintf("unhanlded poll event\n");
+		}
+	}
+finish_write:
+	if (wi->nr_sent > 0)
+		goto again;
+
+	return err_ret;
+}
+
+static void init_write_info(struct write_info *wi)
+{
+	int i;
+	for (i = 0; i < SD_MAX_REDUNDANCY; i++) {
+		wi->pfds[i].fd = -1;
+		wi->vnodes[i] = NULL;
+	}
+	wi->nr_sent = 0;
+}
+
 int forward_write_obj_req(struct request *req)
 {
-	int i, fd, ret, pollret;
+	int i, fd, err_ret = SD_RES_SUCCESS, ret;
 	unsigned wlen;
-	char name[128];
 	struct sd_req fwd_hdr;
 	struct sd_rsp *rsp = (struct sd_rsp *)&req->rp;
 	struct sd_vnode *v;
 	struct sd_vnode *obj_vnodes[SD_MAX_COPIES];
 	uint64_t oid = req->rq.obj.oid;
 	int nr_copies;
-	int nr_fds = 0, local = 0;
 	struct write_info wi;
 
 	dprintf("%"PRIx64"\n", oid);
 
-	for (i = 0; i < SD_MAX_REDUNDANCY; i++) {
-		wi.pfds[i].fd = -1;
-		wi.vnodes[i] = NULL;
-	}
-
+	init_write_info(&wi);
 	memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr));
 	fwd_hdr.flags |= SD_FLAG_CMD_IO_LOCAL;
 
@@ -121,107 +207,51 @@ int forward_write_obj_req(struct request *req)
 	oid_to_vnodes(req->vnodes, oid, nr_copies, obj_vnodes);
 	for (i = 0; i < nr_copies; i++) {
 		v = obj_vnodes[i];
-
-		addr_to_str(name, sizeof(name), v->addr, 0);
-
-		if (vnode_is_local(v)) {
-			local = 1;
+		if (!vnode_is_local(v))
 			continue;
-		}
-
-		fd = sheep_get_fd(v, &wi.sock_idx[nr_fds]);
-		if (fd < 0) {
-			ret = SD_RES_NETWORK_ERROR;
-			goto err;
-		}
-
-		ret = send_req(fd, &fwd_hdr, req->data, &wlen);
-		if (ret) { /* network errors */
-			sheep_del_fd(v, fd, wi.sock_idx[nr_fds]);
-			ret = SD_RES_NETWORK_ERROR;
-			dprintf("fail %"PRIu32"\n", ret);
-			goto err;
-		}
 
-		wi.vnodes[nr_fds] = v;
-		wi.pfds[nr_fds].fd = fd;
-		wi.pfds[nr_fds].events = POLLIN;
-		nr_fds++;
-	}
-
-	if (local) {
 		ret = do_local_io(req, fwd_hdr.epoch);
-		rsp->result = ret;
 
-		if (rsp->result != SD_RES_SUCCESS) {
+		if (ret != SD_RES_SUCCESS) {
 			eprintf("fail to write local %"PRIu32"\n", ret);
-			ret = rsp->result;
-			goto err;
+			return ret;
 		}
-
-		if (nr_fds == 0)
-			goto out;
 	}
 
-	ret = SD_RES_SUCCESS;
-again:
-	pollret = poll(wi.pfds, nr_fds, -1);
-	if (pollret < 0) {
-		if (errno == EINTR)
-			goto again;
+	for (i = 0; i < nr_copies; i++) {
+		v = obj_vnodes[i];
 
-		ret = SD_RES_NETWORK_ERROR;
-		goto err;
-	}
+		if (vnode_is_local(v))
+			continue;
 
-	for (i = 0; i < nr_fds; i++) {
-		if (wi.pfds[i].revents & POLLERR ||
-		    wi.pfds[i].revents & POLLHUP ||
-		    wi.pfds[i].revents & POLLNVAL) {
-			sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd,
-				     wi.sock_idx[i]);
-			ret = SD_RES_NETWORK_ERROR;
+		fd = sheep_get_fd(v, &wi.sock_idx[wi.nr_sent]);
+		if (fd < 0) {
+			err_ret = SD_RES_NETWORK_ERROR;
 			break;
 		}
 
-		if (!(wi.pfds[i].revents & POLLIN))
-			continue;
-
-		if (do_read(wi.pfds[i].fd, rsp, sizeof(*rsp))) {
-			eprintf("failed to read a response: %m\n");
-			sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd,
-				     wi.sock_idx[i]);
-			ret = SD_RES_NETWORK_ERROR;
+		ret = send_req(fd, &fwd_hdr, req->data, &wlen);
+		if (ret) {
+			sheep_del_fd(v, fd, wi.sock_idx[wi.nr_sent]);
+			err_ret = SD_RES_NETWORK_ERROR;
+			dprintf("fail %"PRIu32"\n", ret);
 			break;
 		}
 
-		if (rsp->result != SD_RES_SUCCESS) {
-			eprintf("fail %"PRIu32"\n", rsp->result);
-			ret = rsp->result;
-		}
-		sheep_put_fd(wi.vnodes[i], wi.pfds[i].fd, wi.sock_idx[i]);
-		break;
-	}
-	if (i < nr_fds) {
-		nr_fds--;
-		memmove(wi.pfds + i, wi.pfds + i + 1,
-			sizeof(struct pollfd) * (nr_fds - i));
-		memmove(wi.vnodes + i, wi.vnodes + i + 1,
-			sizeof(struct sd_vnode *) * (nr_fds - i));
-		memmove(wi.sock_idx + i, wi.sock_idx + i + 1,
-			sizeof(int) * (nr_fds - i));
+		wi.vnodes[wi.nr_sent] = v;
+		wi.pfds[wi.nr_sent].fd = fd;
+		wi.pfds[wi.nr_sent].events = POLLIN;
+		wi.nr_sent++;
 	}
 
-	dprintf("%"PRIx64" %"PRIu32"\n", oid, nr_fds);
+	dprintf("nr_sent %d, err %d\n", wi.nr_sent, err_ret);
+	if (wi.nr_sent > 0) {
+		ret = wait_forward_write(&wi, rsp);
+		if (ret != SD_RES_SUCCESS)
+			err_ret = ret;
+	}
 
-	if (nr_fds > 0)
-		goto again;
-out:
-	return ret;
-err:
-	for (i = 0; i < nr_fds; i++)
-		sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd, wi.sock_idx[i]);
-	return ret;
+	return err_ret;
 }
 
 void do_gateway_request(struct work *work)
-- 
1.7.10.2




More information about the sheepdog mailing list