[sheepdog] [PATCH] sheep: remove the dispatch handler
Christoph Hellwig
hch at infradead.org
Fri May 18 11:30:40 CEST 2012
Now that we don't unregister and re-register the cluster driver event FD,
the need for the dispatch method goes away; instead, the cluster drivers
can handle their events locally.
Signed-off-by: Christoph Hellwig <hch at lst.de>
---
sheep/cluster.h | 16 --
sheep/cluster/accord.c | 122 ++++++++++---------
sheep/cluster/corosync.c | 117 ++++++++++--------
sheep/cluster/local.c | 100 ++++++++-------
sheep/cluster/zookeeper.c | 90 ++++++++------
sheep/group.c | 30 ----
7 files changed, 252 insertions(+), 223 deletions(-)
Index: sheepdog/sheep/cluster.h
===================================================================
--- sheepdog.orig/sheep/cluster.h 2012-05-17 15:41:54.571912094 +0200
+++ sheepdog/sheep/cluster.h 2012-05-18 11:08:29.235978546 +0200
@@ -94,19 +94,6 @@ struct cluster_driver {
*/
void (*unblock)(void *msg, size_t msg_len);
- /*
- * Dispatch handlers
- *
- * This function dispatches handlers according to the
- * delivered events (join/leave/notify) in the cluster.
- *
- * Note that the events sequence is totally ordered; all nodes
- * call the handlers in the same sequence.
- *
- * Returns zero on success, -1 on error
- */
- int (*dispatch)(void);
-
struct list_head list;
};
@@ -114,8 +101,7 @@ extern struct list_head cluster_drivers;
#define cdrv_register(driver) \
static void __attribute__((constructor)) regist_ ## driver(void) { \
- if (!driver.init || !driver.join || !driver.leave || \
- !driver.notify || !driver.dispatch) \
+ if (!driver.init || !driver.join || !driver.leave || !driver.notify) \
panic("the driver '%s' is incomplete\n", driver.name); \
list_add(&driver.list, &cluster_drivers); \
}
Index: sheepdog/sheep/cluster/accord.c
===================================================================
--- sheepdog.orig/sheep/cluster/accord.c 2012-05-17 15:41:54.575912094 +0200
+++ sheepdog/sheep/cluster/accord.c 2012-05-18 11:15:24.003974591 +0200
@@ -15,10 +15,12 @@
#include <search.h>
#include <assert.h>
#include <pthread.h>
+#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <accord/accord.h>
#include "cluster.h"
+#include "event.h"
#include "work.h"
#define MAX_EVENT_BUF_SIZE (64 * 1024)
@@ -458,55 +460,6 @@ static void acrd_watch_fn(struct acrd_ha
eventfd_write(efd, value);
}
-static int accord_init(const char *option, uint8_t *myaddr)
-{
- if (!option) {
- eprintf("specify one of the accord servers.\n");
- eprintf("e.g. sheep /store -c accord:127.0.0.1\n");
- return -1;
- }
-
- pthread_mutex_lock(&start_lock);
-
- ahandle = acrd_init(option, 9090, acrd_join_fn, acrd_leave_fn, NULL);
- if (!ahandle) {
- eprintf("failed to connect to accrd server %s\n", option);
- return -1;
- }
-
- if (get_addr(myaddr) < 0)
- return -1;
-
- efd = eventfd(0, EFD_NONBLOCK);
- if (efd < 0) {
- eprintf("failed to create an event fd: %m\n");
- return -1;
- }
-
- acrd_wq = init_work_queue(1);
- if (!acrd_wq)
- eprintf("failed to create accord workqueue: %m\n");
- return -1;
- }
-
- pthread_cond_wait(&start_cond, &start_lock);
- pthread_mutex_unlock(&start_lock);
-
- if (need_cleanup)
- for_each_acrd_file(ahandle, BASE_FILE, __acrd_del, NULL);
- else {
- queue_start_pos = -1;
- queue_end_pos = -1;
- for_each_acrd_file(ahandle, QUEUE_FILE, find_queue_end,
- &queue_end_pos);
- }
-
- acrd_add_watch(ahandle, QUEUE_FILE, ACRD_EVENT_PREFIX | ACRD_EVENT_ALL,
- acrd_watch_fn, NULL);
-
- return efd;
-}
-
static int accord_join(struct sd_node *myself,
void *opaque, size_t opaque_len)
{
@@ -549,17 +502,24 @@ static void acrd_unblock(void *msg, size
pthread_mutex_unlock(&queue_lock);
}
-static int accord_dispatch(void)
+static void acrd_handler(int listen_fd, int events, void *data)
{
int ret;
eventfd_t value;
struct acrd_event ev;
enum cluster_join_result res;
+ if (events & EPOLLHUP) {
+ eprintf("accord driver received EPOLLHUP event, exiting.\n");
+ log_close();
+ exit(1);
+ }
+
dprintf("read event\n");
+
ret = eventfd_read(efd, &value);
if (ret < 0)
- return 0;
+ return;
pthread_mutex_lock(&queue_lock);
@@ -623,7 +583,64 @@ static int accord_dispatch(void)
out:
pthread_mutex_unlock(&queue_lock);
- return 0;
+ return;
+}
+
+static int accord_init(const char *option, uint8_t *myaddr)
+{
+ int ret;
+
+ if (!option) {
+ eprintf("specify one of the accord servers.\n");
+ eprintf("e.g. sheep /store -c accord:127.0.0.1\n");
+ return -1;
+ }
+
+ pthread_mutex_lock(&start_lock);
+
+ ahandle = acrd_init(option, 9090, acrd_join_fn, acrd_leave_fn, NULL);
+ if (!ahandle) {
+ eprintf("failed to connect to accrd server %s\n", option);
+ return -1;
+ }
+
+ if (get_addr(myaddr) < 0)
+ return -1;
+
+ efd = eventfd(0, EFD_NONBLOCK);
+ if (efd < 0) {
+ eprintf("failed to create an event fd: %m\n");
+ return -1;
+ }
+
+ acrd_wq = init_work_queue(1);
+ if (!acrd_wq)
+ eprintf("failed to create accord workqueue: %m\n");
+ return -1;
+ }
+
+ pthread_cond_wait(&start_cond, &start_lock);
+ pthread_mutex_unlock(&start_lock);
+
+ if (need_cleanup)
+ for_each_acrd_file(ahandle, BASE_FILE, __acrd_del, NULL);
+ else {
+ queue_start_pos = -1;
+ queue_end_pos = -1;
+ for_each_acrd_file(ahandle, QUEUE_FILE, find_queue_end,
+ &queue_end_pos);
+ }
+
+ acrd_add_watch(ahandle, QUEUE_FILE, ACRD_EVENT_PREFIX | ACRD_EVENT_ALL,
+ acrd_watch_fn, NULL);
+
+ ret = register_event(efd, acrd_handler, NULL);
+ if (ret) {
+ eprintf("failed to register accord event handler (%d)\n", ret);
+ return -1;
+ }
+
+ return efd;
}
struct cluster_driver cdrv_accord = {
@@ -635,7 +652,6 @@ struct cluster_driver cdrv_accord = {
.notify = accord_notify,
.block = accord_block,
.unblock = accord_unblock,
- .dispatch = accord_dispatch,
};
cdrv_register(cdrv_accord);
Index: sheepdog/sheep/cluster/corosync.c
===================================================================
--- sheepdog.orig/sheep/cluster/corosync.c 2012-05-18 09:47:08.311994966 +0200
+++ sheepdog/sheep/cluster/corosync.c 2012-05-18 11:16:19.727974059 +0200
@@ -10,10 +10,12 @@
*/
#include <stdio.h>
#include <unistd.h>
+#include <sys/epoll.h>
#include <corosync/cpg.h>
#include <corosync/cfg.h>
#include "cluster.h"
+#include "event.h"
#include "work.h"
struct cpg_node {
@@ -596,51 +598,6 @@ static void cdrv_cpg_confchg(cpg_handle_
__corosync_dispatch();
}
-static int corosync_init(const char *option, uint8_t *myaddr)
-{
- int ret, fd;
- uint32_t nodeid;
- cpg_callbacks_t cb = {
- .cpg_deliver_fn = cdrv_cpg_deliver,
- .cpg_confchg_fn = cdrv_cpg_confchg
- };
-
- ret = cpg_initialize(&cpg_handle, &cb);
- if (ret != CPG_OK) {
- eprintf("failed to initialize cpg (%d) - is corosync running?\n", ret);
- return -1;
- }
-
- ret = corosync_cfg_initialize(&cfg_handle, NULL);
- if (ret != CS_OK) {
- vprintf(SDOG_ERR, "failed to initialize cfg (%d)\n", ret);
- return -1;
- }
-
- ret = corosync_cfg_local_get(cfg_handle, &nodeid);
- if (ret != CS_OK) {
- vprintf(SDOG_ERR, "failed to get node id (%d)\n", ret);
- return -1;
- }
-
- ret = nodeid_to_addr(nodeid, myaddr);
- if (ret < 0) {
- eprintf("failed to get local address\n");
- return -1;
- }
-
- this_node.nodeid = nodeid;
- this_node.pid = getpid();
-
- ret = cpg_fd_get(cpg_handle, &fd);
- if (ret != CPG_OK) {
- eprintf("failed to get cpg file descriptor (%d)\n", ret);
- return -1;
- }
-
- return fd;
-}
-
static int corosync_join(struct sd_node *myself,
void *opaque, size_t opaque_len)
{
@@ -695,15 +652,78 @@ static int corosync_notify(void *msg, si
NULL, 0, msg, msg_len);
}
-static int corosync_dispatch(void)
+static void corosync_handler(int listen_fd, int events, void *data)
{
int ret;
+ if (events & EPOLLHUP) {
+ eprintf("corosync driver received EPOLLHUP event, exiting.\n");
+ goto out;
+ }
+
ret = cpg_dispatch(cpg_handle, CPG_DISPATCH_ALL);
- if (ret != CPG_OK)
+ if (ret != CPG_OK) {
+ eprintf("cpg_dispatch returned %d\n", ret);
+ goto out;
+ }
+
+ return;
+out:
+ log_close();
+ exit(1);
+}
+
+static int corosync_init(const char *option, uint8_t *myaddr)
+{
+ int ret, fd;
+ uint32_t nodeid;
+ cpg_callbacks_t cb = {
+ .cpg_deliver_fn = cdrv_cpg_deliver,
+ .cpg_confchg_fn = cdrv_cpg_confchg
+ };
+
+ ret = cpg_initialize(&cpg_handle, &cb);
+ if (ret != CPG_OK) {
+ eprintf("failed to initialize cpg (%d) - "
+ "is corosync running?\n", ret);
return -1;
+ }
- return 0;
+ ret = corosync_cfg_initialize(&cfg_handle, NULL);
+ if (ret != CS_OK) {
+ vprintf(SDOG_ERR, "failed to initialize cfg (%d)\n", ret);
+ return -1;
+ }
+
+ ret = corosync_cfg_local_get(cfg_handle, &nodeid);
+ if (ret != CS_OK) {
+ vprintf(SDOG_ERR, "failed to get node id (%d)\n", ret);
+ return -1;
+ }
+
+ ret = nodeid_to_addr(nodeid, myaddr);
+ if (ret < 0) {
+ eprintf("failed to get local address\n");
+ return -1;
+ }
+
+ this_node.nodeid = nodeid;
+ this_node.pid = getpid();
+
+ ret = cpg_fd_get(cpg_handle, &fd);
+ if (ret != CPG_OK) {
+ eprintf("failed to get cpg file descriptor (%d)\n", ret);
+ return -1;
+ }
+
+ ret = register_event(fd, corosync_handler, NULL);
+ if (ret) {
+ eprintf("failed to register corosync event handler (%d)\n",
+ ret);
+ return -1;
+ }
+
+ return fd;
}
struct cluster_driver cdrv_corosync = {
@@ -715,7 +735,6 @@ struct cluster_driver cdrv_corosync = {
.notify = corosync_notify,
.block = corosync_block,
.unblock = corosync_unblock,
- .dispatch = corosync_dispatch,
};
cdrv_register(cdrv_corosync);
Index: sheepdog/sheep/cluster/local.c
===================================================================
--- sheepdog.orig/sheep/cluster/local.c 2012-05-17 15:41:54.575912094 +0200
+++ sheepdog/sheep/cluster/local.c 2012-05-18 11:15:36.087974475 +0200
@@ -11,6 +11,7 @@
#include <stdio.h>
#include <string.h>
#include <unistd.h>
+#include <sys/epoll.h>
#include <sys/mman.h>
#include <sys/signalfd.h>
#include <sys/file.h>
@@ -280,45 +281,6 @@ static void check_pids(void *arg)
/* Local driver APIs */
-static int local_init(const char *option, uint8_t *myaddr)
-{
- sigset_t mask;
- static struct timer t = {
- .callback = check_pids,
- .data = &t,
- };
-
- if (option)
- shmfile = option;
-
- /* set 127.0.0.1 */
- memset(myaddr, 0, 16);
- myaddr[12] = 127;
- myaddr[15] = 1;
-
- shm_queue_init();
-
- sigemptyset(&mask);
- sigaddset(&mask, SIGUSR1);
- sigprocmask(SIG_BLOCK, &mask, NULL);
-
- sigfd = signalfd(-1, &mask, SFD_NONBLOCK);
- if (sigfd < 0) {
- eprintf("failed to create a signal fd: %m\n");
- return -1;
- }
-
- add_timer(&t, 1);
-
- local_block_wq = init_work_queue(1);
- if (!local_block_wq) {
- eprintf("failed to create local workqueue: %m\n");
- return -1;
- }
-
- return sigfd;
-}
-
static int local_join(struct sd_node *myself,
void *opaque, size_t opaque_len)
{
@@ -383,14 +345,21 @@ static void local_unblock(void *msg, siz
shm_queue_unlock();
}
-static int local_dispatch(void)
+static void local_handler(int listen_fd, int events, void *data)
{
- int ret;
struct signalfd_siginfo siginfo;
struct local_event *ev;
enum cluster_join_result res;
+ int ret;
+
+ if (events & EPOLLHUP) {
+ eprintf("local driver received EPOLLHUP event, exiting.\n");
+ log_close();
+ exit(1);
+ }
dprintf("read siginfo\n");
+
ret = read(sigfd, &siginfo, sizeof(siginfo));
assert(ret == sizeof(siginfo));
@@ -454,7 +423,53 @@ static int local_dispatch(void)
out:
shm_queue_unlock();
- return 0;
+ return;
+}
+
+static int local_init(const char *option, uint8_t *myaddr)
+{
+ sigset_t mask;
+ int ret;
+ static struct timer t = {
+ .callback = check_pids,
+ .data = &t,
+ };
+
+ if (option)
+ shmfile = option;
+
+ /* set 127.0.0.1 */
+ memset(myaddr, 0, 16);
+ myaddr[12] = 127;
+ myaddr[15] = 1;
+
+ shm_queue_init();
+
+ sigemptyset(&mask);
+ sigaddset(&mask, SIGUSR1);
+ sigprocmask(SIG_BLOCK, &mask, NULL);
+
+ sigfd = signalfd(-1, &mask, SFD_NONBLOCK);
+ if (sigfd < 0) {
+ eprintf("failed to create a signal fd: %m\n");
+ return -1;
+ }
+
+ add_timer(&t, 1);
+
+ local_block_wq = init_work_queue(1);
+ if (!local_block_wq) {
+ eprintf("failed to create local workqueue: %m\n");
+ return -1;
+ }
+
+ ret = register_event(sigfd, local_handler, NULL);
+ if (ret) {
+ eprintf("failed to register local event handler (%d)\n", ret);
+ return -1;
+ }
+
+ return sigfd;
}
struct cluster_driver cdrv_local = {
@@ -466,7 +481,6 @@ struct cluster_driver cdrv_local = {
.notify = local_notify,
.block = local_block,
.unblock = local_unblock,
- .dispatch = local_dispatch,
};
cdrv_register(cdrv_local);
Index: sheepdog/sheep/cluster/zookeeper.c
===================================================================
--- sheepdog.orig/sheep/cluster/zookeeper.c 2012-05-17 15:41:54.575912094 +0200
+++ sheepdog/sheep/cluster/zookeeper.c 2012-05-18 11:17:06.619973612 +0200
@@ -14,11 +14,13 @@
#include <netdb.h>
#include <search.h>
#include <assert.h>
+#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <zookeeper/zookeeper.h>
#include <urcu/uatomic.h>
#include "cluster.h"
+#include "event.h"
#include "work.h"
#define MAX_EVENT_BUF_SIZE (64 * 1024)
@@ -638,36 +640,6 @@ static int get_addr(uint8_t *bytes)
return 0;
}
-static int zk_init(const char *option, uint8_t *myaddr)
-{
- if (!option) {
- eprintf("specify comma separated host:port pairs, each corresponding to a zk server.\n");
- eprintf("e.g. sheep /store -c zookeeper:127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002\n");
- return -1;
- }
-
- zhandle = zookeeper_init(option, watcher, SESSION_TIMEOUT, 0, NULL, 0);
- if (!zhandle) {
- eprintf("failed to connect to zk server %s\n", option);
- return -1;
- }
- dprintf("request session timeout:%dms, negotiated session timeout:%dms\n",
- SESSION_TIMEOUT, zoo_recv_timeout(zhandle));
-
- if (get_addr(myaddr) < 0)
- return -1;
-
- zk_queue_init(zhandle);
-
- efd = eventfd(0, EFD_NONBLOCK);
- if (efd < 0) {
- eprintf("failed to create an event fd: %m\n");
- return -1;
- }
-
- return efd;
-}
-
static int zk_join(struct sd_node *myself,
void *opaque, size_t opaque_len)
{
@@ -735,7 +707,7 @@ static void zk_unblock(void *msg, size_t
eventfd_write(efd, value);
}
-static int zk_dispatch(void)
+static void zk_handler(int listen_fd, int events, void *data)
{
int ret, rc, retry;
char path[256];
@@ -744,13 +716,20 @@ static int zk_dispatch(void)
struct zk_node *n;
enum cluster_join_result res;
+ if (events & EPOLLHUP) {
+ eprintf("zookeeper driver received EPOLLHUP event, exiting.\n");
+ log_close();
+ exit(1);
+ }
+
dprintf("read event\n");
+
ret = eventfd_read(efd, &value);
if (ret < 0)
- return 0;
+ return;
if (uatomic_read(&zk_notify_blocked))
- return 0;
+ return;
ret = zk_queue_pop(zhandle, &ev);
if (ret < 0)
@@ -864,7 +843,49 @@ static int zk_dispatch(void)
break;
}
out:
- return 0;
+ return;
+}
+
+static int zk_init(const char *option, uint8_t *myaddr)
+{
+ int ret;
+
+ if (!option) {
+ eprintf("specify comma separated host:port pairs, "
+ "each corresponding to a zk server.\n");
+ eprintf("e.g. sheep /store -c zookeeper:127.0.0.1:"
+ "3000,127.0.0.1:3001,127.0.0.1:3002\n");
+ return -1;
+ }
+
+ zhandle = zookeeper_init(option, watcher, SESSION_TIMEOUT, 0, NULL, 0);
+ if (!zhandle) {
+ eprintf("failed to connect to zk server %s\n", option);
+ return -1;
+ }
+ dprintf("request session timeout:%dms, "
+ "negotiated session timeout:%dms\n",
+ SESSION_TIMEOUT, zoo_recv_timeout(zhandle));
+
+ if (get_addr(myaddr) < 0)
+ return -1;
+
+ zk_queue_init(zhandle);
+
+ efd = eventfd(0, EFD_NONBLOCK);
+ if (efd < 0) {
+ eprintf("failed to create an event fd: %m\n");
+ return -1;
+ }
+
+ ret = register_event(efd, zk_handler, NULL);
+ if (ret) {
+ eprintf("failed to register zookeeper event handler (%d)\n",
+ ret);
+ return -1;
+ }
+
+ return efd;
}
struct cluster_driver cdrv_zookeeper = {
@@ -876,7 +897,6 @@ struct cluster_driver cdrv_zookeeper = {
.notify = zk_notify,
.block = zk_block,
.unblock = zk_unblock,
- .dispatch = zk_dispatch,
};
cdrv_register(cdrv_zookeeper);
Index: sheepdog/sheep/group.c
===================================================================
--- sheepdog.orig/sheep/group.c 2012-05-18 09:47:08.311994966 +0200
+++ sheepdog/sheep/group.c 2012-05-18 11:02:56.695981717 +0200
@@ -26,8 +26,6 @@
#include "work.h"
#include "cluster.h"
-static int cdrv_fd;
-
struct node {
struct sd_node ent;
struct list_head list;
@@ -335,24 +333,6 @@ static void queue_cluster_request(struct
}
}
-static void group_handler(int listen_fd, int events, void *data)
-{
- int ret;
- if (events & EPOLLHUP) {
- eprintf("received EPOLLHUP event: has corosync exited?\n");
- goto out;
- }
-
- ret = sys->cdrv->dispatch();
- if (ret == 0)
- return;
- else
- eprintf("oops... an error occurred inside corosync\n");
-out:
- log_close();
- exit(1);
-}
-
static inline int get_nodes_nr_from(struct list_head *l)
{
struct node *node;
@@ -1366,8 +1346,8 @@ int create_cluster(int port, int64_t zon
}
}
- cdrv_fd = sys->cdrv->init(sys->cdrv_option, sys->this_node.addr);
- if (cdrv_fd < 0)
+ ret = sys->cdrv->init(sys->cdrv_option, sys->this_node.addr);
+ if (ret < 0)
return -1;
sys->this_node.port = port;
@@ -1395,12 +1375,6 @@ int create_cluster(int port, int64_t zon
INIT_LIST_HEAD(&sys->request_queue);
INIT_LIST_HEAD(&sys->event_queue);
- ret = register_event(cdrv_fd, group_handler, NULL);
- if (ret) {
- eprintf("failed to register epoll events (%d)\n", ret);
- return 1;
- }
-
ret = send_join_request(&sys->this_node);
if (ret != 0)
return -1;
More information about the sheepdog
mailing list