From: HaiTing Yao <wujue.yht at taobao.com> I have tested node join/leave with the corosync and zookeeper drivers. How I tested it: 1. Restart the sheepdog daemon: I used killall to kill all sheepdog threads, then restarted the daemon. 2. Simulate a lost corosync token: I used the tc tool to slow down the network: sudo tc qdisc add dev eth0 root netem delay 800ms Then, within the specified time, I deleted the rule: sudo tc qdisc del dev eth0 root Signed-off-by: HaiTing Yao <wujue.yht at taobao.com> --- sheep/cluster/corosync.c | 81 +++++++++++++++++++- sheep/group.c | 187 ++++++++++++++++++++++++++++++++++++++++++++++ sheep/sdnet.c | 12 +++ 3 files changed, 275 insertions(+), 5 deletions(-) diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c index 4a588e9..64b837e 100644 --- a/sheep/cluster/corosync.c +++ b/sheep/cluster/corosync.c @@ -20,6 +20,7 @@ struct cpg_node { uint32_t nodeid; uint32_t pid; uint32_t gone; + uint32_t status; struct sd_node ent; }; @@ -319,8 +320,21 @@ static int __corosync_dispatch_one(struct corosync_event *cevent) add_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender); nr_cpg_nodes++; /* fall through */ + case CJ_RES_COME_BACK: case CJ_RES_FAIL: case CJ_RES_JOIN_LATER: + if (cevent->result == CJ_RES_COME_BACK) { + for (idx = 0; idx < nr_cpg_nodes; idx++) { + if (!node_cmp(&cpg_nodes[idx].ent, + &cevent->sender.ent)) { + cpg_nodes[idx].status = + NODE_STATUS_NORMAL; + cpg_nodes[idx].gone = 0; + cpg_nodes[idx].pid = + cevent->sender.pid; + } + } + } build_node_list(cpg_nodes, nr_cpg_nodes, entries); sd_join_handler(&cevent->sender.ent, entries, nr_cpg_nodes, cevent->result, @@ -334,8 +348,11 @@ static int __corosync_dispatch_one(struct corosync_event *cevent) break; cevent->sender.ent = cpg_nodes[idx].ent; - del_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender); - nr_cpg_nodes--; + if (!temp_failure_enabled()) { + del_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender); + nr_cpg_nodes--; + } else + cpg_nodes[idx].status = NODE_STATUS_FAIL; 
build_node_list(cpg_nodes, nr_cpg_nodes, entries); sd_leave_handler(&cevent->sender.ent, entries, nr_cpg_nodes); break; @@ -538,7 +555,7 @@ static void cdrv_cpg_confchg(cpg_handle_t handle, size_t joined_list_entries) { struct corosync_event *cevent; - int i; + int i, j = 0; struct cpg_node joined_sheep[SD_MAX_NODES]; struct cpg_node left_sheep[SD_MAX_NODES]; @@ -602,8 +619,38 @@ static void cdrv_cpg_confchg(cpg_handle_t handle, panic("failed to allocate memory\n"); cevent->type = COROSYNC_EVENT_TYPE_JOIN; - cevent->sender = joined_sheep[i]; - cevent->blocked = 1; /* FIXME: add explanation */ + + if (temp_failure_enabled()) { + for (j = 0; j < nr_cpg_nodes; j++) { + if (cpg_node_equal(cpg_nodes + j, + joined_sheep + i)) { + cpg_nodes[j].status = + NODE_STATUS_NORMAL; + cpg_nodes[j].gone = 0; + cevent->sender = cpg_nodes[j]; + cevent->blocked = 0; + cevent->result = CJ_RES_COME_BACK; + break; + } + } + if ((joined_list[i].reason == CPG_REASON_NODEUP) && + (j == nr_cpg_nodes)) { + dprintf("the node %u has been removed\n", + joined_sheep[i].nodeid); + free(cevent); + return; + } + } + + /* + * the joined node not find in current list + * or disabled the temp failure + */ + if ((j == nr_cpg_nodes) || (!temp_failure_enabled())) { + cevent->sender = joined_sheep[i]; + cevent->blocked = 1; /* FIXME: add explanation */ + } + if (member_list_entries == joined_list_entries - left_list_entries && cpg_node_equal(&joined_sheep[0], &this_node)) cevent->first_node = 1; @@ -739,6 +786,29 @@ static int corosync_dispatch(void) return 0; } +static int corosync_remove_node(void *node) +{ + int ret = -1, i; + struct sd_node *leave_node = (struct sd_node *)node; + + if (!leave_node) + return ret; + + for (i = 0; i < nr_cpg_nodes; i++) { + if (!node_cmp(&cpg_nodes[i].ent, leave_node)) { + dprintf("will delete cpg node %u\n", + cpg_nodes[i].nodeid); + nr_cpg_nodes--; + memmove(cpg_nodes + i, cpg_nodes + i + 1, + sizeof(struct cpg_node) * (nr_cpg_nodes - i)); + ret = 0; + break; + } + } + 
+ return ret; +} + struct cluster_driver cdrv_corosync = { .name = "corosync", @@ -747,6 +817,7 @@ struct cluster_driver cdrv_corosync = { .leave = corosync_leave, .notify = corosync_notify, .dispatch = corosync_dispatch, + .remove_node = corosync_remove_node, }; cdrv_register(cdrv_corosync); diff --git a/sheep/group.c b/sheep/group.c index f9a5437..5ec7c67 100644 --- a/sheep/group.c +++ b/sheep/group.c @@ -27,6 +27,8 @@ static int cdrv_fd; +struct temp_failure failure_timers[SD_MAX_NODES_TEMP_FAIL]; + struct node { struct sd_node ent; struct list_head list; @@ -391,6 +393,19 @@ static int get_cluster_status(struct sd_node *from, if (ret) goto out; + for (i = 0; i < sys->nr_nodes; i++) { + if (sys->nodes[i].status == NODE_STATUS_FAIL) { + ret = SD_RES_CLUSTER_TEMP_FAILURE; + *inc_epoch = 0; + if (!node_cmp(from, sys->nodes + i)) + ret = SD_RES_NODE_COME_BACK; + } + } + + if ((ret == SD_RES_CLUSTER_TEMP_FAILURE) || + (ret == SD_RES_NODE_COME_BACK)) + goto out; + switch (sys_stat) { case SD_STATUS_HALT: case SD_STATUS_OK: @@ -788,6 +803,8 @@ enum cluster_join_result sd_check_join_cb(struct sd_node *joining, void *opaque) else if (jm->result == SD_RES_OLD_NODE_VER || jm->result == SD_RES_NEW_NODE_VER) return CJ_RES_JOIN_LATER; + else if (jm->result == SD_RES_NODE_COME_BACK) + return CJ_RES_COME_BACK; else return CJ_RES_FAIL; } @@ -832,6 +849,11 @@ static void __sd_join_done(struct event_struct *cevent) w->member_list_entries); } + if (jm->result == SD_RES_NODE_COME_BACK) { + dprintf("the node come back after reboot and join again\n"); + return; + } + if (sys_can_recover() && jm->inc_epoch) { list_for_each_entry_safe(node, t, &sys->leave_list, list) { list_del(&node->list); @@ -1097,6 +1119,34 @@ void process_request_event_queues(void) process_request_queue(); } +static int node_status_update(struct sd_node *target, + enum node_vnode_status status) +{ + int i = 0; + int j = 0; + int ret = -1; + int found = -1; + + for (i = 0; i < sys->nr_nodes; i++) { + if 
(!memcmp(target->addr, sys->nodes[i].addr, + sizeof(target->addr))) { + sys->nodes[i].status = status; + found = 0; + ret = 0; + } + + if (!ret) { + for (j = 0; j < sys->nr_vnodes; j++) { + if (sys->vnodes[j].node_idx == i) + sys->vnodes[j].status = status; + } + ret = -1; + } + } + + return found; +} + void sd_join_handler(struct sd_node *joined, struct sd_node *members, size_t nr_members, enum cluster_join_result result, void *opaque) @@ -1109,6 +1159,12 @@ void sd_join_handler(struct sd_node *joined, struct sd_node *members, struct join_message *jm; int le = get_latest_epoch(); + if (temp_failure_enabled()) { + if (!node_status_update(joined, NODE_STATUS_NORMAL)) { + return; + } + } + if (node_cmp(joined, &sys->this_node) == 0) { if (result == CJ_RES_FAIL) { eprintf("Fail to join. The joining node has an invalid epoch.\n"); @@ -1123,6 +1179,7 @@ void sd_join_handler(struct sd_node *joined, struct sd_node *members, switch (result) { case CJ_RES_SUCCESS: + case CJ_RES_COME_BACK: dprintf("join %s\n", node_to_str(joined)); for (i = 0; i < nr_members; i++) dprintf("[%x] %s\n", i, node_to_str(members + i)); @@ -1235,12 +1292,135 @@ void sd_join_handler(struct sd_node *joined, struct sd_node *members, } } +static void temp_failure_timeout(void *data) +{ + int idx; + int size; + int found = -1; + char ipstr[INET6_ADDRSTRLEN]; + struct cpg_event *cevent = NULL; + struct work_leave *w = NULL; + struct temp_failure *leave_timer = NULL; + struct sd_node *leave_node = NULL; + struct sd_node *tmp = NULL; + + dprintf("temp failure time up, now check it\n"); + + leave_timer = (struct temp_failure *)data; + leave_node = &(leave_timer->node); + + addr_to_str(ipstr, sizeof(ipstr), leave_node->addr, 0); + + if (sys_stat_shutdown()) + return; + + w = zalloc(sizeof(*w)); + if (!w) + panic("failed to allocate memory for a confchg event\n"); + + cevent = &w->cev; + cevent->ctype = CPG_EVENT_LEAVE; + + size = sizeof(struct sd_node) * (sys->nr_nodes - 1); + w->member_list = zalloc(size); 
+ if (!w->member_list) { + free(w); + panic("failed to allocate memory for a confchg event\n"); + } + + tmp = w->member_list; + + size = sizeof(struct sd_node); + w->member_list_entries = sys->nr_nodes - 1; + w->left = *leave_node; + + for (idx = 0; idx < sys->nr_nodes; idx++) { + if (!memcmp(leave_node->addr, sys->nodes[idx].addr, + sizeof(leave_node->addr))) { + found = idx; + } else { + if ((idx == (sys->nr_nodes - 1)) && (found < 0)) + break; + memcpy(tmp++, sys->nodes + idx, size); + } + } + + if (found < 0) { + dprintf("can not find right node for %s\n", ipstr); + goto temp_failure_check_out; + } + + if (sys->nodes[found].status != NODE_STATUS_FAIL) { + dprintf("the node %s have been ok\n", ipstr); + goto temp_failure_check_out; + } + + dprintf("the node %s perhaps will never come back\n", ipstr); + + if (sys->cdrv->remove_node) + sys->cdrv->remove_node(leave_node); + + list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings); + start_cpg_event_work(); + + unregister_event(cdrv_fd); + + leave_timer->busy = 0; + + return; + +temp_failure_check_out: + + leave_timer->busy = 0; + + if (w) { + if (w->member_list) + free(w->member_list); + free(w); + } + + return; +} + +static int add_temp_failure_timer(struct sd_node *left) +{ + int i; + + for (i = 0; i < SD_MAX_NODES_TEMP_FAIL; i++) + if (!failure_timers[i].busy) + break; + + if (i == SD_MAX_NODES_TEMP_FAIL) { + eprintf("so many nodes became failed\n"); + return -1; + } + + dprintf("will use No. 
%d timer for temporary failure\n", i); + + memset(failure_timers + i, 0, sizeof(struct temp_failure)); + + failure_timers[i].busy = 1; + failure_timers[i].leave_timer.callback = temp_failure_timeout; + failure_timers[i].leave_timer.data = failure_timers + i; + memcpy(&(failure_timers[i].node), left, sizeof(struct sd_node)); + + add_timer(&(failure_timers[i].leave_timer), sys->templeft_time); + + return 0; +} + +int temp_failure_enabled(void) +{ + return sys->templeft_time; +} + void sd_leave_handler(struct sd_node *left, struct sd_node *members, size_t nr_members) { struct event_struct *cevent; struct work_leave *w = NULL; int i, size; + int ret; dprintf("leave %s\n", node_to_str(left)); for (i = 0; i < nr_members; i++) @@ -1249,6 +1429,13 @@ void sd_leave_handler(struct sd_node *left, struct sd_node *members, if (sys_stat_shutdown()) return; + if (temp_failure_enabled()) { + node_status_update(left, NODE_STATUS_FAIL); + ret = add_temp_failure_timer(left); + if (!ret) + return; + } + w = zalloc(sizeof(*w)); if (!w) goto oom; diff --git a/sheep/sdnet.c b/sheep/sdnet.c index 1e001af..20484a7 100644 --- a/sheep/sdnet.c +++ b/sheep/sdnet.c @@ -753,6 +753,18 @@ int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx, uint32_t epoch) addr_to_str(name, sizeof(name), addr, 0); + for (i = 0; i < sys->nr_nodes; i++) { + if (!memcmp(addr, sys->nodes[i].addr, + sizeof(sys->nodes[i].addr)) && + (port == sys->nodes[i].port)) { + if (sys->nodes[i].status == NODE_STATUS_FAIL) { + dprintf("the ip %s port %u failing now\n", + name, port); + return -1; + } + } + } + fd = connect_to(name, port); if (fd < 0) return -1; -- 1.7.1 |