On Tue, May 15, 2012 at 1:08 AM, Christoph Hellwig <hch at infradead.org> wrote: > On Tue, May 15, 2012 at 12:46:13AM +0800, Yunkai Zhang wrote: >> > another thing I noticed is that I think your patch makes the ->dispatch >> > cluster driver operation obsolete. Once we never register the event >> > handler for the cluster driver FD it can be handled entirely inside the >> > cluster driver, e.g. just call register_event from inside the init >> > method and don't even tell the core about it. >> > >> We have called register_event(cdrv_fd, ...) in create_cluster(). > > Yes, but we could also do this in the drivers. See below for an > untested version of your patch that implements this suggestion: > > From: Yunkai Zhang <yunkai.me at gmail.com> > Subject: [Sheepdog] [PATCH] Remove unregister_event from sd_xxx_handler() > > From: Yunkai Zhang <qiushu.zyk at taobao.com> > > Call update_cluster_info() in the driver directly so that we can remove > unregister_event() from sd_xxx_handler(). > > This can speed up the joining/leaving process. > > Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com> > --- > sheep/cluster.h | 3 +++ > sheep/cluster/accord.c | 4 ++++ > sheep/cluster/corosync.c | 5 +++++ > sheep/cluster/zookeeper.c | 7 ++++++- > sheep/group.c | 30 +++++++++++++----------------- > 5 files changed, 31 insertions(+), 18 deletions(-) > > Index: sheepdog/sheep/cluster.h > =================================================================== > --- sheepdog.orig/sheep/cluster.h 2012-05-14 18:45:08.967970005 +0200 > +++ sheepdog/sheep/cluster.h 2012-05-14 18:49:03.759967772 +0200 > @@ -86,19 +86,6 @@ struct cluster_driver { > */ > int (*notify)(void *msg, size_t msg_len, void (*block_cb)(void *arg)); > > - /* > - * Dispatch handlers > - * > - * This function dispatches handlers according to the > - * delivered events (join/leave/notify) in the cluster. > - * > - * Note that the events sequence is totally ordered; all nodes > - * call the handlers in the same sequence.
> - * > - * Returns zero on success, -1 on error > - */ > - int (*dispatch)(void); > - > struct list_head list; > }; > > @@ -107,7 +94,7 @@ extern struct list_head cluster_drivers; > #define cdrv_register(driver) \ > static void __attribute__((constructor)) regist_ ## driver(void) { \ > if (!driver.init || !driver.join || !driver.leave || \ > - !driver.notify || !driver.dispatch) \ > + !driver.notify) \ > panic("the driver '%s' is incomplete\n", driver.name); \ > list_add(&driver.list, &cluster_drivers); \ > } > @@ -191,5 +178,8 @@ void sd_leave_handler(struct sd_node *le > void sd_notify_handler(struct sd_node *sender, void *msg, size_t msg_len); > enum cluster_join_result sd_check_join_cb(struct sd_node *joining, > void *opaque); > +void update_cluster_info(struct sd_node *joined, struct sd_node *nodes, > + size_t nr_nodes, void *opaque); > +void update_epoch_info(void); > > #endif > Index: sheepdog/sheep/cluster/accord.c > =================================================================== > --- sheepdog.orig/sheep/cluster/accord.c 2012-05-11 17:51:27.000000000 +0200 > +++ sheepdog/sheep/cluster/accord.c 2012-05-14 18:56:36.855963441 +0200 > @@ -16,6 +16,7 @@ > #include <assert.h> > #include <pthread.h> > #include <sys/eventfd.h> > +#include <sys/epoll.h> > #include <accord/accord.h> > > #include "cluster.h" > @@ -463,6 +464,8 @@ static void acrd_watch_fn(struct acrd_ha > eventfd_write(efd, value); > } > > +static void accord_dispatch(int listen_fd, int events, void *data); > + > static int accord_init(const char *option, uint8_t *myaddr) > { > if (!option) { > @@ -509,7 +512,13 @@ static int accord_init(const char *optio > acrd_add_watch(ahandle, QUEUE_FILE, ACRD_EVENT_PREFIX | ACRD_EVENT_ALL, > acrd_watch_fn, NULL); > > - return efd; > + ret = register_event(efd, accord_dispatch, NULL); > + if (ret) { > + eprintf("failed to register epoll events (%d)\n", ret); > + return -1; > + } > + > + return 0; > } > > static int accord_join(struct sd_node *myself, > @@ 
-550,7 +559,7 @@ static void acrd_block_done(struct work > { > } > > -static int accord_dispatch(void) > +static void accord_dispatch(int listen_fd, int events, void *data) > { > int ret; > eventfd_t value; > @@ -561,6 +570,11 @@ static int accord_dispatch(void) > .done = acrd_block_done, > }; > > + if (events & EPOLLHUP) { > + eprintf("received EPOLLHUP event: has accord exited?\n"); > + goto out; > + } > + > dprintf("read event\n"); > ret = eventfd_read(efd, &value); > if (ret < 0) > @@ -603,10 +617,14 @@ static int accord_dispatch(void) > acrd_queue_pop(ahandle, &ev); > } > > + if (ev.join_result == CJ_RES_SUCCESS) > + update_cluster_info(&ev.sender, ev.nodes, > + ev.nr_nodes, ev.buf); > sd_join_handler(&ev.sender, ev.nodes, ev.nr_nodes, > ev.join_result, ev.buf); > break; > case EVENT_LEAVE: > + update_epoch_info(); > sd_leave_handler(&ev.sender, ev.nodes, ev.nr_nodes); > break; > case EVENT_NOTIFY: > @@ -628,8 +646,10 @@ static int accord_dispatch(void) > } > out: > pthread_mutex_unlock(&queue_lock); > - > return 0; > +err: > + log_close(); > + exit(1); > } > > struct cluster_driver cdrv_accord = { > @@ -639,7 +659,6 @@ struct cluster_driver cdrv_accord = { > .join = accord_join, > .leave = accord_leave, > .notify = accord_notify, > - .dispatch = accord_dispatch, > }; > > cdrv_register(cdrv_accord); > Index: sheepdog/sheep/cluster/corosync.c > =================================================================== > --- sheepdog.orig/sheep/cluster/corosync.c 2012-05-14 18:45:08.967970005 +0200 > +++ sheepdog/sheep/cluster/corosync.c 2012-05-14 18:57:48.279962765 +0200 > @@ -12,7 +12,9 @@ > #include <unistd.h> > #include <corosync/cpg.h> > #include <corosync/cfg.h> > +#include <sys/epoll.h> > > +#include "event.h" > #include "cluster.h" > #include "work.h" > > @@ -322,6 +324,10 @@ static int __corosync_dispatch_one(struc > case CJ_RES_FAIL: > case CJ_RES_JOIN_LATER: > build_node_list(cpg_nodes, nr_cpg_nodes, entries); > + if (cevent->result == CJ_RES_SUCCESS) > + 
update_cluster_info(&cevent->sender.ent, > + entries, nr_cpg_nodes, > + cevent->msg); > sd_join_handler(&cevent->sender.ent, entries, > nr_cpg_nodes, cevent->result, > cevent->msg); > @@ -334,6 +340,7 @@ static int __corosync_dispatch_one(struc > break; > cevent->sender.ent = cpg_nodes[idx].ent; > > + update_epoch_info(); > del_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender); > nr_cpg_nodes--; > build_node_list(cpg_nodes, nr_cpg_nodes, entries); > @@ -614,6 +621,28 @@ static void cdrv_cpg_confchg(cpg_handle_ > __corosync_dispatch(); > } > > +static void corosync_dispatch(int listen_fd, int events, void *data) > +{ > + int ret; > + > + if (events & EPOLLHUP) { > + eprintf("received EPOLLHUP event: has corosync exited?\n"); > + goto out; > + } > + > + ret = cpg_dispatch(cpg_handle, CPG_DISPATCH_ALL); > + if (ret != CPG_OK) { > + eprintf("oops... an error occurred inside corosync\n"); > + goto out; > + } > + > + return; > +out: > + log_close(); > + exit(1); > +} > + > + > static int corosync_init(const char *option, uint8_t *myaddr) > { > int ret, fd; > @@ -662,7 +691,13 @@ static int corosync_init(const char *opt > return -1; > } > > - return fd; > + ret = register_event(fd, corosync_dispatch, NULL); > + if (ret) { > + eprintf("failed to register epoll events (%d)\n", ret); > + return -1; > + } > + > + return 0; > } > > static int corosync_join(struct sd_node *myself, > @@ -728,17 +763,6 @@ static int corosync_notify(void *msg, si > return ret; > } > > -static int corosync_dispatch(void) > -{ > - int ret; > - > - ret = cpg_dispatch(cpg_handle, CPG_DISPATCH_ALL); > - if (ret != CPG_OK) > - return -1; > - > - return 0; > -} > - > struct cluster_driver cdrv_corosync = { > .name = "corosync", > > @@ -746,7 +770,6 @@ struct cluster_driver cdrv_corosync = { > .join = corosync_join, > .leave = corosync_leave, > .notify = corosync_notify, > - .dispatch = corosync_dispatch, > }; > > cdrv_register(cdrv_corosync); > Index: sheepdog/sheep/cluster/zookeeper.c > 
=================================================================== > --- sheepdog.orig/sheep/cluster/zookeeper.c 2012-05-14 18:45:08.971970007 +0200 > +++ sheepdog/sheep/cluster/zookeeper.c 2012-05-14 18:58:38.199962294 +0200 > @@ -14,10 +14,12 @@ > #include <netdb.h> > #include <search.h> > #include <assert.h> > +#include <sys/epoll.h> > #include <sys/eventfd.h> > #include <zookeeper/zookeeper.h> > #include <urcu/uatomic.h> > > +#include "event.h" > #include "cluster.h" > #include "work.h" > > @@ -30,6 +32,7 @@ > #define QUEUE_ZNODE BASE_ZNODE "/queue" > #define MEMBER_ZNODE BASE_ZNODE "/member" > > +static void zk_dispatch(int listen_fd, int events, void *data); > > /* zookeeper API wrapper prototypes */ > ZOOAPI int zk_create(zhandle_t *zh, const char *path, const char *value, > @@ -647,6 +650,8 @@ static int get_addr(uint8_t *bytes) > > static int zk_init(const char *option, uint8_t *myaddr) > { > + int ret; > + > if (!option) { > eprintf("specify comma separated host:port pairs, each corresponding to a zk server.\n"); > eprintf("e.g. 
sheep /store -c zookeeper:127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002\n"); > @@ -678,7 +683,13 @@ static int zk_init(const char *option, u > return -1; > } > > - return efd; > + ret = register_event(efd, zk_dispatch, NULL); > + if (ret) { > + eprintf("failed to register epoll events (%d)\n", ret); > + return -1; > + } > + > + return 0; > } > > static int zk_join(struct sd_node *myself, > @@ -745,7 +756,7 @@ static void zk_block_done(struct work *w > { > } > > -static int zk_dispatch(void) > +static void zk_dispatch(int listen_fd, int events, void *data) > { > int ret, rc, retry; > char path[256]; > @@ -758,13 +769,18 @@ static int zk_dispatch(void) > .done = zk_block_done, > }; > > + if (events & EPOLLHUP) { > + eprintf("received EPOLLHUP event: has corosync exited?\n"); > + goto err; > + } > + > dprintf("read event\n"); > ret = eventfd_read(efd, &value); > if (ret < 0) > - return 0; > + return; > > if (uatomic_read(&zk_notify_blocked)) > - return 0; > + return; > > ret = zk_queue_pop(zhandle, &ev); > if (ret < 0) > @@ -824,6 +840,10 @@ static int zk_dispatch(void) > } > > node_btree_add(&zk_node_btroot, &ev.sender); > + build_node_list(zk_node_btroot); > + if (ev.join_result == CJ_RES_SUCCESS) > + update_cluster_info(&ev.sender.node, sd_nodes, > + nr_sd_nodes, ev.buf); > dprintf("one sheep joined[down], nr_nodes:%ld, sender:%s, joined:%d\n", > nr_zk_nodes, node_to_str(&ev.sender.node), ev.sender.joined); > > @@ -841,7 +861,6 @@ static int zk_dispatch(void) > } > } > > - build_node_list(zk_node_btroot); > sd_join_handler(&ev.sender.node, sd_nodes, nr_sd_nodes, > ev.join_result, ev.buf); > break; > @@ -853,6 +872,8 @@ static int zk_dispatch(void) > goto out; > } > > + update_epoch_info(); > + > node_btree_del(&zk_node_btroot, n); > dprintf("one sheep left, nr_nodes:%ld\n", nr_zk_nodes); > > @@ -880,7 +901,10 @@ static int zk_dispatch(void) > break; > } > out: > - return 0; > + return; > +err: > + log_close(); > + exit(1); > } > > struct cluster_driver 
cdrv_zookeeper = { > @@ -890,7 +914,6 @@ struct cluster_driver cdrv_zookeeper = { > .join = zk_join, > .leave = zk_leave, > .notify = zk_notify, > - .dispatch = zk_dispatch, > }; > > cdrv_register(cdrv_zookeeper); > Index: sheepdog/sheep/group.c > =================================================================== > --- sheepdog.orig/sheep/group.c 2012-05-14 18:45:08.971970007 +0200 > +++ sheepdog/sheep/group.c 2012-05-14 18:54:38.735964575 +0200 > @@ -25,8 +25,6 @@ > #include "work.h" > #include "cluster.h" > > -static int cdrv_fd; > - > struct node { > struct sd_node ent; > struct list_head list; > @@ -262,24 +260,6 @@ void do_cluster_request(struct work *wor > free(msg); > } > > -static void group_handler(int listen_fd, int events, void *data) > -{ > - int ret; > - if (events & EPOLLHUP) { > - eprintf("received EPOLLHUP event: has corosync exited?\n"); > - goto out; > - } > - > - ret = sys->cdrv->dispatch(); > - if (ret == 0) > - return; > - else > - eprintf("oops... an error occurred inside corosync\n"); > -out: > - log_close(); > - exit(1); > -} > - > static inline int get_nodes_nr_from(struct list_head *l) > { > struct node *node; > @@ -577,9 +557,20 @@ static void finish_join(struct join_mess > } > } > > -static void update_cluster_info(struct join_message *msg, > - struct sd_node *joined, struct sd_node *nodes, size_t nr_nodes) > +void update_epoch_info(void) > +{ > + if (sys_can_recover()) { > + sys->epoch++; > + update_epoch_store(sys->epoch); > + update_epoch_log(sys->epoch); > + } > +} > + > +void update_cluster_info(struct sd_node *joined, struct sd_node *nodes, > + size_t nr_nodes, void *opaque) > { > + struct join_message *msg = opaque; > + > eprintf("status = %d, epoch = %d, %x, %d\n", msg->cluster_status, > msg->epoch, msg->result, sys->join_finished); > > @@ -836,11 +827,6 @@ static void __sd_join_done(struct event_ > > print_node_list(sys->nodes, sys->nr_nodes); > > - if (!sys_stat_join_failed()) { > - update_cluster_info(jm, &w->joined, 
w->member_list, > - w->member_list_entries); > - } > - > if (sys_can_recover() && jm->inc_epoch) { > list_for_each_entry_safe(node, t, &sys->leave_list, list) { > list_del(&node->list); > @@ -866,11 +852,6 @@ static void __sd_leave_done(struct event > memcpy(sys->nodes, w->member_list, sizeof(*sys->nodes) * sys->nr_nodes); > qsort(sys->nodes, sys->nr_nodes, sizeof(*sys->nodes), node_cmp); > > - if (sys_can_recover()) { > - sys->epoch++; > - update_epoch_store(sys->epoch); > - update_epoch_log(sys->epoch); > - } > update_vnode_info(); > > print_node_list(sys->nodes, sys->nr_nodes); > @@ -943,7 +924,6 @@ static void event_fn(struct work *work) > static void event_done(struct work *work) > { > struct event_struct *cevent; > - int ret; > > if (!sys->cur_cevent) > vprintf(SDOG_ERR, "bug\n"); > @@ -973,9 +953,6 @@ static void event_done(struct work *work > vprintf(SDOG_DEBUG, "free %p\n", cevent); > event_free(cevent); > event_running = 0; > - ret = register_event(cdrv_fd, group_handler, NULL); > - if (ret) > - panic("failed to register event fd"); > > process_request_event_queues(); > } > @@ -1086,7 +1063,6 @@ static inline void process_event_queue(v > event_work.fn = event_fn; > event_work.done = event_done; > > - unregister_event(cdrv_fd); > queue_work(sys->event_wqueue, &event_work); > } > > @@ -1298,8 +1274,8 @@ int create_cluster(int port, int64_t zon > } > } > > - cdrv_fd = sys->cdrv->init(sys->cdrv_option, sys->this_node.addr); > - if (cdrv_fd < 0) > + ret = sys->cdrv->init(sys->cdrv_option, sys->this_node.addr); > + if (ret < 0) > return -1; > > sys->this_node.port = port; > @@ -1327,12 +1303,6 @@ int create_cluster(int port, int64_t zon > INIT_LIST_HEAD(&sys->request_queue); > INIT_LIST_HEAD(&sys->event_queue); > > - ret = register_event(cdrv_fd, group_handler, NULL); > - if (ret) { > - eprintf("failed to register epoll events (%d)\n", ret); > - return 1; > - } > - > ret = send_join_request(&sys->this_node); > if (ret != 0) > return -1; > Index: 
sheepdog/sheep/cluster/local.c > =================================================================== > --- sheepdog.orig/sheep/cluster/local.c 2012-05-14 18:45:08.967970005 +0200 > +++ sheepdog/sheep/cluster/local.c 2012-05-14 18:57:05.299963176 +0200 > @@ -13,9 +13,11 @@ > #include <unistd.h> > #include <sys/mman.h> > #include <sys/signalfd.h> > +#include <sys/epoll.h> > #include <sys/file.h> > #include <search.h> > #include <signal.h> > +#include <sys/epoll.h> > #include <fcntl.h> > #include <assert.h> > > @@ -34,6 +36,8 @@ static struct sd_node this_node; > > static struct work_queue *local_block_wq; > > +static void local_dispatch(int listen_fd, int events, void *data); > + > enum local_event_type { > EVENT_JOIN = 1, > EVENT_LEAVE, > @@ -280,12 +284,12 @@ static void check_pids(void *arg) > add_timer(arg, 1); > } > > - > /* Local driver APIs */ > > static int local_init(const char *option, uint8_t *myaddr) > { > sigset_t mask; > + int ret; > static struct timer t = { > .callback = check_pids, > .data = &t, > @@ -319,7 +323,13 @@ static int local_init(const char *option > return -1; > } > > - return sigfd; > + ret = register_event(sigfd, local_dispatch, NULL); > + if (ret) { > + eprintf("failed to register epoll events (%d)\n", ret); > + return -1; > + } > + > + return 0; > } > > static int local_join(struct sd_node *myself, > @@ -379,7 +389,7 @@ static void local_block_done(struct work > { > } > > -static int local_dispatch(void) > +static void local_dispatch(int listen_fd, int events, void *data) > { > int ret; > struct signalfd_siginfo siginfo; > @@ -390,6 +400,11 @@ static int local_dispatch(void) > .done = local_block_done, > }; > > + if (events & EPOLLHUP) { > + eprintf("received EPOLLHUP event: has corosync exited?\n"); > + goto err; > + } > + > dprintf("read siginfo\n"); > ret = read(sigfd, &siginfo, sizeof(siginfo)); > assert(ret == sizeof(siginfo)); > @@ -456,7 +471,10 @@ static int local_dispatch(void) > out: > shm_queue_unlock(); > > - return 0; > + 
return; > +err: > + log_close(); > + exit(1); > } > > struct cluster_driver cdrv_local = { > @@ -466,7 +484,6 @@ struct cluster_driver cdrv_local = { > .join = local_join, > .leave = local_leave, > .notify = local_notify, > - .dispatch = local_dispatch, > }; > > cdrv_register(cdrv_local); Good! It makes the driver API cleaner. I'll update this patch to reflect your suggestion in V2. I have read your latest patch sets, and I think this patch will work nicely with yours. Thank you very much. -- Yunkai Zhang Work at Taobao |