[sheepdog] [PATCH v2 5/5] work: shrink the number of worker threads dynamically

MORITA Kazutaka morita.kazutaka at gmail.com
Thu Feb 28 11:46:22 CET 2013


From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>

If more than half of threads are not used more than 1 second, the work
queue reduces the current number of threads.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 sheep/work.c |   36 ++++++++++++++++++++++++++++++++++++
 sheep/work.h |    2 ++
 2 files changed, 38 insertions(+)

diff --git a/sheep/work.c b/sheep/work.c
index a246122..3bf340d 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -24,6 +24,7 @@
 #include <syscall.h>
 #include <sys/types.h>
 #include <sys/eventfd.h>
+#include <sys/time.h>
 #include <linux/types.h>
 
 #include "list.h"
@@ -33,6 +34,9 @@
 #include "event.h"
 #include "trace/trace.h"
 
+/* the protection period from shrinking work queue */
+#define WQ_PROTECTION_PERIOD 1000 /* ms */
+
 static int efd;
 int total_ordered_workers;
 LIST_HEAD(worker_info_list);
@@ -43,12 +47,35 @@ enum wq_state {
 
 static void *worker_routine(void *arg);
 
+static uint64_t get_msec_time(void)
+{
+	struct timeval tv;
+
+	gettimeofday(&tv, NULL);
+	return tv.tv_sec * 1000 + tv.tv_usec / 1000;
+}
+
 static bool wq_grow_needed(struct worker_info *wi)
 {
+	wi->tm_end_of_protection = get_msec_time() + WQ_PROTECTION_PERIOD;
+
 	return wi->nr_threads < wi->nr_pending + wi->nr_running &&
 		wi->nr_threads < wi->max_threads;
 }
 
+/* return true if more than half of threads are not used more than
+ * WQ_PROTECTION_PERIOD seconds */
+static bool wq_shrink_needed(struct worker_info *wi)
+{
+	if (wi->nr_pending + wi->nr_running <= wi->nr_threads / 2)
+		/* we cannot shrink work queue during protection period. */
+		return wi->tm_end_of_protection <= get_msec_time();
+
+	/* update the end of protection time */
+	wi->tm_end_of_protection = get_msec_time() + WQ_PROTECTION_PERIOD;
+	return false;
+}
+
 static int create_worker_threads(struct worker_info *wi, size_t nr_threads)
 {
 	pthread_t thread;
@@ -134,6 +161,15 @@ static void *worker_routine(void *arg)
 	while (!(wi->q.wq_state & WQ_DEAD)) {
 
 		pthread_mutex_lock(&wi->pending_lock);
+		if (wq_shrink_needed(wi)) {
+			wi->nr_running--;
+			wi->nr_threads--;
+			pthread_mutex_unlock(&wi->pending_lock);
+			pthread_detach(pthread_self());
+			sd_dprintf("destroy thread %s %d, %zd", wi->name,
+				   gettid(), wi->nr_threads);
+			break;
+		}
 retest:
 		if (list_empty(&wi->q.pending_list)) {
 			wi->nr_running--;
diff --git a/sheep/work.h b/sheep/work.h
index dd071a1..3945226 100644
--- a/sheep/work.h
+++ b/sheep/work.h
@@ -36,6 +36,8 @@ struct worker_info {
 	size_t nr_pending;
 	size_t nr_running;
 	size_t nr_threads;
+	/* we cannot shrink work queue till this time */
+	uint64_t tm_end_of_protection;
 
 	pthread_mutex_t startup_lock;
 
-- 
1.7.9.5




More information about the sheepdog mailing list