In the old implementation, zookeeper driver needs master to handle join requests and avoid errors during concurrent joining. However, as join requests can be handled by any joined node, we have cheeper way to avoid errors during concurrent joining. In this patch, we use ephemeral znode to simulate distributed lock. Before a node send join request, it have to acquire the lock. And under the protection of the lock, zk_member_empty() is safe because the membership it stable. Because the znode is ephemeral, the lock will automatically released if the node is down. Signed-off-by: Kai Zhang <kyle at zelin.io> --- sheep/cluster/zookeeper.c | 228 +++++++++------------------------------------ 1 file changed, 43 insertions(+), 185 deletions(-) diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c index 1ace8b6..ccfcff9 100644 --- a/sheep/cluster/zookeeper.c +++ b/sheep/cluster/zookeeper.c @@ -30,7 +30,7 @@ #define BASE_ZNODE "/sheepdog" #define QUEUE_ZNODE BASE_ZNODE "/queue" #define MEMBER_ZNODE BASE_ZNODE "/member" -#define MASTER_ZNONE BASE_ZNODE "/master" +#define JOIN_LOCK_ZNODE BASE_ZNODE "/join_lock" /* iterate child znodes */ #define FOR_EACH_ZNODE(parent, path, strs) \ @@ -72,15 +72,11 @@ static struct sd_node sd_nodes[SD_MAX_NODES]; static size_t nr_sd_nodes; static struct rb_root zk_node_root = RB_ROOT; static pthread_rwlock_t zk_tree_lock = PTHREAD_RWLOCK_INITIALIZER; -static pthread_rwlock_t zk_compete_master_lock = PTHREAD_RWLOCK_INITIALIZER; static LIST_HEAD(zk_block_list); -static uatomic_bool is_master; static uatomic_bool stop; static bool joined; static bool first_push = true; -static void zk_compete_master(void); - static struct zk_node *zk_tree_insert(struct zk_node *new) { struct rb_node **p = &zk_node_root.rb_node; @@ -418,7 +414,6 @@ static int push_join_response(struct zk_event *ev) ev->nr_nodes = nr_sd_nodes; memcpy(zk_event_sd_nodes(ev), sd_nodes, nr_sd_nodes * sizeof(struct sd_node)); - queue_pos--; len = offsetof(typeof(*ev), buf) + ev->buf_len; snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos); @@ -505,7 +500,6 @@ static inline void build_node_list(void) static int zk_queue_init(void) { RETURN_IF_ERROR(zk_init_node(BASE_ZNODE), "path %s", BASE_ZNODE); - RETURN_IF_ERROR(zk_init_node(MASTER_ZNONE), "path %s", MASTER_ZNONE); RETURN_IF_ERROR(zk_init_node(QUEUE_ZNODE), "path %s", QUEUE_ZNODE); RETURN_IF_ERROR(zk_init_node(MEMBER_ZNODE), "path %s", MEMBER_ZNODE); return ZOK; @@ -571,12 +565,6 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path, } else if (type == ZOO_DELETED_EVENT) { struct zk_node *n; - ret = sscanf(path, MASTER_ZNONE "/%s", str); - if (ret == 1) { - zk_compete_master(); - return; - } - ret = sscanf(path, MEMBER_ZNODE "/%s", str); if (ret != 1) return; @@ -614,186 +602,49 @@ static int add_join_event(void *msg, size_t msg_len) return zk_queue_push(&ev); } -static int zk_get_least_seq(const char *parent, char *least_seq_path, - int path_len, void *buf, int *buf_len) +static int zk_acquire_lock(const char *path) { - char path[MAX_NODE_STR_LEN], *p, *tmp; - struct String_vector strs; - int rc, least_seq = INT_MAX , seq; - + int rc; while (true) { - RETURN_IF_ERROR(zk_get_children(parent, &strs), ""); + rc = zk_node_exists(path); - FOR_EACH_ZNODE(parent, path, &strs) { - p = strrchr(path, '/'); - seq = strtol(++p, &tmp, 10); - if (seq < least_seq) - least_seq = seq; - } + if (rc == ZOK) { + /* lock is dominated by others */ + usleep(50000); + continue; + } else if (rc != ZNONODE) + return rc; - snprintf(path, MAX_NODE_STR_LEN, "%s/%010"PRId32, - parent, least_seq); - rc = zk_get_data(path, buf, buf_len); - switch (rc) { - case ZOK: - strncpy(least_seq_path, path, path_len); - return ZOK; - case ZNONODE: - break; - default: - sd_eprintf("failed, %s", zerror(rc)); + rc = zk_create_node(path, "", 0, &ZOO_OPEN_ACL_UNSAFE, + ZOO_EPHEMERAL, NULL, 0); + + if (rc != ZNODEEXISTS) return rc; - } } } -static int zk_find_master(int *master_seq, char *master_name) +static void zk_release_lock(const char *path) { - int rc, len = MAX_NODE_STR_LEN; - char master_compete_path[MAX_NODE_STR_LEN]; - - if (*master_seq < 0) { - RETURN_IF_ERROR(zk_get_least_seq(MASTER_ZNONE, - master_compete_path, - MAX_NODE_STR_LEN, master_name, - &len), ""); - sscanf(master_compete_path, MASTER_ZNONE "/%"PRId32, - master_seq); - return ZOK; - } else { - while (true) { - snprintf(master_compete_path, len, - MASTER_ZNONE "/%010"PRId32, *master_seq); - rc = zk_get_data(master_compete_path, master_name, - &len); - switch (rc) { - case ZOK: - return ZOK; - case ZNONODE: - sd_iprintf("detect master leave, " - "start to compete master"); - (*master_seq)++; - break; - default: - sd_eprintf("failed, %s", zerror(rc)); - return rc; - } - } - } + zk_delete_node(path, -1); } -/* - * block until last sheep joined - * last_sheep returns sequence number of last sheep or -1 if no previous sheep - */ -static int zk_verify_last_sheep_join(int seq, int *last_sheep) +static int zk_member_empty(bool *empty) { - int rc, len = MAX_NODE_STR_LEN; - char path[MAX_NODE_STR_LEN], name[MAX_NODE_STR_LEN]; + struct String_vector strs; + int rc; - for (*last_sheep = seq - 1; *last_sheep >= 0; (*last_sheep)--) { - snprintf(path, MAX_NODE_STR_LEN, MASTER_ZNONE "/%010"PRId32, - *last_sheep); - rc = zk_get_data(path, name, &len); - switch (rc) { - case ZNONODE: - continue; - case ZOK: - break; - default: - sd_eprintf("failed, %s", zerror(rc)); - return rc; - } + rc = zk_get_children(MEMBER_ZNODE, &strs); + if (rc != ZOK) + return rc; - if (!strcmp(name, node_to_str(&this_node.node))) - continue; + if (strs.count == 0) + *empty = true; + else + *empty = false; - snprintf(path, MAX_NODE_STR_LEN, MEMBER_ZNODE "/%s", name); - rc = zk_node_exists(path); - switch (rc) { - case ZOK: - return ZOK; - case ZNONODE: - (*last_sheep)++; - break; - default: - sd_eprintf("failed, %s", zerror(rc)); - return rc; - } - } return ZOK; } -/* - * Create sequential node under MASTER_ZNODE. - * Sheep with least sequential number win the competition. - */ -static void zk_compete_master(void) -{ - int rc, last_joined_sheep; - char master_name[MAX_NODE_STR_LEN]; - char my_compete_path[MAX_NODE_STR_LEN]; - static int master_seq = -1, my_seq; - - /* - * This is to protect master_seq and my_seq because this function will - * be called by both main thread and zookeeper's event thread. - */ - pthread_rwlock_wrlock(&zk_compete_master_lock); - - if (uatomic_is_true(&is_master) || uatomic_is_true(&stop)) - goto out_unlock; - - if (!joined) { - sd_dprintf("start to compete master for the first time"); - do { - if (uatomic_is_true(&stop)) - goto out_unlock; - /* duplicate sequential node has no side-effect */ - rc = zk_create_seq_node(MASTER_ZNONE "/", - node_to_str(&this_node.node), - MAX_NODE_STR_LEN, - my_compete_path, - MAX_NODE_STR_LEN, true); - } while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS); - CHECK_ZK_RC(rc, MASTER_ZNONE "/"); - if (rc != ZOK) - goto out_unlock; - - sd_dprintf("my compete path: %s", my_compete_path); - sscanf(my_compete_path, MASTER_ZNONE "/%"PRId32, - &my_seq); - } - - if (zk_find_master(&master_seq, master_name) != ZOK) - goto out_unlock; - - if (!strcmp(master_name, node_to_str(&this_node.node))) - goto success; - else if (joined) { - sd_dprintf("lost"); - goto out_unlock; - } else { - if (zk_verify_last_sheep_join(my_seq, - &last_joined_sheep) != ZOK) - goto out_unlock; - - if (last_joined_sheep < 0) { - /* all previous sheep has quit, i'm master */ - master_seq = my_seq; - goto success; - } else { - sd_dprintf("lost"); - goto out_unlock; - } - } -success: - uatomic_set_true(&is_master); - sd_dprintf("success"); -out_unlock: - pthread_rwlock_unlock(&zk_compete_master_lock); -} - static int zk_join(const struct sd_node *myself, void *opaque, size_t opaque_len) { @@ -809,7 +660,7 @@ static int zk_join(const struct sd_node *myself, exit(1); } - zk_compete_master(); + RETURN_VALUE_IF_ERROR(zk_acquire_lock(JOIN_LOCK_ZNODE), -1, ""); RETURN_VALUE_IF_ERROR(add_join_event(opaque, opaque_len), -1, ""); return 0; @@ -846,17 +697,24 @@ static int zk_unblock(void *msg, size_t msg_len) static void zk_handle_join(struct zk_event *ev) { + bool member_empty = false; sd_dprintf("sender: %s", node_to_str(&ev->sender.node)); - if (!uatomic_is_true(&is_master)) { - /* Let's await master acking the join-request */ - queue_pos--; - return; - } - sd_join_handler(&ev->sender.node, sd_nodes, nr_sd_nodes, ev->buf); - push_join_response(ev); + queue_pos--; + + /* + * If the join request is local and there is no joined node, sender node + * handle the join request by itself. Otherwise, it wait other nodes to + * handle it. + */ + if (node_eq(&ev->sender.node, &this_node.node)) { + RETURN_VOID_IF_ERROR(zk_member_empty(&member_empty), ""); + if (!member_empty) + return; + } - sd_dprintf("I'm the master now"); + if (sd_join_handler(&ev->sender.node, sd_nodes, nr_sd_nodes, ev->buf)) + push_join_response(ev); } static void watch_all_nodes(void) @@ -911,6 +769,7 @@ static void zk_handle_accept(struct zk_event *ev) &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0); RETURN_VOID_IF_ERROR(rc, ""); + zk_release_lock(JOIN_LOCK_ZNODE); } else zk_node_exists(path); @@ -1126,7 +985,6 @@ static int zk_init(const char *option) } uatomic_set_false(&stop); - uatomic_set_false(&is_master); if (zk_queue_init() != ZOK) return -1; -- 1.7.9.5 |