[Sheepdog] [PATCH 8/8] sheepdog: fix object creation race

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Tue Apr 27 10:11:51 CEST 2010


At Tue, 27 Apr 2010 15:33:54 +0900,
MORITA Kazutaka wrote:
> 
> Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
> ---
>  block/sheepdog.c |  135 ++++++++++++++++++++++++++++++++++++++++++-----------
>  1 files changed, 107 insertions(+), 28 deletions(-)
> 
This patch doesn't work, sorry.
The following one is the correct version.

=
From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
Date: Tue, 27 Apr 2010 17:06:01 +0900
Subject: [PATCH] sheepdog: fix object creation race

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 block/sheepdog.c |  136 +++++++++++++++++++++++++++++++++++++++++++-----------
 1 files changed, 108 insertions(+), 28 deletions(-)

diff --git a/block/sheepdog.c b/block/sheepdog.c
index 0f3ccfd..107f61e 100644
--- a/block/sheepdog.c
+++ b/block/sheepdog.c
@@ -201,6 +201,13 @@ struct aio_req {
 	struct sd_aiocb *aiocb;
 	unsigned int iov_offset;
 
+	uint64_t oid;
+	uint64_t base_oid;
+	uint64_t offset;
+	unsigned int data_len;
+	uint8_t flags;
+
+	QLIST_ENTRY(aio_req) pending_siblings;
 	QLIST_ENTRY(aio_req) aioreq_siblings;
 };
 
@@ -243,6 +250,8 @@ struct bdrv_sd_state {
 	struct aio_req aio_req_list[MAX_AIO_REQS];
 	struct aio_req *aio_req_free[MAX_AIO_REQS];
 	int nr_aio_req_free;
+
+	QLIST_HEAD(pending_head, aio_req) pending_head;
 };
 
 static const char * sd_strerror(int err)
@@ -334,7 +343,11 @@ static inline uint64_t fnv_64a_buf(void *buf, size_t len, uint64_t hval)
 }
 
 static inline struct aio_req *alloc_aio_req(struct bdrv_sd_state *s,
-					    struct sd_aiocb *acb)
+					    struct sd_aiocb *acb,
+					    uint64_t oid, unsigned int data_len,
+					    uint64_t offset, uint8_t flags,
+					    uint64_t base_oid,
+					    unsigned int iov_offset)
 {
 	struct aio_req *aio_req;
 
@@ -343,7 +356,14 @@ static inline struct aio_req *alloc_aio_req(struct bdrv_sd_state *s,
 
 	aio_req = s->aio_req_free[--s->nr_aio_req_free];
 	aio_req->aiocb = acb;
+	aio_req->iov_offset = iov_offset;
+	aio_req->oid = oid;
+	aio_req->base_oid = base_oid;
+	aio_req->offset = offset;
+	aio_req->data_len = data_len;
+	aio_req->flags = flags;
 
+	QLIST_INSERT_HEAD(&s->pending_head, aio_req, pending_siblings);
 	QLIST_INSERT_HEAD(&acb->aioreq_head, aio_req, aioreq_siblings);
 
 	return aio_req;
@@ -352,6 +372,7 @@ static inline struct aio_req *alloc_aio_req(struct bdrv_sd_state *s,
 static inline int free_aio_req(struct bdrv_sd_state *s, struct aio_req *aio_req)
 {
 	struct sd_aiocb *acb = aio_req->aiocb;
+	QLIST_REMOVE(aio_req, pending_siblings);
 	QLIST_REMOVE(aio_req, aioreq_siblings);
 	aio_req->aiocb = NULL;
 	s->aio_req_free[s->nr_aio_req_free++] = aio_req;
@@ -642,6 +663,32 @@ out:
 
 static void sd_write_bh_cb(void *p);
 static void sd_readv_bh_cb(void *p);
+static int add_aio_request(struct bdrv_sd_state *s, struct aio_req *aio_req,
+			   struct iovec *iov, int niov, int create, int write);
+
+static void send_pending_req(struct bdrv_sd_state *s, uint64_t oid, uint32_t id)
+{
+	struct aio_req *aio_req, *next;
+	struct sd_aiocb *acb;
+	int ret;
+
+	QLIST_FOREACH_SAFE(aio_req, &s->pending_head, pending_siblings, next) {
+		if (id == get_id_from_req(s, aio_req))
+			continue;
+		if (aio_req->oid != oid)
+			continue;
+
+		acb = aio_req->aiocb;
+		ret = add_aio_request(s, aio_req, acb->qiov->iov,
+				      acb->qiov->niov, 0, 1);
+		if (ret < 0) {
+			eprintf("add_aio_request is faled\n");
+			free_aio_req(s, aio_req);
+			if (QLIST_EMPTY(&acb->aioreq_head))
+				sd_finish_aiocb(acb);
+		}
+	}
+}
 
 static void aio_read_response(void *opaque)
 {
@@ -654,6 +701,7 @@ static void aio_read_response(void *opaque)
 	struct sd_aiocb *acb;
 	int rest;
 	unsigned long idx;
+	uint64_t oid;
 
 	if (!nr_outstanding_aio_req(s))
 		return;
@@ -670,9 +718,13 @@ static void aio_read_response(void *opaque)
 	switch (acb->aiocb_type) {
 	case AIOCB_WRITE_UDATA:
 		idx = acb->sector_num * 512 / SD_DATA_OBJ_SIZE;
-		if (s->inode.data_oid[idx] != to_data_oid(s->inode.oid, idx)) {
-			s->inode.data_oid[idx] = to_data_oid(s->inode.oid, idx);
+		oid = to_data_oid(s->inode.oid, idx);
+
+		if (s->inode.data_oid[idx] != oid) {
+			s->inode.data_oid[idx] = oid;
 			s->inode_dirty = 1;
+
+			send_pending_req(s, oid, rsp->id);
 		}
 		break;
 	case AIOCB_READ_UDATA:
@@ -823,17 +875,18 @@ out:
 	return ret;
 }
 
-static int add_aio_request(struct bdrv_sd_state *s, struct sd_aiocb *acb,
-			   uint64_t oid, struct iovec *iov, int niov,
-			   unsigned int datalen, uint64_t offset, uint8_t flags,
-			   uint64_t old_oid, int create, int write,
-			   unsigned int iov_offset)
+static int add_aio_request(struct bdrv_sd_state *s, struct aio_req *aio_req,
+			   struct iovec *iov, int niov, int create, int write)
 {
 	int nr_copies = s->inode.nr_copies;
 	struct sd_obj_req hdr;
 	unsigned int wlen;
 	int ret, opt;
-	struct aio_req *aio_req;
+	uint64_t oid = aio_req->oid;
+	unsigned int datalen = aio_req->data_len;
+	uint64_t offset = aio_req->offset;
+	uint8_t flags = aio_req->flags;
+	uint64_t old_oid = aio_req->base_oid;
 
 	if (!nr_copies)
 		eprintf("bug\n");
@@ -861,12 +914,6 @@ static int add_aio_request(struct bdrv_sd_state *s, struct sd_aiocb *acb,
 	hdr.data_length = datalen;
 	hdr.offset = offset;
 
-	aio_req = alloc_aio_req(s, acb);
-	if (!aio_req) {
-		eprintf("too many requests\n");
-		return -ENOMEM;
-	}
-	aio_req->iov_offset = iov_offset;
 	hdr.id = get_id_from_req(s, aio_req);
 
 	opt = 1;
@@ -875,23 +922,20 @@ static int add_aio_request(struct bdrv_sd_state *s, struct sd_aiocb *acb,
 	ret = do_write(s->fd, &hdr, sizeof(hdr));
 	if (ret) {
 		eprintf("failed to send a req, %m\n");
-		goto err;
+		return -EIO;
 	}
 
 	if (wlen) {
-		ret = do_writev(s->fd, iov, wlen, iov_offset);
+		ret = do_writev(s->fd, iov, wlen, aio_req->iov_offset);
 		if (ret) {
 			eprintf("failed to send a data, %m\n");
-			goto err;
+			return -EIO;
 		}
 	}
         opt = 0;
         setsockopt(s->fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
 
 	return 0;
-err:
-	free_aio_req(s, aio_req);
-	return -EIO;
 }
 
 static int read_vdi_obj(char *buf, uint64_t oid, int *copies)
@@ -1009,6 +1053,7 @@ static int sd_open(BlockDriverState *bs, const char *filename, int flags)
 	s->name = strdup(vdi);
 	free(buf);
 
+	QLIST_INIT(&s->pending_head);
 	return 0;
 out:
 	free(buf);
@@ -1198,15 +1243,22 @@ static void sd_write_done(struct sd_aiocb *acb)
 	int ret;
 	struct bdrv_sd_state *s = acb->common.bs->opaque;
 	struct iovec iov;
+	struct aio_req *aio_req;
 
 	if (s->inode_dirty) {
 		s->inode_dirty = 0;
 		iov.iov_base = &s->inode;
 		iov.iov_len = sizeof(s->inode);
-		ret = add_aio_request(s, acb, s->inode.oid, &iov, 1,
-				      sizeof(s->inode),
-				      0, 0, 0, 0, 1, 0);
+		aio_req = alloc_aio_req(s, acb, s->inode.oid, sizeof(s->inode),
+					0, 0, 0, 0);
+		if (!aio_req) {
+			eprintf("too many requests\n");
+			acb->ret = -EIO;
+			goto out;
+		}
+		ret = add_aio_request(s, aio_req, &iov, 1, 0, 1);
 		if (ret) {
+			free_aio_req(s, aio_req);
 			acb->ret = -EIO;
 			goto out;
 		}
@@ -1266,6 +1318,7 @@ static void sd_write_bh_cb(void *p)
 	uint64_t offset = (acb->sector_num * 512) % CHUNK_SIZE;
 	struct bdrv_sd_state *s = acb->common.bs->opaque;
 	struct sd_inode *inode = &s->inode;
+	struct aio_req *aio_req;
 
 	if (acb->bh) {
 		qemu_bh_delete(acb->bh);
@@ -1309,10 +1362,29 @@ static void sd_write_bh_cb(void *p)
 			dprintf("new oid %lx\n", oid);
 		}
 
-		ret = add_aio_request(s, acb, oid, acb->qiov->iov, acb->qiov->niov,
-				      len, offset, flags, old_oid, create, 1, done);
+		aio_req = alloc_aio_req(s, acb, oid, len, offset, flags,
+					old_oid, done);
+		if (!aio_req) {
+			eprintf("too many requests\n");
+			acb->ret = -EIO;
+			goto abort;
+		}
+
+		if (create) {
+			struct aio_req *areq;
+			QLIST_FOREACH(areq, &s->pending_head, pending_siblings) {
+				if (get_id_from_req(s, areq) == get_id_from_req(s, aio_req))
+					continue;
+				if (areq->oid == oid)
+					goto done;
+			}
+		}
+
+		ret = add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
+				      create, 1);
 		if (ret < 0) {
 			eprintf("add_aio_request is faled\n");
+			free_aio_req(s, aio_req);
 			ret = -EIO;
 			goto abort;
 		}
@@ -1355,6 +1427,7 @@ static void sd_readv_bh_cb(void *p)
 	uint64_t oid;
 	uint64_t offset = (acb->sector_num * 512) % CHUNK_SIZE;
 	struct bdrv_sd_state *s = acb->common.bs->opaque;
+	struct aio_req *aio_req;
 
 	qemu_bh_delete(acb->bh);
 	acb->bh = NULL;
@@ -1368,9 +1441,16 @@ static void sd_readv_bh_cb(void *p)
 		len = min_t(unsigned long, total - done, CHUNK_SIZE - offset);
 
 		if (oid) {
-			ret = add_aio_request(s, acb, oid, NULL, 0, len, offset,
-					      0, 0, 0, 0, done);
+			aio_req = alloc_aio_req(s, acb, oid, len, offset, 0, 0, done);
+			if (!aio_req) {
+				eprintf("too many requests\n");
+				acb->ret = -EIO;
+				goto out;
+			}
+
+			ret = add_aio_request(s, aio_req, NULL, 0, 0, 0);
 			if (ret) {
+				free_aio_req(s, aio_req);
 				acb->ret = -EIO;
 				goto out;
 			}
-- 
1.5.6.5




More information about the sheepdog mailing list