[Sheepdog] [PATCH] Remove unregister_event from sd_xxx_handler()
Christoph Hellwig
hch at infradead.org
Mon May 14 19:08:31 CEST 2012
On Tue, May 15, 2012 at 12:46:13AM +0800, Yunkai Zhang wrote:
> > another thing I noticed is that I think your patch makes the ->dispatch
> > cluster driver operation obsolete. Once we never register the event
> > handler for the cluster driver FD it can be handled entirely inside the
> > cluster driver, e.g. just call register_event from inside the init
> > method and don't even tell the core about it.
> >
> We have called register_event(cdrv_fd, ...) in create_cluster().
Yes, but we could also do this in the drivers. See below for an
untested version of your patch that implements this suggestion:
From: Yunkai Zhang <yunkai.me at gmail.com>
Subject: [Sheepdog] [PATCH] Remove unregister_event from sd_xxx_handler()
From: Yunkai Zhang <qiushu.zyk at taobao.com>
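Call update_cluster_info() and update_epoch_info() directly from the
cluster drivers so that we can remove unregister_event() from
sd_xxx_handler(). Each driver now registers its own event fd with
register_event() from its init method, so the ->dispatch cluster driver
operation is removed as well.
This can speed up the joining/leaving process.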
Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com>
---
sheep/cluster.h | 3 +++
sheep/cluster/accord.c | 4 ++++
sheep/cluster/corosync.c | 5 +++++
sheep/cluster/zookeeper.c | 7 ++++++-
sheep/group.c | 30 +++++++++++++-----------------
5 files changed, 31 insertions(+), 18 deletions(-)
Index: sheepdog/sheep/cluster.h
===================================================================
--- sheepdog.orig/sheep/cluster.h 2012-05-14 18:45:08.967970005 +0200
+++ sheepdog/sheep/cluster.h 2012-05-14 18:49:03.759967772 +0200
@@ -86,19 +86,6 @@ struct cluster_driver {
*/
int (*notify)(void *msg, size_t msg_len, void (*block_cb)(void *arg));
- /*
- * Dispatch handlers
- *
- * This function dispatches handlers according to the
- * delivered events (join/leave/notify) in the cluster.
- *
- * Note that the events sequence is totally ordered; all nodes
- * call the handlers in the same sequence.
- *
- * Returns zero on success, -1 on error
- */
- int (*dispatch)(void);
-
struct list_head list;
};
@@ -107,7 +94,7 @@ extern struct list_head cluster_drivers;
#define cdrv_register(driver) \
static void __attribute__((constructor)) regist_ ## driver(void) { \
if (!driver.init || !driver.join || !driver.leave || \
- !driver.notify || !driver.dispatch) \
+ !driver.notify) \
panic("the driver '%s' is incomplete\n", driver.name); \
list_add(&driver.list, &cluster_drivers); \
}
@@ -191,5 +178,19 @@ void sd_leave_handler(struct sd_node *le
void sd_notify_handler(struct sd_node *sender, void *msg, size_t msg_len);
enum cluster_join_result sd_check_join_cb(struct sd_node *joining,
void *opaque);
+/*
+ * Helpers called by the cluster drivers from their own event handlers.
+ *
+ * update_cluster_info() is called before sd_join_handler() when the
+ * join result is CJ_RES_SUCCESS; opaque is the join message delivered
+ * with the join event.  update_epoch_info() is called before
+ * sd_leave_handler().
+ *
+ * The events sequence is totally ordered; all nodes call these helpers
+ * and the sd_xxx_handler() functions in the same sequence.
+ */
+void update_cluster_info(struct sd_node *joined, struct sd_node *nodes,
+ size_t nr_nodes, void *opaque);
+void update_epoch_info(void);
#endif
Index: sheepdog/sheep/cluster/accord.c
===================================================================
--- sheepdog.orig/sheep/cluster/accord.c 2012-05-11 17:51:27.000000000 +0200
+++ sheepdog/sheep/cluster/accord.c 2012-05-14 18:56:36.855963441 +0200
@@ -16,6 +16,8 @@
#include <assert.h>
#include <pthread.h>
#include <sys/eventfd.h>
+#include <sys/epoll.h>
#include <accord/accord.h>
+#include "event.h"
#include "cluster.h"
@@ -463,6 +465,10 @@ static void acrd_watch_fn(struct acrd_ha
eventfd_write(efd, value);
}
+static void accord_dispatch(int listen_fd, int events, void *data);
+
static int accord_init(const char *option, uint8_t *myaddr)
{
+ int ret;
+
if (!option) {
@@ -509,7 +515,13 @@ static int accord_init(const char *optio
acrd_add_watch(ahandle, QUEUE_FILE, ACRD_EVENT_PREFIX | ACRD_EVENT_ALL,
acrd_watch_fn, NULL);
- return efd;
+ ret = register_event(efd, accord_dispatch, NULL);
+ if (ret) {
+ eprintf("failed to register epoll events (%d)\n", ret);
+ return -1;
+ }
+
+ return 0;
}
static int accord_join(struct sd_node *myself,
@@ -550,7 +562,7 @@ static void acrd_block_done(struct work
{
}
-static int accord_dispatch(void)
+static void accord_dispatch(int listen_fd, int events, void *data)
{
int ret;
eventfd_t value;
@@ -561,6 +573,11 @@ static int accord_dispatch(void)
.done = acrd_block_done,
};
+ if (events & EPOLLHUP) {
+ eprintf("received EPOLLHUP event: has accord exited?\n");
+ goto err;
+ }
+
dprintf("read event\n");
ret = eventfd_read(efd, &value);
if (ret < 0)
@@ -603,10 +620,14 @@ static int accord_dispatch(void)
acrd_queue_pop(ahandle, &ev);
}
+ if (ev.join_result == CJ_RES_SUCCESS)
+ update_cluster_info(&ev.sender, ev.nodes,
+ ev.nr_nodes, ev.buf);
sd_join_handler(&ev.sender, ev.nodes, ev.nr_nodes,
ev.join_result, ev.buf);
break;
case EVENT_LEAVE:
+ update_epoch_info();
sd_leave_handler(&ev.sender, ev.nodes, ev.nr_nodes);
break;
case EVENT_NOTIFY:
@@ -628,8 +649,10 @@ static int accord_dispatch(void)
}
out:
pthread_mutex_unlock(&queue_lock);
-
- return 0;
+ return;
+err:
+ log_close();
+ exit(1);
}
struct cluster_driver cdrv_accord = {
@@ -639,7 +662,6 @@ struct cluster_driver cdrv_accord = {
.join = accord_join,
.leave = accord_leave,
.notify = accord_notify,
- .dispatch = accord_dispatch,
};
cdrv_register(cdrv_accord);
Index: sheepdog/sheep/cluster/corosync.c
===================================================================
--- sheepdog.orig/sheep/cluster/corosync.c 2012-05-14 18:45:08.967970005 +0200
+++ sheepdog/sheep/cluster/corosync.c 2012-05-14 18:57:48.279962765 +0200
@@ -12,7 +12,9 @@
#include <unistd.h>
#include <corosync/cpg.h>
#include <corosync/cfg.h>
+#include <sys/epoll.h>
+#include "event.h"
#include "cluster.h"
#include "work.h"
@@ -322,6 +324,10 @@ static int __corosync_dispatch_one(struc
case CJ_RES_FAIL:
case CJ_RES_JOIN_LATER:
build_node_list(cpg_nodes, nr_cpg_nodes, entries);
+ if (cevent->result == CJ_RES_SUCCESS)
+ update_cluster_info(&cevent->sender.ent,
+ entries, nr_cpg_nodes,
+ cevent->msg);
sd_join_handler(&cevent->sender.ent, entries,
nr_cpg_nodes, cevent->result,
cevent->msg);
@@ -334,6 +340,7 @@ static int __corosync_dispatch_one(struc
break;
cevent->sender.ent = cpg_nodes[idx].ent;
+ update_epoch_info();
del_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender);
nr_cpg_nodes--;
build_node_list(cpg_nodes, nr_cpg_nodes, entries);
@@ -614,6 +621,31 @@ static void cdrv_cpg_confchg(cpg_handle_
__corosync_dispatch();
}
+/*
+ * epoll callback for the cpg fd registered in corosync_init().  A hangup
+ * (EPOLLHUP) or a cpg_dispatch() failure is fatal: the error is logged
+ * and the process exits.
+ */
+static void corosync_dispatch(int listen_fd, int events, void *data)
+{
+ int ret;
+
+ if (events & EPOLLHUP) {
+ eprintf("received EPOLLHUP event: has corosync exited?\n");
+ goto err;
+ }
+
+ ret = cpg_dispatch(cpg_handle, CPG_DISPATCH_ALL);
+ if (ret != CPG_OK) {
+ eprintf("oops... an error occurred inside corosync\n");
+ goto err;
+ }
+
+ return;
+err:
+ log_close();
+ exit(1);
+}
+
static int corosync_init(const char *option, uint8_t *myaddr)
{
int ret, fd;
@@ -662,7 +694,13 @@ static int corosync_init(const char *opt
return -1;
}
- return fd;
+ ret = register_event(fd, corosync_dispatch, NULL);
+ if (ret) {
+ eprintf("failed to register epoll events (%d)\n", ret);
+ return -1;
+ }
+
+ return 0;
}
static int corosync_join(struct sd_node *myself,
@@ -728,17 +766,6 @@ static int corosync_notify(void *msg, si
return ret;
}
-static int corosync_dispatch(void)
-{
- int ret;
-
- ret = cpg_dispatch(cpg_handle, CPG_DISPATCH_ALL);
- if (ret != CPG_OK)
- return -1;
-
- return 0;
-}
-
struct cluster_driver cdrv_corosync = {
.name = "corosync",
@@ -746,7 +773,6 @@ struct cluster_driver cdrv_corosync = {
.join = corosync_join,
.leave = corosync_leave,
.notify = corosync_notify,
- .dispatch = corosync_dispatch,
};
cdrv_register(cdrv_corosync);
Index: sheepdog/sheep/cluster/zookeeper.c
===================================================================
--- sheepdog.orig/sheep/cluster/zookeeper.c 2012-05-14 18:45:08.971970007 +0200
+++ sheepdog/sheep/cluster/zookeeper.c 2012-05-14 18:58:38.199962294 +0200
@@ -14,10 +14,12 @@
#include <netdb.h>
#include <search.h>
#include <assert.h>
+#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <zookeeper/zookeeper.h>
#include <urcu/uatomic.h>
+#include "event.h"
#include "cluster.h"
#include "work.h"
@@ -30,6 +32,7 @@
#define QUEUE_ZNODE BASE_ZNODE "/queue"
#define MEMBER_ZNODE BASE_ZNODE "/member"
+static void zk_dispatch(int listen_fd, int events, void *data);
/* zookeeper API wrapper prototypes */
ZOOAPI int zk_create(zhandle_t *zh, const char *path, const char *value,
@@ -647,6 +650,8 @@ static int get_addr(uint8_t *bytes)
static int zk_init(const char *option, uint8_t *myaddr)
{
+ int ret;
+
if (!option) {
eprintf("specify comma separated host:port pairs, each corresponding to a zk server.\n");
eprintf("e.g. sheep /store -c zookeeper:127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002\n");
@@ -678,7 +683,13 @@ static int zk_init(const char *option, u
return -1;
}
- return efd;
+ ret = register_event(efd, zk_dispatch, NULL);
+ if (ret) {
+ eprintf("failed to register epoll events (%d)\n", ret);
+ return -1;
+ }
+
+ return 0;
}
static int zk_join(struct sd_node *myself,
@@ -745,7 +756,7 @@ static void zk_block_done(struct work *w
{
}
-static int zk_dispatch(void)
+static void zk_dispatch(int listen_fd, int events, void *data)
{
int ret, rc, retry;
char path[256];
@@ -758,13 +769,18 @@ static int zk_dispatch(void)
.done = zk_block_done,
};
+ if (events & EPOLLHUP) {
+ eprintf("received EPOLLHUP event: has zookeeper exited?\n");
+ goto err;
+ }
+
dprintf("read event\n");
ret = eventfd_read(efd, &value);
if (ret < 0)
- return 0;
+ return;
if (uatomic_read(&zk_notify_blocked))
- return 0;
+ return;
ret = zk_queue_pop(zhandle, &ev);
if (ret < 0)
@@ -824,6 +840,10 @@ static int zk_dispatch(void)
}
node_btree_add(&zk_node_btroot, &ev.sender);
+ build_node_list(zk_node_btroot);
+ if (ev.join_result == CJ_RES_SUCCESS)
+ update_cluster_info(&ev.sender.node, sd_nodes,
+ nr_sd_nodes, ev.buf);
dprintf("one sheep joined[down], nr_nodes:%ld, sender:%s, joined:%d\n",
nr_zk_nodes, node_to_str(&ev.sender.node), ev.sender.joined);
@@ -841,7 +861,6 @@ static int zk_dispatch(void)
}
}
- build_node_list(zk_node_btroot);
sd_join_handler(&ev.sender.node, sd_nodes, nr_sd_nodes,
ev.join_result, ev.buf);
break;
@@ -853,6 +872,8 @@ static int zk_dispatch(void)
goto out;
}
+ update_epoch_info();
+
node_btree_del(&zk_node_btroot, n);
dprintf("one sheep left, nr_nodes:%ld\n", nr_zk_nodes);
@@ -880,7 +901,10 @@ static int zk_dispatch(void)
break;
}
out:
- return 0;
+ return;
+err:
+ log_close();
+ exit(1);
}
struct cluster_driver cdrv_zookeeper = {
@@ -890,7 +914,6 @@ struct cluster_driver cdrv_zookeeper = {
.join = zk_join,
.leave = zk_leave,
.notify = zk_notify,
- .dispatch = zk_dispatch,
};
cdrv_register(cdrv_zookeeper);
Index: sheepdog/sheep/group.c
===================================================================
--- sheepdog.orig/sheep/group.c 2012-05-14 18:45:08.971970007 +0200
+++ sheepdog/sheep/group.c 2012-05-14 18:54:38.735964575 +0200
@@ -25,8 +25,6 @@
#include "work.h"
#include "cluster.h"
-static int cdrv_fd;
-
struct node {
struct sd_node ent;
struct list_head list;
@@ -262,24 +260,6 @@ void do_cluster_request(struct work *wor
free(msg);
}
-static void group_handler(int listen_fd, int events, void *data)
-{
- int ret;
- if (events & EPOLLHUP) {
- eprintf("received EPOLLHUP event: has corosync exited?\n");
- goto out;
- }
-
- ret = sys->cdrv->dispatch();
- if (ret == 0)
- return;
- else
- eprintf("oops... an error occurred inside corosync\n");
-out:
- log_close();
- exit(1);
-}
-
static inline int get_nodes_nr_from(struct list_head *l)
{
struct node *node;
@@ -577,9 +557,30 @@ static void finish_join(struct join_mess
}
}
-static void update_cluster_info(struct join_message *msg,
- struct sd_node *joined, struct sd_node *nodes, size_t nr_nodes)
+/*
+ * If sys_can_recover(), increment sys->epoch and record the new epoch
+ * with update_epoch_store() and update_epoch_log().  Called by the
+ * cluster drivers before sd_leave_handler().
+ */
+void update_epoch_info(void)
+{
+ if (sys_can_recover()) {
+ sys->epoch++;
+ update_epoch_store(sys->epoch);
+ update_epoch_log(sys->epoch);
+ }
+}
+
+/*
+ * Called by the cluster drivers before sd_join_handler() when the join
+ * result is CJ_RES_SUCCESS.  opaque is the struct join_message that was
+ * delivered with the join event.
+ */
+void update_cluster_info(struct sd_node *joined, struct sd_node *nodes,
+ size_t nr_nodes, void *opaque)
{
+ struct join_message *msg = opaque;
+
eprintf("status = %d, epoch = %d, %x, %d\n", msg->cluster_status,
msg->epoch, msg->result, sys->join_finished);
@@ -836,11 +837,6 @@ static void __sd_join_done(struct event_
print_node_list(sys->nodes, sys->nr_nodes);
- if (!sys_stat_join_failed()) {
- update_cluster_info(jm, &w->joined, w->member_list,
- w->member_list_entries);
- }
-
if (sys_can_recover() && jm->inc_epoch) {
list_for_each_entry_safe(node, t, &sys->leave_list, list) {
list_del(&node->list);
@@ -866,11 +862,6 @@ static void __sd_leave_done(struct event
memcpy(sys->nodes, w->member_list, sizeof(*sys->nodes) * sys->nr_nodes);
qsort(sys->nodes, sys->nr_nodes, sizeof(*sys->nodes), node_cmp);
- if (sys_can_recover()) {
- sys->epoch++;
- update_epoch_store(sys->epoch);
- update_epoch_log(sys->epoch);
- }
update_vnode_info();
print_node_list(sys->nodes, sys->nr_nodes);
@@ -943,7 +934,6 @@ static void event_fn(struct work *work)
static void event_done(struct work *work)
{
struct event_struct *cevent;
- int ret;
if (!sys->cur_cevent)
vprintf(SDOG_ERR, "bug\n");
@@ -973,9 +963,6 @@ static void event_done(struct work *work
vprintf(SDOG_DEBUG, "free %p\n", cevent);
event_free(cevent);
event_running = 0;
- ret = register_event(cdrv_fd, group_handler, NULL);
- if (ret)
- panic("failed to register event fd");
process_request_event_queues();
}
@@ -1086,7 +1073,6 @@ static inline void process_event_queue(v
event_work.fn = event_fn;
event_work.done = event_done;
- unregister_event(cdrv_fd);
queue_work(sys->event_wqueue, &event_work);
}
@@ -1298,8 +1284,8 @@ int create_cluster(int port, int64_t zon
}
}
- cdrv_fd = sys->cdrv->init(sys->cdrv_option, sys->this_node.addr);
- if (cdrv_fd < 0)
+ ret = sys->cdrv->init(sys->cdrv_option, sys->this_node.addr);
+ if (ret < 0)
return -1;
sys->this_node.port = port;
@@ -1327,12 +1313,6 @@ int create_cluster(int port, int64_t zon
INIT_LIST_HEAD(&sys->request_queue);
INIT_LIST_HEAD(&sys->event_queue);
- ret = register_event(cdrv_fd, group_handler, NULL);
- if (ret) {
- eprintf("failed to register epoll events (%d)\n", ret);
- return 1;
- }
-
ret = send_join_request(&sys->this_node);
if (ret != 0)
return -1;
Index: sheepdog/sheep/cluster/local.c
===================================================================
--- sheepdog.orig/sheep/cluster/local.c 2012-05-14 18:45:08.967970005 +0200
+++ sheepdog/sheep/cluster/local.c 2012-05-14 18:57:05.299963176 +0200
@@ -13,9 +13,10 @@
#include <unistd.h>
#include <sys/mman.h>
#include <sys/signalfd.h>
+#include <sys/epoll.h>
#include <sys/file.h>
#include <search.h>
#include <signal.h>
#include <fcntl.h>
#include <assert.h>
@@ -34,6 +35,8 @@ static struct sd_node this_node;
static struct work_queue *local_block_wq;
+static void local_dispatch(int listen_fd, int events, void *data);
+
enum local_event_type {
EVENT_JOIN = 1,
EVENT_LEAVE,
@@ -280,12 +283,12 @@ static void check_pids(void *arg)
add_timer(arg, 1);
}
-
/* Local driver APIs */
static int local_init(const char *option, uint8_t *myaddr)
{
sigset_t mask;
+ int ret;
static struct timer t = {
.callback = check_pids,
.data = &t,
@@ -319,7 +322,13 @@ static int local_init(const char *option
return -1;
}
- return sigfd;
+ ret = register_event(sigfd, local_dispatch, NULL);
+ if (ret) {
+ eprintf("failed to register epoll events (%d)\n", ret);
+ return -1;
+ }
+
+ return 0;
}
static int local_join(struct sd_node *myself,
@@ -379,7 +388,7 @@ static void local_block_done(struct work
{
}
-static int local_dispatch(void)
+static void local_dispatch(int listen_fd, int events, void *data)
{
int ret;
struct signalfd_siginfo siginfo;
@@ -390,6 +399,11 @@ static int local_dispatch(void)
.done = local_block_done,
};
+ if (events & EPOLLHUP) {
+ eprintf("received EPOLLHUP event on the local driver's signalfd\n");
+ goto err;
+ }
+
dprintf("read siginfo\n");
ret = read(sigfd, &siginfo, sizeof(siginfo));
assert(ret == sizeof(siginfo));
@@ -456,7 +470,10 @@ static int local_dispatch(void)
out:
shm_queue_unlock();
- return 0;
+ return;
+err:
+ log_close();
+ exit(1);
}
struct cluster_driver cdrv_local = {
@@ -466,7 +483,6 @@ struct cluster_driver cdrv_local = {
.join = local_join,
.leave = local_leave,
.notify = local_notify,
- .dispatch = local_dispatch,
};
cdrv_register(cdrv_local);
More information about the sheepdog
mailing list