[Sheepdog] [PATCH V2 2/2] sheep: teach sheepdog to better recover the cluster
Liu Yuan
namei.unix at gmail.com
Wed Sep 21 08:59:26 CEST 2011
Kazutaka,
I guess this patch addresses the inconsistency problem you mentioned.
The other comments are addressed too.
Thanks,
Yuan
On 09/21/2011 02:55 PM, Liu Yuan wrote:
> From: Liu Yuan <tailai.ly at taobao.com>
>
> [Problem]
>
> Currently, sheepdog cannot recover the cluster into a fully functional state if any node in
> the cluster fails to join after a shutdown because it is considered unhealthy
> (e.g. the targeted epoch content is corrupted, or the epoch version mismatches). That is, the cluster
> can get working again only *if* all the nodes join the cluster successfully.
>
> For 14 nodes in the cluster:
>
> ==========*=== <--- cluster refuses to work
>           ^
>           |
>           unhealthy node
>
> This is quite awkward. When a cluster with many nodes is shut down and restarted, we easily meet
> the condition that some of the nodes are unhealthy and get rejected by the master during the join
> stage. This patch gives sheepdog some intelligence to deal with unhealthy nodes and to
> proceed with recovery once all the nodes alive reach an agreement.
>
> [Design]
>
> This patch adds a new concept to sheepdog, the *leave node*. A _leave node_ is one
> that the master checks and finds unhealthy (unmatched epoch content), so it marks it as a
> 'leave node', meaning that it is supposed to leave the cluster. The leave nodes are queued
> in the leave list, which *only* exists during SD_STATUS_WAIT_FOR_JOIN. Every leave node stops
> its own sheep automatically after being started.
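>
> As a rough sketch, the rejected node's side (a condensed view of the update_cluster_info() hunk below) behaves like this; leave_cluster() already exists in sheep/group.c:
>
>     if (msg->result != SD_RES_SUCCESS &&
>         is_myself(msg->header.from.addr, msg->header.from.port)) {
>             /* broadcast a leave message carrying our latest epoch
>              * (see leave_cluster() below), then stop this sheep; it
>              * can be restarted and re-joined once the surviving
>              * nodes have recovered. */
>             leave_cluster();
>             exit(1);
>     }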
>
> The key idea for *when* the nodes reach this agreement is very simple, and can be summed up in
> one equation:
>
> nr_nodes_in_epoch == nr_nodes_in_sd_list + nr_nodes_in_leave_list
>
> When this holds, all the nodes alive in the cluster begin recovery and end up with
> the epoch incremented by 1 if some other nodes are considered to have left. This is useful because,
> after the cluster recovers, we can then try to *re-join* the unhealthy nodes and probably succeed!
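>
> The check itself is small; the following sketch mirrors the __sd_deliver_done() hunk below and uses the existing helpers get_nodes_nr_epoch() and get_nodes_nr_from():
>
>     int nr_epoch = get_nodes_nr_epoch(sys->epoch);
>     int nr_alive = get_nodes_nr_from(&sys->sd_node_list);
>     int nr_leave = get_nodes_nr_from(&sys->leave_list);
>
>     if (nr_epoch == nr_alive + nr_leave) {
>             /* every node recorded in the epoch is accounted for,
>              * either alive or marked as a leave node: run the
>              * cluster with the nodes alive and bump the epoch. */
>             sys->status = SD_STATUS_OK;
>             sys->epoch = sys->leave_epoch + 1;
>             update_epoch_log(sys->epoch);
>             update_epoch_store(sys->epoch);
>     }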
>
> [Cases]
>
> There is no order imposed on starting up the nodes in the cluster. That is, you can start up the
> nodes after a shutdown in arbitrary order, whether a node is good or bad (indeed, you cannot
> know a node's status before you try to start it up).
>
> For example:
>
> ====** <-- the cluster will recover with 4 nodes alive, with the epoch incremented by 1
> ==***= <-- the cluster will recover with 3 nodes alive, with the epoch incremented by 1
> =***** <-- the cluster will recover with 1 node alive, with the epoch incremented by 1
>
>  | try to re-join the '*' nodes
>  v
> =*==== --> ====== <-- we might get 6 nodes alive.
>
> The corner case is that we start one of the bad nodes first, before any healthy nodes. If this
> happens, all the other nodes are considered 'unhealthy', and that single node will recover
> after we have tried to join all the nodes.
>
> *===== --> the cluster will recover with 1 node alive, with the epoch incremented by 1
> ^
> | from now on, we can re-join the nodes and will end up with 6 nodes alive.
>
> [Note]
>
> This patch changes the join_message layout! So you have to recompile sheep on all the nodes in your cluster
> before it will work.
>
> We need to modify the join_message layout because of the following scenario:
>
> =**=== <-- the last 3 nodes need to know the previous 2 leave nodes' information.
>
> When the nodes following the leave nodes join, the master is supposed to reply with the leave nodes' information
> via send_join_response().
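>
> On the reply side this looks roughly like the following (a condensed view of the send_join_response() hunk below); the joining node then replays the entries through add_node_to_leave_list():
>
>     if (jm->result == SD_RES_SUCCESS && jm->cluster_status != SD_STATUS_OK) {
>             /* tell the joining node which nodes have already left */
>             jm->nr_leave_nodes = 0;
>             list_for_each_entry(node, &sys->leave_list, list) {
>                     jm->leave_nodes[jm->nr_leave_nodes].nodeid = node->nodeid;
>                     jm->leave_nodes[jm->nr_leave_nodes].pid = node->pid;
>                     jm->leave_nodes[jm->nr_leave_nodes].ent = node->ent;
>                     jm->nr_leave_nodes++;
>             }
>     }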
>
> Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
> ---
> sheep/group.c | 190 ++++++++++++++++++++++++++++++++++++++++++----------
> sheep/sheep_priv.h | 6 ++
> 2 files changed, 161 insertions(+), 35 deletions(-)
>
> diff --git a/sheep/group.c b/sheep/group.c
> index f35e1b2..bc92de6 100644
> --- a/sheep/group.c
> +++ b/sheep/group.c
> @@ -64,10 +64,17 @@ struct join_message {
> uint32_t pid;
> struct sheepdog_node_list_entry ent;
> } nodes[SD_MAX_NODES];
> + uint32_t nr_leave_nodes;
> + struct {
> + uint32_t nodeid;
> + uint32_t pid;
> + struct sheepdog_node_list_entry ent;
> + } leave_nodes[SD_MAX_NODES];
> };
>
> struct leave_message {
> struct message_header header;
> + uint32_t epoch;
> };
>
> struct vdi_op_message {
> @@ -396,13 +403,90 @@ static int get_nodes_nr_epoch(int epoch)
> return nr;
> }
>
> +static struct node *find_leave_node(struct node *node)
> +{
> + struct node *n;
> + list_for_each_entry(n, &sys->leave_list, list) {
> + if (node_cmp(&n->ent, &node->ent) == 0)
> + return n;
> + dprintf("%d\n", node_cmp(&n->ent, &node->ent));
> + }
> + return NULL;
> +}
> +
> +static int add_node_to_leave_list(struct message_header *msg)
> +{
> + int ret = SD_RES_SUCCESS;
> + int nr, i;
> + LIST_HEAD(tmp_list);
> + struct node *n, *t;
> + struct join_message *jm;
> +
> + switch (msg->op) {
> + case SD_MSG_LEAVE:
> + n = zalloc(sizeof(*n));
> + if (!n) {
> + ret = SD_RES_NO_MEM;
> + goto err;
> + }
> +
> + n->nodeid = msg->nodeid;
> + n->pid = msg->pid;
> + n->ent = msg->from;
> + if (find_leave_node(n)) {
> + free(n);
> + goto ret;
> + }
> +
> + list_add_tail(&n->list, &sys->leave_list);
> + goto ret;
> + case SD_MSG_JOIN:
> + jm = (struct join_message *)msg;
> + nr = jm->nr_leave_nodes;
> + break;
> + default:
> + ret = SD_RES_INVALID_PARMS;
> + goto err;
> + }
> +
> + for (i = 0; i < nr; i++) {
> + n = zalloc(sizeof(*n));
> + if (!n) {
> + ret = SD_RES_NO_MEM;
> + goto free;
> + }
> +
> + n->nodeid = jm->leave_nodes[i].nodeid;
> + n->pid = jm->leave_nodes[i].pid;
> + n->ent = jm->leave_nodes[i].ent;
> + if (find_leave_node(n)) {
> + free(n);
> + continue;
> + }
> +
> + list_add_tail(&n->list, &tmp_list);
> + }
> + list_splice_init(&tmp_list, &sys->leave_list);
> + goto ret;
> +
> +free:
> + list_for_each_entry_safe(n, t, &tmp_list, list) {
> + free(n);
> + }
> +ret:
> + dprintf("%d\n", get_nodes_nr_from(&sys->leave_list));
> + print_node_list(&sys->leave_list);
> +err:
> + return ret;
> +}
> +
> static int get_cluster_status(struct sheepdog_node_list_entry *from,
> struct sheepdog_node_list_entry *entries,
> int nr_entries, uint64_t ctime, uint32_t epoch,
> uint32_t *status, uint8_t *inc_epoch)
> {
> int i;
> - int nr_local_entries;
> + int nr_local_entries, nr_leave_entries;
> struct sheepdog_node_list_entry local_entries[SD_MAX_NODES];
> struct node *node;
> uint32_t local_epoch;
> @@ -480,8 +564,18 @@ static int get_cluster_status(struct sheepdog_node_list_entry *from,
>
> nr_entries = get_nodes_nr_from(&sys->sd_node_list) + 1;
>
> - if (nr_entries != nr_local_entries)
> + if (nr_entries != nr_local_entries) {
> + nr_leave_entries = get_nodes_nr_from(&sys->leave_list);
> + if (nr_local_entries == nr_entries + nr_leave_entries) {
> + /* Even though some nodes leave, we can make do with it.
> + * Order cluster to do recovery right now.
> + */
> + *inc_epoch = 1;
> + *status = SD_STATUS_OK;
> + return SD_RES_SUCCESS;
> + }
> return SD_RES_SUCCESS;
> + }
>
> for (i = 0; i < nr_local_entries; i++) {
> if (node_cmp(local_entries + i, from) == 0)
> @@ -637,13 +731,16 @@ static void update_cluster_info(struct join_message *msg)
> {
> int i;
> int ret, nr_nodes = msg->nr_nodes;
> - struct sheepdog_node_list_entry entry[SD_MAX_NODES];
>
> eprintf("status = %d, epoch = %d, %d, %d\n", msg->cluster_status, msg->epoch, msg->result, sys->join_finished);
> if (msg->result != SD_RES_SUCCESS) {
> if (is_myself(msg->header.from.addr, msg->header.from.port)) {
> eprintf("failed to join sheepdog, %d\n", msg->result);
> - sys->status = SD_STATUS_JOIN_FAILED;
> + leave_cluster();
> + eprintf("I am really hurt and gonna leave cluster.\n");
> + eprintf("Fix yourself and restart me later, pleaseeeee...Bye.\n");
> + exit(1);
> + /* sys->status = SD_STATUS_JOIN_FAILED; */
> }
> return;
> }
> @@ -655,7 +752,7 @@ static void update_cluster_info(struct join_message *msg)
> sys->nr_sobjs = msg->nr_sobjs;
>
> if (sys->join_finished)
> - goto out;
> + goto join_finished;
>
> sys->epoch = msg->epoch;
> for (i = 0; i < nr_nodes; i++) {
> @@ -671,19 +768,15 @@ static void update_cluster_info(struct join_message *msg)
> msg->nodes[i].nodeid, msg->nodes[i].pid);
> }
>
> - sys->join_finished = 1;
> + if (msg->cluster_status != SD_STATUS_OK)
> + add_node_to_leave_list((struct message_header *)msg);
>
> - if (msg->cluster_status == SD_STATUS_OK && msg->inc_epoch) {
> - nr_nodes = get_ordered_sd_node_list(entry);
> + sys->join_finished = 1;
>
> - dprintf("update epoch, %d, %d\n", sys->epoch, nr_nodes);
> - ret = epoch_log_write(sys->epoch, (char *)entry,
> - nr_nodes * sizeof(struct sheepdog_node_list_entry));
> - if (ret < 0)
> - eprintf("can't write epoch %u\n", sys->epoch);
> - }
> + if (msg->cluster_status == SD_STATUS_OK && msg->inc_epoch)
> + update_epoch_log(sys->epoch);
>
> -out:
> +join_finished:
> ret = move_node_to_sd_list(msg->header.nodeid, msg->header.pid, msg->header.from);
> /*
> * this should not happen since __sd_deliver() checks if the
> @@ -695,16 +788,8 @@ out:
>
> if (msg->cluster_status == SD_STATUS_OK) {
> if (msg->inc_epoch) {
> - nr_nodes = get_ordered_sd_node_list(entry);
> -
> - dprintf("update epoch, %d, %d\n", sys->epoch + 1, nr_nodes);
> - ret = epoch_log_write(sys->epoch + 1, (char *)entry,
> - nr_nodes * sizeof(struct sheepdog_node_list_entry));
> - if (ret < 0)
> - eprintf("can't write epoch %u\n", sys->epoch + 1);
> -
> sys->epoch++;
> -
> + update_epoch_log(sys->epoch);
> update_epoch_store(sys->epoch);
> }
> if (sys->status != SD_STATUS_OK) {
> @@ -930,10 +1015,25 @@ static void __sd_deliver(struct cpg_event *cevent)
> static void send_join_response(struct work_deliver *w)
> {
> struct message_header *m;
> + struct join_message *jm;
> + struct node *node;
>
> m = w->msg;
> - join((struct join_message *)m);
> + jm = (struct join_message *)m;
> + join(jm);
> m->state = DM_FIN;
> +
> + dprintf("%d, %d\n", jm->result, jm->cluster_status);
> + if (jm->result == SD_RES_SUCCESS && jm->cluster_status != SD_STATUS_OK) {
> + jm->nr_leave_nodes = 0;
> + list_for_each_entry(node, &sys->leave_list, list) {
> + jm->leave_nodes[jm->nr_leave_nodes].nodeid = node->nodeid;
> + jm->leave_nodes[jm->nr_leave_nodes].pid = node->pid;
> + jm->leave_nodes[jm->nr_leave_nodes].ent = node->ent;
> + jm->nr_leave_nodes++;
> + }
> + print_node_list(&sys->leave_list);
> + }
> send_message(sys->handle, m);
> }
>
> @@ -941,11 +1041,11 @@ static void __sd_deliver_done(struct cpg_event *cevent)
> {
> struct work_deliver *w = container_of(cevent, struct work_deliver, cev);
> struct message_header *m;
> + struct leave_message *lm;
> char name[128];
> int do_recovery;
> - struct node *node;
> - struct sheepdog_node_list_entry e[SD_MAX_NODES];
> - int nr;
> + struct node *node, *t;
> + int nr, nr_local, nr_leave;
>
> m = w->msg;
>
> @@ -962,13 +1062,25 @@ static void __sd_deliver_done(struct cpg_event *cevent)
> list_del(&node->list);
> free(node);
> if (sys->status == SD_STATUS_OK) {
> - nr = get_ordered_sd_node_list(e);
> - dprintf("update epoch, %d, %d\n", sys->epoch + 1, nr);
> - epoch_log_write(sys->epoch + 1, (char *)e,
> - nr * sizeof(struct sheepdog_node_list_entry));
> -
> sys->epoch++;
> -
> + update_epoch_log(sys->epoch);
> + update_epoch_store(sys->epoch);
> + }
> + }
> + if (sys->status == SD_STATUS_WAIT_FOR_JOIN) {
> + lm = (struct leave_message *)m;
> + add_node_to_leave_list(m);
> +
> + if (lm->epoch > sys->leave_epoch)
> + sys->leave_epoch = lm->epoch;
> +
> + nr_local = get_nodes_nr_epoch(sys->epoch);
> + nr = get_nodes_nr_from(&sys->sd_node_list);
> + nr_leave = get_nodes_nr_from(&sys->leave_list);
> + if (nr_local == nr + nr_leave) {
> + sys->status = SD_STATUS_OK;
> + sys->epoch = sys->leave_epoch + 1;
> + update_epoch_log(sys->epoch);
> update_epoch_store(sys->epoch);
> }
> }
> @@ -1002,8 +1114,12 @@ static void __sd_deliver_done(struct cpg_event *cevent)
> }
> }
>
> - if (do_recovery && sys->status == SD_STATUS_OK)
> + if (do_recovery && sys->status == SD_STATUS_OK) {
> + list_for_each_entry_safe(node, t, &sys->leave_list, list) {
> + list_del(&node->list);
> + }
> start_recovery(sys->epoch);
> + }
> }
>
> static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name,
> @@ -1796,6 +1912,7 @@ join_retry:
> sys->handle = cpg_handle;
> sys->this_nodeid = nodeid;
> sys->this_pid = getpid();
> + sys->leave_epoch = 0;
>
> ret = set_addr(nodeid, port);
> if (ret)
> @@ -1815,6 +1932,7 @@ join_retry:
> INIT_LIST_HEAD(&sys->sd_node_list);
> INIT_LIST_HEAD(&sys->cpg_node_list);
> INIT_LIST_HEAD(&sys->pending_list);
> + INIT_LIST_HEAD(&sys->leave_list);
>
> INIT_LIST_HEAD(&sys->outstanding_req_list);
> INIT_LIST_HEAD(&sys->req_wait_for_obj_list);
> @@ -1840,6 +1958,8 @@ int leave_cluster(void)
> msg.header.from = sys->this_node;
> msg.header.nodeid = sys->this_nodeid;
> msg.header.pid = sys->this_pid;
> + msg.epoch = get_latest_epoch();
>
> + dprintf("%d\n", msg.epoch);
> return send_message(sys->handle, (struct message_header *)&msg);
> }
> diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
> index dc56f54..f1f96e5 100644
> --- a/sheep/sheep_priv.h
> +++ b/sheep/sheep_priv.h
> @@ -117,6 +117,11 @@ struct cluster_info {
> struct list_head cpg_node_list;
> struct list_head sd_node_list;
>
> + /* leave list is only used to account for bad nodes when we start
> + * up the cluster nodes after we shut down the cluster through collie.
> + */
> + struct list_head leave_list;
> +
> /* this array contains a list of ordered virtual nodes */
> struct sheepdog_vnode_list_entry vnodes[SD_MAX_VNODES];
> int nr_vnodes;
> @@ -138,6 +143,7 @@ struct cluster_info {
> int nr_outstanding_reqs;
>
> uint32_t recovered_epoch;
> + uint32_t leave_epoch; /* The highest epoch number in the cluster */
>
> int use_directio;
>