Thanks for your review!

At Mon, 10 Oct 2011 17:29:06 +0800,
Liu Yuan wrote:
>
> On 10/10/2011 04:44 PM, MORITA Kazutaka wrote:
> > Currently Sheepdog vdi operations (create/delete/lookup/...) are
> > processed in two phase multicasting:
> >
> >  1. multicasts a vdi request
> >  2. only the master node handles the request and multicasts the
> >     response
> >
> > During this two phase, we cannot allow any other vdi operations and
> > membership changes, and this makes sheep/group.c a bit hard to read.
> >
> > This patch simplifies this by adding a blocking callback to the
> > notification function in the cluster driver.  If the caller of
> > cdrv->notify() sets 'block_cb' as an argument, block_cb() is called
> > from the cluster driver before the message is notified to any node.
> > All the cluster events are blocked in every nodes until the caller
> > finishes the vdi operation in block_cb().
> >
> > With this change, the master node is no longer in charge of vdi
> > operations, but this is a good change to make Sheepdog more symmetric.
> >
> > Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
> > ---
> >  sheep/cluster.h          |   10 ++-
> >  sheep/cluster/corosync.c |  211 ++++++++++++++++++++++++++++++++++++++-------
> >  sheep/group.c            |   12 ++--
> >  3 files changed, 195 insertions(+), 38 deletions(-)
> >
> > diff --git a/sheep/cluster.h b/sheep/cluster.h
> > index 43f4575..25b2d48 100644
> > --- a/sheep/cluster.h
> > +++ b/sheep/cluster.h
> > @@ -71,11 +71,17 @@ struct cluster_driver {
> >           *
> >           * This function sends 'msg' to all the nodes.  The notified
> >           * messages can be read through notify_handler() in
> > -         * cdrv_handlers.
> > +         * cdrv_handlers.  If 'block_cb' is specified, block_cb() is
> > +         * called before 'msg' is notified to all the nodes.  All the
> > +         * cluster events including this notification are blocked
> > +         * until block_cb() returns or this blocking node leaves the
> > +         * cluster.  The sheep daemon can sleep in block_cb(), so this
> > +         * callback must be called from other than the dispatch
> > +         * thread.
>
> from other than? I don't get it. Did you mean from worker thread other
> than main thread?

I meant that we cannot call block_cb() in the same thread as
cdrv->dispatch(); cdrv->dispatch() is called in the main thread, and we
cannot sleep in the main thread.  The corosync driver calls block_cb()
in a worker thread, but other drivers may not.
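To make that concrete, here is a rough sketch of the caller side I have in
mind.  It assumes the declarations from sheep/cluster.h; the names
vdi_op_message, process_vdi_request and propose_vdi_op are made up for
illustration and are not the actual sheep/group.c code:

struct vdi_op_message {
        int opcode;             /* placeholder for the request/response payload */
};

static void process_vdi_request(struct vdi_op_message *msg);    /* placeholder */

/* Invoked through cdrv->notify() in whatever context the driver chooses.
 * It may sleep (e.g. while reading or writing vdi objects), which is why a
 * driver must not call it from the dispatch (main) thread. */
static void __sd_vdi_op(void *arg)
{
        struct vdi_op_message *msg = arg;

        process_vdi_request(msg);
}

static int propose_vdi_op(struct cluster_driver *cdrv,
                          struct vdi_op_message *msg, size_t len)
{
        /* Every node blocks all cluster events until __sd_vdi_op() returns
         * on the sender and the unblock message is multicast. */
        return cdrv->notify(msg, len, __sd_vdi_op);
}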
> > > * > > * Returns zero on success, -1 on error > > */ > > - int (*notify)(void *msg, size_t msg_len); > > + int (*notify)(void *msg, size_t msg_len, void (*block_cb)(void *arg)); > > > > /* > > * Dispatch handlers > > diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c > > index ff52525..629e8a9 100644 > > --- a/sheep/cluster/corosync.c > > +++ b/sheep/cluster/corosync.c > > @@ -14,22 +14,35 @@ > > #include<corosync/cfg.h> > > > > #include "cluster.h" > > +#include "work.h" > > > > static cpg_handle_t cpg_handle; > > static struct cpg_name cpg_group = { 9, "sheepdog" }; > > > > static corosync_cfg_handle_t cfg_handle; > > +static struct sheepid this_sheepid; > > + > > +static struct work_queue *corosync_block_wq; > > > > static struct cdrv_handlers corosync_handlers; > > > > static LIST_HEAD(corosync_event_list); > > +static LIST_HEAD(corosync_block_list); > > > > +/* event types which are dispatched in corosync_dispatch() */ > > enum corosync_event_type { > > COROSYNC_EVENT_TYPE_JOIN, > > COROSYNC_EVENT_TYPE_LEAVE, > > COROSYNC_EVENT_TYPE_NOTIFY, > > }; > > > > +/* multicast message type */ > > +enum corosync_message_type { > > + COROSYNC_MSG_TYPE_NOTIFY, > > + COROSYNC_MSG_TYPE_BLOCK, > > + COROSYNC_MSG_TYPE_UNBLOCK, > > +}; > > + > > struct corosync_event { > > enum corosync_event_type type; > > > > @@ -42,6 +55,18 @@ struct corosync_event { > > void *msg; > > size_t msg_len; > > > > + int blocked; > > + int callbacked; > > + > > + struct list_head list; > > +}; > > + > > +struct corosync_block_msg { > > + void *msg; > > + size_t msg_len; > > + void (*cb)(void *arg); > > + > > + struct work work; > > struct list_head list; > > }; > > > > @@ -91,9 +116,71 @@ static void cpg_addr_to_sheepid(const struct cpg_address *cpgs, > > } > > } > > > > +static int send_message(uint64_t type, void *msg, size_t msg_len) > > +{ > > + struct iovec iov[2]; > > + int ret, iov_cnt = 1; > > + > > + iov[0].iov_base =&type; > > + iov[0].iov_len = sizeof(type); > > + if (msg) { > > + iov[1].iov_base = msg; > > + iov[1].iov_len = msg_len; > > + iov_cnt++; > > + } > > +retry: > > + ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, iov_cnt); > > + switch (ret) { > > + case CPG_OK: > > + break; > > + case CPG_ERR_TRY_AGAIN: > > + dprintf("failed to send message. 
> > +                sleep(1);
> > +                goto retry;
> > +        default:
> > +                eprintf("failed to send message, %d\n", ret);
> > +                return -1;
> > +        }
> > +        return 0;
> > +}
> > +
> > +static void corosync_block(struct work *work, int idx)
> > +{
> > +        struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
> > +
> > +        bm->cb(bm->msg);
> > +}
> > +
> > +static void corosync_block_done(struct work *work, int idx)
> > +{
> > +        struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
> > +
> > +        send_message(COROSYNC_MSG_TYPE_UNBLOCK, bm->msg, bm->msg_len);
> > +
> > +        free(bm->msg);
> > +        free(bm);
> > +}
> > +
> > +static struct corosync_event *find_block_event(struct sheepid *sender)
> > +{
> > +        struct corosync_event *cevent;
> > +
> > +        list_for_each_entry(cevent, &corosync_event_list, list) {
> > +                if (!cevent->blocked)
> > +                        continue;
> > +
> > +                if (cevent->type == COROSYNC_EVENT_TYPE_NOTIFY &&
> > +                    sheepid_cmp(&cevent->sender, sender) == 0)
> > +                        return cevent;
> > +        }
> > +
> > +        return NULL;
> > +}
> > +
> >  static void __corosync_dispatch(void)
> >  {
> >          struct corosync_event *cevent;
> > +        struct corosync_block_msg *bm;
> >
> >          while (!list_empty(&corosync_event_list)) {
> >                  cevent = list_first_entry(&corosync_event_list, typeof(*cevent), list);
> > @@ -110,6 +197,28 @@ static void __corosync_dispatch(void)
> >                                                          cevent->nr_members);
> >                          break;
> >                  case COROSYNC_EVENT_TYPE_NOTIFY:
> > +                        if (cevent->blocked) {
> > +                                if (sheepid_cmp(&cevent->sender, &this_sheepid) == 0 &&
> > +                                    !cevent->callbacked) {
> > +                                        /* call a block callback function from a worker thread */
> > +                                        if (list_empty(&corosync_block_list))
> > +                                                panic("cannot call block callback\n");
> > +
> > +                                        bm = list_first_entry(&corosync_block_list,
> > +                                                              typeof(*bm), list);
> > +                                        list_del(&bm->list);
> > +
> > +                                        bm->work.fn = corosync_block;
> > +                                        bm->work.done = corosync_block_done;
> > +                                        queue_work(corosync_block_wq, &bm->work);
> > +
> > +                                        cevent->callbacked = 1;
> > +                                }
> > +
>
> is cevent->callbacked really needed? How about a static local one?

This variable is needed to ensure that block_cb() is called only once for
a blocked event.  If we made it a static local variable, how would we set
it back to zero for the next blocked event?
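To spell out the sequence: __corosync_dispatch() is re-run for every
delivered cpg message, so the same blocked event at the head of the list
is examined again and again until the unblock arrives.  Roughly (a
simplified sketch of the logic above, not the patch code; is_my_event()
and queue_block_work() stand in for the real checks):

        while (!list_empty(&corosync_event_list)) {
                cevent = list_first_entry(&corosync_event_list, typeof(*cevent), list);

                if (cevent->type == COROSYNC_EVENT_TYPE_NOTIFY && cevent->blocked) {
                        if (is_my_event(cevent) && !cevent->callbacked) {
                                queue_block_work(cevent);  /* must run exactly once */
                                cevent->callbacked = 1;
                        }
                        return;  /* keep everything blocked until UNBLOCK arrives */
                }
                /* ... dispatch and free the event ... */
        }

Because callbacked lives in the event, it is naturally zero for the next
blocked event (the event is zalloc()ed in cdrv_cpg_deliver()); a static
local variable would have to be cleared somewhere by hand.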
> > +                        /* block the rest messages until unblock message comes */
> > +                        goto out;
> > +                        }
> > +
> >                          corosync_handlers.notify_handler(&cevent->sender,
> >                                                           cevent->msg,
> >                                                           cevent->msg_len);
> > @@ -119,6 +228,8 @@ static void __corosync_dispatch(void)
> >                  list_del(&cevent->list);
> >                  free(cevent);
> >          }
> > +out:
> > +        return;
> >  }
> >
> >  static void cdrv_cpg_deliver(cpg_handle_t handle,
> > @@ -127,21 +238,49 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,
> >                               void *msg, size_t msg_len)
> >  {
> >          struct corosync_event *cevent;
> > +        uint64_t type;
> > +        struct sheepid sender;
> >
> > -        cevent = zalloc(sizeof(*cevent));
> > -        if (!cevent)
> > -                panic("oom\n");
> > -        cevent->msg = zalloc(msg_len);
> > -        if (!cevent->msg)
> > -                panic("oom\n");
> > +        nodeid_to_addr(nodeid, sender.addr);
> > +        sender.pid = pid;
> >
> > -        cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
> > -        nodeid_to_addr(nodeid, cevent->sender.addr);
> > -        cevent->sender.pid = pid;
> > -        memcpy(cevent->msg, msg, msg_len);
> > -        cevent->msg_len = msg_len;
> > +        memcpy(&type, msg, sizeof(type));
> > +        msg = (uint8_t *)msg + sizeof(type);
> > +        msg_len -= sizeof(type);
> >
> > -        list_add_tail(&cevent->list, &corosync_event_list);
> > +        switch (type) {
> > +        case COROSYNC_MSG_TYPE_BLOCK:
> > +        case COROSYNC_MSG_TYPE_NOTIFY:
> > +                cevent = zalloc(sizeof(*cevent));
> > +                if (!cevent)
> > +                        panic("oom\n");
> > +                cevent->msg = zalloc(msg_len);
> > +                if (!cevent->msg)
> > +                        panic("oom\n");
> > +
> > +                cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
> > +                cevent->sender = sender;
> > +                memcpy(cevent->msg, msg, msg_len);
> > +                cevent->msg_len = msg_len;
> > +                if (type == COROSYNC_MSG_TYPE_BLOCK)
> > +                        cevent->blocked = 1;
>
> move cevent->blocked = 1; into case COROSYNC_MSG_TYPE_BLOCK would avoid
> if check.

But cevent is allocated in this shared case block, so at the
COROSYNC_MSG_TYPE_BLOCK label there is nothing to assign to yet.

Thanks,

Kazutaka