[sheepdog] [PATCH] sheep: use tx/rx thread for data transfer
yaohaiting.wujue at gmail.com
yaohaiting.wujue at gmail.com
Wed Jul 11 11:10:54 CEST 2012
From: HaiTing Yao <wujue.yht at taobao.com>
tx/rx are handled in the main thread now, so tx and rx cannot be done at the
same time. The tx/rx work also delays the handling of epoll events.
I use one thread for tx and one thread for rx. This improves
performance.
Signed-off-by: HaiTing Yao <wujue.yht at taobao.com>
---
sheep/sdnet.c | 173 +++++++++++++++++++++++++++++++++++++---------------
sheep/sheep.c | 2 +
sheep/sheep_priv.h | 2 +
3 files changed, 127 insertions(+), 50 deletions(-)
diff --git a/sheep/sdnet.c b/sheep/sdnet.c
index a2a7bc1..169f3ab 100644
--- a/sheep/sdnet.c
+++ b/sheep/sdnet.c
@@ -505,10 +505,68 @@ static void init_rx_hdr(struct client_info *ci)
ci->conn.rx_buf = &ci->conn.rx_hdr;
}
-static void client_rx_handler(struct client_info *ci)
+static void destroy_client(struct client_info *ci)
+{
+ dprintf("connection from: %s:%d\n", ci->conn.ipstr, ci->conn.port);
+ close(ci->conn.fd);
+ free(ci);
+}
+
+struct rx_tx_info {
+ struct client_info *client;
+ struct work work;
+};
+
+static void client_handler(int fd, int events, void *data);
+
+static void rx_done(struct work *work)
+{
+ int ret = 0;
+ struct rx_tx_info *info = container_of(work, struct rx_tx_info, work);
+ struct client_info *ci = info->client;
+ struct connection *conn = &ci->conn;
+ struct request *req;
+ struct sd_req *hdr = &conn->rx_hdr;
+
+ ret = register_event(ci->conn.fd, client_handler, ci);
+
+ if (ret) {
+ destroy_client(ci);
+ return;
+ }
+
+ if (is_conn_dead(conn) && ci->rx_req) {
+ free_request(ci->rx_req);
+ return;
+ }
+
+ if (conn->c_rx_state != C_IO_END)
+ return;
+
+ /* now we have a complete command */
+
+ req = ci->rx_req;
+
+ init_rx_hdr(ci);
+
+ if (hdr->flags & SD_FLAG_CMD_WRITE)
+ req->rp.data_length = 0;
+ else
+ req->rp.data_length = hdr->data_length;
+
+ dprintf("connection from: %s:%d\n", ci->conn.ipstr, ci->conn.port);
+ queue_request(req);
+
+ if (info)
+ free(info);
+}
+
+static void client_rx_handler_worker(struct work *work)
{
int ret;
uint64_t data_len;
+ struct rx_tx_info *info = container_of(work, struct rx_tx_info, work);
+ struct client_info *ci = info->client;
struct connection *conn = &ci->conn;
struct sd_req *hdr = &conn->rx_hdr;
struct request *req;
@@ -553,29 +611,7 @@ static void client_rx_handler(struct client_info *ci)
eprintf("bug: unknown state %d\n", conn->c_rx_state);
}
- if (is_conn_dead(conn) && ci->rx_req) {
- free_request(ci->rx_req);
- ci->rx_req = NULL;
- return;
- }
-
- if (conn->c_rx_state != C_IO_END)
- return;
-
- /* now we have a complete command */
-
- req = ci->rx_req;
-
- init_rx_hdr(ci);
-
- if (hdr->flags & SD_FLAG_CMD_WRITE)
- req->rp.data_length = 0;
- else
- req->rp.data_length = hdr->data_length;
-
- dprintf("connection from: %d, %s:%d\n", ci->conn.fd,
- ci->conn.ipstr, ci->conn.port);
- queue_request(req);
+ return;
}
static void init_tx_hdr(struct client_info *ci)
@@ -604,12 +640,52 @@ static void init_tx_hdr(struct client_info *ci)
rsp->id = req->rq.id;
}
-static void client_tx_handler(struct client_info *ci)
+static void tx_done(struct work *work)
+{
+ int ret = 0;
+ struct rx_tx_info *info = container_of(work, struct rx_tx_info, work);
+ struct client_info *ci = info->client;
+ struct connection *conn, *n;
+
+ ret = register_event(ci->conn.fd, client_handler, ci);
+
+ if (is_conn_dead(&ci->conn)) {
+ free_request(ci->tx_req);
+ return;
+ }
+
+ if (ci->conn.c_tx_state == C_IO_END) {
+ free_request(ci->tx_req);
+ ci->tx_req = NULL;
+ init_tx_hdr(ci);
+ if (!ci->tx_req) {
+ conn_tx_off(&ci->conn);
+ if (sys->outstanding_data_size <
+ MAX_OUTSTANDING_DATA_SIZE) {
+ list_for_each_entry_safe(conn, n,
+ &sys->blocking_conn_list,
+ blocking_siblings) {
+ dprintf("rx on %p\n", conn);
+ list_del_init(&conn->blocking_siblings);
+ conn_rx_on(conn);
+ }
+ }
+ }
+ } else
+ conn_tx_on(&ci->conn);
+
+ if (info)
+ free(info);
+}
+
+static void client_tx_handler_worker(struct work *work)
{
int ret, opt;
+ struct rx_tx_info *info = container_of(work, struct rx_tx_info, work);
+ struct client_info *ci = info->client;
struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
struct connection *conn, *n;
-again:
+
init_tx_hdr(ci);
if (!ci->tx_req) {
conn_tx_off(&ci->conn);
@@ -652,26 +728,7 @@ again:
opt = 0;
setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
- if (is_conn_dead(&ci->conn)) {
- free_request(ci->tx_req);
- ci->tx_req = NULL;
- return;
- }
-
- if (ci->conn.c_tx_state == C_IO_END) {
- dprintf("connection from: %d, %s:%d\n", ci->conn.fd,
- ci->conn.ipstr, ci->conn.port);
- free_request(ci->tx_req);
- ci->tx_req = NULL;
- goto again;
- }
-}
-
-static void destroy_client(struct client_info *ci)
-{
- dprintf("connection from: %s:%d\n", ci->conn.ipstr, ci->conn.port);
- close(ci->conn.fd);
- free(ci);
+ return;
}
static void clear_client(struct client_info *ci)
@@ -750,14 +807,30 @@ static void client_handler(int fd, int events, void *data)
{
struct client_info *ci = (struct client_info *)data;
+ struct rx_tx_info *net_info = NULL;
+
+ net_info = zalloc(sizeof(struct rx_tx_info));
+ if (!net_info)
+ return;
+
+ net_info->client = ci;
+
if (events & (EPOLLERR | EPOLLHUP))
goto err;
- if (events & EPOLLIN)
- client_rx_handler(ci);
+ if (events & EPOLLIN) {
+ net_info->work.fn = client_rx_handler_worker;
+ net_info->work.done = rx_done;
+ queue_work(sys->rx_wqueue, &net_info->work);
+ unregister_event(fd);
+ }
- if (events & EPOLLOUT)
- client_tx_handler(ci);
+ if (events & EPOLLOUT) {
+ net_info->work.fn = client_tx_handler_worker;
+ net_info->work.done = tx_done;
+ queue_work(sys->tx_wqueue, &net_info->work);
+ unregister_event(fd);
+ }
if (is_conn_dead(&ci->conn)) {
err:
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 7d1e853..1df8683 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -252,6 +252,8 @@ int main(int argc, char **argv)
sys->recovery_wqueue = init_work_queue("recovery", true);
sys->deletion_wqueue = init_work_queue("deletion", true);
sys->block_wqueue = init_work_queue("block", true);
+ sys->tx_wqueue = init_work_queue("tx", true);
+ sys->rx_wqueue = init_work_queue("rx", true);
if (!sys->gateway_wqueue || !sys->io_wqueue ||!sys->recovery_wqueue ||
!sys->deletion_wqueue || !sys->block_wqueue)
exit(1);
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index afbc361..07e1f2e 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -126,6 +126,8 @@ struct cluster_info {
struct work_queue *deletion_wqueue;
struct work_queue *recovery_wqueue;
struct work_queue *block_wqueue;
+ struct work_queue *rx_wqueue;
+ struct work_queue *tx_wqueue;
};
struct siocb {
--
1.7.1
More information about the sheepdog
mailing list