From: Yunkai Zhang <qiushu.zyk at taobao.com> Use ip address and port number to sort member list instead of sequence number from /sheepdog/queue. Benefit from these changes, the order of member list no longer depends on the order of joining, then need not to take following steps into one transaction: - get sequence number from /sheepdog/queue - create znode in /sheepdog/member/ - send join message to cluster As a result, we can remove lock from zk_join, and move the creation of znode in /sheepdog/member/ into zk_dispatch. I use binary tree to store member list so that it can make the sort more quickly. Methods with node_btree_xxx prefix are used to operate this binary tree. ==*Note*==: When sheep startups, it fetchs member list by reading /sheepdog/member/*, if the result is empty, sheep will consider itself as *master*. Now we have removed lock from zk_join,if we start multiple sheeps simultaneously, one problem arises: there may exist more than one *master*, this is bad. To prevent this problem, we can start multiple sheeps like this: - start the fist sheep alone, and sleep 2 seconds: $ sheep -d /store/0 -z 0 -p 7000 -c zookeeper:localhost:2181 $ sleep 2 - start other sheeps simultaneously(need not to sleep between them): $ for i in {1..100}; do sheep -d /store/$i -z $i -p $((7000 + $i)) \ -c zookeeper:localhost:2181 Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com> --- include/net.h | 1 + lib/net.c | 14 ++ sheep/cluster.h | 34 ++++- sheep/cluster/zookeeper.c | 378 +++++++++++++++++++-------------------------- 4 files changed, 209 insertions(+), 218 deletions(-) diff --git a/include/net.h b/include/net.h index 2d087e2..f657e20 100644 --- a/include/net.h +++ b/include/net.h @@ -45,6 +45,7 @@ int exec_req(int sockfd, struct sd_req *hdr, void *data, int create_listen_ports(int port, int (*callback)(int fd, void *), void *data); char *addr_to_str(char *str, int size, uint8_t *addr, uint16_t port); +uint8_t *str_to_addr(int af, const char *ipstr, uint8_t *addr); int set_nonblocking(int fd); int set_nodelay(int fd); int set_timeout(int fd); diff --git a/lib/net.c b/lib/net.c index 8ac7c9e..91524a5 100644 --- a/lib/net.c +++ b/lib/net.c @@ -377,6 +377,20 @@ char *addr_to_str(char *str, int size, uint8_t *addr, uint16_t port) return str; } +uint8_t *str_to_addr(int af, const char *ipstr, uint8_t *addr) +{ + int addr_start_idx = 0; + + if (af == AF_INET) + addr_start_idx = 12; + + memset(addr, 0, addr_start_idx); + if (!inet_pton(af, ipstr, addr + addr_start_idx)) + return NULL; + + return addr; +} + int set_nonblocking(int fd) { int ret; diff --git a/sheep/cluster.h b/sheep/cluster.h index b230b3a..a7e5aae 100644 --- a/sheep/cluster.h +++ b/sheep/cluster.h @@ -15,6 +15,7 @@ #include <stdlib.h> #include <stdint.h> #include <inttypes.h> +#include <arpa/inet.h> #include <memory.h> #include "sheepdog_proto.h" @@ -145,13 +146,42 @@ static inline char *node_to_str(struct sd_node *id) { static char str[256]; char name[256]; + int af = AF_INET6; + uint8_t *addr = id->addr; + + /* Find address family type */ + if (addr[12]) { + int oct_no = 0; + while (!addr[oct_no] && oct_no++ < 12 ); + if (oct_no == 12) { + af = AF_INET; + } + } - snprintf(str, sizeof(str), "ip: %s, port: %d", - addr_to_str(name, sizeof(name), id->addr, 0), id->port); + snprintf(str, sizeof(str), "%s ip:%s port:%d", + (af == AF_INET)?"IPv4":"IPv6", + addr_to_str(name, sizeof(name), id->addr, 0), id->port); return str; } +static inline struct sd_node *str_to_node(const char *str, struct sd_node *id) +{ + int port, af = AF_INET6; + char v[8], ip[256]; + + sscanf(str, "%s ip:%s port:%d", v, ip, &port); + id->port = port; + + if (strcmp(v, "IPv4") == 0) + af = AF_INET; + + if(!str_to_addr(af, ip, id->addr)) + return NULL; + + return id; +} + /* callbacks back into sheepdog from the cluster drivers */ void sd_join_handler(struct sd_node *joined, struct sd_node *members, size_t nr_members, enum cluster_join_result result, diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c index 8999131..5b773d5 100644 --- a/sheep/cluster/zookeeper.c +++ b/sheep/cluster/zookeeper.c @@ -21,10 +21,11 @@ #include "work.h" #define MAX_EVENT_BUF_SIZE (64 * 1024) -#define SESSION_TIMEOUT 30000 +#define SESSION_TIMEOUT 30000 /* millisecond */ +#define MEMBER_CREATE_TIMEOUT SESSION_TIMEOUT +#define MEMBER_CREATE_INTERVAL 10 /* millisecond */ #define BASE_ZNODE "/sheepdog" -#define LOCK_ZNODE BASE_ZNODE "/lock" #define QUEUE_ZNODE BASE_ZNODE "/queue" #define MEMBER_ZNODE BASE_ZNODE "/member" @@ -60,11 +61,9 @@ enum zk_event_type { EVENT_JOIN = 1, EVENT_LEAVE, EVENT_NOTIFY, - EVENT_IGNORE, }; struct zk_node { - int32_t seq; int joined; clientid_t clientid; struct sd_node node; @@ -93,7 +92,10 @@ static int nr_zk_levents; static unsigned zk_levent_head; static unsigned zk_levent_tail; -static struct zk_node zk_nodes[SD_MAX_NODES]; +static void *zk_node_btroot; +static struct zk_node *zk_master; +static struct sd_node sd_nodes[SD_MAX_NODES]; +static size_t nr_sd_nodes; static size_t nr_zk_nodes; /* zookeeper API wrapper */ @@ -169,39 +171,6 @@ inline ZOOAPI int zk_get_children(zhandle_t *zh, const char *path, int watch, return rc; } -/* ZooKeeper-based lock */ - -static void zk_lock(zhandle_t *zh) -{ - int rc; -again: - rc = zk_create(zh, LOCK_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, - ZOO_EPHEMERAL, NULL, 0); - if (rc == ZOK){ - dprintf("locked\n"); - return; - } - else if (rc == ZNODEEXISTS) { - dprintf("retry, rc:%d\n", rc); - usleep(10000); /* FIXME: use watch notification */ - goto again; - } else { - panic("failed to create a lock, rc:%d\n", rc); - } -} - -static void zk_unlock(zhandle_t *zh) -{ - int rc; - - rc = zk_delete(zh, LOCK_ZNODE, -1); - - if (rc != ZOK) - panic("failed to release lock\n"); - - dprintf("unlocked\n"); -} - /* ZooKeeper-based queue */ static int efd; @@ -241,9 +210,6 @@ static int32_t zk_queue_push(zhandle_t *zh, struct zk_event *ev) dprintf("path:%s, seq:%010d\n", buf, seq); if (first_push) { - - /* the first pushed data should be EVENT_IGNORE */ - assert(ev->type == EVENT_IGNORE); queue_pos = seq; /* manual notify */ @@ -289,7 +255,8 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev) eventfd_t value = 1; /* process leave event */ - if (__sync_add_and_fetch(&nr_zk_levents, 0)) { + if (!__sync_add_and_fetch(&zk_notify_blocked, 0) + && __sync_add_and_fetch(&nr_zk_levents, 0)) { nr_levents = __sync_sub_and_fetch(&nr_zk_levents, 1) + 1; dprintf("nr_zk_levents:%d, head:%u\n", nr_levents, zk_levent_head); @@ -357,29 +324,10 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev) } out: - /* ignore LEAVE event */ - if (ev->type == EVENT_LEAVE){ - return -1; - } - return 0; } -static int zk_queue_seq(zhandle_t *zh) -{ - int32_t seq; - struct zk_event ev; - - memset(&ev, 0, sizeof(ev)); - ev.type = EVENT_IGNORE; - - dprintf("enter ...\n"); - seq = zk_queue_push(zh, &ev); - - return seq; -} - -static int is_zk_queue_valid(zhandle_t *zh) +static int zk_member_empty(zhandle_t *zh) { int rc; struct String_vector strs; @@ -388,72 +336,120 @@ static int is_zk_queue_valid(zhandle_t *zh) if (rc != ZOK) panic("failed to zk_get_children path:%s, rc:%d\n", MEMBER_ZNODE, rc); - return strs.count; + return (strs.count == 0); } -static void sort_zk_nodes(struct zk_node *znodes, size_t nr_nodes) +static inline int zk_node_cmp(const void *a, const void *b) { - int i, j, k; - struct idxs { - int idx; - int32_t seq; - } idxs[SD_MAX_NODES], t; - struct zk_node N[SD_MAX_NODES]; + const struct zk_node *znode1 = a; + const struct zk_node *znode2 = b; + return node_cmp(&znode1->node, &znode2->node); +} - if (nr_nodes <= 1) - return; +static void node_btree_add(void **btroot, struct zk_node *znode) +{ + struct zk_node *n, **p; - for (i=0; i<nr_nodes; i++) { - idxs[i].idx = i; - idxs[i].seq = znodes[i].seq; - dprintf("zk_nodes[%d], seq:%010d, value:%s\n", - i, znodes[i].seq, node_to_str(&znodes[i].node)); - } + n = (struct zk_node *)malloc(sizeof(struct zk_node)); + if (n == NULL) + panic("malloc, oom\n"); - /* sort idxs by seq */ - for (i=nr_nodes-1; i>0; i--) { - k = i; - for (j=i-1; j>=0; j--) { - if (idxs[k].seq < idxs[j].seq) { - k = j; - } - } + *n = *znode; - if (i != k) { - t = idxs[i]; - idxs[i] = idxs[k]; - idxs[k] = t; - } + p = (struct zk_node **)tsearch((void *)n, btroot, zk_node_cmp); + if (p == NULL) + panic("tsearch, oom\n"); + else if (*p != n) { + **p = *n; + free(n); } + nr_zk_nodes++; +} + +static inline void node_btree_del(void **btroot, struct zk_node *znode) +{ + tdelete((void *)znode, btroot, zk_node_cmp); + free(znode); + nr_zk_nodes--; +} + +static inline void node_btree_clear(void **btroot) +{ + tdestroy(*btroot, free); + *btroot = NULL; +} + +static struct zk_node *node_btree_find(void **btroot, struct zk_node *znode) +{ + struct zk_node **p; + + p = (struct zk_node **)tfind((void *)znode, btroot, zk_node_cmp); + if (p) + return *p; + + return NULL; +} + +static void node_btree_build_list_fn(const void *nodep, + const VISIT which, const int depth) +{ + struct zk_node *znode; - for (i=0; i<nr_nodes; i++) { - N[i] = znodes[idxs[i].idx]; - dprintf("N[%d], seq:%010d, value:%s\n", - i, znodes[idxs[i].idx].seq, node_to_str(&N[i].node)); + switch (which) { + case preorder: + break; + case postorder: + case leaf: + znode = *(struct zk_node **) nodep; + sd_nodes[nr_sd_nodes++] = znode->node; + break; + case endorder: + break; } - memcpy(zk_nodes, N, nr_nodes*sizeof(*zk_nodes)); } -static void build_node_list(struct zk_node *znodes, size_t nr_nodes, - struct sd_node *entries) +static inline void build_node_list(void *btroot) { - int i; + nr_sd_nodes = 0; + twalk(btroot, node_btree_build_list_fn); + assert(nr_sd_nodes == nr_zk_nodes); + dprintf("nr_sd_nodes:%lu\n", nr_sd_nodes); +} - for (i = 0; i < nr_nodes; i++) - entries[i] = znodes[i].node; +static void node_btree_find_master_fn(const void *nodep, + const VISIT which, const int depth) +{ + switch (which) { + case preorder: + break; + case postorder: + case leaf: + if (zk_master) + break; + zk_master = *(struct zk_node **) nodep; + dprintf("master:%s\n", node_to_str(&zk_master->node)); + break; + case endorder: + break; + } } -static struct zk_node* find_node(struct zk_node *znodes, int nr_nodes, struct zk_node *znode) +static int is_master(zhandle_t *zh, struct zk_node *znode) { - int i; + zk_master = NULL; - for (i=0; i<nr_nodes; i++) { - if (node_cmp(&znode->node, &znodes[i].node) == 0) { - return &znodes[i]; - } + if (!zk_node_btroot){ + if (zk_member_empty(zh)) + return 1; + else + return 0; } - return NULL; + twalk(zk_node_btroot, node_btree_find_master_fn); + if (node_cmp(&zk_master->node, &znode->node) == 0) + return 1; + + return 0; } static void zk_queue_init(zhandle_t *zh) @@ -463,7 +459,7 @@ static void zk_queue_init(zhandle_t *zh) zk_create(zh, MEMBER_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0); } -static void zk_data_init(zhandle_t *zh) +static void zk_member_init(zhandle_t *zh) { static int finished; int rc, len; @@ -476,22 +472,16 @@ static void zk_data_init(zhandle_t *zh) finished = 1; - queue_pos = -1; - - if (is_zk_queue_valid(zh)) { + if (!zk_member_empty(zh)) { FOR_EACH_ZNODE(zh, MEMBER_ZNODE, path, &strs) { len = sizeof(znode); rc = zk_get(zh, path, 1, (char *)&znode, &len, NULL); - if (rc == ZOK && znode.joined == 0) { - dprintf("wait until znode:%s become joined\n", path); - usleep(10000); + if (rc != ZOK) continue; - } switch(rc) { case ZOK: - zk_nodes[nr_zk_nodes] = znode; - nr_zk_nodes++; + node_btree_add(&zk_node_btroot, &znode); case ZNONODE: break; default: @@ -499,9 +489,6 @@ static void zk_data_init(zhandle_t *zh) } } } - - sort_zk_nodes(zk_nodes, nr_zk_nodes); - dprintf("nr_nodes:%ld\n", nr_zk_nodes); } @@ -514,28 +501,6 @@ static struct work_queue *zk_block_wq; static struct zk_node this_node; -static int is_master(struct zk_node *znode) -{ - int i; - struct zk_node *n = znode; - - if (!n) - return -1; - - if (nr_zk_nodes == 0) - return 0; - - for (i = 0; i < SD_MAX_NODES; i++) { - if (zk_nodes[i].joined) - break; - } - - if (node_cmp(&zk_nodes[i].node, &n->node) == 0) - return i; - - return -1; -} - static int add_event(zhandle_t *zh, enum zk_event_type type, struct zk_node *znode, void *buf, size_t buf_len, void (*block_cb)(void *arg)) @@ -574,8 +539,6 @@ static int add_event(zhandle_t *zh, enum zk_event_type type, ev.blocked = !!block_cb; ev.block_cb = block_cb; break; - case EVENT_IGNORE: - break; } zk_queue_push(zh, &ev); @@ -588,7 +551,8 @@ static void watcher(zhandle_t *zh, int type, int state, const char *path, void* eventfd_t value = 1; const clientid_t *cid; char str[256], *p; - int ret, rc, i; + int ret, rc; + struct zk_node znode; dprintf("path:%s, type:%d\n", path, type); @@ -602,7 +566,7 @@ static void watcher(zhandle_t *zh, int type, int state, const char *path, void* if (type < 0 || type == ZOO_CHILD_EVENT) return; - if (type == ZOO_CHANGED_EVENT) { + if (type == ZOO_CREATED_EVENT || type == ZOO_CHANGED_EVENT) { ret = sscanf(path, MEMBER_ZNODE "/%s", str); if (ret == 1) { rc = zk_exists(zh, path, 1, NULL); @@ -618,14 +582,11 @@ static void watcher(zhandle_t *zh, int type, int state, const char *path, void* p = strrchr(path, '/'); p++; - /* check the failed node */ - for (i=0; i<nr_zk_nodes; i++) { - if (strcmp(p, node_to_str(&zk_nodes[i].node)) == 0) { - dprintf("zk_nodes[%d] leave:%s\n", i, node_to_str(&zk_nodes[i].node)); - add_event(zh, EVENT_LEAVE, &zk_nodes[i], NULL, 0, NULL); - return; - } - } + str_to_node(p, &znode.node); + dprintf("zk_nodes leave:%s\n", node_to_str(&znode.node)); + + add_event(zh, EVENT_LEAVE, &znode, NULL, 0, NULL); + return; } dprintf("write event to efd:%d\n", efd); @@ -726,39 +687,24 @@ static int zk_join(struct sd_node *myself, { int rc; char path[256]; - struct zk_node *znode; const clientid_t *cid; - zk_lock(zhandle); - - zk_data_init(zhandle); - this_node.node = *myself; - znode = find_node(zk_nodes, nr_zk_nodes, &this_node); - if (znode) + sprintf(path, MEMBER_ZNODE "/%s", node_to_str(myself)); + rc = zk_exists(zhandle, path, 1, NULL); + if (rc == ZOK) panic("previous zookeeper session exist, shutdown\n"); - this_node.seq = zk_queue_seq(zhandle); this_node.joined = 0; - cid = zoo_client_id(zhandle); assert(cid != NULL); this_node.clientid = *cid; - dprintf("this_seq:%010d, clientid:%ld\n", this_node.seq, cid->client_id); - - sprintf(path, MEMBER_ZNODE "/%s", node_to_str(myself)); - dprintf("try to create member path:%s\n", path); - rc = zk_create(zhandle, path, (char *)&this_node, sizeof(this_node), - &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0); - if (rc != ZOK) - panic("failed to create an ephemeral znode, rc:%d\n", rc); + dprintf("clientid:%ld\n", cid->client_id); rc = add_event(zhandle, EVENT_JOIN, &this_node, opaque, opaque_len, NULL); - zk_unlock(zhandle); - return rc; } @@ -802,12 +748,11 @@ static void zk_block_done(struct work *work) static int zk_dispatch(void) { - int ret, rc, len, idx; + int ret, rc, retry; char path[256]; eventfd_t value; struct zk_event ev; - struct zk_node znode, *n; - struct sd_node entries[SD_MAX_NODES]; + struct zk_node *n; enum cluster_join_result res; static struct work work = { .fn = zk_block, @@ -833,24 +778,12 @@ static int zk_dispatch(void) if (ev.blocked) { dprintf("one sheep joined[up], nr_nodes:%ld, sender:%s, joined:%d\n", nr_zk_nodes, node_to_str(&ev.sender.node), ev.sender.joined); - if (is_master(&this_node) >= 0) { + if (is_master(zhandle, &this_node)) { res = sd_check_join_cb(&ev.sender.node, ev.buf); ev.join_result = res; ev.blocked = 0; ev.sender.joined = 1; - len = sizeof(znode); - sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender.node)); - rc = zk_get(zhandle, path, 0, (char *)&znode, &len, NULL); - if (rc != ZOK) - panic("failed to zk_get path:%s, rc:%d\n", path, rc); - - /* update joined state in zookeeper MEMBER_ZNODE list*/ - znode.joined = 1; - rc = zk_set(zhandle, path, (char *)&znode, sizeof(znode), -1); - if (rc != ZOK) - panic("failed to zk_set path:%s, rc:%d\n", path, rc); - dprintf("I'm master, push back join event\n"); zk_queue_push_back(zhandle, &ev); @@ -863,55 +796,70 @@ static int zk_dispatch(void) zk_queue_push_back(zhandle, NULL); goto out; + } else if (is_master(zhandle, &this_node) + && node_cmp(&ev.sender.node, &this_node.node) != 0) { + /* wait util member have been created */ + sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender.node)); + retry = MEMBER_CREATE_TIMEOUT/MEMBER_CREATE_INTERVAL; + while (retry && zk_exists(zhandle, path, 1, NULL) == ZNONODE) { + usleep(MEMBER_CREATE_INTERVAL*1000); + retry--; + } + if (retry <= 0) { + dprintf("Sender:%s failed to create member, ignore it\n", + node_to_str(&ev.sender.node)); + goto out; + } } + if (node_cmp(&ev.sender.node, &this_node.node) == 0) + zk_member_init(zhandle); + if (ev.join_result == CJ_RES_MASTER_TRANSFER) { - /* FIXME: This code is tricky, but Sheepdog assumes that */ - /* nr_nodes = 1 when join_result = MASTER_TRANSFER... */ - //ev.nr_nodes = 1; - nr_zk_nodes = 1; - zk_nodes[0] = this_node; - zk_nodes[0].joined = 1; + /* FIXME: This code is tricky, but Sheepdog assumes that + * nr_nodes = 1 when join_result = MASTER_TRANSFER... */ + node_btree_clear(&zk_node_btroot); + node_btree_add(&zk_node_btroot, &this_node); + zk_queue_push_back(zhandle, &ev); zk_queue_pop(zhandle, &ev); } - zk_nodes[nr_zk_nodes] = ev.sender; - nr_zk_nodes++; + node_btree_add(&zk_node_btroot, &ev.sender); dprintf("one sheep joined[down], nr_nodes:%ld, sender:%s, joined:%d\n", nr_zk_nodes, node_to_str(&ev.sender.node), ev.sender.joined); if (ev.join_result == CJ_RES_SUCCESS) { sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender.node)); - rc = zk_exists(zhandle, path, 1, NULL); - dprintf("watch path:%s, exists:%d\n", path, (rc==ZOK)); - if (rc != ZOK) { - dprintf("sender have left:%s\n", node_to_str(&ev.sender.node)); - add_event(zhandle, EVENT_LEAVE, &ev.sender, NULL, 0, NULL); + if (node_cmp(&ev.sender.node, &this_node.node) == 0) { + dprintf("create path:%s\n", path); + rc = zk_create(zhandle, path, (char *)&ev.sender, sizeof(ev.sender), + &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0); + if (rc != ZOK) + panic("failed to create an ephemeral znode, rc:%d\n", rc); + } else { + rc = zk_exists(zhandle, path, 1, NULL); + dprintf("watch path:%s, exists:%d\n", path, (rc==ZOK)); } } - build_node_list(zk_nodes, nr_zk_nodes, entries); - sd_join_handler(&ev.sender.node, entries, nr_zk_nodes, + build_node_list(zk_node_btroot); + sd_join_handler(&ev.sender.node, sd_nodes, nr_sd_nodes, ev.join_result, ev.buf); break; case EVENT_LEAVE: dprintf("LEAVE EVENT, blocked:%d\n", ev.blocked); - /*reset master if necessary */ - n = find_node(zk_nodes, nr_zk_nodes, &ev.sender); + n = node_btree_find(&zk_node_btroot, &ev.sender); if (!n) { dprintf("can't find this leave node:%s, ignore it.\n", node_to_str(&ev.sender.node)); goto out; } - idx = n - zk_nodes; - nr_zk_nodes--; - - memmove(n, n + 1, sizeof(*n) * (nr_zk_nodes - idx)); - dprintf("one sheep left, nr_nodes:%ld, idx:%d\n", nr_zk_nodes, idx); + node_btree_del(&zk_node_btroot, n); + dprintf("one sheep left, nr_nodes:%ld\n", nr_zk_nodes); - build_node_list(zk_nodes, nr_zk_nodes, entries); - sd_leave_handler(&ev.sender.node, entries, nr_zk_nodes); + build_node_list(zk_node_btroot); + sd_leave_handler(&ev.sender.node, sd_nodes, nr_sd_nodes); break; case EVENT_NOTIFY: dprintf("NOTIFY, blocked:%d\n", ev.blocked); @@ -932,8 +880,6 @@ static int zk_dispatch(void) sd_notify_handler(&ev.sender.node, ev.buf, ev.buf_len); break; - case EVENT_IGNORE: - break; } out: return 0; -- 1.7.7.6 |