[Sheepdog] [zookeeper][PATCH 4/6] retry when a zoo_* API call returns a ZCONNECTIONLOSS/ZOPERATIONTIMEOUT error

Yunkai Zhang yunkai.me at gmail.com
Thu Apr 5 12:23:04 CEST 2012


Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com>
---
 sheep/cluster/zookeeper.c |  225 ++++++++++++++++++++++++++++++++-------------
 1 files changed, 160 insertions(+), 65 deletions(-)

diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 4ee3c20..16b50e3 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -22,15 +22,35 @@
 #include "work.h"
 
 #define MAX_EVENT_BUF_SIZE (64 * 1024)
+#define SESSION_TIMEOUT 30000
 
 #define BASE_ZNODE "/sheepdog"
 #define LOCK_ZNODE BASE_ZNODE "/lock"
 #define QUEUE_ZNODE BASE_ZNODE "/queue"
 #define MEMBER_ZNODE BASE_ZNODE "/member"
 
+
+/* Prototypes of the zk_* wrappers; each one takes the same argument list that it forwards to the matching zoo_* call. */
+ZOOAPI int zk_create(zhandle_t *zh, const char *path, const char *value,
+        int valuelen, const struct ACL_vector *acl, int flags,
+        char *path_buffer, int path_buffer_len);
+
+ZOOAPI int zk_delete(zhandle_t *zh, const char *path, int version);
+
+ZOOAPI int zk_get(zhandle_t *zh, const char *path, int watch, char *buffer,
+                   int* buffer_len, struct Stat *stat);
+
+ZOOAPI int zk_set(zhandle_t *zh, const char *path, const char *buffer,
+                   int buflen, int version);
+
+ZOOAPI int zk_exists(zhandle_t *zh, const char *path, int watch, struct Stat *stat);
+
+ZOOAPI int zk_get_children(zhandle_t *zh, const char *path, int watch,
+                            struct String_vector *strings);
+
 /* iterate child znodes */
 #define FOR_EACH_ZNODE(zh, parent, path, strs)			       \
-	for (zoo_get_children(zh, parent, 1, strs),		       \
+	for (zk_get_children(zh, parent, 1, strs),		       \
 		     (strs)->data += (strs)->count;		       \
 	     (strs)->count-- ?					       \
 		     sprintf(path, "%s/%s", parent, *--(strs)->data) : \
@@ -78,19 +98,89 @@ static char *zk_option;
 /* protect queue_start_pos */
 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
 
+/* protect leave event list */
+static pthread_mutex_t leave_lock = PTHREAD_MUTEX_INITIALIZER;
+
+/* zk_create: call zoo_create() again, with no delay or retry limit, while it returns ZOPERATIONTIMEOUT or ZCONNECTIONLOSS. */
+inline ZOOAPI int zk_create(zhandle_t *zh, const char *path, const char *value,
+        int valuelen, const struct ACL_vector *acl, int flags,
+        char *path_buffer, int path_buffer_len)
+{
+	int rc; /* result of the most recent zoo_create() attempt */
+	do { /* NOTE(review): a timed-out create may already have taken effect; confirm re-issuing it is safe for callers */
+		rc = zoo_create(zh, path, value, valuelen, acl,
+				flags, path_buffer, path_buffer_len);
+		dprintf("rc:%d\n", rc); /* logged once per attempt */
+	}while(rc==ZOPERATIONTIMEOUT || rc==ZCONNECTIONLOSS);
+	return rc; /* any other code, ZOK or an error, goes back to the caller unchanged */
+}
+
+inline ZOOAPI int zk_delete(zhandle_t *zh, const char *path, int version)
+{
+	int rc; /* result of the most recent zoo_delete() attempt */
+	do { /* repeat zoo_delete() with the same arguments while it reports a timeout or connection loss */
+		rc = zoo_delete(zh, path, version);
+		dprintf("rc:%d\n", rc); /* logged once per attempt */
+	}while(rc==ZOPERATIONTIMEOUT || rc==ZCONNECTIONLOSS);
+	return rc; /* any other code, ZOK or an error, goes back to the caller unchanged */
+}
+
+inline ZOOAPI int zk_get(zhandle_t *zh, const char *path, int watch, char *buffer,
+                   int* buffer_len, struct Stat *stat)
+{
+	int rc; /* result of the most recent zoo_get() attempt */
+	do { /* NOTE(review): the same buffer_len pointer is reused on every attempt; confirm zoo_get() leaves *buffer_len intact on failure */
+		rc = zoo_get(zh, path, watch, buffer, buffer_len, stat);
+		dprintf("rc:%d\n", rc); /* logged once per attempt */
+	}while(rc==ZOPERATIONTIMEOUT || rc==ZCONNECTIONLOSS);
+	return rc; /* any other code, ZOK or an error, goes back to the caller unchanged */
+}
+
+inline ZOOAPI int zk_set(zhandle_t *zh, const char *path, const char *buffer,
+                   int buflen, int version)
+{
+	int rc; /* result of the most recent zoo_set() attempt */
+	do { /* repeat zoo_set() with the same arguments while it reports a timeout or connection loss */
+		rc = zoo_set(zh, path, buffer, buflen, version);
+		dprintf("rc:%d\n", rc); /* logged once per attempt */
+	}while(rc==ZOPERATIONTIMEOUT || rc==ZCONNECTIONLOSS);
+	return rc; /* any other code, ZOK or an error, goes back to the caller unchanged */
+}
+
+inline ZOOAPI int zk_exists(zhandle_t *zh, const char *path, int watch, struct Stat *stat)
+{
+	int rc; /* result of the most recent zoo_exists() attempt */
+	do { /* repeat zoo_exists() while it reports a timeout or connection loss; the watch flag is forwarded on every attempt */
+		rc = zoo_exists(zh, path, watch, stat);
+		dprintf("rc:%d\n", rc); /* logged once per attempt */
+	}while(rc==ZOPERATIONTIMEOUT || rc==ZCONNECTIONLOSS);
+	return rc; /* any other code, ZOK or an error, goes back to the caller unchanged */
+}
+
+inline ZOOAPI int zk_get_children(zhandle_t *zh, const char *path, int watch,
+                            struct String_vector *strings)
+{
+	int rc; /* result of the most recent zoo_get_children() attempt */
+	do { /* repeat zoo_get_children() while it reports a timeout or connection loss; the watch flag is forwarded on every attempt */
+		rc = zoo_get_children(zh, path, watch, strings);
+		dprintf("rc:%d\n", rc); /* logged once per attempt */
+	}while(rc==ZOPERATIONTIMEOUT || rc==ZCONNECTIONLOSS);
+	return rc; /* any other code, ZOK or an error, goes back to the caller unchanged */
+}
+
 /* ZooKeeper-based lock */
 
 static void zk_lock(zhandle_t *zh)
 {
 	int rc;
 again:
-	rc = zoo_create(zh, LOCK_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE,
+	rc = zk_create(zh, LOCK_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE,
 			ZOO_EPHEMERAL, NULL, 0);
 	if (rc == ZOK){
 		dprintf("locked\n");
 		return;
 	}
-	else if (rc == ZNODEEXISTS || rc == ZOPERATIONTIMEOUT) {
+	else if (rc == ZNODEEXISTS) {
 		dprintf("retry, rc:%d\n", rc);
 		usleep(10000); /* FIXME: use watch notification */
 		goto again;
@@ -103,7 +193,8 @@ static void zk_unlock(zhandle_t *zh)
 {
 	int rc;
 
-	rc = zoo_delete(zh, LOCK_ZNODE, -1);
+	rc = zk_delete(zh, LOCK_ZNODE, -1);
+
 	if (rc != ZOK)
 		panic("failed to release lock\n");
 
@@ -122,7 +213,7 @@ static int zk_queue_empty(zhandle_t *zh)
 
 	sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
 
-	rc = zoo_exists(zh, path, 1, NULL);
+	rc = zk_exists(zh, path, 1, NULL);
 	if (rc == ZOK)
 		return 0;
 
@@ -137,14 +228,11 @@ static int zk_queue_push(zhandle_t *zh, struct zk_event *ev)
 
 	len = (char *)(ev->buf) - (char *)ev + ev->buf_len;
 	sprintf(path, "%s/", QUEUE_ZNODE);
-	do{
-		dprintf("zoo_create ...\n");
-		rc = zoo_create(zh, path, (char *)ev, len,
-			&ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE, buf, sizeof(buf));
-		dprintf("create path:%s, nr_nodes:%ld, queue_pos:%d, len:%d, rc:%d\n", buf, nr_zk_nodes, queue_pos, len, rc);
-	}while (rc == ZOPERATIONTIMEOUT);
+	rc = zk_create(zh, path, (char *)ev, len,
+		&ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE, buf, sizeof(buf));
+	dprintf("create path:%s, nr_nodes:%ld, queue_pos:%d, len:%d, rc:%d\n", buf, nr_zk_nodes, queue_pos, len, rc);
 	if (rc != ZOK)
-		panic("failed to zoo_create path:%s, rc:%d\n", path, rc);
+		panic("failed to zk_create path:%s, rc:%d\n", path, rc);
 
 	sscanf(buf, QUEUE_ZNODE "/%010d", &seq);
 	dprintf("path:%s, seq:%d\n", buf, seq);
@@ -177,7 +265,7 @@ static int zk_queue_push_back(zhandle_t *zh, struct zk_event *ev)
 		/* update the last popped data */
 		len = (char *)(ev->buf) - (char *)ev + ev->buf_len;
 		sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
-		rc = zoo_set(zh, path, (char *)ev, len, -1);
+		rc = zk_set(zh, path, (char *)ev, len, -1);
 		dprintf("update path:%s, queue_pos:%d, len:%d, rc:%d\n", path, queue_pos, len, rc);
 		if (rc != ZOK)
 			panic("failed to zk_set path:%s, rc:%d\n", path, rc);
@@ -197,8 +285,12 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 	/* process leave event */
 	if (!list_empty(&zk_levent_list)) {
 		dprintf("found a leave event.\n");
+
+		pthread_mutex_lock(&leave_lock);
 		lev = list_first_entry(&zk_levent_list, typeof(*lev), list);
 		list_del(&lev->list);
+		pthread_mutex_unlock(&leave_lock);
+
 		memcpy(ev, lev, sizeof(*ev));
 		free(lev);
 		return 0;
@@ -209,10 +301,8 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 
 	len = sizeof(*ev);
 	sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
-	do {
-		rc = zoo_get(zh, path, 1, (char *)ev, &len, NULL);
-		dprintf("read path:%s, nr_nodes:%ld, type:%d, len:%d, rc:%d\n", path, nr_zk_nodes, ev->type, len, rc);
-	}while(rc == ZOPERATIONTIMEOUT);
+	rc = zk_get(zh, path, 1, (char *)ev, &len, NULL);
+	dprintf("read path:%s, nr_nodes:%ld, type:%d, len:%d, rc:%d\n", path, nr_zk_nodes, ev->type, len, rc);
 	if (rc != ZOK)
 		panic("failed to zk_set path:%s, rc:%d\n", path, rc);
 
@@ -226,7 +316,7 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 
 	/* watch next data */
 	sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
-	rc = zoo_exists(zh, path, 1, NULL);
+	rc = zk_exists(zh, path, 1, NULL);
 	dprintf("watch path:%s, exists:%d\n", path, (rc==ZOK));
 	if (rc == ZOK) {
 		/* we lost this message, manual notify */
@@ -262,9 +352,9 @@ static int is_zk_queue_valid(zhandle_t *zh)
 	int rc;
 	struct String_vector strs;
 
-	rc = zoo_get_children(zh, MEMBER_ZNODE, 1, &strs);
+	rc = zk_get_children(zh, MEMBER_ZNODE, 1, &strs);
 	if (rc != ZOK)
-		panic("failed to zoo_get_children path:%s, rc:%d\n", MEMBER_ZNODE, rc);
+		panic("failed to zk_get_children path:%s, rc:%d\n", MEMBER_ZNODE, rc);
 
 	return strs.count;
 }
@@ -310,7 +400,7 @@ static void sort_zk_nodes(struct zk_node *znodes, size_t nr_nodes)
 			i, znodes[idxs[i].idx].seq, node_to_str(&N[i].node));
 	}
 	memcpy(zk_nodes, N, nr_nodes*sizeof(*zk_nodes));
-	
+
 	for (i=0; i<nr_nodes; i++) {
 		dprintf("zk_nodes[%d], seq:%d, value:%s\n",
 			i, znodes[i].seq, node_to_str(&zk_nodes[i].node));
@@ -341,9 +431,9 @@ static struct zk_node* find_node(struct zk_node *znodes, int nr_nodes, struct zk
 
 static void zk_queue_init(zhandle_t *zh)
 {
-	zoo_create(zh, BASE_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
-	zoo_create(zh, QUEUE_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
-	zoo_create(zh, MEMBER_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
+	zk_create(zh, BASE_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0); /* base znode is requested first; the return value is not checked */
+	zk_create(zh, QUEUE_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0); /* queue znode under the base; return value not checked */
+	zk_create(zh, MEMBER_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0); /* member znode under the base; return value not checked */
 }
 
 static void zk_data_init(zhandle_t *zh)
@@ -363,15 +453,13 @@ static void zk_data_init(zhandle_t *zh)
 
 	if (is_zk_queue_valid(zh)) {
 		FOR_EACH_ZNODE(zh, MEMBER_ZNODE, path, &strs) {
-			do {
-				len = sizeof(znode);
-				rc = zoo_get(zh, path, 1, (char *)&znode, &len, NULL);
-				if (rc == ZOK && znode.joined == 0) {
-					dprintf("wait until znode:%s become joined\n", path);
-					usleep(10000);
-					continue;
-				}
-			}while(rc == ZOPERATIONTIMEOUT);
+			len = sizeof(znode);
+			rc = zk_get(zh, path, 1, (char *)&znode, &len, NULL);
+			if (rc == ZOK && znode.joined == 0) {
+				dprintf("wait until znode:%s become joined\n", path);
+				usleep(10000);
+				continue;
+			}
 
 			switch(rc) {
 			case ZOK:
@@ -380,16 +468,9 @@ static void zk_data_init(zhandle_t *zh)
 			case ZNONODE:
 				break;
 			default:
-				panic("failed to zoo_get path:%s, rc:%d\n", path, rc);
+				panic("failed to zk_get path:%s, rc:%d\n", path, rc);
 			}
 		}
-	}else {
-		dprintf("clean zookeeper store\n");
-		FOR_EACH_ZNODE(zh, QUEUE_ZNODE, path, &strs) {
-			rc = zoo_delete(zh, path, -1);
-			if (rc != ZOK)
-				panic("failed to zk_delete path:%s, rc:%d\n", path, rc);
-		}
 	}
 
 	sort_zk_nodes(zk_nodes, nr_zk_nodes);
@@ -458,7 +539,10 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
 		}
 
 		memcpy(lev, &ev, sizeof(ev));
+
+		pthread_mutex_lock(&leave_lock);
 		list_add_tail(&lev->list, &zk_levent_list);
+		pthread_mutex_unlock(&leave_lock);
 
 		/* manual notify */
 		dprintf("write event to efd:%d\n", efd);
@@ -480,11 +564,18 @@ out:
 static void watcher(zhandle_t *zh, int type, int state, const char *path, void* ctx)
 {
 	eventfd_t value = 1;
+	const clientid_t *cid;
 	char str[256], *p;
 	int ret, i;
 
 	dprintf("path:%s, type:%d\n", path, type);
 
+	if (type == -1) {
+		cid = zoo_client_id(zh);
+		assert(cid != NULL);
+		dprintf("session change, clientid:%ld\n", cid->client_id);
+	}
+
 	/* discard useless event */
 	if (type < 0 || type == ZOO_CHILD_EVENT)
 		return;
@@ -500,11 +591,8 @@ static void watcher(zhandle_t *zh, int type, int state, const char *path, void*
 		/* check the failed node */
 		for (i=0; i<nr_zk_nodes; i++) {
 			if (strcmp(p, node_to_str(&zk_nodes[i].node)) == 0) {
-				/* protect zk_levent_list */
-				pthread_mutex_lock(&queue_lock);
 				dprintf("zk_nodes[%d] leave:%s\n", i, node_to_str(&zk_nodes[i].node));
 				add_event(zh, EVENT_LEAVE, &zk_nodes[i], NULL, 0, NULL);
-				pthread_mutex_unlock(&queue_lock);
 				return;
 			}
 		}
@@ -581,11 +669,13 @@ static int zk_init(struct cdrv_handlers *handlers, const char *option,
 		free(zk_option);
 	zk_option = strdup(option);
 
-	zhandle = zookeeper_init(option, watcher, 2000, 0, NULL, 0);
+	zhandle = zookeeper_init(option, watcher, SESSION_TIMEOUT, 0, NULL, 0);
 	if (!zhandle) {
 		eprintf("failed to connect to zk server %s\n", option);
 		return -1;
 	}
+	dprintf("request session timeout:%dms, negotiated session timeout:%dms\n",
+			SESSION_TIMEOUT, zoo_recv_timeout(zhandle));
 
 	if (get_addr(myaddr) < 0)
 		return -1;
@@ -613,37 +703,44 @@ static int zk_join(struct sd_node *myself,
 	char path[256];
 	struct zk_node *znode;
 	zhandle_t *zh = NULL;
+	const clientid_t *cid;
 
 	zk_lock(zhandle);
 
 	zk_data_init(zhandle);
-	
+
 	this_node.node = *myself;
-	this_node.seq = zk_queue_seq(zhandle);
-	this_node.joined = 0;
 	zk_check_join_cb = check_join_cb;
 
-	dprintf("this_seq:%d\n", this_node.seq);
-
 	/* try to recover previous session */
 	znode = find_node(zk_nodes, nr_zk_nodes, &this_node);
 	if (znode) {
-		zh = zookeeper_init(zk_option, watcher, 2000, &znode->clientid, NULL, 0);
+		zh = zookeeper_init(zk_option, watcher, SESSION_TIMEOUT, &znode->clientid, NULL, 0);
 		if (zh) {
-			dprintf("recover previous session successfully! clientid:%ld\n", znode->clientid.client_id);
+			dprintf("recover previous session successfully! this_seq:%d, clientid:%ld\n", znode->seq, znode->clientid.client_id);
+			this_node = *znode;
 			zk_unlock(zhandle);
-			zookeeper_close(zhandle);
+			do {
+				rc = zookeeper_close(zhandle);
+			}while(rc == ZCONNECTIONLOSS || rc == ZOPERATIONTIMEOUT);
 			zhandle = zh;
 			return 0;
 		}
 	}
 
+	this_node.seq = zk_queue_seq(zhandle);
+	this_node.joined = 0;
+
+	cid = zoo_client_id(zhandle);
+	assert(cid != NULL);
+	this_node.clientid = *cid;
+
+	dprintf("this_seq:%d, clientid:%ld\n", this_node.seq, cid->client_id);
+
 	sprintf(path, MEMBER_ZNODE "/%s", node_to_str(myself));
-	do {
-		dprintf("try to create member path:%s\n", path);
-		rc = zoo_create(zhandle, path, (char *)&this_node, sizeof(this_node),
-			&ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0);
-	}while(rc == ZOPERATIONTIMEOUT || rc == ZNODEEXISTS);
+	dprintf("try to create member path:%s\n", path);
+	rc = zk_create(zhandle, path, (char *)&this_node, sizeof(this_node),
+		&ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0);
 	if (rc != ZOK)
 		panic("failed to create an ephemeral znode, rc:%d\n", rc);
 
@@ -733,17 +830,15 @@ static int zk_dispatch(void)
 
 				len = sizeof(znode);
 				sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender.node));
-				do {
-					rc = zoo_get(zhandle, path, 0, (char *)&znode, &len, NULL);
-				}while(rc == ZOPERATIONTIMEOUT);
+				rc = zk_get(zhandle, path, 0, (char *)&znode, &len, NULL);
 				if (rc != ZOK)
-					panic("failed to zoo_get path:%s, rc:%d\n", path, rc);
+					panic("failed to zk_get path:%s, rc:%d\n", path, rc);
 
 				/* update joined state in zookeeper MEMBER_ZNODE list*/
 				znode.joined = 1;
-				rc = zoo_set(zhandle, path, (char *)&znode, sizeof(znode), -1);
+				rc = zk_set(zhandle, path, (char *)&znode, sizeof(znode), -1);
 				if (rc != ZOK)
-					panic("failed to zoo_set path:%s, rc:%d\n", path, rc);
+					panic("failed to zk_set path:%s, rc:%d\n", path, rc);
 
 				dprintf("I'm master, push back join event\n");
 				zk_queue_push_back(zhandle, &ev);
@@ -776,7 +871,7 @@ static int zk_dispatch(void)
 				nr_zk_nodes, node_to_str(&ev.sender.node), ev.sender.joined);
 
 		sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender.node));
-		rc = zoo_exists(zhandle, path, 1, NULL);
+		rc = zk_exists(zhandle, path, 1, NULL);
 		dprintf("watch path:%s, exists:%d\n", path, (rc==ZOK));
 
 		build_node_list(zk_nodes, nr_zk_nodes, entries);
-- 
1.7.7.6




More information about the sheepdog mailing list