[sheepdog] [PATCH stable-0.7 2/9] lib: introduce new wrappers for pthread_mutex and pthread_cond
Hitoshi Mitake
mitake.hitoshi at lab.ntt.co.jp
Fri Jan 31 06:46:37 CET 2014
From: Hitoshi Mitake <mitake.hitoshi at gmail.com>
The pthread_mutex_* functions can return undocumented error codes, so
we need to wrap them. This patch introduces wrapper functions and a new
data structure for them.
In addition, the pthread_cond_* functions should also be wrapped: because
we no longer use pthread_mutex_t directly, callers cannot pass their
mutex to pthread_cond_wait(). This patch also adds wrappers for them.
Signed-off-by: Hitoshi Mitake <mitake.hitoshi at lab.ntt.co.jp>
[ fixed compile error - Yuan ]
Signed-off-by: Liu Yuan <namei.unix at gmail.com>
Conflicts:
sheep/cluster/zookeeper.c
sheep/request.c
Conflicts were resolved by Hitoshi Mitake.
Signed-off-by: Hitoshi Mitake <mitake.hitoshi at lab.ntt.co.jp>
---
include/util.h | 104 +++++++++++++++++++++++++++++++++++++++++++++++++++
lib/work.c | 71 +++++++++++++++++------------------
sheep/group.c | 16 ++++----
sheep/request.c | 10 ++---
sheep/sheep_priv.h | 2 +-
sheep/trace/trace.c | 12 +++---
6 files changed, 159 insertions(+), 56 deletions(-)
diff --git a/include/util.h b/include/util.h
index 848b55c..101503f 100644
--- a/include/util.h
+++ b/include/util.h
@@ -259,6 +259,110 @@ static inline int refcount_dec(refcnt_t *rc)
return uatomic_sub_return(&rc->val, 1);
}
+/*
+ * wrapper for pthread_mutex
+ *
+ * struct sd_mutex embeds a pthread_mutex_t so that callers go through the
+ * sd_* mutex helpers instead of calling pthread_mutex_* directly.
+ */
+
+/* static initializer for a struct sd_mutex */
+#define SD_MUTEX_INITIALIZER { .mutex = PTHREAD_MUTEX_INITIALIZER }
+
+struct sd_mutex {
+ pthread_mutex_t mutex; /* the wrapped pthread mutex */
+};
+
+/*
+ * Initialize the mutex embedded in @mutex with default attributes
+ * (a NULL attribute pointer is passed to pthread_mutex_init()).
+ *
+ * The call is repeated for as long as it returns EAGAIN; any other
+ * non-zero result is reported through panic().
+ */
+static inline void sd_init_mutex(struct sd_mutex *mutex)
+{
+ int ret;
+
+ /* retry while pthread_mutex_init() reports EAGAIN */
+ do {
+ ret = pthread_mutex_init(&mutex->mutex, NULL);
+ } while (ret == EAGAIN);
+
+ if (unlikely(ret != 0))
+ panic("failed to initialize a lock, %s", strerror(ret));
+}
+
+/*
+ * Destroy the mutex embedded in @mutex.
+ *
+ * pthread_mutex_destroy() is repeated for as long as it returns EAGAIN;
+ * any other non-zero result is reported through panic().
+ */
+static inline void sd_destroy_mutex(struct sd_mutex *mutex)
+{
+ int ret;
+
+ /* retry while pthread_mutex_destroy() reports EAGAIN */
+ do {
+ ret = pthread_mutex_destroy(&mutex->mutex);
+ } while (ret == EAGAIN);
+
+ if (unlikely(ret != 0))
+ panic("failed to destroy a lock, %s", strerror(ret));
+}
+
+/*
+ * Lock the mutex embedded in @mutex.
+ *
+ * pthread_mutex_lock() is repeated for as long as it returns EAGAIN;
+ * any other non-zero result is reported through panic(), so on return
+ * the caller holds the mutex.
+ */
+static inline void sd_mutex_lock(struct sd_mutex *mutex)
+{
+ int ret;
+
+ /* retry while pthread_mutex_lock() reports EAGAIN */
+ do {
+ ret = pthread_mutex_lock(&mutex->mutex);
+ } while (ret == EAGAIN);
+
+ /* a mutex has no read/write modes: do not say "for reading" here */
+ if (unlikely(ret != 0))
+ panic("failed to lock a mutex, %s", strerror(ret));
+}
+
+/*
+ * Unlock the mutex embedded in @mutex.
+ *
+ * pthread_mutex_unlock() is repeated for as long as it returns EAGAIN;
+ * any other non-zero result is reported through panic().
+ */
+static inline void sd_mutex_unlock(struct sd_mutex *mutex)
+{
+ int ret;
+
+ /* retry while pthread_mutex_unlock() reports EAGAIN */
+ do {
+ ret = pthread_mutex_unlock(&mutex->mutex);
+ } while (ret == EAGAIN);
+
+ if (unlikely(ret != 0))
+ panic("failed to unlock, %s", strerror(ret));
+}
+
+/*
+ * wrapper for pthread_cond
+ *
+ * struct sd_cond embeds a pthread_cond_t so that it can be used together
+ * with struct sd_mutex through the sd_cond_* helpers.
+ */
+
+/* static initializer for a struct sd_cond */
+#define SD_COND_INITIALIZER { .cond = PTHREAD_COND_INITIALIZER }
+
+struct sd_cond {
+ pthread_cond_t cond; /* the wrapped pthread condition variable */
+};
+
+/*
+ * Initialize the condition variable embedded in @cond with default
+ * attributes (a NULL attribute pointer is passed to pthread_cond_init()).
+ *
+ * The call is repeated for as long as it returns EAGAIN; any other
+ * non-zero result is reported through panic().
+ */
+static inline void sd_cond_init(struct sd_cond *cond)
+{
+ int ret;
+
+ /* retry while pthread_cond_init() reports EAGAIN */
+ do {
+ ret = pthread_cond_init(&cond->cond, NULL);
+ } while (ret == EAGAIN);
+
+ /* name the right object: this is a condition variable, not a lock */
+ if (unlikely(ret != 0))
+ panic("failed to initialize a condition variable, %s",
+ strerror(ret));
+}
+
+/*
+ * Destroy the condition variable embedded in @cond.
+ *
+ * pthread_cond_destroy() is repeated for as long as it returns EAGAIN;
+ * any other non-zero result is reported through panic().
+ */
+static inline void sd_destroy_cond(struct sd_cond *cond)
+{
+ int ret;
+
+ /* retry while pthread_cond_destroy() reports EAGAIN */
+ do {
+ ret = pthread_cond_destroy(&cond->cond);
+ } while (ret == EAGAIN);
+
+ /* name the right object: this is a condition variable, not a lock */
+ if (unlikely(ret != 0))
+ panic("failed to destroy a condition variable, %s",
+ strerror(ret));
+}
+
+/*
+ * Call pthread_cond_signal() on the condition variable embedded in @cond.
+ *
+ * Returns the pthread_cond_signal() result unchanged; unlike the mutex
+ * wrappers, no retry or panic() is performed here.
+ */
+static inline int sd_cond_signal(struct sd_cond *cond)
+{
+ return pthread_cond_signal(&cond->cond);
+}
+
+/*
+ * Call pthread_cond_wait() with the condition variable embedded in @cond
+ * and the pthread mutex embedded in @mutex.
+ *
+ * Returns the pthread_cond_wait() result unchanged; no retry or panic()
+ * is performed here.
+ */
+static inline int sd_cond_wait(struct sd_cond *cond, struct sd_mutex *mutex)
+{
+ return pthread_cond_wait(&cond->cond, &mutex->mutex);
+}
+
+/*
+ * Call pthread_cond_broadcast() on the condition variable embedded in
+ * @cond.
+ *
+ * Returns the pthread_cond_broadcast() result unchanged; no retry or
+ * panic() is performed here.
+ */
+static inline int sd_cond_broadcast(struct sd_cond *cond)
+{
+ return pthread_cond_broadcast(&cond->cond);
+}
+
/* wrapper for pthread_rwlock */
#define SD_RW_LOCK_INITIALIZER { .rwlock = PTHREAD_RWLOCK_INITIALIZER }
diff --git a/lib/work.c b/lib/work.c
index 55f3399..e52144e 100644
--- a/lib/work.c
+++ b/lib/work.c
@@ -47,13 +47,13 @@ struct wq_info {
struct list_head finished_list;
struct list_head list;
- pthread_mutex_t finished_lock;
- pthread_mutex_t startup_lock;
+ struct sd_mutex finished_lock;
+ struct sd_mutex startup_lock;
/* wokers sleep on this and signaled by work producer */
- pthread_cond_t pending_cond;
+ struct sd_cond pending_cond;
/* locked by work producer and workers */
- pthread_mutex_t pending_lock;
+ struct sd_mutex pending_lock;
/* protected by pending_lock */
struct work_queue q;
size_t nr_threads;
@@ -79,7 +79,7 @@ static void *worker_routine(void *arg);
static size_t tid_max;
static unsigned long *tid_map;
-static pthread_mutex_t tid_map_lock = PTHREAD_MUTEX_INITIALIZER;
+static struct sd_mutex tid_map_lock = SD_MUTEX_INITIALIZER;
static int resume_efd;
static int ack_efd;
@@ -90,7 +90,7 @@ void suspend_worker_threads(void)
int tid;
list_for_each_entry(wi, &wq_info_list, list) {
- pthread_mutex_lock(&wi->pending_lock);
+ sd_mutex_lock(&wi->pending_lock);
}
FOR_EACH_BIT(tid, tid_map, tid_max) {
@@ -122,7 +122,7 @@ void resume_worker_threads(void)
eventfd_xread(ack_efd);
list_for_each_entry(wi, &wq_info_list, list) {
- pthread_mutex_unlock(&wi->pending_lock);
+ sd_mutex_unlock(&wi->pending_lock);
}
}
@@ -159,7 +159,7 @@ static int wq_trace_init(void)
static void trace_set_tid_map(int tid)
{
- pthread_mutex_lock(&tid_map_lock);
+ sd_mutex_lock(&tid_map_lock);
if (tid > tid_max) {
size_t old_tid_max = tid_max;
@@ -170,14 +170,14 @@ static void trace_set_tid_map(int tid)
tid_map = alloc_bitmap(tid_map, old_tid_max, tid_max);
}
set_bit(tid, tid_map);
- pthread_mutex_unlock(&tid_map_lock);
+ sd_mutex_unlock(&tid_map_lock);
}
static void trace_clear_tid_map(int tid)
{
- pthread_mutex_lock(&tid_map_lock);
+ sd_mutex_lock(&tid_map_lock);
clear_bit(tid, tid_map);
- pthread_mutex_unlock(&tid_map_lock);
+ sd_mutex_unlock(&tid_map_lock);
}
#else
@@ -248,18 +248,18 @@ static int create_worker_threads(struct wq_info *wi, size_t nr_threads)
pthread_t thread;
int ret;
- pthread_mutex_lock(&wi->startup_lock);
+ sd_mutex_lock(&wi->startup_lock);
while (wi->nr_threads < nr_threads) {
ret = pthread_create(&thread, NULL, worker_routine, wi);
if (ret != 0) {
sd_err("failed to create worker thread: %m");
- pthread_mutex_unlock(&wi->startup_lock);
+ sd_mutex_unlock(&wi->startup_lock);
return -1;
}
wi->nr_threads++;
sd_debug("create thread %s %zu", wi->name, wi->nr_threads);
}
- pthread_mutex_unlock(&wi->startup_lock);
+ sd_mutex_unlock(&wi->startup_lock);
return 0;
}
@@ -269,16 +269,16 @@ void queue_work(struct work_queue *q, struct work *work)
struct wq_info *wi = container_of(q, struct wq_info, q);
uatomic_inc(&wi->nr_queued_work);
- pthread_mutex_lock(&wi->pending_lock);
+ sd_mutex_lock(&wi->pending_lock);
if (wq_need_grow(wi))
/* double the thread pool size */
create_worker_threads(wi, wi->nr_threads * 2);
list_add_tail(&work->w_list, &wi->q.pending_list);
- pthread_mutex_unlock(&wi->pending_lock);
+ sd_mutex_unlock(&wi->pending_lock);
- pthread_cond_signal(&wi->pending_cond);
+ sd_cond_signal(&wi->pending_cond);
}
static void worker_thread_request_done(int fd, int events, void *data)
@@ -293,9 +293,9 @@ static void worker_thread_request_done(int fd, int events, void *data)
eventfd_xread(fd);
list_for_each_entry(wi, &wq_info_list, list) {
- pthread_mutex_lock(&wi->finished_lock);
+ sd_mutex_lock(&wi->finished_lock);
list_splice_init(&wi->finished_list, &list);
- pthread_mutex_unlock(&wi->finished_lock);
+ sd_mutex_unlock(&wi->finished_lock);
while (!list_empty(&list)) {
work = list_first_entry(&list, struct work, w_list);
@@ -315,19 +315,19 @@ static void *worker_routine(void *arg)
set_thread_name(wi->name, (wi->tc != WQ_ORDERED));
- pthread_mutex_lock(&wi->startup_lock);
+ sd_mutex_lock(&wi->startup_lock);
/* started this thread */
- pthread_mutex_unlock(&wi->startup_lock);
+ sd_mutex_unlock(&wi->startup_lock);
trace_set_tid_map(tid);
while (true) {
- pthread_mutex_lock(&wi->pending_lock);
+ sd_mutex_lock(&wi->pending_lock);
if (wq_need_shrink(wi)) {
wi->nr_threads--;
trace_clear_tid_map(tid);
- pthread_mutex_unlock(&wi->pending_lock);
+ sd_mutex_unlock(&wi->pending_lock);
pthread_detach(pthread_self());
sd_debug("destroy thread %s %d, %zu", wi->name, tid,
wi->nr_threads);
@@ -335,7 +335,7 @@ static void *worker_routine(void *arg)
}
retest:
if (list_empty(&wi->q.pending_list)) {
- pthread_cond_wait(&wi->pending_cond, &wi->pending_lock);
+ sd_cond_wait(&wi->pending_cond, &wi->pending_lock);
goto retest;
}
@@ -343,14 +343,14 @@ retest:
struct work, w_list);
list_del(&work->w_list);
- pthread_mutex_unlock(&wi->pending_lock);
+ sd_mutex_unlock(&wi->pending_lock);
if (work->fn)
work->fn(work);
- pthread_mutex_lock(&wi->finished_lock);
+ sd_mutex_lock(&wi->finished_lock);
list_add_tail(&work->w_list, &wi->finished_list);
- pthread_mutex_unlock(&wi->finished_lock);
+ sd_mutex_unlock(&wi->finished_lock);
eventfd_xwrite(efd, 1);
}
@@ -411,11 +411,11 @@ struct work_queue *create_work_queue(const char *name,
INIT_LIST_HEAD(&wi->q.pending_list);
INIT_LIST_HEAD(&wi->finished_list);
- pthread_cond_init(&wi->pending_cond, NULL);
+ sd_cond_init(&wi->pending_cond);
- pthread_mutex_init(&wi->finished_lock, NULL);
- pthread_mutex_init(&wi->pending_lock, NULL);
- pthread_mutex_init(&wi->startup_lock, NULL);
+ sd_init_mutex(&wi->finished_lock);
+ sd_init_mutex(&wi->pending_lock);
+ sd_init_mutex(&wi->startup_lock);
ret = create_worker_threads(wi, 1);
if (ret < 0)
@@ -425,11 +425,10 @@ struct work_queue *create_work_queue(const char *name,
return &wi->q;
destroy_threads:
- pthread_mutex_unlock(&wi->startup_lock);
- pthread_cond_destroy(&wi->pending_cond);
- pthread_mutex_destroy(&wi->pending_lock);
- pthread_mutex_destroy(&wi->startup_lock);
- pthread_mutex_destroy(&wi->finished_lock);
+ sd_mutex_unlock(&wi->startup_lock);
+ sd_destroy_cond(&wi->pending_cond);
+ sd_destroy_mutex(&wi->pending_lock);
+ sd_destroy_mutex(&wi->finished_lock);
return NULL;
}
diff --git a/sheep/group.c b/sheep/group.c
index a8bab4f..31c77e1 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -24,8 +24,8 @@ struct get_vdis_work {
struct sd_node members[];
};
-static pthread_mutex_t wait_vdis_lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t wait_vdis_cond = PTHREAD_COND_INITIALIZER;
+static struct sd_mutex wait_vdis_lock = SD_MUTEX_INITIALIZER;
+static struct sd_cond wait_vdis_cond = SD_COND_INITIALIZER;
static refcnt_t nr_get_vdis_works;
static main_thread(struct vnode_info *) current_vnode_info;
@@ -499,10 +499,10 @@ static void get_vdis_done(struct work *work)
struct get_vdis_work *w =
container_of(work, struct get_vdis_work, work);
- pthread_mutex_lock(&wait_vdis_lock);
+ sd_mutex_lock(&wait_vdis_lock);
refcount_dec(&nr_get_vdis_works);
- pthread_cond_broadcast(&wait_vdis_cond);
- pthread_mutex_unlock(&wait_vdis_lock);
+ sd_cond_broadcast(&wait_vdis_cond);
+ sd_mutex_unlock(&wait_vdis_lock);
free(w);
}
@@ -596,10 +596,10 @@ void wait_get_vdis_done(void)
{
sd_debug("waiting for vdi list");
- pthread_mutex_lock(&wait_vdis_lock);
+ sd_mutex_lock(&wait_vdis_lock);
while (refcount_read(&nr_get_vdis_works) > 0)
- pthread_cond_wait(&wait_vdis_cond, &wait_vdis_lock);
- pthread_mutex_unlock(&wait_vdis_lock);
+ sd_cond_wait(&wait_vdis_cond, &wait_vdis_lock);
+ sd_mutex_unlock(&wait_vdis_lock);
sd_debug("vdi list ready");
}
diff --git a/sheep/request.c b/sheep/request.c
index 8ddc252..8758214 100644
--- a/sheep/request.c
+++ b/sheep/request.c
@@ -446,9 +446,9 @@ worker_fn int exec_local_req(struct sd_req *rq, void *data)
goto out;
}
- pthread_mutex_lock(&sys->local_req_lock);
+ sd_mutex_lock(&sys->local_req_lock);
list_add_tail(&req->request_list, &sys->local_req_queue);
- pthread_mutex_unlock(&sys->local_req_lock);
+ sd_mutex_unlock(&sys->local_req_lock);
eventfd_xwrite(sys->local_req_efd, 1);
eventfd_xread(req->local_req_efd);
@@ -882,9 +882,9 @@ static void local_req_handler(int listen_fd, int events, void *data)
eventfd_xread(listen_fd);
- pthread_mutex_lock(&sys->local_req_lock);
+ sd_mutex_lock(&sys->local_req_lock);
list_splice_init(&sys->local_req_queue, &pending_list);
- pthread_mutex_unlock(&sys->local_req_lock);
+ sd_mutex_unlock(&sys->local_req_lock);
list_for_each_entry_safe(req, t, &pending_list, request_list) {
list_del(&req->request_list);
@@ -894,7 +894,7 @@ static void local_req_handler(int listen_fd, int events, void *data)
void local_req_init(void)
{
- pthread_mutex_init(&sys->local_req_lock, NULL);
+ sd_init_mutex(&sys->local_req_lock);
sys->local_req_efd = eventfd(0, EFD_NONBLOCK);
if (sys->local_req_efd < 0)
panic("failed to init local req efd");
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 2f97196..c787f61 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -120,7 +120,7 @@ struct system_info {
int local_req_efd;
- pthread_mutex_t local_req_lock;
+ struct sd_mutex local_req_lock;
struct list_head local_req_queue;
struct list_head req_wait_queue;
int nr_outstanding_reqs;
diff --git a/sheep/trace/trace.c b/sheep/trace/trace.c
index 1bf845e..e70c77a 100644
--- a/sheep/trace/trace.c
+++ b/sheep/trace/trace.c
@@ -30,7 +30,7 @@ static struct caller *callers;
static size_t nr_callers;
static struct strbuf *buffer;
-static pthread_mutex_t *buffer_lock;
+static struct sd_mutex *buffer_lock;
static int nr_cpu;
static __thread bool in_trace;
@@ -258,9 +258,9 @@ int trace_buffer_pop(void *buf, uint32_t len)
int i;
for (i = 0; i < nr_cpu; i++) {
- pthread_mutex_lock(&buffer_lock[i]);
+ sd_mutex_lock(&buffer_lock[i]);
readin = strbuf_stripout(&buffer[i], buff, len);
- pthread_mutex_unlock(&buffer_lock[i]);
+ sd_mutex_unlock(&buffer_lock[i]);
count += readin;
if (count == requested)
return count;
@@ -276,9 +276,9 @@ int trace_buffer_pop(void *buf, uint32_t len)
void trace_buffer_push(int cpuid, struct trace_graph_item *item)
{
- pthread_mutex_lock(&buffer_lock[cpuid]);
+ sd_mutex_lock(&buffer_lock[cpuid]);
strbuf_add(&buffer[cpuid], item, sizeof(*item));
- pthread_mutex_unlock(&buffer_lock[cpuid]);
+ sd_mutex_unlock(&buffer_lock[cpuid]);
}
/* assume that mcount call exists in the first FIND_MCOUNT_RANGE bytes */
@@ -415,7 +415,7 @@ int trace_init(void)
buffer_lock = xzalloc(sizeof(*buffer_lock) * nr_cpu);
for (i = 0; i < nr_cpu; i++) {
strbuf_init(&buffer[i], 0);
- pthread_mutex_init(&buffer_lock[i], NULL);
+ sd_init_mutex(&buffer_lock[i]);
}
sd_info("trace support enabled. cpu count %d.", nr_cpu);
--
1.7.10.4
More information about the sheepdog
mailing list