From: Liu Yuan <tailai.ly at taobao.com>

- Introduce new APIs for manipulation of sockfd:
    struct sockfd *sheep_get_sockfd(struct node_id *);
    void sheep_put_sockfd(struct node_id *, struct sockfd *);
    void sheep_del_sockfd(struct node_id *, struct sockfd *);
  The struct sockfd returned by sheep_get_sockfd() is heap-allocated and
  is freed by the matching sheep_put_sockfd() or sheep_del_sockfd() call.
- change write_info accordingly
- add a sub-structure, write_info_entry, to write_info
- add a new pfd_info so that poll(2) gets a contiguous pollfd array
- update the comments in sockfd_cache.c to refer to the new APIs

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 sheep/gateway.c      |  108 ++++++++++++++++++++++++++++++--------------
 sheep/sheep_priv.h   |   11 +++--
 sheep/sockfd_cache.c |   81 +++++++++++++++++++++----------------
 3 files changed, 119 insertions(+), 81 deletions(-)

diff --git a/sheep/gateway.c b/sheep/gateway.c
index ccc0e5b..f8a60ea 100644
--- a/sheep/gateway.c
+++ b/sheep/gateway.c
@@ -22,7 +22,7 @@
  */
 int forward_read_obj_req(struct request *req)
 {
-	int i, fd, ret = SD_RES_SUCCESS;
+	int i, ret = SD_RES_SUCCESS;
 	unsigned wlen, rlen;
 	struct sd_req fwd_hdr;
 	struct sd_rsp *rsp = (struct sd_rsp *)&fwd_hdr;
@@ -59,15 +59,15 @@ read_remote:
 	 */
 	j = random();
 	for (i = 0; i < nr_copies; i++) {
+		struct sockfd *sfd;
 		int idx = (i + j) % nr_copies;
-		int sock_idx;
 
 		v = obj_vnodes[idx];
 		if (vnode_is_local(v))
 			continue;
 
-		fd = sheep_get_fd(&v->nid, &sock_idx);
-		if (fd < 0) {
+		sfd = sheep_get_sockfd(&v->nid);
+		if (!sfd) {
 			ret = SD_RES_NETWORK_ERROR;
 			continue;
 		}
@@ -75,23 +75,23 @@ read_remote:
 		wlen = 0;
 		rlen = fwd_hdr.data_length;
 
-		ret = exec_req(fd, &fwd_hdr, req->data, &wlen, &rlen);
+		ret = exec_req(sfd->fd, &fwd_hdr, req->data, &wlen, &rlen);
 
 		if (!ret && rsp->result == SD_RES_SUCCESS) {
 			memcpy(&req->rp, rsp, sizeof(*rsp));
 			ret = rsp->result;
-			sheep_put_fd(&v->nid, fd, sock_idx);
+			sheep_put_sockfd(&v->nid, sfd);
 			break; /* Read success */
 		}
 
 		if (ret) {
 			dprintf("remote node might have gone away");
-			sheep_del_fd(&v->nid, fd, sock_idx);
+			sheep_del_sockfd(&v->nid, sfd);
 			ret = SD_RES_NETWORK_ERROR;
 		} else {
 			ret = rsp->result;
 			eprintf("remote read fail %x\n", ret);
-			sheep_put_fd(&v->nid, fd, sock_idx);
+			sheep_put_sockfd(&v->nid, sfd);
 		}
 		/* Reset the hdr for next read */
 		memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr));
@@ -100,37 +100,48 @@ read_remote:
 	return ret;
 }
 
+struct write_info_entry {
+	struct pollfd pfd;
+	struct node_id *nid;
+	struct sockfd *sfd;
+};
+
 struct write_info {
-	struct pollfd pfds[SD_MAX_REDUNDANCY];
-	struct sd_vnode *vnodes[SD_MAX_REDUNDANCY];
-	int sock_idx[SD_MAX_REDUNDANCY];
+	struct write_info_entry ent[SD_MAX_REDUNDANCY];
 	int nr_sent;
 };
 
-static inline void update_write_info(struct write_info *wi, int pos)
+static inline void write_info_update(struct write_info *wi, int pos)
 {
 	dprintf("%d, %d\n", wi->nr_sent, pos);
 	wi->nr_sent--;
-	memmove(wi->pfds + pos, wi->pfds + pos + 1,
-		sizeof(struct pollfd) * (wi->nr_sent - pos));
-	memmove(wi->vnodes + pos, wi->vnodes + pos + 1,
-		sizeof(struct sd_vnode *) * (wi->nr_sent - pos));
-	memmove(wi->sock_idx + pos, wi->sock_idx + pos + 1,
-		sizeof(int) * (wi->nr_sent - pos));
+	memmove(wi->ent + pos, wi->ent + pos + 1,
+		sizeof(struct write_info_entry) * (wi->nr_sent - pos));
 }
 
 static inline void finish_one_write(struct write_info *wi, int i)
 {
-	sheep_put_fd(&wi->vnodes[i]->nid, wi->pfds[i].fd,
-		     wi->sock_idx[i]);
-	update_write_info(wi, i);
+	sheep_put_sockfd(wi->ent[i].nid, wi->ent[i].sfd);
+	write_info_update(wi, i);
 }
 
 static inline void finish_one_write_err(struct write_info *wi, int i)
 {
-	sheep_del_fd(&wi->vnodes[i]->nid, wi->pfds[i].fd,
-		     wi->sock_idx[i]);
-	update_write_info(wi, i);
+	sheep_del_sockfd(wi->ent[i].nid, wi->ent[i].sfd);
+	write_info_update(wi, i);
+}
+
+struct pfd_info {
+	struct pollfd pfds[SD_MAX_REDUNDANCY];
+	int nr;
+};
+
+static inline void pfd_info_init(struct write_info *wi, struct pfd_info *pi)
+{
+	int i;
+	for (i = 0; i < wi->nr_sent; i++)
+		pi->pfds[i] = wi->ent[i].pfd;
+	pi->nr = wi->nr_sent;
 }
 
 /*
@@ -144,8 +155,10 @@ static inline void finish_one_write_err(struct write_info *wi, int i)
 static int wait_forward_write(struct write_info *wi, struct sd_rsp *rsp)
 {
 	int nr_sent, err_ret = SD_RES_SUCCESS, ret, pollret, i;
+	struct pfd_info pi;
 again:
-	pollret = poll(wi->pfds, wi->nr_sent, -1);
+	pfd_info_init(wi, &pi);
+	pollret = poll(pi.pfds, pi.nr, -1);
 	if (pollret < 0) {
 		if (errno == EINTR)
 			goto again;
@@ -155,16 +168,16 @@ again:
 
 	nr_sent = wi->nr_sent;
 	for (i = 0; i < nr_sent; i++)
-		if (wi->pfds[i].revents & POLLIN)
+		if (pi.pfds[i].revents & POLLIN)
 			break;
 	if (i < nr_sent) {
-		int re = wi->pfds[i].revents;
+		int re = pi.pfds[i].revents;
 		dprintf("%d, revents %x\n", i, re);
 		if (re & (POLLERR | POLLHUP | POLLNVAL)) {
 			err_ret = SD_RES_NETWORK_ERROR;
 			finish_one_write_err(wi, i);
 		} else if (re & POLLIN) {
-			if (do_read(wi->pfds[i].fd, rsp, sizeof(*rsp))) {
+			if (do_read(pi.pfds[i].fd, rsp, sizeof(*rsp))) {
 				eprintf("remote node might have gone away\n");
 				err_ret = SD_RES_NETWORK_ERROR;
 				finish_one_write_err(wi, i);
@@ -188,19 +201,28 @@ finish_write:
 	return err_ret;
 }
 
-static void init_write_info(struct write_info *wi)
+static inline void write_info_init(struct write_info *wi)
 {
 	int i;
-	for (i = 0; i < SD_MAX_REDUNDANCY; i++) {
-		wi->pfds[i].fd = -1;
-		wi->vnodes[i] = NULL;
-	}
+	for (i = 0; i < SD_MAX_REDUNDANCY; i++)
+		wi->ent[i].pfd.fd = -1;
 	wi->nr_sent = 0;
 }
 
+static inline void
+write_info_advance(struct write_info *wi, struct sd_vnode *v,
+		   struct sockfd *sfd)
+{
+	wi->ent[wi->nr_sent].nid = &v->nid;
+	wi->ent[wi->nr_sent].pfd.fd = sfd->fd;
+	wi->ent[wi->nr_sent].pfd.events = POLLIN;
+	wi->ent[wi->nr_sent].sfd = sfd;
+	wi->nr_sent++;
+}
+
 int forward_write_obj_req(struct request *req)
 {
-	int i, fd, err_ret = SD_RES_SUCCESS, ret, local = -1;
+	int i, err_ret = SD_RES_SUCCESS, ret, local = -1;
 	unsigned wlen;
 	struct sd_req fwd_hdr;
 	struct sd_rsp *rsp = (struct sd_rsp *)&req->rp;
@@ -212,7 +234,7 @@ int forward_write_obj_req(struct request *req)
 
 	dprintf("%"PRIx64"\n", oid);
 
-	init_write_info(&wi);
+	write_info_init(&wi);
 	memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr));
 	fwd_hdr.flags |= SD_FLAG_CMD_IO_LOCAL;
 
@@ -222,30 +244,28 @@ int forward_write_obj_req(struct request *req)
 	oid_to_vnodes(req->vnodes, oid, nr_copies, obj_vnodes);
 
 	for (i = 0; i < nr_copies; i++) {
+		struct sockfd *sfd;
+
 		v = obj_vnodes[i];
 		if (vnode_is_local(v)) {
 			local = i;
 			continue;
 		}
 
-		fd = sheep_get_fd(&v->nid, &wi.sock_idx[wi.nr_sent]);
-		if (fd < 0) {
+		sfd = sheep_get_sockfd(&v->nid);
+		if (!sfd) {
 			err_ret = SD_RES_NETWORK_ERROR;
 			break;
 		}
 
-		ret = send_req(fd, &fwd_hdr, req->data, &wlen);
+		ret = send_req(sfd->fd, &fwd_hdr, req->data, &wlen);
 		if (ret) {
-			sheep_del_fd(&v->nid, fd, wi.sock_idx[wi.nr_sent]);
+			sheep_del_sockfd(&v->nid, sfd);
 			err_ret = SD_RES_NETWORK_ERROR;
 			dprintf("fail %d\n", ret);
 			break;
 		}
-
-		wi.vnodes[wi.nr_sent] = v;
-		wi.pfds[wi.nr_sent].fd = fd;
-		wi.pfds[wi.nr_sent].events = POLLIN;
-		wi.nr_sent++;
+		write_info_advance(&wi, v, sfd);
 	}
 
 	if (local != -1 && err_ret == SD_RES_SUCCESS) {
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index a5b0399..a497b24 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -399,12 +399,17 @@ void object_cache_delete(uint32_t vid);
 int object_cache_init(const char *p);
 
 /* sockfd_cache */
+struct sockfd {
+	int fd;
+	int idx;
+};
+
 void sockfd_cache_del(struct node_id *);
 void sockfd_cache_add(struct node_id *);
 void sockfd_cache_add_group(struct sd_node *nodes, int nr);
 
-int sheep_get_fd(struct node_id *, int *);
-void sheep_put_fd(struct node_id *, int fd, int);
-void sheep_del_fd(struct node_id *, int fd, int);
+struct sockfd *sheep_get_sockfd(struct node_id *);
+void sheep_put_sockfd(struct node_id *, struct sockfd *);
+void sheep_del_sockfd(struct node_id *, struct sockfd *);
 
 #endif
diff --git a/sheep/sockfd_cache.c b/sheep/sockfd_cache.c
index c27d99b..51e0c02 100644
--- a/sheep/sockfd_cache.c
+++ b/sheep/sockfd_cache.c
@@ -272,30 +272,35 @@ void sockfd_cache_add(struct node_id *nid)
 	dprintf("%s:%d, count %d\n", name, nid->port, n);
 }
 
-static int sockfd_cache_get(struct node_id *nid, char *name, int *ret_idx)
+static struct sockfd *sockfd_cache_get(struct node_id *nid, char *name)
 {
 	struct sockfd_cache_entry *entry;
-	int fd;
+	struct sockfd *sfd;
+	int fd, idx;
 
-	entry = sockfd_cache_grab(nid, name, ret_idx);
+	entry = sockfd_cache_grab(nid, name, &idx);
 	if (!entry)
-		return -1;
+		return NULL;
 
-	if (entry->fd[*ret_idx] != -1) {
-		dprintf("%s:%d, idx %d\n", name, nid->port, *ret_idx);
-		return entry->fd[*ret_idx];
+	if (entry->fd[idx] != -1) {
+		dprintf("%s:%d, idx %d\n", name, nid->port, idx);
+		goto out;
 	}
 
 	/* Create a new cached connection for this vnode */
-	dprintf("create connection %s:%d idx %d\n", name, nid->port, *ret_idx);
+	dprintf("create connection %s:%d idx %d\n", name, nid->port, idx);
 	fd = connect_to(name, nid->port);
 	if (fd < 0) {
-		uatomic_dec(&entry->fd_in_use[*ret_idx]);
-		return -1;
+		uatomic_dec(&entry->fd_in_use[idx]);
+		return NULL;
 	}
-	entry->fd[*ret_idx] = fd;
+	entry->fd[idx] = fd;
 
-	return fd;
+out:
+	sfd = xmalloc(sizeof(*sfd));
+	sfd->fd = entry->fd[idx];
+	sfd->idx = idx;
+	return sfd;
 }
 
 static void sockfd_cache_put(struct node_id *nid, int idx)
@@ -317,7 +322,7 @@ static void sockfd_cache_put(struct node_id *nid, int idx)
 }
 
 /*
- * Return a FD connected to the vnode to the caller
+ * Return a sockfd connected to the vnode to the caller
  *
  * Try to get a 'long' FD as best, which is cached and never closed. If no FD
  * available, we return a 'short' FD which is supposed to be closed by
@@ -325,29 +330,33 @@ static void sockfd_cache_put(struct node_id *nid, int idx)
  *
- * ret_idx is opaque to the caller, -1 indicates it is a short FD.
+ * sfd->idx is opaque to the caller, -1 indicates it is a short FD.
  */
-int sheep_get_fd(struct node_id *nid, int *ret_idx)
+struct sockfd *sheep_get_sockfd(struct node_id *nid)
 {
 	char name[INET6_ADDRSTRLEN];
+	struct sockfd *sfd;
 	int fd;
 
 	addr_to_str(name, sizeof(name), nid->addr, 0);
-	fd = sockfd_cache_get(nid, name, ret_idx);
-	if (fd != -1)
-		return fd;
+	sfd = sockfd_cache_get(nid, name);
+	if (sfd)
+		return sfd;
 
 	/* Create a fd that is to be closed */
 	fd = connect_to(name, nid->port);
 	if (fd < 0) {
 		dprintf("failed connect to %s:%d\n", name, nid->port);
-		return -1;
+		return NULL;
 	}
 
+	sfd = xmalloc(sizeof(*sfd));
+	sfd->idx = -1;
+	sfd->fd = fd;
 	dprintf("%d\n", fd);
-	return fd;
+	return sfd;
 }
 
 /*
- * Rlease a FD connected to the vnode, which is acquired from sheep_get_fd()
+ * Release a sockfd connected to the vnode, which is acquired from sheep_get_sockfd()
  *
  * If it is a long FD, just decrease the refcount to make it available again.
  * If it is a short FD, close it.
@@ -355,32 +364,36 @@ int sheep_get_fd(struct node_id *nid, int *ret_idx)
- * sheep_put_fd() or sheep_del_fd() should be paired with sheep_get_fd()
+ * sheep_put_sockfd() or sheep_del_sockfd() should be paired with sheep_get_sockfd()
  */
 
-void sheep_put_fd(struct node_id *nid, int fd, int idx)
+void sheep_put_sockfd(struct node_id *nid, struct sockfd *sfd)
 {
-	if (idx == -1) {
-		dprintf("%d\n", fd);
-		close(fd);
+	if (sfd->idx == -1) {
+		dprintf("%d\n", sfd->fd);
+		close(sfd->fd);
+		free(sfd);
 		return;
 	}
 
-	sockfd_cache_put(nid, idx);
+	sockfd_cache_put(nid, sfd->idx);
+	free(sfd);
 }
 
 /*
- * Delete a FD connected to the vnode, when vnode is crashed.
+ * Delete a sockfd connected to the vnode, when vnode is crashed.
  *
- * If it is a long FD, de-refcount it and tres to destroy all the cached FDs of
+ * If it is a long FD, de-refcount it and tries to destroy all the cached FDs of
  * this vnode in the cache.
  * If it is a short FD, just close it.
  */
-void sheep_del_fd(struct node_id *nid, int fd, int idx)
+void sheep_del_sockfd(struct node_id *nid, struct sockfd *sfd)
 {
-	if (idx == -1) {
-		dprintf("%d\n", fd);
-		close(fd);
+	if (sfd->idx == -1) {
+		dprintf("%d\n", sfd->fd);
+		close(sfd->fd);
+		free(sfd);
 		return;
 	}
 
-	sockfd_cache_put(nid, idx);
+	sockfd_cache_put(nid, sfd->idx);
 	sockfd_cache_del(nid);
+	free(sfd);
 }
-- 
1.7.10.2