[sheepdog] [RFC PATCH] sdnet: threading tx/rx process

Liu Yuan namei.unix at gmail.com
Wed Jul 11 12:11:31 CEST 2012


From: Liu Yuan <tailai.ly at taobao.com>

With threaded tx/rx processing, we expect to improve the scalability of
sheep gateway performance by overlapping requests from different
VMs.

This patch also removes request throttling because we are guaranteed to process
requests as soon as they arrive and won't block any request.

I'll post performance numbers soon.

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 include/net.h      |    2 -
 sheep/sdnet.c      |  159 ++++++++++++++++++++++++++++++++--------------------
 sheep/sheep.c      |    3 +-
 sheep/sheep_priv.h |    1 +
 4 files changed, 102 insertions(+), 63 deletions(-)

diff --git a/include/net.h b/include/net.h
index 581da66..4ee0eb6 100644
--- a/include/net.h
+++ b/include/net.h
@@ -30,8 +30,6 @@ struct connection {
 	int tx_length;
 	void *tx_buf;
 	struct sd_rsp tx_hdr;
-
-	struct list_head blocking_siblings;
 };
 
 int conn_tx_off(struct connection *conn);
diff --git a/sheep/sdnet.c b/sheep/sdnet.c
index a2a7bc1..adfc9c3 100644
--- a/sheep/sdnet.c
+++ b/sheep/sdnet.c
@@ -22,6 +22,11 @@
 
 #include "sheep_priv.h"
 
+struct net_work {
+	struct work work;
+	struct client_info *ci;
+};
+
 static void requeue_request(struct request *req);
 
 static int is_access_local(struct request *req, uint64_t oid)
@@ -458,16 +463,14 @@ static struct request *alloc_request(struct client_info *ci, int data_length)
 	INIT_LIST_HEAD(&req->request_list);
 	uatomic_set(&req->refcnt, 1);
 
-	sys->nr_outstanding_reqs++;
-	sys->outstanding_data_size += data_length;
+	uatomic_inc(&sys->nr_outstanding_reqs);
 
 	return req;
 }
 
 static void free_request(struct request *req)
 {
-	sys->nr_outstanding_reqs--;
-	sys->outstanding_data_size -= req->data_length;
+	uatomic_dec(&sys->nr_outstanding_reqs);
 
 	req->ci->refcnt--;
 	put_vnode_info(req->vnodes);
@@ -505,21 +508,17 @@ static void init_rx_hdr(struct client_info *ci)
 	ci->conn.rx_buf = &ci->conn.rx_hdr;
 }
 
-static void client_rx_handler(struct client_info *ci)
+static void do_rx(struct work *work)
 {
-	int ret;
-	uint64_t data_len;
+	struct net_work *nw = container_of(work, struct net_work, work);
+	struct client_info *ci = nw->ci;
 	struct connection *conn = &ci->conn;
 	struct sd_req *hdr = &conn->rx_hdr;
 	struct request *req;
+	uint64_t data_len;
+	int ret;
 
-	if (!ci->rx_req && sys->outstanding_data_size > MAX_OUTSTANDING_DATA_SIZE) {
-		dprintf("too many requests (%p)\n", &ci->conn);
-		conn_rx_off(&ci->conn);
-		list_add(&ci->conn.blocking_siblings, &sys->blocking_conn_list);
-		return;
-	}
-
+	dprintf("%s:%d, fd %d\n", conn->ipstr, conn->port, conn->fd);
 	switch (conn->c_rx_state) {
 	case C_IO_HEADER:
 		ret = rx(conn, C_IO_DATA_INIT);
@@ -553,29 +552,52 @@ static void client_rx_handler(struct client_info *ci)
 		eprintf("bug: unknown state %d\n", conn->c_rx_state);
 	}
 
-	if (is_conn_dead(conn) && ci->rx_req) {
-		free_request(ci->rx_req);
-		ci->rx_req = NULL;
-		return;
+}
+
+static void rx_done(struct work *work)
+{
+	struct net_work *nw = container_of(work, struct net_work, work);
+	struct client_info *ci = nw->ci;
+	struct connection *conn = &ci->conn;
+	struct sd_req *hdr = &conn->rx_hdr;
+	struct request *req = nw->ci->rx_req;
+
+	if (is_conn_dead(conn) && req) {
+		free_request(req);
+		goto out;
 	}
 
+	/* Short read happens */
 	if (conn->c_rx_state != C_IO_END)
-		return;
+		goto out;
 
 	/* now we have a complete command */
-
-	req = ci->rx_req;
-
 	init_rx_hdr(ci);
-
 	if (hdr->flags & SD_FLAG_CMD_WRITE)
 		req->rp.data_length = 0;
 	else
 		req->rp.data_length = hdr->data_length;
 
-	dprintf("connection from: %d, %s:%d\n", ci->conn.fd,
-		ci->conn.ipstr, ci->conn.port);
+	dprintf("%s:%d, fd %d\n", conn->ipstr, conn->port,
+		conn->fd);
+
 	queue_request(req);
+out:
+	conn_rx_on(conn);
+	free(nw);
+}
+
+static void client_rx_handler(struct client_info *ci)
+{
+	struct net_work *nw;
+
+	conn_rx_off(&ci->conn);
+
+	nw = xmalloc(sizeof(*nw));
+	nw->ci = ci;
+	nw->work.fn = do_rx;
+	nw->work.done = rx_done;
+	queue_work(sys->net_wqueue, &nw->work);
 }
 
 static void init_tx_hdr(struct client_info *ci)
@@ -583,9 +605,6 @@ static void init_tx_hdr(struct client_info *ci)
 	struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
 	struct request *req;
 
-	if (ci->tx_req || list_empty(&ci->done_reqs))
-		return;
-
 	memset(rsp, 0, sizeof(*rsp));
 
 	req = list_first_entry(&ci->done_reqs, struct request, request_list);
@@ -604,26 +623,14 @@ static void init_tx_hdr(struct client_info *ci)
 	rsp->id = req->rq.id;
 }
 
-static void client_tx_handler(struct client_info *ci)
+static void do_tx(struct work *work)
 {
-	int ret, opt;
+	struct net_work *nw = container_of(work, struct net_work, work);
+	struct client_info *ci = nw->ci;
 	struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
-	struct connection *conn, *n;
-again:
-	init_tx_hdr(ci);
-	if (!ci->tx_req) {
-		conn_tx_off(&ci->conn);
-		if (sys->outstanding_data_size < MAX_OUTSTANDING_DATA_SIZE) {
-			list_for_each_entry_safe(conn, n, &sys->blocking_conn_list,
-						 blocking_siblings) {
-				dprintf("rx on %p\n", conn);
-				list_del_init(&conn->blocking_siblings);
-				conn_rx_on(conn);
-			}
-		}
-		return;
-	}
+	int ret, opt;
 
+	dprintf("%s:%d, fd %d\n", ci->conn.ipstr, ci->conn.port, ci->conn.fd);
 	opt = 1;
 	setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
 
@@ -651,20 +658,54 @@ again:
 
 	opt = 0;
 	setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
+}
+
+static void tx_done(struct work *work)
+{
+	struct net_work *nw = container_of(work, struct net_work, work);
+	struct client_info *ci = nw->ci;
 
 	if (is_conn_dead(&ci->conn)) {
 		free_request(ci->tx_req);
-		ci->tx_req = NULL;
-		return;
+		goto out;
 	}
 
-	if (ci->conn.c_tx_state == C_IO_END) {
-		dprintf("connection from: %d, %s:%d\n", ci->conn.fd,
-			ci->conn.ipstr, ci->conn.port);
+	/* Finish sending one response */
+	if (ci->conn.c_tx_state == C_IO_END && ci->tx_req) {
 		free_request(ci->tx_req);
 		ci->tx_req = NULL;
-		goto again;
+		dprintf("%s:%d, fd %d\n", ci->conn.ipstr, ci->conn.port,
+			ci->conn.fd);
 	}
+
+	/* We have more data to send */
+	if (ci->tx_req || !list_empty(&ci->done_reqs))
+		conn_tx_on(&ci->conn);
+out:
+	ci->refcnt--;
+	free(nw);
+}
+
+static void client_tx_handler(struct client_info *ci)
+{
+	struct net_work *nw;
+
+	conn_tx_off(&ci->conn);
+
+	if (!ci->tx_req && list_empty(&ci->done_reqs))
+		return;
+
+	/* If short send happens, we don't need init hdr */
+	if (!ci->tx_req)
+		init_tx_hdr(ci);
+
+	ci->refcnt++;
+
+	nw = xmalloc(sizeof(*nw));
+	nw->ci = ci;
+	nw->work.fn = do_tx;
+	nw->work.done = tx_done;
+	queue_work(sys->net_wqueue, &nw->work);
 }
 
 static void destroy_client(struct client_info *ci)
@@ -693,9 +734,6 @@ static void clear_client(struct client_info *ci)
 		free_request(req);
 	}
 
-	if (!list_empty(&ci->conn.blocking_siblings))
-		list_del_init(&ci->conn.blocking_siblings);
-
 	unregister_event(ci->conn.fd);
 
 	dprintf("refcnt:%d, fd:%d, %s:%d\n",
@@ -725,12 +763,12 @@ static struct client_info *create_client(int fd, struct cluster_info *cluster)
 	case AF_INET:
 		ci->conn.port = ntohs(((struct sockaddr_in *)&from)->sin_port);
 		inet_ntop(AF_INET, &((struct sockaddr_in *)&from)->sin_addr,
-				ci->conn.ipstr, sizeof(ci->conn.ipstr));
+			  ci->conn.ipstr, sizeof(ci->conn.ipstr));
 		break;
 	case AF_INET6:
 		ci->conn.port = ntohs(((struct sockaddr_in6 *)&from)->sin6_port);
 		inet_ntop(AF_INET6, &((struct sockaddr_in6 *)&from)->sin6_addr,
-				ci->conn.ipstr, sizeof(ci->conn.ipstr));
+			  ci->conn.ipstr, sizeof(ci->conn.ipstr));
 		break;
 	}
 
@@ -739,7 +777,6 @@ static struct client_info *create_client(int fd, struct cluster_info *cluster)
 	ci->refcnt = 0;
 
 	INIT_LIST_HEAD(&ci->done_reqs);
-	INIT_LIST_HEAD(&ci->conn.blocking_siblings);
 
 	init_rx_hdr(ci);
 
@@ -750,8 +787,11 @@ static void client_handler(int fd, int events, void *data)
 {
 	struct client_info *ci = (struct client_info *)data;
 
-	if (events & (EPOLLERR | EPOLLHUP))
-		goto err;
+	dprintf("%x, rx %d, tx %d\n", events, ci->conn.c_rx_state,
+		ci->conn.c_tx_state);
+
+	if (events & (EPOLLERR | EPOLLHUP) || is_conn_dead(&ci->conn))
+		return clear_client(ci);
 
 	if (events & EPOLLIN)
 		client_rx_handler(ci);
@@ -760,7 +800,6 @@ static void client_handler(int fd, int events, void *data)
 		client_tx_handler(ci);
 
 	if (is_conn_dead(&ci->conn)) {
-err:
 		dprintf("connection seems to be dead\n");
 		clear_client(ci);
 	}
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 7d1e853..cd3b396 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -252,8 +252,9 @@ int main(int argc, char **argv)
 	sys->recovery_wqueue = init_work_queue("recovery", true);
 	sys->deletion_wqueue = init_work_queue("deletion", true);
 	sys->block_wqueue = init_work_queue("block", true);
+	sys->net_wqueue = init_work_queue("net", false);
 	if (!sys->gateway_wqueue || !sys->io_wqueue ||!sys->recovery_wqueue ||
-	    !sys->deletion_wqueue || !sys->block_wqueue)
+	    !sys->deletion_wqueue || !sys->block_wqueue || !sys->net_wqueue)
 		exit(1);
 
 	ret = init_signal();
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index afbc361..6c1a9d9 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -126,6 +126,7 @@ struct cluster_info {
 	struct work_queue *deletion_wqueue;
 	struct work_queue *recovery_wqueue;
 	struct work_queue *block_wqueue;
+	struct work_queue *net_wqueue;
 };
 
 struct siocb {
-- 
1.7.10.2




More information about the sheepdog mailing list