[sheepdog] [PATCH stable-0.7] read/write data from clients in worker threads

Hitoshi Mitake mitake.hitoshi at lab.ntt.co.jp
Fri Feb 7 02:52:01 CET 2014


From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>

Currently, we read/write data from clients in the main thread in
a non-blocking way.  This has the following problems:

 - The implementation is much more complex than doing it in a
   blocking way.
 - The main thread becomes a bottleneck when doing large data I/Os
   (e.g. object recovery does a lot of 4 MB reads and writes).

This patch creates one worker thread for each connection, makes socket
descriptors blocking, and reads/writes data from clients in the worker
threads in a blocking way.

With this patch, client_handler() no longer triggers warnings from the
loop_check tracer.  I ran the following benchmark to emulate VM I/O
during object recovery, and saw a significant performance
improvement.

[benchmark]

  Simulate object recovery with 10 concurrent 4 MB reads, and simulate
  VM I/Os with 4 KB reads.

  $ ./script/vditest -C 10 -B 4M -T 10 test > /dev/null & \
    ./script/vditest -B 4k -T 10 test

[result (before)]

  options: -B 4096:4096 -c writethrough -C 1 -D 100:0 -o 0
           -p linear -s 1376684853 -S 0:4294967296 -T 10 -f 0
  Total read throughput: 638156.8B/s (623.2K/s), IOPS 155.8/s.

[result (after)]
  options: -B 4096:4096 -c writethrough -C 1 -D 100:0 -o 0
           -p linear -s 1376684811 -S 0:4294967296 -T 10 -f 0
  Total read throughput: 4237312.0B/s (4.0M/s), IOPS 1034.5/s.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
Signed-off-by: Liu Yuan <namei.unix at gmail.com>
---
 include/net.h      |   12 +--
 lib/net.c          |   80 ---------------
 sheep/request.c    |  283 ++++++++++++++++++++--------------------------------
 sheep/sheep.c      |    1 +
 sheep/sheep_priv.h |    3 +
 5 files changed, 115 insertions(+), 264 deletions(-)

diff --git a/include/net.h b/include/net.h
index 2782e44..b8c6ab8 100644
--- a/include/net.h
+++ b/include/net.h
@@ -32,22 +32,13 @@ struct connection {
 	uint16_t port;
 	char ipstr[INET6_ADDRSTRLEN];
 
-	enum conn_state c_rx_state;
-	int rx_length;
-	void *rx_buf;
-	struct sd_req rx_hdr;
-
-	enum conn_state c_tx_state;
-	int tx_length;
-	void *tx_buf;
-	struct sd_rsp tx_hdr;
+	bool dead;
 };
 
 int conn_tx_off(struct connection *conn);
 int conn_tx_on(struct connection *conn);
 int conn_rx_off(struct connection *conn);
 int conn_rx_on(struct connection *conn);
-bool is_conn_dead(const struct connection *conn);
 int do_read(int sockfd, void *buf, int len,
 	    bool (*need_retry)(uint32_t), uint32_t, uint32_t);
 int rx(struct connection *conn, enum conn_state next_state);
@@ -65,7 +56,6 @@ int create_unix_domain_socket(const char *unix_path,
 const char *addr_to_str(const uint8_t *addr, uint16_t port);
 uint8_t *str_to_addr(const char *ipstr, uint8_t *addr);
 char *sockaddr_in_to_str(struct sockaddr_in *sockaddr);
-int set_nonblocking(int fd);
 int set_nodelay(int fd);
 int set_keepalive(int fd);
 int set_snd_timeout(int fd);
diff --git a/lib/net.c b/lib/net.c
index ce978b2..e38dfaa 100644
--- a/lib/net.c
+++ b/lib/net.c
@@ -61,59 +61,6 @@ int conn_rx_on(struct connection *conn)
 	return modify_event(conn->fd, conn->events);
 }
 
-bool is_conn_dead(const struct connection *conn)
-{
-	if (conn->c_rx_state == C_IO_CLOSED || conn->c_tx_state == C_IO_CLOSED)
-		return true;
-	else
-		return false;
-}
-
-int rx(struct connection *conn, enum conn_state next_state)
-{
-	int ret;
-
-	ret = read(conn->fd, conn->rx_buf, conn->rx_length);
-	if (!ret) {
-		conn->c_rx_state = C_IO_CLOSED;
-		return 0;
-	}
-
-	if (ret < 0) {
-		if (errno != EAGAIN && errno != EINTR)
-			conn->c_rx_state = C_IO_CLOSED;
-		return 0;
-	}
-
-	conn->rx_length -= ret;
-	conn->rx_buf = (char *)conn->rx_buf + ret;
-
-	if (!conn->rx_length)
-		conn->c_rx_state = next_state;
-
-	return ret;
-}
-
-int tx(struct connection *conn, enum conn_state next_state)
-{
-	int ret;
-
-	ret = write(conn->fd, conn->tx_buf, conn->tx_length);
-	if (ret < 0) {
-		if (errno != EAGAIN && errno != EINTR)
-			conn->c_tx_state = C_IO_CLOSED;
-		return 0;
-	}
-
-	conn->tx_length -= ret;
-	conn->tx_buf = (char *)conn->tx_buf + ret;
-
-	if (!conn->tx_length)
-		conn->c_tx_state = next_state;
-
-	return ret;
-}
-
 int create_listen_ports(const char *bindaddr, int port,
 		int (*callback)(int fd, void *), void *data)
 {
@@ -170,12 +117,6 @@ int create_listen_ports(const char *bindaddr, int port,
 			continue;
 		}
 
-		ret = set_nonblocking(fd);
-		if (ret < 0) {
-			close(fd);
-			continue;
-		}
-
 		ret = callback(fd, data);
 		if (ret) {
 			close(fd);
@@ -483,23 +424,6 @@ uint8_t *str_to_addr(const char *ipstr, uint8_t *addr)
 	return addr;
 }
 
-int set_nonblocking(int fd)
-{
-	int ret;
-
-	ret = fcntl(fd, F_GETFL);
-	if (ret < 0) {
-		sd_err("fcntl F_GETFL failed: %m");
-		close(fd);
-	} else {
-		ret = fcntl(fd, F_SETFL, ret | O_NONBLOCK);
-		if (ret < 0)
-			sd_err("fcntl O_NONBLOCK failed: %m");
-	}
-
-	return ret;
-}
-
 int set_snd_timeout(int fd)
 {
 	struct timeval timeout;
@@ -636,10 +560,6 @@ int create_unix_domain_socket(const char *unix_path,
 		goto err;
 	}
 
-	ret = set_nonblocking(fd);
-	if (ret < 0)
-		goto err;
-
 	ret = callback(fd, data);
 	if (ret)
 		goto err;
diff --git a/sheep/request.c b/sheep/request.c
index 8758214..29781cc 100644
--- a/sheep/request.c
+++ b/sheep/request.c
@@ -510,196 +510,125 @@ main_fn void put_request(struct request *req)
 	if (req->local)
 		eventfd_xwrite(req->local_req_efd, 1);
 	else {
-		if (conn_tx_on(&ci->conn)) {
+		if (ci->conn.dead) {
 			clear_client_info(ci);
 			free_request(req);
 		} else {
-			list_add(&req->request_list, &ci->done_reqs);
+			list_add_tail(&req->request_list, &ci->done_reqs);
+
+			if (ci->tx_req == NULL)
+				/* There is no request being sent. */
+				conn_tx_on(&ci->conn);
 		}
 	}
 }
 
-static void init_rx_hdr(struct client_info *ci)
-{
-	ci->conn.c_rx_state = C_IO_HEADER;
-	ci->rx_req = NULL;
-	ci->conn.rx_length = sizeof(struct sd_req);
-	ci->conn.rx_buf = &ci->conn.rx_hdr;
-}
-
-static inline int begin_rx(struct client_info *ci)
+static void rx_work(struct work *work)
 {
+	struct client_info *ci = container_of(work, struct client_info,
+					      rx_work);
 	int ret;
-	uint64_t data_len;
 	struct connection *conn = &ci->conn;
-	struct sd_req *hdr = &conn->rx_hdr;
+	struct sd_req hdr;
 	struct request *req;
 
-	switch (conn->c_rx_state) {
-	case C_IO_HEADER:
-		ret = rx(conn, C_IO_DATA_INIT);
-		if (!ret || conn->c_rx_state != C_IO_DATA_INIT)
-			break;
-	case C_IO_DATA_INIT:
-		data_len = hdr->data_length;
-
-		req = alloc_request(ci, data_len);
-		if (!req) {
-			conn->c_rx_state = C_IO_CLOSED;
-			break;
-		}
-		ci->rx_req = req;
-
-		/* use le_to_cpu */
-		memcpy(&req->rq, hdr, sizeof(req->rq));
-
-		if (data_len && hdr->flags & SD_FLAG_CMD_WRITE) {
-			conn->c_rx_state = C_IO_DATA;
-			conn->rx_length = data_len;
-			conn->rx_buf = req->data;
-		} else {
-			conn->c_rx_state = C_IO_END;
-			break;
-		}
-	case C_IO_DATA:
-		ret = rx(conn, C_IO_END);
-		break;
-	default:
-		sd_err("bug: unknown state %d", conn->c_rx_state);
+	ret = do_read(conn->fd, &hdr, sizeof(hdr), NULL, 0, UINT32_MAX);
+	if (ret) {
+		sd_err("failed to read a header");
+		conn->dead = true;
+		return;
 	}
 
-	if (is_conn_dead(conn)) {
-		clear_client_info(ci);
-		return -1;
+	req = alloc_request(ci, hdr.data_length);
+	if (!req) {
+		sd_err("failed to allocate request");
+		conn->dead = true;
+		return;
 	}
+	ci->rx_req = req;
 
-	/* Short read happens */
-	if (conn->c_rx_state != C_IO_END)
-		return -1;
+	/* use le_to_cpu */
+	memcpy(&req->rq, &hdr, sizeof(req->rq));
 
-	return 0;
-}
-
-static inline void finish_rx(struct client_info *ci)
-{
-	struct request *req;
-
-	req = ci->rx_req;
-	init_rx_hdr(ci);
-
-	sd_debug("%d, %s:%d", ci->conn.fd, ci->conn.ipstr, ci->conn.port);
-	queue_request(req);
-}
-
-static void do_client_rx(struct client_info *ci)
-{
-	if (begin_rx(ci) < 0)
-		return;
-
-	finish_rx(ci);
+	if (hdr.data_length && hdr.flags & SD_FLAG_CMD_WRITE) {
+		ret = do_read(conn->fd, req->data, hdr.data_length, NULL, 0,
+			      UINT32_MAX);
+		if (ret) {
+			sd_err("failed to read data");
+			conn->dead = true;
+		}
+	}
 }
 
-static void init_tx_hdr(struct client_info *ci)
+static void rx_main(struct work *work)
 {
-	struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
-	struct request *req;
+	struct client_info *ci = container_of(work, struct client_info,
+					      rx_work);
+	struct request *req = ci->rx_req;
 
-	assert(!list_empty(&ci->done_reqs));
+	ci->rx_req = NULL;
 
-	memset(rsp, 0, sizeof(*rsp));
+	refcount_dec(&ci->refcnt);
 
-	req = list_first_entry(&ci->done_reqs, struct request, request_list);
-	list_del(&req->request_list);
+	if (ci->conn.dead) {
+		if (req)
+			free_request(req);
 
-	ci->tx_req = req;
-	ci->conn.tx_length = sizeof(*rsp);
-	ci->conn.c_tx_state = C_IO_HEADER;
-	ci->conn.tx_buf = rsp;
+		clear_client_info(ci);
+		return;
+	}
 
-	/* use cpu_to_le */
-	memcpy(rsp, &req->rp, sizeof(*rsp));
+	conn_rx_on(&ci->conn);
 
-	rsp->epoch = sys->cinfo.epoch;
-	rsp->opcode = req->rq.opcode;
-	rsp->id = req->rq.id;
+	sd_debug("%d, %s:%d", ci->conn.fd, ci->conn.ipstr, ci->conn.port);
+	queue_request(req);
 }
 
-static inline int begin_tx(struct client_info *ci)
+static void tx_work(struct work *work)
 {
-	int ret, opt;
-	struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
+	struct client_info *ci = container_of(work, struct client_info,
+					      tx_work);
+	int ret;
+	struct connection *conn = &ci->conn;
+	struct sd_rsp rsp;
+	struct request *req = ci->tx_req;
+	void *data = NULL;
 
-	/* If short send happens, we don't need init hdr */
-	if (!ci->tx_req)
-		init_tx_hdr(ci);
+	/* use cpu_to_le */
+	memcpy(&rsp, &req->rp, sizeof(rsp));
 
-	opt = 1;
-	setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
+	rsp.epoch = sys->cinfo.epoch;
+	rsp.opcode = req->rq.opcode;
+	rsp.id = req->rq.id;
 
-	switch (ci->conn.c_tx_state) {
-	case C_IO_HEADER:
-		ret = tx(&ci->conn, C_IO_DATA_INIT);
-		if (!ret)
-			break;
+	if (rsp.data_length)
+		data = req->data;
 
-		if (rsp->data_length) {
-			ci->conn.tx_length = rsp->data_length;
-			ci->conn.tx_buf = ci->tx_req->data;
-			ci->conn.c_tx_state = C_IO_DATA;
-		} else {
-			ci->conn.c_tx_state = C_IO_END;
-			break;
-		}
-	case C_IO_DATA:
-		ret = tx(&ci->conn, C_IO_END);
-		if (!ret)
-			break;
-	default:
-		break;
+	ret = send_req(conn->fd, (struct sd_req *)&rsp, data, rsp.data_length,
+		       NULL, 0, UINT32_MAX);
+	if (ret != 0) {
+		sd_err("failed to send a request");
+		conn->dead = true;
 	}
-
-	opt = 0;
-	setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
-
-	if (is_conn_dead(&ci->conn)) {
-		clear_client_info(ci);
-		return -1;
-	}
-	return 0;
 }
 
-/* Return 1 if short send happens or we have more data to send */
-static inline int finish_tx(struct client_info *ci)
+static void tx_main(struct work *work)
 {
-	/* Finish sending one response */
-	if (ci->conn.c_tx_state == C_IO_END) {
-		sd_debug("connection from: %d, %s:%d", ci->conn.fd,
-			 ci->conn.ipstr, ci->conn.port);
-		free_request(ci->tx_req);
-		ci->tx_req = NULL;
-	}
-	if (ci->tx_req || !list_empty(&ci->done_reqs))
-		return 1;
-	return 0;
-}
+	struct client_info *ci = container_of(work, struct client_info,
+					      tx_work);
 
-static void do_client_tx(struct client_info *ci)
-{
-	if (!ci->tx_req && list_empty(&ci->done_reqs)) {
-		if (conn_tx_off(&ci->conn))
-			clear_client_info(ci);
-		return;
-	}
+	refcount_dec(&ci->refcnt);
 
-	if (begin_tx(ci) < 0)
-		return;
+	free_request(ci->tx_req);
+	ci->tx_req = NULL;
 
-	if (finish_tx(ci))
+	if (ci->conn.dead) {
+		clear_client_info(ci);
 		return;
+	}
 
-	/* Let's go sleep, and put_request() will wake me up */
-	if (conn_tx_off(&ci->conn))
-		clear_client_info(ci);
+	if (!list_empty(&ci->done_reqs))
+		conn_tx_on(&ci->conn);
 }
 
 static void destroy_client(struct client_info *ci)
@@ -715,16 +644,6 @@ static void clear_client_info(struct client_info *ci)
 
 	sd_debug("connection seems to be dead");
 
-	if (ci->rx_req) {
-		free_request(ci->rx_req);
-		ci->rx_req = NULL;
-	}
-
-	if (ci->tx_req) {
-		free_request(ci->tx_req);
-		ci->tx_req = NULL;
-	}
-
 	list_for_each_entry_safe(req, t, &ci->done_reqs, request_list) {
 		list_del(&req->request_list);
 		free_request(req);
@@ -775,8 +694,6 @@ static struct client_info *create_client(int fd, struct cluster_info *cluster)
 
 	INIT_LIST_HEAD(&ci->done_reqs);
 
-	init_rx_hdr(ci);
-
 	return ci;
 }
 
@@ -784,17 +701,43 @@ static void client_handler(int fd, int events, void *data)
 {
 	struct client_info *ci = (struct client_info *)data;
 
-	sd_debug("%x, rx %d, tx %d", events, ci->conn.c_rx_state,
-		 ci->conn.c_tx_state);
+	sd_debug("%x, %d", events, ci->conn.dead);
 
-	if (events & (EPOLLERR | EPOLLHUP) || is_conn_dead(&ci->conn))
+	if (events & (EPOLLERR | EPOLLHUP) || ci->conn.dead)
 		return clear_client_info(ci);
 
-	if (events & EPOLLIN)
-		do_client_rx(ci);
+	if (events & EPOLLIN) {
+		if (conn_rx_off(&ci->conn) != 0)
+			return;
 
-	if (events & EPOLLOUT)
-		do_client_tx(ci);
+		/*
+		 * Increment refcnt so that the client_info isn't freed while
+		 * rx_work uses it.
+		 */
+		refcount_inc(&ci->refcnt);
+		ci->rx_work.fn = rx_work;
+		ci->rx_work.done = rx_main;
+		queue_work(sys->net_wqueue, &ci->rx_work);
+	}
+
+	if (events & EPOLLOUT) {
+		if (conn_tx_off(&ci->conn) != 0)
+			return;
+
+		assert(ci->tx_req == NULL);
+		ci->tx_req = list_first_entry(&ci->done_reqs, struct request,
+					      request_list);
+		list_del(&ci->tx_req->request_list);
+
+		/*
+		 * Increment refcnt so that the client_info isn't freed while
+		 * tx_work uses it.
+		 */
+		refcount_inc(&ci->refcnt);
+		ci->tx_work.fn = tx_work;
+		ci->tx_work.done = tx_main;
+		queue_work(sys->net_wqueue, &ci->tx_work);
+	}
 }
 
 static void listen_handler(int listen_fd, int events, void *data)
@@ -826,12 +769,6 @@ static void listen_handler(int listen_fd, int events, void *data)
 		}
 	}
 
-	ret = set_nonblocking(fd);
-	if (ret) {
-		close(fd);
-		return;
-	}
-
 	ci = create_client(fd, data);
 	if (!ci) {
 		close(fd);
diff --git a/sheep/sheep.c b/sheep/sheep.c
index eb6afe8..3a6a8e7 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -424,6 +424,7 @@ static int create_work_queues(void)
 	if (init_work_queue(get_nr_nodes))
 		return -1;
 
+	sys->net_wqueue = create_work_queue("net", WQ_UNLIMITED);
 	sys->gateway_wqueue = create_work_queue("gway", WQ_UNLIMITED);
 	sys->io_wqueue = create_work_queue("io", WQ_UNLIMITED);
 	sys->recovery_wqueue = create_ordered_work_queue("rw");
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index c13725d..2199284 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -66,8 +66,10 @@ struct client_info {
 	struct connection conn;
 
 	struct request *rx_req;
+	struct work rx_work;
 
 	struct request *tx_req;
+	struct work tx_work;
 
 	struct list_head done_reqs;
 
@@ -128,6 +130,7 @@ struct system_info {
 	bool gateway_only;
 	bool nosync;
 
+	struct work_queue *net_wqueue;
 	struct work_queue *gateway_wqueue;
 	struct work_queue *io_wqueue;
 	struct work_queue *deletion_wqueue;
-- 
1.7.10.4




More information about the sheepdog mailing list