Instead of obscuring the callbacks into sheepdog from the cluster drivers by various means of callbacks just call them directly like you would do in normal C code. The block_cb callback to the notify routine is left for now as that area needs broader attention later. Signed-off-by: Christoph Hellwig <hch at lst.de> --- sheep/cluster.h | 60 ++++++++++++++++++++-------------------------- sheep/cluster/accord.c | 20 +++------------ sheep/cluster/corosync.c | 22 +++------------- sheep/cluster/local.c | 20 +++------------ sheep/cluster/zookeeper.c | 20 +++------------ sheep/group.c | 28 ++++++--------------- 6 files changed, 56 insertions(+), 114 deletions(-) Index: sheepdog/sheep/cluster.h =================================================================== --- sheepdog.orig/sheep/cluster.h 2012-04-25 08:49:29.420047299 +0200 +++ sheepdog/sheep/cluster.h 2012-04-25 08:53:17.152053137 +0200 @@ -31,18 +31,6 @@ enum cluster_join_result { * will leave the cluster (restart later). */ }; -struct cdrv_handlers { - void (*join_handler)(struct sd_node *joined, - struct sd_node *members, - size_t nr_members, enum cluster_join_result result, - void *opaque); - void (*leave_handler)(struct sd_node *left, - struct sd_node *members, - size_t nr_members); - void (*notify_handler)(struct sd_node *sender, - void *msg, size_t msg_len); -}; - struct cluster_driver { const char *name; @@ -53,27 +41,23 @@ struct cluster_driver { * may be used with the poll(2) to monitor cluster events. On * error, returns -1. */ - int (*init)(struct cdrv_handlers *handlers, const char *option, - uint8_t *myaddr); + int (*init)(const char *option, uint8_t *myaddr); /* * Join the cluster * - * This function is used to join the cluster, and notifies a - * join event to all the nodes. The copy of 'opaque' is - * passed to check_join_cb() and join_handler(). - * check_join_cb() is called on one of the nodes which already + * This function is used to join the cluster, and notifies a join + * event to all the nodes. The copy of 'opaque' is passed to + * sd_check_join_cb() and sd_join_handler(). + * + * sd_check_join_cb() is called on one of the nodes which already * paticipate in the cluster. If the content of 'opaque' is - * changed in check_join_cb(), the updated 'opaque' must be - * passed to join_handler(). + * changed in sd_check_join_cb(), the updated 'opaque' must be + * passed to sd_join_handler(). * * Returns zero on success, -1 on error */ - int (*join)(struct sd_node *myself, - enum cluster_join_result (*check_join_cb)( - struct sd_node *joining, - void *opaque), - void *opaque, size_t opaque_len); + int (*join)(struct sd_node *myself, void *opaque, size_t opaque_len); /* * Leave the cluster @@ -88,14 +72,14 @@ struct cluster_driver { /* * Notify a message to all nodes in the cluster * - * This function sends 'msg' to all the nodes. The notified - * messages can be read through notify_handler() in - * cdrv_handlers. If 'block_cb' is specified, block_cb() is - * called before 'msg' is notified to all the nodes. All the - * cluster events including this notification are blocked - * until block_cb() returns or this blocking node leaves the - * cluster. The sheep daemon can sleep in block_cb(), so this - * callback must be not called from the dispatch (main) thread. + * This function sends 'msg' to all the nodes. The notified messages + * can be read through sd_notify_handler(). + * + * If 'block_cb' is specified, block_cb() is called before 'msg' is + * notified to all the nodes. All the cluster events including this + * notification are blocked until block_cb() returns or this blocking + * node leaves the cluster. The sheep daemon can sleep in block_cb(), + * so this callback must be not called from the dispatch (main) thread. * * Returns zero on success, -1 on error */ @@ -168,4 +152,14 @@ static inline char *node_to_str(struct s return str; } +/* callbacks back into sheepdog from the cluster drivers */ +void sd_join_handler(struct sd_node *joined, struct sd_node *members, + size_t nr_members, enum cluster_join_result result, + void *opaque); +void sd_leave_handler(struct sd_node *left, struct sd_node *members, + size_t nr_members); +void sd_notify_handler(struct sd_node *sender, void *msg, size_t msg_len); +enum cluster_join_result sd_check_join_cb(struct sd_node *joining, + void *opaque); + #endif Index: sheepdog/sheep/cluster/accord.c =================================================================== --- sheepdog.orig/sheep/cluster/accord.c 2012-04-25 08:49:29.420047299 +0200 +++ sheepdog/sheep/cluster/accord.c 2012-04-25 08:51:20.224050138 +0200 @@ -217,10 +217,6 @@ static int efd; static struct work_queue *acrd_wq; -static struct cdrv_handlers acrd_hdlrs; -static enum cluster_join_result (*acrd_check_join_cb)( - struct sd_node *joining, void *opaque); - /* get node list from the last pushed data */ static size_t get_nodes(struct acrd_handle *ah, struct sd_node *nodes, @@ -467,10 +463,8 @@ static void acrd_watch_fn(struct acrd_ha eventfd_write(efd, value); } -static int accord_init(struct cdrv_handlers *handlers, const char *option, - uint8_t *myaddr) +static int accord_init(const char *option, uint8_t *myaddr) { - acrd_hdlrs = *handlers; if (!option) { eprintf("specify one of the accord servers.\n"); eprintf("e.g. sheep /store -c accord:127.0.0.1\n"); @@ -515,13 +509,9 @@ static int accord_init(struct cdrv_handl } static int accord_join(struct sd_node *myself, - enum cluster_join_result (*check_join_cb)( - struct sd_node *joining, - void *opaque), void *opaque, size_t opaque_len) { this_node = *myself; - acrd_check_join_cb = check_join_cb; return add_event(ahandle, EVENT_JOIN, &this_node, opaque, opaque_len, NULL); } @@ -582,7 +572,7 @@ static int accord_dispatch(void) case EVENT_JOIN: if (ev.blocked) { if (node_cmp(&ev.nodes[0], &this_node) == 0) { - res = acrd_check_join_cb(&ev.sender, ev.buf); + res = sd_check_join_cb(&ev.sender, ev.buf); ev.join_result = res; ev.blocked = 0; @@ -609,11 +599,11 @@ static int accord_dispatch(void) acrd_queue_pop(ahandle, &ev); } - acrd_hdlrs.join_handler(&ev.sender, ev.nodes, ev.nr_nodes, + sd_join_handler(&ev.sender, ev.nodes, ev.nr_nodes, ev.join_result, ev.buf); break; case EVENT_LEAVE: - acrd_hdlrs.leave_handler(&ev.sender, ev.nodes, ev.nr_nodes); + sd_leave_handler(&ev.sender, ev.nodes, ev.nr_nodes); break; case EVENT_NOTIFY: if (ev.blocked) { @@ -629,7 +619,7 @@ static int accord_dispatch(void) goto out; } - acrd_hdlrs.notify_handler(&ev.sender, ev.buf, ev.buf_len); + sd_notify_handler(&ev.sender, ev.buf, ev.buf_len); break; } out: Index: sheepdog/sheep/cluster/corosync.c =================================================================== --- sheepdog.orig/sheep/cluster/corosync.c 2012-04-25 08:49:29.420047299 +0200 +++ sheepdog/sheep/cluster/corosync.c 2012-04-25 08:51:20.228050137 +0200 @@ -31,10 +31,6 @@ static struct cpg_node this_node; static struct work_queue *corosync_block_wq; -static struct cdrv_handlers corosync_handlers; -static enum cluster_join_result (*corosync_check_join_cb)( - struct sd_node *joining, void *opaque); - static LIST_HEAD(corosync_event_list); static LIST_HEAD(corosync_block_list); @@ -299,7 +295,7 @@ static int __corosync_dispatch_one(struc /* check_join() must be called only once */ return 0; - res = corosync_check_join_cb(&cevent->sender.ent, + res = sd_check_join_cb(&cevent->sender.ent, cevent->msg); if (res == CJ_RES_MASTER_TRANSFER) nr_cpg_nodes = 0; @@ -326,7 +322,7 @@ static int __corosync_dispatch_one(struc case CJ_RES_FAIL: case CJ_RES_JOIN_LATER: build_node_list(cpg_nodes, nr_cpg_nodes, entries); - corosync_handlers.join_handler(&cevent->sender.ent, entries, + sd_join_handler(&cevent->sender.ent, entries, nr_cpg_nodes, cevent->result, cevent->msg); break; @@ -341,8 +337,7 @@ static int __corosync_dispatch_one(struc del_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender); nr_cpg_nodes--; build_node_list(cpg_nodes, nr_cpg_nodes, entries); - corosync_handlers.leave_handler(&cevent->sender.ent, - entries, nr_cpg_nodes); + sd_leave_handler(&cevent->sender.ent, entries, nr_cpg_nodes); break; case COROSYNC_EVENT_TYPE_NOTIFY: if (cevent->blocked) { @@ -367,7 +362,7 @@ static int __corosync_dispatch_one(struc return 0; } - corosync_handlers.notify_handler(&cevent->sender.ent, cevent->msg, + sd_notify_handler(&cevent->sender.ent, cevent->msg, cevent->msg_len); break; } @@ -619,8 +614,7 @@ static void cdrv_cpg_confchg(cpg_handle_ __corosync_dispatch(); } -static int corosync_init(struct cdrv_handlers *handlers, const char *option, - uint8_t *myaddr) +static int corosync_init(const char *option, uint8_t *myaddr) { int ret, fd; uint32_t nodeid; @@ -629,8 +623,6 @@ static int corosync_init(struct cdrv_han .cpg_confchg_fn = cdrv_cpg_confchg }; - corosync_handlers = *handlers; - ret = cpg_initialize(&cpg_handle, &cb); if (ret != CPG_OK) { eprintf("failed to initialize cpg (%d) - is corosync running?\n", ret); @@ -670,14 +662,10 @@ static int corosync_init(struct cdrv_han } static int corosync_join(struct sd_node *myself, - enum cluster_join_result (*check_join_cb)( - struct sd_node *joining, - void *opaque), void *opaque, size_t opaque_len) { int ret; - corosync_check_join_cb = check_join_cb; retry: ret = cpg_join(cpg_handle, &cpg_group); switch (ret) { Index: sheepdog/sheep/cluster/local.c =================================================================== --- sheepdog.orig/sheep/cluster/local.c 2012-04-25 08:49:29.420047299 +0200 +++ sheepdog/sheep/cluster/local.c 2012-04-25 08:51:20.228050137 +0200 @@ -34,10 +34,6 @@ static struct sd_node this_node; static struct work_queue *local_block_wq; -static struct cdrv_handlers lhdlrs; -static enum cluster_join_result (*local_check_join_cb)( - struct sd_node *joining, void *opaque); - enum local_event_type { EVENT_JOIN = 1, EVENT_LEAVE, @@ -287,8 +283,7 @@ static void check_pids(void *arg) /* Local driver APIs */ -static int local_init(struct cdrv_handlers *handlers, const char *option, - uint8_t *myaddr) +static int local_init(const char *option, uint8_t *myaddr) { sigset_t mask; static struct timer t = { @@ -296,7 +291,6 @@ static int local_init(struct cdrv_handle .data = &t, }; - lhdlrs = *handlers; if (option) shmfile = option; @@ -325,13 +319,9 @@ static int local_init(struct cdrv_handle } static int local_join(struct sd_node *myself, - enum cluster_join_result (*check_join_cb)( - struct sd_node *joining, - void *opaque), void *opaque, size_t opaque_len) { this_node = *myself; - local_check_join_cb = check_join_cb; shm_queue_lock(); @@ -410,7 +400,7 @@ static int local_dispatch(void) case EVENT_JOIN: if (ev->blocked) { if (node_cmp(&ev->nodes[0], &this_node) == 0) { - res = local_check_join_cb(&ev->sender, ev->buf); + res = sd_check_join_cb(&ev->sender, ev->buf); ev->join_result = res; ev->blocked = 0; msync(ev, sizeof(*ev), MS_SYNC); @@ -436,11 +426,11 @@ static int local_dispatch(void) shm_queue_set_chksum(); } - lhdlrs.join_handler(&ev->sender, ev->nodes, ev->nr_nodes, + sd_join_handler(&ev->sender, ev->nodes, ev->nr_nodes, ev->join_result, ev->buf); break; case EVENT_LEAVE: - lhdlrs.leave_handler(&ev->sender, ev->nodes, ev->nr_nodes); + sd_leave_handler(&ev->sender, ev->nodes, ev->nr_nodes); break; case EVENT_NOTIFY: if (ev->blocked) { @@ -454,7 +444,7 @@ static int local_dispatch(void) goto out; } - lhdlrs.notify_handler(&ev->sender, ev->buf, ev->buf_len); + sd_notify_handler(&ev->sender, ev->buf, ev->buf_len); break; } Index: sheepdog/sheep/cluster/zookeeper.c =================================================================== --- sheepdog.orig/sheep/cluster/zookeeper.c 2012-04-25 08:49:29.420047299 +0200 +++ sheepdog/sheep/cluster/zookeeper.c 2012-04-25 08:51:20.228050137 +0200 @@ -222,10 +222,6 @@ static struct work_queue *zk_block_wq; static struct sd_node this_node; -static struct cdrv_handlers zk_hdlrs; -static enum cluster_join_result (*zk_check_join_cb)( - struct sd_node *joining, void *opaque); - /* get node list from the last pushed data */ static size_t get_nodes(zhandle_t *zh, struct sd_node *nodes) { @@ -376,10 +372,8 @@ static int get_addr(uint8_t *bytes) return 0; } -static int zk_init(struct cdrv_handlers *handlers, const char *option, - uint8_t *myaddr) +static int zk_init(const char *option, uint8_t *myaddr) { - zk_hdlrs = *handlers; if (!option) { eprintf("specify comma separated host:port pairs, each corresponding to a zk server.\n"); eprintf("e.g. sheep /store -c zookeeper:127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002\n"); @@ -409,9 +403,6 @@ static int zk_init(struct cdrv_handlers } static int zk_join(struct sd_node *myself, - enum cluster_join_result (*check_join_cb)( - struct sd_node *joining, - void *opaque), void *opaque, size_t opaque_len) { int rc; @@ -419,7 +410,6 @@ static int zk_join(struct sd_node *mysel char path[256]; this_node = *myself; - zk_check_join_cb = check_join_cb; sprintf(path, MEMBER_ZNODE "/%s", node_to_str(myself)); joined = 0; @@ -483,7 +473,7 @@ static int zk_dispatch(void) case EVENT_JOIN: if (ev.blocked) { if (node_cmp(&ev.nodes[0], &this_node) == 0) { - res = zk_check_join_cb(&ev.sender, ev.buf); + res = sd_check_join_cb(&ev.sender, ev.buf); ev.join_result = res; ev.blocked = 0; @@ -517,11 +507,11 @@ static int zk_dispatch(void) sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender)); zoo_exists(zhandle, path, 1, NULL); - zk_hdlrs.join_handler(&ev.sender, ev.nodes, ev.nr_nodes, + sd_join_handler(&ev.sender, ev.nodes, ev.nr_nodes, ev.join_result, ev.buf); break; case EVENT_LEAVE: - zk_hdlrs.leave_handler(&ev.sender, ev.nodes, ev.nr_nodes); + sd_leave_handler(&ev.sender, ev.nodes, ev.nr_nodes); break; case EVENT_NOTIFY: if (ev.blocked) { @@ -537,7 +527,7 @@ static int zk_dispatch(void) goto out; } - zk_hdlrs.notify_handler(&ev.sender, ev.buf, ev.buf_len); + sd_notify_handler(&ev.sender, ev.buf, ev.buf_len); break; } out: Index: sheepdog/sheep/group.c =================================================================== --- sheepdog.orig/sheep/group.c 2012-04-25 08:49:29.420047299 +0200 +++ sheepdog/sheep/group.c 2012-04-25 08:51:20.228050137 +0200 @@ -637,8 +637,7 @@ static void __sd_notify_done(struct even req->done(req); } -static void sd_notify_handler(struct sd_node *sender, - void *msg, size_t msg_len) +void sd_notify_handler(struct sd_node *sender, void *msg, size_t msg_len) { struct event_struct *cevent; struct work_notify *w; @@ -736,8 +735,7 @@ static void __sd_leave(struct event_stru } } -static enum cluster_join_result sd_check_join_cb( - struct sd_node *joining, void *opaque) +enum cluster_join_result sd_check_join_cb(struct sd_node *joining, void *opaque) { struct join_message *jm = opaque; struct node *node; @@ -813,8 +811,7 @@ static int send_join_request(struct sd_n if (ret == SD_RES_SUCCESS) msg->nr_nodes = nr_entries; - ret = sys->cdrv->join(ent, sd_check_join_cb, msg, - get_join_message_size(msg)); + ret = sys->cdrv->join(ent, msg, get_join_message_size(msg)); vprintf(SDOG_INFO, "%s\n", node_to_str(&sys->this_node)); @@ -1098,10 +1095,9 @@ void process_request_event_queues(void) process_request_queue(); } -static void sd_join_handler(struct sd_node *joined, - struct sd_node *members, - size_t nr_members, enum cluster_join_result result, - void *opaque) +void sd_join_handler(struct sd_node *joined, struct sd_node *members, + size_t nr_members, enum cluster_join_result result, + void *opaque) { struct event_struct *cevent; struct work_join *w = NULL; @@ -1240,9 +1236,8 @@ static void sd_join_handler(struct sd_no } } -static void sd_leave_handler(struct sd_node *left, - struct sd_node *members, - size_t nr_members) +void sd_leave_handler(struct sd_node *left, struct sd_node *members, + size_t nr_members) { struct event_struct *cevent; struct work_leave *w = NULL; @@ -1291,11 +1286,6 @@ oom: int create_cluster(int port, int64_t zone, int nr_vnodes) { int ret; - struct cdrv_handlers handlers = { - .join_handler = sd_join_handler, - .leave_handler = sd_leave_handler, - .notify_handler = sd_notify_handler, - }; if (!sys->cdrv) { sys->cdrv = find_cdrv("corosync"); @@ -1308,7 +1298,7 @@ int create_cluster(int port, int64_t zon } } - cdrv_fd = sys->cdrv->init(&handlers, sys->cdrv_option, sys->this_node.addr); + cdrv_fd = sys->cdrv->init(sys->cdrv_option, sys->this_node.addr); if (cdrv_fd < 0) return -1; |