[sheepdog] [PATCH 2/3] work: enlarge the number of worker threads dynamically

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Mon Oct 22 06:31:04 CEST 2012


Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 sheep/work.c |   65 ++++++++++++++++++++++++++++++++++++++++++++-------------
 sheep/work.h |    2 +
 2 files changed, 52 insertions(+), 15 deletions(-)

diff --git a/sheep/work.c b/sheep/work.c
index 5678223..b197bb2 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -33,7 +33,8 @@
 #include "event.h"
 #include "trace/trace.h"
 
-#define NR_WORKER_THREADS 4
+#define NR_MIN_WORKER_THREADS 4
+#define NR_MAX_WORKER_THREADS 16384
 
 static int efd;
 int total_ordered_workers;
@@ -43,11 +44,46 @@ enum wq_state {
 	WQ_DEAD = (1U << 1),
 };
 
+static void *worker_routine(void *arg);
+
+/*
+ * Grow the pool to nr_threads workers, capped at NR_MAX_WORKER_THREADS; it
+ * never shrinks.  Returns 0 on success, -1 if a thread cannot be created.
+ */
+static int create_worker_threads(struct worker_info *wi, size_t nr_threads)
+{
+	int ret = 0;
+
+	nr_threads = min(nr_threads, (size_t)NR_MAX_WORKER_THREADS);
+	pthread_mutex_lock(&wi->startup_lock);
+	while (wi->nr_threads < nr_threads) {
+		wi->nr_threads++;
+		ret = pthread_create(&wi->worker_thread[wi->nr_threads - 1],
+				     NULL, worker_routine, wi);
+		if (ret != 0) {
+			/* pthread_create() reports via ret, not errno */
+			eprintf("failed to create worker thread: %s\n",
+				strerror(ret));
+			wi->nr_threads--;
+			break;
+		}
+		dprintf("create thread %s %zu\n", wi->name, wi->nr_threads);
+	}
+	pthread_mutex_unlock(&wi->startup_lock);
+
+	return ret ? -1 : 0;
+}
+
 void queue_work(struct work_queue *q, struct work *work)
 {
 	struct worker_info *wi = container_of(q, struct worker_info, q);
 
 	pthread_mutex_lock(&wi->pending_lock);
+	wi->nr_pending++;
+
+	if (!wi->ordered && wi->nr_threads < wi->nr_pending + wi->nr_running)
+		create_worker_threads(wi, wi->nr_threads * 2);
+
 	list_add_tail(&work->w_list, &wi->q.pending_list);
 	pthread_mutex_unlock(&wi->pending_lock);
 
@@ -103,19 +139,26 @@ static void *worker_routine(void *arg)
 	/* started this thread */
 	pthread_mutex_unlock(&wi->startup_lock);
 
+	pthread_mutex_lock(&wi->pending_lock);
+	wi->nr_running++;
+	pthread_mutex_unlock(&wi->pending_lock);
+
 	while (!(wi->q.wq_state & WQ_DEAD)) {
 
 		pthread_mutex_lock(&wi->pending_lock);
 retest:
 		if (list_empty(&wi->q.pending_list)) {
+			wi->nr_running--;
 			pthread_cond_wait(&wi->pending_cond, &wi->pending_lock);
 			if (wi->q.wq_state & WQ_DEAD) {
 				pthread_mutex_unlock(&wi->pending_lock);
 				pthread_exit(NULL);
 			}
+			wi->nr_running++;
 			goto retest;
 		}
 
+		wi->nr_pending--;
 		work = list_first_entry(&wi->q.pending_list,
 				       struct work, w_list);
 
@@ -169,14 +212,13 @@ struct work_queue *init_work_queue(const char *name, bool ordered)
 	if (ret)
 		return NULL;
 
-	nr = ordered ? 1 : NR_WORKER_THREADS;
+	nr = ordered ? 1 : NR_MAX_WORKER_THREADS;
 	wi = zalloc(sizeof(*wi) + nr * sizeof(pthread_t));
 	if (!wi)
 		return NULL;
 
 	wi->name = name;
 	wi->ordered = ordered;
-	wi->nr_threads = nr;
 
 	INIT_LIST_HEAD(&wi->q.pending_list);
 	INIT_LIST_HEAD(&wi->finished_list);
@@ -187,17 +229,10 @@ struct work_queue *init_work_queue(const char *name, bool ordered)
 	pthread_mutex_init(&wi->pending_lock, NULL);
 	pthread_mutex_init(&wi->startup_lock, NULL);
 
-	pthread_mutex_lock(&wi->startup_lock);
-	for (i = 0; i < wi->nr_threads; i++) {
-		ret = pthread_create(&wi->worker_thread[i], NULL,
-				     worker_routine, wi);
-		if (ret) {
-			eprintf("failed to create worker thread: %s\n",
-				strerror(ret));
-			goto destroy_threads;
-		}
-	}
-	pthread_mutex_unlock(&wi->startup_lock);
+	nr = ordered ? 1 : NR_MIN_WORKER_THREADS;
+	ret = create_worker_threads(wi, nr);
+	if (ret < 0)
+		goto destroy_threads;
 
 	list_add(&wi->worker_info_siblings, &worker_info_list);
 
@@ -206,7 +241,7 @@ destroy_threads:
 
 	wi->q.wq_state |= WQ_DEAD;
 	pthread_mutex_unlock(&wi->startup_lock);
-	for (; i > 0; i--) {
+	for (i = wi->nr_threads; i > 0; i--) {
 		pthread_join(wi->worker_thread[i - 1], NULL);
 		eprintf("stopped worker thread #%d\n", i - 1);
 	}
diff --git a/sheep/work.h b/sheep/work.h
index e57c6d0..e31b3a3 100644
--- a/sheep/work.h
+++ b/sheep/work.h
@@ -35,6 +35,8 @@ struct worker_info {
 	pthread_mutex_t pending_lock;
 	/* protected by pending_lock */
 	struct work_queue q;
+	size_t nr_pending;
+	size_t nr_running;
 
 	pthread_mutex_t startup_lock;
 
-- 
1.7.2.5




More information about the sheepdog mailing list