On 10/10/2011 01:46 PM, MORITA Kazutaka wrote:
> Currently Sheepdog vdi operations (create/delete/lookup/...) are
> processed in two phase multicasting:
>
> 1. multicasts a vdi request
> 2. only the master node handles the request and multicasts the
>    response
>
> During this two phase, we cannot allow any other vdi operations and
> membership changes, and this makes sheep/group.c a bit hard to read.
>
> This patch simplifies this by adding a blocking callback to the
> notification function in the cluster driver. If the caller of
> cdrv->notify() sets 'block_cb' as an argument, block_cb() is called
> from the cluster driver before the message is notified to any node.
> All the cluster events are blocked in every nodes until the caller
> finishes the vdi operation in block_cb().
>
> With this change, the master node is no longer in charge of vdi
> operations, but this is a good change to make Sheepdog more symmetric.
>
> Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
> ---
>  sheep/cluster.h          |   10 ++-
>  sheep/cluster/corosync.c |  209 +++++++++++++++++++++++++++++++++++++++-------
>  sheep/group.c            |   12 ++--
>  3 files changed, 191 insertions(+), 40 deletions(-)
>
> diff --git a/sheep/cluster.h b/sheep/cluster.h
> index 43f4575..25b2d48 100644
> --- a/sheep/cluster.h
> +++ b/sheep/cluster.h
> @@ -71,11 +71,17 @@ struct cluster_driver {
>  	 *
>  	 * This function sends 'msg' to all the nodes. The notified
>  	 * messages can be read through notify_handler() in
> -	 * cdrv_handlers.
> +	 * cdrv_handlers. If 'block_cb' is specified, block_cb() is
> +	 * called before 'msg' is notified to all the nodes. All the
> +	 * cluster events including this notification are blocked
> +	 * until block_cb() returns or this blocking node leaves the
> +	 * cluster. The sheep daemon can sleep in block_cb(), so this
> +	 * callback must be called from other than the dispatch
> +	 * thread.
>  	 *
>  	 * Returns zero on success, -1 on error
>  	 */
> -	int (*notify)(void *msg, size_t msg_len);
> +	int (*notify)(void *msg, size_t msg_len, void (*block_cb)(void *arg));
>
>  	/*
>  	 * Dispatch handlers
> diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
> index e0f9a9c..022912b 100644
> --- a/sheep/cluster/corosync.c
> +++ b/sheep/cluster/corosync.c
> @@ -14,20 +14,27 @@
>  #include <corosync/cfg.h>
>
>  #include "cluster.h"
> +#include "work.h"
>
>  static cpg_handle_t cpg_handle;
>  static struct cpg_name cpg_group = { 9, "sheepdog" };
>
>  static corosync_cfg_handle_t cfg_handle;
> +static struct sheepid this_sheepid;
> +
> +static struct work_queue *corosync_block_wq;
>
>  static struct cdrv_handlers corosync_handlers;
>
>  static LIST_HEAD(corosync_event_list);
> +static LIST_HEAD(corosync_block_list);
>
>  enum corosync_event_type {
>  	COROSYNC_EVENT_TYPE_JOIN,
>  	COROSYNC_EVENT_TYPE_LEAVE,
>  	COROSYNC_EVENT_TYPE_NOTIFY,
> +	COROSYNC_EVENT_TYPE_BLOCK,
> +	COROSYNC_EVENT_TYPE_UNBLOCK,
>  };
>

How about separating the block/unblock message handling from the cluster
message handling? More message handling added in later development would
make corosync_dispatch() increasingly complicated and bloated. The current
unblock message handling in corosync_dispatch() already looks odd (tricky)
to me.
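
To make the suggestion concrete, the shape I have in mind is one handler per
message type selected from a small table, so the dispatch loop stays thin and
a later message type only adds a table entry plus its own handler. The
following is only a toy sketch with made-up names (struct event,
handle_block() and so on), not code from this patch or from sheepdog:

#include <stdio.h>

/* Toy example of per-type handlers: the dispatch loop does a table
 * lookup instead of growing a new switch case (and new state) for
 * every message type added later.  All names here are invented. */

enum event_type {
	EVENT_JOIN,
	EVENT_NOTIFY,
	EVENT_BLOCK,
	EVENT_UNBLOCK,
	EVENT_TYPE_MAX,
};

struct event {
	enum event_type type;
	const char *payload;
};

static void handle_join(struct event *ev)    { printf("join: %s\n", ev->payload); }
static void handle_notify(struct event *ev)  { printf("notify: %s\n", ev->payload); }
static void handle_block(struct event *ev)   { printf("block: %s\n", ev->payload); }
static void handle_unblock(struct event *ev) { printf("unblock: %s\n", ev->payload); }

/* One handler per event type; adding a message type means adding a
 * row here rather than another branch in the dispatch loop. */
static void (*const handlers[EVENT_TYPE_MAX])(struct event *) = {
	[EVENT_JOIN]    = handle_join,
	[EVENT_NOTIFY]  = handle_notify,
	[EVENT_BLOCK]   = handle_block,
	[EVENT_UNBLOCK] = handle_unblock,
};

static void dispatch(struct event *events, int nr)
{
	int i;

	for (i = 0; i < nr; i++)
		handlers[events[i].type](&events[i]);
}

int main(void)
{
	struct event events[] = {
		{ EVENT_NOTIFY,  "vdi create" },
		{ EVENT_BLOCK,   "vdi op pending" },
		{ EVENT_UNBLOCK, "vdi op done" },
	};

	dispatch(events, 3);
	return 0;
}

With that split, the block/unblock bookkeeping (the worker queue, the blocked
flag) could live next to its own handlers instead of inside
__corosync_dispatch().
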
>  struct corosync_event {
> @@ -42,6 +49,17 @@ struct corosync_event {
>  	void *msg;
>  	size_t msg_len;
>
> +	int blocked;
> +
> +	struct list_head list;
> +};
> +
> +struct corosync_block_msg {
> +	void *msg;
> +	size_t msg_len;
> +	void (*cb)(void *arg);
> +
> +	struct work work;
>  	struct list_head list;
>  };
>
> @@ -91,9 +109,70 @@ static void cpg_addr_to_sheepid(const struct cpg_address *cpgs,
>  	}
>  }
>
> +static int send_message(uint64_t type, void *msg, size_t msg_len)
> +{
> +	struct iovec iov[2];
> +	int ret, iov_cnt = 1;
> +
> +	iov[0].iov_base = &type;
> +	iov[0].iov_len = sizeof(type);
> +	if (msg) {
> +		iov[1].iov_base = msg;
> +		iov[1].iov_len = msg_len;
> +		iov_cnt++;
> +	}
> +retry:
> +	ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, iov_cnt);
> +	switch (ret) {
> +	case CPG_OK:
> +		break;
> +	case CPG_ERR_TRY_AGAIN:
> +		dprintf("failed to send message. try again\n");
> +		sleep(1);
> +		goto retry;
> +	default:
> +		eprintf("failed to send message, %d\n", ret);
> +		return -1;
> +	}
> +	return 0;
> +}
> +
> +static void corosync_block(struct work *work, int idx)
> +{
> +	struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
> +
> +	bm->cb(bm->msg);
> +}
> +
> +static void corosync_block_done(struct work *work, int idx)
> +{
> +	struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
> +
> +	send_message(COROSYNC_EVENT_TYPE_UNBLOCK, bm->msg, bm->msg_len);
> +
> +	free(bm->msg);
> +	free(bm);
> +}
> +
> +static struct corosync_event *find_block_event(struct sheepid *sender)
> +{
> +	struct corosync_event *cevent;
> +
> +	list_for_each_entry(cevent, &corosync_event_list, list) {
> +		if (cevent->type != COROSYNC_EVENT_TYPE_BLOCK)
> +			continue;
> +
> +		if (sheepid_cmp(&cevent->sender, sender) == 0)
> +			return cevent;
> +	}
> +
> +	return NULL;
> +}
> +
>  static void __corosync_dispatch(void)
>  {
>  	struct corosync_event *cevent;
> +	struct corosync_block_msg *bm;
>
>  	while (!list_empty(&corosync_event_list)) {
>  		cevent = list_first_entry(&corosync_event_list, typeof(*cevent), list);
> @@ -114,11 +193,36 @@
>  							 cevent->msg,
>  							 cevent->msg_len);
>  			break;
> +		case COROSYNC_EVENT_TYPE_BLOCK:
> +			if (sheepid_cmp(&cevent->sender, &this_sheepid) != 0)
> +				cevent->blocked = 1;
> +
> +			if (!cevent->blocked) {
> +				/* call a block callback function from a worker thread */
> +				if (list_empty(&corosync_block_list))
> +					panic("cannot call block callback\n");
> +
> +				bm = list_first_entry(&corosync_block_list, typeof(*bm), list);
> +				list_del(&bm->list);
> +
> +				bm->work.fn = corosync_block;
> +				bm->work.done = corosync_block_done;
> +				queue_work(corosync_block_wq, &bm->work);
> +				cevent->blocked = 1;
> +			}
>

Could the variable *blocked* be a static local one if there is no other
reference to it?

Thanks,
Yuan