[sheepdog] [PATCH 2/4] tracer: rework tracer to work with master

Liu Yuan namei.unix at gmail.com
Tue Apr 16 10:37:52 CEST 2013


From: Liu Yuan <tailai.ly at taobao.com>

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 sheep/trace/trace.c |  148 ++++++++++++++++++---------------------------------
 sheep/trace/trace.h |   10 ++--
 sheep/work.c        |    6 +--
 sheep/work.h        |    7 +--
 4 files changed, 62 insertions(+), 109 deletions(-)

diff --git a/sheep/trace/trace.c b/sheep/trace/trace.c
index 377c4f2..dc0af97 100644
--- a/sheep/trace/trace.c
+++ b/sheep/trace/trace.c
@@ -16,15 +16,12 @@
 #include <unistd.h>
 #include <pthread.h>
 #include <signal.h>
-#include <sys/eventfd.h>
 
 #include "trace.h"
 #include "logger.h"
 #include "list.h"
-#include "work.h"
 #include "sheepdog_proto.h"
 #include "strbuf.h"
-#include "event.h"
 
 #define TRACE_HASH_BITS       7
 #define TRACE_HASH_SIZE       (1 << TRACE_HASH_BITS)
@@ -35,9 +32,7 @@ static pthread_mutex_t trace_lock = PTHREAD_MUTEX_INITIALIZER;
 
 static trace_func_t trace_func = trace_call;
 
-static int trace_efd;
-static int nr_short_thread;
-static bool trace_in_patch;
+static int total_nr_workers;
 
 static pthread_mutex_t suspend_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t suspend_cond = PTHREAD_COND_INITIALIZER;
@@ -45,6 +40,12 @@ static int suspend_count;
 
 static struct strbuf *buffer;
 static int nr_cpu;
+static LIST_HEAD(worker_list);
+
+struct worker {
+	struct list_head list;
+	pthread_t id;
+};
 
 union instruction {
 	unsigned char start[INSN_SIZE];
@@ -180,17 +181,17 @@ notrace int register_trace_function(trace_func_t func)
 
 static notrace void suspend_worker_threads(void)
 {
-	struct worker_info *wi;
-	suspend_count = total_ordered_workers;
+	struct worker *w;
 
-	/* Hold the lock, then all other worker can sleep on it */
-	list_for_each_entry(wi, &worker_info_list, worker_info_siblings) {
-		if (wi->ordered &&
-		    pthread_kill(wi->worker_thread, SIGUSR2) != 0)
-			sd_dprintf("%m");
+	pthread_mutex_lock(&trace_lock);
+	suspend_count = total_nr_workers;
+	list_for_each_entry(w, &worker_list, list) {
+		if (pthread_kill(w->id, SIGUSR2) != 0)
+			panic("%m");
 	}
 
 wait_for_worker_suspend:
+	/* Hold the lock, then all other worker can sleep on it */
 	pthread_mutex_lock(&suspend_lock);
 	if (suspend_count > 0) {
 		pthread_mutex_unlock(&suspend_lock);
@@ -198,6 +199,7 @@ wait_for_worker_suspend:
 		goto wait_for_worker_suspend;
 	}
 	pthread_mutex_unlock(&suspend_lock);
+	pthread_mutex_unlock(&trace_lock);
 }
 
 static notrace void resume_worker_threads(void)
@@ -231,76 +233,27 @@ static notrace void nop_all_sites(void)
 	pthread_mutex_unlock(&trace_lock);
 }
 
-static inline bool short_thread_running(void)
-{
-	return !!nr_short_thread;
-}
-
-static notrace void enable_tracer(int fd, int events, void *data)
+notrace int trace_enable(void)
 {
-	eventfd_t value;
-	int ret;
-
-	ret = eventfd_read(trace_efd, &value);
-	/*
-	 * In error case we can't retry read in main thread, simply return and
-	 * expected to be waken up by epoll again.
-	 */
-	if (ret < 0) {
-		sd_eprintf("%m");
-		return;
+	if (trace_func == trace_call) {
+		sd_dprintf("no tracer available");
+		return SD_RES_NO_TAG;
 	}
 
-	if (short_thread_running())
-		return;
-
 	suspend_worker_threads();
 	patch_all_sites((unsigned long)trace_caller);
 	resume_worker_threads();
-	unregister_event(trace_efd);
-	trace_in_patch = false;
 	sd_dprintf("tracer enabled");
+
+	return SD_RES_SUCCESS;
 }
 
-static notrace void disable_tracer(int fd, int events, void *data)
+notrace int trace_disable(void)
 {
-	eventfd_t value;
-	int ret;
-
-	ret = eventfd_read(fd, &value);
-	if (ret < 0) {
-		sd_eprintf("%m");
-		return;
-	}
-
-	if (short_thread_running())
-		return;
-
 	suspend_worker_threads();
 	nop_all_sites();
 	resume_worker_threads();
-	unregister_event(trace_efd);
-	trace_in_patch = false;
 	sd_dprintf("tracer disabled");
-}
-
-notrace int trace_enable(void)
-{
-	if (trace_func == trace_call) {
-		sd_dprintf("no tracer available");
-		return SD_RES_NO_TAG;
-	}
-
-	register_event(trace_efd, enable_tracer, NULL);
-	trace_in_patch = true;
-
-	return SD_RES_SUCCESS;
-}
-
-notrace int trace_disable(void)
-{
-	register_event(trace_efd, disable_tracer, NULL);
-	trace_in_patch = true;
 
 	return SD_RES_SUCCESS;
 }
@@ -321,9 +274,6 @@ notrace int trace_buffer_pop(void *buf, uint32_t len)
 	char *buff = (char *)buf;
 	int i;
 
-	if (trace_in_patch)
-		return -1;
-
 	for (i = 0; i < nr_cpu; i++) {
 		readin = strbuf_stripout(&buffer[i], buff, len);
 		count += readin;
@@ -344,32 +294,10 @@ notrace void trace_buffer_push(int cpuid, struct trace_graph_item *item)
 	strbuf_add(&buffer[cpuid], item, sizeof(*item));
 }
 
-void short_thread_begin(void)
-{
-	nr_short_thread++;
-}
-
-void short_thread_end(void)
-{
-	eventfd_t value = 1;
-
-	nr_short_thread--;
-	if (trace_in_patch && nr_short_thread == 0)
-		eventfd_write(trace_efd, value);
-}
-
 notrace int trace_init(void)
 {
-	sigset_t block;
 	int i;
 
-	sigemptyset(&block);
-	sigaddset(&block, SIGUSR2);
-	if (pthread_sigmask(SIG_BLOCK, &block, NULL) != 0) {
-		sd_dprintf("%m");
-		return -1;
-	}
-
 	if (make_text_writable((unsigned long)mcount_call) < 0) {
 		sd_dprintf("%m");
 		return -1;
@@ -382,8 +310,34 @@ notrace int trace_init(void)
 	for (i = 0; i < nr_cpu; i++)
 		strbuf_init(&buffer[i], 0);
 
-	trace_efd = eventfd(0, EFD_NONBLOCK);
-
-	sd_dprintf("trace support enabled. cpu count %d.", nr_cpu);
+	sd_iprintf("trace support enabled. cpu count %d.", nr_cpu);
 	return 0;
 }
+
+notrace void trace_register_thread(pthread_t id)
+{
+	struct worker *new = xmalloc(sizeof(*new));
+
+	new->id = id;
+
+	pthread_mutex_lock(&trace_lock);
+	list_add(&new->list, &worker_list);
+	total_nr_workers++;
+	pthread_mutex_unlock(&trace_lock);
+	sd_dprintf("nr %d, add pid %lx", total_nr_workers, id);
+}
+
+notrace void trace_unregister_thread(pthread_t id)
+{
+	struct worker *w, *tmp;
+
+	pthread_mutex_lock(&trace_lock);
+	list_for_each_entry_safe(w, tmp, &worker_list, list) {
+		if (w->id == id) {
+			list_del(&w->list);
+			total_nr_workers--;
+		}
+	}
+	pthread_mutex_unlock(&trace_lock);
+	sd_dprintf("nr %d, del pid %lx", total_nr_workers, id);
+}
diff --git a/sheep/trace/trace.h b/sheep/trace/trace.h
index 3fe38f7..5243207 100644
--- a/sheep/trace/trace.h
+++ b/sheep/trace/trace.h
@@ -10,6 +10,7 @@
 #include "sheep.h"
 #include "list.h"
 #include "util.h"
+#include "../work.h"
 
 struct ipinfo {
 	const char *file;           /* Source code filename for EIP */
@@ -57,8 +58,9 @@ unsigned long trace_return_call(void);
   struct caller *trace_lookup_ip(unsigned long ip, bool create);
   int trace_buffer_pop(void *buf, uint32_t len);
   void trace_buffer_push(int cpuid, struct trace_graph_item *item);
-  void short_thread_begin(void);
-  void short_thread_end(void);
+  void trace_register_thread(pthread_t id);
+  void trace_unregister_thread(pthread_t id);
+
 #else
   static inline int trace_init_signal(void) { return 0; }
   static inline int trace_init(void) { return 0; }
@@ -67,8 +69,8 @@ unsigned long trace_return_call(void);
   static inline int trace_buffer_pop(void *buf, uint32_t len) { return 0; }
   static inline void trace_buffer_push(
 	  int cpuid, struct trace_graph_item *item) { return; }
-  static inline void short_thread_begin(void) { return; }
-  static inline void short_thread_end(void) { return; }
+  static inline void trace_register_thread(pthread_t id){ return; }
+  static inline void trace_unregister_thread(pthread_t id) { return; }
 
 #endif /* ENABLE_TRACE */
 
diff --git a/sheep/work.c b/sheep/work.c
index e71afbc..5dc319a 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -43,7 +43,6 @@
 #define WQ_PROTECTION_PERIOD 1000 /* ms */
 
 static int efd;
-int total_ordered_workers;
 LIST_HEAD(worker_info_list);
 
 static void *worker_routine(void *arg);
@@ -115,14 +114,14 @@ static int create_worker_threads(struct worker_info *wi, size_t nr_threads)
 
 	pthread_mutex_lock(&wi->startup_lock);
 	while (wi->nr_threads < nr_threads) {
-		wi->nr_threads++;
 		ret = pthread_create(&thread, NULL, worker_routine, wi);
 		if (ret != 0) {
 			sd_eprintf("failed to create worker thread: %m");
-			wi->nr_threads--;
 			pthread_mutex_unlock(&wi->startup_lock);
 			return -1;
 		}
+		trace_register_thread(thread);
+		wi->nr_threads++;
 		sd_dprintf("create thread %s %zd", wi->name, wi->nr_threads);
 	}
 	pthread_mutex_unlock(&wi->startup_lock);
@@ -195,6 +194,7 @@ static void *worker_routine(void *arg)
 		if (wq_need_shrink(wi)) {
 			wi->nr_running--;
 			wi->nr_threads--;
+			trace_unregister_thread(pthread_self());
 			pthread_mutex_unlock(&wi->pending_lock);
 			pthread_detach(pthread_self());
 			sd_dprintf("destroy thread %s %d, %zd", wi->name,
diff --git a/sheep/work.h b/sheep/work.h
index 0366cb3..5706a84 100644
--- a/sheep/work.h
+++ b/sheep/work.h
@@ -28,10 +28,11 @@ enum wq_thread_control {
 struct worker_info {
 	const char *name;
 
+	struct list_head finished_list;
 	struct list_head worker_info_siblings;
 
 	pthread_mutex_t finished_lock;
-	struct list_head finished_list;
+	pthread_mutex_t startup_lock;
 
 	/* wokers sleep on this and signaled by tgtd */
 	pthread_cond_t pending_cond;
@@ -44,14 +45,10 @@ struct worker_info {
 	size_t nr_threads;
 	/* we cannot shrink work queue till this time */
 	uint64_t tm_end_of_protection;
-
-	pthread_mutex_t startup_lock;
-
 	enum wq_thread_control tc;
 };
 
 extern struct list_head worker_info_list;
-extern int total_ordered_workers;
 
 struct work_queue *init_work_queue(const char *name, enum wq_thread_control);
 struct work_queue *init_ordered_work_queue(const char *name);
-- 
1.7.9.5




More information about the sheepdog mailing list