[sheepdog] [PATCH 1/3] work: remove short thread

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Mon Oct 22 06:31:03 CEST 2012


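Remove the on-demand short threads. Unordered work queues are now served by
a fixed pool of NR_WORKER_THREADS (4) worker threads, while ordered work
queues keep a single worker thread; both kinds now go through the same
pending-list path in queue_work(). This is a preparation for a dynamic
worker thread pool.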

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 include/util.h |    2 +-
 lib/logger.c   |    4 +-
 sheep/work.c   |  126 +++++++++++++++++--------------------------------------
 sheep/work.h   |    3 +-
 4 files changed, 44 insertions(+), 91 deletions(-)

diff --git a/include/util.h b/include/util.h
index d0bf659..9197c76 100644
--- a/include/util.h
+++ b/include/util.h
@@ -38,7 +38,7 @@
 
 #define notrace __attribute__((no_instrument_function))
 
-#define uninitialized_var(x) (x = x)
+#define uninitialized_var(x) x = x
 
 static inline int before(uint32_t seq1, uint32_t seq2)
 {
diff --git a/lib/logger.c b/lib/logger.c
index aa1c012..7158b65 100644
--- a/lib/logger.c
+++ b/lib/logger.c
@@ -156,7 +156,7 @@ static notrace int log_vsnprintf(char *buff, size_t size, int prio,
 {
 	char *p = buff;
 
-	if (worker_name && worker_idx)
+	if (worker_name && worker_idx >= 0)
 		snprintf(p, size, "[%s %d] ", worker_name, worker_idx);
 	else if (worker_name)
 		snprintf(p, size, "[%s] ", worker_name);
@@ -466,7 +466,7 @@ notrace void set_thread_name(const char *name, int idx)
 
 notrace void get_thread_name(char *name)
 {
-	if (worker_name && worker_idx)
+	if (worker_name && worker_idx >= 0)
 		sprintf(name, "%s %d", worker_name, worker_idx);
 	else if (worker_name)
 		sprintf(name, "%s", worker_name);
diff --git a/sheep/work.c b/sheep/work.c
index 0ef169d..5678223 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -25,7 +25,6 @@
 #include <sys/types.h>
 #include <sys/eventfd.h>
 #include <linux/types.h>
-#include <urcu/uatomic.h>
 
 #include "list.h"
 #include "util.h"
@@ -34,6 +33,8 @@
 #include "event.h"
 #include "trace/trace.h"
 
+#define NR_WORKER_THREADS 4
+
 static int efd;
 int total_ordered_workers;
 LIST_HEAD(worker_info_list);
@@ -42,75 +43,15 @@ enum wq_state {
 	WQ_DEAD = (1U << 1),
 };
 
-/*
- * Short thread is created on demand and destroyed after serving the work for
- * gateway or io requests, aiming to solve two problems:
- *
- *  1. timeout of IO requests from guests. With on-demand short threads, we
- *     guarantee that there is always one thread available to execute the
- *     request as soon as possible.
- *  2. sheep halt for corner case that all gateway and io threads are executing
- *     local requests that ask for creation of another thread to execute the
- *     requests and sleep-wait for responses.
- */
-struct short_work {
-	struct work *work;
-	struct worker_info *wi;
-};
-
-static void *run_short_thread(void *arg)
-{
-	struct short_work *sw = arg;
-	eventfd_t value = 1;
-	static unsigned long idx;
-	int err;
-
-	/* Tell runtime to release resources after termination */
-	err = pthread_detach(pthread_self());
-	if (err)
-		panic("%s\n", strerror(err));
-
-	set_thread_name(sw->wi->name, uatomic_add_return(&idx, 1));
-
-	sw->work->fn(sw->work);
-
-	pthread_mutex_lock(&sw->wi->finished_lock);
-	list_add_tail(&sw->work->w_list, &sw->wi->finished_list);
-	pthread_mutex_unlock(&sw->wi->finished_lock);
-
-	eventfd_write(efd, value);
-	free(sw);
-	pthread_exit(NULL);
-}
-
-static inline void create_short_thread(struct worker_info *wi,
-				       struct work *work)
-{
-	pthread_t thread;
-	struct short_work *sw = xmalloc(sizeof *sw);
-	int err;
-
-	sw->work = work;
-	sw->wi = wi;
-
-	err = pthread_create(&thread, NULL, run_short_thread, sw);
-	if (err)
-		panic("%s\n", strerror(err));
-	short_thread_begin();
-}
-
 void queue_work(struct work_queue *q, struct work *work)
 {
 	struct worker_info *wi = container_of(q, struct worker_info, q);
 
-	if (wi->ordered) {
-		pthread_mutex_lock(&wi->pending_lock);
-		list_add_tail(&work->w_list, &wi->q.pending_list);
-		pthread_mutex_unlock(&wi->pending_lock);
+	pthread_mutex_lock(&wi->pending_lock);
+	list_add_tail(&work->w_list, &wi->q.pending_list);
+	pthread_mutex_unlock(&wi->pending_lock);
 
-		pthread_cond_signal(&wi->pending_cond);
-	} else
-		create_short_thread(wi, work);
+	pthread_cond_signal(&wi->pending_cond);
 }
 
 static void bs_thread_request_done(int fd, int events, void *data)
@@ -135,8 +76,6 @@ static void bs_thread_request_done(int fd, int events, void *data)
 			list_del(&work->w_list);
 
 			work->done(work);
-			if (!wi->ordered)
-				short_thread_end();
 		}
 	}
 }
@@ -146,8 +85,19 @@ static void *worker_routine(void *arg)
 	struct worker_info *wi = arg;
 	struct work *work;
 	eventfd_t value = 1;
+	int i, uninitialized_var(idx);
+
+	for (i = 0; i < wi->nr_threads; i++) {
+		if (wi->worker_thread[i] == pthread_self()) {
+			idx = i;
+			break;
+		}
+	}
 
-	set_thread_name(wi->name, 0);
+	if (wi->ordered)
+		set_thread_name(wi->name, -1);
+	else
+		set_thread_name(wi->name, idx);
 
 	pthread_mutex_lock(&wi->startup_lock);
 	/* started this thread */
@@ -212,44 +162,42 @@ static int init_eventfd(void)
 
 struct work_queue *init_work_queue(const char *name, bool ordered)
 {
-	int ret;
+	int i, ret, nr;
 	struct worker_info *wi;
 
 	ret = init_eventfd();
 	if (ret)
 		return NULL;
 
-	wi = zalloc(sizeof(*wi));
+	nr = ordered ? 1 : NR_WORKER_THREADS;
+	wi = zalloc(sizeof(*wi) + nr * sizeof(pthread_t));
 	if (!wi)
 		return NULL;
 
 	wi->name = name;
 	wi->ordered = ordered;
+	wi->nr_threads = nr;
 
+	INIT_LIST_HEAD(&wi->q.pending_list);
 	INIT_LIST_HEAD(&wi->finished_list);
 
-	pthread_mutex_init(&wi->finished_lock, NULL);
-
-	if (ordered) {
-		INIT_LIST_HEAD(&wi->q.pending_list);
+	pthread_cond_init(&wi->pending_cond, NULL);
 
-		pthread_cond_init(&wi->pending_cond, NULL);
-		pthread_mutex_init(&wi->pending_lock, NULL);
-		pthread_mutex_init(&wi->startup_lock, NULL);
-
-		pthread_mutex_lock(&wi->startup_lock);
+	pthread_mutex_init(&wi->finished_lock, NULL);
+	pthread_mutex_init(&wi->pending_lock, NULL);
+	pthread_mutex_init(&wi->startup_lock, NULL);
 
-		ret = pthread_create(&wi->worker_thread, NULL, worker_routine,
-				     wi);
+	pthread_mutex_lock(&wi->startup_lock);
+	for (i = 0; i < wi->nr_threads; i++) {
+		ret = pthread_create(&wi->worker_thread[i], NULL,
+				     worker_routine, wi);
 		if (ret) {
 			eprintf("failed to create worker thread: %s\n",
 				strerror(ret));
 			goto destroy_threads;
 		}
-
-		pthread_mutex_unlock(&wi->startup_lock);
-		total_ordered_workers++;
 	}
+	pthread_mutex_unlock(&wi->startup_lock);
 
 	list_add(&wi->worker_info_siblings, &worker_info_list);
 
@@ -258,8 +206,10 @@ destroy_threads:
 
 	wi->q.wq_state |= WQ_DEAD;
 	pthread_mutex_unlock(&wi->startup_lock);
-	pthread_join(wi->worker_thread, NULL);
-	eprintf("stopped worker thread\n");
+	for (; i > 0; i--) {
+		pthread_join(wi->worker_thread[i - 1], NULL);
+		eprintf("stopped worker thread #%d\n", i - 1);
+	}
 
 /* destroy_cond_mutex: */
 	pthread_cond_destroy(&wi->pending_cond);
@@ -273,12 +223,14 @@ destroy_threads:
 #ifdef COMPILE_UNUSED_CODE
 static void exit_work_queue(struct work_queue *q)
 {
+	int i;
 	struct worker_info *wi = container_of(q, struct worker_info, q);
 
 	q->wq_state |= WQ_DEAD;
 	pthread_cond_broadcast(&wi->pending_cond);
 
-	pthread_join(wi->worker_thread, NULL);
+	for (i = 0; wi->worker_thread[i] && i < wi->nr_threads; i++)
+		pthread_join(wi->worker_thread[i], NULL);
 
 	pthread_cond_destroy(&wi->pending_cond);
 	pthread_mutex_destroy(&wi->pending_lock);
diff --git a/sheep/work.h b/sheep/work.h
index 4d45dd6..e57c6d0 100644
--- a/sheep/work.h
+++ b/sheep/work.h
@@ -38,7 +38,8 @@ struct worker_info {
 
 	pthread_mutex_t startup_lock;
 
-	pthread_t worker_thread; /* used for an ordered work queue */
+	size_t nr_threads;
+	pthread_t worker_thread[0];
 };
 
 extern struct list_head worker_info_list;
-- 
1.7.2.5




More information about the sheepdog mailing list