[sheepdog] [PATCH 1/3] work: remove short thread
MORITA Kazutaka
morita.kazutaka at lab.ntt.co.jp
Mon Oct 22 06:31:03 CEST 2012
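Remove the on-demand short threads. Unordered work queues are now served by a
fixed pool of NR_WORKER_THREADS worker threads that share the pending list,
while ordered work queues keep a single worker thread.
This is a preparation for a dynamic worker thread pool.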
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
include/util.h | 2 +-
lib/logger.c | 4 +-
sheep/work.c | 126 +++++++++++++++++--------------------------------------
sheep/work.h | 3 +-
4 files changed, 44 insertions(+), 91 deletions(-)
diff --git a/include/util.h b/include/util.h
index d0bf659..9197c76 100644
--- a/include/util.h
+++ b/include/util.h
@@ -38,7 +38,7 @@
#define notrace __attribute__((no_instrument_function))
-#define uninitialized_var(x) (x = x)
+#define uninitialized_var(x) x = x
static inline int before(uint32_t seq1, uint32_t seq2)
{
diff --git a/lib/logger.c b/lib/logger.c
index aa1c012..7158b65 100644
--- a/lib/logger.c
+++ b/lib/logger.c
@@ -156,7 +156,7 @@ static notrace int log_vsnprintf(char *buff, size_t size, int prio,
{
char *p = buff;
- if (worker_name && worker_idx)
+ if (worker_name && worker_idx >= 0)
snprintf(p, size, "[%s %d] ", worker_name, worker_idx);
else if (worker_name)
snprintf(p, size, "[%s] ", worker_name);
@@ -466,7 +466,7 @@ notrace void set_thread_name(const char *name, int idx)
notrace void get_thread_name(char *name)
{
- if (worker_name && worker_idx)
+ if (worker_name && worker_idx >= 0)
sprintf(name, "%s %d", worker_name, worker_idx);
else if (worker_name)
sprintf(name, "%s", worker_name);
diff --git a/sheep/work.c b/sheep/work.c
index 0ef169d..5678223 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -25,7 +25,6 @@
#include <sys/types.h>
#include <sys/eventfd.h>
#include <linux/types.h>
-#include <urcu/uatomic.h>
#include "list.h"
#include "util.h"
@@ -34,6 +33,8 @@
#include "event.h"
#include "trace/trace.h"
+#define NR_WORKER_THREADS 4
+
static int efd;
int total_ordered_workers;
LIST_HEAD(worker_info_list);
@@ -42,75 +43,15 @@ enum wq_state {
WQ_DEAD = (1U << 1),
};
-/*
- * Short thread is created on demand and destroyed after serving the work for
- * gateway or io requests, aiming to solve two problems:
- *
- * 1. timeout of IO requests from guests. With on-demand short threads, we
- * guarantee that there is always one thread available to execute the
- * request as soon as possible.
- * 2. sheep halt for corner case that all gateway and io threads are executing
- * local requests that ask for creation of another thread to execute the
- * requests and sleep-wait for responses.
- */
-struct short_work {
- struct work *work;
- struct worker_info *wi;
-};
-
-static void *run_short_thread(void *arg)
-{
- struct short_work *sw = arg;
- eventfd_t value = 1;
- static unsigned long idx;
- int err;
-
- /* Tell runtime to release resources after termination */
- err = pthread_detach(pthread_self());
- if (err)
- panic("%s\n", strerror(err));
-
- set_thread_name(sw->wi->name, uatomic_add_return(&idx, 1));
-
- sw->work->fn(sw->work);
-
- pthread_mutex_lock(&sw->wi->finished_lock);
- list_add_tail(&sw->work->w_list, &sw->wi->finished_list);
- pthread_mutex_unlock(&sw->wi->finished_lock);
-
- eventfd_write(efd, value);
- free(sw);
- pthread_exit(NULL);
-}
-
-static inline void create_short_thread(struct worker_info *wi,
- struct work *work)
-{
- pthread_t thread;
- struct short_work *sw = xmalloc(sizeof *sw);
- int err;
-
- sw->work = work;
- sw->wi = wi;
-
- err = pthread_create(&thread, NULL, run_short_thread, sw);
- if (err)
- panic("%s\n", strerror(err));
- short_thread_begin();
-}
-
void queue_work(struct work_queue *q, struct work *work)
{
struct worker_info *wi = container_of(q, struct worker_info, q);
- if (wi->ordered) {
- pthread_mutex_lock(&wi->pending_lock);
- list_add_tail(&work->w_list, &wi->q.pending_list);
- pthread_mutex_unlock(&wi->pending_lock);
+ pthread_mutex_lock(&wi->pending_lock);
+ list_add_tail(&work->w_list, &wi->q.pending_list);
+ pthread_mutex_unlock(&wi->pending_lock);
- pthread_cond_signal(&wi->pending_cond);
- } else
- create_short_thread(wi, work);
+ pthread_cond_signal(&wi->pending_cond);
}
static void bs_thread_request_done(int fd, int events, void *data)
@@ -135,8 +76,6 @@ static void bs_thread_request_done(int fd, int events, void *data)
list_del(&work->w_list);
work->done(work);
- if (!wi->ordered)
- short_thread_end();
}
}
}
@@ -146,8 +85,19 @@ static void *worker_routine(void *arg)
struct worker_info *wi = arg;
struct work *work;
eventfd_t value = 1;
+ int i, uninitialized_var(idx);
+
+ for (i = 0; i < wi->nr_threads; i++) {
+ if (wi->worker_thread[i] == pthread_self()) {
+ idx = i;
+ break;
+ }
+ }
- set_thread_name(wi->name, 0);
+ if (wi->ordered)
+ set_thread_name(wi->name, -1);
+ else
+ set_thread_name(wi->name, idx);
pthread_mutex_lock(&wi->startup_lock);
/* started this thread */
@@ -212,44 +162,42 @@ static int init_eventfd(void)
struct work_queue *init_work_queue(const char *name, bool ordered)
{
- int ret;
+ int i, ret, nr;
struct worker_info *wi;
ret = init_eventfd();
if (ret)
return NULL;
- wi = zalloc(sizeof(*wi));
+ nr = ordered ? 1 : NR_WORKER_THREADS;
+ wi = zalloc(sizeof(*wi) + nr * sizeof(pthread_t));
if (!wi)
return NULL;
wi->name = name;
wi->ordered = ordered;
+ wi->nr_threads = nr;
+ INIT_LIST_HEAD(&wi->q.pending_list);
INIT_LIST_HEAD(&wi->finished_list);
- pthread_mutex_init(&wi->finished_lock, NULL);
-
- if (ordered) {
- INIT_LIST_HEAD(&wi->q.pending_list);
+ pthread_cond_init(&wi->pending_cond, NULL);
- pthread_cond_init(&wi->pending_cond, NULL);
- pthread_mutex_init(&wi->pending_lock, NULL);
- pthread_mutex_init(&wi->startup_lock, NULL);
-
- pthread_mutex_lock(&wi->startup_lock);
+ pthread_mutex_init(&wi->finished_lock, NULL);
+ pthread_mutex_init(&wi->pending_lock, NULL);
+ pthread_mutex_init(&wi->startup_lock, NULL);
- ret = pthread_create(&wi->worker_thread, NULL, worker_routine,
- wi);
+ pthread_mutex_lock(&wi->startup_lock);
+ for (i = 0; i < wi->nr_threads; i++) {
+ ret = pthread_create(&wi->worker_thread[i], NULL,
+ worker_routine, wi);
if (ret) {
eprintf("failed to create worker thread: %s\n",
strerror(ret));
goto destroy_threads;
}
-
- pthread_mutex_unlock(&wi->startup_lock);
- total_ordered_workers++;
}
+ pthread_mutex_unlock(&wi->startup_lock);
list_add(&wi->worker_info_siblings, &worker_info_list);
@@ -258,8 +206,10 @@ destroy_threads:
wi->q.wq_state |= WQ_DEAD;
pthread_mutex_unlock(&wi->startup_lock);
- pthread_join(wi->worker_thread, NULL);
- eprintf("stopped worker thread\n");
+ for (; i > 0; i--) {
+ pthread_join(wi->worker_thread[i - 1], NULL);
+ eprintf("stopped worker thread #%d\n", i - 1);
+ }
/* destroy_cond_mutex: */
pthread_cond_destroy(&wi->pending_cond);
@@ -273,12 +223,14 @@ destroy_threads:
#ifdef COMPILE_UNUSED_CODE
static void exit_work_queue(struct work_queue *q)
{
+ int i;
struct worker_info *wi = container_of(q, struct worker_info, q);
q->wq_state |= WQ_DEAD;
pthread_cond_broadcast(&wi->pending_cond);
- pthread_join(wi->worker_thread, NULL);
+ for (i = 0; wi->worker_thread[i] && i < wi->nr_threads; i++)
+ pthread_join(wi->worker_thread[i], NULL);
pthread_cond_destroy(&wi->pending_cond);
pthread_mutex_destroy(&wi->pending_lock);
diff --git a/sheep/work.h b/sheep/work.h
index 4d45dd6..e57c6d0 100644
--- a/sheep/work.h
+++ b/sheep/work.h
@@ -38,7 +38,8 @@ struct worker_info {
pthread_mutex_t startup_lock;
- pthread_t worker_thread; /* used for an ordered work queue */
+ size_t nr_threads;
+ pthread_t worker_thread[0];
};
extern struct list_head worker_info_list;
--
1.7.2.5
More information about the sheepdog
mailing list