Got sheepdog builded and installed. Thanks to Kazutaka's patch. --- Yura On Oct 14, 2010, at 1:20 PM, Yuriy Kohut wrote: > It seems like line endings are changed while I'm copy/paste the diff from the e-mail. > Could you please attache the diff to e-mail as separate file packed with .tar.gz. > > Thanks > --- > Yura > > On Oct 14, 2010, at 12:44 PM, Yuriy Kohut wrote: > >> I'm getting the following error while patching: >> >> patch -p1 < sheepdog-signalfd.patch >> patching file sheep/work.c >> patch: **** malformed patch at line 6: #include <syscall.h> >> --- >> Yura >> >> On Oct 14, 2010, at 12:17 PM, MORITA Kazutaka wrote: >> >>> At Wed, 13 Oct 2010 17:46:57 +0300, >>> Yuriy Kohut wrote: >>>> >>>> Got it configured. >>>> >>>> But faced with the following issue while running 'make': >>>> --- >>>> work.c:28:28: error: linux/signalfd.h: No such file or directory >>>> work.c: In function ‘bs_thread_request_done’: >>>> work.c:169: error: array type has incomplete element type >>>> work.c:169: warning: unused variable ‘siginfo’ >>>> --- >>>> >>>> signalfd.h seems to be not kernel version 2.6.18 (default CentOS 5.5 kernel) header. >>>> >>>> Any ideas ? >>>> >>> >>> Can you try the following patch? This removes use of signalfd. >>> >>> Kazutaka >>> >>> == >>> diff --git a/sheep/work.c b/sheep/work.c >>> index 8016435..5f7e96c 100644 >>> --- a/sheep/work.c >>> +++ b/sheep/work.c >>> @@ -24,8 +24,6 @@ >>> #include <syscall.h> >>> #include <sys/types.h> >>> #include <linux/types.h> >>> -#define _LINUX_FCNTL_H >>> -#include <linux/signalfd.h> >>> >>> #include "list.h" >>> #include "util.h" >>> @@ -33,9 +31,6 @@ >>> #include "logger.h" >>> #include "event.h" >>> >>> -extern int signalfd(int fd, const sigset_t *mask, int flags); >>> - >>> -static int sig_fd; >>> static LIST_HEAD(worker_info_list); >>> >>> struct work_queue { >>> @@ -67,6 +62,13 @@ struct worker_info { >>> >>> pthread_mutex_t startup_lock; >>> >>> + int command_fd[2]; >>> + int done_fd[2]; >>> + >>> + pthread_cond_t finished_cond; >>> + struct list_head ack_list; >>> + pthread_t ack_thread; >>> + >>> pthread_t worker_thread[0]; >>> }; >>> >>> @@ -161,36 +163,86 @@ static void work_post_done(struct work_queue *q, enum work_attr attr) >>> } >>> } >>> >>> +static void *bs_thread_ack_fn(void *arg) >>> +{ >>> + struct worker_info *wi = arg; >>> + int command, ret, nr; >>> + struct work *work; >>> + >>> +retry: >>> + ret = read(wi->command_fd[0], &command, sizeof(command)); >>> + if (ret < 0) { >>> + eprintf("ack pthread will be dead, %m\n"); >>> + if (errno == EAGAIN || errno == EINTR) >>> + goto retry; >>> + >>> + goto out; >>> + } >>> + >>> + pthread_mutex_lock(&wi->finished_lock); >>> +retest: >>> + if (list_empty(&wi->finished_list)) { >>> + pthread_cond_wait(&wi->finished_cond, &wi->finished_lock); >>> + goto retest; >>> + } >>> + >>> + while (!list_empty(&wi->finished_list)) { >>> + work = list_first_entry(&wi->finished_list, >>> + struct work, w_list); >>> + >>> + list_del(&work->w_list); >>> + list_add_tail(&work->w_list, &wi->ack_list); >>> + } >>> + >>> + pthread_mutex_unlock(&wi->finished_lock); >>> + >>> + nr = 1; >>> +rewrite: >>> + ret = write(wi->done_fd[1], &nr, sizeof(nr)); >>> + if (ret < 0) { >>> + eprintf("can't ack tgtd, %m\n"); >>> + if (errno == EAGAIN || errno == EINTR) >>> + goto rewrite; >>> + >>> + goto out; >>> + } >>> + >>> + goto retry; >>> +out: >>> + pthread_exit(NULL); >>> +} >>> + >>> static void bs_thread_request_done(int fd, int events, void *data) >>> { >>> int ret; >>> - struct worker_info *wi; >>> + struct worker_info *wi = data; >>> struct work *work; >>> - struct signalfd_siginfo siginfo[16]; >>> - LIST_HEAD(list); >>> + int nr_events; >>> >>> - ret = read(fd, (char *)siginfo, sizeof(siginfo)); >>> - if (ret <= 0) >>> + ret = read(wi->done_fd[0], &nr_events, sizeof(nr_events)); >>> + if (ret < 0) >>> return; >>> >>> - list_for_each_entry(wi, &worker_info_list, worker_info_siblings) { >>> - pthread_mutex_lock(&wi->finished_lock); >>> - list_splice_init(&wi->finished_list, &list); >>> - pthread_mutex_unlock(&wi->finished_lock); >>> + while (!list_empty(&wi->ack_list)) { >>> + enum work_attr attr; >>> + work = list_first_entry(&wi->ack_list, >>> + struct work, w_list); >>> >>> - while (!list_empty(&list)) { >>> - enum work_attr attr; >>> - work = list_first_entry(&list, struct work, w_list); >>> - list_del(&work->w_list); >>> - >>> - /* >>> - * work->done might free the work so we must >>> - * save its attr for qork_post_done(). >>> - */ >>> - attr = work->attr; >>> - work->done(work, 0); >>> - work_post_done(&wi->q, attr); >>> - } >>> + list_del(&work->w_list); >>> + >>> + attr = work->attr; >>> + work->done(work, 0); >>> + work_post_done(&wi->q, attr); >>> + } >>> + >>> +rewrite: >>> + ret = write(wi->command_fd[1], &nr_events, sizeof(nr_events)); >>> + if (ret < 0) { >>> + eprintf("can't write done, %m\n"); >>> + if (errno == EAGAIN || errno == EINTR) >>> + goto rewrite; >>> + >>> + return; >>> } >>> } >>> >>> @@ -240,58 +292,53 @@ retest: >>> list_add_tail(&work->w_list, &wi->finished_list); >>> pthread_mutex_unlock(&wi->finished_lock); >>> >>> - kill(getpid(), SIGUSR2); >>> + pthread_cond_signal(&wi->finished_cond); >>> } >>> >>> pthread_exit(NULL); >>> } >>> >>> -static int init_signalfd(void) >>> -{ >>> - int ret; >>> - sigset_t mask; >>> - static int done = 0; >>> - >>> - if (done++) >>> - return 0; >>> - >>> - sigemptyset(&mask); >>> - sigaddset(&mask, SIGUSR2); >>> - sigprocmask(SIG_BLOCK, &mask, NULL); >>> - >>> - sig_fd = signalfd(-1, &mask, 0); >>> - if (sig_fd < 0) { >>> - eprintf("failed to create a signal fd, %m\n"); >>> - return 1; >>> - } >>> - >>> - ret = fcntl(sig_fd, F_GETFL); >>> - ret |= fcntl(sig_fd, F_SETFL, ret | O_NONBLOCK); >>> - >>> - ret = register_event(sig_fd, bs_thread_request_done, NULL); >>> - >>> - return 0; >>> -} >>> - >>> int init_work_queue(int nr) >>> { >>> int i, ret; >>> struct worker_info *wi; >>> >>> - ret = init_signalfd(); >>> - if (ret) >>> - return -1; >>> - >>> wi = zalloc(sizeof(*wi) + nr * sizeof(pthread_t)); >>> if (!wi) >>> return -1; >>> >>> + ret = pipe(wi->command_fd); >>> + if (ret) { >>> + eprintf("failed to create command pipe, %m\n"); >>> + goto destroy_cond_mutex; >>> + } >>> + >>> + ret = pipe(wi->done_fd); >>> + if (ret) { >>> + eprintf("failed to done command pipe, %m\n"); >>> + goto close_command_fd; >>> + } >>> + >>> + ret = register_event(wi->done_fd[0], bs_thread_request_done, wi); >>> + if (ret) { >>> + eprintf("failed to add epoll event\n"); >>> + goto close_done_fd; >>> + } >>> + >>> + ret = pthread_create(&wi->ack_thread, NULL, bs_thread_ack_fn, wi); >>> + if (ret) { >>> + eprintf("failed to create an ack thread, %s\n", strerror(ret)); >>> + goto event_del; >>> + } >>> + >>> wi->nr_threads = nr; >>> >>> INIT_LIST_HEAD(&wi->q.pending_list); >>> INIT_LIST_HEAD(&wi->q.blocked_list); >>> INIT_LIST_HEAD(&wi->finished_list); >>> + INIT_LIST_HEAD(&wi->ack_list); >>> >>> + pthread_cond_init(&wi->finished_cond, NULL); >>> pthread_cond_init(&wi->pending_cond, NULL); >>> >>> pthread_mutex_init(&wi->finished_lock, NULL); >>> @@ -315,6 +362,14 @@ int init_work_queue(int nr) >>> list_add(&wi->worker_info_siblings, &worker_info_list); >>> wqueue = &wi->q; >>> >>> +rewrite: >>> + ret = write(wi->command_fd[1], &ret, sizeof(ret)); >>> + if (ret < 0) { >>> + eprintf("can't write done, %m\n"); >>> + if (errno == EAGAIN || errno == EINTR) >>> + goto rewrite; >>> + } >>> + >>> return 0; >>> destroy_threads: >>> >>> @@ -324,8 +379,15 @@ destroy_threads: >>> pthread_join(wi->worker_thread[i - 1], NULL); >>> eprintf("stopped the worker thread %d\n", i - 1); >>> } >>> - >>> -/* destroy_cond_mutex: */ >>> +event_del: >>> + unregister_event(wi->done_fd[0]); >>> +close_done_fd: >>> + close(wi->done_fd[0]); >>> + close(wi->done_fd[1]); >>> +close_command_fd: >>> + close(wi->command_fd[0]); >>> + close(wi->command_fd[1]); >>> +destroy_cond_mutex: >>> pthread_cond_destroy(&wi->pending_cond); >>> pthread_mutex_destroy(&wi->pending_lock); >>> pthread_mutex_destroy(&wi->startup_lock); >>> @@ -340,6 +402,9 @@ static void exit_work_queue(struct work_queue *q) >>> int i; >>> struct worker_info *wi = container_of(q, struct worker_info, q); >>> >>> + pthread_cancel(wi->ack_thread); >>> + pthread_join(wi->ack_thread, NULL); >>> + >>> q->wq_state |= WQ_DEAD; >>> pthread_cond_broadcast(&wi->pending_cond); >>> >>> @@ -351,5 +416,12 @@ static void exit_work_queue(struct work_queue *q) >>> pthread_mutex_destroy(&wi->pending_lock); >>> pthread_mutex_destroy(&wi->startup_lock); >>> pthread_mutex_destroy(&wi->finished_lock); >>> + >>> + pthread_cond_destroy(&wi->finished_cond); >>> + >>> + close(wi->done_fd[0]); >>> + close(wi->done_fd[1]); >>> + close(wi->command_fd[0]); >>> + close(wi->command_fd[1]); >>> } >>> #endif >> >> -- >> sheepdog mailing list >> sheepdog at lists.wpkg.org >> http://lists.wpkg.org/mailman/listinfo/sheepdog > |