[sheepdog] [PATCH 01/13] zookeeper: rework zoo helpers

Liu Yuan namei.unix at gmail.com
Tue Dec 18 06:37:50 CET 2012


From: Liu Yuan <tailai.ly at taobao.com>

- panic() inside to reduce redundant checks outside
- introduce zoo_init_node() for init

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 sheep/cluster/zookeeper.c |  144 +++++++++++++++++++++++----------------------
 1 file changed, 73 insertions(+), 71 deletions(-)

diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 91ea608..c01e8ed 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -30,7 +30,6 @@
 #define QUEUE_ZNODE BASE_ZNODE "/queue"
 #define MEMBER_ZNODE BASE_ZNODE "/member"
 
-
 /* iterate child znodes */
 #define FOR_EACH_ZNODE(zh, parent, path, strs)			       \
 	for (zk_get_children(zh, parent, 1, strs),		       \
@@ -85,77 +84,90 @@ static inline bool is_blocking_event(struct zk_event *ev)
 }
 
 /* zookeeper API wrapper */
-static inline ZOOAPI int zk_create(zhandle_t *zh, const char *path,
-		const char *value, int valuelen, const struct ACL_vector *acl,
-		int flags, char *path_buffer, int path_buffer_len)
+
+static inline ZOOAPI int zk_delete_node(zhandle_t *zh, const char *path,
+					int version)
+{
+	int rc;
+	do {
+		rc = zoo_delete(zh, path, version);
+	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
+	if (rc != ZOK)
+		eprintf("failed, path:%s, rc:%d\n", path, rc);
+	return rc;
+}
+
+static inline ZOOAPI void
+zk_init_node(zhandle_t *zh, const char *path, const char *value, int valuelen,
+	     const struct ACL_vector *acl, int flags, char *path_buffer,
+	     int path_buffer_len)
 {
 	int rc;
 	do {
 		rc = zoo_create(zh, path, value, valuelen, acl,
 				flags, path_buffer, path_buffer_len);
-		if (rc != ZOK)
-			dprintf("rc:%d\n", rc);
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
-	return rc;
+
+	if (rc != ZOK && rc != ZNODEEXISTS)
+		panic("failed, path:%s, rc:%d\n", path, rc);
 }
 
-static inline ZOOAPI int zk_delete(zhandle_t *zh, const char *path, int version)
+static inline ZOOAPI void
+zk_create_node(zhandle_t *zh, const char *path, const char *value, int valuelen,
+	       const struct ACL_vector *acl, int flags, char *path_buffer,
+	       int path_buffer_len)
 {
 	int rc;
 	do {
-		rc = zoo_delete(zh, path, version);
-		if (rc != ZOK)
-			dprintf("rc:%d\n", rc);
+		rc = zoo_create(zh, path, value, valuelen, acl,
+				flags, path_buffer, path_buffer_len);
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
-	return rc;
+	if (rc != ZOK)
+		panic("failed, path:%s, rc:%d\n", path, rc);
 }
 
-static inline ZOOAPI int zk_get(zhandle_t *zh, const char *path, int watch,
+static inline ZOOAPI int zk_get_data(zhandle_t *zh, const char *path, int watch,
 		char *buffer, int *buffer_len, struct Stat *stat)
 {
 	int rc;
 	do {
 		rc = zoo_get(zh, path, watch, buffer, buffer_len, stat);
-		if (rc != ZOK)
-			dprintf("rc:%d\n", rc);
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
 	return rc;
 }
 
-static inline ZOOAPI int zk_set(zhandle_t *zh, const char *path,
-		const char *buffer, int buflen, int version)
+static inline ZOOAPI int
+zk_set_data(zhandle_t *zh, const char *path, const char *buffer, int buflen,
+	    int version)
 {
 	int rc;
 	do {
 		rc = zoo_set(zh, path, buffer, buflen, version);
-		if (rc != ZOK)
-			dprintf("rc:%d\n", rc);
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
+	if (rc != ZOK)
+		panic("failed, path:%s, rc:%d\n", path, rc);
 	return rc;
 }
 
-static inline ZOOAPI int zk_exists(zhandle_t *zh, const char *path, int watch,
-		struct Stat *stat)
+static inline ZOOAPI int
+zk_node_exists(zhandle_t *zh, const char *path, int watch, struct Stat *stat)
 {
 	int rc;
 	do {
 		rc = zoo_exists(zh, path, watch, stat);
-		if (rc != ZOK)
-			dprintf("rc:%d\n", rc);
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
 	return rc;
 }
 
-static inline ZOOAPI int zk_get_children(zhandle_t *zh, const char *path,
+static inline ZOOAPI void zk_get_children(zhandle_t *zh, const char *path,
 		int watch, struct String_vector *strings)
 {
 	int rc;
 	do {
 		rc = zoo_get_children(zh, path, watch, strings);
-		if (rc != ZOK)
-			dprintf("rc:%d\n", rc);
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
-	return rc;
+	if (rc != ZOK)
+		panic("failed, path:%s, rc:%d\n", path, rc);
 }
 
 /* ZooKeeper-based queue */
@@ -170,7 +182,7 @@ static bool zk_queue_empty(zhandle_t *zh)
 
 	sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
 
-	rc = zk_exists(zh, path, 1, NULL);
+	rc = zk_node_exists(zh, path, 1, NULL);
 	if (rc == ZOK)
 		return false;
 
@@ -181,29 +193,23 @@ static int32_t zk_queue_push(zhandle_t *zh, struct zk_event *ev)
 {
 	static bool first_push = true;
 	int32_t seq;
-	int rc, len;
+	int len;
 	char path[256], buf[256];
 	eventfd_t value = 1;
 
 	len = (char *)(ev->buf) - (char *)ev + ev->buf_len;
 	sprintf(path, "%s/", QUEUE_ZNODE);
-	rc = zk_create(zh, path, (char *)ev, len,
-		&ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE, buf, sizeof(buf));
-	dprintf("create path:%s, nr_nodes:%zu, queue_pos:%010d, len:%d,"
-		" rc:%d\n", buf, nr_zk_nodes, queue_pos, len, rc);
-	if (rc != ZOK)
-		panic("failed to zk_create path:%s, rc:%d\n", path, rc);
+	zk_create_node(zh, path, (char *)ev, len,
+		       &ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE, buf, sizeof(buf));
+	dprintf("create path:%s, nr_nodes:%zu, queue_pos:%010d, len:%d\n"
+		, buf, nr_zk_nodes, queue_pos, len);
 
 	sscanf(buf, QUEUE_ZNODE "/%d", &seq);
 	dprintf("path:%s, seq:%010d\n", buf, seq);
 
 	if (first_push) {
 		queue_pos = seq;
-
-		/* manual notify */
-		dprintf("write event to efd:%d\n", efd);
 		eventfd_write(efd, value);
-
 		first_push = false;
 	}
 
@@ -213,7 +219,7 @@ static int32_t zk_queue_push(zhandle_t *zh, struct zk_event *ev)
 
 static int zk_queue_push_back(zhandle_t *zh, struct zk_event *ev)
 {
-	int rc, len;
+	int len;
 	char path[256];
 
 	queue_pos--;
@@ -224,10 +230,8 @@ static int zk_queue_push_back(zhandle_t *zh, struct zk_event *ev)
 		/* update the last popped data */
 		len = (char *)(ev->buf) - (char *)ev + ev->buf_len;
 		sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
-		rc = zk_set(zh, path, (char *)ev, len, -1);
-		dprintf("update path:%s, queue_pos:%010d, len:%d, rc:%d\n", path, queue_pos, len, rc);
-		if (rc != ZOK)
-			panic("failed to zk_set path:%s, rc:%d\n", path, rc);
+		zk_set_data(zh, path, (char *)ev, len, -1);
+		dprintf("update path:%s, queue_pos:%010d, len:%d\n", path, queue_pos, len);
 	}
 
 	return 0;
@@ -255,7 +259,7 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 		 * and it have blocked whole cluster, we should ignore it. */
 		len = sizeof(*ev);
 		sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
-		rc = zk_get(zh, path, 1, (char *)ev, &len, NULL);
+		rc = zk_get_data(zh, path, 1, (char *)ev, &len, NULL);
 		if (rc == ZOK &&
 		    node_eq(&ev->sender.node, &lev->sender.node) &&
 		    is_blocking_event(ev)) {
@@ -264,7 +268,7 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 
 			/* watch next data */
 			sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
-			rc = zk_exists(zh, path, 1, NULL);
+			rc = zk_node_exists(zh, path, 1, NULL);
 			dprintf("watch path:%s, exists:%d\n", path, (rc == ZOK));
 			if (rc == ZOK) {
 				/* we lost this message, manual notify */
@@ -294,11 +298,11 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 
 	len = sizeof(*ev);
 	sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
-	rc = zk_get(zh, path, 1, (char *)ev, &len, NULL);
+	rc = zk_get_data(zh, path, 1, (char *)ev, &len, NULL);
 	dprintf("read path:%s, nr_nodes:%zu, type:%d, len:%d, rc:%d\n", path,
 		nr_zk_nodes, ev->type, len, rc);
 	if (rc != ZOK)
-		panic("failed to zk_set path:%s, rc:%d\n", path, rc);
+		panic("failed to zk_get_data path:%s, rc:%d\n", path, rc);
 
 	queue_pos++;
 
@@ -310,7 +314,7 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 
 	/* watch next data */
 	sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
-	rc = zk_exists(zh, path, 1, NULL);
+	rc = zk_node_exists(zh, path, 1, NULL);
 	dprintf("watch path:%s, exists:%d\n", path, (rc == ZOK));
 	if (rc == ZOK) {
 		/* we lost this message, manual notify */
@@ -324,12 +328,9 @@ out:
 
 static int zk_member_empty(zhandle_t *zh)
 {
-	int rc;
 	struct String_vector strs;
 
-	rc = zk_get_children(zh, MEMBER_ZNODE, 1, &strs);
-	if (rc != ZOK)
-		panic("failed to zk_get_children path:%s, rc:%d\n", MEMBER_ZNODE, rc);
+	zk_get_children(zh, MEMBER_ZNODE, 1, &strs);
 
 	return (strs.count == 0);
 }
@@ -450,9 +451,9 @@ static bool is_master(zhandle_t *zh, struct zk_node *znode)
 
 static void zk_queue_init(zhandle_t *zh)
 {
-	zk_create(zh, BASE_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
-	zk_create(zh, QUEUE_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
-	zk_create(zh, MEMBER_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
+	zk_init_node(zh, BASE_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
+	zk_init_node(zh, QUEUE_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
+	zk_init_node(zh, MEMBER_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
 }
 
 static void zk_member_init(zhandle_t *zh)
@@ -471,7 +472,7 @@ static void zk_member_init(zhandle_t *zh)
 	if (!zk_member_empty(zh)) {
 		FOR_EACH_ZNODE(zh, MEMBER_ZNODE, path, &strs) {
 			len = sizeof(znode);
-			rc = zk_get(zh, path, 1, (char *)&znode, &len, NULL);
+			rc = zk_get_data(zh, path, 1, (char *)&znode, &len, NULL);
 			if (rc != ZOK)
 				continue;
 
@@ -481,7 +482,7 @@ static void zk_member_init(zhandle_t *zh)
 			case ZNONODE:
 				break;
 			default:
-				panic("failed to zk_get path:%s, rc:%d\n", path, rc);
+				panic("failed to zk_get_data path:%s, rc:%d\n", path, rc);
 			}
 		}
 	}
@@ -555,7 +556,7 @@ static void watcher(zhandle_t *zh, int type, int state, const char *path,
 	if (type == ZOO_CREATED_EVENT || type == ZOO_CHANGED_EVENT) {
 		ret = sscanf(path, MEMBER_ZNODE "/%s", str);
 		if (ret == 1) {
-			rc = zk_exists(zh, path, 1, NULL);
+			rc = zk_node_exists(zh, path, 1, NULL);
 			dprintf("watch path:%s, exists:%d\n", path, (rc == ZOK));
 		}
 	}
@@ -588,9 +589,11 @@ static int zk_join(const struct sd_node *myself,
 	this_node.node = *myself;
 
 	sprintf(path, MEMBER_ZNODE "/%s", node_to_str(myself));
-	rc = zk_exists(zhandle, path, 1, NULL);
-	if (rc == ZOK)
-		panic("previous zookeeper session exist, shutdown\n");
+	rc = zk_node_exists(zhandle, path, 1, NULL);
+	if (rc == ZOK) {
+		eprintf("previous zookeeper session exist, shoot myself\n");
+		exit(1);
+	}
 
 	this_node.joined = false;
 	cid = zoo_client_id(zhandle);
@@ -608,7 +611,7 @@ static int zk_leave(void)
 	char path[256];
 	sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&this_node.node));
 	dprintf("try to delete member path:%s\n", path);
-	return zk_delete(zhandle, path, -1);
+	return zk_delete_node(zhandle, path, -1);
 }
 
 static int zk_notify(void *msg, size_t msg_len)
@@ -661,8 +664,6 @@ static void zk_handler(int listen_fd, int events, void *data)
 		exit(1);
 	}
 
-	dprintf("read event\n");
-
 	ret = eventfd_read(efd, &value);
 	if (ret < 0)
 		return;
@@ -710,7 +711,7 @@ static void zk_handler(int listen_fd, int events, void *data)
 				node_to_str(&ev.sender.node));
 
 			while (retry &&
-			       zk_exists(zhandle, path, 1, NULL) == ZNONODE) {
+			       zk_node_exists(zhandle, path, 1, NULL) == ZNONODE) {
 				usleep(MEMBER_CREATE_INTERVAL * 1000);
 				retry--;
 			}
@@ -744,12 +745,13 @@ static void zk_handler(int listen_fd, int events, void *data)
 			sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender.node));
 			if (node_eq(&ev.sender.node, &this_node.node)) {
 				dprintf("create path:%s\n", path);
-				rc = zk_create(zhandle, path, (char *)&ev.sender, sizeof(ev.sender),
-					&ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0);
-				if (rc != ZOK)
-					panic("failed to create an ephemeral znode, rc:%d\n", rc);
+				zk_create_node(zhandle, path,
+					       (char *)&ev.sender,
+					       sizeof(ev.sender),
+					       &ZOO_OPEN_ACL_UNSAFE,
+					       ZOO_EPHEMERAL, NULL, 0);
 			} else {
-				rc = zk_exists(zhandle, path, 1, NULL);
+				rc = zk_node_exists(zhandle, path, 1, NULL);
 				dprintf("watch path:%s, exists:%d\n", path, (rc == ZOK));
 			}
 			break;
-- 
1.7.9.5




More information about the sheepdog mailing list