[sheepdog] [PATCH] sheep: remove the dispatch handler

Christoph Hellwig hch at infradead.org
Fri May 18 11:30:40 CEST 2012


Now that we no longer unregister and re-register the cluster driver event FD,
the dispatch method is no longer needed; instead, the cluster drivers can
handle their events locally.

Signed-off-by: Christoph Hellwig <hch at lst.de>

---
 sheep/cluster.h                                      |   16 --
 sheep/cluster/accord.c                               |  122 ++++++++++---------
 sheep/cluster/corosync.c                             |  117 ++++++++++--------
 sheep/cluster/local.c                                |  100 ++++++++-------
 sheep/cluster/zookeeper.c                            |   90 ++++++++------
 sheep/group.c                                        |   30 ----
 6 files changed, 252 insertions(+), 223 deletions(-)

Index: sheepdog/sheep/cluster.h
===================================================================
--- sheepdog.orig/sheep/cluster.h	2012-05-17 15:41:54.571912094 +0200
+++ sheepdog/sheep/cluster.h	2012-05-18 11:08:29.235978546 +0200
@@ -94,19 +94,6 @@ struct cluster_driver {
 	 */
 	void (*unblock)(void *msg, size_t msg_len);
 
-	/*
-	 * Dispatch handlers
-	 *
-	 * This function dispatches handlers according to the
-	 * delivered events (join/leave/notify) in the cluster.
-	 *
-	 * Note that the events sequence is totally ordered; all nodes
-	 * call the handlers in the same sequence.
-	 *
-	 * Returns zero on success, -1 on error
-	 */
-	int (*dispatch)(void);
-
 	struct list_head list;
 };
 
@@ -114,8 +101,7 @@ extern struct list_head cluster_drivers;
 
 #define cdrv_register(driver)						\
 static void __attribute__((constructor)) regist_ ## driver(void) {	\
-	if (!driver.init || !driver.join || !driver.leave ||		\
-	    !driver.notify || !driver.dispatch)				\
+	if (!driver.init || !driver.join || !driver.leave || !driver.notify) \
 		panic("the driver '%s' is incomplete\n", driver.name);	\
 	list_add(&driver.list, &cluster_drivers);			\
 }
Index: sheepdog/sheep/cluster/accord.c
===================================================================
--- sheepdog.orig/sheep/cluster/accord.c	2012-05-17 15:41:54.575912094 +0200
+++ sheepdog/sheep/cluster/accord.c	2012-05-18 11:15:24.003974591 +0200
@@ -15,10 +15,12 @@
 #include <search.h>
 #include <assert.h>
 #include <pthread.h>
+#include <sys/epoll.h>
 #include <sys/eventfd.h>
 #include <accord/accord.h>
 
 #include "cluster.h"
+#include "event.h"
 #include "work.h"
 
 #define MAX_EVENT_BUF_SIZE (64 * 1024)
@@ -458,55 +460,6 @@ static void acrd_watch_fn(struct acrd_ha
 	eventfd_write(efd, value);
 }
 
-static int accord_init(const char *option, uint8_t *myaddr)
-{
-	if (!option) {
-		eprintf("specify one of the accord servers.\n");
-		eprintf("e.g. sheep /store -c accord:127.0.0.1\n");
-		return -1;
-	}
-
-	pthread_mutex_lock(&start_lock);
-
-	ahandle = acrd_init(option, 9090, acrd_join_fn, acrd_leave_fn, NULL);
-	if (!ahandle) {
-		eprintf("failed to connect to accrd server %s\n", option);
-		return -1;
-	}
-
-	if (get_addr(myaddr) < 0)
-		return -1;
-
-	efd = eventfd(0, EFD_NONBLOCK);
-	if (efd < 0) {
-		eprintf("failed to create an event fd: %m\n");
-		return -1;
-	}
-
-	acrd_wq = init_work_queue(1);
-	if (!acrd_wq)
-		eprintf("failed to create accord workqueue: %m\n");
-		return -1;
-	}
-
-	pthread_cond_wait(&start_cond, &start_lock);
-	pthread_mutex_unlock(&start_lock);
-
-	if (need_cleanup)
-		for_each_acrd_file(ahandle, BASE_FILE, __acrd_del, NULL);
-	else {
-		queue_start_pos = -1;
-		queue_end_pos = -1;
-		for_each_acrd_file(ahandle, QUEUE_FILE, find_queue_end,
-				   &queue_end_pos);
-	}
-
-	acrd_add_watch(ahandle, QUEUE_FILE, ACRD_EVENT_PREFIX | ACRD_EVENT_ALL,
-		       acrd_watch_fn, NULL);
-
-	return efd;
-}
-
 static int accord_join(struct sd_node *myself,
 		       void *opaque, size_t opaque_len)
 {
@@ -549,17 +502,24 @@ static void acrd_unblock(void *msg, size
 	pthread_mutex_unlock(&queue_lock);
 }
 
-static int accord_dispatch(void)
+static void acrd_handler(int listen_fd, int events, void *data)
 {
 	int ret;
 	eventfd_t value;
 	struct acrd_event ev;
 	enum cluster_join_result res;
 
+	if (events & EPOLLHUP) {
+		eprintf("accord driver received EPOLLHUP event, exiting.\n");
+		log_close();
+		exit(1);
+	}
+
 	dprintf("read event\n");
+
 	ret = eventfd_read(efd, &value);
 	if (ret < 0)
-		return 0;
+		return;
 
 	pthread_mutex_lock(&queue_lock);
 
@@ -623,7 +583,64 @@ static int accord_dispatch(void)
 out:
 	pthread_mutex_unlock(&queue_lock);
 
-	return 0;
+	return;
+}
+
+static int accord_init(const char *option, uint8_t *myaddr)
+{
+	int ret;
+
+	if (!option) {
+		eprintf("specify one of the accord servers.\n");
+		eprintf("e.g. sheep /store -c accord:127.0.0.1\n");
+		return -1;
+	}
+
+	pthread_mutex_lock(&start_lock);
+	/* NOTE(review): the error returns below do not drop start_lock */
+	ahandle = acrd_init(option, 9090, acrd_join_fn, acrd_leave_fn, NULL);
+	if (!ahandle) {
+		eprintf("failed to connect to accord server %s\n", option);
+		return -1;
+	}
+
+	if (get_addr(myaddr) < 0)
+		return -1;
+
+	efd = eventfd(0, EFD_NONBLOCK);
+	if (efd < 0) {
+		eprintf("failed to create an event fd: %m\n");
+		return -1;
+	}
+
+	acrd_wq = init_work_queue(1);
+	if (!acrd_wq) {
+		eprintf("failed to create accord workqueue: %m\n");
+		return -1;
+	}
+
+	pthread_cond_wait(&start_cond, &start_lock);
+	pthread_mutex_unlock(&start_lock);
+
+	if (need_cleanup)
+		for_each_acrd_file(ahandle, BASE_FILE, __acrd_del, NULL);
+	else {
+		queue_start_pos = -1;
+		queue_end_pos = -1;
+		for_each_acrd_file(ahandle, QUEUE_FILE, find_queue_end,
+				   &queue_end_pos);
+	}
+
+	acrd_add_watch(ahandle, QUEUE_FILE, ACRD_EVENT_PREFIX | ACRD_EVENT_ALL,
+		       acrd_watch_fn, NULL);
+
+	ret = register_event(efd, acrd_handler, NULL);
+	if (ret) {
+		eprintf("failed to register accord event handler (%d)\n", ret);
+		return -1;
+	}
+
+	return efd;
 }
 
 struct cluster_driver cdrv_accord = {
@@ -635,7 +652,6 @@ struct cluster_driver cdrv_accord = {
 	.notify     = accord_notify,
 	.block      = accord_block,
 	.unblock    = accord_unblock,
-	.dispatch   = accord_dispatch,
 };
 
 cdrv_register(cdrv_accord);
Index: sheepdog/sheep/cluster/corosync.c
===================================================================
--- sheepdog.orig/sheep/cluster/corosync.c	2012-05-18 09:47:08.311994966 +0200
+++ sheepdog/sheep/cluster/corosync.c	2012-05-18 11:16:19.727974059 +0200
@@ -10,10 +10,12 @@
  */
 #include <stdio.h>
 #include <unistd.h>
+#include <sys/epoll.h>
 #include <corosync/cpg.h>
 #include <corosync/cfg.h>
 
 #include "cluster.h"
+#include "event.h"
 #include "work.h"
 
 struct cpg_node {
@@ -596,51 +598,6 @@ static void cdrv_cpg_confchg(cpg_handle_
 	__corosync_dispatch();
 }
 
-static int corosync_init(const char *option, uint8_t *myaddr)
-{
-	int ret, fd;
-	uint32_t nodeid;
-	cpg_callbacks_t cb = {
-		.cpg_deliver_fn = cdrv_cpg_deliver,
-		.cpg_confchg_fn = cdrv_cpg_confchg
-	};
-
-	ret = cpg_initialize(&cpg_handle, &cb);
-	if (ret != CPG_OK) {
-		eprintf("failed to initialize cpg (%d) - is corosync running?\n", ret);
-		return -1;
-	}
-
-	ret = corosync_cfg_initialize(&cfg_handle, NULL);
-	if (ret != CS_OK) {
-		vprintf(SDOG_ERR, "failed to initialize cfg (%d)\n", ret);
-		return -1;
-	}
-
-	ret = corosync_cfg_local_get(cfg_handle, &nodeid);
-	if (ret != CS_OK) {
-		vprintf(SDOG_ERR, "failed to get node id (%d)\n", ret);
-		return -1;
-	}
-
-	ret = nodeid_to_addr(nodeid, myaddr);
-	if (ret < 0) {
-		eprintf("failed to get local address\n");
-		return -1;
-	}
-
-	this_node.nodeid = nodeid;
-	this_node.pid = getpid();
-
-	ret = cpg_fd_get(cpg_handle, &fd);
-	if (ret != CPG_OK) {
-		eprintf("failed to get cpg file descriptor (%d)\n", ret);
-		return -1;
-	}
-
-	return fd;
-}
-
 static int corosync_join(struct sd_node *myself,
 			 void *opaque, size_t opaque_len)
 {
@@ -695,15 +652,78 @@ static int corosync_notify(void *msg, si
 			   NULL, 0, msg, msg_len);
 }
 
-static int corosync_dispatch(void)
+static void corosync_handler(int listen_fd, int events, void *data)
 {
 	int ret;
 
+	if (events & EPOLLHUP) {
+		eprintf("corosync driver received EPOLLHUP event, exiting.\n");
+		goto out;
+	}
+
 	ret = cpg_dispatch(cpg_handle, CPG_DISPATCH_ALL);
-	if (ret != CPG_OK)
+	if (ret != CPG_OK) {
+		eprintf("cpg_dispatch returned %d\n", ret);
+		goto out;
+	}
+
+	return;
+out:
+	log_close();
+	exit(1);
+}
+
+static int corosync_init(const char *option, uint8_t *myaddr)
+{
+	int ret, fd;
+	uint32_t nodeid;
+	cpg_callbacks_t cb = {
+		.cpg_deliver_fn = cdrv_cpg_deliver,
+		.cpg_confchg_fn = cdrv_cpg_confchg
+	};
+
+	ret = cpg_initialize(&cpg_handle, &cb);
+	if (ret != CPG_OK) {
+		eprintf("failed to initialize cpg (%d) - "
+			"is corosync running?\n", ret);
 		return -1;
+	}
 
-	return 0;
+	ret = corosync_cfg_initialize(&cfg_handle, NULL);
+	if (ret != CS_OK) {
+		vprintf(SDOG_ERR, "failed to initialize cfg (%d)\n", ret);
+		return -1;
+	}
+
+	ret = corosync_cfg_local_get(cfg_handle, &nodeid);
+	if (ret != CS_OK) {
+		vprintf(SDOG_ERR, "failed to get node id (%d)\n", ret);
+		return -1;
+	}
+
+	ret = nodeid_to_addr(nodeid, myaddr);
+	if (ret < 0) {
+		eprintf("failed to get local address\n");
+		return -1;
+	}
+
+	this_node.nodeid = nodeid;
+	this_node.pid = getpid();
+
+	ret = cpg_fd_get(cpg_handle, &fd);
+	if (ret != CPG_OK) {
+		eprintf("failed to get cpg file descriptor (%d)\n", ret);
+		return -1;
+	}
+
+	ret = register_event(fd, corosync_handler, NULL);
+	if (ret) {
+		eprintf("failed to register corosync event handler (%d)\n",
+			ret);
+		return -1;
+	}
+
+	return fd;
 }
 
 struct cluster_driver cdrv_corosync = {
@@ -715,7 +735,6 @@ struct cluster_driver cdrv_corosync = {
 	.notify     = corosync_notify,
 	.block      = corosync_block,
 	.unblock    = corosync_unblock,
-	.dispatch   = corosync_dispatch,
 };
 
 cdrv_register(cdrv_corosync);
Index: sheepdog/sheep/cluster/local.c
===================================================================
--- sheepdog.orig/sheep/cluster/local.c	2012-05-17 15:41:54.575912094 +0200
+++ sheepdog/sheep/cluster/local.c	2012-05-18 11:15:36.087974475 +0200
@@ -11,6 +11,7 @@
 #include <stdio.h>
 #include <string.h>
 #include <unistd.h>
+#include <sys/epoll.h>
 #include <sys/mman.h>
 #include <sys/signalfd.h>
 #include <sys/file.h>
@@ -280,45 +281,6 @@ static void check_pids(void *arg)
 
 /* Local driver APIs */
 
-static int local_init(const char *option, uint8_t *myaddr)
-{
-	sigset_t mask;
-	static struct timer t = {
-		.callback = check_pids,
-		.data = &t,
-	};
-
-	if (option)
-		shmfile = option;
-
-	/* set 127.0.0.1 */
-	memset(myaddr, 0, 16);
-	myaddr[12] = 127;
-	myaddr[15] = 1;
-
-	shm_queue_init();
-
-	sigemptyset(&mask);
-	sigaddset(&mask, SIGUSR1);
-	sigprocmask(SIG_BLOCK, &mask, NULL);
-
-	sigfd = signalfd(-1, &mask, SFD_NONBLOCK);
-	if (sigfd < 0) {
-		eprintf("failed to create a signal fd: %m\n");
-		return -1;
-	}
-
-	add_timer(&t, 1);
-
-	local_block_wq = init_work_queue(1);
-	if (!local_block_wq) {
-		eprintf("failed to create local workqueue: %m\n");
-		return -1;
-	}
-
-	return sigfd;
-}
-
 static int local_join(struct sd_node *myself,
 		      void *opaque, size_t opaque_len)
 {
@@ -383,14 +345,21 @@ static void local_unblock(void *msg, siz
 	shm_queue_unlock();
 }
 
-static int local_dispatch(void)
+static void local_handler(int listen_fd, int events, void *data)
 {
-	int ret;
 	struct signalfd_siginfo siginfo;
 	struct local_event *ev;
 	enum cluster_join_result res;
+	int ret;
+
+	if (events & EPOLLHUP) {
+		eprintf("local driver received EPOLLHUP event, exiting.\n");
+		log_close();
+		exit(1);
+	}
 
 	dprintf("read siginfo\n");
+
 	ret = read(sigfd, &siginfo, sizeof(siginfo));
 	assert(ret == sizeof(siginfo));
 
@@ -454,7 +423,53 @@ static int local_dispatch(void)
 out:
 	shm_queue_unlock();
 
-	return 0;
+	return;
+}
+
+static int local_init(const char *option, uint8_t *myaddr)
+{
+	sigset_t mask;
+	int ret;
+	static struct timer t = {
+		.callback = check_pids,
+		.data = &t,
+	};
+
+	if (option)
+		shmfile = option;
+
+	/* set 127.0.0.1 */
+	memset(myaddr, 0, 16);
+	myaddr[12] = 127;
+	myaddr[15] = 1;
+
+	shm_queue_init();
+
+	sigemptyset(&mask);
+	sigaddset(&mask, SIGUSR1);
+	sigprocmask(SIG_BLOCK, &mask, NULL);
+
+	sigfd = signalfd(-1, &mask, SFD_NONBLOCK);
+	if (sigfd < 0) {
+		eprintf("failed to create a signal fd: %m\n");
+		return -1;
+	}
+
+	add_timer(&t, 1);
+
+	local_block_wq = init_work_queue(1);
+	if (!local_block_wq) {
+		eprintf("failed to create local workqueue: %m\n");
+		return -1;
+	}
+
+	ret = register_event(sigfd, local_handler, NULL);
+	if (ret) {
+		eprintf("failed to register local event handler (%d)\n", ret);
+		return -1;
+	}
+
+	return sigfd;
 }
 
 struct cluster_driver cdrv_local = {
@@ -466,7 +481,6 @@ struct cluster_driver cdrv_local = {
 	.notify     = local_notify,
 	.block      = local_block,
 	.unblock    = local_unblock,
-	.dispatch   = local_dispatch,
 };
 
 cdrv_register(cdrv_local);
Index: sheepdog/sheep/cluster/zookeeper.c
===================================================================
--- sheepdog.orig/sheep/cluster/zookeeper.c	2012-05-17 15:41:54.575912094 +0200
+++ sheepdog/sheep/cluster/zookeeper.c	2012-05-18 11:17:06.619973612 +0200
@@ -14,11 +14,13 @@
 #include <netdb.h>
 #include <search.h>
 #include <assert.h>
+#include <sys/epoll.h>
 #include <sys/eventfd.h>
 #include <zookeeper/zookeeper.h>
 #include <urcu/uatomic.h>
 
 #include "cluster.h"
+#include "event.h"
 #include "work.h"
 
 #define MAX_EVENT_BUF_SIZE (64 * 1024)
@@ -638,36 +640,6 @@ static int get_addr(uint8_t *bytes)
 	return 0;
 }
 
-static int zk_init(const char *option, uint8_t *myaddr)
-{
-	if (!option) {
-		eprintf("specify comma separated host:port pairs, each corresponding to a zk server.\n");
-		eprintf("e.g. sheep /store -c zookeeper:127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002\n");
-		return -1;
-	}
-
-	zhandle = zookeeper_init(option, watcher, SESSION_TIMEOUT, 0, NULL, 0);
-	if (!zhandle) {
-		eprintf("failed to connect to zk server %s\n", option);
-		return -1;
-	}
-	dprintf("request session timeout:%dms, negotiated session timeout:%dms\n",
-			SESSION_TIMEOUT, zoo_recv_timeout(zhandle));
-
-	if (get_addr(myaddr) < 0)
-		return -1;
-
-	zk_queue_init(zhandle);
-
-	efd = eventfd(0, EFD_NONBLOCK);
-	if (efd < 0) {
-		eprintf("failed to create an event fd: %m\n");
-		return -1;
-	}
-
-	return efd;
-}
-
 static int zk_join(struct sd_node *myself,
 		   void *opaque, size_t opaque_len)
 {
@@ -735,7 +707,7 @@ static void zk_unblock(void *msg, size_t
 	eventfd_write(efd, value);
 }
 
-static int zk_dispatch(void)
+static void zk_handler(int listen_fd, int events, void *data)
 {
 	int ret, rc, retry;
 	char path[256];
@@ -744,13 +716,20 @@ static int zk_dispatch(void)
 	struct zk_node *n;
 	enum cluster_join_result res;
 
+	if (events & EPOLLHUP) {
+		eprintf("zookeeper driver received EPOLLHUP event, exiting.\n");
+		log_close();
+		exit(1);
+	}
+
 	dprintf("read event\n");
+
 	ret = eventfd_read(efd, &value);
 	if (ret < 0)
-		return 0;
+		return;
 
 	if (uatomic_read(&zk_notify_blocked))
-		return 0;
+		return;
 
 	ret = zk_queue_pop(zhandle, &ev);
 	if (ret < 0)
@@ -864,7 +843,49 @@ static int zk_dispatch(void)
 		break;
 	}
 out:
-	return 0;
+	return;
+}
+
+static int zk_init(const char *option, uint8_t *myaddr)
+{
+	int ret;
+
+	if (!option) {
+		eprintf("specify comma separated host:port pairs, "
+			"each corresponding to a zk server.\n");
+		eprintf("e.g. sheep /store -c zookeeper:127.0.0.1:"
+			"3000,127.0.0.1:3001,127.0.0.1:3002\n");
+		return -1;
+	}
+
+	zhandle = zookeeper_init(option, watcher, SESSION_TIMEOUT, 0, NULL, 0);
+	if (!zhandle) {
+		eprintf("failed to connect to zk server %s\n", option);
+		return -1;
+	}
+	dprintf("request session timeout:%dms, "
+		"negotiated session timeout:%dms\n",
+		SESSION_TIMEOUT, zoo_recv_timeout(zhandle));
+
+	if (get_addr(myaddr) < 0)
+		return -1;
+
+	zk_queue_init(zhandle);
+
+	efd = eventfd(0, EFD_NONBLOCK);
+	if (efd < 0) {
+		eprintf("failed to create an event fd: %m\n");
+		return -1;
+	}
+
+	ret = register_event(efd, zk_handler, NULL);
+	if (ret) {
+		eprintf("failed to register zookeeper event handler (%d)\n",
+			ret);
+		return -1;
+	}
+
+	return efd;
 }
 
 struct cluster_driver cdrv_zookeeper = {
@@ -876,7 +897,6 @@ struct cluster_driver cdrv_zookeeper = {
 	.notify     = zk_notify,
 	.block      = zk_block,
 	.unblock    = zk_unblock,
-	.dispatch   = zk_dispatch,
 };
 
 cdrv_register(cdrv_zookeeper);
Index: sheepdog/sheep/group.c
===================================================================
--- sheepdog.orig/sheep/group.c	2012-05-18 09:47:08.311994966 +0200
+++ sheepdog/sheep/group.c	2012-05-18 11:02:56.695981717 +0200
@@ -26,8 +26,6 @@
 #include "work.h"
 #include "cluster.h"
 
-static int cdrv_fd;
-
 struct node {
 	struct sd_node ent;
 	struct list_head list;
@@ -335,24 +333,6 @@ static void queue_cluster_request(struct
 	}
 }
 
-static void group_handler(int listen_fd, int events, void *data)
-{
-	int ret;
-	if (events & EPOLLHUP) {
-		eprintf("received EPOLLHUP event: has corosync exited?\n");
-		goto out;
-	}
-
-	ret = sys->cdrv->dispatch();
-	if (ret == 0)
-		return;
-	else
-		eprintf("oops... an error occurred inside corosync\n");
-out:
-	log_close();
-	exit(1);
-}
-
 static inline int get_nodes_nr_from(struct list_head *l)
 {
 	struct node *node;
@@ -1366,8 +1346,8 @@ int create_cluster(int port, int64_t zon
 		}
 	}
 
-	cdrv_fd = sys->cdrv->init(sys->cdrv_option, sys->this_node.addr);
-	if (cdrv_fd < 0)
+	ret = sys->cdrv->init(sys->cdrv_option, sys->this_node.addr);
+	if (ret < 0)
 		return -1;
 
 	sys->this_node.port = port;
@@ -1395,12 +1375,6 @@ int create_cluster(int port, int64_t zon
 	INIT_LIST_HEAD(&sys->request_queue);
 	INIT_LIST_HEAD(&sys->event_queue);
 
-	ret = register_event(cdrv_fd, group_handler, NULL);
-	if (ret) {
-		eprintf("failed to register epoll events (%d)\n", ret);
-		return 1;
-	}
-
 	ret = send_join_request(&sys->this_node);
 	if (ret != 0)
 		return -1;



More information about the sheepdog mailing list