[Sheepdog] [zookeeper][PATCH v2 11/11] Remove zk_lock from zk_join
Yunkai Zhang
yunkai.me at gmail.com
Thu Apr 26 17:21:30 CEST 2012
From: Yunkai Zhang <qiushu.zyk at taobao.com>
Use the IP address and port number, instead of the sequence number
from /sheepdog/queue, to sort the member list.
With this change, the order of the member list no longer depends on the
order in which nodes join, so the following steps no longer need to be
performed in a single transaction:
- get sequence number from /sheepdog/queue
- create znode in /sheepdog/member/
- send join message to cluster
As a result, we can remove the lock from zk_join and move the creation of
the znode in /sheepdog/member/ into zk_dispatch.
The member list is stored in a binary tree so that it can be kept sorted
efficiently. Functions with the node_btree_ prefix operate on this
binary tree.
==*Note*==:
When a sheep starts up, it fetches the member list by reading /sheepdog/member/*;
if the result is empty, the sheep considers itself the *master*.
Now that the lock has been removed from zk_join, starting multiple sheep
simultaneously causes a problem: more than one sheep may consider itself
the *master*, which must be avoided.
To avoid this problem, start multiple sheep as follows:
- start the first sheep alone, then wait 2 seconds:
$ sheep -d /store/0 -z 0 -p 7000 -c zookeeper:localhost:2181
$ sleep 2
- start the other sheep simultaneously (no need to sleep between them):
$ for i in {1..100}; do sheep -d /store/$i -z $i -p $((7000 + $i)) \
-c zookeeper:localhost:2181; done
Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com>
---
include/net.h | 1 +
lib/net.c | 14 ++
sheep/cluster.h | 34 ++++-
sheep/cluster/zookeeper.c | 378 +++++++++++++++++++--------------------------
4 files changed, 209 insertions(+), 218 deletions(-)
diff --git a/include/net.h b/include/net.h
index 2d087e2..f657e20 100644
--- a/include/net.h
+++ b/include/net.h
@@ -45,6 +45,7 @@ int exec_req(int sockfd, struct sd_req *hdr, void *data,
int create_listen_ports(int port, int (*callback)(int fd, void *), void *data);
char *addr_to_str(char *str, int size, uint8_t *addr, uint16_t port);
+uint8_t *str_to_addr(int af, const char *ipstr, uint8_t *addr);
int set_nonblocking(int fd);
int set_nodelay(int fd);
int set_timeout(int fd);
diff --git a/lib/net.c b/lib/net.c
index 8ac7c9e..91524a5 100644
--- a/lib/net.c
+++ b/lib/net.c
@@ -377,6 +377,20 @@ char *addr_to_str(char *str, int size, uint8_t *addr, uint16_t port)
return str;
}
+uint8_t *str_to_addr(int af, const char *ipstr, uint8_t *addr)
+{
+ int addr_start_idx = 0;
+
+ if (af == AF_INET)
+ addr_start_idx = 12;
+
+ memset(addr, 0, addr_start_idx);
+ if (!inet_pton(af, ipstr, addr + addr_start_idx))
+ return NULL;
+
+ return addr;
+}
+
int set_nonblocking(int fd)
{
int ret;
diff --git a/sheep/cluster.h b/sheep/cluster.h
index b230b3a..a7e5aae 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -15,6 +15,7 @@
#include <stdlib.h>
#include <stdint.h>
#include <inttypes.h>
+#include <arpa/inet.h>
#include <memory.h>
#include "sheepdog_proto.h"
@@ -145,13 +146,42 @@ static inline char *node_to_str(struct sd_node *id)
{
static char str[256];
char name[256];
+ int af = AF_INET6;
+ uint8_t *addr = id->addr;
+
+ /* Find address family type */
+ if (addr[12]) {
+ int oct_no = 0;
+ while (!addr[oct_no] && oct_no++ < 12 );
+ if (oct_no == 12) {
+ af = AF_INET;
+ }
+ }
- snprintf(str, sizeof(str), "ip: %s, port: %d",
- addr_to_str(name, sizeof(name), id->addr, 0), id->port);
+ snprintf(str, sizeof(str), "%s ip:%s port:%d",
+ (af == AF_INET)?"IPv4":"IPv6",
+ addr_to_str(name, sizeof(name), id->addr, 0), id->port);
return str;
}
+static inline struct sd_node *str_to_node(const char *str, struct sd_node *id)
+{
+ int port, af = AF_INET6;
+ char v[8], ip[256];
+
+ sscanf(str, "%s ip:%s port:%d", v, ip, &port);
+ id->port = port;
+
+ if (strcmp(v, "IPv4") == 0)
+ af = AF_INET;
+
+ if(!str_to_addr(af, ip, id->addr))
+ return NULL;
+
+ return id;
+}
+
/* callbacks back into sheepdog from the cluster drivers */
void sd_join_handler(struct sd_node *joined, struct sd_node *members,
size_t nr_members, enum cluster_join_result result,
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 8999131..5b773d5 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -21,10 +21,11 @@
#include "work.h"
#define MAX_EVENT_BUF_SIZE (64 * 1024)
-#define SESSION_TIMEOUT 30000
+#define SESSION_TIMEOUT 30000 /* millisecond */
+#define MEMBER_CREATE_TIMEOUT SESSION_TIMEOUT
+#define MEMBER_CREATE_INTERVAL 10 /* millisecond */
#define BASE_ZNODE "/sheepdog"
-#define LOCK_ZNODE BASE_ZNODE "/lock"
#define QUEUE_ZNODE BASE_ZNODE "/queue"
#define MEMBER_ZNODE BASE_ZNODE "/member"
@@ -60,11 +61,9 @@ enum zk_event_type {
EVENT_JOIN = 1,
EVENT_LEAVE,
EVENT_NOTIFY,
- EVENT_IGNORE,
};
struct zk_node {
- int32_t seq;
int joined;
clientid_t clientid;
struct sd_node node;
@@ -93,7 +92,10 @@ static int nr_zk_levents;
static unsigned zk_levent_head;
static unsigned zk_levent_tail;
-static struct zk_node zk_nodes[SD_MAX_NODES];
+static void *zk_node_btroot;
+static struct zk_node *zk_master;
+static struct sd_node sd_nodes[SD_MAX_NODES];
+static size_t nr_sd_nodes;
static size_t nr_zk_nodes;
/* zookeeper API wrapper */
@@ -169,39 +171,6 @@ inline ZOOAPI int zk_get_children(zhandle_t *zh, const char *path, int watch,
return rc;
}
-/* ZooKeeper-based lock */
-
-static void zk_lock(zhandle_t *zh)
-{
- int rc;
-again:
- rc = zk_create(zh, LOCK_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE,
- ZOO_EPHEMERAL, NULL, 0);
- if (rc == ZOK){
- dprintf("locked\n");
- return;
- }
- else if (rc == ZNODEEXISTS) {
- dprintf("retry, rc:%d\n", rc);
- usleep(10000); /* FIXME: use watch notification */
- goto again;
- } else {
- panic("failed to create a lock, rc:%d\n", rc);
- }
-}
-
-static void zk_unlock(zhandle_t *zh)
-{
- int rc;
-
- rc = zk_delete(zh, LOCK_ZNODE, -1);
-
- if (rc != ZOK)
- panic("failed to release lock\n");
-
- dprintf("unlocked\n");
-}
-
/* ZooKeeper-based queue */
static int efd;
@@ -241,9 +210,6 @@ static int32_t zk_queue_push(zhandle_t *zh, struct zk_event *ev)
dprintf("path:%s, seq:%010d\n", buf, seq);
if (first_push) {
-
- /* the first pushed data should be EVENT_IGNORE */
- assert(ev->type == EVENT_IGNORE);
queue_pos = seq;
/* manual notify */
@@ -289,7 +255,8 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
eventfd_t value = 1;
/* process leave event */
- if (__sync_add_and_fetch(&nr_zk_levents, 0)) {
+ if (!__sync_add_and_fetch(&zk_notify_blocked, 0)
+ && __sync_add_and_fetch(&nr_zk_levents, 0)) {
nr_levents = __sync_sub_and_fetch(&nr_zk_levents, 1) + 1;
dprintf("nr_zk_levents:%d, head:%u\n", nr_levents, zk_levent_head);
@@ -357,29 +324,10 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
}
out:
- /* ignore LEAVE event */
- if (ev->type == EVENT_LEAVE){
- return -1;
- }
-
return 0;
}
-static int zk_queue_seq(zhandle_t *zh)
-{
- int32_t seq;
- struct zk_event ev;
-
- memset(&ev, 0, sizeof(ev));
- ev.type = EVENT_IGNORE;
-
- dprintf("enter ...\n");
- seq = zk_queue_push(zh, &ev);
-
- return seq;
-}
-
-static int is_zk_queue_valid(zhandle_t *zh)
+static int zk_member_empty(zhandle_t *zh)
{
int rc;
struct String_vector strs;
@@ -388,72 +336,120 @@ static int is_zk_queue_valid(zhandle_t *zh)
if (rc != ZOK)
panic("failed to zk_get_children path:%s, rc:%d\n", MEMBER_ZNODE, rc);
- return strs.count;
+ return (strs.count == 0);
}
-static void sort_zk_nodes(struct zk_node *znodes, size_t nr_nodes)
+static inline int zk_node_cmp(const void *a, const void *b)
{
- int i, j, k;
- struct idxs {
- int idx;
- int32_t seq;
- } idxs[SD_MAX_NODES], t;
- struct zk_node N[SD_MAX_NODES];
+ const struct zk_node *znode1 = a;
+ const struct zk_node *znode2 = b;
+ return node_cmp(&znode1->node, &znode2->node);
+}
- if (nr_nodes <= 1)
- return;
+static void node_btree_add(void **btroot, struct zk_node *znode)
+{
+ struct zk_node *n, **p;
- for (i=0; i<nr_nodes; i++) {
- idxs[i].idx = i;
- idxs[i].seq = znodes[i].seq;
- dprintf("zk_nodes[%d], seq:%010d, value:%s\n",
- i, znodes[i].seq, node_to_str(&znodes[i].node));
- }
+ n = (struct zk_node *)malloc(sizeof(struct zk_node));
+ if (n == NULL)
+ panic("malloc, oom\n");
- /* sort idxs by seq */
- for (i=nr_nodes-1; i>0; i--) {
- k = i;
- for (j=i-1; j>=0; j--) {
- if (idxs[k].seq < idxs[j].seq) {
- k = j;
- }
- }
+ *n = *znode;
- if (i != k) {
- t = idxs[i];
- idxs[i] = idxs[k];
- idxs[k] = t;
- }
+ p = (struct zk_node **)tsearch((void *)n, btroot, zk_node_cmp);
+ if (p == NULL)
+ panic("tsearch, oom\n");
+ else if (*p != n) {
+ **p = *n;
+ free(n);
}
+ nr_zk_nodes++;
+}
+
+static inline void node_btree_del(void **btroot, struct zk_node *znode)
+{
+ tdelete((void *)znode, btroot, zk_node_cmp);
+ free(znode);
+ nr_zk_nodes--;
+}
+
+static inline void node_btree_clear(void **btroot)
+{
+ tdestroy(*btroot, free);
+ *btroot = NULL;
+}
+
+static struct zk_node *node_btree_find(void **btroot, struct zk_node *znode)
+{
+ struct zk_node **p;
+
+ p = (struct zk_node **)tfind((void *)znode, btroot, zk_node_cmp);
+ if (p)
+ return *p;
+
+ return NULL;
+}
+
+static void node_btree_build_list_fn(const void *nodep,
+ const VISIT which, const int depth)
+{
+ struct zk_node *znode;
- for (i=0; i<nr_nodes; i++) {
- N[i] = znodes[idxs[i].idx];
- dprintf("N[%d], seq:%010d, value:%s\n",
- i, znodes[idxs[i].idx].seq, node_to_str(&N[i].node));
+ switch (which) {
+ case preorder:
+ break;
+ case postorder:
+ case leaf:
+ znode = *(struct zk_node **) nodep;
+ sd_nodes[nr_sd_nodes++] = znode->node;
+ break;
+ case endorder:
+ break;
}
- memcpy(zk_nodes, N, nr_nodes*sizeof(*zk_nodes));
}
-static void build_node_list(struct zk_node *znodes, size_t nr_nodes,
- struct sd_node *entries)
+static inline void build_node_list(void *btroot)
{
- int i;
+ nr_sd_nodes = 0;
+ twalk(btroot, node_btree_build_list_fn);
+ assert(nr_sd_nodes == nr_zk_nodes);
+ dprintf("nr_sd_nodes:%lu\n", nr_sd_nodes);
+}
- for (i = 0; i < nr_nodes; i++)
- entries[i] = znodes[i].node;
+static void node_btree_find_master_fn(const void *nodep,
+ const VISIT which, const int depth)
+{
+ switch (which) {
+ case preorder:
+ break;
+ case postorder:
+ case leaf:
+ if (zk_master)
+ break;
+ zk_master = *(struct zk_node **) nodep;
+ dprintf("master:%s\n", node_to_str(&zk_master->node));
+ break;
+ case endorder:
+ break;
+ }
}
-static struct zk_node* find_node(struct zk_node *znodes, int nr_nodes, struct zk_node *znode)
+static int is_master(zhandle_t *zh, struct zk_node *znode)
{
- int i;
+ zk_master = NULL;
- for (i=0; i<nr_nodes; i++) {
- if (node_cmp(&znode->node, &znodes[i].node) == 0) {
- return &znodes[i];
- }
+ if (!zk_node_btroot){
+ if (zk_member_empty(zh))
+ return 1;
+ else
+ return 0;
}
- return NULL;
+ twalk(zk_node_btroot, node_btree_find_master_fn);
+ if (node_cmp(&zk_master->node, &znode->node) == 0)
+ return 1;
+
+ return 0;
}
static void zk_queue_init(zhandle_t *zh)
@@ -463,7 +459,7 @@ static void zk_queue_init(zhandle_t *zh)
zk_create(zh, MEMBER_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
}
-static void zk_data_init(zhandle_t *zh)
+static void zk_member_init(zhandle_t *zh)
{
static int finished;
int rc, len;
@@ -476,22 +472,16 @@ static void zk_data_init(zhandle_t *zh)
finished = 1;
- queue_pos = -1;
-
- if (is_zk_queue_valid(zh)) {
+ if (!zk_member_empty(zh)) {
FOR_EACH_ZNODE(zh, MEMBER_ZNODE, path, &strs) {
len = sizeof(znode);
rc = zk_get(zh, path, 1, (char *)&znode, &len, NULL);
- if (rc == ZOK && znode.joined == 0) {
- dprintf("wait until znode:%s become joined\n", path);
- usleep(10000);
+ if (rc != ZOK)
continue;
- }
switch(rc) {
case ZOK:
- zk_nodes[nr_zk_nodes] = znode;
- nr_zk_nodes++;
+ node_btree_add(&zk_node_btroot, &znode);
case ZNONODE:
break;
default:
@@ -499,9 +489,6 @@ static void zk_data_init(zhandle_t *zh)
}
}
}
-
- sort_zk_nodes(zk_nodes, nr_zk_nodes);
-
dprintf("nr_nodes:%ld\n", nr_zk_nodes);
}
@@ -514,28 +501,6 @@ static struct work_queue *zk_block_wq;
static struct zk_node this_node;
-static int is_master(struct zk_node *znode)
-{
- int i;
- struct zk_node *n = znode;
-
- if (!n)
- return -1;
-
- if (nr_zk_nodes == 0)
- return 0;
-
- for (i = 0; i < SD_MAX_NODES; i++) {
- if (zk_nodes[i].joined)
- break;
- }
-
- if (node_cmp(&zk_nodes[i].node, &n->node) == 0)
- return i;
-
- return -1;
-}
-
static int add_event(zhandle_t *zh, enum zk_event_type type,
struct zk_node *znode, void *buf,
size_t buf_len, void (*block_cb)(void *arg))
@@ -574,8 +539,6 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
ev.blocked = !!block_cb;
ev.block_cb = block_cb;
break;
- case EVENT_IGNORE:
- break;
}
zk_queue_push(zh, &ev);
@@ -588,7 +551,8 @@ static void watcher(zhandle_t *zh, int type, int state, const char *path, void*
eventfd_t value = 1;
const clientid_t *cid;
char str[256], *p;
- int ret, rc, i;
+ int ret, rc;
+ struct zk_node znode;
dprintf("path:%s, type:%d\n", path, type);
@@ -602,7 +566,7 @@ static void watcher(zhandle_t *zh, int type, int state, const char *path, void*
if (type < 0 || type == ZOO_CHILD_EVENT)
return;
- if (type == ZOO_CHANGED_EVENT) {
+ if (type == ZOO_CREATED_EVENT || type == ZOO_CHANGED_EVENT) {
ret = sscanf(path, MEMBER_ZNODE "/%s", str);
if (ret == 1) {
rc = zk_exists(zh, path, 1, NULL);
@@ -618,14 +582,11 @@ static void watcher(zhandle_t *zh, int type, int state, const char *path, void*
p = strrchr(path, '/');
p++;
- /* check the failed node */
- for (i=0; i<nr_zk_nodes; i++) {
- if (strcmp(p, node_to_str(&zk_nodes[i].node)) == 0) {
- dprintf("zk_nodes[%d] leave:%s\n", i, node_to_str(&zk_nodes[i].node));
- add_event(zh, EVENT_LEAVE, &zk_nodes[i], NULL, 0, NULL);
- return;
- }
- }
+ str_to_node(p, &znode.node);
+ dprintf("zk_nodes leave:%s\n", node_to_str(&znode.node));
+
+ add_event(zh, EVENT_LEAVE, &znode, NULL, 0, NULL);
+ return;
}
dprintf("write event to efd:%d\n", efd);
@@ -726,39 +687,24 @@ static int zk_join(struct sd_node *myself,
{
int rc;
char path[256];
- struct zk_node *znode;
const clientid_t *cid;
- zk_lock(zhandle);
-
- zk_data_init(zhandle);
-
this_node.node = *myself;
- znode = find_node(zk_nodes, nr_zk_nodes, &this_node);
- if (znode)
+ sprintf(path, MEMBER_ZNODE "/%s", node_to_str(myself));
+ rc = zk_exists(zhandle, path, 1, NULL);
+ if (rc == ZOK)
panic("previous zookeeper session exist, shutdown\n");
- this_node.seq = zk_queue_seq(zhandle);
this_node.joined = 0;
-
cid = zoo_client_id(zhandle);
assert(cid != NULL);
this_node.clientid = *cid;
- dprintf("this_seq:%010d, clientid:%ld\n", this_node.seq, cid->client_id);
-
- sprintf(path, MEMBER_ZNODE "/%s", node_to_str(myself));
- dprintf("try to create member path:%s\n", path);
- rc = zk_create(zhandle, path, (char *)&this_node, sizeof(this_node),
- &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0);
- if (rc != ZOK)
- panic("failed to create an ephemeral znode, rc:%d\n", rc);
+ dprintf("clientid:%ld\n", cid->client_id);
rc = add_event(zhandle, EVENT_JOIN, &this_node, opaque, opaque_len, NULL);
- zk_unlock(zhandle);
-
return rc;
}
@@ -802,12 +748,11 @@ static void zk_block_done(struct work *work)
static int zk_dispatch(void)
{
- int ret, rc, len, idx;
+ int ret, rc, retry;
char path[256];
eventfd_t value;
struct zk_event ev;
- struct zk_node znode, *n;
- struct sd_node entries[SD_MAX_NODES];
+ struct zk_node *n;
enum cluster_join_result res;
static struct work work = {
.fn = zk_block,
@@ -833,24 +778,12 @@ static int zk_dispatch(void)
if (ev.blocked) {
dprintf("one sheep joined[up], nr_nodes:%ld, sender:%s, joined:%d\n",
nr_zk_nodes, node_to_str(&ev.sender.node), ev.sender.joined);
- if (is_master(&this_node) >= 0) {
+ if (is_master(zhandle, &this_node)) {
res = sd_check_join_cb(&ev.sender.node, ev.buf);
ev.join_result = res;
ev.blocked = 0;
ev.sender.joined = 1;
- len = sizeof(znode);
- sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender.node));
- rc = zk_get(zhandle, path, 0, (char *)&znode, &len, NULL);
- if (rc != ZOK)
- panic("failed to zk_get path:%s, rc:%d\n", path, rc);
-
- /* update joined state in zookeeper MEMBER_ZNODE list*/
- znode.joined = 1;
- rc = zk_set(zhandle, path, (char *)&znode, sizeof(znode), -1);
- if (rc != ZOK)
- panic("failed to zk_set path:%s, rc:%d\n", path, rc);
-
dprintf("I'm master, push back join event\n");
zk_queue_push_back(zhandle, &ev);
@@ -863,55 +796,70 @@ static int zk_dispatch(void)
zk_queue_push_back(zhandle, NULL);
goto out;
+ } else if (is_master(zhandle, &this_node)
+ && node_cmp(&ev.sender.node, &this_node.node) != 0) {
+ /* wait util member have been created */
+ sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender.node));
+ retry = MEMBER_CREATE_TIMEOUT/MEMBER_CREATE_INTERVAL;
+ while (retry && zk_exists(zhandle, path, 1, NULL) == ZNONODE) {
+ usleep(MEMBER_CREATE_INTERVAL*1000);
+ retry--;
+ }
+ if (retry <= 0) {
+ dprintf("Sender:%s failed to create member, ignore it\n",
+ node_to_str(&ev.sender.node));
+ goto out;
+ }
}
+ if (node_cmp(&ev.sender.node, &this_node.node) == 0)
+ zk_member_init(zhandle);
+
if (ev.join_result == CJ_RES_MASTER_TRANSFER) {
- /* FIXME: This code is tricky, but Sheepdog assumes that */
- /* nr_nodes = 1 when join_result = MASTER_TRANSFER... */
- //ev.nr_nodes = 1;
- nr_zk_nodes = 1;
- zk_nodes[0] = this_node;
- zk_nodes[0].joined = 1;
+ /* FIXME: This code is tricky, but Sheepdog assumes that
+ * nr_nodes = 1 when join_result = MASTER_TRANSFER... */
+ node_btree_clear(&zk_node_btroot);
+ node_btree_add(&zk_node_btroot, &this_node);
+
zk_queue_push_back(zhandle, &ev);
zk_queue_pop(zhandle, &ev);
}
- zk_nodes[nr_zk_nodes] = ev.sender;
- nr_zk_nodes++;
+ node_btree_add(&zk_node_btroot, &ev.sender);
dprintf("one sheep joined[down], nr_nodes:%ld, sender:%s, joined:%d\n",
nr_zk_nodes, node_to_str(&ev.sender.node), ev.sender.joined);
if (ev.join_result == CJ_RES_SUCCESS) {
sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender.node));
- rc = zk_exists(zhandle, path, 1, NULL);
- dprintf("watch path:%s, exists:%d\n", path, (rc==ZOK));
- if (rc != ZOK) {
- dprintf("sender have left:%s\n", node_to_str(&ev.sender.node));
- add_event(zhandle, EVENT_LEAVE, &ev.sender, NULL, 0, NULL);
+ if (node_cmp(&ev.sender.node, &this_node.node) == 0) {
+ dprintf("create path:%s\n", path);
+ rc = zk_create(zhandle, path, (char *)&ev.sender, sizeof(ev.sender),
+ &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0);
+ if (rc != ZOK)
+ panic("failed to create an ephemeral znode, rc:%d\n", rc);
+ } else {
+ rc = zk_exists(zhandle, path, 1, NULL);
+ dprintf("watch path:%s, exists:%d\n", path, (rc==ZOK));
}
}
- build_node_list(zk_nodes, nr_zk_nodes, entries);
- sd_join_handler(&ev.sender.node, entries, nr_zk_nodes,
+ build_node_list(zk_node_btroot);
+ sd_join_handler(&ev.sender.node, sd_nodes, nr_sd_nodes,
ev.join_result, ev.buf);
break;
case EVENT_LEAVE:
dprintf("LEAVE EVENT, blocked:%d\n", ev.blocked);
- /*reset master if necessary */
- n = find_node(zk_nodes, nr_zk_nodes, &ev.sender);
+ n = node_btree_find(&zk_node_btroot, &ev.sender);
if (!n) {
dprintf("can't find this leave node:%s, ignore it.\n", node_to_str(&ev.sender.node));
goto out;
}
- idx = n - zk_nodes;
- nr_zk_nodes--;
-
- memmove(n, n + 1, sizeof(*n) * (nr_zk_nodes - idx));
- dprintf("one sheep left, nr_nodes:%ld, idx:%d\n", nr_zk_nodes, idx);
+ node_btree_del(&zk_node_btroot, n);
+ dprintf("one sheep left, nr_nodes:%ld\n", nr_zk_nodes);
- build_node_list(zk_nodes, nr_zk_nodes, entries);
- sd_leave_handler(&ev.sender.node, entries, nr_zk_nodes);
+ build_node_list(zk_node_btroot);
+ sd_leave_handler(&ev.sender.node, sd_nodes, nr_sd_nodes);
break;
case EVENT_NOTIFY:
dprintf("NOTIFY, blocked:%d\n", ev.blocked);
@@ -932,8 +880,6 @@ static int zk_dispatch(void)
sd_notify_handler(&ev.sender.node, ev.buf, ev.buf_len);
break;
- case EVENT_IGNORE:
- break;
}
out:
return 0;
--
1.7.7.6
More information about the sheepdog
mailing list