[sheepdog] [PATCH v2 5/5] work: shrink the number of worker threads dynamically
MORITA Kazutaka
morita.kazutaka at gmail.com
Thu Feb 28 11:46:22 CET 2013
From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
If more than half of threads are not used more than 1 second, the work
queue reduces the current number of threads.
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
sheep/work.c | 36 ++++++++++++++++++++++++++++++++++++
sheep/work.h | 2 ++
2 files changed, 38 insertions(+)
diff --git a/sheep/work.c b/sheep/work.c
index a246122..3bf340d 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -24,6 +24,7 @@
#include <syscall.h>
#include <sys/types.h>
#include <sys/eventfd.h>
+#include <sys/time.h>
#include <linux/types.h>
#include "list.h"
@@ -33,6 +34,9 @@
#include "event.h"
#include "trace/trace.h"
+/* the protection period from shrinking work queue */
+#define WQ_PROTECTION_PERIOD 1000 /* ms */
+
static int efd;
int total_ordered_workers;
LIST_HEAD(worker_info_list);
@@ -43,12 +47,35 @@ enum wq_state {
static void *worker_routine(void *arg);
+static uint64_t get_msec_time(void)
+{
+ struct timeval tv;
+
+ gettimeofday(&tv, NULL);
+ return tv.tv_sec * 1000 + tv.tv_usec / 1000;
+}
+
static bool wq_grow_needed(struct worker_info *wi)
{
+ wi->tm_end_of_protection = get_msec_time() + WQ_PROTECTION_PERIOD;
+
return wi->nr_threads < wi->nr_pending + wi->nr_running &&
wi->nr_threads < wi->max_threads;
}
+/* return true if more than half of threads are not used more than
+ * WQ_PROTECTION_PERIOD seconds */
+static bool wq_shrink_needed(struct worker_info *wi)
+{
+ if (wi->nr_pending + wi->nr_running <= wi->nr_threads / 2)
+ /* we cannot shrink work queue during protection period. */
+ return wi->tm_end_of_protection <= get_msec_time();
+
+ /* update the end of protection time */
+ wi->tm_end_of_protection = get_msec_time() + WQ_PROTECTION_PERIOD;
+ return false;
+}
+
static int create_worker_threads(struct worker_info *wi, size_t nr_threads)
{
pthread_t thread;
@@ -134,6 +161,15 @@ static void *worker_routine(void *arg)
while (!(wi->q.wq_state & WQ_DEAD)) {
pthread_mutex_lock(&wi->pending_lock);
+ if (wq_shrink_needed(wi)) {
+ wi->nr_running--;
+ wi->nr_threads--;
+ pthread_mutex_unlock(&wi->pending_lock);
+ pthread_detach(pthread_self());
+ sd_dprintf("destroy thread %s %d, %zd", wi->name,
+ gettid(), wi->nr_threads);
+ break;
+ }
retest:
if (list_empty(&wi->q.pending_list)) {
wi->nr_running--;
diff --git a/sheep/work.h b/sheep/work.h
index dd071a1..3945226 100644
--- a/sheep/work.h
+++ b/sheep/work.h
@@ -36,6 +36,8 @@ struct worker_info {
size_t nr_pending;
size_t nr_running;
size_t nr_threads;
+ /* we cannot shrink work queue till this time */
+ uint64_t tm_end_of_protection;
pthread_mutex_t startup_lock;
--
1.7.9.5
More information about the sheepdog
mailing list