[Sheepdog] [PATCH 4/8] sheepdog: use iovec buffer directly in asynchronous I/O

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Tue Apr 27 08:33:50 CEST 2010


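The asynchronous read and write paths each went through a temporary
buffer: aio_read_response() read the payload into a malloc()ed buffer
and then memcpy()ed it into the request's iovec, and sd_write_bh_cb()
flattened the iovec with qemu_iovec_to_buffer() before sending.  Drop
both copies and transfer the data directly to/from acb->qiov->iov.

To support this, add do_readv_writev(), which sends or receives 'len'
bytes starting 'iov_offset' bytes into an iovec array, and express
do_read(), do_write(), do_readv() and do_writev() in terms of it.
add_aio_request() now takes an iovec array instead of a flat data
pointer and sends the header and the payload as two separate writes,
with TCP_CORK set around them.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>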
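---
Review notes (not part of the commit message):

 - add_aio_request(): both "goto err" paths return with TCP_CORK still
   set on s->fd.  The option should be cleared before free_aio_req()
   so that a failed request does not leave the socket corked.
 - do_readv_writev(): errno is tested whenever ret <= 0, including
   ret == 0.  sendmsg()/recvmsg() do not set errno when they return 0,
   so a stale EINTR/EAGAIN would make the function retry forever after
   the peer has closed the connection.  (The old do_read() had the
   same check.)
 - do_readv_writev(): the failure message is "failed to recv a rsp"
   even when the function was called to write.
 - add_aio_request(): the two lines that clear TCP_CORK are indented
   with spaces, the rest of the function with tabs.
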
 block/sheepdog.c |  199 +++++++++++++++++++++++++++++-------------------------
 1 files changed, 107 insertions(+), 92 deletions(-)

diff --git a/block/sheepdog.c b/block/sheepdog.c
index afe5e68..2047788 100644
--- a/block/sheepdog.c
+++ b/block/sheepdog.c
@@ -481,7 +481,13 @@ success:
 	return fd;
 }
 
-static void forward_iov(struct msghdr *msg, int len)
+static void reset_iov(struct msghdr *msg, int len)
+{
+	msg->msg_iov->iov_base = (char *) msg->msg_iov->iov_base - len;
+	msg->msg_iov->iov_len += len;
+}
+
+static int forward_iov(struct msghdr *msg, int len)
 {
 	while (msg->msg_iov->iov_len <= len) {
 		len -= msg->msg_iov->iov_len;
@@ -491,71 +497,107 @@ static void forward_iov(struct msghdr *msg, int len)
 
 	msg->msg_iov->iov_base = (char *) msg->msg_iov->iov_base + len;
 	msg->msg_iov->iov_len -= len;
+
+	return len;
 }
 
-static int do_read(int sockfd, void *buf, int len)
+static int set_iov_limit(struct iovec *iov, int len, int *diff)
 {
-	int ret;
+	int i = 0;
+
+	for (i = 0; iov[i].iov_len < len; i++)
+		len -= iov[i].iov_len;
+
+	*diff = iov[i].iov_len - len;
+	iov[i].iov_len = len;
+
+	return i + 1;
+}
+
+static int do_readv_writev(int sockfd, struct iovec *iov, int len,
+			   int iov_offset, int write)
+{
+	int ret, diff;
+	struct iovec *end_iov;
+	struct msghdr msg;
+
+	memset(&msg, 0, sizeof(msg));
+	msg.msg_iov = iov;
+	msg.msg_iovlen = set_iov_limit(msg.msg_iov, iov_offset + len, &diff);
+	end_iov = &msg.msg_iov[msg.msg_iovlen - 1];
 reread:
-	ret = recv(sockfd, buf, len, MSG_WAITALL);
-	if (ret < 0 || !ret) {
+	iov_offset = forward_iov(&msg, iov_offset);
+	if (write)
+		ret = sendmsg(sockfd, &msg, 0);
+	else
+		ret = recvmsg(sockfd, &msg, MSG_WAITALL);
+	reset_iov(&msg, iov_offset);
+	if (ret <= 0) {
 		if (errno == EINTR || errno == EAGAIN)
 			goto reread;
-		eprintf("failed to recv a req %d %d %d, %m\n", ret, errno, len);
-		return 1;
+		eprintf("failed to recv a rsp, %m\n");
+		ret = 1;
+		goto out;
 	}
 
 	len -= ret;
-	buf += ret;
-	if (len)
+	if (len) {
+		iov_offset += ret;
 		goto reread;
+	}
 
-	return 0;
+	ret = 0;
+out:
+	/* reset iovec state */
+	end_iov->iov_len += diff;
+	return ret;
 }
 
-static int do_write(int sockfd, struct msghdr *msg, int len)
+static int do_readv(int sockfd, struct iovec *iov, int len, int iov_offset)
 {
-	int ret;
-rewrite:
-	ret = sendmsg(sockfd, msg, 0);
-	if (ret < 0) {
-		if (errno == EINTR || errno == EAGAIN)
-			goto rewrite;
-		eprintf("failed to send a req, %m\n");
-		return 1;
-	}
+	return do_readv_writev(sockfd, iov, len, iov_offset, 0);
+}
 
-	len -= ret;
-	if (len) {
-		forward_iov(msg, ret);
-		goto rewrite;
-	}
+static int do_writev(int sockfd, struct iovec *iov, int len, int iov_offset)
+{
+	return do_readv_writev(sockfd, iov, len, iov_offset, 1);
+}
 
-	return 0;
+static int do_read_write(int sockfd, void *buf, int len, int write)
+{
+	struct iovec iov;
+
+	iov.iov_base = buf;
+	iov.iov_len = len;
+
+	return do_readv_writev(sockfd, &iov, len, 0, write);
+}
+
+static int do_read(int sockfd, void *buf, int len)
+{
+	return do_read_write(sockfd, buf, len, 0);
+}
+
+static int do_write(int sockfd, void *buf, int len)
+{
+	return do_read_write(sockfd, buf, len, 1);
 }
 
 static int send_req(int sockfd, struct sd_req *hdr, void *data,
 		    unsigned int *wlen)
 {
 	int ret;
-	struct msghdr msg;
 	struct iovec iov[2];
 
-	memset(&msg, 0, sizeof(msg));
-
-	msg.msg_iov = iov;
-
-	msg.msg_iovlen = 1;
 	iov[0].iov_base = hdr;
 	iov[0].iov_len = sizeof(*hdr);
 
 	if (*wlen) {
-		msg.msg_iovlen++;
 		iov[1].iov_base = data;
 		iov[1].iov_len = *wlen;
 	}
 
-	ret = do_write(sockfd, &msg, sizeof(*hdr) + *wlen);
+	ret = do_writev(sockfd, iov, sizeof(*hdr) + *wlen, 0);
 	if (ret) {
 		eprintf("failed to send a req, %m\n");
 		ret = -1;
@@ -610,9 +652,6 @@ static void aio_read_response(void *opaque)
 	int ret;
 	struct aio_req *aio_req;
 	struct sd_aiocb *acb;
-	unsigned int offset, done;
-	char *buf;
-	struct iovec *iov;
 	int rest;
 	unsigned long idx;
 
@@ -637,35 +676,8 @@ static void aio_read_response(void *opaque)
 		}
 		break;
 	case AIOCB_READ_UDATA:
-		buf = malloc(rsp->data_length);
-		if (!buf) {
-			eprintf("Failed to allocate memory\n");
-			goto new_node_list;
-		}
-
-		ret = do_read(fd, buf, rsp->data_length);
-		if (ret) {
-			free(buf);
-			goto new_node_list;
-		}
-
-		offset = aio_req->iov_offset;
-		iov = acb->qiov->iov;
-
-		while (iov->iov_len <= offset) {
-			offset -= iov->iov_len;
-			iov++;
-		}
-
-		for (done = 0; done < rsp->data_length; iov++) {
-			unsigned int len = min_t(unsigned int, iov->iov_len - offset,
-						 rsp->data_length - done);
-			memcpy(iov->iov_base + offset, buf + done, len);
-			offset = 0;
-			done += len;
-		}
-
-		free(buf);
+		ret = do_readv(fd, acb->qiov->iov, rsp->data_length,
+			       aio_req->iov_offset);
 		if (ret) {
 			eprintf("failed to get the data, %m\n");
 			goto new_node_list;
@@ -855,7 +867,7 @@ out:
 }
 
 static int add_aio_request(struct bdrv_sd_state *s, struct sd_aiocb *acb,
-			   uint64_t oid, void *data,
+			   uint64_t oid, struct iovec *iov, int niov,
 			   unsigned int datalen, uint64_t offset, uint8_t flags,
 			   uint64_t old_oid, int create, int write,
 			   unsigned int iov_offset)
@@ -863,7 +875,7 @@ static int add_aio_request(struct bdrv_sd_state *s, struct sd_aiocb *acb,
 	int nr_copies = s->inode.nr_copies;
 	struct sd_obj_req hdr;
 	unsigned int wlen;
-	int ret;
+	int ret, opt;
 	struct aio_req *aio_req;
 
 	if (!nr_copies)
@@ -900,13 +912,29 @@ static int add_aio_request(struct bdrv_sd_state *s, struct sd_aiocb *acb,
 	aio_req->iov_offset = iov_offset;
 	hdr.id = get_id_from_req(s, aio_req);
 
-	ret = send_req(s->fd, (struct sd_req *)&hdr, data, &wlen);
+	opt = 1;
+	setsockopt(s->fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
+
+	ret = do_write(s->fd, &hdr, sizeof(hdr));
 	if (ret) {
-		free_aio_req(s, aio_req);
-		return -EIO;
+		eprintf("failed to send a req, %m\n");
+		goto err;
 	}
 
+	if (wlen) {
+		ret = do_writev(s->fd, iov, wlen, iov_offset);
+		if (ret) {
+			eprintf("failed to send a data, %m\n");
+			goto err;
+		}
+	}
+        opt = 0;
+        setsockopt(s->fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
+
 	return 0;
+err:
+	free_aio_req(s, aio_req);
+	return -EIO;
 }
 
 static int read_vdi_obj(char *buf, uint64_t oid, int *copies)
@@ -1216,10 +1244,13 @@ static void sd_write_done(struct sd_aiocb *acb)
 {
 	int ret;
 	struct bdrv_sd_state *s = acb->common.bs->opaque;
+	struct iovec iov;
 
 	if (s->inode_dirty) {
 		s->inode_dirty = 0;
-		ret = add_aio_request(s, acb, s->inode.oid, &s->inode,
+		iov.iov_base = &s->inode;
+		iov.iov_len = sizeof(s->inode);
+		ret = add_aio_request(s, acb, s->inode.oid, &iov, 1,
 				      sizeof(s->inode),
 				      0, 0, 0, 0, 1, 0);
 		if (ret)
@@ -1283,8 +1314,6 @@ static void sd_write_bh_cb(void *p)
 	uint64_t offset = (acb->sector_num * 512) % CHUNK_SIZE;
 	struct bdrv_sd_state *s = acb->common.bs->opaque;
 	struct sd_inode *inode = &s->inode;
-	/* FIXME: better handle iov directly */
-	void *buf = NULL;
 
 	if (acb->bh) {
 		qemu_bh_delete(acb->bh);
@@ -1299,17 +1328,9 @@ static void sd_write_bh_cb(void *p)
 		}
 	}
 
-	buf = malloc(acb->qiov->size);
-	if (!buf) {
-		ret = -EIO;
-		goto abort;
-	}
-
 	acb->aio_done_func = sd_write_done;
 	acb->aiocb_type = AIOCB_WRITE_UDATA;
 
-	qemu_iovec_to_buffer(acb->qiov, buf);
-
 	i = 0;
 	while (done != total) {
 		uint8_t flags = 0;
@@ -1336,9 +1357,8 @@ static void sd_write_bh_cb(void *p)
 			dprintf("new oid %lx\n", oid);
 		}
 
-		ret = add_aio_request(s, acb, oid, buf + done, len, offset, flags, old_oid,
-				      create, 1, 0);
-
+		ret = add_aio_request(s, acb, oid, acb->qiov->iov, acb->qiov->niov,
+				      len, offset, flags, old_oid, create, 1, done);
 		if (ret < 0) {
 			eprintf("may be add_aio_request is faled\n");
 			ret = -EIO;
@@ -1352,13 +1372,8 @@ static void sd_write_bh_cb(void *p)
 		i++;
 	}
 
-	free(buf);
-
 	return;
 abort:
-	if (buf)
-		free(buf);
-
 	acb->ret = ret;
 	sd_finish_aiocb(acb);
 	return;
@@ -1399,8 +1414,8 @@ static void sd_read_done(struct sd_aiocb *acb)
 		oid = s->inode.data_oid[idx];
 
 		if (oid) {
-			ret = add_aio_request(s, acb, oid, NULL, len, offset, 0, 0,
-					      0, 0, done);
+			ret = add_aio_request(s, acb, oid, NULL, 0, len, offset,
+					      0, 0, 0, 0, done);
 			if (ret)
 				goto new_node_list;
 		}
-- 
1.5.6.5




More information about the sheepdog mailing list