[sheepdog] [PATCH] introduce eventfd_xread and eventfd_xwrite
MORITA Kazutaka
morita.kazutaka at gmail.com
Fri Aug 2 11:08:54 CEST 2013
From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
Currently, we don't check the return values of eventfd_read and
eventfd_write in many places. This introduces helper functions (eventfd_xread
and eventfd_xwrite) which retry on EINTR and call panic() on unrecoverable errors.
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
include/util.h | 3 +++
lib/util.c | 34 ++++++++++++++++++++++++++++++++++
lib/work.c | 9 ++-------
sheep/cluster/shepherd.c | 16 +++-------------
sheep/cluster/zookeeper.c | 16 ++++++----------
sheep/object_cache.c | 11 ++++-------
sheep/request.c | 31 ++++++++++---------------------
sheep/sheep_priv.h | 1 -
shepherd/shepherd.c | 20 +++++---------------
9 files changed, 67 insertions(+), 74 deletions(-)
diff --git a/include/util.h b/include/util.h
index 583fa9e..aa5341e 100644
--- a/include/util.h
+++ b/include/util.h
@@ -10,6 +10,8 @@
#include <unistd.h>
#include <search.h>
#include <urcu/uatomic.h>
+#include <limits.h>
+#include <sys/eventfd.h>
#include "logger.h"
#include "bitops.h"
@@ -85,6 +86,8 @@ ssize_t xpwrite(int fd, const void *buf, size_t count, off_t offset);
int xmkdir(const char *pathname, mode_t mode);
int xfallocate(int fd, int mode, off_t offset, off_t len);
int xftruncate(int fd, off_t length);
+int eventfd_xread(int efd);
+void eventfd_xwrite(int efd, int value);
void pstrcpy(char *buf, int buf_size, const char *str);
int rmdir_r(char *dir_path);
int purge_directory(char *dir_path);
diff --git a/lib/util.c b/lib/util.c
index 5fe0bfa..4210ea8 100644
--- a/lib/util.c
+++ b/lib/util.c
@@ -273,6 +273,40 @@ int xftruncate(int fd, off_t length)
}
/*
+ * Read the counter of eventfd @efd, retrying when interrupted (EINTR).
+ *
+ * Return the value read on success, or -1 if @efd has been made nonblocking
+ * and the read failed with EAGAIN. If @efd is blocking, or the counter is not
+ * zero, this function doesn't return an error. Any other eventfd_read()
+ * failure is treated as unrecoverable and calls panic().
+ *
+ * The counter is an eventfd_t, which is wider than int. A value above INT_MAX
+ * is clamped to INT_MAX, so it can never wrap to a negative int and be
+ * mistaken for the -1 error return.
+ */
+int eventfd_xread(int efd)
+{
+	int ret;
+	eventfd_t value = 0;
+
+	do {
+		ret = eventfd_read(efd, &value);
+	} while (ret < 0 && errno == EINTR);
+
+	if (ret < 0) {
+		if (errno != EAGAIN)
+			panic("eventfd_read() failed, %m");
+		return -1;
+	}
+
+	/* converting an out-of-range unsigned value to int is not portable */
+	return value > INT_MAX ? INT_MAX : (int)value;
+}
+
+/*
+ * Write @value to eventfd @efd with eventfd_write().
+ *
+ * The write is retried while it fails with EINTR or EAGAIN. Any other
+ * failure is treated as unrecoverable and calls panic().
+ */
+void eventfd_xwrite(int efd, int value)
+{
+	const eventfd_t count = (eventfd_t)value;
+	int rc;
+
+	for (;;) {
+		rc = eventfd_write(efd, count);
+		if (rc >= 0 || (errno != EINTR && errno != EAGAIN))
+			break;
+	}
+
+	if (rc < 0)
+		panic("eventfd_write() failed, %m");
+}
+
+/*
* Copy the string str to buf. If str length is bigger than buf_size -
* 1 then it is clamped to buf_size - 1.
* NOTE: this function does what strncpy should have done to be
diff --git a/lib/work.c b/lib/work.c
index 49364c7..dfde630 100644
--- a/lib/work.c
+++ b/lib/work.c
@@ -170,18 +170,14 @@ void queue_work(struct work_queue *q, struct work *work)
static void worker_thread_request_done(int fd, int events, void *data)
{
- int ret;
struct worker_info *wi;
struct work *work;
- eventfd_t value;
LIST_HEAD(list);
if (wq_get_nr_nodes)
nr_nodes = wq_get_nr_nodes();
- ret = eventfd_read(fd, &value);
- if (ret < 0)
- return;
+ eventfd_xread(fd);
list_for_each_entry(wi, &worker_info_list, worker_info_siblings) {
pthread_mutex_lock(&wi->finished_lock);
@@ -202,7 +198,6 @@ static void *worker_routine(void *arg)
{
struct worker_info *wi = arg;
struct work *work;
- eventfd_t value = 1;
set_thread_name(wi->name, (wi->tc != WQ_ORDERED));
@@ -245,7 +240,7 @@ retest:
list_add_tail(&work->w_list, &wi->finished_list);
pthread_mutex_unlock(&wi->finished_lock);
- eventfd_write(efd, value);
+ eventfd_xwrite(efd, 1);
}
pthread_exit(NULL);
diff --git a/sheep/cluster/shepherd.c b/sheep/cluster/shepherd.c
index b2ab92d..c1753e0 100644
--- a/sheep/cluster/shepherd.c
+++ b/sheep/cluster/shepherd.c
@@ -224,7 +224,6 @@ remove:
static void push_sph_event(bool nonblock, struct sd_node *sender,
void *msg, int msg_len)
{
- int ret;
struct sph_event *ev;
sd_dprintf("push_sph_event() called, pushing %sblocking event",
@@ -249,11 +248,7 @@ static void push_sph_event(bool nonblock, struct sd_node *sender,
else
list_add_tail(&ev->event_list, &blocked_event_list);
- ret = eventfd_write(sph_event_fd, 1);
- if (ret) {
- sd_eprintf("eventfd_write() failed: %m");
- exit(1);
- }
+ eventfd_xwrite(sph_event_fd, 1);
}
static void remove_one_block_event(void)
@@ -275,19 +270,14 @@ static void remove_one_block_event(void)
if (!removed)
panic("removed is not true");
- eventfd_write(sph_event_fd, 1);
+ eventfd_xwrite(sph_event_fd, 1);
sd_dprintf("unblock a blocking event");
}
static void sph_event_handler(int fd, int events, void *data)
{
- int ret;
- eventfd_t val;
-
- ret = eventfd_read(fd, &val);
- if (ret < 0)
- panic("eventfd_read() failed: %m");
+ eventfd_xread(fd);
while (sph_process_event())
;
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 6ad694b..6e632fc 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -385,7 +385,7 @@ again:
sscanf(buf, QUEUE_ZNODE "/%"PRId32, &seq);
queue_pos = seq;
- eventfd_write(efd, 1);
+ eventfd_xwrite(efd, 1);
first_push = false;
}
@@ -547,7 +547,7 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
* do reconnect in main thread to avoid on-the-fly zookeeper
* operations.
*/
- eventfd_write(efd, 1);
+ eventfd_xwrite(efd, 1);
return;
}
@@ -558,7 +558,7 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
if (ret == 1)
zk_node_exists(path);
/* kick off the event handler */
- eventfd_write(efd, 1);
+ eventfd_xwrite(efd, 1);
} else if (type == ZOO_DELETED_EVENT) {
struct zk_node *n;
@@ -1039,7 +1039,6 @@ static inline void handle_session_expire(void)
static void zk_event_handler(int listen_fd, int events, void *data)
{
- eventfd_t value;
struct zk_event ev;
bool peek;
@@ -1051,16 +1050,13 @@ static void zk_event_handler(int listen_fd, int events, void *data)
exit(1);
}
- if (eventfd_read(efd, &value) < 0) {
- sd_eprintf("%m");
- return;
- }
+ eventfd_xread(efd);
if (zoo_state(zhandle) == ZOO_EXPIRED_SESSION_STATE) {
sd_eprintf("detect a session timeout. reconnecting...");
handle_session_expire();
sd_iprintf("reconnected");
- eventfd_write(efd, 1);
+ eventfd_xwrite(efd, 1);
return;
}
@@ -1077,7 +1073,7 @@ static void zk_event_handler(int listen_fd, int events, void *data)
RETURN_VOID_IF_ERROR(zk_queue_peek(&peek), "");
if (peek) {
/* Someone has created next event, go kick event handler. */
- eventfd_write(efd, 1);
+ eventfd_xwrite(efd, 1);
return;
}
kick_block_event:
diff --git a/sheep/object_cache.c b/sheep/object_cache.c
index 27d544d..c267a1f 100644
--- a/sheep/object_cache.c
+++ b/sheep/object_cache.c
@@ -898,7 +898,7 @@ static void do_push_object(struct work *work)
panic("push failed but should never fail");
clean:
if (uatomic_sub_return(&oc->push_count, 1) == 0)
- eventfd_write(oc->push_efd, 1);
+ eventfd_xwrite(oc->push_efd, 1);
entry->idx &= ~CACHE_CREATE_BIT;
entry->bmap = 0;
unlock_entry(entry);
@@ -925,7 +925,6 @@ static void push_object_done(struct work *work)
static int object_cache_push(struct object_cache *oc)
{
struct object_cache_entry *entry, *t;
- eventfd_t value;
write_lock_cache(oc);
if (list_empty(&oc->dirty_head)) {
@@ -946,11 +945,9 @@ static int object_cache_push(struct object_cache *oc)
del_from_dirty_list(entry);
}
unlock_cache(oc);
-reread:
- if (eventfd_read(oc->push_efd, &value) < 0) {
- sd_eprintf("eventfd read failed, %m");
- goto reread;
- }
+
+ eventfd_xread(oc->push_efd);
+
sd_dprintf("%"PRIx32" completed", oc->vid);
return SD_RES_SUCCESS;
}
diff --git a/sheep/request.c b/sheep/request.c
index 7f795fd..1cc5a9d 100644
--- a/sheep/request.c
+++ b/sheep/request.c
@@ -441,7 +441,6 @@ static void free_local_request(struct request *req)
int exec_local_req(struct sd_req *rq, void *data)
{
struct request *req;
- eventfd_t value = 1;
int ret;
assert(is_worker_thread());
@@ -449,24 +448,19 @@ int exec_local_req(struct sd_req *rq, void *data)
req = alloc_local_request(data, rq->data_length);
req->rq = *rq;
req->local_req_efd = eventfd(0, 0);
+ if (req->local_req_efd < 0) {
+ /* Fake the result to ask for retry */
+ req->rp.result = SD_RES_NETWORK_ERROR;
+ goto out;
+ }
pthread_mutex_lock(&sys->local_req_lock);
list_add_tail(&req->request_list, &sys->local_req_queue);
pthread_mutex_unlock(&sys->local_req_lock);
- eventfd_write(sys->local_req_efd, value);
-
-again:
- /* In error case (for e.g, EINTR) just retry read */
- ret = eventfd_read(req->local_req_efd, &value);
- if (ret < 0) {
- sd_eprintf("%m");
- if (errno == EINTR)
- goto again;
- /* Fake the result to ask for retry */
- req->rp.result = SD_RES_NETWORK_ERROR;
- }
-
+ eventfd_xwrite(sys->local_req_efd, 1);
+ eventfd_xread(req->local_req_efd);
+out:
/* fill rq with response header as exec_req does */
memcpy(rq, &req->rp, sizeof(req->rp));
@@ -517,13 +511,12 @@ static void free_request(struct request *req)
void put_request(struct request *req)
{
struct client_info *ci = req->ci;
- eventfd_t value = 1;
if (refcount_dec(&req->refcnt) > 0)
return;
if (req->local)
- eventfd_write(req->local_req_efd, value);
+ eventfd_xwrite(req->local_req_efd, 1);
else {
if (conn_tx_on(&ci->conn)) {
clear_client_info(ci);
@@ -887,17 +880,13 @@ int init_unix_domain_socket(const char *dir)
static void local_req_handler(int listen_fd, int events, void *data)
{
- eventfd_t value;
struct request *req, *t;
LIST_HEAD(pending_list);
- int ret;
if (events & EPOLLERR)
sd_eprintf("request handler error");
- ret = eventfd_read(listen_fd, &value);
- if (ret < 0)
- return;
+ eventfd_xread(listen_fd);
pthread_mutex_lock(&sys->local_req_lock);
list_splice_init(&sys->local_req_queue, &pending_list);
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 09303ff..ba40c94 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -26,7 +26,6 @@
#include <string.h>
#include <fcntl.h>
#include <sys/mman.h>
-#include <sys/eventfd.h>
#include <sys/time.h>
#include <sys/epoll.h>
#include <signal.h>
diff --git a/shepherd/shepherd.c b/shepherd/shepherd.c
index a7b4f73..7c96796 100644
--- a/shepherd/shepherd.c
+++ b/shepherd/shepherd.c
@@ -104,15 +104,11 @@ static int remove_efd;
static inline void remove_sheep(struct sheep *sheep)
{
- int ret;
-
sd_dprintf("remove_sheep() called, removing %s",
node_to_str(&sheep->node));
sheep->state = SHEEP_STATE_LEAVING;
- ret = eventfd_write(remove_efd, 1);
- if (ret < 0)
- panic("eventfd_write() failed: %m");
+ eventfd_xwrite(remove_efd, 1);
event_force_refresh();
}
@@ -147,17 +143,11 @@ static int notify_remove_sheep(struct sheep *leaving)
static void remove_handler(int fd, int events, void *data)
{
struct sheep *s;
- int ret, failed = 0;
- eventfd_t val;
-
- ret = eventfd_read(remove_efd, &val);
- if (ret < 0)
- panic("eventfd_read() failed: %m");
-
- sd_dprintf("removed sheeps: %" PRIu64, val);
- assert(0 < val);
+ int nr_removed, failed = 0;
+ nr_removed = eventfd_xread(remove_efd);
+ sd_dprintf("removed sheeps");
remove:
list_for_each_entry(s, &sheep_list_head, sheep_list) {
if (s->state != SHEEP_STATE_LEAVING)
@@ -197,7 +187,7 @@ del:
event_force_refresh();
- if (--val)
+ if (--nr_removed)
goto remove;
end:
--
1.7.9.5
More information about the sheepdog
mailing list