[Sheepdog] [PATCH] sheep: remove cdrv_handlers and check_join_cb

Christoph Hellwig hch at infradead.org
Wed Apr 25 09:03:02 CEST 2012


Instead of obscuring the callbacks into sheepdog from the cluster drivers
by various means of callbacks just call them directly like you would do in
normal C code.  The block_cb callback to the notify routine is left for now
as that area needs broader attention later.

Signed-off-by: Christoph Hellwig <hch at lst.de>

---
 sheep/cluster.h           |   60 ++++++++++++++++++++--------------------------
 sheep/cluster/accord.c    |   20 +++------------
 sheep/cluster/corosync.c  |   22 +++-------------
 sheep/cluster/local.c     |   20 +++------------
 sheep/cluster/zookeeper.c |   20 +++------------
 sheep/group.c             |   28 ++++++---------------
 6 files changed, 56 insertions(+), 114 deletions(-)

Index: sheepdog/sheep/cluster.h
===================================================================
--- sheepdog.orig/sheep/cluster.h	2012-04-25 08:49:29.420047299 +0200
+++ sheepdog/sheep/cluster.h	2012-04-25 08:53:17.152053137 +0200
@@ -31,18 +31,6 @@ enum cluster_join_result {
 				 * will leave the cluster (restart later). */
 };
 
-struct cdrv_handlers {
-	void (*join_handler)(struct sd_node *joined,
-			     struct sd_node *members,
-			     size_t nr_members, enum cluster_join_result result,
-			     void *opaque);
-	void (*leave_handler)(struct sd_node *left,
-			      struct sd_node *members,
-			      size_t nr_members);
-	void (*notify_handler)(struct sd_node *sender,
-			       void *msg, size_t msg_len);
-};
-
 struct cluster_driver {
 	const char *name;
 
@@ -53,27 +41,23 @@ struct cluster_driver {
 	 * may be used with the poll(2) to monitor cluster events.  On
 	 * error, returns -1.
 	 */
-	int (*init)(struct cdrv_handlers *handlers, const char *option,
-		    uint8_t *myaddr);
+	int (*init)(const char *option, uint8_t *myaddr);
 
 	/*
 	 * Join the cluster
 	 *
-	 * This function is used to join the cluster, and notifies a
-	 * join event to all the nodes.  The copy of 'opaque' is
-	 * passed to check_join_cb() and join_handler().
-	 * check_join_cb() is called on one of the nodes which already
+	 * This function is used to join the cluster, and notifies a join
+	 * event to all the nodes.  The copy of 'opaque' is passed to
+	 * sd_check_join_cb() and sd_join_handler().
+	 *
+	 * sd_check_join_cb() is called on one of the nodes which already
 	 * paticipate in the cluster.  If the content of 'opaque' is
-	 * changed in check_join_cb(), the updated 'opaque' must be
-	 * passed to join_handler().
+	 * changed in sd_check_join_cb(), the updated 'opaque' must be
+	 * passed to sd_join_handler().
 	 *
 	 * Returns zero on success, -1 on error
 	 */
-	int (*join)(struct sd_node *myself,
-		    enum cluster_join_result (*check_join_cb)(
-			    struct sd_node *joining,
-			    void *opaque),
-		    void *opaque, size_t opaque_len);
+	int (*join)(struct sd_node *myself, void *opaque, size_t opaque_len);
 
 	/*
 	 * Leave the cluster
@@ -88,14 +72,14 @@ struct cluster_driver {
 	/*
 	 * Notify a message to all nodes in the cluster
 	 *
-	 * This function sends 'msg' to all the nodes.  The notified
-	 * messages can be read through notify_handler() in
-	 * cdrv_handlers.  If 'block_cb' is specified, block_cb() is
-	 * called before 'msg' is notified to all the nodes.  All the
-	 * cluster events including this notification are blocked
-	 * until block_cb() returns or this blocking node leaves the
-	 * cluster.  The sheep daemon can sleep in block_cb(), so this
-	 * callback must be not called from the dispatch (main) thread.
+	 * This function sends 'msg' to all the nodes.  The notified messages
+	 * can be read through sd_notify_handler().
+	 *
+	 * If 'block_cb' is specified, block_cb() is called before 'msg' is
+	 * notified to all the nodes.  All the cluster events including this
+	 * notification are blocked until block_cb() returns or this blocking
+	 * node leaves the cluster.  The sheep daemon can sleep in block_cb(),
+	 * so this callback must be not called from the dispatch (main) thread.
 	 *
 	 * Returns zero on success, -1 on error
 	 */
@@ -168,4 +152,14 @@ static inline char *node_to_str(struct s
 	return str;
 }
 
+/* callbacks back into sheepdog from the cluster drivers */
+void sd_join_handler(struct sd_node *joined, struct sd_node *members,
+		size_t nr_members, enum cluster_join_result result,
+		void *opaque);
+void sd_leave_handler(struct sd_node *left, struct sd_node *members,
+		size_t nr_members);
+void sd_notify_handler(struct sd_node *sender, void *msg, size_t msg_len);
+enum cluster_join_result sd_check_join_cb(struct sd_node *joining,
+		void *opaque);
+
 #endif
Index: sheepdog/sheep/cluster/accord.c
===================================================================
--- sheepdog.orig/sheep/cluster/accord.c	2012-04-25 08:49:29.420047299 +0200
+++ sheepdog/sheep/cluster/accord.c	2012-04-25 08:51:20.224050138 +0200
@@ -217,10 +217,6 @@ static int efd;
 
 static struct work_queue *acrd_wq;
 
-static struct cdrv_handlers acrd_hdlrs;
-static enum cluster_join_result (*acrd_check_join_cb)(
-	struct sd_node *joining, void *opaque);
-
 /* get node list from the last pushed data */
 static size_t get_nodes(struct acrd_handle *ah,
 			struct sd_node *nodes,
@@ -467,10 +463,8 @@ static void acrd_watch_fn(struct acrd_ha
 	eventfd_write(efd, value);
 }
 
-static int accord_init(struct cdrv_handlers *handlers, const char *option,
-		       uint8_t *myaddr)
+static int accord_init(const char *option, uint8_t *myaddr)
 {
-	acrd_hdlrs = *handlers;
 	if (!option) {
 		eprintf("specify one of the accord servers.\n");
 		eprintf("e.g. sheep /store -c accord:127.0.0.1\n");
@@ -515,13 +509,9 @@ static int accord_init(struct cdrv_handl
 }
 
 static int accord_join(struct sd_node *myself,
-		       enum cluster_join_result (*check_join_cb)(
-			       struct sd_node *joining,
-			       void *opaque),
 		       void *opaque, size_t opaque_len)
 {
 	this_node = *myself;
-	acrd_check_join_cb = check_join_cb;
 
 	return add_event(ahandle, EVENT_JOIN, &this_node, opaque, opaque_len, NULL);
 }
@@ -582,7 +572,7 @@ static int accord_dispatch(void)
 	case EVENT_JOIN:
 		if (ev.blocked) {
 			if (node_cmp(&ev.nodes[0], &this_node) == 0) {
-				res = acrd_check_join_cb(&ev.sender, ev.buf);
+				res = sd_check_join_cb(&ev.sender, ev.buf);
 				ev.join_result = res;
 				ev.blocked = 0;
 
@@ -609,11 +599,11 @@ static int accord_dispatch(void)
 			acrd_queue_pop(ahandle, &ev);
 		}
 
-		acrd_hdlrs.join_handler(&ev.sender, ev.nodes, ev.nr_nodes,
+		sd_join_handler(&ev.sender, ev.nodes, ev.nr_nodes,
 				    ev.join_result, ev.buf);
 		break;
 	case EVENT_LEAVE:
-		acrd_hdlrs.leave_handler(&ev.sender, ev.nodes, ev.nr_nodes);
+		sd_leave_handler(&ev.sender, ev.nodes, ev.nr_nodes);
 		break;
 	case EVENT_NOTIFY:
 		if (ev.blocked) {
@@ -629,7 +619,7 @@ static int accord_dispatch(void)
 			goto out;
 		}
 
-		acrd_hdlrs.notify_handler(&ev.sender, ev.buf, ev.buf_len);
+		sd_notify_handler(&ev.sender, ev.buf, ev.buf_len);
 		break;
 	}
 out:
Index: sheepdog/sheep/cluster/corosync.c
===================================================================
--- sheepdog.orig/sheep/cluster/corosync.c	2012-04-25 08:49:29.420047299 +0200
+++ sheepdog/sheep/cluster/corosync.c	2012-04-25 08:51:20.228050137 +0200
@@ -31,10 +31,6 @@ static struct cpg_node this_node;
 
 static struct work_queue *corosync_block_wq;
 
-static struct cdrv_handlers corosync_handlers;
-static enum cluster_join_result (*corosync_check_join_cb)(
-	struct sd_node *joining, void *opaque);
-
 static LIST_HEAD(corosync_event_list);
 static LIST_HEAD(corosync_block_list);
 
@@ -299,7 +295,7 @@ static int __corosync_dispatch_one(struc
 				/* check_join() must be called only once */
 				return 0;
 
-			res = corosync_check_join_cb(&cevent->sender.ent,
+			res = sd_check_join_cb(&cevent->sender.ent,
 						     cevent->msg);
 			if (res == CJ_RES_MASTER_TRANSFER)
 				nr_cpg_nodes = 0;
@@ -326,7 +322,7 @@ static int __corosync_dispatch_one(struc
 		case CJ_RES_FAIL:
 		case CJ_RES_JOIN_LATER:
 			build_node_list(cpg_nodes, nr_cpg_nodes, entries);
-			corosync_handlers.join_handler(&cevent->sender.ent, entries,
+			sd_join_handler(&cevent->sender.ent, entries,
 						       nr_cpg_nodes, cevent->result,
 						       cevent->msg);
 			break;
@@ -341,8 +337,7 @@ static int __corosync_dispatch_one(struc
 		del_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender);
 		nr_cpg_nodes--;
 		build_node_list(cpg_nodes, nr_cpg_nodes, entries);
-		corosync_handlers.leave_handler(&cevent->sender.ent,
-						entries, nr_cpg_nodes);
+		sd_leave_handler(&cevent->sender.ent, entries, nr_cpg_nodes);
 		break;
 	case COROSYNC_EVENT_TYPE_NOTIFY:
 		if (cevent->blocked) {
@@ -367,7 +362,7 @@ static int __corosync_dispatch_one(struc
 			return 0;
 		}
 
-		corosync_handlers.notify_handler(&cevent->sender.ent, cevent->msg,
+		sd_notify_handler(&cevent->sender.ent, cevent->msg,
 						 cevent->msg_len);
 		break;
 	}
@@ -619,8 +614,7 @@ static void cdrv_cpg_confchg(cpg_handle_
 	__corosync_dispatch();
 }
 
-static int corosync_init(struct cdrv_handlers *handlers, const char *option,
-			 uint8_t *myaddr)
+static int corosync_init(const char *option, uint8_t *myaddr)
 {
 	int ret, fd;
 	uint32_t nodeid;
@@ -629,8 +623,6 @@ static int corosync_init(struct cdrv_han
 		.cpg_confchg_fn = cdrv_cpg_confchg
 	};
 
-	corosync_handlers = *handlers;
-
 	ret = cpg_initialize(&cpg_handle, &cb);
 	if (ret != CPG_OK) {
 		eprintf("failed to initialize cpg (%d) - is corosync running?\n", ret);
@@ -670,14 +662,10 @@ static int corosync_init(struct cdrv_han
 }
 
 static int corosync_join(struct sd_node *myself,
-			 enum cluster_join_result (*check_join_cb)(
-				 struct sd_node *joining,
-				 void *opaque),
 			 void *opaque, size_t opaque_len)
 {
 	int ret;
 
-	corosync_check_join_cb = check_join_cb;
 retry:
 	ret = cpg_join(cpg_handle, &cpg_group);
 	switch (ret) {
Index: sheepdog/sheep/cluster/local.c
===================================================================
--- sheepdog.orig/sheep/cluster/local.c	2012-04-25 08:49:29.420047299 +0200
+++ sheepdog/sheep/cluster/local.c	2012-04-25 08:51:20.228050137 +0200
@@ -34,10 +34,6 @@ static struct sd_node this_node;
 
 static struct work_queue *local_block_wq;
 
-static struct cdrv_handlers lhdlrs;
-static enum cluster_join_result (*local_check_join_cb)(
-	struct sd_node *joining, void *opaque);
-
 enum local_event_type {
 	EVENT_JOIN = 1,
 	EVENT_LEAVE,
@@ -287,8 +283,7 @@ static void check_pids(void *arg)
 
 /* Local driver APIs */
 
-static int local_init(struct cdrv_handlers *handlers, const char *option,
-		      uint8_t *myaddr)
+static int local_init(const char *option, uint8_t *myaddr)
 {
 	sigset_t mask;
 	static struct timer t = {
@@ -296,7 +291,6 @@ static int local_init(struct cdrv_handle
 		.data = &t,
 	};
 
-	lhdlrs = *handlers;
 	if (option)
 		shmfile = option;
 
@@ -325,13 +319,9 @@ static int local_init(struct cdrv_handle
 }
 
 static int local_join(struct sd_node *myself,
-		      enum cluster_join_result (*check_join_cb)(
-			      struct sd_node *joining,
-			      void *opaque),
 		      void *opaque, size_t opaque_len)
 {
 	this_node = *myself;
-	local_check_join_cb = check_join_cb;
 
 	shm_queue_lock();
 
@@ -410,7 +400,7 @@ static int local_dispatch(void)
 	case EVENT_JOIN:
 		if (ev->blocked) {
 			if (node_cmp(&ev->nodes[0], &this_node) == 0) {
-				res = local_check_join_cb(&ev->sender, ev->buf);
+				res = sd_check_join_cb(&ev->sender, ev->buf);
 				ev->join_result = res;
 				ev->blocked = 0;
 				msync(ev, sizeof(*ev), MS_SYNC);
@@ -436,11 +426,11 @@ static int local_dispatch(void)
 			shm_queue_set_chksum();
 		}
 
-		lhdlrs.join_handler(&ev->sender, ev->nodes, ev->nr_nodes,
+		sd_join_handler(&ev->sender, ev->nodes, ev->nr_nodes,
 				    ev->join_result, ev->buf);
 		break;
 	case EVENT_LEAVE:
-		lhdlrs.leave_handler(&ev->sender, ev->nodes, ev->nr_nodes);
+		sd_leave_handler(&ev->sender, ev->nodes, ev->nr_nodes);
 		break;
 	case EVENT_NOTIFY:
 		if (ev->blocked) {
@@ -454,7 +444,7 @@ static int local_dispatch(void)
 			goto out;
 		}
 
-		lhdlrs.notify_handler(&ev->sender, ev->buf, ev->buf_len);
+		sd_notify_handler(&ev->sender, ev->buf, ev->buf_len);
 		break;
 	}
 
Index: sheepdog/sheep/cluster/zookeeper.c
===================================================================
--- sheepdog.orig/sheep/cluster/zookeeper.c	2012-04-25 08:49:29.420047299 +0200
+++ sheepdog/sheep/cluster/zookeeper.c	2012-04-25 08:51:20.228050137 +0200
@@ -222,10 +222,6 @@ static struct work_queue *zk_block_wq;
 
 static struct sd_node this_node;
 
-static struct cdrv_handlers zk_hdlrs;
-static enum cluster_join_result (*zk_check_join_cb)(
-	struct sd_node *joining, void *opaque);
-
 /* get node list from the last pushed data */
 static size_t get_nodes(zhandle_t *zh, struct sd_node *nodes)
 {
@@ -376,10 +372,8 @@ static int get_addr(uint8_t *bytes)
 	return 0;
 }
 
-static int zk_init(struct cdrv_handlers *handlers, const char *option,
-		   uint8_t *myaddr)
+static int zk_init(const char *option, uint8_t *myaddr)
 {
-	zk_hdlrs = *handlers;
 	if (!option) {
 		eprintf("specify comma separated host:port pairs, each corresponding to a zk server.\n");
 		eprintf("e.g. sheep /store -c zookeeper:127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002\n");
@@ -409,9 +403,6 @@ static int zk_init(struct cdrv_handlers
 }
 
 static int zk_join(struct sd_node *myself,
-		   enum cluster_join_result (*check_join_cb)(
-			   struct sd_node *joining,
-			   void *opaque),
 		   void *opaque, size_t opaque_len)
 {
 	int rc;
@@ -419,7 +410,6 @@ static int zk_join(struct sd_node *mysel
 	char path[256];
 
 	this_node = *myself;
-	zk_check_join_cb = check_join_cb;
 
 	sprintf(path, MEMBER_ZNODE "/%s", node_to_str(myself));
 	joined = 0;
@@ -483,7 +473,7 @@ static int zk_dispatch(void)
 	case EVENT_JOIN:
 		if (ev.blocked) {
 			if (node_cmp(&ev.nodes[0], &this_node) == 0) {
-				res = zk_check_join_cb(&ev.sender, ev.buf);
+				res = sd_check_join_cb(&ev.sender, ev.buf);
 				ev.join_result = res;
 				ev.blocked = 0;
 
@@ -517,11 +507,11 @@ static int zk_dispatch(void)
 		sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender));
 		zoo_exists(zhandle, path, 1, NULL);
 
-		zk_hdlrs.join_handler(&ev.sender, ev.nodes, ev.nr_nodes,
+		sd_join_handler(&ev.sender, ev.nodes, ev.nr_nodes,
 				    ev.join_result, ev.buf);
 		break;
 	case EVENT_LEAVE:
-		zk_hdlrs.leave_handler(&ev.sender, ev.nodes, ev.nr_nodes);
+		sd_leave_handler(&ev.sender, ev.nodes, ev.nr_nodes);
 		break;
 	case EVENT_NOTIFY:
 		if (ev.blocked) {
@@ -537,7 +527,7 @@ static int zk_dispatch(void)
 			goto out;
 		}
 
-		zk_hdlrs.notify_handler(&ev.sender, ev.buf, ev.buf_len);
+		sd_notify_handler(&ev.sender, ev.buf, ev.buf_len);
 		break;
 	}
 out:
Index: sheepdog/sheep/group.c
===================================================================
--- sheepdog.orig/sheep/group.c	2012-04-25 08:49:29.420047299 +0200
+++ sheepdog/sheep/group.c	2012-04-25 08:51:20.228050137 +0200
@@ -637,8 +637,7 @@ static void __sd_notify_done(struct even
 	req->done(req);
 }
 
-static void sd_notify_handler(struct sd_node *sender,
-			      void *msg, size_t msg_len)
+void sd_notify_handler(struct sd_node *sender, void *msg, size_t msg_len)
 {
 	struct event_struct *cevent;
 	struct work_notify *w;
@@ -736,8 +735,7 @@ static void __sd_leave(struct event_stru
 	}
 }
 
-static enum cluster_join_result sd_check_join_cb(
-	struct sd_node *joining, void *opaque)
+enum cluster_join_result sd_check_join_cb(struct sd_node *joining, void *opaque)
 {
 	struct join_message *jm = opaque;
 	struct node *node;
@@ -813,8 +811,7 @@ static int send_join_request(struct sd_n
 	if (ret == SD_RES_SUCCESS)
 		msg->nr_nodes = nr_entries;
 
-	ret = sys->cdrv->join(ent, sd_check_join_cb, msg,
-			      get_join_message_size(msg));
+	ret = sys->cdrv->join(ent, msg, get_join_message_size(msg));
 
 	vprintf(SDOG_INFO, "%s\n", node_to_str(&sys->this_node));
 
@@ -1098,10 +1095,9 @@ void process_request_event_queues(void)
 		process_request_queue();
 }
 
-static void sd_join_handler(struct sd_node *joined,
-			    struct sd_node *members,
-			    size_t nr_members, enum cluster_join_result result,
-			    void *opaque)
+void sd_join_handler(struct sd_node *joined, struct sd_node *members,
+		size_t nr_members, enum cluster_join_result result,
+		void *opaque)
 {
 	struct event_struct *cevent;
 	struct work_join *w = NULL;
@@ -1240,9 +1236,8 @@ static void sd_join_handler(struct sd_no
 	}
 }
 
-static void sd_leave_handler(struct sd_node *left,
-			     struct sd_node *members,
-			     size_t nr_members)
+void sd_leave_handler(struct sd_node *left, struct sd_node *members,
+		size_t nr_members)
 {
 	struct event_struct *cevent;
 	struct work_leave *w = NULL;
@@ -1291,11 +1286,6 @@ oom:
 int create_cluster(int port, int64_t zone, int nr_vnodes)
 {
 	int ret;
-	struct cdrv_handlers handlers = {
-		.join_handler = sd_join_handler,
-		.leave_handler = sd_leave_handler,
-		.notify_handler = sd_notify_handler,
-	};
 
 	if (!sys->cdrv) {
 		sys->cdrv = find_cdrv("corosync");
@@ -1308,7 +1298,7 @@ int create_cluster(int port, int64_t zon
 		}
 	}
 
-	cdrv_fd = sys->cdrv->init(&handlers, sys->cdrv_option, sys->this_node.addr);
+	cdrv_fd = sys->cdrv->init(sys->cdrv_option, sys->this_node.addr);
 	if (cdrv_fd < 0)
 		return -1;
 



More information about the sheepdog mailing list