[sheepdog] [PATCH 4/5] work: clean up workqueue
MORITA Kazutaka
morita.kazutaka at lab.ntt.co.jp
Mon Jul 2 20:32:15 CEST 2012
Currently, wi->nr_threads is always 0 or 1, so we can simplify the
code by replacing the thread count with a boolean 'ordered' flag and a
single worker thread.
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
sheep/cluster/accord.c | 2 +-
sheep/sheep.c | 10 +++---
sheep/work.c | 72 ++++++++++++++++++++---------------------------
sheep/work.h | 9 ++---
4 files changed, 41 insertions(+), 52 deletions(-)
diff --git a/sheep/cluster/accord.c b/sheep/cluster/accord.c
index 12dc687..a0f1d00 100644
--- a/sheep/cluster/accord.c
+++ b/sheep/cluster/accord.c
@@ -545,7 +545,7 @@ static int accord_init(const char *option)
return -1;
}
- acrd_wq = init_work_queue("accord", 1);
+ acrd_wq = init_work_queue("accord", true);
if (!acrd_wq) {
eprintf("failed to create accord workqueue: %m\n");
return -1;
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 933ecd5..7d1e853 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -247,11 +247,11 @@ int main(int argc, char **argv)
local_req_init();
- sys->gateway_wqueue = init_work_queue("gateway", 0);
- sys->io_wqueue = init_work_queue("io", 0);
- sys->recovery_wqueue = init_work_queue("recovery", 1);
- sys->deletion_wqueue = init_work_queue("deletion", 1);
- sys->block_wqueue = init_work_queue("block", 1);
+ sys->gateway_wqueue = init_work_queue("gateway", false);
+ sys->io_wqueue = init_work_queue("io", false);
+ sys->recovery_wqueue = init_work_queue("recovery", true);
+ sys->deletion_wqueue = init_work_queue("deletion", true);
+ sys->block_wqueue = init_work_queue("block", true);
if (!sys->gateway_wqueue || !sys->io_wqueue ||!sys->recovery_wqueue ||
!sys->deletion_wqueue || !sys->block_wqueue)
exit(1);
diff --git a/sheep/work.c b/sheep/work.c
index 708f88c..9deac01 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -15,6 +15,7 @@
#include <errno.h>
#include <string.h>
#include <inttypes.h>
+#include <stdbool.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
@@ -91,15 +92,14 @@ void queue_work(struct work_queue *q, struct work *work)
{
struct worker_info *wi = container_of(q, struct worker_info, q);
- if (!wi->nr_threads) {
- create_short_thread(wi, work);
- return;
- }
- pthread_mutex_lock(&wi->pending_lock);
- list_add_tail(&work->w_list, &wi->q.pending_list);
- pthread_mutex_unlock(&wi->pending_lock);
+ if (wi->ordered) {
+ pthread_mutex_lock(&wi->pending_lock);
+ list_add_tail(&work->w_list, &wi->q.pending_list);
+ pthread_mutex_unlock(&wi->pending_lock);
- pthread_cond_signal(&wi->pending_cond);
+ pthread_cond_signal(&wi->pending_cond);
+ } else
+ create_short_thread(wi, work);
}
static void bs_thread_request_done(int fd, int events, void *data)
@@ -133,16 +133,8 @@ static void *worker_routine(void *arg)
struct worker_info *wi = arg;
struct work *work;
eventfd_t value = 1;
- int i, uninitialized_var(idx);
-
- for (i = 0; i < wi->nr_threads; i++) {
- if (wi->worker_thread[i] == pthread_self()) {
- idx = i;
- break;
- }
- }
- set_thread_name(wi->name, idx);
+ set_thread_name(wi->name, 0);
pthread_mutex_lock(&wi->startup_lock);
/* started this thread */
@@ -203,58 +195,56 @@ static int init_eventfd(void)
return 0;
}
-struct work_queue *init_work_queue(const char *name, int nr)
+struct work_queue *init_work_queue(const char *name, bool ordered)
{
- int i, ret;
+ int ret;
struct worker_info *wi;
ret = init_eventfd();
if (ret)
return NULL;
- wi = zalloc(sizeof(*wi) + nr * sizeof(pthread_t));
+ wi = zalloc(sizeof(*wi));
if (!wi)
return NULL;
wi->name = name;
- wi->nr_threads = nr;
+ wi->ordered = ordered;
- INIT_LIST_HEAD(&wi->q.pending_list);
- INIT_LIST_HEAD(&wi->q.blocked_list);
INIT_LIST_HEAD(&wi->finished_list);
- pthread_cond_init(&wi->pending_cond, NULL);
-
pthread_mutex_init(&wi->finished_lock, NULL);
- pthread_mutex_init(&wi->pending_lock, NULL);
- pthread_mutex_init(&wi->startup_lock, NULL);
- pthread_mutex_lock(&wi->startup_lock);
- for (i = 0; i < wi->nr_threads; i++) {
- ret = pthread_create(&wi->worker_thread[i], NULL,
- worker_routine, wi);
+ if (ordered) {
+ INIT_LIST_HEAD(&wi->q.pending_list);
+
+ pthread_cond_init(&wi->pending_cond, NULL);
+ pthread_mutex_init(&wi->pending_lock, NULL);
+ pthread_mutex_init(&wi->startup_lock, NULL);
+ pthread_mutex_lock(&wi->startup_lock);
+
+ ret = pthread_create(&wi->worker_thread, NULL, worker_routine,
+ wi);
if (ret) {
- eprintf("failed to create worker thread #%d: %s\n",
- i, strerror(ret));
- if (ret)
- goto destroy_threads;
+ eprintf("failed to create worker thread: %s\n",
+ strerror(ret));
+ goto destroy_threads;
}
+
+ pthread_mutex_unlock(&wi->startup_lock);
}
- pthread_mutex_unlock(&wi->startup_lock);
list_add(&wi->worker_info_siblings, &worker_info_list);
- total_nr_workers += nr;
+ total_nr_workers++;
return &wi->q;
destroy_threads:
wi->q.wq_state |= WQ_DEAD;
pthread_mutex_unlock(&wi->startup_lock);
- for (; i > 0; i--) {
- pthread_join(wi->worker_thread[i - 1], NULL);
- eprintf("stopped worker thread #%d\n", i - 1);
- }
+ pthread_join(wi->worker_thread, NULL);
+ eprintf("stopped worker thread\n");
/* destroy_cond_mutex: */
pthread_cond_destroy(&wi->pending_cond);
diff --git a/sheep/work.h b/sheep/work.h
index 8e0beba..3634560 100644
--- a/sheep/work.h
+++ b/sheep/work.h
@@ -14,9 +14,7 @@ struct work {
struct work_queue {
int wq_state;
- int nr_active;
struct list_head pending_list;
- struct list_head blocked_list;
};
struct worker_info {
@@ -24,7 +22,7 @@ struct worker_info {
struct list_head worker_info_siblings;
- int nr_threads;
+ bool ordered;
pthread_mutex_t finished_lock;
struct list_head finished_list;
@@ -38,13 +36,14 @@ struct worker_info {
pthread_mutex_t startup_lock;
- pthread_t worker_thread[0];
+ pthread_t worker_thread; /* used for an ordered work queue */
};
extern struct list_head worker_info_list;
extern int total_nr_workers;
-struct work_queue *init_work_queue(const char *name, int nr);
+/* if 'ordered' is true, the work queue is processed in order. */
+struct work_queue *init_work_queue(const char *name, bool ordered);
void queue_work(struct work_queue *q, struct work *work);
#endif
--
1.7.2.5
More information about the sheepdog
mailing list