[sheepdog] [PATCH v4 4/5] work: grow the number of worker threads dynamically
MORITA Kazutaka
morita.kazutaka at lab.ntt.co.jp
Mon Mar 4 01:08:05 CET 2013
If a new thread is requested when there is no free one, the work queue
doubles the number of threads in the pool.
The maximum number of threads is passed as an argument to
init_work_queue. If you pass -1 for it, the number of threads can
grow without limit.
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
sheep/sheep.c | 16 ++++++-------
sheep/work.c | 77 ++++++++++++++++++++++++++++++++++++++++++++++++++++-------
sheep/work.h | 11 +++++----
3 files changed, 82 insertions(+), 22 deletions(-)
diff --git a/sheep/sheep.c b/sheep/sheep.c
index e44ef95..12be4cb 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -379,15 +379,15 @@ static int init_work_queues(void)
if (init_wqueue_eventfd())
return -1;
- sys->gateway_wqueue = init_work_queue("gway", false);
- sys->io_wqueue = init_work_queue("io", false);
- sys->recovery_wqueue = init_work_queue("rw", false);
- sys->deletion_wqueue = init_work_queue("deletion", true);
- sys->block_wqueue = init_work_queue("block", true);
- sys->sockfd_wqueue = init_work_queue("sockfd", true);
+ sys->gateway_wqueue = init_work_queue("gway", -1);
+ sys->io_wqueue = init_work_queue("io", -1);
+ sys->recovery_wqueue = init_work_queue("rw", -1);
+ sys->deletion_wqueue = init_ordered_work_queue("deletion");
+ sys->block_wqueue = init_ordered_work_queue("block");
+ sys->sockfd_wqueue = init_ordered_work_queue("sockfd");
if (is_object_cache_enabled()) {
- sys->oc_reclaim_wqueue = init_work_queue("oc_reclaim", true);
- sys->oc_push_wqueue = init_work_queue("oc_push", false);
+ sys->oc_reclaim_wqueue = init_ordered_work_queue("oc_reclaim");
+ sys->oc_push_wqueue = init_work_queue("oc_push", -1);
if (!sys->oc_reclaim_wqueue || !sys->oc_push_wqueue)
return -1;
}
diff --git a/sheep/work.c b/sheep/work.c
index 041aaa4..768cebf 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -41,11 +41,48 @@ enum wq_state {
WQ_DEAD = (1U << 1),
};
+static void *worker_routine(void *arg);
+
+/*
+ * Tell whether the thread pool of 'wi' should be enlarged.
+ *
+ * Returns true only when the pool is still below max_threads and
+ * nr_threads is smaller than nr_pending + nr_running.
+ */
+static bool wq_need_grow(struct worker_info *wi)
+{
+	size_t demand = wi->nr_pending + wi->nr_running;
+
+	/* never grow past the configured ceiling */
+	if (wi->nr_threads >= wi->max_threads)
+		return false;
+
+	return wi->nr_threads < demand;
+}
+
+/*
+ * Grow the pool of 'wi' until it holds 'nr_threads' worker threads.
+ *
+ * wi->nr_threads is incremented before each pthread_create() call and
+ * rolled back if that call fails.  startup_lock is held for the whole
+ * loop; each new worker locks and unlocks it once before entering its
+ * main loop.
+ *
+ * Returns 0 on success, or -1 as soon as one thread cannot be created
+ * (threads created earlier in the same call are left running).
+ */
+static int create_worker_threads(struct worker_info *wi, size_t nr_threads)
+{
+	pthread_t thread;
+	int ret;
+
+	pthread_mutex_lock(&wi->startup_lock);
+	while (wi->nr_threads < nr_threads) {
+		wi->nr_threads++;
+		ret = pthread_create(&thread, NULL, worker_routine, wi);
+		if (ret != 0) {
+			/*
+			 * pthread_create() reports failure through its return
+			 * value and does not set errno, so "%m" would print
+			 * an unrelated message.
+			 */
+			sd_eprintf("failed to create worker thread: %s",
+				   strerror(ret));
+			wi->nr_threads--;
+			pthread_mutex_unlock(&wi->startup_lock);
+			return -1;
+		}
+		/* nr_threads is a size_t, hence %zu rather than %zd */
+		sd_dprintf("create thread %s %zu", wi->name, wi->nr_threads);
+	}
+	pthread_mutex_unlock(&wi->startup_lock);
+
+	return 0;
+}
+
void queue_work(struct work_queue *q, struct work *work)
{
struct worker_info *wi = container_of(q, struct worker_info, q);
pthread_mutex_lock(&wi->pending_lock);
+ wi->nr_pending++;
+
+ if (wq_need_grow(wi))
+ /* double the thread pool size */
+ create_worker_threads(wi,
+ min(wi->nr_threads * 2, wi->max_threads));
+
list_add_tail(&work->w_list, &wi->q.pending_list);
pthread_mutex_unlock(&wi->pending_lock);
@@ -84,25 +121,32 @@ static void *worker_routine(void *arg)
struct work *work;
eventfd_t value = 1;
- set_thread_name(wi->name, false);
+ set_thread_name(wi->name, (wi->max_threads > 1));
pthread_mutex_lock(&wi->startup_lock);
/* started this thread */
pthread_mutex_unlock(&wi->startup_lock);
+ pthread_mutex_lock(&wi->pending_lock);
+ wi->nr_running++;
+ pthread_mutex_unlock(&wi->pending_lock);
+
while (!(wi->q.wq_state & WQ_DEAD)) {
pthread_mutex_lock(&wi->pending_lock);
retest:
if (list_empty(&wi->q.pending_list)) {
+ wi->nr_running--;
pthread_cond_wait(&wi->pending_cond, &wi->pending_lock);
if (wi->q.wq_state & WQ_DEAD) {
pthread_mutex_unlock(&wi->pending_lock);
pthread_exit(NULL);
}
+ wi->nr_running++;
goto retest;
}
+ wi->nr_pending--;
work = list_first_entry(&wi->q.pending_list,
struct work, w_list);
@@ -141,14 +185,28 @@ int init_wqueue_eventfd(void)
return 0;
}
-struct work_queue *init_work_queue(const char *name, bool ordered)
+/*
+ * max_threads = -1 allows an unlimited number of threads to be created.
+ * This option is necessary to solve the following problems:
+ *
+ * 1. Timeouts of IO requests from guests. With on-demand short threads, we
+ *    guarantee that there is always one thread available to execute the
+ *    request as soon as possible.
+ * 2. sheep halting in the corner case where all gateway and io threads are
+ *    executing local requests that ask for the creation of another thread
+ *    to execute the requests, and are sleep-waiting for the responses.
+ */
+struct work_queue *init_work_queue(const char *name, int max_threads)
{
int ret;
struct worker_info *wi;
wi = xzalloc(sizeof(*wi));
wi->name = name;
- wi->ordered = ordered;
+ if (max_threads == -1)
+ wi->max_threads = SIZE_MAX;
+ else
+ wi->max_threads = max_threads;
INIT_LIST_HEAD(&wi->q.pending_list);
INIT_LIST_HEAD(&wi->finished_list);
@@ -159,13 +217,9 @@ struct work_queue *init_work_queue(const char *name, bool ordered)
pthread_mutex_init(&wi->pending_lock, NULL);
pthread_mutex_init(&wi->startup_lock, NULL);
- pthread_mutex_lock(&wi->startup_lock);
- ret = pthread_create(&wi->worker_thread, NULL, worker_routine, wi);
- if (ret) {
- sd_eprintf("failed to create worker thread: %s", strerror(ret));
+ ret = create_worker_threads(wi, 1);
+ if (ret < 0)
goto destroy_threads;
- }
- pthread_mutex_unlock(&wi->startup_lock);
list_add(&wi->worker_info_siblings, &worker_info_list);
@@ -180,3 +234,8 @@ destroy_threads:
return NULL;
}
+
+/*
+ * Create a work queue named 'name' whose pool is capped at a single
+ * worker thread (max_threads == 1).
+ *
+ * Returns whatever init_work_queue() returns for that configuration.
+ */
+struct work_queue *init_ordered_work_queue(const char *name)
+{
+ return init_work_queue(name, 1);
+}
diff --git a/sheep/work.h b/sheep/work.h
index 0d125dc..63f63d0 100644
--- a/sheep/work.h
+++ b/sheep/work.h
@@ -24,8 +24,6 @@ struct worker_info {
struct list_head worker_info_siblings;
- bool ordered;
-
pthread_mutex_t finished_lock;
struct list_head finished_list;
@@ -35,17 +33,20 @@ struct worker_info {
pthread_mutex_t pending_lock;
/* protected by pending_lock */
struct work_queue q;
+ size_t nr_pending;
+ size_t nr_running;
+ size_t nr_threads;
pthread_mutex_t startup_lock;
- pthread_t worker_thread; /* used for an ordered work queue */
+ size_t max_threads;
};
extern struct list_head worker_info_list;
extern int total_ordered_workers;
-/* if 'ordered' is true, the work queue are processes in order. */
-struct work_queue *init_work_queue(const char *name, bool ordered);
+struct work_queue *init_work_queue(const char *name, int max_threads);
+struct work_queue *init_ordered_work_queue(const char *name);
void queue_work(struct work_queue *q, struct work *work);
int init_wqueue_eventfd(void);
--
1.8.1.3.566.gaa39828
More information about the sheepdog
mailing list