[Sheepdog] [zookeeper][PATCH v2 11/11] Remove zk_lock from zk_join

Yunkai Zhang yunkai.me at gmail.com
Thu Apr 26 17:21:30 CEST 2012


From: Yunkai Zhang <qiushu.zyk at taobao.com>

Use the IP address and port number, instead of the sequence number from
/sheepdog/queue, to sort the member list.

With this change, the order of the member list no longer depends on the
order in which nodes join, so the following steps no longer need to be
performed as one transaction:
- get sequence number from /sheepdog/queue
- create znode in /sheepdog/member/
- send join message to cluster

As a result, we can remove the lock from zk_join and move the creation of
the znode in /sheepdog/member/ into zk_dispatch.

The member list is stored in a binary search tree (managed with tsearch(3)),
so it stays sorted without re-sorting the whole list on every change. The
functions with the node_btree_ prefix operate on this tree.

==*Note*==:
When sheep starts up, it fetches the member list by reading
/sheepdog/member/*; if the result is empty, it considers itself the *master*.

Now that the lock has been removed from zk_join, starting multiple sheep
daemons simultaneously raises a problem: more than one of them may consider
itself the *master*, which must be avoided.

To prevent this problem, start multiple sheep daemons as follows:
- start the first sheep alone, then sleep 2 seconds:
  $ sheep -d /store/0 -z 0 -p 7000 -c zookeeper:localhost:2181
  $ sleep 2

- start the other sheep daemons simultaneously (no sleep is needed between them):
  $ for i in {1..100}; do sheep -d /store/$i -z $i -p $((7000 + $i)) \
   -c zookeeper:localhost:2181; done

Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com>
---
 include/net.h             |    1 +
 lib/net.c                 |   14 ++
 sheep/cluster.h           |   34 ++++-
 sheep/cluster/zookeeper.c |  378 +++++++++++++++++++--------------------------
 4 files changed, 209 insertions(+), 218 deletions(-)

diff --git a/include/net.h b/include/net.h
index 2d087e2..f657e20 100644
--- a/include/net.h
+++ b/include/net.h
@@ -45,6 +45,7 @@ int exec_req(int sockfd, struct sd_req *hdr, void *data,
 int create_listen_ports(int port, int (*callback)(int fd, void *), void *data);
 
 char *addr_to_str(char *str, int size, uint8_t *addr, uint16_t port);
+uint8_t *str_to_addr(int af, const char *ipstr, uint8_t *addr);
 int set_nonblocking(int fd);
 int set_nodelay(int fd);
 int set_timeout(int fd);
diff --git a/lib/net.c b/lib/net.c
index 8ac7c9e..91524a5 100644
--- a/lib/net.c
+++ b/lib/net.c
@@ -377,6 +377,28 @@ char *addr_to_str(char *str, int size, uint8_t *addr, uint16_t port)
 	return str;
 }
 
+/*
+ * Inverse of addr_to_str(): parse the textual address @ipstr of family @af
+ * into @addr.  An AF_INET address is written to addr[12..15] with
+ * addr[0..11] zeroed; an AF_INET6 address fills all 16 bytes.
+ * Returns @addr on success, or NULL if @ipstr is not a valid address of
+ * family @af (or @af is unsupported).
+ */
+uint8_t *str_to_addr(int af, const char *ipstr, uint8_t *addr)
+{
+	int addr_start_idx = 0;
+
+	if (af == AF_INET)
+		addr_start_idx = 12;
+
+	memset(addr, 0, addr_start_idx);
+	/* inet_pton() returns 1 on success, 0 on bad input, -1 on bad af */
+	if (inet_pton(af, ipstr, addr + addr_start_idx) != 1)
+		return NULL;
+
+	return addr;
+}
+
 int set_nonblocking(int fd)
 {
 	int ret;
diff --git a/sheep/cluster.h b/sheep/cluster.h
index b230b3a..a7e5aae 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -15,6 +15,7 @@
 #include <stdlib.h>
 #include <stdint.h>
 #include <inttypes.h>
+#include <arpa/inet.h>
 #include <memory.h>
 
 #include "sheepdog_proto.h"
@@ -145,13 +146,52 @@ static inline char *node_to_str(struct sd_node *id)
 {
 	static char str[256];
 	char name[256];
+	int af = AF_INET6;
+	uint8_t *addr = id->addr;
+
+	/* Find address family type */
+	if (addr[12]) {
+		int  oct_no = 0;
+		while (!addr[oct_no] && oct_no++ < 12 );
+		if (oct_no == 12) {
+			af = AF_INET;
+		}
+	}
 
-	snprintf(str, sizeof(str), "ip: %s, port: %d",
-		 addr_to_str(name, sizeof(name), id->addr, 0), id->port);
+	/* This format must stay parseable by str_to_node(). */
+	snprintf(str, sizeof(str), "%s ip:%s port:%d",
+		(af == AF_INET)?"IPv4":"IPv6",
+		addr_to_str(name, sizeof(name), id->addr, 0), id->port);
 
 	return str;
 }
 
+/*
+ * Inverse of node_to_str(): parse "IPv4 ip:<addr> port:<port>" (or the
+ * "IPv6 ..." form) into @id.  Returns @id on success, or NULL if @str is
+ * malformed; @id may be partially written on failure.
+ */
+static inline struct sd_node *str_to_node(const char *str, struct sd_node *id)
+{
+	int port, af = AF_INET6;
+	char v[8], ip[256];
+
+	/* Field widths keep sscanf() within v[] and ip[]. */
+	if (sscanf(str, "%7s ip:%255s port:%d", v, ip, &port) != 3)
+		return NULL;
+	if (port < 0 || port > 65535)
+		return NULL;
+	id->port = port;
+
+	if (strcmp(v, "IPv4") == 0)
+		af = AF_INET;
+
+	if (!str_to_addr(af, ip, id->addr))
+		return NULL;
+
+	return id;
+}
+
 /* callbacks back into sheepdog from the cluster drivers */
 void sd_join_handler(struct sd_node *joined, struct sd_node *members,
 		size_t nr_members, enum cluster_join_result result,
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 8999131..5b773d5 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -21,10 +21,11 @@
 #include "work.h"
 
 #define MAX_EVENT_BUF_SIZE (64 * 1024)
-#define SESSION_TIMEOUT 30000
+#define SESSION_TIMEOUT 30000		/* millisecond */
+#define MEMBER_CREATE_TIMEOUT SESSION_TIMEOUT
+#define MEMBER_CREATE_INTERVAL 10	/* millisecond */
 
 #define BASE_ZNODE "/sheepdog"
-#define LOCK_ZNODE BASE_ZNODE "/lock"
 #define QUEUE_ZNODE BASE_ZNODE "/queue"
 #define MEMBER_ZNODE BASE_ZNODE "/member"
 
@@ -60,11 +61,9 @@ enum zk_event_type {
 	EVENT_JOIN = 1,
 	EVENT_LEAVE,
 	EVENT_NOTIFY,
-	EVENT_IGNORE,
 };
 
 struct zk_node {
-	int32_t seq;
 	int joined;
 	clientid_t clientid;
 	struct sd_node node;
@@ -93,7 +92,10 @@ static int nr_zk_levents;
 static unsigned zk_levent_head;
 static unsigned zk_levent_tail;
 
-static struct zk_node zk_nodes[SD_MAX_NODES];
+static void *zk_node_btroot;
+static struct zk_node *zk_master;
+static struct sd_node sd_nodes[SD_MAX_NODES];
+static size_t nr_sd_nodes;
 static size_t nr_zk_nodes;
 
 /* zookeeper API wrapper */
@@ -169,39 +171,6 @@ inline ZOOAPI int zk_get_children(zhandle_t *zh, const char *path, int watch,
 	return rc;
 }
 
-/* ZooKeeper-based lock */
-
-static void zk_lock(zhandle_t *zh)
-{
-	int rc;
-again:
-	rc = zk_create(zh, LOCK_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE,
-			ZOO_EPHEMERAL, NULL, 0);
-	if (rc == ZOK){
-		dprintf("locked\n");
-		return;
-	}
-	else if (rc == ZNODEEXISTS) {
-		dprintf("retry, rc:%d\n", rc);
-		usleep(10000); /* FIXME: use watch notification */
-		goto again;
-	} else {
-		panic("failed to create a lock, rc:%d\n", rc);
-	}
-}
-
-static void zk_unlock(zhandle_t *zh)
-{
-	int rc;
-
-	rc = zk_delete(zh, LOCK_ZNODE, -1);
-
-	if (rc != ZOK)
-		panic("failed to release lock\n");
-
-	dprintf("unlocked\n");
-}
-
 /* ZooKeeper-based queue */
 
 static int efd;
@@ -241,9 +210,6 @@ static int32_t zk_queue_push(zhandle_t *zh, struct zk_event *ev)
 	dprintf("path:%s, seq:%010d\n", buf, seq);
 
 	if (first_push) {
-
-		/* the first pushed data should be EVENT_IGNORE */
-		assert(ev->type == EVENT_IGNORE);
 		queue_pos = seq;
 
 		/* manual notify */
@@ -289,7 +255,8 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 	eventfd_t value = 1;
 
 	/* process leave event */
-	if (__sync_add_and_fetch(&nr_zk_levents, 0)) {
+	if (!__sync_add_and_fetch(&zk_notify_blocked, 0)
+		&& __sync_add_and_fetch(&nr_zk_levents, 0)) {
 		nr_levents = __sync_sub_and_fetch(&nr_zk_levents, 1) + 1;
 		dprintf("nr_zk_levents:%d, head:%u\n", nr_levents, zk_levent_head);
 
@@ -357,29 +324,10 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 	}
 
 out:
-	/* ignore LEAVE event */
-	if (ev->type == EVENT_LEAVE){
-		return -1;
-	}
-
 	return 0;
 }
 
-static int zk_queue_seq(zhandle_t *zh)
-{
-	int32_t seq;
-	struct zk_event ev;
-
-	memset(&ev, 0, sizeof(ev));
-	ev.type = EVENT_IGNORE;
-
-	dprintf("enter ...\n");
-	seq = zk_queue_push(zh, &ev);
-
-	return seq;
-}
-
-static int is_zk_queue_valid(zhandle_t *zh)
+static int zk_member_empty(zhandle_t *zh)
 {
 	int rc;
 	struct String_vector strs;
@@ -388,72 +336,130 @@ static int is_zk_queue_valid(zhandle_t *zh)
 	if (rc != ZOK)
 		panic("failed to zk_get_children path:%s, rc:%d\n", MEMBER_ZNODE, rc);
 
-	return strs.count;
+	return (strs.count == 0);
 }
 
-static void sort_zk_nodes(struct zk_node *znodes, size_t nr_nodes)
+static inline int zk_node_cmp(const void *a, const void *b)
 {
-	int i, j, k;
-	struct idxs {
-		int idx;
-		int32_t seq;
-	} idxs[SD_MAX_NODES], t;
-	struct zk_node N[SD_MAX_NODES];
+	const struct zk_node *znode1 = a;
+	const struct zk_node *znode2 = b;
+	return node_cmp(&znode1->node, &znode2->node);
+}
 
-	if (nr_nodes <= 1)
-		return;
+/*
+ * Insert a copy of @znode into the tree.  If an equal node (per
+ * zk_node_cmp) is already present, its contents are overwritten in place
+ * and nr_zk_nodes is not incremented, so the counter always matches the
+ * number of nodes in the tree.
+ */
+static void node_btree_add(void **btroot, struct zk_node *znode)
+{
+	struct zk_node *n, **p;
 
-	for (i=0; i<nr_nodes; i++) {
-		idxs[i].idx = i;
-		idxs[i].seq = znodes[i].seq;
-		dprintf("zk_nodes[%d], seq:%010d, value:%s\n",
-			i, znodes[i].seq, node_to_str(&znodes[i].node));
-	}
+	n = (struct zk_node *)malloc(sizeof(struct zk_node));
+	if (n == NULL)
+		panic("malloc, oom\n");
 
-	/* sort idxs by seq */
-	for (i=nr_nodes-1; i>0; i--) {
-		k = i;
-		for (j=i-1; j>=0; j--) {
-			if (idxs[k].seq < idxs[j].seq) {
-				k = j;
-			}
-		}
+	*n = *znode;
 
-		if (i != k) {
-			t = idxs[i];
-			idxs[i] = idxs[k];
-			idxs[k] = t;
-		}
+	p = (struct zk_node **)tsearch((void *)n, btroot, zk_node_cmp);
+	if (p == NULL)
+		panic("tsearch, oom\n");
+	else if (*p != n) {
+		**p = *n;
+		free(n);
+		return;	/* existing entry updated; node count unchanged */
 	}
+	nr_zk_nodes++;
+}
+
+/* Remove @znode, which must be a node stored in the tree, and free it. */
+static inline void node_btree_del(void **btroot, struct zk_node *znode)
+{
+	tdelete((void *)znode, btroot, zk_node_cmp);
+	free(znode);
+	nr_zk_nodes--;
+}
+
+/* Free every node in the tree and reset nr_zk_nodes to match. */
+static inline void node_btree_clear(void **btroot)
+{
+	tdestroy(*btroot, free);
+	*btroot = NULL;
+	nr_zk_nodes = 0;
+}
+
+static struct zk_node *node_btree_find(void **btroot, struct zk_node *znode)
+{
+	struct zk_node **p;
+
+	p = (struct zk_node **)tfind((void *)znode, btroot, zk_node_cmp);
+	if (p)
+		return *p;
+
+	return NULL;
+}
+
+static void node_btree_build_list_fn(const void *nodep,
+		const VISIT which, const int depth)
+{
+	struct zk_node *znode;
 
-	for (i=0; i<nr_nodes; i++) {
-		N[i] = znodes[idxs[i].idx];
-		dprintf("N[%d], seq:%010d, value:%s\n",
-			i, znodes[idxs[i].idx].seq, node_to_str(&N[i].node));
+	switch (which) {
+	case preorder:
+		break;
+	case postorder:
+	case leaf:
+		znode = *(struct zk_node **) nodep;
+		sd_nodes[nr_sd_nodes++] = znode->node;
+		break;
+	case endorder:
+		break;
 	}
-	memcpy(zk_nodes, N, nr_nodes*sizeof(*zk_nodes));
 }
 
-static void build_node_list(struct zk_node *znodes, size_t nr_nodes,
-			    struct sd_node *entries)
+static inline void build_node_list(void *btroot)
 {
-	int i;
+	nr_sd_nodes = 0;
+	twalk(btroot, node_btree_build_list_fn);
+	assert(nr_sd_nodes == nr_zk_nodes);
+	dprintf("nr_sd_nodes:%lu\n", nr_sd_nodes);
+}
 
-	for (i = 0; i < nr_nodes; i++)
-		entries[i] = znodes[i].node;
+static void node_btree_find_master_fn(const void *nodep,
+		const VISIT which, const int depth)
+{
+	switch (which) {
+	case preorder:
+		break;
+	case postorder:
+	case leaf:
+		if (zk_master)
+			break;
+		zk_master = *(struct zk_node **) nodep;
+		dprintf("master:%s\n", node_to_str(&zk_master->node));
+		break;
+	case endorder:
+		break;
+	}
 }
 
-static struct zk_node* find_node(struct zk_node *znodes, int nr_nodes, struct zk_node *znode)
+static int is_master(zhandle_t *zh, struct zk_node *znode)
 {
-	int i;
+	zk_master = NULL;
 
-	for (i=0; i<nr_nodes; i++) {
-		if (node_cmp(&znode->node, &znodes[i].node) == 0) {
-			return &znodes[i];
-		}
+	if (!zk_node_btroot){
+		if (zk_member_empty(zh))
+			return 1;
+		else
+			return 0;
 	}
 
-	return NULL;
+	twalk(zk_node_btroot, node_btree_find_master_fn);
+	if (node_cmp(&zk_master->node, &znode->node) == 0)
+		return 1;
+
+	return 0;
 }
 
 static void zk_queue_init(zhandle_t *zh)
@@ -463,7 +459,7 @@ static void zk_queue_init(zhandle_t *zh)
 	zk_create(zh, MEMBER_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
 }
 
-static void zk_data_init(zhandle_t *zh)
+static void zk_member_init(zhandle_t *zh)
 {
 	static int finished;
 	int rc, len;
@@ -476,22 +472,16 @@ static void zk_data_init(zhandle_t *zh)
 
 	finished = 1;
 
-	queue_pos = -1;
-
-	if (is_zk_queue_valid(zh)) {
+	if (!zk_member_empty(zh)) {
 		FOR_EACH_ZNODE(zh, MEMBER_ZNODE, path, &strs) {
 			len = sizeof(znode);
 			rc = zk_get(zh, path, 1, (char *)&znode, &len, NULL);
-			if (rc == ZOK && znode.joined == 0) {
-				dprintf("wait until znode:%s become joined\n", path);
-				usleep(10000);
+			if (rc != ZOK)
 				continue;
-			}
 
 			switch(rc) {
 			case ZOK:
-				zk_nodes[nr_zk_nodes] = znode;
-				nr_zk_nodes++;
+				node_btree_add(&zk_node_btroot, &znode);
 			case ZNONODE:
 				break;
 			default:
@@ -499,9 +489,6 @@ static void zk_data_init(zhandle_t *zh)
 			}
 		}
 	}
-
-	sort_zk_nodes(zk_nodes, nr_zk_nodes);
-
 	dprintf("nr_nodes:%ld\n", nr_zk_nodes);
 }
 
@@ -514,28 +501,6 @@ static struct work_queue *zk_block_wq;
 
 static struct zk_node this_node;
 
-static int is_master(struct zk_node *znode)
-{
-	int i;
-	struct zk_node *n = znode;
-
-	if (!n)
-		return -1;
-
-	if (nr_zk_nodes == 0)
-		return 0;
-
-	for (i = 0; i < SD_MAX_NODES; i++) {
-		if (zk_nodes[i].joined)
-			break;
-	}
-
-	if (node_cmp(&zk_nodes[i].node, &n->node) == 0)
-		return i;
-
-	return -1;
-}
-
 static int add_event(zhandle_t *zh, enum zk_event_type type,
 		     struct zk_node *znode, void *buf,
 		     size_t buf_len, void (*block_cb)(void *arg))
@@ -574,8 +539,6 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
 		ev.blocked = !!block_cb;
 		ev.block_cb = block_cb;
 		break;
-	case EVENT_IGNORE:
-		break;
 	}
 
 	zk_queue_push(zh, &ev);
@@ -588,7 +551,8 @@ static void watcher(zhandle_t *zh, int type, int state, const char *path, void*
 	eventfd_t value = 1;
 	const clientid_t *cid;
 	char str[256], *p;
-	int ret, rc, i;
+	int ret, rc;
+	struct zk_node znode;
 
 	dprintf("path:%s, type:%d\n", path, type);
 
@@ -602,7 +566,7 @@ static void watcher(zhandle_t *zh, int type, int state, const char *path, void*
 	if (type < 0 || type == ZOO_CHILD_EVENT)
 		return;
 
-	if (type == ZOO_CHANGED_EVENT) {
+	if (type == ZOO_CREATED_EVENT || type == ZOO_CHANGED_EVENT) {
 		ret = sscanf(path, MEMBER_ZNODE "/%s", str);
 		if (ret == 1) {
 			rc = zk_exists(zh, path, 1, NULL);
@@ -618,14 +582,16 @@ static void watcher(zhandle_t *zh, int type, int state, const char *path, void*
 		p = strrchr(path, '/');
 		p++;
 
-		/* check the failed node */
-		for (i=0; i<nr_zk_nodes; i++) {
-			if (strcmp(p, node_to_str(&zk_nodes[i].node)) == 0) {
-				dprintf("zk_nodes[%d] leave:%s\n", i, node_to_str(&zk_nodes[i].node));
-				add_event(zh, EVENT_LEAVE, &zk_nodes[i], NULL, 0, NULL);
-				return;
-			}
-		}
+		/* member znodes are named MEMBER_ZNODE "/" node_to_str(node) */
+		memset(&znode, 0, sizeof(znode));
+		if (!str_to_node(p, &znode.node)) {
+			dprintf("ignore unparsable member path:%s\n", path);
+			return;
+		}
+		dprintf("zk_nodes leave:%s\n", node_to_str(&znode.node));
+
+		add_event(zh, EVENT_LEAVE, &znode, NULL, 0, NULL);
+		return;
 	}
 
 	dprintf("write event to efd:%d\n", efd);
@@ -726,39 +687,24 @@ static int zk_join(struct sd_node *myself,
 {
 	int rc;
 	char path[256];
-	struct zk_node *znode;
 	const clientid_t *cid;
 
-	zk_lock(zhandle);
-
-	zk_data_init(zhandle);
-
 	this_node.node = *myself;
 
-	znode = find_node(zk_nodes, nr_zk_nodes, &this_node);
-	if (znode)
+	sprintf(path, MEMBER_ZNODE "/%s", node_to_str(myself));
+	rc = zk_exists(zhandle, path, 1, NULL);
+	if (rc == ZOK)
 		panic("previous zookeeper session exist, shutdown\n");
 
-	this_node.seq = zk_queue_seq(zhandle);
 	this_node.joined = 0;
-
 	cid = zoo_client_id(zhandle);
 	assert(cid != NULL);
 	this_node.clientid = *cid;
 
-	dprintf("this_seq:%010d, clientid:%ld\n", this_node.seq, cid->client_id);
-
-	sprintf(path, MEMBER_ZNODE "/%s", node_to_str(myself));
-	dprintf("try to create member path:%s\n", path);
-	rc = zk_create(zhandle, path, (char *)&this_node, sizeof(this_node),
-		&ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0);
-	if (rc != ZOK)
-		panic("failed to create an ephemeral znode, rc:%d\n", rc);
+	dprintf("clientid:%ld\n", cid->client_id);
 
 	rc = add_event(zhandle, EVENT_JOIN, &this_node, opaque, opaque_len, NULL);
 
-	zk_unlock(zhandle);
-
 	return rc;
 }
 
@@ -802,12 +748,11 @@ static void zk_block_done(struct work *work)
 
 static int zk_dispatch(void)
 {
-	int ret, rc, len, idx;
+	int ret, rc, retry;
 	char path[256];
 	eventfd_t value;
 	struct zk_event ev;
-	struct zk_node znode, *n;
-	struct sd_node entries[SD_MAX_NODES];
+	struct zk_node *n;
 	enum cluster_join_result res;
 	static struct work work = {
 		.fn = zk_block,
@@ -833,24 +778,12 @@ static int zk_dispatch(void)
 		if (ev.blocked) {
 			dprintf("one sheep joined[up], nr_nodes:%ld, sender:%s, joined:%d\n",
 					nr_zk_nodes, node_to_str(&ev.sender.node), ev.sender.joined);
-			if (is_master(&this_node) >= 0) {
+			if (is_master(zhandle, &this_node)) {
 				res = sd_check_join_cb(&ev.sender.node, ev.buf);
 				ev.join_result = res;
 				ev.blocked = 0;
 				ev.sender.joined = 1;
 
-				len = sizeof(znode);
-				sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender.node));
-				rc = zk_get(zhandle, path, 0, (char *)&znode, &len, NULL);
-				if (rc != ZOK)
-					panic("failed to zk_get path:%s, rc:%d\n", path, rc);
-
-				/* update joined state in zookeeper MEMBER_ZNODE list*/
-				znode.joined = 1;
-				rc = zk_set(zhandle, path, (char *)&znode, sizeof(znode), -1);
-				if (rc != ZOK)
-					panic("failed to zk_set path:%s, rc:%d\n", path, rc);
-
 				dprintf("I'm master, push back join event\n");
 				zk_queue_push_back(zhandle, &ev);
 
@@ -863,55 +796,70 @@ static int zk_dispatch(void)
 				zk_queue_push_back(zhandle, NULL);
 
 			goto out;
+		} else if (is_master(zhandle, &this_node)
+			&& node_cmp(&ev.sender.node, &this_node.node) != 0) {
+			/* poll until the sender's member znode exists, or time out */
+			snprintf(path, sizeof(path), MEMBER_ZNODE "/%s", node_to_str(&ev.sender.node));
+			retry = MEMBER_CREATE_TIMEOUT/MEMBER_CREATE_INTERVAL;
+			while ((rc = zk_exists(zhandle, path, 1, NULL)) == ZNONODE && retry > 0) {
+				usleep(MEMBER_CREATE_INTERVAL*1000);
+				retry--;
+			}
+			if (rc == ZNONODE) {	/* still absent after the final check */
+				dprintf("Sender:%s failed to create member, ignore it\n",
+						node_to_str(&ev.sender.node));
+				goto out;
+			}
 		}
 
+		if (node_cmp(&ev.sender.node, &this_node.node) == 0)
+			zk_member_init(zhandle);
+
 		if (ev.join_result == CJ_RES_MASTER_TRANSFER) {
-			/* FIXME: This code is tricky, but Sheepdog assumes that */
-			/* nr_nodes = 1 when join_result = MASTER_TRANSFER... */
-			//ev.nr_nodes = 1;
-			nr_zk_nodes = 1;
-			zk_nodes[0] = this_node;
-			zk_nodes[0].joined  = 1;
+			/* FIXME: This code is tricky, but Sheepdog assumes that
+			 * nr_nodes = 1 when join_result = MASTER_TRANSFER... */
+			node_btree_clear(&zk_node_btroot);
+			node_btree_add(&zk_node_btroot, &this_node);
+
 			zk_queue_push_back(zhandle, &ev);
 			zk_queue_pop(zhandle, &ev);
 		}
 
-		zk_nodes[nr_zk_nodes] = ev.sender;
-		nr_zk_nodes++;
+		node_btree_add(&zk_node_btroot, &ev.sender);
 		dprintf("one sheep joined[down], nr_nodes:%ld, sender:%s, joined:%d\n",
 				nr_zk_nodes, node_to_str(&ev.sender.node), ev.sender.joined);
 
 		if (ev.join_result == CJ_RES_SUCCESS) {
 			sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender.node));
-			rc = zk_exists(zhandle, path, 1, NULL);
-			dprintf("watch path:%s, exists:%d\n", path, (rc==ZOK));
-			if (rc != ZOK) {
-				dprintf("sender have left:%s\n", node_to_str(&ev.sender.node));
-				add_event(zhandle, EVENT_LEAVE, &ev.sender, NULL, 0, NULL);
+			if (node_cmp(&ev.sender.node, &this_node.node) == 0) {
+				dprintf("create path:%s\n", path);
+				rc = zk_create(zhandle, path, (char *)&ev.sender, sizeof(ev.sender),
+					&ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0);
+				if (rc != ZOK)
+					panic("failed to create an ephemeral znode, rc:%d\n", rc);
+			} else {
+				rc = zk_exists(zhandle, path, 1, NULL);
+				dprintf("watch path:%s, exists:%d\n", path, (rc==ZOK));
 			}
 		}
 
-		build_node_list(zk_nodes, nr_zk_nodes, entries);
-		sd_join_handler(&ev.sender.node, entries, nr_zk_nodes,
+		build_node_list(zk_node_btroot);
+		sd_join_handler(&ev.sender.node, sd_nodes, nr_sd_nodes,
 				    ev.join_result, ev.buf);
 		break;
 	case EVENT_LEAVE:
 		dprintf("LEAVE EVENT, blocked:%d\n", ev.blocked);
-		/*reset master if necessary */
-		n = find_node(zk_nodes, nr_zk_nodes, &ev.sender);
+		n = node_btree_find(&zk_node_btroot, &ev.sender);
 		if (!n) {
 			dprintf("can't find this leave node:%s, ignore it.\n", node_to_str(&ev.sender.node));
 			goto out;
 		}
 
-		idx = n - zk_nodes;
-		nr_zk_nodes--;
-
-		memmove(n, n + 1, sizeof(*n) * (nr_zk_nodes - idx));
-		dprintf("one sheep left, nr_nodes:%ld, idx:%d\n", nr_zk_nodes, idx);
+		node_btree_del(&zk_node_btroot, n);
+		dprintf("one sheep left, nr_nodes:%ld\n", nr_zk_nodes);
 
-		build_node_list(zk_nodes, nr_zk_nodes, entries);
-		sd_leave_handler(&ev.sender.node, entries, nr_zk_nodes);
+		build_node_list(zk_node_btroot);
+		sd_leave_handler(&ev.sender.node, sd_nodes, nr_sd_nodes);
 		break;
 	case EVENT_NOTIFY:
 		dprintf("NOTIFY, blocked:%d\n", ev.blocked);
@@ -932,8 +880,6 @@ static int zk_dispatch(void)
 
 		sd_notify_handler(&ev.sender.node, ev.buf, ev.buf_len);
 		break;
-	case EVENT_IGNORE:
-		break;
 	}
 out:
 	return 0;
-- 
1.7.7.6




More information about the sheepdog mailing list