From: Liu Yuan <tailai.ly at taobao.com> This patch introduces *short thread* abstraction that is created on demand and destroyed after serving the work when sheep run short of gateway or io threads, aiming to solve two problems: 1. timeout of IO requests from guests. With on-demand short threads, we guarantee that there is always one thread available to execute the request ASAP. 2. system halt for corner cases that all gateway and io threads are executing local requests that ask for creation of another thread to execute the request and sleep wait for response. Signed-off-by: Liu Yuan <tailai.ly at taobao.com> --- v2: use atomic op for nr_free sheep/work.c | 53 +++++++++++++++++++++++++++++++++++++++++++++++++++++ sheep/work.h | 1 + 2 files changed, 54 insertions(+) diff --git a/sheep/work.c b/sheep/work.c index f81c41d..bd149c4 100644 --- a/sheep/work.c +++ b/sheep/work.c @@ -34,17 +34,64 @@ static int efd; int total_nr_workers; LIST_HEAD(worker_info_list); +LIST_HEAD(short_wi_finished_list); enum wq_state { WQ_DEAD = (1U << 1), }; +static pthread_mutex_t short_wi_lock = PTHREAD_MUTEX_INITIALIZER; + +/* + * Short thread is created on demand and destroyed after serving the work when + * sheep runs short of gateway or io threads, aiming to solve two problems: + * + * 1. timeout of IO requests from guests. With on-demand short threads, we + * guarantee that there is always one thread available to execute the + * request as soon as possible. + * 2. sheep halt for corner case that all gateway and io threads are executing + * local requests that ask for creation of another thread to execute the + * requests and sleep-wait for responses. + */ +static void *run_short_thread(void * arg) +{ + struct work *work = arg; + eventfd_t value = 1; + static uint64_t idx = 0; + + set_thread_name("short", idx++); + + work->fn(work); + + pthread_mutex_lock(&short_wi_lock); + list_add_tail(&work->w_list, &short_wi_finished_list); + pthread_mutex_unlock(&short_wi_lock); + + eventfd_write(efd, value); + return NULL; +} + +static inline void create_short_thread(struct work *work) +{ + pthread_t thread; + + if (pthread_create(&thread, NULL, run_short_thread, work)) + panic("%m\n"); +} + void queue_work(struct work_queue *q, struct work *work) { struct worker_info *wi = container_of(q, struct worker_info, q); pthread_mutex_lock(&wi->pending_lock); + if (!wi->nr_free && wi->nr_threads > 1) { + /* We only create short threads for gateway and IO req */ + pthread_mutex_unlock(&wi->pending_lock); + create_short_thread(work); + return; + } list_add_tail(&work->w_list, &wi->q.pending_list); + wi->nr_free--; pthread_mutex_unlock(&wi->pending_lock); pthread_cond_signal(&wi->pending_cond); @@ -67,6 +114,10 @@ static void bs_thread_request_done(int fd, int events, void *data) list_splice_init(&wi->finished_list, &list); pthread_mutex_unlock(&wi->finished_lock); + pthread_mutex_lock(&short_wi_lock); + list_splice_tail_init(&short_wi_finished_list, &list); + pthread_mutex_unlock(&short_wi_lock); + while (!list_empty(&list)) { work = list_first_entry(&list, struct work, w_list); list_del(&work->w_list); @@ -119,6 +170,7 @@ retest: pthread_mutex_lock(&wi->finished_lock); list_add_tail(&work->w_list, &wi->finished_list); + wi->nr_free++; pthread_mutex_unlock(&wi->finished_lock); eventfd_write(efd, value); @@ -193,6 +245,7 @@ struct work_queue *init_work_queue(const char *name, int nr) list_add(&wi->worker_info_siblings, &worker_info_list); + wi->nr_free = nr; total_nr_workers += nr; return &wi->q; destroy_threads: diff --git a/sheep/work.h b/sheep/work.h index 8e0beba..eb6a583 100644 --- a/sheep/work.h +++ b/sheep/work.h @@ -25,6 +25,7 @@ struct worker_info { struct list_head worker_info_siblings; int nr_threads; + int nr_free; pthread_mutex_t finished_lock; struct list_head finished_list; -- 1.7.10.2 |