[sheepdog] [PATCH v2 3/9] sbd: implement write operation
Liu Yuan
namei.unix at gmail.com
Sun May 25 09:53:15 CEST 2014
From: Liu Yuan <tailai.ly at taobao.com>
- change the aio framework
-- have submiter a dedicated kthread to avoid lockup
- flesh out sheep_handle_reply and sheep_aiocb_submit
Now we have two kthreads, one is for request submitting and the other is for
reply handling.
The tricky part is that sheep_submit_sdreq() will be called by both kthreads,
so we add a mutex to avoid send data intermingling, which would cause a very
tricky bug that sheep client will get wrong sd_req structure and reply with
very wrong sd_rsp.
The main problems for aio framework are,
- split the striding request into individual sheep requests for each objects.
- we only send one create request if object isn't created and block the requests
to this object until the creation succeed then send the blocking requests.
To get best of performance,
# echo 4096 > /sys/block/sbd0/queue/max_sectors_kb
Which means io scheduler will try its best to handle us 4MB request.
Signed-off-by: Liu Yuan <namei.unix at gmail.com>
---
sbd/sbd.h | 16 ++-
sbd/sheep.c | 261 +++++++++++++++++++++++++++++++++++++++++------
sbd/sheep_block_device.c | 78 +++++++++++---
3 files changed, 306 insertions(+), 49 deletions(-)
diff --git a/sbd/sbd.h b/sbd/sbd.h
index a2252ff..dbabc7a 100644
--- a/sbd/sbd.h
+++ b/sbd/sbd.h
@@ -43,13 +43,19 @@ struct sbd_device {
spinlock_t queue_lock; /* request queue lock */
struct sheep_vdi vdi; /* Associated sheep image */
+ spinlock_t vdi_lock;
- struct list_head inflight_head;
- wait_queue_head_t inflight_wq;
- struct list_head blocking_head;
+ struct list_head request_head; /* protected by queue lock */
+ struct list_head inflight_head; /* for inflight sheep requests */
+ struct list_head blocking_head; /* for blocking sheep requests */
+ rwlock_t inflight_lock;
+ rwlock_t blocking_lock;
struct list_head list;
struct task_struct *reaper;
+ struct task_struct *submiter;
+ wait_queue_head_t reaper_wq;
+ wait_queue_head_t submiter_wq;
};
struct sheep_aiocb {
@@ -57,10 +63,10 @@ struct sheep_aiocb {
u64 offset;
u64 length;
int ret;
- u32 nr_requests;
+ atomic_t nr_requests;
char *buf;
int buf_iter;
- void (*aio_done_func)(struct sheep_aiocb *, bool);
+ void (*aio_done_func)(struct sheep_aiocb *);
};
enum sheep_request_type {
diff --git a/sbd/sheep.c b/sbd/sheep.c
index 33269b4..bcc65f0 100644
--- a/sbd/sheep.c
+++ b/sbd/sheep.c
@@ -11,12 +11,25 @@
#include "sbd.h"
+/* FIXME I need this hack to compile DEFINE_MUTEX successfully */
+#ifdef __SPIN_LOCK_UNLOCKED
+# undef __SPIN_LOCK_UNLOCKED
+# define __SPIN_LOCK_UNLOCKED(lockname) __SPIN_LOCK_INITIALIZER(lockname)
+#endif
+
+static DEFINE_MUTEX(socket_mutex);
+
void socket_shutdown(struct socket *sock)
{
if (sock)
kernel_sock_shutdown(sock, SHUT_RDWR);
}
+static struct sbd_device *sheep_request_to_device(struct sheep_request *req)
+{
+ return req->aiocb->request->q->queuedata;
+}
+
static struct sbd_device *sheep_aiocb_to_device(struct sheep_aiocb *aiocb)
{
return aiocb->request->q->queuedata;
@@ -49,7 +62,7 @@ static int socket_create(struct socket **sock, const char *ip_addr, int port)
(char *)&nodelay, sizeof(nodelay));
set_fs(oldmm);
if (ret != 0) {
- pr_err("Can't set SO_LINGER: %d\n", ret);
+ pr_err("Can't set nodelay: %d\n", ret);
goto shutdown;
}
@@ -129,14 +142,20 @@ static int socket_write(struct socket *sock, void *buf, int len)
static int sheep_submit_sdreq(struct socket *sock, struct sd_req *hdr,
void *data, unsigned int wlen)
{
- int ret = socket_write(sock, hdr, sizeof(*hdr));
+ int ret;
+ /* Make sheep_submit_sdreq thread safe */
+ mutex_lock(&socket_mutex);
+
+ ret = socket_write(sock, hdr, sizeof(*hdr));
if (ret < 0)
- return ret;
+ goto out;
if (wlen)
- return socket_write(sock, data, wlen);
- return 0;
+ ret = socket_write(sock, data, wlen);
+out:
+ mutex_unlock(&socket_mutex);
+ return ret;
}
/* Run the request synchronously */
@@ -252,33 +271,58 @@ out:
static void submit_sheep_request(struct sheep_request *req)
{
+ struct sd_req hdr = {};
+ struct sbd_device *dev = sheep_request_to_device(req);
+
+ hdr.id = req->seq_num;
+ hdr.data_length = req->length;
+ hdr.obj.oid = req->oid;
+ hdr.obj.offset = req->offset;
+
+ write_lock(&dev->inflight_lock);
+ BUG_ON(!list_empty(&req->list));
+ list_add_tail(&req->list, &dev->inflight_head);
+ write_unlock(&dev->inflight_lock);
+
+ switch (req->type) {
+ case SHEEP_CREATE:
+ case SHEEP_WRITE:
+ if (req->type == SHEEP_CREATE)
+ hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
+ else
+ hdr.opcode = SD_OP_WRITE_OBJ;
+ hdr.flags = SD_FLAG_CMD_WRITE | SD_FLAG_CMD_DIRECT;
+ sheep_submit_sdreq(dev->sock, &hdr, req->buf, req->length);
+ break;
+ case SHEEP_READ:
+ hdr.opcode = SD_OP_READ_OBJ;
+ sheep_submit_sdreq(dev->sock, &hdr, NULL, 0);
+ break;
+ }
+ sbd_debug("add oid %llx off %d, len %d, seq %u\n", req->oid,
+ req->offset, req->length, req->seq_num);
+ wake_up(&dev->reaper_wq);
}
static inline void free_sheep_aiocb(struct sheep_aiocb *aiocb)
{
- kfree(aiocb->buf);
+ vfree(aiocb->buf);
kfree(aiocb);
}
-static void aio_write_done(struct sheep_aiocb *aiocb, bool locked)
+static void aio_write_done(struct sheep_aiocb *aiocb)
{
- sbd_debug("off %llu, len %llu\n", aiocb->offset, aiocb->length);
+ sbd_debug("wdone off %llu, len %llu\n", aiocb->offset, aiocb->length);
- if (locked)
- __blk_end_request_all(aiocb->request, aiocb->ret);
- else
- blk_end_request_all(aiocb->request, aiocb->ret);
+ blk_end_request_all(aiocb->request, aiocb->ret);
free_sheep_aiocb(aiocb);
}
-static void aio_read_done(struct sheep_aiocb *aiocb, bool locked)
+static void aio_read_done(struct sheep_aiocb *aiocb)
{
- sbd_debug("off %llu, len %llu\n", aiocb->offset, aiocb->length);
+ sbd_debug("rdone off %llu, len %llu\n", aiocb->offset, aiocb->length);
- if (locked)
- __blk_end_request_all(aiocb->request, aiocb->ret);
- else
- blk_end_request_all(aiocb->request, aiocb->ret);
+ blk_end_request_all(aiocb->request, aiocb->ret);
free_sheep_aiocb(aiocb);
}
@@ -294,11 +338,16 @@ struct sheep_aiocb *sheep_aiocb_setup(struct request *req)
aiocb->offset = blk_rq_pos(req) * SECTOR_SIZE;
aiocb->length = blk_rq_bytes(req);
- aiocb->nr_requests = 0;
aiocb->ret = 0;
aiocb->buf_iter = 0;
aiocb->request = req;
- aiocb->buf = kzalloc(aiocb->length, GFP_KERNEL);
+ aiocb->buf = vzalloc(aiocb->length);
+ atomic_set(&aiocb->nr_requests, 0);
+
+ if (!aiocb->buf) {
+ kfree(aiocb);
+ return ERR_PTR(-ENOMEM);
+ }
switch (rq_data_dir(req)) {
case WRITE:
@@ -343,6 +392,7 @@ static struct sheep_request *alloc_sheep_request(struct sheep_aiocb *aiocb,
req->aiocb = aiocb;
req->buf = aiocb->buf + aiocb->buf_iter;
req->seq_num = atomic_inc_return(&dev->seq_num);
+ INIT_LIST_HEAD(&req->list);
switch (rq_data_dir(aiocb->request)) {
case WRITE:
@@ -359,23 +409,51 @@ static struct sheep_request *alloc_sheep_request(struct sheep_aiocb *aiocb,
}
aiocb->buf_iter += len;
- aiocb->nr_requests++;
+ atomic_inc(&aiocb->nr_requests);
return req;
}
-static void end_sheep_request(struct sheep_request *req, bool queue_locked)
+static void end_sheep_request(struct sheep_request *req)
{
struct sheep_aiocb *aiocb = req->aiocb;
- if (--aiocb->nr_requests == 0)
- aiocb->aio_done_func(aiocb, queue_locked);
-
sbd_debug("end oid %llx off %d, len %d, seq %u\n", req->oid,
req->offset, req->length, req->seq_num);
+
+ if (atomic_dec_return(&aiocb->nr_requests) <= 0)
+ aiocb->aio_done_func(aiocb);
+ BUG_ON(!list_empty(&req->list));
kfree(req);
}
+static struct sheep_request *find_inflight_request_oid(struct sbd_device *dev,
+ uint64_t oid)
+{
+ struct sheep_request *req;
+
+ read_lock(&dev->inflight_lock);
+ list_for_each_entry(req, &dev->inflight_head, list) {
+ if (req->oid == oid) {
+ read_unlock(&dev->inflight_lock);
+ return req;
+ }
+ }
+ read_unlock(&dev->inflight_lock);
+ return NULL;
+}
+
+static bool sheep_inode_has_idx(struct sbd_device *dev, u32 idx)
+{
+ spin_lock(&dev->vdi_lock);
+ if (dev->vdi.inode->data_vdi_id[idx]) {
+ spin_unlock(&dev->vdi_lock);
+ return true;
+ }
+ spin_unlock(&dev->vdi_lock);
+ return false;
+}
+
int sheep_aiocb_submit(struct sheep_aiocb *aiocb)
{
struct sbd_device *dev = sheep_aiocb_to_device(aiocb);
@@ -384,35 +462,59 @@ int sheep_aiocb_submit(struct sheep_aiocb *aiocb)
u64 start = offset % SD_DATA_OBJ_SIZE;
u32 vid = dev->vdi.vid;
u64 oid = vid_to_data_oid(vid, offset / SD_DATA_OBJ_SIZE);
- u32 idx = data_oid_to_idx(oid);
int len = SD_DATA_OBJ_SIZE - start;
if (total < len)
len = total;
- sbd_debug("submit oid %llx off %llu, len %llu\n", oid, offset, total);
+ sbd_debug("submit off %llu, len %llu\n", offset, total);
/*
* Make sure we don't free the aiocb before we are done with all
* requests.This additional reference is dropped at the end of this
* function.
*/
- aiocb->nr_requests++;
+ atomic_inc(&aiocb->nr_requests);
do {
struct sheep_request *req;
+ u32 idx = data_oid_to_idx(oid);
req = alloc_sheep_request(aiocb, oid, len, start);
if (IS_ERR(req))
return PTR_ERR(req);
- if (likely(dev->vdi.inode->data_vdi_id[idx]))
+ if (likely(sheep_inode_has_idx(dev, idx)))
goto submit;
/* Object is not created yet... */
switch (req->type) {
case SHEEP_WRITE:
+ /*
+ * Sheepdog can't handle concurrent creation on the same
+ * object. We send one create req first and then send
+ * write reqs in next.
+ */
+ if (find_inflight_request_oid(dev, oid)) {
+ write_lock(&dev->blocking_lock);
+ /*
+ * There are slim chance object was created
+ * before we grab blocking_lock
+ */
+ if (unlikely(sheep_inode_has_idx(dev, idx))) {
+ write_unlock(&dev->blocking_lock);
+ goto submit;
+ }
+ list_add_tail(&req->list, &dev->blocking_head);
+ sbd_debug("block oid %llx off %d, len %d,"
+ " seq %u\n", req->oid, req->offset,
+ req->length, req->seq_num);
+ write_unlock(&dev->blocking_lock);
+ goto done;
+ }
+ req->type = SHEEP_CREATE;
+ break;
case SHEEP_READ:
- end_sheep_request(req, true);
+ end_sheep_request(req);
goto done;
}
submit:
@@ -424,13 +526,108 @@ done:
len = total > SD_DATA_OBJ_SIZE ? SD_DATA_OBJ_SIZE : total;
} while (total > 0);
- if (--aiocb->nr_requests == 0)
- aiocb->aio_done_func(aiocb, true);
+ if (atomic_dec_return(&aiocb->nr_requests) <= 0)
+ aiocb->aio_done_func(aiocb);
return 0;
}
+static struct sheep_request *fetch_inflight_request(struct sbd_device *dev,
+ u32 seq_num)
+{
+ struct sheep_request *req, *t;
+
+ write_lock(&dev->inflight_lock);
+ list_for_each_entry_safe(req, t, &dev->inflight_head, list) {
+ if (req->seq_num == seq_num) {
+ list_del_init(&req->list);
+ goto out;
+ }
+ }
+ req = NULL;
+out:
+ write_unlock(&dev->inflight_lock);
+ return req;
+}
+
+static void submit_blocking_sheep_request(struct sbd_device *dev, uint64_t oid)
+{
+ struct sheep_request *req, *t;
+
+ write_lock(&dev->blocking_lock);
+ list_for_each_entry_safe(req, t, &dev->blocking_head, list) {
+ if (req->oid != oid)
+ continue;
+ list_del_init(&req->list);
+ submit_sheep_request(req);
+ }
+ write_unlock(&dev->blocking_lock);
+}
+
int sheep_handle_reply(struct sbd_device *dev)
{
+ struct sd_rsp rsp = {};
+ struct sheep_request *req, *new;
+ uint32_t vid, idx;
+ uint64_t oid;
+ int ret;
+
+ ret = socket_read(dev->sock, (char *)&rsp, sizeof(rsp));
+ if (ret < 0) {
+ pr_err("failed to read reply header\n");
+ goto err;
+ }
+
+ req = fetch_inflight_request(dev, rsp.id);
+ if (!req) {
+ pr_err("failed to find req %u\n", rsp.id);
+ return 0;
+ }
+ if (rsp.data_length > 0) {
+ ret = socket_read(dev->sock, req->buf, req->length);
+ if (ret < 0) {
+ pr_err("failed to read reply payload\n");
+ goto err;
+ }
+ }
+
+ switch (req->type) {
+ case SHEEP_CREATE:
+ /* We need to update inode for create */
+ new = kmalloc(sizeof(*new), GFP_KERNEL);
+ if (!new) {
+ ret = -ENOMEM;
+ goto err;
+ }
+
+ vid = dev->vdi.vid;
+ oid = vid_to_vdi_oid(vid);
+ idx = data_oid_to_idx(req->oid);
+ new->offset = SD_INODE_HEADER_SIZE + sizeof(vid) * idx;
+ new->length = sizeof(vid);
+ new->oid = oid;
+ new->aiocb = req->aiocb;
+ new->buf = (char *)&vid;
+ new->seq_num = atomic_inc_return(&dev->seq_num);
+ new->type = SHEEP_WRITE;
+ atomic_inc(&req->aiocb->nr_requests);
+ INIT_LIST_HEAD(&new->list);
+
+ /* Make sure no request is queued while we update inode */
+ spin_lock(&dev->vdi_lock);
+ dev->vdi.inode->data_vdi_id[idx] = vid;
+ spin_unlock(&dev->vdi_lock);
+
+ submit_sheep_request(new);
+ submit_blocking_sheep_request(dev, req->oid);
+ /* fall thru */
+ case SHEEP_WRITE:
+ case SHEEP_READ:
+ end_sheep_request(req);
+ break;
+ }
+
return 0;
+err:
+ return ret;
}
diff --git a/sbd/sheep_block_device.c b/sbd/sheep_block_device.c
index c2c9dce..b953e36 100644
--- a/sbd/sheep_block_device.c
+++ b/sbd/sheep_block_device.c
@@ -35,21 +35,26 @@ static int sbd_submit_request(struct request *req)
return sheep_aiocb_submit(aiocb);
}
-static void sbd_request_submiter(struct request_queue *q)
+static void sbd_request_fn(struct request_queue *q)
+__releases(q->queue_lock) __acquires(q->queue_lock)
{
struct request *req;
+ struct sbd_device *dev = q->queuedata;
while ((req = blk_fetch_request(q)) != NULL) {
- int ret;
/* filter out block requests we don't understand */
- if (req->cmd_type != REQ_TYPE_FS) {
+ if (unlikely(req->cmd_type != REQ_TYPE_FS)) {
__blk_end_request_all(req, 0);
continue;
}
- ret = sbd_submit_request(req);
- if (ret < 0)
- break;
+
+ list_add_tail(&req->queuelist, &dev->request_head);
+ spin_unlock_irq(q->queue_lock);
+
+ wake_up(&dev->submiter_wq);
+
+ spin_lock_irq(q->queue_lock);
}
}
@@ -68,7 +73,7 @@ static int sbd_add_disk(struct sbd_device *dev)
disk->fops = &sbd_bd_ops;
disk->private_data = dev;
- rq = blk_init_queue(sbd_request_submiter, &dev->queue_lock);
+ rq = blk_init_queue(sbd_request_fn, &dev->queue_lock);
if (!rq) {
put_disk(disk);
return -ENOMEM;
@@ -93,16 +98,53 @@ static int sbd_add_disk(struct sbd_device *dev)
static int sbd_request_reaper(void *data)
{
struct sbd_device *dev = data;
+ int ret;
while (!kthread_should_stop() || !list_empty(&dev->inflight_head)) {
- wait_event_interruptible(dev->inflight_wq,
+ bool empty;
+
+ wait_event_interruptible(dev->reaper_wq,
kthread_should_stop() ||
!list_empty(&dev->inflight_head));
- if (list_empty(&dev->inflight_head))
+ read_lock(&dev->inflight_lock);
+ empty = list_empty(&dev->inflight_head);
+ read_unlock(&dev->inflight_lock);
+
+ if (unlikely(empty))
continue;
- sheep_handle_reply(dev);
+ ret = sheep_handle_reply(dev);
+ if (unlikely(ret < 0))
+ pr_err("reaper: failed to handle reply\n");
+ }
+ return 0;
+}
+
+static int sbd_request_submiter(void *data)
+{
+ struct sbd_device *dev = data;
+ int ret;
+
+ while (!kthread_should_stop() || !list_empty(&dev->request_head)) {
+ struct request *req;
+
+ wait_event_interruptible(dev->submiter_wq,
+ kthread_should_stop() ||
+ !list_empty(&dev->request_head));
+
+ spin_lock_irq(&dev->queue_lock);
+ if (unlikely(list_empty(&dev->request_head))) {
+ spin_unlock_irq(&dev->queue_lock);
+ continue;
+ }
+ req = list_entry_rq(dev->request_head.next);
+ list_del_init(&req->queuelist);
+ spin_unlock_irq(&dev->queue_lock);
+
+ ret = sbd_submit_request(req);
+ if (unlikely(ret < 0))
+ pr_err("submiter: failed to submit request\n");
}
return 0;
}
@@ -138,9 +180,14 @@ static ssize_t sbd_add(struct bus_type *bus, const char *buf,
}
spin_lock_init(&dev->queue_lock);
+ spin_lock_init(&dev->vdi_lock);
INIT_LIST_HEAD(&dev->inflight_head);
INIT_LIST_HEAD(&dev->blocking_head);
- init_waitqueue_head(&dev->inflight_wq);
+ INIT_LIST_HEAD(&dev->request_head);
+ init_waitqueue_head(&dev->reaper_wq);
+ init_waitqueue_head(&dev->submiter_wq);
+ rwlock_init(&dev->inflight_lock);
+ rwlock_init(&dev->blocking_lock);
list_for_each_entry(tmp, &sbd_dev_list, list) {
if (tmp->id > new_id)
@@ -159,6 +206,11 @@ static ssize_t sbd_add(struct bus_type *bus, const char *buf,
dev->major = ret;
dev->minor = 0;
dev->reaper = kthread_run(sbd_request_reaper, dev, "sbd_reaper");
+ if (IS_ERR(dev->reaper))
+ goto err_unreg_blkdev;
+ dev->submiter = kthread_run(sbd_request_submiter, dev, "sbd_submiter");
+ if (IS_ERR(dev->submiter))
+ goto err_unreg_blkdev;
ret = sbd_add_disk(dev);
if (ret < 0)
@@ -222,7 +274,9 @@ static ssize_t sbd_remove(struct bus_type *bus, const char *buf,
return -ENOENT;
kthread_stop(dev->reaper);
- wake_up_interruptible(&dev->inflight_wq);
+ kthread_stop(dev->submiter);
+ wake_up(&dev->reaper_wq);
+ wake_up(&dev->submiter_wq);
sbd_del_disk(dev);
free_sbd_device(dev);
--
1.8.1.2
More information about the sheepdog
mailing list