[sheepdog] [PATCH 12/13] zookeeper: rework block/unblock and leave event handling

Liu Yuan namei.unix at gmail.com
Tue Dec 18 06:38:01 CET 2012


From: Liu Yuan <tailai.ly at taobao.com>

This patch kick-starts the zookeeper driver so that it works with the current master.

- Put all the events into the zk_queue, resulting in a totally ordered event
sequence.
- Block/unblock events are linked into a dedicated list to implement blocking
semantics as corosync does.
- Better master election method to allow concurrent start of multiple nodes.

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 sheep/cluster/zookeeper.c |  302 ++++++++++++++++-----------------------------
 1 file changed, 109 insertions(+), 193 deletions(-)

diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 8b15f1b..feb5ec8 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -11,11 +11,9 @@
 #include <stdio.h>
 #include <string.h>
 #include <unistd.h>
-#include <search.h>
 #include <sys/epoll.h>
 #include <sys/eventfd.h>
 #include <zookeeper/zookeeper.h>
-#include <urcu/uatomic.h>
 
 #include "cluster.h"
 #include "event.h"
@@ -30,6 +28,7 @@
 #define BASE_ZNODE "/sheepdog"
 #define QUEUE_ZNODE BASE_ZNODE "/queue"
 #define MEMBER_ZNODE BASE_ZNODE "/member"
+#define MASTER_ZNONE BASE_ZNODE "/master"
 
 /* iterate child znodes */
 #define FOR_EACH_ZNODE(parent, path, strs)			       \
@@ -45,6 +44,7 @@ enum zk_event_type {
 	EVENT_JOIN_RESPONSE,
 	EVENT_LEAVE,
 	EVENT_BLOCK,
+	EVENT_UNBLOCK,
 	EVENT_NOTIFY,
 };
 
@@ -54,33 +54,19 @@ struct zk_node {
 };
 
 struct zk_event {
+	struct list_head list;
+	bool callbacked;
 	enum zk_event_type type;
 	struct zk_node sender;
-
 	enum cluster_join_result join_result;
-
 	size_t buf_len;
 	uint8_t buf[SD_MAX_EVENT_BUF_SIZE];
 };
 
-static uatomic_bool zk_notify_blocked;
-
-/* leave event circular array */
-static struct zk_event zk_levents[SD_MAX_NODES];
-static int nr_zk_levents;
-static unsigned zk_levent_head;
-static unsigned zk_levent_tail;
-static bool called_by_zk_unblock;
-
 static struct sd_node sd_nodes[SD_MAX_NODES];
 static size_t nr_sd_nodes;
-
 struct rb_root zk_node_root = RB_ROOT;
-
-static inline bool is_blocking_event(struct zk_event *ev)
-{
-	return ev->type == EVENT_BLOCK || ev->type == EVENT_JOIN_REQUEST;
-}
+static LIST_HEAD(zk_block_event_list);
 
 static struct zk_node *zk_tree_insert(struct zk_node *new)
 {
@@ -103,10 +89,8 @@ static struct zk_node *zk_tree_insert(struct zk_node *new)
 			/* already has this entry */
 			return entry;
 	}
-
 	rb_link_node(&new->rb, parent, p);
 	rb_insert_color(&new->rb, &zk_node_root);
-
 	return NULL; /* insert successfully */
 }
 
@@ -128,7 +112,6 @@ static struct zk_node *zk_tree_search(const struct node_id *nid)
 		else
 			return t; /* found it */
 	}
-
 	return NULL;
 }
 
@@ -160,7 +143,7 @@ zk_init_node(const char *path)
 		panic("failed, path:%s, rc:%d\n", path, rc);
 }
 
-static inline ZOOAPI void
+static inline ZOOAPI int
 zk_create_node(const char *path, const char *value, int valuelen,
 	       const struct ACL_vector *acl, int flags, char *path_buffer,
 	       int path_buffer_len)
@@ -170,8 +153,7 @@ zk_create_node(const char *path, const char *value, int valuelen,
 		rc = zoo_create(zhandle, path, value, valuelen, acl,
 				flags, path_buffer, path_buffer_len);
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
-	if (rc != ZOK)
-		panic("failed, path:%s, rc:%d\n", path, rc);
+	return rc;
 }
 
 static inline ZOOAPI int zk_get_data(const char *path, void *buffer,
@@ -218,11 +200,11 @@ static inline ZOOAPI void zk_get_children(const char *path,
 		panic("failed:%s, rc:%d\n", path, rc);
 }
 
-/* ZooKeeper-based queue */
+/* ZooKeeper-based queue gives us a totally ordered event sequence */
 static int efd;
 static int32_t queue_pos;
 
-static bool zk_queue_empty(void)
+static bool zk_queue_peek(void)
 {
 	int rc;
 	char path[256];
@@ -239,25 +221,26 @@ static bool zk_queue_empty(void)
 static void zk_queue_push(struct zk_event *ev)
 {
 	static bool first_push = true;
-	int len;
+	int len, ret;
 	char path[256], buf[256];
-	eventfd_t value = 1;
 
 	len = (char *)(ev->buf) - (char *)ev + ev->buf_len;
 	sprintf(path, "%s/", QUEUE_ZNODE);
-	zk_create_node(path, (char *)ev, len,
-		       &ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE, buf, sizeof(buf));
-	dprintf("create:%s, queue_pos:%010"PRId32", len:%d\n", buf, queue_pos,
-		len);
-
+	ret = zk_create_node(path, (char *)ev, len,
+			     &ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE, buf,
+			     sizeof(buf));
 	if (first_push) {
-		uint32_t seq;
+		int32_t seq;
 
 		sscanf(buf, QUEUE_ZNODE "/%"PRId32, &seq);
 		queue_pos = seq;
-		eventfd_write(efd, value);
+		eventfd_write(efd, 1);
 		first_push = false;
 	}
+
+	if (ret == ZOK)
+		dprintf("create:%s, queue_pos:%010"PRId32", len:%d\n",
+			buf, queue_pos, len);
 }
 
 /*
@@ -280,94 +263,26 @@ static int zk_queue_push_back(struct zk_event *ev)
 	return 0;
 }
 
-/*
- * Peek next queue event and if it exists, we must watch it and manually notify
- * it in order not to lose it.
- */
-static void zk_queue_peek_next_notify(const char *path)
-{
-	int rc = zk_node_exists(path);
-	if (rc == ZOK)
-		eventfd_write(efd, 1);
-}
-
 static int zk_queue_pop(struct zk_event *ev)
 {
-	int rc, len;
-	int nr_levents;
+	int len;
 	char path[256];
-	struct zk_event *lev;
-	eventfd_t value = 1;
-
-	/*
-	 * Continue to process LEAVE event even if we have an unfinished BLOCK
-	 * event.
-	 */
-	if (!called_by_zk_unblock && uatomic_read(&nr_zk_levents)) {
-		nr_levents = uatomic_sub_return(&nr_zk_levents, 1) + 1;
-		dprintf("nr_levents:%d, head:%u\n", nr_levents, zk_levent_head);
 
-		lev = &zk_levents[zk_levent_head%SD_MAX_NODES];
-
-		/*
-		 * If the node pointed to by queue_pos was send by this leaver,
-		 * and it have blocked whole cluster, we should ignore it.
-		 */
-		len = sizeof(*ev);
-		sprintf(path, QUEUE_ZNODE "/%010"PRId32, queue_pos);
-		rc = zk_get_data(path, ev, &len);
-		if (rc == ZOK &&
-		    node_eq(&ev->sender.node, &lev->sender.node) &&
-		    is_blocking_event(ev)) {
-			dprintf("this queue_pos:%010"PRId32" have blocked whole"
-				" cluster, ignore it\n", queue_pos);
-			queue_pos++;
-
-			sprintf(path, QUEUE_ZNODE "/%010"PRId32, queue_pos);
-			zk_queue_peek_next_notify(path);
-		}
-
-		memcpy(ev, lev, sizeof(*ev));
-		zk_levent_head++;
-
-		if (uatomic_read(&nr_zk_levents) || rc == ZOK) {
-			/*
-			 * we have pending leave events or queue nodes,
-			 * manual notify
-			 */
-			dprintf("write event to efd:%d\n", efd);
-			eventfd_write(efd, value);
-		}
-
-		return 0;
-	}
-
-	if (!called_by_zk_unblock && uatomic_is_true(&zk_notify_blocked))
-		return -1;
-
-	if (zk_queue_empty())
+	if (zk_queue_peek())
 		return -1;
 
 	len = sizeof(*ev);
 	sprintf(path, QUEUE_ZNODE "/%010"PRId32, queue_pos);
-	rc = zk_get_data(path, ev, &len);
-	if (rc != ZOK)
-		panic("failed to zk_get_data path:%s, rc:%d\n", path, rc);
-	dprintf("read path:%s, type:%d, len:%d, rc:%d\n", path, ev->type,
-		len, rc);
+	assert(zk_get_data(path, ev, &len) == ZOK);
+	dprintf("read path:%s, type:%d, len:%d\n", path, ev->type, len);
 
+	/* watch next and kick next event if any */
 	queue_pos++;
-
-	/*
-	 * This event will be pushed back to the queue,
-	 * we just wait for the arrival of its updated,
-	 * not need to watch next data.
-	 */
-	if (is_blocking_event(ev))
-		return 0;
-
 	sprintf(path, QUEUE_ZNODE "/%010"PRId32, queue_pos);
-	zk_queue_peek_next_notify(path);
+	if (zk_node_exists(path) == ZOK)
+		/* Someone has created this node, go kick event handler */
+		eventfd_write(efd, 1);
+
 	return 0;
 }
 
@@ -418,20 +333,27 @@ static inline void build_node_list(void)
 	dprintf("nr_sd_nodes:%zu\n", nr_sd_nodes);
 }
 
+static inline int zk_master_create(void)
+{
+	return zk_create_node(MASTER_ZNONE, "", 0, &ZOO_OPEN_ACL_UNSAFE,
+			      ZOO_EPHEMERAL, NULL, 0);
+}
+
 static bool is_master(void)
 {
-	struct rb_node *n;
 	struct zk_node *zk;
 
 	if (!nr_sd_nodes) {
-		if (zk_member_empty())
-			return true;
-		else
+		if (zk_member_empty()) {
+			if (zk_master_create() == ZOK)
+				return true;
+			else
+				return false;
+		} else
 			return false;
 	}
 
-	n = rb_first(&zk_node_root);
-	zk = rb_entry(n, struct zk_node, rb);
+	zk = rb_entry(rb_first(&zk_node_root), struct zk_node, rb);
 	if (node_eq(&zk->node, &this_node.node))
 		return true;
 
@@ -486,39 +408,20 @@ static int add_event(enum zk_event_type type, struct zk_node *znode, void *buf,
 	return 0;
 }
 
-static int leave_event(struct zk_node *znode)
-{
-	int nr_levents;
-	struct zk_event *ev;
-	const eventfd_t value = 1;
-
-	ev = &zk_levents[zk_levent_tail % SD_MAX_NODES];
-	ev->type = EVENT_LEAVE;
-	ev->sender = *znode;
-	ev->buf_len = 0;
-
-	nr_levents = uatomic_add_return(&nr_zk_levents, 1);
-	dprintf("nr_zk_levents:%d, tail:%u\n", nr_levents, zk_levent_tail);
-
-	zk_levent_tail++;
-
-	eventfd_write(efd, value);
-	return 0;
-}
-
 static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
 		       void *ctx)
 {
+	struct zk_node znode;
 	char str[256], *p;
 	int ret;
-	struct zk_node znode;
 
+/* CREATED_EVENT 1, DELETED_EVENT 2, CHANGED_EVENT 3, CHILD_EVENT 4 */
 	dprintf("path:%s, type:%d\n", path, type);
-
 	if (type == ZOO_CREATED_EVENT || type == ZOO_CHANGED_EVENT) {
 		ret = sscanf(path, MEMBER_ZNODE "/%s", str);
 		if (ret == 1)
 			zk_node_exists(path);
+		/* kick off the event handler */
 		eventfd_write(efd, 1);
 	} else if (type == ZOO_DELETED_EVENT) {
 		ret = sscanf(path, MEMBER_ZNODE "/%s", str);
@@ -526,12 +429,9 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
 			return;
 		p = strrchr(path, '/');
 		p++;
-
 		str_to_node(p, &znode.node);
-		dprintf("zk_nodes leave:%s\n", node_to_str(&znode.node));
-
-		leave_event(&znode);
-		return;
+		/* FIXME: remove redundant leave events */
+		add_event(EVENT_LEAVE, &znode, NULL, 0);
 	}
 
 }
@@ -547,21 +447,16 @@ static int zk_join(const struct sd_node *myself,
 	sprintf(path, MEMBER_ZNODE "/%s", node_to_str(myself));
 	rc = zk_node_exists(path);
 	if (rc == ZOK) {
-		eprintf("Previous zookeeper session exist, shoot myself.\n"
-			"Wait for a while and restart me again\n");
+		eprintf("Previous zookeeper session exist, shoot myself.\n");
 		exit(1);
 	}
 
-	return add_event(EVENT_JOIN_REQUEST, &this_node,
-			 opaque, opaque_len);
+	return add_event(EVENT_JOIN_REQUEST, &this_node, opaque, opaque_len);
 }
 
 static int zk_leave(void)
 {
-	char path[256];
-	sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&this_node.node));
-	dprintf("try to delete member path:%s\n", path);
-	return zk_delete_node(path, -1);
+	return add_event(EVENT_LEAVE, &this_node, NULL, 0);
 }
 
 static int zk_notify(void *msg, size_t msg_len)
@@ -576,27 +471,7 @@ static void zk_block(void)
 
 static void zk_unblock(void *msg, size_t msg_len)
 {
-	int rc;
-	struct zk_event ev;
-	eventfd_t value = 1;
-
-	called_by_zk_unblock = true;
-	rc = zk_queue_pop(&ev);
-	called_by_zk_unblock = false;
-	assert(rc == 0);
-
-	ev.type = EVENT_NOTIFY;
-	ev.buf_len = msg_len;
-	if (msg)
-		memcpy(ev.buf, msg, msg_len);
-
-	zk_queue_push_back(&ev);
-
-	uatomic_set_false(&zk_notify_blocked);
-
-	/* this notify is necessary */
-	dprintf("write event to efd:%d\n", efd);
-	eventfd_write(efd, value);
+	add_event(EVENT_UNBLOCK, &this_node, msg, msg_len);
 }
 
 static void zk_handle_join_request(struct zk_event *ev)
@@ -604,23 +479,19 @@ static void zk_handle_join_request(struct zk_event *ev)
 	enum cluster_join_result res;
 
 	dprintf("sender: %s\n", node_to_str(&ev->sender.node));
-
 	if (!is_master()) {
 		/* Let's await master acking the join-request */
 		queue_pos--;
 		return;
 	}
-
 	res = sd_check_join_cb(&ev->sender.node, ev->buf);
 	ev->join_result = res;
 	ev->type = EVENT_JOIN_RESPONSE;
-
 	zk_queue_push_back(ev);
-
 	if (res == CJ_RES_MASTER_TRANSFER) {
 		eprintf("failed to join sheepdog cluster: "
 			"please retry when master is up\n");
-		zk_leave();
+		add_event(EVENT_LEAVE, &this_node, NULL, 0);
 		exit(1);
 	}
 	dprintf("I'm the master now\n");
@@ -634,12 +505,8 @@ static void zk_handle_join_response(struct zk_event *ev)
 	if (is_master() &&
 	    !node_eq(&ev->sender.node, &this_node.node)) {
 		/* wait util the member node has been created */
-		int retry =
-			MEMBER_CREATE_TIMEOUT / MEMBER_CREATE_INTERVAL;
-
-		sprintf(path, MEMBER_ZNODE "/%s",
-			node_to_str(&ev->sender.node));
-
+		int retry = MEMBER_CREATE_TIMEOUT / MEMBER_CREATE_INTERVAL;
+		sprintf(path, MEMBER_ZNODE"/%s", node_to_str(&ev->sender.node));
 		while (retry && zk_node_exists(path) == ZNONODE) {
 			usleep(MEMBER_CREATE_INTERVAL * 1000);
 			retry--;
@@ -669,13 +536,11 @@ static void zk_handle_join_response(struct zk_event *ev)
 	case CJ_RES_SUCCESS:
 	case CJ_RES_JOIN_LATER:
 	case CJ_RES_MASTER_TRANSFER:
-		sprintf(path, MEMBER_ZNODE "/%s",
-			node_to_str(&ev->sender.node));
+		sprintf(path, MEMBER_ZNODE"/%s", node_to_str(&ev->sender.node));
 		if (node_eq(&ev->sender.node, &this_node.node)) {
 			dprintf("create path:%s\n", path);
 			zk_create_node(path, (char *)&ev->sender,
-				       sizeof(ev->sender),
-				       &ZOO_OPEN_ACL_UNSAFE,
+				       sizeof(ev->sender), &ZOO_OPEN_ACL_UNSAFE,
 				       ZOO_EPHEMERAL, NULL, 0);
 		} else {
 			zk_node_exists(path);
@@ -690,14 +555,45 @@ static void zk_handle_join_response(struct zk_event *ev)
 			ev->join_result, ev->buf);
 }
 
+/* When block event is deleted from list, we should call this function */
+static void kick_next_block_event(void)
+{
+	struct zk_event *zke;
+
+	if (list_empty(&zk_block_event_list))
+		return;
+	zke = list_first_entry(&zk_block_event_list, typeof(*zke), list);
+	if (!zke->callbacked)
+		zke->callbacked = sd_block_handler(&zke->sender.node);
+}
+
+static bool block_event_list_del(struct zk_node *n)
+{
+	struct zk_event *ev, *t;
+	bool ret = false;
+
+	list_for_each_entry_safe(ev, t, &zk_block_event_list, list) {
+		if (node_eq(&ev->sender.node, &n->node)) {
+			list_del(&ev->list);
+			free(ev);
+			ret = true;
+		}
+	}
+
+	return ret;
+}
+
 static void zk_handle_leave(struct zk_event *ev)
 {
 	struct zk_node *n = zk_tree_search(&ev->sender.node.nid);
+
 	if (!n) {
 		dprintf("can't find this leave node:%s, ignore it.\n",
 			node_to_str(&ev->sender.node));
 		return;
 	}
+	if (block_event_list_del(n))
+		kick_next_block_event();
 	zk_tree_del(n);
 	build_node_list();
 	sd_leave_handler(&ev->sender.node, sd_nodes, nr_sd_nodes);
@@ -705,10 +601,29 @@ static void zk_handle_leave(struct zk_event *ev)
 
 static void zk_handle_block(struct zk_event *ev)
 {
+	struct zk_event *zke = xzalloc(sizeof(*zke));
+
 	dprintf("BLOCK\n");
-	queue_pos--;
-	if (sd_block_handler(&ev->sender.node))
-		assert(uatomic_set_true(&zk_notify_blocked));
+	*zke = *ev;
+	zke->callbacked = false;
+	list_add_tail(&zke->list, &zk_block_event_list);
+	zke = list_first_entry(&zk_block_event_list, typeof(*zke), list);
+	if (!zke->callbacked)
+		zke->callbacked = sd_block_handler(&zke->sender.node);
+}
+
+static void zk_handle_unblock(struct zk_event *ev)
+{
+	struct zk_event *zke;
+
+	dprintf("UNBLOCK\n");
+	zke = list_first_entry(&zk_block_event_list, typeof(*zke), list);
+	if (zke->callbacked)
+		add_event(EVENT_NOTIFY, &zke->sender, ev->buf, ev->buf_len);
+
+	list_del(&zke->list);
+	free(zke);
+	kick_next_block_event();
 }
 
 static void zk_handle_notify(struct zk_event *ev)
@@ -722,6 +637,7 @@ static void (*const zk_event_handlers[])(struct zk_event *ev) = {
 	[EVENT_JOIN_RESPONSE]	= zk_handle_join_response,
 	[EVENT_LEAVE]		= zk_handle_leave,
 	[EVENT_BLOCK]		= zk_handle_block,
+	[EVENT_UNBLOCK]		= zk_handle_unblock,
 	[EVENT_NOTIFY]		= zk_handle_notify,
 };
 
-- 
1.7.9.5




More information about the sheepdog mailing list