[Sheepdog] Building on CentOS 5.5 x86_64
MORITA Kazutaka
morita.kazutaka at lab.ntt.co.jp
Thu Oct 14 11:17:18 CEST 2010
At Wed, 13 Oct 2010 17:46:57 +0300,
Yuriy Kohut wrote:
>
> Got it configured.
>
> But faced with the following issue while running 'make':
> ---
> work.c:28:28: error: linux/signalfd.h: No such file or directory
> work.c: In function ‘bs_thread_request_done’:
> work.c:169: error: array type has incomplete element type
> work.c:169: warning: unused variable ‘siginfo’
> ---
>
> signalfd.h seems to be not kernel version 2.6.18 (default CentOS 5.5 kernel) header.
>
> Any ideas ?
>
Can you try the following patch? This removes use of signalfd.
Kazutaka
==
diff --git a/sheep/work.c b/sheep/work.c
index 8016435..5f7e96c 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -24,8 +24,6 @@
#include <syscall.h>
#include <sys/types.h>
#include <linux/types.h>
-#define _LINUX_FCNTL_H
-#include <linux/signalfd.h>
#include "list.h"
#include "util.h"
@@ -33,9 +31,6 @@
#include "logger.h"
#include "event.h"
-extern int signalfd(int fd, const sigset_t *mask, int flags);
-
-static int sig_fd;
static LIST_HEAD(worker_info_list);
struct work_queue {
@@ -67,6 +62,13 @@ struct worker_info {
pthread_mutex_t startup_lock;
+ int command_fd[2];
+ int done_fd[2];
+
+ pthread_cond_t finished_cond;
+ struct list_head ack_list;
+ pthread_t ack_thread;
+
pthread_t worker_thread[0];
};
@@ -161,36 +163,86 @@ static void work_post_done(struct work_queue *q, enum work_attr attr)
}
}
+static void *bs_thread_ack_fn(void *arg)
+{
+ struct worker_info *wi = arg;
+ int command, ret, nr;
+ struct work *work;
+
+retry:
+ ret = read(wi->command_fd[0], &command, sizeof(command));
+ if (ret < 0) {
+ eprintf("ack pthread will be dead, %m\n");
+ if (errno == EAGAIN || errno == EINTR)
+ goto retry;
+
+ goto out;
+ }
+
+ pthread_mutex_lock(&wi->finished_lock);
+retest:
+ if (list_empty(&wi->finished_list)) {
+ pthread_cond_wait(&wi->finished_cond, &wi->finished_lock);
+ goto retest;
+ }
+
+ while (!list_empty(&wi->finished_list)) {
+ work = list_first_entry(&wi->finished_list,
+ struct work, w_list);
+
+ list_del(&work->w_list);
+ list_add_tail(&work->w_list, &wi->ack_list);
+ }
+
+ pthread_mutex_unlock(&wi->finished_lock);
+
+ nr = 1;
+rewrite:
+ ret = write(wi->done_fd[1], &nr, sizeof(nr));
+ if (ret < 0) {
+ eprintf("can't ack tgtd, %m\n");
+ if (errno == EAGAIN || errno == EINTR)
+ goto rewrite;
+
+ goto out;
+ }
+
+ goto retry;
+out:
+ pthread_exit(NULL);
+}
+
static void bs_thread_request_done(int fd, int events, void *data)
{
int ret;
- struct worker_info *wi;
+ struct worker_info *wi = data;
struct work *work;
- struct signalfd_siginfo siginfo[16];
- LIST_HEAD(list);
+ int nr_events;
- ret = read(fd, (char *)siginfo, sizeof(siginfo));
- if (ret <= 0)
+ ret = read(wi->done_fd[0], &nr_events, sizeof(nr_events));
+ if (ret < 0)
return;
- list_for_each_entry(wi, &worker_info_list, worker_info_siblings) {
- pthread_mutex_lock(&wi->finished_lock);
- list_splice_init(&wi->finished_list, &list);
- pthread_mutex_unlock(&wi->finished_lock);
+ while (!list_empty(&wi->ack_list)) {
+ enum work_attr attr;
+ work = list_first_entry(&wi->ack_list,
+ struct work, w_list);
- while (!list_empty(&list)) {
- enum work_attr attr;
- work = list_first_entry(&list, struct work, w_list);
- list_del(&work->w_list);
-
- /*
- * work->done might free the work so we must
- * save its attr for qork_post_done().
- */
- attr = work->attr;
- work->done(work, 0);
- work_post_done(&wi->q, attr);
- }
+ list_del(&work->w_list);
+
+ attr = work->attr;
+ work->done(work, 0);
+ work_post_done(&wi->q, attr);
+ }
+
+rewrite:
+ ret = write(wi->command_fd[1], &nr_events, sizeof(nr_events));
+ if (ret < 0) {
+ eprintf("can't write done, %m\n");
+ if (errno == EAGAIN || errno == EINTR)
+ goto rewrite;
+
+ return;
}
}
@@ -240,58 +292,53 @@ retest:
list_add_tail(&work->w_list, &wi->finished_list);
pthread_mutex_unlock(&wi->finished_lock);
- kill(getpid(), SIGUSR2);
+ pthread_cond_signal(&wi->finished_cond);
}
pthread_exit(NULL);
}
-static int init_signalfd(void)
-{
- int ret;
- sigset_t mask;
- static int done = 0;
-
- if (done++)
- return 0;
-
- sigemptyset(&mask);
- sigaddset(&mask, SIGUSR2);
- sigprocmask(SIG_BLOCK, &mask, NULL);
-
- sig_fd = signalfd(-1, &mask, 0);
- if (sig_fd < 0) {
- eprintf("failed to create a signal fd, %m\n");
- return 1;
- }
-
- ret = fcntl(sig_fd, F_GETFL);
- ret |= fcntl(sig_fd, F_SETFL, ret | O_NONBLOCK);
-
- ret = register_event(sig_fd, bs_thread_request_done, NULL);
-
- return 0;
-}
-
int init_work_queue(int nr)
{
int i, ret;
struct worker_info *wi;
- ret = init_signalfd();
- if (ret)
- return -1;
-
wi = zalloc(sizeof(*wi) + nr * sizeof(pthread_t));
if (!wi)
return -1;
+ ret = pipe(wi->command_fd);
+ if (ret) {
+ eprintf("failed to create command pipe, %m\n");
+ goto destroy_cond_mutex;
+ }
+
+ ret = pipe(wi->done_fd);
+ if (ret) {
+ eprintf("failed to done command pipe, %m\n");
+ goto close_command_fd;
+ }
+
+ ret = register_event(wi->done_fd[0], bs_thread_request_done, wi);
+ if (ret) {
+ eprintf("failed to add epoll event\n");
+ goto close_done_fd;
+ }
+
+ ret = pthread_create(&wi->ack_thread, NULL, bs_thread_ack_fn, wi);
+ if (ret) {
+ eprintf("failed to create an ack thread, %s\n", strerror(ret));
+ goto event_del;
+ }
+
wi->nr_threads = nr;
INIT_LIST_HEAD(&wi->q.pending_list);
INIT_LIST_HEAD(&wi->q.blocked_list);
INIT_LIST_HEAD(&wi->finished_list);
+ INIT_LIST_HEAD(&wi->ack_list);
+ pthread_cond_init(&wi->finished_cond, NULL);
pthread_cond_init(&wi->pending_cond, NULL);
pthread_mutex_init(&wi->finished_lock, NULL);
@@ -315,6 +362,14 @@ int init_work_queue(int nr)
list_add(&wi->worker_info_siblings, &worker_info_list);
wqueue = &wi->q;
+rewrite:
+ ret = write(wi->command_fd[1], &ret, sizeof(ret));
+ if (ret < 0) {
+ eprintf("can't write done, %m\n");
+ if (errno == EAGAIN || errno == EINTR)
+ goto rewrite;
+ }
+
return 0;
destroy_threads:
@@ -324,8 +379,15 @@ destroy_threads:
pthread_join(wi->worker_thread[i - 1], NULL);
eprintf("stopped the worker thread %d\n", i - 1);
}
-
-/* destroy_cond_mutex: */
+event_del:
+ unregister_event(wi->done_fd[0]);
+close_done_fd:
+ close(wi->done_fd[0]);
+ close(wi->done_fd[1]);
+close_command_fd:
+ close(wi->command_fd[0]);
+ close(wi->command_fd[1]);
+destroy_cond_mutex:
pthread_cond_destroy(&wi->pending_cond);
pthread_mutex_destroy(&wi->pending_lock);
pthread_mutex_destroy(&wi->startup_lock);
@@ -340,6 +402,9 @@ static void exit_work_queue(struct work_queue *q)
int i;
struct worker_info *wi = container_of(q, struct worker_info, q);
+ pthread_cancel(wi->ack_thread);
+ pthread_join(wi->ack_thread, NULL);
+
q->wq_state |= WQ_DEAD;
pthread_cond_broadcast(&wi->pending_cond);
@@ -351,5 +416,12 @@ static void exit_work_queue(struct work_queue *q)
pthread_mutex_destroy(&wi->pending_lock);
pthread_mutex_destroy(&wi->startup_lock);
pthread_mutex_destroy(&wi->finished_lock);
+
+ pthread_cond_destroy(&wi->finished_cond);
+
+ close(wi->done_fd[0]);
+ close(wi->done_fd[1]);
+ close(wi->command_fd[0]);
+ close(wi->command_fd[1]);
}
#endif
More information about the sheepdog
mailing list