[sheepdog] [PATCH v2 01/10] move suspend_worker_threads and resume_worker_threads to work.c

MORITA Kazutaka morita.kazutaka at gmail.com
Fri Aug 9 05:21:55 CEST 2013


From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>

Currently, we are using trace_lock to prevent a new thread from being
created while suspending worker threads.  However, there is still a
race condition like the following case:

 1. create_worker_threads() creates a new worker thread.
 2. Before wq_create_cb() is called, suspend_worker_threads() sends
    SIGUSR2 signals to the threads in the worker list.  The newly
    created thread is not in the list yet, so it doesn't receive the
    signal.
 3. The newly created thread starts working before
    resume_worker_threads() is called.

We need worker_info->pending_lock to avoid this problem.  This patch
moves suspend_worker_threads() and resume_worker_threads() to work.c
to use the lock.  This also does the following changes:

 - Use bitmap instead of list to manage the worker threads.  With this
   change, we can delete threads more efficiently.

 - Use eventfd instead of pthread_lock to suspend threads.  Basically,
   pthread_mutex_lock() is not a signal-safe function and should be
   avoided here.  In addition, this change removes a busy loop in
   suspend_worker_threads().

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 collie/collie.c     |    2 +-
 collie/common.c     |    5 +--
 include/bitops.h    |   31 ++++++++++++++++
 include/sheep.h     |    1 +
 include/util.h      |    2 +-
 include/work.h      |    5 ++-
 lib/util.c          |    5 +++
 lib/work.c          |  103 ++++++++++++++++++++++++++++++++++++++++++++-------
 sheep/sheep.c       |    7 +---
 sheep/trace/trace.c |   88 -------------------------------------------
 sheep/trace/trace.h |    6 ---
 11 files changed, 133 insertions(+), 122 deletions(-)

diff --git a/collie/collie.c b/collie/collie.c
index d6eef73..166a84c 100644
--- a/collie/collie.c
+++ b/collie/collie.c
@@ -414,7 +414,7 @@ int main(int argc, char **argv)
 	if (init_event(EPOLL_SIZE) < 0)
 		exit(EXIT_SYSFAIL);
 
-	if (init_work_queue(get_nr_nodes, NULL, NULL) != 0) {
+	if (init_work_queue(get_nr_nodes) != 0) {
 		fprintf(stderr, "Failed to init work queue\n");
 		exit(EXIT_SYSFAIL);
 	}
diff --git a/collie/common.c b/collie/common.c
index a480418..fb5812c 100644
--- a/collie/common.c
+++ b/collie/common.c
@@ -110,10 +110,7 @@ int sd_write_object(uint64_t oid, uint64_t cow_oid, void *data,
 	return SD_RES_SUCCESS;
 }
 
-#define FOR_EACH_VDI(nr, vdis)					\
-	for (nr = find_next_bit((vdis), SD_NR_VDIS, 0);		\
-	     nr < SD_NR_VDIS;					\
-	     nr = find_next_bit((vdis), SD_NR_VDIS, nr + 1))
+#define FOR_EACH_VDI(nr, vdis) FOR_EACH_BIT(nr, vdis, SD_NR_VDIS)
 
 int parse_vdi(vdi_parser_func_t func, size_t size, void *data)
 {
diff --git a/include/bitops.h b/include/bitops.h
index 57316ac..448c1af 100644
--- a/include/bitops.h
+++ b/include/bitops.h
@@ -17,6 +17,37 @@
 
 #define BITOP_WORD(nr)		((nr) / BITS_PER_LONG)
 
+/*
+ * Iterate over a bitmap
+ *
+ * @nr: the bit number to use as a loop cursor
+ * @addr: the bitmap you iterate over
+ * @bits: the number of bits this bitmap contains
+ */
+#define FOR_EACH_BIT(nr, addr, bits)					\
+	for (nr = find_next_bit((addr), (bits), 0);			\
+	     nr < (bits);						\
+	     nr = find_next_bit((addr), (bits), nr + 1))
+
+/*
+ * Change the size of allocated bitmap
+ *
+ * This doesn't change the contents of the old bitmap pointed to by `ptr`, and
+ * initializes the newly allocated area with zeros.
+ */
+static inline unsigned long *alloc_bitmap(unsigned long *old_bmap,
+					  size_t old_bits, size_t new_bits)
+{
+	size_t old_size = BITS_TO_LONGS(old_bits) * sizeof(long);
+	size_t new_size = BITS_TO_LONGS(new_bits) * sizeof(long);
+	unsigned long *new_bmap =  xrealloc(old_bmap, new_size);
+
+	if (old_bits < new_bits)
+		memset(new_bmap + old_size, 0, new_size - old_size);
+
+	return new_bmap;
+}
+
 static inline unsigned long find_next_zero_bit(const unsigned long *addr,
 					       unsigned long size,
 					       unsigned long offset)
diff --git a/include/sheep.h b/include/sheep.h
index 0577ade..4e08a1b 100644
--- a/include/sheep.h
+++ b/include/sheep.h
@@ -14,6 +14,7 @@
 #include <stdint.h>
 #include "internal_proto.h"
 #include "util.h"
+#include "bitops.h"
 #include "list.h"
 #include "net.h"
 
diff --git a/include/util.h b/include/util.h
index 0d50f4f..a6da140 100644
--- a/include/util.h
+++ b/include/util.h
@@ -13,7 +13,6 @@
 #include <sys/eventfd.h>
 
 #include "logger.h"
-#include "bitops.h"
 #include "list.h"
 #include "compiler.h"
 
@@ -110,6 +109,7 @@ int install_sighandler(int signum, void (*handler)(int), bool once);
 int install_crash_handler(void (*handler)(int));
 void reraise_crash_signal(int signo, int status);
 pid_t gettid(void);
+int tkill(int tid, int sig);
 bool is_xattr_enabled(const char *path);
 
 void find_zero_blocks(const void *buf, uint64_t *poffset, uint32_t *plen);
diff --git a/include/work.h b/include/work.h
index 6c81436..4dd419e 100644
--- a/include/work.h
+++ b/include/work.h
@@ -58,10 +58,11 @@ static inline bool is_worker_thread(void)
  * for dynamic work queues.  'create_cb' will be called when worker threads are
  * created and 'destroy_cb' will be called when worker threads are destroyed.
  */
-int init_work_queue(size_t (*get_nr_nodes)(void), void (*create_cb)(pthread_t),
-		    void (*destroy_cb)(pthread_t));
+int init_work_queue(size_t (*get_nr_nodes)(void));
 struct work_queue *create_work_queue(const char *name, enum wq_thread_control);
 struct work_queue *create_ordered_work_queue(const char *name);
+void suspend_worker_threads(void);
+void resume_worker_threads(void);
 void queue_work(struct work_queue *q, struct work *work);
 bool work_queue_empty(struct work_queue *q);
 
diff --git a/lib/util.c b/lib/util.c
index 0362dfb..ff0eafb 100644
--- a/lib/util.c
+++ b/lib/util.c
@@ -538,6 +538,11 @@ pid_t gettid(void)
 	return syscall(SYS_gettid);
 }
 
+int tkill(int tid, int sig)
+{
+	return syscall(SYS_tgkill, getpid(), tid, sig);
+}
+
 bool is_xattr_enabled(const char *path)
 {
 	int ret, dummy;
diff --git a/lib/work.c b/lib/work.c
index dfde630..b1febc6 100644
--- a/lib/work.c
+++ b/lib/work.c
@@ -26,12 +26,21 @@
 #include <sys/eventfd.h>
 #include <sys/time.h>
 #include <linux/types.h>
+#include <signal.h>
 
 #include "list.h"
 #include "util.h"
+#include "bitops.h"
 #include "work.h"
 #include "event.h"
 
+#define TID_MAX_DEFAULT 0x8000 /* default maximum tid for most systems */
+
+static size_t tid_max;
+static unsigned long *tid_map;
+static int resume_efd;
+static int ack_efd;
+
 /*
  * The protection period from shrinking work queue.  This is necessary
  * to avoid many calls of pthread_create.  Without it, threads are
@@ -68,8 +77,6 @@ static int efd;
 static LIST_HEAD(worker_info_list);
 static size_t nr_nodes = 1;
 static size_t (*wq_get_nr_nodes)(void);
-static void (*wq_create_cb)(pthread_t);
-static void (*wq_destroy_cb)(pthread_t);
 
 static void *worker_routine(void *arg);
 
@@ -141,8 +148,6 @@ static int create_worker_threads(struct worker_info *wi, size_t nr_threads)
 			pthread_mutex_unlock(&wi->startup_lock);
 			return -1;
 		}
-		if (wq_create_cb)
-			wq_create_cb(thread);
 		wi->nr_threads++;
 		sd_dprintf("create thread %s %zu", wi->name, wi->nr_threads);
 	}
@@ -151,6 +156,48 @@ static int create_worker_threads(struct worker_info *wi, size_t nr_threads)
 	return 0;
 }
 
+void suspend_worker_threads(void)
+{
+	struct worker_info *wi;
+	int tid;
+
+	list_for_each_entry(wi, &worker_info_list, worker_info_siblings) {
+		pthread_mutex_lock(&wi->pending_lock);
+	}
+
+	FOR_EACH_BIT(tid, tid_map, tid_max) {
+		if (tkill(tid, SIGUSR2) < 0)
+			panic("%m");
+	}
+
+	/*
+	 * Wait for all the worker thread to suspend.  We cannot use
+	 * wi->nr_threads here because some thread may have not called set_bit()
+	 * yet (then, the thread doesn't recieve SIGUSR2).
+	 */
+	FOR_EACH_BIT(tid, tid_map, tid_max) {
+		eventfd_xread(ack_efd);
+	}
+}
+
+void resume_worker_threads(void)
+{
+	struct worker_info *wi;
+	int nr_threads = 0, tid;
+
+	FOR_EACH_BIT(tid, tid_map, tid_max) {
+		nr_threads++;
+	}
+
+	eventfd_xwrite(resume_efd, nr_threads);
+	for (int i = 0; i < nr_threads; i++)
+		eventfd_xread(ack_efd);
+
+	list_for_each_entry(wi, &worker_info_list, worker_info_siblings) {
+		pthread_mutex_unlock(&wi->pending_lock);
+	}
+}
+
 void queue_work(struct work_queue *q, struct work *work)
 {
 	struct worker_info *wi = container_of(q, struct worker_info, q);
@@ -198,6 +245,7 @@ static void *worker_routine(void *arg)
 {
 	struct worker_info *wi = arg;
 	struct work *work;
+	int tid = gettid();
 
 	set_thread_name(wi->name, (wi->tc != WQ_ORDERED));
 
@@ -206,6 +254,16 @@ static void *worker_routine(void *arg)
 	pthread_mutex_unlock(&wi->startup_lock);
 
 	pthread_mutex_lock(&wi->pending_lock);
+	if (tid > tid_max) {
+		size_t old_tid_max = tid_max;
+
+		/* enlarge bitmap size */
+		while (tid > tid_max)
+			tid_max *= 2;
+
+		tid_map = alloc_bitmap(tid_map, old_tid_max, tid_max);
+	}
+	set_bit(tid, tid_map);
 	pthread_mutex_unlock(&wi->pending_lock);
 
 	while (true) {
@@ -213,12 +271,11 @@ static void *worker_routine(void *arg)
 		pthread_mutex_lock(&wi->pending_lock);
 		if (wq_need_shrink(wi)) {
 			wi->nr_threads--;
-			if (wq_destroy_cb)
-				wq_destroy_cb(pthread_self());
+			clear_bit(tid, tid_map);
 			pthread_mutex_unlock(&wi->pending_lock);
 			pthread_detach(pthread_self());
-			sd_dprintf("destroy thread %s %d, %zu", wi->name,
-				   gettid(), wi->nr_threads);
+			sd_dprintf("destroy thread %s %d, %zu", wi->name, tid,
+				   wi->nr_threads);
 			break;
 		}
 retest:
@@ -246,24 +303,42 @@ retest:
 	pthread_exit(NULL);
 }
 
-int init_work_queue(size_t (*get_nr_nodes)(void), void (*create_cb)(pthread_t),
-		    void (*destroy_cb)(pthread_t))
+static void suspend(int num)
+{
+	int uninitialized_var(value);
+
+	eventfd_xwrite(ack_efd, 1); /* ack of suspend */
+	value = eventfd_xread(resume_efd);
+	assert(value == 1);
+	eventfd_xwrite(ack_efd, 1); /* ack of resume */
+}
+
+int init_work_queue(size_t (*get_nr_nodes)(void))
 {
 	int ret;
 
 	wq_get_nr_nodes = get_nr_nodes;
-	wq_create_cb = create_cb;
-	wq_destroy_cb = destroy_cb;
 
 	if (wq_get_nr_nodes)
 		nr_nodes = wq_get_nr_nodes();
 
+	tid_max = TID_MAX_DEFAULT;
+	tid_map = alloc_bitmap(NULL, 0, tid_max);
+
+	resume_efd = eventfd(0, EFD_SEMAPHORE);
+	ack_efd = eventfd(0, EFD_SEMAPHORE);
 	efd = eventfd(0, EFD_NONBLOCK);
-	if (efd < 0) {
-		sd_eprintf("failed to create an event fd: %m");
+	if (resume_efd < 0 || ack_efd < 0 || efd < 0) {
+		sd_eprintf("failed to create event fds: %m");
 		return 1;
 	}
 
+	/* trace uses this signal to suspend the worker threads */
+	if (install_sighandler(SIGUSR2, suspend, false) < 0) {
+		sd_dprintf("%m");
+		return -1;
+	}
+
 	ret = register_event(efd, worker_thread_request_done, NULL);
 	if (ret) {
 		sd_eprintf("failed to register event fd %m");
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 6a1efe5..e750dbf 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -225,10 +225,6 @@ static int init_signal(void)
 	sigset_t mask;
 	int ret;
 
-	ret = trace_init_signal();
-	if (ret)
-		return ret;
-
 	sigemptyset(&mask);
 	sigaddset(&mask, SIGTERM);
 	sigprocmask(SIG_BLOCK, &mask, NULL);
@@ -421,8 +417,7 @@ static size_t get_nr_nodes(void)
 
 static int create_work_queues(void)
 {
-	if (init_work_queue(get_nr_nodes, trace_register_thread,
-			    trace_unregister_thread))
+	if (init_work_queue(get_nr_nodes))
 		return -1;
 
 	sys->gateway_wqueue = create_work_queue("gway", WQ_UNLIMITED);
diff --git a/sheep/trace/trace.c b/sheep/trace/trace.c
index e166f96..e0a91c8 100644
--- a/sheep/trace/trace.c
+++ b/sheep/trace/trace.c
@@ -22,20 +22,8 @@ static pthread_mutex_t trace_lock = PTHREAD_MUTEX_INITIALIZER;
 
 static trace_func_t trace_func = trace_call;
 
-static int total_nr_workers;
-
-static pthread_mutex_t suspend_lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t suspend_cond = PTHREAD_COND_INITIALIZER;
-static int suspend_count;
-
 static struct strbuf *buffer;
 static int nr_cpu;
-static LIST_HEAD(worker_list);
-
-struct worker {
-	struct list_head list;
-	pthread_t id;
-};
 
 union instruction {
 	unsigned char start[INSN_SIZE];
@@ -45,14 +33,6 @@ union instruction {
 	} __attribute__((packed));
 };
 
-static notrace void suspend(int num)
-{
-	pthread_mutex_lock(&suspend_lock);
-	suspend_count--;
-	pthread_cond_wait(&suspend_cond, &suspend_lock);
-	pthread_mutex_unlock(&suspend_lock);
-}
-
 static inline int trace_hash(unsigned long ip)
 {
 	return hash_64(ip, TRACE_HASH_BITS);
@@ -169,36 +149,6 @@ notrace int register_trace_function(trace_func_t func)
 	return 0;
 }
 
-static notrace void suspend_worker_threads(void)
-{
-	struct worker *w;
-
-	pthread_mutex_lock(&trace_lock);
-	suspend_count = total_nr_workers;
-	list_for_each_entry(w, &worker_list, list) {
-		if (pthread_kill(w->id, SIGUSR2) != 0)
-			panic("%m");
-	}
-
-wait_for_worker_suspend:
-	/* Hold the lock, then all other worker can sleep on it */
-	pthread_mutex_lock(&suspend_lock);
-	if (suspend_count > 0) {
-		pthread_mutex_unlock(&suspend_lock);
-		pthread_yield();
-		goto wait_for_worker_suspend;
-	}
-	pthread_mutex_unlock(&suspend_lock);
-	pthread_mutex_unlock(&trace_lock);
-}
-
-static notrace void resume_worker_threads(void)
-{
-	pthread_mutex_lock(&suspend_lock);
-	pthread_cond_broadcast(&suspend_cond);
-	pthread_mutex_unlock(&suspend_lock);
-}
-
 static notrace void patch_all_sites(unsigned long addr)
 {
 	struct caller *ca;
@@ -248,16 +198,6 @@ notrace int trace_disable(void)
 	return SD_RES_SUCCESS;
 }
 
-int trace_init_signal(void)
-{
-	/* trace uses this signal to suspend the worker threads */
-	if (install_sighandler(SIGUSR2, suspend, false) < 0) {
-		sd_dprintf("%m");
-		return -1;
-	}
-	return 0;
-}
-
 notrace int trace_buffer_pop(void *buf, uint32_t len)
 {
 	int readin, count = 0, requested = len;
@@ -303,31 +243,3 @@ notrace int trace_init(void)
 	sd_iprintf("trace support enabled. cpu count %d.", nr_cpu);
 	return 0;
 }
-
-notrace void trace_register_thread(pthread_t id)
-{
-	struct worker *new = xmalloc(sizeof(*new));
-
-	new->id = id;
-
-	pthread_mutex_lock(&trace_lock);
-	list_add(&new->list, &worker_list);
-	total_nr_workers++;
-	pthread_mutex_unlock(&trace_lock);
-	sd_dprintf("nr %d, add pid %lx", total_nr_workers, id);
-}
-
-notrace void trace_unregister_thread(pthread_t id)
-{
-	struct worker *w, *tmp;
-
-	pthread_mutex_lock(&trace_lock);
-	list_for_each_entry_safe(w, tmp, &worker_list, list) {
-		if (w->id == id) {
-			list_del(&w->list);
-			total_nr_workers--;
-		}
-	}
-	pthread_mutex_unlock(&trace_lock);
-	sd_dprintf("nr %d, del pid %lx", total_nr_workers, id);
-}
diff --git a/sheep/trace/trace.h b/sheep/trace/trace.h
index 92f5166..977b48f 100644
--- a/sheep/trace/trace.h
+++ b/sheep/trace/trace.h
@@ -45,7 +45,6 @@ unsigned long trace_return_call(void);
 
 /* trace.c */
 #ifdef HAVE_TRACE
-  int trace_init_signal(void);
   int trace_init(void);
   int register_trace_function(trace_func_t func);
   int trace_enable(void);
@@ -53,19 +52,14 @@ unsigned long trace_return_call(void);
   struct caller *trace_lookup_ip(unsigned long ip, bool create);
   int trace_buffer_pop(void *buf, uint32_t len);
   void trace_buffer_push(int cpuid, struct trace_graph_item *item);
-  void trace_register_thread(pthread_t id);
-  void trace_unregister_thread(pthread_t id);
 
 #else
-  static inline int trace_init_signal(void) { return 0; }
   static inline int trace_init(void) { return 0; }
   static inline int trace_enable(void) { return 0; }
   static inline int trace_disable(void) { return 0; }
   static inline int trace_buffer_pop(void *buf, uint32_t len) { return 0; }
   static inline void trace_buffer_push(
 	  int cpuid, struct trace_graph_item *item) { return; }
-  static inline void trace_register_thread(pthread_t id) { return; }
-  static inline void trace_unregister_thread(pthread_t id) { return; }
 
 #endif /* HAVE_TRACE */
 
-- 
1.7.9.5




More information about the sheepdog mailing list