[sheepdog] [PATCH v2 4/8] sheep: refactor forward_write_obj_req()
Liu Yuan
namei.unix at gmail.com
Mon Jun 25 16:31:46 CEST 2012
From: Liu Yuan <tailai.ly at taobao.com>
Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
sheep/gateway.c | 210 +++++++++++++++++++++++++++++++------------------------
1 file changed, 120 insertions(+), 90 deletions(-)
diff --git a/sheep/gateway.c b/sheep/gateway.c
index f126dfb..c45490d 100644
--- a/sheep/gateway.c
+++ b/sheep/gateway.c
@@ -89,29 +89,115 @@ struct write_info {
struct pollfd pfds[SD_MAX_REDUNDANCY];
struct sd_vnode *vnodes[SD_MAX_REDUNDANCY];
int sock_idx[SD_MAX_REDUNDANCY];
+ int nr_sent;
};
+/*
+ * Remove slot 'pos' from the three parallel arrays of 'wi' (pfds, vnodes and
+ * sock_idx) by sliding every later entry down one place, and shrink nr_sent
+ * accordingly.  The slot count and 'pos' are logged before anything changes.
+ */
+static inline void update_write_info(struct write_info *wi, int pos)
+{
+	int nr_tail;
+
+	dprintf("%d, %d\n", wi->nr_sent, pos);
+
+	/* Number of entries located after 'pos' once the count is reduced. */
+	wi->nr_sent -= 1;
+	nr_tail = wi->nr_sent - pos;
+
+	memmove(&wi->pfds[pos], &wi->pfds[pos + 1],
+		sizeof(wi->pfds[0]) * nr_tail);
+	memmove(&wi->vnodes[pos], &wi->vnodes[pos + 1],
+		sizeof(wi->vnodes[0]) * nr_tail);
+	memmove(&wi->sock_idx[pos], &wi->sock_idx[pos + 1],
+		sizeof(wi->sock_idx[0]) * nr_tail);
+}
+
+/*
+ * Complete the forwarded write held in slot 'i': hand the slot's vnode, fd
+ * and socket index to sheep_put_fd() and then drop the slot from 'wi'.
+ * NOTE(review): sheep_put_fd() presumably gives the connection back for
+ * reuse - confirm against its definition.
+ */
+static inline void finish_one_write(struct write_info *wi, int i)
+{
+	struct sd_vnode *vnode = wi->vnodes[i];
+	int fd = wi->pfds[i].fd;
+	int idx = wi->sock_idx[i];
+
+	sheep_put_fd(vnode, fd, idx);
+	update_write_info(wi, i);
+}
+
+/*
+ * Error-path counterpart of finish_one_write(): hand the slot's vnode, fd
+ * and socket index to sheep_del_fd() and then drop the slot from 'wi'.
+ * NOTE(review): sheep_del_fd() presumably discards the connection instead
+ * of keeping it for reuse - confirm against its definition.
+ */
+static inline void finish_one_write_err(struct write_info *wi, int i)
+{
+	struct sd_vnode *vnode = wi->vnodes[i];
+	int fd = wi->pfds[i].fd;
+	int idx = wi->sock_idx[i];
+
+	sheep_del_fd(vnode, fd, idx);
+	update_write_info(wi, i);
+}
+
+/*
+ * Wait until every forwarded write tracked in 'wi' has completed.
+ *
+ * Even if something goes wrong, we keep waiting for the remaining forwarded
+ * writes so that requests are not interleaved on the connections.
+ *
+ * Each pass polls the outstanding descriptors, picks the first one that
+ * reports any event and retires it:
+ *  - POLLERR/POLLHUP/POLLNVAL: recorded as SD_RES_NETWORK_ERROR and the slot
+ *    is finished through finish_one_write_err();
+ *  - POLLIN: a response header is read into 'rsp' (overwriting whatever the
+ *    previous response left there); a failed read is a network error, a
+ *    non-success rsp->result is recorded as the error to return.
+ *
+ * The scan must accept the error bits as well as POLLIN.  A descriptor that
+ * only reports POLLHUP/POLLERR would otherwise never be retired and poll()
+ * would return immediately forever.
+ *
+ * Returns SD_RES_SUCCESS when every request succeeded, otherwise the most
+ * recently recorded error code.  Returns immediately when nothing is
+ * outstanding (wi->nr_sent == 0).
+ */
+static int wait_forward_write(struct write_info *wi, struct sd_rsp *rsp)
+{
+	const int ready = POLLIN | POLLERR | POLLHUP | POLLNVAL;
+	int err_ret = SD_RES_SUCCESS, ret, re, i;
+
+	while (wi->nr_sent > 0) {
+		if (poll(wi->pfds, wi->nr_sent, -1) < 0) {
+			/* Interrupted by a signal: simply poll again. */
+			if (errno == EINTR)
+				continue;
+
+			panic("%m\n");
+		}
+
+		/* Handle one ready descriptor per poll() round. */
+		for (i = 0; i < wi->nr_sent; i++)
+			if (wi->pfds[i].revents & ready)
+				break;
+		if (i == wi->nr_sent)
+			continue;
+
+		re = wi->pfds[i].revents;
+		dprintf("%d, revents %x\n", i, re);
+		if (re & (POLLERR | POLLHUP | POLLNVAL)) {
+			err_ret = SD_RES_NETWORK_ERROR;
+			finish_one_write_err(wi, i);
+			continue;
+		}
+
+		/* Only POLLIN is left: the response header can be read. */
+		if (do_read(wi->pfds[i].fd, rsp, sizeof(*rsp))) {
+			eprintf("remote node might be crashed\n");
+			err_ret = SD_RES_NETWORK_ERROR;
+			finish_one_write_err(wi, i);
+			continue;
+		}
+
+		ret = rsp->result;
+		if (ret != SD_RES_SUCCESS) {
+			eprintf("fail %"PRIu32"\n", ret);
+			err_ret = ret;
+		}
+		finish_one_write(wi, i);
+	}
+
+	return err_ret;
+}
+
+/*
+ * Put 'wi' into its empty state: no request sent yet, every pollfd slot
+ * carrying fd -1 and every vnode pointer cleared.  sock_idx[] and the
+ * pollfd event masks are left untouched here.
+ */
+static void init_write_info(struct write_info *wi)
+{
+	int slot = 0;
+
+	wi->nr_sent = 0;
+	while (slot < SD_MAX_REDUNDANCY) {
+		wi->vnodes[slot] = NULL;
+		wi->pfds[slot].fd = -1;
+		slot++;
+	}
+}
+
int forward_write_obj_req(struct request *req)
{
- int i, fd, ret, pollret;
+ int i, fd, err_ret = SD_RES_SUCCESS, ret;
unsigned wlen;
- char name[128];
struct sd_req fwd_hdr;
struct sd_rsp *rsp = (struct sd_rsp *)&req->rp;
struct sd_vnode *v;
struct sd_vnode *obj_vnodes[SD_MAX_COPIES];
uint64_t oid = req->rq.obj.oid;
int nr_copies;
- int nr_fds = 0, local = 0;
struct write_info wi;
dprintf("%"PRIx64"\n", oid);
- for (i = 0; i < SD_MAX_REDUNDANCY; i++) {
- wi.pfds[i].fd = -1;
- wi.vnodes[i] = NULL;
- }
-
+ init_write_info(&wi);
memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr));
fwd_hdr.flags |= SD_FLAG_CMD_IO_LOCAL;
@@ -121,107 +207,51 @@ int forward_write_obj_req(struct request *req)
oid_to_vnodes(req->vnodes, oid, nr_copies, obj_vnodes);
for (i = 0; i < nr_copies; i++) {
v = obj_vnodes[i];
-
- addr_to_str(name, sizeof(name), v->addr, 0);
-
- if (vnode_is_local(v)) {
- local = 1;
+ if (!vnode_is_local(v))
continue;
- }
-
- fd = sheep_get_fd(v, &wi.sock_idx[nr_fds]);
- if (fd < 0) {
- ret = SD_RES_NETWORK_ERROR;
- goto err;
- }
-
- ret = send_req(fd, &fwd_hdr, req->data, &wlen);
- if (ret) { /* network errors */
- sheep_del_fd(v, fd, wi.sock_idx[nr_fds]);
- ret = SD_RES_NETWORK_ERROR;
- dprintf("fail %"PRIu32"\n", ret);
- goto err;
- }
- wi.vnodes[nr_fds] = v;
- wi.pfds[nr_fds].fd = fd;
- wi.pfds[nr_fds].events = POLLIN;
- nr_fds++;
- }
-
- if (local) {
ret = do_local_io(req, fwd_hdr.epoch);
- rsp->result = ret;
- if (rsp->result != SD_RES_SUCCESS) {
+ if (ret != SD_RES_SUCCESS) {
eprintf("fail to write local %"PRIu32"\n", ret);
- ret = rsp->result;
- goto err;
+ return ret;
}
-
- if (nr_fds == 0)
- goto out;
}
- ret = SD_RES_SUCCESS;
-again:
- pollret = poll(wi.pfds, nr_fds, -1);
- if (pollret < 0) {
- if (errno == EINTR)
- goto again;
+ for (i = 0; i < nr_copies; i++) {
+ v = obj_vnodes[i];
- ret = SD_RES_NETWORK_ERROR;
- goto err;
- }
+ if (vnode_is_local(v))
+ continue;
- for (i = 0; i < nr_fds; i++) {
- if (wi.pfds[i].revents & POLLERR ||
- wi.pfds[i].revents & POLLHUP ||
- wi.pfds[i].revents & POLLNVAL) {
- sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd,
- wi.sock_idx[i]);
- ret = SD_RES_NETWORK_ERROR;
+ fd = sheep_get_fd(v, &wi.sock_idx[wi.nr_sent]);
+ if (fd < 0) {
+ err_ret = SD_RES_NETWORK_ERROR;
break;
}
- if (!(wi.pfds[i].revents & POLLIN))
- continue;
-
- if (do_read(wi.pfds[i].fd, rsp, sizeof(*rsp))) {
- eprintf("failed to read a response: %m\n");
- sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd,
- wi.sock_idx[i]);
- ret = SD_RES_NETWORK_ERROR;
+ ret = send_req(fd, &fwd_hdr, req->data, &wlen);
+ if (ret) {
+ sheep_del_fd(v, fd, wi.sock_idx[wi.nr_sent]);
+ err_ret = SD_RES_NETWORK_ERROR;
+ dprintf("fail %"PRIu32"\n", ret);
break;
}
- if (rsp->result != SD_RES_SUCCESS) {
- eprintf("fail %"PRIu32"\n", rsp->result);
- ret = rsp->result;
- }
- sheep_put_fd(wi.vnodes[i], wi.pfds[i].fd, wi.sock_idx[i]);
- break;
- }
- if (i < nr_fds) {
- nr_fds--;
- memmove(wi.pfds + i, wi.pfds + i + 1,
- sizeof(struct pollfd) * (nr_fds - i));
- memmove(wi.vnodes + i, wi.vnodes + i + 1,
- sizeof(struct sd_vnode *) * (nr_fds - i));
- memmove(wi.sock_idx + i, wi.sock_idx + i + 1,
- sizeof(int) * (nr_fds - i));
+ wi.vnodes[wi.nr_sent] = v;
+ wi.pfds[wi.nr_sent].fd = fd;
+ wi.pfds[wi.nr_sent].events = POLLIN;
+ wi.nr_sent++;
}
- dprintf("%"PRIx64" %"PRIu32"\n", oid, nr_fds);
+ dprintf("nr_sent %d, err %d\n", wi.nr_sent, err_ret);
+ if (wi.nr_sent > 0) {
+ ret = wait_forward_write(&wi, rsp);
+ if (ret != SD_RES_SUCCESS)
+ err_ret = ret;
+ }
- if (nr_fds > 0)
- goto again;
-out:
- return ret;
-err:
- for (i = 0; i < nr_fds; i++)
- sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd, wi.sock_idx[i]);
- return ret;
+ return err_ret;
}
void do_gateway_request(struct work *work)
--
1.7.10.2
More information about the sheepdog
mailing list