[sheepdog] [PATCH] collie: allow calling queue_work() in done()
MORITA Kazutaka
morita.kazutaka at gmail.com
Wed May 15 18:38:10 CEST 2013
From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
Currently, if we call queue_work() during the second event_loop(),
work_queue_wait() cannot wait for the queued work to complete.
This adds a field 'nr_works' to the worker_info and makes
work_queue_empty() return true only if both worker_routine() and
worker_thread_request_done() have no outstanding work items. I think
this removes the race condition in work_queue_wait() completely.
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
collie/common.c | 7 -------
lib/work.c | 25 +++++++++----------------
2 files changed, 9 insertions(+), 23 deletions(-)
diff --git a/collie/common.c b/collie/common.c
index 0646587..e4d33e5 100644
--- a/collie/common.c
+++ b/collie/common.c
@@ -249,13 +249,6 @@ void confirm(const char *message)
void work_queue_wait(struct work_queue *q)
{
- assert(is_main_thread());
-
while (!work_queue_empty(q))
event_loop(-1);
- /*
- * We have to call event_loop() again because some works are remained in
- * the finished list.
- */
- event_loop(-1);
}
diff --git a/lib/work.c b/lib/work.c
index cb8879d..61c1197 100644
--- a/lib/work.c
+++ b/lib/work.c
@@ -55,9 +55,11 @@ struct worker_info {
pthread_mutex_t pending_lock;
/* protected by pending_lock */
struct work_queue q;
- size_t nr_pending;
- size_t nr_running;
size_t nr_threads;
+
+ /* protected by uatomic primitives */
+ size_t nr_works;
+
/* we cannot shrink work queue till this time */
uint64_t tm_end_of_protection;
enum wq_thread_control tc;
@@ -102,7 +104,7 @@ static inline uint64_t wq_get_roof(struct worker_info *wi)
static bool wq_need_grow(struct worker_info *wi)
{
- if (wi->nr_threads < wi->nr_pending + wi->nr_running &&
+ if (wi->nr_threads < uatomic_read(&wi->nr_works) &&
wi->nr_threads * 2 <= wq_get_roof(wi)) {
wi->tm_end_of_protection = get_msec_time() +
WQ_PROTECTION_PERIOD;
@@ -118,7 +120,7 @@ static bool wq_need_grow(struct worker_info *wi)
*/
static bool wq_need_shrink(struct worker_info *wi)
{
- if (wi->nr_pending + wi->nr_running <= wi->nr_threads / 2)
+ if (uatomic_read(&wi->nr_works) < wi->nr_threads / 2)
/* we cannot shrink work queue during protection period. */
return wi->tm_end_of_protection <= get_msec_time();
@@ -154,8 +156,8 @@ void queue_work(struct work_queue *q, struct work *work)
{
struct worker_info *wi = container_of(q, struct worker_info, q);
+ uatomic_inc(&wi->nr_works);
pthread_mutex_lock(&wi->pending_lock);
- wi->nr_pending++;
if (wq_need_grow(wi))
/* double the thread pool size */
@@ -192,6 +194,7 @@ static void worker_thread_request_done(int fd, int events, void *data)
list_del(&work->w_list);
work->done(work);
+ uatomic_dec(&wi->nr_works);
}
}
}
@@ -209,14 +212,12 @@ static void *worker_routine(void *arg)
pthread_mutex_unlock(&wi->startup_lock);
pthread_mutex_lock(&wi->pending_lock);
- wi->nr_running++;
pthread_mutex_unlock(&wi->pending_lock);
while (true) {
pthread_mutex_lock(&wi->pending_lock);
if (wq_need_shrink(wi)) {
- wi->nr_running--;
wi->nr_threads--;
if (wq_destroy_cb)
wq_destroy_cb(pthread_self());
@@ -228,13 +229,10 @@ static void *worker_routine(void *arg)
}
retest:
if (list_empty(&wi->q.pending_list)) {
- wi->nr_running--;
pthread_cond_wait(&wi->pending_cond, &wi->pending_lock);
- wi->nr_running++;
goto retest;
}
- wi->nr_pending--;
work = list_first_entry(&wi->q.pending_list,
struct work, w_list);
@@ -337,11 +335,6 @@ struct work_queue *create_ordered_work_queue(const char *name)
bool work_queue_empty(struct work_queue *q)
{
struct worker_info *wi = container_of(q, struct worker_info, q);
- size_t nr_works;
-
- pthread_mutex_lock(&wi->pending_lock);
- nr_works = wi->nr_running + wi->nr_pending;
- pthread_mutex_unlock(&wi->pending_lock);
- return nr_works == 0;
+ return uatomic_read(&wi->nr_works) == 0;
}
--
1.7.9.5
More information about the sheepdog
mailing list