[Sheepdog] [PATCH v2 2/3] cluster: add blocking mechanism to notification
MORITA Kazutaka
morita.kazutaka at lab.ntt.co.jp
Mon Oct 10 15:22:19 CEST 2011
At Mon, 10 Oct 2011 18:15:21 +0800,
Liu Yuan wrote:
>
> On 10/10/2011 05:58 PM, MORITA Kazutaka wrote:
> > Thanks for your review!
> >
> > At Mon, 10 Oct 2011 17:29:06 +0800,
> > Liu Yuan wrote:
> >> On 10/10/2011 04:44 PM, MORITA Kazutaka wrote:
> >>> Currently Sheepdog vdi operations (create/delete/lookup/...) are
> >>> processed in two phase multicasting:
> >>>
> >>> 1. multicasts a vdi request
> >>> 2. only the master node handles the request and multicasts the
> >>> response
> >>>
> >>> During this two phase, we cannot allow any other vdi operations and
> >>> membership changes, and this makes sheep/group.c a bit hard to read.
> >>>
> >>> This patch simplifies this by adding a blocking callback to the
> >>> notification function in the cluster driver. If the caller of
> >>> cdrv->notify() sets 'block_cb' as an argument, block_cb() is called
> >>> from the cluster driver before the message is notified to any node.
> >>> All the cluster events are blocked in every nodes until the caller
> >>> finishes the vdi operation in block_cb().
> >>>
> >>> With this change, the master node is no longer in charge of vdi
> >>> operations, but this is a good change to make Sheepdog more symmetric.
> >>>
> >>> Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
> >>> ---
> >>> sheep/cluster.h | 10 ++-
> >>> sheep/cluster/corosync.c | 211 +++++++++++++++++++++++++++++++++++++++-------
> >>> sheep/group.c | 12 ++--
> >>> 3 files changed, 195 insertions(+), 38 deletions(-)
> >>>
> >>> diff --git a/sheep/cluster.h b/sheep/cluster.h
> >>> index 43f4575..25b2d48 100644
> >>> --- a/sheep/cluster.h
> >>> +++ b/sheep/cluster.h
> >>> @@ -71,11 +71,17 @@ struct cluster_driver {
> >>> *
> >>> * This function sends 'msg' to all the nodes. The notified
> >>> * messages can be read through notify_handler() in
> >>> - * cdrv_handlers.
> >>> + * cdrv_handlers. If 'block_cb' is specified, block_cb() is
> >>> + * called before 'msg' is notified to all the nodes. All the
> >>> + * cluster events including this notification are blocked
> >>> + * until block_cb() returns or this blocking node leaves the
> >>> + * cluster. The sheep daemon can sleep in block_cb(), so this
> >>> + * callback must be called from other than the dispatch
> >>> + * thread.
> >> from other than? I don't get it. Did you mean from worker thread other
> >> than main thread?
> > I meant we cannot call block_cb() in the same thread with
> > cdrv->dispatch(). It is because cdrv->dispatch() is called in the
> > main thread and we cannot sleep in the main thread. The corosync
> > driver calls block_cb() in a worker thread, but some drivers may not.
> >
>
> So "callback must be not called from the dispatch(main) thread" would be
> easier to understand.
Thanks, it's much better.
> >>> *
> >>> * Returns zero on success, -1 on error
> >>> */
> >>> - int (*notify)(void *msg, size_t msg_len);
> >>> + int (*notify)(void *msg, size_t msg_len, void (*block_cb)(void *arg));
> >>>
> >>> /*
> >>> * Dispatch handlers
> >>> diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
> >>> index ff52525..629e8a9 100644
> >>> --- a/sheep/cluster/corosync.c
> >>> +++ b/sheep/cluster/corosync.c
> >>> @@ -14,22 +14,35 @@
> >>> #include <corosync/cfg.h>
> >>>
> >>> #include "cluster.h"
> >>> +#include "work.h"
> >>>
> >>> static cpg_handle_t cpg_handle;
> >>> static struct cpg_name cpg_group = { 9, "sheepdog" };
> >>>
> >>> static corosync_cfg_handle_t cfg_handle;
> >>> +static struct sheepid this_sheepid;
> >>> +
> >>> +static struct work_queue *corosync_block_wq;
> >>>
> >>> static struct cdrv_handlers corosync_handlers;
> >>>
> >>> static LIST_HEAD(corosync_event_list);
> >>> +static LIST_HEAD(corosync_block_list);
> >>>
> >>> +/* event types which are dispatched in corosync_dispatch() */
> >>> enum corosync_event_type {
> >>> COROSYNC_EVENT_TYPE_JOIN,
> >>> COROSYNC_EVENT_TYPE_LEAVE,
> >>> COROSYNC_EVENT_TYPE_NOTIFY,
> >>> };
> >>>
> >>> +/* multicast message type */
> >>> +enum corosync_message_type {
> >>> + COROSYNC_MSG_TYPE_NOTIFY,
> >>> + COROSYNC_MSG_TYPE_BLOCK,
> >>> + COROSYNC_MSG_TYPE_UNBLOCK,
> >>> +};
> >>> +
> >>> struct corosync_event {
> >>> enum corosync_event_type type;
> >>>
> >>> @@ -42,6 +55,18 @@ struct corosync_event {
> >>> void *msg;
> >>> size_t msg_len;
> >>>
> >>> + int blocked;
> >>> + int callbacked;
> >>> +
> >>> + struct list_head list;
> >>> +};
> >>> +
> >>> +struct corosync_block_msg {
> >>> + void *msg;
> >>> + size_t msg_len;
> >>> + void (*cb)(void *arg);
> >>> +
> >>> + struct work work;
> >>> struct list_head list;
> >>> };
> >>>
> >>> @@ -91,9 +116,71 @@ static void cpg_addr_to_sheepid(const struct cpg_address *cpgs,
> >>> }
> >>> }
> >>>
> >>> +static int send_message(uint64_t type, void *msg, size_t msg_len)
> >>> +{
> >>> + struct iovec iov[2];
> >>> + int ret, iov_cnt = 1;
> >>> +
> >>> + iov[0].iov_base = &type;
> >>> + iov[0].iov_len = sizeof(type);
> >>> + if (msg) {
> >>> + iov[1].iov_base = msg;
> >>> + iov[1].iov_len = msg_len;
> >>> + iov_cnt++;
> >>> + }
> >>> +retry:
> >>> + ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, iov_cnt);
> >>> + switch (ret) {
> >>> + case CPG_OK:
> >>> + break;
> >>> + case CPG_ERR_TRY_AGAIN:
> >>> + dprintf("failed to send message. try again\n");
> >>> + sleep(1);
> >>> + goto retry;
> >>> + default:
> >>> + eprintf("failed to send message, %d\n", ret);
> >>> + return -1;
> >>> + }
> >>> + return 0;
> >>> +}
> >>> +
> >>> +static void corosync_block(struct work *work, int idx)
> >>> +{
> >>> + struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
> >>> +
> >>> + bm->cb(bm->msg);
> >>> +}
> >>> +
> >>> +static void corosync_block_done(struct work *work, int idx)
> >>> +{
> >>> + struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
> >>> +
> >>> + send_message(COROSYNC_MSG_TYPE_UNBLOCK, bm->msg, bm->msg_len);
> >>> +
> >>> + free(bm->msg);
> >>> + free(bm);
> >>> +}
> >>> +
> >>> +static struct corosync_event *find_block_event(struct sheepid *sender)
> >>> +{
> >>> + struct corosync_event *cevent;
> >>> +
> >>> + list_for_each_entry(cevent, &corosync_event_list, list) {
> >>> + if (!cevent->blocked)
> >>> + continue;
> >>> +
> >>> + if (cevent->type == COROSYNC_EVENT_TYPE_NOTIFY &&
> >>> + sheepid_cmp(&cevent->sender, sender) == 0)
> >>> + return cevent;
> >>> + }
> >>> +
> >>> + return NULL;
> >>> +}
> >>> +
> >>> static void __corosync_dispatch(void)
> >>> {
> >>> struct corosync_event *cevent;
> >>> + struct corosync_block_msg *bm;
> >>>
> >>> while (!list_empty(&corosync_event_list)) {
> >>> cevent = list_first_entry(&corosync_event_list, typeof(*cevent), list);
> >>> @@ -110,6 +197,28 @@ static void __corosync_dispatch(void)
> >>> cevent->nr_members);
> >>> break;
> >>> case COROSYNC_EVENT_TYPE_NOTIFY:
> >>> + if (cevent->blocked) {
> >>> + if (sheepid_cmp(&cevent->sender, &this_sheepid) == 0 &&
> >>> + !cevent->callbacked) {
> >>> + /* call a block callback function from a worker thread */
> >>> + if (list_empty(&corosync_block_list))
> >>> + panic("cannot call block callback\n");
> >>> +
> >>> + bm = list_first_entry(&corosync_block_list,
> >>> + typeof(*bm), list);
> >>> + list_del(&bm->list);
> >>> +
> >>> + bm->work.fn = corosync_block;
> >>> + bm->work.done = corosync_block_done;
> >>> + queue_work(corosync_block_wq, &bm->work);
> >>> +
> >>> + cevent->callbacked = 1;
> >>> + }
> >>> +
> >> is cevent->callbacked really needed? How about a static local one?
> > This variable is needed to ensure that block_cb() is called only once.
> > How to set zero to cevent->callbacked if we make it a static local
> > variable?
> >
> Hmm, seems this is the simplest method.
>
> >>> + /* block the rest messages until unblock message comes */
> >>> + goto out;
> >>> + }
> >>> +
> >>> corosync_handlers.notify_handler(&cevent->sender,
> >>> cevent->msg,
> >>> cevent->msg_len);
> >>> @@ -119,6 +228,8 @@ static void __corosync_dispatch(void)
> >>> list_del(&cevent->list);
> >>> free(cevent);
> >>> }
> >>> +out:
> >>> + return;
> >>> }
> >>>
> >>> static void cdrv_cpg_deliver(cpg_handle_t handle,
> >>> @@ -127,21 +238,49 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,
> >>> void *msg, size_t msg_len)
> >>> {
> >>> struct corosync_event *cevent;
> >>> + uint64_t type;
> >>> + struct sheepid sender;
> >>>
> >>> - cevent = zalloc(sizeof(*cevent));
> >>> - if (!cevent)
> >>> - panic("oom\n");
> >>> - cevent->msg = zalloc(msg_len);
> >>> - if (!cevent->msg)
> >>> - panic("oom\n");
> >>> + nodeid_to_addr(nodeid, sender.addr);
> >>> + sender.pid = pid;
> >>>
> >>> - cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
> >>> - nodeid_to_addr(nodeid, cevent->sender.addr);
> >>> - cevent->sender.pid = pid;
> >>> - memcpy(cevent->msg, msg, msg_len);
> >>> - cevent->msg_len = msg_len;
> >>> + memcpy(&type, msg, sizeof(type));
> >>> + msg = (uint8_t *)msg + sizeof(type);
> >>> + msg_len -= sizeof(type);
> >>>
> >>> - list_add_tail(&cevent->list, &corosync_event_list);
> >>> + switch (type) {
> >>> + case COROSYNC_MSG_TYPE_BLOCK:
> >>> + case COROSYNC_MSG_TYPE_NOTIFY:
> >>> + cevent = zalloc(sizeof(*cevent));
> >>> + if (!cevent)
> >>> + panic("oom\n");
> >>> + cevent->msg = zalloc(msg_len);
> >>> + if (!cevent->msg)
> >>> + panic("oom\n");
> >>> +
> >>> + cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
> >>> + cevent->sender = sender;
> >>> + memcpy(cevent->msg, msg, msg_len);
> >>> + cevent->msg_len = msg_len;
> >>> + if (type == COROSYNC_MSG_TYPE_BLOCK)
> >>> + cevent->blocked = 1;
> >> move cevent->blocked = 1; into case COROSYNC_MSG_TYPE_BLOCK would avoid
> >> if check.
> > But cevent is allocated in this case block.
> >
> So can we move cevent allocation outside switch clause?
Hmm, yes, though we need to free it when we receive
COROSYNC_MSG_TYPE_UNBLOCK.
I'll send a v3 patchset later.
Thanks,
Kazutaka
More information about the sheepdog
mailing list