[Sheepdog] [PATCH] sheep: remove cdrv_handlers and check_join_cb
Christoph Hellwig
hch at infradead.org
Wed Apr 25 09:03:02 CEST 2012
Instead of obscuring the calls from the cluster drivers back into sheepdog
behind a table of function pointers and a callback argument, just call the
handlers directly like you would do in normal C code. The block_cb callback
passed to the notify routine is left alone for now, as that area needs
broader attention later.
Signed-off-by: Christoph Hellwig <hch at lst.de>
---
sheep/cluster.h | 60 ++++++++++++++++++++--------------------------
sheep/cluster/accord.c | 20 +++------------
sheep/cluster/corosync.c | 22 +++-------------
sheep/cluster/local.c | 20 +++------------
sheep/cluster/zookeeper.c | 20 +++------------
sheep/group.c | 28 ++++++---------------
6 files changed, 56 insertions(+), 114 deletions(-)
Index: sheepdog/sheep/cluster.h
===================================================================
--- sheepdog.orig/sheep/cluster.h 2012-04-25 08:49:29.420047299 +0200
+++ sheepdog/sheep/cluster.h 2012-04-25 08:53:17.152053137 +0200
@@ -31,18 +31,6 @@ enum cluster_join_result {
* will leave the cluster (restart later). */
};
-struct cdrv_handlers {
- void (*join_handler)(struct sd_node *joined,
- struct sd_node *members,
- size_t nr_members, enum cluster_join_result result,
- void *opaque);
- void (*leave_handler)(struct sd_node *left,
- struct sd_node *members,
- size_t nr_members);
- void (*notify_handler)(struct sd_node *sender,
- void *msg, size_t msg_len);
-};
-
struct cluster_driver {
const char *name;
@@ -53,27 +41,23 @@ struct cluster_driver {
* may be used with the poll(2) to monitor cluster events. On
* error, returns -1.
*/
- int (*init)(struct cdrv_handlers *handlers, const char *option,
- uint8_t *myaddr);
+ int (*init)(const char *option, uint8_t *myaddr);
/*
* Join the cluster
*
- * This function is used to join the cluster, and notifies a
- * join event to all the nodes. The copy of 'opaque' is
- * passed to check_join_cb() and join_handler().
- * check_join_cb() is called on one of the nodes which already
+ * This function is used to join the cluster, and notifies a join
+ * event to all the nodes. The copy of 'opaque' is passed to
+ * sd_check_join_cb() and sd_join_handler().
+ *
+ * sd_check_join_cb() is called on one of the nodes which already
* paticipate in the cluster. If the content of 'opaque' is
- * changed in check_join_cb(), the updated 'opaque' must be
- * passed to join_handler().
+ * changed in sd_check_join_cb(), the updated 'opaque' must be
+ * passed to sd_join_handler().
*
* Returns zero on success, -1 on error
*/
- int (*join)(struct sd_node *myself,
- enum cluster_join_result (*check_join_cb)(
- struct sd_node *joining,
- void *opaque),
- void *opaque, size_t opaque_len);
+ int (*join)(struct sd_node *myself, void *opaque, size_t opaque_len);
/*
* Leave the cluster
@@ -88,14 +72,14 @@ struct cluster_driver {
/*
* Notify a message to all nodes in the cluster
*
- * This function sends 'msg' to all the nodes. The notified
- * messages can be read through notify_handler() in
- * cdrv_handlers. If 'block_cb' is specified, block_cb() is
- * called before 'msg' is notified to all the nodes. All the
- * cluster events including this notification are blocked
- * until block_cb() returns or this blocking node leaves the
- * cluster. The sheep daemon can sleep in block_cb(), so this
- * callback must be not called from the dispatch (main) thread.
+ * This function sends 'msg' to all the nodes. The notified messages
+ * can be read through sd_notify_handler().
+ *
+ * If 'block_cb' is specified, block_cb() is called before 'msg' is
+ * notified to all the nodes. All the cluster events including this
+ * notification are blocked until block_cb() returns or this blocking
+ * node leaves the cluster. The sheep daemon can sleep in block_cb(),
+ * so this callback must not be called from the dispatch (main) thread.
*
* Returns zero on success, -1 on error
*/
@@ -168,4 +152,14 @@ static inline char *node_to_str(struct s
return str;
}
+/* callbacks back into sheepdog from the cluster drivers */
+void sd_join_handler(struct sd_node *joined, struct sd_node *members,
+ size_t nr_members, enum cluster_join_result result,
+ void *opaque);
+void sd_leave_handler(struct sd_node *left, struct sd_node *members,
+ size_t nr_members);
+void sd_notify_handler(struct sd_node *sender, void *msg, size_t msg_len);
+enum cluster_join_result sd_check_join_cb(struct sd_node *joining,
+ void *opaque);
+
#endif
Index: sheepdog/sheep/cluster/accord.c
===================================================================
--- sheepdog.orig/sheep/cluster/accord.c 2012-04-25 08:49:29.420047299 +0200
+++ sheepdog/sheep/cluster/accord.c 2012-04-25 08:51:20.224050138 +0200
@@ -217,10 +217,6 @@ static int efd;
static struct work_queue *acrd_wq;
-static struct cdrv_handlers acrd_hdlrs;
-static enum cluster_join_result (*acrd_check_join_cb)(
- struct sd_node *joining, void *opaque);
-
/* get node list from the last pushed data */
static size_t get_nodes(struct acrd_handle *ah,
struct sd_node *nodes,
@@ -467,10 +463,8 @@ static void acrd_watch_fn(struct acrd_ha
eventfd_write(efd, value);
}
-static int accord_init(struct cdrv_handlers *handlers, const char *option,
- uint8_t *myaddr)
+static int accord_init(const char *option, uint8_t *myaddr)
{
- acrd_hdlrs = *handlers;
if (!option) {
eprintf("specify one of the accord servers.\n");
eprintf("e.g. sheep /store -c accord:127.0.0.1\n");
@@ -515,13 +509,9 @@ static int accord_init(struct cdrv_handl
}
static int accord_join(struct sd_node *myself,
- enum cluster_join_result (*check_join_cb)(
- struct sd_node *joining,
- void *opaque),
void *opaque, size_t opaque_len)
{
this_node = *myself;
- acrd_check_join_cb = check_join_cb;
return add_event(ahandle, EVENT_JOIN, &this_node, opaque, opaque_len, NULL);
}
@@ -582,7 +572,7 @@ static int accord_dispatch(void)
case EVENT_JOIN:
if (ev.blocked) {
if (node_cmp(&ev.nodes[0], &this_node) == 0) {
- res = acrd_check_join_cb(&ev.sender, ev.buf);
+ res = sd_check_join_cb(&ev.sender, ev.buf);
ev.join_result = res;
ev.blocked = 0;
@@ -609,11 +599,11 @@ static int accord_dispatch(void)
acrd_queue_pop(ahandle, &ev);
}
- acrd_hdlrs.join_handler(&ev.sender, ev.nodes, ev.nr_nodes,
+ sd_join_handler(&ev.sender, ev.nodes, ev.nr_nodes,
ev.join_result, ev.buf);
break;
case EVENT_LEAVE:
- acrd_hdlrs.leave_handler(&ev.sender, ev.nodes, ev.nr_nodes);
+ sd_leave_handler(&ev.sender, ev.nodes, ev.nr_nodes);
break;
case EVENT_NOTIFY:
if (ev.blocked) {
@@ -629,7 +619,7 @@ static int accord_dispatch(void)
goto out;
}
- acrd_hdlrs.notify_handler(&ev.sender, ev.buf, ev.buf_len);
+ sd_notify_handler(&ev.sender, ev.buf, ev.buf_len);
break;
}
out:
Index: sheepdog/sheep/cluster/corosync.c
===================================================================
--- sheepdog.orig/sheep/cluster/corosync.c 2012-04-25 08:49:29.420047299 +0200
+++ sheepdog/sheep/cluster/corosync.c 2012-04-25 08:51:20.228050137 +0200
@@ -31,10 +31,6 @@ static struct cpg_node this_node;
static struct work_queue *corosync_block_wq;
-static struct cdrv_handlers corosync_handlers;
-static enum cluster_join_result (*corosync_check_join_cb)(
- struct sd_node *joining, void *opaque);
-
static LIST_HEAD(corosync_event_list);
static LIST_HEAD(corosync_block_list);
@@ -299,7 +295,7 @@ static int __corosync_dispatch_one(struc
/* check_join() must be called only once */
return 0;
- res = corosync_check_join_cb(&cevent->sender.ent,
+ res = sd_check_join_cb(&cevent->sender.ent,
cevent->msg);
if (res == CJ_RES_MASTER_TRANSFER)
nr_cpg_nodes = 0;
@@ -326,7 +322,7 @@ static int __corosync_dispatch_one(struc
case CJ_RES_FAIL:
case CJ_RES_JOIN_LATER:
build_node_list(cpg_nodes, nr_cpg_nodes, entries);
- corosync_handlers.join_handler(&cevent->sender.ent, entries,
+ sd_join_handler(&cevent->sender.ent, entries,
nr_cpg_nodes, cevent->result,
cevent->msg);
break;
@@ -341,8 +337,7 @@ static int __corosync_dispatch_one(struc
del_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender);
nr_cpg_nodes--;
build_node_list(cpg_nodes, nr_cpg_nodes, entries);
- corosync_handlers.leave_handler(&cevent->sender.ent,
- entries, nr_cpg_nodes);
+ sd_leave_handler(&cevent->sender.ent, entries, nr_cpg_nodes);
break;
case COROSYNC_EVENT_TYPE_NOTIFY:
if (cevent->blocked) {
@@ -367,7 +362,7 @@ static int __corosync_dispatch_one(struc
return 0;
}
- corosync_handlers.notify_handler(&cevent->sender.ent, cevent->msg,
+ sd_notify_handler(&cevent->sender.ent, cevent->msg,
cevent->msg_len);
break;
}
@@ -619,8 +614,7 @@ static void cdrv_cpg_confchg(cpg_handle_
__corosync_dispatch();
}
-static int corosync_init(struct cdrv_handlers *handlers, const char *option,
- uint8_t *myaddr)
+static int corosync_init(const char *option, uint8_t *myaddr)
{
int ret, fd;
uint32_t nodeid;
@@ -629,8 +623,6 @@ static int corosync_init(struct cdrv_han
.cpg_confchg_fn = cdrv_cpg_confchg
};
- corosync_handlers = *handlers;
-
ret = cpg_initialize(&cpg_handle, &cb);
if (ret != CPG_OK) {
eprintf("failed to initialize cpg (%d) - is corosync running?\n", ret);
@@ -670,14 +662,10 @@ static int corosync_init(struct cdrv_han
}
static int corosync_join(struct sd_node *myself,
- enum cluster_join_result (*check_join_cb)(
- struct sd_node *joining,
- void *opaque),
void *opaque, size_t opaque_len)
{
int ret;
- corosync_check_join_cb = check_join_cb;
retry:
ret = cpg_join(cpg_handle, &cpg_group);
switch (ret) {
Index: sheepdog/sheep/cluster/local.c
===================================================================
--- sheepdog.orig/sheep/cluster/local.c 2012-04-25 08:49:29.420047299 +0200
+++ sheepdog/sheep/cluster/local.c 2012-04-25 08:51:20.228050137 +0200
@@ -34,10 +34,6 @@ static struct sd_node this_node;
static struct work_queue *local_block_wq;
-static struct cdrv_handlers lhdlrs;
-static enum cluster_join_result (*local_check_join_cb)(
- struct sd_node *joining, void *opaque);
-
enum local_event_type {
EVENT_JOIN = 1,
EVENT_LEAVE,
@@ -287,8 +283,7 @@ static void check_pids(void *arg)
/* Local driver APIs */
-static int local_init(struct cdrv_handlers *handlers, const char *option,
- uint8_t *myaddr)
+static int local_init(const char *option, uint8_t *myaddr)
{
sigset_t mask;
static struct timer t = {
@@ -296,7 +291,6 @@ static int local_init(struct cdrv_handle
.data = &t,
};
- lhdlrs = *handlers;
if (option)
shmfile = option;
@@ -325,13 +319,9 @@ static int local_init(struct cdrv_handle
}
static int local_join(struct sd_node *myself,
- enum cluster_join_result (*check_join_cb)(
- struct sd_node *joining,
- void *opaque),
void *opaque, size_t opaque_len)
{
this_node = *myself;
- local_check_join_cb = check_join_cb;
shm_queue_lock();
@@ -410,7 +400,7 @@ static int local_dispatch(void)
case EVENT_JOIN:
if (ev->blocked) {
if (node_cmp(&ev->nodes[0], &this_node) == 0) {
- res = local_check_join_cb(&ev->sender, ev->buf);
+ res = sd_check_join_cb(&ev->sender, ev->buf);
ev->join_result = res;
ev->blocked = 0;
msync(ev, sizeof(*ev), MS_SYNC);
@@ -436,11 +426,11 @@ static int local_dispatch(void)
shm_queue_set_chksum();
}
- lhdlrs.join_handler(&ev->sender, ev->nodes, ev->nr_nodes,
+ sd_join_handler(&ev->sender, ev->nodes, ev->nr_nodes,
ev->join_result, ev->buf);
break;
case EVENT_LEAVE:
- lhdlrs.leave_handler(&ev->sender, ev->nodes, ev->nr_nodes);
+ sd_leave_handler(&ev->sender, ev->nodes, ev->nr_nodes);
break;
case EVENT_NOTIFY:
if (ev->blocked) {
@@ -454,7 +444,7 @@ static int local_dispatch(void)
goto out;
}
- lhdlrs.notify_handler(&ev->sender, ev->buf, ev->buf_len);
+ sd_notify_handler(&ev->sender, ev->buf, ev->buf_len);
break;
}
Index: sheepdog/sheep/cluster/zookeeper.c
===================================================================
--- sheepdog.orig/sheep/cluster/zookeeper.c 2012-04-25 08:49:29.420047299 +0200
+++ sheepdog/sheep/cluster/zookeeper.c 2012-04-25 08:51:20.228050137 +0200
@@ -222,10 +222,6 @@ static struct work_queue *zk_block_wq;
static struct sd_node this_node;
-static struct cdrv_handlers zk_hdlrs;
-static enum cluster_join_result (*zk_check_join_cb)(
- struct sd_node *joining, void *opaque);
-
/* get node list from the last pushed data */
static size_t get_nodes(zhandle_t *zh, struct sd_node *nodes)
{
@@ -376,10 +372,8 @@ static int get_addr(uint8_t *bytes)
return 0;
}
-static int zk_init(struct cdrv_handlers *handlers, const char *option,
- uint8_t *myaddr)
+static int zk_init(const char *option, uint8_t *myaddr)
{
- zk_hdlrs = *handlers;
if (!option) {
eprintf("specify comma separated host:port pairs, each corresponding to a zk server.\n");
eprintf("e.g. sheep /store -c zookeeper:127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002\n");
@@ -409,9 +403,6 @@ static int zk_init(struct cdrv_handlers
}
static int zk_join(struct sd_node *myself,
- enum cluster_join_result (*check_join_cb)(
- struct sd_node *joining,
- void *opaque),
void *opaque, size_t opaque_len)
{
int rc;
@@ -419,7 +410,6 @@ static int zk_join(struct sd_node *mysel
char path[256];
this_node = *myself;
- zk_check_join_cb = check_join_cb;
sprintf(path, MEMBER_ZNODE "/%s", node_to_str(myself));
joined = 0;
@@ -483,7 +473,7 @@ static int zk_dispatch(void)
case EVENT_JOIN:
if (ev.blocked) {
if (node_cmp(&ev.nodes[0], &this_node) == 0) {
- res = zk_check_join_cb(&ev.sender, ev.buf);
+ res = sd_check_join_cb(&ev.sender, ev.buf);
ev.join_result = res;
ev.blocked = 0;
@@ -517,11 +507,11 @@ static int zk_dispatch(void)
sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender));
zoo_exists(zhandle, path, 1, NULL);
- zk_hdlrs.join_handler(&ev.sender, ev.nodes, ev.nr_nodes,
+ sd_join_handler(&ev.sender, ev.nodes, ev.nr_nodes,
ev.join_result, ev.buf);
break;
case EVENT_LEAVE:
- zk_hdlrs.leave_handler(&ev.sender, ev.nodes, ev.nr_nodes);
+ sd_leave_handler(&ev.sender, ev.nodes, ev.nr_nodes);
break;
case EVENT_NOTIFY:
if (ev.blocked) {
@@ -537,7 +527,7 @@ static int zk_dispatch(void)
goto out;
}
- zk_hdlrs.notify_handler(&ev.sender, ev.buf, ev.buf_len);
+ sd_notify_handler(&ev.sender, ev.buf, ev.buf_len);
break;
}
out:
Index: sheepdog/sheep/group.c
===================================================================
--- sheepdog.orig/sheep/group.c 2012-04-25 08:49:29.420047299 +0200
+++ sheepdog/sheep/group.c 2012-04-25 08:51:20.228050137 +0200
@@ -637,8 +637,7 @@ static void __sd_notify_done(struct even
req->done(req);
}
-static void sd_notify_handler(struct sd_node *sender,
- void *msg, size_t msg_len)
+void sd_notify_handler(struct sd_node *sender, void *msg, size_t msg_len)
{
struct event_struct *cevent;
struct work_notify *w;
@@ -736,8 +735,7 @@ static void __sd_leave(struct event_stru
}
}
-static enum cluster_join_result sd_check_join_cb(
- struct sd_node *joining, void *opaque)
+enum cluster_join_result sd_check_join_cb(struct sd_node *joining, void *opaque)
{
struct join_message *jm = opaque;
struct node *node;
@@ -813,8 +811,7 @@ static int send_join_request(struct sd_n
if (ret == SD_RES_SUCCESS)
msg->nr_nodes = nr_entries;
- ret = sys->cdrv->join(ent, sd_check_join_cb, msg,
- get_join_message_size(msg));
+ ret = sys->cdrv->join(ent, msg, get_join_message_size(msg));
vprintf(SDOG_INFO, "%s\n", node_to_str(&sys->this_node));
@@ -1098,10 +1095,9 @@ void process_request_event_queues(void)
process_request_queue();
}
-static void sd_join_handler(struct sd_node *joined,
- struct sd_node *members,
- size_t nr_members, enum cluster_join_result result,
- void *opaque)
+void sd_join_handler(struct sd_node *joined, struct sd_node *members,
+ size_t nr_members, enum cluster_join_result result,
+ void *opaque)
{
struct event_struct *cevent;
struct work_join *w = NULL;
@@ -1240,9 +1236,8 @@ static void sd_join_handler(struct sd_no
}
}
-static void sd_leave_handler(struct sd_node *left,
- struct sd_node *members,
- size_t nr_members)
+void sd_leave_handler(struct sd_node *left, struct sd_node *members,
+ size_t nr_members)
{
struct event_struct *cevent;
struct work_leave *w = NULL;
@@ -1291,11 +1286,6 @@ oom:
int create_cluster(int port, int64_t zone, int nr_vnodes)
{
int ret;
- struct cdrv_handlers handlers = {
- .join_handler = sd_join_handler,
- .leave_handler = sd_leave_handler,
- .notify_handler = sd_notify_handler,
- };
if (!sys->cdrv) {
sys->cdrv = find_cdrv("corosync");
@@ -1308,7 +1298,7 @@ int create_cluster(int port, int64_t zon
}
}
- cdrv_fd = sys->cdrv->init(&handlers, sys->cdrv_option, sys->this_node.addr);
+ cdrv_fd = sys->cdrv->init(sys->cdrv_option, sys->this_node.addr);
if (cdrv_fd < 0)
return -1;
More information about the sheepdog
mailing list