[sheepdog] [PATCH 1/2] lib: prioritize events for epoll() and enhance event loop

Hitoshi Mitake mitake.hitoshi at lab.ntt.co.jp
Thu Dec 20 10:41:11 CET 2012


From: Hitoshi Mitake <h.mitake at gmail.com>

This patch adds two new functionalities to event_loop():
1. prioritize registered events
2. more flexible control with return values from event handlers

I'd like to describe the objectives of the two functionalities.

1. The current event_loop() treats every registered event with equal
priority. But in some cases, treating them unequally is helpful.

2. Previously, event handlers returned no value. This patch lets them
return a value of the newly defined enum event_ret, which has 3 values:
* EVENT_RET_DONE ... this value means no special actions are required.
  event loop should continue normally.
* EVENT_RET_REFRESH ... this value means event_loop() should call
  epoll_wait() again. A handler returns this value if events which
  should be processed as soon as possible are raised during the
  handler, so that those events can be processed before the already
  raised events.
* EVENT_RET_BREAK ... this value means event_loop() must break
  immediately.

This patch defines a new function, register_event_prio(). Basically this
function is the same as register_event(), but it also takes a priority
as an argument. The priority is a signed integer, and it is stored in
a newly defined member of struct event_info (named prio).
register_event() is now a wrapper function which calls
register_event_prio() with its arguments and the default priority.

For prioritizing events, the new event_loop() sorts ready events after
epoll_wait() with qsort(). But this would usually consume time in vain,
because events which have a non-default priority are very few. So this
patch adds a new integer variable, nr_prioritized_events, which counts
the registered events that have a non-default priority.
register_event_prio() increments it when such an event is registered,
and unregister_event() decrements it when such an event is
unregistered. If this value is 0, event_loop() can skip calling qsort()
because there are no events with a non-default priority.

(Using qsort() might not be a good way. I think that constructing a
binary search tree with nested epoll fds could be a good
alternative. But I believe such an optimization should be done later.)

Cc: Liu Yuan <tailai.ly at taobao.com>
Signed-off-by: Hitoshi Mitake <mitake.hitoshi at lab.ntt.co.jp>
---
 include/event.h           |   22 ++++++++-
 lib/event.c               |  106 +++++++++++++++++++++++++++++++++++++++++---
 sheep/cluster/accord.c    |    6 ++-
 sheep/cluster/corosync.c  |    4 +-
 sheep/cluster/local.c     |    4 +-
 sheep/cluster/zookeeper.c |    8 ++-
 sheep/request.c           |   33 ++++++++------
 sheep/sheep.c             |    6 ++-
 sheep/trace/trace.c       |   16 ++++---
 sheep/work.c              |    6 ++-
 10 files changed, 170 insertions(+), 41 deletions(-)

diff --git a/include/event.h b/include/event.h
index b0e7552..05c9783 100644
--- a/include/event.h
+++ b/include/event.h
@@ -1,14 +1,21 @@
 #ifndef __EVENT_H__
 #define __EVENT_H__
 
+#include <limits.h>
+
 #include "list.h"
 
 struct event_info;
 
-typedef void (*event_handler_t)(int fd, int events, void *data);
+enum event_ret {
+	EVENT_RET_DONE,
+	EVENT_RET_REFRESH,
+	EVENT_RET_BREAK,
+};
+
+typedef enum event_ret (*event_handler_t)(int fd, int events, void *data);
 
 int init_event(int nr);
-int register_event(int fd, event_handler_t h, void *data);
 void unregister_event(int fd);
 int modify_event(int fd, unsigned int events);
 void event_loop(int timeout);
@@ -20,4 +27,15 @@ struct timer {
 
 void add_timer(struct timer *t, unsigned int mseconds);
 
+#define EVENT_PRIO_MAX INT_MAX
+#define EVENT_PRIO_MIN INT_MIN
+#define EVENT_PRIO_DEFAULT 0
+
+int register_event_prio(int fd, event_handler_t h, void *data, int prio);
+static inline int register_event(int fd, event_handler_t h, void *data)
+{
+	return register_event_prio(fd, h, data, EVENT_PRIO_DEFAULT);
+}
+
+
 #endif
diff --git a/lib/event.c b/lib/event.c
index 6b00e06..cc21fd6 100644
--- a/lib/event.c
+++ b/lib/event.c
@@ -26,18 +26,20 @@ static LIST_HEAD(events_list);
 
 #define TICK 1
 
-static void timer_handler(int fd, int events, void *data)
+static enum event_ret timer_handler(int fd, int events, void *data)
 {
 	struct timer *t = data;
 	uint64_t val;
 
 	if (read(fd, &val, sizeof(val)) < 0)
-		return;
+		return EVENT_RET_DONE;
 
 	t->callback(t->data);
 
 	unregister_event(fd);
 	close(fd);
+
+	return EVENT_RET_DONE;
 }
 
 void add_timer(struct timer *t, unsigned int mseconds)
@@ -68,9 +70,14 @@ struct event_info {
 	event_handler_t handler;
 	int fd;
 	void *data;
+	int prio;
 	struct list_head ei_list;
 };
 
+static struct epoll_event *polled_events;
+static int nr_events;
+static struct epoll_event **sorted_events;
+
 int init_event(int nr)
 {
 	efd = epoll_create(nr);
@@ -78,6 +85,22 @@ int init_event(int nr)
 		eprintf("failed to create epoll fd\n");
 		return -1;
 	}
+
+	polled_events = xcalloc(nr, sizeof(struct epoll_event));
+	if (!polled_events) {
+		vprintf(SDOG_ERR, "failed to alloc epoll_event array\n");
+		return -1;
+	}
+
+	sorted_events = xcalloc(nr, sizeof(struct epoll_event **));
+	if (!sorted_events) {
+		vprintf(SDOG_ERR, "failed to alloc working space for"\
+			" sorting by priority\n");
+		return -1;
+	}
+
+	nr_events = nr;
+
 	return 0;
 }
 
@@ -92,7 +115,9 @@ static struct event_info *lookup_event(int fd)
 	return NULL;
 }
 
-int register_event(int fd, event_handler_t h, void *data)
+static int nr_prioritized_events;
+
+int register_event_prio(int fd, event_handler_t h, void *data, int prio)
 {
 	int ret;
 	struct epoll_event ev;
@@ -105,6 +130,9 @@ int register_event(int fd, event_handler_t h, void *data)
 	ei->fd = fd;
 	ei->handler = h;
 	ei->data = data;
+	ei->prio = prio;
+	if (prio != EVENT_PRIO_DEFAULT)
+		nr_prioritized_events++;
 
 	memset(&ev, 0, sizeof(ev));
 	ev.events = EPOLLIN;
@@ -133,6 +161,9 @@ void unregister_event(int fd)
 	if (ret)
 		eprintf("failed to delete epoll event for fd %d: %m\n", fd);
 
+	if (ei->prio != EVENT_PRIO_DEFAULT)
+		nr_prioritized_events--;
+
 	list_del(&ei->ei_list);
 	free(ei);
 }
@@ -161,23 +192,82 @@ int modify_event(int fd, unsigned int events)
 	return 0;
 }
 
+static int epoll_event_cmp(const void *_a, const void *_b)
+{
+	const struct event_info *a, *b;
+
+	a = (*(struct epoll_event **)_a)->data.ptr;
+	b = (*(struct epoll_event **)_b)->data.ptr;
+
+	if (a->prio == b->prio)
+		return 0;
+	else if (a->prio < b->prio)
+		return 1;
+	else
+		return -1;
+}
+
 void event_loop(int timeout)
 {
 	int i, nr;
-	struct epoll_event events[128];
+	int prioritized;
 
-	nr = epoll_wait(efd, events, ARRAY_SIZE(events), TICK * 1000);
+refresh:
+	/*
+	 * we have to refresh the number of prioritized events here, because
+	 * nr_prioritized_events is increased in event handlers
+	 */
+	prioritized = nr_prioritized_events;
+
+	nr = epoll_wait(efd, polled_events, nr_events, TICK * 1000);
 	if (nr < 0) {
 		if (errno == EINTR)
 			return;
 		eprintf("epoll_wait failed: %m\n");
 		exit(1);
 	} else if (nr) {
+		if (prioritized) {
+			for (i = 0; i < nr; i++)
+				sorted_events[i] = (struct epoll_event *)
+					&polled_events[i];
+
+			qsort(sorted_events, nr, sizeof(struct epoll_event *),
+				epoll_event_cmp);
+		}
+
 		for (i = 0; i < nr; i++) {
 			struct event_info *ei;
-
-			ei = (struct event_info *)events[i].data.ptr;
-			ei->handler(ei->fd, events[i].events, ei->data);
+			uint32_t e;
+			enum event_ret ret;
+
+			if (!prioritized) {
+				ei = (struct event_info *)
+					polled_events[i].data.ptr;
+				e = polled_events[i].events;
+			} else {
+				ei = (struct event_info *)
+					sorted_events[i]->data.ptr;
+				e = sorted_events[i]->events;
+			}
+
+			ret = ei->handler(ei->fd, e, ei->data);
+			switch (ret) {
+			case EVENT_RET_DONE:
+				break;
+
+			case EVENT_RET_REFRESH:
+				goto refresh;
+
+			case EVENT_RET_BREAK:
+				goto end;
+
+			default:
+				panic("invalid event_ret: %d\n", ret);
+				break;
+			}
 		}
 	}
+
+end:
+	return;
 }
diff --git a/sheep/cluster/accord.c b/sheep/cluster/accord.c
index be0cf83..87da1e5 100644
--- a/sheep/cluster/accord.c
+++ b/sheep/cluster/accord.c
@@ -449,7 +449,7 @@ static void accord_unblock(void *msg, size_t msg_len)
 	pthread_mutex_unlock(&queue_lock);
 }
 
-static void acrd_handler(int listen_fd, int events, void *data)
+static enum event_ret acrd_handler(int listen_fd, int events, void *data)
 {
 	int ret;
 	eventfd_t value;
@@ -466,7 +466,7 @@ static void acrd_handler(int listen_fd, int events, void *data)
 
 	ret = eventfd_read(efd, &value);
 	if (ret < 0)
-		return;
+		return EVENT_RET_DONE;
 
 	pthread_mutex_lock(&queue_lock);
 
@@ -519,6 +519,8 @@ static void acrd_handler(int listen_fd, int events, void *data)
 	}
 out:
 	pthread_mutex_unlock(&queue_lock);
+
+	return EVENT_RET_DONE;
 }
 
 static int accord_init(const char *option)
diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
index a607bae..ad00848 100644
--- a/sheep/cluster/corosync.c
+++ b/sheep/cluster/corosync.c
@@ -735,7 +735,7 @@ static int corosync_notify(void *msg, size_t msg_len)
 			   NULL, 0, msg, msg_len);
 }
 
-static void corosync_handler(int listen_fd, int events, void *data)
+static enum event_ret corosync_handler(int listen_fd, int events, void *data)
 {
 	int ret;
 
@@ -750,7 +750,7 @@ static void corosync_handler(int listen_fd, int events, void *data)
 		goto out;
 	}
 
-	return;
+	return EVENT_RET_DONE;
 out:
 	log_close();
 	exit(1);
diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
index 20ce325..c37cbe9 100644
--- a/sheep/cluster/local.c
+++ b/sheep/cluster/local.c
@@ -490,7 +490,7 @@ out:
 	return true;
 }
 
-static void local_handler(int listen_fd, int events, void *data)
+static enum event_ret local_handler(int listen_fd, int events, void *data)
 {
 	struct signalfd_siginfo siginfo;
 	int ret;
@@ -512,6 +512,8 @@ static void local_handler(int listen_fd, int events, void *data)
 		;
 
 	shm_queue_unlock();
+
+	return EVENT_RET_DONE;
 }
 
 static int local_get_local_addr(uint8_t *myaddr)
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index ea02137..0ccc9aa 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -645,7 +645,7 @@ static void (*const zk_event_handlers[])(struct zk_event *ev) = {
 
 static const int zk_max_event_handlers = ARRAY_SIZE(zk_event_handlers);
 
-static void zk_event_handler(int listen_fd, int events, void *data)
+static enum event_ret zk_event_handler(int listen_fd, int events, void *data)
 {
 	eventfd_t value;
 	struct zk_event ev;
@@ -657,15 +657,17 @@ static void zk_event_handler(int listen_fd, int events, void *data)
 	}
 
 	if (eventfd_read(efd, &value) < 0)
-		return;
+		return EVENT_RET_DONE;
 
 	if (zk_queue_pop(&ev) < 0)
-		return;
+		return EVENT_RET_DONE;
 
 	if (ev.type < zk_max_event_handlers && zk_event_handlers[ev.type])
 		zk_event_handlers[ev.type](&ev);
 	else
 		eprintf("unhandled type %d\n", ev.type);
+
+	return EVENT_RET_DONE;
 }
 
 static int zk_init(const char *option)
diff --git a/sheep/request.c b/sheep/request.c
index cd81e36..2ecea77 100644
--- a/sheep/request.c
+++ b/sheep/request.c
@@ -807,24 +807,28 @@ static struct client_info *create_client(int fd, struct cluster_info *cluster)
 	return ci;
 }
 
-static void client_handler(int fd, int events, void *data)
+static enum event_ret client_handler(int fd, int events, void *data)
 {
 	struct client_info *ci = (struct client_info *)data;
 
 	dprintf("%x, rx %d, tx %d\n", events, ci->conn.c_rx_state,
 		ci->conn.c_tx_state);
 
-	if (events & (EPOLLERR | EPOLLHUP) || is_conn_dead(&ci->conn))
-		return clear_client_info(ci);
+	if (events & (EPOLLERR | EPOLLHUP) || is_conn_dead(&ci->conn)) {
+		clear_client_info(ci);
+		return EVENT_RET_DONE;
+	}
 
 	if (events & EPOLLIN)
 		do_client_rx(ci);
 
 	if (events & EPOLLOUT)
 		do_client_tx(ci);
+
+	return EVENT_RET_DONE;
 }
 
-static void listen_handler(int listen_fd, int events, void *data)
+static enum event_ret listen_handler(int listen_fd, int events, void *data)
 {
 	struct sockaddr_storage from;
 	socklen_t namesize;
@@ -835,49 +839,50 @@ static void listen_handler(int listen_fd, int events, void *data)
 	if (sys->status == SD_STATUS_SHUTDOWN) {
 		dprintf("unregistering connection %d\n", listen_fd);
 		unregister_event(listen_fd);
-		return;
+		return EVENT_RET_DONE;
 	}
 
 	namesize = sizeof(from);
 	fd = accept(listen_fd, (struct sockaddr *)&from, &namesize);
 	if (fd < 0) {
 		eprintf("failed to accept a new connection: %m\n");
-		return;
+		return EVENT_RET_DONE;
 	}
 
 	if (is_inet_socket) {
 		ret = set_keepalive(fd);
 		if (ret) {
 			close(fd);
-			return;
+			return EVENT_RET_DONE;
 		}
 
 		ret = set_nodelay(fd);
 		if (ret) {
 			close(fd);
-			return;
+			return EVENT_RET_DONE;
 		}
 	}
 
 	ret = set_nonblocking(fd);
 	if (ret) {
 		close(fd);
-		return;
+		return EVENT_RET_DONE;
 	}
 
 	ci = create_client(fd, data);
 	if (!ci) {
 		close(fd);
-		return;
+		return EVENT_RET_DONE;
 	}
 
 	ret = register_event(fd, client_handler, ci);
 	if (ret) {
 		destroy_client(ci);
-		return;
+		return EVENT_RET_DONE;
 	}
 
 	dprintf("accepted a new connection: %d\n", fd);
+	return EVENT_RET_DONE;
 }
 
 static int create_listen_port_fn(int fd, void *data)
@@ -905,7 +910,7 @@ int init_unix_domain_socket(const char *dir)
 					 &is_inet_socket);
 }
 
-static void req_handler(int listen_fd, int events, void *data)
+static enum event_ret req_handler(int listen_fd, int events, void *data)
 {
 	eventfd_t value;
 	struct request *req, *t;
@@ -917,7 +922,7 @@ static void req_handler(int listen_fd, int events, void *data)
 
 	ret = eventfd_read(listen_fd, &value);
 	if (ret < 0)
-		return;
+		return EVENT_RET_DONE;
 
 	pthread_mutex_lock(&sys->wait_req_lock);
 	list_splice_init(&sys->wait_req_queue, &pending_list);
@@ -927,6 +932,8 @@ static void req_handler(int listen_fd, int events, void *data)
 		list_del(&req->request_list);
 		queue_request(req);
 	}
+
+	return EVENT_RET_DONE;
 }
 
 void local_req_init(void)
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 7f4bf26..c3f4200 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -123,7 +123,7 @@ static int create_pidfile(const char *filename)
 
 static int sigfd;
 
-static void signal_handler(int listen_fd, int events, void *data)
+static enum event_ret signal_handler(int listen_fd, int events, void *data)
 {
 	struct signalfd_siginfo siginfo;
 	int ret;
@@ -139,6 +139,8 @@ static void signal_handler(int listen_fd, int events, void *data)
 		eprintf("signal %d unhandled\n", siginfo.ssi_signo);
 		break;
 	}
+
+	return EVENT_RET_DONE;
 }
 
 static int init_signal(void)
@@ -160,7 +162,7 @@ static int init_signal(void)
 		return -1;
 	}
 
-	ret = register_event(sigfd, signal_handler, NULL);
+	ret = register_event_prio(sigfd, signal_handler, NULL, EVENT_PRIO_MAX);
 	if (ret) {
 		eprintf("failed to register signal handler (%d)\n", ret);
 		return -1;
diff --git a/sheep/trace/trace.c b/sheep/trace/trace.c
index b63f4d2..e6f3c23 100644
--- a/sheep/trace/trace.c
+++ b/sheep/trace/trace.c
@@ -236,7 +236,7 @@ static inline bool short_thread_running(void)
 	return !!nr_short_thread;
 }
 
-static notrace void enable_tracer(int fd, int events, void *data)
+static notrace enum event_ret enable_tracer(int fd, int events, void *data)
 {
 	eventfd_t value;
 	int ret;
@@ -248,11 +248,11 @@ static notrace void enable_tracer(int fd, int events, void *data)
 	 */
 	if (ret < 0) {
 		eprintf("%m\n");
-		return;
+		return EVENT_RET_DONE;
 	}
 
 	if (short_thread_running())
-		return;
+		return EVENT_RET_DONE;
 
 	suspend_worker_threads();
 	patch_all_sites((unsigned long)trace_caller);
@@ -260,9 +260,11 @@ static notrace void enable_tracer(int fd, int events, void *data)
 	unregister_event(trace_efd);
 	trace_in_patch = false;
 	dprintf("tracer enabled\n");
+
+	return EVENT_RET_DONE;
 }
 
-static notrace void disable_tracer(int fd, int events, void *data)
+static notrace enum event_ret disable_tracer(int fd, int events, void *data)
 {
 	eventfd_t value;
 	int ret;
@@ -270,11 +272,11 @@ static notrace void disable_tracer(int fd, int events, void *data)
 	ret = eventfd_read(fd, &value);
 	if (ret < 0) {
 		eprintf("%m\n");
-		return;
+		return EVENT_RET_DONE;
 	}
 
 	if (short_thread_running())
-		return;
+		return EVENT_RET_DONE;
 
 	suspend_worker_threads();
 	nop_all_sites();
@@ -282,6 +284,8 @@ static notrace void disable_tracer(int fd, int events, void *data)
 	unregister_event(trace_efd);
 	trace_in_patch = false;
 	dprintf("tracer disabled\n");
+
+	return EVENT_RET_DONE;
 }
 
 notrace int trace_enable(void)
diff --git a/sheep/work.c b/sheep/work.c
index 49eac9a..ffda292 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -113,7 +113,7 @@ void queue_work(struct work_queue *q, struct work *work)
 		create_short_thread(wi, work);
 }
 
-static void bs_thread_request_done(int fd, int events, void *data)
+static enum event_ret bs_thread_request_done(int fd, int events, void *data)
 {
 	int ret;
 	struct worker_info *wi;
@@ -123,7 +123,7 @@ static void bs_thread_request_done(int fd, int events, void *data)
 
 	ret = eventfd_read(fd, &value);
 	if (ret < 0)
-		return;
+		return EVENT_RET_DONE;
 
 	list_for_each_entry(wi, &worker_info_list, worker_info_siblings) {
 		pthread_mutex_lock(&wi->finished_lock);
@@ -139,6 +139,8 @@ static void bs_thread_request_done(int fd, int events, void *data)
 				short_thread_end();
 		}
 	}
+
+	return EVENT_RET_DONE;
 }
 
 static void *worker_routine(void *arg)
-- 
1.7.2.5




More information about the sheepdog mailing list