[sheepdog] [PATCH 03/13] zookeeper: refactor event handling
Liu Yuan
namei.unix at gmail.com
Tue Dec 18 06:37:52 CET 2012
From: Liu Yuan <tailai.ly at taobao.com>
- Replace the lengthy switch statement in the event handler with a table of per-event-type handler functions (function pointers).
Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
sheep/cluster/zookeeper.c | 366 ++++++++++++++++++++++++---------------------
1 file changed, 192 insertions(+), 174 deletions(-)
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index a3a5933..b67bafa 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -189,10 +189,9 @@ static bool zk_queue_empty(zhandle_t *zh)
return true;
}
-static int32_t zk_queue_push(zhandle_t *zh, struct zk_event *ev)
+static void zk_queue_push(zhandle_t *zh, struct zk_event *ev)
{
static bool first_push = true;
- int32_t seq;
int len;
char path[256], buf[256];
eventfd_t value = 1;
@@ -201,42 +200,55 @@ static int32_t zk_queue_push(zhandle_t *zh, struct zk_event *ev)
sprintf(path, "%s/", QUEUE_ZNODE);
zk_create_node(zh, path, (char *)ev, len,
&ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE, buf, sizeof(buf));
- dprintf("create path:%s, nr_nodes:%zu, queue_pos:%010d, len:%d\n"
- , buf, nr_zk_nodes, queue_pos, len);
-
- sscanf(buf, QUEUE_ZNODE "/%d", &seq);
- dprintf("path:%s, seq:%010d\n", buf, seq);
+ dprintf("create path:%s, nr_nodes:%zu, queue_pos:%010d, len:%d\n", buf,
+ nr_zk_nodes, queue_pos, len);
if (first_push) {
+ int32_t seq;
+
+ sscanf(buf, QUEUE_ZNODE "/%d", &seq);
queue_pos = seq;
eventfd_write(efd, value);
first_push = false;
}
-
- return seq;
-
}
+/*
+ * Change the event in place and expect the dedicated handler to be called
+ * via zk_watcher which wakes up one of the zk_event_handlers.
+ */
static int zk_queue_push_back(zhandle_t *zh, struct zk_event *ev)
{
int len;
char path[256];
queue_pos--;
-
dprintf("queue_pos:%010d\n", queue_pos);
if (ev) {
- /* update the last popped data */
len = (char *)(ev->buf) - (char *)ev + ev->buf_len;
sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
zk_set_data(zh, path, (char *)ev, len, -1);
- dprintf("update path:%s, queue_pos:%010d, len:%d\n", path, queue_pos, len);
+ dprintf("update path:%s, queue_pos:%010d, len:%d\n", path,
+ queue_pos, len);
}
return 0;
}
+/*
+ * Arm a watch on the queue node at @path.  If that node is already present,
+ * the watch alone is not enough to be told about it, so poke efd by hand;
+ * otherwise the queued event could be lost.
+ */
+static void zk_queue_peek_next_notify(zhandle_t *zh, const char *path)
+{
+	if (zk_node_exists(zh, path, 1, NULL) != ZOK)
+		return;
+
+	dprintf("%s\n", path);
+	eventfd_write(efd, 1);
+}
+
static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
{
int rc, len;
@@ -246,43 +258,41 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
eventfd_t value = 1;
/*
- * Continue to process LEAVE event even if
- * we have an unfinished BLOCK event.
+ * Continue to process LEAVE event even if we have an unfinished BLOCK
+ * event.
*/
if (!called_by_zk_unblock && uatomic_read(&nr_zk_levents)) {
nr_levents = uatomic_sub_return(&nr_zk_levents, 1) + 1;
- dprintf("nr_zk_levents:%d, head:%u\n", nr_levents, zk_levent_head);
+ dprintf("nr_levents:%d, head:%u\n", nr_levents, zk_levent_head);
lev = &zk_levents[zk_levent_head%SD_MAX_NODES];
- /* if the node pointed to by queue_pos was send by this leaver,
- * and it have blocked whole cluster, we should ignore it. */
+ /*
+ * If the node pointed to by queue_pos was send by this leaver,
+ * and it have blocked whole cluster, we should ignore it.
+ */
len = sizeof(*ev);
sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
rc = zk_get_data(zh, path, 1, (char *)ev, &len, NULL);
if (rc == ZOK &&
node_eq(&ev->sender.node, &lev->sender.node) &&
is_blocking_event(ev)) {
- dprintf("this queue_pos:%010d have blocked whole cluster, ignore it\n", queue_pos);
+ dprintf("this queue_pos:%010d have blocked whole "
+ "cluster, ignore it\n", queue_pos);
queue_pos++;
- /* watch next data */
sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
- rc = zk_node_exists(zh, path, 1, NULL);
- dprintf("watch path:%s, exists:%d\n", path, (rc == ZOK));
- if (rc == ZOK) {
- /* we lost this message, manual notify */
- dprintf("write event to efd:%d\n", efd);
- eventfd_write(efd, value);
- }
+ zk_queue_peek_next_notify(zh, path);
}
memcpy(ev, lev, sizeof(*ev));
zk_levent_head++;
if (uatomic_read(&nr_zk_levents) || rc == ZOK) {
- /* we have pending leave events
- * or queue nodes, manual notify */
+ /*
+ * we have pending leave events or queue nodes,
+ * manual notify
+ */
dprintf("write event to efd:%d\n", efd);
eventfd_write(efd, value);
}
@@ -299,30 +309,23 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
len = sizeof(*ev);
sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
rc = zk_get_data(zh, path, 1, (char *)ev, &len, NULL);
- dprintf("read path:%s, nr_nodes:%zu, type:%d, len:%d, rc:%d\n", path,
- nr_zk_nodes, ev->type, len, rc);
if (rc != ZOK)
panic("failed to zk_get_data path:%s, rc:%d\n", path, rc);
+ dprintf("read path:%s, nr_nodes:%zu, type:%d, len:%d, rc:%d\n", path,
+ nr_zk_nodes, ev->type, len, rc);
queue_pos++;
- /* this event will be pushed back to the queue,
+ /*
+ * This event will be pushed back to the queue,
* we just wait for the arrival of its updated,
- * not need to watch next data. */
+ * not need to watch next data.
+ */
if (is_blocking_event(ev))
- goto out;
+ return 0;
- /* watch next data */
sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
- rc = zk_node_exists(zh, path, 1, NULL);
- dprintf("watch path:%s, exists:%d\n", path, (rc == ZOK));
- if (rc == ZOK) {
- /* we lost this message, manual notify */
- dprintf("write event to efd:%d\n", efd);
- eventfd_write(efd, value);
- }
-
-out:
+ zk_queue_peek_next_notify(zh, path);
return 0;
}
@@ -526,13 +529,11 @@ static int leave_event(zhandle_t *zh, struct zk_node *znode)
zk_levent_tail++;
- /* manual notify */
- dprintf("write event to efd:%d\n", efd);
eventfd_write(efd, value);
return 0;
}
-static void watcher(zhandle_t *zh, int type, int state, const char *path,
+static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
void *ctx)
{
eventfd_t value = 1;
@@ -591,7 +592,8 @@ static int zk_join(const struct sd_node *myself,
sprintf(path, MEMBER_ZNODE "/%s", node_to_str(myself));
rc = zk_node_exists(zhandle, path, 1, NULL);
if (rc == ZOK) {
- eprintf("previous zookeeper session exist, shoot myself\n");
+ eprintf("Previous zookeeper session exist, shoot myself.\n"
+ "Wait for a while and restart me again\n");
exit(1);
}
@@ -649,149 +651,165 @@ static void zk_unblock(void *msg, size_t msg_len)
eventfd_write(efd, value);
}
-static void zk_handler(int listen_fd, int events, void *data)
+static void zk_handle_join_request(struct zk_event *ev)
{
- int ret, rc;
- char path[256];
- eventfd_t value;
- struct zk_event ev;
- struct zk_node *n;
enum cluster_join_result res;
- if (events & EPOLLHUP) {
- eprintf("zookeeper driver received EPOLLHUP event, exiting.\n");
- log_close();
- exit(1);
- }
+ dprintf("nr_nodes: %zu, sender: %s, joined: %d\n",
+ nr_zk_nodes, node_to_str(&ev->sender.node),
+ ev->sender.joined);
- ret = eventfd_read(efd, &value);
- if (ret < 0)
+ if (!is_master(zhandle, &this_node)) {
+ zk_queue_push_back(zhandle, NULL);
return;
+ }
- ret = zk_queue_pop(zhandle, &ev);
- if (ret < 0)
- goto out;
-
- switch (ev.type) {
- case EVENT_JOIN_REQUEST:
- dprintf("JOIN REQUEST nr_nodes: %zu, sender: %s, joined: %d\n",
- nr_zk_nodes, node_to_str(&ev.sender.node),
- ev.sender.joined);
+ res = sd_check_join_cb(&ev->sender.node, ev->buf);
+ ev->join_result = res;
+ ev->type = EVENT_JOIN_RESPONSE;
+ ev->sender.joined = true;
- if (!is_master(zhandle, &this_node)) {
- zk_queue_push_back(zhandle, NULL);
- break;
- }
+ dprintf("I'm master, push back join event\n");
+ zk_queue_push_back(zhandle, ev);
- res = sd_check_join_cb(&ev.sender.node, ev.buf);
- ev.join_result = res;
- ev.type = EVENT_JOIN_RESPONSE;
- ev.sender.joined = true;
+ if (res == CJ_RES_MASTER_TRANSFER) {
+ eprintf("failed to join sheepdog cluster: "
+ "please retry when master is up\n");
+ zk_leave();
+ exit(1);
+ }
+}
- dprintf("I'm master, push back join event\n");
- zk_queue_push_back(zhandle, &ev);
+static void zk_handle_join_response(struct zk_event *ev)
+{
+ char path[256];
+ int rc;
- if (res == CJ_RES_MASTER_TRANSFER) {
- eprintf("failed to join sheepdog cluster: "
- "please retry when master is up\n");
- zk_leave();
- exit(1);
- }
- break;
- case EVENT_JOIN_RESPONSE:
- dprintf("JOIN RESPONSE\n");
-
- if (is_master(zhandle, &this_node) &&
- !node_eq(&ev.sender.node, &this_node.node)) {
- /* wait util the member node has been created */
- int retry =
- MEMBER_CREATE_TIMEOUT / MEMBER_CREATE_INTERVAL;
-
- sprintf(path, MEMBER_ZNODE "/%s",
- node_to_str(&ev.sender.node));
-
- while (retry &&
- zk_node_exists(zhandle, path, 1, NULL) == ZNONODE) {
- usleep(MEMBER_CREATE_INTERVAL * 1000);
- retry--;
- }
- if (retry <= 0) {
- dprintf("Sender:%s failed to create member, ignore it\n",
- node_to_str(&ev.sender.node));
- goto out;
- }
- }
+ dprintf("JOIN RESPONSE\n");
+ if (is_master(zhandle, &this_node) &&
+ !node_eq(&ev->sender.node, &this_node.node)) {
+ /* wait util the member node has been created */
+ int retry =
+ MEMBER_CREATE_TIMEOUT / MEMBER_CREATE_INTERVAL;
- if (node_eq(&ev.sender.node, &this_node.node))
- zk_member_init(zhandle);
+ sprintf(path, MEMBER_ZNODE "/%s",
+ node_to_str(&ev->sender.node));
- if (ev.join_result == CJ_RES_MASTER_TRANSFER)
- /*
- * Sheepdog assumes that only one sheep(master will kill
- * itself) is alive in MASTER_TRANSFER scenario. So only
- * the joining sheep will run into here.
- */
- node_btree_clear(&zk_node_btroot);
-
- node_btree_add(&zk_node_btroot, &ev.sender);
- dprintf("one sheep joined[down], nr_nodes:%zu, sender:%s,"
- " joined:%d\n", nr_zk_nodes,
- node_to_str(&ev.sender.node), ev.sender.joined);
-
- switch (ev.join_result) {
- case CJ_RES_SUCCESS:
- case CJ_RES_JOIN_LATER:
- case CJ_RES_MASTER_TRANSFER:
- sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender.node));
- if (node_eq(&ev.sender.node, &this_node.node)) {
- dprintf("create path:%s\n", path);
- zk_create_node(zhandle, path,
- (char *)&ev.sender,
- sizeof(ev.sender),
- &ZOO_OPEN_ACL_UNSAFE,
- ZOO_EPHEMERAL, NULL, 0);
- } else {
- rc = zk_node_exists(zhandle, path, 1, NULL);
- dprintf("watch path:%s, exists:%d\n", path, (rc == ZOK));
- }
- break;
- default:
- break;
+ while (retry &&
+ zk_node_exists(zhandle, path, 1, NULL) == ZNONODE) {
+ usleep(MEMBER_CREATE_INTERVAL * 1000);
+ retry--;
}
-
- build_node_list(zk_node_btroot);
- sd_join_handler(&ev.sender.node, sd_nodes, nr_sd_nodes,
- ev.join_result, ev.buf);
- break;
- case EVENT_LEAVE:
- dprintf("LEAVE EVENT\n");
- n = node_btree_find(&zk_node_btroot, &ev.sender);
- if (!n) {
- dprintf("can't find this leave node:%s, ignore it.\n", node_to_str(&ev.sender.node));
- goto out;
+ if (retry <= 0) {
+ dprintf("Sender:%s failed to create member, ignore it\n",
+ node_to_str(&ev->sender.node));
+ return;
}
+ }
- node_btree_del(&zk_node_btroot, n);
- dprintf("one sheep left, nr_nodes:%zu\n", nr_zk_nodes);
-
- build_node_list(zk_node_btroot);
- sd_leave_handler(&ev.sender.node, sd_nodes, nr_sd_nodes);
- break;
- case EVENT_BLOCK:
- dprintf("BLOCK\n");
- zk_queue_push_back(zhandle, NULL);
- if (sd_block_handler(&ev.sender.node)) {
- bool result = uatomic_set_true(&zk_notify_blocked);
- assert(result);
+ if (node_eq(&ev->sender.node, &this_node.node))
+ zk_member_init(zhandle);
+
+ if (ev->join_result == CJ_RES_MASTER_TRANSFER)
+ /*
+ * Sheepdog assumes that only one sheep(master will kill
+ * itself) is alive in MASTER_TRANSFER scenario. So only
+ * the joining sheep will run into here.
+ */
+ node_btree_clear(&zk_node_btroot);
+
+ node_btree_add(&zk_node_btroot, &ev->sender);
+ dprintf("nr_nodes:%zu, sender:%s, joined:%d\n", nr_zk_nodes,
+ node_to_str(&ev->sender.node), ev->sender.joined);
+
+ switch (ev->join_result) {
+ case CJ_RES_SUCCESS:
+ case CJ_RES_JOIN_LATER:
+ case CJ_RES_MASTER_TRANSFER:
+ sprintf(path, MEMBER_ZNODE "/%s",
+ node_to_str(&ev->sender.node));
+ if (node_eq(&ev->sender.node, &this_node.node)) {
+ dprintf("create path:%s\n", path);
+ zk_create_node(zhandle, path,
+ (char *)&ev->sender,
+ sizeof(ev->sender),
+ &ZOO_OPEN_ACL_UNSAFE,
+ ZOO_EPHEMERAL, NULL, 0);
+ } else {
+ rc = zk_node_exists(zhandle, path, 1, NULL);
+ dprintf("watch:%s, exists:%d\n", path, (rc == ZOK));
}
break;
- case EVENT_NOTIFY:
- dprintf("NOTIFY\n");
- sd_notify_handler(&ev.sender.node, ev.buf, ev.buf_len);
+ default:
break;
}
-out:
- return;
+
+ build_node_list(zk_node_btroot);
+ sd_join_handler(&ev->sender.node, sd_nodes, nr_sd_nodes,
+ ev->join_result, ev->buf);
+}
+
+/*
+ * EVENT_LEAVE: remove the sender from zk_node_btroot, rebuild the node list
+ * and report the departure through sd_leave_handler().  A sender that is not
+ * present in the tree is logged and ignored.
+ */
+static void zk_handle_leave(struct zk_event *ev)
+{
+	struct zk_node *leaver;
+
+	leaver = node_btree_find(&zk_node_btroot, &ev->sender);
+	if (leaver == NULL) {
+		dprintf("can't find this leave node:%s, ignore it.\n",
+			node_to_str(&ev->sender.node));
+		return;
+	}
+
+	node_btree_del(&zk_node_btroot, leaver);
+	dprintf("one sheep left, nr_nodes:%zu\n", nr_zk_nodes);
+	build_node_list(zk_node_btroot);
+	sd_leave_handler(&ev->sender.node, sd_nodes, nr_sd_nodes);
+}
+
+/*
+ * EVENT_BLOCK: step queue_pos back by one (zk_queue_push_back() with a NULL
+ * event only decrements it), so this queue node is read again once it is
+ * updated.  Then hand the sender to sd_block_handler().  If that returns
+ * true, record the block in zk_notify_blocked; the assertion expects
+ * uatomic_set_true() to report success.
+ */
+static void zk_handle_block(struct zk_event *ev)
+{
+	bool result;
+
+	dprintf("BLOCK\n");
+	zk_queue_push_back(zhandle, NULL);
+	if (!sd_block_handler(&ev->sender.node))
+		return;
+
+	/*
+	 * Do the atomic update outside assert().  When NDEBUG is defined the
+	 * whole assert() expression is compiled out, and the flag would then
+	 * never be set.
+	 */
+	result = uatomic_set_true(&zk_notify_blocked);
+	assert(result);
+}
+
+/*
+ * EVENT_NOTIFY: forward the sender together with the event payload
+ * (ev->buf, ev->buf_len bytes) to sd_notify_handler().
+ */
+static void zk_handle_notify(struct zk_event *ev)
+{
+	dprintf("NOTIFY\n");
+	sd_notify_handler(&ev->sender.node,
+			  ev->buf,
+			  ev->buf_len);
+}
+
+/*
+ * Dispatch table indexed by zk_event type.  Types without an entry are NULL
+ * and are reported as unhandled by zk_event_handler().
+ */
+static void (*const zk_event_handlers[])(struct zk_event *ev) = {
+	[EVENT_JOIN_REQUEST]	= zk_handle_join_request,
+	[EVENT_JOIN_RESPONSE]	= zk_handle_join_response,
+	[EVENT_LEAVE]		= zk_handle_leave,
+	[EVENT_BLOCK]		= zk_handle_block,
+	[EVENT_NOTIFY]		= zk_handle_notify,
+};
+
+static const int zk_max_event_handlers = ARRAY_SIZE(zk_event_handlers);
+
+/*
+ * Callback registered for efd.  On EPOLLHUP it logs, closes the log and
+ * exits.  Otherwise it drains the eventfd counter, pops one event with
+ * zk_queue_pop() and runs the matching entry of zk_event_handlers.
+ */
+static void zk_event_handler(int listen_fd, int events, void *data)
+{
+	eventfd_t value;
+	struct zk_event ev;
+	unsigned int type;
+
+	if (events & EPOLLHUP) {
+		eprintf("zookeeper driver received EPOLLHUP event, exiting.\n");
+		log_close();
+		exit(1);
+	}
+
+	if (eventfd_read(efd, &value) < 0)
+		return;
+
+	if (zk_queue_pop(zhandle, &ev) < 0)
+		return;
+
+	/*
+	 * ev.type is read back from ZooKeeper node data, so bound-check it as
+	 * unsigned: a negative value must not index before the table start.
+	 */
+	type = (unsigned int)ev.type;
+	if (type < (unsigned int)zk_max_event_handlers &&
+	    zk_event_handlers[type])
+		zk_event_handlers[type](&ev);
+	else
+		eprintf("unhandled type %d\n", ev.type);
}
static int zk_init(const char *option)
@@ -806,7 +824,7 @@ static int zk_init(const char *option)
return -1;
}
- zhandle = zookeeper_init(option, watcher, SESSION_TIMEOUT, NULL, NULL,
+ zhandle = zookeeper_init(option, zk_watcher, SESSION_TIMEOUT, NULL, NULL,
0);
if (!zhandle) {
eprintf("failed to connect to zk server %s\n", option);
@@ -824,7 +842,7 @@ static int zk_init(const char *option)
return -1;
}
- ret = register_event(efd, zk_handler, NULL);
+ ret = register_event(efd, zk_event_handler, NULL);
if (ret) {
eprintf("failed to register zookeeper event handler (%d)\n",
ret);
--
1.7.9.5
More information about the sheepdog
mailing list