[sheepdog] [PATCH v3 03/10] work: remove sheep_priv.h and trace.h from include files

MORITA Kazutaka morita.kazutaka at gmail.com
Tue May 14 06:13:03 CEST 2013


From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>

This removes sheep specific header files to move work.c to lib.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 sheep/sheep.c       |   14 +++++++++++++-
 sheep/trace/trace.h |    1 -
 sheep/work.c        |   37 ++++++++++++++++++-------------------
 sheep/work.h        |    8 +++++++-
 4 files changed, 38 insertions(+), 22 deletions(-)

diff --git a/sheep/sheep.c b/sheep/sheep.c
index 537c86e..0f44325 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -331,9 +331,21 @@ static void init_io_arg(char *arg)
 	}
 }
 
+static size_t get_nr_nodes(void)
+{
+	const struct vnode_info *vinfo;
+
+	vinfo = get_vnode_info();
+	if (vinfo == NULL)
+		return 1;
+
+	return vinfo->nr_nodes;
+}
+
 static int create_work_queues(void)
 {
-	if (init_work_queue())
+	if (init_work_queue(get_nr_nodes, trace_register_thread,
+			    trace_unregister_thread))
 		return -1;
 
 	sys->gateway_wqueue = create_work_queue("gway", WQ_UNLIMITED);
diff --git a/sheep/trace/trace.h b/sheep/trace/trace.h
index 9648834..2a920a5 100644
--- a/sheep/trace/trace.h
+++ b/sheep/trace/trace.h
@@ -10,7 +10,6 @@
 #include "sheep.h"
 #include "list.h"
 #include "util.h"
-#include "../work.h"
 
 struct ipinfo {
 	const char *file;           /* Source code filename for EIP */
diff --git a/sheep/work.c b/sheep/work.c
index f79dd1d..1a90561 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -32,8 +32,6 @@
 #include "work.h"
 #include "logger.h"
 #include "event.h"
-#include "trace/trace.h"
-#include "sheep_priv.h"
 
 /*
  * The protection period from shrinking work queue.  This is necessary
@@ -63,11 +61,14 @@ struct worker_info {
 	/* we cannot shrink work queue till this time */
 	uint64_t tm_end_of_protection;
 	enum wq_thread_control tc;
-	size_t nr_nodes;
 };
 
 static int efd;
 static LIST_HEAD(worker_info_list);
+static size_t nr_nodes = 1;
+static size_t (*wq_get_nr_nodes)(void);
+static void (*wq_create_cb)(pthread_t);
+static void (*wq_destroy_cb)(pthread_t);
 
 static void *worker_routine(void *arg);
 
@@ -88,7 +89,7 @@ static inline uint64_t wq_get_roof(struct worker_info *wi)
 		break;
 	case WQ_DYNAMIC:
 		/* FIXME: 2 * nr_nodes threads. No rationale yet. */
-		nr = wi->nr_nodes * 2;
+		nr = nr_nodes * 2;
 		break;
 	case WQ_UNLIMITED:
 		nr = SIZE_MAX;
@@ -139,7 +140,8 @@ static int create_worker_threads(struct worker_info *wi, size_t nr_threads)
 			pthread_mutex_unlock(&wi->startup_lock);
 			return -1;
 		}
-		trace_register_thread(thread);
+		if (wq_create_cb)
+			wq_create_cb(thread);
 		wi->nr_threads++;
 		sd_dprintf("create thread %s %zu", wi->name, wi->nr_threads);
 	}
@@ -172,22 +174,15 @@ static void worker_thread_request_done(int fd, int events, void *data)
 	struct work *work;
 	eventfd_t value;
 	LIST_HEAD(list);
-	struct vnode_info *vinfo;
-	size_t nr_nodes;
+
+	if (wq_get_nr_nodes)
+		nr_nodes = wq_get_nr_nodes();
 
 	ret = eventfd_read(fd, &value);
 	if (ret < 0)
 		return;
 
-	vinfo = get_vnode_info();
-	if (vinfo != NULL)
-		nr_nodes = vinfo->nr_nodes;
-	else
-		nr_nodes = 1; /* cluster doesn't start yet */
-
 	list_for_each_entry(wi, &worker_info_list, worker_info_siblings) {
-		wi->nr_nodes = nr_nodes;
-
 		pthread_mutex_lock(&wi->finished_lock);
 		list_splice_init(&wi->finished_list, &list);
 		pthread_mutex_unlock(&wi->finished_lock);
@@ -199,8 +194,6 @@ static void worker_thread_request_done(int fd, int events, void *data)
 			work->done(work);
 		}
 	}
-
-	put_vnode_info(vinfo);
 }
 
 static void *worker_routine(void *arg)
@@ -225,7 +218,8 @@ static void *worker_routine(void *arg)
 		if (wq_need_shrink(wi)) {
 			wi->nr_running--;
 			wi->nr_threads--;
-			trace_unregister_thread(pthread_self());
+			if (wq_destroy_cb)
+				wq_destroy_cb(pthread_self());
 			pthread_mutex_unlock(&wi->pending_lock);
 			pthread_detach(pthread_self());
 			sd_dprintf("destroy thread %s %d, %zu", wi->name,
@@ -260,10 +254,15 @@ retest:
 	pthread_exit(NULL);
 }
 
-int init_work_queue(void)
+int init_work_queue(size_t (*get_nr_nodes)(void), void (*create_cb)(pthread_t),
+		    void (*destroy_cb)(pthread_t))
 {
 	int ret;
 
+	wq_get_nr_nodes = get_nr_nodes;
+	wq_create_cb = create_cb;
+	wq_destroy_cb = destroy_cb;
+
 	efd = eventfd(0, EFD_NONBLOCK);
 	if (efd < 0) {
 		sd_eprintf("failed to create an event fd: %m");
diff --git a/sheep/work.h b/sheep/work.h
index b036e3a..0317b00 100644
--- a/sheep/work.h
+++ b/sheep/work.h
@@ -24,7 +24,13 @@ enum wq_thread_control {
 	WQ_UNLIMITED, /* Unlimited # of threads created */
 };
 
-int init_work_queue(void);
+/*
+ * 'get_nr_nodes' is the function to get the current number of nodes and used
+ * for dynamic work queues.  'create_cb' will be called when worker threads are
+ * created and 'destroy_cb' will be called when worker threads are destroyed.
+ */
+int init_work_queue(size_t (*get_nr_nodes)(void), void (*create_cb)(pthread_t),
+		    void (*destroy_cb)(pthread_t));
 struct work_queue *create_work_queue(const char *name, enum wq_thread_control);
 struct work_queue *create_ordered_work_queue(const char *name);
 void queue_work(struct work_queue *q, struct work *work);
-- 
1.7.9.5




More information about the sheepdog mailing list