[sheepdog] [PATCH 3/4] work: let worker threads start with pipe()

Hitoshi Mitake mitake.hitoshi at lab.ntt.co.jp
Tue Oct 22 09:13:12 CEST 2013


The current work.c uses a mutex (startup_lock) to start worker threads,
and the intention of that code is hard to understand. In addition, the
code doesn't destroy newly created threads when it fails to create or
grow a work queue.

This patch refactors the startup code to use a pipe() instead of the
mutex and implements correct cleanup of the threads.

Signed-off-by: Hitoshi Mitake <mitake.hitoshi at lab.ntt.co.jp>
---
 lib/work.c |   59 +++++++++++++++++++++++++++++++++++++++++++----------------
 1 file changed, 43 insertions(+), 16 deletions(-)

diff --git a/lib/work.c b/lib/work.c
index d8e154e..57b38dd 100644
--- a/lib/work.c
+++ b/lib/work.c
@@ -55,10 +55,11 @@ struct worker_info {
 	const char *name;
 
 	struct list_head finished_list;
+	pthread_mutex_t finished_lock;
+
 	struct list_node worker_info_siblings;
 
-	pthread_mutex_t finished_lock;
-	pthread_mutex_t startup_lock;
+	int startup_fds[2];
 
 	/* wokers sleep on this and signaled by tgtd */
 	pthread_cond_t pending_cond;
@@ -141,22 +142,33 @@ static bool wq_need_shrink(struct worker_info *wi)
 static int create_worker_threads(struct worker_info *wi, size_t nr_threads)
 {
 	pthread_t thread;
-	int ret;
+	int ret = 0, created;
+
+	created = wi->nr_threads; /* save previous number of threads */
 
-	pthread_mutex_lock(&wi->startup_lock);
 	while (wi->nr_threads < nr_threads) {
 		ret = pthread_create(&thread, NULL, worker_routine, wi);
 		if (ret != 0) {
 			sd_err("failed to create worker thread: %m");
-			pthread_mutex_unlock(&wi->startup_lock);
-			return -1;
+			ret = -1;
 		}
 		wi->nr_threads++;
 		sd_debug("create thread %s %zu", wi->name, wi->nr_threads);
 	}
-	pthread_mutex_unlock(&wi->startup_lock);
 
-	return 0;
+	created = wi->nr_threads - created;
+	for (int i = 0; i < created; i++) {
+		int wbytes;
+
+		wbytes = write(wi->startup_fds[1], &ret, sizeof(ret));
+		if (wbytes != sizeof(ret))
+			panic("starting worker threads failed");
+	}
+
+	if (ret == -1)
+		wi->nr_threads -= created;
+
+	return ret;
 }
 
 #ifdef ENABLE_TRACE
@@ -263,12 +275,20 @@ static void *worker_routine(void *arg)
 	struct worker_info *wi = arg;
 	struct work *work;
 	int tid = gettid();
+	int start, ret;
 
 	set_thread_name(wi->name, (wi->tc != WQ_ORDERED));
 
-	pthread_mutex_lock(&wi->startup_lock);
-	/* started this thread */
-	pthread_mutex_unlock(&wi->startup_lock);
+	ret = read(wi->startup_fds[0], &start, sizeof(start));
+	if (ret != sizeof(start))
+		panic("starting worker thread failed");
+
+	if (start == -1) {
+		sd_err("starting thread: %s is canceled", wi->name);
+		pthread_exit(NULL);
+	}
+
+	assert(start == 0);
 
 #ifdef ENABLE_TRACE
 
@@ -402,22 +422,29 @@ struct work_queue *create_work_queue(const char *name,
 
 	pthread_mutex_init(&wi->finished_lock, NULL);
 	pthread_mutex_init(&wi->pending_lock, NULL);
-	pthread_mutex_init(&wi->startup_lock, NULL);
+
+	ret = pipe(wi->startup_fds);
+	if (ret < 0) {
+		sd_err("creating pipe() for startup failed");
+		goto free_wi;
+	}
 
 	ret = create_worker_threads(wi, 1);
 	if (ret < 0)
-		goto destroy_threads;
+		goto destroy_mutexes;
 
 	list_add(&wi->worker_info_siblings, &worker_info_list);
 
 	return &wi->q;
-destroy_threads:
-	pthread_mutex_unlock(&wi->startup_lock);
+
+destroy_mutexes:
 	pthread_cond_destroy(&wi->pending_cond);
 	pthread_mutex_destroy(&wi->pending_lock);
-	pthread_mutex_destroy(&wi->startup_lock);
 	pthread_mutex_destroy(&wi->finished_lock);
 
+free_wi:
+	free(wi);
+
 	return NULL;
 }
 
-- 
1.7.10.4




More information about the sheepdog mailing list