[Sheepdog] [PATCH V2 2/2] sheep: teach sheepdog to better recovery the cluster

Liu Yuan namei.unix at gmail.com
Wed Sep 21 08:59:26 CEST 2011


Kazutaka,
     I think this patch addresses the inconsistency problem you mentioned.
The other comments are addressed too.

Thanks,
Yuan

On 09/21/2011 02:55 PM, Liu Yuan wrote:
> From: Liu Yuan <tailai.ly at taobao.com>
>
> [Problem]
>
> Currently, sheepdog cannot recover the cluster into a fully functional state if any node
> fails to join the cluster after a shutdown because it is considered unhealthy (e.g. the
> targeted epoch content is corrupted, or the epoch version mismatches). That is, the cluster
> can work again only *if* all the nodes join the cluster successfully.
>
> For 14 nodes in the cluster,
>
>        ==========*===<--- cluster refuses to work
>                  ^
>                  |
>            unhealthy node
>
> This is quite awkward. After a cluster with many nodes has been shut down, we can easily hit
> the condition that some of the nodes are unhealthy and are rejected by the master during the
> join stage. This patch gives sheepdog some intelligence to deal with unhealthy nodes and
> proceed to recovery when all the live nodes reach agreement.
>
> [Design]
>
> This patch adds a new concept to sheepdog, the *leave node*. A _leave node_ is a node that
> the master checks and finds unhealthy (unmatched epoch content), so the master marks it as a
> 'leave node', meaning that it is supposed to leave the cluster. The leave nodes are queued
> in the leave list, which *only* exists during SD_STATUS_WAIT_FOR_JOIN. Every leave node stops
> its sheep process automatically after being started.
>
> The key idea for *when* the nodes reach agreement is very simple and can be summed up in
> one equation:
>
> 	nr_nodes_in_epoch == nr_nodes_in_sd_list + nr_nodes_in_leave_list
>
> When this holds, all the live nodes in the cluster begin recovery, and the epoch ends up
> incremented by 1 if some nodes were marked to leave. This is because, after the cluster
> has recovered, we can then try to *re-join* the unhealthy nodes and will probably succeed!
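
The agreement rule above can be sketched as a small predicate. This is only an illustration under stated assumptions: `sketch_status`, `sketch_decision` and `check_agreement()` are hypothetical names, not part of the patch, and the real check lives in get_cluster_status() and __sd_deliver_done().

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-ins for the SD_STATUS_* values used by sheepdog. */
enum sketch_status { SKETCH_WAIT_FOR_JOIN, SKETCH_OK };

struct sketch_decision {
	enum sketch_status status;	/* cluster status after the check */
	uint8_t inc_epoch;		/* 1 if the epoch should be bumped */
};

/*
 * Apply the equation from the commit message:
 *   nr_nodes_in_epoch == nr_nodes_in_sd_list + nr_nodes_in_leave_list
 * The epoch is bumped only when at least one node is on the leave list.
 */
static struct sketch_decision check_agreement(int nr_in_epoch,
					      int nr_in_sd_list,
					      int nr_in_leave_list)
{
	struct sketch_decision d = { SKETCH_WAIT_FOR_JOIN, 0 };

	assert(nr_in_epoch >= 0 && nr_in_sd_list >= 0 && nr_in_leave_list >= 0);

	if (nr_in_epoch == nr_in_sd_list + nr_in_leave_list) {
		d.status = SKETCH_OK;
		d.inc_epoch = nr_in_leave_list > 0;
	}
	return d;
}
```

For instance, with 6 nodes recorded in the epoch, 4 joined and 2 on the leave list, check_agreement(6, 4, 2) reports SKETCH_OK with inc_epoch set; with only 1 node on the leave list it keeps waiting.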
>
> [Cases]
>
> No order is imposed on starting up the nodes in the cluster. That is, you can start the
> nodes after shutdown in arbitrary order, whether a node is good or bad (indeed, you cannot
> know a node's status before you try to start it up).
>
> For example:
>
>           ====**<-- the cluster will recover with 4 nodes alive and the epoch incremented by 1
>           ==***=<-- the cluster will recover with 3 nodes alive and the epoch incremented by 1
>           =*****<-- the cluster will recover with 1 node alive and the epoch incremented by 1
>
>                   |* try to re-join
>                   v
>           =*==== -->  ======<- we might get 6 nodes alive.
>
> The corner case is that we start one of the bad nodes first, before any healthy node. If this
> happens, all the other nodes are considered 'unhealthy', and that single node will recover
> on its own after we have tried to join all the nodes.
>
>           *===== -->  the cluster will recover with 1 node alive and the epoch incremented by 1
>                   ^
>                   | from now on, we can re-join the nodes, and will end up with 6 nodes alive.
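
The start-up cases above can be replayed with a toy model. This is a rough sketch and not sheepdog code: `simulate_startup()` and `startup_result` are hypothetical, and the model assumes that the first node started acts as master, that healthy nodes ('=') all agree with each other, and that a bad node ('*') agrees with nobody.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

struct startup_result {
	int nr_alive;	/* nodes that stay in the cluster */
	int nr_leave;	/* nodes queued on the leave list */
	int inc_epoch;	/* 1 if recovery bumps the epoch */
};

/*
 * order holds one character per node in start-up order:
 * '=' for a healthy node, '*' for a bad one.
 * Assumption: order[0] is the master; a later node is accepted only
 * if both it and the master are healthy.
 */
static struct startup_result simulate_startup(const char *order)
{
	struct startup_result r = { 0, 0, 0 };
	size_t i, n = strlen(order);

	assert(n > 0);

	for (i = 0; i < n; i++) {
		if (i == 0 || (order[0] == '=' && order[i] == '='))
			r.nr_alive++;
		else
			r.nr_leave++;
	}
	/* Every node has been accounted for, so agreement is reached. */
	r.inc_epoch = r.nr_leave > 0;
	return r;
}
```

Under these assumptions, "====**" leaves 4 nodes alive and "*=====" leaves only the first node alive, both with the epoch incremented, which matches the diagrams above.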
>
> [Note]
>
> This patch changes the join_message layout! So you have to rebuild sheepdog on all the nodes in your
> cluster before it works.
>
> We need to modify the join_message layout, because of the following scenario:
>
>           =**===<-- the last 3 nodes need to know about the previous 2 leave nodes.
>
> When nodes join after the leave nodes, the master is supposed to reply with the leave node information
> via send_join_response().
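
As a rough illustration of what the master does here, the sketch below copies a leave list into a fixed-size array in a response message. All names (`sketch_node`, `sketch_join_response`, `pack_leave_nodes()`, `SKETCH_MAX_NODES`) are hypothetical and simplified; the real code uses struct join_message and the sheepdog list helpers. The bound check is an extra precaution added for this sketch and is not taken from the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define SKETCH_MAX_NODES 8	/* hypothetical; stands in for SD_MAX_NODES */

/* Simplified singly linked leave list entry. */
struct sketch_node {
	uint32_t nodeid;
	uint32_t pid;
	struct sketch_node *next;
};

/* Simplified join response carrying the leave node information. */
struct sketch_join_response {
	uint32_t nr_leave_nodes;
	struct {
		uint32_t nodeid;
		uint32_t pid;
	} leave_nodes[SKETCH_MAX_NODES];
};

/* Copy the leave list into the response; returns the number of entries copied. */
static uint32_t pack_leave_nodes(struct sketch_join_response *rsp,
				 const struct sketch_node *head)
{
	const struct sketch_node *n;

	assert(rsp != NULL);

	rsp->nr_leave_nodes = 0;
	for (n = head; n != NULL; n = n->next) {
		/* Stop before overflowing the fixed-size array. */
		if (rsp->nr_leave_nodes == SKETCH_MAX_NODES)
			break;
		rsp->leave_nodes[rsp->nr_leave_nodes].nodeid = n->nodeid;
		rsp->leave_nodes[rsp->nr_leave_nodes].pid = n->pid;
		rsp->nr_leave_nodes++;
	}
	return rsp->nr_leave_nodes;
}
```

A node that joins later can then rebuild its own leave list from `leave_nodes[0 .. nr_leave_nodes - 1]`, which is why the message layout has to carry this array.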
>
> Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
> ---
>   sheep/group.c      |  190 ++++++++++++++++++++++++++++++++++++++++++----------
>   sheep/sheep_priv.h |    6 ++
>   2 files changed, 161 insertions(+), 35 deletions(-)
>
> diff --git a/sheep/group.c b/sheep/group.c
> index f35e1b2..bc92de6 100644
> --- a/sheep/group.c
> +++ b/sheep/group.c
> @@ -64,10 +64,17 @@ struct join_message {
>   		uint32_t pid;
>   		struct sheepdog_node_list_entry ent;
>   	} nodes[SD_MAX_NODES];
> +	uint32_t nr_leave_nodes;
> +	struct {
> +		uint32_t nodeid;
> +		uint32_t pid;
> +		struct sheepdog_node_list_entry ent;
> +	} leave_nodes[SD_MAX_NODES];
>   };
>
>   struct leave_message {
>   	struct message_header header;
> +	uint32_t epoch;
>   };
>
>   struct vdi_op_message {
> @@ -396,13 +403,90 @@ static int get_nodes_nr_epoch(int epoch)
>   	return nr;
>   }
>
> +static struct node *find_leave_node(struct node *node)
> +{
> +	struct node *n;
> +	list_for_each_entry(n, &sys->leave_list, list) {
> +		if (node_cmp(&n->ent, &node->ent) == 0)
> +			return n;
> +		dprintf("%d\n", node_cmp(&n->ent, &node->ent));
> +	}
> +	return NULL;
> +
> +}
> +static int add_node_to_leave_list(struct message_header *msg)
> +{
> +	int ret = SD_RES_SUCCESS;
> +	int nr, i;
> +	LIST_HEAD(tmp_list);
> +	struct node *n, *t;
> +	struct join_message *jm;
> +
> +	switch (msg->op) {
> +	case SD_MSG_LEAVE:
> +		n = zalloc(sizeof(*n));
> +		if (!n) {
> +			ret = SD_RES_NO_MEM;
> +			goto err;
> +		}
> +
> +		n->nodeid = msg->nodeid;
> +		n->pid = msg->pid;
> +		n->ent = msg->from;
> +		if (find_leave_node(n)) {
> +			free(n);
> +			goto ret;
> +		}
> +
> +		list_add_tail(&n->list, &sys->leave_list);
> +		goto ret;
> +	case SD_MSG_JOIN:
> +		jm = (struct join_message *)msg;
> +		nr = jm->nr_leave_nodes;
> +		break;
> +	default:
> +		ret = SD_RES_INVALID_PARMS;
> +		goto err;
> +	}
> +
> +	for (i = 0; i < nr; i++) {
> +		n = zalloc(sizeof(*n));
> +		if (!n) {
> +			ret = SD_RES_NO_MEM;
> +			goto free;
> +		}
> +
> +		n->nodeid = jm->leave_nodes[i].nodeid;
> +		n->pid = jm->leave_nodes[i].pid;
> +		n->ent = jm->leave_nodes[i].ent;
> +		if (find_leave_node(n)) {
> +			free(n);
> +			continue;
> +		}
> +
> +		list_add_tail(&n->list, &tmp_list);
> +	}
> +	list_splice_init(&tmp_list, &sys->leave_list);
> +	goto ret;
> +
> +free:
> +	list_for_each_entry_safe(n, t, &tmp_list, list) {
> +		free(n);
> +	}
> +ret:
> +	dprintf("%d\n", get_nodes_nr_from(&sys->leave_list));
> +	print_node_list(&sys->leave_list);
> +err:
> +	return ret;
> +}
> +
>   static int get_cluster_status(struct sheepdog_node_list_entry *from,
>   			      struct sheepdog_node_list_entry *entries,
>   			      int nr_entries, uint64_t ctime, uint32_t epoch,
>   			      uint32_t *status, uint8_t *inc_epoch)
>   {
>   	int i;
> -	int nr_local_entries;
> +	int nr_local_entries, nr_leave_entries;
>   	struct sheepdog_node_list_entry local_entries[SD_MAX_NODES];
>   	struct node *node;
>   	uint32_t local_epoch;
> @@ -480,8 +564,18 @@ static int get_cluster_status(struct sheepdog_node_list_entry *from,
>
>   		nr_entries = get_nodes_nr_from(&sys->sd_node_list) + 1;
>
> -		if (nr_entries != nr_local_entries)
> +		if (nr_entries != nr_local_entries) {
> +			nr_leave_entries = get_nodes_nr_from(&sys->leave_list);
> +			if (nr_local_entries == nr_entries + nr_leave_entries) {
> +				/* Even though some nodes leave, we can make do with it.
> +				 * Order cluster to do recovery right now.
> +				 */
> +				*inc_epoch = 1;
> +				*status = SD_STATUS_OK;
> +				return SD_RES_SUCCESS;
> +			}
>   			return SD_RES_SUCCESS;
> +		}
>
>   		for (i = 0; i < nr_local_entries; i++) {
>   			if (node_cmp(local_entries + i, from) == 0)
> @@ -637,13 +731,16 @@ static void update_cluster_info(struct join_message *msg)
>   {
>   	int i;
>   	int ret, nr_nodes = msg->nr_nodes;
> -	struct sheepdog_node_list_entry entry[SD_MAX_NODES];
>
>   	eprintf("status = %d, epoch = %d, %d, %d\n", msg->cluster_status, msg->epoch, msg->result, sys->join_finished);
>   	if (msg->result != SD_RES_SUCCESS) {
>   		if (is_myself(msg->header.from.addr, msg->header.from.port)) {
>   			eprintf("failed to join sheepdog, %d\n", msg->result);
> -			sys->status = SD_STATUS_JOIN_FAILED;
> +			leave_cluster();
> +			eprintf("I am really hurt and gonna leave cluster.\n");
> +			eprintf("Fix yourself and restart me later, pleaseeeee...Bye.\n");
> +			exit(1);
> +			/* sys->status = SD_STATUS_JOIN_FAILED; */
>   		}
>   		return;
>   	}
> @@ -655,7 +752,7 @@ static void update_cluster_info(struct join_message *msg)
>   		sys->nr_sobjs = msg->nr_sobjs;
>
>   	if (sys->join_finished)
> -		goto out;
> +		goto join_finished;
>
>   	sys->epoch = msg->epoch;
>   	for (i = 0; i < nr_nodes; i++) {
> @@ -671,19 +768,15 @@ static void update_cluster_info(struct join_message *msg)
>   				msg->nodes[i].nodeid, msg->nodes[i].pid);
>   	}
>
> -	sys->join_finished = 1;
> +	if (msg->cluster_status != SD_STATUS_OK)
> +		add_node_to_leave_list((struct message_header *)msg);
>
> -	if (msg->cluster_status == SD_STATUS_OK && msg->inc_epoch) {
> -		nr_nodes = get_ordered_sd_node_list(entry);
> +	sys->join_finished = 1;
>
> -		dprintf("update epoch, %d, %d\n", sys->epoch, nr_nodes);
> -		ret = epoch_log_write(sys->epoch, (char *)entry,
> -				      nr_nodes * sizeof(struct sheepdog_node_list_entry));
> -		if (ret < 0)
> -			eprintf("can't write epoch %u\n", sys->epoch);
> -	}
> +	if (msg->cluster_status == SD_STATUS_OK && msg->inc_epoch)
> +		update_epoch_log(sys->epoch);
>
> -out:
> +join_finished:
>   	ret = move_node_to_sd_list(msg->header.nodeid, msg->header.pid, msg->header.from);
>   	/*
>   	 * this should not happen since __sd_deliver() checks if the
> @@ -695,16 +788,8 @@ out:
>
>   	if (msg->cluster_status == SD_STATUS_OK) {
>   		if (msg->inc_epoch) {
> -			nr_nodes = get_ordered_sd_node_list(entry);
> -
> -			dprintf("update epoch, %d, %d\n", sys->epoch + 1, nr_nodes);
> -			ret = epoch_log_write(sys->epoch + 1, (char *)entry,
> -					      nr_nodes * sizeof(struct sheepdog_node_list_entry));
> -			if (ret < 0)
> -				eprintf("can't write epoch %u\n", sys->epoch + 1);
> -
>   			sys->epoch++;
> -
> +			update_epoch_log(sys->epoch);
>   			update_epoch_store(sys->epoch);
>   		}
>   		if (sys->status != SD_STATUS_OK) {
> @@ -930,10 +1015,25 @@ static void __sd_deliver(struct cpg_event *cevent)
>   static void send_join_response(struct work_deliver *w)
>   {
>   	struct message_header *m;
> +	struct join_message *jm;
> +	struct node *node;
>
>   	m = w->msg;
> -	join((struct join_message *)m);
> +	jm = (struct join_message *)m;
> +	join(jm);
>   	m->state = DM_FIN;
> +
> +	dprintf("%d, %d\n", jm->result, jm->cluster_status);
> +	if (jm->result == SD_RES_SUCCESS && jm->cluster_status != SD_STATUS_OK) {
> +		jm->nr_leave_nodes = 0;
> +		list_for_each_entry(node, &sys->leave_list, list) {
> +			jm->leave_nodes[jm->nr_leave_nodes].nodeid = node->nodeid;
> +			jm->leave_nodes[jm->nr_leave_nodes].pid = node->pid;
> +			jm->leave_nodes[jm->nr_leave_nodes].ent = node->ent;
> +			jm->nr_leave_nodes++;
> +		}
> +		print_node_list(&sys->leave_list);
> +	}
>   	send_message(sys->handle, m);
>   }
>
> @@ -941,11 +1041,11 @@ static void __sd_deliver_done(struct cpg_event *cevent)
>   {
>   	struct work_deliver *w = container_of(cevent, struct work_deliver, cev);
>   	struct message_header *m;
> +	struct leave_message *lm;
>   	char name[128];
>   	int do_recovery;
> -	struct node *node;
> -	struct sheepdog_node_list_entry e[SD_MAX_NODES];
> -	int nr;
> +	struct node *node, *t;
> +	int nr, nr_local, nr_leave;
>
>   	m = w->msg;
>
> @@ -962,13 +1062,25 @@ static void __sd_deliver_done(struct cpg_event *cevent)
>   				list_del(&node->list);
>   				free(node);
>   				if (sys->status == SD_STATUS_OK) {
> -					nr = get_ordered_sd_node_list(e);
> -					dprintf("update epoch, %d, %d\n", sys->epoch + 1, nr);
> -					epoch_log_write(sys->epoch + 1, (char *)e,
> -							nr * sizeof(struct sheepdog_node_list_entry));
> -
>   					sys->epoch++;
> -
> +					update_epoch_log(sys->epoch);
> +					update_epoch_store(sys->epoch);
> +				}
> +			}
> +			if (sys->status == SD_STATUS_WAIT_FOR_JOIN) {
> +				lm = (struct leave_message *)m;
> +				add_node_to_leave_list(m);
> +
> +				if (lm->epoch > sys->leave_epoch)
> +					sys->leave_epoch = lm->epoch;
> +
> +				nr_local = get_nodes_nr_epoch(sys->epoch);
> +				nr = get_nodes_nr_from(&sys->sd_node_list);
> +				nr_leave = get_nodes_nr_from(&sys->leave_list);
> +				if (nr_local == nr + nr_leave) {
> +					sys->status = SD_STATUS_OK;
> +					sys->epoch = sys->leave_epoch + 1;
> +					update_epoch_log(sys->epoch);
>   					update_epoch_store(sys->epoch);
>   				}
>   			}
> @@ -1002,8 +1114,12 @@ static void __sd_deliver_done(struct cpg_event *cevent)
>   		}
>   	}
>
> -	if (do_recovery && sys->status == SD_STATUS_OK)
> +	if (do_recovery && sys->status == SD_STATUS_OK) {
> +		list_for_each_entry_safe(node, t, &sys->leave_list, list)
> +			free(node);
> +		INIT_LIST_HEAD(&sys->leave_list);
>   		start_recovery(sys->epoch);
> +	}
>   }
>
>   static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name,
> @@ -1796,6 +1912,7 @@ join_retry:
>   	sys->handle = cpg_handle;
>   	sys->this_nodeid = nodeid;
>   	sys->this_pid = getpid();
> +	sys->leave_epoch = 0;
>
>   	ret = set_addr(nodeid, port);
>   	if (ret)
> @@ -1815,6 +1932,7 @@ join_retry:
>   	INIT_LIST_HEAD(&sys->sd_node_list);
>   	INIT_LIST_HEAD(&sys->cpg_node_list);
>   	INIT_LIST_HEAD(&sys->pending_list);
> +	INIT_LIST_HEAD(&sys->leave_list);
>
>   	INIT_LIST_HEAD(&sys->outstanding_req_list);
>   	INIT_LIST_HEAD(&sys->req_wait_for_obj_list);
> @@ -1840,6 +1958,8 @@ int leave_cluster(void)
>   	msg.header.from = sys->this_node;
>   	msg.header.nodeid = sys->this_nodeid;
>   	msg.header.pid = sys->this_pid;
> +	msg.epoch = get_latest_epoch();
>
> +	dprintf("%d\n", msg.epoch);
>   	return send_message(sys->handle, (struct message_header *)&msg);
>   }
> diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
> index dc56f54..f1f96e5 100644
> --- a/sheep/sheep_priv.h
> +++ b/sheep/sheep_priv.h
> @@ -117,6 +117,11 @@ struct cluster_info {
>   	struct list_head cpg_node_list;
>   	struct list_head sd_node_list;
>
> +	/* leave list is only used to account for bad nodes when we start
> +	 * up the cluster nodes after we shut down the cluster through collie.
> +	 */
> +	struct list_head leave_list;
> +
>   	/* this array contains a list of ordered virtual nodes */
>   	struct sheepdog_vnode_list_entry vnodes[SD_MAX_VNODES];
>   	int nr_vnodes;
> @@ -138,6 +143,7 @@ struct cluster_info {
>   	int nr_outstanding_reqs;
>
>   	uint32_t recovered_epoch;
> +	uint32_t leave_epoch; /* The highest epoch reported by leaving nodes */
>
>   	int use_directio;
>



