[sheepdog] [PATCH] sheep: use tx/rx thread for data transfer

yaohaiting.wujue at gmail.com yaohaiting.wujue at gmail.com
Wed Jul 11 11:10:54 CEST 2012


From: HaiTing Yao <wujue.yht at taobao.com>

tx/rx are done in the main thread now, so tx and rx cannot run at the
same time. The tx/rx work also delays the handling of epoll events.

This patch uses one thread for tx and one thread for rx, which improves
performance.

Signed-off-by: HaiTing Yao <wujue.yht at taobao.com>
---
 sheep/sdnet.c      |  173 +++++++++++++++++++++++++++++++++++++---------------
 sheep/sheep.c      |    2 +
 sheep/sheep_priv.h |    2 +
 3 files changed, 127 insertions(+), 50 deletions(-)

diff --git a/sheep/sdnet.c b/sheep/sdnet.c
index a2a7bc1..169f3ab 100644
--- a/sheep/sdnet.c
+++ b/sheep/sdnet.c
@@ -505,10 +505,68 @@ static void init_rx_hdr(struct client_info *ci)
 	ci->conn.rx_buf = &ci->conn.rx_hdr;
 }
 
-static void client_rx_handler(struct client_info *ci)
+static void destroy_client(struct client_info *ci)
+{
+	dprintf("connection from: %s:%d\n", ci->conn.ipstr, ci->conn.port);
+	close(ci->conn.fd);
+	free(ci);
+}
+
+struct rx_tx_info {
+	struct client_info *client;
+	struct work work;
+};
+
+static void client_handler(int fd, int events, void *data);
+
+static void rx_done(struct work *work)
+{
+	int ret = 0;
+	struct rx_tx_info *info = container_of(work, struct rx_tx_info, work);
+	struct client_info *ci = info->client;
+	struct connection *conn = &ci->conn;
+	struct request *req;
+	struct sd_req *hdr = &conn->rx_hdr;
+
+	ret = register_event(ci->conn.fd, client_handler, ci);
+
+	if (ret) {
+		destroy_client(ci);
+		return;
+	}
+
+	if (is_conn_dead(conn) && ci->rx_req) {
+		free_request(ci->rx_req);
+		return;
+	}
+
+	if (conn->c_rx_state != C_IO_END)
+		return;
+
+	/* now we have a complete command */
+
+	req = ci->rx_req;
+
+	init_rx_hdr(ci);
+
+	if (hdr->flags & SD_FLAG_CMD_WRITE)
+		req->rp.data_length = 0;
+	else
+		req->rp.data_length = hdr->data_length;
+
+	dprintf("connection from: %s:%d\n", ci->conn.ipstr, ci->conn.port);
+	queue_request(req);
+
+	if (info)
+		free(info);
+}
+
+static void client_rx_handler_worker(struct work *work)
 {
 	int ret;
 	uint64_t data_len;
+	struct rx_tx_info *info = container_of(work, struct rx_tx_info, work);
+	struct client_info *ci = info->client;
 	struct connection *conn = &ci->conn;
 	struct sd_req *hdr = &conn->rx_hdr;
 	struct request *req;
@@ -553,29 +611,7 @@ static void client_rx_handler(struct client_info *ci)
 		eprintf("bug: unknown state %d\n", conn->c_rx_state);
 	}
 
-	if (is_conn_dead(conn) && ci->rx_req) {
-		free_request(ci->rx_req);
-		ci->rx_req = NULL;
-		return;
-	}
-
-	if (conn->c_rx_state != C_IO_END)
-		return;
-
-	/* now we have a complete command */
-
-	req = ci->rx_req;
-
-	init_rx_hdr(ci);
-
-	if (hdr->flags & SD_FLAG_CMD_WRITE)
-		req->rp.data_length = 0;
-	else
-		req->rp.data_length = hdr->data_length;
-
-	dprintf("connection from: %d, %s:%d\n", ci->conn.fd,
-		ci->conn.ipstr, ci->conn.port);
-	queue_request(req);
+	return;
 }
 
 static void init_tx_hdr(struct client_info *ci)
@@ -604,12 +640,52 @@ static void init_tx_hdr(struct client_info *ci)
 	rsp->id = req->rq.id;
 }
 
-static void client_tx_handler(struct client_info *ci)
+static void tx_done(struct work *work)
+{
+	int ret = 0;
+	struct rx_tx_info *info = container_of(work, struct rx_tx_info, work);
+	struct client_info *ci = info->client;
+	struct connection *conn, *n;
+
+	ret = register_event(ci->conn.fd, client_handler, ci);
+
+	if (is_conn_dead(&ci->conn)) {
+		free_request(ci->tx_req);
+		return;
+	}
+
+	if (ci->conn.c_tx_state == C_IO_END) {
+		free_request(ci->tx_req);
+		ci->tx_req = NULL;
+		init_tx_hdr(ci);
+		if (!ci->tx_req) {
+			conn_tx_off(&ci->conn);
+			if (sys->outstanding_data_size <
+				MAX_OUTSTANDING_DATA_SIZE) {
+				list_for_each_entry_safe(conn, n,
+						&sys->blocking_conn_list,
+						blocking_siblings) {
+					dprintf("rx on %p\n", conn);
+					list_del_init(&conn->blocking_siblings);
+					conn_rx_on(conn);
+				}
+			}
+		}
+	} else
+		conn_tx_on(&ci->conn);
+
+	if (info)
+		free(info);
+}
+
+static void client_tx_handler_worker(struct work *work)
 {
 	int ret, opt;
+	struct rx_tx_info *info = container_of(work, struct rx_tx_info, work);
+	struct client_info *ci = info->client;
 	struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
 	struct connection *conn, *n;
-again:
+
 	init_tx_hdr(ci);
 	if (!ci->tx_req) {
 		conn_tx_off(&ci->conn);
@@ -652,26 +728,7 @@ again:
 	opt = 0;
 	setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
 
-	if (is_conn_dead(&ci->conn)) {
-		free_request(ci->tx_req);
-		ci->tx_req = NULL;
-		return;
-	}
-
-	if (ci->conn.c_tx_state == C_IO_END) {
-		dprintf("connection from: %d, %s:%d\n", ci->conn.fd,
-			ci->conn.ipstr, ci->conn.port);
-		free_request(ci->tx_req);
-		ci->tx_req = NULL;
-		goto again;
-	}
-}
-
-static void destroy_client(struct client_info *ci)
-{
-	dprintf("connection from: %s:%d\n", ci->conn.ipstr, ci->conn.port);
-	close(ci->conn.fd);
-	free(ci);
+	return;
 }
 
 static void clear_client(struct client_info *ci)
@@ -750,14 +807,30 @@ static void client_handler(int fd, int events, void *data)
 {
 	struct client_info *ci = (struct client_info *)data;
 
+	struct rx_tx_info *net_info = NULL;
+
+	net_info = zalloc(sizeof(struct rx_tx_info));
+	if (!net_info)
+		return;
+
+	net_info->client = ci;
+
 	if (events & (EPOLLERR | EPOLLHUP))
 		goto err;
 
-	if (events & EPOLLIN)
-		client_rx_handler(ci);
+	if (events & EPOLLIN) {
+		net_info->work.fn = client_rx_handler_worker;
+		net_info->work.done = rx_done;
+		queue_work(sys->rx_wqueue, &net_info->work);
+		unregister_event(fd);
+	}
 
-	if (events & EPOLLOUT)
-		client_tx_handler(ci);
+	if (events & EPOLLOUT) {
+		net_info->work.fn = client_tx_handler_worker;
+		net_info->work.done = tx_done;
+		queue_work(sys->tx_wqueue, &net_info->work);
+		unregister_event(fd);
+	}
 
 	if (is_conn_dead(&ci->conn)) {
 err:
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 7d1e853..1df8683 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -252,6 +252,8 @@ int main(int argc, char **argv)
 	sys->recovery_wqueue = init_work_queue("recovery", true);
 	sys->deletion_wqueue = init_work_queue("deletion", true);
 	sys->block_wqueue = init_work_queue("block", true);
+	sys->tx_wqueue = init_work_queue("tx", true);
+	sys->rx_wqueue = init_work_queue("rx", true);
 	if (!sys->gateway_wqueue || !sys->io_wqueue ||!sys->recovery_wqueue ||
 	    !sys->deletion_wqueue || !sys->block_wqueue)
 		exit(1);
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index afbc361..07e1f2e 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -126,6 +126,8 @@ struct cluster_info {
 	struct work_queue *deletion_wqueue;
 	struct work_queue *recovery_wqueue;
 	struct work_queue *block_wqueue;
+	struct work_queue *rx_wqueue;
+	struct work_queue *tx_wqueue;
 };
 
 struct siocb {
-- 
1.7.1




More information about the sheepdog mailing list