[sheepdog] [PATCH 2/3] work: enlarge the number of worker threads dynamically
MORITA Kazutaka
morita.kazutaka at lab.ntt.co.jp
Mon Oct 22 06:31:04 CEST 2012
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
sheep/work.c | 65 ++++++++++++++++++++++++++++++++++++++++++++-------------
sheep/work.h | 2 +
2 files changed, 52 insertions(+), 15 deletions(-)
diff --git a/sheep/work.c b/sheep/work.c
index 5678223..b197bb2 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -33,7 +33,8 @@
#include "event.h"
#include "trace/trace.h"
-#define NR_WORKER_THREADS 4
+#define NR_MIN_WORKER_THREADS 4
+#define NR_MAX_WORKER_THREADS 16384
static int efd;
int total_ordered_workers;
@@ -43,11 +44,46 @@ enum wq_state {
WQ_DEAD = (1U << 1),
};
+static void *worker_routine(void *arg);
+
+/*
+ * Grow the worker pool of 'wi' until it holds 'nr_threads' threads.
+ *
+ * The request is capped at NR_MAX_WORKER_THREADS and the pool is never
+ * shrunk: if it already has at least 'nr_threads' workers nothing happens.
+ *
+ * Returns 0 on success (including the nothing-to-do case) and -1 when
+ * pthread_create() fails.  Threads created before the failure remain counted
+ * in wi->nr_threads.  startup_lock is taken here and is always released
+ * again before returning.
+ */
+static int create_worker_threads(struct worker_info *wi, size_t nr_threads)
+{
+	int ret = 0;
+
+	nr_threads = min(nr_threads, (size_t)NR_MAX_WORKER_THREADS);
+
+	pthread_mutex_lock(&wi->startup_lock);
+	while (wi->nr_threads < nr_threads) {
+		int err = pthread_create(&wi->worker_thread[wi->nr_threads],
+					 NULL, worker_routine, wi);
+		if (err != 0) {
+			/*
+			 * pthread_create() reports failure through its return
+			 * value and leaves errno untouched, so "%m" would
+			 * print an unrelated error here.
+			 */
+			eprintf("failed to create worker thread: %s\n",
+				strerror(err));
+			ret = -1;
+			break;
+		}
+		/* count the thread only once it really exists */
+		wi->nr_threads++;
+		dprintf("create thread %s %zd\n", wi->name, wi->nr_threads);
+	}
+	pthread_mutex_unlock(&wi->startup_lock);
+
+	return ret;
+}
+
void queue_work(struct work_queue *q, struct work *work)
{
struct worker_info *wi = container_of(q, struct worker_info, q);
pthread_mutex_lock(&wi->pending_lock);
+ wi->nr_pending++;
+
+ if (!wi->ordered && wi->nr_threads < wi->nr_pending + wi->nr_running)
+ create_worker_threads(wi, wi->nr_threads * 2);
+
list_add_tail(&work->w_list, &wi->q.pending_list);
pthread_mutex_unlock(&wi->pending_lock);
@@ -103,19 +139,26 @@ static void *worker_routine(void *arg)
/* started this thread */
pthread_mutex_unlock(&wi->startup_lock);
+ pthread_mutex_lock(&wi->pending_lock);
+ wi->nr_running++;
+ pthread_mutex_unlock(&wi->pending_lock);
+
while (!(wi->q.wq_state & WQ_DEAD)) {
pthread_mutex_lock(&wi->pending_lock);
retest:
if (list_empty(&wi->q.pending_list)) {
+ wi->nr_running--;
pthread_cond_wait(&wi->pending_cond, &wi->pending_lock);
if (wi->q.wq_state & WQ_DEAD) {
pthread_mutex_unlock(&wi->pending_lock);
pthread_exit(NULL);
}
+ wi->nr_running++;
goto retest;
}
+ wi->nr_pending--;
work = list_first_entry(&wi->q.pending_list,
struct work, w_list);
@@ -169,14 +212,13 @@ struct work_queue *init_work_queue(const char *name, bool ordered)
if (ret)
return NULL;
- nr = ordered ? 1 : NR_WORKER_THREADS;
+ nr = ordered ? 1 : NR_MAX_WORKER_THREADS;
wi = zalloc(sizeof(*wi) + nr * sizeof(pthread_t));
if (!wi)
return NULL;
wi->name = name;
wi->ordered = ordered;
- wi->nr_threads = nr;
INIT_LIST_HEAD(&wi->q.pending_list);
INIT_LIST_HEAD(&wi->finished_list);
@@ -187,17 +229,10 @@ struct work_queue *init_work_queue(const char *name, bool ordered)
pthread_mutex_init(&wi->pending_lock, NULL);
pthread_mutex_init(&wi->startup_lock, NULL);
- pthread_mutex_lock(&wi->startup_lock);
- for (i = 0; i < wi->nr_threads; i++) {
- ret = pthread_create(&wi->worker_thread[i], NULL,
- worker_routine, wi);
- if (ret) {
- eprintf("failed to create worker thread: %s\n",
- strerror(ret));
- goto destroy_threads;
- }
- }
- pthread_mutex_unlock(&wi->startup_lock);
+ nr = ordered ? 1 : NR_MIN_WORKER_THREADS;
+ ret = create_worker_threads(wi, nr);
+ if (ret < 0)
+ goto destroy_threads;
list_add(&wi->worker_info_siblings, &worker_info_list);
@@ -206,7 +241,7 @@ destroy_threads:
wi->q.wq_state |= WQ_DEAD;
pthread_mutex_unlock(&wi->startup_lock);
- for (; i > 0; i--) {
+ for (i = wi->nr_threads; i > 0; i--) {
pthread_join(wi->worker_thread[i - 1], NULL);
eprintf("stopped worker thread #%d\n", i - 1);
}
diff --git a/sheep/work.h b/sheep/work.h
index e57c6d0..e31b3a3 100644
--- a/sheep/work.h
+++ b/sheep/work.h
@@ -35,6 +35,8 @@ struct worker_info {
pthread_mutex_t pending_lock;
/* protected by pending_lock */
struct work_queue q;
+ size_t nr_pending;
+ size_t nr_running;
pthread_mutex_t startup_lock;
--
1.7.2.5
More information about the sheepdog
mailing list