[sheepdog] [PATCH v2] work queue: introduce dynamic roof to max nr threads
Liu Yuan
namei.unix at gmail.com
Mon Mar 4 17:04:28 CET 2013
From: Liu Yuan <tailai.ly at taobao.com>
This patch adds a dynamic roof to the maximum number of worker threads of
a particular work queue, proportional to the number of nodes in the
cluster. This is needed to prevent a push storm from the object cache,
which might otherwise saturate the cluster with push threads.
Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
v2:
- refactor wq_need_grow()
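
For illustration only (not part of the patch to apply): a minimal
standalone C sketch of how the roof is picked for each wq_thread_control
value. Here nr_nodes is a stand-in for the value the real wq_get_roof()
reads via get_vnode_info()/put_vnode_info().

    #include <stdint.h>
    #include <stdio.h>

    enum wq_thread_control {
            WQ_ORDERED,   /* single worker thread */
            WQ_DYNAMIC,   /* capped at 2 * nr_nodes */
            WQ_UNLIMITED, /* effectively no cap */
    };

    /* Simplified stand-in for wq_get_roof(); the real code reads
     * nr_nodes from the current vnode info. */
    static uint64_t roof(enum wq_thread_control tc, int nr_nodes)
    {
            switch (tc) {
            case WQ_ORDERED:
                    return 1;
            case WQ_DYNAMIC:
                    return (uint64_t)nr_nodes * 2;
            case WQ_UNLIMITED:
            default:
                    return UINT64_MAX;
            }
    }

    int main(void)
    {
            /* e.g. in a 4-node cluster the oc_push queue (WQ_DYNAMIC)
             * is capped at 8 threads, while the gway/io/rw queues stay
             * unlimited and ordered queues keep a single thread. */
            printf("%llu\n", (unsigned long long)roof(WQ_DYNAMIC, 4));
            return 0;
    }

With WQ_DYNAMIC the pool still doubles on demand in queue_work(), but
wq_need_grow() refuses to double past the roof, so the cap scales with
the cluster size instead of being fixed at init time.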
sheep/sheep.c | 8 ++++----
sheep/work.c | 58 +++++++++++++++++++++++++++++++++++++++++----------------
sheep/work.h | 10 ++++++++--
3 files changed, 54 insertions(+), 22 deletions(-)
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 12be4cb..aa2a769 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -379,15 +379,15 @@ static int init_work_queues(void)
if (init_wqueue_eventfd())
return -1;
- sys->gateway_wqueue = init_work_queue("gway", -1);
- sys->io_wqueue = init_work_queue("io", -1);
- sys->recovery_wqueue = init_work_queue("rw", -1);
+ sys->gateway_wqueue = init_work_queue("gway", WQ_UNLIMITED);
+ sys->io_wqueue = init_work_queue("io", WQ_UNLIMITED);
+ sys->recovery_wqueue = init_work_queue("rw", WQ_UNLIMITED);
sys->deletion_wqueue = init_ordered_work_queue("deletion");
sys->block_wqueue = init_ordered_work_queue("block");
sys->sockfd_wqueue = init_ordered_work_queue("sockfd");
if (is_object_cache_enabled()) {
sys->oc_reclaim_wqueue = init_ordered_work_queue("oc_reclaim");
- sys->oc_push_wqueue = init_work_queue("oc_push", -1);
+ sys->oc_push_wqueue = init_work_queue("oc_push", WQ_DYNAMIC);
if (!sys->oc_reclaim_wqueue || !sys->oc_push_wqueue)
return -1;
}
diff --git a/sheep/work.c b/sheep/work.c
index f03b889..f6c2172 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -33,6 +33,7 @@
#include "logger.h"
#include "event.h"
#include "trace/trace.h"
+#include "sheep_priv.h"
/* The protection period from shrinking work queue. This is necessary
* to avoid many calls of pthread_create. Without it, threads are
@@ -57,16 +58,45 @@ static uint64_t get_msec_time(void)
return tv.tv_sec * 1000 + tv.tv_usec / 1000;
}
+static inline uint64_t wq_get_roof(int tc)
+{
+ struct vnode_info *vinfo = get_vnode_info();
+ int nr_nodes = vinfo->nr_nodes;
+ uint64_t nr = 1;
+
+ put_vnode_info(vinfo);
+ switch (tc) {
+ case WQ_ORDERED:
+ break;
+ case WQ_DYNAMIC:
+ /* FIXME 2 * nr_nodes threads. No rationale yet. */
+ nr = nr_nodes * 2;
+ break;
+ case WQ_UNLIMITED:
+ nr = SIZE_MAX;
+ break;
+ default:
+ panic("Invalid threads control %d", tc);
+ }
+ return nr;
+}
+
static bool wq_need_grow(struct worker_info *wi)
{
- wi->tm_end_of_protection = get_msec_time() + WQ_PROTECTION_PERIOD;
+ if (wi->nr_threads < wi->nr_pending + wi->nr_running &&
+ wi->nr_threads * 2 <= wq_get_roof(wi->tc)) {
+ wi->tm_end_of_protection = get_msec_time() +
+ WQ_PROTECTION_PERIOD;
+ return true;
+ }
- return wi->nr_threads < wi->nr_pending + wi->nr_running &&
- wi->nr_threads < wi->max_threads;
+ return false;
}
-/* return true if more than half of threads are not used more than
- * WQ_PROTECTION_PERIOD seconds */
+/*
+ * Return true if more than half of threads are not used more than
+ * WQ_PROTECTION_PERIOD seconds
+ */
static bool wq_need_shrink(struct worker_info *wi)
{
if (wi->nr_pending + wi->nr_running <= wi->nr_threads / 2)
@@ -109,8 +139,7 @@ void queue_work(struct work_queue *q, struct work *work)
if (wq_need_grow(wi))
/* double the thread pool size */
- create_worker_threads(wi,
- min(wi->nr_threads * 2, wi->max_threads));
+ create_worker_threads(wi, wi->nr_threads * 2);
list_add_tail(&work->w_list, &wi->q.pending_list);
pthread_mutex_unlock(&wi->pending_lock);
@@ -150,7 +179,7 @@ static void *worker_routine(void *arg)
struct work *work;
eventfd_t value = 1;
- set_thread_name(wi->name, (wi->max_threads > 1));
+ set_thread_name(wi->name, (wi->tc != WQ_ORDERED));
pthread_mutex_lock(&wi->startup_lock);
/* started this thread */
@@ -224,8 +253,8 @@ int init_wqueue_eventfd(void)
}
/*
- * max_threads = -1 allows unlimited threads to be created.
- * This option is necessary to solve the following problems:
+ * Allowing unlimited threads to be created is necessary to solve the following
+ * problems:
*
* 1. timeout of IO requests from guests. With on-demand short threads, we
* guarantee that there is always one thread available to execute the
@@ -234,17 +263,14 @@ int init_wqueue_eventfd(void)
* local requests that ask for creation of another thread to execute the
* requests and sleep-wait for responses.
*/
-struct work_queue *init_work_queue(const char *name, int max_threads)
+struct work_queue *init_work_queue(const char *name, enum wq_thread_control tc)
{
int ret;
struct worker_info *wi;
wi = xzalloc(sizeof(*wi));
wi->name = name;
- if (max_threads == -1)
- wi->max_threads = SIZE_MAX;
- else
- wi->max_threads = max_threads;
+ wi->tc = tc;
INIT_LIST_HEAD(&wi->q.pending_list);
INIT_LIST_HEAD(&wi->finished_list);
@@ -275,5 +301,5 @@ destroy_threads:
struct work_queue *init_ordered_work_queue(const char *name)
{
- return init_work_queue(name, 1);
+ return init_work_queue(name, WQ_ORDERED);
}
diff --git a/sheep/work.h b/sheep/work.h
index 5ad8511..0366cb3 100644
--- a/sheep/work.h
+++ b/sheep/work.h
@@ -19,6 +19,12 @@ struct work_queue {
struct list_head pending_list;
};
+enum wq_thread_control {
+ WQ_ORDERED, /* Only 1 thread created for work queue */
+ WQ_DYNAMIC, /* # of threads proportional to nr_nodes created */
+ WQ_UNLIMITED, /* Unlimited # of threads created */
+};
+
struct worker_info {
const char *name;
@@ -41,13 +47,13 @@ struct worker_info {
pthread_mutex_t startup_lock;
- size_t max_threads;
+ enum wq_thread_control tc;
};
extern struct list_head worker_info_list;
extern int total_ordered_workers;
-struct work_queue *init_work_queue(const char *name, int max_threads);
+struct work_queue *init_work_queue(const char *name, enum wq_thread_control);
struct work_queue *init_ordered_work_queue(const char *name);
void queue_work(struct work_queue *q, struct work *work);
int init_wqueue_eventfd(void);
--
1.7.9.5