[sheepdog] [PATCH 2/4] tracer: rework tracer to work with master
Liu Yuan
namei.unix at gmail.com
Tue Apr 16 10:37:52 CEST 2013
From: Liu Yuan <tailai.ly at taobao.com>
Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
sheep/trace/trace.c | 148 ++++++++++++++++++---------------------------------
sheep/trace/trace.h | 10 ++--
sheep/work.c | 6 +--
sheep/work.h | 7 +--
4 files changed, 62 insertions(+), 109 deletions(-)
diff --git a/sheep/trace/trace.c b/sheep/trace/trace.c
index 377c4f2..dc0af97 100644
--- a/sheep/trace/trace.c
+++ b/sheep/trace/trace.c
@@ -16,15 +16,12 @@
#include <unistd.h>
#include <pthread.h>
#include <signal.h>
-#include <sys/eventfd.h>
#include "trace.h"
#include "logger.h"
#include "list.h"
-#include "work.h"
#include "sheepdog_proto.h"
#include "strbuf.h"
-#include "event.h"
#define TRACE_HASH_BITS 7
#define TRACE_HASH_SIZE (1 << TRACE_HASH_BITS)
@@ -35,9 +32,7 @@ static pthread_mutex_t trace_lock = PTHREAD_MUTEX_INITIALIZER;
static trace_func_t trace_func = trace_call;
-static int trace_efd;
-static int nr_short_thread;
-static bool trace_in_patch;
+static int total_nr_workers;
static pthread_mutex_t suspend_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t suspend_cond = PTHREAD_COND_INITIALIZER;
@@ -45,6 +40,12 @@ static int suspend_count;
static struct strbuf *buffer;
static int nr_cpu;
+static LIST_HEAD(worker_list);
+
+struct worker {
+ struct list_head list;
+ pthread_t id;
+};
union instruction {
unsigned char start[INSN_SIZE];
@@ -180,17 +181,17 @@ notrace int register_trace_function(trace_func_t func)
static notrace void suspend_worker_threads(void)
{
- struct worker_info *wi;
- suspend_count = total_ordered_workers;
+ struct worker *w;
- /* Hold the lock, then all other worker can sleep on it */
- list_for_each_entry(wi, &worker_info_list, worker_info_siblings) {
- if (wi->ordered &&
- pthread_kill(wi->worker_thread, SIGUSR2) != 0)
- sd_dprintf("%m");
+ pthread_mutex_lock(&trace_lock);
+ suspend_count = total_nr_workers;
+ list_for_each_entry(w, &worker_list, list) {
+ if (pthread_kill(w->id, SIGUSR2) != 0)
+ panic("%m");
}
wait_for_worker_suspend:
+ /* Hold the lock, then all other workers can sleep on it */
pthread_mutex_lock(&suspend_lock);
if (suspend_count > 0) {
pthread_mutex_unlock(&suspend_lock);
@@ -198,6 +199,7 @@ wait_for_worker_suspend:
goto wait_for_worker_suspend;
}
pthread_mutex_unlock(&suspend_lock);
+ pthread_mutex_unlock(&trace_lock);
}
static notrace void resume_worker_threads(void)
@@ -231,76 +233,27 @@ static notrace void nop_all_sites(void)
pthread_mutex_unlock(&trace_lock);
}
-static inline bool short_thread_running(void)
-{
- return !!nr_short_thread;
-}
-
-static notrace void enable_tracer(int fd, int events, void *data)
+notrace int trace_enable(void)
{
- eventfd_t value;
- int ret;
-
- ret = eventfd_read(trace_efd, &value);
- /*
- * In error case we can't retry read in main thread, simply return and
- * expected to be waken up by epoll again.
- */
- if (ret < 0) {
- sd_eprintf("%m");
- return;
+ if (trace_func == trace_call) {
+ sd_dprintf("no tracer available");
+ return SD_RES_NO_TAG;
}
- if (short_thread_running())
- return;
-
suspend_worker_threads();
patch_all_sites((unsigned long)trace_caller);
resume_worker_threads();
- unregister_event(trace_efd);
- trace_in_patch = false;
sd_dprintf("tracer enabled");
+
+ return SD_RES_SUCCESS;
}
-static notrace void disable_tracer(int fd, int events, void *data)
+notrace int trace_disable(void)
{
- eventfd_t value;
- int ret;
-
- ret = eventfd_read(fd, &value);
- if (ret < 0) {
- sd_eprintf("%m");
- return;
- }
-
- if (short_thread_running())
- return;
-
suspend_worker_threads();
nop_all_sites();
resume_worker_threads();
- unregister_event(trace_efd);
- trace_in_patch = false;
sd_dprintf("tracer disabled");
-}
-
-notrace int trace_enable(void)
-{
- if (trace_func == trace_call) {
- sd_dprintf("no tracer available");
- return SD_RES_NO_TAG;
- }
-
- register_event(trace_efd, enable_tracer, NULL);
- trace_in_patch = true;
-
- return SD_RES_SUCCESS;
-}
-
-notrace int trace_disable(void)
-{
- register_event(trace_efd, disable_tracer, NULL);
- trace_in_patch = true;
return SD_RES_SUCCESS;
}
@@ -321,9 +274,6 @@ notrace int trace_buffer_pop(void *buf, uint32_t len)
char *buff = (char *)buf;
int i;
- if (trace_in_patch)
- return -1;
-
for (i = 0; i < nr_cpu; i++) {
readin = strbuf_stripout(&buffer[i], buff, len);
count += readin;
@@ -344,32 +294,10 @@ notrace void trace_buffer_push(int cpuid, struct trace_graph_item *item)
strbuf_add(&buffer[cpuid], item, sizeof(*item));
}
-void short_thread_begin(void)
-{
- nr_short_thread++;
-}
-
-void short_thread_end(void)
-{
- eventfd_t value = 1;
-
- nr_short_thread--;
- if (trace_in_patch && nr_short_thread == 0)
- eventfd_write(trace_efd, value);
-}
-
notrace int trace_init(void)
{
- sigset_t block;
int i;
- sigemptyset(&block);
- sigaddset(&block, SIGUSR2);
- if (pthread_sigmask(SIG_BLOCK, &block, NULL) != 0) {
- sd_dprintf("%m");
- return -1;
- }
-
if (make_text_writable((unsigned long)mcount_call) < 0) {
sd_dprintf("%m");
return -1;
@@ -382,8 +310,34 @@ notrace int trace_init(void)
for (i = 0; i < nr_cpu; i++)
strbuf_init(&buffer[i], 0);
- trace_efd = eventfd(0, EFD_NONBLOCK);
-
- sd_dprintf("trace support enabled. cpu count %d.", nr_cpu);
+ sd_iprintf("trace support enabled. cpu count %d.", nr_cpu);
return 0;
}
+
+notrace void trace_register_thread(pthread_t id)
+{
+ struct worker *new = xmalloc(sizeof(*new));
+
+ new->id = id;
+
+ pthread_mutex_lock(&trace_lock);
+ list_add(&new->list, &worker_list);
+ total_nr_workers++;
+ pthread_mutex_unlock(&trace_lock);
+ sd_dprintf("nr %d, add pid %lx", total_nr_workers, id);
+}
+
+notrace void trace_unregister_thread(pthread_t id)
+{
+ struct worker *w, *tmp;
+
+ pthread_mutex_lock(&trace_lock);
+ list_for_each_entry_safe(w, tmp, &worker_list, list) {
+ if (w->id == id) {
+ list_del(&w->list);
+ total_nr_workers--;
+ }
+ }
+ pthread_mutex_unlock(&trace_lock);
+ sd_dprintf("nr %d, del pid %lx", total_nr_workers, id);
+}
diff --git a/sheep/trace/trace.h b/sheep/trace/trace.h
index 3fe38f7..5243207 100644
--- a/sheep/trace/trace.h
+++ b/sheep/trace/trace.h
@@ -10,6 +10,7 @@
#include "sheep.h"
#include "list.h"
#include "util.h"
+#include "../work.h"
struct ipinfo {
const char *file; /* Source code filename for EIP */
@@ -57,8 +58,9 @@ unsigned long trace_return_call(void);
struct caller *trace_lookup_ip(unsigned long ip, bool create);
int trace_buffer_pop(void *buf, uint32_t len);
void trace_buffer_push(int cpuid, struct trace_graph_item *item);
- void short_thread_begin(void);
- void short_thread_end(void);
+ void trace_register_thread(pthread_t id);
+ void trace_unregister_thread(pthread_t id);
+
#else
static inline int trace_init_signal(void) { return 0; }
static inline int trace_init(void) { return 0; }
@@ -67,8 +69,8 @@ unsigned long trace_return_call(void);
static inline int trace_buffer_pop(void *buf, uint32_t len) { return 0; }
static inline void trace_buffer_push(
int cpuid, struct trace_graph_item *item) { return; }
- static inline void short_thread_begin(void) { return; }
- static inline void short_thread_end(void) { return; }
+ static inline void trace_register_thread(pthread_t id){ return; }
+ static inline void trace_unregister_thread(pthread_t id) { return; }
#endif /* ENABLE_TRACE */
diff --git a/sheep/work.c b/sheep/work.c
index e71afbc..5dc319a 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -43,7 +43,6 @@
#define WQ_PROTECTION_PERIOD 1000 /* ms */
static int efd;
-int total_ordered_workers;
LIST_HEAD(worker_info_list);
static void *worker_routine(void *arg);
@@ -115,14 +114,14 @@ static int create_worker_threads(struct worker_info *wi, size_t nr_threads)
pthread_mutex_lock(&wi->startup_lock);
while (wi->nr_threads < nr_threads) {
- wi->nr_threads++;
ret = pthread_create(&thread, NULL, worker_routine, wi);
if (ret != 0) {
sd_eprintf("failed to create worker thread: %m");
- wi->nr_threads--;
pthread_mutex_unlock(&wi->startup_lock);
return -1;
}
+ trace_register_thread(thread);
+ wi->nr_threads++;
sd_dprintf("create thread %s %zd", wi->name, wi->nr_threads);
}
pthread_mutex_unlock(&wi->startup_lock);
@@ -195,6 +194,7 @@ static void *worker_routine(void *arg)
if (wq_need_shrink(wi)) {
wi->nr_running--;
wi->nr_threads--;
+ trace_unregister_thread(pthread_self());
pthread_mutex_unlock(&wi->pending_lock);
pthread_detach(pthread_self());
sd_dprintf("destroy thread %s %d, %zd", wi->name,
diff --git a/sheep/work.h b/sheep/work.h
index 0366cb3..5706a84 100644
--- a/sheep/work.h
+++ b/sheep/work.h
@@ -28,10 +28,11 @@ enum wq_thread_control {
struct worker_info {
const char *name;
+ struct list_head finished_list;
struct list_head worker_info_siblings;
pthread_mutex_t finished_lock;
- struct list_head finished_list;
+ pthread_mutex_t startup_lock;
/* wokers sleep on this and signaled by tgtd */
pthread_cond_t pending_cond;
@@ -44,14 +45,10 @@ struct worker_info {
size_t nr_threads;
/* we cannot shrink work queue till this time */
uint64_t tm_end_of_protection;
-
- pthread_mutex_t startup_lock;
-
enum wq_thread_control tc;
};
extern struct list_head worker_info_list;
-extern int total_ordered_workers;
struct work_queue *init_work_queue(const char *name, enum wq_thread_control);
struct work_queue *init_ordered_work_queue(const char *name);
--
1.7.9.5
More information about the sheepdog
mailing list