From: HaiTing Yao <wujue.yht at taobao.com> tx/rx are done in the main thread now, so tx and rx cannot be done at the same time. The tx/rx work also delays the handling of epoll events. I use one thread for tx and one thread for rx. This improves the performance. Signed-off-by: HaiTing Yao <wujue.yht at taobao.com> --- sheep/sdnet.c | 173 +++++++++++++++++++++++++++++++++++++--------------- sheep/sheep.c | 2 + sheep/sheep_priv.h | 2 + 3 files changed, 127 insertions(+), 50 deletions(-) diff --git a/sheep/sdnet.c b/sheep/sdnet.c index a2a7bc1..169f3ab 100644 --- a/sheep/sdnet.c +++ b/sheep/sdnet.c @@ -505,10 +505,68 @@ static void init_rx_hdr(struct client_info *ci) ci->conn.rx_buf = &ci->conn.rx_hdr; } -static void client_rx_handler(struct client_info *ci) +static void destroy_client(struct client_info *ci) +{ + dprintf("connection from: %s:%d\n", ci->conn.ipstr, ci->conn.port); + close(ci->conn.fd); + free(ci); +} + +struct rx_tx_info { + struct client_info *client; + struct work work; +}; + +static void client_handler(int fd, int events, void *data); + +static void rx_done(struct work *work) +{ + int ret = 0; + struct rx_tx_info *info = container_of(work, struct rx_tx_info, work); + struct client_info *ci = info->client; + struct connection *conn = &ci->conn; + struct request *req; + struct sd_req *hdr = &conn->rx_hdr; + + ret = register_event(ci->conn.fd, client_handler, ci); + + if (ret) { + destroy_client(ci); + return; + } + + if (is_conn_dead(conn) && ci->rx_req) { + free_request(ci->rx_req); + return; + } + + if (conn->c_rx_state != C_IO_END) + return; + + /* now we have a complete command */ + + req = ci->rx_req; + + init_rx_hdr(ci); + + if (hdr->flags & SD_FLAG_CMD_WRITE) + req->rp.data_length = 0; + else + req->rp.data_length = hdr->data_length; + + dprintf("connection from: %s:%d\n", ci->conn.ipstr, ci->conn.port); + queue_request(req); + + if (info) + free(info); +} + +static void client_rx_handler_worker(struct work *work) { int ret; uint64_t data_len; + struct rx_tx_info 
*info = container_of(work, struct rx_tx_info, work); + struct client_info *ci = info->client; struct connection *conn = &ci->conn; struct sd_req *hdr = &conn->rx_hdr; struct request *req; @@ -553,29 +611,7 @@ static void client_rx_handler(struct client_info *ci) eprintf("bug: unknown state %d\n", conn->c_rx_state); } - if (is_conn_dead(conn) && ci->rx_req) { - free_request(ci->rx_req); - ci->rx_req = NULL; - return; - } - - if (conn->c_rx_state != C_IO_END) - return; - - /* now we have a complete command */ - - req = ci->rx_req; - - init_rx_hdr(ci); - - if (hdr->flags & SD_FLAG_CMD_WRITE) - req->rp.data_length = 0; - else - req->rp.data_length = hdr->data_length; - - dprintf("connection from: %d, %s:%d\n", ci->conn.fd, - ci->conn.ipstr, ci->conn.port); - queue_request(req); + return; } static void init_tx_hdr(struct client_info *ci) @@ -604,12 +640,52 @@ static void init_tx_hdr(struct client_info *ci) rsp->id = req->rq.id; } -static void client_tx_handler(struct client_info *ci) +static void tx_done(struct work *work) +{ + int ret = 0; + struct rx_tx_info *info = container_of(work, struct rx_tx_info, work); + struct client_info *ci = info->client; + struct connection *conn, *n; + + ret = register_event(ci->conn.fd, client_handler, ci); + + if (is_conn_dead(&ci->conn)) { + free_request(ci->tx_req); + return; + } + + if (ci->conn.c_tx_state == C_IO_END) { + free_request(ci->tx_req); + ci->tx_req = NULL; + init_tx_hdr(ci); + if (!ci->tx_req) { + conn_tx_off(&ci->conn); + if (sys->outstanding_data_size < + MAX_OUTSTANDING_DATA_SIZE) { + list_for_each_entry_safe(conn, n, + &sys->blocking_conn_list, + blocking_siblings) { + dprintf("rx on %p\n", conn); + list_del_init(&conn->blocking_siblings); + conn_rx_on(conn); + } + } + } + } else + conn_tx_on(&ci->conn); + + if (info) + free(info); +} + +static void client_tx_handler_worker(struct work *work) { int ret, opt; + struct rx_tx_info *info = container_of(work, struct rx_tx_info, work); + struct client_info *ci = 
info->client; struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr; struct connection *conn, *n; -again: + init_tx_hdr(ci); if (!ci->tx_req) { conn_tx_off(&ci->conn); @@ -652,26 +728,7 @@ again: opt = 0; setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt)); - if (is_conn_dead(&ci->conn)) { - free_request(ci->tx_req); - ci->tx_req = NULL; - return; - } - - if (ci->conn.c_tx_state == C_IO_END) { - dprintf("connection from: %d, %s:%d\n", ci->conn.fd, - ci->conn.ipstr, ci->conn.port); - free_request(ci->tx_req); - ci->tx_req = NULL; - goto again; - } -} - -static void destroy_client(struct client_info *ci) -{ - dprintf("connection from: %s:%d\n", ci->conn.ipstr, ci->conn.port); - close(ci->conn.fd); - free(ci); + return; } static void clear_client(struct client_info *ci) @@ -750,14 +807,30 @@ static void client_handler(int fd, int events, void *data) { struct client_info *ci = (struct client_info *)data; + struct rx_tx_info *net_info = NULL; + + net_info = zalloc(sizeof(struct rx_tx_info)); + if (!net_info) + return; + + net_info->client = ci; + if (events & (EPOLLERR | EPOLLHUP)) goto err; - if (events & EPOLLIN) - client_rx_handler(ci); + if (events & EPOLLIN) { + net_info->work.fn = client_rx_handler_worker; + net_info->work.done = rx_done; + queue_work(sys->rx_wqueue, &net_info->work); + unregister_event(fd); + } - if (events & EPOLLOUT) - client_tx_handler(ci); + if (events & EPOLLOUT) { + net_info->work.fn = client_tx_handler_worker; + net_info->work.done = tx_done; + queue_work(sys->tx_wqueue, &net_info->work); + unregister_event(fd); + } if (is_conn_dead(&ci->conn)) { err: diff --git a/sheep/sheep.c b/sheep/sheep.c index 7d1e853..1df8683 100644 --- a/sheep/sheep.c +++ b/sheep/sheep.c @@ -252,6 +252,8 @@ int main(int argc, char **argv) sys->recovery_wqueue = init_work_queue("recovery", true); sys->deletion_wqueue = init_work_queue("deletion", true); sys->block_wqueue = init_work_queue("block", true); + sys->tx_wqueue = init_work_queue("tx", true); + 
sys->rx_wqueue = init_work_queue("rx", true); if (!sys->gateway_wqueue || !sys->io_wqueue ||!sys->recovery_wqueue || !sys->deletion_wqueue || !sys->block_wqueue) exit(1); diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h index afbc361..07e1f2e 100644 --- a/sheep/sheep_priv.h +++ b/sheep/sheep_priv.h @@ -126,6 +126,8 @@ struct cluster_info { struct work_queue *deletion_wqueue; struct work_queue *recovery_wqueue; struct work_queue *block_wqueue; + struct work_queue *rx_wqueue; + struct work_queue *tx_wqueue; }; struct siocb { -- 1.7.1 |