[sheepdog] [PATCH v3 3/9] sbd: implement write operation

Liu Yuan namei.unix at gmail.com
Mon May 26 07:18:36 CEST 2014


From: Liu Yuan <tailai.ly at taobao.com>

- change the aio framework
  -- make the submitter a dedicated kthread to avoid lockups
- flesh out sheep_handle_reply and sheep_aiocb_submit

Now we have two kthreads: one for request submission and the other for reply
handling.
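
The flow, roughly:

  block layer --> request_head --> submiter kthread --> socket --> sheep
  sheep reply --> socket --> reaper kthread --> end request --> block layer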

The tricky part is that sheep_submit_sdreq() is called by both kthreads, so we
add a mutex to keep the sent data from intermingling. Without it, the sheep
peer can read a corrupted sd_req structure off the socket and answer with an
equally bogus sd_rsp.
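
For illustration, a minimal sketch of the framing problem and the fix;
send_framed is a hypothetical stand-in for sheep_submit_sdreq():

	/*
	 * The wire format is one fixed-size sd_req header followed
	 * immediately by wlen payload bytes.  With two unsynchronized
	 * senders the byte streams can interleave:
	 *
	 *   thread A: [hdr A][data A ...
	 *   thread B:        [hdr B] ...   <-- lands inside A's payload
	 *
	 * so the peer parses part of A's payload as B's sd_req.  Holding
	 * a mutex across the header+payload pair keeps each request
	 * contiguous on the socket:
	 */
	static DEFINE_MUTEX(socket_mutex);

	static int send_framed(struct socket *sock, struct sd_req *hdr,
			       void *data, unsigned int wlen)
	{
		int ret;

		mutex_lock(&socket_mutex);
		ret = socket_write(sock, hdr, sizeof(*hdr));
		if (ret >= 0 && wlen)
			ret = socket_write(sock, data, wlen);
		mutex_unlock(&socket_mutex);
		return ret;
	}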

The main problems the aio framework has to solve are:
- splitting a request that strides multiple objects into per-object sheep
  requests (see the sketch after this list).
- sending only one create request when an object doesn't exist yet, blocking
  further requests to that object until the creation succeeds, and only then
  submitting the blocked requests.
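
Below is a sketch of the splitting arithmetic; split_into_object_requests is
a hypothetical helper (the real loop lives in sheep_aiocb_submit), and
SD_DATA_OBJ_SIZE is sheepdog's 4MB data object size:

	/*
	 * Example: a 6MB write at offset 3MB becomes three sheep
	 * requests:
	 *   idx 0: start 3MB, len 1MB
	 *   idx 1: start 0,   len 4MB
	 *   idx 2: start 0,   len 1MB
	 */
	static void split_into_object_requests(u64 offset, u64 total)
	{
		u64 idx = offset / SD_DATA_OBJ_SIZE;
		u64 start = offset % SD_DATA_OBJ_SIZE;

		while (total > 0) {
			u64 len = min_t(u64, total,
					SD_DATA_OBJ_SIZE - start);

			/* one sheep request covering [start, start+len)
			 * of the object at index idx */
			total -= len;
			start = 0;
			idx++;
		}
	}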

To get the best performance,

# echo 4096 > /sys/block/sbd0/queue/max_sectors_kb

This tells the I/O scheduler to merge bios into requests of up to 4MB,
matching the sheep data object size, so most requests map onto a single
object.

Signed-off-by: Liu Yuan <namei.unix at gmail.com>
---
 sbd/sbd.h                |  16 ++-
 sbd/sheep.c              | 261 +++++++++++++++++++++++++++++++++++++++++------
 sbd/sheep_block_device.c |  78 +++++++++++---
 3 files changed, 306 insertions(+), 49 deletions(-)

diff --git a/sbd/sbd.h b/sbd/sbd.h
index e938561..602a7bc 100644
--- a/sbd/sbd.h
+++ b/sbd/sbd.h
@@ -43,13 +43,19 @@ struct sbd_device {
 	spinlock_t queue_lock;   /* request queue lock */
 
 	struct sheep_vdi vdi;		/* Associated sheep image */
+	spinlock_t vdi_lock;
 
-	struct list_head inflight_head;
-	wait_queue_head_t inflight_wq;
-	struct list_head blocking_head;
+	struct list_head request_head; /* protected by queue lock */
+	struct list_head inflight_head; /* for inflight sheep requests */
+	struct list_head blocking_head; /* for blocking sheep requests */
+	rwlock_t inflight_lock;
+	rwlock_t blocking_lock;
 
 	struct list_head list;
 	struct task_struct *reaper;
+	struct task_struct *submiter;
+	wait_queue_head_t reaper_wq;
+	wait_queue_head_t submiter_wq;
 };
 
 struct sheep_aiocb {
@@ -57,10 +63,10 @@ struct sheep_aiocb {
 	u64 offset;
 	u64 length;
 	int ret;
-	u32 nr_requests;
+	atomic_t nr_requests;
 	char *buf;
 	int buf_iter;
-	void (*aio_done_func)(struct sheep_aiocb *, bool);
+	void (*aio_done_func)(struct sheep_aiocb *);
 };
 
 enum sheep_request_type {
diff --git a/sbd/sheep.c b/sbd/sheep.c
index 33269b4..bcc65f0 100644
--- a/sbd/sheep.c
+++ b/sbd/sheep.c
@@ -11,12 +11,25 @@
 
 #include "sbd.h"
 
+/* FIXME I need this hack to compile DEFINE_MUTEX successfully */
+#ifdef __SPIN_LOCK_UNLOCKED
+# undef __SPIN_LOCK_UNLOCKED
+# define __SPIN_LOCK_UNLOCKED(lockname) __SPIN_LOCK_INITIALIZER(lockname)
+#endif
+
+static DEFINE_MUTEX(socket_mutex);
+
 void socket_shutdown(struct socket *sock)
 {
 	if (sock)
 		kernel_sock_shutdown(sock, SHUT_RDWR);
 }
 
+static struct sbd_device *sheep_request_to_device(struct sheep_request *req)
+{
+	return req->aiocb->request->q->queuedata;
+}
+
 static struct sbd_device *sheep_aiocb_to_device(struct sheep_aiocb *aiocb)
 {
 	return aiocb->request->q->queuedata;
@@ -49,7 +62,7 @@ static int socket_create(struct socket **sock, const char *ip_addr, int port)
 			      (char *)&nodelay, sizeof(nodelay));
 	set_fs(oldmm);
 	if (ret != 0) {
-		pr_err("Can't set SO_LINGER: %d\n", ret);
+		pr_err("Can't set nodelay: %d\n", ret);
 		goto shutdown;
 	}
 
@@ -129,14 +142,20 @@ static int socket_write(struct socket *sock, void *buf, int len)
 static int sheep_submit_sdreq(struct socket *sock, struct sd_req *hdr,
 			      void *data, unsigned int wlen)
 {
-	int ret = socket_write(sock, hdr, sizeof(*hdr));
+	int ret;
 
+	/* Make sheep_submit_sdreq thread-safe */
+	mutex_lock(&socket_mutex);
+
+	ret = socket_write(sock, hdr, sizeof(*hdr));
 	if (ret < 0)
-		return ret;
+		goto out;
 
 	if (wlen)
-		return socket_write(sock, data, wlen);
-	return 0;
+		ret = socket_write(sock, data, wlen);
+out:
+	mutex_unlock(&socket_mutex);
+	return ret;
 }
 
 /* Run the request synchronously */
@@ -252,33 +271,58 @@ out:
 
 static void submit_sheep_request(struct sheep_request *req)
 {
+	struct sd_req hdr = {};
+	struct sbd_device *dev = sheep_request_to_device(req);
+
+	hdr.id = req->seq_num;
+	hdr.data_length = req->length;
+	hdr.obj.oid = req->oid;
+	hdr.obj.offset = req->offset;
+
+	write_lock(&dev->inflight_lock);
+	BUG_ON(!list_empty(&req->list));
+	list_add_tail(&req->list, &dev->inflight_head);
+	write_unlock(&dev->inflight_lock);
+
+	switch (req->type) {
+	case SHEEP_CREATE:
+	case SHEEP_WRITE:
+		if (req->type == SHEEP_CREATE)
+			hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
+		else
+			hdr.opcode = SD_OP_WRITE_OBJ;
+		hdr.flags = SD_FLAG_CMD_WRITE | SD_FLAG_CMD_DIRECT;
+		sheep_submit_sdreq(dev->sock, &hdr, req->buf, req->length);
+		break;
+	case SHEEP_READ:
+		hdr.opcode = SD_OP_READ_OBJ;
+		sheep_submit_sdreq(dev->sock, &hdr, NULL, 0);
+		break;
+	}
+	sbd_debug("add oid %llx off %d, len %d, seq %u\n", req->oid,
+		  req->offset, req->length, req->seq_num);
+	wake_up(&dev->reaper_wq);
 }
 
 static inline void free_sheep_aiocb(struct sheep_aiocb *aiocb)
 {
-	kfree(aiocb->buf);
+	vfree(aiocb->buf);
 	kfree(aiocb);
 }
 
-static void aio_write_done(struct sheep_aiocb *aiocb, bool locked)
+static void aio_write_done(struct sheep_aiocb *aiocb)
 {
-	sbd_debug("off %llu, len %llu\n", aiocb->offset, aiocb->length);
+	sbd_debug("wdone off %llu, len %llu\n", aiocb->offset, aiocb->length);
 
-	if (locked)
-		__blk_end_request_all(aiocb->request, aiocb->ret);
-	else
-		blk_end_request_all(aiocb->request, aiocb->ret);
+	blk_end_request_all(aiocb->request, aiocb->ret);
 	free_sheep_aiocb(aiocb);
 }
 
-static void aio_read_done(struct sheep_aiocb *aiocb, bool locked)
+static void aio_read_done(struct sheep_aiocb *aiocb)
 {
-	sbd_debug("off %llu, len %llu\n", aiocb->offset, aiocb->length);
+	sbd_debug("rdone off %llu, len %llu\n", aiocb->offset, aiocb->length);
 
-	if (locked)
-		__blk_end_request_all(aiocb->request, aiocb->ret);
-	else
-		blk_end_request_all(aiocb->request, aiocb->ret);
+	blk_end_request_all(aiocb->request, aiocb->ret);
 	free_sheep_aiocb(aiocb);
 }
 
@@ -294,11 +338,16 @@ struct sheep_aiocb *sheep_aiocb_setup(struct request *req)
 
 	aiocb->offset = blk_rq_pos(req) * SECTOR_SIZE;
 	aiocb->length = blk_rq_bytes(req);
-	aiocb->nr_requests = 0;
 	aiocb->ret = 0;
 	aiocb->buf_iter = 0;
 	aiocb->request = req;
-	aiocb->buf = kzalloc(aiocb->length, GFP_KERNEL);
+	aiocb->buf = vzalloc(aiocb->length);
+	atomic_set(&aiocb->nr_requests, 0);
+
+	if (!aiocb->buf) {
+		kfree(aiocb);
+		return ERR_PTR(-ENOMEM);
+	}
 
 	switch (rq_data_dir(req)) {
 	case WRITE:
@@ -343,6 +392,7 @@ static struct sheep_request *alloc_sheep_request(struct sheep_aiocb *aiocb,
 	req->aiocb = aiocb;
 	req->buf = aiocb->buf + aiocb->buf_iter;
 	req->seq_num = atomic_inc_return(&dev->seq_num);
+	INIT_LIST_HEAD(&req->list);
 
 	switch (rq_data_dir(aiocb->request)) {
 	case WRITE:
@@ -359,23 +409,51 @@ static struct sheep_request *alloc_sheep_request(struct sheep_aiocb *aiocb,
 	}
 
 	aiocb->buf_iter += len;
-	aiocb->nr_requests++;
+	atomic_inc(&aiocb->nr_requests);
 
 	return req;
 }
 
-static void end_sheep_request(struct sheep_request *req, bool queue_locked)
+static void end_sheep_request(struct sheep_request *req)
 {
 	struct sheep_aiocb *aiocb = req->aiocb;
 
-	if (--aiocb->nr_requests == 0)
-		aiocb->aio_done_func(aiocb, queue_locked);
-
 	sbd_debug("end oid %llx off %d, len %d, seq %u\n", req->oid,
 		  req->offset, req->length, req->seq_num);
+
+	if (atomic_dec_return(&aiocb->nr_requests) <= 0)
+		aiocb->aio_done_func(aiocb);
+	BUG_ON(!list_empty(&req->list));
 	kfree(req);
 }
 
+static struct sheep_request *find_inflight_request_oid(struct sbd_device *dev,
+						       uint64_t oid)
+{
+	struct sheep_request *req;
+
+	read_lock(&dev->inflight_lock);
+	list_for_each_entry(req, &dev->inflight_head, list) {
+		if (req->oid == oid) {
+			read_unlock(&dev->inflight_lock);
+			return req;
+		}
+	}
+	read_unlock(&dev->inflight_lock);
+	return NULL;
+}
+
+static bool sheep_inode_has_idx(struct sbd_device *dev, u32 idx)
+{
+	spin_lock(&dev->vdi_lock);
+	if (dev->vdi.inode->data_vdi_id[idx]) {
+		spin_unlock(&dev->vdi_lock);
+		return true;
+	}
+	spin_unlock(&dev->vdi_lock);
+	return false;
+}
+
 int sheep_aiocb_submit(struct sheep_aiocb *aiocb)
 {
 	struct sbd_device *dev = sheep_aiocb_to_device(aiocb);
@@ -384,35 +462,59 @@ int sheep_aiocb_submit(struct sheep_aiocb *aiocb)
 	u64 start = offset % SD_DATA_OBJ_SIZE;
 	u32 vid = dev->vdi.vid;
 	u64 oid = vid_to_data_oid(vid, offset / SD_DATA_OBJ_SIZE);
-	u32 idx = data_oid_to_idx(oid);
 	int len = SD_DATA_OBJ_SIZE - start;
 
 	if (total < len)
 		len = total;
 
-	sbd_debug("submit oid %llx off %llu, len %llu\n", oid, offset, total);
+	sbd_debug("submit off %llu, len %llu\n", offset, total);
 	/*
 	 * Make sure we don't free the aiocb before we are done with all
 	 * requests. This additional reference is dropped at the end of this
 	 * function.
 	 */
-	aiocb->nr_requests++;
+	atomic_inc(&aiocb->nr_requests);
 
 	do {
 		struct sheep_request *req;
+		u32 idx = data_oid_to_idx(oid);
 
 		req = alloc_sheep_request(aiocb, oid, len, start);
 		if (IS_ERR(req))
 			return PTR_ERR(req);
 
-		if (likely(dev->vdi.inode->data_vdi_id[idx]))
+		if (likely(sheep_inode_has_idx(dev, idx)))
 			goto submit;
 
 		/* Object is not created yet... */
 		switch (req->type) {
 		case SHEEP_WRITE:
+			/*
+			 * Sheepdog can't handle concurrent creation on the same
+			 * object. We send one create req first and only then
+			 * send the write reqs.
+			 */
+			if (find_inflight_request_oid(dev, oid)) {
+				write_lock(&dev->blocking_lock);
+				/*
+				 * There is a slim chance the object was
+				 * created before we grabbed blocking_lock.
+				 */
+				if (unlikely(sheep_inode_has_idx(dev, idx))) {
+					write_unlock(&dev->blocking_lock);
+					goto submit;
+				}
+				list_add_tail(&req->list, &dev->blocking_head);
+				sbd_debug("block oid %llx off %d, len %d,"
+					  " seq %u\n", req->oid, req->offset,
+					  req->length, req->seq_num);
+				write_unlock(&dev->blocking_lock);
+				goto done;
+			}
+			req->type = SHEEP_CREATE;
+			break;
 		case SHEEP_READ:
-			end_sheep_request(req, true);
+			end_sheep_request(req);
 			goto done;
 		}
 submit:
@@ -424,13 +526,108 @@ done:
 		len = total > SD_DATA_OBJ_SIZE ? SD_DATA_OBJ_SIZE : total;
 	} while (total > 0);
 
-	if (--aiocb->nr_requests == 0)
-		aiocb->aio_done_func(aiocb, true);
+	if (atomic_dec_return(&aiocb->nr_requests) <= 0)
+		aiocb->aio_done_func(aiocb);
 
 	return 0;
 }
 
+static struct sheep_request *fetch_inflight_request(struct sbd_device *dev,
+						    u32 seq_num)
+{
+	struct sheep_request *req, *t;
+
+	write_lock(&dev->inflight_lock);
+	list_for_each_entry_safe(req, t, &dev->inflight_head, list) {
+		if (req->seq_num == seq_num) {
+			list_del_init(&req->list);
+			goto out;
+		}
+	}
+	req = NULL;
+out:
+	write_unlock(&dev->inflight_lock);
+	return req;
+}
+
+static void submit_blocking_sheep_request(struct sbd_device *dev, uint64_t oid)
+{
+	struct sheep_request *req, *t;
+
+	write_lock(&dev->blocking_lock);
+	list_for_each_entry_safe(req, t, &dev->blocking_head, list) {
+		if (req->oid != oid)
+			continue;
+		list_del_init(&req->list);
+		submit_sheep_request(req);
+	}
+	write_unlock(&dev->blocking_lock);
+}
+
 int sheep_handle_reply(struct sbd_device *dev)
 {
+	struct sd_rsp rsp = {};
+	struct sheep_request *req, *new;
+	uint32_t vid, idx;
+	uint64_t oid;
+	int ret;
+
+	ret = socket_read(dev->sock, (char *)&rsp, sizeof(rsp));
+	if (ret < 0) {
+		pr_err("failed to read reply header\n");
+		goto err;
+	}
+
+	req = fetch_inflight_request(dev, rsp.id);
+	if (!req) {
+		pr_err("failed to find req %u\n", rsp.id);
+		return 0;
+	}
+	if (rsp.data_length > 0) {
+		ret = socket_read(dev->sock, req->buf, req->length);
+		if (ret < 0) {
+			pr_err("failed to read reply payload\n");
+			goto err;
+		}
+	}
+
+	switch (req->type) {
+	case SHEEP_CREATE:
+		/* We need to update inode for create */
+		new = kmalloc(sizeof(*new), GFP_KERNEL);
+		if (!new) {
+			ret = -ENOMEM;
+			goto err;
+		}
+
+		vid = dev->vdi.vid;
+		oid = vid_to_vdi_oid(vid);
+		idx = data_oid_to_idx(req->oid);
+		new->offset = SD_INODE_HEADER_SIZE + sizeof(vid) * idx;
+		new->length = sizeof(vid);
+		new->oid = oid;
+		new->aiocb = req->aiocb;
+		new->buf = (char *)&vid;
+		new->seq_num = atomic_inc_return(&dev->seq_num);
+		new->type = SHEEP_WRITE;
+		atomic_inc(&req->aiocb->nr_requests);
+		INIT_LIST_HEAD(&new->list);
+
+	/* Make sure no request is queued while we update the inode */
+		spin_lock(&dev->vdi_lock);
+		dev->vdi.inode->data_vdi_id[idx] = vid;
+		spin_unlock(&dev->vdi_lock);
+
+		submit_sheep_request(new);
+		submit_blocking_sheep_request(dev, req->oid);
+		/* fall thru */
+	case SHEEP_WRITE:
+	case SHEEP_READ:
+		end_sheep_request(req);
+		break;
+	}
+
 	return 0;
+err:
+	return ret;
 }
diff --git a/sbd/sheep_block_device.c b/sbd/sheep_block_device.c
index e7331dc..9dc5ce4 100644
--- a/sbd/sheep_block_device.c
+++ b/sbd/sheep_block_device.c
@@ -35,21 +35,26 @@ static int sbd_submit_request(struct request *req)
 	return sheep_aiocb_submit(aiocb);
 }
 
-static void sbd_request_submiter(struct request_queue *q)
+static void sbd_request_fn(struct request_queue *q)
+__releases(q->queue_lock) __acquires(q->queue_lock)
 {
 	struct request *req;
+	struct sbd_device *dev = q->queuedata;
 
 	while ((req = blk_fetch_request(q)) != NULL) {
-		int ret;
 
 		/* filter out block requests we don't understand */
-		if (req->cmd_type != REQ_TYPE_FS) {
+		if (unlikely(req->cmd_type != REQ_TYPE_FS)) {
 			__blk_end_request_all(req, 0);
 			continue;
 		}
-		ret = sbd_submit_request(req);
-		if (ret < 0)
-			break;
+
+		list_add_tail(&req->queuelist, &dev->request_head);
+		spin_unlock_irq(q->queue_lock);
+
+		wake_up(&dev->submiter_wq);
+
+		spin_lock_irq(q->queue_lock);
 	}
 }
 
@@ -68,7 +73,7 @@ static int sbd_add_disk(struct sbd_device *dev)
 	disk->fops = &sbd_bd_ops;
 	disk->private_data = dev;
 
-	rq = blk_init_queue(sbd_request_submiter, &dev->queue_lock);
+	rq = blk_init_queue(sbd_request_fn, &dev->queue_lock);
 	if (!rq) {
 		put_disk(disk);
 		return -ENOMEM;
@@ -93,16 +98,53 @@ static int sbd_add_disk(struct sbd_device *dev)
 static int sbd_request_reaper(void *data)
 {
 	struct sbd_device *dev = data;
+	int ret;
 
 	while (!kthread_should_stop() || !list_empty(&dev->inflight_head)) {
-		wait_event_interruptible(dev->inflight_wq,
+		bool empty;
+
+		wait_event_interruptible(dev->reaper_wq,
 					 kthread_should_stop() ||
 					 !list_empty(&dev->inflight_head));
 
-		if (list_empty(&dev->inflight_head))
+		read_lock(&dev->inflight_lock);
+		empty = list_empty(&dev->inflight_head);
+		read_unlock(&dev->inflight_lock);
+
+		if (unlikely(empty))
 			continue;
 
-		sheep_handle_reply(dev);
+		ret = sheep_handle_reply(dev);
+		if (unlikely(ret < 0))
+			pr_err("reaper: failed to handle reply\n");
+	}
+	return 0;
+}
+
+static int sbd_request_submiter(void *data)
+{
+	struct sbd_device *dev = data;
+	int ret;
+
+	while (!kthread_should_stop() || !list_empty(&dev->request_head)) {
+		struct request *req;
+
+		wait_event_interruptible(dev->submiter_wq,
+					 kthread_should_stop() ||
+					 !list_empty(&dev->request_head));
+
+		spin_lock_irq(&dev->queue_lock);
+		if (unlikely(list_empty(&dev->request_head))) {
+			spin_unlock_irq(&dev->queue_lock);
+			continue;
+		}
+		req = list_entry_rq(dev->request_head.next);
+		list_del_init(&req->queuelist);
+		spin_unlock_irq(&dev->queue_lock);
+
+		ret = sbd_submit_request(req);
+		if (unlikely(ret < 0))
+			pr_err("submiter: failed to submit request\n");
 	}
 	return 0;
 }
@@ -138,9 +180,14 @@ static ssize_t sbd_add(struct bus_type *bus, const char *buf,
 	}
 
 	spin_lock_init(&dev->queue_lock);
+	spin_lock_init(&dev->vdi_lock);
 	INIT_LIST_HEAD(&dev->inflight_head);
 	INIT_LIST_HEAD(&dev->blocking_head);
-	init_waitqueue_head(&dev->inflight_wq);
+	INIT_LIST_HEAD(&dev->request_head);
+	init_waitqueue_head(&dev->reaper_wq);
+	init_waitqueue_head(&dev->submiter_wq);
+	rwlock_init(&dev->inflight_lock);
+	rwlock_init(&dev->blocking_lock);
 
 	list_for_each_entry(tmp, &sbd_dev_list, list) {
 		if (tmp->id > new_id)
@@ -159,6 +206,11 @@ static ssize_t sbd_add(struct bus_type *bus, const char *buf,
 	dev->major = ret;
 	dev->minor = 0;
 	dev->reaper = kthread_run(sbd_request_reaper, dev, "sbd_reaper");
+	if (IS_ERR(dev->reaper))
+		goto err_unreg_blkdev;
+	dev->submiter = kthread_run(sbd_request_submiter, dev, "sbd_submiter");
+	if (IS_ERR(dev->submiter))
+		goto err_unreg_blkdev;
 
 	ret = sbd_add_disk(dev);
 	if (ret < 0)
@@ -222,7 +274,9 @@ static ssize_t sbd_remove(struct bus_type *bus, const char *buf,
 		return -ENOENT;
 
 	kthread_stop(dev->reaper);
-	wake_up_interruptible(&dev->inflight_wq);
+	kthread_stop(dev->submiter);
+	wake_up(&dev->reaper_wq);
+	wake_up(&dev->submiter_wq);
 
 	sbd_del_disk(dev);
 	free_sbd_device(dev);
-- 
1.8.1.2
