[Sheepdog] [PATCH v2 5/6] sheep: use cluster driver

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Tue Oct 4 14:38:57 CEST 2011


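This patch removes all corosync-specific code from group.c and uses the
cluster driver interface instead.  It also adds a -c/--cluster option to
sheep for selecting the cluster driver; corosync is used when none is
specified.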

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 sheep/group.c      |  493 ++++++++++++++++++---------------------------------
 sheep/sdnet.c      |    2 +
 sheep/sheep.c      |   23 +++-
 sheep/sheep_priv.h |   12 +-
 4 files changed, 206 insertions(+), 324 deletions(-)

diff --git a/sheep/group.c b/sheep/group.c
index 14dff00..f6743f5 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -15,8 +15,6 @@
 #include <arpa/inet.h>
 #include <sys/time.h>
 #include <sys/epoll.h>
-#include <corosync/cpg.h>
-#include <corosync/cfg.h>
 
 #include "sheepdog_proto.h"
 #include "sheep_priv.h"
@@ -26,54 +24,6 @@
 #include "work.h"
 #include "cluster.h"
 
-static corosync_cfg_handle_t cfg_handle;
-
-static int nodeid_to_addr(uint32_t nodeid, uint8_t *addr)
-{
-	int ret, nr;
-	corosync_cfg_node_address_t caddr;
-	struct sockaddr_storage *ss = (struct sockaddr_storage *)caddr.address;
-	struct sockaddr_in *sin = (struct sockaddr_in *)caddr.address;
-	struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)caddr.address;
-	void *saddr;
-
-	ret = corosync_cfg_get_node_addrs(cfg_handle, nodeid, 1, &nr, &caddr);
-	if (ret != CS_OK) {
-		vprintf(SDOG_ERR "failed to get addr %d\n", ret);
-		return -1;
-	}
-
-	if (!nr) {
-		vprintf(SDOG_ERR "we got no address\n");
-		return -1;
-	}
-
-	if (ss->ss_family == AF_INET6) {
-		saddr = &sin6->sin6_addr;
-		memcpy(addr, saddr, 16);
-	} else if (ss->ss_family == AF_INET) {
-		saddr = &sin->sin_addr;
-		memset(addr, 0, 16);
-		memcpy(addr + 12, saddr, 4);
-	} else {
-		vprintf(SDOG_ERR "unknown protocol %d\n", ss->ss_family);
-		return -1;
-	}
-
-	return 0;
-}
-
-static void cpg_addr_to_sheepid(const struct cpg_address *cpgs,
-				struct sheepid *sheeps, size_t nr)
-{
-	int i;
-
-	for (i = 0; i < nr; i++) {
-		nodeid_to_addr(cpgs[i].nodeid, sheeps[i].addr);
-		sheeps[i].pid = cpgs[i].pid;
-	}
-}
-
 struct node {
 	struct sheepid sheepid;
 	struct sheepdog_node_list_entry ent;
@@ -134,23 +84,26 @@ struct mastership_tx_message {
 	uint32_t epoch;
 };
 
-struct work_deliver {
+struct work_notify {
 	struct cpg_event cev;
 
 	struct message_header *msg;
 };
 
-struct work_confchg {
+struct work_join {
 	struct cpg_event cev;
 
-	struct cpg_address *member_list;
+	struct sheepid *member_list;
 	size_t member_list_entries;
-	struct cpg_address *left_list;
-	size_t left_list_entries;
-	struct cpg_address *joined_list;
-	size_t joined_list_entries;
+	struct sheepid joined;
+};
 
-	int sd_node_left;
+struct work_leave {
+	struct cpg_event cev;
+
+	struct sheepid *member_list;
+	size_t member_list_entries;
+	struct sheepid left;
 };
 
 #define print_node_list(node_list)				\
@@ -216,30 +169,6 @@ static inline int master_tx_message(struct message_header *m)
 	return m->op == SD_MSG_MASTER_TRANSFER;
 }
 
-static int send_message(cpg_handle_t handle, struct message_header *msg)
-{
-	struct iovec iov;
-	int ret;
-
-	iov.iov_base = msg;
-	iov.iov_len = msg->msg_length;
-retry:
-	ret = cpg_mcast_joined(handle, CPG_TYPE_AGREED, &iov, 1);
-	switch (ret) {
-	case CPG_OK:
-		break;
-	case CPG_ERR_TRY_AGAIN:
-		dprintf("failed to send message. try again\n");
-		sleep(1);
-		goto retry;
-	default:
-		eprintf("failed to send message, %d\n", ret);
-		return -1;
-	}
-	return 0;
-}
-
-
 static int get_node_idx(struct sheepdog_node_list_entry *ent,
 			struct sheepdog_node_list_entry *entries, int nr_nodes)
 {
@@ -437,7 +366,7 @@ forward:
 
 	list_add(&req->pending_list, &sys->pending_list);
 
-	send_message(sys->handle, (struct message_header *)msg);
+	sys->cdrv->notify(msg, msg->header.msg_length);
 
 	free(msg);
 }
@@ -450,9 +379,8 @@ static void group_handler(int listen_fd, int events, void *data)
 		goto out;
 	}
 
-	ret = cpg_dispatch(sys->handle, CPG_DISPATCH_ALL);
-
-	if (ret == CPG_OK)
+	ret = sys->cdrv->dispatch();
+	if (ret == 0)
 		return;
 	else
 		eprintf("oops...some error occured inside corosync\n");
@@ -1058,9 +986,9 @@ out:
 	req->done(req);
 }
 
-static void __sd_deliver(struct cpg_event *cevent)
+static void __sd_notify(struct cpg_event *cevent)
 {
-	struct work_deliver *w = container_of(cevent, struct work_deliver, cev);
+	struct work_notify *w = container_of(cevent, struct work_notify, cev);
 	struct message_header *m = w->msg;
 	char name[128];
 	struct node *node;
@@ -1134,10 +1062,10 @@ static int tx_mastership(void)
 	msg.header.from = sys->this_node;
 	msg.header.sheepid = sys->this_sheepid;
 
-	return send_message(sys->handle, (struct message_header *)&msg);
+	return sys->cdrv->notify(&msg, msg.header.msg_length);
 }
 
-static void send_join_response(struct work_deliver *w)
+static void send_join_response(struct work_notify *w)
 {
 	struct message_header *m;
 	struct join_message *jm;
@@ -1166,12 +1094,12 @@ static void send_join_response(struct work_deliver *w)
 		exit(1);
 	}
 	jm->epoch = sys->epoch;
-	send_message(sys->handle, m);
+	sys->cdrv->notify(m, m->msg_length);
 }
 
-static void __sd_deliver_done(struct cpg_event *cevent)
+static void __sd_notify_done(struct cpg_event *cevent)
 {
-	struct work_deliver *w = container_of(cevent, struct work_deliver, cev);
+	struct work_notify *w = container_of(cevent, struct work_notify, cev);
 	struct message_header *m;
 	char name[128];
 	int do_recovery;
@@ -1245,7 +1173,7 @@ static void __sd_deliver_done(struct cpg_event *cevent)
 			break;
 		case SD_MSG_VDI_OP:
 			m->state = DM_FIN;
-			send_message(sys->handle, m);
+			sys->cdrv->notify(m, m->msg_length);
 			break;
 		default:
 			eprintf("unknown message %d\n", m->op);
@@ -1261,25 +1189,24 @@ static void __sd_deliver_done(struct cpg_event *cevent)
 	}
 }
 
-static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name,
-		       uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
+static void sd_notify_handler(struct sheepid *sender, void *msg, size_t msg_len)
 {
 	struct cpg_event *cevent;
-	struct work_deliver *w;
+	struct work_notify *w;
 	struct message_header *m = msg;
 	char name[128];
 
-	dprintf("op: %d, state: %u, size: %d, from: %s, nodeid: %x, pid: %u\n",
+	dprintf("op: %d, state: %u, size: %d, from: %s, pid: %lu\n",
 		m->op, m->state, m->msg_length,
 		addr_to_str(name, sizeof(name), m->from.addr, m->from.port),
-		nodeid, pid);
+		sender->pid);
 
 	w = zalloc(sizeof(*w));
 	if (!w)
 		return;
 
 	cevent = &w->cev;
-	cevent->ctype = CPG_EVENT_DELIVER;
+	cevent->ctype = CPG_EVENT_NOTIFY;
 
 	vprintf(SDOG_DEBUG "allow new deliver, %p\n", cevent);
 
@@ -1299,20 +1226,7 @@ static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name,
 	start_cpg_event_work();
 }
 
-static void for_each_node_list(struct cpg_address list[], int count,
-			       void (*func)(struct sheepid *id,
-					    struct work_confchg *w),
-			       struct work_confchg *w)
-{
-	int i;
-	struct sheepid sheepid[SD_MAX_NODES];
-
-	cpg_addr_to_sheepid(list, sheepid, count);
-	for (i = 0; i < count; i++)
-		func(sheepid + i, w);
-}
-
-static void add_node(struct sheepid *id, struct work_confchg *w)
+static void add_node(struct sheepid *id)
 {
 	struct node *node;
 
@@ -1325,7 +1239,7 @@ static void add_node(struct sheepid *id, struct work_confchg *w)
 	list_add_tail(&node->list, &sys->cpg_node_list);
 }
 
-static void del_node(struct sheepid *id, struct work_confchg *w)
+static int del_node(struct sheepid *id)
 {
 	struct node *node;
 
@@ -1334,7 +1248,6 @@ static void del_node(struct sheepid *id, struct work_confchg *w)
 		int nr;
 		struct sheepdog_node_list_entry e[SD_MAX_NODES];
 
-		w->sd_node_left++;
 		sys->nr_vnodes = 0;
 
 		list_del(&node->list);
@@ -1350,38 +1263,26 @@ static void del_node(struct sheepid *id, struct work_confchg *w)
 
 			update_epoch_store(sys->epoch);
 		}
-	} else {
-		node = find_node(&sys->cpg_node_list, id);
-		if (node) {
-			list_del(&node->list);
-			free(node);
-		}
+		return 1;
 	}
-}
 
-static int is_my_cpg_addr(struct cpg_address *addr)
-{
-	struct sheepid id;
+	node = find_node(&sys->cpg_node_list, id);
+	if (node) {
+		list_del(&node->list);
+		free(node);
+	}
 
-	cpg_addr_to_sheepid(addr, &id, 1);
-	return (sheepid_cmp(&id, &sys->this_sheepid) == 0);
+	return 0;
 }
 
 /*
  * Check whether the majority of Sheepdog nodes are still alive or not
  */
-static int check_majority(struct cpg_address *left_list,
-			  size_t left_list_entries)
+static int check_majority(struct sheepid *left)
 {
-	int nr_nodes = 0, nr_majority, nr_reachable = 0, i, fd;
+	int nr_nodes = 0, nr_majority, nr_reachable = 0, fd;
 	struct node *node;
 	char name[INET6_ADDRSTRLEN];
-	struct sheepid left_sheepid[SD_MAX_NODES];
-
-	if (left_list_entries == 0)
-		return 1; /* we don't need this check in this case */
-
-	cpg_addr_to_sheepid(left_list, left_sheepid, left_list_entries);
 
 	nr_nodes = get_nodes_nr_from(&sys->sd_node_list);
 	nr_majority = nr_nodes / 2 + 1;
@@ -1392,11 +1293,7 @@ static int check_majority(struct cpg_address *left_list,
 		return 1;
 
 	list_for_each_entry(node, &sys->sd_node_list, list) {
-		for (i = 0; i < left_list_entries; i++) {
-			if (sheepid_cmp(left_sheepid + i, &node->sheepid) != 0)
-				break;
-		}
-		if (i != left_list_entries)
+		if (sheepid_cmp(&node->sheepid, left) == 0)
 			continue;
 
 		addr_to_str(name, sizeof(name), node->ent.addr, 0);
@@ -1416,26 +1313,22 @@ static int check_majority(struct cpg_address *left_list,
 	return 0;
 }
 
-static void __sd_confchg(struct cpg_event *cevent)
+static void __sd_leave(struct cpg_event *cevent)
 {
-	struct work_confchg *w = container_of(cevent, struct work_confchg, cev);
+	struct work_leave *w = container_of(cevent, struct work_leave, cev);
 
-	if (!check_majority(w->left_list, w->left_list_entries)) {
+	if (!check_majority(&w->left)) {
 		eprintf("perhaps network partition failure has occurred\n");
 		abort();
 	}
 }
 
-static void send_join_request(struct sheepid *id, struct work_confchg *w)
+static void send_join_request(struct sheepid *id)
 {
 	struct join_message msg;
 	struct sheepdog_node_list_entry entries[SD_MAX_NODES];
 	int nr_entries, i, ret;
 
-	/* if I've just joined in cpg, I'll join in sheepdog. */
-	if (sheepid_cmp(id, &sys->this_sheepid) != 0)
-		return;
-
 	memset(&msg, 0, sizeof(msg));
 	msg.header.proto_ver = SD_SHEEP_PROTO_VER;
 	msg.header.op = SD_MSG_JOIN;
@@ -1454,34 +1347,29 @@ static void send_join_request(struct sheepid *id, struct work_confchg *w)
 			msg.nodes[i].ent = entries[i];
 	}
 
-	send_message(sys->handle, (struct message_header *)&msg);
+	sys->cdrv->notify(&msg, msg.header.msg_length);
 
 	vprintf(SDOG_INFO "%s\n", sheepid_to_str(&sys->this_sheepid));
 }
 
-static void __sd_confchg_done(struct cpg_event *cevent)
+static void __sd_join_done(struct cpg_event *cevent)
 {
-	struct work_confchg *w = container_of(cevent, struct work_confchg, cev);
-	int ret;
+	struct work_join *w = container_of(cevent, struct work_join, cev);
+	int ret, i;
 	int first_cpg_node = 0;
 
-	if (w->member_list_entries ==
-	    w->joined_list_entries - w->left_list_entries &&
-	    is_my_cpg_addr(w->member_list)) {
+	if (w->member_list_entries == 1 &&
+	    sheepid_cmp(&w->joined, &sys->this_sheepid) == 0) {
 		sys->join_finished = 1;
 		get_global_nr_copies(&sys->nr_sobjs);
 		first_cpg_node = 1;
 	}
 
-	if (list_empty(&sys->cpg_node_list))
-		for_each_node_list(w->member_list, w->member_list_entries,
-				   add_node, w);
-	else
-		for_each_node_list(w->joined_list, w->joined_list_entries,
-				   add_node, w);
-
-	for_each_node_list(w->left_list, w->left_list_entries,
-			   del_node, w);
+	if (list_empty(&sys->cpg_node_list)) {
+		for (i = 0; i < w->member_list_entries; i++)
+			add_node(w->member_list + i);
+	} else
+		add_node(&w->joined);
 
 	if (first_cpg_node) {
 		struct join_message msg;
@@ -1522,35 +1410,40 @@ static void __sd_confchg_done(struct cpg_event *cevent)
 
 	print_node_list(&sys->sd_node_list);
 
-	if (first_cpg_node)
-		goto skip_join;
+	if (sheepid_cmp(&w->joined, &sys->this_sheepid) == 0)
+		send_join_request(&w->joined);
+}
+
+static void __sd_leave_done(struct cpg_event *cevent)
+{
+	struct work_leave *w = container_of(cevent, struct work_leave, cev);
+	int node_left;
 
-	for_each_node_list(w->joined_list, w->joined_list_entries,
-			   send_join_request, w);
+	node_left = del_node(&w->left);
 
-skip_join:
-	if (w->sd_node_left && sys->status == SD_STATUS_OK) {
-		if (w->sd_node_left > 1)
-			panic("we can't handle the departure of multiple nodes %d, %Zd\n",
-			      w->sd_node_left, w->left_list_entries);
+	print_node_list(&sys->sd_node_list);
 
+	if (node_left && sys->status == SD_STATUS_OK)
 		start_recovery(sys->epoch);
-	}
 }
 
 static void cpg_event_free(struct cpg_event *cevent)
 {
 	switch (cevent->ctype) {
-	case CPG_EVENT_CONCHG: {
-		struct work_confchg *w = container_of(cevent, struct work_confchg, cev);
+	case CPG_EVENT_JOIN: {
+		struct work_join *w = container_of(cevent, struct work_join, cev);
+		free(w->member_list);
+		free(w);
+		break;
+	}
+	case CPG_EVENT_LEAVE: {
+		struct work_leave *w = container_of(cevent, struct work_leave, cev);
 		free(w->member_list);
-		free(w->left_list);
-		free(w->joined_list);
 		free(w);
 		break;
 	}
-	case CPG_EVENT_DELIVER: {
-		struct work_deliver *w = container_of(cevent, struct work_deliver, cev);
+	case CPG_EVENT_NOTIFY: {
+		struct work_notify *w = container_of(cevent, struct work_notify, cev);
 		free(w->msg);
 		free(w);
 		break;
@@ -1575,14 +1468,16 @@ static void cpg_event_fn(struct work *work, int idx)
 	 */
 
 	switch (cevent->ctype) {
-	case CPG_EVENT_CONCHG:
-		__sd_confchg(cevent);
+	case CPG_EVENT_JOIN:
 		break;
-	case CPG_EVENT_DELIVER:
+	case CPG_EVENT_LEAVE:
+		__sd_leave(cevent);
+		break;
+	case CPG_EVENT_NOTIFY:
 	{
-		struct work_deliver *w = container_of(cevent, struct work_deliver, cev);
+		struct work_notify *w = container_of(cevent, struct work_notify, cev);
 		vprintf(SDOG_DEBUG "%d\n", w->msg->state);
-		__sd_deliver(cevent);
+		__sd_notify(cevent);
 		break;
 	}
 	case CPG_EVENT_REQUEST:
@@ -1612,12 +1507,15 @@ static void cpg_event_done(struct work *work, int idx)
 		goto out;
 
 	switch (cevent->ctype) {
-	case CPG_EVENT_CONCHG:
-		__sd_confchg_done(cevent);
+	case CPG_EVENT_JOIN:
+		__sd_join_done(cevent);
+		break;
+	case CPG_EVENT_LEAVE:
+		__sd_leave_done(cevent);
 		break;
-	case CPG_EVENT_DELIVER:
+	case CPG_EVENT_NOTIFY:
 	{
-		struct work_deliver *w = container_of(cevent, struct work_deliver, cev);
+		struct work_notify *w = container_of(cevent, struct work_notify, cev);
 
 		if (w->msg->state == DM_FIN && vdi_op_message(w->msg))
 			vdi_op_done((struct vdi_op_message *)w->msg);
@@ -1633,9 +1531,9 @@ static void cpg_event_done(struct work *work, int idx)
 
 			list_for_each_entry(f_cevent, &sys->cpg_event_siblings,
 					    cpg_event_list) {
-				struct work_deliver *fw =
-					container_of(f_cevent, struct work_deliver, cev);
-				if (f_cevent->ctype == CPG_EVENT_DELIVER &&
+				struct work_notify *fw =
+					container_of(f_cevent, struct work_notify, cev);
+				if (f_cevent->ctype == CPG_EVENT_NOTIFY &&
 				    fw->msg->state == DM_FIN) {
 					vprintf("already got fin %p\n",
 						f_cevent);
@@ -1651,7 +1549,7 @@ static void cpg_event_done(struct work *work, int idx)
 				cpg_event_set_joining();
 		}
 	got_fin:
-		__sd_deliver_done(cevent);
+		__sd_notify_done(cevent);
 		break;
 	}
 	case CPG_EVENT_REQUEST:
@@ -1762,7 +1660,7 @@ void start_cpg_event_work(void)
 	 * thread is running for a deliver for VDI, then we need to
 	 * run io requests.
 	 */
-	if (cpg_event_running() && cevent->ctype == CPG_EVENT_CONCHG)
+	if (cpg_event_running() && is_membership_change_event(cevent->ctype))
 		return;
 
 	/*
@@ -1796,9 +1694,9 @@ do_retry:
 	list_for_each_entry_safe(cevent, n, &sys->cpg_event_siblings, cpg_event_list) {
 		struct request *req = container_of(cevent, struct request, cev);
 
-		if (cevent->ctype == CPG_EVENT_DELIVER)
+		if (cevent->ctype == CPG_EVENT_NOTIFY)
 			continue;
-		if (cevent->ctype == CPG_EVENT_CONCHG)
+		if (is_membership_change_event(cevent->ctype))
 			break;
 
 		list_del(&cevent->cpg_event_list);
@@ -1882,7 +1780,7 @@ do_retry:
 	cevent = list_first_entry(&sys->cpg_event_siblings,
 				  struct cpg_event, cpg_event_list);
 
-	if (cevent->ctype == CPG_EVENT_CONCHG && sys->nr_outstanding_io)
+	if (is_membership_change_event(cevent->ctype) && sys->nr_outstanding_io)
 		return;
 
 	list_del(&cevent->cpg_event_list);
@@ -1897,25 +1795,16 @@ do_retry:
 	queue_work(sys->cpg_wqueue, &cpg_event_work);
 }
 
-static void sd_confchg(cpg_handle_t handle, const struct cpg_name *group_name,
-		       const struct cpg_address *member_list,
-		       size_t member_list_entries,
-		       const struct cpg_address *left_list,
-		       size_t left_list_entries,
-		       const struct cpg_address *joined_list,
-		       size_t joined_list_entries)
+static void sd_join_handler(struct sheepid *joined, struct sheepid *members,
+			    size_t nr_members)
 {
 	struct cpg_event *cevent;
-	struct work_confchg *w = NULL;
+	struct work_join *w = NULL;
 	int i, size;
 
-	dprintf("confchg nodeid %x\n", member_list[0].nodeid);
-	dprintf("%zd %zd %zd\n", member_list_entries, left_list_entries,
-		joined_list_entries);
-	for (i = 0; i < member_list_entries; i++) {
-		dprintf("[%x] node_id: %x, pid: %d\n", i,
-			member_list[i].nodeid, member_list[i].pid);
-	}
+	dprintf("join %s\n", sheepid_to_str(joined));
+	for (i = 0; i < nr_members; i++)
+		dprintf("[%x] %s\n", i, sheepid_to_str(members + i));
 
 	if (sys->status == SD_STATUS_SHUTDOWN)
 		return;
@@ -1925,31 +1814,19 @@ static void sd_confchg(cpg_handle_t handle, const struct cpg_name *group_name,
 		goto oom;
 
 	cevent = &w->cev;
-	cevent->ctype = CPG_EVENT_CONCHG;
+	cevent->ctype = CPG_EVENT_JOIN;
 
 
 	vprintf(SDOG_DEBUG "allow new confchg, %p\n", cevent);
 
-	size = sizeof(struct cpg_address) * member_list_entries;
+	size = sizeof(struct sheepid) * nr_members;
 	w->member_list = zalloc(size);
 	if (!w->member_list)
 		goto oom;
-	memcpy(w->member_list, member_list, size);
-	w->member_list_entries = member_list_entries;
+	memcpy(w->member_list, members, size);
+	w->member_list_entries = nr_members;
 
-	size = sizeof(struct cpg_address) * left_list_entries;
-	w->left_list = zalloc(size);
-	if (!w->left_list)
-		goto oom;
-	memcpy(w->left_list, left_list, size);
-	w->left_list_entries = left_list_entries;
-
-	size = sizeof(struct cpg_address) * joined_list_entries;
-	w->joined_list = zalloc(size);
-	if (!w->joined_list)
-		goto oom;
-	memcpy(w->joined_list, joined_list, size);
-	w->joined_list_entries = joined_list_entries;
+	w->joined = *joined;
 
 	list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings);
 	start_cpg_event_work();
@@ -1959,111 +1836,94 @@ oom:
 	if (w) {
 		if (w->member_list)
 			free(w->member_list);
-		if (w->left_list)
-			free(w->left_list);
-		if (w->joined_list)
-			free(w->joined_list);
+		free(w);
 	}
 	panic("failed to allocate memory for a confchg event\n");
 }
 
-static int set_addr(unsigned int nodeid, int port)
+static void sd_leave_handler(struct sheepid *left, struct sheepid *members,
+			     size_t nr_members)
 {
-	int ret, nr;
-	corosync_cfg_node_address_t addr;
-	struct sockaddr_storage *ss = (struct sockaddr_storage *)addr.address;
-	struct sockaddr_in *sin = (struct sockaddr_in *)addr.address;
-	struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)addr.address;
-	void *saddr;
-	char tmp[INET6_ADDRSTRLEN];
-
-	memset(sys->this_node.addr, 0, sizeof(sys->this_node.addr));
-
-	ret = corosync_cfg_initialize(&cfg_handle, NULL);
-	if (ret != CPG_OK) {
-		vprintf(SDOG_ERR "failed to initiazize cfg %d\n", ret);
-		return -1;
-	}
+	struct cpg_event *cevent;
+	struct work_leave *w = NULL;
+	int i, size;
 
-	ret = corosync_cfg_get_node_addrs(cfg_handle, nodeid, 1, &nr, &addr);
-	if (ret != CPG_OK) {
-		vprintf(SDOG_ERR "failed to get addr %d\n", ret);
-		return -1;
-	}
+	dprintf("leave %s\n", sheepid_to_str(left));
+	for (i = 0; i < nr_members; i++)
+		dprintf("[%x] %s\n", i, sheepid_to_str(members + i));
 
-	if (!nr) {
-		vprintf(SDOG_ERR "we got no address\n");
-		return -1;
-	}
+	if (sys->status == SD_STATUS_SHUTDOWN)
+		return;
 
-	if (ss->ss_family == AF_INET6) {
-		saddr = &sin6->sin6_addr;
-		memcpy(sys->this_node.addr, saddr, 16);
-	} else if (ss->ss_family == AF_INET) {
-		saddr = &sin->sin_addr;
-		memcpy(sys->this_node.addr + 12, saddr, 4);
-	} else {
-		vprintf(SDOG_ERR "unknown protocol %d\n", ss->ss_family);
-		return -1;
-	}
+	w = zalloc(sizeof(*w));
+	if (!w)
+		goto oom;
 
-	inet_ntop(ss->ss_family, saddr, tmp, sizeof(tmp));
+	cevent = &w->cev;
+	cevent->ctype = CPG_EVENT_LEAVE;
 
-	vprintf(SDOG_INFO "addr = %s, port = %d\n", tmp, port);
-	return 0;
+
+	vprintf(SDOG_DEBUG "allow new confchg, %p\n", cevent);
+
+	size = sizeof(struct sheepid) * nr_members;
+	w->member_list = zalloc(size);
+	if (!w->member_list)
+		goto oom;
+	memcpy(w->member_list, members, size);
+	w->member_list_entries = nr_members;
+
+	w->left = *left;
+
+	list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings);
+	start_cpg_event_work();
+
+	return;
+oom:
+	if (w) {
+		if (w->member_list)
+			free(w->member_list);
+		free(w);
+	}
+	panic("failed to allocate memory for a confchg event\n");
 }
 
 int create_cluster(int port, int64_t zone)
 {
 	int fd, ret;
-	cpg_handle_t cpg_handle;
-	struct cpg_name group = { 8, "sheepdog" };
-	cpg_callbacks_t cb = {&sd_deliver, &sd_confchg};
-	unsigned int nodeid = 0;
-
-	ret = cpg_initialize(&cpg_handle, &cb);
-	if (ret != CPG_OK) {
-		eprintf("Failed to initialize cpg, %d\n", ret);
-		eprintf("Is corosync running?\n");
-		return -1;
-	}
-
-	ret = cpg_local_get(cpg_handle, &nodeid);
-	if (ret != CPG_OK) {
-		eprintf("Failed to get the local node's identifier, %d\n", ret);
-		return 1;
+	struct cluster_driver *cdrv;
+	struct cdrv_handlers handlers = {
+		.join_handler = sd_join_handler,
+		.leave_handler = sd_leave_handler,
+		.notify_handler = sd_notify_handler,
+	};
+
+	if (!sys->cdrv) {
+		FOR_EACH_CLUSTER_DRIVER(cdrv) {
+			if (strcmp(cdrv->name, "corosync") == 0) {
+				dprintf("use corosync driver as default\n");
+				sys->cdrv = cdrv;
+				break;
+			}
+		}
 	}
 
-join_retry:
-	ret = cpg_join(cpg_handle, &group);
-	switch (ret) {
-	case CPG_OK:
-		break;
-	case CPG_ERR_TRY_AGAIN:
-		dprintf("Failed to join the sheepdog group, try again\n");
-		sleep(1);
-		goto join_retry;
-	case CPG_ERR_SECURITY:
-		eprintf("Permission error.\n");
-		return -1;
-	default:
-		eprintf("Failed to join the sheepdog group, %d\n", ret);
+	fd = sys->cdrv->init(&handlers, &sys->this_sheepid);
+	if (fd < 0)
 		return -1;
-	}
 
-	sys->handle = cpg_handle;
-	sys->this_sheepid.pid = getpid();
+	ret = sys->cdrv->join();
+	if (ret != 0)
+		return -1;
 
-	ret = set_addr(nodeid, port);
-	if (ret)
-		return 1;
-	memcpy(sys->this_sheepid.addr, sys->this_node.addr,
+	memcpy(sys->this_node.addr, sys->this_sheepid.addr,
 	       sizeof(sys->this_node.addr));
 	sys->this_node.port = port;
 	sys->this_node.nr_vnodes = SD_DEFAULT_VNODES;
-	if (zone == -1)
-		sys->this_node.zone = nodeid;
-	else
+	if (zone == -1) {
+		/* use last 4 bytes as zone id */
+		uint8_t *b = sys->this_sheepid.addr + 12;
+		sys->this_node.zone = b[0] | b[1] << 8 | b[2] << 16 | b[3] << 24;
+	} else
 		sys->this_node.zone = zone;
 	dprintf("zone id = %u\n", sys->this_node.zone);
 
@@ -2082,11 +1942,6 @@ join_retry:
 
 	INIT_LIST_HEAD(&sys->cpg_event_siblings);
 
-	ret = cpg_fd_get(cpg_handle, &fd);
-	if (ret != CPG_OK) {
-		eprintf("Failed to retrieve cpg file descriptor, %d\n", ret);
-		return 1;
-	}
 	ret = register_event(fd, group_handler, NULL);
 	if (ret) {
 		eprintf("Failed to register epoll events, %d\n", ret);
@@ -2110,5 +1965,5 @@ int leave_cluster(void)
 	msg.epoch = get_latest_epoch();
 
 	dprintf("%d\n", msg.epoch);
-	return send_message(sys->handle, (struct message_header *)&msg);
+	return sys->cdrv->notify(&msg, msg.header.msg_length);
 }
diff --git a/sheep/sdnet.c b/sheep/sdnet.c
index 9985bdb..db2e691 100644
--- a/sheep/sdnet.c
+++ b/sheep/sdnet.c
@@ -11,6 +11,8 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <unistd.h>
+#include <netdb.h>
+#include <arpa/inet.h>
 #include <netinet/tcp.h>
 #include <sys/epoll.h>
 #include <fcntl.h>
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 789cc4c..0a73587 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -36,11 +36,12 @@ static struct option const long_options[] = {
 	{"debug", no_argument, NULL, 'd'},
 	{"directio", no_argument, NULL, 'D'},
 	{"zone", required_argument, NULL, 'z'},
+	{"cluster", required_argument, NULL, 'c'},
 	{"help", no_argument, NULL, 'h'},
 	{NULL, 0, NULL, 0},
 };
 
-static const char *short_options = "p:fl:dDz:h";
+static const char *short_options = "p:fl:dDz:c:h";
 
 static void usage(int status)
 {
@@ -57,6 +58,7 @@ Sheepdog Daemon, version %s\n\
   -d, --debug             print debug messages\n\
   -D, --directio          use direct IO\n\
   -z, --zone              specify the zone id\n\
+  -c, --cluster           specify the cluster driver\n\
   -h, --help              display this help and exit\n\
 ", PACKAGE_VERSION);
 	}
@@ -76,6 +78,7 @@ int main(int argc, char **argv)
 	char path[PATH_MAX];
 	int64_t zone = -1;
 	char *p;
+	struct cluster_driver *cdrv;
 
 	signal(SIGPIPE, SIG_IGN);
 
@@ -113,6 +116,24 @@ int main(int argc, char **argv)
 			}
 			sys->this_node.zone = zone;
 			break;
+		case 'c':
+			FOR_EACH_CLUSTER_DRIVER(cdrv) {
+				if (strcmp(cdrv->name, optarg) == 0) {
+					sys->cdrv = cdrv;
+					break;
+				}
+			}
+
+			if (!sys->cdrv) {
+				printf("No such cluster driver, %s\n", optarg);
+				printf("Supported drivers:");
+				FOR_EACH_CLUSTER_DRIVER(cdrv) {
+					printf(" %s", cdrv->name);
+				}
+				printf("\n");
+				exit(1);
+			}
+			break;
 		case 'h':
 			usage(0);
 			break;
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 8b20213..e2fcb40 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -12,7 +12,6 @@
 #define __SHEEP_PRIV_H__
 
 #include <inttypes.h>
-#include <corosync/cpg.h>
 
 #include "sheepdog_proto.h"
 #include "event.h"
@@ -42,11 +41,15 @@
 #define SD_RES_NETWORK_ERROR    0x81 /* Network error between sheeps */
 
 enum cpg_event_type {
-	CPG_EVENT_CONCHG,
-	CPG_EVENT_DELIVER,
+	CPG_EVENT_JOIN,
+	CPG_EVENT_LEAVE,
+	CPG_EVENT_NOTIFY,
 	CPG_EVENT_REQUEST,
 };
 
+#define is_membership_change_event(x) \
+	((x) == CPG_EVENT_JOIN || (x) == CPG_EVENT_LEAVE)
+
 struct cpg_event {
 	enum cpg_event_type ctype;
 	struct list_head cpg_event_list;
@@ -103,7 +106,8 @@ struct data_object_bmap {
 };
 
 struct cluster_info {
-	cpg_handle_t handle;
+	struct cluster_driver *cdrv;
+
 	/* set after finishing the JOIN procedure */
 	int join_finished;
 	struct sheepid this_sheepid;
-- 
1.7.2.5




More information about the sheepdog mailing list