[sheepdog] [PATCH v4 03/10] work: remove sheep_priv.h and trace.h from include files
MORITA Kazutaka
morita.kazutaka at gmail.com
Tue May 14 09:00:12 CEST 2013
From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
This removes sheep specific header files to move work.c to lib.
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
sheep/sheep.c | 16 +++++++++++++++-
sheep/trace/trace.h | 1 -
sheep/work.c | 37 ++++++++++++++++++-------------------
sheep/work.h | 8 +++++++-
4 files changed, 40 insertions(+), 22 deletions(-)
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 537c86e..0238a9f 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -331,9 +331,23 @@ static void init_io_arg(char *arg)
}
}
+static size_t get_nr_nodes(void)
+{
+ struct vnode_info *vinfo;
+ size_t nr = 1;
+
+ vinfo = get_vnode_info();
+ if (vinfo != NULL)
+ nr = vinfo->nr_nodes;
+ put_vnode_info(vinfo);
+
+ return nr;
+}
+
static int create_work_queues(void)
{
- if (init_work_queue())
+ if (init_work_queue(get_nr_nodes, trace_register_thread,
+ trace_unregister_thread))
return -1;
sys->gateway_wqueue = create_work_queue("gway", WQ_UNLIMITED);
diff --git a/sheep/trace/trace.h b/sheep/trace/trace.h
index 9648834..2a920a5 100644
--- a/sheep/trace/trace.h
+++ b/sheep/trace/trace.h
@@ -10,7 +10,6 @@
#include "sheep.h"
#include "list.h"
#include "util.h"
-#include "../work.h"
struct ipinfo {
const char *file; /* Source code filename for EIP */
diff --git a/sheep/work.c b/sheep/work.c
index f79dd1d..1a90561 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -32,8 +32,6 @@
#include "work.h"
#include "logger.h"
#include "event.h"
-#include "trace/trace.h"
-#include "sheep_priv.h"
/*
* The protection period from shrinking work queue. This is necessary
@@ -63,11 +61,14 @@ struct worker_info {
/* we cannot shrink work queue till this time */
uint64_t tm_end_of_protection;
enum wq_thread_control tc;
- size_t nr_nodes;
};
static int efd;
static LIST_HEAD(worker_info_list);
+static size_t nr_nodes = 1;
+static size_t (*wq_get_nr_nodes)(void);
+static void (*wq_create_cb)(pthread_t);
+static void (*wq_destroy_cb)(pthread_t);
static void *worker_routine(void *arg);
@@ -88,7 +89,7 @@ static inline uint64_t wq_get_roof(struct worker_info *wi)
break;
case WQ_DYNAMIC:
/* FIXME: 2 * nr_nodes threads. No rationale yet. */
- nr = wi->nr_nodes * 2;
+ nr = nr_nodes * 2;
break;
case WQ_UNLIMITED:
nr = SIZE_MAX;
@@ -139,7 +140,8 @@ static int create_worker_threads(struct worker_info *wi, size_t nr_threads)
pthread_mutex_unlock(&wi->startup_lock);
return -1;
}
- trace_register_thread(thread);
+ if (wq_create_cb)
+ wq_create_cb(thread);
wi->nr_threads++;
sd_dprintf("create thread %s %zu", wi->name, wi->nr_threads);
}
@@ -172,22 +174,15 @@ static void worker_thread_request_done(int fd, int events, void *data)
struct work *work;
eventfd_t value;
LIST_HEAD(list);
- struct vnode_info *vinfo;
- size_t nr_nodes;
+
+ if (wq_get_nr_nodes)
+ nr_nodes = wq_get_nr_nodes();
ret = eventfd_read(fd, &value);
if (ret < 0)
return;
- vinfo = get_vnode_info();
- if (vinfo != NULL)
- nr_nodes = vinfo->nr_nodes;
- else
- nr_nodes = 1; /* cluster doesn't start yet */
-
list_for_each_entry(wi, &worker_info_list, worker_info_siblings) {
- wi->nr_nodes = nr_nodes;
-
pthread_mutex_lock(&wi->finished_lock);
list_splice_init(&wi->finished_list, &list);
pthread_mutex_unlock(&wi->finished_lock);
@@ -199,8 +194,6 @@ static void worker_thread_request_done(int fd, int events, void *data)
work->done(work);
}
}
-
- put_vnode_info(vinfo);
}
static void *worker_routine(void *arg)
@@ -225,7 +218,8 @@ static void *worker_routine(void *arg)
if (wq_need_shrink(wi)) {
wi->nr_running--;
wi->nr_threads--;
- trace_unregister_thread(pthread_self());
+ if (wq_destroy_cb)
+ wq_destroy_cb(pthread_self());
pthread_mutex_unlock(&wi->pending_lock);
pthread_detach(pthread_self());
sd_dprintf("destroy thread %s %d, %zu", wi->name,
@@ -260,10 +254,15 @@ retest:
pthread_exit(NULL);
}
-int init_work_queue(void)
+int init_work_queue(size_t (*get_nr_nodes)(void), void (*create_cb)(pthread_t),
+ void (*destroy_cb)(pthread_t))
{
int ret;
+ wq_get_nr_nodes = get_nr_nodes;
+ wq_create_cb = create_cb;
+ wq_destroy_cb = destroy_cb;
+
efd = eventfd(0, EFD_NONBLOCK);
if (efd < 0) {
sd_eprintf("failed to create an event fd: %m");
diff --git a/sheep/work.h b/sheep/work.h
index b036e3a..0317b00 100644
--- a/sheep/work.h
+++ b/sheep/work.h
@@ -24,7 +24,13 @@ enum wq_thread_control {
WQ_UNLIMITED, /* Unlimited # of threads created */
};
-int init_work_queue(void);
+/*
+ * 'get_nr_nodes' is the function to get the current number of nodes and used
+ * for dynamic work queues. 'create_cb' will be called when worker threads are
+ * created and 'destroy_cb' will be called when worker threads are destroyed.
+ */
+int init_work_queue(size_t (*get_nr_nodes)(void), void (*create_cb)(pthread_t),
+ void (*destroy_cb)(pthread_t));
struct work_queue *create_work_queue(const char *name, enum wq_thread_control);
struct work_queue *create_ordered_work_queue(const char *name);
void queue_work(struct work_queue *q, struct work *work);
--
1.7.9.5
More information about the sheepdog
mailing list