[Sheepdog] [zookeeper][PATCH v2 03/11] retry again when zoo_* api return ZCONNECTIONLOSS/ZOPERATIONTIMEOUT error
Yunkai Zhang
yunkai.me at gmail.com
Thu Apr 26 17:21:22 CEST 2012
From: Yunkai Zhang <qiushu.zyk at taobao.com>
Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com>
---
sheep/cluster/zookeeper.c | 212 +++++++++++++++++++++++++++++++++------------
1 files changed, 155 insertions(+), 57 deletions(-)
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index f6823aa..8daddfc 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -22,15 +22,35 @@
#include "work.h"
#define MAX_EVENT_BUF_SIZE (64 * 1024)
+#define SESSION_TIMEOUT 30000
#define BASE_ZNODE "/sheepdog"
#define LOCK_ZNODE BASE_ZNODE "/lock"
#define QUEUE_ZNODE BASE_ZNODE "/queue"
#define MEMBER_ZNODE BASE_ZNODE "/member"
+
+/* zookeeper API wrapper prototypes */
+ZOOAPI int zk_create(zhandle_t *zh, const char *path, const char *value,
+ int valuelen, const struct ACL_vector *acl, int flags,
+ char *path_buffer, int path_buffer_len);
+
+ZOOAPI int zk_delete(zhandle_t *zh, const char *path, int version);
+
+ZOOAPI int zk_get(zhandle_t *zh, const char *path, int watch, char *buffer,
+ int* buffer_len, struct Stat *stat);
+
+ZOOAPI int zk_set(zhandle_t *zh, const char *path, const char *buffer,
+ int buflen, int version);
+
+ZOOAPI int zk_exists(zhandle_t *zh, const char *path, int watch, struct Stat *stat);
+
+ZOOAPI int zk_get_children(zhandle_t *zh, const char *path, int watch,
+ struct String_vector *strings);
+
/* iterate child znodes */
#define FOR_EACH_ZNODE(zh, parent, path, strs) \
- for (zoo_get_children(zh, parent, 1, strs), \
+ for (zk_get_children(zh, parent, 1, strs), \
(strs)->data += (strs)->count; \
(strs)->count-- ? \
sprintf(path, "%s/%s", parent, *--(strs)->data) : \
@@ -47,6 +67,7 @@ enum zk_event_type {
struct zk_node {
int seq;
int joined;
+ clientid_t clientid;
struct sd_node node;
};
@@ -76,19 +97,95 @@ static size_t nr_zk_nodes;
/* protect queue_start_pos */
static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
+/* protect leave event list */
+static pthread_mutex_t leave_lock = PTHREAD_MUTEX_INITIALIZER;
+
+/* zookeeper API wrapper */
+inline ZOOAPI int zk_create(zhandle_t *zh, const char *path, const char *value,
+ int valuelen, const struct ACL_vector *acl, int flags,
+ char *path_buffer, int path_buffer_len)
+{
+ int rc;
+ do {
+ rc = zoo_create(zh, path, value, valuelen, acl,
+ flags, path_buffer, path_buffer_len);
+ if (rc != ZOK)
+ dprintf("rc:%d\n", rc);
+ }while(rc==ZOPERATIONTIMEOUT || rc==ZCONNECTIONLOSS);
+ return rc;
+}
+
+inline ZOOAPI int zk_delete(zhandle_t *zh, const char *path, int version)
+{
+ int rc;
+ do {
+ rc = zoo_delete(zh, path, version);
+ if (rc != ZOK)
+ dprintf("rc:%d\n", rc);
+ }while(rc==ZOPERATIONTIMEOUT || rc==ZCONNECTIONLOSS);
+ return rc;
+}
+
+inline ZOOAPI int zk_get(zhandle_t *zh, const char *path, int watch, char *buffer,
+ int* buffer_len, struct Stat *stat)
+{
+ int rc;
+ do {
+ rc = zoo_get(zh, path, watch, buffer, buffer_len, stat);
+ if (rc != ZOK)
+ dprintf("rc:%d\n", rc);
+ }while(rc==ZOPERATIONTIMEOUT || rc==ZCONNECTIONLOSS);
+ return rc;
+}
+
+inline ZOOAPI int zk_set(zhandle_t *zh, const char *path, const char *buffer,
+ int buflen, int version)
+{
+ int rc;
+ do {
+ rc = zoo_set(zh, path, buffer, buflen, version);
+ if (rc != ZOK)
+ dprintf("rc:%d\n", rc);
+ }while(rc==ZOPERATIONTIMEOUT || rc==ZCONNECTIONLOSS);
+ return rc;
+}
+
+inline ZOOAPI int zk_exists(zhandle_t *zh, const char *path, int watch, struct Stat *stat)
+{
+ int rc;
+ do {
+ rc = zoo_exists(zh, path, watch, stat);
+ if (rc != ZOK)
+ dprintf("rc:%d\n", rc);
+ }while(rc==ZOPERATIONTIMEOUT || rc==ZCONNECTIONLOSS);
+ return rc;
+}
+
+inline ZOOAPI int zk_get_children(zhandle_t *zh, const char *path, int watch,
+ struct String_vector *strings)
+{
+ int rc;
+ do {
+ rc = zoo_get_children(zh, path, watch, strings);
+ if (rc != ZOK)
+ dprintf("rc:%d\n", rc);
+ }while(rc==ZOPERATIONTIMEOUT || rc==ZCONNECTIONLOSS);
+ return rc;
+}
+
/* ZooKeeper-based lock */
static void zk_lock(zhandle_t *zh)
{
int rc;
again:
- rc = zoo_create(zh, LOCK_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE,
+ rc = zk_create(zh, LOCK_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE,
ZOO_EPHEMERAL, NULL, 0);
if (rc == ZOK){
dprintf("locked\n");
return;
}
- else if (rc == ZNODEEXISTS || rc == ZOPERATIONTIMEOUT) {
+ else if (rc == ZNODEEXISTS) {
dprintf("retry, rc:%d\n", rc);
usleep(10000); /* FIXME: use watch notification */
goto again;
@@ -101,7 +198,8 @@ static void zk_unlock(zhandle_t *zh)
{
int rc;
- rc = zoo_delete(zh, LOCK_ZNODE, -1);
+ rc = zk_delete(zh, LOCK_ZNODE, -1);
+
if (rc != ZOK)
panic("failed to release lock\n");
@@ -120,7 +218,7 @@ static int zk_queue_empty(zhandle_t *zh)
sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
- rc = zoo_exists(zh, path, 1, NULL);
+ rc = zk_exists(zh, path, 1, NULL);
if (rc == ZOK)
return 0;
@@ -135,14 +233,11 @@ static int zk_queue_push(zhandle_t *zh, struct zk_event *ev)
len = (char *)(ev->buf) - (char *)ev + ev->buf_len;
sprintf(path, "%s/", QUEUE_ZNODE);
- do{
- dprintf("zoo_create ...\n");
- rc = zoo_create(zh, path, (char *)ev, len,
- &ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE, buf, sizeof(buf));
- dprintf("create path:%s, nr_nodes:%ld, queue_pos:%d, len:%d, rc:%d\n", buf, nr_zk_nodes, queue_pos, len, rc);
- }while (rc == ZOPERATIONTIMEOUT);
+ rc = zk_create(zh, path, (char *)ev, len,
+ &ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE, buf, sizeof(buf));
+ dprintf("create path:%s, nr_nodes:%ld, queue_pos:%d, len:%d, rc:%d\n", buf, nr_zk_nodes, queue_pos, len, rc);
if (rc != ZOK)
- panic("failed to zoo_create path:%s, rc:%d\n", path, rc);
+ panic("failed to zk_create path:%s, rc:%d\n", path, rc);
sscanf(buf, QUEUE_ZNODE "/%010d", &seq);
dprintf("path:%s, seq:%d\n", buf, seq);
@@ -175,7 +270,7 @@ static int zk_queue_push_back(zhandle_t *zh, struct zk_event *ev)
/* update the last popped data */
len = (char *)(ev->buf) - (char *)ev + ev->buf_len;
sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
- rc = zoo_set(zh, path, (char *)ev, len, -1);
+ rc = zk_set(zh, path, (char *)ev, len, -1);
dprintf("update path:%s, queue_pos:%d, len:%d, rc:%d\n", path, queue_pos, len, rc);
if (rc != ZOK)
panic("failed to zk_set path:%s, rc:%d\n", path, rc);
@@ -195,8 +290,12 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
/* process leave event */
if (!list_empty(&zk_levent_list)) {
dprintf("found a leave event.\n");
+
+ pthread_mutex_lock(&leave_lock);
lev = list_first_entry(&zk_levent_list, typeof(*lev), list);
list_del(&lev->list);
+ pthread_mutex_unlock(&leave_lock);
+
memcpy(ev, lev, sizeof(*ev));
free(lev);
return 0;
@@ -207,10 +306,8 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
len = sizeof(*ev);
sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
- do {
- rc = zoo_get(zh, path, 1, (char *)ev, &len, NULL);
- dprintf("read path:%s, nr_nodes:%ld, type:%d, len:%d, rc:%d\n", path, nr_zk_nodes, ev->type, len, rc);
- }while(rc == ZOPERATIONTIMEOUT);
+ rc = zk_get(zh, path, 1, (char *)ev, &len, NULL);
+ dprintf("read path:%s, nr_nodes:%ld, type:%d, len:%d, rc:%d\n", path, nr_zk_nodes, ev->type, len, rc);
if (rc != ZOK)
panic("failed to zk_set path:%s, rc:%d\n", path, rc);
@@ -224,7 +321,7 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
/* watch next data */
sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
- rc = zoo_exists(zh, path, 1, NULL);
+ rc = zk_exists(zh, path, 1, NULL);
dprintf("watch path:%s, exists:%d\n", path, (rc==ZOK));
if (rc == ZOK) {
/* we lost this message, manual notify */
@@ -260,9 +357,9 @@ static int is_zk_queue_valid(zhandle_t *zh)
int rc;
struct String_vector strs;
- rc = zoo_get_children(zh, MEMBER_ZNODE, 1, &strs);
+ rc = zk_get_children(zh, MEMBER_ZNODE, 1, &strs);
if (rc != ZOK)
- panic("failed to zoo_get_children path:%s, rc:%d\n", MEMBER_ZNODE, rc);
+ panic("failed to zk_get_children path:%s, rc:%d\n", MEMBER_ZNODE, rc);
return strs.count;
}
@@ -339,9 +436,9 @@ static struct zk_node* find_node(struct zk_node *znodes, int nr_nodes, struct zk
static void zk_queue_init(zhandle_t *zh)
{
- zoo_create(zh, BASE_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
- zoo_create(zh, QUEUE_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
- zoo_create(zh, MEMBER_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
+ zk_create(zh, BASE_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
+ zk_create(zh, QUEUE_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
+ zk_create(zh, MEMBER_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
}
static void zk_data_init(zhandle_t *zh)
@@ -361,15 +458,13 @@ static void zk_data_init(zhandle_t *zh)
if (is_zk_queue_valid(zh)) {
FOR_EACH_ZNODE(zh, MEMBER_ZNODE, path, &strs) {
- do {
- len = sizeof(znode);
- rc = zoo_get(zh, path, 1, (char *)&znode, &len, NULL);
- if (rc == ZOK && znode.joined == 0) {
- dprintf("wait until znode:%s become joined\n", path);
- usleep(10000);
- continue;
- }
- }while(rc == ZOPERATIONTIMEOUT);
+ len = sizeof(znode);
+ rc = zk_get(zh, path, 1, (char *)&znode, &len, NULL);
+ if (rc == ZOK && znode.joined == 0) {
+ dprintf("wait until znode:%s become joined\n", path);
+ usleep(10000);
+ continue;
+ }
switch(rc) {
case ZOK:
@@ -378,16 +473,9 @@ static void zk_data_init(zhandle_t *zh)
case ZNONODE:
break;
default:
- panic("failed to zoo_get path:%s, rc:%d\n", path, rc);
+ panic("failed to zk_get path:%s, rc:%d\n", path, rc);
}
}
- }else {
- dprintf("clean zookeeper store\n");
- FOR_EACH_ZNODE(zh, QUEUE_ZNODE, path, &strs) {
- rc = zoo_delete(zh, path, -1);
- if (rc != ZOK)
- panic("failed to zk_delete path:%s, rc:%d\n", path, rc);
- }
}
sort_zk_nodes(zk_nodes, nr_zk_nodes);
@@ -452,7 +540,10 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
}
memcpy(lev, &ev, sizeof(ev));
+
+ pthread_mutex_lock(&leave_lock);
list_add_tail(&lev->list, &zk_levent_list);
+ pthread_mutex_unlock(&leave_lock);
/* manual notify */
dprintf("write event to efd:%d\n", efd);
@@ -474,11 +565,18 @@ out:
static void watcher(zhandle_t *zh, int type, int state, const char *path, void* ctx)
{
eventfd_t value = 1;
+ const clientid_t *cid;
char str[256], *p;
int ret, i;
dprintf("path:%s, type:%d\n", path, type);
+ if (type == -1) {
+ cid = zoo_client_id(zh);
+ assert(cid != NULL);
+ dprintf("session change, clientid:%ld\n", cid->client_id);
+ }
+
/* discard useless event */
if (type < 0 || type == ZOO_CHILD_EVENT)
return;
@@ -494,11 +592,8 @@ static void watcher(zhandle_t *zh, int type, int state, const char *path, void*
/* check the failed node */
for (i=0; i<nr_zk_nodes; i++) {
if (strcmp(p, node_to_str(&zk_nodes[i].node)) == 0) {
- /* protect zk_levent_list */
- pthread_mutex_lock(&queue_lock);
dprintf("zk_nodes[%d] leave:%s\n", i, node_to_str(&zk_nodes[i].node));
add_event(zh, EVENT_LEAVE, &zk_nodes[i], NULL, 0, NULL);
- pthread_mutex_unlock(&queue_lock);
return;
}
}
@@ -569,11 +664,13 @@ static int zk_init(const char *option, uint8_t *myaddr)
return -1;
}
- zhandle = zookeeper_init(option, watcher, 2000, 0, NULL, 0);
+ zhandle = zookeeper_init(option, watcher, SESSION_TIMEOUT, 0, NULL, 0);
if (!zhandle) {
eprintf("failed to connect to zk server %s\n", option);
return -1;
}
+ dprintf("request session timeout:%dms, negotiated session timeout:%dms\n",
+ SESSION_TIMEOUT, zoo_recv_timeout(zhandle));
if (get_addr(myaddr) < 0)
return -1;
@@ -600,6 +697,7 @@ static int zk_join(struct sd_node *myself,
{
int rc;
char path[256];
+ const clientid_t *cid;
zk_lock(zhandle);
@@ -609,14 +707,16 @@ static int zk_join(struct sd_node *myself,
this_node.seq = zk_queue_seq(zhandle);
this_node.joined = 0;
- dprintf("this_seq:%d\n", this_node.seq);
+ cid = zoo_client_id(zhandle);
+ assert(cid != NULL);
+ this_node.clientid = *cid;
+
+ dprintf("this_seq:%d, clientid:%ld\n", this_node.seq, cid->client_id);
sprintf(path, MEMBER_ZNODE "/%s", node_to_str(myself));
- do {
- dprintf("try to create member path:%s\n", path);
- rc = zoo_create(zhandle, path, (char *)&this_node, sizeof(this_node),
- &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0);
- }while(rc == ZOPERATIONTIMEOUT || rc == ZNODEEXISTS);
+ dprintf("try to create member path:%s\n", path);
+ rc = zk_create(zhandle, path, (char *)&this_node, sizeof(this_node),
+ &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0);
if (rc != ZOK)
panic("failed to create an ephemeral znode, rc:%d\n", rc);
@@ -706,17 +806,15 @@ static int zk_dispatch(void)
len = sizeof(znode);
sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender.node));
- do {
- rc = zoo_get(zhandle, path, 0, (char *)&znode, &len, NULL);
- }while(rc == ZOPERATIONTIMEOUT);
+ rc = zk_get(zhandle, path, 0, (char *)&znode, &len, NULL);
if (rc != ZOK)
- panic("failed to zoo_get path:%s, rc:%d\n", path, rc);
+ panic("failed to zk_get path:%s, rc:%d\n", path, rc);
/* update joined state in zookeeper MEMBER_ZNODE list*/
znode.joined = 1;
- rc = zoo_set(zhandle, path, (char *)&znode, sizeof(znode), -1);
+ rc = zk_set(zhandle, path, (char *)&znode, sizeof(znode), -1);
if (rc != ZOK)
- panic("failed to zoo_set path:%s, rc:%d\n", path, rc);
+ panic("failed to zk_set path:%s, rc:%d\n", path, rc);
dprintf("I'm master, push back join event\n");
zk_queue_push_back(zhandle, &ev);
@@ -749,7 +847,7 @@ static int zk_dispatch(void)
nr_zk_nodes, node_to_str(&ev.sender.node), ev.sender.joined);
sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender.node));
- rc = zoo_exists(zhandle, path, 1, NULL);
+ rc = zk_exists(zhandle, path, 1, NULL);
dprintf("watch path:%s, exists:%d\n", path, (rc==ZOK));
build_node_list(zk_nodes, nr_zk_nodes, entries);
--
1.7.7.6
More information about the sheepdog
mailing list