[sheepdog] [PATCH] zookeeper: do not overload event types

Christoph Hellwig hch at infradead.org
Tue May 29 11:37:21 CEST 2012


Use different types for join requests vs. responses and for block vs. notify
events, instead of using the blocked field to overload the type.

Signed-off-by: Christoph Hellwig <hch at lst.de>

diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 859f2b0..471c721 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -42,8 +42,10 @@
 	     free(*(strs)->data))
 
 enum zk_event_type {
-	EVENT_JOIN = 1,
+	EVENT_JOIN_REQUEST = 1,
+	EVENT_JOIN_RESPONSE,
 	EVENT_LEAVE,
+	EVENT_BLOCK,
 	EVENT_NOTIFY,
 };
 
@@ -59,7 +61,6 @@ struct zk_event {
 
 	enum cluster_join_result join_result;
 
-	int blocked; /* set non-zero when sheep must block this event */
 	int callbacked; /* set non-zero after sd_block_handler() was called */
 
 	size_t buf_len;
@@ -80,6 +81,11 @@ static struct sd_node sd_nodes[SD_MAX_NODES];
 static size_t nr_sd_nodes;
 static size_t nr_zk_nodes;
 
+static inline int is_blocking_event(struct zk_event *ev)
+{
+	return ev->type == EVENT_BLOCK || ev->type == EVENT_JOIN_RESPONSE;
+}
+
 /* zookeeper API wrapper */
 static inline ZOOAPI int zk_create(zhandle_t *zh, const char *path,
 		const char *value, int valuelen, const struct ACL_vector *acl,
@@ -251,7 +257,7 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 		rc = zk_get(zh, path, 1, (char *)ev, &len, NULL);
 		if (rc == ZOK &&
 		    node_eq(&ev->sender.node, &lev->sender.node) &&
-		    ev->blocked) {
+		    is_blocking_event(ev)) {
 			dprintf("this queue_pos:%010d have blocked whole cluster, ignore it\n", queue_pos);
 			queue_pos++;
 
@@ -294,7 +300,7 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 	/* this event will be pushed back to the queue,
 	 * we just wait for the arrival of its updated,
 	 * not need to watch next data. */
-	if (ev->blocked)
+	if (is_blocking_event(ev))
 		goto out;
 
 	/* watch next data */
@@ -485,7 +491,7 @@ static struct zk_node this_node;
 
 static int add_event(zhandle_t *zh, enum zk_event_type type,
 		     struct zk_node *znode, void *buf,
-		     size_t buf_len, int blocked)
+		     size_t buf_len)
 {
 	struct zk_event ev;
 
@@ -493,7 +499,6 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
 	ev.sender = *znode;
 	ev.buf_len = buf_len;
 	ev.callbacked = 0;
-	ev.blocked = blocked;
 	if (buf)
 		memcpy(ev.buf, buf, buf_len);
 	zk_queue_push(zh, &ev);
@@ -511,7 +516,6 @@ static int leave_event(zhandle_t *zh, struct zk_node *znode)
 	ev->sender = *znode;
 	ev->buf_len = 0;
 	ev->callbacked = 0;
-	ev->blocked = 0;
 
 	nr_levents = uatomic_add_return(&nr_zk_levents, 1);
 	dprintf("nr_zk_levents:%d, tail:%u\n", nr_levents, zk_levent_tail);
@@ -644,9 +648,8 @@ static int zk_join(struct sd_node *myself,
 
 	dprintf("clientid:%ld\n", cid->client_id);
 
-	rc = add_event(zhandle, EVENT_JOIN, &this_node, opaque, opaque_len, 1);
-
-	return rc;
+	return add_event(zhandle, EVENT_JOIN_REQUEST, &this_node,
+			 opaque, opaque_len);
 }
 
 static int zk_leave(void)
@@ -659,12 +662,12 @@ static int zk_leave(void)
 
 static int zk_notify(void *msg, size_t msg_len)
 {
-	return add_event(zhandle, EVENT_NOTIFY, &this_node, msg, msg_len, 0);
+	return add_event(zhandle, EVENT_NOTIFY, &this_node, msg, msg_len);
 }
 
 static void zk_block(void)
 {
-	add_event(zhandle, EVENT_NOTIFY, &this_node, NULL, 0, 1);
+	add_event(zhandle, EVENT_BLOCK, &this_node, NULL, 0);
 }
 
 static void zk_unblock(void *msg, size_t msg_len)
@@ -676,7 +679,7 @@ static void zk_unblock(void *msg, size_t msg_len)
 	rc = zk_queue_pop(zhandle, &ev);
 	assert(rc == 0);
 
-	ev.blocked = 0;
+	ev.type = EVENT_NOTIFY;
 	ev.buf_len = msg_len;
 	if (msg)
 		memcpy(ev.buf, msg, msg_len);
@@ -692,7 +695,7 @@ static void zk_unblock(void *msg, size_t msg_len)
 
 static void zk_handler(int listen_fd, int events, void *data)
 {
-	int ret, rc, retry;
+	int ret, rc;
 	char path[256];
 	eventfd_t value;
 	struct zk_event ev;
@@ -719,37 +722,46 @@ static void zk_handler(int listen_fd, int events, void *data)
 		goto out;
 
 	switch (ev.type) {
-	case EVENT_JOIN:
-		dprintf("JOIN EVENT, blocked:%d\n", ev.blocked);
-		if (ev.blocked) {
-			dprintf("one sheep joined[up], nr_nodes:%ld, sender:%s, joined:%d\n",
-					nr_zk_nodes, node_to_str(&ev.sender.node), ev.sender.joined);
-			if (is_master(zhandle, &this_node)) {
-				res = sd_check_join_cb(&ev.sender.node, ev.buf);
-				ev.join_result = res;
-				ev.blocked = 0;
-				ev.sender.joined = 1;
-
-				dprintf("I'm master, push back join event\n");
-				zk_queue_push_back(zhandle, &ev);
-
-				if (res == CJ_RES_MASTER_TRANSFER) {
-					eprintf("failed to join sheepdog cluster: "
-						"please retry when master is up\n");
-					zk_leave();
-					exit(1);
-				}
-			} else
-				zk_queue_push_back(zhandle, NULL);
+	case EVENT_JOIN_REQUEST:
+		dprintf("JOIN REQUEST nr_nodes: %ld, sender: %s, joined: %d\n",
+			nr_zk_nodes, node_to_str(&ev.sender.node),
+			ev.sender.joined);
+	
+		if (!is_master(zhandle, &this_node)) {
+			zk_queue_push_back(zhandle, NULL);
+			break;
+		}
 
-			goto out;
-		} else if (is_master(zhandle, &this_node)
-			&& !node_eq(&ev.sender.node, &this_node.node)) {
-			/* wait util member have been created */
-			sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender.node));
-			retry = MEMBER_CREATE_TIMEOUT/MEMBER_CREATE_INTERVAL;
-			while (retry && zk_exists(zhandle, path, 1, NULL) == ZNONODE) {
-				usleep(MEMBER_CREATE_INTERVAL*1000);
+		res = sd_check_join_cb(&ev.sender.node, ev.buf);
+		ev.join_result = res;
+		ev.type = EVENT_JOIN_RESPONSE;
+		ev.sender.joined = 1;
+
+		dprintf("I'm master, push back join event\n");
+		zk_queue_push_back(zhandle, &ev);
+
+		if (res == CJ_RES_MASTER_TRANSFER) {
+			eprintf("failed to join sheepdog cluster: "
+				"please retry when master is up\n");
+			zk_leave();
+			exit(1);
+		}
+		break;
+	case EVENT_JOIN_RESPONSE:
+		dprintf("JOIN RESPONSE\n");
+
+		if (is_master(zhandle, &this_node) &&
+		    !node_eq(&ev.sender.node, &this_node.node)) {
+			/* wait util the member node has been created */
+			int retry =
+				MEMBER_CREATE_TIMEOUT / MEMBER_CREATE_INTERVAL;
+
+			sprintf(path, MEMBER_ZNODE "/%s",
+				node_to_str(&ev.sender.node));
+
+			while (retry &&
+			       zk_exists(zhandle, path, 1, NULL) == ZNONODE) {
+				usleep(MEMBER_CREATE_INTERVAL * 1000);
 				retry--;
 			}
 			if (retry <= 0) {
@@ -793,7 +805,7 @@ static void zk_handler(int listen_fd, int events, void *data)
 				    ev.join_result, ev.buf);
 		break;
 	case EVENT_LEAVE:
-		dprintf("LEAVE EVENT, blocked:%d\n", ev.blocked);
+		dprintf("LEAVE EVENT\n");
 		n = node_btree_find(&zk_node_btroot, &ev.sender);
 		if (!n) {
 			dprintf("can't find this leave node:%s, ignore it.\n", node_to_str(&ev.sender.node));
@@ -806,21 +818,21 @@ static void zk_handler(int listen_fd, int events, void *data)
 		build_node_list(zk_node_btroot);
 		sd_leave_handler(&ev.sender.node, sd_nodes, nr_sd_nodes);
 		break;
-	case EVENT_NOTIFY:
-		dprintf("NOTIFY, blocked:%d\n", ev.blocked);
-		if (ev.blocked) {
-			if (node_eq(&ev.sender.node, &this_node.node)
-					&& !ev.callbacked) {
-				uatomic_inc(&zk_notify_blocked);
-				ev.callbacked = 1;
-				zk_queue_push_back(zhandle, &ev);
-				sd_block_handler();
-			} else
-				zk_queue_push_back(zhandle, NULL);
-
-			goto out;
+	case EVENT_BLOCK:
+		dprintf("BLOCK\n");
+		if (node_eq(&ev.sender.node, &this_node.node)
+				&& !ev.callbacked) {
+			uatomic_inc(&zk_notify_blocked);
+			ev.callbacked = 1;
+			zk_queue_push_back(zhandle, &ev);
+			sd_block_handler();
+		} else {
+			zk_queue_push_back(zhandle, NULL);
 		}
 
+		break;
+	case EVENT_NOTIFY:
+		dprintf("NOTIFY\n");
 		sd_notify_handler(&ev.sender.node, ev.buf, ev.buf_len);
 		break;
 	}



More information about the sheepdog mailing list