We cannot reuse local ports for new connections right after we close connections (the closed sockets linger in TIME_WAIT), so collie exhausts local ports under heavy load. This patch fixes the problem by reusing a cached connection to each node for the current epoch (get_collie_fd) instead of opening a new connection and closing it after every request. Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp> --- collie/store.c | 61 ++++++++-------------------- include/net.h | 1 + lib/net.c | 120 +++++++++++++++++++++++++++++++++++++++++++------------- 3 files changed, 111 insertions(+), 71 deletions(-) diff --git a/collie/store.c b/collie/store.c index 842271c..f37ce8a 100644 --- a/collie/store.c +++ b/collie/store.c @@ -140,7 +140,6 @@ static int read_from_one(struct cluster_info *cluster, uint64_t oid, { int i, n, nr, fd, ret; unsigned wlen, rlen; - char name[128]; struct sheepdog_node_list_entry *e; struct sd_obj_req hdr; struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr; @@ -152,15 +151,11 @@ again: for (i = 0; i < nr; i++) { n = obj_to_sheep(e, nr, oid, i); - snprintf(name, sizeof(name), "%d.%d.%d.%d", - e[n].addr[12], e[n].addr[13], - e[n].addr[14], e[n].addr[15]); - /* FIXME: do like store_queue_request_local() */ if (e[n].id == cluster->this_node.id) continue; - fd = connect_to(name, e[n].port); + fd = get_collie_fd(e, n, cluster->epoch); if (fd < 0) continue; @@ -177,8 +172,6 @@ again: ret = exec_req(fd, (struct sd_req *)&hdr, buf, &wlen, &rlen); - close(fd); - if (ret) continue; @@ -223,7 +216,6 @@ static int forward_obj_req(struct cluster_info *cluster, struct request *req, { int i, n, nr, fd, ret; unsigned wlen, rlen; - char name[128]; struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq; struct sheepdog_node_list_entry *e; struct sd_obj_req hdr2; @@ -244,10 +236,6 @@ again: for (i = 0; i < copies; i++) { n = obj_to_sheep(e, nr, oid, i); - snprintf(name, sizeof(name), "%d.%d.%d.%d", - e[n].addr[12], e[n].addr[13], - e[n].addr[14], e[n].addr[15]); - /* TODO: we can do better; we need to chech this first */ if (e[n].id == cluster->this_node.id) { ret = store_queue_request_local(cluster, req, buf, cluster->epoch); @@ -256,7 
+244,7 @@ again: goto done; } - fd = connect_to(name, e[n].port); + fd = get_collie_fd(e, n, cluster->epoch); if (fd < 0) continue; @@ -275,8 +263,6 @@ again: ret = exec_req(fd, (struct sd_req *)&hdr2, req->data, &wlen, &rlen); - close(fd); - if (ret) /* network errors */ goto again; @@ -771,19 +757,16 @@ void so_queue_request(struct work *work, int idx) if (!local) { struct sd_so_req hdr2; - char name[128]; int fd; unsigned wlen, rlen; n = obj_to_sheep(e, nr, SD_DIR_OID, 0); - snprintf(name, sizeof(name), "%d.%d.%d.%d", - e[n].addr[12], e[n].addr[13], - e[n].addr[14], e[n].addr[15]); + eprintf("%d.%d.%d.%d %d\n", + e[n].addr[12], e[n].addr[13], + e[n].addr[14], e[n].addr[15], e[n].port); - eprintf("%s %d\n", name, e[n].port); - - fd = connect_to(name, e[n].port); + fd = get_collie_fd(e, n, cluster->epoch); if (fd < 0) { rsp->result = SD_RES_EIO; goto out; @@ -802,8 +785,6 @@ void so_queue_request(struct work *work, int idx) ret = exec_req(fd, (struct sd_req *)&hdr2, req->data, &wlen, &rlen); - close(fd); - rsp->result = ((struct sd_rsp *)&hdr2)->result; rsp->data_length = ((struct sd_rsp *)&hdr2)->data_length; @@ -1081,7 +1062,6 @@ static void recover_one(struct work *work, int idx) struct sheepdog_node_list_entry *e = &rw->e; struct sd_obj_req hdr; struct sd_obj_rsp *rsp; - char name[128]; char *buf = zero_block + idx * SD_DATA_OBJ_SIZE; unsigned wlen = 0, rlen = SD_DATA_OBJ_SIZE; int fd, ret; @@ -1089,13 +1069,11 @@ static void recover_one(struct work *work, int idx) eprintf("%d %d, %16lx\n", rw->done, rw->count, oid); - snprintf(name, sizeof(name), "%d.%d.%d.%d", - e->addr[12], e->addr[13], - e->addr[14], e->addr[15]); - - fd = connect_to(name, e->port); + fd = get_collie_fd(e, 0, 0); if (fd < 0) { - eprintf("%s %d\n", name, e->port); + eprintf("%d.%d.%d.%d %d\n", + e->addr[12], e->addr[13], + e->addr[14], e->addr[15], e->port); return; } @@ -1108,8 +1086,6 @@ static void recover_one(struct work *work, int idx) ret = exec_req(fd, (struct sd_req *)&hdr, buf, 
&wlen, &rlen); - close(fd); - rsp = (struct sd_obj_rsp *)&hdr; if (rsp->result != SD_RES_SUCCESS) { @@ -1173,19 +1149,18 @@ static int fill_obj_list(struct recovery_work *rw, int fd, ret; uint32_t epoch = rw->epoch; unsigned wlen, rlen; - char name[128]; struct sd_obj_req hdr; struct sd_obj_rsp *rsp; - snprintf(name, sizeof(name), "%d.%d.%d.%d", - e->addr[12], e->addr[13], - e->addr[14], e->addr[15]); + dprintf("%d.%d.%d.%d %d\n", + e->addr[12], e->addr[13], + e->addr[14], e->addr[15], e->port); - dprintf("%s %d\n", name, e->port); - - fd = connect_to(name, e->port); + fd = get_collie_fd(e, 0, 0); if (fd < 0) { - eprintf("%s %d\n", name, e->port); + eprintf("%d.%d.%d.%d %d\n", + e->addr[12], e->addr[13], + e->addr[14], e->addr[15], e->port); return -1; } @@ -1208,8 +1183,6 @@ static int fill_obj_list(struct recovery_work *rw, ret = exec_req(fd, (struct sd_req *)&hdr, rw->buf, &wlen, &rlen); - close(fd); - rsp = (struct sd_obj_rsp *)&hdr; if (rsp->result != SD_RES_SUCCESS) { diff --git a/include/net.h b/include/net.h index 7bf0dbb..4175d03 100644 --- a/include/net.h +++ b/include/net.h @@ -32,6 +32,7 @@ int do_read(int sockfd, void *buf, int len); int rx(struct connection *conn, enum conn_state next_state); int tx(struct connection *conn, enum conn_state next_state, int flags); int connect_to(char *name, int port); +int get_collie_fd(struct sheepdog_node_list_entry *e, int idx, uint32_t epoch); int send_req(int sockfd, struct sd_req *hdr, void *data, unsigned int *wlen); int exec_req(int sockfd, struct sd_req *hdr, void *data, unsigned int *wlen, unsigned int *rlen); diff --git a/lib/net.c b/lib/net.c index f06ae05..248f555 100644 --- a/lib/net.c +++ b/lib/net.c @@ -221,6 +221,90 @@ success: return fd; } +static int set_nonblocking(int fd) +{ + int ret; + + ret = fcntl(fd, F_GETFL); + if (ret < 0) { + eprintf("can't fcntl (F_GETFL), %m\n"); + close(fd); + } else { + ret = fcntl(fd, F_SETFL, ret | O_NONBLOCK); + if (ret < 0) + eprintf("can't fcntl (O_NONBLOCK), 
%m\n"); + } + + return ret; +} + +static int set_nodelay(int fd) +{ + int ret, opt; + + opt = 1; + ret = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)); + return ret; +} + +int get_collie_fd(struct sheepdog_node_list_entry *e, int idx, uint32_t epoch) +{ + static int cached_fds[SD_MAX_NODES]; + static uint32_t cached_epoch = 0; + int i, fd, ret; + char name[INET6_ADDRSTRLEN]; + + if (cached_epoch == 0) { + for (i = 0; i < ARRAY_SIZE(cached_fds); i++) + cached_fds[i] = -1; + cached_epoch = 1; + } + + dprintf("%d, %d, %d\n", cached_epoch, epoch, cached_fds[idx]); + if (cached_epoch < epoch) { + for (i = 0; i < ARRAY_SIZE(cached_fds); i++) { + if (cached_fds[i] >= 0) { + close(cached_fds[i]); + } + cached_fds[i] = -1; + } + cached_epoch = epoch; + } + + if (cached_epoch == epoch && cached_fds[idx] >= 0) { + dprintf("use a cached fd %d\n", cached_fds[idx]); + return cached_fds[idx]; + } + + snprintf(name, sizeof(name), "%d.%d.%d.%d", + e[idx].addr[12], e[idx].addr[13], + e[idx].addr[14], e[idx].addr[15]); + + fd = connect_to(name, e[idx].port); + if (fd < 0) + return -1; + + ret = set_nonblocking(fd); + if (ret) { + eprintf("%m\n"); + close(fd); + return -1; + } + + ret = set_nodelay(fd); + if (ret) { + eprintf("%m\n"); + close(fd); + return -1; + } + + /* cache only current epoch fds */ + if (cached_epoch == epoch) + cached_fds[idx] = fd; + + return fd; +} + int do_read(int sockfd, void *buf, int len) { int ret; @@ -345,20 +429,17 @@ int write_object(struct sheepdog_node_list_entry *e, struct sd_obj_req hdr; int i, n, fd, ret, success = 0; uint16_t vosts[3]; - char name[128]; for (i = 0; i < 1; i++) { unsigned rlen = 0, wlen = datalen; n = obj_to_sheep(e, nodes, oid, i); - snprintf(name, sizeof(name), "%d.%d.%d.%d", - e[n].addr[12], e[n].addr[13], - e[n].addr[14], e[n].addr[15]); - - fd = connect_to(name, e[n].port); + fd = get_collie_fd(e, n, node_version); if (fd < 0) { - eprintf("can't connect to vost %u, %s\n", vosts[i], name); + eprintf("can't 
connect to vost %u, %d.%d.%d.%d\n", + vosts[i], e[n].addr[12], e[n].addr[13], + e[n].addr[14], e[n].addr[15]); continue; } @@ -377,9 +458,10 @@ int write_object(struct sheepdog_node_list_entry *e, hdr.offset = offset; ret = exec_req(fd, (struct sd_req *)&hdr, data, &wlen, &rlen); - close(fd); if (ret) - eprintf("can't update vost %u, %s\n", vosts[i], name); + eprintf("can't update vost %u, %d.%d.%d.%d\n", + vosts[i], e[n].addr[12], e[n].addr[13], + e[n].addr[14], e[n].addr[15]); else success++; } @@ -394,7 +476,6 @@ int read_object(struct sheepdog_node_list_entry *e, { struct sd_obj_req hdr; struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr; - char name[128]; int i = 0, n, fd, ret; for (i = 0; i < nr; i++) { @@ -402,13 +483,7 @@ int read_object(struct sheepdog_node_list_entry *e, n = obj_to_sheep(e, nodes, oid, i); - snprintf(name, sizeof(name), "%d.%d.%d.%d", - e[n].addr[12], - e[n].addr[13], - e[n].addr[14], - e[n].addr[15]); - - fd = connect_to(name, e[n].port); + fd = get_collie_fd(e, n, node_version); if (fd < 0) return -1; @@ -422,7 +497,6 @@ int read_object(struct sheepdog_node_list_entry *e, hdr.offset = offset; ret = exec_req(fd, (struct sd_req *)&hdr, data, &wlen, &rlen); - close(fd); if (!ret) { if (rsp->result == SD_RES_SUCCESS) @@ -439,7 +513,6 @@ int exec_reqs(struct sheepdog_node_list_entry *e, char *data, unsigned int wdatalen, unsigned int rdatalen, int nr, int quorum) { - char name[128]; int i = 0, n, fd, ret; int success = 0; struct sd_req tmp; @@ -452,13 +525,7 @@ int exec_reqs(struct sheepdog_node_list_entry *e, n = obj_to_sheep(e, nodes, oid, i); - snprintf(name, sizeof(name), "%d.%d.%d.%d", - e[n].addr[12], - e[n].addr[13], - e[n].addr[14], - e[n].addr[15]); - - fd = connect_to(name, e[n].port); + fd = get_collie_fd(e, n, node_version); if (fd < 0) { ((struct sd_rsp *) hdr)->result = SD_RES_EIO; return -1; @@ -476,7 +543,6 @@ int exec_reqs(struct sheepdog_node_list_entry *e, memcpy(&tmp, hdr, sizeof(tmp)); ret = exec_req(fd, &tmp, data, 
&wlen, &rlen); - close(fd); rsp = (struct sd_rsp *)&tmp; -- 1.5.6.5 |