At Wed, 13 Oct 2010 17:46:57 +0300, Yuriy Kohut wrote: > > Got it configured. > > But faced with the following issue while running 'make': > --- > work.c:28:28: error: linux/signalfd.h: No such file or directory > work.c: In function ‘bs_thread_request_done’: > work.c:169: error: array type has incomplete element type > work.c:169: warning: unused variable ‘siginfo’ > --- > > signalfd.h seems to be not kernel version 2.6.18 (default CentOS 5.5 kernel) header. > > Any ideas ? > Can you try the following patch? This removes use of signalfd. Kazutaka == diff --git a/sheep/work.c b/sheep/work.c index 8016435..5f7e96c 100644 --- a/sheep/work.c +++ b/sheep/work.c @@ -24,8 +24,6 @@ #include <syscall.h> #include <sys/types.h> #include <linux/types.h> -#define _LINUX_FCNTL_H -#include <linux/signalfd.h> #include "list.h" #include "util.h" @@ -33,9 +31,6 @@ #include "logger.h" #include "event.h" -extern int signalfd(int fd, const sigset_t *mask, int flags); - -static int sig_fd; static LIST_HEAD(worker_info_list); struct work_queue { @@ -67,6 +62,13 @@ struct worker_info { pthread_mutex_t startup_lock; + int command_fd[2]; + int done_fd[2]; + + pthread_cond_t finished_cond; + struct list_head ack_list; + pthread_t ack_thread; + pthread_t worker_thread[0]; }; @@ -161,36 +163,86 @@ static void work_post_done(struct work_queue *q, enum work_attr attr) } } +static void *bs_thread_ack_fn(void *arg) +{ + struct worker_info *wi = arg; + int command, ret, nr; + struct work *work; + +retry: + ret = read(wi->command_fd[0], &command, sizeof(command)); + if (ret < 0) { + eprintf("ack pthread will be dead, %m\n"); + if (errno == EAGAIN || errno == EINTR) + goto retry; + + goto out; + } + + pthread_mutex_lock(&wi->finished_lock); +retest: + if (list_empty(&wi->finished_list)) { + pthread_cond_wait(&wi->finished_cond, &wi->finished_lock); + goto retest; + } + + while (!list_empty(&wi->finished_list)) { + work = list_first_entry(&wi->finished_list, + struct work, w_list); + + list_del(&work->w_list); + list_add_tail(&work->w_list, &wi->ack_list); + } + + pthread_mutex_unlock(&wi->finished_lock); + + nr = 1; +rewrite: + ret = write(wi->done_fd[1], &nr, sizeof(nr)); + if (ret < 0) { + eprintf("can't ack tgtd, %m\n"); + if (errno == EAGAIN || errno == EINTR) + goto rewrite; + + goto out; + } + + goto retry; +out: + pthread_exit(NULL); +} + static void bs_thread_request_done(int fd, int events, void *data) { int ret; - struct worker_info *wi; + struct worker_info *wi = data; struct work *work; - struct signalfd_siginfo siginfo[16]; - LIST_HEAD(list); + int nr_events; - ret = read(fd, (char *)siginfo, sizeof(siginfo)); - if (ret <= 0) + ret = read(wi->done_fd[0], &nr_events, sizeof(nr_events)); + if (ret < 0) return; - list_for_each_entry(wi, &worker_info_list, worker_info_siblings) { - pthread_mutex_lock(&wi->finished_lock); - list_splice_init(&wi->finished_list, &list); - pthread_mutex_unlock(&wi->finished_lock); + while (!list_empty(&wi->ack_list)) { + enum work_attr attr; + work = list_first_entry(&wi->ack_list, + struct work, w_list); - while (!list_empty(&list)) { - enum work_attr attr; - work = list_first_entry(&list, struct work, w_list); - list_del(&work->w_list); - - /* - * work->done might free the work so we must - * save its attr for qork_post_done(). - */ - attr = work->attr; - work->done(work, 0); - work_post_done(&wi->q, attr); - } + list_del(&work->w_list); + + attr = work->attr; + work->done(work, 0); + work_post_done(&wi->q, attr); + } + +rewrite: + ret = write(wi->command_fd[1], &nr_events, sizeof(nr_events)); + if (ret < 0) { + eprintf("can't write done, %m\n"); + if (errno == EAGAIN || errno == EINTR) + goto rewrite; + + return; } } @@ -240,58 +292,53 @@ retest: list_add_tail(&work->w_list, &wi->finished_list); pthread_mutex_unlock(&wi->finished_lock); - kill(getpid(), SIGUSR2); + pthread_cond_signal(&wi->finished_cond); } pthread_exit(NULL); } -static int init_signalfd(void) -{ - int ret; - sigset_t mask; - static int done = 0; - - if (done++) - return 0; - - sigemptyset(&mask); - sigaddset(&mask, SIGUSR2); - sigprocmask(SIG_BLOCK, &mask, NULL); - - sig_fd = signalfd(-1, &mask, 0); - if (sig_fd < 0) { - eprintf("failed to create a signal fd, %m\n"); - return 1; - } - - ret = fcntl(sig_fd, F_GETFL); - ret |= fcntl(sig_fd, F_SETFL, ret | O_NONBLOCK); - - ret = register_event(sig_fd, bs_thread_request_done, NULL); - - return 0; -} - int init_work_queue(int nr) { int i, ret; struct worker_info *wi; - ret = init_signalfd(); - if (ret) - return -1; - wi = zalloc(sizeof(*wi) + nr * sizeof(pthread_t)); if (!wi) return -1; + ret = pipe(wi->command_fd); + if (ret) { + eprintf("failed to create command pipe, %m\n"); + goto destroy_cond_mutex; + } + + ret = pipe(wi->done_fd); + if (ret) { + eprintf("failed to done command pipe, %m\n"); + goto close_command_fd; + } + + ret = register_event(wi->done_fd[0], bs_thread_request_done, wi); + if (ret) { + eprintf("failed to add epoll event\n"); + goto close_done_fd; + } + + ret = pthread_create(&wi->ack_thread, NULL, bs_thread_ack_fn, wi); + if (ret) { + eprintf("failed to create an ack thread, %s\n", strerror(ret)); + goto event_del; + } + wi->nr_threads = nr; INIT_LIST_HEAD(&wi->q.pending_list); INIT_LIST_HEAD(&wi->q.blocked_list); INIT_LIST_HEAD(&wi->finished_list); + INIT_LIST_HEAD(&wi->ack_list); + pthread_cond_init(&wi->finished_cond, NULL); pthread_cond_init(&wi->pending_cond, NULL); pthread_mutex_init(&wi->finished_lock, NULL); @@ -315,6 +362,14 @@ int init_work_queue(int nr) list_add(&wi->worker_info_siblings, &worker_info_list); wqueue = &wi->q; +rewrite: + ret = write(wi->command_fd[1], &ret, sizeof(ret)); + if (ret < 0) { + eprintf("can't write done, %m\n"); + if (errno == EAGAIN || errno == EINTR) + goto rewrite; + } + return 0; destroy_threads: @@ -324,8 +379,15 @@ destroy_threads: pthread_join(wi->worker_thread[i - 1], NULL); eprintf("stopped the worker thread %d\n", i - 1); } - -/* destroy_cond_mutex: */ +event_del: + unregister_event(wi->done_fd[0]); +close_done_fd: + close(wi->done_fd[0]); + close(wi->done_fd[1]); +close_command_fd: + close(wi->command_fd[0]); + close(wi->command_fd[1]); +destroy_cond_mutex: pthread_cond_destroy(&wi->pending_cond); pthread_mutex_destroy(&wi->pending_lock); pthread_mutex_destroy(&wi->startup_lock); @@ -340,6 +402,9 @@ static void exit_work_queue(struct work_queue *q) int i; struct worker_info *wi = container_of(q, struct worker_info, q); + pthread_cancel(wi->ack_thread); + pthread_join(wi->ack_thread, NULL); + q->wq_state |= WQ_DEAD; pthread_cond_broadcast(&wi->pending_cond); @@ -351,5 +416,12 @@ static void exit_work_queue(struct work_queue *q) pthread_mutex_destroy(&wi->pending_lock); pthread_mutex_destroy(&wi->startup_lock); pthread_mutex_destroy(&wi->finished_lock); + + pthread_cond_destroy(&wi->finished_cond); + + close(wi->done_fd[0]); + close(wi->done_fd[1]); + close(wi->command_fd[0]); + close(wi->command_fd[1]); } #endif |