From: Liu Yuan <tailai.ly at taobao.com> Signed-off-by: Liu Yuan <tailai.ly at taobao.com> --- sheep/gateway.c | 210 +++++++++++++++++++++++++++++++------------------------ 1 file changed, 120 insertions(+), 90 deletions(-) diff --git a/sheep/gateway.c b/sheep/gateway.c index f126dfb..c45490d 100644 --- a/sheep/gateway.c +++ b/sheep/gateway.c @@ -89,29 +89,115 @@ struct write_info { struct pollfd pfds[SD_MAX_REDUNDANCY]; struct sd_vnode *vnodes[SD_MAX_REDUNDANCY]; int sock_idx[SD_MAX_REDUNDANCY]; + int nr_sent; }; +static inline void update_write_info(struct write_info *wi, int pos) +{ + dprintf("%d, %d\n", wi->nr_sent, pos); + wi->nr_sent--; + memmove(wi->pfds + pos, wi->pfds + pos + 1, + sizeof(struct pollfd) * (wi->nr_sent - pos)); + memmove(wi->vnodes + pos, wi->vnodes + pos + 1, + sizeof(struct sd_vnode *) * (wi->nr_sent - pos)); + memmove(wi->sock_idx + pos, wi->sock_idx + pos + 1, + sizeof(int) * (wi->nr_sent - pos)); +} + +static inline void finish_one_write(struct write_info *wi, int i) +{ + sheep_put_fd(wi->vnodes[i], wi->pfds[i].fd, + wi->sock_idx[i]); + update_write_info(wi, i); +} + +static inline void finish_one_write_err(struct write_info *wi, int i) +{ + sheep_del_fd(wi->vnodes[i], wi->pfds[i].fd, + wi->sock_idx[i]); + update_write_info(wi, i); +} + +/* + * Wait for all forwarded writes to complete. + * + * Even if something goes wrong, we must wait for every forwarded write to + * complete, to avoid interleaved requests. + * + * Return an error code if any request fails. 
+ */ +static int wait_forward_write(struct write_info *wi, struct sd_rsp *rsp) +{ + int nr_sent, err_ret = SD_RES_SUCCESS, ret, pollret, i; +again: + pollret = poll(wi->pfds, wi->nr_sent, -1); + if (pollret < 0) { + if (errno == EINTR) + goto again; + + panic("%m\n"); + } + + nr_sent = wi->nr_sent; + for (i = 0; i < nr_sent; i++) + if (wi->pfds[i].revents & POLLIN) + break; + if (i < nr_sent) { + int re = wi->pfds[i].revents; + dprintf("%d, revents %x\n", i, re); + if (re & (POLLERR | POLLHUP | POLLNVAL)) { + err_ret = SD_RES_NETWORK_ERROR; + finish_one_write_err(wi, i); + } else if (re & POLLIN) { + if (do_read(wi->pfds[i].fd, rsp, sizeof(*rsp))) { + eprintf("remote node might be crashed\n"); + err_ret = SD_RES_NETWORK_ERROR; + finish_one_write_err(wi, i); + goto finish_write; + } + + ret = rsp->result; + if (ret != SD_RES_SUCCESS) { + eprintf("fail %"PRIu32"\n", ret); + err_ret = ret; + } + finish_one_write(wi, i); + } else { + eprintf("unhanlded poll event\n"); + } + } +finish_write: + if (wi->nr_sent > 0) + goto again; + + return err_ret; +} + +static void init_write_info(struct write_info *wi) +{ + int i; + for (i = 0; i < SD_MAX_REDUNDANCY; i++) { + wi->pfds[i].fd = -1; + wi->vnodes[i] = NULL; + } + wi->nr_sent = 0; +} + int forward_write_obj_req(struct request *req) { - int i, fd, ret, pollret; + int i, fd, err_ret = SD_RES_SUCCESS, ret; unsigned wlen; - char name[128]; struct sd_req fwd_hdr; struct sd_rsp *rsp = (struct sd_rsp *)&req->rp; struct sd_vnode *v; struct sd_vnode *obj_vnodes[SD_MAX_COPIES]; uint64_t oid = req->rq.obj.oid; int nr_copies; - int nr_fds = 0, local = 0; struct write_info wi; dprintf("%"PRIx64"\n", oid); - for (i = 0; i < SD_MAX_REDUNDANCY; i++) { - wi.pfds[i].fd = -1; - wi.vnodes[i] = NULL; - } - + init_write_info(&wi); memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr)); fwd_hdr.flags |= SD_FLAG_CMD_IO_LOCAL; @@ -121,107 +207,51 @@ int forward_write_obj_req(struct request *req) oid_to_vnodes(req->vnodes, oid, nr_copies, obj_vnodes); for (i = 
0; i < nr_copies; i++) { v = obj_vnodes[i]; - - addr_to_str(name, sizeof(name), v->addr, 0); - - if (vnode_is_local(v)) { - local = 1; + if (!vnode_is_local(v)) continue; - } - - fd = sheep_get_fd(v, &wi.sock_idx[nr_fds]); - if (fd < 0) { - ret = SD_RES_NETWORK_ERROR; - goto err; - } - - ret = send_req(fd, &fwd_hdr, req->data, &wlen); - if (ret) { /* network errors */ - sheep_del_fd(v, fd, wi.sock_idx[nr_fds]); - ret = SD_RES_NETWORK_ERROR; - dprintf("fail %"PRIu32"\n", ret); - goto err; - } - wi.vnodes[nr_fds] = v; - wi.pfds[nr_fds].fd = fd; - wi.pfds[nr_fds].events = POLLIN; - nr_fds++; - } - - if (local) { ret = do_local_io(req, fwd_hdr.epoch); - rsp->result = ret; - if (rsp->result != SD_RES_SUCCESS) { + if (ret != SD_RES_SUCCESS) { eprintf("fail to write local %"PRIu32"\n", ret); - ret = rsp->result; - goto err; + return ret; } - - if (nr_fds == 0) - goto out; } - ret = SD_RES_SUCCESS; -again: - pollret = poll(wi.pfds, nr_fds, -1); - if (pollret < 0) { - if (errno == EINTR) - goto again; + for (i = 0; i < nr_copies; i++) { + v = obj_vnodes[i]; - ret = SD_RES_NETWORK_ERROR; - goto err; - } + if (vnode_is_local(v)) + continue; - for (i = 0; i < nr_fds; i++) { - if (wi.pfds[i].revents & POLLERR || - wi.pfds[i].revents & POLLHUP || - wi.pfds[i].revents & POLLNVAL) { - sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd, - wi.sock_idx[i]); - ret = SD_RES_NETWORK_ERROR; + fd = sheep_get_fd(v, &wi.sock_idx[wi.nr_sent]); + if (fd < 0) { + err_ret = SD_RES_NETWORK_ERROR; break; } - if (!(wi.pfds[i].revents & POLLIN)) - continue; - - if (do_read(wi.pfds[i].fd, rsp, sizeof(*rsp))) { - eprintf("failed to read a response: %m\n"); - sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd, - wi.sock_idx[i]); - ret = SD_RES_NETWORK_ERROR; + ret = send_req(fd, &fwd_hdr, req->data, &wlen); + if (ret) { + sheep_del_fd(v, fd, wi.sock_idx[wi.nr_sent]); + err_ret = SD_RES_NETWORK_ERROR; + dprintf("fail %"PRIu32"\n", ret); break; } - if (rsp->result != SD_RES_SUCCESS) { - eprintf("fail %"PRIu32"\n", 
rsp->result); - ret = rsp->result; - } - sheep_put_fd(wi.vnodes[i], wi.pfds[i].fd, wi.sock_idx[i]); - break; - } - if (i < nr_fds) { - nr_fds--; - memmove(wi.pfds + i, wi.pfds + i + 1, - sizeof(struct pollfd) * (nr_fds - i)); - memmove(wi.vnodes + i, wi.vnodes + i + 1, - sizeof(struct sd_vnode *) * (nr_fds - i)); - memmove(wi.sock_idx + i, wi.sock_idx + i + 1, - sizeof(int) * (nr_fds - i)); + wi.vnodes[wi.nr_sent] = v; + wi.pfds[wi.nr_sent].fd = fd; + wi.pfds[wi.nr_sent].events = POLLIN; + wi.nr_sent++; } - dprintf("%"PRIx64" %"PRIu32"\n", oid, nr_fds); + dprintf("nr_sent %d, err %d\n", wi.nr_sent, err_ret); + if (wi.nr_sent > 0) { + ret = wait_forward_write(&wi, rsp); + if (ret != SD_RES_SUCCESS) + err_ret = ret; + } - if (nr_fds > 0) - goto again; -out: - return ret; -err: - for (i = 0; i < nr_fds; i++) - sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd, wi.sock_idx[i]); - return ret; + return err_ret; } void do_gateway_request(struct work *work) -- 1.7.10.2 |