From: Liu Yuan <tailai.ly at taobao.com>

[Problem]

Currently, sheepdog cannot recover the cluster into a fully functional state if any node in the cluster fails to join the cluster after shutdown because it is considered unhealthy (e.g. the targeted epoch content is corrupted, or the epoch version mismatches). That is, the cluster can work again *only if* all the nodes join the cluster successfully.

For 14 nodes in the cluster,

==========*=== <--- cluster refuses to work
          ^
          |
    unhealthy node

This is quite awkward. When a cluster with many nodes is started again after being shut down, we easily meet the condition that some of the nodes are unhealthy and are rejected by the master during the join stage. This patch gives sheepdog some kind of intelligence to deal with unhealthy nodes and proceed to recovery when all the alive nodes reach agreement.

[Design]

This patch adds a new concept to sheepdog, the *leave node*. A _leave node_ is a node that the master checks and finds unhealthy (unmatched epoch content), so the master marks it as a 'leave node', meaning that it is supposed to leave the cluster. The leave nodes are queued in the leave list, which *only* exists during SD_STATUS_WAIT_FOR_JOIN. All the leave nodes stop their own sheep automatically after being started.

The key idea for *when* the nodes reach agreement is very simple and can be summed up in one equation:

nr_nodes_in_epoch == nr_nodes_in_sd_list + nr_nodes_in_leave_list

When this is reached, all the alive nodes in the cluster will begin recovery, and finally the epoch is incremented by 1 if some other nodes are considered to have left. This is because, after the cluster recovery, we can then try to *re-join* the unhealthy nodes and probably succeed!

[Cases]

There is no order imposed on starting up the nodes in the cluster. That is, you can start up the nodes after shutdown in arbitrary order, whether a node is good or bad (yes, you cannot know a node's status before you try to start it up).
For example:

====** <-- the cluster will recover with 4 nodes alive, with the epoch incremented by 1
==***= <-- the cluster will recover with 3 nodes alive, with the epoch incremented by 1
=***** <-- the cluster will recover with 1 node alive, with the epoch incremented by 1
  |* try to re-join
  v
=*==== --> ====== <- we might get 6 nodes alive.

The corner case is that we start one of the bad nodes first, before any healthy node. If this happens, all the other nodes are considered 'unhealthy', and that very first node will recover alone after we have tried to join all the nodes.

*===== --> the cluster will recover with 1 node alive, with the epoch incremented by 1
^
|
from now on, we can re-join the nodes, and will end up with 6 nodes alive.

[Note]

This patch changes the join_message layout! So you have to recompile all the sheepdogs in your cluster before it works.

We need to modify the join_message layout because of the following scenario:

=**=== <-- the last 3 nodes need to know the information about the previous 2 leave nodes.

When nodes join after the leave nodes, the master is supposed to reply with the leave-node information via send_join_response().
Signed-off-by: Liu Yuan <tailai.ly at taobao.com> --- sheep/group.c | 190 ++++++++++++++++++++++++++++++++++++++++++---------- sheep/sheep_priv.h | 6 ++ 2 files changed, 161 insertions(+), 35 deletions(-) diff --git a/sheep/group.c b/sheep/group.c index f35e1b2..bc92de6 100644 --- a/sheep/group.c +++ b/sheep/group.c @@ -64,10 +64,17 @@ struct join_message { uint32_t pid; struct sheepdog_node_list_entry ent; } nodes[SD_MAX_NODES]; + uint32_t nr_leave_nodes; + struct { + uint32_t nodeid; + uint32_t pid; + struct sheepdog_node_list_entry ent; + } leave_nodes[SD_MAX_NODES]; }; struct leave_message { struct message_header header; + uint32_t epoch; }; struct vdi_op_message { @@ -396,13 +403,90 @@ static int get_nodes_nr_epoch(int epoch) return nr; } +static struct node *find_leave_node(struct node *node) +{ + struct node *n; + list_for_each_entry(n, &sys->leave_list, list) { + if (node_cmp(&n->ent, &node->ent) == 0) + return n; + dprintf("%d\n", node_cmp(&n->ent, &node->ent)); + } + return NULL; + +} +static int add_node_to_leave_list(struct message_header *msg) +{ + int ret = SD_RES_SUCCESS; + int nr, i; + LIST_HEAD(tmp_list); + struct node *n, *t; + struct join_message *jm; + + switch (msg->op) { + case SD_MSG_LEAVE: + n = zalloc(sizeof(*n)); + if (!n) { + ret = SD_RES_NO_MEM; + goto err; + } + + n->nodeid = msg->nodeid; + n->pid = msg->pid; + n->ent = msg->from; + if (find_leave_node(n)) { + free(n); + goto ret; + } + + list_add_tail(&n->list, &sys->leave_list); + goto ret; + case SD_MSG_JOIN: + jm = (struct join_message *)msg; + nr = jm->nr_leave_nodes; + break; + default: + ret = SD_RES_INVALID_PARMS; + goto err; + } + + for (i = 0; i < nr; i++) { + n = zalloc(sizeof(*n)); + if (!n) { + ret = SD_RES_NO_MEM; + goto free; + } + + n->nodeid = jm->leave_nodes[i].nodeid; + n->pid = jm->leave_nodes[i].pid; + n->ent = jm->leave_nodes[i].ent; + if (find_leave_node(n)) { + free(n); + continue; + } + + list_add_tail(&n->list, &tmp_list); + } + list_splice_init(&tmp_list, 
&sys->leave_list); + goto ret; + +free: + list_for_each_entry_safe(n, t, &tmp_list, list) { + free(n); + } +ret: + dprintf("%d\n", get_nodes_nr_from(&sys->leave_list)); + print_node_list(&sys->leave_list); +err: + return ret; +} + static int get_cluster_status(struct sheepdog_node_list_entry *from, struct sheepdog_node_list_entry *entries, int nr_entries, uint64_t ctime, uint32_t epoch, uint32_t *status, uint8_t *inc_epoch) { int i; - int nr_local_entries; + int nr_local_entries, nr_leave_entries; struct sheepdog_node_list_entry local_entries[SD_MAX_NODES]; struct node *node; uint32_t local_epoch; @@ -480,8 +564,18 @@ static int get_cluster_status(struct sheepdog_node_list_entry *from, nr_entries = get_nodes_nr_from(&sys->sd_node_list) + 1; - if (nr_entries != nr_local_entries) + if (nr_entries != nr_local_entries) { + nr_leave_entries = get_nodes_nr_from(&sys->leave_list); + if (nr_local_entries == nr_entries + nr_leave_entries) { + /* Even though some nodes leave, we can make do with it. + * Order cluster to do recovery right now. 
+ */ + *inc_epoch = 1; + *status = SD_STATUS_OK; + return SD_RES_SUCCESS; + } return SD_RES_SUCCESS; + } for (i = 0; i < nr_local_entries; i++) { if (node_cmp(local_entries + i, from) == 0) @@ -637,13 +731,16 @@ static void update_cluster_info(struct join_message *msg) { int i; int ret, nr_nodes = msg->nr_nodes; - struct sheepdog_node_list_entry entry[SD_MAX_NODES]; eprintf("status = %d, epoch = %d, %d, %d\n", msg->cluster_status, msg->epoch, msg->result, sys->join_finished); if (msg->result != SD_RES_SUCCESS) { if (is_myself(msg->header.from.addr, msg->header.from.port)) { eprintf("failed to join sheepdog, %d\n", msg->result); - sys->status = SD_STATUS_JOIN_FAILED; + leave_cluster(); + eprintf("I am really hurt and gonna leave cluster.\n"); + eprintf("Fix yourself and restart me later, pleaseeeee...Bye.\n"); + exit(1); + /* sys->status = SD_STATUS_JOIN_FAILED; */ } return; } @@ -655,7 +752,7 @@ static void update_cluster_info(struct join_message *msg) sys->nr_sobjs = msg->nr_sobjs; if (sys->join_finished) - goto out; + goto join_finished; sys->epoch = msg->epoch; for (i = 0; i < nr_nodes; i++) { @@ -671,19 +768,15 @@ static void update_cluster_info(struct join_message *msg) msg->nodes[i].nodeid, msg->nodes[i].pid); } - sys->join_finished = 1; + if (msg->cluster_status != SD_STATUS_OK) + add_node_to_leave_list((struct message_header *)msg); - if (msg->cluster_status == SD_STATUS_OK && msg->inc_epoch) { - nr_nodes = get_ordered_sd_node_list(entry); + sys->join_finished = 1; - dprintf("update epoch, %d, %d\n", sys->epoch, nr_nodes); - ret = epoch_log_write(sys->epoch, (char *)entry, - nr_nodes * sizeof(struct sheepdog_node_list_entry)); - if (ret < 0) - eprintf("can't write epoch %u\n", sys->epoch); - } + if (msg->cluster_status == SD_STATUS_OK && msg->inc_epoch) + update_epoch_log(sys->epoch); -out: +join_finished: ret = move_node_to_sd_list(msg->header.nodeid, msg->header.pid, msg->header.from); /* * this should not happen since __sd_deliver() checks if the @@ 
-695,16 +788,8 @@ out: if (msg->cluster_status == SD_STATUS_OK) { if (msg->inc_epoch) { - nr_nodes = get_ordered_sd_node_list(entry); - - dprintf("update epoch, %d, %d\n", sys->epoch + 1, nr_nodes); - ret = epoch_log_write(sys->epoch + 1, (char *)entry, - nr_nodes * sizeof(struct sheepdog_node_list_entry)); - if (ret < 0) - eprintf("can't write epoch %u\n", sys->epoch + 1); - sys->epoch++; - + update_epoch_log(sys->epoch); update_epoch_store(sys->epoch); } if (sys->status != SD_STATUS_OK) { @@ -930,10 +1015,25 @@ static void __sd_deliver(struct cpg_event *cevent) static void send_join_response(struct work_deliver *w) { struct message_header *m; + struct join_message *jm; + struct node *node; m = w->msg; - join((struct join_message *)m); + jm = (struct join_message *)m; + join(jm); m->state = DM_FIN; + + dprintf("%d, %d\n", jm->result, jm->cluster_status); + if (jm->result == SD_RES_SUCCESS && jm->cluster_status != SD_STATUS_OK) { + jm->nr_leave_nodes = 0; + list_for_each_entry(node, &sys->leave_list, list) { + jm->leave_nodes[jm->nr_leave_nodes].nodeid = node->nodeid; + jm->leave_nodes[jm->nr_leave_nodes].pid = node->pid; + jm->leave_nodes[jm->nr_leave_nodes].ent = node->ent; + jm->nr_leave_nodes++; + } + print_node_list(&sys->leave_list); + } send_message(sys->handle, m); } @@ -941,11 +1041,11 @@ static void __sd_deliver_done(struct cpg_event *cevent) { struct work_deliver *w = container_of(cevent, struct work_deliver, cev); struct message_header *m; + struct leave_message *lm; char name[128]; int do_recovery; - struct node *node; - struct sheepdog_node_list_entry e[SD_MAX_NODES]; - int nr; + struct node *node, *t; + int nr, nr_local, nr_leave; m = w->msg; @@ -962,13 +1062,25 @@ static void __sd_deliver_done(struct cpg_event *cevent) list_del(&node->list); free(node); if (sys->status == SD_STATUS_OK) { - nr = get_ordered_sd_node_list(e); - dprintf("update epoch, %d, %d\n", sys->epoch + 1, nr); - epoch_log_write(sys->epoch + 1, (char *)e, - nr * sizeof(struct 
sheepdog_node_list_entry)); - sys->epoch++; - + update_epoch_log(sys->epoch); + update_epoch_store(sys->epoch); + } + } + if (sys->status == SD_STATUS_WAIT_FOR_JOIN) { + lm = (struct leave_message *)m; + add_node_to_leave_list(m); + + if (lm->epoch > sys->leave_epoch) + sys->leave_epoch = lm->epoch; + + nr_local = get_nodes_nr_epoch(sys->epoch); + nr = get_nodes_nr_from(&sys->sd_node_list); + nr_leave = get_nodes_nr_from(&sys->leave_list); + if (nr_local == nr + nr_leave) { + sys->status = SD_STATUS_OK; + sys->epoch = sys->leave_epoch + 1; + update_epoch_log(sys->epoch); update_epoch_store(sys->epoch); } } @@ -1002,8 +1114,12 @@ static void __sd_deliver_done(struct cpg_event *cevent) } } - if (do_recovery && sys->status == SD_STATUS_OK) + if (do_recovery && sys->status == SD_STATUS_OK) { + list_for_each_entry_safe(node, t, &sys->leave_list, list) { + list_del(&node->list); + } start_recovery(sys->epoch); + } } static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name, @@ -1796,6 +1912,7 @@ join_retry: sys->handle = cpg_handle; sys->this_nodeid = nodeid; sys->this_pid = getpid(); + sys->leave_epoch = 0; ret = set_addr(nodeid, port); if (ret) @@ -1815,6 +1932,7 @@ join_retry: INIT_LIST_HEAD(&sys->sd_node_list); INIT_LIST_HEAD(&sys->cpg_node_list); INIT_LIST_HEAD(&sys->pending_list); + INIT_LIST_HEAD(&sys->leave_list); INIT_LIST_HEAD(&sys->outstanding_req_list); INIT_LIST_HEAD(&sys->req_wait_for_obj_list); @@ -1840,6 +1958,8 @@ int leave_cluster(void) msg.header.from = sys->this_node; msg.header.nodeid = sys->this_nodeid; msg.header.pid = sys->this_pid; + msg.epoch = get_latest_epoch(); + dprintf("%d\n", msg.epoch); return send_message(sys->handle, (struct message_header *)&msg); } diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h index dc56f54..f1f96e5 100644 --- a/sheep/sheep_priv.h +++ b/sheep/sheep_priv.h @@ -117,6 +117,11 @@ struct cluster_info { struct list_head cpg_node_list; struct list_head sd_node_list; + /* leave list is only used to 
account for bad nodes when we start + * up the cluster nodes after we shutdown the cluster through collie. + */ + struct list_head leave_list; + /* this array contains a list of ordered virtual nodes */ struct sheepdog_vnode_list_entry vnodes[SD_MAX_VNODES]; int nr_vnodes; @@ -138,6 +143,7 @@ struct cluster_info { int nr_outstanding_reqs; uint32_t recovered_epoch; + uint32_t leave_epoch; /* The highest number in the clsuter */ int use_directio; -- 1.7.5.1 |