* FUJITA Tomonori (fujita.tomonori at lab.ntt.co.jp) wrote: > On Tue, 6 Nov 2007 01:27:10 +0900 > FUJITA Tomonori <tomof at acm.org> wrote: > > > Sorry about the delay. > > > > On Tue, 23 Oct 2007 16:14:01 -0500 > > Robert Jennings <rcj at linux.vnet.ibm.com> wrote: > > > > > A little threading work like we have in bs_sync for bs_mmap. The mmap > > > can be taken care of with a small pool of worker threads this way. > > > > > > After the mmap the completed commands enter back into the main tgtd > > > thread and would be sent to the kernel for in-kernel drivers through > > > kreq_send. Unfortunately kreq_send can sleep in blk_rq_map_user or > > > scsi_map_user_pages and hold up processing in tgtd. So kreq_send can > > > add the command to a list and hand it off to a small thread pool to > > > process sending the replies to the kernel. > > > > tgtd is blocked when the data of a file isn't in page cache. So how > > about worker threads does access to the data before tgtd calls > > kreq_send? It works like the flush web server. > > Have you measured the performance with your patch? > > I tried the worker thread idea, but the performance drops (probably > due to pipe notification overheads). It took me a while to get back to this, but I did take performance measurements as I made these changes. With only the patch that threads bs_sync applied, testing showed a slight performance regression for readv/writev operations. For aioread/aiowrite, pread/pwrite, and read/write, performance improved with this patch by itself. With the full patchset, random readv/writev performance was stable with respect to the unpatched tree. 
> diff --git a/usr/Makefile b/usr/Makefile > index addf5be..7339dfc 100644 > --- a/usr/Makefile > +++ b/usr/Makefile > @@ -29,6 +29,7 @@ ifneq ($(IBMVIO),) > CFLAGS += -DIBMVIO -DUSE_KERNEL > TGTD_OBJS += $(addprefix ibmvio/, ibmvio.o) > TGTD_OBJS += bs_mmap.o tgtif.o > +LIBS += -lpthread > endif > > ifneq ($(ISCSI),) > diff --git a/usr/bs_mmap.c b/usr/bs_mmap.c > index ad1bb4f..e4c0b86 100644 > --- a/usr/bs_mmap.c > +++ b/usr/bs_mmap.c > @@ -26,59 +26,277 @@ > #include <stdlib.h> > #include <string.h> > #include <unistd.h> > +#include <pthread.h> > #include <sys/mman.h> > +#include <sys/epoll.h> > > #include "list.h" > #include "util.h" > #include "tgtd.h" > #include "scsi.h" > > -static int bs_mmap_open(struct scsi_lu *lu, char *path, int *fd, uint64_t *size) > -{ > - *fd = backed_file_open(path, O_RDWR| O_LARGEFILE, size); > +#define NR_WORKER_THREADS 4 > > - return *fd >= 0 ? 0 : *fd; > -} > +struct bs_mmap_info { > + pthread_t ack_thread; > + pthread_t worker_thread[NR_WORKER_THREADS]; > > -static void bs_mmap_close(struct scsi_lu *lu) > + /* protected by pipe */ > + struct list_head ack_list; > + > + pthread_cond_t finished_cond; > + pthread_mutex_t finished_lock; > + struct list_head finished_list; > + > + /* wokers sleep on this and signaled by tgtd */ > + pthread_cond_t pending_cond; > + /* locked by tgtd and workers */ > + pthread_mutex_t pending_lock; > + /* protected by pending_lock */ > + struct list_head pending_list; > + > + int command_fd[2]; > + int done_fd[2]; > + > + int stop; > +}; > + > +static void *bs_mmap_ack_fn(void *arg) > { > - close(lu->fd); > + struct bs_mmap_info *info = arg; > + int command, ret, nr; > + struct scsi_cmd *cmd; > + > +retry: > + ret = read(info->command_fd[0], &command, sizeof(command)); > + if (ret < 0) { > + eprintf("ack pthread will be dead, %m\n"); > + if (errno == EAGAIN || errno == EINTR) > + goto retry; > + > + goto out; > + } > + > + pthread_mutex_lock(&info->finished_lock); > +retest: > + if 
(list_empty(&info->finished_list)) { > + pthread_cond_wait(&info->finished_cond, &info->finished_lock); > + goto retest; > + } > + > + while (!list_empty(&info->finished_list)) { > + cmd = list_entry(info->finished_list.next, > + struct scsi_cmd, bs_list); > + > + dprintf("found %p\n", cmd); > + > + list_del(&cmd->bs_list); > + list_add(&cmd->bs_list, &info->ack_list); > + } > + > + pthread_mutex_unlock(&info->finished_lock); > + > + nr = 1; > +rewrite: > + ret = write(info->done_fd[1], &nr, sizeof(nr)); > + if (ret < 0) { > + eprintf("can't ack tgtd, %m\n"); > + if (errno == EAGAIN || errno == EINTR) > + goto rewrite; > + > + goto out; > + } > + > + goto retry; > +out: > + return NULL; > } > > #define pgcnt(size, offset) ((((size) + ((offset) & (pagesize - 1))) + (pagesize - 1)) >> pageshift) > > -static int bs_mmap_cmd_submit(struct scsi_cmd *cmd) > +static void *bs_mmap_worker_fn(void *arg) > { > - int fd = cmd->dev->fd, ret = 0; > - void *p; > - uint64_t addr; > + int ret = 0; > + struct bs_mmap_info *info = arg; > + struct scsi_cmd *cmd; > uint32_t length; > > - if (cmd->scb[0] == SYNCHRONIZE_CACHE || > - cmd->scb[0] == SYNCHRONIZE_CACHE_16) > - return fsync(fd); > + while (1) { > + uint64_t addr; > + char *p, dummy; > + int i, nr; > > - length = (scsi_get_data_dir(cmd) == DATA_WRITE) ? 
> - scsi_get_out_length(cmd) : scsi_get_in_length(cmd); > + pthread_mutex_lock(&info->pending_lock); > + retest: > + if (list_empty(&info->pending_list)) { > + pthread_cond_wait(&info->pending_cond, &info->pending_lock); > + if (info->stop) { > + pthread_mutex_unlock(&info->pending_lock); > + break; > + } > + goto retest; > + } > > - p = mmap64(NULL, pgcnt(length, cmd->offset) << pageshift, > - PROT_READ | PROT_WRITE, MAP_SHARED, fd, > - cmd->offset & ~((1ULL << pageshift) - 1)); > - if (p == MAP_FAILED) { > - ret = -EINVAL; > - eprintf("%u %" PRIu64 "\n", length, cmd->offset); > + cmd = list_entry(info->pending_list.next, > + struct scsi_cmd, bs_list); > + > + dprintf("got %p\n", cmd); > + > + list_del(&cmd->bs_list); > + pthread_mutex_unlock(&info->pending_lock); > + > + if (cmd->scb[0] == SYNCHRONIZE_CACHE || > + cmd->scb[0] == SYNCHRONIZE_CACHE_16) { > + ret = fsync(cmd->dev->fd); > + goto done; > + } > + > + ret = 0; > + > + if (cmd->data_dir == DATA_WRITE) { > + addr = (unsigned long)scsi_get_out_buffer(cmd); > + length = scsi_get_out_length(cmd); > + } else { > + addr = (unsigned long)scsi_get_in_buffer(cmd); > + length = scsi_get_in_length(cmd); > + } > + > + nr = pgcnt(length, (addr & (pagesize - 1))); > + addr &= ~(pagesize - 1); > + > + p = (void *)(unsigned long)addr; > + for (i = 0; i < nr; i++) { > + dummy = *p; > + p += pagesize; > + } > + done: > + if (ret) { > + eprintf("io error %p %x %d %d, %m\n", > + cmd, cmd->scb[0], ret, cmd->offset); > + scsi_set_result(cmd, SAM_STAT_CHECK_CONDITION); > + sense_data_build(cmd, MEDIUM_ERROR, ASC_READ_ERROR); > + } else > + scsi_set_result(cmd, SAM_STAT_GOOD); > + > + pthread_mutex_lock(&info->finished_lock); > + list_add(&cmd->bs_list, &info->finished_list); > + pthread_mutex_unlock(&info->finished_lock); > + > + pthread_cond_signal(&info->finished_cond); > } > > - addr = (unsigned long)p + (cmd->offset & (pagesize - 1)); > + return NULL; > +} > > - if (scsi_get_data_dir(cmd) == DATA_WRITE) > - 
scsi_set_out_buffer(cmd, (void *)(unsigned long)addr); > - else if (scsi_get_data_dir(cmd) == DATA_READ) > - scsi_set_in_buffer(cmd, (void *)(unsigned long)addr); > +static void bs_mmap_handler(int fd, int events, void *data) > +{ > + struct bs_mmap_info *info = data; > + struct scsi_cmd *cmd; > + int nr_events, ret; > > - dprintf("%" PRIx64 " %u %" PRIu64 "\n", addr, length, cmd->offset); > + ret = read(info->done_fd[0], &nr_events, sizeof(nr_events)); > + if (ret < 0) { > + eprintf("wrong wakeup\n"); > + return; > + } > + > + while (!list_empty(&info->ack_list)) { > + cmd = list_entry(info->ack_list.next, > + struct scsi_cmd, bs_list); > > - return ret; > + dprintf("back to tgtd, %p\n", cmd); > + > + list_del(&cmd->bs_list); > + target_cmd_io_done(cmd, scsi_get_result(cmd)); > + } > + > + write(info->command_fd[1], &nr_events, sizeof(nr_events)); > +} > + > +static int bs_mmap_open(struct scsi_lu *lu, char *path, int *fd, uint64_t *size) > +{ > + int i, ret; > + struct bs_mmap_info *info = > + (struct bs_mmap_info *)((char *)lu + sizeof(*lu)); > + > + INIT_LIST_HEAD(&info->ack_list); > + INIT_LIST_HEAD(&info->finished_list); > + INIT_LIST_HEAD(&info->pending_list); > + > + *fd = backed_file_open(path, O_RDWR| O_LARGEFILE, size); > + if (*fd < 0) > + return *fd; > + > + pthread_cond_init(&info->finished_cond, NULL); > + pthread_cond_init(&info->pending_cond, NULL); > + > + pthread_mutex_init(&info->finished_lock, NULL); > + pthread_mutex_init(&info->pending_lock, NULL); > + > + ret = pipe(info->command_fd); > + if (ret) > + goto close_dev_fd; > + > + ret = pipe(info->done_fd); > + if (ret) > + goto close_command_fd; > + > + ret = tgt_event_add(info->done_fd[0], EPOLLIN, bs_mmap_handler, info); > + if (ret) > + goto close_done_fd; > + > + ret = pthread_create(&info->ack_thread, NULL, bs_mmap_ack_fn, info); > + if (ret) > + goto event_del; > + > + for (i = 0; i < ARRAY_SIZE(info->worker_thread); i++) { > + ret = pthread_create(&info->worker_thread[i], NULL, > + 
bs_mmap_worker_fn, info); > + } > + > + write(info->command_fd[1], &ret, sizeof(ret)); > + > + return 0; > +event_del: > + tgt_event_del(info->done_fd[0]); > +close_done_fd: > + close(info->done_fd[0]); > + close(info->done_fd[1]); > +close_command_fd: > + close(info->command_fd[0]); > + close(info->command_fd[1]); > +close_dev_fd: > + close(*fd); > + > + pthread_cond_destroy(&info->finished_cond); > + pthread_cond_destroy(&info->pending_cond); > + pthread_mutex_destroy(&info->finished_lock); > + pthread_mutex_destroy(&info->pending_lock); > + > + return -1; > +} > + > +static void bs_mmap_close(struct scsi_lu *lu) > +{ > + int i; > + struct bs_mmap_info *info = > + (struct bs_mmap_info *)((char *)lu + sizeof(*lu)); > + > + pthread_cancel(info->ack_thread); > + pthread_join(info->ack_thread, NULL); > + > + info->stop = 1; > + pthread_cond_broadcast(&info->pending_cond); > + > + for (i = 0; i < ARRAY_SIZE(info->worker_thread); i++) > + pthread_join(info->worker_thread[i], NULL); > + > + pthread_cond_destroy(&info->finished_cond); > + pthread_cond_destroy(&info->pending_cond); > + pthread_mutex_destroy(&info->finished_lock); > + pthread_mutex_destroy(&info->pending_lock); > + > + close(lu->fd); > } > > static int bs_mmap_cmd_done(struct scsi_cmd *cmd) > @@ -109,7 +327,59 @@ static int bs_mmap_cmd_done(struct scsi_cmd *cmd) > return err; > } > > +static int bs_mmap_cmd_submit(struct scsi_cmd *cmd) > +{ > + struct scsi_lu *lu = cmd->dev; > + struct bs_mmap_info *info = > + (struct bs_mmap_info *)((char *)lu + sizeof(*lu)); > + int fd; > + uint32_t length; > + uint64_t addr; > + void *p; > + > + fd = cmd->dev->fd; > + length = 0; > + > + if (cmd->scb[0] == SYNCHRONIZE_CACHE || > + cmd->scb[0] == SYNCHRONIZE_CACHE_16) > + goto queuing; > + > + if (cmd->data_dir == DATA_WRITE) > + length = scsi_get_out_length(cmd); > + else > + length = scsi_get_in_length(cmd); > + > + p = mmap64(NULL, pgcnt(length, cmd->offset) << pageshift, > + PROT_READ | PROT_WRITE, MAP_SHARED, fd, > 
+ cmd->offset & ~((1ULL << pageshift) - 1)); > + if (p == MAP_FAILED) > + return EIO; > + > + addr = (unsigned long)p + (cmd->offset & (pagesize - 1)); > + > + if (scsi_get_data_dir(cmd) == DATA_WRITE) > + scsi_set_out_buffer(cmd, (void *)(unsigned long)addr); > + else > + scsi_set_in_buffer(cmd, (void *)(unsigned long)addr); > + > + scsi_set_result(cmd, SAM_STAT_GOOD); > + > +queuing: > + pthread_mutex_lock(&info->pending_lock); > + > + list_add(&cmd->bs_list, &info->pending_list); > + > + pthread_mutex_unlock(&info->pending_lock); > + > + pthread_cond_signal(&info->pending_cond); > + > + set_cmd_async(cmd); > + > + return 0; > +} > + > struct backingstore_template mmap_bst = { > + .bs_datasize = sizeof(struct bs_mmap_info), > .bs_open = bs_mmap_open, > .bs_close = bs_mmap_close, > .bs_cmd_submit = bs_mmap_cmd_submit, --- Robert C. Jennings <rcjenn at us.ibm.com> IBM Linux Technology Center, Austin +1.512.838.4964 (t/l 678.4964) |