From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp> Currently, we read/write data from clients in the main thread in a non-blocking way. This has the following problems: - The implementation is much more complex than doing it in a blocking way. - The main thread becomes a bottleneck during large data I/O (e.g. object recovery performs many 4 MB reads and writes). This patch creates worker threads for client connections (via a new "net" work queue), makes socket descriptors blocking, and reads/writes data from clients in those worker threads in a blocking way. With this patch, client_handler() no longer triggers warnings from the loop_check tracer. I ran the following benchmark to emulate VM I/O during object recovery, and saw a large performance improvement: read throughput and IOPS for the 4 KB VM reads are roughly 6.6 times higher. [benchmark] Simulate object recovery with 10 concurrent 4 MB reads, and simulate VM I/O with 4 KB reads. $ ./script/vditest -C 10 -B 4M -T 10 test > /dev/null & \ ./script/vditest -B 4k -T 10 test [result (before)] options: -B 4096:4096 -c writethrough -C 1 -D 100:0 -o 0 -p linear -s 1376684853 -S 0:4294967296 -T 10 -f 0 Total read throughput: 638156.8B/s (623.2K/s), IOPS 155.8/s. [result (after)] options: -B 4096:4096 -c writethrough -C 1 -D 100:0 -o 0 -p linear -s 1376684811 -S 0:4294967296 -T 10 -f 0 Total read throughput: 4237312.0B/s (4.0M/s), IOPS 1034.5/s. 
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp> Signed-off-by: Liu Yuan <namei.unix at gmail.com> --- include/net.h | 12 +-- lib/net.c | 80 --------------- sheep/request.c | 283 ++++++++++++++++++++-------------------------------- sheep/sheep.c | 1 + sheep/sheep_priv.h | 3 + 5 files changed, 115 insertions(+), 264 deletions(-) diff --git a/include/net.h b/include/net.h index 2782e44..b8c6ab8 100644 --- a/include/net.h +++ b/include/net.h @@ -32,22 +32,13 @@ struct connection { uint16_t port; char ipstr[INET6_ADDRSTRLEN]; - enum conn_state c_rx_state; - int rx_length; - void *rx_buf; - struct sd_req rx_hdr; - - enum conn_state c_tx_state; - int tx_length; - void *tx_buf; - struct sd_rsp tx_hdr; + bool dead; }; int conn_tx_off(struct connection *conn); int conn_tx_on(struct connection *conn); int conn_rx_off(struct connection *conn); int conn_rx_on(struct connection *conn); -bool is_conn_dead(const struct connection *conn); int do_read(int sockfd, void *buf, int len, bool (*need_retry)(uint32_t), uint32_t, uint32_t); int rx(struct connection *conn, enum conn_state next_state); @@ -65,7 +56,6 @@ int create_unix_domain_socket(const char *unix_path, const char *addr_to_str(const uint8_t *addr, uint16_t port); uint8_t *str_to_addr(const char *ipstr, uint8_t *addr); char *sockaddr_in_to_str(struct sockaddr_in *sockaddr); -int set_nonblocking(int fd); int set_nodelay(int fd); int set_keepalive(int fd); int set_snd_timeout(int fd); diff --git a/lib/net.c b/lib/net.c index ce978b2..e38dfaa 100644 --- a/lib/net.c +++ b/lib/net.c @@ -61,59 +61,6 @@ int conn_rx_on(struct connection *conn) return modify_event(conn->fd, conn->events); } -bool is_conn_dead(const struct connection *conn) -{ - if (conn->c_rx_state == C_IO_CLOSED || conn->c_tx_state == C_IO_CLOSED) - return true; - else - return false; -} - -int rx(struct connection *conn, enum conn_state next_state) -{ - int ret; - - ret = read(conn->fd, conn->rx_buf, conn->rx_length); - if (!ret) { - 
conn->c_rx_state = C_IO_CLOSED; - return 0; - } - - if (ret < 0) { - if (errno != EAGAIN && errno != EINTR) - conn->c_rx_state = C_IO_CLOSED; - return 0; - } - - conn->rx_length -= ret; - conn->rx_buf = (char *)conn->rx_buf + ret; - - if (!conn->rx_length) - conn->c_rx_state = next_state; - - return ret; -} - -int tx(struct connection *conn, enum conn_state next_state) -{ - int ret; - - ret = write(conn->fd, conn->tx_buf, conn->tx_length); - if (ret < 0) { - if (errno != EAGAIN && errno != EINTR) - conn->c_tx_state = C_IO_CLOSED; - return 0; - } - - conn->tx_length -= ret; - conn->tx_buf = (char *)conn->tx_buf + ret; - - if (!conn->tx_length) - conn->c_tx_state = next_state; - - return ret; -} - int create_listen_ports(const char *bindaddr, int port, int (*callback)(int fd, void *), void *data) { @@ -170,12 +117,6 @@ int create_listen_ports(const char *bindaddr, int port, continue; } - ret = set_nonblocking(fd); - if (ret < 0) { - close(fd); - continue; - } - ret = callback(fd, data); if (ret) { close(fd); @@ -483,23 +424,6 @@ uint8_t *str_to_addr(const char *ipstr, uint8_t *addr) return addr; } -int set_nonblocking(int fd) -{ - int ret; - - ret = fcntl(fd, F_GETFL); - if (ret < 0) { - sd_err("fcntl F_GETFL failed: %m"); - close(fd); - } else { - ret = fcntl(fd, F_SETFL, ret | O_NONBLOCK); - if (ret < 0) - sd_err("fcntl O_NONBLOCK failed: %m"); - } - - return ret; -} - int set_snd_timeout(int fd) { struct timeval timeout; @@ -636,10 +560,6 @@ int create_unix_domain_socket(const char *unix_path, goto err; } - ret = set_nonblocking(fd); - if (ret < 0) - goto err; - ret = callback(fd, data); if (ret) goto err; diff --git a/sheep/request.c b/sheep/request.c index 8758214..29781cc 100644 --- a/sheep/request.c +++ b/sheep/request.c @@ -510,196 +510,125 @@ main_fn void put_request(struct request *req) if (req->local) eventfd_xwrite(req->local_req_efd, 1); else { - if (conn_tx_on(&ci->conn)) { + if (ci->conn.dead) { clear_client_info(ci); free_request(req); } else { - 
list_add(&req->request_list, &ci->done_reqs); + list_add_tail(&req->request_list, &ci->done_reqs); + + if (ci->tx_req == NULL) + /* There is no request being sent. */ + conn_tx_on(&ci->conn); } } } -static void init_rx_hdr(struct client_info *ci) -{ - ci->conn.c_rx_state = C_IO_HEADER; - ci->rx_req = NULL; - ci->conn.rx_length = sizeof(struct sd_req); - ci->conn.rx_buf = &ci->conn.rx_hdr; -} - -static inline int begin_rx(struct client_info *ci) +static void rx_work(struct work *work) { + struct client_info *ci = container_of(work, struct client_info, + rx_work); int ret; - uint64_t data_len; struct connection *conn = &ci->conn; - struct sd_req *hdr = &conn->rx_hdr; + struct sd_req hdr; struct request *req; - switch (conn->c_rx_state) { - case C_IO_HEADER: - ret = rx(conn, C_IO_DATA_INIT); - if (!ret || conn->c_rx_state != C_IO_DATA_INIT) - break; - case C_IO_DATA_INIT: - data_len = hdr->data_length; - - req = alloc_request(ci, data_len); - if (!req) { - conn->c_rx_state = C_IO_CLOSED; - break; - } - ci->rx_req = req; - - /* use le_to_cpu */ - memcpy(&req->rq, hdr, sizeof(req->rq)); - - if (data_len && hdr->flags & SD_FLAG_CMD_WRITE) { - conn->c_rx_state = C_IO_DATA; - conn->rx_length = data_len; - conn->rx_buf = req->data; - } else { - conn->c_rx_state = C_IO_END; - break; - } - case C_IO_DATA: - ret = rx(conn, C_IO_END); - break; - default: - sd_err("bug: unknown state %d", conn->c_rx_state); + ret = do_read(conn->fd, &hdr, sizeof(hdr), NULL, 0, UINT32_MAX); + if (ret) { + sd_err("failed to read a header"); + conn->dead = true; + return; } - if (is_conn_dead(conn)) { - clear_client_info(ci); - return -1; + req = alloc_request(ci, hdr.data_length); + if (!req) { + sd_err("failed to allocate request"); + conn->dead = true; + return; } + ci->rx_req = req; - /* Short read happens */ - if (conn->c_rx_state != C_IO_END) - return -1; + /* use le_to_cpu */ + memcpy(&req->rq, &hdr, sizeof(req->rq)); - return 0; -} - -static inline void finish_rx(struct client_info *ci) -{ 
- struct request *req; - - req = ci->rx_req; - init_rx_hdr(ci); - - sd_debug("%d, %s:%d", ci->conn.fd, ci->conn.ipstr, ci->conn.port); - queue_request(req); -} - -static void do_client_rx(struct client_info *ci) -{ - if (begin_rx(ci) < 0) - return; - - finish_rx(ci); + if (hdr.data_length && hdr.flags & SD_FLAG_CMD_WRITE) { + ret = do_read(conn->fd, req->data, hdr.data_length, NULL, 0, + UINT32_MAX); + if (ret) { + sd_err("failed to read data"); + conn->dead = true; + } + } } -static void init_tx_hdr(struct client_info *ci) +static void rx_main(struct work *work) { - struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr; - struct request *req; + struct client_info *ci = container_of(work, struct client_info, + rx_work); + struct request *req = ci->rx_req; - assert(!list_empty(&ci->done_reqs)); + ci->rx_req = NULL; - memset(rsp, 0, sizeof(*rsp)); + refcount_dec(&ci->refcnt); - req = list_first_entry(&ci->done_reqs, struct request, request_list); - list_del(&req->request_list); + if (ci->conn.dead) { + if (req) + free_request(req); - ci->tx_req = req; - ci->conn.tx_length = sizeof(*rsp); - ci->conn.c_tx_state = C_IO_HEADER; - ci->conn.tx_buf = rsp; + clear_client_info(ci); + return; + } - /* use cpu_to_le */ - memcpy(rsp, &req->rp, sizeof(*rsp)); + conn_rx_on(&ci->conn); - rsp->epoch = sys->cinfo.epoch; - rsp->opcode = req->rq.opcode; - rsp->id = req->rq.id; + sd_debug("%d, %s:%d", ci->conn.fd, ci->conn.ipstr, ci->conn.port); + queue_request(req); } -static inline int begin_tx(struct client_info *ci) +static void tx_work(struct work *work) { - int ret, opt; - struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr; + struct client_info *ci = container_of(work, struct client_info, + tx_work); + int ret; + struct connection *conn = &ci->conn; + struct sd_rsp rsp; + struct request *req = ci->tx_req; + void *data = NULL; - /* If short send happens, we don't need init hdr */ - if (!ci->tx_req) - init_tx_hdr(ci); + /* use cpu_to_le */ + memcpy(&rsp, &req->rp, sizeof(rsp)); 
- opt = 1; - setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt)); + rsp.epoch = sys->cinfo.epoch; + rsp.opcode = req->rq.opcode; + rsp.id = req->rq.id; - switch (ci->conn.c_tx_state) { - case C_IO_HEADER: - ret = tx(&ci->conn, C_IO_DATA_INIT); - if (!ret) - break; + if (rsp.data_length) + data = req->data; - if (rsp->data_length) { - ci->conn.tx_length = rsp->data_length; - ci->conn.tx_buf = ci->tx_req->data; - ci->conn.c_tx_state = C_IO_DATA; - } else { - ci->conn.c_tx_state = C_IO_END; - break; - } - case C_IO_DATA: - ret = tx(&ci->conn, C_IO_END); - if (!ret) - break; - default: - break; + ret = send_req(conn->fd, (struct sd_req *)&rsp, data, rsp.data_length, + NULL, 0, UINT32_MAX); + if (ret != 0) { + sd_err("failed to send a request"); + conn->dead = true; } - - opt = 0; - setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt)); - - if (is_conn_dead(&ci->conn)) { - clear_client_info(ci); - return -1; - } - return 0; } -/* Return 1 if short send happens or we have more data to send */ -static inline int finish_tx(struct client_info *ci) +static void tx_main(struct work *work) { - /* Finish sending one response */ - if (ci->conn.c_tx_state == C_IO_END) { - sd_debug("connection from: %d, %s:%d", ci->conn.fd, - ci->conn.ipstr, ci->conn.port); - free_request(ci->tx_req); - ci->tx_req = NULL; - } - if (ci->tx_req || !list_empty(&ci->done_reqs)) - return 1; - return 0; -} + struct client_info *ci = container_of(work, struct client_info, + tx_work); -static void do_client_tx(struct client_info *ci) -{ - if (!ci->tx_req && list_empty(&ci->done_reqs)) { - if (conn_tx_off(&ci->conn)) - clear_client_info(ci); - return; - } + refcount_dec(&ci->refcnt); - if (begin_tx(ci) < 0) - return; + free_request(ci->tx_req); + ci->tx_req = NULL; - if (finish_tx(ci)) + if (ci->conn.dead) { + clear_client_info(ci); return; + } - /* Let's go sleep, and put_request() will wake me up */ - if (conn_tx_off(&ci->conn)) - clear_client_info(ci); + if 
(!list_empty(&ci->done_reqs)) + conn_tx_on(&ci->conn); } static void destroy_client(struct client_info *ci) @@ -715,16 +644,6 @@ static void clear_client_info(struct client_info *ci) sd_debug("connection seems to be dead"); - if (ci->rx_req) { - free_request(ci->rx_req); - ci->rx_req = NULL; - } - - if (ci->tx_req) { - free_request(ci->tx_req); - ci->tx_req = NULL; - } - list_for_each_entry_safe(req, t, &ci->done_reqs, request_list) { list_del(&req->request_list); free_request(req); @@ -775,8 +694,6 @@ static struct client_info *create_client(int fd, struct cluster_info *cluster) INIT_LIST_HEAD(&ci->done_reqs); - init_rx_hdr(ci); - return ci; } @@ -784,17 +701,43 @@ static void client_handler(int fd, int events, void *data) { struct client_info *ci = (struct client_info *)data; - sd_debug("%x, rx %d, tx %d", events, ci->conn.c_rx_state, - ci->conn.c_tx_state); + sd_debug("%x, %d", events, ci->conn.dead); - if (events & (EPOLLERR | EPOLLHUP) || is_conn_dead(&ci->conn)) + if (events & (EPOLLERR | EPOLLHUP) || ci->conn.dead) return clear_client_info(ci); - if (events & EPOLLIN) - do_client_rx(ci); + if (events & EPOLLIN) { + if (conn_rx_off(&ci->conn) != 0) + return; - if (events & EPOLLOUT) - do_client_tx(ci); + /* + * Increment refcnt so that the client_info isn't freed while + * rx_work uses it. + */ + refcount_inc(&ci->refcnt); + ci->rx_work.fn = rx_work; + ci->rx_work.done = rx_main; + queue_work(sys->net_wqueue, &ci->rx_work); + } + + if (events & EPOLLOUT) { + if (conn_tx_off(&ci->conn) != 0) + return; + + assert(ci->tx_req == NULL); + ci->tx_req = list_first_entry(&ci->done_reqs, struct request, + request_list); + list_del(&ci->tx_req->request_list); + + /* + * Increment refcnt so that the client_info isn't freed while + * tx_work uses it. 
+ */ + refcount_inc(&ci->refcnt); + ci->tx_work.fn = tx_work; + ci->tx_work.done = tx_main; + queue_work(sys->net_wqueue, &ci->tx_work); + } } static void listen_handler(int listen_fd, int events, void *data) @@ -826,12 +769,6 @@ static void listen_handler(int listen_fd, int events, void *data) } } - ret = set_nonblocking(fd); - if (ret) { - close(fd); - return; - } - ci = create_client(fd, data); if (!ci) { close(fd); diff --git a/sheep/sheep.c b/sheep/sheep.c index eb6afe8..3a6a8e7 100644 --- a/sheep/sheep.c +++ b/sheep/sheep.c @@ -424,6 +424,7 @@ static int create_work_queues(void) if (init_work_queue(get_nr_nodes)) return -1; + sys->net_wqueue = create_work_queue("net", WQ_UNLIMITED); sys->gateway_wqueue = create_work_queue("gway", WQ_UNLIMITED); sys->io_wqueue = create_work_queue("io", WQ_UNLIMITED); sys->recovery_wqueue = create_ordered_work_queue("rw"); diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h index c13725d..2199284 100644 --- a/sheep/sheep_priv.h +++ b/sheep/sheep_priv.h @@ -66,8 +66,10 @@ struct client_info { struct connection conn; struct request *rx_req; + struct work rx_work; struct request *tx_req; + struct work tx_work; struct list_head done_reqs; @@ -128,6 +130,7 @@ struct system_info { bool gateway_only; bool nosync; + struct work_queue *net_wqueue; struct work_queue *gateway_wqueue; struct work_queue *io_wqueue; struct work_queue *deletion_wqueue; -- 1.7.10.4 |