<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
</head>
<body>
<br>
<br>
<div class="gmail_quote">On Tue, Oct 11, 2011 at 2:06 AM, MORITA Kazutaka <span dir="ltr">
<<a href="mailto:morita.kazutaka@lab.ntt.co.jp">morita.kazutaka@lab.ntt.co.jp</a>></span> wrote:<br>
<blockquote class="gmail_quote" style="margin:0 0 0 .8ex; border-left:1px #ccc solid; padding-left:1ex">
<div class="im">Currently Sheepdog vdi operations (create/delete/lookup/...) are<br>
processed in two phase multicasting:<br>
<br>
 1. multicasts a vdi request<br>
 2. only the master node handles the request and multicasts the<br>
   response<br>
<br>
During this two phase, we cannot allow any other vdi operations and<br>
membership changes, and this makes sheep/group.c a bit hard to read.<br>
<br>
This patch simplifies this by adding a blocking callback to the<br>
notification function in the cluster driver.  If the caller of<br>
cdrv->notify() passes 'block_cb' as an argument, block_cb() is called<br>
from the cluster driver before the message is delivered to any node.<br>
All cluster events are blocked on every node until the caller<br>
finishes the vdi operation in block_cb().<br>
<br>
With this change, the master node is no longer in charge of vdi<br>
operations, which makes Sheepdog more symmetric.<br>
<br>
Signed-off-by: MORITA Kazutaka <<a href="mailto:morita.kazutaka@lab.ntt.co.jp">morita.kazutaka@lab.ntt.co.jp</a>><br>
---<br>
</div>
 sheep/cluster.h          |    9 ++-<br>
 sheep/cluster/corosync.c |  208 ++++++++++++++++++++++++++++++++++++++++------<br>
 sheep/group.c            |   12 ++--<br>
 3 files changed, 194 insertions(+), 35 deletions(-)<br>
<br>
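</blockquote>
<div>For context, a caller of the new interface would look roughly like the sketch below; do_vdi_op(), vdi_op_run() and the message struct are hypothetical names for illustration, not part of this patch:</div>
<div><br></div>
/* runs in a worker thread; cluster events stay blocked on every<br>
 * node until this returns, so it is safe to sleep here */<br>
static void do_vdi_op(void *arg)<br>
{<br>
        struct vdi_op_message *msg = arg; /* a private copy of the notified 'msg' */<br>
<br>
        vdi_op_run(msg); /* hypothetical handler; may modify the copy */<br>
}<br>
<br>
/* whatever do_vdi_op() leaves in its copy is multicast with UNBLOCK<br>
 * and is what notify_handler() sees on every node */<br>
sys->cdrv->notify(msg, msg->header.msg_length, do_vdi_op);<br>
<div><br></div>
<blockquote class="gmail_quote" style="margin:0 0 0 .8ex; border-left:1px #ccc solid; padding-left:1ex">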
diff --git a/sheep/cluster.h b/sheep/cluster.h<br>
index 43f4575..89d0566 100644<br>
--- a/sheep/cluster.h<br>
+++ b/sheep/cluster.h<br>
@@ -71,11 +71,16 @@ struct cluster_driver {<br>
<div class="im">        *<br>
        * This function sends 'msg' to all the nodes.  The notified<br>
        * messages can be read through notify_handler() in<br>
-        * cdrv_handlers.<br>
+        * cdrv_handlers.  If 'block_cb' is specified, block_cb() is<br>
+        * called before 'msg' is notified to all the nodes.  All the<br>
+        * cluster events including this notification are blocked<br>
+        * until block_cb() returns or this blocking node leaves the<br>
+        * cluster.  The sheep daemon can sleep in block_cb(), so this<br>
</div>
+        * callback must not be called from the dispatch (main) thread.<br>
<div class="im">        *<br>
        * Returns zero on success, -1 on error<br>
        */<br>
-       int (*notify)(void *msg, size_t msg_len);<br>
+       int (*notify)(void *msg, size_t msg_len, void (*block_cb)(void *arg));<br>
<br>
       /*<br>
        * Dispatch handlers<br>
diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c<br>
</div>
index ff52525..5d7b412 100644<br>
<div>
<div class="h5">--- a/sheep/cluster/corosync.c<br>
+++ b/sheep/cluster/corosync.c<br>
@@ -14,22 +14,35 @@<br>
 #include <corosync/cfg.h><br>
<br>
 #include "cluster.h"<br>
+#include "work.h"<br>
<br>
 static cpg_handle_t cpg_handle;<br>
 static struct cpg_name cpg_group = { 9, "sheepdog" };<br>
<br>
 static corosync_cfg_handle_t cfg_handle;<br>
+static struct sheepid this_sheepid;<br>
+<br>
+static struct work_queue *corosync_block_wq;<br>
<br>
 static struct cdrv_handlers corosync_handlers;<br>
<br>
 static LIST_HEAD(corosync_event_list);<br>
+static LIST_HEAD(corosync_block_list);<br>
<br>
+/* event types which are dispatched in corosync_dispatch() */<br>
 enum corosync_event_type {<br>
       COROSYNC_EVENT_TYPE_JOIN,<br>
       COROSYNC_EVENT_TYPE_LEAVE,<br>
       COROSYNC_EVENT_TYPE_NOTIFY,<br>
 };<br>
<br>
+/* multicast message type */<br>
+enum corosync_message_type {<br>
+       COROSYNC_MSG_TYPE_NOTIFY,<br>
+       COROSYNC_MSG_TYPE_BLOCK,<br>
+       COROSYNC_MSG_TYPE_UNBLOCK,<br>
+};<br>
+<br>
 struct corosync_event {<br>
       enum corosync_event_type type;<br>
<br>
@@ -42,6 +55,18 @@ struct corosync_event {<br>
       void *msg;<br>
       size_t msg_len;<br>
<br>
+       int blocked;<br>
+       int callbacked;<br>
+<br>
+       struct list_head list;<br>
+};<br>
+<br>
+struct corosync_block_msg {<br>
+       void *msg;<br>
+       size_t msg_len;<br>
+       void (*cb)(void *arg);<br>
+<br>
+       struct work work;<br>
       struct list_head list;<br>
 };<br>
<br>
@@ -91,9 +116,71 @@ static void cpg_addr_to_sheepid(const struct cpg_address *cpgs,<br>
       }<br>
 }<br>
<br>
+static int send_message(uint64_t type, void *msg, size_t msg_len)<br>
+{<br>
+       struct iovec iov[2];<br>
+       int ret, iov_cnt = 1;<br>
+<br>
+       iov[0].iov_base = &type;<br>
+       iov[0].iov_len = sizeof(type);<br>
+       if (msg) {<br>
+               iov[1].iov_base = msg;<br>
+               iov[1].iov_len = msg_len;<br>
+               iov_cnt++;<br>
+       }<br>
+retry:<br>
+       ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, iov_cnt);<br>
+       switch (ret) {<br>
+       case CPG_OK:<br>
+               break;<br>
+       case CPG_ERR_TRY_AGAIN:<br>
+               dprintf("failed to send message. try again\n");<br>
+               sleep(1);<br>
+               goto retry;<br>
+       default:<br>
+               eprintf("failed to send message, %d\n", ret);<br>
+               return -1;<br>
+       }<br>
+       return 0;<br>
+}<br>
+<br>
+static void corosync_block(struct work *work, int idx)<br>
+{<br>
+       struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);<br>
+<br>
+       bm->cb(bm->msg);<br>
+}<br>
+<br>
+static void corosync_block_done(struct work *work, int idx)<br>
+{<br>
+       struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);<br>
+<br>
+       send_message(COROSYNC_MSG_TYPE_UNBLOCK, bm->msg, bm->msg_len);<br>
+<br>
+       free(bm->msg);<br>
+       free(bm);<br>
+}<br>
+<br>
+static struct corosync_event *find_block_event(struct sheepid *sender)<br>
+{<br>
+       struct corosync_event *cevent;<br>
+<br>
</div>
</div>
+       list_for_each_entry(cevent, &corosync_event_list, list) {<br>
<div class="im">+               if (!cevent->blocked)<br>
+                       continue;<br>
+<br>
+               if (cevent->type == COROSYNC_EVENT_TYPE_NOTIFY &&<br>
+                   sheepid_cmp(&cevent->sender, sender) == 0)<br>
+                       return cevent;<br>
+       }<br>
+<br>
+       return NULL;<br>
+}<br>
+<br>
 static void __corosync_dispatch(void)<br>
 {<br>
       struct corosync_event *cevent;<br>
+       struct corosync_block_msg *bm;<br>
<br>
       while (!list_empty(&corosync_event_list)) {<br>
               cevent = list_first_entry(&corosync_event_list, typeof(*cevent), list);<br>
@@ -110,6 +197,28 @@ static void __corosync_dispatch(void)<br>
                                                       cevent->nr_members);<br>
                       break;<br>
               case COROSYNC_EVENT_TYPE_NOTIFY:<br>
+                       if (cevent->blocked) {<br>
</div>
<div class="im">+                               if (sheepid_cmp(&cevent->sender, &this_sheepid) == 0 &&<br>
</div>
<div class="im">+                                   !cevent->callbacked) {<br>
+                                       /* call a block callback function from a worker thread */<br>
+                                       if (list_empty(&corosync_block_list))<br>
+                                               panic("cannot call block callback\n");<br>
+<br>
+                                       bm = list_first_entry(&corosync_block_list,<br>
+                                                             typeof(*bm), list);<br>
+                                       list_del(&bm->list);<br>
+<br>
+                                       bm->work.fn = corosync_block;<br>
+                                       bm->work.done = corosync_block_done;<br>
</div>
+                                       queue_work(corosync_block_wq, &bm->work);<br>
<div class="im">+<br>
+                                       cevent->callbacked = 1;<br>
+                               }<br>
+<br>
</div>
<div class="im">+                               /* block the rest messages until unblock message comes */<br>
+                               goto out;<br>
+                       }<br>
+<br>
                       corosync_handlers.notify_handler(&cevent->sender,<br>
                                                        cevent->msg,<br>
                                                        cevent->msg_len);<br>
@@ -119,6 +228,8 @@ static void __corosync_dispatch(void)<br>
               list_del(&cevent->list);<br>
               free(cevent);<br>
       }<br>
+out:<br>
+       return;<br>
 }<br>
<br>
 static void cdrv_cpg_deliver(cpg_handle_t handle,<br>
</div>
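</blockquote>
<div>If I read this hunk together with cdrv_cpg_deliver() below correctly, the sequence is:</div>
<div><br></div>
 1. the caller's node multicasts BLOCK (with no body);<br>
 2. on delivery, every node queues a NOTIFY event with 'blocked' set, and __corosync_dispatch() stops at it, so all later events stay queued;<br>
 3. only the sending node runs block_cb() in a worker thread, via corosync_block();<br>
 4. corosync_block_done() then multicasts UNBLOCK together with the final message;<br>
 5. on delivery, the blocked event is filled with that message and unblocked, and dispatch resumes with notify_handler() on every node.<br>
<div><br></div>
<blockquote class="gmail_quote" style="margin:0 0 0 .8ex; border-left:1px #ccc solid; padding-left:1ex">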
@@ -127,21 +238,52 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,<br>
<div class="im">                            void *msg, size_t msg_len)<br>
 {<br>
       struct corosync_event *cevent;<br>
+       uint64_t type;<br>
+       struct sheepid sender;<br>
</div>
<div class="im">+<br>
+       nodeid_to_addr(nodeid, sender.addr);<br>
+       sender.pid = pid;<br>
</div>
<div class="im">+<br>
+       memcpy(&type, msg, sizeof(type));<br>
+       msg = (uint8_t *)msg + sizeof(type);<br>
+       msg_len -= sizeof(type);<br>
<br>
</div>
<div class="im">       cevent = zalloc(sizeof(*cevent));<br>
       if (!cevent)<br>
</div>
<div class="im">               panic("oom\n");<br>
-       cevent->msg = zalloc(msg_len);<br>
-       if (!cevent->msg)<br>
-               panic("oom\n");<br>
<br>
</div>
<div class="im">-       cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;<br>
-       nodeid_to_addr(nodeid, cevent->sender.addr);<br>
-       cevent->sender.pid = pid;<br>
-       memcpy(cevent->msg, msg, msg_len);<br>
-       cevent->msg_len = msg_len;<br>
</div>
<div class="im">+       switch (type) {<br>
+       case COROSYNC_MSG_TYPE_BLOCK:<br>
</div>
<div class="im">+               cevent->blocked = 1;<br>
</div>
+               /* fall through */<br>
+       case COROSYNC_MSG_TYPE_NOTIFY:<br>
<div class="im">+               cevent->msg = zalloc(msg_len);<br>
+               if (!cevent->msg)<br>
+                       panic("oom\n");<br>
<br>
</div>
-       list_add_tail(&cevent->list, &corosync_event_list);<br>
<div class="im">+               cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;<br>
+               cevent->sender = sender;<br>
+               memcpy(cevent->msg, msg, msg_len);<br>
</div>
</blockquote>
<div>If we don't set a message body, as with send_message(COROSYNC_MSG_TYPE_BLOCK, NULL, 0) in corosync_notify(),</div>
<div>'msg' will point to an unknown address, so I think we should initialize it before inserting it into the event_list.</div>
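<div><br></div>
<div>Something like the following would avoid relying on zalloc(0), keeping the fall-through from the BLOCK case; just a sketch, not tested:</div>
<div><br></div>
       case COROSYNC_MSG_TYPE_NOTIFY:<br>
               if (msg_len) {<br>
                       cevent->msg = zalloc(msg_len);<br>
                       if (!cevent->msg)<br>
                               panic("oom\n");<br>
                       memcpy(cevent->msg, msg, msg_len);<br>
               } else<br>
                       cevent->msg = NULL; /* cevent is zalloc'ed, but be explicit */<br>
               cevent->msg_len = msg_len;<br>
<div><br></div>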
<blockquote class="gmail_quote" style="margin:0 0 0 .8ex; border-left:1px #ccc solid; padding-left:1ex">
<div class="im">+               cevent->msg_len = msg_len;<br>
+<br>
</div>
<div class="im">+               list_add_tail(&cevent->list, &corosync_event_list);<br>
+               break;<br>
+       case COROSYNC_MSG_TYPE_UNBLOCK:<br>
</div>
+               free(cevent); /* we don't add a new cluster event in this case */<br>
<div class="im">+<br>
+               cevent = find_block_event(&sender);<br>
+               if (!cevent)<br>
+                       /* the block message was multicast before this node joined */<br>
+                       break;<br>
+<br>
+               cevent->blocked = 0;<br>
+               cevent->msg = realloc(cevent->msg, msg_len);<br>
</div>
<div class="im">+               if (!cevent->msg)<br>
+                       panic("oom\n");<br>
</div>
<div class="im">+               memcpy(cevent->msg, msg, msg_len);<br>
+               cevent->msg_len = msg_len;<br>
</div>
+               break;<br>
+       }<br>
<br>
       __corosync_dispatch();<br>
 }<br>
@@ -177,6 +319,13 @@ static void cdrv_cpg_confchg(cpg_handle_t handle,<br>
<div class="im"><br>
       /* dispatch leave_handler */<br>
       for (i = 0; i < left_list_entries; i++) {<br>
+               cevent = find_block_event(left_sheeps + i);<br>
+               if (cevent) {<br>
+                       /* the node left before sending UNBLOCK */<br>
+                       list_del(&cevent->list);<br>
+                       free(cevent);<br>
+               }<br>
+<br>
               cevent = zalloc(sizeof(*cevent));<br>
               if (!cevent)<br>
                       panic("oom\n");<br>
</div>
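</blockquote>
<div>This hunk implements the 'or this blocking node leaves the cluster' clause from the cluster.h comment: if the blocking node dies before it can multicast UNBLOCK, its pending blocked event is dropped here, so the remaining nodes do not stay blocked forever.</div>
<blockquote class="gmail_quote" style="margin:0 0 0 .8ex; border-left:1px #ccc solid; padding-left:1ex">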
@@ -251,6 +400,7 @@ static int corosync_init(struct cdrv_handlers *handlers, struct sheepid *myid)<br>
<div class="im">       }<br>
<br>
       myid->pid = getpid();<br>
+       this_sheepid = *myid;<br>
<br>
       ret = cpg_fd_get(cpg_handle, &fd);<br>
       if (ret != CPG_OK) {<br>
</div>
@@ -258,6 +408,8 @@ static int corosync_init(struct cdrv_handlers *handlers, struct sheepid *myid)<br>
<div class="im">               return -1;<br>
       }<br>
<br>
+       corosync_block_wq = init_work_queue(1);<br>
+<br>
       return fd;<br>
 }<br>
<br>
</div>
@@ -297,27 +449,29 @@ static int corosync_leave(void)<br>
<div class="im">       return 0;<br>
 }<br>
<br>
-static int corosync_notify(void *msg, size_t msg_len)<br>
+static int corosync_notify(void *msg, size_t msg_len, void (*block_cb)(void *))<br>
 {<br>
-       struct iovec iov;<br>
       int ret;<br>
+       struct corosync_block_msg *bm;<br>
<br>
-       iov.iov_base = msg;<br>
-       iov.iov_len = msg_len;<br>
-retry:<br>
-       ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, &iov, 1);<br>
-       switch (ret) {<br>
-       case CPG_OK:<br>
-               break;<br>
-       case CPG_ERR_TRY_AGAIN:<br>
</div>
<div class="im">-               dprintf("failed to send message. try again\n");<br>
</div>
<div class="im">-               sleep(1);<br>
-               goto retry;<br>
-       default:<br>
</div>
<div class="im">-               eprintf("failed to send message, %d\n", ret);<br>
</div>
<div class="im">-               return -1;<br>
-       }<br>
-       return 0;<br>
+       if (block_cb) {<br>
+               bm = zalloc(sizeof(*bm));<br>
+               if (!bm)<br>
</div>
<div class="im">+                       panic("oom\n");<br>
</div>
+               bm->msg = zalloc(msg_len);<br>
+               if (!bm->msg)<br>
<div class="im">+                       panic("oom\n");<br>
+<br>
</div>
<div>
<div class="h5">+               memcpy(bm->msg, msg, msg_len);<br>
+               bm->msg_len = msg_len;<br>
+               bm->cb = block_cb;<br>
+               list_add_tail(&bm->list, &corosync_block_list);<br>
+<br>
+               ret = send_message(COROSYNC_MSG_TYPE_BLOCK, NULL, 0);<br>
+       } else<br>
+               ret = send_message(COROSYNC_MSG_TYPE_NOTIFY, msg, msg_len);<br>
+<br>
+       return ret;<br>
 }<br>
<br>
 static int corosync_dispatch(void)<br>
diff --git a/sheep/group.c b/sheep/group.c<br>
index f6743f5..a25f8bf 100644<br>
--- a/sheep/group.c<br>
+++ b/sheep/group.c<br>
@@ -366,7 +366,7 @@ forward:<br>
<br>
       list_add(&req->pending_list, &sys->pending_list);<br>
<br>
-       sys->cdrv->notify(msg, msg->header.msg_length);<br>
+       sys->cdrv->notify(msg, msg->header.msg_length, NULL);<br>
<br>
       free(msg);<br>
 }<br>
@@ -1062,7 +1062,7 @@ static int tx_mastership(void)<br>
       msg.header.from = sys->this_node;<br>
       msg.header.sheepid = sys->this_sheepid;<br>
<br>
-       return sys->cdrv->notify(&msg, msg.header.msg_length);<br>
+       return sys->cdrv->notify(&msg, msg.header.msg_length, NULL);<br>
 }<br>
<br>
 static void send_join_response(struct work_notify *w)<br>
@@ -1094,7 +1094,7 @@ static void send_join_response(struct work_notify *w)<br>
               exit(1);<br>
       }<br>
       jm->epoch = sys->epoch;<br>
-       sys->cdrv->notify(m, m->msg_length);<br>
+       sys->cdrv->notify(m, m->msg_length, NULL);<br>
 }<br>
<br>
 static void __sd_notify_done(struct cpg_event *cevent)<br>
@@ -1173,7 +1173,7 @@ static void __sd_notify_done(struct cpg_event *cevent)<br>
                       break;<br>
               case SD_MSG_VDI_OP:<br>
                       m->state = DM_FIN;<br>
-                       sys->cdrv->notify(m, m->msg_length);<br>
+                       sys->cdrv->notify(m, m->msg_length, NULL);<br>
                       break;<br>
               default:<br>
                       eprintf("unknown message %d\n", m->op);<br>
@@ -1347,7 +1347,7 @@ static void send_join_request(struct sheepid *id)<br>
                       msg.nodes[i].ent = entries[i];<br>
       }<br>
<br>
-       sys->cdrv->notify(&msg, msg.header.msg_length);<br>
+       sys->cdrv->notify(&msg, msg.header.msg_length, NULL);<br>
<br>
       vprintf(SDOG_INFO "%s\n", sheepid_to_str(&sys->this_sheepid));<br>
 }<br>
@@ -1965,5 +1965,5 @@ int leave_cluster(void)<br>
       msg.epoch = get_latest_epoch();<br>
<br>
       dprintf("%d\n", msg.epoch);<br>
-       return sys->cdrv->notify(&msg, msg.header.msg_length);<br>
+       return sys->cdrv->notify(&msg, msg.header.msg_length, NULL);<br>
 }<br>
--<br>
1.7.2.5<br>
<br>
</div>
</div>
<div>
<div class="h5">--<br>
sheepdog mailing list<br>
<a href="mailto:sheepdog@lists.wpkg.org">sheepdog@lists.wpkg.org</a><br>
<a href="http://lists.wpkg.org/mailman/listinfo/sheepdog" target="_blank">http://lists.wpkg.org/mailman/listinfo/sheepdog</a><br>
</div>
</div>
</blockquote>
</div>
</body>
</html>