[sheepdog] [PATCH] read/write data from clients in worker threads
MORITA Kazutaka
morita.kazutaka at gmail.com
Fri Aug 16 22:36:42 CEST 2013
From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
Currently, we read/write data from clients in the main thread in
a non-blocking way. This has the following problems:
- The implementation is much more complex than doing it in a
blocking way.
- The main thread becomes a bottleneck when doing large data I/Os
(e.g. object recovery does a lot of 4 MB reads and writes).
This patch creates one worker thread for each connection, makes socket
descriptors blocking, and reads/writes data from clients in the worker
threads in a blocking way.
With this patch, client_handler() no longer triggers warnings from the
loop_check tracer. I ran the following benchmark to emulate
VM I/O during object recovery, and saw a large performance
improvement.
[benchmark]
Simulate object recovery with 10 concurrent 4 MB reads, and simulate
VM I/Os with 4 KB reads.
$ ./script/vditest -C 10 -B 4M -T 10 test > /dev/null & \
./script/vditest -B 4k -T 10 test
[result (before)]
options: -B 4096:4096 -c writethrough -C 1 -D 100:0 -o 0
-p linear -s 1376684853 -S 0:4294967296 -T 10 -f 0
Total read throughput: 638156.8B/s (623.2K/s), IOPS 155.8/s.
[result (after)]
options: -B 4096:4096 -c writethrough -C 1 -D 100:0 -o 0
-p linear -s 1376684811 -S 0:4294967296 -T 10 -f 0
Total read throughput: 4237312.0B/s (4.0M/s), IOPS 1034.5/s.
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
include/net.h | 12 +--
lib/net.c | 80 ---------------
sheep/request.c | 279 ++++++++++++++++++++--------------------------------
sheep/sheep.c | 1 +
sheep/sheep_priv.h | 3 +
5 files changed, 111 insertions(+), 264 deletions(-)
diff --git a/include/net.h b/include/net.h
index 2782e44..b8c6ab8 100644
--- a/include/net.h
+++ b/include/net.h
@@ -32,22 +32,13 @@ struct connection {
uint16_t port;
char ipstr[INET6_ADDRSTRLEN];
- enum conn_state c_rx_state;
- int rx_length;
- void *rx_buf;
- struct sd_req rx_hdr;
-
- enum conn_state c_tx_state;
- int tx_length;
- void *tx_buf;
- struct sd_rsp tx_hdr;
+ bool dead;
};
int conn_tx_off(struct connection *conn);
int conn_tx_on(struct connection *conn);
int conn_rx_off(struct connection *conn);
int conn_rx_on(struct connection *conn);
-bool is_conn_dead(const struct connection *conn);
int do_read(int sockfd, void *buf, int len,
bool (*need_retry)(uint32_t), uint32_t, uint32_t);
int rx(struct connection *conn, enum conn_state next_state);
@@ -65,7 +56,6 @@ int create_unix_domain_socket(const char *unix_path,
const char *addr_to_str(const uint8_t *addr, uint16_t port);
uint8_t *str_to_addr(const char *ipstr, uint8_t *addr);
char *sockaddr_in_to_str(struct sockaddr_in *sockaddr);
-int set_nonblocking(int fd);
int set_nodelay(int fd);
int set_keepalive(int fd);
int set_snd_timeout(int fd);
diff --git a/lib/net.c b/lib/net.c
index 82b6e3a..9b0aa44 100644
--- a/lib/net.c
+++ b/lib/net.c
@@ -61,59 +61,6 @@ int conn_rx_on(struct connection *conn)
return modify_event(conn->fd, conn->events);
}
-bool is_conn_dead(const struct connection *conn)
-{
- if (conn->c_rx_state == C_IO_CLOSED || conn->c_tx_state == C_IO_CLOSED)
- return true;
- else
- return false;
-}
-
-int rx(struct connection *conn, enum conn_state next_state)
-{
- int ret;
-
- ret = read(conn->fd, conn->rx_buf, conn->rx_length);
- if (!ret) {
- conn->c_rx_state = C_IO_CLOSED;
- return 0;
- }
-
- if (ret < 0) {
- if (errno != EAGAIN && errno != EINTR)
- conn->c_rx_state = C_IO_CLOSED;
- return 0;
- }
-
- conn->rx_length -= ret;
- conn->rx_buf = (char *)conn->rx_buf + ret;
-
- if (!conn->rx_length)
- conn->c_rx_state = next_state;
-
- return ret;
-}
-
-int tx(struct connection *conn, enum conn_state next_state)
-{
- int ret;
-
- ret = write(conn->fd, conn->tx_buf, conn->tx_length);
- if (ret < 0) {
- if (errno != EAGAIN && errno != EINTR)
- conn->c_tx_state = C_IO_CLOSED;
- return 0;
- }
-
- conn->tx_length -= ret;
- conn->tx_buf = (char *)conn->tx_buf + ret;
-
- if (!conn->tx_length)
- conn->c_tx_state = next_state;
-
- return ret;
-}
-
int create_listen_ports(const char *bindaddr, int port,
int (*callback)(int fd, void *), void *data)
{
@@ -170,12 +117,6 @@ int create_listen_ports(const char *bindaddr, int port,
continue;
}
- ret = set_nonblocking(fd);
- if (ret < 0) {
- close(fd);
- continue;
- }
-
ret = callback(fd, data);
if (ret) {
close(fd);
@@ -483,23 +424,6 @@ uint8_t *str_to_addr(const char *ipstr, uint8_t *addr)
return addr;
}
-int set_nonblocking(int fd)
-{
- int ret;
-
- ret = fcntl(fd, F_GETFL);
- if (ret < 0) {
- sd_err("fcntl F_GETFL failed: %m");
- close(fd);
- } else {
- ret = fcntl(fd, F_SETFL, ret | O_NONBLOCK);
- if (ret < 0)
- sd_err("fcntl O_NONBLOCK failed: %m");
- }
-
- return ret;
-}
-
int set_snd_timeout(int fd)
{
struct timeval timeout;
@@ -636,10 +560,6 @@ int create_unix_domain_socket(const char *unix_path,
goto err;
}
- ret = set_nonblocking(fd);
- if (ret < 0)
- goto err;
-
ret = callback(fd, data);
if (ret)
goto err;
diff --git a/sheep/request.c b/sheep/request.c
index bd69ea9..060045e 100644
--- a/sheep/request.c
+++ b/sheep/request.c
@@ -510,196 +510,121 @@ main_fn void put_request(struct request *req)
if (req->local)
eventfd_xwrite(req->local_req_efd, 1);
else {
- if (conn_tx_on(&ci->conn)) {
+ if (ci->conn.dead) {
clear_client_info(ci);
free_request(req);
} else {
- list_add(&req->request_list, &ci->done_reqs);
+ list_add_tail(&req->request_list, &ci->done_reqs);
+
+ if (ci->tx_req == NULL)
+ /* There is no request being sent. */
+ conn_tx_on(&ci->conn);
}
}
}
-static void init_rx_hdr(struct client_info *ci)
-{
- ci->conn.c_rx_state = C_IO_HEADER;
- ci->rx_req = NULL;
- ci->conn.rx_length = sizeof(struct sd_req);
- ci->conn.rx_buf = &ci->conn.rx_hdr;
-}
-
-static inline int begin_rx(struct client_info *ci)
+static void rx_work(struct work *work)
{
+ struct client_info *ci = container_of(work, struct client_info, rx_work);
int ret;
- uint64_t data_len;
struct connection *conn = &ci->conn;
- struct sd_req *hdr = &conn->rx_hdr;
+ struct sd_req hdr;
struct request *req;
- switch (conn->c_rx_state) {
- case C_IO_HEADER:
- ret = rx(conn, C_IO_DATA_INIT);
- if (!ret || conn->c_rx_state != C_IO_DATA_INIT)
- break;
- case C_IO_DATA_INIT:
- data_len = hdr->data_length;
-
- req = alloc_request(ci, data_len);
- if (!req) {
- conn->c_rx_state = C_IO_CLOSED;
- break;
- }
- ci->rx_req = req;
-
- /* use le_to_cpu */
- memcpy(&req->rq, hdr, sizeof(req->rq));
-
- if (data_len && hdr->flags & SD_FLAG_CMD_WRITE) {
- conn->c_rx_state = C_IO_DATA;
- conn->rx_length = data_len;
- conn->rx_buf = req->data;
- } else {
- conn->c_rx_state = C_IO_END;
- break;
- }
- case C_IO_DATA:
- ret = rx(conn, C_IO_END);
- break;
- default:
- sd_err("bug: unknown state %d", conn->c_rx_state);
+ ret = do_read(conn->fd, &hdr, sizeof(hdr), NULL, 0, UINT32_MAX);
+ if (ret) {
+ sd_err("failed to read a header");
+ conn->dead = true;
+ return;
}
- if (is_conn_dead(conn)) {
- clear_client_info(ci);
- return -1;
+ req = alloc_request(ci, hdr.data_length);
+ if (!req) {
+ sd_err("failed to allocate request");
+ conn->dead = true;
+ return;
}
+ ci->rx_req = req;
- /* Short read happens */
- if (conn->c_rx_state != C_IO_END)
- return -1;
+ /* use le_to_cpu */
+ memcpy(&req->rq, &hdr, sizeof(req->rq));
- return 0;
-}
-
-static inline void finish_rx(struct client_info *ci)
-{
- struct request *req;
-
- req = ci->rx_req;
- init_rx_hdr(ci);
-
- sd_debug("%d, %s:%d", ci->conn.fd, ci->conn.ipstr, ci->conn.port);
- queue_request(req);
-}
-
-static void do_client_rx(struct client_info *ci)
-{
- if (begin_rx(ci) < 0)
- return;
-
- finish_rx(ci);
+ if (hdr.data_length && hdr.flags & SD_FLAG_CMD_WRITE) {
+ ret = do_read(conn->fd, req->data, hdr.data_length, NULL, 0,
+ UINT32_MAX);
+ if (ret) {
+ sd_err("failed to read data");
+ conn->dead = true;
+ }
+ }
}
-static void init_tx_hdr(struct client_info *ci)
+static void rx_main(struct work *work)
{
- struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
- struct request *req;
+ struct client_info *ci = container_of(work, struct client_info, rx_work);
+ struct request *req = ci->rx_req;
- assert(!list_empty(&ci->done_reqs));
+ ci->rx_req = NULL;
- memset(rsp, 0, sizeof(*rsp));
+ refcount_dec(&ci->refcnt);
- req = list_first_entry(&ci->done_reqs, struct request, request_list);
- list_del(&req->request_list);
+ if (ci->conn.dead) {
+ if (req)
+ free_request(req);
- ci->tx_req = req;
- ci->conn.tx_length = sizeof(*rsp);
- ci->conn.c_tx_state = C_IO_HEADER;
- ci->conn.tx_buf = rsp;
+ clear_client_info(ci);
+ return;
+ }
- /* use cpu_to_le */
- memcpy(rsp, &req->rp, sizeof(*rsp));
+ conn_rx_on(&ci->conn);
- rsp->epoch = sys->cinfo.epoch;
- rsp->opcode = req->rq.opcode;
- rsp->id = req->rq.id;
+ sd_debug("%d, %s:%d", ci->conn.fd, ci->conn.ipstr, ci->conn.port);
+ queue_request(req);
}
-static inline int begin_tx(struct client_info *ci)
+static void tx_work(struct work *work)
{
- int ret, opt;
- struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
+ struct client_info *ci = container_of(work, struct client_info, tx_work);
+ int ret;
+ struct connection *conn = &ci->conn;
+ struct sd_rsp rsp;
+ struct request *req = ci->tx_req;
+ void *data = NULL;
- /* If short send happens, we don't need init hdr */
- if (!ci->tx_req)
- init_tx_hdr(ci);
+ /* use cpu_to_le */
+ memcpy(&rsp, &req->rp, sizeof(rsp));
- opt = 1;
- setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
+ rsp.epoch = sys->cinfo.epoch;
+ rsp.opcode = req->rq.opcode;
+ rsp.id = req->rq.id;
- switch (ci->conn.c_tx_state) {
- case C_IO_HEADER:
- ret = tx(&ci->conn, C_IO_DATA_INIT);
- if (!ret)
- break;
+ if (rsp.data_length)
+ data = req->data;
- if (rsp->data_length) {
- ci->conn.tx_length = rsp->data_length;
- ci->conn.tx_buf = ci->tx_req->data;
- ci->conn.c_tx_state = C_IO_DATA;
- } else {
- ci->conn.c_tx_state = C_IO_END;
- break;
- }
- case C_IO_DATA:
- ret = tx(&ci->conn, C_IO_END);
- if (!ret)
- break;
- default:
- break;
+ ret = send_req(conn->fd, (struct sd_req *)&rsp, data, rsp.data_length,
+ NULL, 0, UINT32_MAX);
+ if (ret != 0) {
+ sd_err("failed to send a request");
+ conn->dead = true;
}
-
- opt = 0;
- setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
-
- if (is_conn_dead(&ci->conn)) {
- clear_client_info(ci);
- return -1;
- }
- return 0;
}
-/* Return 1 if short send happens or we have more data to send */
-static inline int finish_tx(struct client_info *ci)
+static void tx_main(struct work *work)
{
- /* Finish sending one response */
- if (ci->conn.c_tx_state == C_IO_END) {
- sd_debug("connection from: %d, %s:%d", ci->conn.fd,
- ci->conn.ipstr, ci->conn.port);
- free_request(ci->tx_req);
- ci->tx_req = NULL;
- }
- if (ci->tx_req || !list_empty(&ci->done_reqs))
- return 1;
- return 0;
-}
+ struct client_info *ci = container_of(work, struct client_info, tx_work);
-static void do_client_tx(struct client_info *ci)
-{
- if (!ci->tx_req && list_empty(&ci->done_reqs)) {
- if (conn_tx_off(&ci->conn))
- clear_client_info(ci);
- return;
- }
+ refcount_dec(&ci->refcnt);
- if (begin_tx(ci) < 0)
- return;
+ free_request(ci->tx_req);
+ ci->tx_req = NULL;
- if (finish_tx(ci))
+ if (ci->conn.dead) {
+ clear_client_info(ci);
return;
+ }
- /* Let's go sleep, and put_request() will wake me up */
- if (conn_tx_off(&ci->conn))
- clear_client_info(ci);
+ if (!list_empty(&ci->done_reqs))
+ conn_tx_on(&ci->conn);
}
static void destroy_client(struct client_info *ci)
@@ -715,16 +640,6 @@ static void clear_client_info(struct client_info *ci)
sd_debug("connection seems to be dead");
- if (ci->rx_req) {
- free_request(ci->rx_req);
- ci->rx_req = NULL;
- }
-
- if (ci->tx_req) {
- free_request(ci->tx_req);
- ci->tx_req = NULL;
- }
-
list_for_each_entry_safe(req, t, &ci->done_reqs, request_list) {
list_del(&req->request_list);
free_request(req);
@@ -773,8 +688,6 @@ static struct client_info *create_client(int fd, struct cluster_info *cluster)
INIT_LIST_HEAD(&ci->done_reqs);
- init_rx_hdr(ci);
-
return ci;
}
@@ -782,17 +695,43 @@ static void client_handler(int fd, int events, void *data)
{
struct client_info *ci = (struct client_info *)data;
- sd_debug("%x, rx %d, tx %d", events, ci->conn.c_rx_state,
- ci->conn.c_tx_state);
+ sd_debug("%x, %d", events, ci->conn.dead);
- if (events & (EPOLLERR | EPOLLHUP) || is_conn_dead(&ci->conn))
+ if (events & (EPOLLERR | EPOLLHUP) || ci->conn.dead)
return clear_client_info(ci);
- if (events & EPOLLIN)
- do_client_rx(ci);
+ if (events & EPOLLIN) {
+ if (conn_rx_off(&ci->conn) != 0)
+ return;
- if (events & EPOLLOUT)
- do_client_tx(ci);
+ /*
+ * Increment refcnt so that the client_info isn't freed while
+ * rx_work uses it.
+ */
+ refcount_inc(&ci->refcnt);
+ ci->rx_work.fn = rx_work;
+ ci->rx_work.done = rx_main;
+ queue_work(sys->net_wqueue, &ci->rx_work);
+ }
+
+ if (events & EPOLLOUT) {
+ if (conn_tx_off(&ci->conn) != 0)
+ return;
+
+ assert(ci->tx_req == NULL);
+ ci->tx_req = list_first_entry(&ci->done_reqs, struct request,
+ request_list);
+ list_del(&ci->tx_req->request_list);
+
+ /*
+ * Increment refcnt so that the client_info isn't freed while
+ * tx_work uses it.
+ */
+ refcount_inc(&ci->refcnt);
+ ci->tx_work.fn = tx_work;
+ ci->tx_work.done = tx_main;
+ queue_work(sys->net_wqueue, &ci->tx_work);
+ }
}
static void listen_handler(int listen_fd, int events, void *data)
@@ -824,12 +763,6 @@ static void listen_handler(int listen_fd, int events, void *data)
}
}
- ret = set_nonblocking(fd);
- if (ret) {
- close(fd);
- return;
- }
-
ci = create_client(fd, data);
if (!ci) {
close(fd);
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 61d7f76..dacedff 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -416,6 +416,7 @@ static int create_work_queues(void)
if (init_work_queue(get_nr_nodes))
return -1;
+ sys->net_wqueue = create_work_queue("net", WQ_UNLIMITED);
sys->gateway_wqueue = create_work_queue("gway", WQ_UNLIMITED);
sys->io_wqueue = create_work_queue("io", WQ_UNLIMITED);
sys->recovery_wqueue = create_ordered_work_queue("rw");
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 2f97196..c38cb96 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -66,8 +66,10 @@ struct client_info {
struct connection conn;
struct request *rx_req;
+ struct work rx_work;
struct request *tx_req;
+ struct work tx_work;
struct list_head done_reqs;
@@ -128,6 +130,7 @@ struct system_info {
bool gateway_only;
bool nosync;
+ struct work_queue *net_wqueue;
struct work_queue *gateway_wqueue;
struct work_queue *io_wqueue;
struct work_queue *deletion_wqueue;
--
1.7.9.5
More information about the sheepdog
mailing list