[sheepdog] [PATCH stable-0.7 2/9] lib: introduce new wrappers for pthread_mutex and pthread_cond

Hitoshi Mitake mitake.hitoshi at lab.ntt.co.jp
Fri Jan 31 06:46:37 CET 2014


From: Hitoshi Mitake <mitake.hitoshi at gmail.com>

There is a possibility that the pthread_mutex_* functions can return
an undocumented error code, so we need to wrap them. This patch
introduces wrapper functions and a new data structure for them.

In addition, the pthread_cond_* functions should also be wrapped.
Because we no longer use pthread_mutex_t directly, we cannot call
pthread_cond_wait() with the raw type. This patch also adds wrappers
for them.

Signed-off-by: Hitoshi Mitake <mitake.hitoshi at lab.ntt.co.jp>
[ fixed compile error - Yuan ]
Signed-off-by: Liu Yuan <namei.unix at gmail.com>

Conflicts:
	sheep/cluster/zookeeper.c
	sheep/request.c

Conflicts were resolved by Hitoshi Mitake.
Signed-off-by: Hitoshi Mitake <mitake.hitoshi at lab.ntt.co.jp>
---
 include/util.h      |  104 +++++++++++++++++++++++++++++++++++++++++++++++++++
 lib/work.c          |   71 +++++++++++++++++------------------
 sheep/group.c       |   16 ++++----
 sheep/request.c     |   10 ++---
 sheep/sheep_priv.h  |    2 +-
 sheep/trace/trace.c |   12 +++---
 6 files changed, 159 insertions(+), 56 deletions(-)

diff --git a/include/util.h b/include/util.h
index 848b55c..101503f 100644
--- a/include/util.h
+++ b/include/util.h
@@ -259,6 +259,110 @@ static inline int refcount_dec(refcnt_t *rc)
 	return uatomic_sub_return(&rc->val, 1);
 }
 
+/* wrapper for pthread_mutex */
+
+#define SD_MUTEX_INITIALIZER { .mutex = PTHREAD_MUTEX_INITIALIZER }
+
+struct sd_mutex {
+	pthread_mutex_t mutex;
+};
+
+static inline void sd_init_mutex(struct sd_mutex *mutex)
+{
+	int ret;
+
+	do {
+		ret = pthread_mutex_init(&mutex->mutex, NULL);
+	} while (ret == EAGAIN);
+
+	if (unlikely(ret != 0))
+		panic("failed to initialize a lock, %s", strerror(ret));
+}
+
+static inline void sd_destroy_mutex(struct sd_mutex *mutex)
+{
+	int ret;
+
+	do {
+		ret = pthread_mutex_destroy(&mutex->mutex);
+	} while (ret == EAGAIN);
+
+	if (unlikely(ret != 0))
+		panic("failed to destroy a lock, %s", strerror(ret));
+}
+
+static inline void sd_mutex_lock(struct sd_mutex *mutex)
+{
+	int ret;
+
+	do {
+		ret = pthread_mutex_lock(&mutex->mutex);
+	} while (ret == EAGAIN);
+
+	if (unlikely(ret != 0))
+		panic("failed to lock for reading, %s", strerror(ret));
+}
+
+static inline void sd_mutex_unlock(struct sd_mutex *mutex)
+{
+	int ret;
+
+	do {
+		ret = pthread_mutex_unlock(&mutex->mutex);
+	} while (ret == EAGAIN);
+
+	if (unlikely(ret != 0))
+		panic("failed to unlock, %s", strerror(ret));
+}
+
+/* wrapper for pthread_cond */
+
+#define SD_COND_INITIALIZER { .cond = PTHREAD_COND_INITIALIZER }
+
+struct sd_cond {
+	pthread_cond_t cond;
+};
+
+static inline void sd_cond_init(struct sd_cond *cond)
+{
+	int ret;
+
+	do {
+		ret = pthread_cond_init(&cond->cond, NULL);
+	} while (ret == EAGAIN);
+
+	if (unlikely(ret != 0))
+		panic("failed to initialize a lock, %s", strerror(ret));
+
+}
+
+static inline void sd_destroy_cond(struct sd_cond *cond)
+{
+	int ret;
+
+	do {
+		ret = pthread_cond_destroy(&cond->cond);
+	} while (ret == EAGAIN);
+
+	if (unlikely(ret != 0))
+		panic("failed to destroy a lock, %s", strerror(ret));
+}
+
+static inline int sd_cond_signal(struct sd_cond *cond)
+{
+	return pthread_cond_signal(&cond->cond);
+}
+
+static inline int sd_cond_wait(struct sd_cond *cond, struct sd_mutex *mutex)
+{
+	return pthread_cond_wait(&cond->cond, &mutex->mutex);
+}
+
+static inline int sd_cond_broadcast(struct sd_cond *cond)
+{
+	return pthread_cond_broadcast(&cond->cond);
+}
+
 /* wrapper for pthread_rwlock */
 
 #define SD_RW_LOCK_INITIALIZER { .rwlock = PTHREAD_RWLOCK_INITIALIZER }
diff --git a/lib/work.c b/lib/work.c
index 55f3399..e52144e 100644
--- a/lib/work.c
+++ b/lib/work.c
@@ -47,13 +47,13 @@ struct wq_info {
 	struct list_head finished_list;
 	struct list_head list;
 
-	pthread_mutex_t finished_lock;
-	pthread_mutex_t startup_lock;
+	struct sd_mutex finished_lock;
+	struct sd_mutex startup_lock;
 
 	/* wokers sleep on this and signaled by work producer */
-	pthread_cond_t pending_cond;
+	struct sd_cond pending_cond;
 	/* locked by work producer and workers */
-	pthread_mutex_t pending_lock;
+	struct sd_mutex pending_lock;
 	/* protected by pending_lock */
 	struct work_queue q;
 	size_t nr_threads;
@@ -79,7 +79,7 @@ static void *worker_routine(void *arg);
 
 static size_t tid_max;
 static unsigned long *tid_map;
-static pthread_mutex_t tid_map_lock = PTHREAD_MUTEX_INITIALIZER;
+static struct sd_mutex tid_map_lock = SD_MUTEX_INITIALIZER;
 
 static int resume_efd;
 static int ack_efd;
@@ -90,7 +90,7 @@ void suspend_worker_threads(void)
 	int tid;
 
 	list_for_each_entry(wi, &wq_info_list, list) {
-		pthread_mutex_lock(&wi->pending_lock);
+		sd_mutex_lock(&wi->pending_lock);
 	}
 
 	FOR_EACH_BIT(tid, tid_map, tid_max) {
@@ -122,7 +122,7 @@ void resume_worker_threads(void)
 		eventfd_xread(ack_efd);
 
 	list_for_each_entry(wi, &wq_info_list, list) {
-		pthread_mutex_unlock(&wi->pending_lock);
+		sd_mutex_unlock(&wi->pending_lock);
 	}
 }
 
@@ -159,7 +159,7 @@ static int wq_trace_init(void)
 
 static void trace_set_tid_map(int tid)
 {
-	pthread_mutex_lock(&tid_map_lock);
+	sd_mutex_lock(&tid_map_lock);
 	if (tid > tid_max) {
 		size_t old_tid_max = tid_max;
 
@@ -170,14 +170,14 @@ static void trace_set_tid_map(int tid)
 		tid_map = alloc_bitmap(tid_map, old_tid_max, tid_max);
 	}
 	set_bit(tid, tid_map);
-	pthread_mutex_unlock(&tid_map_lock);
+	sd_mutex_unlock(&tid_map_lock);
 }
 
 static void trace_clear_tid_map(int tid)
 {
-	pthread_mutex_lock(&tid_map_lock);
+	sd_mutex_lock(&tid_map_lock);
 	clear_bit(tid, tid_map);
-	pthread_mutex_unlock(&tid_map_lock);
+	sd_mutex_unlock(&tid_map_lock);
 }
 
 #else
@@ -248,18 +248,18 @@ static int create_worker_threads(struct wq_info *wi, size_t nr_threads)
 	pthread_t thread;
 	int ret;
 
-	pthread_mutex_lock(&wi->startup_lock);
+	sd_mutex_lock(&wi->startup_lock);
 	while (wi->nr_threads < nr_threads) {
 		ret = pthread_create(&thread, NULL, worker_routine, wi);
 		if (ret != 0) {
 			sd_err("failed to create worker thread: %m");
-			pthread_mutex_unlock(&wi->startup_lock);
+			sd_mutex_unlock(&wi->startup_lock);
 			return -1;
 		}
 		wi->nr_threads++;
 		sd_debug("create thread %s %zu", wi->name, wi->nr_threads);
 	}
-	pthread_mutex_unlock(&wi->startup_lock);
+	sd_mutex_unlock(&wi->startup_lock);
 
 	return 0;
 }
@@ -269,16 +269,16 @@ void queue_work(struct work_queue *q, struct work *work)
 	struct wq_info *wi = container_of(q, struct wq_info, q);
 
 	uatomic_inc(&wi->nr_queued_work);
-	pthread_mutex_lock(&wi->pending_lock);
+	sd_mutex_lock(&wi->pending_lock);
 
 	if (wq_need_grow(wi))
 		/* double the thread pool size */
 		create_worker_threads(wi, wi->nr_threads * 2);
 
 	list_add_tail(&work->w_list, &wi->q.pending_list);
-	pthread_mutex_unlock(&wi->pending_lock);
+	sd_mutex_unlock(&wi->pending_lock);
 
-	pthread_cond_signal(&wi->pending_cond);
+	sd_cond_signal(&wi->pending_cond);
 }
 
 static void worker_thread_request_done(int fd, int events, void *data)
@@ -293,9 +293,9 @@ static void worker_thread_request_done(int fd, int events, void *data)
 	eventfd_xread(fd);
 
 	list_for_each_entry(wi, &wq_info_list, list) {
-		pthread_mutex_lock(&wi->finished_lock);
+		sd_mutex_lock(&wi->finished_lock);
 		list_splice_init(&wi->finished_list, &list);
-		pthread_mutex_unlock(&wi->finished_lock);
+		sd_mutex_unlock(&wi->finished_lock);
 
 		while (!list_empty(&list)) {
 			work = list_first_entry(&list, struct work, w_list);
@@ -315,19 +315,19 @@ static void *worker_routine(void *arg)
 
 	set_thread_name(wi->name, (wi->tc != WQ_ORDERED));
 
-	pthread_mutex_lock(&wi->startup_lock);
+	sd_mutex_lock(&wi->startup_lock);
 	/* started this thread */
-	pthread_mutex_unlock(&wi->startup_lock);
+	sd_mutex_unlock(&wi->startup_lock);
 
 	trace_set_tid_map(tid);
 	while (true) {
 
-		pthread_mutex_lock(&wi->pending_lock);
+		sd_mutex_lock(&wi->pending_lock);
 		if (wq_need_shrink(wi)) {
 			wi->nr_threads--;
 
 			trace_clear_tid_map(tid);
-			pthread_mutex_unlock(&wi->pending_lock);
+			sd_mutex_unlock(&wi->pending_lock);
 			pthread_detach(pthread_self());
 			sd_debug("destroy thread %s %d, %zu", wi->name, tid,
 				 wi->nr_threads);
@@ -335,7 +335,7 @@ static void *worker_routine(void *arg)
 		}
 retest:
 		if (list_empty(&wi->q.pending_list)) {
-			pthread_cond_wait(&wi->pending_cond, &wi->pending_lock);
+			sd_cond_wait(&wi->pending_cond, &wi->pending_lock);
 			goto retest;
 		}
 
@@ -343,14 +343,14 @@ retest:
 				       struct work, w_list);
 
 		list_del(&work->w_list);
-		pthread_mutex_unlock(&wi->pending_lock);
+		sd_mutex_unlock(&wi->pending_lock);
 
 		if (work->fn)
 			work->fn(work);
 
-		pthread_mutex_lock(&wi->finished_lock);
+		sd_mutex_lock(&wi->finished_lock);
 		list_add_tail(&work->w_list, &wi->finished_list);
-		pthread_mutex_unlock(&wi->finished_lock);
+		sd_mutex_unlock(&wi->finished_lock);
 
 		eventfd_xwrite(efd, 1);
 	}
@@ -411,11 +411,11 @@ struct work_queue *create_work_queue(const char *name,
 	INIT_LIST_HEAD(&wi->q.pending_list);
 	INIT_LIST_HEAD(&wi->finished_list);
 
-	pthread_cond_init(&wi->pending_cond, NULL);
+	sd_cond_init(&wi->pending_cond);
 
-	pthread_mutex_init(&wi->finished_lock, NULL);
-	pthread_mutex_init(&wi->pending_lock, NULL);
-	pthread_mutex_init(&wi->startup_lock, NULL);
+	sd_init_mutex(&wi->finished_lock);
+	sd_init_mutex(&wi->pending_lock);
+	sd_init_mutex(&wi->startup_lock);
 
 	ret = create_worker_threads(wi, 1);
 	if (ret < 0)
@@ -425,11 +425,10 @@ struct work_queue *create_work_queue(const char *name,
 
 	return &wi->q;
 destroy_threads:
-	pthread_mutex_unlock(&wi->startup_lock);
-	pthread_cond_destroy(&wi->pending_cond);
-	pthread_mutex_destroy(&wi->pending_lock);
-	pthread_mutex_destroy(&wi->startup_lock);
-	pthread_mutex_destroy(&wi->finished_lock);
+	sd_mutex_unlock(&wi->startup_lock);
+	sd_destroy_cond(&wi->pending_cond);
+	sd_destroy_mutex(&wi->pending_lock);
+	sd_destroy_mutex(&wi->finished_lock);
 
 	return NULL;
 }
diff --git a/sheep/group.c b/sheep/group.c
index a8bab4f..31c77e1 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -24,8 +24,8 @@ struct get_vdis_work {
 	struct sd_node members[];
 };
 
-static pthread_mutex_t wait_vdis_lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t wait_vdis_cond = PTHREAD_COND_INITIALIZER;
+static struct sd_mutex wait_vdis_lock = SD_MUTEX_INITIALIZER;
+static struct sd_cond wait_vdis_cond = SD_COND_INITIALIZER;
 static refcnt_t nr_get_vdis_works;
 
 static main_thread(struct vnode_info *) current_vnode_info;
@@ -499,10 +499,10 @@ static void get_vdis_done(struct work *work)
 	struct get_vdis_work *w =
 		container_of(work, struct get_vdis_work, work);
 
-	pthread_mutex_lock(&wait_vdis_lock);
+	sd_mutex_lock(&wait_vdis_lock);
 	refcount_dec(&nr_get_vdis_works);
-	pthread_cond_broadcast(&wait_vdis_cond);
-	pthread_mutex_unlock(&wait_vdis_lock);
+	sd_cond_broadcast(&wait_vdis_cond);
+	sd_mutex_unlock(&wait_vdis_lock);
 
 	free(w);
 }
@@ -596,10 +596,10 @@ void wait_get_vdis_done(void)
 {
 	sd_debug("waiting for vdi list");
 
-	pthread_mutex_lock(&wait_vdis_lock);
+	sd_mutex_lock(&wait_vdis_lock);
 	while (refcount_read(&nr_get_vdis_works) > 0)
-		pthread_cond_wait(&wait_vdis_cond, &wait_vdis_lock);
-	pthread_mutex_unlock(&wait_vdis_lock);
+		sd_cond_wait(&wait_vdis_cond, &wait_vdis_lock);
+	sd_mutex_unlock(&wait_vdis_lock);
 
 	sd_debug("vdi list ready");
 }
diff --git a/sheep/request.c b/sheep/request.c
index 8ddc252..8758214 100644
--- a/sheep/request.c
+++ b/sheep/request.c
@@ -446,9 +446,9 @@ worker_fn int exec_local_req(struct sd_req *rq, void *data)
 		goto out;
 	}
 
-	pthread_mutex_lock(&sys->local_req_lock);
+	sd_mutex_lock(&sys->local_req_lock);
 	list_add_tail(&req->request_list, &sys->local_req_queue);
-	pthread_mutex_unlock(&sys->local_req_lock);
+	sd_mutex_unlock(&sys->local_req_lock);
 
 	eventfd_xwrite(sys->local_req_efd, 1);
 	eventfd_xread(req->local_req_efd);
@@ -882,9 +882,9 @@ static void local_req_handler(int listen_fd, int events, void *data)
 
 	eventfd_xread(listen_fd);
 
-	pthread_mutex_lock(&sys->local_req_lock);
+	sd_mutex_lock(&sys->local_req_lock);
 	list_splice_init(&sys->local_req_queue, &pending_list);
-	pthread_mutex_unlock(&sys->local_req_lock);
+	sd_mutex_unlock(&sys->local_req_lock);
 
 	list_for_each_entry_safe(req, t, &pending_list, request_list) {
 		list_del(&req->request_list);
@@ -894,7 +894,7 @@ static void local_req_handler(int listen_fd, int events, void *data)
 
 void local_req_init(void)
 {
-	pthread_mutex_init(&sys->local_req_lock, NULL);
+	sd_init_mutex(&sys->local_req_lock);
 	sys->local_req_efd = eventfd(0, EFD_NONBLOCK);
 	if (sys->local_req_efd < 0)
 		panic("failed to init local req efd");
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 2f97196..c787f61 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -120,7 +120,7 @@ struct system_info {
 
 	int local_req_efd;
 
-	pthread_mutex_t local_req_lock;
+	struct sd_mutex local_req_lock;
 	struct list_head local_req_queue;
 	struct list_head req_wait_queue;
 	int nr_outstanding_reqs;
diff --git a/sheep/trace/trace.c b/sheep/trace/trace.c
index 1bf845e..e70c77a 100644
--- a/sheep/trace/trace.c
+++ b/sheep/trace/trace.c
@@ -30,7 +30,7 @@ static struct caller *callers;
 static size_t nr_callers;
 
 static struct strbuf *buffer;
-static pthread_mutex_t *buffer_lock;
+static struct sd_mutex *buffer_lock;
 static int nr_cpu;
 
 static __thread bool in_trace;
@@ -258,9 +258,9 @@ int trace_buffer_pop(void *buf, uint32_t len)
 	int i;
 
 	for (i = 0; i < nr_cpu; i++) {
-		pthread_mutex_lock(&buffer_lock[i]);
+		sd_mutex_lock(&buffer_lock[i]);
 		readin = strbuf_stripout(&buffer[i], buff, len);
-		pthread_mutex_unlock(&buffer_lock[i]);
+		sd_mutex_unlock(&buffer_lock[i]);
 		count += readin;
 		if (count == requested)
 			return count;
@@ -276,9 +276,9 @@ int trace_buffer_pop(void *buf, uint32_t len)
 
 void trace_buffer_push(int cpuid, struct trace_graph_item *item)
 {
-	pthread_mutex_lock(&buffer_lock[cpuid]);
+	sd_mutex_lock(&buffer_lock[cpuid]);
 	strbuf_add(&buffer[cpuid], item, sizeof(*item));
-	pthread_mutex_unlock(&buffer_lock[cpuid]);
+	sd_mutex_unlock(&buffer_lock[cpuid]);
 }
 
 /* assume that mcount call exists in the first FIND_MCOUNT_RANGE bytes */
@@ -415,7 +415,7 @@ int trace_init(void)
 	buffer_lock = xzalloc(sizeof(*buffer_lock) * nr_cpu);
 	for (i = 0; i < nr_cpu; i++) {
 		strbuf_init(&buffer[i], 0);
-		pthread_mutex_init(&buffer_lock[i], NULL);
+		sd_init_mutex(&buffer_lock[i]);
 	}
 
 	sd_info("trace support enabled. cpu count %d.", nr_cpu);
-- 
1.7.10.4




More information about the sheepdog mailing list