Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com>
---
 sheep/cluster/zookeeper.c |  225 ++++++++++++++++++++++++++++++++-------------
 1 files changed, 160 insertions(+), 65 deletions(-)

diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 4ee3c20..16b50e3 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -22,15 +22,35 @@
 #include "work.h"
 
 #define MAX_EVENT_BUF_SIZE (64 * 1024)
+#define SESSION_TIMEOUT 30000
 
 #define BASE_ZNODE "/sheepdog"
 #define LOCK_ZNODE BASE_ZNODE "/lock"
 #define QUEUE_ZNODE BASE_ZNODE "/queue"
 #define MEMBER_ZNODE BASE_ZNODE "/member"
 
+
+/* zookeeper API wrapper prototypes */
+ZOOAPI int zk_create(zhandle_t *zh, const char *path, const char *value,
+	int valuelen, const struct ACL_vector *acl, int flags,
+	char *path_buffer, int path_buffer_len);
+
+ZOOAPI int zk_delete(zhandle_t *zh, const char *path, int version);
+
+ZOOAPI int zk_get(zhandle_t *zh, const char *path, int watch, char *buffer,
+	int* buffer_len, struct Stat *stat);
+
+ZOOAPI int zk_set(zhandle_t *zh, const char *path, const char *buffer,
+	int buflen, int version);
+
+ZOOAPI int zk_exists(zhandle_t *zh, const char *path, int watch, struct Stat *stat);
+
+ZOOAPI int zk_get_children(zhandle_t *zh, const char *path, int watch,
+	struct String_vector *strings);
+
 /* iterate child znodes */
 #define FOR_EACH_ZNODE(zh, parent, path, strs)	\
-	for (zoo_get_children(zh, parent, 1, strs),	\
+	for (zk_get_children(zh, parent, 1, strs),	\
 		(strs)->data += (strs)->count;	\
 	     (strs)->count-- ?	\
 		sprintf(path, "%s/%s", parent, *--(strs)->data) : \
@@ -78,19 +98,89 @@ static char *zk_option;
 /* protect queue_start_pos */
 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
 
+/* protect leave event list */
+static pthread_mutex_t leave_lock = PTHREAD_MUTEX_INITIALIZER;
+
+/* zookeeper API wrapper */
+inline ZOOAPI int zk_create(zhandle_t *zh, const char *path, const char *value,
+	int valuelen, const struct ACL_vector *acl, int flags,
+	char *path_buffer, int path_buffer_len)
+{
+	int rc;
+	do {
+		rc = zoo_create(zh, path, value, valuelen, acl,
+				flags, path_buffer, path_buffer_len);
+		dprintf("rc:%d\n", rc);
+	}while(rc==ZOPERATIONTIMEOUT || rc==ZCONNECTIONLOSS);
+	return rc;
+}
+
+inline ZOOAPI int zk_delete(zhandle_t *zh, const char *path, int version)
+{
+	int rc;
+	do {
+		rc = zoo_delete(zh, path, version);
+		dprintf("rc:%d\n", rc);
+	}while(rc==ZOPERATIONTIMEOUT || rc==ZCONNECTIONLOSS);
+	return rc;
+}
+
+inline ZOOAPI int zk_get(zhandle_t *zh, const char *path, int watch, char *buffer,
+	int* buffer_len, struct Stat *stat)
+{
+	int rc;
+	do {
+		rc = zoo_get(zh, path, watch, buffer, buffer_len, stat);
+		dprintf("rc:%d\n", rc);
+	}while(rc==ZOPERATIONTIMEOUT || rc==ZCONNECTIONLOSS);
+	return rc;
+}
+
+inline ZOOAPI int zk_set(zhandle_t *zh, const char *path, const char *buffer,
+	int buflen, int version)
+{
+	int rc;
+	do {
+		rc = zoo_set(zh, path, buffer, buflen, version);
+		dprintf("rc:%d\n", rc);
+	}while(rc==ZOPERATIONTIMEOUT || rc==ZCONNECTIONLOSS);
+	return rc;
+}
+
+inline ZOOAPI int zk_exists(zhandle_t *zh, const char *path, int watch, struct Stat *stat)
+{
+	int rc;
+	do {
+		rc = zoo_exists(zh, path, watch, stat);
+		dprintf("rc:%d\n", rc);
+	}while(rc==ZOPERATIONTIMEOUT || rc==ZCONNECTIONLOSS);
+	return rc;
+}
+
+inline ZOOAPI int zk_get_children(zhandle_t *zh, const char *path, int watch,
+	struct String_vector *strings)
+{
+	int rc;
+	do {
+		rc = zoo_get_children(zh, path, watch, strings);
+		dprintf("rc:%d\n", rc);
+	}while(rc==ZOPERATIONTIMEOUT || rc==ZCONNECTIONLOSS);
+	return rc;
+}
+
 /* ZooKeeper-based lock */
 
 static void zk_lock(zhandle_t *zh)
 {
 	int rc;
 again:
-	rc = zoo_create(zh, LOCK_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE,
+	rc = zk_create(zh, LOCK_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE,
 			ZOO_EPHEMERAL, NULL, 0);
 	if (rc == ZOK){
 		dprintf("locked\n");
 		return;
 	}
-	else if (rc == ZNODEEXISTS || rc == ZOPERATIONTIMEOUT) {
+	else if (rc == ZNODEEXISTS) {
 		dprintf("retry, rc:%d\n", rc);
 		usleep(10000); /* FIXME: use watch notification */
 		goto again;
@@ -103,7 +193,8 @@ static void zk_unlock(zhandle_t *zh)
 {
 	int rc;
 
-	rc = zoo_delete(zh, LOCK_ZNODE, -1);
+	rc = zk_delete(zh, LOCK_ZNODE, -1);
+
 	if (rc != ZOK)
 		panic("failed to release lock\n");
 
@@ -122,7 +213,7 @@ static int zk_queue_empty(zhandle_t *zh)
 
 	sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
 
-	rc = zoo_exists(zh, path, 1, NULL);
+	rc = zk_exists(zh, path, 1, NULL);
 	if (rc == ZOK)
 		return 0;
 
@@ -137,14 +228,11 @@ static int zk_queue_push(zhandle_t *zh, struct zk_event *ev)
 
 	len = (char *)(ev->buf) - (char *)ev + ev->buf_len;
 	sprintf(path, "%s/", QUEUE_ZNODE);
-	do{
-		dprintf("zoo_create ...\n");
-		rc = zoo_create(zh, path, (char *)ev, len,
-			&ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE, buf, sizeof(buf));
-		dprintf("create path:%s, nr_nodes:%ld, queue_pos:%d, len:%d, rc:%d\n", buf, nr_zk_nodes, queue_pos, len, rc);
-	}while (rc == ZOPERATIONTIMEOUT);
+	rc = zk_create(zh, path, (char *)ev, len,
+		&ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE, buf, sizeof(buf));
+	dprintf("create path:%s, nr_nodes:%ld, queue_pos:%d, len:%d, rc:%d\n", buf, nr_zk_nodes, queue_pos, len, rc);
 	if (rc != ZOK)
-		panic("failed to zoo_create path:%s, rc:%d\n", path, rc);
+		panic("failed to zk_create path:%s, rc:%d\n", path, rc);
 
 	sscanf(buf, QUEUE_ZNODE "/%010d", &seq);
 	dprintf("path:%s, seq:%d\n", buf, seq);
@@ -177,7 +265,7 @@ static int zk_queue_push_back(zhandle_t *zh, struct zk_event *ev)
 		/* update the last popped data */
 		len = (char *)(ev->buf) - (char *)ev + ev->buf_len;
 		sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
-		rc = zoo_set(zh, path, (char *)ev, len, -1);
+		rc = zk_set(zh, path, (char *)ev, len, -1);
 		dprintf("update path:%s, queue_pos:%d, len:%d, rc:%d\n", path, queue_pos, len, rc);
 		if (rc != ZOK)
 			panic("failed to zk_set path:%s, rc:%d\n", path, rc);
@@ -197,8 +285,12 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 	/* process leave event */
 	if (!list_empty(&zk_levent_list)) {
 		dprintf("found a leave event.\n");
+
+		pthread_mutex_lock(&leave_lock);
 		lev = list_first_entry(&zk_levent_list, typeof(*lev), list);
 		list_del(&lev->list);
+		pthread_mutex_unlock(&leave_lock);
+
 		memcpy(ev, lev, sizeof(*ev));
 		free(lev);
 		return 0;
@@ -209,10 +301,8 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 
 	len = sizeof(*ev);
 	sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
-	do {
-		rc = zoo_get(zh, path, 1, (char *)ev, &len, NULL);
-		dprintf("read path:%s, nr_nodes:%ld, type:%d, len:%d, rc:%d\n", path, nr_zk_nodes, ev->type, len, rc);
-	}while(rc == ZOPERATIONTIMEOUT);
+	rc = zk_get(zh, path, 1, (char *)ev, &len, NULL);
+	dprintf("read path:%s, nr_nodes:%ld, type:%d, len:%d, rc:%d\n", path, nr_zk_nodes, ev->type, len, rc);
 	if (rc != ZOK)
 		panic("failed to zk_set path:%s, rc:%d\n", path, rc);
 
@@ -226,7 +316,7 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 
 	/* watch next data */
 	sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
-	rc = zoo_exists(zh, path, 1, NULL);
+	rc = zk_exists(zh, path, 1, NULL);
 	dprintf("watch path:%s, exists:%d\n", path, (rc==ZOK));
 	if (rc == ZOK) {
 		/* we lost this message, manual notify */
@@ -262,9 +352,9 @@ static int is_zk_queue_valid(zhandle_t *zh)
 	int rc;
 	struct String_vector strs;
 
-	rc = zoo_get_children(zh, MEMBER_ZNODE, 1, &strs);
+	rc = zk_get_children(zh, MEMBER_ZNODE, 1, &strs);
 	if (rc != ZOK)
-		panic("failed to zoo_get_children path:%s, rc:%d\n", MEMBER_ZNODE, rc);
+		panic("failed to zk_get_children path:%s, rc:%d\n", MEMBER_ZNODE, rc);
 
 	return strs.count;
 }
@@ -310,7 +400,7 @@ static void sort_zk_nodes(struct zk_node *znodes, size_t nr_nodes)
 			i, znodes[idxs[i].idx].seq, node_to_str(&N[i].node));
 	}
 	memcpy(zk_nodes, N, nr_nodes*sizeof(*zk_nodes));
-	
+
 	for (i=0; i<nr_nodes; i++) {
 		dprintf("zk_nodes[%d], seq:%d, value:%s\n",
 			i, znodes[i].seq, node_to_str(&zk_nodes[i].node));
@@ -341,9 +431,9 @@ static struct zk_node* find_node(struct zk_node *znodes, int nr_nodes, struct zk
 
 static void zk_queue_init(zhandle_t *zh)
 {
-	zoo_create(zh, BASE_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
-	zoo_create(zh, QUEUE_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
-	zoo_create(zh, MEMBER_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
+	zk_create(zh, BASE_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
+	zk_create(zh, QUEUE_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
+	zk_create(zh, MEMBER_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
 }
 
 static void zk_data_init(zhandle_t *zh)
@@ -363,15 +453,13 @@ static void zk_data_init(zhandle_t *zh)
 
 	if (is_zk_queue_valid(zh)) {
 		FOR_EACH_ZNODE(zh, MEMBER_ZNODE, path, &strs) {
-			do {
-				len = sizeof(znode);
-				rc = zoo_get(zh, path, 1, (char *)&znode, &len, NULL);
-				if (rc == ZOK && znode.joined == 0) {
-					dprintf("wait until znode:%s become joined\n", path);
-					usleep(10000);
-					continue;
-				}
-			}while(rc == ZOPERATIONTIMEOUT);
+			len = sizeof(znode);
+			rc = zk_get(zh, path, 1, (char *)&znode, &len, NULL);
+			if (rc == ZOK && znode.joined == 0) {
+				dprintf("wait until znode:%s become joined\n", path);
+				usleep(10000);
+				continue;
+			}
 
 			switch(rc) {
 			case ZOK:
@@ -380,16 +468,9 @@ static void zk_data_init(zhandle_t *zh)
 			case ZNONODE:
 				break;
 			default:
-				panic("failed to zoo_get path:%s, rc:%d\n", path, rc);
+				panic("failed to zk_get path:%s, rc:%d\n", path, rc);
 			}
 		}
-	}else {
-		dprintf("clean zookeeper store\n");
-		FOR_EACH_ZNODE(zh, QUEUE_ZNODE, path, &strs) {
-			rc = zoo_delete(zh, path, -1);
-			if (rc != ZOK)
-				panic("failed to zk_delete path:%s, rc:%d\n", path, rc);
-		}
 	}
 
 	sort_zk_nodes(zk_nodes, nr_zk_nodes);
@@ -458,7 +539,10 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
 		}
 
 		memcpy(lev, &ev, sizeof(ev));
+
+		pthread_mutex_lock(&leave_lock);
 		list_add_tail(&lev->list, &zk_levent_list);
+		pthread_mutex_unlock(&leave_lock);
 
 		/* manual notify */
 		dprintf("write event to efd:%d\n", efd);
@@ -480,11 +564,18 @@ out:
 static void watcher(zhandle_t *zh, int type, int state, const char *path, void* ctx)
 {
 	eventfd_t value = 1;
+	const clientid_t *cid;
 	char str[256], *p;
 	int ret, i;
 
 	dprintf("path:%s, type:%d\n", path, type);
 
+	if (type == -1) {
+		cid = zoo_client_id(zh);
+		assert(cid != NULL);
+		dprintf("session change, clientid:%ld\n", cid->client_id);
+	}
+
 	/* discard useless event */
 	if (type < 0 || type == ZOO_CHILD_EVENT)
 		return;
@@ -500,11 +591,8 @@ static void watcher(zhandle_t *zh, int type, int state, const char *path, void*
 		/* check the failed node */
 		for (i=0; i<nr_zk_nodes; i++) {
 			if (strcmp(p, node_to_str(&zk_nodes[i].node)) == 0) {
-				/* protect zk_levent_list */
-				pthread_mutex_lock(&queue_lock);
 				dprintf("zk_nodes[%d] leave:%s\n", i, node_to_str(&zk_nodes[i].node));
 				add_event(zh, EVENT_LEAVE, &zk_nodes[i], NULL, 0, NULL);
-				pthread_mutex_unlock(&queue_lock);
 				return;
 			}
 		}
@@ -581,11 +669,13 @@ static int zk_init(struct cdrv_handlers *handlers, const char *option,
 		free(zk_option);
 	zk_option = strdup(option);
 
-	zhandle = zookeeper_init(option, watcher, 2000, 0, NULL, 0);
+	zhandle = zookeeper_init(option, watcher, SESSION_TIMEOUT, 0, NULL, 0);
 	if (!zhandle) {
 		eprintf("failed to connect to zk server %s\n", option);
 		return -1;
 	}
+	dprintf("request session timeout:%dms, negotiated session timeout:%dms\n",
+		SESSION_TIMEOUT, zoo_recv_timeout(zhandle));
 
 	if (get_addr(myaddr) < 0)
 		return -1;
@@ -613,37 +703,44 @@ static int zk_join(struct sd_node *myself,
 	char path[256];
 	struct zk_node *znode;
 	zhandle_t *zh = NULL;
+	const clientid_t *cid;
 
 	zk_lock(zhandle);
 	zk_data_init(zhandle);
-	
+
 	this_node.node = *myself;
-	this_node.seq = zk_queue_seq(zhandle);
-	this_node.joined = 0;
 
 	zk_check_join_cb = check_join_cb;
 
-	dprintf("this_seq:%d\n", this_node.seq);
-
 	/* try to recover previous session */
 	znode = find_node(zk_nodes, nr_zk_nodes, &this_node);
 	if (znode) {
-		zh = zookeeper_init(zk_option, watcher, 2000, &znode->clientid, NULL, 0);
+		zh = zookeeper_init(zk_option, watcher, SESSION_TIMEOUT, &znode->clientid, NULL, 0);
 		if (zh) {
-			dprintf("recover previous session successfully! clientid:%ld\n", znode->clientid.client_id);
+			dprintf("recover previous session successfully! this_seq:%d, clientid:%ld\n", znode->seq, znode->clientid.client_id);
+			this_node = *znode;
 			zk_unlock(zhandle);
-			zookeeper_close(zhandle);
+			do {
+				rc = zookeeper_close(zhandle);
+			}while(rc == ZCONNECTIONLOSS || rc == ZOPERATIONTIMEOUT);
 			zhandle = zh;
 			return 0;
 		}
 	}
 
+	this_node.seq = zk_queue_seq(zhandle);
+	this_node.joined = 0;
+
+	cid = zoo_client_id(zhandle);
+	assert(cid != NULL);
+	this_node.clientid = *cid;
+
+	dprintf("this_seq:%d, clientid:%ld\n", this_node.seq, cid->client_id);
+
 	sprintf(path, MEMBER_ZNODE "/%s", node_to_str(myself));
-	do {
-		dprintf("try to create member path:%s\n", path);
-		rc = zoo_create(zhandle, path, (char *)&this_node, sizeof(this_node),
-			&ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0);
-	}while(rc == ZOPERATIONTIMEOUT || rc == ZNODEEXISTS);
+	dprintf("try to create member path:%s\n", path);
+	rc = zk_create(zhandle, path, (char *)&this_node, sizeof(this_node),
+		&ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0);
 	if (rc != ZOK)
 		panic("failed to create an ephemeral znode, rc:%d\n", rc);
 
@@ -733,17 +830,15 @@ static int zk_dispatch(void)
 
 			len = sizeof(znode);
 			sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender.node));
-			do {
-				rc = zoo_get(zhandle, path, 0, (char *)&znode, &len, NULL);
-			}while(rc == ZOPERATIONTIMEOUT);
+			rc = zk_get(zhandle, path, 0, (char *)&znode, &len, NULL);
 			if (rc != ZOK)
-				panic("failed to zoo_get path:%s, rc:%d\n", path, rc);
+				panic("failed to zk_get path:%s, rc:%d\n", path, rc);
 
 			/* update joined state in zookeeper MEMBER_ZNODE list*/
 			znode.joined = 1;
-			rc = zoo_set(zhandle, path, (char *)&znode, sizeof(znode), -1);
+			rc = zk_set(zhandle, path, (char *)&znode, sizeof(znode), -1);
 			if (rc != ZOK)
-				panic("failed to zoo_set path:%s, rc:%d\n", path, rc);
+				panic("failed to zk_set path:%s, rc:%d\n", path, rc);
 
 			dprintf("I'm master, push back join event\n");
 			zk_queue_push_back(zhandle, &ev);
@@ -776,7 +871,7 @@ static int zk_dispatch(void)
 			nr_zk_nodes, node_to_str(&ev.sender.node), ev.sender.joined);
 
 		sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender.node));
-		rc = zoo_exists(zhandle, path, 1, NULL);
+		rc = zk_exists(zhandle, path, 1, NULL);
 		dprintf("watch path:%s, exists:%d\n", path, (rc==ZOK));
 
 		build_node_list(zk_nodes, nr_zk_nodes, entries);
-- 
1.7.7.6