From: HaiTing Yao <wujue.yht at taobao.com> I have tested node join/leave with the corosync and zookeeper drivers. How I tested it: 1. Restart the sheepdog daemon: I used killall to kill all sheepdog threads, then restarted the daemon. 2. Simulate a lost corosync token: use the tc tool to slow down the network: sudo tc qdisc add dev eth0 root netem delay 800ms Then, within the specified time, delete the rule: sudo tc qdisc del dev eth0 root Signed-off-by: HaiTing Yao <wujue.yht at taobao.com> --- sheep/cluster/corosync.c | 75 +++++++++++++++++- sheep/group.c | 190 ++++++++++++++++++++++++++++++++++++++++++++++ sheep/sdnet.c | 12 +++ 3 files changed, 272 insertions(+), 5 deletions(-) diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c index f6eec1d..20f8a81 100644 --- a/sheep/cluster/corosync.c +++ b/sheep/cluster/corosync.c @@ -20,6 +20,7 @@ struct cpg_node { uint32_t nodeid; uint32_t pid; uint32_t gone; + uint32_t status; struct sd_node ent; }; @@ -323,8 +324,20 @@ static int __corosync_dispatch_one(struct corosync_event *cevent) add_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender); nr_cpg_nodes++; /* fall through */ + case CJ_RES_COME_BACK: case CJ_RES_FAIL: case CJ_RES_JOIN_LATER: + if (cevent->result == CJ_RES_COME_BACK) { + for (idx = 0; idx < nr_cpg_nodes; idx++) { + if (!node_cmp(&cpg_nodes[idx].ent, &cevent->sender.ent)) { + dprintf("will chang cpg status from %u to ok\n", + cpg_nodes[idx].status); + cpg_nodes[idx].status = NODE_STATUS_NORMAL; + cpg_nodes[idx].gone = 0; + cpg_nodes[idx].pid = cevent->sender.pid; + } + } + } build_node_list(cpg_nodes, nr_cpg_nodes, entries); corosync_handlers.join_handler(&cevent->sender.ent, entries, nr_cpg_nodes, cevent->result, @@ -338,8 +351,11 @@ static int __corosync_dispatch_one(struct corosync_event *cevent) break; cevent->sender.ent = cpg_nodes[idx].ent; - del_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender); - nr_cpg_nodes--; + if (!temp_failure_enabled()) { + del_cpg_node(cpg_nodes, nr_cpg_nodes, 
&cevent->sender); + nr_cpg_nodes--; + } else + cpg_nodes[idx].status = NODE_STATUS_FAIL; build_node_list(cpg_nodes, nr_cpg_nodes, entries); corosync_handlers.leave_handler(&cevent->sender.ent, entries, nr_cpg_nodes); @@ -543,7 +559,7 @@ static void cdrv_cpg_confchg(cpg_handle_t handle, size_t joined_list_entries) { struct corosync_event *cevent; - int i; + int i, j = 0; struct cpg_node joined_sheep[SD_MAX_NODES]; struct cpg_node left_sheep[SD_MAX_NODES]; @@ -607,8 +623,34 @@ static void cdrv_cpg_confchg(cpg_handle_t handle, panic("failed to allocate memory\n"); cevent->type = COROSYNC_EVENT_TYPE_JOIN; - cevent->sender = joined_sheep[i]; - cevent->blocked = 1; /* FIXME: add explanation */ + + if (temp_failure_enabled()) { + for (j = 0; j < nr_cpg_nodes; j++) { + if (cpg_node_equal(cpg_nodes + j, joined_sheep + i)) { + dprintf("node ID %u come back from temp failure\n", + joined_sheep[i].nodeid); + cpg_nodes[j].status = NODE_STATUS_NORMAL; + cpg_nodes[j].gone = 0; + cevent->sender = cpg_nodes[j]; + cevent->blocked = 0; + cevent->result = CJ_RES_COME_BACK; + break; + } + } + if ((joined_list[i].reason == CPG_REASON_NODEUP) && + (j == nr_cpg_nodes)) { + dprintf("the node %u has been removed\n", joined_sheep[i].nodeid); + free(cevent); + return; + } + } + + /* the joined node not find in current list or disabled the temp failure */ + if ((j == nr_cpg_nodes) || (!temp_failure_enabled())) { + cevent->sender = joined_sheep[i]; + cevent->blocked = 1; /* FIXME: add explanation */ + } + if (member_list_entries == joined_list_entries - left_list_entries && cpg_node_equal(&joined_sheep[0], &this_node)) cevent->first_node = 1; @@ -747,6 +789,28 @@ static int corosync_dispatch(void) return 0; } +static int corosync_remove_node(void *node) +{ + int ret = -1, i; + struct sd_node *leave_node = (struct sd_node *)node; + + if (!leave_node) + return ret; + + for (i = 0; i < nr_cpg_nodes; i++) { + if (!node_cmp(&cpg_nodes[i].ent, leave_node)) { + dprintf("will delete cpg node %u\n", 
cpg_nodes[i].nodeid); + nr_cpg_nodes--; + memmove(cpg_nodes + i, cpg_nodes + i + 1, + sizeof(struct cpg_node) * (nr_cpg_nodes - i)); + ret = 0; + break; + } + } + + return ret; +} + struct cluster_driver cdrv_corosync = { .name = "corosync", @@ -755,6 +819,7 @@ struct cluster_driver cdrv_corosync = { .leave = corosync_leave, .notify = corosync_notify, .dispatch = corosync_dispatch, + .remove_node = corosync_remove_node, }; cdrv_register(cdrv_corosync); diff --git a/sheep/group.c b/sheep/group.c index a1027b7..6c6cf6a 100644 --- a/sheep/group.c +++ b/sheep/group.c @@ -28,6 +28,8 @@ static int cdrv_fd; extern struct store_driver *sd_store; extern char *obj_path; +struct temp_failure failure_timers[SD_MAX_NODES_TEMP_FAIL]; + struct node { struct sd_node ent; struct list_head list; @@ -387,6 +389,20 @@ static int get_cluster_status(struct sd_node *from, if (ret) goto out; + for (i = 0; i < sys->nr_nodes; i++) { + if (sys->nodes[i].status == NODE_STATUS_FAIL) { + ret = SD_RES_CLUSTER_TEMP_FAILURE; + *inc_epoch = 0; + if (!node_cmp(from, sys->nodes + i)) { + ret = SD_RES_NODE_COME_BACK; + } + } + } + + if ((ret == SD_RES_CLUSTER_TEMP_FAILURE) || + (ret == SD_RES_NODE_COME_BACK)) + goto out; + switch (sys_stat) { case SD_STATUS_HALT: case SD_STATUS_OK: @@ -791,6 +807,8 @@ static enum cluster_join_result sd_check_join_cb( else if (jm->result == SD_RES_OLD_NODE_VER || jm->result == SD_RES_NEW_NODE_VER) return CJ_RES_JOIN_LATER; + else if (jm->result == SD_RES_NODE_COME_BACK) + return CJ_RES_COME_BACK; else return CJ_RES_FAIL; } @@ -833,6 +851,11 @@ static void __sd_join_done(struct cpg_event *cevent) update_cluster_info(jm, &w->joined, w->member_list, w->member_list_entries); + if (jm->result == SD_RES_NODE_COME_BACK) { + dprintf("the node come back after reboot and join again\n"); + return; + } + if (sys_can_recover() && jm->inc_epoch) { list_for_each_entry_safe(node, t, &sys->leave_list, list) { list_del(&node->list); @@ -1188,6 +1211,33 @@ gateway_work: 
queue_work(sys->cpg_wqueue, &cpg_event_work); } +static int node_status_update(struct sd_node *target, + enum node_vnode_status status) +{ + int i = 0; + int j = 0; + int ret = -1; + int found = -1; + + for (i = 0; i < sys->nr_nodes; i++) { + if (!memcmp(target->addr, sys->nodes[i].addr, sizeof(target->addr))) { + sys->nodes[i].status = status; + found = 0; + ret = 0; + } + + if (!ret) { + for (j = 0; j < sys->nr_vnodes; j++) { + if (sys->vnodes[j].node_idx == i) + sys->vnodes[j].status = status; + } + ret = -1; + } + } + + return found; +} + static void sd_join_handler(struct sd_node *joined, struct sd_node *members, size_t nr_members, enum cluster_join_result result, @@ -1201,6 +1251,15 @@ static void sd_join_handler(struct sd_node *joined, struct join_message *jm; int le = get_latest_epoch(); + if (temp_failure_enabled()) { + if (!node_status_update(joined, NODE_STATUS_NORMAL)) { + dprintf("the addr %u.%u.%u.%u come back and upate the status\n", + joined->addr[12], joined->addr[13], + joined->addr[14], joined->addr[15]); + return; + } + } + if (node_cmp(joined, &sys->this_node) == 0) { if (result == CJ_RES_FAIL) { eprintf("Fail to join. 
The joining node has an invalid epoch.\n"); @@ -1215,6 +1274,7 @@ static void sd_join_handler(struct sd_node *joined, switch (result) { case CJ_RES_SUCCESS: + case CJ_RES_COME_BACK: dprintf("join %s\n", node_to_str(joined)); for (i = 0; i < nr_members; i++) dprintf("[%x] %s\n", i, node_to_str(members + i)); @@ -1330,6 +1390,128 @@ static void sd_join_handler(struct sd_node *joined, } } +static void temp_failure_timeout(void *data) +{ + int idx; + int size; + int found = -1; + char ipstr[INET6_ADDRSTRLEN]; + struct cpg_event *cevent = NULL; + struct work_leave *w = NULL; + struct temp_failure *leave_timer = NULL; + struct sd_node *leave_node = NULL; + struct sd_node *tmp = NULL; + + dprintf("temp failure time up, now check it\n"); + + leave_timer = (struct temp_failure*)data; + leave_node = &(leave_timer->node); + + addr_to_str(ipstr, sizeof(ipstr), leave_node->addr, 0); + + if (sys_stat_shutdown()) + return; + + w = zalloc(sizeof(*w)); + if (!w) + panic("failed to allocate memory for a confchg event\n"); + + cevent = &w->cev; + cevent->ctype = CPG_EVENT_LEAVE; + + size = sizeof(struct sd_node) * (sys->nr_nodes - 1); + w->member_list = zalloc(size); + if (!w->member_list) { + free(w); + panic("failed to allocate memory for a confchg event\n"); + } + + tmp = w->member_list; + + size = sizeof(struct sd_node); + w->member_list_entries = sys->nr_nodes - 1; + w->left = *leave_node; + + for (idx = 0; idx < sys->nr_nodes; idx++) { + if (!memcmp(leave_node->addr, sys->nodes[idx].addr, + sizeof(leave_node->addr))) { + found = idx; + } else { + if ((idx == (sys->nr_nodes - 1)) && (found < 0)) + break; + memcpy(tmp++, sys->nodes + idx, size); + } + } + + if (found < 0) { + dprintf("can not find right node for %s\n", ipstr); + goto temp_failure_check_out; + } + + if (sys->nodes[found].status != NODE_STATUS_FAIL) { + dprintf("the node %s have been ok\n", ipstr); + goto temp_failure_check_out; + } + + dprintf("the node %s perhaps will never come back\n", ipstr); + + if 
(sys->cdrv->remove_node) + sys->cdrv->remove_node(leave_node); + + list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings); + start_cpg_event_work(); + + unregister_event(cdrv_fd); + + leave_timer->busy = 0; + + return; + +temp_failure_check_out: + + leave_timer->busy = 0; + + if (w) { + if (w->member_list) + free(w->member_list); + free(w); + } + + return; +} + +static int add_temp_failure_timer(struct sd_node *left) +{ + int i; + + for (i = 0; i < SD_MAX_NODES_TEMP_FAIL; i++) + if (!failure_timers[i].busy) + break; + + if (i == SD_MAX_NODES_TEMP_FAIL) { + eprintf("so many nodes became failed\n"); + return -1; + } + + dprintf("will use No. %d timer for temporary failure\n", i); + + memset(failure_timers + i, 0, sizeof(struct temp_failure)); + + failure_timers[i].busy = 1; + failure_timers[i].leave_timer.callback = temp_failure_timeout; + failure_timers[i].leave_timer.data = failure_timers + i; + memcpy(&(failure_timers[i].node), left, sizeof(struct sd_node)); + + add_timer(&(failure_timers[i].leave_timer), sys->templeft_time); + + return 0; +} + +int temp_failure_enabled(void) +{ + return sys->templeft_time; +} + static void sd_leave_handler(struct sd_node *left, struct sd_node *members, size_t nr_members) @@ -1337,6 +1519,7 @@ static void sd_leave_handler(struct sd_node *left, struct cpg_event *cevent; struct work_leave *w = NULL; int i, size; + int ret; dprintf("leave %s\n", node_to_str(left)); for (i = 0; i < nr_members; i++) @@ -1345,6 +1528,13 @@ static void sd_leave_handler(struct sd_node *left, if (sys_stat_shutdown()) return; + if (temp_failure_enabled()) { + node_status_update(left, NODE_STATUS_FAIL); + ret = add_temp_failure_timer(left); + if (!ret) + return; + } + w = zalloc(sizeof(*w)); if (!w) goto oom; diff --git a/sheep/sdnet.c b/sheep/sdnet.c index 5db9f29..024f71c 100644 --- a/sheep/sdnet.c +++ b/sheep/sdnet.c @@ -854,6 +854,18 @@ int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx, uint32_t epoch) addr_to_str(name, sizeof(name), 
addr, 0); + for (i = 0; i < sys->nr_nodes; i++) { + if (!memcmp(addr, sys->nodes[i].addr, + sizeof(sys->nodes[i].addr)) && + (port == sys->nodes[i].port)) { + if (sys->nodes[i].status == NODE_STATUS_FAIL) { + dprintf("the ip %s port %u failing now\n", + name, port); + return -1; + } + } + } + fd = connect_to(name, port); if (fd < 0) return -1; -- 1.7.1 |