[Sheepdog] [PATCH v5 2/3] cluster: add blocking mechanism to notification

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Wed Oct 12 13:49:33 CEST 2011


Currently Sheepdog vdi operations (create/delete/lookup/...) are
processed by two-phase multicasting:

 1. multicasts a vdi request
 2. only the master node handles the request and multicasts the
    response

During these two phases, we cannot allow any other vdi operations or
membership changes, and this makes sheep/group.c a bit hard to read.

This patch simplifies this by adding a blocking callback to the
notification function in the cluster driver.  If the caller of
cdrv->notify() sets 'block_cb' as an argument, block_cb() is called
from the cluster driver before the message is notified to any node.
All the cluster events are blocked on every node until the caller
finishes the vdi operation in block_cb().

With this change, the master node is no longer in charge of vdi
operations, but this is a good change that makes Sheepdog more symmetric.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 sheep/cluster.h          |    9 ++-
 sheep/cluster/corosync.c |  215 ++++++++++++++++++++++++++++++++++++++++------
 sheep/group.c            |   23 +++--
 3 files changed, 208 insertions(+), 39 deletions(-)

diff --git a/sheep/cluster.h b/sheep/cluster.h
index 43f4575..89d0566 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -71,11 +71,16 @@ struct cluster_driver {
 	 *
 	 * This function sends 'msg' to all the nodes.  The notified
 	 * messages can be read through notify_handler() in
-	 * cdrv_handlers.
+	 * cdrv_handlers.  If 'block_cb' is specified, block_cb() is
+	 * called before 'msg' is notified to all the nodes.  All the
+	 * cluster events including this notification are blocked
+	 * until block_cb() returns or this blocking node leaves the
+	 * cluster.  The sheep daemon can sleep in block_cb(), so this
+	 * callback must not be called from the dispatch (main) thread.
 	 *
 	 * Returns zero on success, -1 on error
 	 */
-	int (*notify)(void *msg, size_t msg_len);
+	int (*notify)(void *msg, size_t msg_len, void (*block_cb)(void *arg));
 
 	/*
 	 * Dispatch handlers
diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
index f0a35e5..7aa3d02 100644
--- a/sheep/cluster/corosync.c
+++ b/sheep/cluster/corosync.c
@@ -14,22 +14,35 @@
 #include <corosync/cfg.h>
 
 #include "cluster.h"
+#include "work.h"
 
 static cpg_handle_t cpg_handle;
 static struct cpg_name cpg_group = { 9, "sheepdog" };
 
 static corosync_cfg_handle_t cfg_handle;
+static struct sheepid this_sheepid;
+
+static struct work_queue *corosync_block_wq;
 
 static struct cdrv_handlers corosync_handlers;
 
 static LIST_HEAD(corosync_event_list);
+static LIST_HEAD(corosync_block_list);
 
+/* event types which are dispatched in corosync_dispatch() */
 enum corosync_event_type {
 	COROSYNC_EVENT_TYPE_JOIN,
 	COROSYNC_EVENT_TYPE_LEAVE,
 	COROSYNC_EVENT_TYPE_NOTIFY,
 };
 
+/* multicast message type */
+enum corosync_message_type {
+	COROSYNC_MSG_TYPE_NOTIFY,
+	COROSYNC_MSG_TYPE_BLOCK,
+	COROSYNC_MSG_TYPE_UNBLOCK,
+};
+
 struct corosync_event {
 	enum corosync_event_type type;
 
@@ -40,6 +53,18 @@ struct corosync_event {
 	void *msg;
 	size_t msg_len;
 
+	int blocked;
+	int callbacked;
+
+	struct list_head list;
+};
+
+struct corosync_block_msg {
+	void *msg;
+	size_t msg_len;
+	void (*cb)(void *arg);
+
+	struct work work;
 	struct list_head list;
 };
 
@@ -89,9 +114,71 @@ static void cpg_addr_to_sheepid(const struct cpg_address *cpgs,
 	}
 }
 
+static int send_message(uint64_t type, void *msg, size_t msg_len)
+{
+	struct iovec iov[2];
+	int ret, iov_cnt = 1;
+
+	iov[0].iov_base = &type;
+	iov[0].iov_len = sizeof(type);
+	if (msg) {
+		iov[1].iov_base = msg;
+		iov[1].iov_len = msg_len;
+		iov_cnt++;
+	}
+retry:
+	ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, iov_cnt);
+	switch (ret) {
+	case CPG_OK:
+		break;
+	case CPG_ERR_TRY_AGAIN:
+		dprintf("failed to send message. try again\n");
+		sleep(1);
+		goto retry;
+	default:
+		eprintf("failed to send message, %d\n", ret);
+		return -1;
+	}
+	return 0;
+}
+
+static void corosync_block(struct work *work, int idx)
+{
+	struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
+
+	bm->cb(bm->msg);
+}
+
+static void corosync_block_done(struct work *work, int idx)
+{
+	struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
+
+	send_message(COROSYNC_MSG_TYPE_UNBLOCK, bm->msg, bm->msg_len);
+
+	free(bm->msg);
+	free(bm);
+}
+
+static struct corosync_event *find_block_event(struct sheepid *sender)
+{
+	struct corosync_event *cevent;
+
+	list_for_each_entry(cevent, &corosync_event_list, list) {
+		if (!cevent->blocked)
+			continue;
+
+		if (cevent->type == COROSYNC_EVENT_TYPE_NOTIFY &&
+		    sheepid_cmp(&cevent->sender, sender) == 0)
+			return cevent;
+	}
+
+	return NULL;
+}
+
 static void __corosync_dispatch(void)
 {
 	struct corosync_event *cevent;
+	struct corosync_block_msg *bm;
 
 	while (!list_empty(&corosync_event_list)) {
 		cevent = list_first_entry(&corosync_event_list, typeof(*cevent), list);
@@ -108,6 +195,28 @@ static void __corosync_dispatch(void)
 							cevent->nr_members);
 			break;
 		case COROSYNC_EVENT_TYPE_NOTIFY:
+			if (cevent->blocked) {
+				if (sheepid_cmp(&cevent->sender, &this_sheepid) == 0 &&
+				    !cevent->callbacked) {
+					/* call a block callback function from a worker thread */
+					if (list_empty(&corosync_block_list))
+						panic("cannot call block callback\n");
+
+					bm = list_first_entry(&corosync_block_list,
+							      typeof(*bm), list);
+					list_del(&bm->list);
+
+					bm->work.fn = corosync_block;
+					bm->work.done = corosync_block_done;
+					queue_work(corosync_block_wq, &bm->work);
+
+					cevent->callbacked = 1;
+				}
+
+				/* block the remaining messages until UNBLOCK arrives */
+				goto out;
+			}
+
 			corosync_handlers.notify_handler(&cevent->sender,
 							 cevent->msg,
 							 cevent->msg_len);
@@ -117,6 +226,8 @@ static void __corosync_dispatch(void)
 		list_del(&cevent->list);
 		free(cevent);
 	}
+out:
+	return;
 }
 
 static void cdrv_cpg_deliver(cpg_handle_t handle,
@@ -125,21 +236,59 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,
 			     void *msg, size_t msg_len)
 {
 	struct corosync_event *cevent;
+	uint64_t type;
+	struct sheepid sender;
+
+	nodeid_to_addr(nodeid, sender.addr);
+	sender.pid = pid;
+
+	memcpy(&type, msg, sizeof(type));
+	msg = (uint8_t *)msg + sizeof(type);
+	msg_len -= sizeof(type);
 
 	cevent = zalloc(sizeof(*cevent));
 	if (!cevent)
 		panic("oom\n");
-	cevent->msg = zalloc(msg_len);
-	if (!cevent->msg)
-		panic("oom\n");
 
-	cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
-	nodeid_to_addr(nodeid, cevent->sender.addr);
-	cevent->sender.pid = pid;
-	memcpy(cevent->msg, msg, msg_len);
-	cevent->msg_len = msg_len;
+	switch (type) {
+	case COROSYNC_MSG_TYPE_BLOCK:
+		cevent->blocked = 1;
+		/* fall through */
+	case COROSYNC_MSG_TYPE_NOTIFY:
+		cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
+		cevent->sender = sender;
+		cevent->msg_len = msg_len;
+		if (msg_len) {
+			cevent->msg = zalloc(msg_len);
+			if (!cevent->msg)
+				panic("oom\n");
+			memcpy(cevent->msg, msg, msg_len);
+		} else
+			cevent->msg = NULL;
 
-	list_add_tail(&cevent->list, &corosync_event_list);
+		list_add_tail(&cevent->list, &corosync_event_list);
+		break;
+	case COROSYNC_MSG_TYPE_UNBLOCK:
+		free(cevent); /* we don't add a new cluster event in this case */
+
+		cevent = find_block_event(&sender);
+		if (!cevent)
+			/* block message was multicast before this node joined */
+			break;
+
+		cevent->blocked = 0;
+		cevent->msg_len = msg_len;
+		if (msg_len) {
+			cevent->msg = realloc(cevent->msg, msg_len);
+			if (!cevent->msg)
+				panic("oom\n");
+			memcpy(cevent->msg, msg, msg_len);
+		} else {
+			free(cevent->msg);
+			cevent->msg = NULL;
+		}
+		break;
+	}
 
 	__corosync_dispatch();
 }
@@ -175,6 +324,13 @@ static void cdrv_cpg_confchg(cpg_handle_t handle,
 
 	/* dispatch leave_handler */
 	for (i = 0; i < left_list_entries; i++) {
+		cevent = find_block_event(left_sheeps + i);
+		if (cevent) {
+			/* the node left before sending UNBLOCK */
+			list_del(&cevent->list);
+			free(cevent);
+		}
+
 		cevent = zalloc(sizeof(*cevent));
 		if (!cevent)
 			panic("oom\n");
@@ -249,6 +405,7 @@ static int corosync_init(struct cdrv_handlers *handlers, struct sheepid *myid)
 	}
 
 	myid->pid = getpid();
+	this_sheepid = *myid;
 
 	ret = cpg_fd_get(cpg_handle, &fd);
 	if (ret != CPG_OK) {
@@ -256,6 +413,8 @@ static int corosync_init(struct cdrv_handlers *handlers, struct sheepid *myid)
 		return -1;
 	}
 
+	corosync_block_wq = init_work_queue(1);
+
 	return fd;
 }
 
@@ -295,27 +454,29 @@ static int corosync_leave(void)
 	return 0;
 }
 
-static int corosync_notify(void *msg, size_t msg_len)
+static int corosync_notify(void *msg, size_t msg_len, void (*block_cb)(void *))
 {
-	struct iovec iov;
 	int ret;
+	struct corosync_block_msg *bm;
 
-	iov.iov_base = msg;
-	iov.iov_len = msg_len;
-retry:
-	ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, &iov, 1);
-	switch (ret) {
-	case CPG_OK:
-		break;
-	case CPG_ERR_TRY_AGAIN:
-		dprintf("failed to send message. try again\n");
-		sleep(1);
-		goto retry;
-	default:
-		eprintf("failed to send message, %d\n", ret);
-		return -1;
-	}
-	return 0;
+	if (block_cb) {
+		bm = zalloc(sizeof(*bm));
+		if (!bm)
+			panic("oom\n");
+		bm->msg = zalloc(msg_len);
+		if (!bm->msg)
+			panic("oom\n");
+
+		memcpy(bm->msg, msg, msg_len);
+		bm->msg_len = msg_len;
+		bm->cb = block_cb;
+		list_add_tail(&bm->list, &corosync_block_list);
+
+		ret = send_message(COROSYNC_MSG_TYPE_BLOCK, NULL, 0);
+	} else
+		ret = send_message(COROSYNC_MSG_TYPE_NOTIFY, msg, msg_len);
+
+	return ret;
 }
 
 static int corosync_dispatch(void)
diff --git a/sheep/group.c b/sheep/group.c
index 0a6d0b0..8681875 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -366,7 +366,7 @@ forward:
 
 	list_add(&req->pending_list, &sys->pending_list);
 
-	sys->cdrv->notify(msg, msg->header.msg_length);
+	sys->cdrv->notify(msg, msg->header.msg_length, NULL);
 
 	free(msg);
 }
@@ -1063,7 +1063,7 @@ static int tx_mastership(void)
 	msg.header.from = sys->this_node;
 	msg.header.sheepid = sys->this_sheepid;
 
-	return sys->cdrv->notify(&msg, msg.header.msg_length);
+	return sys->cdrv->notify(&msg, msg.header.msg_length, NULL);
 }
 
 static void send_join_response(struct work_notify *w)
@@ -1095,7 +1095,7 @@ static void send_join_response(struct work_notify *w)
 		exit(1);
 	}
 	jm->epoch = sys->epoch;
-	sys->cdrv->notify(m, m->msg_length);
+	sys->cdrv->notify(m, m->msg_length, NULL);
 }
 
 static void __sd_notify_done(struct cpg_event *cevent)
@@ -1174,7 +1174,7 @@ static void __sd_notify_done(struct cpg_event *cevent)
 			break;
 		case SD_MSG_VDI_OP:
 			m->state = DM_FIN;
-			sys->cdrv->notify(m, m->msg_length);
+			sys->cdrv->notify(m, m->msg_length, NULL);
 			break;
 		default:
 			eprintf("unknown message %d\n", m->op);
@@ -1211,10 +1211,13 @@ static void sd_notify_handler(struct sheepid *sender, void *msg, size_t msg_len)
 
 	vprintf(SDOG_DEBUG "allow new deliver, %p\n", cevent);
 
-	w->msg = zalloc(msg_len);
-	if (!w->msg)
-		return;
-	memcpy(w->msg, msg, msg_len);
+	if (msg_len) {
+		w->msg = zalloc(msg_len);
+		if (!w->msg)
+			return;
+		memcpy(w->msg, msg, msg_len);
+	} else
+		w->msg = NULL;
 
 	if (cpg_event_suspended() && m->state == DM_FIN) {
 		list_add(&cevent->cpg_event_list, &sys->cpg_event_siblings);
@@ -1348,7 +1351,7 @@ static void send_join_request(struct sheepid *id)
 			msg.nodes[i].ent = entries[i];
 	}
 
-	sys->cdrv->notify(&msg, msg.header.msg_length);
+	sys->cdrv->notify(&msg, msg.header.msg_length, NULL);
 
 	vprintf(SDOG_INFO "%s\n", sheepid_to_str(&sys->this_sheepid));
 }
@@ -1966,5 +1969,5 @@ int leave_cluster(void)
 	msg.epoch = get_latest_epoch();
 
 	dprintf("%d\n", msg.epoch);
-	return sys->cdrv->notify(&msg, msg.header.msg_length);
+	return sys->cdrv->notify(&msg, msg.header.msg_length, NULL);
 }
-- 
1.7.2.5




More information about the sheepdog mailing list