[Sheepdog] Building on CentOS 5.5 x86_64

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Thu Oct 14 11:17:18 CEST 2010


At Wed, 13 Oct 2010 17:46:57 +0300,
Yuriy Kohut wrote:
> 
> Got it configured.
> 
> But faced with the following issue while running 'make':
> ---
> work.c:28:28: error: linux/signalfd.h: No such file or directory
> work.c: In function ‘bs_thread_request_done’:
> work.c:169: error: array type has incomplete element type
> work.c:169: warning: unused variable ‘siginfo’
> ---
> 
> signalfd.h seems to be not kernel version 2.6.18 (default CentOS 5.5 kernel) header.
> 
> Any ideas ?
> 

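Can you try the following patch?  It removes the use of signalfd: finished work is now reported to the event loop through a pair of pipes serviced by a dedicated ack thread.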

Kazutaka

==
diff --git a/sheep/work.c b/sheep/work.c
index 8016435..5f7e96c 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -24,8 +24,6 @@
 #include <syscall.h>
 #include <sys/types.h>
 #include <linux/types.h>
-#define _LINUX_FCNTL_H
-#include <linux/signalfd.h>
 
 #include "list.h"
 #include "util.h"
@@ -33,9 +31,6 @@
 #include "logger.h"
 #include "event.h"
 
-extern int signalfd(int fd, const sigset_t *mask, int flags);
-
-static int sig_fd;
 static LIST_HEAD(worker_info_list);
 
 struct work_queue {
@@ -67,6 +62,13 @@ struct worker_info {
 
 	pthread_mutex_t startup_lock;
 
+	int command_fd[2];
+	int done_fd[2];
+
+	pthread_cond_t finished_cond;
+	struct list_head ack_list;
+	pthread_t ack_thread;
+
 	pthread_t worker_thread[0];
 };
 
@@ -161,36 +163,86 @@ static void work_post_done(struct work_queue *q, enum work_attr attr)
 	}
 }
 
+static void *bs_thread_ack_fn(void *arg)
+{
+	struct worker_info *wi = arg;
+	int command, ret, nr;
+	struct work *work;
+
+retry:
+	ret = read(wi->command_fd[0], &command, sizeof(command));
+	if (ret < 0) {
+		eprintf("ack pthread will be dead, %m\n");
+		if (errno == EAGAIN || errno == EINTR)
+			goto retry;
+
+		goto out;
+	}
+
+	pthread_mutex_lock(&wi->finished_lock);
+retest:
+	if (list_empty(&wi->finished_list)) {
+		pthread_cond_wait(&wi->finished_cond, &wi->finished_lock);
+		goto retest;
+	}
+
+	while (!list_empty(&wi->finished_list)) {
+		work = list_first_entry(&wi->finished_list,
+					struct work, w_list);
+
+		list_del(&work->w_list);
+		list_add_tail(&work->w_list, &wi->ack_list);
+	}
+
+	pthread_mutex_unlock(&wi->finished_lock);
+
+	nr = 1;
+rewrite:
+	ret = write(wi->done_fd[1], &nr, sizeof(nr));
+	if (ret < 0) {
+		eprintf("can't ack tgtd, %m\n");
+		if (errno == EAGAIN || errno == EINTR)
+			goto rewrite;
+
+		goto out;
+	}
+
+	goto retry;
+out:
+	pthread_exit(NULL);
+}
+
 static void bs_thread_request_done(int fd, int events, void *data)
 {
 	int ret;
-	struct worker_info *wi;
+	struct worker_info *wi = data;
 	struct work *work;
-	struct signalfd_siginfo siginfo[16];
-	LIST_HEAD(list);
+	int nr_events;
 
-	ret = read(fd, (char *)siginfo, sizeof(siginfo));
-	if (ret <= 0)
+	ret = read(wi->done_fd[0], &nr_events, sizeof(nr_events));
+	if (ret < 0)
 		return;
 
-	list_for_each_entry(wi, &worker_info_list, worker_info_siblings) {
-		pthread_mutex_lock(&wi->finished_lock);
-		list_splice_init(&wi->finished_list, &list);
-		pthread_mutex_unlock(&wi->finished_lock);
+	while (!list_empty(&wi->ack_list)) {
+		enum work_attr attr;
+		work = list_first_entry(&wi->ack_list,
+					struct work, w_list);
 
-		while (!list_empty(&list)) {
-			enum work_attr attr;
-			work = list_first_entry(&list, struct work, w_list);
-			list_del(&work->w_list);
-
-			/*
-			 * work->done might free the work so we must
-			 * save its attr for qork_post_done().
-			 */
-			attr = work->attr;
-			work->done(work, 0);
-			work_post_done(&wi->q, attr);
-		}
+		list_del(&work->w_list);
+
+		attr = work->attr;
+		work->done(work, 0);
+		work_post_done(&wi->q, attr);
+	}
+
+rewrite:
+	ret = write(wi->command_fd[1], &nr_events, sizeof(nr_events));
+	if (ret < 0) {
+		eprintf("can't write done, %m\n");
+		if (errno == EAGAIN || errno == EINTR)
+			goto rewrite;
+
+		return;
 	}
 }
 
@@ -240,58 +292,53 @@ retest:
 		list_add_tail(&work->w_list, &wi->finished_list);
 		pthread_mutex_unlock(&wi->finished_lock);
 
-		kill(getpid(), SIGUSR2);
+		pthread_cond_signal(&wi->finished_cond);
 	}
 
 	pthread_exit(NULL);
 }
 
-static int init_signalfd(void)
-{
-	int ret;
-	sigset_t mask;
-	static int done = 0;
-
-	if (done++)
-		return 0;
-
-	sigemptyset(&mask);
-	sigaddset(&mask, SIGUSR2);
-	sigprocmask(SIG_BLOCK, &mask, NULL);
-
-	sig_fd = signalfd(-1, &mask, 0);
-	if (sig_fd < 0) {
-		eprintf("failed to create a signal fd, %m\n");
-		return 1;
-	}
-
-	ret = fcntl(sig_fd, F_GETFL);
-	ret |= fcntl(sig_fd, F_SETFL, ret | O_NONBLOCK);
-
-	ret = register_event(sig_fd, bs_thread_request_done, NULL);
-
-	return 0;
-}
-
 int init_work_queue(int nr)
 {
 	int i, ret;
 	struct worker_info *wi;
 
-	ret = init_signalfd();
-	if (ret)
-		return -1;
-
 	wi = zalloc(sizeof(*wi) + nr * sizeof(pthread_t));
 	if (!wi)
 		return -1;
 
+	ret = pipe(wi->command_fd);
+	if (ret) {
+		eprintf("failed to create command pipe, %m\n");
+		goto destroy_cond_mutex;
+	}
+
+	ret = pipe(wi->done_fd);
+	if (ret) {
+		eprintf("failed to done command pipe, %m\n");
+		goto close_command_fd;
+	}
+
+	ret = register_event(wi->done_fd[0], bs_thread_request_done, wi);
+	if (ret) {
+		eprintf("failed to add epoll event\n");
+		goto close_done_fd;
+	}
+
+	ret = pthread_create(&wi->ack_thread, NULL, bs_thread_ack_fn, wi);
+	if (ret) {
+		eprintf("failed to create an ack thread, %s\n", strerror(ret));
+		goto event_del;
+	}
+
 	wi->nr_threads = nr;
 
 	INIT_LIST_HEAD(&wi->q.pending_list);
 	INIT_LIST_HEAD(&wi->q.blocked_list);
 	INIT_LIST_HEAD(&wi->finished_list);
+	INIT_LIST_HEAD(&wi->ack_list);
 
+	pthread_cond_init(&wi->finished_cond, NULL);
 	pthread_cond_init(&wi->pending_cond, NULL);
 
 	pthread_mutex_init(&wi->finished_lock, NULL);
@@ -315,6 +362,14 @@ int init_work_queue(int nr)
 	list_add(&wi->worker_info_siblings, &worker_info_list);
 	wqueue = &wi->q;
 
+rewrite:
+	ret = write(wi->command_fd[1], &ret, sizeof(ret));
+	if (ret < 0) {
+		eprintf("can't write done, %m\n");
+		if (errno == EAGAIN || errno == EINTR)
+			goto rewrite;
+	}
+
 	return 0;
 destroy_threads:
 
@@ -324,8 +379,15 @@ destroy_threads:
 		pthread_join(wi->worker_thread[i - 1], NULL);
 		eprintf("stopped the worker thread %d\n", i - 1);
 	}
-
-/* destroy_cond_mutex: */
+event_del:
+	unregister_event(wi->done_fd[0]);
+close_done_fd:
+	close(wi->done_fd[0]);
+	close(wi->done_fd[1]);
+close_command_fd:
+	close(wi->command_fd[0]);
+	close(wi->command_fd[1]);
+destroy_cond_mutex:
 	pthread_cond_destroy(&wi->pending_cond);
 	pthread_mutex_destroy(&wi->pending_lock);
 	pthread_mutex_destroy(&wi->startup_lock);
@@ -340,6 +402,9 @@ static void exit_work_queue(struct work_queue *q)
 	int i;
 	struct worker_info *wi = container_of(q, struct worker_info, q);
 
+	pthread_cancel(wi->ack_thread);
+	pthread_join(wi->ack_thread, NULL);
+
 	q->wq_state |= WQ_DEAD;
 	pthread_cond_broadcast(&wi->pending_cond);
 
@@ -351,5 +416,12 @@ static void exit_work_queue(struct work_queue *q)
 	pthread_mutex_destroy(&wi->pending_lock);
 	pthread_mutex_destroy(&wi->startup_lock);
 	pthread_mutex_destroy(&wi->finished_lock);
+
+	pthread_cond_destroy(&wi->finished_cond);
+
+	close(wi->done_fd[0]);
+	close(wi->done_fd[1]);
+	close(wi->command_fd[0]);
+	close(wi->command_fd[1]);
 }
 #endif



More information about the sheepdog mailing list