At Mon, 10 Oct 2011 18:15:21 +0800,
Liu Yuan wrote:
> 
> On 10/10/2011 05:58 PM, MORITA Kazutaka wrote:
> > Thanks for your review!
> >
> > At Mon, 10 Oct 2011 17:29:06 +0800,
> > Liu Yuan wrote:
> >> On 10/10/2011 04:44 PM, MORITA Kazutaka wrote:
> >>> Currently, Sheepdog vdi operations (create/delete/lookup/...) are
> >>> processed in two-phase multicasting:
> >>>
> >>>  1. multicast a vdi request
> >>>  2. only the master node handles the request and multicasts the
> >>>     response
> >>>
> >>> During these two phases, we cannot allow any other vdi operations or
> >>> membership changes, and this makes sheep/group.c a bit hard to read.
> >>>
> >>> This patch simplifies this by adding a blocking callback to the
> >>> notification function in the cluster driver.  If the caller of
> >>> cdrv->notify() sets 'block_cb' as an argument, block_cb() is called
> >>> from the cluster driver before the message is notified to any node.
> >>> All the cluster events are blocked on every node until the caller
> >>> finishes the vdi operation in block_cb().
> >>>
> >>> With this change, the master node is no longer in charge of vdi
> >>> operations; this is a good change toward making Sheepdog more
> >>> symmetric.
> >>>
> >>> Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
> >>> ---
> >>>  sheep/cluster.h          |   10 ++-
> >>>  sheep/cluster/corosync.c |  211 +++++++++++++++++++++++++++++++++++++++-------
> >>>  sheep/group.c            |   12 ++--
> >>>  3 files changed, 195 insertions(+), 38 deletions(-)
> >>>
> >>> diff --git a/sheep/cluster.h b/sheep/cluster.h
> >>> index 43f4575..25b2d48 100644
> >>> --- a/sheep/cluster.h
> >>> +++ b/sheep/cluster.h
> >>> @@ -71,11 +71,17 @@ struct cluster_driver {
> >>>  	 *
> >>>  	 * This function sends 'msg' to all the nodes.  The notified
> >>>  	 * messages can be read through notify_handler() in
> >>> -	 * cdrv_handlers.
> >>> +	 * cdrv_handlers.  If 'block_cb' is specified, block_cb() is
> >>> +	 * called before 'msg' is notified to all the nodes.  All the
> >>> +	 * cluster events including this notification are blocked
> >>> +	 * until block_cb() returns or this blocking node leaves the
> >>> +	 * cluster.  The sheep daemon can sleep in block_cb(), so this
> >>> +	 * callback must be called from other than the dispatch
> >>> +	 * thread.
> >>
> >> from other than? I don't get it.  Did you mean from a worker thread
> >> other than the main thread?
> >
> > I meant we cannot call block_cb() in the same thread as
> > cdrv->dispatch().  This is because cdrv->dispatch() is called in the
> > main thread and we cannot sleep in the main thread.  The corosync
> > driver calls block_cb() in a worker thread, but some drivers may not.
> >
> 
> So "this callback must not be called from the dispatch (main) thread"
> would be easier to understand.

Thanks, it's much better.
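To illustrate the intended calling pattern (just a sketch of the idea;
vdi_op_message and do_vdi_op below are made-up names, not the actual
sheep/group.c code):

	/*
	 * Runs in a worker thread; every node keeps all cluster events
	 * blocked until this returns and the driver multicasts the
	 * result message.
	 */
	static void vdi_op_block_cb(void *arg)
	{
		struct vdi_op_message *msg = arg;

		do_vdi_op(msg);		/* may sleep safely here */
	}

	/* main thread: any node, not only the master, can start a vdi op */
	cdrv->notify(msg, msg_len, vdi_op_block_cb);

At least in the corosync driver, the callback receives the message
buffer itself, so it can fill in the result before the message is
finally delivered to notify_handler() on all nodes.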
> >>>  	 *
> >>>  	 * Returns zero on success, -1 on error
> >>>  	 */
> >>> -	int (*notify)(void *msg, size_t msg_len);
> >>> +	int (*notify)(void *msg, size_t msg_len, void (*block_cb)(void *arg));
> >>>  
> >>>  	/*
> >>>  	 * Dispatch handlers
> >>> diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
> >>> index ff52525..629e8a9 100644
> >>> --- a/sheep/cluster/corosync.c
> >>> +++ b/sheep/cluster/corosync.c
> >>> @@ -14,22 +14,35 @@
> >>>  #include <corosync/cfg.h>
> >>>  
> >>>  #include "cluster.h"
> >>> +#include "work.h"
> >>>  
> >>>  static cpg_handle_t cpg_handle;
> >>>  static struct cpg_name cpg_group = { 9, "sheepdog" };
> >>>  
> >>>  static corosync_cfg_handle_t cfg_handle;
> >>> +static struct sheepid this_sheepid;
> >>> +
> >>> +static struct work_queue *corosync_block_wq;
> >>>  
> >>>  static struct cdrv_handlers corosync_handlers;
> >>>  
> >>>  static LIST_HEAD(corosync_event_list);
> >>> +static LIST_HEAD(corosync_block_list);
> >>>  
> >>> +/* event types which are dispatched in corosync_dispatch() */
> >>>  enum corosync_event_type {
> >>>  	COROSYNC_EVENT_TYPE_JOIN,
> >>>  	COROSYNC_EVENT_TYPE_LEAVE,
> >>>  	COROSYNC_EVENT_TYPE_NOTIFY,
> >>>  };
> >>>  
> >>> +/* multicast message type */
> >>> +enum corosync_message_type {
> >>> +	COROSYNC_MSG_TYPE_NOTIFY,
> >>> +	COROSYNC_MSG_TYPE_BLOCK,
> >>> +	COROSYNC_MSG_TYPE_UNBLOCK,
> >>> +};
> >>> +
> >>>  struct corosync_event {
> >>>  	enum corosync_event_type type;
> >>>  
> >>> @@ -42,6 +55,18 @@ struct corosync_event {
> >>>  	void *msg;
> >>>  	size_t msg_len;
> >>>  
> >>> +	int blocked;
> >>> +	int callbacked;
> >>> +
> >>>  	struct list_head list;
> >>>  };
> >>>  
> >>> +struct corosync_block_msg {
> >>> +	void *msg;
> >>> +	size_t msg_len;
> >>> +	void (*cb)(void *arg);
> >>> +
> >>> +	struct work work;
> >>> +	struct list_head list;
> >>> +};
> >>> +
> >>> @@ -91,9 +116,71 @@ static void cpg_addr_to_sheepid(const struct cpg_address *cpgs,
> >>>  	}
> >>>  }
> >>>  
> >>> +static int send_message(uint64_t type, void *msg, size_t msg_len)
> >>> +{
> >>> +	struct iovec iov[2];
> >>> +	int ret, iov_cnt = 1;
> >>> +
> >>> +	iov[0].iov_base = &type;
> >>> +	iov[0].iov_len = sizeof(type);
> >>> +	if (msg) {
> >>> +		iov[1].iov_base = msg;
> >>> +		iov[1].iov_len = msg_len;
> >>> +		iov_cnt++;
> >>> +	}
> >>> +retry:
> >>> +	ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, iov_cnt);
> >>> +	switch (ret) {
> >>> +	case CPG_OK:
> >>> +		break;
> >>> +	case CPG_ERR_TRY_AGAIN:
> >>> +		dprintf("failed to send message. try again\n");
> >>> +		sleep(1);
> >>> +		goto retry;
> >>> +	default:
> >>> +		eprintf("failed to send message, %d\n", ret);
> >>> +		return -1;
> >>> +	}
> >>> +	return 0;
> >>> +}
> >>> +
> >>> +static void corosync_block(struct work *work, int idx)
> >>> +{
> >>> +	struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
> >>> +
> >>> +	bm->cb(bm->msg);
> >>> +}
> >>> +
> >>> +static void corosync_block_done(struct work *work, int idx)
> >>> +{
> >>> +	struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
> >>> +
> >>> +	send_message(COROSYNC_MSG_TYPE_UNBLOCK, bm->msg, bm->msg_len);
> >>> +
> >>> +	free(bm->msg);
> >>> +	free(bm);
> >>> +}
> >>> +
> >>> +static struct corosync_event *find_block_event(struct sheepid *sender)
> >>> +{
> >>> +	struct corosync_event *cevent;
> >>> +
> >>> +	list_for_each_entry(cevent, &corosync_event_list, list) {
> >>> +		if (!cevent->blocked)
> >>> +			continue;
> >>> +
> >>> +		if (cevent->type == COROSYNC_EVENT_TYPE_NOTIFY &&
> >>> +		    sheepid_cmp(&cevent->sender, sender) == 0)
> >>> +			return cevent;
> >>> +	}
> >>> +
> >>> +	return NULL;
> >>> +}
> >>> +
> >>>  static void __corosync_dispatch(void)
> >>>  {
> >>>  	struct corosync_event *cevent;
> >>> +	struct corosync_block_msg *bm;
> >>>  
> >>>  	while (!list_empty(&corosync_event_list)) {
> >>>  		cevent = list_first_entry(&corosync_event_list, typeof(*cevent), list);
> >>> @@ -110,6 +197,28 @@ static void __corosync_dispatch(void)
> >>>  						       cevent->nr_members);
> >>>  			break;
> >>>  		case COROSYNC_EVENT_TYPE_NOTIFY:
> >>> +			if (cevent->blocked) {
> >>> +				if (sheepid_cmp(&cevent->sender, &this_sheepid) == 0 &&
> >>> +				    !cevent->callbacked) {
> >>> +					/* call a block callback function from a worker thread */
> >>> +					if (list_empty(&corosync_block_list))
> >>> +						panic("cannot call block callback\n");
> >>> +
> >>> +					bm = list_first_entry(&corosync_block_list,
> >>> +							      typeof(*bm), list);
> >>> +					list_del(&bm->list);
> >>> +
> >>> +					bm->work.fn = corosync_block;
> >>> +					bm->work.done = corosync_block_done;
> >>> +					queue_work(corosync_block_wq, &bm->work);
> >>> +
> >>> +					cevent->callbacked = 1;
> >>> +				}
> >>> +
> >>
> >> is cevent->callbacked really needed? How about a static local one?
> >
> > This variable is needed to ensure that block_cb() is called only once.
> > How would we reset cevent->callbacked to zero if we made it a static
> > local variable?
> >
> 
> Hmm, it seems this is the simplest method.
> >>> +				/* block the rest messages until unblock message comes */
> >>> +				goto out;
> >>> +			}
> >>> +
> >>>  			corosync_handlers.notify_handler(&cevent->sender,
> >>>  							 cevent->msg,
> >>>  							 cevent->msg_len);
> >>> @@ -119,6 +228,8 @@ static void __corosync_dispatch(void)
> >>>  		list_del(&cevent->list);
> >>>  		free(cevent);
> >>>  	}
> >>> +out:
> >>> +	return;
> >>>  }
> >>>  
> >>>  static void cdrv_cpg_deliver(cpg_handle_t handle,
> >>> @@ -127,21 +238,49 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,
> >>>  			     void *msg, size_t msg_len)
> >>>  {
> >>>  	struct corosync_event *cevent;
> >>> +	uint64_t type;
> >>> +	struct sheepid sender;
> >>>  
> >>> -	cevent = zalloc(sizeof(*cevent));
> >>> -	if (!cevent)
> >>> -		panic("oom\n");
> >>> -	cevent->msg = zalloc(msg_len);
> >>> -	if (!cevent->msg)
> >>> -		panic("oom\n");
> >>> +	nodeid_to_addr(nodeid, sender.addr);
> >>> +	sender.pid = pid;
> >>>  
> >>> -	cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
> >>> -	nodeid_to_addr(nodeid, cevent->sender.addr);
> >>> -	cevent->sender.pid = pid;
> >>> -	memcpy(cevent->msg, msg, msg_len);
> >>> -	cevent->msg_len = msg_len;
> >>> +	memcpy(&type, msg, sizeof(type));
> >>> +	msg = (uint8_t *)msg + sizeof(type);
> >>> +	msg_len -= sizeof(type);
> >>>  
> >>> -	list_add_tail(&cevent->list, &corosync_event_list);
> >>> +	switch (type) {
> >>> +	case COROSYNC_MSG_TYPE_BLOCK:
> >>> +	case COROSYNC_MSG_TYPE_NOTIFY:
> >>> +		cevent = zalloc(sizeof(*cevent));
> >>> +		if (!cevent)
> >>> +			panic("oom\n");
> >>> +		cevent->msg = zalloc(msg_len);
> >>> +		if (!cevent->msg)
> >>> +			panic("oom\n");
> >>> +
> >>> +		cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
> >>> +		cevent->sender = sender;
> >>> +		memcpy(cevent->msg, msg, msg_len);
> >>> +		cevent->msg_len = msg_len;
> >>> +		if (type == COROSYNC_MSG_TYPE_BLOCK)
> >>> +			cevent->blocked = 1;
> >>
> >> Moving cevent->blocked = 1; into a separate COROSYNC_MSG_TYPE_BLOCK
> >> case would avoid the if check.
> >
> > But cevent is allocated in this case block.
> >
> 
> So can we move the cevent allocation outside the switch clause?

Hmm, yes, though we would need to free it when we receive
COROSYNC_MSG_TYPE_UNBLOCK.  I'll send a v3 patchset later.

Thanks,

Kazutaka
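P.S. Roughly what I have in mind for v3 (only a sketch of the idea, not
the final code; the UNBLOCK handling is abbreviated):

	cevent = zalloc(sizeof(*cevent));
	if (!cevent)
		panic("oom\n");

	switch (type) {
	case COROSYNC_MSG_TYPE_BLOCK:
		cevent->blocked = 1;
		/* fall through to avoid the extra if check */
	case COROSYNC_MSG_TYPE_NOTIFY:
		/* copy the payload into cevent and queue it, as before */
		break;
	case COROSYNC_MSG_TYPE_UNBLOCK:
		/* no new event is queued for unblock messages */
		free(cevent);
		break;
	}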