[Sheepdog] [PATCH 1/7] sheep: move node membership management into cluster driver

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Wed Oct 12 15:22:13 CEST 2011


Currently, Sheepdog has two node lists; sd_node_list and
cpg_node_list.  The former is used for consistent hashing and seen
from users.  The latter is managed in the cluster driver and notified
in join_handler/leave_handler.  But this design is too complex.  We
should move all the cluster management stuff into the cluster driver.
Main changes of this patch are as follows:

 - make join process one phase

   Node joining was really complex; cpg_confchg() notifies the newly
   joining node, the node multicasts a SD_MSG_JOIN message, and the
   master node receives it and multicasts the response.  Moreover, we
   couldn't allow any I/O events between the two multicasts.  This
   patch moves all of them into the cluster driver.

 - add check_join_handler() to cdrv_handlers

   This new handler is called on one of the Sheepdog nodes (e.g. in
   the case of the corosync driver, the master server will call this).
   check_join_handler() checks whether the joining node may join the
   cluster, and returns the result.

 - use sheepdog_node_list_entry in the arguments of
   join_handler()/leave_handler()

   We can use the notified node list for consistent hashing now.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 sheep/cluster.h          |   82 +++-----
 sheep/cluster/corosync.c |  460 ++++++++++++++++++++++++++++++++------------
 sheep/group.c            |  484 +++++++++++++++++++++++++--------------------
 sheep/sheep_priv.h       |    1 -
 4 files changed, 634 insertions(+), 393 deletions(-)

diff --git a/sheep/cluster.h b/sheep/cluster.h
index 89d0566..a6689c6 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -21,17 +21,28 @@
 #include "sheep.h"
 #include "logger.h"
 
-struct sheepid {
-	uint8_t addr[16];
-	uint64_t pid;
+enum cluster_join_result {
+	CJ_RES_SUCCESS, /* Success */
+	CJ_RES_FAIL, /* Fail to join.  The joining node has an invalid epoch. */
+	CJ_RES_JOIN_LATER, /* Fail to join.  The joining node should
+			    * be added after the cluster starts working. */
+	CJ_RES_MASTER_TRANSFER, /* Transfer mastership.  The joining
+				 * node has a newer epoch, so this node
+				 * will leave the cluster (restart later). */
 };
 
 struct cdrv_handlers {
-	void (*join_handler)(struct sheepid *joined, struct sheepid *members,
-			     size_t nr_members);
-	void (*leave_handler)(struct sheepid *left, struct sheepid *members,
+	enum cluster_join_result (*check_join_handler)(
+		struct sheepdog_node_list_entry *joining, void *opaque);
+	void (*join_handler)(struct sheepdog_node_list_entry *joined,
+			     struct sheepdog_node_list_entry *members,
+			     size_t nr_members, enum cluster_join_result result,
+			     void *opaque);
+	void (*leave_handler)(struct sheepdog_node_list_entry *left,
+			      struct sheepdog_node_list_entry *members,
 			      size_t nr_members);
-	void (*notify_handler)(struct sheepid *sender, void *msg, size_t msg_len);
+	void (*notify_handler)(struct sheepdog_node_list_entry *sender,
+			       void *msg, size_t msg_len);
 };
 
 struct cluster_driver {
@@ -44,17 +55,23 @@ struct cluster_driver {
 	 * may be used with the poll(2) to monitor cluster events.  On
 	 * error, returns -1.
 	 */
-	int (*init)(struct cdrv_handlers *handlers, struct sheepid *myid);
+	int (*init)(struct cdrv_handlers *handlers, uint8_t *myaddr);
 
 	/*
 	 * Join the cluster
 	 *
 	 * This function is used to join the cluster, and notifies a
-	 * join event to all the nodes.
+	 * join event to all the nodes.  The copy of 'opaque' is
+	 * passed to check_join_handler() and join_handler().
+	 * check_join_handler() is called on only one of the nodes
+	 * which already participate in the cluster.  If the content of
+	 * 'opaque' is changed in check_join_handler(), the updated
+	 * 'opaque' must be passed to join_handler().
 	 *
 	 * Returns zero on success, -1 on error
 	 */
-	int (*join)(void);
+	int (*join)(struct sheepdog_node_list_entry *myself,
+		    void *opaque, size_t opaque_len);
 
 	/*
 	 * Leave the cluster
@@ -112,54 +129,15 @@ static void __attribute__((constructor)) regist_ ## driver(void) {	\
 	list_for_each_entry(driver, &cluster_drivers, list)
 
 
-static inline int sheepid_find(struct sheepid *sheeps, size_t nr_sheeps,
-			       struct sheepid *key)
-{
-	int i;
-
-	for (i = 0; i < nr_sheeps; i++) {
-		if (memcmp(sheeps + i, key, sizeof(*key)) == 0)
-			return i;
-	}
-	return -1;
-}
-
-static inline void sheepid_add(struct sheepid *sheeps1, size_t nr_sheeps1,
-			       struct sheepid *sheeps2, size_t nr_sheeps2)
-{
-	memcpy(sheeps1 + nr_sheeps1, sheeps2, sizeof(*sheeps2) * nr_sheeps2);
-}
-
-static inline void sheepid_del(struct sheepid *sheeps1, size_t nr_sheeps1,
-			       struct sheepid *sheeps2, size_t nr_sheeps2)
-{
-	int i, idx;
-
-	for (i = 0; i < nr_sheeps2; i++) {
-		idx = sheepid_find(sheeps1, nr_sheeps1, sheeps2 + i);
-		if (idx < 0)
-			panic("internal error: cannot find sheepid\n");
-
-		nr_sheeps1--;
-		memmove(sheeps1 + idx, sheeps1 + idx + 1,
-			sizeof(*sheeps1) * nr_sheeps1 - idx);
-	}
-}
-
-static inline char *sheepid_to_str(struct sheepid *id)
+static inline char *node_to_str(struct sheepdog_node_list_entry *id)
 {
 	static char str[256];
 	char name[256];
 
-	snprintf(str, sizeof(str), "ip: %s, pid: %" PRIu64,
-		 addr_to_str(name, sizeof(name), id->addr, 0), id->pid);
+	snprintf(str, sizeof(str), "ip: %s, port: %d",
+		 addr_to_str(name, sizeof(name), id->addr, 0), id->port);
 
 	return str;
 }
 
-static inline int sheepid_cmp(struct sheepid *id1, struct sheepid *id2)
-{
-	return memcmp(id1, id2, sizeof(*id1));
-}
-
 #endif
diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
index 7aa3d02..99cd69f 100644
--- a/sheep/cluster/corosync.c
+++ b/sheep/cluster/corosync.c
@@ -16,11 +16,17 @@
 #include "cluster.h"
 #include "work.h"
 
+struct cpg_node {	/* one cluster member as tracked by the corosync driver */
+	uint32_t nodeid;	/* corosync node id */
+	uint32_t pid;	/* process id of the sheep on that node */
+	struct sheepdog_node_list_entry ent;	/* entry handed to the cdrv handlers */
+};
+
 static cpg_handle_t cpg_handle;
 static struct cpg_name cpg_group = { 9, "sheepdog" };
 
 static corosync_cfg_handle_t cfg_handle;
-static struct sheepid this_sheepid;
+static struct cpg_node this_node;
 
 static struct work_queue *corosync_block_wq;
 
@@ -29,6 +35,9 @@ static struct cdrv_handlers corosync_handlers;
 static LIST_HEAD(corosync_event_list);
 static LIST_HEAD(corosync_block_list);
 
+static struct cpg_node cpg_nodes[SD_MAX_NODES];
+static size_t nr_cpg_nodes;
+
 /* event types which are dispatched in corosync_dispatch() */
 enum corosync_event_type {
 	COROSYNC_EVENT_TYPE_JOIN,
@@ -38,6 +47,8 @@ enum corosync_event_type {
 
 /* multicast message type */
 enum corosync_message_type {
+	COROSYNC_MSG_TYPE_JOIN_REQUEST,
+	COROSYNC_MSG_TYPE_JOIN_RESPONSE,
 	COROSYNC_MSG_TYPE_NOTIFY,
 	COROSYNC_MSG_TYPE_BLOCK,
 	COROSYNC_MSG_TYPE_UNBLOCK,
@@ -46,19 +57,31 @@ enum corosync_message_type {
 struct corosync_event {
 	enum corosync_event_type type;
 
-	struct sheepid members[SD_MAX_NODES];
-	size_t nr_members;
-
-	struct sheepid sender;
+	struct cpg_node sender;
 	void *msg;
 	size_t msg_len;
 
+	enum cluster_join_result result;
+	uint32_t nr_nodes;
+	struct cpg_node nodes[SD_MAX_NODES];
+
 	int blocked;
 	int callbacked;
+	int first_node;
 
 	struct list_head list;
 };
 
+struct corosync_message {	/* fixed header multicast in front of every payload */
+	struct cpg_node sender;	/* node the message concerns; for JOIN_RESPONSE this is the joining node */
+	enum corosync_message_type type : 4;
+	enum cluster_join_result result : 4;	/* only read for JOIN_RESPONSE; other senders pass 0 */
+	uint32_t msg_len;	/* length of the payload in bytes */
+	uint32_t nr_nodes;	/* number of valid entries in nodes[] */
+	struct cpg_node nodes[SD_MAX_NODES];
+	uint8_t msg[0];	/* payload; send_message() transmits it as a second iovec */
+};
+
 struct corosync_block_msg {
 	void *msg;
 	size_t msg_len;
@@ -68,6 +91,44 @@ struct corosync_block_msg {
 	struct list_head list;
 };
 
+static int cpg_node_equal(struct cpg_node *a, struct cpg_node *b)
+{	/* non-zero iff both have the same (nodeid, pid); 'ent' is not compared */
+	return (a->nodeid == b->nodeid && a->pid == b->pid);
+}
+
+static inline int find_cpg_node(struct cpg_node *nodes, size_t nr_nodes,
+				struct cpg_node *key)
+{
+	int i;
+	/* linear scan over nodes[0..nr_nodes); index of the first match, else -1 */
+	for (i = 0; i < nr_nodes; i++)
+		if (cpg_node_equal(nodes + i, key))
+			return i;
+
+	return -1;
+}
+
+static inline void add_cpg_node(struct cpg_node *nodes, size_t nr_nodes,
+				struct cpg_node *added)
+{	/* stores *added at index nr_nodes; the count is by value, so callers bump their own */
+	nodes[nr_nodes++] = *added;	/* NOTE(review): no capacity check here */
+}
+
+static inline void del_cpg_node(struct cpg_node *nodes, size_t nr_nodes,
+				struct cpg_node *deled)
+{	/* removes 'deled' from nodes[0..nr_nodes); callers decrement their own count */
+	int idx;
+
+	idx = find_cpg_node(nodes, nr_nodes, deled);
+	if (idx < 0) {
+		dprintf("cannot find node\n");
+		return;
+	}
+	/* close the gap: shift the entries after idx down by one slot (size in bytes) */
+	nr_nodes--;
+	memmove(nodes + idx, nodes + idx + 1, sizeof(*nodes) * (nr_nodes - idx));
+}
+
 static int nodeid_to_addr(uint32_t nodeid, uint8_t *addr)
 {
 	int ret, nr;
@@ -103,24 +164,26 @@ static int nodeid_to_addr(uint32_t nodeid, uint8_t *addr)
 	return 0;
 }
 
-static void cpg_addr_to_sheepid(const struct cpg_address *cpgs,
-				struct sheepid *sheeps, size_t nr)
-{
-	int i;
-
-	for (i = 0; i < nr; i++) {
-		nodeid_to_addr(cpgs[i].nodeid, sheeps[i].addr);
-		sheeps[i].pid = cpgs[i].pid;
-	}
-}
-
-static int send_message(uint64_t type, void *msg, size_t msg_len)
+static int send_message(enum corosync_message_type type,
+			enum cluster_join_result result,
+			struct cpg_node *sender, struct cpg_node *nodes,
+			size_t nr_nodes, void *msg, size_t msg_len)
 {
 	struct iovec iov[2];
 	int ret, iov_cnt = 1;
+	struct corosync_message cmsg = {
+		.type = type,
+		.msg_len = msg_len,
+		.result = result,
+		.sender = *sender,
+		.nr_nodes = nr_nodes
+	};
 
-	iov[0].iov_base = &type;
-	iov[0].iov_len = sizeof(type);
+	if (nodes)
+		memcpy(cmsg.nodes, nodes, sizeof(*nodes) * nr_nodes);
+
+	iov[0].iov_base = &cmsg;
+	iov[0].iov_len = sizeof(cmsg);
 	if (msg) {
 		iov[1].iov_base = msg;
 		iov[1].iov_len = msg_len;
@@ -153,13 +216,15 @@ static void corosync_block_done(struct work *work, int idx)
 {
 	struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
 
-	send_message(COROSYNC_MSG_TYPE_UNBLOCK, bm->msg, bm->msg_len);
+	send_message(COROSYNC_MSG_TYPE_UNBLOCK, 0, &this_node, NULL, 0,
+		     bm->msg, bm->msg_len);
 
 	free(bm->msg);
 	free(bm);
 }
 
-static struct corosync_event *find_block_event(struct sheepid *sender)
+static struct corosync_event *find_block_event(enum corosync_event_type type,
+					       struct cpg_node *sender)
 {
 	struct corosync_event *cevent;
 
@@ -167,67 +232,197 @@ static struct corosync_event *find_block_event(struct sheepid *sender)
 		if (!cevent->blocked)
 			continue;
 
-		if (cevent->type == COROSYNC_EVENT_TYPE_NOTIFY &&
-		    sheepid_cmp(&cevent->sender, sender) == 0)
+		if (cevent->type == type &&
+		    cpg_node_equal(&cevent->sender, sender))
 			return cevent;
 	}
 
 	return NULL;
 }
 
+static int is_master(void)
+{	/* non-zero when this node is cpg_nodes[0], or when the node list is still empty */
+	if (nr_cpg_nodes == 0)
+		/* this node should be the first cpg node */
+		return 1;
+
+	return cpg_node_equal(&cpg_nodes[0], &this_node);
+}
+
+static void build_node_list(struct cpg_node *nodes, size_t nr_nodes,
+			    struct sheepdog_node_list_entry *entries)
+{
+	int i;
+	/* copy the 'ent' of each cpg node; entries[] must have room for nr_nodes items */
+	for (i = 0; i < nr_nodes; i++)
+		entries[i] = nodes[i].ent;
+}
+
+/*
+ * Process one dispatch event
+ *
+ * Returns 1 if the event is processed, 0 if it is still blocked
+ */
+static int __corosync_dispatch_one(struct corosync_event *cevent)
+{
+	struct corosync_block_msg *bm;
+	enum cluster_join_result res;
+	struct sheepdog_node_list_entry entries[SD_MAX_NODES];
+	int idx;
+
+	switch (cevent->type) {
+	case COROSYNC_EVENT_TYPE_JOIN:
+		if (cevent->blocked) {
+			if (!is_master())
+				return 0;
+
+			if (!cevent->msg)
+				/* we haven't received JOIN_REQUEST yet */
+				return 0;
+
+			if (cevent->callbacked)
+				/* check_join() must be called only once */
+				return 0;
+
+			build_node_list(cpg_nodes, nr_cpg_nodes, entries); /* NOTE(review): 'entries' is not used on this path */
+			res = corosync_handlers.check_join_handler(&cevent->sender.ent,
+								   cevent->msg);
+			if (res == CJ_RES_MASTER_TRANSFER)
+				nr_cpg_nodes = 0;
+
+			send_message(COROSYNC_MSG_TYPE_JOIN_RESPONSE, res,
+				     &cevent->sender, cpg_nodes, nr_cpg_nodes,
+				     cevent->msg, cevent->msg_len);
+
+			if (res == CJ_RES_MASTER_TRANSFER) {
+				eprintf("Restart me later when master is up, please. Bye.\n");
+				exit(1);
+			}
+
+			cevent->callbacked = 1;
+			return 0; /* stays queued until JOIN_RESPONSE clears 'blocked' */
+		}
+
+		switch (cevent->result) {
+		case CJ_RES_SUCCESS:
+		case CJ_RES_MASTER_TRANSFER:
+			add_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender);
+			nr_cpg_nodes++;
+			/* fall through */
+		case CJ_RES_FAIL:
+		case CJ_RES_JOIN_LATER:
+			build_node_list(cpg_nodes, nr_cpg_nodes, entries);
+			corosync_handlers.join_handler(&cevent->sender.ent, entries,
+						       nr_cpg_nodes, cevent->result,
+						       cevent->msg);
+			break;
+		}
+		break;
+	case COROSYNC_EVENT_TYPE_LEAVE:
+		idx = find_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender);
+		if (idx < 0)
+			break;
+		cevent->sender.ent = cpg_nodes[idx].ent;
+
+		del_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender);
+		nr_cpg_nodes--;
+		build_node_list(cpg_nodes, nr_cpg_nodes, entries);
+		corosync_handlers.leave_handler(&cevent->sender.ent,
+						entries, nr_cpg_nodes);
+		break;
+	case COROSYNC_EVENT_TYPE_NOTIFY:
+		if (cevent->blocked) {
+			if (cpg_node_equal(&cevent->sender, &this_node) &&
+			    !cevent->callbacked) {
+				/* call a block callback function from a worker thread */
+				if (list_empty(&corosync_block_list))
+					panic("cannot call block callback\n");
+
+				bm = list_first_entry(&corosync_block_list,
+						      typeof(*bm), list);
+				list_del(&bm->list);
+
+				bm->work.fn = corosync_block;
+				bm->work.done = corosync_block_done;
+				queue_work(corosync_block_wq, &bm->work);
+
+				cevent->callbacked = 1;
+			}
+
+			/* block the rest messages until unblock message comes */
+			return 0;
+		}
+
+		corosync_handlers.notify_handler(&cevent->sender.ent, cevent->msg,
+						 cevent->msg_len);
+		break;
+	}
+
+	return 1;
+}
+
 static void __corosync_dispatch(void)
 {
 	struct corosync_event *cevent;
-	struct corosync_block_msg *bm;
+	static int join_finished;
+	int done;
 
 	while (!list_empty(&corosync_event_list)) {
 		cevent = list_first_entry(&corosync_event_list, typeof(*cevent), list);
 
-		switch (cevent->type) {
-		case COROSYNC_EVENT_TYPE_JOIN:
-			corosync_handlers.join_handler(&cevent->sender,
-						       cevent->members,
-						       cevent->nr_members);
-			break;
-		case COROSYNC_EVENT_TYPE_LEAVE:
-			corosync_handlers.leave_handler(&cevent->sender,
-							cevent->members,
-							cevent->nr_members);
-			break;
-		case COROSYNC_EVENT_TYPE_NOTIFY:
-			if (cevent->blocked) {
-				if (sheepid_cmp(&cevent->sender, &this_sheepid) == 0 &&
-				    !cevent->callbacked) {
-					/* call a block callback function from a worker thread */
-					if (list_empty(&corosync_block_list))
-						panic("cannot call block callback\n");
-
-					bm = list_first_entry(&corosync_block_list,
-							      typeof(*bm), list);
-					list_del(&bm->list);
-
-					bm->work.fn = corosync_block;
-					bm->work.done = corosync_block_done;
-					queue_work(corosync_block_wq, &bm->work);
-
-					cevent->callbacked = 1;
+		/* update join status */
+		if (!join_finished && cevent->type == COROSYNC_EVENT_TYPE_JOIN) {
+			if (cevent->first_node) {
+				join_finished = 1;
+				nr_cpg_nodes = 0;
+			}
+			if (!cevent->blocked && cpg_node_equal(&cevent->sender, &this_node)) {
+				if (cevent->result == CJ_RES_SUCCESS ||
+				    cevent->result == CJ_RES_MASTER_TRANSFER) {
+					join_finished = 1;
+					nr_cpg_nodes = cevent->nr_nodes;
+					memcpy(cpg_nodes, cevent->nodes,
+					       sizeof(*cevent->nodes) * cevent->nr_nodes);
 				}
-
-				/* block the rest messages until unblock message comes */
-				goto out;
 			}
+		}
+
+		if (join_finished)
+			done = __corosync_dispatch_one(cevent);
+		else
+			done = !cevent->blocked;
 
-			corosync_handlers.notify_handler(&cevent->sender,
-							 cevent->msg,
-							 cevent->msg_len);
+		if (!done)
 			break;
-		}
 
 		list_del(&cevent->list);
 		free(cevent);
 	}
-out:
-	return;
+}
+
+static struct corosync_event *update_block_event(enum corosync_event_type type,
+						 struct cpg_node *sender,
+						 void *msg, size_t msg_len)
+{
+	struct corosync_event *cevent;
+	/* find the blocked event of 'type' from 'sender' and replace its payload */
+	cevent = find_block_event(type, sender);
+	if (!cevent)
+		/* block message was casted before this node joins */
+		return NULL;
+	/* keep a private copy of msg, or drop the old payload when msg_len is 0 */
+	cevent->msg_len = msg_len;
+	if (msg_len) {
+		cevent->msg = realloc(cevent->msg, msg_len);
+		if (!cevent->msg)
+			panic("oom\n");
+		memcpy(cevent->msg, msg, msg_len);
+	} else {
+		free(cevent->msg);
+		cevent->msg = NULL;
+	}
+	/* 'blocked' is left untouched here; callers clear it themselves */
+	return cevent;
 }
 
 static void cdrv_cpg_deliver(cpg_handle_t handle,
@@ -236,57 +431,67 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,
 			     void *msg, size_t msg_len)
 {
 	struct corosync_event *cevent;
-	uint64_t type;
-	struct sheepid sender;
-
-	nodeid_to_addr(nodeid, sender.addr);
-	sender.pid = pid;
-
-	memcpy(&type, msg, sizeof(type));
-	msg = (uint8_t *)msg + sizeof(type);
-	msg_len -= sizeof(type);
+	struct corosync_message *cmsg = msg;
 
 	cevent = zalloc(sizeof(*cevent));
 	if (!cevent)
 		panic("oom\n");
 
-	switch (type) {
+	switch (cmsg->type) {
+	case COROSYNC_MSG_TYPE_JOIN_REQUEST:
+		free(cevent); /* we don't add a new cluster event in this case */
+
+		cevent = update_block_event(COROSYNC_EVENT_TYPE_JOIN, &cmsg->sender,
+					    cmsg->msg, cmsg->msg_len);
+		if (!cevent)
+			break;
+
+		cevent->sender = cmsg->sender;
+		cevent->msg_len = cmsg->msg_len;
+		break;
 	case COROSYNC_MSG_TYPE_BLOCK:
 		cevent->blocked = 1;
 		/* fall through */
 	case COROSYNC_MSG_TYPE_NOTIFY:
 		cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
-		cevent->sender = sender;
-		cevent->msg_len = msg_len;
-		if (msg_len) {
-			cevent->msg = zalloc(msg_len);
+
+		cevent->sender = cmsg->sender;
+		cevent->msg_len = cmsg->msg_len;
+		if (cmsg->msg_len) {
+			cevent->msg = zalloc(cmsg->msg_len);
 			if (!cevent->msg)
 				panic("oom\n");
-			memcpy(cevent->msg, msg, msg_len);
+			memcpy(cevent->msg, cmsg->msg, cmsg->msg_len);
 		} else
 			cevent->msg = NULL;
 
 		list_add_tail(&cevent->list, &corosync_event_list);
 		break;
+	case COROSYNC_MSG_TYPE_JOIN_RESPONSE:
+		free(cevent); /* we don't add a new cluster event in this case */
+
+		cevent = update_block_event(COROSYNC_EVENT_TYPE_JOIN, &cmsg->sender,
+					    cmsg->msg, cmsg->msg_len);
+		if (!cevent)
+			break;
+
+		cevent->blocked = 0;
+
+		cevent->result = cmsg->result;
+		cevent->nr_nodes = cmsg->nr_nodes;
+		memcpy(cevent->nodes, cmsg->nodes,
+		       sizeof(*cmsg->nodes) * cmsg->nr_nodes);
+
+		break;
 	case COROSYNC_MSG_TYPE_UNBLOCK:
 		free(cevent); /* we don't add a new cluster event in this case */
 
-		cevent = find_block_event(&sender);
+		cevent = update_block_event(COROSYNC_EVENT_TYPE_NOTIFY,
+					    &cmsg->sender, cmsg->msg, cmsg->msg_len);
 		if (!cevent)
-			/* block message was casted before this node joins */
 			break;
 
 		cevent->blocked = 0;
-		cevent->msg_len = msg_len;
-		if (msg_len) {
-			cevent->msg = realloc(cevent->msg, msg_len);
-			if (!cevent->msg)
-				panic("oom\n");
-			memcpy(cevent->msg, msg, msg_len);
-		} else {
-			free(cevent->msg);
-			cevent->msg = NULL;
-		}
 		break;
 	}
 
@@ -304,27 +509,32 @@ static void cdrv_cpg_confchg(cpg_handle_t handle,
 {
 	struct corosync_event *cevent;
 	int i;
-	struct sheepid member_sheeps[SD_MAX_NODES];
-	struct sheepid joined_sheeps[SD_MAX_NODES];
-	struct sheepid left_sheeps[SD_MAX_NODES];
-
-	/* convert cpg_address to sheepid*/
-	cpg_addr_to_sheepid(member_list, member_sheeps, member_list_entries);
-	cpg_addr_to_sheepid(left_list, left_sheeps, left_list_entries);
-	cpg_addr_to_sheepid(joined_list, joined_sheeps, joined_list_entries);
-
-	/* calculate a start member list */
-	sheepid_del(member_sheeps, member_list_entries,
-		    joined_sheeps, joined_list_entries);
-	member_list_entries -= joined_list_entries;
+	struct cpg_node joined_sheeps[SD_MAX_NODES];
+	struct cpg_node left_sheeps[SD_MAX_NODES];
 
-	sheepid_add(member_sheeps, member_list_entries,
-		    left_sheeps, left_list_entries);
-	member_list_entries += left_list_entries;
+	/* convert cpg_address to cpg_node */
+	for (i = 0; i < left_list_entries; i++) {
+		left_sheeps[i].nodeid = left_list[i].nodeid;
+		left_sheeps[i].pid = left_list[i].pid;
+	}
+	for (i = 0; i < joined_list_entries; i++) {
+		joined_sheeps[i].nodeid = joined_list[i].nodeid;
+		joined_sheeps[i].pid = joined_list[i].pid;
+	}
 
 	/* dispatch leave_handler */
 	for (i = 0; i < left_list_entries; i++) {
-		cevent = find_block_event(left_sheeps + i);
+		cevent = find_block_event(COROSYNC_EVENT_TYPE_JOIN,
+					  left_sheeps + i);
+		if (cevent) {
+			/* the node left before joining */
+			list_del(&cevent->list);
+			free(cevent);
+			continue;
+		}
+
+		cevent = find_block_event(COROSYNC_EVENT_TYPE_NOTIFY,
+					  left_sheeps + i);
 		if (cevent) {
 			/* the node left before sending UNBLOCK */
 			list_del(&cevent->list);
@@ -335,14 +545,8 @@ static void cdrv_cpg_confchg(cpg_handle_t handle,
 		if (!cevent)
 			panic("oom\n");
 
-		sheepid_del(member_sheeps, member_list_entries,
-			    left_sheeps + i, 1);
-		member_list_entries--;
-
 		cevent->type = COROSYNC_EVENT_TYPE_LEAVE;
 		cevent->sender = left_sheeps[i];
-		memcpy(cevent->members, member_sheeps, sizeof(member_sheeps));
-		cevent->nr_members = member_list_entries;
 
 		list_add_tail(&cevent->list, &corosync_event_list);
 	}
@@ -353,14 +557,12 @@ static void cdrv_cpg_confchg(cpg_handle_t handle,
 		if (!cevent)
 			panic("oom\n");
 
-		sheepid_add(member_sheeps, member_list_entries,
-			    joined_sheeps, 1);
-		member_list_entries++;
-
 		cevent->type = COROSYNC_EVENT_TYPE_JOIN;
 		cevent->sender = joined_sheeps[i];
-		memcpy(cevent->members, member_sheeps, sizeof(member_sheeps));
-		cevent->nr_members = member_list_entries;
+		cevent->blocked = 1; /* FIXME: add explanation */
+		if (member_list_entries == joined_list_entries - left_list_entries &&
+		    cpg_node_equal(&joined_sheeps[0], &this_node))
+			cevent->first_node = 1;
 
 		list_add_tail(&cevent->list, &corosync_event_list);
 	}
@@ -368,7 +570,7 @@ static void cdrv_cpg_confchg(cpg_handle_t handle,
 	__corosync_dispatch();
 }
 
-static int corosync_init(struct cdrv_handlers *handlers, struct sheepid *myid)
+static int corosync_init(struct cdrv_handlers *handlers, uint8_t *myaddr)
 {
 	int ret, fd;
 	uint32_t nodeid;
@@ -398,14 +600,14 @@ static int corosync_init(struct cdrv_handlers *handlers, struct sheepid *myid)
 		return -1;
 	}
 
-	ret = nodeid_to_addr(nodeid, myid->addr);
+	ret = nodeid_to_addr(nodeid, myaddr);
 	if (ret < 0) {
 		eprintf("failed to get local address\n");
 		return -1;
 	}
 
-	myid->pid = getpid();
-	this_sheepid = *myid;
+	this_node.nodeid = nodeid;
+	this_node.pid = getpid();
 
 	ret = cpg_fd_get(cpg_handle, &fd);
 	if (ret != CPG_OK) {
@@ -418,7 +620,8 @@ static int corosync_init(struct cdrv_handlers *handlers, struct sheepid *myid)
 	return fd;
 }
 
-static int corosync_join(void)
+static int corosync_join(struct sheepdog_node_list_entry *myself,
+			 void *opaque, size_t opaque_len)
 {
 	int ret;
 retry:
@@ -438,7 +641,12 @@ retry:
 		return -1;
 	}
 
-	return 0;
+	this_node.ent = *myself;
+
+	ret = send_message(COROSYNC_MSG_TYPE_JOIN_REQUEST, 0, &this_node,
+			   NULL, 0, opaque, opaque_len);
+
+	return ret;
 }
 
 static int corosync_leave(void)
@@ -472,9 +680,11 @@ static int corosync_notify(void *msg, size_t msg_len, void (*block_cb)(void *))
 		bm->cb = block_cb;
 		list_add_tail(&bm->list, &corosync_block_list);
 
-		ret = send_message(COROSYNC_MSG_TYPE_BLOCK, NULL, 0);
+		ret = send_message(COROSYNC_MSG_TYPE_BLOCK, 0, &this_node,
+				   NULL, 0, NULL, 0);
 	} else
-		ret = send_message(COROSYNC_MSG_TYPE_NOTIFY, msg, msg_len);
+		ret = send_message(COROSYNC_MSG_TYPE_NOTIFY, 0, &this_node,
+				   NULL, 0, msg, msg_len);
 
 	return ret;
 }
diff --git a/sheep/group.c b/sheep/group.c
index 0b18fd2..42c8d71 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -25,7 +25,6 @@
 #include "cluster.h"
 
 struct node {
-	struct sheepid sheepid;
 	struct sheepdog_node_list_entry ent;
 	struct list_head list;
 };
@@ -42,7 +41,6 @@ struct message_header {
 	uint8_t op;
 	uint8_t state;
 	uint32_t msg_length;
-	struct sheepid sheepid;
 	struct sheepdog_node_list_entry from;
 };
 
@@ -57,12 +55,10 @@ struct join_message {
 	uint8_t inc_epoch; /* set non-zero when we increment epoch of all nodes */
 	uint8_t pad[3];
 	struct {
-		struct sheepid sheepid;
 		struct sheepdog_node_list_entry ent;
 	} nodes[SD_MAX_NODES];
 	uint32_t nr_leave_nodes;
 	struct {
-		struct sheepid sheepid;
 		struct sheepdog_node_list_entry ent;
 	} leave_nodes[SD_MAX_NODES];
 };
@@ -93,17 +89,19 @@ struct work_notify {
 struct work_join {
 	struct cpg_event cev;
 
-	struct sheepid *member_list;
+	struct sheepdog_node_list_entry *member_list;
 	size_t member_list_entries;
-	struct sheepid joined;
+	struct sheepdog_node_list_entry joined;
+
+	struct join_message jm;
 };
 
 struct work_leave {
 	struct cpg_event cev;
 
-	struct sheepid *member_list;
+	struct sheepdog_node_list_entry *member_list;
 	size_t member_list_entries;
-	struct sheepid left;
+	struct sheepdog_node_list_entry left;
 };
 
 #define print_node_list(node_list)				\
@@ -111,11 +109,11 @@ struct work_leave {
 	struct node *__node;					\
 	char __name[128];					\
 	list_for_each_entry(__node, node_list, list) {		\
-		dprintf("%c pid: %ld, ip: %s\n",		\
+		dprintf("%c ip: %s, port: %d\n",		\
 			is_myself(__node->ent.addr, __node->ent.port) ? 'l' : ' ',	\
-			__node->sheepid.pid,			\
 			addr_to_str(__name, sizeof(__name),	\
-			__node->ent.addr, __node->ent.port));	\
+				    __node->ent.addr, __node->ent.port), \
+			__node->ent.port);			\
 	}							\
 })
 
@@ -389,12 +387,13 @@ out:
 	exit(1);
 }
 
-static struct node *find_node(struct list_head *node_list, struct sheepid *id)
+static struct node *find_node(struct list_head *node_list,
+			      struct sheepdog_node_list_entry *ent)
 {
 	struct node *node;
 
 	list_for_each_entry(node, node_list, list) {
-		if (sheepid_cmp(&node->sheepid, id) == 0)
+		if (node_cmp(&node->ent, ent) == 0)
 			return node;
 	}
 
@@ -483,7 +482,6 @@ static int add_node_to_leave_list(struct message_header *msg)
 			goto ret;
 		}
 
-		n->sheepid = msg->sheepid;
 		n->ent = msg->from;
 
 		list_add_tail(&n->list, &sys->leave_list);
@@ -504,7 +502,6 @@ static int add_node_to_leave_list(struct message_header *msg)
 				continue;
 			}
 
-			n->sheepid = jm->leave_nodes[i].sheepid;
 			n->ent = jm->leave_nodes[i].ent;
 
 			list_add_tail(&n->list, &tmp_list);
@@ -647,7 +644,6 @@ static int get_cluster_status(struct sheepdog_node_list_entry *from,
 
 static void join(struct join_message *msg)
 {
-	struct node *node;
 	struct sheepdog_node_list_entry entry[SD_MAX_NODES];
 	int i;
 
@@ -666,12 +662,6 @@ static void join(struct join_message *msg)
 					 &msg->inc_epoch);
 	msg->nr_sobjs = sys->nr_sobjs;
 	msg->ctime = get_cluster_ctime();
-	msg->nr_nodes = 0;
-	list_for_each_entry(node, &sys->sd_node_list, list) {
-		msg->nodes[msg->nr_nodes].sheepid = node->sheepid;
-		msg->nodes[msg->nr_nodes].ent = node->ent;
-		msg->nr_nodes++;
-	}
 }
 
 static int get_vdi_bitmap_from(struct sheepdog_node_list_entry *node)
@@ -737,18 +727,16 @@ static void get_vdi_bitmap_from_sd_list(void)
 		get_vdi_bitmap_from(&nodes[i]);
 }
 
-static int move_node_to_sd_list(struct sheepid *id,
-				struct sheepdog_node_list_entry ent)
+static int move_node_to_sd_list(struct sheepdog_node_list_entry ent)
 {
 	struct node *node;
 
-	node = find_node(&sys->cpg_node_list, id);
+	node = zalloc(sizeof(*node));
 	if (!node)
-		return 1;
+		panic("failed to alloc memory for a new node\n");
 
 	node->ent = ent;
 
-	list_del(&node->list);
 	list_add_tail(&node->list, &sys->sd_node_list);
 	sys->nr_vnodes = 0;
 
@@ -771,22 +759,14 @@ static int update_epoch_log(int epoch)
 	return ret;
 }
 
-static void update_cluster_info(struct join_message *msg)
+static void update_cluster_info(struct join_message *msg,
+				struct sheepdog_node_list_entry *nodes,
+				size_t nr_nodes)
 {
 	int i;
-	int ret, nr_nodes = msg->nr_nodes;
+	int ret;
 
 	eprintf("status = %d, epoch = %d, %d, %d\n", msg->cluster_status, msg->epoch, msg->result, sys->join_finished);
-	if (msg->result != SD_RES_SUCCESS) {
-		if (is_myself(msg->header.from.addr, msg->header.from.port)) {
-			eprintf("failed to join sheepdog, %d\n", msg->result);
-			leave_cluster();
-			eprintf("Restart me later when master is up, please.Bye.\n");
-			exit(1);
-			/* sys->status = SD_STATUS_JOIN_FAILED; */
-		}
-		return;
-	}
 
 	if (sys->status == SD_STATUS_JOIN_FAILED)
 		return;
@@ -799,15 +779,16 @@ static void update_cluster_info(struct join_message *msg)
 
 	sys->epoch = msg->epoch;
 	for (i = 0; i < nr_nodes; i++) {
-		ret = move_node_to_sd_list(&msg->nodes[i].sheepid,
-					   msg->nodes[i].ent);
+		if (node_cmp(nodes + i, &msg->header.from) == 0)
+			continue;
+		ret = move_node_to_sd_list(nodes[i]);
 		/*
 		 * the node belonged to sheepdog when the master build
 		 * the JOIN response however it has gone.
 		 */
 		if (ret)
 			vprintf(SDOG_INFO "%s has gone\n",
-				sheepid_to_str(&msg->nodes[i].sheepid));
+				node_to_str(&nodes[i]));
 	}
 
 	if (msg->cluster_status != SD_STATUS_OK)
@@ -819,14 +800,14 @@ static void update_cluster_info(struct join_message *msg)
 		update_epoch_log(sys->epoch);
 
 join_finished:
-	ret = move_node_to_sd_list(&msg->header.sheepid, msg->header.from);
+	ret = move_node_to_sd_list(msg->header.from);
 	/*
 	 * this should not happen since __sd_deliver() checks if the
 	 * host from msg on cpg_node_list.
 	 */
 	if (ret)
 		vprintf(SDOG_ERR "%s has gone\n",
-			sheepid_to_str(&msg->header.sheepid));
+			node_to_str(&msg->header.from));
 
 	if (msg->cluster_status == SD_STATUS_OK) {
 		if (msg->inc_epoch) {
@@ -995,24 +976,24 @@ static void __sd_notify(struct cpg_event *cevent)
 	char name[128];
 	struct node *node;
 
-	dprintf("op: %d, state: %u, size: %d, from: %s, pid: %ld\n",
+	dprintf("op: %d, state: %u, size: %d, from: %s, port: %d\n",
 		m->op, m->state, m->msg_length,
 		addr_to_str(name, sizeof(name), m->from.addr, m->from.port),
-		m->sheepid.pid);
+		m->from.port);
 
 	/*
 	 * we don't want to perform any deliver events except mastership_tx event
 	 * until we join; we wait for our JOIN message.
 	 */
 	if (!sys->join_finished && !master_tx_message(m)) {
-		if (sheepid_cmp(&m->sheepid, &sys->this_sheepid) != 0) {
+		if (node_cmp(&m->from, &sys->this_node) != 0) {
 			cevent->skip = 1;
 			return;
 		}
 	}
 
 	if (join_message(m)) {
-		node = find_node(&sys->cpg_node_list, &m->sheepid);
+		node = find_node(&sys->cpg_node_list, &m->from);
 		if (!node) {
 			dprintf("the node was left before join operation is finished\n");
 			return;
@@ -1020,70 +1001,6 @@ static void __sd_notify(struct cpg_event *cevent)
 
 		node->ent = m->from;
 	}
-
-	if (m->state == DM_FIN) {
-		switch (m->op) {
-		case SD_MSG_JOIN:
-			if (((struct join_message *)m)->cluster_status == SD_STATUS_OK)
-				if (sys->status != SD_STATUS_OK) {
-					struct join_message *msg = (struct join_message *)m;
-					int i;
-
-					get_vdi_bitmap_from_sd_list();
-					get_vdi_bitmap_from(&m->from);
-					for (i = 0; i < msg->nr_nodes;i++)
-						get_vdi_bitmap_from(&msg->nodes[i].ent);
-			}
-			break;
-		}
-	}
-
-}
-
-static int tx_mastership(void)
-{
-	struct mastership_tx_message msg;
-	memset(&msg, 0, sizeof(msg));
-	msg.header.proto_ver = SD_SHEEP_PROTO_VER;
-	msg.header.op = SD_MSG_MASTER_TRANSFER;
-	msg.header.state = DM_FIN;
-	msg.header.msg_length = sizeof(msg);
-	msg.header.from = sys->this_node;
-	msg.header.sheepid = sys->this_sheepid;
-
-	return sys->cdrv->notify(&msg, msg.header.msg_length, NULL);
-}
-
-static void send_join_response(struct work_notify *w)
-{
-	struct message_header *m;
-	struct join_message *jm;
-	struct node *node;
-
-	m = w->msg;
-	jm = (struct join_message *)m;
-	join(jm);
-	m->state = DM_FIN;
-
-	dprintf("%d, %d\n", jm->result, jm->cluster_status);
-	if (jm->result == SD_RES_SUCCESS && jm->cluster_status != SD_STATUS_OK) {
-		jm->nr_leave_nodes = 0;
-		list_for_each_entry(node, &sys->leave_list, list) {
-			jm->leave_nodes[jm->nr_leave_nodes].sheepid = node->sheepid;
-			jm->leave_nodes[jm->nr_leave_nodes].ent = node->ent;
-			jm->nr_leave_nodes++;
-		}
-		print_node_list(&sys->leave_list);
-	} else if (jm->result != SD_RES_SUCCESS &&
-			jm->epoch > sys->epoch &&
-			jm->cluster_status == SD_STATUS_WAIT_FOR_JOIN) {
-		eprintf("Transfer mastership.\n");
-		tx_mastership();
-		eprintf("Restart me later when master is up, please.Bye.\n");
-		exit(1);
-	}
-	jm->epoch = sys->epoch;
-	sys->cdrv->notify(m, m->msg_length, NULL);
 }
 
 static void __sd_notify_done(struct cpg_event *cevent)
@@ -1100,10 +1017,9 @@ static void __sd_notify_done(struct cpg_event *cevent)
 	if (m->state == DM_FIN) {
 		switch (m->op) {
 		case SD_MSG_JOIN:
-			update_cluster_info((struct join_message *)m);
 			break;
 		case SD_MSG_LEAVE:
-			node = find_node(&sys->sd_node_list, &m->sheepid);
+			node = find_node(&sys->sd_node_list, &m->from);
 			if (node) {
 				sys->nr_vnodes = 0;
 
@@ -1125,7 +1041,7 @@ static void __sd_notify_done(struct cpg_event *cevent)
 				 */
 				if (!sys->join_finished) {
 					sys->join_finished = 1;
-					move_node_to_sd_list(&sys->this_sheepid, sys->this_node);
+					move_node_to_sd_list(sys->this_node);
 					sys->epoch = get_latest_epoch();
 				}
 
@@ -1158,7 +1074,6 @@ static void __sd_notify_done(struct cpg_event *cevent)
 	if (m->state == DM_INIT && is_master()) {
 		switch (m->op) {
 		case SD_MSG_JOIN:
-			send_join_response(w);
 			break;
 		default:
 			eprintf("unknown message %d\n", m->op);
@@ -1174,17 +1089,18 @@ static void __sd_notify_done(struct cpg_event *cevent)
 	}
 }
 
-static void sd_notify_handler(struct sheepid *sender, void *msg, size_t msg_len)
+static void sd_notify_handler(struct sheepdog_node_list_entry *sender,
+			      void *msg, size_t msg_len)
 {
 	struct cpg_event *cevent;
 	struct work_notify *w;
 	struct message_header *m = msg;
 	char name[128];
 
-	dprintf("op: %d, state: %u, size: %d, from: %s, pid: %lu\n",
+	dprintf("op: %d, state: %u, size: %d, from: %s, pid: %u\n",
 		m->op, m->state, m->msg_length,
 		addr_to_str(name, sizeof(name), m->from.addr, m->from.port),
-		sender->pid);
+		sender->port);
 
 	w = zalloc(sizeof(*w));
 	if (!w)
@@ -1212,7 +1128,7 @@ static void sd_notify_handler(struct sheepid *sender, void *msg, size_t msg_len)
 	start_cpg_event_work();
 }
 
-static void add_node(struct sheepid *id)
+static void add_node(struct sheepdog_node_list_entry *ent)
 {
 	struct node *node;
 
@@ -1220,16 +1136,16 @@ static void add_node(struct sheepid *id)
 	if (!node)
 		panic("failed to alloc memory for a new node\n");
 
-	node->sheepid = *id;
+	node->ent = *ent;
 
 	list_add_tail(&node->list, &sys->cpg_node_list);
 }
 
-static int del_node(struct sheepid *id)
+static int del_node(struct sheepdog_node_list_entry *ent)
 {
 	struct node *node;
 
-	node = find_node(&sys->sd_node_list, id);
+	node = find_node(&sys->sd_node_list, ent);
 	if (node) {
 		int nr;
 		struct sheepdog_node_list_entry e[SD_MAX_NODES];
@@ -1252,7 +1168,7 @@ static int del_node(struct sheepid *id)
 		return 1;
 	}
 
-	node = find_node(&sys->cpg_node_list, id);
+	node = find_node(&sys->cpg_node_list, ent);
 	if (node) {
 		list_del(&node->list);
 		free(node);
@@ -1264,7 +1180,7 @@ static int del_node(struct sheepid *id)
 /*
  * Check whether the majority of Sheepdog nodes are still alive or not
  */
-static int check_majority(struct sheepid *left)
+static int check_majority(struct sheepdog_node_list_entry *left)
 {
 	int nr_nodes = 0, nr_majority, nr_reachable = 0, fd;
 	struct node *node;
@@ -1279,7 +1195,7 @@ static int check_majority(struct sheepid *left)
 		return 1;
 
 	list_for_each_entry(node, &sys->sd_node_list, list) {
-		if (sheepid_cmp(&node->sheepid, left) == 0)
+		if (node_cmp(&node->ent, left) == 0)
 			continue;
 
 		addr_to_str(name, sizeof(name), node->ent.addr, 0);
@@ -1299,6 +1215,23 @@ static int check_majority(struct sheepid *left)
 	return 0;
 }
 
+/* On CPG_EVENT_JOIN: fetch VDI bitmaps if the cluster is OK but we are not */
+static void __sd_join(struct cpg_event *cevent)
+{
+	struct work_join *w = container_of(cevent, struct work_join, cev);
+	struct join_message *msg = &w->jm;
+	int i;
+
+	if (msg->cluster_status != SD_STATUS_OK)
+		return;
+	if (sys->status == SD_STATUS_OK)
+		return;
+
+	get_vdi_bitmap_from_sd_list();
+	for (i = 0; i < w->member_list_entries; i++)
+		get_vdi_bitmap_from(w->member_list + i);
+}
+
 static void __sd_leave(struct cpg_event *cevent)
 {
 	struct work_leave *w = container_of(cevent, struct work_leave, cev);
@@ -1309,7 +1242,7 @@ static void __sd_leave(struct cpg_event *cevent)
 	}
 }
 
-static void send_join_request(struct sheepid *id)
+static int send_join_request(struct sheepdog_node_list_entry *ent)
 {
 	struct join_message msg;
 	struct sheepdog_node_list_entry entries[SD_MAX_NODES];
@@ -1320,8 +1253,7 @@ static void send_join_request(struct sheepid *id)
 	msg.header.op = SD_MSG_JOIN;
 	msg.header.state = DM_INIT;
 	msg.header.msg_length = sizeof(msg);
-	msg.header.from = sys->this_node;
-	msg.header.sheepid = sys->this_sheepid;
+	msg.header.from = *ent;
 
 	get_global_nr_copies(&msg.nr_sobjs);
 
@@ -1333,22 +1265,24 @@ static void send_join_request(struct sheepid *id)
 			msg.nodes[i].ent = entries[i];
 	}
 
-	sys->cdrv->notify(&msg, msg.header.msg_length, NULL);
+	ret = sys->cdrv->join(ent, &msg, msg.header.msg_length);
+
+	vprintf(SDOG_INFO "%s\n", node_to_str(&sys->this_node));
 
-	vprintf(SDOG_INFO "%s\n", sheepid_to_str(&sys->this_sheepid));
+	return ret;
 }
 
 static void __sd_join_done(struct cpg_event *cevent)
 {
 	struct work_join *w = container_of(cevent, struct work_join, cev);
-	int ret, i;
-	int first_cpg_node = 0;
+	struct join_message *jm = &w->jm;
+	struct node *node, *t;
+	int i;
 
 	if (w->member_list_entries == 1 &&
-	    sheepid_cmp(&w->joined, &sys->this_sheepid) == 0) {
+	    node_cmp(&w->joined, &sys->this_node) == 0) {
 		sys->join_finished = 1;
 		get_global_nr_copies(&sys->nr_sobjs);
-		first_cpg_node = 1;
 	}
 
 	if (list_empty(&sys->cpg_node_list)) {
@@ -1357,47 +1291,16 @@ static void __sd_join_done(struct cpg_event *cevent)
 	} else
 		add_node(&w->joined);
 
-	if (first_cpg_node) {
-		struct join_message msg;
-		struct sheepdog_node_list_entry entries[SD_MAX_NODES];
-		int nr_entries;
-		uint64_t ctime;
-		uint32_t epoch;
-
-		/*
-		 * If I'm the first sheep joins in colosync, I
-		 * becomes the master without sending JOIN.
-		 */
-
-		vprintf(SDOG_DEBUG "%s\n", sheepid_to_str(&sys->this_sheepid));
-
-		memset(&msg, 0, sizeof(msg));
-
-		msg.header.from = sys->this_node;
-		msg.header.sheepid = sys->this_sheepid;
-
-		nr_entries = ARRAY_SIZE(entries);
-		ret = read_epoch(&epoch, &ctime, entries, &nr_entries);
-		if (ret == SD_RES_SUCCESS) {
-			sys->epoch = epoch;
-			msg.ctime = ctime;
-			get_cluster_status(&msg.header.from, entries, nr_entries,
-					   ctime, epoch, &msg.cluster_status, NULL);
-		} else
-			msg.cluster_status = SD_STATUS_WAIT_FOR_FORMAT;
-
-		update_cluster_info(&msg);
+	print_node_list(&sys->sd_node_list);
 
-		if (sys->status == SD_STATUS_OK) /* sheepdog starts with one node */
-			start_recovery(sys->epoch);
+	update_cluster_info(jm, w->member_list, w->member_list_entries);
 
-		return;
+	if (sys->status == SD_STATUS_OK) {
+		list_for_each_entry_safe(node, t, &sys->leave_list, list) {
+			list_del(&node->list);
+		}
+		start_recovery(sys->epoch);
 	}
-
-	print_node_list(&sys->sd_node_list);
-
-	if (sheepid_cmp(&w->joined, &sys->this_sheepid) == 0)
-		send_join_request(&w->joined);
 }
 
 static void __sd_leave_done(struct cpg_event *cevent)
@@ -1455,6 +1358,7 @@ static void cpg_event_fn(struct work *work, int idx)
 
 	switch (cevent->ctype) {
 	case CPG_EVENT_JOIN:
+		__sd_join(cevent);
 		break;
 	case CPG_EVENT_LEAVE:
 		__sd_leave(cevent);
@@ -1746,62 +1650,214 @@ do_retry:
 	queue_work(sys->cpg_wqueue, &cpg_event_work);
 }
 
-static void sd_join_handler(struct sheepid *joined, struct sheepid *members,
-			    size_t nr_members)
+/*
+ * Check whether 'joining' may join the cluster.  'opaque' is the
+ * join_message it sent; the response is written back into it.
+ */
+static enum cluster_join_result sd_check_join_handler(struct sheepdog_node_list_entry *joining,
+						      void *opaque)
+{
+	struct message_header *m = opaque;
+	struct join_message *jm = opaque;
+	struct node *node;
+
+	if (node_cmp(joining, &sys->this_node) == 0) {
+		struct sheepdog_node_list_entry entries[SD_MAX_NODES];
+		int nr_entries;
+		uint64_t ctime;
+		uint32_t epoch;
+		int ret;
+
+		/*
+		 * If I'm the first sheep to join in corosync, I
+		 * become the master without sending JOIN.
+		 */
+		vprintf(SDOG_DEBUG "%s\n", node_to_str(&sys->this_node));
+		jm->header.from = sys->this_node;
+		nr_entries = ARRAY_SIZE(entries);
+		ret = read_epoch(&epoch, &ctime, entries, &nr_entries);
+		if (ret == SD_RES_SUCCESS) {
+			sys->epoch = epoch;
+			jm->ctime = ctime;
+			get_cluster_status(&jm->header.from, entries, nr_entries,
+					   ctime, epoch, &jm->cluster_status, NULL);
+		} else
+			jm->cluster_status = SD_STATUS_WAIT_FOR_FORMAT;
+
+		return CJ_RES_SUCCESS;
+	}
+
+	join(jm);
+	m->state = DM_FIN;
+
+	dprintf("%d, %d\n", jm->result, jm->cluster_status);
+	if (jm->result == SD_RES_SUCCESS && jm->cluster_status != SD_STATUS_OK) {
+		jm->nr_leave_nodes = 0;
+		list_for_each_entry(node, &sys->leave_list, list) {
+			jm->leave_nodes[jm->nr_leave_nodes].ent = node->ent;
+			jm->nr_leave_nodes++;
+		}
+		print_node_list(&sys->leave_list);
+	} else if (jm->result != SD_RES_SUCCESS &&
+			jm->epoch > sys->epoch &&
+			jm->cluster_status == SD_STATUS_WAIT_FOR_JOIN) {
+		eprintf("Transfer mastership. %d, %d\n", jm->epoch, sys->epoch);
+		return CJ_RES_MASTER_TRANSFER;
+	}
+	jm->epoch = sys->epoch;
+
+	/* either node-version error maps to CJ_RES_JOIN_LATER */
+	if (jm->result == SD_RES_SUCCESS)
+		return CJ_RES_SUCCESS;
+	else if (jm->result == SD_RES_OLD_NODE_VER ||
+		 jm->result == SD_RES_NEW_NODE_VER)
+		return CJ_RES_JOIN_LATER;
+	else
+		return CJ_RES_FAIL;
+}
+
+static void sd_join_handler(struct sheepdog_node_list_entry *joined,
+			    struct sheepdog_node_list_entry *members,
+			    size_t nr_members, enum cluster_join_result result,
+			    void *opaque)
 {
 	struct cpg_event *cevent;
 	struct work_join *w = NULL;
 	int i, size;
+	int nr, nr_local, nr_leave;
+	struct node *n;
+	struct join_message *jm;
+	int le = get_latest_epoch();
 
-	dprintf("join %s\n", sheepid_to_str(joined));
-	for (i = 0; i < nr_members; i++)
-		dprintf("[%x] %s\n", i, sheepid_to_str(members + i));
+	if (node_cmp(joined, &sys->this_node) == 0) {
+		if (result == CJ_RES_FAIL) {
+			eprintf("failed to join sheepdog\n");
+			sys->cdrv->leave();
+			exit(1);
+		} else if (result == CJ_RES_JOIN_LATER) {
+			eprintf("Restart me later when master is up, please .Bye.\n");
+			sys->cdrv->leave();
+			exit(1);
+		}
+	}
 
-	if (sys->status == SD_STATUS_SHUTDOWN)
-		return;
+	switch (result) {
+	case CJ_RES_SUCCESS:
+		dprintf("join %s\n", node_to_str(joined));
+		for (i = 0; i < nr_members; i++)
+			dprintf("[%x] %s\n", i, node_to_str(members + i));
 
-	w = zalloc(sizeof(*w));
-	if (!w)
-		goto oom;
+		if (sys->status == SD_STATUS_SHUTDOWN)
+			break;
 
-	cevent = &w->cev;
-	cevent->ctype = CPG_EVENT_JOIN;
+		w = zalloc(sizeof(*w));
+		if (!w)
+			panic("oom");
 
+		cevent = &w->cev;
+		cevent->ctype = CPG_EVENT_JOIN;
 
-	vprintf(SDOG_DEBUG "allow new confchg, %p\n", cevent);
 
-	size = sizeof(struct sheepid) * nr_members;
-	w->member_list = zalloc(size);
-	if (!w->member_list)
-		goto oom;
-	memcpy(w->member_list, members, size);
-	w->member_list_entries = nr_members;
+		vprintf(SDOG_DEBUG "allow new confchg, %p\n", cevent);
 
-	w->joined = *joined;
+		size = sizeof(struct sheepdog_node_list_entry) * nr_members;
+		w->member_list = zalloc(size);
+		if (!w->member_list)
+			panic("oom");
 
-	list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings);
-	start_cpg_event_work();
+		memcpy(w->member_list, members, size);
+		w->member_list_entries = nr_members;
 
-	return;
-oom:
-	if (w) {
-		if (w->member_list)
-			free(w->member_list);
-		free(w);
+		w->joined = *joined;
+
+		memcpy(&w->jm, opaque, sizeof(w->jm));
+
+		list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings);
+		start_cpg_event_work();
+		break;
+	case CJ_RES_FAIL:
+	case CJ_RES_JOIN_LATER:
+		if (sys->status != SD_STATUS_WAIT_FOR_JOIN)
+			break;
+
+		n = zalloc(sizeof(*n));
+		if (!n)
+			panic("oom\n");
+
+		if (find_entry_list(joined, &sys->leave_list)
+		    || !find_entry_epoch(joined, le)) {
+			free(n);
+			break;
+		}
+
+		n->ent = *joined;
+
+		list_add_tail(&n->list, &sys->leave_list);
+
+		nr_local = get_nodes_nr_epoch(sys->epoch);
+		nr = nr_members;
+		nr_leave = get_nodes_nr_from(&sys->leave_list);
+
+		dprintf("%d == %d + %d \n", nr_local, nr, nr_leave);
+		if (nr_local == nr + nr_leave) {
+			sys->status = SD_STATUS_OK;
+			update_epoch_log(sys->epoch);
+			update_epoch_store(sys->epoch);
+		}
+		break;
+	case CJ_RES_MASTER_TRANSFER:
+		jm = (struct join_message *)opaque;
+		nr = jm->nr_leave_nodes;
+		for (i = 0; i < nr; i++) {
+			n = zalloc(sizeof(*n));
+			if (!n)
+				panic("oom\n");
+
+			if (find_entry_list(&jm->leave_nodes[i].ent, &sys->leave_list)
+			    || !find_entry_epoch(&jm->leave_nodes[i].ent, le)) {
+				free(n);
+				continue;
+			}
+
+			n->ent = jm->leave_nodes[i].ent;
+
+			list_add_tail(&n->list, &sys->leave_list);
+		}
+
+		/* Sheep needs this to identify itself as master.
+		 * Now mastership transfer is done.
+		 */
+		if (!sys->join_finished) {
+			sys->join_finished = 1;
+			move_node_to_sd_list(sys->this_node);
+			sys->epoch = get_latest_epoch();
+		}
+
+		nr_local = get_nodes_nr_epoch(sys->epoch);
+		nr = nr_members;
+		nr_leave = get_nodes_nr_from(&sys->leave_list);
+
+		dprintf("%d == %d + %d \n", nr_local, nr, nr_leave);
+		if (nr_local == nr + nr_leave) {
+			sys->status = SD_STATUS_OK;
+			update_epoch_log(sys->epoch);
+			update_epoch_store(sys->epoch);
+		}
+		break;
 	}
-	panic("failed to allocate memory for a confchg event\n");
 }
 
-static void sd_leave_handler(struct sheepid *left, struct sheepid *members,
+static void sd_leave_handler(struct sheepdog_node_list_entry *left,
+			     struct sheepdog_node_list_entry *members,
 			     size_t nr_members)
 {
 	struct cpg_event *cevent;
 	struct work_leave *w = NULL;
 	int i, size;
 
-	dprintf("leave %s\n", sheepid_to_str(left));
+	dprintf("leave %s\n", node_to_str(left));
 	for (i = 0; i < nr_members; i++)
-		dprintf("[%x] %s\n", i, sheepid_to_str(members + i));
+		dprintf("[%x] %s\n", i, node_to_str(members + i));
 
 	if (sys->status == SD_STATUS_SHUTDOWN)
 		return;
@@ -1816,7 +1872,7 @@ static void sd_leave_handler(struct sheepid *left, struct sheepid *members,
 
 	vprintf(SDOG_DEBUG "allow new confchg, %p\n", cevent);
 
-	size = sizeof(struct sheepid) * nr_members;
+	size = sizeof(struct sheepdog_node_list_entry) * nr_members;
 	w->member_list = zalloc(size);
 	if (!w->member_list)
 		goto oom;
@@ -1843,6 +1899,7 @@ int create_cluster(int port, int64_t zone)
 	int fd, ret;
 	struct cluster_driver *cdrv;
 	struct cdrv_handlers handlers = {
+		.check_join_handler = sd_check_join_handler,
 		.join_handler = sd_join_handler,
 		.leave_handler = sd_leave_handler,
 		.notify_handler = sd_notify_handler,
@@ -1858,26 +1915,24 @@ int create_cluster(int port, int64_t zone)
 		}
 	}
 
-	fd = sys->cdrv->init(&handlers, &sys->this_sheepid);
+	fd = sys->cdrv->init(&handlers, sys->this_node.addr);
 	if (fd < 0)
 		return -1;
 
-	ret = sys->cdrv->join();
-	if (ret != 0)
-		return -1;
-
-	memcpy(sys->this_node.addr, sys->this_sheepid.addr,
-	       sizeof(sys->this_node.addr));
 	sys->this_node.port = port;
 	sys->this_node.nr_vnodes = SD_DEFAULT_VNODES;
 	if (zone == -1) {
 		/* use last 4 bytes as zone id */
-		uint8_t *b = sys->this_sheepid.addr + 12;
+		uint8_t *b = sys->this_node.addr + 12;
 		sys->this_node.zone = b[0] | b[1] << 8 | b[2] << 16 | b[3] << 24;
 	} else
 		sys->this_node.zone = zone;
 	dprintf("zone id = %u\n", sys->this_node.zone);
 
+	ret = send_join_request(&sys->this_node);
+	if (ret != 0)
+		return -1;
+
 	if (get_latest_epoch() == 0)
 		sys->status = SD_STATUS_WAIT_FOR_FORMAT;
 	else
@@ -1912,7 +1967,6 @@ int leave_cluster(void)
 	msg.header.state = DM_FIN;
 	msg.header.msg_length = sizeof(msg);
 	msg.header.from = sys->this_node;
-	msg.header.sheepid = sys->this_sheepid;
 	msg.epoch = get_latest_epoch();
 
 	dprintf("%d\n", msg.epoch);
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index b292548..5613f12 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -110,7 +110,6 @@ struct cluster_info {
 
 	/* set after finishing the JOIN procedure */
 	int join_finished;
-	struct sheepid this_sheepid;
 	struct sheepdog_node_list_entry this_node;
 
 	uint32_t epoch;
-- 
1.7.2.5




More information about the sheepdog mailing list