On Tue, May 15, 2012 at 3:01 PM, Yunkai Zhang <yunkai.me at gmail.com> wrote: > On Tue, May 15, 2012 at 11:57 AM, Yunkai Zhang <yunkai.me at gmail.com> wrote: >> On Tue, May 15, 2012 at 1:08 AM, Christoph Hellwig <hch at infradead.org> wrote: >>> On Tue, May 15, 2012 at 12:46:13AM +0800, Yunkai Zhang wrote: >>>> > another thing I noticed is that I think your patch makes the ->dispatch >>>> > cluster driver operation obsolete. Once we never register the event >>>> > handler for the cluster driver FD it can be handled entirely inside the >>>> > cluster driver, e.g. just call register_event from inside the init >>>> > method and don't even tell the core about it. >>>> > >>>> We have called register_event(cdrv_fd, ...) in create_cluster(). >>> >>> Yes, but we could also do this in the drivers. See below for an >>> untested version of your patch that implements this suggestion: >>> >>> From: Yunkai Zhang <yunkai.me at gmail.com> >>> Subject: [Sheepdog] [PATCH] Remove unregister_event from sd_xxx_handler() >>> >>> From: Yunkai Zhang <qiushu.zyk at taobao.com> >>> >>> Call update_cluster_info() in driver directly so that we can remove >>> unregister_event() from sd_xxx_handler(). >>> >>> This can speed up the joining/leaving process. 
>>> >>> Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com> >>> --- >>> sheep/cluster.h | 3 +++ >>> sheep/cluster/accord.c | 4 ++++ >>> sheep/cluster/corosync.c | 5 +++++ >>> sheep/cluster/zookeeper.c | 7 ++++++- >>> sheep/group.c | 30 +++++++++++++----------------- >>> 5 files changed, 31 insertions(+), 18 deletions(-) >>> >>> Index: sheepdog/sheep/cluster.h >>> =================================================================== >>> --- sheepdog.orig/sheep/cluster.h 2012-05-14 18:45:08.967970005 +0200 >>> +++ sheepdog/sheep/cluster.h 2012-05-14 18:49:03.759967772 +0200 >>> @@ -86,19 +86,6 @@ struct cluster_driver { >>> */ >>> int (*notify)(void *msg, size_t msg_len, void (*block_cb)(void *arg)); >>> >>> - /* >>> - * Dispatch handlers >>> - * >>> - * This function dispatches handlers according to the >>> - * delivered events (join/leave/notify) in the cluster. >>> - * >>> - * Note that the events sequence is totally ordered; all nodes >>> - * call the handlers in the same sequence. >>> - * >>> - * Returns zero on success, -1 on error >>> - */ >>> - int (*dispatch)(void); >>> - >>> struct list_head list; >>> }; >>> >>> @@ -107,7 +94,7 @@ extern struct list_head cluster_drivers; >>> #define cdrv_register(driver) \ >>> static void __attribute__((constructor)) regist_ ## driver(void) { \ >>> if (!driver.init || !driver.join || !driver.leave || \ >>> - !driver.notify || !driver.dispatch) \ >>> + !driver.notify) \ >>> panic("the driver '%s' is incomplete\n", driver.name); \ >>> list_add(&driver.list, &cluster_drivers); \ >>> } >>> @@ -191,5 +178,8 @@ void sd_leave_handler(struct sd_node *le >>> void sd_notify_handler(struct sd_node *sender, void *msg, size_t msg_len); >>> enum cluster_join_result sd_check_join_cb(struct sd_node *joining, >>> void *opaque); >>> +void update_cluster_info(struct sd_node *joined, struct sd_node *nodes, >>> + size_t nr_nodes, void *opaque); >>> +void update_epoch_info(void); >>> >>> #endif >>> Index: sheepdog/sheep/cluster/accord.c >>> 
=================================================================== >>> --- sheepdog.orig/sheep/cluster/accord.c 2012-05-11 17:51:27.000000000 +0200 >>> +++ sheepdog/sheep/cluster/accord.c 2012-05-14 18:56:36.855963441 +0200 >>> @@ -16,6 +16,7 @@ >>> #include <assert.h> >>> #include <pthread.h> >>> #include <sys/eventfd.h> >>> +#include <sys/epoll.h> >>> #include <accord/accord.h> >>> >>> #include "cluster.h" >>> @@ -463,6 +464,8 @@ static void acrd_watch_fn(struct acrd_ha >>> eventfd_write(efd, value); >>> } >>> >>> +static void accord_dispatch(int listen_fd, int events, void *data); >>> + >>> static int accord_init(const char *option, uint8_t *myaddr) >>> { >>> if (!option) { >>> @@ -509,7 +512,13 @@ static int accord_init(const char *optio >>> acrd_add_watch(ahandle, QUEUE_FILE, ACRD_EVENT_PREFIX | ACRD_EVENT_ALL, >>> acrd_watch_fn, NULL); >>> >>> - return efd; >>> + ret = register_event(efd, accord_dispatch, NULL); >>> + if (ret) { >>> + eprintf("failed to register epoll events (%d)\n", ret); >>> + return -1; >>> + } >>> + >>> + return 0; >>> } >>> >>> static int accord_join(struct sd_node *myself, >>> @@ -550,7 +559,7 @@ static void acrd_block_done(struct work >>> { >>> } >>> >>> -static int accord_dispatch(void) >>> +static void accord_dispatch(int listen_fd, int events, void *data) >>> { >>> int ret; >>> eventfd_t value; >>> @@ -561,6 +570,11 @@ static int accord_dispatch(void) >>> .done = acrd_block_done, >>> }; >>> >>> + if (events & EPOLLHUP) { >>> + eprintf("received EPOLLHUP event: has accord exited?\n"); >>> + goto out; >>> + } >>> + >>> dprintf("read event\n"); >>> ret = eventfd_read(efd, &value); >>> if (ret < 0) >>> @@ -603,10 +617,14 @@ static int accord_dispatch(void) >>> acrd_queue_pop(ahandle, &ev); >>> } >>> >>> + if (ev.join_result == CJ_RES_SUCCESS) >>> + update_cluster_info(&ev.sender, ev.nodes, >>> + ev.nr_nodes, ev.buf); >>> sd_join_handler(&ev.sender, ev.nodes, ev.nr_nodes, >>> ev.join_result, ev.buf); >>> break; >>> case EVENT_LEAVE: >>> 
+ update_epoch_info(); >>> sd_leave_handler(&ev.sender, ev.nodes, ev.nr_nodes); >>> break; >>> case EVENT_NOTIFY: >>> @@ -628,8 +646,10 @@ static int accord_dispatch(void) >>> } >>> out: >>> pthread_mutex_unlock(&queue_lock); >>> - >>> return 0; >>> +err: >>> + log_close(); >>> + exit(1); >>> } >>> >>> struct cluster_driver cdrv_accord = { >>> @@ -639,7 +659,6 @@ struct cluster_driver cdrv_accord = { >>> .join = accord_join, >>> .leave = accord_leave, >>> .notify = accord_notify, >>> - .dispatch = accord_dispatch, >>> }; >>> >>> cdrv_register(cdrv_accord); >>> Index: sheepdog/sheep/cluster/corosync.c >>> =================================================================== >>> --- sheepdog.orig/sheep/cluster/corosync.c 2012-05-14 18:45:08.967970005 +0200 >>> +++ sheepdog/sheep/cluster/corosync.c 2012-05-14 18:57:48.279962765 +0200 >>> @@ -12,7 +12,9 @@ >>> #include <unistd.h> >>> #include <corosync/cpg.h> >>> #include <corosync/cfg.h> >>> +#include <sys/epoll.h> >>> >>> +#include "event.h" >>> #include "cluster.h" >>> #include "work.h" >>> >>> @@ -322,6 +324,10 @@ static int __corosync_dispatch_one(struc >>> case CJ_RES_FAIL: >>> case CJ_RES_JOIN_LATER: >>> build_node_list(cpg_nodes, nr_cpg_nodes, entries); >>> + if (cevent->result == CJ_RES_SUCCESS) >>> + update_cluster_info(&cevent->sender.ent, >>> + entries, nr_cpg_nodes, >>> + cevent->msg); >>> sd_join_handler(&cevent->sender.ent, entries, >>> nr_cpg_nodes, cevent->result, >>> cevent->msg); >>> @@ -334,6 +340,7 @@ static int __corosync_dispatch_one(struc >>> break; >>> cevent->sender.ent = cpg_nodes[idx].ent; >>> >>> + update_epoch_info(); >>> del_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender); >>> nr_cpg_nodes--; >>> build_node_list(cpg_nodes, nr_cpg_nodes, entries); >>> @@ -614,6 +621,28 @@ static void cdrv_cpg_confchg(cpg_handle_ >>> __corosync_dispatch(); >>> } >>> >>> +static void corosync_dispatch(int listen_fd, int events, void *data) >>> +{ >>> + int ret; >>> + >>> + if (events & EPOLLHUP) { >>> + 
eprintf("received EPOLLHUP event: has corosync exited?\n"); >>> + goto out; >>> + } >>> + >>> + ret = cpg_dispatch(cpg_handle, CPG_DISPATCH_ALL); >>> + if (ret != CPG_OK) { >>> + eprintf("oops... an error occurred inside corosync\n"); >>> + goto out; >>> + } >>> + >>> + return; >>> +out: >>> + log_close(); >>> + exit(1); >>> +} >>> + >>> + >>> static int corosync_init(const char *option, uint8_t *myaddr) >>> { >>> int ret, fd; >>> @@ -662,7 +691,13 @@ static int corosync_init(const char *opt >>> return -1; >>> } >>> >>> - return fd; >>> + ret = register_event(fd, corosync_dispatch, NULL); >>> + if (ret) { >>> + eprintf("failed to register epoll events (%d)\n", ret); >>> + return -1; >>> + } >>> + >>> + return 0; >>> } >>> >>> static int corosync_join(struct sd_node *myself, >>> @@ -728,17 +763,6 @@ static int corosync_notify(void *msg, si >>> return ret; >>> } >>> >>> -static int corosync_dispatch(void) >>> -{ >>> - int ret; >>> - >>> - ret = cpg_dispatch(cpg_handle, CPG_DISPATCH_ALL); >>> - if (ret != CPG_OK) >>> - return -1; >>> - >>> - return 0; >>> -} >>> - >>> struct cluster_driver cdrv_corosync = { >>> .name = "corosync", >>> >>> @@ -746,7 +770,6 @@ struct cluster_driver cdrv_corosync = { >>> .join = corosync_join, >>> .leave = corosync_leave, >>> .notify = corosync_notify, >>> - .dispatch = corosync_dispatch, >>> }; >>> >>> cdrv_register(cdrv_corosync); >>> Index: sheepdog/sheep/cluster/zookeeper.c >>> =================================================================== >>> --- sheepdog.orig/sheep/cluster/zookeeper.c 2012-05-14 18:45:08.971970007 +0200 >>> +++ sheepdog/sheep/cluster/zookeeper.c 2012-05-14 18:58:38.199962294 +0200 >>> @@ -14,10 +14,12 @@ >>> #include <netdb.h> >>> #include <search.h> >>> #include <assert.h> >>> +#include <sys/epoll.h> >>> #include <sys/eventfd.h> >>> #include <zookeeper/zookeeper.h> >>> #include <urcu/uatomic.h> >>> >>> +#include "event.h" >>> #include "cluster.h" >>> #include "work.h" >>> >>> @@ -30,6 +32,7 @@ >>> #define 
QUEUE_ZNODE BASE_ZNODE "/queue" >>> #define MEMBER_ZNODE BASE_ZNODE "/member" >>> >>> +static void zk_dispatch(int listen_fd, int events, void *data); >>> >>> /* zookeeper API wrapper prototypes */ >>> ZOOAPI int zk_create(zhandle_t *zh, const char *path, const char *value, >>> @@ -647,6 +650,8 @@ static int get_addr(uint8_t *bytes) >>> >>> static int zk_init(const char *option, uint8_t *myaddr) >>> { >>> + int ret; >>> + >>> if (!option) { >>> eprintf("specify comma separated host:port pairs, each corresponding to a zk server.\n"); >>> eprintf("e.g. sheep /store -c zookeeper:127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002\n"); >>> @@ -678,7 +683,13 @@ static int zk_init(const char *option, u >>> return -1; >>> } >>> >>> - return efd; >>> + ret = register_event(efd, zk_dispatch, NULL); >>> + if (ret) { >>> + eprintf("failed to register epoll events (%d)\n", ret); >>> + return -1; >>> + } >>> + >>> + return 0; >>> } >>> >>> static int zk_join(struct sd_node *myself, >>> @@ -745,7 +756,7 @@ static void zk_block_done(struct work *w >>> { >>> } >>> >>> -static int zk_dispatch(void) >>> +static void zk_dispatch(int listen_fd, int events, void *data) >>> { >>> int ret, rc, retry; >>> char path[256]; >>> @@ -758,13 +769,18 @@ static int zk_dispatch(void) >>> .done = zk_block_done, >>> }; >>> >>> + if (events & EPOLLHUP) { >>> + eprintf("received EPOLLHUP event: has corosync exited?\n"); >>> + goto err; >>> + } >>> + >>> dprintf("read event\n"); >>> ret = eventfd_read(efd, &value); >>> if (ret < 0) >>> - return 0; >>> + return; >>> >>> if (uatomic_read(&zk_notify_blocked)) >>> - return 0; >>> + return; >>> >>> ret = zk_queue_pop(zhandle, &ev); >>> if (ret < 0) >>> @@ -824,6 +840,10 @@ static int zk_dispatch(void) >>> } >>> >>> node_btree_add(&zk_node_btroot, &ev.sender); >>> + build_node_list(zk_node_btroot); >>> + if (ev.join_result == CJ_RES_SUCCESS) >>> + update_cluster_info(&ev.sender.node, sd_nodes, >>> + nr_sd_nodes, ev.buf); >>> dprintf("one sheep joined[down], 
nr_nodes:%ld, sender:%s, joined:%d\n", >>> nr_zk_nodes, node_to_str(&ev.sender.node), ev.sender.joined); >>> >>> @@ -841,7 +861,6 @@ static int zk_dispatch(void) >>> } >>> } >>> >>> - build_node_list(zk_node_btroot); >>> sd_join_handler(&ev.sender.node, sd_nodes, nr_sd_nodes, >>> ev.join_result, ev.buf); >>> break; >>> @@ -853,6 +872,8 @@ static int zk_dispatch(void) >>> goto out; >>> } >>> >>> + update_epoch_info(); >>> + >>> node_btree_del(&zk_node_btroot, n); >>> dprintf("one sheep left, nr_nodes:%ld\n", nr_zk_nodes); >>> >>> @@ -880,7 +901,10 @@ static int zk_dispatch(void) >>> break; >>> } >>> out: >>> - return 0; >>> + return; >>> +err: >>> + log_close(); >>> + exit(1); >>> } >>> >>> struct cluster_driver cdrv_zookeeper = { >>> @@ -890,7 +914,6 @@ struct cluster_driver cdrv_zookeeper = { >>> .join = zk_join, >>> .leave = zk_leave, >>> .notify = zk_notify, >>> - .dispatch = zk_dispatch, >>> }; >>> >>> cdrv_register(cdrv_zookeeper); >>> Index: sheepdog/sheep/group.c >>> =================================================================== >>> --- sheepdog.orig/sheep/group.c 2012-05-14 18:45:08.971970007 +0200 >>> +++ sheepdog/sheep/group.c 2012-05-14 18:54:38.735964575 +0200 >>> @@ -25,8 +25,6 @@ >>> #include "work.h" >>> #include "cluster.h" >>> >>> -static int cdrv_fd; >>> - >>> struct node { >>> struct sd_node ent; >>> struct list_head list; >>> @@ -262,24 +260,6 @@ void do_cluster_request(struct work *wor >>> free(msg); >>> } >>> >>> -static void group_handler(int listen_fd, int events, void *data) >>> -{ >>> - int ret; >>> - if (events & EPOLLHUP) { >>> - eprintf("received EPOLLHUP event: has corosync exited?\n"); >>> - goto out; >>> - } >>> - >>> - ret = sys->cdrv->dispatch(); >>> - if (ret == 0) >>> - return; >>> - else >>> - eprintf("oops... 
an error occurred inside corosync\n"); >>> -out: >>> - log_close(); >>> - exit(1); >>> -} >>> - >>> static inline int get_nodes_nr_from(struct list_head *l) >>> { >>> struct node *node; >>> @@ -577,9 +557,20 @@ static void finish_join(struct join_mess >>> } >>> } >>> >>> -static void update_cluster_info(struct join_message *msg, >>> - struct sd_node *joined, struct sd_node *nodes, size_t nr_nodes) >>> +void update_epoch_info(void) >>> +{ >>> + if (sys_can_recover()) { >>> + sys->epoch++; >>> + update_epoch_store(sys->epoch); >>> + update_epoch_log(sys->epoch); >>> + } >>> +} >>> + >>> +void update_cluster_info(struct sd_node *joined, struct sd_node *nodes, >>> + size_t nr_nodes, void *opaque) >>> { >>> + struct join_message *msg = opaque; >>> + >>> eprintf("status = %d, epoch = %d, %x, %d\n", msg->cluster_status, >>> msg->epoch, msg->result, sys->join_finished); >>> >>> @@ -836,11 +827,6 @@ static void __sd_join_done(struct event_ >>> >>> print_node_list(sys->nodes, sys->nr_nodes); >>> >>> - if (!sys_stat_join_failed()) { >>> - update_cluster_info(jm, &w->joined, w->member_list, >>> - w->member_list_entries); >>> - } >>> - >>> if (sys_can_recover() && jm->inc_epoch) { >>> list_for_each_entry_safe(node, t, &sys->leave_list, list) { >>> list_del(&node->list); >>> @@ -866,11 +852,6 @@ static void __sd_leave_done(struct event >>> memcpy(sys->nodes, w->member_list, sizeof(*sys->nodes) * sys->nr_nodes); >>> qsort(sys->nodes, sys->nr_nodes, sizeof(*sys->nodes), node_cmp); >>> >>> - if (sys_can_recover()) { >>> - sys->epoch++; >>> - update_epoch_store(sys->epoch); >>> - update_epoch_log(sys->epoch); >>> - } >>> update_vnode_info(); >>> >>> print_node_list(sys->nodes, sys->nr_nodes); >>> @@ -943,7 +924,6 @@ static void event_fn(struct work *work) >>> static void event_done(struct work *work) >>> { >>> struct event_struct *cevent; >>> - int ret; >>> >>> if (!sys->cur_cevent) >>> vprintf(SDOG_ERR, "bug\n"); >>> @@ -973,9 +953,6 @@ static void event_done(struct work *work >>> 
vprintf(SDOG_DEBUG, "free %p\n", cevent); >>> event_free(cevent); >>> event_running = 0; >>> - ret = register_event(cdrv_fd, group_handler, NULL); >>> - if (ret) >>> - panic("failed to register event fd"); >>> >>> process_request_event_queues(); >>> } >>> @@ -1086,7 +1063,6 @@ static inline void process_event_queue(v >>> event_work.fn = event_fn; >>> event_work.done = event_done; >>> >>> - unregister_event(cdrv_fd); >>> queue_work(sys->event_wqueue, &event_work); >>> } >>> >>> @@ -1298,8 +1274,8 @@ int create_cluster(int port, int64_t zon >>> } >>> } >>> >>> - cdrv_fd = sys->cdrv->init(sys->cdrv_option, sys->this_node.addr); >>> - if (cdrv_fd < 0) >>> + ret = sys->cdrv->init(sys->cdrv_option, sys->this_node.addr); >>> + if (ret < 0) >>> return -1; >>> >>> sys->this_node.port = port; >>> @@ -1327,12 +1303,6 @@ int create_cluster(int port, int64_t zon >>> INIT_LIST_HEAD(&sys->request_queue); >>> INIT_LIST_HEAD(&sys->event_queue); >>> >>> - ret = register_event(cdrv_fd, group_handler, NULL); >>> - if (ret) { >>> - eprintf("failed to register epoll events (%d)\n", ret); >>> - return 1; >>> - } >>> - >>> ret = send_join_request(&sys->this_node); >>> if (ret != 0) >>> return -1; >>> Index: sheepdog/sheep/cluster/local.c >>> =================================================================== >>> --- sheepdog.orig/sheep/cluster/local.c 2012-05-14 18:45:08.967970005 +0200 >>> +++ sheepdog/sheep/cluster/local.c 2012-05-14 18:57:05.299963176 +0200 >>> @@ -13,9 +13,11 @@ >>> #include <unistd.h> >>> #include <sys/mman.h> >>> #include <sys/signalfd.h> >>> +#include <sys/epoll.h> >>> #include <sys/file.h> >>> #include <search.h> >>> #include <signal.h> >>> +#include <sys/epoll.h> >>> #include <fcntl.h> >>> #include <assert.h> >>> >>> @@ -34,6 +36,8 @@ static struct sd_node this_node; >>> >>> static struct work_queue *local_block_wq; >>> >>> +static void local_dispatch(int listen_fd, int events, void *data); >>> + >>> enum local_event_type { >>> EVENT_JOIN = 1, >>> EVENT_LEAVE, 
>>> @@ -280,12 +284,12 @@ static void check_pids(void *arg) >>> add_timer(arg, 1); >>> } >>> >>> - >>> /* Local driver APIs */ >>> >>> static int local_init(const char *option, uint8_t *myaddr) >>> { >>> sigset_t mask; >>> + int ret; >>> static struct timer t = { >>> .callback = check_pids, >>> .data = &t, >>> @@ -319,7 +323,13 @@ static int local_init(const char *option >>> return -1; >>> } >>> >>> - return sigfd; >>> + ret = register_event(sigfd, local_dispatch, NULL); >>> + if (ret) { >>> + eprintf("failed to register epoll events (%d)\n", ret); >>> + return -1; >>> + } >>> + >>> + return 0; >>> } >>> >>> static int local_join(struct sd_node *myself, >>> @@ -379,7 +389,7 @@ static void local_block_done(struct work >>> { >>> } >>> >>> -static int local_dispatch(void) >>> +static void local_dispatch(int listen_fd, int events, void *data) >>> { >>> int ret; >>> struct signalfd_siginfo siginfo; >>> @@ -390,6 +400,11 @@ static int local_dispatch(void) >>> .done = local_block_done, >>> }; >>> >>> + if (events & EPOLLHUP) { >>> + eprintf("received EPOLLHUP event: has corosync exited?\n"); >>> + goto err; >>> + } >>> + >>> dprintf("read siginfo\n"); >>> ret = read(sigfd, &siginfo, sizeof(siginfo)); >>> assert(ret == sizeof(siginfo)); >>> @@ -456,7 +471,10 @@ static int local_dispatch(void) >>> out: >>> shm_queue_unlock(); >>> >>> - return 0; >>> + return; >>> +err: >>> + log_close(); >>> + exit(1); >>> } >>> >>> struct cluster_driver cdrv_local = { >>> @@ -466,7 +484,6 @@ struct cluster_driver cdrv_local = { >>> .join = local_join, >>> .leave = local_leave, >>> .notify = local_notify, >>> - .dispatch = local_dispatch, >>> }; >>> >>> cdrv_register(cdrv_local); >> Good! It can make driver's API more cleanly. >> >> I'll update this patch to reflect your suggestion in V2. >> >> I have read your latest patchsets, I think this patch can work nicely >> with yours. 
> > On second thought about this patch: > https://mail-attachment.googleusercontent.com/attachment/u/0/?ui=2&ik=4a47af2829&view=att&th=1374cd782bf1a84a&attid=0&disp=inline&safe=1&zw&saduie=AG9B_P_WkgHa4wEYljmyBoplbIV_&sadet=1337064897009&sads=1zqvBm3krrhYKhyRnQOuHJT3znY (correction: the URL above is wrong; the intended one is http://lists.wpkg.org/pipermail/sheepdog/2012-May/003622.html) > > I think your solution changes too many things: a lot of > function prototypes. But with my solution, those changes are not necessary. We > should not introduce a more difficult solution just because it lets you > avoid rebasing them. > > Can you wait for my V2, on which I want to do more testing? > >> >> Thank you very much. >> >> >> -- >> Yunkai Zhang >> Work at Taobao > > > > -- > Yunkai Zhang > Work at Taobao -- Yunkai Zhang Work at Taobao |