[Sheepdog] [PATCH v2 2/6] sheep: remove corosync nodeid

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Tue Oct 4 14:38:54 CEST 2011


This patch uses a generic sheepid instead of a corosync-specific node
id.  This patch is necessary to remove the dependency on corosync.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 sheep/group.c      |  175 ++++++++++++++++++++++++++++++++--------------------
 sheep/sheep_priv.h |    4 +-
 2 files changed, 109 insertions(+), 70 deletions(-)

diff --git a/sheep/group.c b/sheep/group.c
index 8c65d74..b5bd6fd 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -24,10 +24,58 @@
 #include "util.h"
 #include "logger.h"
 #include "work.h"
+#include "cluster.h"
+
+static corosync_cfg_handle_t cfg_handle;
+
+static int nodeid_to_addr(uint32_t nodeid, uint8_t *addr)
+{
+	int ret, nr;
+	corosync_cfg_node_address_t caddr;
+	struct sockaddr_storage *ss = (struct sockaddr_storage *)caddr.address;
+	struct sockaddr_in *sin = (struct sockaddr_in *)caddr.address;
+	struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)caddr.address;
+	void *saddr;
+
+	ret = corosync_cfg_get_node_addrs(cfg_handle, nodeid, 1, &nr, &caddr);
+	if (ret != CS_OK) {
+		vprintf(SDOG_ERR "failed to get addr %d\n", ret);
+		return -1;
+	}
+
+	if (!nr) {
+		vprintf(SDOG_ERR "we got no address\n");
+		return -1;
+	}
+
+	if (ss->ss_family == AF_INET6) {
+		saddr = &sin6->sin6_addr;
+		memcpy(addr, saddr, 16);
+	} else if (ss->ss_family == AF_INET) {
+		saddr = &sin->sin_addr;
+		memset(addr, 0, 16);
+		memcpy(addr + 12, saddr, 4);
+	} else {
+		vprintf(SDOG_ERR "unknown protocol %d\n", ss->ss_family);
+		return -1;
+	}
+
+	return 0;
+}
+
+static void cpg_addr_to_sheepid(const struct cpg_address *cpgs,
+				struct sheepid *sheeps, size_t nr)
+{
+	int i;
+
+	for (i = 0; i < nr; i++) {
+		nodeid_to_addr(cpgs[i].nodeid, sheeps[i].addr);
+		sheeps[i].pid = cpgs[i].pid;
+	}
+}
 
 struct node {
-	uint32_t nodeid;
-	uint32_t pid;
+	struct sheepid sheepid;
 	struct sheepdog_node_list_entry ent;
 	struct list_head list;
 };
@@ -44,8 +92,7 @@ struct message_header {
 	uint8_t op;
 	uint8_t state;
 	uint32_t msg_length;
-	uint32_t nodeid;
-	uint32_t pid;
+	struct sheepid sheepid;
 	struct sheepdog_node_list_entry from;
 };
 
@@ -60,14 +107,12 @@ struct join_message {
 	uint8_t inc_epoch; /* set non-zero when we increment epoch of all nodes */
 	uint8_t pad[3];
 	struct {
-		uint32_t nodeid;
-		uint32_t pid;
+		struct sheepid sheepid;
 		struct sheepdog_node_list_entry ent;
 	} nodes[SD_MAX_NODES];
 	uint32_t nr_leave_nodes;
 	struct {
-		uint32_t nodeid;
-		uint32_t pid;
+		struct sheepid sheepid;
 		struct sheepdog_node_list_entry ent;
 	} leave_nodes[SD_MAX_NODES];
 };
@@ -112,11 +157,11 @@ struct work_confchg {
 #define print_node_list(node_list)				\
 ({								\
 	struct node *__node;					\
-	char __name[128];						\
+	char __name[128];					\
 	list_for_each_entry(__node, node_list, list) {		\
-		dprintf("%c nodeid: %x, pid: %d, ip: %s\n",	\
+		dprintf("%c pid: %ld, ip: %s\n",		\
 			is_myself(__node->ent.addr, __node->ent.port) ? 'l' : ' ',	\
-			__node->nodeid, __node->pid,		\
+			__node->sheepid.pid,			\
 			addr_to_str(__name, sizeof(__name),	\
 			__node->ent.addr, __node->ent.port));	\
 	}							\
@@ -417,12 +462,12 @@ out:
 	exit(1);
 }
 
-static struct node *find_node(struct list_head *node_list, uint32_t nodeid, uint32_t pid)
+static struct node *find_node(struct list_head *node_list, struct sheepid *id)
 {
 	struct node *node;
 
 	list_for_each_entry(node, node_list, list) {
-		if (node->nodeid == nodeid && node->pid == pid)
+		if (sheepid_cmp(&node->sheepid, id) == 0)
 			return node;
 	}
 
@@ -511,8 +556,7 @@ static int add_node_to_leave_list(struct message_header *msg)
 			goto ret;
 		}
 
-		n->nodeid = msg->nodeid;
-		n->pid = msg->pid;
+		n->sheepid = msg->sheepid;
 		n->ent = msg->from;
 
 		list_add_tail(&n->list, &sys->leave_list);
@@ -533,8 +577,7 @@ static int add_node_to_leave_list(struct message_header *msg)
 				continue;
 			}
 
-			n->nodeid = jm->leave_nodes[i].nodeid;
-			n->pid = jm->leave_nodes[i].pid;
+			n->sheepid = jm->leave_nodes[i].sheepid;
 			n->ent = jm->leave_nodes[i].ent;
 
 			list_add_tail(&n->list, &tmp_list);
@@ -698,8 +741,7 @@ static void join(struct join_message *msg)
 	msg->ctime = get_cluster_ctime();
 	msg->nr_nodes = 0;
 	list_for_each_entry(node, &sys->sd_node_list, list) {
-		msg->nodes[msg->nr_nodes].nodeid = node->nodeid;
-		msg->nodes[msg->nr_nodes].pid = node->pid;
+		msg->nodes[msg->nr_nodes].sheepid = node->sheepid;
 		msg->nodes[msg->nr_nodes].ent = node->ent;
 		msg->nr_nodes++;
 	}
@@ -768,12 +810,12 @@ static void get_vdi_bitmap_from_sd_list(void)
 		get_vdi_bitmap_from(&nodes[i]);
 }
 
-static int move_node_to_sd_list(uint32_t nodeid, uint32_t pid,
+static int move_node_to_sd_list(struct sheepid *id,
 				struct sheepdog_node_list_entry ent)
 {
 	struct node *node;
 
-	node = find_node(&sys->cpg_node_list, nodeid, pid);
+	node = find_node(&sys->cpg_node_list, id);
 	if (!node)
 		return 1;
 
@@ -830,16 +872,15 @@ static void update_cluster_info(struct join_message *msg)
 
 	sys->epoch = msg->epoch;
 	for (i = 0; i < nr_nodes; i++) {
-		ret = move_node_to_sd_list(msg->nodes[i].nodeid,
-					   msg->nodes[i].pid,
+		ret = move_node_to_sd_list(&msg->nodes[i].sheepid,
 					   msg->nodes[i].ent);
 		/*
 		 * the node belonged to sheepdog when the master build
 		 * the JOIN response however it has gone.
 		 */
 		if (ret)
-			vprintf(SDOG_INFO "nodeid: %x, pid: %d has gone\n",
-				msg->nodes[i].nodeid, msg->nodes[i].pid);
+			vprintf(SDOG_INFO "%s has gone\n",
+				sheepid_to_str(&msg->nodes[i].sheepid));
 	}
 
 	if (msg->cluster_status != SD_STATUS_OK)
@@ -851,14 +892,14 @@ static void update_cluster_info(struct join_message *msg)
 		update_epoch_log(sys->epoch);
 
 join_finished:
-	ret = move_node_to_sd_list(msg->header.nodeid, msg->header.pid, msg->header.from);
+	ret = move_node_to_sd_list(&msg->header.sheepid, msg->header.from);
 	/*
 	 * this should not happen since __sd_deliver() checks if the
 	 * host from msg on cpg_node_list.
 	 */
 	if (ret)
-		vprintf(SDOG_ERR "nodeid: %x, pid: %d has gone\n",
-			msg->header.nodeid, msg->header.pid);
+		vprintf(SDOG_ERR "%s has gone\n",
+			sheepid_to_str(&msg->header.sheepid));
 
 	if (msg->cluster_status == SD_STATUS_OK) {
 		if (msg->inc_epoch) {
@@ -1025,27 +1066,24 @@ static void __sd_deliver(struct cpg_event *cevent)
 	char name[128];
 	struct node *node;
 
-	dprintf("op: %d, state: %u, size: %d, from: %s, pid: %d\n",
+	dprintf("op: %d, state: %u, size: %d, from: %s, pid: %ld\n",
 		m->op, m->state, m->msg_length,
 		addr_to_str(name, sizeof(name), m->from.addr, m->from.port),
-		m->pid);
+		m->sheepid.pid);
 
 	/*
 	 * we don't want to perform any deliver events except mastership_tx event
 	 * until we join; we wait for our JOIN message.
 	 */
 	if (!sys->join_finished && !master_tx_message(m)) {
-		if (m->pid != sys->this_pid || m->nodeid != sys->this_nodeid) {
+		if (sheepid_cmp(&m->sheepid, &sys->this_sheepid) != 0) {
 			cevent->skip = 1;
 			return;
 		}
 	}
 
 	if (join_message(m)) {
-		uint32_t nodeid = m->nodeid;
-		uint32_t pid = m->pid;
-
-		node = find_node(&sys->cpg_node_list, nodeid, pid);
+		node = find_node(&sys->cpg_node_list, &m->sheepid);
 		if (!node) {
 			dprintf("the node was left before join operation is finished\n");
 			return;
@@ -1095,8 +1133,7 @@ static int tx_mastership(void)
 	msg.header.state = DM_FIN;
 	msg.header.msg_length = sizeof(msg);
 	msg.header.from = sys->this_node;
-	msg.header.nodeid = sys->this_nodeid;
-	msg.header.pid = sys->this_pid;
+	msg.header.sheepid = sys->this_sheepid;
 
 	return send_message(sys->handle, (struct message_header *)&msg);
 }
@@ -1116,8 +1153,7 @@ static void send_join_response(struct work_deliver *w)
 	if (jm->result == SD_RES_SUCCESS && jm->cluster_status != SD_STATUS_OK) {
 		jm->nr_leave_nodes = 0;
 		list_for_each_entry(node, &sys->leave_list, list) {
-			jm->leave_nodes[jm->nr_leave_nodes].nodeid = node->nodeid;
-			jm->leave_nodes[jm->nr_leave_nodes].pid = node->pid;
+			jm->leave_nodes[jm->nr_leave_nodes].sheepid = node->sheepid;
 			jm->leave_nodes[jm->nr_leave_nodes].ent = node->ent;
 			jm->nr_leave_nodes++;
 		}
@@ -1151,7 +1187,7 @@ static void __sd_deliver_done(struct cpg_event *cevent)
 			update_cluster_info((struct join_message *)m);
 			break;
 		case SD_MSG_LEAVE:
-			node = find_node(&sys->sd_node_list, m->nodeid, m->pid);
+			node = find_node(&sys->sd_node_list, &m->sheepid);
 			if (node) {
 				sys->nr_vnodes = 0;
 
@@ -1173,7 +1209,7 @@ static void __sd_deliver_done(struct cpg_event *cevent)
 				 */
 				if (!sys->join_finished) {
 					sys->join_finished = 1;
-					move_node_to_sd_list(sys->this_nodeid, sys->this_pid, sys->this_node);
+					move_node_to_sd_list(&sys->this_sheepid, sys->this_node);
 					sys->epoch = get_latest_epoch();
 				}
 
@@ -1265,16 +1301,19 @@ static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name,
 }
 
 static void for_each_node_list(struct cpg_address list[], int count,
-			       void (*func)(struct cpg_address *addr,
+			       void (*func)(struct sheepid *id,
 					    struct work_confchg *w),
 			       struct work_confchg *w)
 {
 	int i;
+	struct sheepid sheepid[SD_MAX_NODES];
+
+	cpg_addr_to_sheepid(list, sheepid, count);
 	for (i = 0; i < count; i++)
-		func(&list[i], w);
+		func(sheepid + i, w);
 }
 
-static void add_node(struct cpg_address *addr, struct work_confchg *w)
+static void add_node(struct sheepid *id, struct work_confchg *w)
 {
 	struct node *node;
 
@@ -1282,17 +1321,16 @@ static void add_node(struct cpg_address *addr, struct work_confchg *w)
 	if (!node)
 		panic("failed to alloc memory for a new node\n");
 
-	node->nodeid = addr->nodeid;
-	node->pid = addr->pid;
+	node->sheepid = *id;
 
 	list_add_tail(&node->list, &sys->cpg_node_list);
 }
 
-static void del_node(struct cpg_address *addr, struct work_confchg *w)
+static void del_node(struct sheepid *id, struct work_confchg *w)
 {
 	struct node *node;
 
-	node = find_node(&sys->sd_node_list, addr->nodeid, addr->pid);
+	node = find_node(&sys->sd_node_list, id);
 	if (node) {
 		int nr;
 		struct sheepdog_node_list_entry e[SD_MAX_NODES];
@@ -1314,7 +1352,7 @@ static void del_node(struct cpg_address *addr, struct work_confchg *w)
 			update_epoch_store(sys->epoch);
 		}
 	} else {
-		node = find_node(&sys->cpg_node_list, addr->nodeid, addr->pid);
+		node = find_node(&sys->cpg_node_list, id);
 		if (node) {
 			list_del(&node->list);
 			free(node);
@@ -1324,8 +1362,10 @@ static void del_node(struct cpg_address *addr, struct work_confchg *w)
 
 static int is_my_cpg_addr(struct cpg_address *addr)
 {
-	return (sys->this_nodeid == addr->nodeid) &&
-		(sys->this_pid == addr->pid);
+	struct sheepid id;
+
+	cpg_addr_to_sheepid(addr, &id, 1);
+	return (sheepid_cmp(&id, &sys->this_sheepid) == 0);
 }
 
 /*
@@ -1337,10 +1377,13 @@ static int check_majority(struct cpg_address *left_list,
 	int nr_nodes = 0, nr_majority, nr_reachable = 0, i, fd;
 	struct node *node;
 	char name[INET6_ADDRSTRLEN];
+	struct sheepid left_sheepid[SD_MAX_NODES];
 
 	if (left_list_entries == 0)
 		return 1; /* we don't need this check in this case */
 
+	cpg_addr_to_sheepid(left_list, left_sheepid, left_list_entries);
+
 	nr_nodes = get_nodes_nr_from(&sys->sd_node_list);
 	nr_majority = nr_nodes / 2 + 1;
 
@@ -1351,8 +1394,7 @@ static int check_majority(struct cpg_address *left_list,
 
 	list_for_each_entry(node, &sys->sd_node_list, list) {
 		for (i = 0; i < left_list_entries; i++) {
-			if (left_list[i].nodeid == node->nodeid &&
-			    left_list[i].pid == node->pid)
+			if (sheepid_cmp(left_sheepid + i, &node->sheepid) == 0)
 				break;
 		}
 		if (i != left_list_entries)
@@ -1385,14 +1427,14 @@ static void __sd_confchg(struct cpg_event *cevent)
 	}
 }
 
-static void send_join_request(struct cpg_address *addr, struct work_confchg *w)
+static void send_join_request(struct sheepid *id, struct work_confchg *w)
 {
 	struct join_message msg;
 	struct sheepdog_node_list_entry entries[SD_MAX_NODES];
 	int nr_entries, i, ret;
 
 	/* if I've just joined in cpg, I'll join in sheepdog. */
-	if (!is_my_cpg_addr(addr))
+	if (sheepid_cmp(id, &sys->this_sheepid) != 0)
 		return;
 
 	memset(&msg, 0, sizeof(msg));
@@ -1401,8 +1443,7 @@ static void send_join_request(struct cpg_address *addr, struct work_confchg *w)
 	msg.header.state = DM_INIT;
 	msg.header.msg_length = sizeof(msg);
 	msg.header.from = sys->this_node;
-	msg.header.nodeid = sys->this_nodeid;
-	msg.header.pid = sys->this_pid;
+	msg.header.sheepid = sys->this_sheepid;
 
 	get_global_nr_copies(&msg.nr_sobjs);
 
@@ -1416,7 +1457,7 @@ static void send_join_request(struct cpg_address *addr, struct work_confchg *w)
 
 	send_message(sys->handle, (struct message_header *)&msg);
 
-	vprintf(SDOG_INFO "%x %u\n", sys->this_nodeid, sys->this_pid);
+	vprintf(SDOG_INFO "%s\n", sheepid_to_str(&sys->this_sheepid));
 }
 
 static void __sd_confchg_done(struct cpg_event *cevent)
@@ -1454,13 +1495,12 @@ static void __sd_confchg_done(struct cpg_event *cevent)
 		 * becomes the master without sending JOIN.
 		 */
 
-		vprintf(SDOG_DEBUG "%d %x\n", sys->this_pid, sys->this_nodeid);
+		vprintf(SDOG_DEBUG "%s\n", sheepid_to_str(&sys->this_sheepid));
 
 		memset(&msg, 0, sizeof(msg));
 
 		msg.header.from = sys->this_node;
-		msg.header.nodeid = sys->this_nodeid;
-		msg.header.pid = sys->this_pid;
+		msg.header.sheepid = sys->this_sheepid;
 
 		nr_entries = ARRAY_SIZE(entries);
 		ret = read_epoch(&epoch, &ctime, entries, &nr_entries);
@@ -1930,7 +1970,6 @@ oom:
 static int set_addr(unsigned int nodeid, int port)
 {
 	int ret, nr;
-	corosync_cfg_handle_t handle;
 	corosync_cfg_node_address_t addr;
 	struct sockaddr_storage *ss = (struct sockaddr_storage *)addr.address;
 	struct sockaddr_in *sin = (struct sockaddr_in *)addr.address;
@@ -1940,13 +1979,13 @@ static int set_addr(unsigned int nodeid, int port)
 
 	memset(sys->this_node.addr, 0, sizeof(sys->this_node.addr));
 
-	ret = corosync_cfg_initialize(&handle, NULL);
+	ret = corosync_cfg_initialize(&cfg_handle, NULL);
 	if (ret != CPG_OK) {
 		vprintf(SDOG_ERR "failed to initiazize cfg %d\n", ret);
 		return -1;
 	}
 
-	ret = corosync_cfg_get_node_addrs(handle, nodeid, 1, &nr, &addr);
+	ret = corosync_cfg_get_node_addrs(cfg_handle, nodeid, 1, &nr, &addr);
 	if (ret != CPG_OK) {
 		vprintf(SDOG_ERR "failed to get addr %d\n", ret);
 		return -1;
@@ -2013,12 +2052,13 @@ join_retry:
 	}
 
 	sys->handle = cpg_handle;
-	sys->this_nodeid = nodeid;
-	sys->this_pid = getpid();
+	sys->this_sheepid.pid = getpid();
 
 	ret = set_addr(nodeid, port);
 	if (ret)
 		return 1;
+	memcpy(sys->this_sheepid.addr, sys->this_node.addr,
+	       sizeof(sys->this_node.addr));
 	sys->this_node.port = port;
 	sys->this_node.nr_vnodes = SD_DEFAULT_VNODES;
 	if (zone == -1)
@@ -2066,8 +2106,7 @@ int leave_cluster(void)
 	msg.header.state = DM_FIN;
 	msg.header.msg_length = sizeof(msg);
 	msg.header.from = sys->this_node;
-	msg.header.nodeid = sys->this_nodeid;
-	msg.header.pid = sys->this_pid;
+	msg.header.sheepid = sys->this_sheepid;
 	msg.epoch = get_latest_epoch();
 
 	dprintf("%d\n", msg.epoch);
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 6409530..8b20213 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -20,6 +20,7 @@
 #include "work.h"
 #include "net.h"
 #include "sheep.h"
+#include "cluster.h"
 
 #define SD_OP_REMOVE_OBJ     0x91
 
@@ -105,8 +106,7 @@ struct cluster_info {
 	cpg_handle_t handle;
 	/* set after finishing the JOIN procedure */
 	int join_finished;
-	uint32_t this_nodeid;
-	uint32_t this_pid;
+	struct sheepid this_sheepid;
 	struct sheepdog_node_list_entry this_node;
 
 	uint32_t epoch;
-- 
1.7.2.5




More information about the sheepdog mailing list