[sheepdog] [PATCH 6/8] zookeeper: fix kick_block_event logic

Liu Yuan namei.unix at gmail.com
Sun Dec 23 16:26:30 CET 2012


From: Liu Yuan <tailai.ly at taobao.com>

Kick the block event only if there is no pending nonblock event. We prefer to
handle nonblock events first because:

1. Sheep assumes that unblock() and notify() form a transaction, so we
   can only kick the next block event after sd_notify_handler() is called.
2. We should process leave/join events as soon as possible.

- Use struct zk_node instead of struct zk_event to track block events

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 sheep/cluster/zookeeper.c |  106 +++++++++++++++++++++++----------------------
 1 file changed, 54 insertions(+), 52 deletions(-)

diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index fe4093d..336c827 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -49,13 +49,13 @@ enum zk_event_type {
 };
 
 struct zk_node {
+	struct list_head list;
 	struct rb_node rb;
 	struct sd_node node;
+	bool callbacked;
 };
 
 struct zk_event {
-	struct list_head list;
-	bool callbacked;
 	enum zk_event_type type;
 	struct zk_node sender;
 	enum cluster_join_result join_result;
@@ -66,7 +66,7 @@ struct zk_event {
 static struct sd_node sd_nodes[SD_MAX_NODES];
 static size_t nr_sd_nodes;
 struct rb_root zk_node_root = RB_ROOT;
-static LIST_HEAD(zk_block_event_list);
+static LIST_HEAD(zk_block_list);
 static bool joined;
 
 static struct zk_node *zk_tree_insert(struct zk_node *new)
@@ -214,9 +214,9 @@ static bool zk_queue_peek(void)
 
 	rc = zk_node_exists(path);
 	if (rc == ZOK)
-		return false;
+		return true;
 
-	return true;
+	return false;
 }
 
 static void zk_queue_push(struct zk_event *ev)
@@ -264,29 +264,17 @@ static int zk_queue_push_back(struct zk_event *ev)
 	return 0;
 }
 
-static int zk_queue_pop(struct zk_event *ev)
+static void zk_queue_pop_advance(struct zk_event *ev)
 {
 	int len;
 	char path[256];
 
-	if (zk_queue_peek())
-		return -1;
-
 	len = sizeof(*ev);
 	sprintf(path, QUEUE_ZNODE "/%010"PRId32, queue_pos);
 	assert(zk_get_data(path, ev, &len) == ZOK);
 	dprintf("%s, type:%d, len:%d, pos:%"PRId32"\n",
 		path, ev->type, len, queue_pos);
-
-	/* watch next */
 	queue_pos++;
-	sprintf(path, QUEUE_ZNODE "/%010"PRId32, queue_pos);
-	/* If not joined, we must wait for join response. No kick. */
-	if (zk_node_exists(path) == ZOK && joined)
-		/* Someone has created this node, go kick event handler */
-		eventfd_write(efd, 1);
-
-	return 0;
 }
 
 static int zk_member_empty(void)
@@ -550,32 +538,27 @@ static void zk_handle_join_response(struct zk_event *ev)
 			ev->join_result, ev->buf);
 }
 
-/* When block event is deleted from list, we should call this function */
-static void kick_next_block_event(void)
+static void kick_block_event(void)
 {
-	struct zk_event *zke;
+	struct zk_node *block;
 
-	if (list_empty(&zk_block_event_list))
+	if (list_empty(&zk_block_list))
 		return;
-	zke = list_first_entry(&zk_block_event_list, typeof(*zke), list);
-	if (!zke->callbacked)
-		zke->callbacked = sd_block_handler(&zke->sender.node);
+	block = list_first_entry(&zk_block_list, typeof(*block), list);
+	if (!block->callbacked)
+		block->callbacked = sd_block_handler(&block->node);
 }
 
-static bool block_event_list_del(struct zk_node *n)
+static void block_event_list_del(struct zk_node *n)
 {
-	struct zk_event *ev, *t;
-	bool ret = false;
+	struct zk_node *ev, *t;
 
-	list_for_each_entry_safe(ev, t, &zk_block_event_list, list) {
-		if (node_eq(&ev->sender.node, &n->node)) {
+	list_for_each_entry_safe(ev, t, &zk_block_list, list) {
+		if (node_eq(&ev->node, &n->node)) {
 			list_del(&ev->list);
 			free(ev);
-			ret = true;
 		}
 	}
-
-	return ret;
 }
 
 static void zk_handle_leave(struct zk_event *ev)
@@ -587,8 +570,7 @@ static void zk_handle_leave(struct zk_event *ev)
 			node_to_str(&ev->sender.node));
 		return;
 	}
-	if (block_event_list_del(n))
-		kick_next_block_event();
+	block_event_list_del(n);
 	zk_tree_del(n);
 	build_node_list();
 	sd_leave_handler(&ev->sender.node, sd_nodes, nr_sd_nodes);
@@ -596,29 +578,30 @@ static void zk_handle_leave(struct zk_event *ev)
 
 static void zk_handle_block(struct zk_event *ev)
 {
-	struct zk_event *zke = xzalloc(sizeof(*zke));
+	struct zk_node *block = xmalloc(sizeof(*block));
 
 	dprintf("BLOCK\n");
-	*zke = *ev;
-	zke->callbacked = false;
-	list_add_tail(&zke->list, &zk_block_event_list);
-	zke = list_first_entry(&zk_block_event_list, typeof(*zke), list);
-	if (!zke->callbacked)
-		zke->callbacked = sd_block_handler(&zke->sender.node);
+	block->node = ev->sender.node;
+	block->callbacked = false;
+	list_add_tail(&block->list, &zk_block_list);
+	block = list_first_entry(&zk_block_list, typeof(*block), list);
+	if (!block->callbacked)
+		block->callbacked = sd_block_handler(&block->node);
 }
 
 static void zk_handle_unblock(struct zk_event *ev)
 {
-	struct zk_event *zke;
+	struct zk_node *block;
 
 	dprintf("UNBLOCK\n");
-	zke = list_first_entry(&zk_block_event_list, typeof(*zke), list);
-	if (zke->callbacked)
-		add_event(EVENT_NOTIFY, &zke->sender, ev->buf, ev->buf_len);
+	if (list_empty(&zk_block_list))
+		return;
+	block = list_first_entry(&zk_block_list, typeof(*block), list);
+	if (block->callbacked)
+		add_event(EVENT_NOTIFY, block, ev->buf, ev->buf_len);
 
-	list_del(&zke->list);
-	free(zke);
-	kick_next_block_event();
+	list_del(&block->list);
+	free(block);
 }
 
 static void zk_handle_notify(struct zk_event *ev)
@@ -643,6 +626,7 @@ static void zk_event_handler(int listen_fd, int events, void *data)
 	eventfd_t value;
 	struct zk_event ev;
 
+	dprintf("%d, %d\n", events, queue_pos);
 	if (events & EPOLLHUP) {
 		eprintf("zookeeper driver received EPOLLHUP event, exiting.\n");
 		log_close();
@@ -654,13 +638,31 @@ static void zk_event_handler(int listen_fd, int events, void *data)
 		return;
 	}
 
-	if (zk_queue_pop(&ev) < 0)
-		return;
+	if (!zk_queue_peek())
+		goto kick_block_event;
 
+	zk_queue_pop_advance(&ev);
 	if (ev.type < zk_max_event_handlers && zk_event_handlers[ev.type])
 		zk_event_handlers[ev.type](&ev);
 	else
-		eprintf("unhandled type %d\n", ev.type);
+		panic("unhandled type %d\n", ev.type);
+
+	/* Someone has created the next event; kick the event handler. */
+	if (zk_queue_peek() && joined) {
+		eventfd_write(efd, 1);
+		return;
+	}
+
+kick_block_event:
+	/*
+	 * Kick block event only if there is no nonblock event. We prefer to
+	 * handle nonblock events because:
+	 *
+	 * 1. Sheep assumes that unblock() and notify() are a transaction, so we
+	 *    can only kick next block event after sd_notify_handler() is called
+	 * 2. We should process leave/join events as soon as possible.
+	 */
+	kick_block_event();
 }
 
 static int zk_init(const char *option)
-- 
1.7.9.5




More information about the sheepdog mailing list