[sheepdog] [PATCH 03/13] zookeeper: refactor event handling

Liu Yuan namei.unix at gmail.com
Tue Dec 18 06:37:52 CET 2012


From: Liu Yuan <tailai.ly at taobao.com>

- use a table of function pointers to replace the lengthy switch statement

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 sheep/cluster/zookeeper.c |  366 ++++++++++++++++++++++++---------------------
 1 file changed, 192 insertions(+), 174 deletions(-)

diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index a3a5933..b67bafa 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -189,10 +189,9 @@ static bool zk_queue_empty(zhandle_t *zh)
 	return true;
 }
 
-static int32_t zk_queue_push(zhandle_t *zh, struct zk_event *ev)
+static void zk_queue_push(zhandle_t *zh, struct zk_event *ev)
 {
 	static bool first_push = true;
-	int32_t seq;
 	int len;
 	char path[256], buf[256];
 	eventfd_t value = 1;
@@ -201,42 +200,55 @@ static int32_t zk_queue_push(zhandle_t *zh, struct zk_event *ev)
 	sprintf(path, "%s/", QUEUE_ZNODE);
 	zk_create_node(zh, path, (char *)ev, len,
 		       &ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE, buf, sizeof(buf));
-	dprintf("create path:%s, nr_nodes:%zu, queue_pos:%010d, len:%d\n"
-		, buf, nr_zk_nodes, queue_pos, len);
-
-	sscanf(buf, QUEUE_ZNODE "/%d", &seq);
-	dprintf("path:%s, seq:%010d\n", buf, seq);
+	dprintf("create path:%s, nr_nodes:%zu, queue_pos:%010d, len:%d\n", buf,
+		nr_zk_nodes, queue_pos, len);
 
 	if (first_push) {
+		int32_t seq;
+
+		sscanf(buf, QUEUE_ZNODE "/%d", &seq);
 		queue_pos = seq;
 		eventfd_write(efd, value);
 		first_push = false;
 	}
-
-	return seq;
-
 }
 
+/*
+ * Change the event in place and expect the dedicated handler to be called
+ * via zk_watcher which wakes up one of the zk_event_handlers.
+ */
 static int zk_queue_push_back(zhandle_t *zh, struct zk_event *ev)
 {
 	int len;
 	char path[256];
 
 	queue_pos--;
-
 	dprintf("queue_pos:%010d\n", queue_pos);
 
 	if (ev) {
-		/* update the last popped data */
 		len = (char *)(ev->buf) - (char *)ev + ev->buf_len;
 		sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
 		zk_set_data(zh, path, (char *)ev, len, -1);
-		dprintf("update path:%s, queue_pos:%010d, len:%d\n", path, queue_pos, len);
+		dprintf("update path:%s, queue_pos:%010d, len:%d\n", path,
+			queue_pos, len);
 	}
 
 	return 0;
 }
 
+/*
+ * Peek next queue event and if it exists, we must watch it and manually notify
+ * it in order not to lose it.
+ */
+static void zk_queue_peek_next_notify(zhandle_t *zh, const char *path)
+{
+	int rc = zk_node_exists(zh, path, 1, NULL);
+	if (rc == ZOK) {
+		dprintf("%s\n", path);
+		eventfd_write(efd, 1);
+	}
+}
+
 static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 {
 	int rc, len;
@@ -246,43 +258,41 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 	eventfd_t value = 1;
 
 	/*
-	 * Continue to process LEAVE event even if
-	 * we have an unfinished BLOCK event.
+	 * Continue to process LEAVE event even if we have an unfinished BLOCK
+	 * event.
 	 */
 	if (!called_by_zk_unblock && uatomic_read(&nr_zk_levents)) {
 		nr_levents = uatomic_sub_return(&nr_zk_levents, 1) + 1;
-		dprintf("nr_zk_levents:%d, head:%u\n", nr_levents, zk_levent_head);
+		dprintf("nr_levents:%d, head:%u\n", nr_levents, zk_levent_head);
 
 		lev = &zk_levents[zk_levent_head%SD_MAX_NODES];
 
-		/* if the node pointed to by queue_pos was send by this leaver,
-		 * and it have blocked whole cluster, we should ignore it. */
+		/*
+		 * If the node pointed to by queue_pos was sent by this leaver,
+		 * and it has blocked the whole cluster, we should ignore it.
+		 */
 		len = sizeof(*ev);
 		sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
 		rc = zk_get_data(zh, path, 1, (char *)ev, &len, NULL);
 		if (rc == ZOK &&
 		    node_eq(&ev->sender.node, &lev->sender.node) &&
 		    is_blocking_event(ev)) {
-			dprintf("this queue_pos:%010d have blocked whole cluster, ignore it\n", queue_pos);
+			dprintf("this queue_pos:%010d have blocked whole "
+				"cluster, ignore it\n", queue_pos);
 			queue_pos++;
 
-			/* watch next data */
 			sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
-			rc = zk_node_exists(zh, path, 1, NULL);
-			dprintf("watch path:%s, exists:%d\n", path, (rc == ZOK));
-			if (rc == ZOK) {
-				/* we lost this message, manual notify */
-				dprintf("write event to efd:%d\n", efd);
-				eventfd_write(efd, value);
-			}
+			zk_queue_peek_next_notify(zh, path);
 		}
 
 		memcpy(ev, lev, sizeof(*ev));
 		zk_levent_head++;
 
 		if (uatomic_read(&nr_zk_levents) || rc == ZOK) {
-			/* we have pending leave events
-			 * or queue nodes, manual notify */
+			/*
+			 * we have pending leave events or queue nodes,
+			 * manual notify
+			 */
 			dprintf("write event to efd:%d\n", efd);
 			eventfd_write(efd, value);
 		}
@@ -299,30 +309,23 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 	len = sizeof(*ev);
 	sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
 	rc = zk_get_data(zh, path, 1, (char *)ev, &len, NULL);
-	dprintf("read path:%s, nr_nodes:%zu, type:%d, len:%d, rc:%d\n", path,
-		nr_zk_nodes, ev->type, len, rc);
 	if (rc != ZOK)
 		panic("failed to zk_get_data path:%s, rc:%d\n", path, rc);
+	dprintf("read path:%s, nr_nodes:%zu, type:%d, len:%d, rc:%d\n", path,
+		nr_zk_nodes, ev->type, len, rc);
 
 	queue_pos++;
 
-	/* this event will be pushed back to the queue,
+	/*
+	 * This event will be pushed back to the queue,
 	 * we just wait for the arrival of its updated,
-	 * not need to watch next data. */
+	 * no need to watch the next data.
+	 */
 	if (is_blocking_event(ev))
-		goto out;
+		return 0;
 
-	/* watch next data */
 	sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
-	rc = zk_node_exists(zh, path, 1, NULL);
-	dprintf("watch path:%s, exists:%d\n", path, (rc == ZOK));
-	if (rc == ZOK) {
-		/* we lost this message, manual notify */
-		dprintf("write event to efd:%d\n", efd);
-		eventfd_write(efd, value);
-	}
-
-out:
+	zk_queue_peek_next_notify(zh, path);
 	return 0;
 }
 
@@ -526,13 +529,11 @@ static int leave_event(zhandle_t *zh, struct zk_node *znode)
 
 	zk_levent_tail++;
 
-	/* manual notify */
-	dprintf("write event to efd:%d\n", efd);
 	eventfd_write(efd, value);
 	return 0;
 }
 
-static void watcher(zhandle_t *zh, int type, int state, const char *path,
+static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
 		    void *ctx)
 {
 	eventfd_t value = 1;
@@ -591,7 +592,8 @@ static int zk_join(const struct sd_node *myself,
 	sprintf(path, MEMBER_ZNODE "/%s", node_to_str(myself));
 	rc = zk_node_exists(zhandle, path, 1, NULL);
 	if (rc == ZOK) {
-		eprintf("previous zookeeper session exist, shoot myself\n");
+		eprintf("Previous zookeeper session exist, shoot myself.\n"
+			"Wait for a while and restart me again\n");
 		exit(1);
 	}
 
@@ -649,149 +651,165 @@ static void zk_unblock(void *msg, size_t msg_len)
 	eventfd_write(efd, value);
 }
 
-static void zk_handler(int listen_fd, int events, void *data)
+static void zk_handle_join_request(struct zk_event *ev)
 {
-	int ret, rc;
-	char path[256];
-	eventfd_t value;
-	struct zk_event ev;
-	struct zk_node *n;
 	enum cluster_join_result res;
 
-	if (events & EPOLLHUP) {
-		eprintf("zookeeper driver received EPOLLHUP event, exiting.\n");
-		log_close();
-		exit(1);
-	}
+	dprintf("nr_nodes: %zu, sender: %s, joined: %d\n",
+		nr_zk_nodes, node_to_str(&ev->sender.node),
+		ev->sender.joined);
 
-	ret = eventfd_read(efd, &value);
-	if (ret < 0)
+	if (!is_master(zhandle, &this_node)) {
+		zk_queue_push_back(zhandle, NULL);
 		return;
+	}
 
-	ret = zk_queue_pop(zhandle, &ev);
-	if (ret < 0)
-		goto out;
-
-	switch (ev.type) {
-	case EVENT_JOIN_REQUEST:
-		dprintf("JOIN REQUEST nr_nodes: %zu, sender: %s, joined: %d\n",
-			nr_zk_nodes, node_to_str(&ev.sender.node),
-			ev.sender.joined);
+	res = sd_check_join_cb(&ev->sender.node, ev->buf);
+	ev->join_result = res;
+	ev->type = EVENT_JOIN_RESPONSE;
+	ev->sender.joined = true;
 
-		if (!is_master(zhandle, &this_node)) {
-			zk_queue_push_back(zhandle, NULL);
-			break;
-		}
+	dprintf("I'm master, push back join event\n");
+	zk_queue_push_back(zhandle, ev);
 
-		res = sd_check_join_cb(&ev.sender.node, ev.buf);
-		ev.join_result = res;
-		ev.type = EVENT_JOIN_RESPONSE;
-		ev.sender.joined = true;
+	if (res == CJ_RES_MASTER_TRANSFER) {
+		eprintf("failed to join sheepdog cluster: "
+			"please retry when master is up\n");
+		zk_leave();
+		exit(1);
+	}
+}
 
-		dprintf("I'm master, push back join event\n");
-		zk_queue_push_back(zhandle, &ev);
+static void zk_handle_join_response(struct zk_event *ev)
+{
+	char path[256];
+	int rc;
 
-		if (res == CJ_RES_MASTER_TRANSFER) {
-			eprintf("failed to join sheepdog cluster: "
-				"please retry when master is up\n");
-			zk_leave();
-			exit(1);
-		}
-		break;
-	case EVENT_JOIN_RESPONSE:
-		dprintf("JOIN RESPONSE\n");
-
-		if (is_master(zhandle, &this_node) &&
-		    !node_eq(&ev.sender.node, &this_node.node)) {
-			/* wait util the member node has been created */
-			int retry =
-				MEMBER_CREATE_TIMEOUT / MEMBER_CREATE_INTERVAL;
-
-			sprintf(path, MEMBER_ZNODE "/%s",
-				node_to_str(&ev.sender.node));
-
-			while (retry &&
-			       zk_node_exists(zhandle, path, 1, NULL) == ZNONODE) {
-				usleep(MEMBER_CREATE_INTERVAL * 1000);
-				retry--;
-			}
-			if (retry <= 0) {
-				dprintf("Sender:%s failed to create member, ignore it\n",
-						node_to_str(&ev.sender.node));
-				goto out;
-			}
-		}
+	dprintf("JOIN RESPONSE\n");
+	if (is_master(zhandle, &this_node) &&
+	    !node_eq(&ev->sender.node, &this_node.node)) {
+		/* wait until the member node has been created */
+		int retry =
+			MEMBER_CREATE_TIMEOUT / MEMBER_CREATE_INTERVAL;
 
-		if (node_eq(&ev.sender.node, &this_node.node))
-			zk_member_init(zhandle);
+		sprintf(path, MEMBER_ZNODE "/%s",
+			node_to_str(&ev->sender.node));
 
-		if (ev.join_result == CJ_RES_MASTER_TRANSFER)
-			/*
-			 * Sheepdog assumes that only one sheep(master will kill
-			 * itself) is alive in MASTER_TRANSFER scenario. So only
-			 * the joining sheep will run into here.
-			 */
-			node_btree_clear(&zk_node_btroot);
-
-		node_btree_add(&zk_node_btroot, &ev.sender);
-		dprintf("one sheep joined[down], nr_nodes:%zu, sender:%s,"
-			" joined:%d\n", nr_zk_nodes,
-			node_to_str(&ev.sender.node), ev.sender.joined);
-
-		switch (ev.join_result) {
-		case CJ_RES_SUCCESS:
-		case CJ_RES_JOIN_LATER:
-		case CJ_RES_MASTER_TRANSFER:
-			sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender.node));
-			if (node_eq(&ev.sender.node, &this_node.node)) {
-				dprintf("create path:%s\n", path);
-				zk_create_node(zhandle, path,
-					       (char *)&ev.sender,
-					       sizeof(ev.sender),
-					       &ZOO_OPEN_ACL_UNSAFE,
-					       ZOO_EPHEMERAL, NULL, 0);
-			} else {
-				rc = zk_node_exists(zhandle, path, 1, NULL);
-				dprintf("watch path:%s, exists:%d\n", path, (rc == ZOK));
-			}
-			break;
-		default:
-			break;
+		while (retry &&
+		       zk_node_exists(zhandle, path, 1, NULL) == ZNONODE) {
+			usleep(MEMBER_CREATE_INTERVAL * 1000);
+			retry--;
 		}
-
-		build_node_list(zk_node_btroot);
-		sd_join_handler(&ev.sender.node, sd_nodes, nr_sd_nodes,
-				    ev.join_result, ev.buf);
-		break;
-	case EVENT_LEAVE:
-		dprintf("LEAVE EVENT\n");
-		n = node_btree_find(&zk_node_btroot, &ev.sender);
-		if (!n) {
-			dprintf("can't find this leave node:%s, ignore it.\n", node_to_str(&ev.sender.node));
-			goto out;
+		if (retry <= 0) {
+			dprintf("Sender:%s failed to create member, ignore it\n",
+				node_to_str(&ev->sender.node));
+			return;
 		}
+	}
 
-		node_btree_del(&zk_node_btroot, n);
-		dprintf("one sheep left, nr_nodes:%zu\n", nr_zk_nodes);
-
-		build_node_list(zk_node_btroot);
-		sd_leave_handler(&ev.sender.node, sd_nodes, nr_sd_nodes);
-		break;
-	case EVENT_BLOCK:
-		dprintf("BLOCK\n");
-		zk_queue_push_back(zhandle, NULL);
-		if (sd_block_handler(&ev.sender.node)) {
-			bool result = uatomic_set_true(&zk_notify_blocked);
-			assert(result);
+	if (node_eq(&ev->sender.node, &this_node.node))
+		zk_member_init(zhandle);
+
+	if (ev->join_result == CJ_RES_MASTER_TRANSFER)
+		/*
+		 * Sheepdog assumes that only one sheep(master will kill
+		 * itself) is alive in MASTER_TRANSFER scenario. So only
+		 * the joining sheep will run into here.
+		 */
+		node_btree_clear(&zk_node_btroot);
+
+	node_btree_add(&zk_node_btroot, &ev->sender);
+	dprintf("nr_nodes:%zu, sender:%s, joined:%d\n", nr_zk_nodes,
+		node_to_str(&ev->sender.node), ev->sender.joined);
+
+	switch (ev->join_result) {
+	case CJ_RES_SUCCESS:
+	case CJ_RES_JOIN_LATER:
+	case CJ_RES_MASTER_TRANSFER:
+		sprintf(path, MEMBER_ZNODE "/%s",
+			node_to_str(&ev->sender.node));
+		if (node_eq(&ev->sender.node, &this_node.node)) {
+			dprintf("create path:%s\n", path);
+			zk_create_node(zhandle, path,
+				       (char *)&ev->sender,
+				       sizeof(ev->sender),
+				       &ZOO_OPEN_ACL_UNSAFE,
+				       ZOO_EPHEMERAL, NULL, 0);
+		} else {
+			rc = zk_node_exists(zhandle, path, 1, NULL);
+			dprintf("watch:%s, exists:%d\n", path, (rc == ZOK));
 		}
 		break;
-	case EVENT_NOTIFY:
-		dprintf("NOTIFY\n");
-		sd_notify_handler(&ev.sender.node, ev.buf, ev.buf_len);
+	default:
 		break;
 	}
-out:
-	return;
+
+	build_node_list(zk_node_btroot);
+	sd_join_handler(&ev->sender.node, sd_nodes, nr_sd_nodes,
+			ev->join_result, ev->buf);
+}
+
+static void zk_handle_leave(struct zk_event *ev)
+{
+	struct zk_node *n =  node_btree_find(&zk_node_btroot, &ev->sender);
+	if (!n) {
+		dprintf("can't find this leave node:%s, ignore it.\n",
+			node_to_str(&ev->sender.node));
+		return;
+	}
+
+	node_btree_del(&zk_node_btroot, n);
+	dprintf("one sheep left, nr_nodes:%zu\n", nr_zk_nodes);
+
+	build_node_list(zk_node_btroot);
+	sd_leave_handler(&ev->sender.node, sd_nodes, nr_sd_nodes);
+}
+
+static void zk_handle_block(struct zk_event *ev)
+{
+	dprintf("BLOCK\n");
+	zk_queue_push_back(zhandle, NULL);
+	if (sd_block_handler(&ev->sender.node))
+		assert(uatomic_set_true(&zk_notify_blocked));
+}
+
+static void zk_handle_notify(struct zk_event *ev)
+{
+	dprintf("NOTIFY\n");
+	sd_notify_handler(&ev->sender.node, ev->buf, ev->buf_len);
+}
+
+static void (*const zk_event_handlers[])(struct zk_event *ev) = {
+	[EVENT_JOIN_REQUEST]	= zk_handle_join_request,
+	[EVENT_JOIN_RESPONSE]	= zk_handle_join_response,
+	[EVENT_LEAVE]		= zk_handle_leave,
+	[EVENT_BLOCK]		= zk_handle_block,
+	[EVENT_NOTIFY]		= zk_handle_notify,
+};
+
+static const int zk_max_event_handlers = ARRAY_SIZE(zk_event_handlers);
+
+static void zk_event_handler(int listen_fd, int events, void *data)
+{
+	eventfd_t value;
+	struct zk_event ev;
+
+	if (events & EPOLLHUP) {
+		eprintf("zookeeper driver received EPOLLHUP event, exiting.\n");
+		log_close();
+		exit(1);
+	}
+
+	if (eventfd_read(efd, &value) < 0)
+		return;
+
+	if (zk_queue_pop(zhandle, &ev) < 0)
+		return;
+
+	if (ev.type < zk_max_event_handlers && zk_event_handlers[ev.type])
+		zk_event_handlers[ev.type](&ev);
+	else
+		eprintf("unhandled type %d\n", ev.type);
 }
 
 static int zk_init(const char *option)
@@ -806,7 +824,7 @@ static int zk_init(const char *option)
 		return -1;
 	}
 
-	zhandle = zookeeper_init(option, watcher, SESSION_TIMEOUT, NULL, NULL,
+	zhandle = zookeeper_init(option, zk_watcher, SESSION_TIMEOUT, NULL, NULL,
 				 0);
 	if (!zhandle) {
 		eprintf("failed to connect to zk server %s\n", option);
@@ -824,7 +842,7 @@ static int zk_init(const char *option)
 		return -1;
 	}
 
-	ret = register_event(efd, zk_handler, NULL);
+	ret = register_event(efd, zk_event_handler, NULL);
 	if (ret) {
 		eprintf("failed to register zookeeper event handler (%d)\n",
 			ret);
-- 
1.7.9.5




More information about the sheepdog mailing list