From: Yunkai Zhang <qiushu.zyk at taobao.com> Call update_cluster_info() (and the new update_epoch_info()) directly from the cluster drivers so that we can remove unregister_event() from sd_xxx_handler(). This speeds up the joining/leaving process. Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com> --- sheep/cluster.h | 3 +++ sheep/cluster/accord.c | 4 ++++ sheep/cluster/corosync.c | 5 +++++ sheep/cluster/zookeeper.c | 7 ++++++- sheep/group.c | 30 +++++++++++++----------------- 5 files changed, 31 insertions(+), 18 deletions(-) diff --git a/sheep/cluster.h b/sheep/cluster.h index d543e99..513c20d 100644 --- a/sheep/cluster.h +++ b/sheep/cluster.h @@ -191,5 +191,8 @@ void sd_leave_handler(struct sd_node *left, struct sd_node *members, void sd_notify_handler(struct sd_node *sender, void *msg, size_t msg_len); enum cluster_join_result sd_check_join_cb(struct sd_node *joining, void *opaque); +void update_cluster_info(struct sd_node *joined, struct sd_node *nodes, + size_t nr_nodes, void *opaque); +void update_epoch_info(void); #endif diff --git a/sheep/cluster/accord.c b/sheep/cluster/accord.c index 1fdca91..607c657 100644 --- a/sheep/cluster/accord.c +++ b/sheep/cluster/accord.c @@ -603,10 +603,14 @@ static int accord_dispatch(void) acrd_queue_pop(ahandle, &ev); } + if (ev.join_result == CJ_RES_SUCCESS) + update_cluster_info(&ev.sender, ev.nodes, + ev.nr_nodes, ev.buf); sd_join_handler(&ev.sender, ev.nodes, ev.nr_nodes, ev.join_result, ev.buf); break; case EVENT_LEAVE: + update_epoch_info(); sd_leave_handler(&ev.sender, ev.nodes, ev.nr_nodes); break; case EVENT_NOTIFY: diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c index 4a588e9..bb4ce47 100644 --- a/sheep/cluster/corosync.c +++ b/sheep/cluster/corosync.c @@ -322,6 +322,10 @@ static int __corosync_dispatch_one(struct corosync_event *cevent) case CJ_RES_FAIL: case CJ_RES_JOIN_LATER: build_node_list(cpg_nodes, nr_cpg_nodes, entries); + if (cevent->result == CJ_RES_SUCCESS) + update_cluster_info(&cevent->sender.ent, + entries, nr_cpg_nodes, + 
cevent->msg); sd_join_handler(&cevent->sender.ent, entries, nr_cpg_nodes, cevent->result, cevent->msg); @@ -334,6 +338,7 @@ static int __corosync_dispatch_one(struct corosync_event *cevent) break; cevent->sender.ent = cpg_nodes[idx].ent; + update_epoch_info(); del_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender); nr_cpg_nodes--; build_node_list(cpg_nodes, nr_cpg_nodes, entries); diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c index 491056a..826abfe 100644 --- a/sheep/cluster/zookeeper.c +++ b/sheep/cluster/zookeeper.c @@ -824,6 +824,10 @@ static int zk_dispatch(void) } node_btree_add(&zk_node_btroot, &ev.sender); + build_node_list(zk_node_btroot); + if (ev.join_result == CJ_RES_SUCCESS) + update_cluster_info(&ev.sender.node, sd_nodes, + nr_sd_nodes, ev.buf); dprintf("one sheep joined[down], nr_nodes:%ld, sender:%s, joined:%d\n", nr_zk_nodes, node_to_str(&ev.sender.node), ev.sender.joined); @@ -841,7 +845,6 @@ static int zk_dispatch(void) } } - build_node_list(zk_node_btroot); sd_join_handler(&ev.sender.node, sd_nodes, nr_sd_nodes, ev.join_result, ev.buf); break; @@ -853,6 +856,8 @@ static int zk_dispatch(void) goto out; } + update_epoch_info(); + node_btree_del(&zk_node_btroot, n); dprintf("one sheep left, nr_nodes:%ld\n", nr_zk_nodes); diff --git a/sheep/group.c b/sheep/group.c index 73a5ba7..e9433c9 100644 --- a/sheep/group.c +++ b/sheep/group.c @@ -577,9 +577,20 @@ static void finish_join(struct join_message *msg, struct sd_node *joined, } } -static void update_cluster_info(struct join_message *msg, - struct sd_node *joined, struct sd_node *nodes, size_t nr_nodes) +void update_epoch_info(void) { + if (sys_can_recover()) { + sys->epoch++; + update_epoch_store(sys->epoch); + update_epoch_log(sys->epoch); + } +} + +void update_cluster_info(struct sd_node *joined, struct sd_node *nodes, + size_t nr_nodes, void *opaque) +{ + struct join_message *msg = opaque; + eprintf("status = %d, epoch = %d, %x, %d\n", msg->cluster_status, msg->epoch, 
msg->result, sys->join_finished); @@ -836,11 +847,6 @@ static void __sd_join_done(struct event_struct *cevent) print_node_list(sys->nodes, sys->nr_nodes); - if (!sys_stat_join_failed()) { - update_cluster_info(jm, &w->joined, w->member_list, - w->member_list_entries); - } - if (sys_can_recover() && jm->inc_epoch) { list_for_each_entry_safe(node, t, &sys->leave_list, list) { list_del(&node->list); @@ -866,11 +872,6 @@ static void __sd_leave_done(struct event_struct *cevent) memcpy(sys->nodes, w->member_list, sizeof(*sys->nodes) * sys->nr_nodes); qsort(sys->nodes, sys->nr_nodes, sizeof(*sys->nodes), node_cmp); - if (sys_can_recover()) { - sys->epoch++; - update_epoch_store(sys->epoch); - update_epoch_log(sys->epoch); - } update_vnode_info(); print_node_list(sys->nodes, sys->nr_nodes); @@ -943,7 +944,6 @@ static void event_fn(struct work *work) static void event_done(struct work *work) { struct event_struct *cevent; - int ret; if (!sys->cur_cevent) vprintf(SDOG_ERR, "bug\n"); @@ -973,9 +973,6 @@ static void event_done(struct work *work) vprintf(SDOG_DEBUG, "free %p\n", cevent); event_free(cevent); event_running = 0; - ret = register_event(cdrv_fd, group_handler, NULL); - if (ret) - panic("failed to register event fd"); process_request_event_queues(); } @@ -1086,7 +1083,6 @@ static inline void process_event_queue(void) event_work.fn = event_fn; event_work.done = event_done; - unregister_event(cdrv_fd); queue_work(sys->event_wqueue, &event_work); } -- 1.7.7.6 |