Currently Sheepdog vdi operations (create/delete/lookup/...) are processed in two phase multicasting: 1. multicasts a vdi request 2. only the master node handles the request and multicasts the response During this two phase, we cannot allow any other vdi operations and membership changes, and this makes sheep/group.c a bit hard to read. This patch simplifies this by adding a blocking callback to the notification function in the cluster driver. If the caller of cdrv->notify() sets 'block_cb' as an argument, block_cb() is called from the cluster driver before the message is notified to any node. All the cluster events are blocked in every nodes until the caller finishes the vdi operation in block_cb(). With this change, the master node is no longer in charge of vdi operations, but this is a good change to make Sheepdog more symmetric. Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp> --- sheep/cluster.h | 10 ++- sheep/cluster/corosync.c | 211 +++++++++++++++++++++++++++++++++++++++------- sheep/group.c | 12 ++-- 3 files changed, 195 insertions(+), 38 deletions(-) diff --git a/sheep/cluster.h b/sheep/cluster.h index 43f4575..25b2d48 100644 --- a/sheep/cluster.h +++ b/sheep/cluster.h @@ -71,11 +71,17 @@ struct cluster_driver { * * This function sends 'msg' to all the nodes. The notified * messages can be read through notify_handler() in - * cdrv_handlers. + * cdrv_handlers. If 'block_cb' is specified, block_cb() is + * called before 'msg' is notified to all the nodes. All the + * cluster events including this notification are blocked + * until block_cb() returns or this blocking node leaves the + * cluster. The sheep daemon can sleep in block_cb(), so this + * callback must be called from other than the dispatch + * thread. * * Returns zero on success, -1 on error */ - int (*notify)(void *msg, size_t msg_len); + int (*notify)(void *msg, size_t msg_len, void (*block_cb)(void *arg)); /* * Dispatch handlers diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c index ff52525..629e8a9 100644 --- a/sheep/cluster/corosync.c +++ b/sheep/cluster/corosync.c @@ -14,22 +14,35 @@ #include <corosync/cfg.h> #include "cluster.h" +#include "work.h" static cpg_handle_t cpg_handle; static struct cpg_name cpg_group = { 9, "sheepdog" }; static corosync_cfg_handle_t cfg_handle; +static struct sheepid this_sheepid; + +static struct work_queue *corosync_block_wq; static struct cdrv_handlers corosync_handlers; static LIST_HEAD(corosync_event_list); +static LIST_HEAD(corosync_block_list); +/* event types which are dispatched in corosync_dispatch() */ enum corosync_event_type { COROSYNC_EVENT_TYPE_JOIN, COROSYNC_EVENT_TYPE_LEAVE, COROSYNC_EVENT_TYPE_NOTIFY, }; +/* multicast message type */ +enum corosync_message_type { + COROSYNC_MSG_TYPE_NOTIFY, + COROSYNC_MSG_TYPE_BLOCK, + COROSYNC_MSG_TYPE_UNBLOCK, +}; + struct corosync_event { enum corosync_event_type type; @@ -42,6 +55,18 @@ struct corosync_event { void *msg; size_t msg_len; + int blocked; + int callbacked; + + struct list_head list; +}; + +struct corosync_block_msg { + void *msg; + size_t msg_len; + void (*cb)(void *arg); + + struct work work; struct list_head list; }; @@ -91,9 +116,71 @@ static void cpg_addr_to_sheepid(const struct cpg_address *cpgs, } } +static int send_message(uint64_t type, void *msg, size_t msg_len) +{ + struct iovec iov[2]; + int ret, iov_cnt = 1; + + iov[0].iov_base = &type; + iov[0].iov_len = sizeof(type); + if (msg) { + iov[1].iov_base = msg; + iov[1].iov_len = msg_len; + iov_cnt++; + } +retry: + ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, iov_cnt); + switch (ret) { + case CPG_OK: + break; + case CPG_ERR_TRY_AGAIN: + dprintf("failed to send message. try again\n"); + sleep(1); + goto retry; + default: + eprintf("failed to send message, %d\n", ret); + return -1; + } + return 0; +} + +static void corosync_block(struct work *work, int idx) +{ + struct corosync_block_msg *bm = container_of(work, typeof(*bm), work); + + bm->cb(bm->msg); +} + +static void corosync_block_done(struct work *work, int idx) +{ + struct corosync_block_msg *bm = container_of(work, typeof(*bm), work); + + send_message(COROSYNC_MSG_TYPE_UNBLOCK, bm->msg, bm->msg_len); + + free(bm->msg); + free(bm); +} + +static struct corosync_event *find_block_event(struct sheepid *sender) +{ + struct corosync_event *cevent; + + list_for_each_entry(cevent, &corosync_event_list, list) { + if (!cevent->blocked) + continue; + + if (cevent->type == COROSYNC_EVENT_TYPE_NOTIFY && + sheepid_cmp(&cevent->sender, sender) == 0) + return cevent; + } + + return NULL; +} + static void __corosync_dispatch(void) { struct corosync_event *cevent; + struct corosync_block_msg *bm; while (!list_empty(&corosync_event_list)) { cevent = list_first_entry(&corosync_event_list, typeof(*cevent), list); @@ -110,6 +197,28 @@ static void __corosync_dispatch(void) cevent->nr_members); break; case COROSYNC_EVENT_TYPE_NOTIFY: + if (cevent->blocked) { + if (sheepid_cmp(&cevent->sender, &this_sheepid) == 0 && + !cevent->callbacked) { + /* call a block callback function from a worker thread */ + if (list_empty(&corosync_block_list)) + panic("cannot call block callback\n"); + + bm = list_first_entry(&corosync_block_list, + typeof(*bm), list); + list_del(&bm->list); + + bm->work.fn = corosync_block; + bm->work.done = corosync_block_done; + queue_work(corosync_block_wq, &bm->work); + + cevent->callbacked = 1; + } + + /* block the rest messages until unblock message comes */ + goto out; + } + corosync_handlers.notify_handler(&cevent->sender, cevent->msg, cevent->msg_len); @@ -119,6 +228,8 @@ static void __corosync_dispatch(void) list_del(&cevent->list); free(cevent); } +out: + return; } static void cdrv_cpg_deliver(cpg_handle_t handle, @@ -127,21 +238,49 @@ static void cdrv_cpg_deliver(cpg_handle_t handle, void *msg, size_t msg_len) { struct corosync_event *cevent; + uint64_t type; + struct sheepid sender; - cevent = zalloc(sizeof(*cevent)); - if (!cevent) - panic("oom\n"); - cevent->msg = zalloc(msg_len); - if (!cevent->msg) - panic("oom\n"); + nodeid_to_addr(nodeid, sender.addr); + sender.pid = pid; - cevent->type = COROSYNC_EVENT_TYPE_NOTIFY; - nodeid_to_addr(nodeid, cevent->sender.addr); - cevent->sender.pid = pid; - memcpy(cevent->msg, msg, msg_len); - cevent->msg_len = msg_len; + memcpy(&type, msg, sizeof(type)); + msg = (uint8_t *)msg + sizeof(type); + msg_len -= sizeof(type); - list_add_tail(&cevent->list, &corosync_event_list); + switch (type) { + case COROSYNC_MSG_TYPE_BLOCK: + case COROSYNC_MSG_TYPE_NOTIFY: + cevent = zalloc(sizeof(*cevent)); + if (!cevent) + panic("oom\n"); + cevent->msg = zalloc(msg_len); + if (!cevent->msg) + panic("oom\n"); + + cevent->type = COROSYNC_EVENT_TYPE_NOTIFY; + cevent->sender = sender; + memcpy(cevent->msg, msg, msg_len); + cevent->msg_len = msg_len; + if (type == COROSYNC_MSG_TYPE_BLOCK) + cevent->blocked = 1; + + list_add_tail(&cevent->list, &corosync_event_list); + break; + case COROSYNC_MSG_TYPE_UNBLOCK: + cevent = find_block_event(&sender); + if (!cevent) + /* block message was casted before this node joins */ + break; + + cevent->blocked = 0; + cevent->msg = realloc(cevent->msg, msg_len); + if (!cevent->msg) + panic("oom\n"); + memcpy(cevent->msg, msg, msg_len); + cevent->msg_len = msg_len; + break; + } __corosync_dispatch(); } @@ -177,6 +316,13 @@ static void cdrv_cpg_confchg(cpg_handle_t handle, /* dispatch leave_handler */ for (i = 0; i < left_list_entries; i++) { + cevent = find_block_event(left_sheeps + i); + if (cevent) { + /* the node left before sending UNBLOCK */ + list_del(&cevent->list); + free(cevent); + } + cevent = zalloc(sizeof(*cevent)); if (!cevent) panic("oom\n"); @@ -251,6 +397,7 @@ static int corosync_init(struct cdrv_handlers *handlers, struct sheepid *myid) } myid->pid = getpid(); + this_sheepid = *myid; ret = cpg_fd_get(cpg_handle, &fd); if (ret != CPG_OK) { @@ -258,6 +405,8 @@ static int corosync_init(struct cdrv_handlers *handlers, struct sheepid *myid) return -1; } + corosync_block_wq = init_work_queue(1); + return fd; } @@ -297,27 +446,29 @@ static int corosync_leave(void) return 0; } -static int corosync_notify(void *msg, size_t msg_len) +static int corosync_notify(void *msg, size_t msg_len, void (*block_cb)(void *)) { - struct iovec iov; int ret; + struct corosync_block_msg *bm; - iov.iov_base = msg; - iov.iov_len = msg_len; -retry: - ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, &iov, 1); - switch (ret) { - case CPG_OK: - break; - case CPG_ERR_TRY_AGAIN: - dprintf("failed to send message. try again\n"); - sleep(1); - goto retry; - default: - eprintf("failed to send message, %d\n", ret); - return -1; - } - return 0; + if (block_cb) { + bm = zalloc(sizeof(*bm)); + if (!bm) + panic("oom\n"); + bm->msg = zalloc(msg_len); + if (!bm->msg) + panic("oom\n"); + + memcpy(bm->msg, msg, msg_len); + bm->msg_len = msg_len; + bm->cb = block_cb; + list_add_tail(&bm->list, &corosync_block_list); + + ret = send_message(COROSYNC_MSG_TYPE_BLOCK, NULL, 0); + } else + ret = send_message(COROSYNC_MSG_TYPE_NOTIFY, msg, msg_len); + + return ret; } static int corosync_dispatch(void) diff --git a/sheep/group.c b/sheep/group.c index f6743f5..a25f8bf 100644 --- a/sheep/group.c +++ b/sheep/group.c @@ -366,7 +366,7 @@ forward: list_add(&req->pending_list, &sys->pending_list); - sys->cdrv->notify(msg, msg->header.msg_length); + sys->cdrv->notify(msg, msg->header.msg_length, NULL); free(msg); } @@ -1062,7 +1062,7 @@ static int tx_mastership(void) msg.header.from = sys->this_node; msg.header.sheepid = sys->this_sheepid; - return sys->cdrv->notify(&msg, msg.header.msg_length); + return sys->cdrv->notify(&msg, msg.header.msg_length, NULL); } static void send_join_response(struct work_notify *w) @@ -1094,7 +1094,7 @@ static void send_join_response(struct work_notify *w) exit(1); } jm->epoch = sys->epoch; - sys->cdrv->notify(m, m->msg_length); + sys->cdrv->notify(m, m->msg_length, NULL); } static void __sd_notify_done(struct cpg_event *cevent) @@ -1173,7 +1173,7 @@ static void __sd_notify_done(struct cpg_event *cevent) break; case SD_MSG_VDI_OP: m->state = DM_FIN; - sys->cdrv->notify(m, m->msg_length); + sys->cdrv->notify(m, m->msg_length, NULL); break; default: eprintf("unknown message %d\n", m->op); @@ -1347,7 +1347,7 @@ static void send_join_request(struct sheepid *id) msg.nodes[i].ent = entries[i]; } - sys->cdrv->notify(&msg, msg.header.msg_length); + sys->cdrv->notify(&msg, msg.header.msg_length, NULL); vprintf(SDOG_INFO "%s\n", sheepid_to_str(&sys->this_sheepid)); } @@ -1965,5 +1965,5 @@ int leave_cluster(void) msg.epoch = get_latest_epoch(); dprintf("%d\n", msg.epoch); - return sys->cdrv->notify(&msg, msg.header.msg_length); + return sys->cdrv->notify(&msg, msg.header.msg_length, NULL); } -- 1.7.2.5 |