[sheepdog] [PATCH v3] lib: prioritize events for epoll() and enhance event loop

Hitoshi Mitake h.mitake at gmail.com
Sun Dec 23 16:57:25 CET 2012


This patch adds a new function, event_loop_prio(). This function is
similar to event_loop(), but it has two new functionalities:
1. prioritize registered events
2. more flexible control with return values from event handlers

I'd like to describe the objectives of these two functionalities.

1. event_loop() treats every registered event with equal
priority. But in some cases, treating them unequally is
helpful.

2. Previously, event handlers returned no value. This patch lets them
return a value of the newly defined enum event_ret, which has 3 values:

* EVENT_LOOP_NOP ... this value means no special actions are required.
  event loop should continue normally.
* EVENT_LOOP_RETRIEVE ... this value means event_loop_prio() should
  call epoll_wait() again. A handler returns this value when events
  which should be processed as soon as possible arise during the
  handler, so that those events can be processed before events which
  have already been raised.
* EVENT_LOOP_BREAK ... this value means event_loop_prio() must break
  immediately.

Behaviour of event_loop_prio() can be controlled by these values.

This patch also adds a new function register_event_prio(). Basically
this function is the same as register_event(), but it also requires
a priority as an argument. The domain of priority is signed integer,
and it is stored in a newly defined member of struct event_info
(named prio). Now register_event() is a wrapper function which calls
register_event_prio() with its arguments and the default priority.

Sorting available events by their priorities is not cheap, so this
patch adds event_loop_prio() as a new function. If a user of
libsheepdog.a doesn't require this feature, it should use
event_loop().

(Using qsort() might not be a good way. I think that constructing a
binary search tree with nested epoll fds can be a good
alternative. But I believe such an optimization should be done later.)

Cc: Liu Yuan <tailai.ly at taobao.com>
Signed-off-by: Hitoshi Mitake <mitake.hitoshi at lab.ntt.co.jp>
---
v2:
 * change name of enum event_ret. e.g. EVENT_RET_DONE -> EVENT_LOOP_NOP
 * eliminate an unnecessary array

v3:
 * add event_loop_prio() instead of changing event_loop(), to avoid
   the overhead of sorting.

8<---
 include/event.h           |   22 +++++++++-
 lib/event.c               |  101 +++++++++++++++++++++++++++++++++++++++++---
 sheep/cluster/accord.c    |    6 ++-
 sheep/cluster/corosync.c  |    4 +-
 sheep/cluster/local.c     |    4 +-
 sheep/cluster/zookeeper.c |    8 ++-
 sheep/request.c           |   33 +++++++++------
 sheep/sheep.c             |    6 ++-
 sheep/trace/trace.c       |   16 ++++---
 sheep/work.c              |    6 ++-
 10 files changed, 166 insertions(+), 40 deletions(-)

diff --git a/include/event.h b/include/event.h
index b0e7552..8dfd43e 100644
--- a/include/event.h
+++ b/include/event.h
@@ -1,14 +1,21 @@
 #ifndef __EVENT_H__
 #define __EVENT_H__
 
+#include <limits.h>
+
 #include "list.h"
 
 struct event_info;
 
-typedef void (*event_handler_t)(int fd, int events, void *data);
+enum event_ret {
+	EVENT_LOOP_NOP,
+	EVENT_LOOP_RETRIEVE,
+	EVENT_LOOP_BREAK,
+};
+
+typedef enum event_ret (*event_handler_t)(int fd, int events, void *data);
 
 int init_event(int nr);
-int register_event(int fd, event_handler_t h, void *data);
 void unregister_event(int fd);
 int modify_event(int fd, unsigned int events);
 void event_loop(int timeout);
@@ -20,4 +27,15 @@ struct timer {
 
 void add_timer(struct timer *t, unsigned int mseconds);
 
+#define EVENT_PRIO_MAX INT_MAX
+#define EVENT_PRIO_MIN INT_MIN
+#define EVENT_PRIO_DEFAULT 0
+
+int register_event_prio(int fd, event_handler_t h, void *data, int prio);
+static inline int register_event(int fd, event_handler_t h, void *data)
+{
+	return register_event_prio(fd, h, data, EVENT_PRIO_DEFAULT);
+}
+
+
 #endif
diff --git a/lib/event.c b/lib/event.c
index 6b00e06..11f4004 100644
--- a/lib/event.c
+++ b/lib/event.c
@@ -26,18 +26,20 @@ static LIST_HEAD(events_list);
 
 #define TICK 1
 
-static void timer_handler(int fd, int events, void *data)
+static enum event_ret timer_handler(int fd, int events, void *data)
 {
 	struct timer *t = data;
 	uint64_t val;
 
 	if (read(fd, &val, sizeof(val)) < 0)
-		return;
+		return EVENT_LOOP_NOP;
 
 	t->callback(t->data);
 
 	unregister_event(fd);
 	close(fd);
+
+	return EVENT_LOOP_NOP;
 }
 
 void add_timer(struct timer *t, unsigned int mseconds)
@@ -68,9 +70,13 @@ struct event_info {
 	event_handler_t handler;
 	int fd;
 	void *data;
+	int prio;
 	struct list_head ei_list;
 };
 
+static struct epoll_event *polled_events;
+static int nr_events;
+
 int init_event(int nr)
 {
 	efd = epoll_create(nr);
@@ -78,6 +84,15 @@ int init_event(int nr)
 		eprintf("failed to create epoll fd\n");
 		return -1;
 	}
+
+	polled_events = xcalloc(nr, sizeof(struct epoll_event));
+	if (!polled_events) {
+		vprintf(SDOG_ERR, "failed to alloc epoll_event array\n");
+		return -1;
+	}
+
+	nr_events = nr;
+
 	return 0;
 }
 
@@ -92,7 +107,9 @@ static struct event_info *lookup_event(int fd)
 	return NULL;
 }
 
-int register_event(int fd, event_handler_t h, void *data)
+static int nr_prioritized_events;
+
+int register_event_prio(int fd, event_handler_t h, void *data, int prio)
 {
 	int ret;
 	struct epoll_event ev;
@@ -105,6 +122,9 @@ int register_event(int fd, event_handler_t h, void *data)
 	ei->fd = fd;
 	ei->handler = h;
 	ei->data = data;
+	ei->prio = prio;
+	if (prio != EVENT_PRIO_DEFAULT)
+		nr_prioritized_events++;
 
 	memset(&ev, 0, sizeof(ev));
 	ev.events = EPOLLIN;
@@ -133,6 +153,9 @@ void unregister_event(int fd)
 	if (ret)
 		eprintf("failed to delete epoll event for fd %d: %m\n", fd);
 
+	if (ei->prio != EVENT_PRIO_DEFAULT)
+		nr_prioritized_events--;
+
 	list_del(&ei->ei_list);
 	free(ei);
 }
@@ -161,23 +184,87 @@ int modify_event(int fd, unsigned int events)
 	return 0;
 }
 
+static int epoll_event_cmp(const void *_a, const void *_b)
+{
+	const struct event_info *a, *b;
+
+	a = ((struct epoll_event *)_a)->data.ptr;
+	b = ((struct epoll_event *)_b)->data.ptr;
+
+	if (a->prio == b->prio)
+		return 0;
+	else if (a->prio < b->prio)
+		return 1;
+	else
+		return -1;
+}
+
 void event_loop(int timeout)
 {
 	int i, nr;
-	struct epoll_event events[128];
 
-	nr = epoll_wait(efd, events, ARRAY_SIZE(events), TICK * 1000);
+	nr = epoll_wait(efd, polled_events, nr_events, TICK * 1000);
+	if (nr < 0) {
+		if (errno == EINTR)
+			return;
+		eprintf("epoll_wait failed: %m\n");
+		exit(1);
+	} else if (nr) {
+		for (i = 0; i < nr; i++) {
+			struct event_info *ei;
+			uint32_t e;
+
+			ei = (struct event_info *)polled_events[i].data.ptr;
+			e = polled_events[i].events;
+			ei->handler(ei->fd, e, ei->data);
+		}
+	}
+
+	return;
+}
+
+void event_loop_prio(int timeout)
+{
+	int i, nr;
+
+retrieve:
+	nr = epoll_wait(efd, polled_events, nr_events, TICK * 1000);
 	if (nr < 0) {
 		if (errno == EINTR)
 			return;
 		eprintf("epoll_wait failed: %m\n");
 		exit(1);
 	} else if (nr) {
+		if (nr_prioritized_events)
+			qsort(polled_events, nr, sizeof(struct epoll_event),
+				epoll_event_cmp);
+
 		for (i = 0; i < nr; i++) {
 			struct event_info *ei;
+			uint32_t e;
+			enum event_ret ret;
 
-			ei = (struct event_info *)events[i].data.ptr;
-			ei->handler(ei->fd, events[i].events, ei->data);
+			ei = (struct event_info *)polled_events[i].data.ptr;
+			e = polled_events[i].events;
+
+			ret = ei->handler(ei->fd, e, ei->data);
+			switch (ret) {
+			case EVENT_LOOP_NOP:
+				break;
+
+			case EVENT_LOOP_RETRIEVE:
+				goto retrieve;
+
+			case EVENT_LOOP_BREAK:
+				goto end;
+
+			default:
+				panic("invalid event_ret: %d\n", ret);
+				break;
+			}
 		}
 	}
+
+end:
+	return;
 }
diff --git a/sheep/cluster/accord.c b/sheep/cluster/accord.c
index be0cf83..ef35cfe 100644
--- a/sheep/cluster/accord.c
+++ b/sheep/cluster/accord.c
@@ -449,7 +449,7 @@ static void accord_unblock(void *msg, size_t msg_len)
 	pthread_mutex_unlock(&queue_lock);
 }
 
-static void acrd_handler(int listen_fd, int events, void *data)
+static enum event_ret acrd_handler(int listen_fd, int events, void *data)
 {
 	int ret;
 	eventfd_t value;
@@ -466,7 +466,7 @@ static void acrd_handler(int listen_fd, int events, void *data)
 
 	ret = eventfd_read(efd, &value);
 	if (ret < 0)
-		return;
+		return EVENT_LOOP_NOP;
 
 	pthread_mutex_lock(&queue_lock);
 
@@ -519,6 +519,8 @@ static void acrd_handler(int listen_fd, int events, void *data)
 	}
 out:
 	pthread_mutex_unlock(&queue_lock);
+
+	return EVENT_LOOP_NOP;
 }
 
 static int accord_init(const char *option)
diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
index a607bae..79356ff 100644
--- a/sheep/cluster/corosync.c
+++ b/sheep/cluster/corosync.c
@@ -735,7 +735,7 @@ static int corosync_notify(void *msg, size_t msg_len)
 			   NULL, 0, msg, msg_len);
 }
 
-static void corosync_handler(int listen_fd, int events, void *data)
+static enum event_ret corosync_handler(int listen_fd, int events, void *data)
 {
 	int ret;
 
@@ -750,7 +750,7 @@ static void corosync_handler(int listen_fd, int events, void *data)
 		goto out;
 	}
 
-	return;
+	return EVENT_LOOP_NOP;
 out:
 	log_close();
 	exit(1);
diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
index 20ce325..927d165 100644
--- a/sheep/cluster/local.c
+++ b/sheep/cluster/local.c
@@ -490,7 +490,7 @@ out:
 	return true;
 }
 
-static void local_handler(int listen_fd, int events, void *data)
+static enum event_ret local_handler(int listen_fd, int events, void *data)
 {
 	struct signalfd_siginfo siginfo;
 	int ret;
@@ -512,6 +512,8 @@ static void local_handler(int listen_fd, int events, void *data)
 		;
 
 	shm_queue_unlock();
+
+	return EVENT_LOOP_NOP;
 }
 
 static int local_get_local_addr(uint8_t *myaddr)
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index ea02137..81fe31a 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -645,7 +645,7 @@ static void (*const zk_event_handlers[])(struct zk_event *ev) = {
 
 static const int zk_max_event_handlers = ARRAY_SIZE(zk_event_handlers);
 
-static void zk_event_handler(int listen_fd, int events, void *data)
+static enum event_ret zk_event_handler(int listen_fd, int events, void *data)
 {
 	eventfd_t value;
 	struct zk_event ev;
@@ -657,15 +657,17 @@ static void zk_event_handler(int listen_fd, int events, void *data)
 	}
 
 	if (eventfd_read(efd, &value) < 0)
-		return;
+		return EVENT_LOOP_NOP;
 
 	if (zk_queue_pop(&ev) < 0)
-		return;
+		return EVENT_LOOP_NOP;
 
 	if (ev.type < zk_max_event_handlers && zk_event_handlers[ev.type])
 		zk_event_handlers[ev.type](&ev);
 	else
 		eprintf("unhandled type %d\n", ev.type);
+
+	return EVENT_LOOP_NOP;
 }
 
 static int zk_init(const char *option)
diff --git a/sheep/request.c b/sheep/request.c
index cd81e36..3d18f7e 100644
--- a/sheep/request.c
+++ b/sheep/request.c
@@ -807,24 +807,28 @@ static struct client_info *create_client(int fd, struct cluster_info *cluster)
 	return ci;
 }
 
-static void client_handler(int fd, int events, void *data)
+static enum event_ret client_handler(int fd, int events, void *data)
 {
 	struct client_info *ci = (struct client_info *)data;
 
 	dprintf("%x, rx %d, tx %d\n", events, ci->conn.c_rx_state,
 		ci->conn.c_tx_state);
 
-	if (events & (EPOLLERR | EPOLLHUP) || is_conn_dead(&ci->conn))
-		return clear_client_info(ci);
+	if (events & (EPOLLERR | EPOLLHUP) || is_conn_dead(&ci->conn)) {
+		clear_client_info(ci);
+		return EVENT_LOOP_NOP;
+	}
 
 	if (events & EPOLLIN)
 		do_client_rx(ci);
 
 	if (events & EPOLLOUT)
 		do_client_tx(ci);
+
+	return EVENT_LOOP_NOP;
 }
 
-static void listen_handler(int listen_fd, int events, void *data)
+static enum event_ret listen_handler(int listen_fd, int events, void *data)
 {
 	struct sockaddr_storage from;
 	socklen_t namesize;
@@ -835,49 +839,50 @@ static void listen_handler(int listen_fd, int events, void *data)
 	if (sys->status == SD_STATUS_SHUTDOWN) {
 		dprintf("unregistering connection %d\n", listen_fd);
 		unregister_event(listen_fd);
-		return;
+		return EVENT_LOOP_NOP;
 	}
 
 	namesize = sizeof(from);
 	fd = accept(listen_fd, (struct sockaddr *)&from, &namesize);
 	if (fd < 0) {
 		eprintf("failed to accept a new connection: %m\n");
-		return;
+		return EVENT_LOOP_NOP;
 	}
 
 	if (is_inet_socket) {
 		ret = set_keepalive(fd);
 		if (ret) {
 			close(fd);
-			return;
+			return EVENT_LOOP_NOP;
 		}
 
 		ret = set_nodelay(fd);
 		if (ret) {
 			close(fd);
-			return;
+			return EVENT_LOOP_NOP;
 		}
 	}
 
 	ret = set_nonblocking(fd);
 	if (ret) {
 		close(fd);
-		return;
+		return EVENT_LOOP_NOP;
 	}
 
 	ci = create_client(fd, data);
 	if (!ci) {
 		close(fd);
-		return;
+		return EVENT_LOOP_NOP;
 	}
 
 	ret = register_event(fd, client_handler, ci);
 	if (ret) {
 		destroy_client(ci);
-		return;
+		return EVENT_LOOP_NOP;
 	}
 
 	dprintf("accepted a new connection: %d\n", fd);
+	return EVENT_LOOP_NOP;
 }
 
 static int create_listen_port_fn(int fd, void *data)
@@ -905,7 +910,7 @@ int init_unix_domain_socket(const char *dir)
 					 &is_inet_socket);
 }
 
-static void req_handler(int listen_fd, int events, void *data)
+static enum event_ret req_handler(int listen_fd, int events, void *data)
 {
 	eventfd_t value;
 	struct request *req, *t;
@@ -917,7 +922,7 @@ static void req_handler(int listen_fd, int events, void *data)
 
 	ret = eventfd_read(listen_fd, &value);
 	if (ret < 0)
-		return;
+		return EVENT_LOOP_NOP;
 
 	pthread_mutex_lock(&sys->wait_req_lock);
 	list_splice_init(&sys->wait_req_queue, &pending_list);
@@ -927,6 +932,8 @@ static void req_handler(int listen_fd, int events, void *data)
 		list_del(&req->request_list);
 		queue_request(req);
 	}
+
+	return EVENT_LOOP_NOP;
 }
 
 void local_req_init(void)
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 7f4bf26..b07ba94 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -123,7 +123,7 @@ static int create_pidfile(const char *filename)
 
 static int sigfd;
 
-static void signal_handler(int listen_fd, int events, void *data)
+static enum event_ret signal_handler(int listen_fd, int events, void *data)
 {
 	struct signalfd_siginfo siginfo;
 	int ret;
@@ -139,6 +139,8 @@ static void signal_handler(int listen_fd, int events, void *data)
 		eprintf("signal %d unhandled\n", siginfo.ssi_signo);
 		break;
 	}
+
+	return EVENT_LOOP_NOP;
 }
 
 static int init_signal(void)
@@ -160,7 +162,7 @@ static int init_signal(void)
 		return -1;
 	}
 
-	ret = register_event(sigfd, signal_handler, NULL);
+	ret = register_event_prio(sigfd, signal_handler, NULL, EVENT_PRIO_MAX);
 	if (ret) {
 		eprintf("failed to register signal handler (%d)\n", ret);
 		return -1;
diff --git a/sheep/trace/trace.c b/sheep/trace/trace.c
index b63f4d2..9c57cc5 100644
--- a/sheep/trace/trace.c
+++ b/sheep/trace/trace.c
@@ -236,7 +236,7 @@ static inline bool short_thread_running(void)
 	return !!nr_short_thread;
 }
 
-static notrace void enable_tracer(int fd, int events, void *data)
+static notrace enum event_ret enable_tracer(int fd, int events, void *data)
 {
 	eventfd_t value;
 	int ret;
@@ -248,11 +248,11 @@ static notrace void enable_tracer(int fd, int events, void *data)
 	 */
 	if (ret < 0) {
 		eprintf("%m\n");
-		return;
+		return EVENT_LOOP_NOP;
 	}
 
 	if (short_thread_running())
-		return;
+		return EVENT_LOOP_NOP;
 
 	suspend_worker_threads();
 	patch_all_sites((unsigned long)trace_caller);
@@ -260,9 +260,11 @@ static notrace void enable_tracer(int fd, int events, void *data)
 	unregister_event(trace_efd);
 	trace_in_patch = false;
 	dprintf("tracer enabled\n");
+
+	return EVENT_LOOP_NOP;
 }
 
-static notrace void disable_tracer(int fd, int events, void *data)
+static notrace enum event_ret disable_tracer(int fd, int events, void *data)
 {
 	eventfd_t value;
 	int ret;
@@ -270,11 +272,11 @@ static notrace void disable_tracer(int fd, int events, void *data)
 	ret = eventfd_read(fd, &value);
 	if (ret < 0) {
 		eprintf("%m\n");
-		return;
+		return EVENT_LOOP_NOP;
 	}
 
 	if (short_thread_running())
-		return;
+		return EVENT_LOOP_NOP;
 
 	suspend_worker_threads();
 	nop_all_sites();
@@ -282,6 +284,8 @@ static notrace void disable_tracer(int fd, int events, void *data)
 	unregister_event(trace_efd);
 	trace_in_patch = false;
 	dprintf("tracer disabled\n");
+
+	return EVENT_LOOP_NOP;
 }
 
 notrace int trace_enable(void)
diff --git a/sheep/work.c b/sheep/work.c
index 49eac9a..bf5bea7 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -113,7 +113,7 @@ void queue_work(struct work_queue *q, struct work *work)
 		create_short_thread(wi, work);
 }
 
-static void bs_thread_request_done(int fd, int events, void *data)
+static enum event_ret bs_thread_request_done(int fd, int events, void *data)
 {
 	int ret;
 	struct worker_info *wi;
@@ -123,7 +123,7 @@ static void bs_thread_request_done(int fd, int events, void *data)
 
 	ret = eventfd_read(fd, &value);
 	if (ret < 0)
-		return;
+		return EVENT_LOOP_NOP;
 
 	list_for_each_entry(wi, &worker_info_list, worker_info_siblings) {
 		pthread_mutex_lock(&wi->finished_lock);
@@ -139,6 +139,8 @@ static void bs_thread_request_done(int fd, int events, void *data)
 				short_thread_end();
 		}
 	}
+
+	return EVENT_LOOP_NOP;
 }
 
 static void *worker_routine(void *arg)
-- 
1.7.5.1




More information about the sheepdog mailing list