[Sheepdog] [PATCH v3 2/3] cluster: add blocking mechanism to notification

Yibin Shen zituan at taobao.com
Tue Oct 11 06:03:10 CEST 2011



On Tue, Oct 11, 2011 at 2:06 AM, MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp<mailto:morita.kazutaka at lab.ntt.co.jp>> wrote:
Currently Sheepdog vdi operations (create/delete/lookup/...) are
processed in two phase multicasting:

 1. multicasts a vdi request
 2. only the master node handles the request and multicasts the
   response

During this two phase, we cannot allow any other vdi operations and
membership changes, and this makes sheep/group.c a bit hard to read.

This patch simplifies this by adding a blocking callback to the
notification function in the cluster driver.  If the caller of
cdrv->notify() sets 'block_cb' as an argument, block_cb() is called
from the cluster driver before the message is notified to any node.
All the cluster events are blocked in every nodes until the caller
finishes the vdi operation in block_cb().

With this change, the master node is no longer in charge of vdi
operations, but this is a good change to make Sheepdog more symmetric.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp<mailto:morita.kazutaka at lab.ntt.co.jp>>
---
 sheep/cluster.h          |    9 ++-
 sheep/cluster/corosync.c |  208 ++++++++++++++++++++++++++++++++++++++++------
 sheep/group.c            |   12 ++--
 3 files changed, 194 insertions(+), 35 deletions(-)

diff --git a/sheep/cluster.h b/sheep/cluster.h
index 43f4575..89d0566 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -71,11 +71,16 @@ struct cluster_driver {
        *
        * This function sends 'msg' to all the nodes.  The notified
        * messages can be read through notify_handler() in
-        * cdrv_handlers.
+        * cdrv_handlers.  If 'block_cb' is specified, block_cb() is
+        * called before 'msg' is notified to all the nodes.  All the
+        * cluster events including this notification are blocked
+        * until block_cb() returns or this blocking node leaves the
+        * cluster.  The sheep daemon can sleep in block_cb(), so this
+        * callback must be not called from the dispatch (main) thread.
        *
        * Returns zero on success, -1 on error
        */
-       int (*notify)(void *msg, size_t msg_len);
+       int (*notify)(void *msg, size_t msg_len, void (*block_cb)(void *arg));

       /*
        * Dispatch handlers
diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
index ff52525..5d7b412 100644
--- a/sheep/cluster/corosync.c
+++ b/sheep/cluster/corosync.c
@@ -14,22 +14,35 @@
 #include <corosync/cfg.h>

 #include "cluster.h"
+#include "work.h"

 static cpg_handle_t cpg_handle;
 static struct cpg_name cpg_group = { 9, "sheepdog" };

 static corosync_cfg_handle_t cfg_handle;
+static struct sheepid this_sheepid;
+
+static struct work_queue *corosync_block_wq;

 static struct cdrv_handlers corosync_handlers;

 static LIST_HEAD(corosync_event_list);
+static LIST_HEAD(corosync_block_list);

+/* event types which are dispatched in corosync_dispatch() */
 enum corosync_event_type {
       COROSYNC_EVENT_TYPE_JOIN,
       COROSYNC_EVENT_TYPE_LEAVE,
       COROSYNC_EVENT_TYPE_NOTIFY,
 };

+/* multicast message type */
+enum corosync_message_type {
+       COROSYNC_MSG_TYPE_NOTIFY,
+       COROSYNC_MSG_TYPE_BLOCK,
+       COROSYNC_MSG_TYPE_UNBLOCK,
+};
+
 struct corosync_event {
       enum corosync_event_type type;

@@ -42,6 +55,18 @@ struct corosync_event {
       void *msg;
       size_t msg_len;

+       int blocked;
+       int callbacked;
+
+       struct list_head list;
+};
+
+struct corosync_block_msg {
+       void *msg;
+       size_t msg_len;
+       void (*cb)(void *arg);
+
+       struct work work;
       struct list_head list;
 };

@@ -91,9 +116,71 @@ static void cpg_addr_to_sheepid(const struct cpg_address *cpgs,
       }
 }

+static int send_message(uint64_t type, void *msg, size_t msg_len)
+{
+       struct iovec iov[2];
+       int ret, iov_cnt = 1;
+
+       iov[0].iov_base = &type;
+       iov[0].iov_len = sizeof(type);
+       if (msg) {
+               iov[1].iov_base = msg;
+               iov[1].iov_len = msg_len;
+               iov_cnt++;
+       }
+retry:
+       ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, iov_cnt);
+       switch (ret) {
+       case CPG_OK:
+               break;
+       case CPG_ERR_TRY_AGAIN:
+               dprintf("failed to send message. try again\n");
+               sleep(1);
+               goto retry;
+       default:
+               eprintf("failed to send message, %d\n", ret);
+               return -1;
+       }
+       return 0;
+}
+
+static void corosync_block(struct work *work, int idx)
+{
+       struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
+
+       bm->cb(bm->msg);
+}
+
+static void corosync_block_done(struct work *work, int idx)
+{
+       struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
+
+       send_message(COROSYNC_MSG_TYPE_UNBLOCK, bm->msg, bm->msg_len);
+
+       free(bm->msg);
+       free(bm);
+}
+
+static struct corosync_event *find_block_event(struct sheepid *sender)
+{
+       struct corosync_event *cevent;
+
+       list_for_each_entry(cevent, &corosync_event_list, list) {
+               if (!cevent->blocked)
+                       continue;
+
+               if (cevent->type == COROSYNC_EVENT_TYPE_NOTIFY &&
+                   sheepid_cmp(&cevent->sender, sender) == 0)
+                       return cevent;
+       }
+
+       return NULL;
+}
+
 static void __corosync_dispatch(void)
 {
       struct corosync_event *cevent;
+       struct corosync_block_msg *bm;

       while (!list_empty(&corosync_event_list)) {
               cevent = list_first_entry(&corosync_event_list, typeof(*cevent), list);
@@ -110,6 +197,28 @@ static void __corosync_dispatch(void)
                                                       cevent->nr_members);
                       break;
               case COROSYNC_EVENT_TYPE_NOTIFY:
+                       if (cevent->blocked) {
+                               if (sheepid_cmp(&cevent->sender, &this_sheepid) == 0 &&
+                                   !cevent->callbacked) {
+                                       /* call a block callback function from a worker thread */
+                                       if (list_empty(&corosync_block_list))
+                                               panic("cannot call block callback\n");
+
+                                       bm = list_first_entry(&corosync_block_list,
+                                                             typeof(*bm), list);
+                                       list_del(&bm->list);
+
+                                       bm->work.fn = corosync_block;
+                                       bm->work.done = corosync_block_done;
+                                       queue_work(corosync_block_wq, &bm->work);
+
+                                       cevent->callbacked = 1;
+                               }
+
+                               /* block the rest messages until unblock message comes */
+                               goto out;
+                       }
+
                       corosync_handlers.notify_handler(&cevent->sender,
                                                        cevent->msg,
                                                        cevent->msg_len);
@@ -119,6 +228,8 @@ static void __corosync_dispatch(void)
               list_del(&cevent->list);
               free(cevent);
       }
+out:
+       return;
 }

 static void cdrv_cpg_deliver(cpg_handle_t handle,
@@ -127,21 +238,52 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,
                            void *msg, size_t msg_len)
 {
       struct corosync_event *cevent;
+       uint64_t type;
+       struct sheepid sender;
+
+       nodeid_to_addr(nodeid, sender.addr);
+       sender.pid = pid;
+
+       memcpy(&type, msg, sizeof(type));
+       msg = (uint8_t *)msg + sizeof(type);
+       msg_len -= sizeof(type);

       cevent = zalloc(sizeof(*cevent));
       if (!cevent)
               panic("oom\n");
-       cevent->msg = zalloc(msg_len);
-       if (!cevent->msg)
-               panic("oom\n");

-       cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
-       nodeid_to_addr(nodeid, cevent->sender.addr);
-       cevent->sender.pid = pid;
-       memcpy(cevent->msg, msg, msg_len);
-       cevent->msg_len = msg_len;
+       switch (type) {
+       case COROSYNC_MSG_TYPE_BLOCK:
+               cevent->blocked = 1;
+               /* fall through */
+       case COROSYNC_MSG_TYPE_NOTIFY:
+               cevent->msg = zalloc(msg_len);
+               if (!cevent->msg)
+                       panic("oom\n");

-       list_add_tail(&cevent->list, &corosync_event_list);
+               cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
+               cevent->sender = sender;
+               memcpy(cevent->msg, msg, msg_len);
if we doesn't set message body , such as send_message(COROSYNC_MSG_TYPE_BLOCK, NULL, 0) in corosync_notify(),
'msg' will point to a unknown address , so I think we should init it before insert it into the event_list,
+               cevent->msg_len = msg_len;
+
+               list_add_tail(&cevent->list, &corosync_event_list);
+               break;
+       case COROSYNC_MSG_TYPE_UNBLOCK:
+               free(cevent); /* we don't add a new cluster event in this case */
+
+               cevent = find_block_event(&sender);
+               if (!cevent)
+                       /* block message was casted before this node joins */
+                       break;
+
+               cevent->blocked = 0;
+               cevent->msg = realloc(cevent->msg, msg_len);
+               if (!cevent->msg)
+                       panic("oom\n");
+               memcpy(cevent->msg, msg, msg_len);
+               cevent->msg_len = msg_len;
+               break;
+       }

       __corosync_dispatch();
 }
@@ -177,6 +319,13 @@ static void cdrv_cpg_confchg(cpg_handle_t handle,

       /* dispatch leave_handler */
       for (i = 0; i < left_list_entries; i++) {
+               cevent = find_block_event(left_sheeps + i);
+               if (cevent) {
+                       /* the node left before sending UNBLOCK */
+                       list_del(&cevent->list);
+                       free(cevent);
+               }
+
               cevent = zalloc(sizeof(*cevent));
               if (!cevent)
                       panic("oom\n");
@@ -251,6 +400,7 @@ static int corosync_init(struct cdrv_handlers *handlers, struct sheepid *myid)
       }

       myid->pid = getpid();
+       this_sheepid = *myid;

       ret = cpg_fd_get(cpg_handle, &fd);
       if (ret != CPG_OK) {
@@ -258,6 +408,8 @@ static int corosync_init(struct cdrv_handlers *handlers, struct sheepid *myid)
               return -1;
       }

+       corosync_block_wq = init_work_queue(1);
+
       return fd;
 }

@@ -297,27 +449,29 @@ static int corosync_leave(void)
       return 0;
 }

-static int corosync_notify(void *msg, size_t msg_len)
+static int corosync_notify(void *msg, size_t msg_len, void (*block_cb)(void *))
 {
-       struct iovec iov;
       int ret;
+       struct corosync_block_msg *bm;

-       iov.iov_base = msg;
-       iov.iov_len = msg_len;
-retry:
-       ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, &iov, 1);
-       switch (ret) {
-       case CPG_OK:
-               break;
-       case CPG_ERR_TRY_AGAIN:
-               dprintf("failed to send message. try again\n");
-               sleep(1);
-               goto retry;
-       default:
-               eprintf("failed to send message, %d\n", ret);
-               return -1;
-       }
-       return 0;
+       if (block_cb) {
+               bm = zalloc(sizeof(*bm));
+               if (!bm)
+                       panic("oom\n");
+               bm->msg = zalloc(msg_len);
+               if (!bm->msg)
+                       panic("oom\n");
+
+               memcpy(bm->msg, msg, msg_len);
+               bm->msg_len = msg_len;
+               bm->cb = block_cb;
+               list_add_tail(&bm->list, &corosync_block_list);
+
+               ret = send_message(COROSYNC_MSG_TYPE_BLOCK, NULL, 0);
+       } else
+               ret = send_message(COROSYNC_MSG_TYPE_NOTIFY, msg, msg_len);
+
+       return ret;
 }

 static int corosync_dispatch(void)
diff --git a/sheep/group.c b/sheep/group.c
index f6743f5..a25f8bf 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -366,7 +366,7 @@ forward:

       list_add(&req->pending_list, &sys->pending_list);

-       sys->cdrv->notify(msg, msg->header.msg_length);
+       sys->cdrv->notify(msg, msg->header.msg_length, NULL);

       free(msg);
 }
@@ -1062,7 +1062,7 @@ static int tx_mastership(void)
       msg.header.from = sys->this_node;
       msg.header.sheepid = sys->this_sheepid;

-       return sys->cdrv->notify(&msg, msg.header.msg_length);
+       return sys->cdrv->notify(&msg, msg.header.msg_length, NULL);
 }

 static void send_join_response(struct work_notify *w)
@@ -1094,7 +1094,7 @@ static void send_join_response(struct work_notify *w)
               exit(1);
       }
       jm->epoch = sys->epoch;
-       sys->cdrv->notify(m, m->msg_length);
+       sys->cdrv->notify(m, m->msg_length, NULL);
 }

 static void __sd_notify_done(struct cpg_event *cevent)
@@ -1173,7 +1173,7 @@ static void __sd_notify_done(struct cpg_event *cevent)
                       break;
               case SD_MSG_VDI_OP:
                       m->state = DM_FIN;
-                       sys->cdrv->notify(m, m->msg_length);
+                       sys->cdrv->notify(m, m->msg_length, NULL);
                       break;
               default:
                       eprintf("unknown message %d\n", m->op);
@@ -1347,7 +1347,7 @@ static void send_join_request(struct sheepid *id)
                       msg.nodes[i].ent = entries[i];
       }

-       sys->cdrv->notify(&msg, msg.header.msg_length);
+       sys->cdrv->notify(&msg, msg.header.msg_length, NULL);

       vprintf(SDOG_INFO "%s\n", sheepid_to_str(&sys->this_sheepid));
 }
@@ -1965,5 +1965,5 @@ int leave_cluster(void)
       msg.epoch = get_latest_epoch();

       dprintf("%d\n", msg.epoch);
-       return sys->cdrv->notify(&msg, msg.header.msg_length);
+       return sys->cdrv->notify(&msg, msg.header.msg_length, NULL);
 }
--
1.7.2.5

--
sheepdog mailing list
sheepdog at lists.wpkg.org<mailto:sheepdog at lists.wpkg.org>
http://lists.wpkg.org/mailman/listinfo/sheepdog


________________________________

This email (including any attachments) is confidential and may be legally privileged. If you received this email in error, please delete it immediately and do not copy it or use it for any purpose or disclose its contents to any other person. Thank you.

本电邮(包括任何附件)可能含有机密资料并受法律保护。如您不是正确的收件人,请您立即删除本邮件。请不要将本电邮进行复制并用作任何其他用途、或透露本邮件之内容。谢谢。
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.wpkg.org/pipermail/sheepdog/attachments/20111011/ea884d50/attachment-0003.html>


More information about the sheepdog mailing list