[Sheepdog] [PATCH v3 2/3] cluster: add blocking mechanism to notification

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Tue Oct 11 06:36:07 CEST 2011


At Tue, 11 Oct 2011 12:03:10 +0800,
Yibin Shen wrote:
> 
> On Tue, Oct 11, 2011 at 2:06 AM, MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp<mailto:morita.kazutaka at lab.ntt.co.jp>> wrote:
> Currently Sheepdog vdi operations (create/delete/lookup/...) are
> processed in two phase multicasting:
> 
>  1. multicasts a vdi request
>  2. only the master node handles the request and multicasts the
>    response
> 
> During this two phase, we cannot allow any other vdi operations and
> membership changes, and this makes sheep/group.c a bit hard to read.
> 
> This patch simplifies this by adding a blocking callback to the
> notification function in the cluster driver.  If the caller of
> cdrv->notify() sets 'block_cb' as an argument, block_cb() is called
> from the cluster driver before the message is notified to any node.
> All the cluster events are blocked on every node until the caller
> finishes the vdi operation in block_cb().
> 
> With this change, the master node is no longer in charge of vdi
> operations, but this is a good change to make Sheepdog more symmetric.
> 
> Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp<mailto:morita.kazutaka at lab.ntt.co.jp>>
> ---
>  sheep/cluster.h          |    9 ++-
>  sheep/cluster/corosync.c |  208 ++++++++++++++++++++++++++++++++++++++++------
>  sheep/group.c            |   12 ++--
>  3 files changed, 194 insertions(+), 35 deletions(-)
> 
> diff --git a/sheep/cluster.h b/sheep/cluster.h
> index 43f4575..89d0566 100644
> --- a/sheep/cluster.h
> +++ b/sheep/cluster.h
> @@ -71,11 +71,16 @@ struct cluster_driver {
>         *
>         * This function sends 'msg' to all the nodes.  The notified
>         * messages can be read through notify_handler() in
> -        * cdrv_handlers.
> +        * cdrv_handlers.  If 'block_cb' is specified, block_cb() is
> +        * called before 'msg' is notified to all the nodes.  All the
> +        * cluster events including this notification are blocked
> +        * until block_cb() returns or this blocking node leaves the
> +        * cluster.  The sheep daemon can sleep in block_cb(), so this
> +        * callback must be not called from the dispatch (main) thread.
>         *
>         * Returns zero on success, -1 on error
>         */
> -       int (*notify)(void *msg, size_t msg_len);
> +       int (*notify)(void *msg, size_t msg_len, void (*block_cb)(void *arg));
> 
>        /*
>         * Dispatch handlers
> diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
> index ff52525..5d7b412 100644
> --- a/sheep/cluster/corosync.c
> +++ b/sheep/cluster/corosync.c
> @@ -14,22 +14,35 @@
>  #include <corosync/cfg.h>
> 
>  #include "cluster.h"
> +#include "work.h"
> 
>  static cpg_handle_t cpg_handle;
>  static struct cpg_name cpg_group = { 9, "sheepdog" };
> 
>  static corosync_cfg_handle_t cfg_handle;
> +static struct sheepid this_sheepid;
> +
> +static struct work_queue *corosync_block_wq;
> 
>  static struct cdrv_handlers corosync_handlers;
> 
>  static LIST_HEAD(corosync_event_list);
> +static LIST_HEAD(corosync_block_list);
> 
> +/* event types which are dispatched in corosync_dispatch() */
>  enum corosync_event_type {
>        COROSYNC_EVENT_TYPE_JOIN,
>        COROSYNC_EVENT_TYPE_LEAVE,
>        COROSYNC_EVENT_TYPE_NOTIFY,
>  };
> 
> +/* multicast message type */
> +enum corosync_message_type {
> +       COROSYNC_MSG_TYPE_NOTIFY,
> +       COROSYNC_MSG_TYPE_BLOCK,
> +       COROSYNC_MSG_TYPE_UNBLOCK,
> +};
> +
>  struct corosync_event {
>        enum corosync_event_type type;
> 
> @@ -42,6 +55,18 @@ struct corosync_event {
>        void *msg;
>        size_t msg_len;
> 
> +       int blocked;
> +       int callbacked;
> +
> +       struct list_head list;
> +};
> +
> +struct corosync_block_msg {
> +       void *msg;
> +       size_t msg_len;
> +       void (*cb)(void *arg);
> +
> +       struct work work;
>        struct list_head list;
>  };
> 
> @@ -91,9 +116,71 @@ static void cpg_addr_to_sheepid(const struct cpg_address *cpgs,
>        }
>  }
> 
> +static int send_message(uint64_t type, void *msg, size_t msg_len)
> +{
> +       struct iovec iov[2];
> +       int ret, iov_cnt = 1;
> +
> +       iov[0].iov_base = &type;
> +       iov[0].iov_len = sizeof(type);
> +       if (msg) {
> +               iov[1].iov_base = msg;
> +               iov[1].iov_len = msg_len;
> +               iov_cnt++;
> +       }
> +retry:
> +       ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, iov_cnt);
> +       switch (ret) {
> +       case CPG_OK:
> +               break;
> +       case CPG_ERR_TRY_AGAIN:
> +               dprintf("failed to send message. try again\n");
> +               sleep(1);
> +               goto retry;
> +       default:
> +               eprintf("failed to send message, %d\n", ret);
> +               return -1;
> +       }
> +       return 0;
> +}
> +
> +static void corosync_block(struct work *work, int idx)
> +{
> +       struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
> +
> +       bm->cb(bm->msg);
> +}
> +
> +static void corosync_block_done(struct work *work, int idx)
> +{
> +       struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
> +
> +       send_message(COROSYNC_MSG_TYPE_UNBLOCK, bm->msg, bm->msg_len);
> +
> +       free(bm->msg);
> +       free(bm);
> +}
> +
> +static struct corosync_event *find_block_event(struct sheepid *sender)
> +{
> +       struct corosync_event *cevent;
> +
> +       list_for_each_entry(cevent, &corosync_event_list, list) {
> +               if (!cevent->blocked)
> +                       continue;
> +
> +               if (cevent->type == COROSYNC_EVENT_TYPE_NOTIFY &&
> +                   sheepid_cmp(&cevent->sender, sender) == 0)
> +                       return cevent;
> +       }
> +
> +       return NULL;
> +}
> +
>  static void __corosync_dispatch(void)
>  {
>        struct corosync_event *cevent;
> +       struct corosync_block_msg *bm;
> 
>        while (!list_empty(&corosync_event_list)) {
>                cevent = list_first_entry(&corosync_event_list, typeof(*cevent), list);
> @@ -110,6 +197,28 @@ static void __corosync_dispatch(void)
>                                                        cevent->nr_members);
>                        break;
>                case COROSYNC_EVENT_TYPE_NOTIFY:
> +                       if (cevent->blocked) {
> +                               if (sheepid_cmp(&cevent->sender, &this_sheepid) == 0 &&
> +                                   !cevent->callbacked) {
> +                                       /* call a block callback function from a worker thread */
> +                                       if (list_empty(&corosync_block_list))
> +                                               panic("cannot call block callback\n");
> +
> +                                       bm = list_first_entry(&corosync_block_list,
> +                                                             typeof(*bm), list);
> +                                       list_del(&bm->list);
> +
> +                                       bm->work.fn = corosync_block;
> +                                       bm->work.done = corosync_block_done;
> +                                       queue_work(corosync_block_wq, &bm->work);
> +
> +                                       cevent->callbacked = 1;
> +                               }
> +
> +                               /* block the rest messages until unblock message comes */
> +                               goto out;
> +                       }
> +
>                        corosync_handlers.notify_handler(&cevent->sender,
>                                                         cevent->msg,
>                                                         cevent->msg_len);
> @@ -119,6 +228,8 @@ static void __corosync_dispatch(void)
>                list_del(&cevent->list);
>                free(cevent);
>        }
> +out:
> +       return;
>  }
> 
>  static void cdrv_cpg_deliver(cpg_handle_t handle,
> @@ -127,21 +238,52 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,
>                             void *msg, size_t msg_len)
>  {
>        struct corosync_event *cevent;
> +       uint64_t type;
> +       struct sheepid sender;
> +
> +       nodeid_to_addr(nodeid, sender.addr);
> +       sender.pid = pid;
> +
> +       memcpy(&type, msg, sizeof(type));
> +       msg = (uint8_t *)msg + sizeof(type);
> +       msg_len -= sizeof(type);
> 
>        cevent = zalloc(sizeof(*cevent));
>        if (!cevent)
>                panic("oom\n");
> -       cevent->msg = zalloc(msg_len);
> -       if (!cevent->msg)
> -               panic("oom\n");
> 
> -       cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
> -       nodeid_to_addr(nodeid, cevent->sender.addr);
> -       cevent->sender.pid = pid;
> -       memcpy(cevent->msg, msg, msg_len);
> -       cevent->msg_len = msg_len;
> +       switch (type) {
> +       case COROSYNC_MSG_TYPE_BLOCK:
> +               cevent->blocked = 1;
> +               /* fall through */
> +       case COROSYNC_MSG_TYPE_NOTIFY:
> +               cevent->msg = zalloc(msg_len);
> +               if (!cevent->msg)
> +                       panic("oom\n");
> 
> -       list_add_tail(&cevent->list, &corosync_event_list);
> +               cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
> +               cevent->sender = sender;
> +               memcpy(cevent->msg, msg, msg_len);
> if we don't set a message body, such as send_message(COROSYNC_MSG_TYPE_BLOCK, NULL, 0) in corosync_notify(),
> 'msg' will point to an unknown address, so I think we should initialize it before inserting it into the event_list,

Thanks, will fix it.

Kazutaka



More information about the sheepdog mailing list