[sheepdog] [PATCH 2/2] lib: introduce new wrappers for pthread_mutex and pthread_cond
Hitoshi Mitake
mitake.hitoshi at gmail.com
Wed Dec 25 14:34:39 CET 2013
The pthread_mutex_* functions can return undocumented error codes, so
we need to wrap them. This patch introduces wrapper functions and a
new data structure (struct sd_mutex) for this purpose.
In addition, the pthread_cond_* functions should be wrapped as well:
since we no longer use pthread_mutex_t directly, pthread_cond_wait()
can no longer be called with our mutex type. This patch also adds
wrappers for the condition variable functions (struct sd_cond).
Signed-off-by: Hitoshi Mitake <mitake.hitoshi at lab.ntt.co.jp>
---
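A minimal usage sketch for reviewers (not part of the patch): it shows
how the new wrappers are meant to be combined. The "done" flag and the
waiter()/finish() helpers below are hypothetical; only the sd_mutex_*,
sd_cond_wait() and sd_cond_signal() calls come from this series.

#include <stdbool.h>
#include "util.h"

static struct sd_mutex lock = SD_MUTEX_INITIALIZER;
static struct sd_cond cond = SD_COND_INITIALIZER;
static bool done;

static void *waiter(void *arg)
{
	sd_mutex_lock(&lock);
	/* recheck the predicate in a loop: the wait can wake up spuriously */
	while (!done)
		sd_cond_wait(&cond, &lock);
	sd_mutex_unlock(&lock);
	return NULL;
}

static void finish(void)
{
	sd_mutex_lock(&lock);
	done = true;
	sd_mutex_unlock(&lock);
	sd_cond_signal(&cond);
}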
include/util.h | 104 ++++++++++++++++++++++++++++++++++++++++++++++
lib/work.c | 71 ++++++++++++++++---------------
sheep/cluster/zookeeper.c | 38 ++++++++---------
sheep/group.c | 16 +++----
sheep/request.c | 10 ++---
sheep/sheep_priv.h | 2 +-
sheep/trace/trace.c | 12 +++---
7 files changed, 178 insertions(+), 75 deletions(-)
diff --git a/include/util.h b/include/util.h
index 4385fd1..12270ec 100644
--- a/include/util.h
+++ b/include/util.h
@@ -268,6 +268,113 @@ static inline int refcount_dec(refcnt_t *rc)
return uatomic_sub_return(&rc->val, 1);
}
+/* wrapper for pthread_mutex */
+
+#define SD_MUTEX_INITIALIZER { .mutex = PTHREAD_MUTEX_INITIALIZER }
+
+struct sd_mutex {
+ pthread_mutex_t mutex;
+};
+
+static inline void sd_init_mutex(struct sd_mutex *mutex)
+{
+ int ret;
+
+ do {
+ ret = pthread_mutex_init(&mutex->mutex, NULL);
+ } while (ret == EAGAIN);
+
+ if (unlikely(ret != 0))
+ panic("failed to initialize a lock, %s", strerror(ret));
+}
+
+static inline void sd_destroy_mutex(struct sd_mutex *mutex)
+{
+ int ret;
+
+ do {
+ ret = pthread_mutex_destroy(&mutex->mutex);
+ } while (ret == EAGAIN);
+
+ if (unlikely(ret != 0))
+ panic("failed to destroy a lock, %s", strerror(ret));
+}
+
+static inline void sd_mutex_lock(struct sd_mutex *mutex)
+{
+ int ret;
+
+ do {
+ ret = pthread_mutex_lock(&mutex->mutex);
+ } while (ret == EAGAIN);
+
+ if (unlikely(ret != 0))
+ panic("failed to lock for reading, %s", strerror(ret));
+}
+
+static inline void sd_mutex_unlock(struct sd_mutex *mutex)
+{
+ int ret;
+
+ do {
+ ret = pthread_mutex_unlock(&mutex->mutex);
+ } while (ret == EAGAIN);
+
+ if (unlikely(ret != 0))
+ panic("failed to unlock, %s", strerror(ret));
+}
+
+/* wrapper for pthread_cond */
+
+#define SD_COND_INITIALIZER { .cond = PTHREAD_COND_INITIALIZER }
+
+struct sd_cond {
+ pthread_cond_t cond;
+};
+
+static inline void sd_cond_init(struct sd_cond *cond)
+{
+ int ret;
+
+ do {
+ ret = pthread_cond_init(&cond->cond, NULL);
+ } while (ret == EAGAIN);
+
+ if (unlikely(ret != 0))
+ panic("failed to initialize a lock, %s", strerror(ret));
+
+}
+
+static inline void sd_destroy_cond(struct sd_cond *cond)
+{
+ int ret;
+
+ do {
+ ret = pthread_cond_destroy(&cond->cond);
+ } while (ret == EAGAIN);
+
+ if (unlikely(ret != 0))
+ panic("failed to destroy a lock, %s", strerror(ret));
+}
+
+static inline int sd_cond_signal(struct sd_cond *cond)
+{
+ return pthread_cond_signal(&cond->cond);
+}
+
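+/*
+ * Like pthread_cond_wait(), sd_cond_wait() is subject to spurious
+ * wakeups: callers must recheck their wait condition in a loop.
+ */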
+static inline int sd_cond_wait(struct sd_cond *cond, struct sd_mutex *mutex)
+{
+ return pthread_cond_wait(&cond->cond, &mutex->mutex);
+}
+
+static inline int sd_cond_broadcast(struct sd_cond *cond)
+{
+ return pthread_cond_broadcast(&cond->cond);
+}
+
/* wrapper for pthread_rwlock */
#define SD_RW_LOCK_INITIALIZER { .rwlock = PTHREAD_RWLOCK_INITIALIZER }
diff --git a/lib/work.c b/lib/work.c
index 62f5d6e..510319f 100644
--- a/lib/work.c
+++ b/lib/work.c
@@ -46,13 +46,13 @@ struct wq_info {
struct list_head finished_list;
struct list_node list;
- pthread_mutex_t finished_lock;
- pthread_mutex_t startup_lock;
+ struct sd_mutex finished_lock;
+ struct sd_mutex startup_lock;
/* workers sleep on this and are signaled by the work producer */
- pthread_cond_t pending_cond;
+ struct sd_cond pending_cond;
/* locked by work producer and workers */
- pthread_mutex_t pending_lock;
+ struct sd_mutex pending_lock;
/* protected by pending_lock */
struct work_queue q;
size_t nr_threads;
@@ -78,7 +78,7 @@ static void *worker_routine(void *arg);
static size_t tid_max;
static unsigned long *tid_map;
-static pthread_mutex_t tid_map_lock = PTHREAD_MUTEX_INITIALIZER;
+static struct sd_mutex tid_map_lock = SD_MUTEX_INITIALIZER;
static int resume_efd;
static int ack_efd;
@@ -89,7 +89,7 @@ void suspend_worker_threads(void)
int tid;
list_for_each_entry(wi, &wq_info_list, list) {
- pthread_mutex_lock(&wi->pending_lock);
+ sd_mutex_lock(&wi->pending_lock);
}
FOR_EACH_BIT(tid, tid_map, tid_max) {
@@ -121,7 +121,7 @@ void resume_worker_threads(void)
eventfd_xread(ack_efd);
list_for_each_entry(wi, &wq_info_list, list) {
- pthread_mutex_unlock(&wi->pending_lock);
+ sd_mutex_unlock(&wi->pending_lock);
}
}
@@ -158,7 +158,7 @@ static int wq_trace_init(void)
static void trace_set_tid_map(int tid)
{
- pthread_mutex_lock(&tid_map_lock);
+ sd_mutex_lock(&tid_map_lock);
if (tid > tid_max) {
size_t old_tid_max = tid_max;
@@ -169,14 +169,14 @@ static void trace_set_tid_map(int tid)
tid_map = alloc_bitmap(tid_map, old_tid_max, tid_max);
}
set_bit(tid, tid_map);
- pthread_mutex_unlock(&tid_map_lock);
+ sd_mutex_unlock(&tid_map_lock);
}
static void trace_clear_tid_map(int tid)
{
- pthread_mutex_lock(&tid_map_lock);
+ sd_mutex_lock(&tid_map_lock);
clear_bit(tid, tid_map);
- pthread_mutex_unlock(&tid_map_lock);
+ sd_mutex_unlock(&tid_map_lock);
}
#else
@@ -247,18 +247,18 @@ static int create_worker_threads(struct wq_info *wi, size_t nr_threads)
pthread_t thread;
int ret;
- pthread_mutex_lock(&wi->startup_lock);
+ sd_mutex_lock(&wi->startup_lock);
while (wi->nr_threads < nr_threads) {
ret = pthread_create(&thread, NULL, worker_routine, wi);
if (ret != 0) {
sd_err("failed to create worker thread: %m");
- pthread_mutex_unlock(&wi->startup_lock);
+ sd_mutex_unlock(&wi->startup_lock);
return -1;
}
wi->nr_threads++;
sd_debug("create thread %s %zu", wi->name, wi->nr_threads);
}
- pthread_mutex_unlock(&wi->startup_lock);
+ sd_mutex_unlock(&wi->startup_lock);
return 0;
}
@@ -268,16 +268,16 @@ void queue_work(struct work_queue *q, struct work *work)
struct wq_info *wi = container_of(q, struct wq_info, q);
uatomic_inc(&wi->nr_queued_work);
- pthread_mutex_lock(&wi->pending_lock);
+ sd_mutex_lock(&wi->pending_lock);
if (wq_need_grow(wi))
/* double the thread pool size */
create_worker_threads(wi, wi->nr_threads * 2);
list_add_tail(&work->w_list, &wi->q.pending_list);
- pthread_mutex_unlock(&wi->pending_lock);
+ sd_mutex_unlock(&wi->pending_lock);
- pthread_cond_signal(&wi->pending_cond);
+ sd_cond_signal(&wi->pending_cond);
}
static void worker_thread_request_done(int fd, int events, void *data)
@@ -292,9 +292,9 @@ static void worker_thread_request_done(int fd, int events, void *data)
eventfd_xread(fd);
list_for_each_entry(wi, &wq_info_list, list) {
- pthread_mutex_lock(&wi->finished_lock);
+ sd_mutex_lock(&wi->finished_lock);
list_splice_init(&wi->finished_list, &list);
- pthread_mutex_unlock(&wi->finished_lock);
+ sd_mutex_unlock(&wi->finished_lock);
while (!list_empty(&list)) {
work = list_first_entry(&list, struct work, w_list);
@@ -314,19 +314,19 @@ static void *worker_routine(void *arg)
set_thread_name(wi->name, (wi->tc != WQ_ORDERED));
- pthread_mutex_lock(&wi->startup_lock);
+ sd_mutex_lock(&wi->startup_lock);
/* started this thread */
- pthread_mutex_unlock(&wi->startup_lock);
+ sd_mutex_unlock(&wi->startup_lock);
trace_set_tid_map(tid);
while (true) {
- pthread_mutex_lock(&wi->pending_lock);
+ sd_mutex_lock(&wi->pending_lock);
if (wq_need_shrink(wi)) {
wi->nr_threads--;
trace_clear_tid_map(tid);
- pthread_mutex_unlock(&wi->pending_lock);
+ sd_mutex_unlock(&wi->pending_lock);
pthread_detach(pthread_self());
sd_debug("destroy thread %s %d, %zu", wi->name, tid,
wi->nr_threads);
@@ -334,7 +334,7 @@ static void *worker_routine(void *arg)
}
retest:
if (list_empty(&wi->q.pending_list)) {
- pthread_cond_wait(&wi->pending_cond, &wi->pending_lock);
+ sd_cond_wait(&wi->pending_cond, &wi->pending_lock);
goto retest;
}
@@ -342,14 +342,14 @@ retest:
struct work, w_list);
list_del(&work->w_list);
- pthread_mutex_unlock(&wi->pending_lock);
+ sd_mutex_unlock(&wi->pending_lock);
if (work->fn)
work->fn(work);
- pthread_mutex_lock(&wi->finished_lock);
+ sd_mutex_lock(&wi->finished_lock);
list_add_tail(&work->w_list, &wi->finished_list);
- pthread_mutex_unlock(&wi->finished_lock);
+ sd_mutex_unlock(&wi->finished_lock);
eventfd_xwrite(efd, 1);
}
@@ -410,11 +410,11 @@ struct work_queue *create_work_queue(const char *name,
INIT_LIST_HEAD(&wi->q.pending_list);
INIT_LIST_HEAD(&wi->finished_list);
- pthread_cond_init(&wi->pending_cond, NULL);
+ sd_cond_init(&wi->pending_cond);
- pthread_mutex_init(&wi->finished_lock, NULL);
- pthread_mutex_init(&wi->pending_lock, NULL);
- pthread_mutex_init(&wi->startup_lock, NULL);
+ sd_init_mutex(&wi->finished_lock);
+ sd_init_mutex(&wi->pending_lock);
+ sd_init_mutex(&wi->startup_lock);
ret = create_worker_threads(wi, 1);
if (ret < 0)
@@ -424,11 +424,11 @@ struct work_queue *create_work_queue(const char *name,
return &wi->q;
destroy_threads:
- pthread_mutex_unlock(&wi->startup_lock);
- pthread_cond_destroy(&wi->pending_cond);
- pthread_mutex_destroy(&wi->pending_lock);
- pthread_mutex_destroy(&wi->startup_lock);
- pthread_mutex_destroy(&wi->finished_lock);
+ sd_mutex_unlock(&wi->startup_lock);
+ sd_destroy_cond(&wi->pending_cond);
+ sd_destroy_mutex(&wi->pending_lock);
+ sd_destroy_mutex(&wi->finished_lock);
return NULL;
}
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 2867e78..9e71b51 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -40,9 +40,9 @@ struct cluster_lock {
/* referenced by different threads in one sheepdog daemon */
uint64_t ref;
/* wait for the release of id by other lock owner */
- pthread_mutex_t wait_wakeup;
+ struct sd_mutex wait_wakeup;
/* lock for different threads of the same node on the same id */
- pthread_mutex_t id_lock;
+ struct sd_mutex id_lock;
char lock_path[MAX_NODE_STR_LEN];
};
@@ -50,7 +50,7 @@ struct cluster_lock {
#define HASH_BUCKET_NR 1021
static struct hlist_head *cluster_locks_table;
-static pthread_mutex_t table_locks[HASH_BUCKET_NR];
+static struct sd_mutex table_locks[HASH_BUCKET_NR];
/*
* Wait a while when create, delete or get_children fail on
@@ -309,15 +309,15 @@ static int lock_table_lookup_wakeup(uint64_t lock_id)
struct hlist_node *iter;
struct cluster_lock *lock;
- pthread_mutex_lock(table_locks + hval);
+ sd_mutex_lock(table_locks + hval);
hlist_for_each_entry(lock, iter, cluster_locks_table + hval, hnode) {
if (lock->id == lock_id) {
- pthread_mutex_unlock(&lock->wait_wakeup);
+ sd_mutex_unlock(&lock->wait_wakeup);
res = 0;
break;
}
}
- pthread_mutex_unlock(table_locks + hval);
+ sd_mutex_unlock(table_locks + hval);
return res;
}
@@ -329,7 +329,7 @@ static struct cluster_lock *lock_table_lookup_acquire(uint64_t lock_id)
struct cluster_lock *lock, *ret_lock = NULL;
char path[MAX_NODE_STR_LEN];
- pthread_mutex_lock(table_locks + hval);
+ sd_mutex_lock(table_locks + hval);
hlist_for_each_entry(lock, iter, cluster_locks_table + hval, hnode) {
if (lock->id == lock_id) {
ret_lock = lock;
@@ -349,24 +349,20 @@ static struct cluster_lock *lock_table_lookup_acquire(uint64_t lock_id)
if (rc)
panic("Failed to init node %s", path);
- rc = pthread_mutex_init(&ret_lock->wait_wakeup, NULL);
+ rc = sd_init_mutex(&ret_lock->wait_wakeup);
if (rc)
panic("failed to init cluster_lock->wait_wakeup");
- rc = pthread_mutex_init(&ret_lock->id_lock, NULL);
+ rc = sd_init_mutex(&ret_lock->id_lock);
if (rc)
panic("failed to init cluster_lock->id_lock");
hlist_add_head(&(ret_lock->hnode), cluster_locks_table + hval);
}
- pthread_mutex_unlock(table_locks + hval);
+ sd_mutex_unlock(table_locks + hval);
/*
* If many threads use locks with the same id, we take ->id_lock so
* that the single zookeeper handler does not create many
* seq-ephemeral files.
*/
- pthread_mutex_lock(&ret_lock->id_lock);
+ sd_mutex_lock(&ret_lock->id_lock);
return ret_lock;
}
@@ -378,7 +378,7 @@ static void lock_table_lookup_release(uint64_t lock_id)
struct cluster_lock *lock;
char path[MAX_NODE_STR_LEN];
- pthread_mutex_lock(table_locks + hval);
+ sd_mutex_lock(table_locks + hval);
hlist_for_each_entry(lock, iter, cluster_locks_table + hval, hnode) {
if (lock->id != lock_id)
continue;
@@ -391,13 +391,13 @@ static void lock_table_lookup_release(uint64_t lock_id)
zk_wait();
}
lock->lock_path[0] = '\0';
- pthread_mutex_unlock(&lock->id_lock);
+ sd_mutex_unlock(&lock->id_lock);
lock->ref--;
if (!lock->ref) {
hlist_del(iter);
/* free all resource used by this lock */
- pthread_mutex_destroy(&lock->id_lock);
- pthread_mutex_destroy(&lock->wait_wakeup);
+ sd_mutex_destroy(&lock->id_lock);
+ sd_mutex_destroy(&lock->wait_wakeup);
snprintf(path, MAX_NODE_STR_LEN, LOCK_ZNODE "/%lu",
lock->id);
/*
@@ -413,7 +413,7 @@ static void lock_table_lookup_release(uint64_t lock_id)
}
break;
}
- pthread_mutex_unlock(table_locks + hval);
+ sd_mutex_unlock(table_locks + hval);
}
/* The ZooKeeper-based queue gives us totally ordered events */
@@ -1230,7 +1230,7 @@ kick_block_event:
* This operation will create a seq-ephemeral znode in lock directory
* of zookeeper (use lock-id as dir name). The smallest file path in
* this directory will be the owner of the lock; the other threads will
- * wait on a pthread_mutex_t (cluster_lock->wait_wakeup)
+ * wait on a struct sd_mutex (cluster_lock->wait_wakeup)
*/
static void zk_lock(uint64_t lock_id)
{
@@ -1275,7 +1275,7 @@ static void zk_lock(uint64_t lock_id)
rc = zoo_exists(zhandle, lowest_seq_path, 1, NULL);
if (rc == ZOK) {
sd_debug("call zoo_exits success %s", lowest_seq_path);
- pthread_mutex_lock(&cluster_lock->wait_wakeup);
+ sd_mutex_lock(&cluster_lock->wait_wakeup);
} else {
sd_debug("failed to call zoo_exists %s", zerror(rc));
if (rc != ZNONODE)
@@ -1338,7 +1338,7 @@ static int zk_init(const char *option)
HASH_BUCKET_NR);
for (uint64_t i = 0; i < HASH_BUCKET_NR; i++) {
INIT_HLIST_HEAD(cluster_locks_table + i);
- pthread_mutex_init(table_locks + i, NULL);
+ sd_init_mutex(table_locks + i);
}
ret = zk_init_node(LOCK_ZNODE);
diff --git a/sheep/group.c b/sheep/group.c
index 4328547..623fa55 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -23,8 +23,8 @@ struct get_vdis_work {
struct rb_root nroot;
};
-static pthread_mutex_t wait_vdis_lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t wait_vdis_cond = PTHREAD_COND_INITIALIZER;
+static struct sd_mutex wait_vdis_lock = SD_MUTEX_INITIALIZER;
+static struct sd_cond wait_vdis_cond = SD_COND_INITIALIZER;
static refcnt_t nr_get_vdis_works;
static main_thread(struct vnode_info *) current_vnode_info;
@@ -525,10 +525,10 @@ static void get_vdis_done(struct work *work)
struct get_vdis_work *w =
container_of(work, struct get_vdis_work, work);
- pthread_mutex_lock(&wait_vdis_lock);
+ sd_mutex_lock(&wait_vdis_lock);
refcount_dec(&nr_get_vdis_works);
- pthread_cond_broadcast(&wait_vdis_cond);
- pthread_mutex_unlock(&wait_vdis_lock);
+ sd_cond_broadcast(&wait_vdis_cond);
+ sd_mutex_unlock(&wait_vdis_lock);
rb_destroy(&w->nroot, struct sd_node, rb);
free(w);
@@ -624,10 +624,10 @@ void wait_get_vdis_done(void)
{
sd_debug("waiting for vdi list");
- pthread_mutex_lock(&wait_vdis_lock);
+ sd_mutex_lock(&wait_vdis_lock);
while (refcount_read(&nr_get_vdis_works) > 0)
- pthread_cond_wait(&wait_vdis_cond, &wait_vdis_lock);
- pthread_mutex_unlock(&wait_vdis_lock);
+ sd_cond_wait(&wait_vdis_cond, &wait_vdis_lock);
+ sd_mutex_unlock(&wait_vdis_lock);
sd_debug("vdi list ready");
}
diff --git a/sheep/request.c b/sheep/request.c
index defffe8..e7d6947 100644
--- a/sheep/request.c
+++ b/sheep/request.c
@@ -501,9 +501,9 @@ static void free_local_request(struct request *req)
static void submit_local_request(struct request *req)
{
- pthread_mutex_lock(&sys->local_req_lock);
+ sd_mutex_lock(&sys->local_req_lock);
list_add_tail(&req->request_list, &sys->local_req_queue);
- pthread_mutex_unlock(&sys->local_req_lock);
+ sd_mutex_unlock(&sys->local_req_lock);
eventfd_xwrite(sys->local_req_efd, 1);
}
@@ -966,9 +966,9 @@ static void local_req_handler(int listen_fd, int events, void *data)
eventfd_xread(listen_fd);
- pthread_mutex_lock(&sys->local_req_lock);
+ sd_mutex_lock(&sys->local_req_lock);
list_splice_init(&sys->local_req_queue, &pending_list);
- pthread_mutex_unlock(&sys->local_req_lock);
+ sd_mutex_unlock(&sys->local_req_lock);
list_for_each_entry(req, &pending_list, request_list) {
list_del(&req->request_list);
@@ -978,7 +978,7 @@ static void local_req_handler(int listen_fd, int events, void *data)
void local_request_init(void)
{
- pthread_mutex_init(&sys->local_req_lock, NULL);
+ sd_init_mutex(&sys->local_req_lock);
sys->local_req_efd = eventfd(0, EFD_NONBLOCK);
if (sys->local_req_efd < 0)
panic("failed to init local req efd");
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 73acf0b..c0a138c 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -131,7 +131,7 @@ struct system_info {
int local_req_efd;
- pthread_mutex_t local_req_lock;
+ struct sd_mutex local_req_lock;
struct list_head local_req_queue;
struct list_head req_wait_queue;
int nr_outstanding_reqs;
diff --git a/sheep/trace/trace.c b/sheep/trace/trace.c
index 937dc72..57c2b4c 100644
--- a/sheep/trace/trace.c
+++ b/sheep/trace/trace.c
@@ -30,7 +30,7 @@ static struct caller *callers;
static size_t nr_callers;
static struct strbuf *buffer;
-static pthread_mutex_t *buffer_lock;
+static struct sd_mutex *buffer_lock;
static int nr_cpu;
static __thread bool in_trace;
@@ -268,9 +268,9 @@ int trace_buffer_pop(void *buf, uint32_t len)
int i;
for (i = 0; i < nr_cpu; i++) {
- pthread_mutex_lock(&buffer_lock[i]);
+ sd_mutex_lock(&buffer_lock[i]);
readin = strbuf_stripout(&buffer[i], buff, len);
- pthread_mutex_unlock(&buffer_lock[i]);
+ sd_mutex_unlock(&buffer_lock[i]);
count += readin;
if (count == requested)
return count;
@@ -286,9 +286,9 @@ int trace_buffer_pop(void *buf, uint32_t len)
void trace_buffer_push(int cpuid, struct trace_graph_item *item)
{
- pthread_mutex_lock(&buffer_lock[cpuid]);
+ sd_mutex_lock(&buffer_lock[cpuid]);
strbuf_add(&buffer[cpuid], item, sizeof(*item));
- pthread_mutex_unlock(&buffer_lock[cpuid]);
+ sd_mutex_unlock(&buffer_lock[cpuid]);
}
/* assume that mcount call exists in the first FIND_MCOUNT_RANGE bytes */
@@ -422,7 +422,7 @@ int trace_init(void)
buffer_lock = xzalloc(sizeof(*buffer_lock) * nr_cpu);
for (i = 0; i < nr_cpu; i++) {
strbuf_init(&buffer[i], 0);
- pthread_mutex_init(&buffer_lock[i], NULL);
+ sd_init_mutex(&buffer_lock[i]);
}
sd_info("trace support enabled. cpu count %d.", nr_cpu);
--
1.8.1.2