[Sheepdog] [PATCH v4 2/3] cluster: add blocking mechanism to notification

Yibin Shen zituan at taobao.com
Tue Oct 11 08:50:22 CEST 2011


On Tue, Oct 11, 2011 at 2:02 PM, MORITA Kazutaka
<morita.kazutaka at lab.ntt.co.jp> wrote:
> Currently Sheepdog vdi operations (create/delete/lookup/...) are
> processed in two phase multicasting:
>
>  1. multicasts a vdi request
>  2. only the master node handles the request and multicasts the
>    response
>
> During these two phases, we cannot allow any other vdi operations or
> membership changes, and this makes sheep/group.c a bit hard to read.
>
> This patch simplifies this by adding a blocking callback to the
> notification function in the cluster driver.  If the caller of
> cdrv->notify() sets 'block_cb' as an argument, block_cb() is called
> from the cluster driver before the message is notified to any node.
> All the cluster events are blocked on every node until the caller
> finishes the vdi operation in block_cb().
>
> With this change, the master node is no longer in charge of vdi
> operations, but this is a good change to make Sheepdog more symmetric.
>
> Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
> ---
>  sheep/cluster.h          |    9 ++-
>  sheep/cluster/corosync.c |  215 ++++++++++++++++++++++++++++++++++++++++------
>  sheep/group.c            |   12 ++--
>  3 files changed, 201 insertions(+), 35 deletions(-)
>
> diff --git a/sheep/cluster.h b/sheep/cluster.h
> index 43f4575..89d0566 100644
> --- a/sheep/cluster.h
> +++ b/sheep/cluster.h
> @@ -71,11 +71,16 @@ struct cluster_driver {
>         *
>         * This function sends 'msg' to all the nodes.  The notified
>         * messages can be read through notify_handler() in
> -        * cdrv_handlers.
> +        * cdrv_handlers.  If 'block_cb' is specified, block_cb() is
> +        * called before 'msg' is notified to all the nodes.  All the
> +        * cluster events including this notification are blocked
> +        * until block_cb() returns or this blocking node leaves the
> +        * cluster.  The sheep daemon can sleep in block_cb(), so this
> +        * callback must be not called from the dispatch (main) thread.
>         *
>         * Returns zero on success, -1 on error
>         */
> -       int (*notify)(void *msg, size_t msg_len);
> +       int (*notify)(void *msg, size_t msg_len, void (*block_cb)(void *arg));
>
>        /*
>         * Dispatch handlers
> diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
> index ff52525..d7f7eab 100644
> --- a/sheep/cluster/corosync.c
> +++ b/sheep/cluster/corosync.c
> @@ -14,22 +14,35 @@
>  #include <corosync/cfg.h>
>
>  #include "cluster.h"
> +#include "work.h"
>
>  static cpg_handle_t cpg_handle;
>  static struct cpg_name cpg_group = { 9, "sheepdog" };
>
>  static corosync_cfg_handle_t cfg_handle;
> +static struct sheepid this_sheepid;
> +
> +static struct work_queue *corosync_block_wq;
>
>  static struct cdrv_handlers corosync_handlers;
>
>  static LIST_HEAD(corosync_event_list);
> +static LIST_HEAD(corosync_block_list);
>
> +/* event types which are dispatched in corosync_dispatch() */
>  enum corosync_event_type {
>        COROSYNC_EVENT_TYPE_JOIN,
>        COROSYNC_EVENT_TYPE_LEAVE,
>        COROSYNC_EVENT_TYPE_NOTIFY,
>  };
>
> +/* multicast message type */
> +enum corosync_message_type {
> +       COROSYNC_MSG_TYPE_NOTIFY,
> +       COROSYNC_MSG_TYPE_BLOCK,
> +       COROSYNC_MSG_TYPE_UNBLOCK,
> +};
> +
>  struct corosync_event {
>        enum corosync_event_type type;
>
> @@ -42,6 +55,18 @@ struct corosync_event {
>        void *msg;
>        size_t msg_len;
>
> +       int blocked;
> +       int callbacked;
> +
> +       struct list_head list;
> +};
> +
> +struct corosync_block_msg {
> +       void *msg;
> +       size_t msg_len;
> +       void (*cb)(void *arg);
> +
> +       struct work work;
>        struct list_head list;
>  };
>
> @@ -91,9 +116,71 @@ static void cpg_addr_to_sheepid(const struct cpg_address *cpgs,
>        }
>  }
>
> +static int send_message(uint64_t type, void *msg, size_t msg_len)
> +{
> +       struct iovec iov[2];
> +       int ret, iov_cnt = 1;
> +
> +       iov[0].iov_base = &type;
> +       iov[0].iov_len = sizeof(type);
> +       if (msg) {
> +               iov[1].iov_base = msg;
> +               iov[1].iov_len = msg_len;
> +               iov_cnt++;
> +       }
> +retry:
> +       ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, iov_cnt);
> +       switch (ret) {
> +       case CPG_OK:
> +               break;
> +       case CPG_ERR_TRY_AGAIN:
> +               dprintf("failed to send message. try again\n");
> +               sleep(1);
> +               goto retry;
> +       default:
> +               eprintf("failed to send message, %d\n", ret);
> +               return -1;
> +       }
> +       return 0;
> +}
> +
> +static void corosync_block(struct work *work, int idx)
> +{
> +       struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
> +
> +       bm->cb(bm->msg);
> +}
> +
> +static void corosync_block_done(struct work *work, int idx)
> +{
> +       struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
> +
> +       send_message(COROSYNC_MSG_TYPE_UNBLOCK, bm->msg, bm->msg_len);
> +
> +       free(bm->msg);
> +       free(bm);
> +}
> +
> +static struct corosync_event *find_block_event(struct sheepid *sender)
> +{
> +       struct corosync_event *cevent;
> +
> +       list_for_each_entry(cevent, &corosync_event_list, list) {
> +               if (!cevent->blocked)
> +                       continue;
> +
> +               if (cevent->type == COROSYNC_EVENT_TYPE_NOTIFY &&
> +                   sheepid_cmp(&cevent->sender, sender) == 0)
> +                       return cevent;
> +       }
> +
> +       return NULL;
> +}
> +
>  static void __corosync_dispatch(void)
>  {
>        struct corosync_event *cevent;
> +       struct corosync_block_msg *bm;
>
>        while (!list_empty(&corosync_event_list)) {
>                cevent = list_first_entry(&corosync_event_list, typeof(*cevent), list);
> @@ -110,6 +197,28 @@ static void __corosync_dispatch(void)
>                                                        cevent->nr_members);
>                        break;
>                case COROSYNC_EVENT_TYPE_NOTIFY:
> +                       if (cevent->blocked) {
> +                               if (sheepid_cmp(&cevent->sender, &this_sheepid) == 0 &&
> +                                   !cevent->callbacked) {
> +                                       /* call a block callback function from a worker thread */
> +                                       if (list_empty(&corosync_block_list))
> +                                               panic("cannot call block callback\n");
> +
> +                                       bm = list_first_entry(&corosync_block_list,
> +                                                             typeof(*bm), list);
> +                                       list_del(&bm->list);
> +
> +                                       bm->work.fn = corosync_block;
> +                                       bm->work.done = corosync_block_done;
> +                                       queue_work(corosync_block_wq, &bm->work);
> +
> +                                       cevent->callbacked = 1;
> +                               }
> +
> +                               /* block the rest messages until unblock message comes */
> +                               goto out;
> +                       }
> +
>                        corosync_handlers.notify_handler(&cevent->sender,
>                                                         cevent->msg,
>                                                         cevent->msg_len);
> @@ -119,6 +228,8 @@ static void __corosync_dispatch(void)
>                list_del(&cevent->list);
>                free(cevent);
>        }
> +out:
> +       return;
>  }
>
>  static void cdrv_cpg_deliver(cpg_handle_t handle,
> @@ -127,21 +238,59 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,
>                             void *msg, size_t msg_len)
>  {
>        struct corosync_event *cevent;
> +       uint64_t type;
> +       struct sheepid sender;
> +
> +       nodeid_to_addr(nodeid, sender.addr);
> +       sender.pid = pid;
> +
> +       memcpy(&type, msg, sizeof(type));
> +       msg = (uint8_t *)msg + sizeof(type);
> +       msg_len -= sizeof(type);
>
>        cevent = zalloc(sizeof(*cevent));
>        if (!cevent)
>                panic("oom\n");
> -       cevent->msg = zalloc(msg_len);
> -       if (!cevent->msg)
> -               panic("oom\n");
>
> -       cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
> -       nodeid_to_addr(nodeid, cevent->sender.addr);
> -       cevent->sender.pid = pid;
> -       memcpy(cevent->msg, msg, msg_len);
> -       cevent->msg_len = msg_len;
> +       switch (type) {
> +       case COROSYNC_MSG_TYPE_BLOCK:
> +               cevent->blocked = 1;
> +               /* fall through */
> +       case COROSYNC_MSG_TYPE_NOTIFY:
> +               cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
> +               cevent->sender = sender;
> +               cevent->msg_len = msg_len;
> +               if (msg_len) {
> +                       cevent->msg = zalloc(msg_len);
> +                       if (!cevent->msg)
> +                               panic("oom\n");
> +                       memcpy(cevent->msg, msg, msg_len);
> +               } else
> +                       cevent->msg = NULL;
>
> -       list_add_tail(&cevent->list, &corosync_event_list);
> +               list_add_tail(&cevent->list, &corosync_event_list);
> +               break;
> +       case COROSYNC_MSG_TYPE_UNBLOCK:
> +               free(cevent); /* we don't add a new cluster event in this case */
> +
> +               cevent = find_block_event(&sender);
> +               if (!cevent)
> +                       /* block message was casted before this node joins */
> +                       break;
> +
> +               cevent->blocked = 0;
> +               cevent->msg_len = msg_len;
> +               if (msg_len) {
> +                       cevent->msg = realloc(cevent->msg, msg_len);
> +                       if (!cevent->msg)
> +                               panic("oom\n");
> +                       memcpy(cevent->msg, msg, msg_len);
> +               } else {
> +                       free(cevent->msg);
> +                       cevent->msg = NULL;
'cevent->msg' may be referenced in follow-up code after dispatch,
such as in sd_notify_handler(), and it can now be NULL here,
so I think we need to do a NULL pointer check in that path.
> +               }
> +               break;
> +       }
>
>        __corosync_dispatch();
>  }
> @@ -177,6 +326,13 @@ static void cdrv_cpg_confchg(cpg_handle_t handle,
>
>        /* dispatch leave_handler */
>        for (i = 0; i < left_list_entries; i++) {
> +               cevent = find_block_event(left_sheeps + i);
> +               if (cevent) {
> +                       /* the node left before sending UNBLOCK */
> +                       list_del(&cevent->list);
> +                       free(cevent);
> +               }
> +
>                cevent = zalloc(sizeof(*cevent));
>                if (!cevent)
>                        panic("oom\n");
> @@ -251,6 +407,7 @@ static int corosync_init(struct cdrv_handlers *handlers, struct sheepid *myid)
>        }
>
>        myid->pid = getpid();
> +       this_sheepid = *myid;
>
>        ret = cpg_fd_get(cpg_handle, &fd);
>        if (ret != CPG_OK) {
> @@ -258,6 +415,8 @@ static int corosync_init(struct cdrv_handlers *handlers, struct sheepid *myid)
>                return -1;
>        }
>
> +       corosync_block_wq = init_work_queue(1);
> +
>        return fd;
>  }
>
> @@ -297,27 +456,29 @@ static int corosync_leave(void)
>        return 0;
>  }
>
> -static int corosync_notify(void *msg, size_t msg_len)
> +static int corosync_notify(void *msg, size_t msg_len, void (*block_cb)(void *))
>  {
> -       struct iovec iov;
>        int ret;
> +       struct corosync_block_msg *bm;
>
> -       iov.iov_base = msg;
> -       iov.iov_len = msg_len;
> -retry:
> -       ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, &iov, 1);
> -       switch (ret) {
> -       case CPG_OK:
> -               break;
> -       case CPG_ERR_TRY_AGAIN:
> -               dprintf("failed to send message. try again\n");
> -               sleep(1);
> -               goto retry;
> -       default:
> -               eprintf("failed to send message, %d\n", ret);
> -               return -1;
> -       }
> -       return 0;
> +       if (block_cb) {
> +               bm = zalloc(sizeof(*bm));
> +               if (!bm)
> +                       panic("oom\n");
> +               bm->msg = zalloc(msg_len);
> +               if (!bm->msg)
> +                       panic("oom\n");
> +
> +               memcpy(bm->msg, msg, msg_len);
> +               bm->msg_len = msg_len;
> +               bm->cb = block_cb;
> +               list_add_tail(&bm->list, &corosync_block_list);
> +
> +               ret = send_message(COROSYNC_MSG_TYPE_BLOCK, NULL, 0);
> +       } else
> +               ret = send_message(COROSYNC_MSG_TYPE_NOTIFY, msg, msg_len);
> +
> +       return ret;
>  }
>
>  static int corosync_dispatch(void)
> diff --git a/sheep/group.c b/sheep/group.c
> index 0a6d0b0..946128c 100644
> --- a/sheep/group.c
> +++ b/sheep/group.c
> @@ -366,7 +366,7 @@ forward:
>
>        list_add(&req->pending_list, &sys->pending_list);
>
> -       sys->cdrv->notify(msg, msg->header.msg_length);
> +       sys->cdrv->notify(msg, msg->header.msg_length, NULL);
>
>        free(msg);
>  }
> @@ -1063,7 +1063,7 @@ static int tx_mastership(void)
>        msg.header.from = sys->this_node;
>        msg.header.sheepid = sys->this_sheepid;
>
> -       return sys->cdrv->notify(&msg, msg.header.msg_length);
> +       return sys->cdrv->notify(&msg, msg.header.msg_length, NULL);
>  }
>
>  static void send_join_response(struct work_notify *w)
> @@ -1095,7 +1095,7 @@ static void send_join_response(struct work_notify *w)
>                exit(1);
>        }
>        jm->epoch = sys->epoch;
> -       sys->cdrv->notify(m, m->msg_length);
> +       sys->cdrv->notify(m, m->msg_length, NULL);
>  }
>
>  static void __sd_notify_done(struct cpg_event *cevent)
> @@ -1174,7 +1174,7 @@ static void __sd_notify_done(struct cpg_event *cevent)
>                        break;
>                case SD_MSG_VDI_OP:
>                        m->state = DM_FIN;
> -                       sys->cdrv->notify(m, m->msg_length);
> +                       sys->cdrv->notify(m, m->msg_length, NULL);
>                        break;
>                default:
>                        eprintf("unknown message %d\n", m->op);
> @@ -1348,7 +1348,7 @@ static void send_join_request(struct sheepid *id)
>                        msg.nodes[i].ent = entries[i];
>        }
>
> -       sys->cdrv->notify(&msg, msg.header.msg_length);
> +       sys->cdrv->notify(&msg, msg.header.msg_length, NULL);
>
>        vprintf(SDOG_INFO "%s\n", sheepid_to_str(&sys->this_sheepid));
>  }
> @@ -1966,5 +1966,5 @@ int leave_cluster(void)
>        msg.epoch = get_latest_epoch();
>
>        dprintf("%d\n", msg.epoch);
> -       return sys->cdrv->notify(&msg, msg.header.msg_length);
> +       return sys->cdrv->notify(&msg, msg.header.msg_length, NULL);
>  }
> --
> 1.7.2.5
>
> --
> sheepdog mailing list
> sheepdog at lists.wpkg.org
> http://lists.wpkg.org/mailman/listinfo/sheepdog
>

________________________________

This email (including any attachments) is confidential and may be legally privileged. If you received this email in error, please delete it immediately and do not copy it or use it for any purpose or disclose its contents to any other person. Thank you.

本电邮(包括任何附件)可能含有机密资料并受法律保护。如您不是正确的收件人,请您立即删除本邮件。请不要将本电邮进行复制并用作任何其他用途、或透露本邮件之内容。谢谢。



More information about the sheepdog mailing list