This patch reuses socket descriptors when accessing data objects, and
improves latency.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
Review notes (this section is ignored when the patch is applied):
 - set_nonblocking() no longer closes the fd when fcntl(F_GETFL) fails.
   get_sheep_fd() already closes it on that error path, so the descriptor
   was closed twice.
 - Header comments were added to the three new functions in sdnet.c; the
   hunk and diffstat counts below are updated accordingly.  The index
   hashes are those of the original posting.
 - TODO(review): forward_read_obj_req() leaves the fd in the cache when
   exec_req() fails; confirm whether a failed descriptor has to be dropped
   so that the next request reconnects.
 - TODO(review): cached_fds/cached_epoch are function-static and shared by
   all workers; confirm that get_sheep_fd() cannot run concurrently.

 sheep/sdnet.c      |  111 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 sheep/sheep_priv.h |    3 ++
 sheep/store.c      |   23 +++---------
 3 files changed, 120 insertions(+), 17 deletions(-)

diff --git a/sheep/sdnet.c b/sheep/sdnet.c
index c8cbd87..9ad0bc7 100644
--- a/sheep/sdnet.c
+++ b/sheep/sdnet.c
@@ -13,6 +13,7 @@
 #include <unistd.h>
 #include <netinet/tcp.h>
 #include <sys/epoll.h>
+#include <fcntl.h>
 
 #include "sheep_priv.h"
 
@@ -670,3 +671,113 @@ int remove_object(struct sheepdog_node_list_entry *e,
 
 	return 0;
 }
+
+/*
+ * Put @fd into non-blocking mode.
+ *
+ * Returns the result of the last fcntl() call, which is negative on
+ * failure.  @fd is never closed here; the caller keeps ownership.
+ */
+static int set_nonblocking(int fd)
+{
+	int ret;
+
+	ret = fcntl(fd, F_GETFL);
+	if (ret < 0) {
+		eprintf("can't fcntl (F_GETFL), %m\n");
+	} else {
+		ret = fcntl(fd, F_SETFL, ret | O_NONBLOCK);
+		if (ret < 0)
+			eprintf("can't fcntl (O_NONBLOCK), %m\n");
+	}
+
+	return ret;
+}
+
+/* Disable Nagle's algorithm on @fd; returns the setsockopt() result. */
+static int set_nodelay(int fd)
+{
+	int ret, opt;
+
+	opt = 1;
+	ret = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt));
+	return ret;
+}
+
+/*
+ * Return a socket connected to node e[node_idx] for worker worker_idx.
+ *
+ * Descriptors are cached per (worker, node) pair and reused while the
+ * epoch is unchanged.  When a newer epoch is requested, every cached
+ * descriptor is closed and the cache starts over.  The returned fd stays
+ * in the cache, so callers must not close it.
+ *
+ * Returns the fd, or -1 if the requested epoch is older than the cached
+ * one or the connection could not be set up.
+ */
+int get_sheep_fd(struct sheepdog_node_list_entry *e, int node_idx,
+		 uint32_t epoch, int worker_idx)
+{
+	static int cached_fds[NR_WORKER_THREAD][SD_MAX_NODES];
+	static uint32_t cached_epoch = 0;
+	int i, j, fd, ret;
+	char name[INET6_ADDRSTRLEN];
+
+	if (cached_epoch == 0) {
+		/* initialize */
+		for (i = 0; i < NR_WORKER_THREAD; i++) {
+			for (j = 0; j < SD_MAX_NODES; j++)
+				cached_fds[i][j] = -1;
+		}
+		cached_epoch = epoch;
+	}
+
+	if (before(epoch, cached_epoch)) {
+		eprintf("requested epoch is smaller than the previous one, %d %d\n",
+			cached_epoch, epoch);
+		return -1;
+	}
+	if (after(epoch, cached_epoch)) {
+		for (i = 0; i < NR_WORKER_THREAD; i++) {
+			for (j = 0; j < SD_MAX_NODES; j++) {
+				if (cached_fds[i][j] >= 0)
+					close(cached_fds[i][j]);
+
+				cached_fds[i][j] = -1;
+			}
+		}
+		cached_epoch = epoch;
+	}
+
+	fd = cached_fds[worker_idx][node_idx];
+	dprintf("%d, %d\n", epoch, fd);
+
+	if (cached_epoch == epoch && fd >= 0) {
+		dprintf("use a cached fd %d\n", fd);
+		return fd;
+	}
+
+	addr_to_str(name, sizeof(name), e[node_idx].addr, 0);
+
+	fd = connect_to(name, e[node_idx].port);
+	if (fd < 0)
+		return -1;
+
+	ret = set_nonblocking(fd);
+	if (ret) {
+		eprintf("%m\n");
+		close(fd);
+		return -1;
+	}
+
+	ret = set_nodelay(fd);
+	if (ret) {
+		eprintf("%m\n");
+		close(fd);
+		return -1;
+	}
+
+	cached_fds[worker_idx][node_idx] = fd;
+
+	return fd;
+}
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index c66baf4..a8af306 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -190,6 +190,9 @@ int remove_object(struct sheepdog_node_list_entry *e,
 		  int nodes, uint32_t node_version,
 		  uint64_t oid, int nr);
 
+int get_sheep_fd(struct sheepdog_node_list_entry *e, int node_idx,
+		 uint32_t epoch, int worker_idx);
+
 static inline int is_myself(struct sheepdog_node_list_entry *e)
 {
 	return e->id == sys->this_node.id;
diff --git a/sheep/store.c b/sheep/store.c
index 06e35e7..6043054 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -391,11 +391,10 @@ static int read_from_other_sheeps(struct request *req, uint32_t epoch,
 
 static int store_queue_request_local(struct request *req, uint32_t epoch);
 
-static int forward_read_obj_req(struct request *req)
+static int forward_read_obj_req(struct request *req, int idx)
 {
 	int i, n, nr, fd, ret;
 	unsigned wlen, rlen;
-	char name[128];
 	struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
 	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)hdr;
 	struct sheepdog_node_list_entry *e;
@@ -427,9 +426,7 @@ static int forward_read_obj_req(struct request *req)
 
 	n = obj_to_sheep(e, nr, oid, 0);
 
-	addr_to_str(name, sizeof(name), e[n].addr, 0);
-
-	fd = connect_to(name, e[n].port);
+	fd = get_sheep_fd(e, n, hdr->epoch, idx);
 	if (fd < 0) {
 		ret = SD_RES_NETWORK_ERROR;
 		goto out;
@@ -440,8 +437,6 @@ static int forward_read_obj_req(struct request *req)
 
 	ret = exec_req(fd, (struct sd_req *)hdr, req->data, &wlen, &rlen);
 
-	close(fd);
-
 	if (ret) /* network errors */
 		ret = SD_RES_NETWORK_ERROR;
 	else {
@@ -455,7 +450,7 @@ out:
 	return ret;
 }
 
-static int forward_write_obj_req(struct request *req)
+static int forward_write_obj_req(struct request *req, int idx)
 {
 	int i, n, nr, fd, ret;
 	unsigned wlen, rlen;
@@ -501,7 +496,7 @@ static int forward_write_obj_req(struct request *req)
 			continue;
 		}
 
-		fd = connect_to(name, e[n].port);
+		fd = get_sheep_fd(e, n, hdr->epoch, idx);
 		if (fd < 0) {
 			eprintf("failed to connect to %s:%"PRIu32"\n", name, e[n].port);
 			ret = SD_RES_NETWORK_ERROR;
@@ -582,12 +577,6 @@ again:
 
 	ret = SD_RES_SUCCESS;
 out:
-
-	for (i = 0; i < ARRAY_SIZE(pfds); i++){
-		if (pfds[i].fd >= 0)
-			close(pfds[i].fd);
-	}
-
 	hdr->flags &= ~SD_FLAG_CMD_DIRECT;
 
 	return ret;
@@ -831,9 +820,9 @@ void store_queue_request(struct work *work, int idx)
 
 	if (!(hdr->flags & SD_FLAG_CMD_DIRECT)) {
 		if (hdr->flags & SD_FLAG_CMD_WRITE)
-			ret = forward_write_obj_req(req);
+			ret = forward_write_obj_req(req, idx);
 		else
-			ret = forward_read_obj_req(req);
+			ret = forward_read_obj_req(req, idx);
 		goto out;
 	}
 
-- 
1.5.6.5