[stgt] [PATCH 1/2] bs: use single notification pthread

FUJITA Tomonori fujita.tomonori at lab.ntt.co.jp
Mon Jun 7 13:31:26 CEST 2010


use single notification pthread instead of pthread per lu.

Signed-off-by: FUJITA Tomonori <fujita.tomonori at lab.ntt.co.jp>
---
 usr/bs.c        |  191 ++++++++++++++++++++++++++++---------------------------
 usr/bs_thread.h |   11 ---
 usr/tgtd.c      |    2 +
 usr/tgtd.h      |    2 +
 4 files changed, 100 insertions(+), 106 deletions(-)

diff --git a/usr/bs.c b/usr/bs.c
index d0fcce4..3d610ed 100644
--- a/usr/bs.c
+++ b/usr/bs.c
@@ -44,6 +44,15 @@ static int sig_fd = -1;
 static LIST_HEAD(sig_finished_list);
 static pthread_mutex_t sig_finished_lock;
 
+static int command_fd[2];
+static int done_fd[2];
+static pthread_t ack_thread;
+/* protected by pipe */
+static LIST_HEAD(ack_list);
+static pthread_cond_t finished_cond;
+static pthread_mutex_t finished_lock;
+static LIST_HEAD(finished_list);
+
 int register_backingstore_template(struct backingstore_template *bst)
 {
 	list_add(&bst->backingstore_siblings, &bst_list);
@@ -66,12 +75,11 @@ struct backingstore_template *get_backingstore_template(const char *name)
 
 static void *bs_thread_ack_fn(void *arg)
 {
-	struct bs_thread_info *info = arg;
 	int command, ret, nr;
 	struct scsi_cmd *cmd;
 
 retry:
-	ret = read(info->command_fd[0], &command, sizeof(command));
+	ret = read(command_fd[0], &command, sizeof(command));
 	if (ret < 0) {
 		eprintf("ack pthread will be dead, %m\n");
 		if (errno == EAGAIN || errno == EINTR)
@@ -80,31 +88,28 @@ retry:
 		goto out;
 	}
 
-	if (info->stop)
-		goto out;
-
-	pthread_mutex_lock(&info->finished_lock);
+	pthread_mutex_lock(&finished_lock);
 retest:
-	if (list_empty(&info->finished_list)) {
-		pthread_cond_wait(&info->finished_cond, &info->finished_lock);
+	if (list_empty(&finished_list)) {
+		pthread_cond_wait(&finished_cond, &finished_lock);
 		goto retest;
 	}
 
-	while (!list_empty(&info->finished_list)) {
-		cmd = list_first_entry(&info->finished_list,
+	while (!list_empty(&finished_list)) {
+		cmd = list_first_entry(&finished_list,
 				 struct scsi_cmd, bs_list);
 
 		dprintf("found %p\n", cmd);
 
 		list_del(&cmd->bs_list);
-		list_add_tail(&cmd->bs_list, &info->ack_list);
+		list_add_tail(&cmd->bs_list, &ack_list);
 	}
 
-	pthread_mutex_unlock(&info->finished_lock);
+	pthread_mutex_unlock(&finished_lock);
 
 	nr = 1;
 rewrite:
-	ret = write(info->done_fd[1], &nr, sizeof(nr));
+	ret = write(done_fd[1], &nr, sizeof(nr));
 	if (ret < 0) {
 		eprintf("can't ack tgtd, %m\n");
 		if (errno == EAGAIN || errno == EINTR)
@@ -120,18 +125,17 @@ out:
 
 static void bs_thread_request_done(int fd, int events, void *data)
 {
-	struct bs_thread_info *info = data;
 	struct scsi_cmd *cmd;
 	int nr_events, ret;
 
-	ret = read(info->done_fd[0], &nr_events, sizeof(nr_events));
+	ret = read(done_fd[0], &nr_events, sizeof(nr_events));
 	if (ret < 0) {
 		eprintf("wrong wakeup\n");
 		return;
 	}
 
-	while (!list_empty(&info->ack_list)) {
-		cmd = list_first_entry(&info->ack_list,
+	while (!list_empty(&ack_list)) {
+		cmd = list_first_entry(&ack_list,
 				       struct scsi_cmd, bs_list);
 
 		dprintf("back to tgtd, %p\n", cmd);
@@ -141,7 +145,7 @@ static void bs_thread_request_done(int fd, int events, void *data)
 	}
 
 rewrite:
-	ret = write(info->command_fd[1], &nr_events, sizeof(nr_events));
+	ret = write(command_fd[1], &nr_events, sizeof(nr_events));
 	if (ret < 0) {
 		eprintf("can't write done, %m\n");
 		if (errno == EAGAIN || errno == EINTR)
@@ -204,19 +208,17 @@ static void *bs_thread_worker_fn(void *arg)
 		cmd = list_first_entry(&info->pending_list,
 				       struct scsi_cmd, bs_list);
 
-		dprintf("got %p\n", cmd);
-
 		list_del(&cmd->bs_list);
 		pthread_mutex_unlock(&info->pending_lock);
 
 		info->request_fn(cmd);
 
 		if (sig_fd < 0) {
-			pthread_mutex_lock(&info->finished_lock);
-			list_add_tail(&cmd->bs_list, &info->finished_list);
-			pthread_mutex_unlock(&info->finished_lock);
+			pthread_mutex_lock(&finished_lock);
+			list_add_tail(&cmd->bs_list, &finished_list);
+			pthread_mutex_unlock(&finished_lock);
 
-			pthread_cond_signal(&info->finished_cond);
+			pthread_cond_signal(&finished_cond);
 		} else {
 			pthread_mutex_lock(&sig_finished_lock);
 			list_add_tail(&cmd->bs_list, &sig_finished_list);
@@ -229,80 +231,116 @@ static void *bs_thread_worker_fn(void *arg)
 	pthread_exit(NULL);
 }
 
-static void bs_init(void)
+static int bs_init_signalfd(void)
 {
-	static int done = 0;
 	sigset_t mask;
 	int ret;
 
-	if (done)
-		return;
-	done++;
-
 	sigemptyset(&mask);
 	sigaddset(&mask, SIGUSR2);
 	sigprocmask(SIG_BLOCK, &mask, NULL);
 
 	sig_fd = __signalfd(-1, &mask, 0);
 	if (sig_fd < 0)
-		return;
+		return 1;
 
 	ret = tgt_event_add(sig_fd, EPOLLIN, bs_sig_request_done, NULL);
 	if (ret < 0) {
 		close (sig_fd);
 		sig_fd = -1;
+
+		return 1;
 	}
 
 	pthread_mutex_init(&sig_finished_lock, NULL);
+
+	return 0;
 }
 
-int bs_thread_open(struct bs_thread_info *info, request_func_t *rfn,
-		   int nr_threads)
+static int bs_init_notify_thread(void)
 {
-	int i, ret;
-
-	bs_init();
-
-	info->request_fn = rfn;
-
-	INIT_LIST_HEAD(&info->ack_list);
-	INIT_LIST_HEAD(&info->finished_list);
-	INIT_LIST_HEAD(&info->pending_list);
-
-	pthread_cond_init(&info->finished_cond, NULL);
-	pthread_cond_init(&info->pending_cond, NULL);
+	int ret;
 
-	pthread_mutex_init(&info->finished_lock, NULL);
-	pthread_mutex_init(&info->pending_lock, NULL);
-	pthread_mutex_init(&info->startup_lock, NULL);
+	pthread_cond_init(&finished_cond, NULL);
+	pthread_mutex_init(&finished_lock, NULL);
 
-	ret = pipe(info->command_fd);
+	ret = pipe(command_fd);
 	if (ret) {
 		eprintf("failed to create command pipe, %m\n");
 		goto destroy_cond_mutex;
 	}
 
-	ret = pipe(info->done_fd);
+	ret = pipe(done_fd);
 	if (ret) {
 		eprintf("failed to done command pipe, %m\n");
 		goto close_command_fd;
 	}
 
-	if (sig_fd < 0) {
-		ret = tgt_event_add(info->done_fd[0], EPOLLIN, bs_thread_request_done,
-				    info);
-		if (ret) {
-			eprintf("failed to add epoll event\n");
-			goto close_done_fd;
-		}
+	ret = tgt_event_add(done_fd[0], EPOLLIN, bs_thread_request_done, NULL);
+	if (ret) {
+		eprintf("failed to add epoll event\n");
+		goto close_done_fd;
 	}
 
-	ret = pthread_create(&info->ack_thread, NULL, bs_thread_ack_fn, info);
+	ret = pthread_create(&ack_thread, NULL, bs_thread_ack_fn, NULL);
 	if (ret) {
 		eprintf("failed to create an ack thread, %s\n", strerror(ret));
 		goto event_del;
 	}
 
+	ret = write(command_fd[1], &ret, sizeof(ret));
+	if (ret <= 0)
+		goto event_del;
+
+	return 0;
+event_del:
+	tgt_event_del(done_fd[0]);
+
+close_done_fd:
+	close(done_fd[0]);
+	close(done_fd[1]);
+close_command_fd:
+	close(command_fd[0]);
+	close(command_fd[1]);
+destroy_cond_mutex:
+	pthread_cond_destroy(&finished_cond);
+	pthread_mutex_destroy(&finished_lock);
+
+	return 1;
+}
+
+int bs_init(void)
+{
+	int ret;
+
+	ret = bs_init_signalfd();
+	if (!ret) {
+		eprintf("use signalfd notification\n");
+		return 0;
+	}
+
+	ret = bs_init_notify_thread();
+	if (!ret) {
+		eprintf("use pthread notification\n");
+		return 0;
+	}
+
+	return 1;
+}
+
+int bs_thread_open(struct bs_thread_info *info, request_func_t *rfn,
+		   int nr_threads)
+{
+	int i, ret;
+
+	info->request_fn = rfn;
+
+	INIT_LIST_HEAD(&info->pending_list);
+
+	pthread_cond_init(&info->pending_cond, NULL);
+	pthread_mutex_init(&info->pending_lock, NULL);
+	pthread_mutex_init(&info->startup_lock, NULL);
+
 	if (nr_threads > ARRAY_SIZE(info->worker_thread)) {
 		eprintf("too many threads %d\n", nr_threads);
 		nr_threads = ARRAY_SIZE(info->worker_thread);
@@ -321,41 +359,18 @@ int bs_thread_open(struct bs_thread_info *info, request_func_t *rfn,
 		}
 	}
 	pthread_mutex_unlock(&info->startup_lock);
-rewrite:
-	ret = write(info->command_fd[1], &ret, sizeof(ret));
-	if (ret < 0) {
-		eprintf("can't write done, %m\n");
-		if (errno == EAGAIN || errno == EINTR)
-			goto rewrite;
-	}
 
 	return 0;
 destroy_threads:
 	info->stop = 1;
-	write(info->command_fd[1], &ret, sizeof(ret));
-	pthread_join(info->ack_thread, NULL);
-
-	eprintf("stopped the ack thread\n");
 
 	pthread_mutex_unlock(&info->startup_lock);
 	for (; i > 0; i--) {
 		pthread_join(info->worker_thread[i - 1], NULL);
 		eprintf("stopped the worker thread %d\n", i - 1);
 	}
-event_del:
-	if (sig_fd < 0)
-		tgt_event_del(info->done_fd[0]);
 
-close_done_fd:
-	close(info->done_fd[0]);
-	close(info->done_fd[1]);
-close_command_fd:
-	close(info->command_fd[0]);
-	close(info->command_fd[1]);
-destroy_cond_mutex:
-	pthread_cond_destroy(&info->finished_cond);
 	pthread_cond_destroy(&info->pending_cond);
-	pthread_mutex_destroy(&info->finished_lock);
 	pthread_mutex_destroy(&info->pending_lock);
 	pthread_mutex_destroy(&info->startup_lock);
 
@@ -366,9 +381,6 @@ void bs_thread_close(struct bs_thread_info *info)
 {
 	int i;
 
-	pthread_cancel(info->ack_thread);
-	pthread_join(info->ack_thread, NULL);
-
 	info->stop = 1;
 	pthread_cond_broadcast(&info->pending_cond);
 
@@ -376,21 +388,10 @@ void bs_thread_close(struct bs_thread_info *info)
 		     i < ARRAY_SIZE(info->worker_thread); i++)
 		pthread_join(info->worker_thread[i], NULL);
 
-	pthread_cond_destroy(&info->finished_cond);
 	pthread_cond_destroy(&info->pending_cond);
-	pthread_mutex_destroy(&info->finished_lock);
 	pthread_mutex_destroy(&info->pending_lock);
 	pthread_mutex_destroy(&info->startup_lock);
 
-	if (sig_fd < 0)
-		tgt_event_del(info->done_fd[0]);
-
-	close(info->done_fd[0]);
-	close(info->done_fd[1]);
-
-	close(info->command_fd[0]);
-	close(info->command_fd[1]);
-
 	info->stop = 0;
 }
 
diff --git a/usr/bs_thread.h b/usr/bs_thread.h
index 9dfbbd5..d460032 100644
--- a/usr/bs_thread.h
+++ b/usr/bs_thread.h
@@ -3,16 +3,8 @@
 typedef void (request_func_t) (struct scsi_cmd *);
 
 struct bs_thread_info {
-	pthread_t ack_thread;
 	pthread_t worker_thread[NR_WORKER_THREADS];
 
-	/* protected by pipe */
-	struct list_head ack_list;
-
-	pthread_cond_t finished_cond;
-	pthread_mutex_t finished_lock;
-	struct list_head finished_list;
-
 	/* wokers sleep on this and signaled by tgtd */
 	pthread_cond_t pending_cond;
 	/* locked by tgtd and workers */
@@ -22,9 +14,6 @@ struct bs_thread_info {
 
 	pthread_mutex_t startup_lock;
 
-	int command_fd[2];
-	int done_fd[2];
-
 	int stop;
 
 	request_func_t *request_fn;
diff --git a/usr/tgtd.c b/usr/tgtd.c
index c386281..f0a158f 100644
--- a/usr/tgtd.c
+++ b/usr/tgtd.c
@@ -445,6 +445,8 @@ int main(int argc, char **argv)
 		}
 	}
 
+	bs_init();
+
 	event_loop();
 
 	lld_exit();
diff --git a/usr/tgtd.h b/usr/tgtd.h
index cc2c468..4467bc7 100644
--- a/usr/tgtd.h
+++ b/usr/tgtd.h
@@ -302,6 +302,8 @@ extern struct backingstore_template *get_backingstore_template(const char *name)
 
 extern int setup_param(char *name, int (*parser)(char *));
 
+extern int bs_init(void);
+
 struct event_data {
 	union {
 		event_handler_t handler;
-- 
1.6.5

--
To unsubscribe from this list: send the line "unsubscribe stgt" in
the body of a message to majordomo at vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html



More information about the stgt mailing list