On 10/10/2011 05:58 PM, MORITA Kazutaka wrote: > Thanks for your review! > > At Mon, 10 Oct 2011 17:29:06 +0800, > Liu Yuan wrote: >> On 10/10/2011 04:44 PM, MORITA Kazutaka wrote: >>> Currently Sheepdog vdi operations (create/delete/lookup/...) are >>> processed in two-phase multicasting: >>> >>> 1. multicasts a vdi request >>> 2. only the master node handles the request and multicasts the >>> response >>> >>> During these two phases, we cannot allow any other vdi operations and >>> membership changes, and this makes sheep/group.c a bit hard to read. >>> >>> This patch simplifies this by adding a blocking callback to the >>> notification function in the cluster driver. If the caller of >>> cdrv->notify() sets 'block_cb' as an argument, block_cb() is called >>> from the cluster driver before the message is notified to any node. >>> All the cluster events are blocked on every node until the caller >>> finishes the vdi operation in block_cb(). >>> >>> With this change, the master node is no longer in charge of vdi >>> operations, but this is a good change to make Sheepdog more symmetric. >>> >>> Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp> >>> --- >>> sheep/cluster.h | 10 ++- >>> sheep/cluster/corosync.c | 211 +++++++++++++++++++++++++++++++++++++++------- >>> sheep/group.c | 12 ++-- >>> 3 files changed, 195 insertions(+), 38 deletions(-) >>> >>> diff --git a/sheep/cluster.h b/sheep/cluster.h >>> index 43f4575..25b2d48 100644 >>> --- a/sheep/cluster.h >>> +++ b/sheep/cluster.h >>> @@ -71,11 +71,17 @@ struct cluster_driver { >>> * >>> * This function sends 'msg' to all the nodes. The notified >>> * messages can be read through notify_handler() in >>> - * cdrv_handlers. >>> + * cdrv_handlers. If 'block_cb' is specified, block_cb() is >>> + * called before 'msg' is notified to all the nodes. All the >>> + * cluster events including this notification are blocked >>> + * until block_cb() returns or this blocking node leaves the >>> + * cluster. 
The sheep daemon can sleep in block_cb(), so this >>> * callback must be called from other than the dispatch >>> * thread. >> from other than? I don't get it. Did you mean from a worker thread other >> than the main thread? > I meant we cannot call block_cb() in the same thread with > cdrv->dispatch(). It is because cdrv->dispatch() is called in the > main thread and we cannot sleep in the main thread. The corosync > driver calls block_cb() in a worker thread, but some drivers may not. > So "callback must not be called from the dispatch(main) thread" would be easier to understand. >>> * >>> * Returns zero on success, -1 on error >>> */ >>> - int (*notify)(void *msg, size_t msg_len); >>> + int (*notify)(void *msg, size_t msg_len, void (*block_cb)(void *arg)); >>> >>> /* >>> * Dispatch handlers >>> diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c >>> index ff52525..629e8a9 100644 >>> --- a/sheep/cluster/corosync.c >>> +++ b/sheep/cluster/corosync.c >>> @@ -14,22 +14,35 @@ >>> #include <corosync/cfg.h> >>> >>> #include "cluster.h" >>> +#include "work.h" >>> >>> static cpg_handle_t cpg_handle; >>> static struct cpg_name cpg_group = { 9, "sheepdog" }; >>> >>> static corosync_cfg_handle_t cfg_handle; >>> +static struct sheepid this_sheepid; >>> + >>> +static struct work_queue *corosync_block_wq; >>> >>> static struct cdrv_handlers corosync_handlers; >>> >>> static LIST_HEAD(corosync_event_list); >>> +static LIST_HEAD(corosync_block_list); >>> >>> +/* event types which are dispatched in corosync_dispatch() */ >>> enum corosync_event_type { >>> COROSYNC_EVENT_TYPE_JOIN, >>> COROSYNC_EVENT_TYPE_LEAVE, >>> COROSYNC_EVENT_TYPE_NOTIFY, >>> }; >>> >>> +/* multicast message type */ >>> +enum corosync_message_type { >>> + COROSYNC_MSG_TYPE_NOTIFY, >>> + COROSYNC_MSG_TYPE_BLOCK, >>> + COROSYNC_MSG_TYPE_UNBLOCK, >>> +}; >>> + >>> struct corosync_event { >>> enum corosync_event_type type; >>> >>> @@ -42,6 +55,18 @@ struct corosync_event { >>> void *msg; >>> size_t 
msg_len; >>> >>> + int blocked; >>> + int callbacked; >>> + >>> + struct list_head list; >>> +}; >>> + >>> +struct corosync_block_msg { >>> + void *msg; >>> + size_t msg_len; >>> + void (*cb)(void *arg); >>> + >>> + struct work work; >>> struct list_head list; >>> }; >>> >>> @@ -91,9 +116,71 @@ static void cpg_addr_to_sheepid(const struct cpg_address *cpgs, >>> } >>> } >>> >>> +static int send_message(uint64_t type, void *msg, size_t msg_len) >>> +{ >>> + struct iovec iov[2]; >>> + int ret, iov_cnt = 1; >>> + >>> + iov[0].iov_base = &type; >>> + iov[0].iov_len = sizeof(type); >>> + if (msg) { >>> + iov[1].iov_base = msg; >>> + iov[1].iov_len = msg_len; >>> + iov_cnt++; >>> + } >>> +retry: >>> + ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, iov_cnt); >>> + switch (ret) { >>> + case CPG_OK: >>> + break; >>> + case CPG_ERR_TRY_AGAIN: >>> + dprintf("failed to send message. try again\n"); >>> + sleep(1); >>> + goto retry; >>> + default: >>> + eprintf("failed to send message, %d\n", ret); >>> + return -1; >>> + } >>> + return 0; >>> +} >>> + >>> +static void corosync_block(struct work *work, int idx) >>> +{ >>> + struct corosync_block_msg *bm = container_of(work, typeof(*bm), work); >>> + >>> + bm->cb(bm->msg); >>> +} >>> + >>> +static void corosync_block_done(struct work *work, int idx) >>> +{ >>> + struct corosync_block_msg *bm = container_of(work, typeof(*bm), work); >>> + >>> + send_message(COROSYNC_MSG_TYPE_UNBLOCK, bm->msg, bm->msg_len); >>> + >>> + free(bm->msg); >>> + free(bm); >>> +} >>> + >>> +static struct corosync_event *find_block_event(struct sheepid *sender) >>> +{ >>> + struct corosync_event *cevent; >>> + >>> + list_for_each_entry(cevent, &corosync_event_list, list) { >>> + if (!cevent->blocked) >>> + continue; >>> + >>> + if (cevent->type == COROSYNC_EVENT_TYPE_NOTIFY && >>> + sheepid_cmp(&cevent->sender, sender) == 0) >>> + return cevent; >>> + } >>> + >>> + return NULL; >>> +} >>> + >>> static void __corosync_dispatch(void) >>> { >>> struct 
corosync_event *cevent; >>> + struct corosync_block_msg *bm; >>> >>> while (!list_empty(&corosync_event_list)) { >>> cevent = list_first_entry(&corosync_event_list, typeof(*cevent), list); >>> @@ -110,6 +197,28 @@ static void __corosync_dispatch(void) >>> cevent->nr_members); >>> break; >>> case COROSYNC_EVENT_TYPE_NOTIFY: >>> + if (cevent->blocked) { >>> + if (sheepid_cmp(&cevent->sender, &this_sheepid) == 0 && >>> + !cevent->callbacked) { >>> + /* call a block callback function from a worker thread */ >>> + if (list_empty(&corosync_block_list)) >>> + panic("cannot call block callback\n"); >>> + >>> + bm = list_first_entry(&corosync_block_list, >>> + typeof(*bm), list); >>> + list_del(&bm->list); >>> + >>> + bm->work.fn = corosync_block; >>> + bm->work.done = corosync_block_done; >>> + queue_work(corosync_block_wq, &bm->work); >>> + >>> + cevent->callbacked = 1; >>> + } >>> + >> is cevent->callbacked really needed? How about a static local one? > This variable is needed to ensure that block_cb() is called only once. > How could we set cevent->callbacked to zero if we made it a static local > variable? > Hmm, it seems this is the simplest method. 
>>> + /* block the rest messages until unblock message comes */ >>> + goto out; >>> + } >>> + >>> corosync_handlers.notify_handler(&cevent->sender, >>> cevent->msg, >>> cevent->msg_len); >>> @@ -119,6 +228,8 @@ static void __corosync_dispatch(void) >>> list_del(&cevent->list); >>> free(cevent); >>> } >>> +out: >>> + return; >>> } >>> >>> static void cdrv_cpg_deliver(cpg_handle_t handle, >>> @@ -127,21 +238,49 @@ static void cdrv_cpg_deliver(cpg_handle_t handle, >>> void *msg, size_t msg_len) >>> { >>> struct corosync_event *cevent; >>> + uint64_t type; >>> + struct sheepid sender; >>> >>> - cevent = zalloc(sizeof(*cevent)); >>> - if (!cevent) >>> - panic("oom\n"); >>> - cevent->msg = zalloc(msg_len); >>> - if (!cevent->msg) >>> - panic("oom\n"); >>> + nodeid_to_addr(nodeid, sender.addr); >>> + sender.pid = pid; >>> >>> - cevent->type = COROSYNC_EVENT_TYPE_NOTIFY; >>> - nodeid_to_addr(nodeid, cevent->sender.addr); >>> - cevent->sender.pid = pid; >>> - memcpy(cevent->msg, msg, msg_len); >>> - cevent->msg_len = msg_len; >>> + memcpy(&type, msg, sizeof(type)); >>> + msg = (uint8_t *)msg + sizeof(type); >>> + msg_len -= sizeof(type); >>> >>> - list_add_tail(&cevent->list, &corosync_event_list); >>> + switch (type) { >>> + case COROSYNC_MSG_TYPE_BLOCK: >>> + case COROSYNC_MSG_TYPE_NOTIFY: >>> + cevent = zalloc(sizeof(*cevent)); >>> + if (!cevent) >>> + panic("oom\n"); >>> + cevent->msg = zalloc(msg_len); >>> + if (!cevent->msg) >>> + panic("oom\n"); >>> + >>> + cevent->type = COROSYNC_EVENT_TYPE_NOTIFY; >>> + cevent->sender = sender; >>> + memcpy(cevent->msg, msg, msg_len); >>> + cevent->msg_len = msg_len; >>> + if (type == COROSYNC_MSG_TYPE_BLOCK) >>> + cevent->blocked = 1; >> moving cevent->blocked = 1; into the case COROSYNC_MSG_TYPE_BLOCK would avoid >> the if check. > But cevent is allocated in this case block. > So can we move the cevent allocation outside the switch clause? Thanks, Yuan |