[sheepdog] [PATCH] introduce eventfd_xread and eventfd_xwrite

MORITA Kazutaka morita.kazutaka at gmail.com
Fri Aug 2 11:08:54 CEST 2013


From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>

Currently, we don't check the return values of eventfd_read and
eventfd_write in many places.  This introduces helper functions,
eventfd_xread and eventfd_xwrite, which retry on EINTR and call
panic() on unrecoverable errors.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 include/util.h            |    3 +++
 lib/util.c                |   34 ++++++++++++++++++++++++++++++++++
 lib/work.c                |    9 ++-------
 sheep/cluster/shepherd.c  |   16 +++-------------
 sheep/cluster/zookeeper.c |   16 ++++++----------
 sheep/object_cache.c      |   11 ++++-------
 sheep/request.c           |   31 ++++++++++---------------------
 sheep/sheep_priv.h        |    1 -
 shepherd/shepherd.c       |   20 +++++---------------
 9 files changed, 67 insertions(+), 74 deletions(-)

diff --git a/include/util.h b/include/util.h
index 583fa9e..aa5341e 100644
--- a/include/util.h
+++ b/include/util.h
@@ -10,6 +10,7 @@
 #include <unistd.h>
 #include <search.h>
 #include <urcu/uatomic.h>
+#include <sys/eventfd.h>
 
 #include "logger.h"
 #include "bitops.h"
@@ -85,6 +86,8 @@ ssize_t xpwrite(int fd, const void *buf, size_t count, off_t offset);
 int xmkdir(const char *pathname, mode_t mode);
 int xfallocate(int fd, int mode, off_t offset, off_t len);
 int xftruncate(int fd, off_t length);
+int eventfd_xread(int efd);
+void eventfd_xwrite(int efd, int value);
 void pstrcpy(char *buf, int buf_size, const char *str);
 int rmdir_r(char *dir_path);
 int purge_directory(char *dir_path);
diff --git a/lib/util.c b/lib/util.c
index 5fe0bfa..4210ea8 100644
--- a/lib/util.c
+++ b/lib/util.c
@@ -273,6 +273,40 @@ int xftruncate(int fd, off_t length)
 }
 
 /*
+ * Return the value read on success, or -1 if efd is nonblocking and the read
+ * fails with EAGAIN; any other error calls panic().  If efd is blocking or the
+ * eventfd counter is not zero, this function never returns an error.
+ */
+int eventfd_xread(int efd)
+{
+	int ret;
+	eventfd_t value = 0;
+
+	do {
+		ret = eventfd_read(efd, &value);
+	} while (ret < 0 && errno == EINTR);
+
+	if (ret == 0)
+		ret = value;
+	else if (errno != EAGAIN)
+		panic("eventfd_read() failed, %m");
+
+	return ret;
+}
+
+void eventfd_xwrite(int efd, int value)
+{
+	int ret;
+
+	do {
+		ret = eventfd_write(efd, (eventfd_t)value);
+	} while (ret < 0 && (errno == EINTR || errno == EAGAIN));
+
+	if (ret < 0)
+		panic("eventfd_write() failed, %m");
+}
+
+/*
  * Copy the string str to buf. If str length is bigger than buf_size -
  * 1 then it is clamped to buf_size - 1.
  * NOTE: this function does what strncpy should have done to be
diff --git a/lib/work.c b/lib/work.c
index 49364c7..dfde630 100644
--- a/lib/work.c
+++ b/lib/work.c
@@ -170,18 +170,14 @@ void queue_work(struct work_queue *q, struct work *work)
 
 static void worker_thread_request_done(int fd, int events, void *data)
 {
-	int ret;
 	struct worker_info *wi;
 	struct work *work;
-	eventfd_t value;
 	LIST_HEAD(list);
 
 	if (wq_get_nr_nodes)
 		nr_nodes = wq_get_nr_nodes();
 
-	ret = eventfd_read(fd, &value);
-	if (ret < 0)
-		return;
+	eventfd_xread(fd);
 
 	list_for_each_entry(wi, &worker_info_list, worker_info_siblings) {
 		pthread_mutex_lock(&wi->finished_lock);
@@ -202,7 +198,6 @@ static void *worker_routine(void *arg)
 {
 	struct worker_info *wi = arg;
 	struct work *work;
-	eventfd_t value = 1;
 
 	set_thread_name(wi->name, (wi->tc != WQ_ORDERED));
 
@@ -245,7 +240,7 @@ retest:
 		list_add_tail(&work->w_list, &wi->finished_list);
 		pthread_mutex_unlock(&wi->finished_lock);
 
-		eventfd_write(efd, value);
+		eventfd_xwrite(efd, 1);
 	}
 
 	pthread_exit(NULL);
diff --git a/sheep/cluster/shepherd.c b/sheep/cluster/shepherd.c
index b2ab92d..c1753e0 100644
--- a/sheep/cluster/shepherd.c
+++ b/sheep/cluster/shepherd.c
@@ -224,7 +224,6 @@ remove:
 static void push_sph_event(bool nonblock, struct sd_node *sender,
 			void *msg, int msg_len)
 {
-	int ret;
 	struct sph_event *ev;
 
 	sd_dprintf("push_sph_event() called, pushing %sblocking event",
@@ -249,11 +248,7 @@ static void push_sph_event(bool nonblock, struct sd_node *sender,
 	else
 		list_add_tail(&ev->event_list, &blocked_event_list);
 
-	ret = eventfd_write(sph_event_fd, 1);
-	if (ret) {
-		sd_eprintf("eventfd_write() failed: %m");
-		exit(1);
-	}
+	eventfd_xwrite(sph_event_fd, 1);
 }
 
 static void remove_one_block_event(void)
@@ -275,19 +270,14 @@ static void remove_one_block_event(void)
 	if (!removed)
 		panic("removed is not true");
 
-	eventfd_write(sph_event_fd, 1);
+	eventfd_xwrite(sph_event_fd, 1);
 
 	sd_dprintf("unblock a blocking event");
 }
 
 static void sph_event_handler(int fd, int events, void *data)
 {
-	int ret;
-	eventfd_t val;
-
-	ret = eventfd_read(fd, &val);
-	if (ret < 0)
-		panic("eventfd_read() failed: %m");
+	eventfd_xread(fd);
 
 	while (sph_process_event())
 		;
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 6ad694b..6e632fc 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -385,7 +385,7 @@ again:
 
 		sscanf(buf, QUEUE_ZNODE "/%"PRId32, &seq);
 		queue_pos = seq;
-		eventfd_write(efd, 1);
+		eventfd_xwrite(efd, 1);
 		first_push = false;
 	}
 
@@ -547,7 +547,7 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
 		 * do reconnect in main thread to avoid on-the-fly zookeeper
 		 * operations.
 		 */
-		eventfd_write(efd, 1);
+		eventfd_xwrite(efd, 1);
 		return;
 	}
 
@@ -558,7 +558,7 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
 		if (ret == 1)
 			zk_node_exists(path);
 		/* kick off the event handler */
-		eventfd_write(efd, 1);
+		eventfd_xwrite(efd, 1);
 	} else if (type == ZOO_DELETED_EVENT) {
 		struct zk_node *n;
 
@@ -1039,7 +1039,6 @@ static inline void handle_session_expire(void)
 
 static void zk_event_handler(int listen_fd, int events, void *data)
 {
-	eventfd_t value;
 	struct zk_event ev;
 	bool peek;
 
@@ -1051,16 +1050,13 @@ static void zk_event_handler(int listen_fd, int events, void *data)
 		exit(1);
 	}
 
-	if (eventfd_read(efd, &value) < 0) {
-		sd_eprintf("%m");
-		return;
-	}
+	eventfd_xread(efd);
 
 	if (zoo_state(zhandle) == ZOO_EXPIRED_SESSION_STATE) {
 		sd_eprintf("detect a session timeout. reconnecting...");
 		handle_session_expire();
 		sd_iprintf("reconnected");
-		eventfd_write(efd, 1);
+		eventfd_xwrite(efd, 1);
 		return;
 	}
 
@@ -1077,7 +1073,7 @@ static void zk_event_handler(int listen_fd, int events, void *data)
 	RETURN_VOID_IF_ERROR(zk_queue_peek(&peek), "");
 	if (peek) {
 		/* Someone has created next event, go kick event handler. */
-		eventfd_write(efd, 1);
+		eventfd_xwrite(efd, 1);
 		return;
 	}
 kick_block_event:
diff --git a/sheep/object_cache.c b/sheep/object_cache.c
index 27d544d..c267a1f 100644
--- a/sheep/object_cache.c
+++ b/sheep/object_cache.c
@@ -898,7 +898,7 @@ static void do_push_object(struct work *work)
 		panic("push failed but should never fail");
 clean:
 	if (uatomic_sub_return(&oc->push_count, 1) == 0)
-		eventfd_write(oc->push_efd, 1);
+		eventfd_xwrite(oc->push_efd, 1);
 	entry->idx &= ~CACHE_CREATE_BIT;
 	entry->bmap = 0;
 	unlock_entry(entry);
@@ -925,7 +925,6 @@ static void push_object_done(struct work *work)
 static int object_cache_push(struct object_cache *oc)
 {
 	struct object_cache_entry *entry, *t;
-	eventfd_t value;
 
 	write_lock_cache(oc);
 	if (list_empty(&oc->dirty_head)) {
@@ -946,11 +945,9 @@ static int object_cache_push(struct object_cache *oc)
 		del_from_dirty_list(entry);
 	}
 	unlock_cache(oc);
-reread:
-	if (eventfd_read(oc->push_efd, &value) < 0) {
-		sd_eprintf("eventfd read failed, %m");
-		goto reread;
-	}
+
+	eventfd_xread(oc->push_efd);
+
 	sd_dprintf("%"PRIx32" completed", oc->vid);
 	return SD_RES_SUCCESS;
 }
diff --git a/sheep/request.c b/sheep/request.c
index 7f795fd..1cc5a9d 100644
--- a/sheep/request.c
+++ b/sheep/request.c
@@ -441,7 +441,6 @@ static void free_local_request(struct request *req)
 int exec_local_req(struct sd_req *rq, void *data)
 {
 	struct request *req;
-	eventfd_t value = 1;
 	int ret;
 
 	assert(is_worker_thread());
@@ -449,24 +448,19 @@ int exec_local_req(struct sd_req *rq, void *data)
 	req = alloc_local_request(data, rq->data_length);
 	req->rq = *rq;
 	req->local_req_efd = eventfd(0, 0);
+	if (req->local_req_efd < 0) {
+		/* Fake the result to ask for retry */
+		req->rp.result = SD_RES_NETWORK_ERROR;
+		goto out;
+	}
 
 	pthread_mutex_lock(&sys->local_req_lock);
 	list_add_tail(&req->request_list, &sys->local_req_queue);
 	pthread_mutex_unlock(&sys->local_req_lock);
 
-	eventfd_write(sys->local_req_efd, value);
-
-again:
-	/* In error case (for e.g, EINTR) just retry read */
-	ret = eventfd_read(req->local_req_efd, &value);
-	if (ret < 0) {
-		sd_eprintf("%m");
-		if (errno == EINTR)
-			goto again;
-		/* Fake the result to ask for retry */
-		req->rp.result = SD_RES_NETWORK_ERROR;
-	}
-
+	eventfd_xwrite(sys->local_req_efd, 1);
+	eventfd_xread(req->local_req_efd);
+out:
 	/* fill rq with response header as exec_req does */
 	memcpy(rq, &req->rp, sizeof(req->rp));
 
@@ -517,13 +511,12 @@ static void free_request(struct request *req)
 void put_request(struct request *req)
 {
 	struct client_info *ci = req->ci;
-	eventfd_t value = 1;
 
 	if (refcount_dec(&req->refcnt) > 0)
 		return;
 
 	if (req->local)
-		eventfd_write(req->local_req_efd, value);
+		eventfd_xwrite(req->local_req_efd, 1);
 	else {
 		if (conn_tx_on(&ci->conn)) {
 			clear_client_info(ci);
@@ -887,17 +880,13 @@ int init_unix_domain_socket(const char *dir)
 
 static void local_req_handler(int listen_fd, int events, void *data)
 {
-	eventfd_t value;
 	struct request *req, *t;
 	LIST_HEAD(pending_list);
-	int ret;
 
 	if (events & EPOLLERR)
 		sd_eprintf("request handler error");
 
-	ret = eventfd_read(listen_fd, &value);
-	if (ret < 0)
-		return;
+	eventfd_xread(listen_fd);
 
 	pthread_mutex_lock(&sys->local_req_lock);
 	list_splice_init(&sys->local_req_queue, &pending_list);
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 09303ff..ba40c94 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -26,7 +26,6 @@
 #include <string.h>
 #include <fcntl.h>
 #include <sys/mman.h>
-#include <sys/eventfd.h>
 #include <sys/time.h>
 #include <sys/epoll.h>
 #include <signal.h>
diff --git a/shepherd/shepherd.c b/shepherd/shepherd.c
index a7b4f73..7c96796 100644
--- a/shepherd/shepherd.c
+++ b/shepherd/shepherd.c
@@ -104,15 +104,11 @@ static int remove_efd;
 
 static inline void remove_sheep(struct sheep *sheep)
 {
-	int ret;
-
 	sd_dprintf("remove_sheep() called, removing %s",
 		node_to_str(&sheep->node));
 
 	sheep->state = SHEEP_STATE_LEAVING;
-	ret = eventfd_write(remove_efd, 1);
-	if (ret < 0)
-		panic("eventfd_write() failed: %m");
+	eventfd_xwrite(remove_efd, 1);
 
 	event_force_refresh();
 }
@@ -147,17 +143,11 @@ static int notify_remove_sheep(struct sheep *leaving)
 static void remove_handler(int fd, int events, void *data)
 {
 	struct sheep *s;
-	int ret, failed = 0;
-	eventfd_t val;
-
-	ret = eventfd_read(remove_efd, &val);
-	if (ret < 0)
-		panic("eventfd_read() failed: %m");
-
-	sd_dprintf("removed sheeps: %" PRIu64, val);
-	assert(0 < val);
+	int nr_removed, failed = 0;
 
+	nr_removed = eventfd_xread(remove_efd);
 
+	sd_dprintf("removed sheeps");
 remove:
 	list_for_each_entry(s, &sheep_list_head, sheep_list) {
 		if (s->state != SHEEP_STATE_LEAVING)
@@ -197,7 +187,7 @@ del:
 
 	event_force_refresh();
 
-	if (--val)
+	if (--nr_removed)
 		goto remove;
 
 end:
-- 
1.7.9.5




More information about the sheepdog mailing list