Now that we don't unregister and re-register the cluster driver event FD, the need for the dispatch method goes away; instead, the cluster drivers can handle their events locally. Signed-off-by: Christoph Hellwig <hch at lst.de> --- sheep/cluster.h | 16 -- sheep/cluster/accord.c | 122 ++++++++++--------- sheep/cluster/corosync.c | 117 ++++++++++-------- sheep/cluster/local.c | 100 ++++++++------- sheep/cluster/zookeeper.c | 90 ++++++++------ sheep/group.c | 30 ---- 7 files changed, 252 insertions(+), 223 deletions(-) Index: sheepdog/sheep/cluster.h =================================================================== --- sheepdog.orig/sheep/cluster.h 2012-05-17 15:41:54.571912094 +0200 +++ sheepdog/sheep/cluster.h 2012-05-18 11:08:29.235978546 +0200 @@ -94,19 +94,6 @@ struct cluster_driver { */ void (*unblock)(void *msg, size_t msg_len); - /* - * Dispatch handlers - * - * This function dispatches handlers according to the - * delivered events (join/leave/notify) in the cluster. - * - * Note that the events sequence is totally ordered; all nodes - * call the handlers in the same sequence. 
- * - * Returns zero on success, -1 on error - */ - int (*dispatch)(void); - struct list_head list; }; @@ -114,8 +101,7 @@ extern struct list_head cluster_drivers; #define cdrv_register(driver) \ static void __attribute__((constructor)) regist_ ## driver(void) { \ - if (!driver.init || !driver.join || !driver.leave || \ - !driver.notify || !driver.dispatch) \ + if (!driver.init || !driver.join || !driver.leave || !driver.notify) \ panic("the driver '%s' is incomplete\n", driver.name); \ list_add(&driver.list, &cluster_drivers); \ } Index: sheepdog/sheep/cluster/accord.c =================================================================== --- sheepdog.orig/sheep/cluster/accord.c 2012-05-17 15:41:54.575912094 +0200 +++ sheepdog/sheep/cluster/accord.c 2012-05-18 11:15:24.003974591 +0200 @@ -15,10 +15,12 @@ #include <search.h> #include <assert.h> #include <pthread.h> +#include <sys/epoll.h> #include <sys/eventfd.h> #include <accord/accord.h> #include "cluster.h" +#include "event.h" #include "work.h" #define MAX_EVENT_BUF_SIZE (64 * 1024) @@ -458,55 +460,6 @@ static void acrd_watch_fn(struct acrd_ha eventfd_write(efd, value); } -static int accord_init(const char *option, uint8_t *myaddr) -{ - if (!option) { - eprintf("specify one of the accord servers.\n"); - eprintf("e.g. 
sheep /store -c accord:127.0.0.1\n"); - return -1; - } - - pthread_mutex_lock(&start_lock); - - ahandle = acrd_init(option, 9090, acrd_join_fn, acrd_leave_fn, NULL); - if (!ahandle) { - eprintf("failed to connect to accrd server %s\n", option); - return -1; - } - - if (get_addr(myaddr) < 0) - return -1; - - efd = eventfd(0, EFD_NONBLOCK); - if (efd < 0) { - eprintf("failed to create an event fd: %m\n"); - return -1; - } - - acrd_wq = init_work_queue(1); - if (!acrd_wq) - eprintf("failed to create accord workqueue: %m\n"); - return -1; - } - - pthread_cond_wait(&start_cond, &start_lock); - pthread_mutex_unlock(&start_lock); - - if (need_cleanup) - for_each_acrd_file(ahandle, BASE_FILE, __acrd_del, NULL); - else { - queue_start_pos = -1; - queue_end_pos = -1; - for_each_acrd_file(ahandle, QUEUE_FILE, find_queue_end, - &queue_end_pos); - } - - acrd_add_watch(ahandle, QUEUE_FILE, ACRD_EVENT_PREFIX | ACRD_EVENT_ALL, - acrd_watch_fn, NULL); - - return efd; -} - static int accord_join(struct sd_node *myself, void *opaque, size_t opaque_len) { @@ -549,17 +502,24 @@ static void acrd_unblock(void *msg, size pthread_mutex_unlock(&queue_lock); } -static int accord_dispatch(void) +static void acrd_handler(int listen_fd, int events, void *data) { int ret; eventfd_t value; struct acrd_event ev; enum cluster_join_result res; + if (events & EPOLLHUP) { + eprintf("accord driver received EPOLLHUP event, exiting.\n"); + log_close(); + exit(1); + } + dprintf("read event\n"); + ret = eventfd_read(efd, &value); if (ret < 0) - return 0; + return; pthread_mutex_lock(&queue_lock); @@ -623,7 +583,64 @@ static int accord_dispatch(void) out: pthread_mutex_unlock(&queue_lock); - return 0; + return; +} + +static int accord_init(const char *option, uint8_t *myaddr) +{ + int ret; + + if (!option) { + eprintf("specify one of the accord servers.\n"); + eprintf("e.g. 
sheep /store -c accord:127.0.0.1\n"); + return -1; + } + + pthread_mutex_lock(&start_lock); + + ahandle = acrd_init(option, 9090, acrd_join_fn, acrd_leave_fn, NULL); + if (!ahandle) { + eprintf("failed to connect to accrd server %s\n", option); + return -1; + } + + if (get_addr(myaddr) < 0) + return -1; + + efd = eventfd(0, EFD_NONBLOCK); + if (efd < 0) { + eprintf("failed to create an event fd: %m\n"); + return -1; + } + + acrd_wq = init_work_queue(1); + if (!acrd_wq) + eprintf("failed to create accord workqueue: %m\n"); + return -1; + } + + pthread_cond_wait(&start_cond, &start_lock); + pthread_mutex_unlock(&start_lock); + + if (need_cleanup) + for_each_acrd_file(ahandle, BASE_FILE, __acrd_del, NULL); + else { + queue_start_pos = -1; + queue_end_pos = -1; + for_each_acrd_file(ahandle, QUEUE_FILE, find_queue_end, + &queue_end_pos); + } + + acrd_add_watch(ahandle, QUEUE_FILE, ACRD_EVENT_PREFIX | ACRD_EVENT_ALL, + acrd_watch_fn, NULL); + + ret = register_event(efd, acrd_handler, NULL); + if (ret) { + eprintf("failed to register accord event handler (%d)\n", ret); + return -1; + } + + return efd; } struct cluster_driver cdrv_accord = { @@ -635,7 +652,6 @@ struct cluster_driver cdrv_accord = { .notify = accord_notify, .block = accord_block, .unblock = accord_unblock, - .dispatch = accord_dispatch, }; cdrv_register(cdrv_accord); Index: sheepdog/sheep/cluster/corosync.c =================================================================== --- sheepdog.orig/sheep/cluster/corosync.c 2012-05-18 09:47:08.311994966 +0200 +++ sheepdog/sheep/cluster/corosync.c 2012-05-18 11:16:19.727974059 +0200 @@ -10,10 +10,12 @@ */ #include <stdio.h> #include <unistd.h> +#include <sys/epoll.h> #include <corosync/cpg.h> #include <corosync/cfg.h> #include "cluster.h" +#include "event.h" #include "work.h" struct cpg_node { @@ -596,51 +598,6 @@ static void cdrv_cpg_confchg(cpg_handle_ __corosync_dispatch(); } -static int corosync_init(const char *option, uint8_t *myaddr) -{ - int ret, fd; - 
uint32_t nodeid; - cpg_callbacks_t cb = { - .cpg_deliver_fn = cdrv_cpg_deliver, - .cpg_confchg_fn = cdrv_cpg_confchg - }; - - ret = cpg_initialize(&cpg_handle, &cb); - if (ret != CPG_OK) { - eprintf("failed to initialize cpg (%d) - is corosync running?\n", ret); - return -1; - } - - ret = corosync_cfg_initialize(&cfg_handle, NULL); - if (ret != CS_OK) { - vprintf(SDOG_ERR, "failed to initialize cfg (%d)\n", ret); - return -1; - } - - ret = corosync_cfg_local_get(cfg_handle, &nodeid); - if (ret != CS_OK) { - vprintf(SDOG_ERR, "failed to get node id (%d)\n", ret); - return -1; - } - - ret = nodeid_to_addr(nodeid, myaddr); - if (ret < 0) { - eprintf("failed to get local address\n"); - return -1; - } - - this_node.nodeid = nodeid; - this_node.pid = getpid(); - - ret = cpg_fd_get(cpg_handle, &fd); - if (ret != CPG_OK) { - eprintf("failed to get cpg file descriptor (%d)\n", ret); - return -1; - } - - return fd; -} - static int corosync_join(struct sd_node *myself, void *opaque, size_t opaque_len) { @@ -695,15 +652,78 @@ static int corosync_notify(void *msg, si NULL, 0, msg, msg_len); } -static int corosync_dispatch(void) +static void corosync_handler(int listen_fd, int events, void *data) { int ret; + if (events & EPOLLHUP) { + eprintf("corosync driver received EPOLLHUP event, exiting.\n"); + goto out; + } + ret = cpg_dispatch(cpg_handle, CPG_DISPATCH_ALL); - if (ret != CPG_OK) + if (ret != CPG_OK) { + eprintf("cpg_dispatch returned %d\n", ret); + goto out; + } + + return; +out: + log_close(); + exit(1); +} + +static int corosync_init(const char *option, uint8_t *myaddr) +{ + int ret, fd; + uint32_t nodeid; + cpg_callbacks_t cb = { + .cpg_deliver_fn = cdrv_cpg_deliver, + .cpg_confchg_fn = cdrv_cpg_confchg + }; + + ret = cpg_initialize(&cpg_handle, &cb); + if (ret != CPG_OK) { + eprintf("failed to initialize cpg (%d) - " + "is corosync running?\n", ret); return -1; + } - return 0; + ret = corosync_cfg_initialize(&cfg_handle, NULL); + if (ret != CS_OK) { + 
vprintf(SDOG_ERR, "failed to initialize cfg (%d)\n", ret); + return -1; + } + + ret = corosync_cfg_local_get(cfg_handle, &nodeid); + if (ret != CS_OK) { + vprintf(SDOG_ERR, "failed to get node id (%d)\n", ret); + return -1; + } + + ret = nodeid_to_addr(nodeid, myaddr); + if (ret < 0) { + eprintf("failed to get local address\n"); + return -1; + } + + this_node.nodeid = nodeid; + this_node.pid = getpid(); + + ret = cpg_fd_get(cpg_handle, &fd); + if (ret != CPG_OK) { + eprintf("failed to get cpg file descriptor (%d)\n", ret); + return -1; + } + + ret = register_event(fd, corosync_handler, NULL); + if (ret) { + eprintf("failed to register corosync event handler (%d)\n", + ret); + return -1; + } + + return fd; } struct cluster_driver cdrv_corosync = { @@ -715,7 +735,6 @@ struct cluster_driver cdrv_corosync = { .notify = corosync_notify, .block = corosync_block, .unblock = corosync_unblock, - .dispatch = corosync_dispatch, }; cdrv_register(cdrv_corosync); Index: sheepdog/sheep/cluster/local.c =================================================================== --- sheepdog.orig/sheep/cluster/local.c 2012-05-17 15:41:54.575912094 +0200 +++ sheepdog/sheep/cluster/local.c 2012-05-18 11:15:36.087974475 +0200 @@ -11,6 +11,7 @@ #include <stdio.h> #include <string.h> #include <unistd.h> +#include <sys/epoll.h> #include <sys/mman.h> #include <sys/signalfd.h> #include <sys/file.h> @@ -280,45 +281,6 @@ static void check_pids(void *arg) /* Local driver APIs */ -static int local_init(const char *option, uint8_t *myaddr) -{ - sigset_t mask; - static struct timer t = { - .callback = check_pids, - .data = &t, - }; - - if (option) - shmfile = option; - - /* set 127.0.0.1 */ - memset(myaddr, 0, 16); - myaddr[12] = 127; - myaddr[15] = 1; - - shm_queue_init(); - - sigemptyset(&mask); - sigaddset(&mask, SIGUSR1); - sigprocmask(SIG_BLOCK, &mask, NULL); - - sigfd = signalfd(-1, &mask, SFD_NONBLOCK); - if (sigfd < 0) { - eprintf("failed to create a signal fd: %m\n"); - return -1; - } - - 
add_timer(&t, 1); - - local_block_wq = init_work_queue(1); - if (!local_block_wq) { - eprintf("failed to create local workqueue: %m\n"); - return -1; - } - - return sigfd; -} - static int local_join(struct sd_node *myself, void *opaque, size_t opaque_len) { @@ -383,14 +345,21 @@ static void local_unblock(void *msg, siz shm_queue_unlock(); } -static int local_dispatch(void) +static void local_handler(int listen_fd, int events, void *data) { - int ret; struct signalfd_siginfo siginfo; struct local_event *ev; enum cluster_join_result res; + int ret; + + if (events & EPOLLHUP) { + eprintf("local driver received EPOLLHUP event, exiting.\n"); + log_close(); + exit(1); + } dprintf("read siginfo\n"); + ret = read(sigfd, &siginfo, sizeof(siginfo)); assert(ret == sizeof(siginfo)); @@ -454,7 +423,53 @@ static int local_dispatch(void) out: shm_queue_unlock(); - return 0; + return; +} + +static int local_init(const char *option, uint8_t *myaddr) +{ + sigset_t mask; + int ret; + static struct timer t = { + .callback = check_pids, + .data = &t, + }; + + if (option) + shmfile = option; + + /* set 127.0.0.1 */ + memset(myaddr, 0, 16); + myaddr[12] = 127; + myaddr[15] = 1; + + shm_queue_init(); + + sigemptyset(&mask); + sigaddset(&mask, SIGUSR1); + sigprocmask(SIG_BLOCK, &mask, NULL); + + sigfd = signalfd(-1, &mask, SFD_NONBLOCK); + if (sigfd < 0) { + eprintf("failed to create a signal fd: %m\n"); + return -1; + } + + add_timer(&t, 1); + + local_block_wq = init_work_queue(1); + if (!local_block_wq) { + eprintf("failed to create local workqueue: %m\n"); + return -1; + } + + ret = register_event(sigfd, local_handler, NULL); + if (ret) { + eprintf("failed to register local event handler (%d)\n", ret); + return -1; + } + + return sigfd; } struct cluster_driver cdrv_local = { @@ -466,7 +481,6 @@ struct cluster_driver cdrv_local = { .notify = local_notify, .block = local_block, .unblock = local_unblock, - .dispatch = local_dispatch, }; cdrv_register(cdrv_local); Index: 
sheepdog/sheep/cluster/zookeeper.c =================================================================== --- sheepdog.orig/sheep/cluster/zookeeper.c 2012-05-17 15:41:54.575912094 +0200 +++ sheepdog/sheep/cluster/zookeeper.c 2012-05-18 11:17:06.619973612 +0200 @@ -14,11 +14,13 @@ #include <netdb.h> #include <search.h> #include <assert.h> +#include <sys/epoll.h> #include <sys/eventfd.h> #include <zookeeper/zookeeper.h> #include <urcu/uatomic.h> #include "cluster.h" +#include "event.h" #include "work.h" #define MAX_EVENT_BUF_SIZE (64 * 1024) @@ -638,36 +640,6 @@ static int get_addr(uint8_t *bytes) return 0; } -static int zk_init(const char *option, uint8_t *myaddr) -{ - if (!option) { - eprintf("specify comma separated host:port pairs, each corresponding to a zk server.\n"); - eprintf("e.g. sheep /store -c zookeeper:127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002\n"); - return -1; - } - - zhandle = zookeeper_init(option, watcher, SESSION_TIMEOUT, 0, NULL, 0); - if (!zhandle) { - eprintf("failed to connect to zk server %s\n", option); - return -1; - } - dprintf("request session timeout:%dms, negotiated session timeout:%dms\n", - SESSION_TIMEOUT, zoo_recv_timeout(zhandle)); - - if (get_addr(myaddr) < 0) - return -1; - - zk_queue_init(zhandle); - - efd = eventfd(0, EFD_NONBLOCK); - if (efd < 0) { - eprintf("failed to create an event fd: %m\n"); - return -1; - } - - return efd; -} - static int zk_join(struct sd_node *myself, void *opaque, size_t opaque_len) { @@ -735,7 +707,7 @@ static void zk_unblock(void *msg, size_t eventfd_write(efd, value); } -static int zk_dispatch(void) +static void zk_handler(int listen_fd, int events, void *data) { int ret, rc, retry; char path[256]; @@ -744,13 +716,20 @@ static int zk_dispatch(void) struct zk_node *n; enum cluster_join_result res; + if (events & EPOLLHUP) { + eprintf("zookeeper driver received EPOLLHUP event, exiting.\n"); + log_close(); + exit(1); + } + dprintf("read event\n"); + ret = eventfd_read(efd, &value); if (ret < 0) - 
return 0; + return; if (uatomic_read(&zk_notify_blocked)) - return 0; + return; ret = zk_queue_pop(zhandle, &ev); if (ret < 0) @@ -864,7 +843,49 @@ static int zk_dispatch(void) break; } out: - return 0; + return; +} + +static int zk_init(const char *option, uint8_t *myaddr) +{ + int ret; + + if (!option) { + eprintf("specify comma separated host:port pairs, " + "each corresponding to a zk server.\n"); + eprintf("e.g. sheep /store -c zookeeper:127.0.0.1:" + "3000,127.0.0.1:3001,127.0.0.1:3002\n"); + return -1; + } + + zhandle = zookeeper_init(option, watcher, SESSION_TIMEOUT, 0, NULL, 0); + if (!zhandle) { + eprintf("failed to connect to zk server %s\n", option); + return -1; + } + dprintf("request session timeout:%dms, " + "negotiated session timeout:%dms\n", + SESSION_TIMEOUT, zoo_recv_timeout(zhandle)); + + if (get_addr(myaddr) < 0) + return -1; + + zk_queue_init(zhandle); + + efd = eventfd(0, EFD_NONBLOCK); + if (efd < 0) { + eprintf("failed to create an event fd: %m\n"); + return -1; + } + + ret = register_event(efd, zk_handler, NULL); + if (ret) { + eprintf("failed to register zookeeper event handler (%d)\n", + ret); + return -1; + } + + return efd; } struct cluster_driver cdrv_zookeeper = { @@ -876,7 +897,6 @@ struct cluster_driver cdrv_zookeeper = { .notify = zk_notify, .block = zk_block, .unblock = zk_unblock, - .dispatch = zk_dispatch, }; cdrv_register(cdrv_zookeeper); Index: sheepdog/sheep/group.c =================================================================== --- sheepdog.orig/sheep/group.c 2012-05-18 09:47:08.311994966 +0200 +++ sheepdog/sheep/group.c 2012-05-18 11:02:56.695981717 +0200 @@ -26,8 +26,6 @@ #include "work.h" #include "cluster.h" -static int cdrv_fd; - struct node { struct sd_node ent; struct list_head list; @@ -335,24 +333,6 @@ static void queue_cluster_request(struct } } -static void group_handler(int listen_fd, int events, void *data) -{ - int ret; - if (events & EPOLLHUP) { - eprintf("received EPOLLHUP event: has corosync 
exited?\n"); - goto out; - } - - ret = sys->cdrv->dispatch(); - if (ret == 0) - return; - else - eprintf("oops... an error occurred inside corosync\n"); -out: - log_close(); - exit(1); -} - static inline int get_nodes_nr_from(struct list_head *l) { struct node *node; @@ -1366,8 +1346,8 @@ int create_cluster(int port, int64_t zon } } - cdrv_fd = sys->cdrv->init(sys->cdrv_option, sys->this_node.addr); - if (cdrv_fd < 0) + ret = sys->cdrv->init(sys->cdrv_option, sys->this_node.addr); + if (ret < 0) return -1; sys->this_node.port = port; @@ -1395,12 +1375,6 @@ int create_cluster(int port, int64_t zon INIT_LIST_HEAD(&sys->request_queue); INIT_LIST_HEAD(&sys->event_queue); - ret = register_event(cdrv_fd, group_handler, NULL); - if (ret) { - eprintf("failed to register epoll events (%d)\n", ret); - return 1; - } - ret = send_join_request(&sys->this_node); if (ret != 0) return -1; |