[Sheepdog] [PATCH v2 2/3] cluster: add blocking mechanism to notification
Liu Yuan
namei.unix at gmail.com
Mon Oct 10 12:15:21 CEST 2011
On 10/10/2011 05:58 PM, MORITA Kazutaka wrote:
> Thanks for your review!
>
> At Mon, 10 Oct 2011 17:29:06 +0800,
> Liu Yuan wrote:
>> On 10/10/2011 04:44 PM, MORITA Kazutaka wrote:
>>> Currently, Sheepdog vdi operations (create/delete/lookup/...) are
>>> processed with two-phase multicasting:
>>>
>>> 1. multicasts a vdi request
>>> 2. only the master node handles the request and multicasts the
>>> response
>>>
>>> During these two phases, we cannot allow any other vdi operations or
>>> membership changes, and this makes sheep/group.c a bit hard to read.
>>>
>>> This patch simplifies this by adding a blocking callback to the
>>> notification function in the cluster driver. If the caller of
>>> cdrv->notify() sets 'block_cb' as an argument, block_cb() is called
>>> from the cluster driver before the message is notified to any node.
>>> All the cluster events are blocked on every node until the caller
>>> finishes the vdi operation in block_cb().
>>>
>>> With this change, the master node is no longer in charge of vdi
>>> operations; this is a good change because it makes Sheepdog more symmetric.
>>>
>>> Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
>>> ---
>>> sheep/cluster.h | 10 ++-
>>> sheep/cluster/corosync.c | 211 +++++++++++++++++++++++++++++++++++++++-------
>>> sheep/group.c | 12 ++--
>>> 3 files changed, 195 insertions(+), 38 deletions(-)
>>>
>>> diff --git a/sheep/cluster.h b/sheep/cluster.h
>>> index 43f4575..25b2d48 100644
>>> --- a/sheep/cluster.h
>>> +++ b/sheep/cluster.h
>>> @@ -71,11 +71,17 @@ struct cluster_driver {
>>> *
>>> * This function sends 'msg' to all the nodes. The notified
>>> * messages can be read through notify_handler() in
>>> - * cdrv_handlers.
>>> + * cdrv_handlers. If 'block_cb' is specified, block_cb() is
>>> + * called before 'msg' is notified to all the nodes. All the
>>> + * cluster events including this notification are blocked
>>> + * until block_cb() returns or this blocking node leaves the
>>> + * cluster. The sheep daemon can sleep in block_cb(), so this
>>> + * callback must be called from other than the dispatch
>>> + * thread.
>> from other than? I don't get it. Did you mean from a worker thread other
>> than the main thread?
> I meant we cannot call block_cb() in the same thread as
> cdrv->dispatch(), because cdrv->dispatch() is called in the
> main thread and we cannot sleep in the main thread. The corosync
> driver calls block_cb() in a worker thread, but other drivers may not.
>
So "callback must be not called from the dispatch(main) thread" would be
easier to understand.
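To make the intended usage concrete, here is a rough sketch of a caller
(the struct vdi_op_msg, do_vdi_op and start_vdi_op names are purely
illustrative, not taken from this series; as I read corosync_block(),
block_cb() receives the notified message as its argument):

#include <stdint.h>
#include <stdio.h>

#include "cluster.h"

/* illustrative message type, not from the patch */
struct vdi_op_msg {
	uint32_t opcode;
	char name[256];
};

/* runs in a worker thread, so it may sleep while doing the vdi work;
 * every other cluster event stays blocked on every node until this
 * returns and the driver multicasts the unblock message */
static void do_vdi_op(void *arg)
{
	struct vdi_op_msg *msg = arg;

	printf("processing vdi opcode %u for %s\n",
	       (unsigned)msg->opcode, msg->name);
}

static int start_vdi_op(struct cluster_driver *cdrv, struct vdi_op_msg *msg)
{
	/* the result is delivered to all nodes via notify_handler()
	 * once do_vdi_op() has finished */
	return cdrv->notify(msg, sizeof(*msg), do_vdi_op);
}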
>>> *
>>> * Returns zero on success, -1 on error
>>> */
>>> - int (*notify)(void *msg, size_t msg_len);
>>> + int (*notify)(void *msg, size_t msg_len, void (*block_cb)(void *arg));
>>>
>>> /*
>>> * Dispatch handlers
>>> diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
>>> index ff52525..629e8a9 100644
>>> --- a/sheep/cluster/corosync.c
>>> +++ b/sheep/cluster/corosync.c
>>> @@ -14,22 +14,35 @@
>>> #include <corosync/cfg.h>
>>>
>>> #include "cluster.h"
>>> +#include "work.h"
>>>
>>> static cpg_handle_t cpg_handle;
>>> static struct cpg_name cpg_group = { 9, "sheepdog" };
>>>
>>> static corosync_cfg_handle_t cfg_handle;
>>> +static struct sheepid this_sheepid;
>>> +
>>> +static struct work_queue *corosync_block_wq;
>>>
>>> static struct cdrv_handlers corosync_handlers;
>>>
>>> static LIST_HEAD(corosync_event_list);
>>> +static LIST_HEAD(corosync_block_list);
>>>
>>> +/* event types which are dispatched in corosync_dispatch() */
>>> enum corosync_event_type {
>>> COROSYNC_EVENT_TYPE_JOIN,
>>> COROSYNC_EVENT_TYPE_LEAVE,
>>> COROSYNC_EVENT_TYPE_NOTIFY,
>>> };
>>>
>>> +/* multicast message type */
>>> +enum corosync_message_type {
>>> + COROSYNC_MSG_TYPE_NOTIFY,
>>> + COROSYNC_MSG_TYPE_BLOCK,
>>> + COROSYNC_MSG_TYPE_UNBLOCK,
>>> +};
>>> +
>>> struct corosync_event {
>>> enum corosync_event_type type;
>>>
>>> @@ -42,6 +55,18 @@ struct corosync_event {
>>> void *msg;
>>> size_t msg_len;
>>>
>>> + int blocked;
>>> + int callbacked;
>>> +
>>> + struct list_head list;
>>> +};
>>> +
>>> +struct corosync_block_msg {
>>> + void *msg;
>>> + size_t msg_len;
>>> + void (*cb)(void *arg);
>>> +
>>> + struct work work;
>>> struct list_head list;
>>> };
>>>
>>> @@ -91,9 +116,71 @@ static void cpg_addr_to_sheepid(const struct cpg_address *cpgs,
>>> }
>>> }
>>>
>>> +static int send_message(uint64_t type, void *msg, size_t msg_len)
>>> +{
>>> + struct iovec iov[2];
>>> + int ret, iov_cnt = 1;
>>> +
>>> + iov[0].iov_base = &type;
>>> + iov[0].iov_len = sizeof(type);
>>> + if (msg) {
>>> + iov[1].iov_base = msg;
>>> + iov[1].iov_len = msg_len;
>>> + iov_cnt++;
>>> + }
>>> +retry:
>>> + ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, iov_cnt);
>>> + switch (ret) {
>>> + case CPG_OK:
>>> + break;
>>> + case CPG_ERR_TRY_AGAIN:
>>> + dprintf("failed to send message. try again\n");
>>> + sleep(1);
>>> + goto retry;
>>> + default:
>>> + eprintf("failed to send message, %d\n", ret);
>>> + return -1;
>>> + }
>>> + return 0;
>>> +}
>>> +
>>> +static void corosync_block(struct work *work, int idx)
>>> +{
>>> + struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
>>> +
>>> + bm->cb(bm->msg);
>>> +}
>>> +
>>> +static void corosync_block_done(struct work *work, int idx)
>>> +{
>>> + struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
>>> +
>>> + send_message(COROSYNC_MSG_TYPE_UNBLOCK, bm->msg, bm->msg_len);
>>> +
>>> + free(bm->msg);
>>> + free(bm);
>>> +}
>>> +
>>> +static struct corosync_event *find_block_event(struct sheepid *sender)
>>> +{
>>> + struct corosync_event *cevent;
>>> +
>>> + list_for_each_entry(cevent, &corosync_event_list, list) {
>>> + if (!cevent->blocked)
>>> + continue;
>>> +
>>> + if (cevent->type == COROSYNC_EVENT_TYPE_NOTIFY &&
>>> + sheepid_cmp(&cevent->sender, sender) == 0)
>>> + return cevent;
>>> + }
>>> +
>>> + return NULL;
>>> +}
>>> +
>>> static void __corosync_dispatch(void)
>>> {
>>> struct corosync_event *cevent;
>>> + struct corosync_block_msg *bm;
>>>
>>> while (!list_empty(&corosync_event_list)) {
>>> cevent = list_first_entry(&corosync_event_list, typeof(*cevent), list);
>>> @@ -110,6 +197,28 @@ static void __corosync_dispatch(void)
>>> cevent->nr_members);
>>> break;
>>> case COROSYNC_EVENT_TYPE_NOTIFY:
>>> + if (cevent->blocked) {
>>> + if (sheepid_cmp(&cevent->sender, &this_sheepid) == 0 &&
>>> + !cevent->callbacked) {
>>> + /* call a block callback function from a worker thread */
>>> + if (list_empty(&corosync_block_list))
>>> + panic("cannot call block callback\n");
>>> +
>>> + bm = list_first_entry(&corosync_block_list,
>>> + typeof(*bm), list);
>>> + list_del(&bm->list);
>>> +
>>> + bm->work.fn = corosync_block;
>>> + bm->work.done = corosync_block_done;
>>> + queue_work(corosync_block_wq, &bm->work);
>>> +
>>> + cevent->callbacked = 1;
>>> + }
>>> +
>> is cevent->callbacked really needed? How about a static local one?
> This variable is needed to ensure that block_cb() is called only once.
> How would we reset cevent->callbacked to zero if we made it a static
> local variable?
>
Hmm, seems this is the simplest method.
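Just to spell out why a static local would not work here: the check would
look roughly like this (sketch only, mirroring the quoted hunk):

	case COROSYNC_EVENT_TYPE_NOTIFY:
		if (cevent->blocked) {
			static int callbacked;	/* shared by all events */

			if (sheepid_cmp(&cevent->sender, &this_sheepid) == 0 &&
			    !callbacked) {
				/* queue the block work exactly as in the patch */
				callbacked = 1;
			}
			/* block the remaining messages as before */
			goto out;
		}

and there is no obvious point at which to reset the flag to 0 after the
event is unblocked and freed, so a second blocked notification from this
node would never get its block_cb() queued. Keeping the flag in struct
corosync_event makes the reset automatic, since every event is zalloc()ed.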
>>> + /* block the rest of the messages until the unblock message arrives */
>>> + goto out;
>>> + }
>>> +
>>> corosync_handlers.notify_handler(&cevent->sender,
>>> cevent->msg,
>>> cevent->msg_len);
>>> @@ -119,6 +228,8 @@ static void __corosync_dispatch(void)
>>> list_del(&cevent->list);
>>> free(cevent);
>>> }
>>> +out:
>>> + return;
>>> }
>>>
>>> static void cdrv_cpg_deliver(cpg_handle_t handle,
>>> @@ -127,21 +238,49 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,
>>> void *msg, size_t msg_len)
>>> {
>>> struct corosync_event *cevent;
>>> + uint64_t type;
>>> + struct sheepid sender;
>>>
>>> - cevent = zalloc(sizeof(*cevent));
>>> - if (!cevent)
>>> - panic("oom\n");
>>> - cevent->msg = zalloc(msg_len);
>>> - if (!cevent->msg)
>>> - panic("oom\n");
>>> + nodeid_to_addr(nodeid, sender.addr);
>>> + sender.pid = pid;
>>>
>>> - cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
>>> - nodeid_to_addr(nodeid, cevent->sender.addr);
>>> - cevent->sender.pid = pid;
>>> - memcpy(cevent->msg, msg, msg_len);
>>> - cevent->msg_len = msg_len;
>>> + memcpy(&type, msg, sizeof(type));
>>> + msg = (uint8_t *)msg + sizeof(type);
>>> + msg_len -= sizeof(type);
>>>
>>> - list_add_tail(&cevent->list, &corosync_event_list);
>>> + switch (type) {
>>> + case COROSYNC_MSG_TYPE_BLOCK:
>>> + case COROSYNC_MSG_TYPE_NOTIFY:
>>> + cevent = zalloc(sizeof(*cevent));
>>> + if (!cevent)
>>> + panic("oom\n");
>>> + cevent->msg = zalloc(msg_len);
>>> + if (!cevent->msg)
>>> + panic("oom\n");
>>> +
>>> + cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
>>> + cevent->sender = sender;
>>> + memcpy(cevent->msg, msg, msg_len);
>>> + cevent->msg_len = msg_len;
>>> + if (type == COROSYNC_MSG_TYPE_BLOCK)
>>> + cevent->blocked = 1;
>> Moving cevent->blocked = 1; into case COROSYNC_MSG_TYPE_BLOCK would avoid
>> the if check.
> But cevent is allocated in this case block.
>
So can we move the cevent allocation outside the switch clause?
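Something like this, perhaps (just a sketch against the quoted hunk; the
tail of the NOTIFY case and the UNBLOCK case are not quoted above, so
those parts are only guessed at):

	cevent = zalloc(sizeof(*cevent));
	if (!cevent)
		panic("oom\n");

	switch (type) {
	case COROSYNC_MSG_TYPE_BLOCK:
		cevent->blocked = 1;
		/* fall through */
	case COROSYNC_MSG_TYPE_NOTIFY:
		cevent->msg = zalloc(msg_len);
		if (!cevent->msg)
			panic("oom\n");

		cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
		cevent->sender = sender;
		memcpy(cevent->msg, msg, msg_len);
		cevent->msg_len = msg_len;
		list_add_tail(&cevent->list, &corosync_event_list);
		break;
	case COROSYNC_MSG_TYPE_UNBLOCK:
		free(cevent);	/* no new event needed for unblock */
		/* handle the unblock as in the rest of the patch */
		break;
	}

Though allocating cevent up front means the UNBLOCK path has to throw it
away again, which may be a reason to keep the allocation inside the case
block after all.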
Thanks,
Yuan