[sheepdog] [PATCH v2 4/5] work: grow the number of worker threads dynamically
MORITA Kazutaka
morita.kazutaka at gmail.com
Thu Feb 28 11:46:21 CET 2013
From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
If a new thread is requested when there is no free one, the work queue
doubles the number of threads in the pool.
The maximum number of threads is passed as an argument to
init_work_queue. If you pass -1 for it, the number of threads can
grow without limit.
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
sheep/sheep.c | 16 +++++++-------
sheep/work.c | 66 +++++++++++++++++++++++++++++++++++++++++++++++++--------
sheep/work.h | 12 ++++++-----
3 files changed, 72 insertions(+), 22 deletions(-)
diff --git a/sheep/sheep.c b/sheep/sheep.c
index e44ef95..12be4cb 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -379,15 +379,15 @@ static int init_work_queues(void)
if (init_wqueue_eventfd())
return -1;
- sys->gateway_wqueue = init_work_queue("gway", false);
- sys->io_wqueue = init_work_queue("io", false);
- sys->recovery_wqueue = init_work_queue("rw", false);
- sys->deletion_wqueue = init_work_queue("deletion", true);
- sys->block_wqueue = init_work_queue("block", true);
- sys->sockfd_wqueue = init_work_queue("sockfd", true);
+ sys->gateway_wqueue = init_work_queue("gway", -1);
+ sys->io_wqueue = init_work_queue("io", -1);
+ sys->recovery_wqueue = init_work_queue("rw", -1);
+ sys->deletion_wqueue = init_ordered_work_queue("deletion");
+ sys->block_wqueue = init_ordered_work_queue("block");
+ sys->sockfd_wqueue = init_ordered_work_queue("sockfd");
if (is_object_cache_enabled()) {
- sys->oc_reclaim_wqueue = init_work_queue("oc_reclaim", true);
- sys->oc_push_wqueue = init_work_queue("oc_push", false);
+ sys->oc_reclaim_wqueue = init_ordered_work_queue("oc_reclaim");
+ sys->oc_push_wqueue = init_work_queue("oc_push", -1);
if (!sys->oc_reclaim_wqueue || !sys->oc_push_wqueue)
return -1;
}
diff --git a/sheep/work.c b/sheep/work.c
index 041aaa4..a246122 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -41,11 +41,48 @@ enum wq_state {
WQ_DEAD = (1U << 1),
};
+static void *worker_routine(void *arg);
+
+/*
+ * Tell whether the pool should get more threads: true only while the
+ * thread count is below max_threads and also below the sum of pending
+ * and running work counters.
+ */
+static bool wq_grow_needed(struct worker_info *wi)
+{
+	if (wi->nr_threads >= wi->max_threads)
+		return false;
+
+	return wi->nr_threads < wi->nr_pending + wi->nr_running;
+}
+
+/*
+ * Grow the pool of 'wi' until it holds 'nr_threads' worker threads.
+ *
+ * startup_lock is held for the whole loop; each new worker takes and
+ * releases the same lock at the top of worker_routine(), so it does not
+ * get past that point until this function has unlocked.
+ *
+ * Returns 0 on success, or -1 as soon as one pthread_create() call
+ * fails (threads created before the failure are kept).
+ */
+static int create_worker_threads(struct worker_info *wi, size_t nr_threads)
+{
+	pthread_t thread;
+	int ret;
+
+	pthread_mutex_lock(&wi->startup_lock);
+	while (wi->nr_threads < nr_threads) {
+		wi->nr_threads++;
+		ret = pthread_create(&thread, NULL, worker_routine, wi);
+		if (ret != 0) {
+			/*
+			 * pthread_create() reports failure through its return
+			 * value rather than errno, so print strerror(ret)
+			 * instead of "%m".
+			 */
+			sd_eprintf("failed to create worker thread: %s",
+				   strerror(ret));
+			/* undo the optimistic increment made above */
+			wi->nr_threads--;
+			pthread_mutex_unlock(&wi->startup_lock);
+			return -1;
+		}
+		sd_dprintf("create thread %s %zu", wi->name, wi->nr_threads);
+	}
+	pthread_mutex_unlock(&wi->startup_lock);
+
+	return 0;
+}
+
void queue_work(struct work_queue *q, struct work *work)
{
struct worker_info *wi = container_of(q, struct worker_info, q);
pthread_mutex_lock(&wi->pending_lock);
+ wi->nr_pending++;
+
+ if (wq_grow_needed(wi))
+ /* double the thread pool size */
+ create_worker_threads(wi,
+ min(wi->nr_threads * 2, wi->max_threads));
+
list_add_tail(&work->w_list, &wi->q.pending_list);
pthread_mutex_unlock(&wi->pending_lock);
@@ -84,25 +121,32 @@ static void *worker_routine(void *arg)
struct work *work;
eventfd_t value = 1;
- set_thread_name(wi->name, false);
+ set_thread_name(wi->name, (wi->max_threads > 1));
pthread_mutex_lock(&wi->startup_lock);
/* started this thread */
pthread_mutex_unlock(&wi->startup_lock);
+ pthread_mutex_lock(&wi->pending_lock);
+ wi->nr_running++;
+ pthread_mutex_unlock(&wi->pending_lock);
+
while (!(wi->q.wq_state & WQ_DEAD)) {
pthread_mutex_lock(&wi->pending_lock);
retest:
if (list_empty(&wi->q.pending_list)) {
+ wi->nr_running--;
pthread_cond_wait(&wi->pending_cond, &wi->pending_lock);
if (wi->q.wq_state & WQ_DEAD) {
pthread_mutex_unlock(&wi->pending_lock);
pthread_exit(NULL);
}
+ wi->nr_running++;
goto retest;
}
+ wi->nr_pending--;
work = list_first_entry(&wi->q.pending_list,
struct work, w_list);
@@ -141,14 +185,17 @@ int init_wqueue_eventfd(void)
return 0;
}
-struct work_queue *init_work_queue(const char *name, bool ordered)
+struct work_queue *init_work_queue(const char *name, int max_threads)
{
int ret;
struct worker_info *wi;
wi = xzalloc(sizeof(*wi));
wi->name = name;
- wi->ordered = ordered;
+ if (max_threads == -1)
+ wi->max_threads = SIZE_MAX;
+ else
+ wi->max_threads = max_threads;
INIT_LIST_HEAD(&wi->q.pending_list);
INIT_LIST_HEAD(&wi->finished_list);
@@ -159,13 +206,9 @@ struct work_queue *init_work_queue(const char *name, bool ordered)
pthread_mutex_init(&wi->pending_lock, NULL);
pthread_mutex_init(&wi->startup_lock, NULL);
- pthread_mutex_lock(&wi->startup_lock);
- ret = pthread_create(&wi->worker_thread, NULL, worker_routine, wi);
- if (ret) {
- sd_eprintf("failed to create worker thread: %s", strerror(ret));
+ ret = create_worker_threads(wi, 1);
+ if (ret < 0)
goto destroy_threads;
- }
- pthread_mutex_unlock(&wi->startup_lock);
list_add(&wi->worker_info_siblings, &worker_info_list);
@@ -180,3 +223,8 @@ destroy_threads:
return NULL;
}
+
+/*
+ * Create a work queue whose pool is capped at one worker thread, by
+ * calling init_work_queue() with max_threads = 1.
+ */
+struct work_queue *init_ordered_work_queue(const char *name)
+{
+	const int single_thread = 1;
+
+	return init_work_queue(name, single_thread);
+}
diff --git a/sheep/work.h b/sheep/work.h
index 0d125dc..dd071a1 100644
--- a/sheep/work.h
+++ b/sheep/work.h
@@ -24,8 +24,6 @@ struct worker_info {
struct list_head worker_info_siblings;
- bool ordered;
-
pthread_mutex_t finished_lock;
struct list_head finished_list;
@@ -35,17 +33,21 @@ struct worker_info {
pthread_mutex_t pending_lock;
/* protected by pending_lock */
struct work_queue q;
+ size_t nr_pending;
+ size_t nr_running;
+ size_t nr_threads;
pthread_mutex_t startup_lock;
- pthread_t worker_thread; /* used for an ordered work queue */
+ size_t max_threads;
};
extern struct list_head worker_info_list;
extern int total_ordered_workers;
-/* if 'ordered' is true, the work queue are processes in order. */
-struct work_queue *init_work_queue(const char *name, bool ordered);
+/* max_threads = -1 allows unlimited threads to be created. */
+struct work_queue *init_work_queue(const char *name, int max_threads);
+struct work_queue *init_ordered_work_queue(const char *name);
void queue_work(struct work_queue *q, struct work *work);
int init_wqueue_eventfd(void);
--
1.7.9.5
More information about the sheepdog
mailing list