[Sheepdog] [PATCH] Remove unregister_event from sd_xxx_handler()

Christoph Hellwig hch at infradead.org
Mon May 14 19:08:31 CEST 2012


On Tue, May 15, 2012 at 12:46:13AM +0800, Yunkai Zhang wrote:
> > another thing I noticed is that I think your patch makes the ->dispatch
> > cluster driver operation obsolete.  Once we never register the event
> > handler for the cluster driver FD it can be handled entirely inside the
> > cluster driver, e.g. just call register_event from inside the init
> > method and don't even tell the core about it.
> >
> We have called register_event(cdrv_fd, ...) in create_cluster().

Yes, but we could also do this in the drivers.  See below for an
untested version of your patch that implements this suggestion:

From: Yunkai Zhang <yunkai.me at gmail.com>
Subject: [Sheepdog] [PATCH] Remove unregister_event from sd_xxx_handler()

From: Yunkai Zhang <qiushu.zyk at taobao.com>

Call update_cluster_info() in driver directly so that we can remove
unregister_event() from sd_xxx_handler().

This can speed up the joining/leaving process.

Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com>
---
 sheep/cluster.h           |    3 +++
 sheep/cluster/accord.c    |    4 ++++
 sheep/cluster/corosync.c  |    5 +++++
 sheep/cluster/zookeeper.c |    7 ++++++-
 sheep/group.c             |   30 +++++++++++++-----------------
 5 files changed, 31 insertions(+), 18 deletions(-)

Index: sheepdog/sheep/cluster.h
===================================================================
--- sheepdog.orig/sheep/cluster.h	2012-05-14 18:45:08.967970005 +0200
+++ sheepdog/sheep/cluster.h	2012-05-14 18:49:03.759967772 +0200
@@ -86,19 +86,6 @@ struct cluster_driver {
 	 */
 	int (*notify)(void *msg, size_t msg_len, void (*block_cb)(void *arg));
 
-	/*
-	 * Dispatch handlers
-	 *
-	 * This function dispatches handlers according to the
-	 * delivered events (join/leave/notify) in the cluster.
-	 *
-	 * Note that the events sequence is totally ordered; all nodes
-	 * call the handlers in the same sequence.
-	 *
-	 * Returns zero on success, -1 on error
-	 */
-	int (*dispatch)(void);
-
 	struct list_head list;
 };
 
@@ -107,7 +94,7 @@ extern struct list_head cluster_drivers;
 #define cdrv_register(driver)						\
 static void __attribute__((constructor)) regist_ ## driver(void) {	\
 	if (!driver.init || !driver.join || !driver.leave ||		\
-	    !driver.notify || !driver.dispatch)				\
+	    !driver.notify)						\
 		panic("the driver '%s' is incomplete\n", driver.name);	\
 	list_add(&driver.list, &cluster_drivers);			\
 }
@@ -191,5 +178,8 @@ void sd_leave_handler(struct sd_node *le
 void sd_notify_handler(struct sd_node *sender, void *msg, size_t msg_len);
 enum cluster_join_result sd_check_join_cb(struct sd_node *joining,
 		void *opaque);
+void update_cluster_info(struct sd_node *joined, struct sd_node *nodes,
+		size_t nr_nodes, void *opaque);
+void update_epoch_info(void);
 
 #endif
Index: sheepdog/sheep/cluster/accord.c
===================================================================
--- sheepdog.orig/sheep/cluster/accord.c	2012-05-11 17:51:27.000000000 +0200
+++ sheepdog/sheep/cluster/accord.c	2012-05-14 18:56:36.855963441 +0200
@@ -16,6 +16,7 @@
 #include <assert.h>
 #include <pthread.h>
 #include <sys/eventfd.h>
+#include <sys/epoll.h>
 #include <accord/accord.h>
 
 #include "cluster.h"
@@ -463,6 +464,8 @@ static void acrd_watch_fn(struct acrd_ha
 	eventfd_write(efd, value);
 }
 
+static void accord_dispatch(int listen_fd, int events, void *data);
+
 static int accord_init(const char *option, uint8_t *myaddr)
 {
 	if (!option) {
@@ -509,7 +512,13 @@ static int accord_init(const char *optio
 	acrd_add_watch(ahandle, QUEUE_FILE, ACRD_EVENT_PREFIX | ACRD_EVENT_ALL,
 		       acrd_watch_fn, NULL);
 
-	return efd;
+	ret = register_event(efd, accord_dispatch, NULL);
+	if (ret) {
+		eprintf("failed to register epoll events (%d)\n", ret);
+		return -1;
+	}
+
+	return 0;
 }
 
 static int accord_join(struct sd_node *myself,
@@ -550,7 +559,7 @@ static void acrd_block_done(struct work
 {
 }
 
-static int accord_dispatch(void)
+static void accord_dispatch(int listen_fd, int events, void *data)
 {
 	int ret;
 	eventfd_t value;
@@ -561,6 +570,11 @@ static int accord_dispatch(void)
 		.done = acrd_block_done,
 	};
 
+	if (events & EPOLLHUP) {
+		eprintf("received EPOLLHUP event: has accord exited?\n");
+		goto out;
+	}
+
 	dprintf("read event\n");
 	ret = eventfd_read(efd, &value);
 	if (ret < 0)
@@ -603,10 +617,14 @@ static int accord_dispatch(void)
 			acrd_queue_pop(ahandle, &ev);
 		}
 
+		if (ev.join_result == CJ_RES_SUCCESS)
+			update_cluster_info(&ev.sender, ev.nodes,
+					ev.nr_nodes, ev.buf);
 		sd_join_handler(&ev.sender, ev.nodes, ev.nr_nodes,
 				    ev.join_result, ev.buf);
 		break;
 	case EVENT_LEAVE:
+		update_epoch_info();
 		sd_leave_handler(&ev.sender, ev.nodes, ev.nr_nodes);
 		break;
 	case EVENT_NOTIFY:
@@ -628,8 +646,10 @@ static int accord_dispatch(void)
 	}
 out:
 	pthread_mutex_unlock(&queue_lock);
-
 	return 0;
+err:
+	log_close();
+	exit(1);
 }
 
 struct cluster_driver cdrv_accord = {
@@ -639,7 +659,6 @@ struct cluster_driver cdrv_accord = {
 	.join       = accord_join,
 	.leave      = accord_leave,
 	.notify     = accord_notify,
-	.dispatch   = accord_dispatch,
 };
 
 cdrv_register(cdrv_accord);
Index: sheepdog/sheep/cluster/corosync.c
===================================================================
--- sheepdog.orig/sheep/cluster/corosync.c	2012-05-14 18:45:08.967970005 +0200
+++ sheepdog/sheep/cluster/corosync.c	2012-05-14 18:57:48.279962765 +0200
@@ -12,7 +12,9 @@
 #include <unistd.h>
 #include <corosync/cpg.h>
 #include <corosync/cfg.h>
+#include <sys/epoll.h>
 
+#include "event.h"
 #include "cluster.h"
 #include "work.h"
 
@@ -322,6 +324,10 @@ static int __corosync_dispatch_one(struc
 		case CJ_RES_FAIL:
 		case CJ_RES_JOIN_LATER:
 			build_node_list(cpg_nodes, nr_cpg_nodes, entries);
+			if (cevent->result == CJ_RES_SUCCESS)
+				update_cluster_info(&cevent->sender.ent,
+						entries, nr_cpg_nodes,
+						cevent->msg);
 			sd_join_handler(&cevent->sender.ent, entries,
 						       nr_cpg_nodes, cevent->result,
 						       cevent->msg);
@@ -334,6 +340,7 @@ static int __corosync_dispatch_one(struc
 			break;
 		cevent->sender.ent = cpg_nodes[idx].ent;
 
+		update_epoch_info();
 		del_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender);
 		nr_cpg_nodes--;
 		build_node_list(cpg_nodes, nr_cpg_nodes, entries);
@@ -614,6 +621,28 @@ static void cdrv_cpg_confchg(cpg_handle_
 	__corosync_dispatch();
 }
 
+static void corosync_dispatch(int listen_fd, int events, void *data)
+{
+	int ret;
+
+	if (events & EPOLLHUP) {
+		eprintf("received EPOLLHUP event: has corosync exited?\n");
+		goto out;
+	}
+
+	ret = cpg_dispatch(cpg_handle, CPG_DISPATCH_ALL);
+	if (ret != CPG_OK) {
+		eprintf("oops... an error occurred inside corosync\n");
+		goto out;
+	}
+
+	return;
+out:
+	log_close();
+	exit(1);
+}
+
+
 static int corosync_init(const char *option, uint8_t *myaddr)
 {
 	int ret, fd;
@@ -662,7 +691,13 @@ static int corosync_init(const char *opt
 		return -1;
 	}
 
-	return fd;
+	ret = register_event(fd, corosync_dispatch, NULL);
+	if (ret) {
+		eprintf("failed to register epoll events (%d)\n", ret);
+		return -1;
+	}
+
+	return 0;
 }
 
 static int corosync_join(struct sd_node *myself,
@@ -728,17 +763,6 @@ static int corosync_notify(void *msg, si
 	return ret;
 }
 
-static int corosync_dispatch(void)
-{
-	int ret;
-
-	ret = cpg_dispatch(cpg_handle, CPG_DISPATCH_ALL);
-	if (ret != CPG_OK)
-		return -1;
-
-	return 0;
-}
-
 struct cluster_driver cdrv_corosync = {
 	.name       = "corosync",
 
@@ -746,7 +770,6 @@ struct cluster_driver cdrv_corosync = {
 	.join       = corosync_join,
 	.leave      = corosync_leave,
 	.notify     = corosync_notify,
-	.dispatch   = corosync_dispatch,
 };
 
 cdrv_register(cdrv_corosync);
Index: sheepdog/sheep/cluster/zookeeper.c
===================================================================
--- sheepdog.orig/sheep/cluster/zookeeper.c	2012-05-14 18:45:08.971970007 +0200
+++ sheepdog/sheep/cluster/zookeeper.c	2012-05-14 18:58:38.199962294 +0200
@@ -14,10 +14,12 @@
 #include <netdb.h>
 #include <search.h>
 #include <assert.h>
+#include <sys/epoll.h>
 #include <sys/eventfd.h>
 #include <zookeeper/zookeeper.h>
 #include <urcu/uatomic.h>
 
+#include "event.h"
 #include "cluster.h"
 #include "work.h"
 
@@ -30,6 +32,7 @@
 #define QUEUE_ZNODE BASE_ZNODE "/queue"
 #define MEMBER_ZNODE BASE_ZNODE "/member"
 
+static void zk_dispatch(int listen_fd, int events, void *data);
 
 /* zookeeper API wrapper prototypes */
 ZOOAPI int zk_create(zhandle_t *zh, const char *path, const char *value,
@@ -647,6 +650,8 @@ static int get_addr(uint8_t *bytes)
 
 static int zk_init(const char *option, uint8_t *myaddr)
 {
+	int ret;
+
 	if (!option) {
 		eprintf("specify comma separated host:port pairs, each corresponding to a zk server.\n");
 		eprintf("e.g. sheep /store -c zookeeper:127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002\n");
@@ -678,7 +683,13 @@ static int zk_init(const char *option, u
 		return -1;
 	}
 
-	return efd;
+	ret = register_event(efd, zk_dispatch, NULL);
+	if (ret) {
+		eprintf("failed to register epoll events (%d)\n", ret);
+		return -1;
+	}
+
+	return 0;
 }
 
 static int zk_join(struct sd_node *myself,
@@ -745,7 +756,7 @@ static void zk_block_done(struct work *w
 {
 }
 
-static int zk_dispatch(void)
+static void zk_dispatch(int listen_fd, int events, void *data)
 {
 	int ret, rc, retry;
 	char path[256];
@@ -758,13 +769,18 @@ static int zk_dispatch(void)
 		.done = zk_block_done,
 	};
 
+	if (events & EPOLLHUP) {
+		eprintf("received EPOLLHUP event: has corosync exited?\n");
+		goto err;
+	}
+
 	dprintf("read event\n");
 	ret = eventfd_read(efd, &value);
 	if (ret < 0)
-		return 0;
+		return;
 
 	if (uatomic_read(&zk_notify_blocked))
-		return 0;
+		return;
 
 	ret = zk_queue_pop(zhandle, &ev);
 	if (ret < 0)
@@ -824,6 +840,10 @@ static int zk_dispatch(void)
 		}
 
 		node_btree_add(&zk_node_btroot, &ev.sender);
+		build_node_list(zk_node_btroot);
+		if (ev.join_result == CJ_RES_SUCCESS)
+			update_cluster_info(&ev.sender.node, sd_nodes,
+					nr_sd_nodes, ev.buf);
 		dprintf("one sheep joined[down], nr_nodes:%ld, sender:%s, joined:%d\n",
 				nr_zk_nodes, node_to_str(&ev.sender.node), ev.sender.joined);
 
@@ -841,7 +861,6 @@ static int zk_dispatch(void)
 			}
 		}
 
-		build_node_list(zk_node_btroot);
 		sd_join_handler(&ev.sender.node, sd_nodes, nr_sd_nodes,
 				    ev.join_result, ev.buf);
 		break;
@@ -853,6 +872,8 @@ static int zk_dispatch(void)
 			goto out;
 		}
 
+		update_epoch_info();
+
 		node_btree_del(&zk_node_btroot, n);
 		dprintf("one sheep left, nr_nodes:%ld\n", nr_zk_nodes);
 
@@ -880,7 +901,10 @@ static int zk_dispatch(void)
 		break;
 	}
 out:
-	return 0;
+	return;
+err:
+	log_close();
+	exit(1);
 }
 
 struct cluster_driver cdrv_zookeeper = {
@@ -890,7 +914,6 @@ struct cluster_driver cdrv_zookeeper = {
 	.join       = zk_join,
 	.leave      = zk_leave,
 	.notify     = zk_notify,
-	.dispatch   = zk_dispatch,
 };
 
 cdrv_register(cdrv_zookeeper);
Index: sheepdog/sheep/group.c
===================================================================
--- sheepdog.orig/sheep/group.c	2012-05-14 18:45:08.971970007 +0200
+++ sheepdog/sheep/group.c	2012-05-14 18:54:38.735964575 +0200
@@ -25,8 +25,6 @@
 #include "work.h"
 #include "cluster.h"
 
-static int cdrv_fd;
-
 struct node {
 	struct sd_node ent;
 	struct list_head list;
@@ -262,24 +260,6 @@ void do_cluster_request(struct work *wor
 	free(msg);
 }
 
-static void group_handler(int listen_fd, int events, void *data)
-{
-	int ret;
-	if (events & EPOLLHUP) {
-		eprintf("received EPOLLHUP event: has corosync exited?\n");
-		goto out;
-	}
-
-	ret = sys->cdrv->dispatch();
-	if (ret == 0)
-		return;
-	else
-		eprintf("oops... an error occurred inside corosync\n");
-out:
-	log_close();
-	exit(1);
-}
-
 static inline int get_nodes_nr_from(struct list_head *l)
 {
 	struct node *node;
@@ -577,9 +557,20 @@ static void finish_join(struct join_mess
 	}
 }
 
-static void update_cluster_info(struct join_message *msg,
-		struct sd_node *joined, struct sd_node *nodes, size_t nr_nodes)
+void update_epoch_info(void)
+{
+	if (sys_can_recover()) {
+		sys->epoch++;
+		update_epoch_store(sys->epoch);
+		update_epoch_log(sys->epoch);
+	}
+}
+
+void update_cluster_info(struct sd_node *joined, struct sd_node *nodes,
+		size_t nr_nodes, void *opaque)
 {
+	struct join_message *msg = opaque;
+
 	eprintf("status = %d, epoch = %d, %x, %d\n", msg->cluster_status,
 		msg->epoch, msg->result, sys->join_finished);
 
@@ -836,11 +827,6 @@ static void __sd_join_done(struct event_
 
 	print_node_list(sys->nodes, sys->nr_nodes);
 
-	if (!sys_stat_join_failed()) {
-		update_cluster_info(jm, &w->joined, w->member_list,
-				    w->member_list_entries);
-	}
-
 	if (sys_can_recover() && jm->inc_epoch) {
 		list_for_each_entry_safe(node, t, &sys->leave_list, list) {
 			list_del(&node->list);
@@ -866,11 +852,6 @@ static void __sd_leave_done(struct event
 	memcpy(sys->nodes, w->member_list, sizeof(*sys->nodes) * sys->nr_nodes);
 	qsort(sys->nodes, sys->nr_nodes, sizeof(*sys->nodes), node_cmp);
 
-	if (sys_can_recover()) {
-		sys->epoch++;
-		update_epoch_store(sys->epoch);
-		update_epoch_log(sys->epoch);
-	}
 	update_vnode_info();
 
 	print_node_list(sys->nodes, sys->nr_nodes);
@@ -943,7 +924,6 @@ static void event_fn(struct work *work)
 static void event_done(struct work *work)
 {
 	struct event_struct *cevent;
-	int ret;
 
 	if (!sys->cur_cevent)
 		vprintf(SDOG_ERR, "bug\n");
@@ -973,9 +953,6 @@ static void event_done(struct work *work
 	vprintf(SDOG_DEBUG, "free %p\n", cevent);
 	event_free(cevent);
 	event_running = 0;
-	ret = register_event(cdrv_fd, group_handler, NULL);
-	if (ret)
-		panic("failed to register event fd");
 
 	process_request_event_queues();
 }
@@ -1086,7 +1063,6 @@ static inline void process_event_queue(v
 	event_work.fn = event_fn;
 	event_work.done = event_done;
 
-	unregister_event(cdrv_fd);
 	queue_work(sys->event_wqueue, &event_work);
 }
 
@@ -1298,8 +1274,8 @@ int create_cluster(int port, int64_t zon
 		}
 	}
 
-	cdrv_fd = sys->cdrv->init(sys->cdrv_option, sys->this_node.addr);
-	if (cdrv_fd < 0)
+	ret = sys->cdrv->init(sys->cdrv_option, sys->this_node.addr);
+	if (ret < 0)
 		return -1;
 
 	sys->this_node.port = port;
@@ -1327,12 +1303,6 @@ int create_cluster(int port, int64_t zon
 	INIT_LIST_HEAD(&sys->request_queue);
 	INIT_LIST_HEAD(&sys->event_queue);
 
-	ret = register_event(cdrv_fd, group_handler, NULL);
-	if (ret) {
-		eprintf("failed to register epoll events (%d)\n", ret);
-		return 1;
-	}
-
 	ret = send_join_request(&sys->this_node);
 	if (ret != 0)
 		return -1;
Index: sheepdog/sheep/cluster/local.c
===================================================================
--- sheepdog.orig/sheep/cluster/local.c	2012-05-14 18:45:08.967970005 +0200
+++ sheepdog/sheep/cluster/local.c	2012-05-14 18:57:05.299963176 +0200
@@ -13,9 +13,11 @@
 #include <unistd.h>
 #include <sys/mman.h>
 #include <sys/signalfd.h>
+#include <sys/epoll.h>
 #include <sys/file.h>
 #include <search.h>
 #include <signal.h>
+#include <sys/epoll.h>
 #include <fcntl.h>
 #include <assert.h>
 
@@ -34,6 +36,8 @@ static struct sd_node this_node;
 
 static struct work_queue *local_block_wq;
 
+static void local_dispatch(int listen_fd, int events, void *data);
+
 enum local_event_type {
 	EVENT_JOIN = 1,
 	EVENT_LEAVE,
@@ -280,12 +284,12 @@ static void check_pids(void *arg)
 	add_timer(arg, 1);
 }
 
-
 /* Local driver APIs */
 
 static int local_init(const char *option, uint8_t *myaddr)
 {
 	sigset_t mask;
+	int ret;
 	static struct timer t = {
 		.callback = check_pids,
 		.data = &t,
@@ -319,7 +323,13 @@ static int local_init(const char *option
 		return -1;
 	}
 
-	return sigfd;
+	ret = register_event(sigfd, local_dispatch, NULL);
+	if (ret) {
+		eprintf("failed to register epoll events (%d)\n", ret);
+		return -1;
+	}
+
+	return 0;
 }
 
 static int local_join(struct sd_node *myself,
@@ -379,7 +389,7 @@ static void local_block_done(struct work
 {
 }
 
-static int local_dispatch(void)
+static void local_dispatch(int listen_fd, int events, void *data)
 {
 	int ret;
 	struct signalfd_siginfo siginfo;
@@ -390,6 +400,11 @@ static int local_dispatch(void)
 		.done = local_block_done,
 	};
 
+	if (events & EPOLLHUP) {
+		eprintf("received EPOLLHUP event: has corosync exited?\n");
+		goto err;
+	}
+
 	dprintf("read siginfo\n");
 	ret = read(sigfd, &siginfo, sizeof(siginfo));
 	assert(ret == sizeof(siginfo));
@@ -456,7 +471,10 @@ static int local_dispatch(void)
 out:
 	shm_queue_unlock();
 
-	return 0;
+	return;
+err:
+	log_close();
+	exit(1);
 }
 
 struct cluster_driver cdrv_local = {
@@ -466,7 +484,6 @@ struct cluster_driver cdrv_local = {
 	.join       = local_join,
 	.leave      = local_leave,
 	.notify     = local_notify,
-	.dispatch   = local_dispatch,
 };
 
 cdrv_register(cdrv_local);



More information about the sheepdog mailing list