[Sheepdog] [RFC PATCH 2/3] cluster: add blocking mechanism to notification

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Mon Oct 10 10:07:38 CEST 2011


At Mon, 10 Oct 2011 15:34:44 +0800,
Liu Yuan wrote:
> 
> On 10/10/2011 01:46 PM, MORITA Kazutaka wrote:
> > Currently Sheepdog vdi operations (create/delete/lookup/...) are
> > processed with two-phase multicasting:
> >
> >   1. multicasts a vdi request
> >   2. only the master node handles the request and multicasts the
> >      response
> >
> > During these two phases, we cannot allow any other vdi operations or
> > membership changes, and this makes sheep/group.c a bit hard to read.
> >
> > This patch simplifies this by adding a blocking callback to the
> > notification function in the cluster driver.  If the caller of
> > cdrv->notify() sets 'block_cb' as an argument, block_cb() is called
> > from the cluster driver before the message is notified to any node.
> > All the cluster events are blocked on every node until the caller
> > finishes the vdi operation in block_cb().
> >
> > With this change, the master node is no longer in charge of vdi
> > operations; this is a good change that makes Sheepdog more symmetric.
> >
> > Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
> > ---
> >   sheep/cluster.h          |   10 ++-
> >   sheep/cluster/corosync.c |  209 +++++++++++++++++++++++++++++++++++++++-------
> >   sheep/group.c            |   12 ++--
> >   3 files changed, 191 insertions(+), 40 deletions(-)
> >
> > diff --git a/sheep/cluster.h b/sheep/cluster.h
> > index 43f4575..25b2d48 100644
> > --- a/sheep/cluster.h
> > +++ b/sheep/cluster.h
> > @@ -71,11 +71,17 @@ struct cluster_driver {
> >   	 *
> >   	 * This function sends 'msg' to all the nodes.  The notified
> >   	 * messages can be read through notify_handler() in
> > -	 * cdrv_handlers.
> > +	 * cdrv_handlers.  If 'block_cb' is specified, block_cb() is
> > +	 * called before 'msg' is notified to all the nodes.  All the
> > +	 * cluster events including this notification are blocked
> > +	 * until block_cb() returns or the blocking node leaves the
> > +	 * cluster.  The sheep daemon can sleep in block_cb(), so this
> > +	 * callback must be called from a thread other than the
> > +	 * dispatch thread.
> >   	 *
> >   	 * Returns zero on success, -1 on error
> >   	 */
> > -	int (*notify)(void *msg, size_t msg_len);
> > +	int (*notify)(void *msg, size_t msg_len, void (*block_cb)(void *arg));
> >
> >   	/*
> >   	 * Dispatch handlers
> > diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
> > index e0f9a9c..022912b 100644
> > --- a/sheep/cluster/corosync.c
> > +++ b/sheep/cluster/corosync.c
> > @@ -14,20 +14,27 @@
> >   #include <corosync/cfg.h>
> >
> >   #include "cluster.h"
> > +#include "work.h"
> >
> >   static cpg_handle_t cpg_handle;
> >   static struct cpg_name cpg_group = { 9, "sheepdog" };
> >
> >   static corosync_cfg_handle_t cfg_handle;
> > +static struct sheepid this_sheepid;
> > +
> > +static struct work_queue *corosync_block_wq;
> >
> >   static struct cdrv_handlers corosync_handlers;
> >
> >   static LIST_HEAD(corosync_event_list);
> > +static LIST_HEAD(corosync_block_list);
> >
> >   enum corosync_event_type {
> >   	COROSYNC_EVENT_TYPE_JOIN,
> >   	COROSYNC_EVENT_TYPE_LEAVE,
> >   	COROSYNC_EVENT_TYPE_NOTIFY,
> > +	COROSYNC_EVENT_TYPE_BLOCK,
> > +	COROSYNC_EVENT_TYPE_UNBLOCK,
> >   };
> >
> 
> How about separating the block/unblock message handling from the cluster
> message handling?  Adding more and more message handling in later
> development would make corosync_dispatch() more complicated and bloated.
> The current unblock message handling in corosync_dispatch() already looks
> odd (tricky) to me.

Thanks for your comment.  How about splitting into two enums?

/* event types that are dispatched in corosync_dispatch() */
enum corosync_event_type {
	COROSYNC_EVENT_TYPE_JOIN,
	COROSYNC_EVENT_TYPE_LEAVE,
	COROSYNC_EVENT_TYPE_NOTIFY,
};

/* corosync multicast message type */
enum corosync_message_type {
	COROSYNC_MSG_TYPE_NOTIFY,
	COROSYNC_MSG_TYPE_BLOCK,
	COROSYNC_MSG_TYPE_UNBLOCK,
};
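
With that split, the cpg deliver callback would only translate incoming
multicast messages into dispatchable events.  A rough, untested sketch
(the handler name and the uint64_t type header follow send_message() in
this patch; everything else is only illustrative):

static void cdrv_cpg_deliver(cpg_handle_t handle,
			     const struct cpg_name *group_name,
			     uint32_t nodeid, uint32_t pid,
			     void *msg, size_t msg_len)
{
	/* send_message() always prepends a uint64_t message type */
	uint64_t type = *(uint64_t *)msg;

	switch (type) {
	case COROSYNC_MSG_TYPE_NOTIFY:
		/* queue an ordinary COROSYNC_EVENT_TYPE_NOTIFY event */
		break;
	case COROSYNC_MSG_TYPE_BLOCK:
		/* queue the event for this notification with its 'blocked'
		 * flag set; __corosync_dispatch() stops when it reaches a
		 * blocked event */
		break;
	case COROSYNC_MSG_TYPE_UNBLOCK:
		/* find the pending blocked event from this sender, attach
		 * the message body, and clear its 'blocked' flag */
		break;
	}

	__corosync_dispatch();
}

Then __corosync_dispatch() only ever sees join/leave/notify events, and the
block/unblock handling stays in the deliver path.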

> 
> >   struct corosync_event {
> > @@ -42,6 +49,17 @@ struct corosync_event {
> >   	void *msg;
> >   	size_t msg_len;
> >
> > +	int blocked;
> > +
> > +	struct list_head list;
> > +};
> > +
> > +struct corosync_block_msg {
> > +	void *msg;
> > +	size_t msg_len;
> > +	void (*cb)(void *arg);
> > +
> > +	struct work work;
> >   	struct list_head list;
> >   };
> >
> > @@ -91,9 +109,70 @@ static void cpg_addr_to_sheepid(const struct cpg_address *cpgs,
> >   	}
> >   }
> >
> > +static int send_message(uint64_t type, void *msg, size_t msg_len)
> > +{
> > +	struct iovec iov[2];
> > +	int ret, iov_cnt = 1;
> > +
> > +	iov[0].iov_base = &type;
> > +	iov[0].iov_len = sizeof(type);
> > +	if (msg) {
> > +		iov[1].iov_base = msg;
> > +		iov[1].iov_len = msg_len;
> > +		iov_cnt++;
> > +	}
> > +retry:
> > +	ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, iov_cnt);
> > +	switch (ret) {
> > +	case CPG_OK:
> > +		break;
> > +	case CPG_ERR_TRY_AGAIN:
> > +		dprintf("failed to send message. try again\n");
> > +		sleep(1);
> > +		goto retry;
> > +	default:
> > +		eprintf("failed to send message, %d\n", ret);
> > +		return -1;
> > +	}
> > +	return 0;
> > +}
> > +
> > +static void corosync_block(struct work *work, int idx)
> > +{
> > +	struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
> > +
> > +	bm->cb(bm->msg);
> > +}
> > +
> > +static void corosync_block_done(struct work *work, int idx)
> > +{
> > +	struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
> > +
> > +	send_message(COROSYNC_EVENT_TYPE_UNBLOCK, bm->msg, bm->msg_len);
> > +
> > +	free(bm->msg);
> > +	free(bm);
> > +}
> > +
> > +static struct corosync_event *find_block_event(struct sheepid *sender)
> > +{
> > +	struct corosync_event *cevent;
> > +
> > +	list_for_each_entry(cevent, &corosync_event_list, list) {
> > +		if (cevent->type != COROSYNC_EVENT_TYPE_BLOCK)
> > +			continue;
> > +
> > +		if (sheepid_cmp(&cevent->sender, sender) == 0)
> > +			return cevent;
> > +	}
> > +
> > +	return NULL;
> > +}
> > +
> >   static void __corosync_dispatch(void)
> >   {
> >   	struct corosync_event *cevent;
> > +	struct corosync_block_msg *bm;
> >
> >   	while (!list_empty(&corosync_event_list)) {
> >   		cevent = list_first_entry(&corosync_event_list, typeof(*cevent), list);
> > @@ -114,11 +193,36 @@ static void __corosync_dispatch(void)
> >   							 cevent->msg,
> >   							 cevent->msg_len);
> >   			break;
> > +		case COROSYNC_EVENT_TYPE_BLOCK:
> > +			if (sheepid_cmp(&cevent->sender, &this_sheepid) != 0)
> > +				cevent->blocked = 1;
> > +
> > +			if (!cevent->blocked) {
> > +				/* call a block callback function from a worker thread */
> > +				if (list_empty(&corosync_block_list))
> > +					panic("cannot call block callback\n");
> > +
> > +				bm = list_first_entry(&corosync_block_list, typeof(*bm), list);
> > +				list_del(&bm->list);
> > +
> > +				bm->work.fn = corosync_block;
> > +				bm->work.done = corosync_block_done;
> > +				queue_work(corosync_block_wq, &bm->work);
> > +				cevent->blocked = 1;
> > +			}
> > +
> 
> Could the variable *blocked* be a static local one if there are no other
> references to it?

I guess that if we remove COROSYNC_EVENT_TYPE_BLOCK/UNBLOCK from the
corosync_event_type enum, we need to access 'blocked' from outside
__corosync_dispatch().
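
For example, the deliver handler could clear the flag directly when an
unblock message arrives.  A rough, untested sketch (handle_unblock_msg()
is just a hypothetical helper; the lookup is the same idea as
find_block_event() in this patch):

/* hypothetical helper, called from the deliver handler for
 * COROSYNC_MSG_TYPE_UNBLOCK messages */
static void handle_unblock_msg(struct sheepid *sender, void *msg, size_t msg_len)
{
	struct corosync_event *cevent;

	/* look up the event that was queued when the BLOCK message arrived */
	cevent = find_block_event(sender);
	if (!cevent)
		return;  /* the blocking node has already left the cluster */

	/* attach the final message built in block_cb() and let
	 * __corosync_dispatch() deliver it; a real implementation would
	 * copy the buffer before CPG reuses it */
	cevent->msg = msg;
	cevent->msg_len = msg_len;
	cevent->blocked = 0;   /* 'blocked' is cleared from outside here */
}
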
I'll send a fixed patch, then let's discuss again.

Thanks,

Kazutaka


