[Sheepdog] [PATCH] sheepdog: use coroutines
MORITA Kazutaka
morita.kazutaka at lab.ntt.co.jp
Fri Aug 12 14:33:15 CEST 2011
This makes the sheepdog block driver support bdrv_co_readv/writev
instead of bdrv_aio_readv/writev.
With this patch, Sheepdog network I/O becomes fully asynchronous. The
block driver yields back when send/recv returns EAGAIN, and is resumed
when the sheepdog network connection is ready for the operation.
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
block/sheepdog.c | 150 +++++++++++++++++++++++++++++++++--------------------
1 files changed, 93 insertions(+), 57 deletions(-)
diff --git a/block/sheepdog.c b/block/sheepdog.c
index e150ac0..e283c34 100644
--- a/block/sheepdog.c
+++ b/block/sheepdog.c
@@ -274,7 +274,7 @@ struct SheepdogAIOCB {
int ret;
enum AIOCBState aiocb_type;
- QEMUBH *bh;
+ Coroutine *coroutine;
void (*aio_done_func)(SheepdogAIOCB *);
int canceled;
@@ -295,6 +295,10 @@ typedef struct BDRVSheepdogState {
char *port;
int fd;
+ CoMutex lock;
+ Coroutine *co_send;
+ Coroutine *co_recv;
+
uint32_t aioreq_seq_num;
QLIST_HEAD(outstanding_aio_head, AIOReq) outstanding_aio_head;
} BDRVSheepdogState;
@@ -346,19 +350,16 @@ static const char * sd_strerror(int err)
/*
* Sheepdog I/O handling:
*
- * 1. In the sd_aio_readv/writev, read/write requests are added to the
- * QEMU Bottom Halves.
- *
- * 2. In sd_readv_writev_bh_cb, the callbacks of BHs, we send the I/O
- * requests to the server and link the requests to the
- * outstanding_list in the BDRVSheepdogState. we exits the
- * function without waiting for receiving the response.
+ * 1. In sd_co_rw_vector, we send the I/O requests to the server and
+ * link the requests to the outstanding_list in the
+ * BDRVSheepdogState. The function exits without waiting for
+ * receiving the response.
*
- * 3. We receive the response in aio_read_response, the fd handler to
+ * 2. We receive the response in aio_read_response, the fd handler to
* the sheepdog connection. If metadata update is needed, we send
* the write request to the vdi object in sd_write_done, the write
- * completion function. The AIOCB callback is not called until all
- * the requests belonging to the AIOCB are finished.
+ * completion function. We switch back to sd_co_readv/writev after
+ * all the requests belonging to the AIOCB are finished.
*/
static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
@@ -398,7 +399,7 @@ static inline int free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
static void sd_finish_aiocb(SheepdogAIOCB *acb)
{
if (!acb->canceled) {
- acb->common.cb(acb->common.opaque, acb->ret);
+ qemu_coroutine_enter(acb->coroutine, NULL);
}
qemu_aio_release(acb);
}
@@ -411,7 +412,8 @@ static void sd_aio_cancel(BlockDriverAIOCB *blockacb)
* Sheepdog cannot cancel the requests which are already sent to
* the servers, so we just complete the request with -EIO here.
*/
- acb->common.cb(acb->common.opaque, -EIO);
+ acb->ret = -EIO;
+ qemu_coroutine_enter(acb->coroutine, NULL);
acb->canceled = 1;
}
@@ -435,24 +437,12 @@ static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
acb->aio_done_func = NULL;
acb->canceled = 0;
- acb->bh = NULL;
+ acb->coroutine = qemu_coroutine_self();
acb->ret = 0;
QLIST_INIT(&acb->aioreq_head);
return acb;
}
-static int sd_schedule_bh(QEMUBHFunc *cb, SheepdogAIOCB *acb)
-{
- if (acb->bh) {
- error_report("bug: %d %d", acb->aiocb_type, acb->aiocb_type);
- return -EIO;
- }
-
- acb->bh = qemu_bh_new(cb, acb);
- qemu_bh_schedule(acb->bh);
- return 0;
-}
-
#ifdef _WIN32
struct msghdr {
@@ -635,7 +625,13 @@ static int do_readv_writev(int sockfd, struct iovec *iov, int len,
again:
ret = do_send_recv(sockfd, iov, len, iov_offset, write);
if (ret < 0) {
- if (errno == EINTR || errno == EAGAIN) {
+ if (errno == EINTR) {
+ goto again;
+ }
+ if (errno == EAGAIN) {
+ if (qemu_in_coroutine()) {
+ qemu_coroutine_yield();
+ }
goto again;
}
error_report("failed to recv a rsp, %s", strerror(errno));
@@ -793,14 +789,14 @@ static void aio_read_response(void *opaque)
unsigned long idx;
if (QLIST_EMPTY(&s->outstanding_aio_head)) {
- return;
+ goto out;
}
/* read a header */
ret = do_read(fd, &rsp, sizeof(rsp));
if (ret) {
error_report("failed to get the header, %s", strerror(errno));
- return;
+ goto out;
}
/* find the right aio_req from the outstanding_aio list */
@@ -811,7 +807,7 @@ static void aio_read_response(void *opaque)
}
if (!aio_req) {
error_report("cannot find aio_req %x", rsp.id);
- return;
+ goto out;
}
acb = aio_req->aiocb;
@@ -847,7 +843,7 @@ static void aio_read_response(void *opaque)
aio_req->iov_offset);
if (ret) {
error_report("failed to get the data, %s", strerror(errno));
- return;
+ goto out;
}
break;
}
@@ -861,10 +857,30 @@ static void aio_read_response(void *opaque)
if (!rest) {
/*
* We've finished all requests which belong to the AIOCB, so
- * we can call the callback now.
+ * we can switch back to sd_co_readv/writev now.
*/
acb->aio_done_func(acb);
}
+out:
+ s->co_recv = NULL;
+}
+
+static void co_read_response(void *opaque)
+{
+ BDRVSheepdogState *s = opaque;
+
+ if (!s->co_recv) {
+ s->co_recv = qemu_coroutine_create(aio_read_response);
+ }
+
+ qemu_coroutine_enter(s->co_recv, opaque);
+}
+
+static void co_write_request(void *opaque)
+{
+ BDRVSheepdogState *s = opaque;
+
+ qemu_coroutine_enter(s->co_send, NULL);
}
static int aio_flush_request(void *opaque)
@@ -924,7 +940,7 @@ static int get_sheep_fd(BDRVSheepdogState *s)
return -1;
}
- qemu_aio_set_fd_handler(fd, aio_read_response, NULL, aio_flush_request,
+ qemu_aio_set_fd_handler(fd, co_read_response, NULL, aio_flush_request,
NULL, s);
return fd;
}
@@ -1091,6 +1107,10 @@ static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
hdr.id = aio_req->id;
+ qemu_co_mutex_lock(&s->lock);
+ s->co_send = qemu_coroutine_self();
+ qemu_aio_set_fd_handler(s->fd, co_read_response, co_write_request,
+ aio_flush_request, NULL, s);
set_cork(s->fd, 1);
/* send a header */
@@ -1109,6 +1129,9 @@ static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
}
set_cork(s->fd, 0);
+ qemu_aio_set_fd_handler(s->fd, co_read_response, NULL,
+ aio_flush_request, NULL, s);
+ qemu_co_mutex_unlock(&s->lock);
return 0;
}
@@ -1225,6 +1248,7 @@ static int sd_open(BlockDriverState *bs, const char *filename, int flags)
bs->total_sectors = s->inode.vdi_size / SECTOR_SIZE;
strncpy(s->name, vdi, sizeof(s->name));
+ qemu_co_mutex_init(&s->lock);
qemu_free(buf);
return 0;
out:
@@ -1491,7 +1515,7 @@ static int sd_truncate(BlockDriverState *bs, int64_t offset)
/*
* This function is called after writing data objects. If we need to
* update metadata, this sends a write request to the vdi object.
- * Otherwise, this calls the AIOCB callback.
+ * Otherwise, this switches back to sd_co_readv/writev.
*/
static void sd_write_done(SheepdogAIOCB *acb)
{
@@ -1587,8 +1611,11 @@ out:
* waiting the response. The responses are received in the
* `aio_read_response' function which is called from the main loop as
* a fd handler.
+ *
+ * Returns 1 when we need to wait a response, 0 when there is no sent
+ * request and -errno in error cases.
*/
-static void sd_readv_writev_bh_cb(void *p)
+static int sd_co_rw_vector(void *p)
{
SheepdogAIOCB *acb = p;
int ret = 0;
@@ -1600,9 +1627,6 @@ static void sd_readv_writev_bh_cb(void *p)
SheepdogInode *inode = &s->inode;
AIOReq *aio_req;
- qemu_bh_delete(acb->bh);
- acb->bh = NULL;
-
if (acb->aiocb_type == AIOCB_WRITE_UDATA && s->is_snapshot) {
/*
* In the case we open the snapshot VDI, Sheepdog creates the
@@ -1684,42 +1708,47 @@ static void sd_readv_writev_bh_cb(void *p)
}
out:
if (QLIST_EMPTY(&acb->aioreq_head)) {
- sd_finish_aiocb(acb);
+ return acb->ret;
}
+ return 1;
}
-static BlockDriverAIOCB *sd_aio_writev(BlockDriverState *bs, int64_t sector_num,
- QEMUIOVector *qiov, int nb_sectors,
- BlockDriverCompletionFunc *cb,
- void *opaque)
+static int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
+ int nb_sectors, QEMUIOVector *qiov)
{
SheepdogAIOCB *acb;
+ int ret;
if (bs->growable && sector_num + nb_sectors > bs->total_sectors) {
/* TODO: shouldn't block here */
if (sd_truncate(bs, (sector_num + nb_sectors) * SECTOR_SIZE) < 0) {
- return NULL;
+ return -EIO;
}
bs->total_sectors = sector_num + nb_sectors;
}
- acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
+ acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, NULL, NULL);
acb->aio_done_func = sd_write_done;
acb->aiocb_type = AIOCB_WRITE_UDATA;
- sd_schedule_bh(sd_readv_writev_bh_cb, acb);
- return &acb->common;
+ ret = sd_co_rw_vector(acb);
+ if (ret <= 0) {
+ qemu_aio_release(acb);
+ return ret;
+ }
+
+ qemu_coroutine_yield();
+
+ return acb->ret;
}
-static BlockDriverAIOCB *sd_aio_readv(BlockDriverState *bs, int64_t sector_num,
- QEMUIOVector *qiov, int nb_sectors,
- BlockDriverCompletionFunc *cb,
- void *opaque)
+static int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
+ int nb_sectors, QEMUIOVector *qiov)
{
SheepdogAIOCB *acb;
- int i;
+ int i, ret;
- acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
+ acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, NULL, NULL);
acb->aiocb_type = AIOCB_READ_UDATA;
acb->aio_done_func = sd_finish_aiocb;
@@ -1731,8 +1760,15 @@ static BlockDriverAIOCB *sd_aio_readv(BlockDriverState *bs, int64_t sector_num,
memset(qiov->iov[i].iov_base, 0, qiov->iov[i].iov_len);
}
- sd_schedule_bh(sd_readv_writev_bh_cb, acb);
- return &acb->common;
+ ret = sd_co_rw_vector(acb);
+ if (ret <= 0) {
+ qemu_aio_release(acb);
+ return ret;
+ }
+
+ qemu_coroutine_yield();
+
+ return acb->ret;
}
static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
@@ -2062,8 +2098,8 @@ BlockDriver bdrv_sheepdog = {
.bdrv_getlength = sd_getlength,
.bdrv_truncate = sd_truncate,
- .bdrv_aio_readv = sd_aio_readv,
- .bdrv_aio_writev = sd_aio_writev,
+ .bdrv_co_readv = sd_co_readv,
+ .bdrv_co_writev = sd_co_writev,
.bdrv_snapshot_create = sd_snapshot_create,
.bdrv_snapshot_goto = sd_snapshot_goto,
--
1.7.2.5
More information about the sheepdog
mailing list