[Sheepdog] [RFC PATCH 1/2] introduce cluster driver

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Thu Sep 29 22:28:57 CEST 2011


This patch abstracts out the cluster management layer of Sheepdog and
introduces a cluster driver interface.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 sheep/cluster.h    |  157 ++++++++++++++++
 sheep/group.c      |  527 ++++++++++++++++++++--------------------------------
 sheep/sdnet.c      |    2 +
 sheep/sheep.c      |   24 +++-
 sheep/sheep_priv.h |   16 +-
 5 files changed, 397 insertions(+), 329 deletions(-)
 create mode 100644 sheep/cluster.h

diff --git a/sheep/cluster.h b/sheep/cluster.h
new file mode 100644
index 0000000..e5fecc5
--- /dev/null
+++ b/sheep/cluster.h
@@ -0,0 +1,157 @@
+/*
+ * Copyright (C) 2011 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#ifndef __CLUSTER_H__
+#define __CLUSTER_H__
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <inttypes.h>
+#include <memory.h>
+
+#include "sheepdog_proto.h"
+#include "sheep.h"
+#include "logger.h"
+
+struct sheepid {	/* cluster member id; compared bytewise with memcmp() */
+	uint8_t addr[16];	/* node address; rendered with addr_to_str() */
+	uint64_t pid;		/* printed as "pid" by sheepid_to_str() */
+};
+
+struct cdrv_handlers {	/* event callbacks handed to cluster_driver.init() */
+	void (*join_handler)(struct sheepid *joined, struct sheepid *members,
+			     size_t nr_members);
+	void (*leave_handler)(struct sheepid *left, struct sheepid *members,
+			      size_t nr_members);
+	void (*notify_handler)(struct sheepid *sender, void *msg, size_t msg_len);
+};
+
+struct cluster_driver {
+	const char *name;
+
+	/*
+	 * Initialize the cluster driver
+	 *
+	 * On success, this function returns a file descriptor that
+	 * can be used with poll(2) to monitor cluster events.  On
+	 * error, returns -1.
+	 */
+	int (*init)(struct cdrv_handlers *handlers, struct sheepid *myid);
+
+	/*
+	 * Join the cluster
+	 *
+	 * This function is used to join the cluster, and notifies a
+	 * join event to all the nodes.
+	 *
+	 * Returns zero on success, -1 on error
+	 */
+	int (*join)(void);
+
+	/*
+	 * Leave the cluster
+	 *
+	 * This function is used to leave the cluster, and notifies a
+	 * leave event to all the nodes.
+	 *
+	 * Returns zero on success, -1 on error
+	 */
+	int (*leave)(void);
+
+	/*
+	 * Notify a message to all nodes in the cluster
+	 *
+	 * This function sends 'msg' to all the nodes.  The notified
+	 * messages can be read through notify_handler() in
+	 * cdrv_handlers.
+	 *
+	 * Returns zero on success, -1 on error
+	 */
+	int (*notify)(void *msg, size_t msg_len);
+
+	/*
+	 * Dispatch handlers
+	 *
+	 * This function dispatches handlers according to the
+	 * delivered events (join/leave/notify) in the cluster.
+	 *
+	 * Note that the event sequence is totally ordered; all nodes
+	 * call the handlers in the same sequence.
+	 *
+	 * Returns zero on success, -1 on error
+	 */
+	int (*dispatch)(void);
+
+	struct list_head list;	/* linked into cluster_drivers by cdrv_register() */
+};
+
+extern struct list_head cluster_drivers;
+
+#define cdrv_register(driver)	/* adds 'driver' to cluster_drivers */	\
+static void __attribute__((constructor)) regist_ ## driver(void) {      \
+        list_add(&driver.list, &cluster_drivers);			\
+}
+
+/* Iterate 'driver' over every entry linked on the cluster_drivers list. */
+#define FOR_EACH_CLUSTER_DRIVER(driver) \
+        list_for_each_entry(driver, &cluster_drivers, list)
+
+/* Return the index of 'key' in sheeps[0..nr_sheeps), or -1 if absent. */
+static inline int sheepid_find(struct sheepid *sheeps, size_t nr_sheeps,
+			       struct sheepid *key)
+{
+	size_t i;
+
+	for (i = 0; i < nr_sheeps; i++)
+		if (memcmp(sheeps + i, key, sizeof(*key)) == 0)
+			return (int)i;
+	return -1;
+}
+
+static inline void sheepid_add(struct sheepid *sheeps1, size_t nr_sheeps1,
+			       struct sheepid *sheeps2, size_t nr_sheeps2)
+{	/* appends sheeps2[] at sheeps1[nr_sheeps1]; no bounds check */
+	memcpy(sheeps1 + nr_sheeps1, sheeps2, sizeof(*sheeps2) * nr_sheeps2);
+}
+
+/* Remove each entry of sheeps2[] from sheeps1[]; calls panic() if missing. */
+static inline void sheepid_del(struct sheepid *sheeps1, size_t nr_sheeps1,
+			       struct sheepid *sheeps2, size_t nr_sheeps2)
+{
+	size_t i;
+
+	for (i = 0; i < nr_sheeps2; i++) {
+		int idx = sheepid_find(sheeps1, nr_sheeps1, sheeps2 + i);
+		if (idx < 0)
+			panic("internal error: cannot find sheepid\n");
+		nr_sheeps1--;	/* entries remaining once this one is gone */
+		memmove(sheeps1 + idx, sheeps1 + idx + 1,
+			sizeof(*sheeps1) * (nr_sheeps1 - idx));
+	}
+}
+
+static inline char *sheepid_to_str(struct sheepid *id)
+{
+	static char str[256];	/* returned; reused by every call */
+	char name[256];		/* scratch buffer for addr_to_str() */
+
+	snprintf(str, sizeof(str), "ip: %s, pid: %" PRIu64,
+		 addr_to_str(name, sizeof(name), id->addr, 0), id->pid);
+
+	return str;
+}
+
+static inline int sheepid_cmp(struct sheepid *id1, struct sheepid *id2)
+{
+	return memcmp(id1, id2, sizeof(*id1));	/* 0 iff bytewise equal */
+}
+
+#endif
diff --git a/sheep/group.c b/sheep/group.c
index 8c65d74..95fc799 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -15,8 +15,6 @@
 #include <arpa/inet.h>
 #include <sys/time.h>
 #include <sys/epoll.h>
-#include <corosync/cpg.h>
-#include <corosync/cfg.h>
 
 #include "sheepdog_proto.h"
 #include "sheep_priv.h"
@@ -24,10 +22,10 @@
 #include "util.h"
 #include "logger.h"
 #include "work.h"
+#include "cluster.h"
 
 struct node {
-	uint32_t nodeid;
-	uint32_t pid;
+	struct sheepid sheepid;
 	struct sheepdog_node_list_entry ent;
 	struct list_head list;
 };
@@ -44,8 +42,7 @@ struct message_header {
 	uint8_t op;
 	uint8_t state;
 	uint32_t msg_length;
-	uint32_t nodeid;
-	uint32_t pid;
+	struct sheepid sheepid;
 	struct sheepdog_node_list_entry from;
 };
 
@@ -60,14 +57,12 @@ struct join_message {
 	uint8_t inc_epoch; /* set non-zero when we increment epoch of all nodes */
 	uint8_t pad[3];
 	struct {
-		uint32_t nodeid;
-		uint32_t pid;
+		struct sheepid sheepid;
 		struct sheepdog_node_list_entry ent;
 	} nodes[SD_MAX_NODES];
 	uint32_t nr_leave_nodes;
 	struct {
-		uint32_t nodeid;
-		uint32_t pid;
+		struct sheepid sheepid;
 		struct sheepdog_node_list_entry ent;
 	} leave_nodes[SD_MAX_NODES];
 };
@@ -89,34 +84,36 @@ struct mastership_tx_message {
 	uint32_t epoch;
 };
 
-struct work_deliver {
+struct work_notify {
 	struct cpg_event cev;
 
 	struct message_header *msg;
 };
 
-struct work_confchg {
+struct work_join {
 	struct cpg_event cev;
 
-	struct cpg_address *member_list;
+	struct sheepid *member_list;
 	size_t member_list_entries;
-	struct cpg_address *left_list;
-	size_t left_list_entries;
-	struct cpg_address *joined_list;
-	size_t joined_list_entries;
+	struct sheepid joined;
+};
+
+struct work_leave {
+	struct cpg_event cev;
 
-	int first_cpg_node;
-	int sd_node_left;
+	struct sheepid *member_list;
+	size_t member_list_entries;
+	struct sheepid left;
 };
 
 #define print_node_list(node_list)				\
 ({								\
 	struct node *__node;					\
-	char __name[128];						\
+	char __name[128];					\
 	list_for_each_entry(__node, node_list, list) {		\
-		dprintf("%c nodeid: %x, pid: %d, ip: %s\n",	\
+		dprintf("%c pid: %ld, ip: %s\n",		\
 			is_myself(__node->ent.addr, __node->ent.port) ? 'l' : ' ',	\
-			__node->nodeid, __node->pid,		\
+			__node->sheepid.pid,			\
 			addr_to_str(__name, sizeof(__name),	\
 			__node->ent.addr, __node->ent.port));	\
 	}							\
@@ -172,30 +169,6 @@ static inline int master_tx_message(struct message_header *m)
 	return m->op == SD_MSG_MASTER_TRANSFER;
 }
 
-static int send_message(cpg_handle_t handle, struct message_header *msg)
-{
-	struct iovec iov;
-	int ret;
-
-	iov.iov_base = msg;
-	iov.iov_len = msg->msg_length;
-retry:
-	ret = cpg_mcast_joined(handle, CPG_TYPE_AGREED, &iov, 1);
-	switch (ret) {
-	case CPG_OK:
-		break;
-	case CPG_ERR_TRY_AGAIN:
-		dprintf("failed to send message. try again\n");
-		sleep(1);
-		goto retry;
-	default:
-		eprintf("failed to send message, %d\n", ret);
-		return -1;
-	}
-	return 0;
-}
-
-
 static int get_node_idx(struct sheepdog_node_list_entry *ent,
 			struct sheepdog_node_list_entry *entries, int nr_nodes)
 {
@@ -393,7 +366,7 @@ forward:
 
 	list_add(&req->pending_list, &sys->pending_list);
 
-	send_message(sys->handle, (struct message_header *)msg);
+	sys->cdrv->notify(msg, msg->header.msg_length);
 
 	free(msg);
 }
@@ -406,9 +379,8 @@ static void group_handler(int listen_fd, int events, void *data)
 		goto out;
 	}
 
-	ret = cpg_dispatch(sys->handle, CPG_DISPATCH_ALL);
-
-	if (ret == CPG_OK)
+	ret = sys->cdrv->dispatch();
+	if (ret == 0)
 		return;
 	else
 		eprintf("oops...some error occured inside corosync\n");
@@ -417,12 +389,12 @@ out:
 	exit(1);
 }
 
-static struct node *find_node(struct list_head *node_list, uint32_t nodeid, uint32_t pid)
+static struct node *find_node(struct list_head *node_list, struct sheepid *id)
 {
 	struct node *node;
 
 	list_for_each_entry(node, node_list, list) {
-		if (node->nodeid == nodeid && node->pid == pid)
+		if (sheepid_cmp(&node->sheepid, id) == 0)
 			return node;
 	}
 
@@ -511,8 +483,7 @@ static int add_node_to_leave_list(struct message_header *msg)
 			goto ret;
 		}
 
-		n->nodeid = msg->nodeid;
-		n->pid = msg->pid;
+		n->sheepid = msg->sheepid;
 		n->ent = msg->from;
 
 		list_add_tail(&n->list, &sys->leave_list);
@@ -533,8 +504,7 @@ static int add_node_to_leave_list(struct message_header *msg)
 				continue;
 			}
 
-			n->nodeid = jm->leave_nodes[i].nodeid;
-			n->pid = jm->leave_nodes[i].pid;
+			n->sheepid = jm->leave_nodes[i].sheepid;
 			n->ent = jm->leave_nodes[i].ent;
 
 			list_add_tail(&n->list, &tmp_list);
@@ -698,8 +668,7 @@ static void join(struct join_message *msg)
 	msg->ctime = get_cluster_ctime();
 	msg->nr_nodes = 0;
 	list_for_each_entry(node, &sys->sd_node_list, list) {
-		msg->nodes[msg->nr_nodes].nodeid = node->nodeid;
-		msg->nodes[msg->nr_nodes].pid = node->pid;
+		msg->nodes[msg->nr_nodes].sheepid = node->sheepid;
 		msg->nodes[msg->nr_nodes].ent = node->ent;
 		msg->nr_nodes++;
 	}
@@ -768,12 +737,12 @@ static void get_vdi_bitmap_from_sd_list(void)
 		get_vdi_bitmap_from(&nodes[i]);
 }
 
-static int move_node_to_sd_list(uint32_t nodeid, uint32_t pid,
+static int move_node_to_sd_list(struct sheepid *id,
 				struct sheepdog_node_list_entry ent)
 {
 	struct node *node;
 
-	node = find_node(&sys->cpg_node_list, nodeid, pid);
+	node = find_node(&sys->cpg_node_list, id);
 	if (!node)
 		return 1;
 
@@ -830,16 +799,15 @@ static void update_cluster_info(struct join_message *msg)
 
 	sys->epoch = msg->epoch;
 	for (i = 0; i < nr_nodes; i++) {
-		ret = move_node_to_sd_list(msg->nodes[i].nodeid,
-					   msg->nodes[i].pid,
+		ret = move_node_to_sd_list(&msg->nodes[i].sheepid,
 					   msg->nodes[i].ent);
 		/*
 		 * the node belonged to sheepdog when the master build
 		 * the JOIN response however it has gone.
 		 */
 		if (ret)
-			vprintf(SDOG_INFO "nodeid: %x, pid: %d has gone\n",
-				msg->nodes[i].nodeid, msg->nodes[i].pid);
+			vprintf(SDOG_INFO "%s has gone\n",
+				sheepid_to_str(&msg->nodes[i].sheepid));
 	}
 
 	if (msg->cluster_status != SD_STATUS_OK)
@@ -851,14 +819,14 @@ static void update_cluster_info(struct join_message *msg)
 		update_epoch_log(sys->epoch);
 
 join_finished:
-	ret = move_node_to_sd_list(msg->header.nodeid, msg->header.pid, msg->header.from);
+	ret = move_node_to_sd_list(&msg->header.sheepid, msg->header.from);
 	/*
 	 * this should not happen since __sd_deliver() checks if the
 	 * host from msg on cpg_node_list.
 	 */
 	if (ret)
-		vprintf(SDOG_ERR "nodeid: %x, pid: %d has gone\n",
-			msg->header.nodeid, msg->header.pid);
+		vprintf(SDOG_ERR "%s has gone\n",
+			sheepid_to_str(&msg->header.sheepid));
 
 	if (msg->cluster_status == SD_STATUS_OK) {
 		if (msg->inc_epoch) {
@@ -1018,34 +986,31 @@ out:
 	req->done(req);
 }
 
-static void __sd_deliver(struct cpg_event *cevent)
+static void __sd_notify(struct cpg_event *cevent)
 {
-	struct work_deliver *w = container_of(cevent, struct work_deliver, cev);
+	struct work_notify *w = container_of(cevent, struct work_notify, cev);
 	struct message_header *m = w->msg;
 	char name[128];
 	struct node *node;
 
-	dprintf("op: %d, state: %u, size: %d, from: %s, pid: %d\n",
+	dprintf("op: %d, state: %u, size: %d, from: %s, pid: %ld\n",
 		m->op, m->state, m->msg_length,
 		addr_to_str(name, sizeof(name), m->from.addr, m->from.port),
-		m->pid);
+		m->sheepid.pid);
 
 	/*
 	 * we don't want to perform any deliver events except mastership_tx event
 	 * until we join; we wait for our JOIN message.
 	 */
 	if (!sys->join_finished && !master_tx_message(m)) {
-		if (m->pid != sys->this_pid || m->nodeid != sys->this_nodeid) {
+		if (sheepid_cmp(&m->sheepid, &sys->this_sheepid) != 0) {
 			cevent->skip = 1;
 			return;
 		}
 	}
 
 	if (join_message(m)) {
-		uint32_t nodeid = m->nodeid;
-		uint32_t pid = m->pid;
-
-		node = find_node(&sys->cpg_node_list, nodeid, pid);
+		node = find_node(&sys->cpg_node_list, &m->sheepid);
 		if (!node) {
 			dprintf("the node was left before join operation is finished\n");
 			return;
@@ -1095,13 +1060,12 @@ static int tx_mastership(void)
 	msg.header.state = DM_FIN;
 	msg.header.msg_length = sizeof(msg);
 	msg.header.from = sys->this_node;
-	msg.header.nodeid = sys->this_nodeid;
-	msg.header.pid = sys->this_pid;
+	msg.header.sheepid = sys->this_sheepid;
 
-	return send_message(sys->handle, (struct message_header *)&msg);
+	return sys->cdrv->notify(&msg, msg.header.msg_length);
 }
 
-static void send_join_response(struct work_deliver *w)
+static void send_join_response(struct work_notify *w)
 {
 	struct message_header *m;
 	struct join_message *jm;
@@ -1116,8 +1080,7 @@ static void send_join_response(struct work_deliver *w)
 	if (jm->result == SD_RES_SUCCESS && jm->cluster_status != SD_STATUS_OK) {
 		jm->nr_leave_nodes = 0;
 		list_for_each_entry(node, &sys->leave_list, list) {
-			jm->leave_nodes[jm->nr_leave_nodes].nodeid = node->nodeid;
-			jm->leave_nodes[jm->nr_leave_nodes].pid = node->pid;
+			jm->leave_nodes[jm->nr_leave_nodes].sheepid = node->sheepid;
 			jm->leave_nodes[jm->nr_leave_nodes].ent = node->ent;
 			jm->nr_leave_nodes++;
 		}
@@ -1131,12 +1094,12 @@ static void send_join_response(struct work_deliver *w)
 		exit(1);
 	}
 	jm->epoch = sys->epoch;
-	send_message(sys->handle, m);
+	sys->cdrv->notify(m, m->msg_length);
 }
 
-static void __sd_deliver_done(struct cpg_event *cevent)
+static void __sd_notify_done(struct cpg_event *cevent)
 {
-	struct work_deliver *w = container_of(cevent, struct work_deliver, cev);
+	struct work_notify *w = container_of(cevent, struct work_notify, cev);
 	struct message_header *m;
 	char name[128];
 	int do_recovery;
@@ -1151,7 +1114,7 @@ static void __sd_deliver_done(struct cpg_event *cevent)
 			update_cluster_info((struct join_message *)m);
 			break;
 		case SD_MSG_LEAVE:
-			node = find_node(&sys->sd_node_list, m->nodeid, m->pid);
+			node = find_node(&sys->sd_node_list, &m->sheepid);
 			if (node) {
 				sys->nr_vnodes = 0;
 
@@ -1173,7 +1136,7 @@ static void __sd_deliver_done(struct cpg_event *cevent)
 				 */
 				if (!sys->join_finished) {
 					sys->join_finished = 1;
-					move_node_to_sd_list(sys->this_nodeid, sys->this_pid, sys->this_node);
+					move_node_to_sd_list(&sys->this_sheepid, sys->this_node);
 					sys->epoch = get_latest_epoch();
 				}
 
@@ -1210,7 +1173,7 @@ static void __sd_deliver_done(struct cpg_event *cevent)
 			break;
 		case SD_MSG_VDI_OP:
 			m->state = DM_FIN;
-			send_message(sys->handle, m);
+			sys->cdrv->notify(m, m->msg_length);
 			break;
 		default:
 			eprintf("unknown message %d\n", m->op);
@@ -1226,25 +1189,24 @@ static void __sd_deliver_done(struct cpg_event *cevent)
 	}
 }
 
-static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name,
-		       uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
+static void sd_notify_handler(struct sheepid *sender, void *msg, size_t msg_len)
 {
 	struct cpg_event *cevent;
-	struct work_deliver *w;
+	struct work_notify *w;
 	struct message_header *m = msg;
 	char name[128];
 
-	dprintf("op: %d, state: %u, size: %d, from: %s, nodeid: %x, pid: %u\n",
+	dprintf("op: %d, state: %u, size: %d, from: %s, pid: %lu\n",
 		m->op, m->state, m->msg_length,
 		addr_to_str(name, sizeof(name), m->from.addr, m->from.port),
-		nodeid, pid);
+		sender->pid);
 
 	w = zalloc(sizeof(*w));
 	if (!w)
 		return;
 
 	cevent = &w->cev;
-	cevent->ctype = CPG_EVENT_DELIVER;
+	cevent->ctype = CPG_EVENT_NOTIFY;
 
 	vprintf(SDOG_DEBUG "allow new deliver, %p\n", cevent);
 
@@ -1264,17 +1226,7 @@ static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name,
 	start_cpg_event_work();
 }
 
-static void for_each_node_list(struct cpg_address list[], int count,
-			       void (*func)(struct cpg_address *addr,
-					    struct work_confchg *w),
-			       struct work_confchg *w)
-{
-	int i;
-	for (i = 0; i < count; i++)
-		func(&list[i], w);
-}
-
-static void add_node(struct cpg_address *addr, struct work_confchg *w)
+static void add_node(struct sheepid *id)
 {
 	struct node *node;
 
@@ -1282,22 +1234,20 @@ static void add_node(struct cpg_address *addr, struct work_confchg *w)
 	if (!node)
 		panic("failed to alloc memory for a new node\n");
 
-	node->nodeid = addr->nodeid;
-	node->pid = addr->pid;
+	node->sheepid = *id;
 
 	list_add_tail(&node->list, &sys->cpg_node_list);
 }
 
-static void del_node(struct cpg_address *addr, struct work_confchg *w)
+static int del_node(struct sheepid *id)
 {
 	struct node *node;
 
-	node = find_node(&sys->sd_node_list, addr->nodeid, addr->pid);
+	node = find_node(&sys->sd_node_list, id);
 	if (node) {
 		int nr;
 		struct sheepdog_node_list_entry e[SD_MAX_NODES];
 
-		w->sd_node_left++;
 		sys->nr_vnodes = 0;
 
 		list_del(&node->list);
@@ -1313,34 +1263,27 @@ static void del_node(struct cpg_address *addr, struct work_confchg *w)
 
 			update_epoch_store(sys->epoch);
 		}
-	} else {
-		node = find_node(&sys->cpg_node_list, addr->nodeid, addr->pid);
-		if (node) {
-			list_del(&node->list);
-			free(node);
-		}
+		return 1;
 	}
-}
 
-static int is_my_cpg_addr(struct cpg_address *addr)
-{
-	return (sys->this_nodeid == addr->nodeid) &&
-		(sys->this_pid == addr->pid);
+	node = find_node(&sys->cpg_node_list, id);
+	if (node) {
+		list_del(&node->list);
+		free(node);
+	}
+
+	return 0;
 }
 
 /*
  * Check whether the majority of Sheepdog nodes are still alive or not
  */
-static int check_majority(struct cpg_address *left_list,
-			  size_t left_list_entries)
+static int check_majority(struct sheepid *left)
 {
-	int nr_nodes = 0, nr_majority, nr_reachable = 0, i, fd;
+	int nr_nodes = 0, nr_majority, nr_reachable = 0, fd;
 	struct node *node;
 	char name[INET6_ADDRSTRLEN];
 
-	if (left_list_entries == 0)
-		return 1; /* we don't need this check in this case */
-
 	nr_nodes = get_nodes_nr_from(&sys->sd_node_list);
 	nr_majority = nr_nodes / 2 + 1;
 
@@ -1350,12 +1293,7 @@ static int check_majority(struct cpg_address *left_list,
 		return 1;
 
 	list_for_each_entry(node, &sys->sd_node_list, list) {
-		for (i = 0; i < left_list_entries; i++) {
-			if (left_list[i].nodeid == node->nodeid &&
-			    left_list[i].pid == node->pid)
-				break;
-		}
-		if (i != left_list_entries)
+		if (sheepid_cmp(&node->sheepid, left) == 0)
 			continue;
 
 		addr_to_str(name, sizeof(name), node->ent.addr, 0);
@@ -1375,34 +1313,29 @@ static int check_majority(struct cpg_address *left_list,
 	return 0;
 }
 
-static void __sd_confchg(struct cpg_event *cevent)
+static void __sd_leave(struct cpg_event *cevent)
 {
-	struct work_confchg *w = container_of(cevent, struct work_confchg, cev);
+	struct work_leave *w = container_of(cevent, struct work_leave, cev);
 
-	if (!check_majority(w->left_list, w->left_list_entries)) {
+	if (!check_majority(&w->left)) {
 		eprintf("perhaps network partition failure has occurred\n");
 		abort();
 	}
 }
 
-static void send_join_request(struct cpg_address *addr, struct work_confchg *w)
+static void send_join_request(struct sheepid *id)
 {
 	struct join_message msg;
 	struct sheepdog_node_list_entry entries[SD_MAX_NODES];
 	int nr_entries, i, ret;
 
-	/* if I've just joined in cpg, I'll join in sheepdog. */
-	if (!is_my_cpg_addr(addr))
-		return;
-
 	memset(&msg, 0, sizeof(msg));
 	msg.header.proto_ver = SD_SHEEP_PROTO_VER;
 	msg.header.op = SD_MSG_JOIN;
 	msg.header.state = DM_INIT;
 	msg.header.msg_length = sizeof(msg);
 	msg.header.from = sys->this_node;
-	msg.header.nodeid = sys->this_nodeid;
-	msg.header.pid = sys->this_pid;
+	msg.header.sheepid = sys->this_sheepid;
 
 	get_global_nr_copies(&msg.nr_sobjs);
 
@@ -1414,35 +1347,31 @@ static void send_join_request(struct cpg_address *addr, struct work_confchg *w)
 			msg.nodes[i].ent = entries[i];
 	}
 
-	send_message(sys->handle, (struct message_header *)&msg);
+	sys->cdrv->notify(&msg, msg.header.msg_length);
 
-	vprintf(SDOG_INFO "%x %u\n", sys->this_nodeid, sys->this_pid);
+	vprintf(SDOG_INFO "%s\n", sheepid_to_str(&sys->this_sheepid));
 }
 
-static void __sd_confchg_done(struct cpg_event *cevent)
+static void __sd_join_done(struct cpg_event *cevent)
 {
-	struct work_confchg *w = container_of(cevent, struct work_confchg, cev);
-	int ret;
+	struct work_join *w = container_of(cevent, struct work_join, cev);
+	int ret, i;
+	int first_cpg_node = 0;
 
-	if (w->member_list_entries ==
-	    w->joined_list_entries - w->left_list_entries &&
-	    is_my_cpg_addr(w->member_list)) {
+	if (w->member_list_entries == 1 &&
+	    sheepid_cmp(&w->joined, &sys->this_sheepid) == 0) {
 		sys->join_finished = 1;
 		get_global_nr_copies(&sys->nr_sobjs);
-		w->first_cpg_node = 1;
+		first_cpg_node = 1;
 	}
 
-	if (list_empty(&sys->cpg_node_list))
-		for_each_node_list(w->member_list, w->member_list_entries,
-				   add_node, w);
-	else
-		for_each_node_list(w->joined_list, w->joined_list_entries,
-				   add_node, w);
-
-	for_each_node_list(w->left_list, w->left_list_entries,
-			   del_node, w);
+	if (list_empty(&sys->cpg_node_list)) {
+		for (i = 0; i < w->member_list_entries; i++)
+			add_node(w->member_list + i);
+	} else
+		add_node(&w->joined);
 
-	if (w->first_cpg_node) {
+	if (first_cpg_node) {
 		struct join_message msg;
 		struct sheepdog_node_list_entry entries[SD_MAX_NODES];
 		int nr_entries;
@@ -1454,13 +1383,12 @@ static void __sd_confchg_done(struct cpg_event *cevent)
 		 * becomes the master without sending JOIN.
 		 */
 
-		vprintf(SDOG_DEBUG "%d %x\n", sys->this_pid, sys->this_nodeid);
+		vprintf(SDOG_DEBUG "%s\n", sheepid_to_str(&sys->this_sheepid));
 
 		memset(&msg, 0, sizeof(msg));
 
 		msg.header.from = sys->this_node;
-		msg.header.nodeid = sys->this_nodeid;
-		msg.header.pid = sys->this_pid;
+		msg.header.sheepid = sys->this_sheepid;
 
 		nr_entries = ARRAY_SIZE(entries);
 		ret = read_epoch(&epoch, &ctime, entries, &nr_entries);
@@ -1482,35 +1410,40 @@ static void __sd_confchg_done(struct cpg_event *cevent)
 
 	print_node_list(&sys->sd_node_list);
 
-	if (w->first_cpg_node)
-		goto skip_join;
+	if (sheepid_cmp(&w->joined, &sys->this_sheepid) == 0)
+		send_join_request(&w->joined);
+}
+
+static void __sd_leave_done(struct cpg_event *cevent)
+{
+	struct work_leave *w = container_of(cevent, struct work_leave, cev);
+	int node_left;
 
-	for_each_node_list(w->joined_list, w->joined_list_entries,
-			   send_join_request, w);
+	node_left = del_node(&w->left);
 
-skip_join:
-	if (w->sd_node_left && sys->status == SD_STATUS_OK) {
-		if (w->sd_node_left > 1)
-			panic("we can't handle the departure of multiple nodes %d, %Zd\n",
-			      w->sd_node_left, w->left_list_entries);
+	print_node_list(&sys->sd_node_list);
 
+	if (node_left && sys->status == SD_STATUS_OK)
 		start_recovery(sys->epoch);
-	}
 }
 
 static void cpg_event_free(struct cpg_event *cevent)
 {
 	switch (cevent->ctype) {
-	case CPG_EVENT_CONCHG: {
-		struct work_confchg *w = container_of(cevent, struct work_confchg, cev);
+	case CPG_EVENT_JOIN: {
+		struct work_join *w = container_of(cevent, struct work_join, cev);
 		free(w->member_list);
-		free(w->left_list);
-		free(w->joined_list);
 		free(w);
 		break;
 	}
-	case CPG_EVENT_DELIVER: {
-		struct work_deliver *w = container_of(cevent, struct work_deliver, cev);
+	case CPG_EVENT_LEAVE: {
+		struct work_leave *w = container_of(cevent, struct work_leave, cev);
+		free(w->member_list);
+		free(w);
+		break;
+	}
+	case CPG_EVENT_NOTIFY: {
+		struct work_notify *w = container_of(cevent, struct work_notify, cev);
 		free(w->msg);
 		free(w);
 		break;
@@ -1535,14 +1468,16 @@ static void cpg_event_fn(struct work *work, int idx)
 	 */
 
 	switch (cevent->ctype) {
-	case CPG_EVENT_CONCHG:
-		__sd_confchg(cevent);
+	case CPG_EVENT_JOIN:
+		break;
+	case CPG_EVENT_LEAVE:
+		__sd_leave(cevent);
 		break;
-	case CPG_EVENT_DELIVER:
+	case CPG_EVENT_NOTIFY:
 	{
-		struct work_deliver *w = container_of(cevent, struct work_deliver, cev);
+		struct work_notify *w = container_of(cevent, struct work_notify, cev);
 		vprintf(SDOG_DEBUG "%d\n", w->msg->state);
-		__sd_deliver(cevent);
+		__sd_notify(cevent);
 		break;
 	}
 	case CPG_EVENT_REQUEST:
@@ -1572,12 +1507,15 @@ static void cpg_event_done(struct work *work, int idx)
 		goto out;
 
 	switch (cevent->ctype) {
-	case CPG_EVENT_CONCHG:
-		__sd_confchg_done(cevent);
+	case CPG_EVENT_JOIN:
+		__sd_join_done(cevent);
 		break;
-	case CPG_EVENT_DELIVER:
+	case CPG_EVENT_LEAVE:
+		__sd_leave_done(cevent);
+		break;
+	case CPG_EVENT_NOTIFY:
 	{
-		struct work_deliver *w = container_of(cevent, struct work_deliver, cev);
+		struct work_notify *w = container_of(cevent, struct work_notify, cev);
 
 		if (w->msg->state == DM_FIN && vdi_op_message(w->msg))
 			vdi_op_done((struct vdi_op_message *)w->msg);
@@ -1593,9 +1531,9 @@ static void cpg_event_done(struct work *work, int idx)
 
 			list_for_each_entry(f_cevent, &sys->cpg_event_siblings,
 					    cpg_event_list) {
-				struct work_deliver *fw =
-					container_of(f_cevent, struct work_deliver, cev);
-				if (f_cevent->ctype == CPG_EVENT_DELIVER &&
+				struct work_notify *fw =
+					container_of(f_cevent, struct work_notify, cev);
+				if (f_cevent->ctype == CPG_EVENT_NOTIFY &&
 				    fw->msg->state == DM_FIN) {
 					vprintf("already got fin %p\n",
 						f_cevent);
@@ -1611,7 +1549,7 @@ static void cpg_event_done(struct work *work, int idx)
 				cpg_event_set_joining();
 		}
 	got_fin:
-		__sd_deliver_done(cevent);
+		__sd_notify_done(cevent);
 		break;
 	}
 	case CPG_EVENT_REQUEST:
@@ -1722,7 +1660,7 @@ void start_cpg_event_work(void)
 	 * thread is running for a deliver for VDI, then we need to
 	 * run io requests.
 	 */
-	if (cpg_event_running() && cevent->ctype == CPG_EVENT_CONCHG)
+	if (cpg_event_running() && is_membership_change_event(cevent->ctype))
 		return;
 
 	/*
@@ -1756,9 +1694,9 @@ do_retry:
 	list_for_each_entry_safe(cevent, n, &sys->cpg_event_siblings, cpg_event_list) {
 		struct request *req = container_of(cevent, struct request, cev);
 
-		if (cevent->ctype == CPG_EVENT_DELIVER)
+		if (cevent->ctype == CPG_EVENT_NOTIFY)
 			continue;
-		if (cevent->ctype == CPG_EVENT_CONCHG)
+		if (is_membership_change_event(cevent->ctype))
 			break;
 
 		list_del(&cevent->cpg_event_list);
@@ -1842,7 +1780,7 @@ do_retry:
 	cevent = list_first_entry(&sys->cpg_event_siblings,
 				  struct cpg_event, cpg_event_list);
 
-	if (cevent->ctype == CPG_EVENT_CONCHG && sys->nr_outstanding_io)
+	if (is_membership_change_event(cevent->ctype) && sys->nr_outstanding_io)
 		return;
 
 	list_del(&cevent->cpg_event_list);
@@ -1857,25 +1795,16 @@ do_retry:
 	queue_work(sys->cpg_wqueue, &cpg_event_work);
 }
 
-static void sd_confchg(cpg_handle_t handle, const struct cpg_name *group_name,
-		       const struct cpg_address *member_list,
-		       size_t member_list_entries,
-		       const struct cpg_address *left_list,
-		       size_t left_list_entries,
-		       const struct cpg_address *joined_list,
-		       size_t joined_list_entries)
+static void sd_join_handler(struct sheepid *joined, struct sheepid *members,
+			    size_t nr_members)
 {
 	struct cpg_event *cevent;
-	struct work_confchg *w = NULL;
+	struct work_join *w = NULL;
 	int i, size;
 
-	dprintf("confchg nodeid %x\n", member_list[0].nodeid);
-	dprintf("%zd %zd %zd\n", member_list_entries, left_list_entries,
-		joined_list_entries);
-	for (i = 0; i < member_list_entries; i++) {
-		dprintf("[%x] node_id: %x, pid: %d\n", i,
-			member_list[i].nodeid, member_list[i].pid);
-	}
+	dprintf("join %s\n", sheepid_to_str(joined));
+	for (i = 0; i < nr_members; i++)
+		dprintf("[%x] %s\n", i, sheepid_to_str(members + i));
 
 	if (sys->status == SD_STATUS_SHUTDOWN)
 		return;
@@ -1885,31 +1814,19 @@ static void sd_confchg(cpg_handle_t handle, const struct cpg_name *group_name,
 		goto oom;
 
 	cevent = &w->cev;
-	cevent->ctype = CPG_EVENT_CONCHG;
+	cevent->ctype = CPG_EVENT_JOIN;
 
 
 	vprintf(SDOG_DEBUG "allow new confchg, %p\n", cevent);
 
-	size = sizeof(struct cpg_address) * member_list_entries;
+	size = sizeof(struct sheepid) * nr_members;
 	w->member_list = zalloc(size);
 	if (!w->member_list)
 		goto oom;
-	memcpy(w->member_list, member_list, size);
-	w->member_list_entries = member_list_entries;
-
-	size = sizeof(struct cpg_address) * left_list_entries;
-	w->left_list = zalloc(size);
-	if (!w->left_list)
-		goto oom;
-	memcpy(w->left_list, left_list, size);
-	w->left_list_entries = left_list_entries;
+	memcpy(w->member_list, members, size);
+	w->member_list_entries = nr_members;
 
-	size = sizeof(struct cpg_address) * joined_list_entries;
-	w->joined_list = zalloc(size);
-	if (!w->joined_list)
-		goto oom;
-	memcpy(w->joined_list, joined_list, size);
-	w->joined_list_entries = joined_list_entries;
+	w->joined = *joined;
 
 	list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings);
 	start_cpg_event_work();
@@ -1919,111 +1836,83 @@ oom:
 	if (w) {
 		if (w->member_list)
 			free(w->member_list);
-		if (w->left_list)
-			free(w->left_list);
-		if (w->joined_list)
-			free(w->joined_list);
+		free(w);
 	}
 	panic("failed to allocate memory for a confchg event\n");
 }
 
-static int set_addr(unsigned int nodeid, int port)
+static void sd_leave_handler(struct sheepid *left, struct sheepid *members,
+			     size_t nr_members)
 {
-	int ret, nr;
-	corosync_cfg_handle_t handle;
-	corosync_cfg_node_address_t addr;
-	struct sockaddr_storage *ss = (struct sockaddr_storage *)addr.address;
-	struct sockaddr_in *sin = (struct sockaddr_in *)addr.address;
-	struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)addr.address;
-	void *saddr;
-	char tmp[INET6_ADDRSTRLEN];
-
-	memset(sys->this_node.addr, 0, sizeof(sys->this_node.addr));
-
-	ret = corosync_cfg_initialize(&handle, NULL);
-	if (ret != CPG_OK) {
-		vprintf(SDOG_ERR "failed to initiazize cfg %d\n", ret);
-		return -1;
-	}
+	struct cpg_event *cevent;
+	struct work_leave *w = NULL;
+	int i, size;
 
-	ret = corosync_cfg_get_node_addrs(handle, nodeid, 1, &nr, &addr);
-	if (ret != CPG_OK) {
-		vprintf(SDOG_ERR "failed to get addr %d\n", ret);
-		return -1;
-	}
+	dprintf("leave %s\n", sheepid_to_str(left));
+	for (i = 0; i < nr_members; i++)
+		dprintf("[%x] %s\n", i, sheepid_to_str(members + i));
 
-	if (!nr) {
-		vprintf(SDOG_ERR "we got no address\n");
-		return -1;
-	}
+	if (sys->status == SD_STATUS_SHUTDOWN)
+		return;
 
-	if (ss->ss_family == AF_INET6) {
-		saddr = &sin6->sin6_addr;
-		memcpy(sys->this_node.addr, saddr, 16);
-	} else if (ss->ss_family == AF_INET) {
-		saddr = &sin->sin_addr;
-		memcpy(sys->this_node.addr + 12, saddr, 4);
-	} else {
-		vprintf(SDOG_ERR "unknown protocol %d\n", ss->ss_family);
-		return -1;
-	}
+	w = zalloc(sizeof(*w));
+	if (!w)
+		goto oom;
 
-	inet_ntop(ss->ss_family, saddr, tmp, sizeof(tmp));
+	cevent = &w->cev;
+	cevent->ctype = CPG_EVENT_LEAVE;
 
-	vprintf(SDOG_INFO "addr = %s, port = %d\n", tmp, port);
-	return 0;
+
+	vprintf(SDOG_DEBUG "allow new confchg, %p\n", cevent);
+
+	size = sizeof(struct sheepid) * nr_members;
+	w->member_list = zalloc(size);
+	if (!w->member_list)
+		goto oom;
+	memcpy(w->member_list, members, size);
+	w->member_list_entries = nr_members;
+
+	w->left = *left;
+
+	list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings);
+	start_cpg_event_work();
+
+	return;
+oom:
+	if (w) {
+		if (w->member_list)
+			free(w->member_list);
+		free(w);
+	}
+	panic("failed to allocate memory for a confchg event\n");
 }
 
 int create_cluster(int port, int64_t zone)
 {
 	int fd, ret;
-	cpg_handle_t cpg_handle;
-	struct cpg_name group = { 8, "sheepdog" };
-	cpg_callbacks_t cb = {&sd_deliver, &sd_confchg};
-	unsigned int nodeid = 0;
-
-	ret = cpg_initialize(&cpg_handle, &cb);
-	if (ret != CPG_OK) {
-		eprintf("Failed to initialize cpg, %d\n", ret);
-		eprintf("Is corosync running?\n");
+	struct cdrv_handlers handlers = {
+		.join_handler = sd_join_handler,
+		.leave_handler = sd_leave_handler,
+		.notify_handler = sd_notify_handler,
+	};
+
+	fd = sys->cdrv->init(&handlers, &sys->this_sheepid);
+	if (fd < 0)
 		return -1;
-	}
 
-	ret = cpg_local_get(cpg_handle, &nodeid);
-	if (ret != CPG_OK) {
-		eprintf("Failed to get the local node's identifier, %d\n", ret);
-		return 1;
-	}
-
-join_retry:
-	ret = cpg_join(cpg_handle, &group);
-	switch (ret) {
-	case CPG_OK:
-		break;
-	case CPG_ERR_TRY_AGAIN:
-		dprintf("Failed to join the sheepdog group, try again\n");
-		sleep(1);
-		goto join_retry;
-	case CPG_ERR_SECURITY:
-		eprintf("Permission error.\n");
+	ret = sys->cdrv->join();
+	if (ret != 0)
 		return -1;
-	default:
-		eprintf("Failed to join the sheepdog group, %d\n", ret);
-		return -1;
-	}
-
-	sys->handle = cpg_handle;
-	sys->this_nodeid = nodeid;
-	sys->this_pid = getpid();
 
-	ret = set_addr(nodeid, port);
-	if (ret)
-		return 1;
+	memcpy(sys->this_node.addr, sys->this_sheepid.addr,
+	       sizeof(sys->this_node.addr));
 	sys->this_node.port = port;
 	sys->this_node.nr_vnodes = SD_DEFAULT_VNODES;
-	if (zone == -1)
-		sys->this_node.zone = nodeid;
-	else
+	if (zone == -1) {
+		/* use last 4 bytes as zone id */
+		uint8_t *b = sys->this_sheepid.addr + 12;
+		sys->this_node.zone = b[0] | b[1] << 8 | b[2] << 16 | b[3] << 24;
+	} else
 		sys->this_node.zone = zone;
 	dprintf("zone id = %u\n", sys->this_node.zone);
 
@@ -2042,11 +1931,6 @@ join_retry:
 
 	INIT_LIST_HEAD(&sys->cpg_event_siblings);
 
-	ret = cpg_fd_get(cpg_handle, &fd);
-	if (ret != CPG_OK) {
-		eprintf("Failed to retrieve cpg file descriptor, %d\n", ret);
-		return 1;
-	}
 	ret = register_event(fd, group_handler, NULL);
 	if (ret) {
 		eprintf("Failed to register epoll events, %d\n", ret);
@@ -2066,10 +1950,9 @@ int leave_cluster(void)
 	msg.header.state = DM_FIN;
 	msg.header.msg_length = sizeof(msg);
 	msg.header.from = sys->this_node;
-	msg.header.nodeid = sys->this_nodeid;
-	msg.header.pid = sys->this_pid;
+	msg.header.sheepid = sys->this_sheepid;
 	msg.epoch = get_latest_epoch();
 
 	dprintf("%d\n", msg.epoch);
-	return send_message(sys->handle, (struct message_header *)&msg);
+	return sys->cdrv->notify(&msg, msg.header.msg_length);
 }
diff --git a/sheep/sdnet.c b/sheep/sdnet.c
index 9985bdb..db2e691 100644
--- a/sheep/sdnet.c
+++ b/sheep/sdnet.c
@@ -11,6 +11,8 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <unistd.h>
+#include <netdb.h>
+#include <arpa/inet.h>
 #include <netinet/tcp.h>
 #include <sys/epoll.h>
 #include <fcntl.h>
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 50cd841..0a73587 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -26,6 +26,7 @@
 #define DEFAULT_OBJECT_DIR "/tmp"
 #define LOG_FILE_NAME "sheep.log"
 
+LIST_HEAD(cluster_drivers);
 static char program_name[] = "sheep";
 
 static struct option const long_options[] = {
@@ -35,11 +36,12 @@ static struct option const long_options[] = {
 	{"debug", no_argument, NULL, 'd'},
 	{"directio", no_argument, NULL, 'D'},
 	{"zone", required_argument, NULL, 'z'},
+	{"cluster", required_argument, NULL, 'c'},
 	{"help", no_argument, NULL, 'h'},
 	{NULL, 0, NULL, 0},
 };
 
-static const char *short_options = "p:fl:dDz:h";
+static const char *short_options = "p:fl:dDz:c:h";
 
 static void usage(int status)
 {
@@ -56,6 +58,7 @@ Sheepdog Daemon, version %s\n\
   -d, --debug             print debug messages\n\
   -D, --directio          use direct IO\n\
   -z, --zone              specify the zone id\n\
+  -c, --cluster           specify the cluster driver\n\
   -h, --help              display this help and exit\n\
 ", PACKAGE_VERSION);
 	}
@@ -75,6 +78,7 @@ int main(int argc, char **argv)
 	char path[PATH_MAX];
 	int64_t zone = -1;
 	char *p;
+	struct cluster_driver *cdrv;
 
 	signal(SIGPIPE, SIG_IGN);
 
@@ -112,6 +116,24 @@ int main(int argc, char **argv)
 			}
 			sys->this_node.zone = zone;
 			break;
+		case 'c':
+			sys->cdrv = NULL; /* a repeated -c must not mask an unknown name */
+			FOR_EACH_CLUSTER_DRIVER(cdrv) {
+				if (strcmp(cdrv->name, optarg) == 0) {
+					sys->cdrv = cdrv;
+					break;
+				}
+			}
+			if (!sys->cdrv) {
+				/* diagnostics go to stderr since we exit with a failure status */
+				fprintf(stderr, "No such cluster driver, %s\n", optarg);
+				fprintf(stderr, "Supported drivers:");
+				FOR_EACH_CLUSTER_DRIVER(cdrv) {
+					fprintf(stderr, " %s", cdrv->name);
+				}
+				fprintf(stderr, "\n");
+				exit(1);
+			}
+			break;
 		case 'h':
 			usage(0);
 			break;
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 6409530..e2fcb40 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -12,7 +12,6 @@
 #define __SHEEP_PRIV_H__
 
 #include <inttypes.h>
-#include <corosync/cpg.h>
 
 #include "sheepdog_proto.h"
 #include "event.h"
@@ -20,6 +19,7 @@
 #include "work.h"
 #include "net.h"
 #include "sheep.h"
+#include "cluster.h"
 
 #define SD_OP_REMOVE_OBJ     0x91
 
@@ -41,11 +41,15 @@
 #define SD_RES_NETWORK_ERROR    0x81 /* Network error between sheeps */
 
 enum cpg_event_type {
-	CPG_EVENT_CONCHG,
-	CPG_EVENT_DELIVER,
+	CPG_EVENT_JOIN,
+	CPG_EVENT_LEAVE,
+	CPG_EVENT_NOTIFY,
 	CPG_EVENT_REQUEST,
 };
 
+#define is_membership_change_event(x) \
+	((x) == CPG_EVENT_JOIN || (x) == CPG_EVENT_LEAVE)
+
 struct cpg_event {
 	enum cpg_event_type ctype;
 	struct list_head cpg_event_list;
@@ -102,11 +106,11 @@ struct data_object_bmap {
 };
 
 struct cluster_info {
-	cpg_handle_t handle;
+	struct cluster_driver *cdrv;
+
 	/* set after finishing the JOIN procedure */
 	int join_finished;
-	uint32_t this_nodeid;
-	uint32_t this_pid;
+	struct sheepid this_sheepid;
 	struct sheepdog_node_list_entry this_node;
 
 	uint32_t epoch;
-- 
1.7.2.5




More information about the sheepdog mailing list