From: Liu Yuan <tailai.ly at taobao.com>

[Problem]

Currently, sheepdog cannot recover the cluster into a fully functional state if any node in the cluster fails to join the cluster after *shutdown* because it is considered unhealthy (e.g. the targeted epoch content is corrupted). That is, the cluster can work again only *if* all the nodes join the cluster successfully.

For 14 nodes in the cluster,

    ==========*=== <--- cluster refuses to work
              ^
              |
        unhealthy node

This is quite awkward. When a cluster with many nodes has been shut down, we easily run into the situation where some of the nodes are unhealthy and are rejected by the master during the join stage. This patch gives sheepdog some kind of intelligence to deal with unhealthy nodes and to proceed to recovery when all the nodes alive reach the agreement.

[Design]

This patch adds a new concept to sheepdog, the *leave node*. The _leave node_ is a node that the master checks and finds unhealthy (unmatched epoch content), so the master marks it as a 'leave node', meaning that it is supposed to leave the cluster. The leave nodes are queued in the leave list and *only* exist during SD_STATUS_WAIT_FOR_FORMAT. Each leave node stops its own sheep process automatically after being started.

The key idea for *when* the nodes reach the agreement is very simple and can be summed up in one equation:

    nr_nodes_in_epoch == nr_nodes_in_sd_list + nr_nodes_in_leave_list

When this is reached, all the nodes alive in the cluster begin recovery and, if some nodes are considered to have left, end up with the epoch incremented by 1. This is because, after the cluster has recovered, we can then try to *re-join* the unhealthy nodes and will probably succeed!

[Cases]

There is no order imposed on starting up the nodes in the cluster. That is, you can start up the nodes after shutdown in arbitrary order, whether a node is good or bad (yes, you cannot know a node's status before you try to start it up).
For example:

    ====** <-- the cluster will recover with 4 nodes alive, with the epoch incremented by 1
    ==***= <-- the cluster will recover with 3 nodes alive, with the epoch incremented by 1
    =***** <-- the cluster will recover with 1 node alive, with the epoch incremented by 1
    |*  try to re-join
    v
    =*==== --> ====== <- we might get 6 nodes alive.

The corner case is that we start one of the bad nodes first, before any healthy node. If this happens, all the other nodes are considered 'unhealthy', and that single node will recover after we have tried to join all the other nodes.

    *===== --> the cluster will recover with 1 node alive, with the epoch incremented by 1
    ^
    |
    from now on, we can re-join the nodes, and will end up with 6 nodes alive.

[Note]

This patch changes the join_message layout! So you have to recompile sheepdog on every node in your cluster before it will work.

We need to modify the join_message layout because of the following scenario:

    =**=== <-- the last 3 nodes need to know the information about the previous 2 leave nodes.

When the nodes following the leave nodes join, the master is supposed to reply with the leave nodes' information via send_join_response().

Signed-off-by: Liu Yuan <tailai.ly at taobao.com> --- sheep/group.c | 180 ++++++++++++++++++++++++++++++++++++++++----------- sheep/sheep_priv.h | 12 ++++ sheep/store.c | 5 ++ 3 files changed, 158 insertions(+), 39 deletions(-) diff --git a/sheep/group.c b/sheep/group.c index 83d85cf..43c352d 100644 --- a/sheep/group.c +++ b/sheep/group.c @@ -25,13 +25,6 @@ #include "logger.h" #include "work.h" -struct node { - uint32_t nodeid; - uint32_t pid; - struct sheepdog_node_list_entry ent; - struct list_head list; -}; - enum deliver_msg_state { DM_INIT = 1, DM_CONT, @@ -64,6 +57,12 @@ struct join_message { uint32_t pid; struct sheepdog_node_list_entry ent; } nodes[SD_MAX_NODES]; + uint32_t nr_leave_nodes; + struct { + uint32_t nodeid; + uint32_t pid; + struct sheepdog_node_list_entry ent; + } leave_nodes[SD_MAX_NODES]; }; struct leave_message { @@ -396,13 +395,91 @@ static int get_nodes_nr_epoch(int epoch) return nr; } +static struct node * find_leave_node(struct node *node) +{ + struct node *n; + list_for_each_entry(n, &sys->leave_list, list) { + if (n->nodeid == node->nodeid && + node_cmp(&n->ent, &node->ent) == 0) + return n; + dprintf("%d, %d\n", n->nodeid == node->nodeid, node_cmp(&n->ent, &node->ent)); + } + return NULL; + +} +static int add_node_to_leave_list(struct message_header *msg) +{ + int ret = SD_RES_SUCCESS; + int nr, i; + LIST_HEAD(tmp_list); + struct node *n; + struct join_message *jm; + + switch (msg->op) { + case SD_MSG_LEAVE: + n = zalloc(sizeof (*n)); + if (!n) { + ret = SD_RES_NO_MEM; + goto err; + } + + n->nodeid = msg->nodeid; + n->pid = msg->pid; + n->ent = msg->from; + if (find_leave_node(n)) { + free(n); + goto ret; + } + + list_add_tail(&n->list, &sys->leave_list); + goto ret; + case SD_MSG_JOIN: + jm = (struct join_message *)msg; + nr = jm->nr_leave_nodes; + break; + default: + ret = SD_RES_INVALID_PARMS; + goto err; + } + + for (i = 0; i < nr; i++) { + n = zalloc(sizeof (*n)); + if (!n) { + ret = SD_RES_NO_MEM; + goto free; + } + + n->nodeid 
= jm->leave_nodes[i].nodeid; + n->pid = jm->leave_nodes[i].pid; + n->ent = jm->leave_nodes[i].ent; + if (find_leave_node(n)) { + free(n); + continue; + } + + list_add_tail(&n->list, &tmp_list); + } + list_splice_init(&tmp_list, &sys->leave_list); + goto ret; + +free: + list_for_each_entry(n, &tmp_list, list) { + free(n); + } +ret: + dprintf("%d\n", get_nodes_nr_from(&sys->leave_list)); + print_node_list(&sys->leave_list); +err: + return ret; +} + static int get_cluster_status(struct sheepdog_node_list_entry *from, struct sheepdog_node_list_entry *entries, int nr_entries, uint64_t ctime, uint32_t epoch, uint32_t *status, uint8_t *inc_epoch) { int i; - int nr_local_entries; + int nr_local_entries, nr_leave_entries; struct sheepdog_node_list_entry local_entries[SD_MAX_NODES]; struct node *node; uint32_t local_epoch; @@ -480,8 +557,18 @@ static int get_cluster_status(struct sheepdog_node_list_entry *from, nr_entries = get_nodes_nr_from(&sys->sd_node_list) + 1; - if (nr_entries != nr_local_entries) + if (nr_entries != nr_local_entries) { + nr_leave_entries = get_nodes_nr_from(&sys->leave_list); + if (nr_local_entries == nr_entries + nr_leave_entries) { + /* Even though some nodes leave, we can make do with it. + * Order cluster to do recovery right now. 
+ */ + *inc_epoch = 1; + *status = SD_STATUS_OK; + return SD_RES_SUCCESS; + } return SD_RES_SUCCESS; + } for (i = 0; i < nr_local_entries; i++) { if (node_cmp(local_entries + i, from) == 0) @@ -637,13 +724,16 @@ static void update_cluster_info(struct join_message *msg) { int i; int ret, nr_nodes = msg->nr_nodes; - struct sheepdog_node_list_entry entry[SD_MAX_NODES]; eprintf("status = %d, epoch = %d, %d, %d\n", msg->cluster_status, msg->epoch, msg->result, sys->join_finished); if (msg->result != SD_RES_SUCCESS) { if (is_myself(msg->header.from.addr, msg->header.from.port)) { eprintf("failed to join sheepdog, %d\n", msg->result); - sys->status = SD_STATUS_JOIN_FAILED; + leave_cluster(); + eprintf("I am really hurt and gonna leave cluster.\n"); + eprintf("Fix yourself and restart me later, pleaseeeee...Bye.\n"); + exit(1); + /* sys->status = SD_STATUS_JOIN_FAILED; */ } return; } @@ -655,7 +745,7 @@ static void update_cluster_info(struct join_message *msg) sys->nr_sobjs = msg->nr_sobjs; if (sys->join_finished) - goto out; + goto join_finished; sys->epoch = msg->epoch; for (i = 0; i < nr_nodes; i++) { @@ -671,19 +761,15 @@ static void update_cluster_info(struct join_message *msg) msg->nodes[i].nodeid, msg->nodes[i].pid); } - sys->join_finished = 1; + if (msg->cluster_status != SD_STATUS_OK) + add_node_to_leave_list((struct message_header *)msg); - if (msg->cluster_status == SD_STATUS_OK && msg->inc_epoch) { - nr_nodes = get_ordered_sd_node_list(entry); + sys->join_finished = 1; - dprintf("update epoch, %d, %d\n", sys->epoch, nr_nodes); - ret = epoch_log_write(sys->epoch, (char *)entry, - nr_nodes * sizeof(struct sheepdog_node_list_entry)); - if (ret < 0) - eprintf("can't write epoch %u\n", sys->epoch); - } + if (msg->cluster_status == SD_STATUS_OK && msg->inc_epoch) + update_epoch_log(sys->epoch); -out: +join_finished: ret = move_node_to_sd_list(msg->header.nodeid, msg->header.pid, msg->header.from); /* * this should not happen since __sd_deliver() checks if the @@ 
-695,16 +781,8 @@ out: if (msg->cluster_status == SD_STATUS_OK) { if (msg->inc_epoch) { - nr_nodes = get_ordered_sd_node_list(entry); - - dprintf("update epoch, %d, %d\n", sys->epoch + 1, nr_nodes); - ret = epoch_log_write(sys->epoch + 1, (char *)entry, - nr_nodes * sizeof(struct sheepdog_node_list_entry)); - if (ret < 0) - eprintf("can't write epoch %u\n", sys->epoch + 1); - sys->epoch++; - + update_epoch_log(sys->epoch); update_epoch_store(sys->epoch); } if (sys->status != SD_STATUS_OK) { @@ -930,10 +1008,25 @@ static void __sd_deliver(struct cpg_event *cevent) static void send_join_response(struct work_deliver *w) { struct message_header *m; + struct join_message *jm; + struct node *node; m = w->msg; - join((struct join_message *)m); + jm = (struct join_message *)m; + join(jm); m->state = DM_FIN; + + dprintf("%d, %d\n", jm->result, jm->cluster_status); + if (jm->result == SD_RES_SUCCESS && jm->cluster_status != SD_STATUS_OK) { + jm->nr_leave_nodes = 0; + list_for_each_entry(node, &sys->leave_list, list) { + jm->leave_nodes[jm->nr_leave_nodes].nodeid = node->nodeid; + jm->leave_nodes[jm->nr_leave_nodes].pid = node->pid; + jm->leave_nodes[jm->nr_leave_nodes].ent = node->ent; + jm->nr_leave_nodes++; + } + print_node_list(&sys->leave_list); + } send_message(sys->handle, m); } @@ -944,8 +1037,7 @@ static void __sd_deliver_done(struct cpg_event *cevent) char name[128]; int do_recovery; struct node *node; - struct sheepdog_node_list_entry e[SD_MAX_NODES]; - int nr; + int nr, nr_local, nr_leave; m = w->msg; @@ -962,13 +1054,22 @@ static void __sd_deliver_done(struct cpg_event *cevent) list_del(&node->list); free(node); if (sys->status == SD_STATUS_OK) { - nr = get_ordered_sd_node_list(e); - dprintf("update epoch, %d, %d\n", sys->epoch + 1, nr); - epoch_log_write(sys->epoch + 1, (char *)e, - nr * sizeof(struct sheepdog_node_list_entry)); - sys->epoch++; + update_epoch_log(sys->epoch); + update_epoch_store(sys->epoch); + } + } + if (sys->status == SD_STATUS_WAIT_FOR_JOIN) 
{ + add_node_to_leave_list(m); + + nr_local = get_nodes_nr_epoch(sys->epoch); + nr = get_nodes_nr_from(&sys->sd_node_list); + nr_leave = get_nodes_nr_from(&sys->leave_list); + if (nr_local == nr + nr_leave) { + sys->status = SD_STATUS_OK; + sys->epoch++; + update_epoch_log(sys->epoch); update_epoch_store(sys->epoch); } } @@ -1815,6 +1916,7 @@ join_retry: INIT_LIST_HEAD(&sys->sd_node_list); INIT_LIST_HEAD(&sys->cpg_node_list); INIT_LIST_HEAD(&sys->pending_list); + INIT_LIST_HEAD(&sys->leave_list); INIT_LIST_HEAD(&sys->outstanding_req_list); INIT_LIST_HEAD(&sys->req_wait_for_obj_list); diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h index dc56f54..56a5620 100644 --- a/sheep/sheep_priv.h +++ b/sheep/sheep_priv.h @@ -38,6 +38,13 @@ #define SD_RES_NETWORK_ERROR 0x81 /* Network error between sheeps */ +struct node { + uint32_t nodeid; + uint32_t pid; + struct sheepdog_node_list_entry ent; + struct list_head list; +}; + enum cpg_event_type { CPG_EVENT_CONCHG, CPG_EVENT_DELIVER, @@ -117,6 +124,11 @@ struct cluster_info { struct list_head cpg_node_list; struct list_head sd_node_list; + /* leave list is only used to account for bad nodes when we start + * up the cluster nodes after we shutdown the cluster through collie. 
+ */ + struct list_head leave_list; + /* this array contains a list of ordered virtual nodes */ struct sheepdog_vnode_list_entry vnodes[SD_MAX_VNODES]; int nr_vnodes; diff --git a/sheep/store.c b/sheep/store.c index 8583455..2bcb952 100644 --- a/sheep/store.c +++ b/sheep/store.c @@ -1547,6 +1547,7 @@ static void recover_done(struct work *work, int idx) { struct recovery_work *rw = container_of(work, struct recovery_work, work); uint64_t oid; + struct node *node; if (rw->state == RW_INIT) rw->state = RW_RUN; @@ -1588,6 +1589,10 @@ static void recover_done(struct work *work, int idx) free(rw->oids); free(rw); + list_for_each_entry(node, &sys->leave_list, list) { + list_del(&node->list); + }; + if (!list_empty(&recovery_work_list)) { rw = list_first_entry(&recovery_work_list, struct recovery_work, rw_siblings); -- 1.7.5.1 |