[sheepdog] [PATCH 4/5] work: clean up workqueue

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Mon Jul 2 20:32:15 CEST 2012


Currently, wi->nr_threads is always 0 or 1, so we can simplify the
code.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 sheep/cluster/accord.c |    2 +-
 sheep/sheep.c          |   10 +++---
 sheep/work.c           |   72 ++++++++++++++++++++---------------------------
 sheep/work.h           |    9 ++---
 4 files changed, 41 insertions(+), 52 deletions(-)

diff --git a/sheep/cluster/accord.c b/sheep/cluster/accord.c
index 12dc687..a0f1d00 100644
--- a/sheep/cluster/accord.c
+++ b/sheep/cluster/accord.c
@@ -545,7 +545,7 @@ static int accord_init(const char *option)
 		return -1;
 	}
 
-	acrd_wq = init_work_queue("accord", 1);
+	acrd_wq = init_work_queue("accord", true);
 	if (!acrd_wq) {
 		eprintf("failed to create accord workqueue: %m\n");
 		return -1;
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 933ecd5..7d1e853 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -247,11 +247,11 @@ int main(int argc, char **argv)
 
 	local_req_init();
 
-	sys->gateway_wqueue = init_work_queue("gateway", 0);
-	sys->io_wqueue = init_work_queue("io", 0);
-	sys->recovery_wqueue = init_work_queue("recovery", 1);
-	sys->deletion_wqueue = init_work_queue("deletion", 1);
-	sys->block_wqueue = init_work_queue("block", 1);
+	sys->gateway_wqueue = init_work_queue("gateway", false);
+	sys->io_wqueue = init_work_queue("io", false);
+	sys->recovery_wqueue = init_work_queue("recovery", true);
+	sys->deletion_wqueue = init_work_queue("deletion", true);
+	sys->block_wqueue = init_work_queue("block", true);
 	if (!sys->gateway_wqueue || !sys->io_wqueue ||!sys->recovery_wqueue ||
 	    !sys->deletion_wqueue || !sys->block_wqueue)
 		exit(1);
diff --git a/sheep/work.c b/sheep/work.c
index 708f88c..9deac01 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -15,6 +15,7 @@
 #include <errno.h>
 #include <string.h>
 #include <inttypes.h>
+#include <stdbool.h>
 #include <pthread.h>
 #include <stdio.h>
 #include <unistd.h>
@@ -91,15 +92,14 @@ void queue_work(struct work_queue *q, struct work *work)
 {
 	struct worker_info *wi = container_of(q, struct worker_info, q);
 
-	if (!wi->nr_threads) {
-		create_short_thread(wi, work);
-		return;
-	}
-	pthread_mutex_lock(&wi->pending_lock);
-	list_add_tail(&work->w_list, &wi->q.pending_list);
-	pthread_mutex_unlock(&wi->pending_lock);
+	if (wi->ordered) {
+		pthread_mutex_lock(&wi->pending_lock);
+		list_add_tail(&work->w_list, &wi->q.pending_list);
+		pthread_mutex_unlock(&wi->pending_lock);
 
-	pthread_cond_signal(&wi->pending_cond);
+		pthread_cond_signal(&wi->pending_cond);
+	} else
+		create_short_thread(wi, work);
 }
 
 static void bs_thread_request_done(int fd, int events, void *data)
@@ -133,16 +133,8 @@ static void *worker_routine(void *arg)
 	struct worker_info *wi = arg;
 	struct work *work;
 	eventfd_t value = 1;
-	int i, uninitialized_var(idx);
-
-	for (i = 0; i < wi->nr_threads; i++) {
-		if (wi->worker_thread[i] == pthread_self()) {
-			idx = i;
-			break;
-		}
-	}
 
-	set_thread_name(wi->name, idx);
+	set_thread_name(wi->name, 0);
 
 	pthread_mutex_lock(&wi->startup_lock);
 	/* started this thread */
@@ -203,58 +195,56 @@ static int init_eventfd(void)
 	return 0;
 }
 
-struct work_queue *init_work_queue(const char *name, int nr)
+struct work_queue *init_work_queue(const char *name, bool ordered)
 {
-	int i, ret;
+	int ret;
 	struct worker_info *wi;
 
 	ret = init_eventfd();
 	if (ret)
 		return NULL;
 
-	wi = zalloc(sizeof(*wi) + nr * sizeof(pthread_t));
+	wi = zalloc(sizeof(*wi));
 	if (!wi)
 		return NULL;
 
 	wi->name = name;
-	wi->nr_threads = nr;
+	wi->ordered = ordered;
 
-	INIT_LIST_HEAD(&wi->q.pending_list);
-	INIT_LIST_HEAD(&wi->q.blocked_list);
 	INIT_LIST_HEAD(&wi->finished_list);
 
-	pthread_cond_init(&wi->pending_cond, NULL);
-
 	pthread_mutex_init(&wi->finished_lock, NULL);
-	pthread_mutex_init(&wi->pending_lock, NULL);
-	pthread_mutex_init(&wi->startup_lock, NULL);
 
-	pthread_mutex_lock(&wi->startup_lock);
-	for (i = 0; i < wi->nr_threads; i++) {
-		ret = pthread_create(&wi->worker_thread[i], NULL,
-				     worker_routine, wi);
+	if (ordered) {
+		INIT_LIST_HEAD(&wi->q.pending_list);
+
+		pthread_cond_init(&wi->pending_cond, NULL);
+		pthread_mutex_init(&wi->pending_lock, NULL);
+		pthread_mutex_init(&wi->startup_lock, NULL);
 
+		pthread_mutex_lock(&wi->startup_lock);
+
+		ret = pthread_create(&wi->worker_thread, NULL, worker_routine,
+				     wi);
 		if (ret) {
-			eprintf("failed to create worker thread #%d: %s\n",
-				i, strerror(ret));
-			if (ret)
-				goto destroy_threads;
+			eprintf("failed to create worker thread: %s\n",
+				strerror(ret));
+			goto destroy_threads;
 		}
+
+		pthread_mutex_unlock(&wi->startup_lock);
 	}
-	pthread_mutex_unlock(&wi->startup_lock);
 
 	list_add(&wi->worker_info_siblings, &worker_info_list);
 
-	total_nr_workers += nr;
+	total_nr_workers++;
 	return &wi->q;
 destroy_threads:
 
 	wi->q.wq_state |= WQ_DEAD;
 	pthread_mutex_unlock(&wi->startup_lock);
-	for (; i > 0; i--) {
-		pthread_join(wi->worker_thread[i - 1], NULL);
-		eprintf("stopped worker thread #%d\n", i - 1);
-	}
+	pthread_join(wi->worker_thread, NULL);
+	eprintf("stopped worker thread\n");
 
 /* destroy_cond_mutex: */
 	pthread_cond_destroy(&wi->pending_cond);
diff --git a/sheep/work.h b/sheep/work.h
index 8e0beba..3634560 100644
--- a/sheep/work.h
+++ b/sheep/work.h
@@ -14,9 +14,7 @@ struct work {
 
 struct work_queue {
 	int wq_state;
-	int nr_active;
 	struct list_head pending_list;
-	struct list_head blocked_list;
 };
 
 struct worker_info {
@@ -24,7 +22,7 @@ struct worker_info {
 
 	struct list_head worker_info_siblings;
 
-	int nr_threads;
+	bool ordered;
 
 	pthread_mutex_t finished_lock;
 	struct list_head finished_list;
@@ -38,13 +36,14 @@ struct worker_info {
 
 	pthread_mutex_t startup_lock;
 
-	pthread_t worker_thread[0];
+	pthread_t worker_thread; /* used for an ordered work queue */
 };
 
 extern struct list_head worker_info_list;
 extern int total_nr_workers;
 
-struct work_queue *init_work_queue(const char *name, int nr);
+/* if 'ordered' is true, the work queue are processes in order. */
+struct work_queue *init_work_queue(const char *name, bool ordered);
 void queue_work(struct work_queue *q, struct work *work);
 
 #endif
-- 
1.7.2.5




More information about the sheepdog mailing list