[sheepdog] [PATCH 2/2] cluster/local: handle leave events

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Tue Sep 18 10:43:49 CEST 2012


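A cluster driver needs to keep calling event handlers even after
leave_cluster is called, so that the node which has left can continue to
work as a gateway.  The local driver removed a leaving node from the
shared node list, so that node was no longer notified of later events.
This patch fixes it by marking the node as a gateway instead of removing
it: the node stays in the list and keeps receiving events, but is
excluded from the member list passed to the event handlers.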

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 sheep/cluster.h       |    4 +-
 sheep/cluster/local.c |  166 ++++++++++++++++++++++++++++++-------------------
 2 files changed, 105 insertions(+), 65 deletions(-)

diff --git a/sheep/cluster.h b/sheep/cluster.h
index 65b95a4..3e881d7 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -76,7 +76,9 @@ struct cluster_driver {
 	 * Leave the cluster
 	 *
 	 * This function is used to leave the cluster, and notifies a
-	 * leave event to all the nodes.
+	 * leave event to all the nodes.  The cluster driver calls event
+	 * handlers even after this function is called, so the left node can
+	 * work as a gateway.
 	 *
 	 * Returns zero on success, -1 on error
 	 */
diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
index 8a53abc..c7ef38e 100644
--- a/sheep/cluster/local.c
+++ b/sheep/cluster/local.c
@@ -15,7 +15,6 @@
 #include <sys/mman.h>
 #include <sys/signalfd.h>
 #include <sys/file.h>
-#include <search.h>
 #include <signal.h>
 #include <fcntl.h>
 #include <assert.h>
@@ -32,20 +31,41 @@ static int shmfd;
 static int sigfd;
 static int block_event_pos;
 static int nonblock_event_pos;
-static struct sd_node this_node;
+static struct local_node this_node;
 static bool joined;
 
+struct local_node {
+	struct sd_node node;
+	pid_t pid;
+	bool gateway;
+};
+
+static char *lnode_to_str(struct local_node *lnode)
+{
+	char *s = node_to_str(&lnode->node);
+
+	sprintf(s + strlen(s), " pid:%d", lnode->pid);
+
+	return s;
+}
+
+static bool lnode_eq(const struct local_node *a, const struct local_node *b)
+{
+	return node_eq(&a->node, &b->node);
+}
+
 enum local_event_type {
 	EVENT_JOIN_REQUEST = 1,
 	EVENT_JOIN_RESPONSE,
 	EVENT_LEAVE,
+	EVENT_GATEWAY,
 	EVENT_BLOCK,
 	EVENT_NOTIFY,
 };
 
 struct local_event {
 	enum local_event_type type;
-	struct sd_node sender;
+	struct local_node sender;
 
 	bool callbacked;
 	bool removed;
@@ -53,9 +73,8 @@ struct local_event {
 	size_t buf_len;
 	uint8_t buf[SD_MAX_EVENT_BUF_SIZE];
 
-	size_t nr_nodes; /* the number of sheep processes */
-	struct sd_node nodes[SD_MAX_NODES];
-	pid_t pids[SD_MAX_NODES];
+	size_t nr_lnodes; /* the number of sheep processes */
+	struct local_node lnodes[SD_MAX_NODES];
 
 	enum cluster_join_result join_result;
 };
@@ -80,18 +99,16 @@ static void shm_queue_unlock(void)
 	flock(shmfd, LOCK_UN);
 }
 
-static size_t get_nodes(struct sd_node *n, pid_t *p)
+static size_t get_nodes(struct local_node *n)
 {
 	struct local_event *ev;
 
 	ev = shm_queue->nonblock_events + shm_queue->nonblock_event_pos;
 
 	if (n)
-		memcpy(n, ev->nodes, sizeof(ev->nodes));
-	if (p)
-		memcpy(p, ev->pids, sizeof(ev->pids));
+		memcpy(n, ev->lnodes, sizeof(ev->lnodes));
 
-	return ev->nr_nodes;
+	return ev->nr_lnodes;
 }
 
 static int process_exists(pid_t pid)
@@ -156,14 +173,13 @@ static void shm_queue_notify(void)
 {
 	int i;
 	size_t nr;
-	pid_t pids[SD_MAX_NODES];
-	struct sd_node nodes[SD_MAX_NODES];
+	struct local_node lnodes[SD_MAX_NODES];
 
-	nr = get_nodes(nodes, pids);
+	nr = get_nodes(lnodes);
 
 	for (i = 0; i < nr; i++) {
-		dprintf("send signal to %s\n", node_to_str(nodes + i));
-		kill(pids[i], SIGUSR1);
+		dprintf("send signal to %s\n", lnode_to_str(lnodes + i));
+		kill(lnodes[i].pid, SIGUSR1);
 	}
 }
 
@@ -171,15 +187,15 @@ static int is_shm_queue_valid(void)
 {
 	int i;
 	size_t nr;
-	pid_t pids[SD_MAX_NODES];
+	struct local_node lnodes[SD_MAX_NODES];
 
-	nr = get_nodes(NULL, pids);
+	nr = get_nodes(lnodes);
 
 	if (nr == 0)
 		return 1;
 
 	for (i = 0; i < nr; i++)
-		if (process_exists(pids[i]))
+		if (process_exists(lnodes[i].pid))
 			return 1;
 
 	return 0;
@@ -218,39 +234,49 @@ static void shm_queue_init(void)
 	shm_queue_unlock();
 }
 
-static void add_event(enum local_event_type type, struct sd_node *node,
+static struct local_node *find_lnode(struct local_node *key, size_t nr_lnodes,
+				     struct local_node *lnodes)
+{
+	int i;
+
+	for (i = 0; i < nr_lnodes; i++)
+		if (lnode_eq(key, lnodes + i))
+			return lnodes + i;
+
+	panic("internal error\n");
+}
+
+static void add_event(enum local_event_type type, struct local_node *lnode,
 		void *buf, size_t buf_len)
 {
 	int idx, i;
-	struct sd_node *n;
-	pid_t *p;
+	struct local_node *n;
 	struct local_event ev = {
 		.type = type,
-		.sender = *node,
+		.sender = *lnode,
 	};
 
 	ev.buf_len = buf_len;
 	if (buf)
 		memcpy(ev.buf, buf, buf_len);
 
-	ev.nr_nodes = get_nodes(ev.nodes, ev.pids);
+	ev.nr_lnodes = get_nodes(ev.lnodes);
 
 	switch (type) {
 	case EVENT_JOIN_REQUEST:
-		ev.nodes[ev.nr_nodes] = *node;
-		ev.pids[ev.nr_nodes] = getpid(); /* must be local node */
-		ev.nr_nodes++;
+		ev.lnodes[ev.nr_lnodes] = *lnode;
+		ev.nr_lnodes++;
 		break;
 	case EVENT_LEAVE:
-		n = lfind(node, ev.nodes, &ev.nr_nodes, sizeof(*n), node_id_cmp);
-		if (!n)
-			panic("internal error\n");
-		idx = n - ev.nodes;
-		p = ev.pids + idx;
-
-		ev.nr_nodes--;
-		memmove(n, n + 1, sizeof(*n) * (ev.nr_nodes - idx));
-		memmove(p, p + 1, sizeof(*p) * (ev.nr_nodes - idx));
+		n = find_lnode(lnode, ev.nr_lnodes, ev.lnodes);
+		idx = n - ev.lnodes;
+
+		ev.nr_lnodes--;
+		memmove(n, n + 1, sizeof(*n) * (ev.nr_lnodes - idx));
+		break;
+	case EVENT_GATEWAY:
+		n = find_lnode(lnode, ev.nr_lnodes, ev.lnodes);
+		n->gateway = true;
 		break;
 	case EVENT_NOTIFY:
 	case EVENT_BLOCK:
@@ -259,9 +285,9 @@ static void add_event(enum local_event_type type, struct sd_node *node,
 		abort();
 	}
 
-	dprintf("type = %d, sender = %s\n", ev.type, node_to_str(&ev.sender));
-	for (i = 0; i < ev.nr_nodes; i++)
-		dprintf("%d: %s\n", i, node_to_str(ev.nodes + i));
+	dprintf("type = %d, sender = %s\n", ev.type, lnode_to_str(&ev.sender));
+	for (i = 0; i < ev.nr_lnodes; i++)
+		dprintf("%d: %s\n", i, lnode_to_str(ev.lnodes + i));
 
 	shm_queue_push(&ev);
 
@@ -272,21 +298,20 @@ static void check_pids(void *arg)
 {
 	int i;
 	size_t nr;
-	struct sd_node nodes[SD_MAX_NODES];
-	pid_t pids[SD_MAX_NODES];
+	struct local_node lnodes[SD_MAX_NODES];
 	struct local_event *ev;
 
 	shm_queue_lock();
 
-	nr = get_nodes(nodes, pids);
+	nr = get_nodes(lnodes);
 
 	for (i = 0; i < nr; i++)
-		if (!process_exists(pids[i])) {
-			add_event(EVENT_LEAVE, nodes + i, NULL, 0);
+		if (!process_exists(lnodes[i].pid)) {
+			add_event(EVENT_LEAVE, lnodes + i, NULL, 0);
 
 			/* unblock blocking event if sender has gone */
 			ev = shm_queue_peek_block_event();
-			if (node_eq(nodes + i, &ev->sender)) {
+			if (lnode_eq(lnodes + i, &ev->sender)) {
 				ev->removed = true;
 				msync(ev, sizeof(*ev), MS_SYNC);
 			}
@@ -303,7 +328,9 @@ static void check_pids(void *arg)
 static int local_join(struct sd_node *myself,
 		      void *opaque, size_t opaque_len)
 {
-	this_node = *myself;
+	this_node.node = *myself;
+	this_node.pid = getpid();
+	this_node.gateway = false;
 
 	shm_queue_lock();
 
@@ -318,7 +345,7 @@ static int local_leave(void)
 {
 	shm_queue_lock();
 
-	add_event(EVENT_LEAVE, &this_node, NULL, 0);
+	add_event(EVENT_GATEWAY, &this_node, NULL, 0);
 
 	shm_queue_unlock();
 
@@ -369,15 +396,23 @@ static bool local_process_event(void)
 	struct local_event *ev;
 	enum cluster_join_result res;
 	int i;
+	struct sd_node nodes[SD_MAX_NODES];
+	size_t nr_nodes;
 
 	ev = shm_queue_peek();
 	if (!ev)
 		return false;
 
-	dprintf("type = %d, sender = %s\n", ev->type, node_to_str(&ev->sender));
+	dprintf("type = %d, sender = %s\n", ev->type,
+		lnode_to_str(&ev->sender));
 	dprintf("callbacked = %d, removed = %d\n", ev->callbacked, ev->removed);
-	for (i = 0; i < ev->nr_nodes; i++)
-		dprintf("%d: %s\n", i, node_to_str(ev->nodes + i));
+
+	nr_nodes = 0;
+	for (i = 0; i < ev->nr_lnodes; i++) {
+		dprintf("%d: %s\n", i, lnode_to_str(ev->lnodes + i));
+		if (!ev->lnodes[i].gateway)
+			nodes[nr_nodes++] = ev->lnodes[i].node;
+	}
 
 	if (ev->removed)
 		goto out;
@@ -385,19 +420,20 @@ static bool local_process_event(void)
 	if (ev->callbacked)
 		return false; /* wait for unblock event */
 
-	if (ev->type == EVENT_JOIN_RESPONSE && node_eq(&this_node, &ev->sender)) {
+	if (ev->type == EVENT_JOIN_RESPONSE &&
+	    lnode_eq(&this_node, &ev->sender)) {
 		dprintf("join Sheepdog\n");
 		joined = true;
 	}
 
 	if (!joined) {
 		if (ev->type == EVENT_JOIN_REQUEST &&
-		    node_eq(&this_node, &ev->sender)) {
-			struct sd_node nodes[SD_MAX_NODES];
+		    lnode_eq(&this_node, &ev->sender)) {
+			struct local_node lnodes[SD_MAX_NODES];
 
-			get_nodes(nodes, NULL);
+			get_nodes(lnodes);
 
-			if (!node_eq(&this_node, &nodes[0])) {
+			if (!lnode_eq(&this_node, &lnodes[0])) {
 				dprintf("wait for another node to accept this "
 					"node\n");
 				return false;
@@ -408,7 +444,7 @@ static bool local_process_event(void)
 
 	switch (ev->type) {
 	case EVENT_JOIN_REQUEST:
-		res = sd_check_join_cb(&ev->sender, ev->buf);
+		res = sd_check_join_cb(&ev->sender.node, ev->buf);
 		ev->join_result = res;
 		ev->type = EVENT_JOIN_RESPONSE;
 		msync(ev, sizeof(*ev), MS_SYNC);
@@ -426,24 +462,26 @@ static bool local_process_event(void)
 		if (ev->join_result == CJ_RES_MASTER_TRANSFER) {
 			/* FIXME: This code is tricky, but Sheepdog assumes that */
 			/* nr_nodes = 1 when join_result = MASTER_TRANSFER... */
-			ev->nr_nodes = 1;
-			ev->nodes[0] = this_node;
-			ev->pids[0] = getpid();
+			ev->nr_lnodes = 1;
+			ev->lnodes[0] = this_node;
+			nr_nodes = 1;
+			nodes[0] = this_node.node;
 			msync(ev, sizeof(*ev), MS_SYNC);
 		}
 
-		sd_join_handler(&ev->sender, ev->nodes, ev->nr_nodes,
-				    ev->join_result, ev->buf);
+		sd_join_handler(&ev->sender.node, nodes, nr_nodes,
+				ev->join_result, ev->buf);
 		break;
 	case EVENT_LEAVE:
-		sd_leave_handler(&ev->sender, ev->nodes, ev->nr_nodes);
+	case EVENT_GATEWAY:
+		sd_leave_handler(&ev->sender.node, nodes, nr_nodes);
 		break;
 	case EVENT_BLOCK:
-		ev->callbacked = sd_block_handler(&ev->sender);
+		ev->callbacked = sd_block_handler(&ev->sender.node);
 		msync(ev, sizeof(*ev), MS_SYNC);
 		return false;
 	case EVENT_NOTIFY:
-		sd_notify_handler(&ev->sender, ev->buf, ev->buf_len);
+		sd_notify_handler(&ev->sender.node, ev->buf, ev->buf_len);
 		break;
 	}
 out:
-- 
1.7.2.5




More information about the sheepdog mailing list