Kazutaka,

I guess this patch addresses the inconsistency problem you mentioned. The
other comments are addressed too.

Thanks,
Yuan

On 09/21/2011 02:55 PM, Liu Yuan wrote:
> From: Liu Yuan <tailai.ly at taobao.com>
>
> [Problem]
>
> Currently, sheepdog cannot recover the cluster into a fully functional state
> if any node in the cluster fails to join after shutdown because it is
> considered unhealthy (e.g. the targeted epoch content is corrupted, or the
> epoch version mismatches). That is, the cluster can get working again only
> *if* all the nodes join the cluster successfully.
>
> For 14 nodes in the cluster,
>
> ==========*===   <-- cluster refuses to work
>           ^
>           |
>     unhealthy node
>
> This is quite awkward. When a cluster with many nodes is shut down, we
> easily meet the condition that some of the nodes are unhealthy and get
> rejected by the master during the join stage. This patch gives sheepdog some
> intelligence to deal with unhealthy nodes and to proceed to recovery once
> all the alive nodes reach an agreement.
>
> [Design]
>
> This patch adds a new concept to sheepdog: the *leave node*. A leave node is
> one that the master checks and finds unhealthy (unmatched epoch content), so
> it marks the node as a 'leave node', meaning that it is supposed to leave
> the cluster. The leave nodes are queued in the leave list, which exists
> *only* during SD_STATUS_WAIT_FOR_JOIN. Every leave node stops its own sheep
> daemon automatically after being started.
>
> The key idea for *when* the nodes reach agreement is very simple and can be
> summed up in one equation:
>
>     nr_nodes_in_epoch == nr_nodes_in_sd_list + nr_nodes_in_leave_list
>
> When this equation holds, all the alive nodes in the cluster begin recovery
> and, if some nodes are considered to have left, finish with the epoch
> incremented by 1. The point is that, after the cluster recovers, we can then
> try to *re-join* the unhealthy nodes and probably succeed!
>
> [Cases]
>
> No order is imposed on starting up the nodes in the cluster. That is, you
> can start up the nodes after shutdown in arbitrary order, whether a node is
> good or bad (indeed, you cannot know a node's status before you try to
> start it up).
>
> For example:
>
> ====**  <-- the cluster will recover with 4 nodes alive, epoch incremented by 1
> ==***=  <-- the cluster will recover with 3 nodes alive, epoch incremented by 1
> =*****  <-- the cluster will recover with 1 node alive, epoch incremented by 1
>
>            | try to re-join the '*' node
>            v
> =*====  -->  ======  <-- we might get 6 nodes alive.
>
> The corner case is that we start one of the bad nodes before any healthy
> node. If this happens, all the other nodes are considered 'unhealthy', and
> that single node will recover once we have tried to join all the nodes.
>
> *=====  --> the cluster will recover with 1 node alive, epoch incremented by 1
> ^
> | from now on, we can re-join the nodes, and will end up with 6 nodes alive.
>
> [Note]
>
> This patch changes the join_message layout! So you have to recompile all
> the sheep daemons in your cluster before it works.
>
> We need to modify the join_message layout because of the following scenario:
>
> =**===  <-- the last 3 nodes need to know about the previous 2 leave nodes.
>
> When the nodes following the leave nodes join, the master is supposed to
> reply with the leave node information via send_join_response().
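To restate the agreement condition in code: the snippet below is a condensed
sketch of what the patch adds to __sd_deliver_done() when a leave message
arrives while the cluster sits in SD_STATUS_WAIT_FOR_JOIN (see the hunk
further down). The helpers and the sys global are the existing sheep ones;
the wrapper function name is made up for illustration only.

/* Condensed sketch of the agreement check, not a drop-in function. */
static void check_leave_agreement(void)
{
        int nr_local, nr, nr_leave;

        /* nodes recorded in the epoch we stopped at */
        nr_local = get_nodes_nr_epoch(sys->epoch);
        /* nodes that have joined successfully so far */
        nr = get_nodes_nr_from(&sys->sd_node_list);
        /* nodes the master has marked as 'leave' */
        nr_leave = get_nodes_nr_from(&sys->leave_list);

        if (nr_local == nr + nr_leave) {
                /* every node is accounted for: run with what we have,
                 * bumping past the highest epoch seen in leave messages */
                sys->status = SD_STATUS_OK;
                sys->epoch = sys->leave_epoch + 1;
                update_epoch_log(sys->epoch);
                update_epoch_store(sys->epoch);
        }
}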
>
> Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
> ---
>  sheep/group.c      |  190 ++++++++++++++++++++++++++++++++++++++++++----------
>  sheep/sheep_priv.h |    6 ++
>  2 files changed, 161 insertions(+), 35 deletions(-)
>
> diff --git a/sheep/group.c b/sheep/group.c
> index f35e1b2..bc92de6 100644
> --- a/sheep/group.c
> +++ b/sheep/group.c
> @@ -64,10 +64,17 @@ struct join_message {
>                 uint32_t pid;
>                 struct sheepdog_node_list_entry ent;
>         } nodes[SD_MAX_NODES];
> +       uint32_t nr_leave_nodes;
> +       struct {
> +               uint32_t nodeid;
> +               uint32_t pid;
> +               struct sheepdog_node_list_entry ent;
> +       } leave_nodes[SD_MAX_NODES];
>  };
>
>  struct leave_message {
>         struct message_header header;
> +       uint32_t epoch;
>  };
>
>  struct vdi_op_message {
> @@ -396,13 +403,90 @@ static int get_nodes_nr_epoch(int epoch)
>         return nr;
>  }
>
> +static struct node *find_leave_node(struct node *node)
> +{
> +       struct node *n;
> +       list_for_each_entry(n, &sys->leave_list, list) {
> +               if (node_cmp(&n->ent, &node->ent) == 0)
> +                       return n;
> +               dprintf("%d\n", node_cmp(&n->ent, &node->ent));
> +       }
> +       return NULL;
> +
> +}
> +static int add_node_to_leave_list(struct message_header *msg)
> +{
> +       int ret = SD_RES_SUCCESS;
> +       int nr, i;
> +       LIST_HEAD(tmp_list);
> +       struct node *n, *t;
> +       struct join_message *jm;
> +
> +       switch (msg->op) {
> +       case SD_MSG_LEAVE:
> +               n = zalloc(sizeof(*n));
> +               if (!n) {
> +                       ret = SD_RES_NO_MEM;
> +                       goto err;
> +               }
> +
> +               n->nodeid = msg->nodeid;
> +               n->pid = msg->pid;
> +               n->ent = msg->from;
> +               if (find_leave_node(n)) {
> +                       free(n);
> +                       goto ret;
> +               }
> +
> +               list_add_tail(&n->list, &sys->leave_list);
> +               goto ret;
> +       case SD_MSG_JOIN:
> +               jm = (struct join_message *)msg;
> +               nr = jm->nr_leave_nodes;
> +               break;
> +       default:
> +               ret = SD_RES_INVALID_PARMS;
> +               goto err;
> +       }
> +
> +       for (i = 0; i < nr; i++) {
> +               n = zalloc(sizeof(*n));
> +               if (!n) {
> +                       ret = SD_RES_NO_MEM;
> +                       goto free;
> +               }
> +
> +               n->nodeid = jm->leave_nodes[i].nodeid;
> +               n->pid = jm->leave_nodes[i].pid;
> +               n->ent = jm->leave_nodes[i].ent;
> +               if (find_leave_node(n)) {
> +                       free(n);
> +                       continue;
> +               }
> +
> +               list_add_tail(&n->list, &tmp_list);
> +       }
> +       list_splice_init(&tmp_list, &sys->leave_list);
> +       goto ret;
> +
> +free:
> +       list_for_each_entry_safe(n, t, &tmp_list, list) {
> +               free(n);
> +       }
> +ret:
> +       dprintf("%d\n", get_nodes_nr_from(&sys->leave_list));
> +       print_node_list(&sys->leave_list);
> +err:
> +       return ret;
> +}
> +
>  static int get_cluster_status(struct sheepdog_node_list_entry *from,
>                               struct sheepdog_node_list_entry *entries,
>                               int nr_entries, uint64_t ctime, uint32_t epoch,
>                               uint32_t *status, uint8_t *inc_epoch)
>  {
>         int i;
> -       int nr_local_entries;
> +       int nr_local_entries, nr_leave_entries;
>         struct sheepdog_node_list_entry local_entries[SD_MAX_NODES];
>         struct node *node;
>         uint32_t local_epoch;
> @@ -480,8 +564,18 @@ static int get_cluster_status(struct sheepdog_node_list_entry *from,
>
>                 nr_entries = get_nodes_nr_from(&sys->sd_node_list) + 1;
>
> -               if (nr_entries != nr_local_entries)
> +               if (nr_entries != nr_local_entries) {
> +                       nr_leave_entries = get_nodes_nr_from(&sys->leave_list);
> +                       if (nr_local_entries == nr_entries + nr_leave_entries) {
> +                               /* Even though some nodes leave, we can make do with it.
> +                                * Order cluster to do recovery right now.
> +                                */
> +                               *inc_epoch = 1;
> +                               *status = SD_STATUS_OK;
> +                               return SD_RES_SUCCESS;
> +                       }
>                         return SD_RES_SUCCESS;
> +               }
>
>                 for (i = 0; i < nr_local_entries; i++) {
>                         if (node_cmp(local_entries + i, from) == 0)
> @@ -637,13 +731,16 @@ static void update_cluster_info(struct join_message *msg)
>  {
>         int i;
>         int ret, nr_nodes = msg->nr_nodes;
> -       struct sheepdog_node_list_entry entry[SD_MAX_NODES];
>
>         eprintf("status = %d, epoch = %d, %d, %d\n", msg->cluster_status, msg->epoch, msg->result, sys->join_finished);
>         if (msg->result != SD_RES_SUCCESS) {
>                 if (is_myself(msg->header.from.addr, msg->header.from.port)) {
>                         eprintf("failed to join sheepdog, %d\n", msg->result);
> -                       sys->status = SD_STATUS_JOIN_FAILED;
> +                       leave_cluster();
> +                       eprintf("I am really hurt and gonna leave cluster.\n");
> +                       eprintf("Fix yourself and restart me later, pleaseeeee...Bye.\n");
> +                       exit(1);
> +                       /* sys->status = SD_STATUS_JOIN_FAILED; */
>                 }
>                 return;
>         }
> @@ -655,7 +752,7 @@ static void update_cluster_info(struct join_message *msg)
>         sys->nr_sobjs = msg->nr_sobjs;
>
>         if (sys->join_finished)
> -               goto out;
> +               goto join_finished;
>
>         sys->epoch = msg->epoch;
>         for (i = 0; i < nr_nodes; i++) {
> @@ -671,19 +768,15 @@ static void update_cluster_info(struct join_message *msg)
>                        msg->nodes[i].nodeid, msg->nodes[i].pid);
>         }
>
> -       sys->join_finished = 1;
> +       if (msg->cluster_status != SD_STATUS_OK)
> +               add_node_to_leave_list((struct message_header *)msg);
>
> -       if (msg->cluster_status == SD_STATUS_OK && msg->inc_epoch) {
> -               nr_nodes = get_ordered_sd_node_list(entry);
> +       sys->join_finished = 1;
>
> -               dprintf("update epoch, %d, %d\n", sys->epoch, nr_nodes);
> -               ret = epoch_log_write(sys->epoch, (char *)entry,
> -                                     nr_nodes * sizeof(struct sheepdog_node_list_entry));
> -               if (ret < 0)
> -                       eprintf("can't write epoch %u\n", sys->epoch);
> -       }
> +       if (msg->cluster_status == SD_STATUS_OK && msg->inc_epoch)
> +               update_epoch_log(sys->epoch);
>
> -out:
> +join_finished:
>         ret = move_node_to_sd_list(msg->header.nodeid, msg->header.pid, msg->header.from);
>         /*
>          * this should not happen since __sd_deliver() checks if the
> @@ -695,16 +788,8 @@ out:
>
>         if (msg->cluster_status == SD_STATUS_OK) {
>                 if (msg->inc_epoch) {
> -                       nr_nodes = get_ordered_sd_node_list(entry);
> -
> -                       dprintf("update epoch, %d, %d\n", sys->epoch + 1, nr_nodes);
> -                       ret = epoch_log_write(sys->epoch + 1, (char *)entry,
> -                                             nr_nodes * sizeof(struct sheepdog_node_list_entry));
> -                       if (ret < 0)
> -                               eprintf("can't write epoch %u\n", sys->epoch + 1);
> -
>                         sys->epoch++;
> -
> +                       update_epoch_log(sys->epoch);
>                         update_epoch_store(sys->epoch);
>                 }
>                 if (sys->status != SD_STATUS_OK) {
> @@ -930,10 +1015,25 @@ static void __sd_deliver(struct cpg_event *cevent)
>  static void send_join_response(struct work_deliver *w)
>  {
>         struct message_header *m;
> +       struct join_message *jm;
> +       struct node *node;
>
>         m = w->msg;
> -       join((struct join_message *)m);
> +       jm = (struct join_message *)m;
> +       join(jm);
>         m->state = DM_FIN;
> +
> +       dprintf("%d, %d\n", jm->result, jm->cluster_status);
> +       if (jm->result == SD_RES_SUCCESS && jm->cluster_status != SD_STATUS_OK) {
> +               jm->nr_leave_nodes = 0;
> +               list_for_each_entry(node, &sys->leave_list, list) {
> +                       jm->leave_nodes[jm->nr_leave_nodes].nodeid = node->nodeid;
> +                       jm->leave_nodes[jm->nr_leave_nodes].pid = node->pid;
> +                       jm->leave_nodes[jm->nr_leave_nodes].ent = node->ent;
> +                       jm->nr_leave_nodes++;
> +               }
> +               print_node_list(&sys->leave_list);
> +       }
>         send_message(sys->handle, m);
>  }
>
> @@ -941,11 +1041,11 @@ static void __sd_deliver_done(struct cpg_event *cevent)
>  {
>         struct work_deliver *w = container_of(cevent, struct work_deliver, cev);
>         struct message_header *m;
> +       struct leave_message *lm;
>         char name[128];
>         int do_recovery;
> -       struct node *node;
> -       struct sheepdog_node_list_entry e[SD_MAX_NODES];
> -       int nr;
> +       struct node *node, *t;
> +       int nr, nr_local, nr_leave;
>
>         m = w->msg;
>
> @@ -962,13 +1062,25 @@ static void __sd_deliver_done(struct cpg_event *cevent)
>                 list_del(&node->list);
>                 free(node);
>                 if (sys->status == SD_STATUS_OK) {
> -                       nr = get_ordered_sd_node_list(e);
> -                       dprintf("update epoch, %d, %d\n", sys->epoch + 1, nr);
> -                       epoch_log_write(sys->epoch + 1, (char *)e,
> -                                       nr * sizeof(struct sheepdog_node_list_entry));
> -
>                         sys->epoch++;
> -
> +                       update_epoch_log(sys->epoch);
>                         update_epoch_store(sys->epoch);
>                 }
>         }
> +       if (sys->status == SD_STATUS_WAIT_FOR_JOIN) {
> +               lm = (struct leave_message *)m;
> +               add_node_to_leave_list(m);
> +
> +               if (lm->epoch > sys->leave_epoch)
> +                       sys->leave_epoch = lm->epoch;
> +
> +               nr_local = get_nodes_nr_epoch(sys->epoch);
> +               nr = get_nodes_nr_from(&sys->sd_node_list);
> +               nr_leave = get_nodes_nr_from(&sys->leave_list);
> +               if (nr_local == nr + nr_leave) {
> +                       sys->status = SD_STATUS_OK;
> +                       sys->epoch = sys->leave_epoch + 1;
> +                       update_epoch_log(sys->epoch);
> +                       update_epoch_store(sys->epoch);
> +               }
> +       }
> @@ -1002,8 +1114,12 @@ static void __sd_deliver_done(struct cpg_event *cevent)
>                 }
>         }
>
> -       if (do_recovery && sys->status == SD_STATUS_OK)
> +       if (do_recovery && sys->status == SD_STATUS_OK) {
> +               list_for_each_entry_safe(node, t, &sys->leave_list, list) {
> +                       list_del(&node->list);
> +               }
>                 start_recovery(sys->epoch);
> +       }
>  }
>
>  static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name,
> @@ -1796,6 +1912,7 @@ join_retry:
>         sys->handle = cpg_handle;
>         sys->this_nodeid = nodeid;
>         sys->this_pid = getpid();
> +       sys->leave_epoch = 0;
>
>         ret = set_addr(nodeid, port);
>         if (ret)
> @@ -1815,6 +1932,7 @@ join_retry:
>         INIT_LIST_HEAD(&sys->sd_node_list);
>         INIT_LIST_HEAD(&sys->cpg_node_list);
>         INIT_LIST_HEAD(&sys->pending_list);
> +       INIT_LIST_HEAD(&sys->leave_list);
>
>         INIT_LIST_HEAD(&sys->outstanding_req_list);
>         INIT_LIST_HEAD(&sys->req_wait_for_obj_list);
> @@ -1840,6 +1958,8 @@ int leave_cluster(void)
>         msg.header.from = sys->this_node;
>         msg.header.nodeid = sys->this_nodeid;
>         msg.header.pid = sys->this_pid;
> +       msg.epoch = get_latest_epoch();
>
> +       dprintf("%d\n", msg.epoch);
>         return send_message(sys->handle, (struct message_header *)&msg);
>  }
> diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
> index dc56f54..f1f96e5 100644
> --- a/sheep/sheep_priv.h
> +++ b/sheep/sheep_priv.h
> @@ -117,6 +117,11 @@ struct cluster_info {
>         struct list_head cpg_node_list;
>         struct list_head sd_node_list;
>
> +       /* leave list is only used to account for bad nodes when we start
> +        * up the cluster nodes after we shut down the cluster through collie.
> +        */
> +       struct list_head leave_list;
> +
>         /* this array contains a list of ordered virtual nodes */
>         struct sheepdog_vnode_list_entry vnodes[SD_MAX_VNODES];
>         int nr_vnodes;
> @@ -138,6 +143,7 @@ struct cluster_info {
>         int nr_outstanding_reqs;
>
>         uint32_t recovered_epoch;
> +       uint32_t leave_epoch; /* the highest epoch number in the cluster */
>
>         int use_directio;
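A footnote on the [Note] section: the flag day exists because join_message
grows by a fixed-size tail, so old and new sheep disagree on the message
layout on the wire. The self-contained model below shows the effect; struct
ent is a stand-in for struct sheepdog_node_list_entry, and the SD_MAX_NODES
value is reproduced here only for the sake of the example (see
include/sheepdog_proto.h for the real definitions).

#include <stdint.h>
#include <stdio.h>

#define SD_MAX_NODES 1024       /* stand-in value for this example */

struct ent {                    /* stand-in for sheepdog_node_list_entry */
        uint8_t addr[16];
        uint16_t port;
        uint16_t pad;
};

struct nodes_tail_old {         /* tail of join_message before this patch */
        struct { uint32_t nodeid; uint32_t pid; struct ent ent; }
                nodes[SD_MAX_NODES];
};

struct nodes_tail_new {         /* tail of join_message after this patch */
        struct { uint32_t nodeid; uint32_t pid; struct ent ent; }
                nodes[SD_MAX_NODES];
        uint32_t nr_leave_nodes;
        struct { uint32_t nodeid; uint32_t pid; struct ent ent; }
                leave_nodes[SD_MAX_NODES];
};

int main(void)
{
        /* an old sheep would parse a new message at the wrong offsets,
         * which is why every node has to be recompiled at once */
        printf("tail before: %zu bytes, after: %zu bytes\n",
               sizeof(struct nodes_tail_old), sizeof(struct nodes_tail_new));
        return 0;
}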