[sheepdog] [PATCH] collie: allow calling queue_work() in done()

MORITA Kazutaka morita.kazutaka at gmail.com
Wed May 15 18:38:10 CEST 2013


From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>

Currently, work_queue_wait() loops on event_loop() until
work_queue_empty() returns true, and then calls event_loop() once more
to run the done() callbacks of works remaining on the finished list.
If we call queue_work() in that second event_loop(), work_queue_wait()
has already decided to return and cannot wait for the completion of
the newly queued work.

This adds a field 'nr_works' to struct worker_info, incremented in
queue_work() and decremented in worker_thread_request_done() only
after done() returns, so that work_queue_empty() returns true only if
neither worker_routine() nor worker_thread_request_done() has
outstanding work.  I think this removes the race condition in
work_queue_wait() completely.
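
The ordering that makes this safe, in simplified pseudo-C (locking and
list handling elided; uatomic_inc()/uatomic_dec() are the atomic
helpers already used in lib/work.c):

	void queue_work(struct work_queue *q, struct work *work)
	{
		struct worker_info *wi = container_of(q, struct worker_info, q);

		/* count the work before it becomes visible to workers */
		uatomic_inc(&wi->nr_works);
		/* ... add 'work' to wi->q.pending_list, wake a worker ... */
	}

	static void worker_thread_request_done(int fd, int events, void *data)
	{
		/* ... for each work on the finished list ... */
		work->done(work);           /* may call queue_work() again */
		uatomic_dec(&wi->nr_works); /* uncounted only after done() */
	}

Because the decrement happens only after done() returns, a work queued
from inside done() raises nr_works before the finished work is
uncounted, so work_queue_empty() never reports an empty queue while a
chained work is still pending.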

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
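As a usage illustration (hypothetical caller code, not part of this
patch), a done() callback can now safely chain further works, and a
single work_queue_wait() covers the whole chain:

	/* hypothetical example; 'wq', 'next_work', 'more_to_do' are made up */
	static void my_done(struct work *work)
	{
		if (more_to_do)
			queue_work(wq, next_work);  /* counted by nr_works */
	}

	queue_work(wq, first_work);
	work_queue_wait(wq);  /* returns only after the whole chain finishes */
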
 collie/common.c |    7 -------
 lib/work.c      |   25 +++++++++----------------
 2 files changed, 9 insertions(+), 23 deletions(-)

diff --git a/collie/common.c b/collie/common.c
index 0646587..e4d33e5 100644
--- a/collie/common.c
+++ b/collie/common.c
@@ -249,13 +249,6 @@ void confirm(const char *message)
 
 void work_queue_wait(struct work_queue *q)
 {
-	assert(is_main_thread());
-
 	while (!work_queue_empty(q))
 		event_loop(-1);
-	/*
-	 * We have to call event_loop() again because some works are remained in
-	 * the finished list.
-	 */
-	event_loop(-1);
 }
diff --git a/lib/work.c b/lib/work.c
index cb8879d..61c1197 100644
--- a/lib/work.c
+++ b/lib/work.c
@@ -55,9 +55,11 @@ struct worker_info {
 	pthread_mutex_t pending_lock;
 	/* protected by pending_lock */
 	struct work_queue q;
-	size_t nr_pending;
-	size_t nr_running;
 	size_t nr_threads;
+
+	/* protected by uatomic primitives */
+	size_t nr_works;
+
 	/* we cannot shrink work queue till this time */
 	uint64_t tm_end_of_protection;
 	enum wq_thread_control tc;
@@ -102,7 +104,7 @@ static inline uint64_t wq_get_roof(struct worker_info *wi)
 
 static bool wq_need_grow(struct worker_info *wi)
 {
-	if (wi->nr_threads < wi->nr_pending + wi->nr_running &&
+	if (wi->nr_threads < uatomic_read(&wi->nr_works) &&
 	    wi->nr_threads * 2 <= wq_get_roof(wi)) {
 		wi->tm_end_of_protection = get_msec_time() +
 			WQ_PROTECTION_PERIOD;
@@ -118,7 +120,7 @@ static bool wq_need_grow(struct worker_info *wi)
  */
 static bool wq_need_shrink(struct worker_info *wi)
 {
-	if (wi->nr_pending + wi->nr_running <= wi->nr_threads / 2)
+	if (uatomic_read(&wi->nr_works) < wi->nr_threads / 2)
 		/* we cannot shrink work queue during protection period. */
 		return wi->tm_end_of_protection <= get_msec_time();
 
@@ -154,8 +156,8 @@ void queue_work(struct work_queue *q, struct work *work)
 {
 	struct worker_info *wi = container_of(q, struct worker_info, q);
 
+	uatomic_inc(&wi->nr_works);
 	pthread_mutex_lock(&wi->pending_lock);
-	wi->nr_pending++;
 
 	if (wq_need_grow(wi))
 		/* double the thread pool size */
@@ -192,6 +194,7 @@ static void worker_thread_request_done(int fd, int events, void *data)
 			list_del(&work->w_list);
 
 			work->done(work);
+			uatomic_dec(&wi->nr_works);
 		}
 	}
 }
@@ -209,14 +212,12 @@ static void *worker_routine(void *arg)
 	pthread_mutex_unlock(&wi->startup_lock);
 
 	pthread_mutex_lock(&wi->pending_lock);
-	wi->nr_running++;
 	pthread_mutex_unlock(&wi->pending_lock);
 
 	while (true) {
 
 		pthread_mutex_lock(&wi->pending_lock);
 		if (wq_need_shrink(wi)) {
-			wi->nr_running--;
 			wi->nr_threads--;
 			if (wq_destroy_cb)
 				wq_destroy_cb(pthread_self());
@@ -228,13 +229,10 @@ static void *worker_routine(void *arg)
 		}
 retest:
 		if (list_empty(&wi->q.pending_list)) {
-			wi->nr_running--;
 			pthread_cond_wait(&wi->pending_cond, &wi->pending_lock);
-			wi->nr_running++;
 			goto retest;
 		}
 
-		wi->nr_pending--;
 		work = list_first_entry(&wi->q.pending_list,
 				       struct work, w_list);
 
@@ -337,11 +335,6 @@ struct work_queue *create_ordered_work_queue(const char *name)
 bool work_queue_empty(struct work_queue *q)
 {
 	struct worker_info *wi = container_of(q, struct worker_info, q);
-	size_t nr_works;
-
-	pthread_mutex_lock(&wi->pending_lock);
-	nr_works = wi->nr_running + wi->nr_pending;
-	pthread_mutex_unlock(&wi->pending_lock);
 
-	return nr_works == 0;
+	return uatomic_read(&wi->nr_works) == 0;
 }
-- 
1.7.9.5



