From: Liu Yuan <tailai.ly at taobao.com> Signed-off-by: Liu Yuan <tailai.ly at taobao.com> --- sheep/gateway.c | 55 ++++++++++++++++++++++++++++++++----------------------- sheep/group.c | 9 ++++++++- 2 files changed, 40 insertions(+), 24 deletions(-) diff --git a/sheep/gateway.c b/sheep/gateway.c index 42f028a..c3e5f80 100644 --- a/sheep/gateway.c +++ b/sheep/gateway.c @@ -58,7 +58,7 @@ read_remote: if (vnode_is_local(v)) continue; - fd = get_sheep_fd(v->addr, v->port, v->node_idx, fwd_hdr.epoch); + fd = sheep_get_fd(v); if (fd < 0) { ret = SD_RES_NETWORK_ERROR; continue; @@ -70,10 +70,11 @@ read_remote: ret = exec_req(fd, &fwd_hdr, req->data, &wlen, &rlen); if (ret) { /* network errors */ - del_sheep_fd(fd); + sheep_del_fd(v, fd); ret = SD_RES_NETWORK_ERROR; continue; } else { + sheep_put_fd(v, fd); memcpy(&req->rp, rsp, sizeof(*rsp)); ret = rsp->result; break; @@ -82,6 +83,11 @@ read_remote: return ret; } +struct write_info { + struct pollfd pfds[SD_MAX_REDUNDANCY]; + struct sd_vnode *vnodes[SD_MAX_REDUNDANCY]; +}; + int forward_write_obj_req(struct request *req) { int i, fd, ret, pollret; @@ -93,15 +99,14 @@ int forward_write_obj_req(struct request *req) struct sd_vnode *obj_vnodes[SD_MAX_COPIES]; uint64_t oid = req->rq.obj.oid; int nr_copies; - struct pollfd pfds[SD_MAX_REDUNDANCY]; - int nr_fds, local = 0; + int nr_fds = 0, local = 0; + struct write_info wi; dprintf("%"PRIx64"\n", oid); - nr_fds = 0; - memset(pfds, 0, sizeof(pfds)); - for (i = 0; i < ARRAY_SIZE(pfds); i++) - pfds[i].fd = -1; + memset(&wi, 0, sizeof(wi)); + for (i = 0; i < SD_MAX_REDUNDANCY; i++) + wi.pfds[i].fd = -1; memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr)); fwd_hdr.flags |= SD_FLAG_CMD_IO_LOCAL; @@ -120,24 +125,23 @@ int forward_write_obj_req(struct request *req) continue; } - fd = get_sheep_fd(v->addr, v->port, v->node_idx, fwd_hdr.epoch); + fd = sheep_get_fd(v); if (fd < 0) { - eprintf("failed to connect to %s:%"PRIu32"\n", name, - v->port); ret = SD_RES_NETWORK_ERROR; goto err; } ret = send_req(fd, &fwd_hdr, req->data, &wlen); if (ret) { /* network errors */ - del_sheep_fd(fd); + sheep_del_fd(v, fd); ret = SD_RES_NETWORK_ERROR; dprintf("fail %"PRIu32"\n", ret); goto err; } - pfds[nr_fds].fd = fd; - pfds[nr_fds].events = POLLIN; + wi.vnodes[nr_fds] = v; + wi.pfds[nr_fds].fd = fd; + wi.pfds[nr_fds].events = POLLIN; nr_fds++; } @@ -157,7 +161,7 @@ int forward_write_obj_req(struct request *req) ret = SD_RES_SUCCESS; again: - pollret = poll(pfds, nr_fds, -1); + pollret = poll(wi.pfds, nr_fds, -1); if (pollret < 0) { if (errno == EINTR) goto again; @@ -167,19 +171,20 @@ again: } for (i = 0; i < nr_fds; i++) { - if (pfds[i].revents & POLLERR || pfds[i].revents & POLLHUP || - pfds[i].revents & POLLNVAL) { - del_sheep_fd(pfds[i].fd); + if (wi.pfds[i].revents & POLLERR || + wi.pfds[i].revents & POLLHUP || + wi.pfds[i].revents & POLLNVAL) { + sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd); ret = SD_RES_NETWORK_ERROR; break; } - if (!(pfds[i].revents & POLLIN)) + if (!(wi.pfds[i].revents & POLLIN)) continue; - if (do_read(pfds[i].fd, rsp, sizeof(*rsp))) { + if (do_read(wi.pfds[i].fd, rsp, sizeof(*rsp))) { eprintf("failed to read a response: %m\n"); - del_sheep_fd(pfds[i].fd); + sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd); ret = SD_RES_NETWORK_ERROR; break; } @@ -189,11 +194,15 @@ again: ret = rsp->result; } + sheep_put_fd(wi.vnodes[i], wi.pfds[i].fd); break; } if (i < nr_fds) { nr_fds--; - memmove(pfds + i, pfds + i + 1, sizeof(*pfds) * (nr_fds - i)); + memmove(wi.pfds + i, wi.pfds + i + 1, + sizeof(struct pollfd) * (nr_fds - i)); + memmove(wi.vnodes + i, wi.vnodes + i + 1, + sizeof(struct sd_vnode *) * (nr_fds - i)); } dprintf("%"PRIx64" %"PRIu32"\n", oid, nr_fds); @@ -204,7 +213,7 @@ out: return ret; err: for (i = 0; i < nr_fds; i++) - del_sheep_fd(pfds[i].fd); + sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd); return ret; } diff --git a/sheep/group.c b/sheep/group.c index b448809..19d5f40 100644 --- a/sheep/group.c +++ b/sheep/group.c @@ -803,10 +803,13 @@ static void finish_join(struct join_message *msg, struct sd_node *joined, if (sd_store->purge_obj && sd_store->purge_obj() != SD_RES_SUCCESS) eprintf("WARN: may have stale objects\n"); + + sockfd_cache_add_group(nodes, nr_nodes); } static void update_cluster_info(struct join_message *msg, - struct sd_node *joined, struct sd_node *nodes, size_t nr_nodes) + struct sd_node *joined, struct sd_node *nodes, + size_t nr_nodes) { struct vnode_info *old_vnode_info; @@ -867,6 +870,8 @@ static void update_cluster_info(struct join_message *msg, if (current_vnode_info->nr_zones >= sys->nr_copies) sys_stat_set(SD_STATUS_OK); } + + sockfd_cache_add(joined); } /* @@ -1110,6 +1115,8 @@ void sd_leave_handler(struct sd_node *left, struct sd_node *members, if (current_vnode_info->nr_zones < sys->nr_copies) sys_stat_set(SD_STATUS_HALT); } + + sockfd_cache_del((struct node_id *)left); } int create_cluster(int port, int64_t zone, int nr_vnodes, -- 1.7.10.2 |