[Stgt-devel] [Patch 0/4] bs_mmap / kreq_send threading
FUJITA Tomonori
fujita.tomonori
Fri Nov 16 10:33:59 CET 2007
On Tue, 6 Nov 2007 01:27:10 +0900
FUJITA Tomonori <tomof at acm.org> wrote:
> Sorry about the delay.
>
> On Tue, 23 Oct 2007 16:14:01 -0500
> Robert Jennings <rcj at linux.vnet.ibm.com> wrote:
>
> > This adds threading to bs_mmap, similar to what we have in bs_sync. The mmap
> > work can then be handled by a small pool of worker threads.
> >
> > After the mmap the completed commands enter back into the main tgtd
> > thread and would be sent to the kernel for in-kernel drivers through
> > kreq_send. Unfortunately kreq_send can sleep in blk_rq_map_user or
> > scsi_map_user_pages and hold up processing in tgtd. So kreq_send can
> > add the command to a list and hand it off to a small thread pool to
> > process sending the replies to the kernel.
>
> tgtd blocks when the data of a file isn't in the page cache. So how
> about having worker threads access the data before tgtd calls
> kreq_send? That is how the Flash web server works.
Have you measured the performance with your patch?
I tried the worker-thread idea, but performance drops (probably
due to pipe notification overheads).
diff --git a/usr/Makefile b/usr/Makefile
index addf5be..7339dfc 100644
--- a/usr/Makefile
+++ b/usr/Makefile
@@ -29,6 +29,7 @@ ifneq ($(IBMVIO),)
CFLAGS += -DIBMVIO -DUSE_KERNEL
TGTD_OBJS += $(addprefix ibmvio/, ibmvio.o)
TGTD_OBJS += bs_mmap.o tgtif.o
+LIBS += -lpthread
endif
ifneq ($(ISCSI),)
diff --git a/usr/bs_mmap.c b/usr/bs_mmap.c
index ad1bb4f..e4c0b86 100644
--- a/usr/bs_mmap.c
+++ b/usr/bs_mmap.c
@@ -26,59 +26,277 @@
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
+#include <pthread.h>
#include <sys/mman.h>
+#include <sys/epoll.h>
#include "list.h"
#include "util.h"
#include "tgtd.h"
#include "scsi.h"
-static int bs_mmap_open(struct scsi_lu *lu, char *path, int *fd, uint64_t *size)
-{
- *fd = backed_file_open(path, O_RDWR| O_LARGEFILE, size);
+#define NR_WORKER_THREADS 4 /* per-LU number of bs_mmap_worker_fn threads */
- return *fd >= 0 ? 0 : *fd;
-}
+struct bs_mmap_info { /* per-LU private area, placed right after struct scsi_lu (sized by .bs_datasize) */
+ pthread_t ack_thread; /* runs bs_mmap_ack_fn */
+ pthread_t worker_thread[NR_WORKER_THREADS]; /* each runs bs_mmap_worker_fn */
-static void bs_mmap_close(struct scsi_lu *lu)
+ /* filled by the ack thread, drained by tgtd in bs_mmap_handler; the command_fd/done_fd token hand-off keeps both sides from touching it at once */
+ struct list_head ack_list;
+
+ pthread_cond_t finished_cond; /* signaled by a worker after it adds to finished_list */
+ pthread_mutex_t finished_lock; /* protects finished_list */
+ struct list_head finished_list; /* commands completed by workers, waiting for the ack thread */
+
+ /* workers sleep on this; signaled by tgtd on submit, broadcast on close */
+ pthread_cond_t pending_cond;
+ /* locked by tgtd and workers */
+ pthread_mutex_t pending_lock;
+ /* commands queued by bs_mmap_cmd_submit; protected by pending_lock */
+ struct list_head pending_list;
+
+ int command_fd[2]; /* tgtd -> ack thread: token meaning "ack_list may be refilled" */
+ int done_fd[2]; /* ack thread -> tgtd: token meaning "ack_list has commands"; read end is in tgtd's event loop */
+
+ int stop; /* set by bs_mmap_close so the workers exit */
+};
+
+/* pthread cleanup handler: release a mutex if the thread is cancelled holding it */
+static void bs_mmap_unlock_mutex(void *arg)
+{
+	pthread_mutex_unlock(arg);
+}
+
+/*
+ * Ack thread.  For each token tgtd writes to command_fd, wait until at
+ * least one worker has finished a command, move everything from
+ * finished_list to ack_list and write a token to done_fd so that
+ * bs_mmap_handler() runs in tgtd.  Exits on a pipe error or EOF;
+ * bs_mmap_close() cancels it.
+ */
+static void *bs_mmap_ack_fn(void *arg)
{
- close(lu->fd);
+	struct bs_mmap_info *info = arg;
+	int command, ret, nr;
+	struct scsi_cmd *cmd;
+
+	for (;;) {
+		ret = read(info->command_fd[0], &command, sizeof(command));
+		if (ret < 0) {
+			/* test errno before eprintf() gets a chance to clobber it */
+			if (errno == EAGAIN || errno == EINTR)
+				continue;
+			eprintf("ack pthread will be dead, %m\n");
+			break;
+		}
+		if (ret == 0) {
+			/* writer gone: no more tokens will ever arrive */
+			eprintf("ack pthread will be dead, command pipe closed\n");
+			break;
+		}
+
+		pthread_mutex_lock(&info->finished_lock);
+		/*
+		 * pthread_cond_wait() is a cancellation point and re-acquires
+		 * the mutex before the thread goes away, so make sure a cancel
+		 * from bs_mmap_close() doesn't leave finished_lock locked.
+		 */
+		pthread_cleanup_push(bs_mmap_unlock_mutex, &info->finished_lock);
+
+		while (list_empty(&info->finished_list))
+			pthread_cond_wait(&info->finished_cond, &info->finished_lock);
+
+		while (!list_empty(&info->finished_list)) {
+			cmd = list_entry(info->finished_list.next,
+					 struct scsi_cmd, bs_list);
+
+			dprintf("found %p\n", cmd);
+
+			list_del(&cmd->bs_list);
+			list_add(&cmd->bs_list, &info->ack_list);
+		}
+
+		/* non-zero: run the handler, i.e. unlock finished_lock */
+		pthread_cleanup_pop(1);
+
+		nr = 1;
+		do {
+			ret = write(info->done_fd[1], &nr, sizeof(nr));
+		} while (ret < 0 && (errno == EAGAIN || errno == EINTR));
+		if (ret < 0) {
+			eprintf("can't ack tgtd, %m\n");
+			break;
+		}
+	}
+
+	return NULL;
}
#define pgcnt(size, offset) ((((size) + ((offset) & (pagesize - 1))) + (pagesize - 1)) >> pageshift)
-static int bs_mmap_cmd_submit(struct scsi_cmd *cmd)
+/*
+ * Worker thread.  Takes commands off pending_list.  For SYNCHRONIZE_CACHE
+ * it calls fsync(); otherwise it reads one byte from every page of the
+ * command's mmapped buffer, so page-cache misses are taken here instead of
+ * in tgtd.  It then sets the SCSI result and hands the command to the ack
+ * thread through finished_list.  Returns once stop is set and
+ * pending_list is empty.
+ */
+static void *bs_mmap_worker_fn(void *arg)
{
- int fd = cmd->dev->fd, ret = 0;
- void *p;
- uint64_t addr;
+	int ret = 0;
+	struct bs_mmap_info *info = arg;
+	struct scsi_cmd *cmd;
uint32_t length;
- if (cmd->scb[0] == SYNCHRONIZE_CACHE ||
- cmd->scb[0] == SYNCHRONIZE_CACHE_16)
- return fsync(fd);
+	while (1) {
+		uint64_t addr;
+		/*
+		 * volatile: the byte we read is never used, so without it the
+		 * compiler may drop the loads and the prefault loop does nothing.
+		 */
+		volatile char *p;
+		int i, nr;
- length = (scsi_get_data_dir(cmd) == DATA_WRITE) ?
- scsi_get_out_length(cmd) : scsi_get_in_length(cmd);
+		pthread_mutex_lock(&info->pending_lock);
+		/*
+		 * Test stop before sleeping as well: a broadcast sent while we
+		 * were busy would otherwise be lost and bs_mmap_close() would
+		 * hang in pthread_join().
+		 */
+		while (list_empty(&info->pending_list) && !info->stop)
+			pthread_cond_wait(&info->pending_cond, &info->pending_lock);
+
+		/* stop requested and nothing left to process */
+		if (list_empty(&info->pending_list)) {
+			pthread_mutex_unlock(&info->pending_lock);
+			break;
+		}
- p = mmap64(NULL, pgcnt(length, cmd->offset) << pageshift,
- PROT_READ | PROT_WRITE, MAP_SHARED, fd,
- cmd->offset & ~((1ULL << pageshift) - 1));
- if (p == MAP_FAILED) {
- ret = -EINVAL;
- eprintf("%u %" PRIu64 "\n", length, cmd->offset);
+
+		cmd = list_entry(info->pending_list.next,
+				 struct scsi_cmd, bs_list);
+
+		dprintf("got %p\n", cmd);
+
+		list_del(&cmd->bs_list);
+		pthread_mutex_unlock(&info->pending_lock);
+
+		if (cmd->scb[0] == SYNCHRONIZE_CACHE ||
+		    cmd->scb[0] == SYNCHRONIZE_CACHE_16) {
+			ret = fsync(cmd->dev->fd);
+			goto done;
+		}
+
+		ret = 0;
+
+		if (cmd->data_dir == DATA_WRITE) {
+			addr = (unsigned long)scsi_get_out_buffer(cmd);
+			length = scsi_get_out_length(cmd);
+		} else {
+			addr = (unsigned long)scsi_get_in_buffer(cmd);
+			length = scsi_get_in_length(cmd);
+		}
+
+		nr = pgcnt(length, (addr & (pagesize - 1)));
+		/* widen before negating so the mask can't drop the upper 32 address bits */
+		addr &= ~((uint64_t)pagesize - 1);
+
+		p = (void *)(unsigned long)addr;
+		for (i = 0; i < nr; i++) {
+			(void)*p;
+			p += pagesize;
+		}
+	done:
+		if (ret) {
+			eprintf("io error %p %x %d %" PRIu64 ", %m\n",
+				cmd, cmd->scb[0], ret, cmd->offset);
+			scsi_set_result(cmd, SAM_STAT_CHECK_CONDITION);
+			sense_data_build(cmd, MEDIUM_ERROR, ASC_READ_ERROR);
+		} else
+			scsi_set_result(cmd, SAM_STAT_GOOD);
+
+		pthread_mutex_lock(&info->finished_lock);
+		list_add(&cmd->bs_list, &info->finished_list);
+		pthread_mutex_unlock(&info->finished_lock);
+
+		pthread_cond_signal(&info->finished_cond);
}
- addr = (unsigned long)p + (cmd->offset & (pagesize - 1));
+	return NULL;
+}
- if (scsi_get_data_dir(cmd) == DATA_WRITE)
- scsi_set_out_buffer(cmd, (void *)(unsigned long)addr);
- else if (scsi_get_data_dir(cmd) == DATA_READ)
- scsi_set_in_buffer(cmd, (void *)(unsigned long)addr);
+/*
+ * tgtd event handler for done_fd[0].  Completes every command the ack
+ * thread moved to ack_list, then hands ack_list back to the ack thread by
+ * writing a token to command_fd.
+ */
+static void bs_mmap_handler(int fd, int events, void *data)
+{
+	struct bs_mmap_info *info = data;
+	struct scsi_cmd *cmd;
+	int nr_events, ret;
- dprintf("%" PRIx64 " %u %" PRIu64 "\n", addr, length, cmd->offset);
+	ret = read(info->done_fd[0], &nr_events, sizeof(nr_events));
+	if (ret < 0) {
+		eprintf("wrong wakeup\n");
+		return;
+	}
+
+	while (!list_empty(&info->ack_list)) {
+		cmd = list_entry(info->ack_list.next,
+				 struct scsi_cmd, bs_list);
- return ret;
+		dprintf("back to tgtd, %p\n", cmd);
+
+		list_del(&cmd->bs_list);
+		target_cmd_io_done(cmd, scsi_get_result(cmd));
+	}
+
+	/*
+	 * The ack thread blocks in read() on command_fd until this token
+	 * arrives; if it is lost, every later completion on this LU stalls.
+	 */
+	do {
+		ret = write(info->command_fd[1], &nr_events, sizeof(nr_events));
+	} while (ret < 0 && errno == EINTR);
+	if (ret < 0)
+		eprintf("can't wake the ack thread, %m\n");
+}
+
+/*
+ * Open the backing file and start the per-LU machinery: the done_fd event
+ * handler, the ack thread and NR_WORKER_THREADS workers.  Returns 0 on
+ * success.  On failure, everything started so far is torn down and the
+ * function returns -1, or the negative value from backed_file_open().
+ */
+static int bs_mmap_open(struct scsi_lu *lu, char *path, int *fd, uint64_t *size)
+{
+	int i, ret, token = 0;
+	struct bs_mmap_info *info =
+		(struct bs_mmap_info *)((char *)lu + sizeof(*lu));
+
+	INIT_LIST_HEAD(&info->ack_list);
+	INIT_LIST_HEAD(&info->finished_list);
+	INIT_LIST_HEAD(&info->pending_list);
+	/* the workers test this; don't rely on the private area being zeroed */
+	info->stop = 0;
+
+	*fd = backed_file_open(path, O_RDWR| O_LARGEFILE, size);
+	if (*fd < 0)
+		return *fd;
+
+	pthread_cond_init(&info->finished_cond, NULL);
+	pthread_cond_init(&info->pending_cond, NULL);
+
+	pthread_mutex_init(&info->finished_lock, NULL);
+	pthread_mutex_init(&info->pending_lock, NULL);
+
+	ret = pipe(info->command_fd);
+	if (ret)
+		goto close_dev_fd;
+
+	ret = pipe(info->done_fd);
+	if (ret)
+		goto close_command_fd;
+
+	ret = tgt_event_add(info->done_fd[0], EPOLLIN, bs_mmap_handler, info);
+	if (ret)
+		goto close_done_fd;
+
+	ret = pthread_create(&info->ack_thread, NULL, bs_mmap_ack_fn, info);
+	if (ret)
+		goto event_del;
+
+	for (i = 0; i < ARRAY_SIZE(info->worker_thread); i++) {
+		ret = pthread_create(&info->worker_thread[i], NULL,
+				     bs_mmap_worker_fn, info);
+		if (ret) {
+			/* pthread_create() returns the error; errno is not set */
+			eprintf("can't create worker thread, %s\n", strerror(ret));
+			goto stop_threads;
+		}
+	}
+
+	/* first token: lets the ack thread start collecting finished commands */
+	do {
+		ret = write(info->command_fd[1], &token, sizeof(token));
+	} while (ret < 0 && errno == EINTR);
+	if (ret < 0) {
+		eprintf("can't start the ack thread, %m\n");
+		goto stop_threads;
+	}
+
+	return 0;
+
+stop_threads:
+	/*
+	 * No token has reached the ack thread yet, so it is blocked in read()
+	 * on command_fd without holding any lock; cancelling it there is safe.
+	 */
+	pthread_cancel(info->ack_thread);
+	pthread_join(info->ack_thread, NULL);
+
+	/*
+	 * Stop the i workers that were started.  NOTE: this relies on the
+	 * workers checking ->stop under pending_lock before they sleep.
+	 */
+	pthread_mutex_lock(&info->pending_lock);
+	info->stop = 1;
+	pthread_cond_broadcast(&info->pending_cond);
+	pthread_mutex_unlock(&info->pending_lock);
+	while (--i >= 0)
+		pthread_join(info->worker_thread[i], NULL);
+event_del:
+	tgt_event_del(info->done_fd[0]);
+close_done_fd:
+	close(info->done_fd[0]);
+	close(info->done_fd[1]);
+close_command_fd:
+	close(info->command_fd[0]);
+	close(info->command_fd[1]);
+close_dev_fd:
+	close(*fd);
+
+	pthread_cond_destroy(&info->finished_cond);
+	pthread_cond_destroy(&info->pending_cond);
+	pthread_mutex_destroy(&info->finished_lock);
+	pthread_mutex_destroy(&info->pending_lock);
+
+	return -1;
+}
+
+/*
+ * Tear down what bs_mmap_open() set up: unhook the done_fd handler, stop
+ * the ack and worker threads, then release the sync objects, the pipes and
+ * the backing file.
+ */
+static void bs_mmap_close(struct scsi_lu *lu)
+{
+	int i;
+	struct bs_mmap_info *info =
+		(struct bs_mmap_info *)((char *)lu + sizeof(*lu));
+
+	/* info lives inside lu: tgtd must not call bs_mmap_handler() on it anymore */
+	tgt_event_del(info->done_fd[0]);
+
+	pthread_cancel(info->ack_thread);
+	pthread_join(info->ack_thread, NULL);
+
+	/*
+	 * Set stop under pending_lock so it cannot slip in between a worker
+	 * testing the queue and going to sleep (lost wakeup -> join hangs).
+	 */
+	pthread_mutex_lock(&info->pending_lock);
+	info->stop = 1;
+	pthread_cond_broadcast(&info->pending_cond);
+	pthread_mutex_unlock(&info->pending_lock);
+
+	for (i = 0; i < ARRAY_SIZE(info->worker_thread); i++)
+		pthread_join(info->worker_thread[i], NULL);
+
+	pthread_cond_destroy(&info->finished_cond);
+	pthread_cond_destroy(&info->pending_cond);
+	pthread_mutex_destroy(&info->finished_lock);
+	pthread_mutex_destroy(&info->pending_lock);
+
+	/* pipes created by bs_mmap_open() */
+	close(info->done_fd[0]);
+	close(info->done_fd[1]);
+	close(info->command_fd[0]);
+	close(info->command_fd[1]);
+
+	close(lu->fd);
}
static int bs_mmap_cmd_done(struct scsi_cmd *cmd)
@@ -109,7 +327,59 @@ static int bs_mmap_cmd_done(struct scsi_cmd *cmd)
return err;
}
+/*
+ * Map the command's range of the backing file and point the command's
+ * in/out buffer at it, then queue the command for the workers.
+ * SYNCHRONIZE_CACHE is queued without mapping anything; the worker calls
+ * fsync() for it.  Completion reaches tgtd via bs_mmap_handler().
+ * Returns 0 once queued, or EIO if the range cannot be mapped.
+ */
+static int bs_mmap_cmd_submit(struct scsi_cmd *cmd)
+{
+	struct scsi_lu *lu = cmd->dev;
+	struct bs_mmap_info *info =
+		(struct bs_mmap_info *)((char *)lu + sizeof(*lu));
+	int fd = lu->fd;
+	uint32_t length;
+	uint64_t addr;
+	void *p;
+
+	if (cmd->scb[0] == SYNCHRONIZE_CACHE ||
+	    cmd->scb[0] == SYNCHRONIZE_CACHE_16)
+		goto queuing;
+
+	if (scsi_get_data_dir(cmd) == DATA_WRITE)
+		length = scsi_get_out_length(cmd);
+	else
+		length = scsi_get_in_length(cmd);
+
+	p = mmap64(NULL, pgcnt(length, cmd->offset) << pageshift,
+		   PROT_READ | PROT_WRITE, MAP_SHARED, fd,
+		   cmd->offset & ~((1ULL << pageshift) - 1));
+	if (p == MAP_FAILED) {
+		eprintf("%u %" PRIu64 ", %m\n", length, cmd->offset);
+		return EIO;
+	}
+
+	addr = (unsigned long)p + (cmd->offset & (pagesize - 1));
+
+	if (scsi_get_data_dir(cmd) == DATA_WRITE)
+		scsi_set_out_buffer(cmd, (void *)(unsigned long)addr);
+	else
+		scsi_set_in_buffer(cmd, (void *)(unsigned long)addr);
+
+	scsi_set_result(cmd, SAM_STAT_GOOD);
+
+queuing:
+	/*
+	 * Finish touching cmd before publishing it: once it is on
+	 * pending_list a worker may pick it up and modify it concurrently.
+	 */
+	set_cmd_async(cmd);
+
+	pthread_mutex_lock(&info->pending_lock);
+	/* workers take from the head, so append at the tail to keep FIFO order */
+	list_add_tail(&cmd->bs_list, &info->pending_list);
+	pthread_mutex_unlock(&info->pending_lock);
+
+	pthread_cond_signal(&info->pending_cond);
+
+	return 0;
+}
+
struct backingstore_template mmap_bst = {
+ .bs_datasize = sizeof(struct bs_mmap_info),
.bs_open = bs_mmap_open,
.bs_close = bs_mmap_close,
.bs_cmd_submit = bs_mmap_cmd_submit,
More information about the stgt
mailing list