[Sheepdog] [PATCH] Remove unregister_event from sd_xxx_handler()
Yunkai Zhang
yunkai.me at gmail.com
Tue May 15 05:57:24 CEST 2012
On Tue, May 15, 2012 at 1:08 AM, Christoph Hellwig <hch at infradead.org> wrote:
> On Tue, May 15, 2012 at 12:46:13AM +0800, Yunkai Zhang wrote:
> >> > Another thing I noticed is that I think your patch makes the ->dispatch
> >> > cluster driver operation obsolete. Once the core no longer registers the
> >> > event handler for the cluster driver FD, it can be handled entirely inside
> >> > the cluster driver, e.g. just call register_event from inside the init
> >> > method and don't even tell the core about it.
>> >
> >> We already call register_event(cdrv_fd, ...) in create_cluster().
>
> Yes, but we could also do this in the drivers. See below for an
> untested version of your patch that implements this suggestions:
>
> From: Yunkai Zhang <yunkai.me at gmail.com>
> Subject: [Sheepdog] [PATCH] Remove unregister_event from sd_xxx_handler()
>
> From: Yunkai Zhang <qiushu.zyk at taobao.com>
>
> Call update_cluster_info() directly from the cluster drivers so that we can
> remove unregister_event() from sd_xxx_handler().
>
> This can speed up the joining/leaving process.
>
> Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com>
> ---
> sheep/cluster.h | 3 +++
> sheep/cluster/accord.c | 4 ++++
> sheep/cluster/corosync.c | 5 +++++
> sheep/cluster/zookeeper.c | 7 ++++++-
> sheep/group.c | 30 +++++++++++++-----------------
> 5 files changed, 31 insertions(+), 18 deletions(-)
>
> Index: sheepdog/sheep/cluster.h
> ===================================================================
> --- sheepdog.orig/sheep/cluster.h 2012-05-14 18:45:08.967970005 +0200
> +++ sheepdog/sheep/cluster.h 2012-05-14 18:49:03.759967772 +0200
> @@ -86,19 +86,6 @@ struct cluster_driver {
> */
> int (*notify)(void *msg, size_t msg_len, void (*block_cb)(void *arg));
>
> - /*
> - * Dispatch handlers
> - *
> - * This function dispatches handlers according to the
> - * delivered events (join/leave/notify) in the cluster.
> - *
> - * Note that the events sequence is totally ordered; all nodes
> - * call the handlers in the same sequence.
> - *
> - * Returns zero on success, -1 on error
> - */
> - int (*dispatch)(void);
> -
> struct list_head list;
> };
>
> @@ -107,7 +94,7 @@ extern struct list_head cluster_drivers;
> #define cdrv_register(driver) \
> static void __attribute__((constructor)) regist_ ## driver(void) { \
> if (!driver.init || !driver.join || !driver.leave || \
> - !driver.notify || !driver.dispatch) \
> + !driver.notify) \
> panic("the driver '%s' is incomplete\n", driver.name); \
> list_add(&driver.list, &cluster_drivers); \
> }
> @@ -191,5 +178,8 @@ void sd_leave_handler(struct sd_node *le
> void sd_notify_handler(struct sd_node *sender, void *msg, size_t msg_len);
> enum cluster_join_result sd_check_join_cb(struct sd_node *joining,
> void *opaque);
> +void update_cluster_info(struct sd_node *joined, struct sd_node *nodes,
> + size_t nr_nodes, void *opaque);
> +void update_epoch_info(void);
>
> #endif
> Index: sheepdog/sheep/cluster/accord.c
> ===================================================================
> --- sheepdog.orig/sheep/cluster/accord.c 2012-05-11 17:51:27.000000000 +0200
> +++ sheepdog/sheep/cluster/accord.c 2012-05-14 18:56:36.855963441 +0200
> @@ -16,6 +16,7 @@
> #include <assert.h>
> #include <pthread.h>
> #include <sys/eventfd.h>
> +#include <sys/epoll.h>
> #include <accord/accord.h>
>
> #include "cluster.h"
> @@ -463,6 +464,8 @@ static void acrd_watch_fn(struct acrd_ha
> eventfd_write(efd, value);
> }
>
> +static void accord_dispatch(int listen_fd, int events, void *data);
> +
> static int accord_init(const char *option, uint8_t *myaddr)
> {
> if (!option) {
> @@ -509,7 +512,13 @@ static int accord_init(const char *optio
> acrd_add_watch(ahandle, QUEUE_FILE, ACRD_EVENT_PREFIX | ACRD_EVENT_ALL,
> acrd_watch_fn, NULL);
>
> - return efd;
> + ret = register_event(efd, accord_dispatch, NULL);
> + if (ret) {
> + eprintf("failed to register epoll events (%d)\n", ret);
> + return -1;
> + }
> +
> + return 0;
> }
>
> static int accord_join(struct sd_node *myself,
> @@ -550,7 +559,7 @@ static void acrd_block_done(struct work
> {
> }
>
> -static int accord_dispatch(void)
> +static void accord_dispatch(int listen_fd, int events, void *data)
> {
> int ret;
> eventfd_t value;
> @@ -561,6 +570,11 @@ static int accord_dispatch(void)
> .done = acrd_block_done,
> };
>
> + if (events & EPOLLHUP) {
> + eprintf("received EPOLLHUP event: has accord exited?\n");
> + goto out;
> + }
> +
> dprintf("read event\n");
> ret = eventfd_read(efd, &value);
> if (ret < 0)
> @@ -603,10 +617,14 @@ static int accord_dispatch(void)
> acrd_queue_pop(ahandle, &ev);
> }
>
> + if (ev.join_result == CJ_RES_SUCCESS)
> + update_cluster_info(&ev.sender, ev.nodes,
> + ev.nr_nodes, ev.buf);
> sd_join_handler(&ev.sender, ev.nodes, ev.nr_nodes,
> ev.join_result, ev.buf);
> break;
> case EVENT_LEAVE:
> + update_epoch_info();
> sd_leave_handler(&ev.sender, ev.nodes, ev.nr_nodes);
> break;
> case EVENT_NOTIFY:
> @@ -628,8 +646,10 @@ static int accord_dispatch(void)
> }
> out:
> pthread_mutex_unlock(&queue_lock);
> -
> return 0;
> +err:
> + log_close();
> + exit(1);
> }
>
> struct cluster_driver cdrv_accord = {
> @@ -639,7 +659,6 @@ struct cluster_driver cdrv_accord = {
> .join = accord_join,
> .leave = accord_leave,
> .notify = accord_notify,
> - .dispatch = accord_dispatch,
> };
>
> cdrv_register(cdrv_accord);
> Index: sheepdog/sheep/cluster/corosync.c
> ===================================================================
> --- sheepdog.orig/sheep/cluster/corosync.c 2012-05-14 18:45:08.967970005 +0200
> +++ sheepdog/sheep/cluster/corosync.c 2012-05-14 18:57:48.279962765 +0200
> @@ -12,7 +12,9 @@
> #include <unistd.h>
> #include <corosync/cpg.h>
> #include <corosync/cfg.h>
> +#include <sys/epoll.h>
>
> +#include "event.h"
> #include "cluster.h"
> #include "work.h"
>
> @@ -322,6 +324,10 @@ static int __corosync_dispatch_one(struc
> case CJ_RES_FAIL:
> case CJ_RES_JOIN_LATER:
> build_node_list(cpg_nodes, nr_cpg_nodes, entries);
> + if (cevent->result == CJ_RES_SUCCESS)
> + update_cluster_info(&cevent->sender.ent,
> + entries, nr_cpg_nodes,
> + cevent->msg);
> sd_join_handler(&cevent->sender.ent, entries,
> nr_cpg_nodes, cevent->result,
> cevent->msg);
> @@ -334,6 +340,7 @@ static int __corosync_dispatch_one(struc
> break;
> cevent->sender.ent = cpg_nodes[idx].ent;
>
> + update_epoch_info();
> del_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender);
> nr_cpg_nodes--;
> build_node_list(cpg_nodes, nr_cpg_nodes, entries);
> @@ -614,6 +621,28 @@ static void cdrv_cpg_confchg(cpg_handle_
> __corosync_dispatch();
> }
>
> +static void corosync_dispatch(int listen_fd, int events, void *data)
> +{
> + int ret;
> +
> + if (events & EPOLLHUP) {
> + eprintf("received EPOLLHUP event: has corosync exited?\n");
> + goto out;
> + }
> +
> + ret = cpg_dispatch(cpg_handle, CPG_DISPATCH_ALL);
> + if (ret != CPG_OK) {
> + eprintf("oops... an error occurred inside corosync\n");
> + goto out;
> + }
> +
> + return;
> +out:
> + log_close();
> + exit(1);
> +}
> +
> +
> static int corosync_init(const char *option, uint8_t *myaddr)
> {
> int ret, fd;
> @@ -662,7 +691,13 @@ static int corosync_init(const char *opt
> return -1;
> }
>
> - return fd;
> + ret = register_event(fd, corosync_dispatch, NULL);
> + if (ret) {
> + eprintf("failed to register epoll events (%d)\n", ret);
> + return -1;
> + }
> +
> + return 0;
> }
>
> static int corosync_join(struct sd_node *myself,
> @@ -728,17 +763,6 @@ static int corosync_notify(void *msg, si
> return ret;
> }
>
> -static int corosync_dispatch(void)
> -{
> - int ret;
> -
> - ret = cpg_dispatch(cpg_handle, CPG_DISPATCH_ALL);
> - if (ret != CPG_OK)
> - return -1;
> -
> - return 0;
> -}
> -
> struct cluster_driver cdrv_corosync = {
> .name = "corosync",
>
> @@ -746,7 +770,6 @@ struct cluster_driver cdrv_corosync = {
> .join = corosync_join,
> .leave = corosync_leave,
> .notify = corosync_notify,
> - .dispatch = corosync_dispatch,
> };
>
> cdrv_register(cdrv_corosync);
> Index: sheepdog/sheep/cluster/zookeeper.c
> ===================================================================
> --- sheepdog.orig/sheep/cluster/zookeeper.c 2012-05-14 18:45:08.971970007 +0200
> +++ sheepdog/sheep/cluster/zookeeper.c 2012-05-14 18:58:38.199962294 +0200
> @@ -14,10 +14,12 @@
> #include <netdb.h>
> #include <search.h>
> #include <assert.h>
> +#include <sys/epoll.h>
> #include <sys/eventfd.h>
> #include <zookeeper/zookeeper.h>
> #include <urcu/uatomic.h>
>
> +#include "event.h"
> #include "cluster.h"
> #include "work.h"
>
> @@ -30,6 +32,7 @@
> #define QUEUE_ZNODE BASE_ZNODE "/queue"
> #define MEMBER_ZNODE BASE_ZNODE "/member"
>
> +static void zk_dispatch(int listen_fd, int events, void *data);
>
> /* zookeeper API wrapper prototypes */
> ZOOAPI int zk_create(zhandle_t *zh, const char *path, const char *value,
> @@ -647,6 +650,8 @@ static int get_addr(uint8_t *bytes)
>
> static int zk_init(const char *option, uint8_t *myaddr)
> {
> + int ret;
> +
> if (!option) {
> eprintf("specify comma separated host:port pairs, each corresponding to a zk server.\n");
> eprintf("e.g. sheep /store -c zookeeper:127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002\n");
> @@ -678,7 +683,13 @@ static int zk_init(const char *option, u
> return -1;
> }
>
> - return efd;
> + ret = register_event(efd, zk_dispatch, NULL);
> + if (ret) {
> + eprintf("failed to register epoll events (%d)\n", ret);
> + return -1;
> + }
> +
> + return 0;
> }
>
> static int zk_join(struct sd_node *myself,
> @@ -745,7 +756,7 @@ static void zk_block_done(struct work *w
> {
> }
>
> -static int zk_dispatch(void)
> +static void zk_dispatch(int listen_fd, int events, void *data)
> {
> int ret, rc, retry;
> char path[256];
> @@ -758,13 +769,18 @@ static int zk_dispatch(void)
> .done = zk_block_done,
> };
>
> + if (events & EPOLLHUP) {
> + eprintf("received EPOLLHUP event: has corosync exited?\n");
> + goto err;
> + }
> +
> dprintf("read event\n");
> ret = eventfd_read(efd, &value);
> if (ret < 0)
> - return 0;
> + return;
>
> if (uatomic_read(&zk_notify_blocked))
> - return 0;
> + return;
>
> ret = zk_queue_pop(zhandle, &ev);
> if (ret < 0)
> @@ -824,6 +840,10 @@ static int zk_dispatch(void)
> }
>
> node_btree_add(&zk_node_btroot, &ev.sender);
> + build_node_list(zk_node_btroot);
> + if (ev.join_result == CJ_RES_SUCCESS)
> + update_cluster_info(&ev.sender.node, sd_nodes,
> + nr_sd_nodes, ev.buf);
> dprintf("one sheep joined[down], nr_nodes:%ld, sender:%s, joined:%d\n",
> nr_zk_nodes, node_to_str(&ev.sender.node), ev.sender.joined);
>
> @@ -841,7 +861,6 @@ static int zk_dispatch(void)
> }
> }
>
> - build_node_list(zk_node_btroot);
> sd_join_handler(&ev.sender.node, sd_nodes, nr_sd_nodes,
> ev.join_result, ev.buf);
> break;
> @@ -853,6 +872,8 @@ static int zk_dispatch(void)
> goto out;
> }
>
> + update_epoch_info();
> +
> node_btree_del(&zk_node_btroot, n);
> dprintf("one sheep left, nr_nodes:%ld\n", nr_zk_nodes);
>
> @@ -880,7 +901,10 @@ static int zk_dispatch(void)
> break;
> }
> out:
> - return 0;
> + return;
> +err:
> + log_close();
> + exit(1);
> }
>
> struct cluster_driver cdrv_zookeeper = {
> @@ -890,7 +914,6 @@ struct cluster_driver cdrv_zookeeper = {
> .join = zk_join,
> .leave = zk_leave,
> .notify = zk_notify,
> - .dispatch = zk_dispatch,
> };
>
> cdrv_register(cdrv_zookeeper);
> Index: sheepdog/sheep/group.c
> ===================================================================
> --- sheepdog.orig/sheep/group.c 2012-05-14 18:45:08.971970007 +0200
> +++ sheepdog/sheep/group.c 2012-05-14 18:54:38.735964575 +0200
> @@ -25,8 +25,6 @@
> #include "work.h"
> #include "cluster.h"
>
> -static int cdrv_fd;
> -
> struct node {
> struct sd_node ent;
> struct list_head list;
> @@ -262,24 +260,6 @@ void do_cluster_request(struct work *wor
> free(msg);
> }
>
> -static void group_handler(int listen_fd, int events, void *data)
> -{
> - int ret;
> - if (events & EPOLLHUP) {
> - eprintf("received EPOLLHUP event: has corosync exited?\n");
> - goto out;
> - }
> -
> - ret = sys->cdrv->dispatch();
> - if (ret == 0)
> - return;
> - else
> - eprintf("oops... an error occurred inside corosync\n");
> -out:
> - log_close();
> - exit(1);
> -}
> -
> static inline int get_nodes_nr_from(struct list_head *l)
> {
> struct node *node;
> @@ -577,9 +557,20 @@ static void finish_join(struct join_mess
> }
> }
>
> -static void update_cluster_info(struct join_message *msg,
> - struct sd_node *joined, struct sd_node *nodes, size_t nr_nodes)
> +void update_epoch_info(void)
> +{
> + if (sys_can_recover()) {
> + sys->epoch++;
> + update_epoch_store(sys->epoch);
> + update_epoch_log(sys->epoch);
> + }
> +}
> +
> +void update_cluster_info(struct sd_node *joined, struct sd_node *nodes,
> + size_t nr_nodes, void *opaque)
> {
> + struct join_message *msg = opaque;
> +
> eprintf("status = %d, epoch = %d, %x, %d\n", msg->cluster_status,
> msg->epoch, msg->result, sys->join_finished);
>
> @@ -836,11 +827,6 @@ static void __sd_join_done(struct event_
>
> print_node_list(sys->nodes, sys->nr_nodes);
>
> - if (!sys_stat_join_failed()) {
> - update_cluster_info(jm, &w->joined, w->member_list,
> - w->member_list_entries);
> - }
> -
> if (sys_can_recover() && jm->inc_epoch) {
> list_for_each_entry_safe(node, t, &sys->leave_list, list) {
> list_del(&node->list);
> @@ -866,11 +852,6 @@ static void __sd_leave_done(struct event
> memcpy(sys->nodes, w->member_list, sizeof(*sys->nodes) * sys->nr_nodes);
> qsort(sys->nodes, sys->nr_nodes, sizeof(*sys->nodes), node_cmp);
>
> - if (sys_can_recover()) {
> - sys->epoch++;
> - update_epoch_store(sys->epoch);
> - update_epoch_log(sys->epoch);
> - }
> update_vnode_info();
>
> print_node_list(sys->nodes, sys->nr_nodes);
> @@ -943,7 +924,6 @@ static void event_fn(struct work *work)
> static void event_done(struct work *work)
> {
> struct event_struct *cevent;
> - int ret;
>
> if (!sys->cur_cevent)
> vprintf(SDOG_ERR, "bug\n");
> @@ -973,9 +953,6 @@ static void event_done(struct work *work
> vprintf(SDOG_DEBUG, "free %p\n", cevent);
> event_free(cevent);
> event_running = 0;
> - ret = register_event(cdrv_fd, group_handler, NULL);
> - if (ret)
> - panic("failed to register event fd");
>
> process_request_event_queues();
> }
> @@ -1086,7 +1063,6 @@ static inline void process_event_queue(v
> event_work.fn = event_fn;
> event_work.done = event_done;
>
> - unregister_event(cdrv_fd);
> queue_work(sys->event_wqueue, &event_work);
> }
>
> @@ -1298,8 +1274,8 @@ int create_cluster(int port, int64_t zon
> }
> }
>
> - cdrv_fd = sys->cdrv->init(sys->cdrv_option, sys->this_node.addr);
> - if (cdrv_fd < 0)
> + ret = sys->cdrv->init(sys->cdrv_option, sys->this_node.addr);
> + if (ret < 0)
> return -1;
>
> sys->this_node.port = port;
> @@ -1327,12 +1303,6 @@ int create_cluster(int port, int64_t zon
> INIT_LIST_HEAD(&sys->request_queue);
> INIT_LIST_HEAD(&sys->event_queue);
>
> - ret = register_event(cdrv_fd, group_handler, NULL);
> - if (ret) {
> - eprintf("failed to register epoll events (%d)\n", ret);
> - return 1;
> - }
> -
> ret = send_join_request(&sys->this_node);
> if (ret != 0)
> return -1;
> Index: sheepdog/sheep/cluster/local.c
> ===================================================================
> --- sheepdog.orig/sheep/cluster/local.c 2012-05-14 18:45:08.967970005 +0200
> +++ sheepdog/sheep/cluster/local.c 2012-05-14 18:57:05.299963176 +0200
> @@ -13,9 +13,11 @@
> #include <unistd.h>
> #include <sys/mman.h>
> #include <sys/signalfd.h>
> +#include <sys/epoll.h>
> #include <sys/file.h>
> #include <search.h>
> #include <signal.h>
> +#include <sys/epoll.h>
> #include <fcntl.h>
> #include <assert.h>
>
> @@ -34,6 +36,8 @@ static struct sd_node this_node;
>
> static struct work_queue *local_block_wq;
>
> +static void local_dispatch(int listen_fd, int events, void *data);
> +
> enum local_event_type {
> EVENT_JOIN = 1,
> EVENT_LEAVE,
> @@ -280,12 +284,12 @@ static void check_pids(void *arg)
> add_timer(arg, 1);
> }
>
> -
> /* Local driver APIs */
>
> static int local_init(const char *option, uint8_t *myaddr)
> {
> sigset_t mask;
> + int ret;
> static struct timer t = {
> .callback = check_pids,
> .data = &t,
> @@ -319,7 +323,13 @@ static int local_init(const char *option
> return -1;
> }
>
> - return sigfd;
> + ret = register_event(sigfd, local_dispatch, NULL);
> + if (ret) {
> + eprintf("failed to register epoll events (%d)\n", ret);
> + return -1;
> + }
> +
> + return 0;
> }
>
> static int local_join(struct sd_node *myself,
> @@ -379,7 +389,7 @@ static void local_block_done(struct work
> {
> }
>
> -static int local_dispatch(void)
> +static void local_dispatch(int listen_fd, int events, void *data)
> {
> int ret;
> struct signalfd_siginfo siginfo;
> @@ -390,6 +400,11 @@ static int local_dispatch(void)
> .done = local_block_done,
> };
>
> + if (events & EPOLLHUP) {
> + eprintf("received EPOLLHUP event: has corosync exited?\n");
> + goto err;
> + }
> +
> dprintf("read siginfo\n");
> ret = read(sigfd, &siginfo, sizeof(siginfo));
> assert(ret == sizeof(siginfo));
> @@ -456,7 +471,10 @@ static int local_dispatch(void)
> out:
> shm_queue_unlock();
>
> - return 0;
> + return;
> +err:
> + log_close();
> + exit(1);
> }
>
> struct cluster_driver cdrv_local = {
> @@ -466,7 +484,6 @@ struct cluster_driver cdrv_local = {
> .join = local_join,
> .leave = local_leave,
> .notify = local_notify,
> - .dispatch = local_dispatch,
> };
>
> cdrv_register(cdrv_local);
Good! It makes the driver API cleaner.
I'll update this patch to reflect your suggestion in V2.
I have read your latest patchsets; I think this patch will work nicely
with yours.
Thank you very much.
--
Yunkai Zhang
Work at Taobao
More information about the sheepdog
mailing list