[sheepdog] [PATCH 2/2] sheep: remove join_message

MORITA Kazutaka morita.kazutaka at gmail.com
Wed Aug 7 23:29:13 CEST 2013


From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>

Now we have only a few fields in struct join_message.  To simplify the
code, this patch moves those fields into struct cluster_info and makes
sheep send cluster_info instead of join_message.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 include/internal_proto.h |   33 +++++------
 sheep/group.c            |  148 ++++++++++++++++++++--------------------------
 sheep/object_cache.c     |    3 +-
 sheep/ops.c              |   12 ++--
 sheep/request.c          |    6 +-
 sheep/sheep.c            |    6 +-
 sheep/sheep_priv.h       |    1 -
 7 files changed, 91 insertions(+), 118 deletions(-)

diff --git a/include/internal_proto.h b/include/internal_proto.h
index 7db954e..3ca46cf 100644
--- a/include/internal_proto.h
+++ b/include/internal_proto.h
@@ -104,10 +104,12 @@
 #define SD_RES_STALE_OBJ        0x90 /* Object may be stale */
 #define SD_RES_CLUSTER_ERROR    0x91 /* Cluster driver error */
 
-#define SD_STATUS_OK                0x00000001
-#define SD_STATUS_WAIT              0x00000004
-#define SD_STATUS_SHUTDOWN          0x00000008
-#define SD_STATUS_KILLED            0x00000040
+enum sd_status {
+	SD_STATUS_OK = 1,
+	SD_STATUS_WAIT,
+	SD_STATUS_SHUTDOWN,
+	SD_STATUS_KILLED,
+};
 
 struct node_id {
 	uint8_t addr[16];
@@ -126,14 +128,20 @@ struct sd_node {
 	uint64_t        space;
 };
 
+/*
+ * A joining sheep multicasts its local cluster info.  The existing nodes then
+ * reply with the latest cluster info, which is shared among all of them.
+ */
 struct cluster_info {
-	uint8_t nr_copies;
+	uint8_t proto_ver; /* the version number of the internal protocol */
 	uint8_t disable_recovery;
 	int16_t nr_nodes;
 	uint32_t epoch;
 	uint64_t ctime;
 	uint16_t flags;
-	uint16_t __pad[3];
+	uint8_t nr_copies;
+	enum sd_status status : 8;
+	uint32_t __pad;
 	uint8_t store[STORE_LEN];
 
 	/* node list at cluster_info->epoch */
@@ -150,19 +158,6 @@ struct epoch_log {
 	struct sd_node nodes[SD_MAX_NODES];
 };
 
-struct join_message {
-	uint8_t proto_ver;
-	uint8_t __pad[3];
-	uint32_t cluster_status;
-
-	/*
-	 * A joining sheep puts the local cluster info here.  After the master
-	 * replies it will contain the latest cluster info which is shared among
-	 * the existing nodes.
-	 */
-	struct cluster_info cinfo;
-};
-
 struct vdi_op_message {
 	struct sd_req req;
 	struct sd_rsp rsp;
diff --git a/sheep/group.c b/sheep/group.c
index c049970..59bb398 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -357,15 +357,14 @@ int epoch_log_read_remote(uint32_t epoch, struct sd_node *nodes, int len,
 	return 0;
 }
 
-static bool cluster_ctime_check(const struct join_message *jm)
+static bool cluster_ctime_check(const struct cluster_info *cinfo)
 {
-	if (jm->cinfo.epoch == 0 || sys->cinfo.epoch == 0)
+	if (cinfo->epoch == 0 || sys->cinfo.epoch == 0)
 		return true;
 
-	if (jm->cinfo.ctime != sys->cinfo.ctime) {
-		sd_eprintf("joining node ctime doesn't match: %"
-			   PRIu64 " vs %" PRIu64, jm->cinfo.ctime,
-			   sys->cinfo.ctime);
+	if (cinfo->ctime != sys->cinfo.ctime) {
+		sd_eprintf("joining node ctime doesn't match: %" PRIu64 " vs %"
+			   PRIu64, cinfo->ctime, sys->cinfo.ctime);
 		return false;
 	}
 
@@ -378,13 +377,13 @@ static bool cluster_ctime_check(const struct join_message *jm)
  * Sheepdog can start automatically if and only if all the members in the latest
  * epoch are gathered.
  */
-static bool enough_nodes_gathered(struct join_message *jm,
+static bool enough_nodes_gathered(struct cluster_info *cinfo,
 				  const struct sd_node *joining,
 				  const struct sd_node *nodes,
 				  size_t nr_nodes)
 {
-	for (int i = 0; i < jm->cinfo.nr_nodes; i++) {
-		const struct sd_node *key = jm->cinfo.nodes + i, *n;
+	for (int i = 0; i < cinfo->nr_nodes; i++) {
+		const struct sd_node *key = cinfo->nodes + i, *n;
 
 		n = xlfind(key, nodes, nr_nodes, node_cmp);
 		if (n == NULL && !node_eq(key, joining)) {
@@ -393,28 +392,25 @@ static bool enough_nodes_gathered(struct join_message *jm,
 		}
 	}
 
-	sd_dprintf("all the nodes are gathered, %d, %zd", jm->cinfo.nr_nodes,
+	sd_dprintf("all the nodes are gathered, %d, %zd", cinfo->nr_nodes,
 		   nr_nodes);
 	return true;
 }
 
-static int cluster_wait_check(const struct sd_node *joining,
-			      const struct sd_node *nodes, size_t nr_nodes,
-			      struct join_message *jm)
+static enum sd_status cluster_wait_check(const struct sd_node *joining,
+					 const struct sd_node *nodes,
+					 size_t nr_nodes,
+					 struct cluster_info *cinfo)
 {
-	if (!cluster_ctime_check(jm)) {
+	if (!cluster_ctime_check(cinfo)) {
 		sd_dprintf("joining node is invalid");
-		return sys->status;
+		return sys->cinfo.status;
 	}
 
-	if (jm->cinfo.epoch > sys->cinfo.epoch) {
+	if (cinfo->epoch > sys->cinfo.epoch) {
 		sd_dprintf("joining node has a larger epoch, %" PRIu32 ", %"
-			   PRIu32, jm->cinfo.epoch, sys->cinfo.epoch);
-		sys->cinfo = jm->cinfo;
-	} else if (jm->cinfo.epoch < sys->cinfo.epoch) {
-		sd_dprintf("joining node has a smaller epoch, %" PRIu32 ", %"
-			   PRIu32, jm->cinfo.epoch, sys->cinfo.epoch);
-		jm->cinfo = sys->cinfo;
+			   PRIu32, cinfo->epoch, sys->cinfo.epoch);
+		sys->cinfo = *cinfo;
 	}
 
 	/*
@@ -422,10 +418,10 @@ static int cluster_wait_check(const struct sd_node *joining,
 	 * node list, we can set the cluster live now.
 	 */
 	if (sys->cinfo.epoch > 0 &&
-	    enough_nodes_gathered(jm, joining, nodes, nr_nodes))
+	    enough_nodes_gathered(&sys->cinfo, joining, nodes, nr_nodes))
 		return SD_STATUS_OK;
 
-	return sys->status;
+	return sys->cinfo.status;
 }
 
 static int get_vdis_from(struct sd_node *node)
@@ -542,18 +538,17 @@ static struct vnode_info *alloc_old_vnode_info(const struct sd_node *joined,
 	return alloc_vnode_info(old_nodes, nr_nodes);
 }
 
-static void setup_backend_store(const struct join_message *jm)
+static void setup_backend_store(const struct cluster_info *cinfo)
 {
 	int ret;
 
-	if (jm->cinfo.store[0] == '\0')
+	if (cinfo->store[0] == '\0')
 		return;
 
 	if (!sd_store) {
-		sd_store = find_store_driver((char *)jm->cinfo.store);
+		sd_store = find_store_driver((char *)cinfo->store);
 		if (!sd_store)
-			panic("backend store %s not supported",
-			      jm->cinfo.store);
+			panic("backend store %s not supported", cinfo->store);
 
 		ret = sd_store->init();
 		if (ret != SD_RES_SUCCESS)
@@ -564,7 +559,7 @@ static void setup_backend_store(const struct join_message *jm)
 	 * We need to purge the stale objects for sheep joining back
 	 * after crash
 	 */
-	if (xlfind(&sys->this_node, jm->cinfo.nodes, jm->cinfo.nr_nodes,
+	if (xlfind(&sys->this_node, cinfo->nodes, cinfo->nr_nodes,
 		   node_cmp) == NULL) {
 		ret = sd_store->purge_obj();
 		if (ret != SD_RES_SUCCESS)
@@ -572,9 +567,7 @@ static void setup_backend_store(const struct join_message *jm)
 	}
 }
 
-static void finish_join(const struct join_message *msg,
-			const struct sd_node *joined,
-			const struct sd_node *nodes, size_t nr_nodes)
+static void finish_join(const struct sd_node *nodes, size_t nr_nodes)
 {
 	sockfd_cache_add_group(nodes, nr_nodes);
 }
@@ -636,21 +629,20 @@ void recalculate_vnodes(struct sd_node *nodes, int nr_nodes)
 	}
 }
 
-static void update_cluster_info(const struct join_message *msg,
+static void update_cluster_info(const struct cluster_info *cinfo,
 				const struct sd_node *joined,
 				const struct sd_node *nodes,
 				size_t nr_nodes)
 {
 	struct vnode_info *old_vnode_info;
 
-	sd_dprintf("status = %d, epoch = %d", msg->cluster_status,
-		   msg->cinfo.epoch);
+	sd_dprintf("status = %d, epoch = %d", cinfo->status, cinfo->epoch);
 
 	if (!sys->gateway_only)
-		setup_backend_store(msg);
+		setup_backend_store(cinfo);
 
 	if (node_is_local(joined))
-		finish_join(msg, joined, nodes, nr_nodes);
+		finish_join(nodes, nr_nodes);
 
 	old_vnode_info = main_thread_get(current_vnode_info);
 	main_thread_set(current_vnode_info,
@@ -658,14 +650,12 @@ static void update_cluster_info(const struct join_message *msg,
 
 	get_vdis(nodes, nr_nodes, joined);
 
-	if (msg->cluster_status == SD_STATUS_OK) {
-		if (sys->status == SD_STATUS_WAIT) {
-			if (!is_cluster_formatted())
-				/* initialize config file */
-				set_cluster_config(&sys->cinfo);
-		}
+	if (cinfo->status == SD_STATUS_OK) {
+		if (!is_cluster_formatted())
+			/* initialize config file */
+			set_cluster_config(&sys->cinfo);
 
-		if (nr_nodes != msg->cinfo.nr_nodes) {
+		if (nr_nodes != cinfo->nr_nodes) {
 			int ret = inc_and_log_epoch();
 			if (ret != 0)
 				panic("cannot log current epoch %d",
@@ -684,8 +674,6 @@ static void update_cluster_info(const struct join_message *msg,
 				       false);
 	}
 
-	sys->status = msg->cluster_status;
-
 	put_vnode_info(old_vnode_info);
 
 	sockfd_cache_add(&joined->nid);
@@ -749,7 +737,8 @@ bool sd_join_handler(const struct sd_node *joining,
 		     const struct sd_node *nodes, size_t nr_nodes,
 		     void *opaque)
 {
-	struct join_message *jm = opaque;
+	struct cluster_info *cinfo = opaque;
+	enum sd_status status;
 	char str[MAX_NODE_STR_LEN];
 
 	/*
@@ -762,40 +751,29 @@ bool sd_join_handler(const struct sd_node *joining,
 		return false;
 	}
 
-	sd_dprintf("check %s, %d", node_to_str(joining), sys->status);
-
-	jm->proto_ver = SD_SHEEP_PROTO_VER;
+	sd_dprintf("check %s, %d", node_to_str(joining), sys->cinfo.status);
 
-	if (sys->status == SD_STATUS_WAIT)
-		jm->cluster_status = cluster_wait_check(joining, nodes,
-							nr_nodes, jm);
+	if (sys->cinfo.status == SD_STATUS_WAIT)
+		status = cluster_wait_check(joining, nodes, nr_nodes, cinfo);
 	else
-		jm->cluster_status = sys->status;
+		status = sys->cinfo.status;
+
+	*cinfo = sys->cinfo;
+	cinfo->status = status;
+	cinfo->proto_ver = SD_SHEEP_PROTO_VER;
 
 	sd_dprintf("%s: cluster_status = 0x%x",
 		   addr_to_str(str, sizeof(str), joining->nid.addr,
-			       joining->nid.port), jm->cluster_status);
-
-	jm->cinfo = sys->cinfo;
+			       joining->nid.port), cinfo->status);
 
 	return true;
 }
 
 static int send_join_request(struct sd_node *ent)
 {
-	struct join_message *msg;
-	int ret;
-
-	msg = xzalloc(sizeof(*msg));
-	msg->cinfo = sys->cinfo;
-
-	ret = sys->cdrv->join(ent, msg, sizeof(*msg));
-
 	sd_printf(SDOG_INFO, "%s", node_to_str(&sys->this_node));
 
-	free(msg);
-
-	return ret;
+	return sys->cdrv->join(ent, &sys->cinfo, sizeof(sys->cinfo));
 }
 
 static void requeue_cluster_request(void)
@@ -865,7 +843,7 @@ static void requeue_cluster_request(void)
 
 int sd_reconnect_handler(void)
 {
-	sys->status = SD_STATUS_WAIT;
+	sys->cinfo.status = SD_STATUS_WAIT;
 	if (sys->cdrv->init(sys->cdrv_option) != 0)
 		return -1;
 	if (send_join_request(&sys->this_node) != 0)
@@ -874,20 +852,20 @@ int sd_reconnect_handler(void)
 	return 0;
 }
 
-static bool cluster_join_check(const struct join_message *jm)
+static bool cluster_join_check(const struct cluster_info *cinfo)
 {
-	if (jm->proto_ver != SD_SHEEP_PROTO_VER) {
+	if (cinfo->proto_ver != SD_SHEEP_PROTO_VER) {
 		sd_eprintf("invalid protocol version: %d, %d",
-			   jm->proto_ver, SD_SHEEP_PROTO_VER);
+			   cinfo->proto_ver, SD_SHEEP_PROTO_VER);
 		return false;
 	}
 
-	if (!cluster_ctime_check(jm))
+	if (!cluster_ctime_check(cinfo))
 		return false;
 
-	if (jm->cinfo.epoch == sys->cinfo.epoch &&
-	    memcmp(jm->cinfo.nodes, sys->cinfo.nodes,
-		   sizeof(jm->cinfo.nodes[0]) * jm->cinfo.nr_nodes) != 0) {
+	if (cinfo->epoch == sys->cinfo.epoch &&
+	    memcmp(cinfo->nodes, sys->cinfo.nodes,
+		   sizeof(cinfo->nodes[0]) * cinfo->nr_nodes) != 0) {
 		sd_printf(SDOG_ALERT, "epoch log entries does not match");
 		return false;
 	}
@@ -900,23 +878,23 @@ void sd_accept_handler(const struct sd_node *joined,
 		       const void *opaque)
 {
 	int i;
-	const struct join_message *jm = opaque;
+	const struct cluster_info *cinfo = opaque;
 
-	if (!cluster_join_check(jm)) {
+	if (!cluster_join_check(cinfo)) {
 		sd_eprintf("failed to join Sheepdog");
 		exit(1);
 	}
 
-	sys->cinfo = jm->cinfo;
+	sys->cinfo = *cinfo;
 
 	sd_dprintf("join %s", node_to_str(joined));
 	for (i = 0; i < nr_members; i++)
 		sd_dprintf("[%x] %s", i, node_to_str(members + i));
 
-	if (sys->status == SD_STATUS_SHUTDOWN)
+	if (sys->cinfo.status == SD_STATUS_SHUTDOWN)
 		return;
 
-	update_cluster_info(jm, joined, members, nr_members);
+	update_cluster_info(cinfo, joined, members, nr_members);
 
 	if (node_is_local(joined))
 		/* this output is used for testing */
@@ -933,7 +911,7 @@ void sd_leave_handler(const struct sd_node *left, const struct sd_node *members,
 	for (i = 0; i < nr_members; i++)
 		sd_dprintf("[%x] %s", i, node_to_str(members + i));
 
-	if (sys->status == SD_STATUS_SHUTDOWN)
+	if (sys->cinfo.status == SD_STATUS_SHUTDOWN)
 		return;
 
 	if (node_is_local(left))
@@ -943,7 +921,7 @@ void sd_leave_handler(const struct sd_node *left, const struct sd_node *members,
 	old_vnode_info = main_thread_get(current_vnode_info);
 	main_thread_set(current_vnode_info,
 			  alloc_vnode_info(members, nr_members));
-	if (sys->status == SD_STATUS_OK) {
+	if (sys->cinfo.status == SD_STATUS_OK) {
 		ret = inc_and_log_epoch();
 		if (ret != 0)
 			panic("cannot log current epoch %d", sys->cinfo.epoch);
@@ -1026,7 +1004,7 @@ int create_cluster(int port, int64_t zone, int nr_vnodes,
 		if (sys->cinfo.nr_nodes == -1)
 			return -1;
 	}
-	sys->status = SD_STATUS_WAIT;
+	sys->cinfo.status = SD_STATUS_WAIT;
 
 	main_thread_set(pending_block_list,
 			  xzalloc(sizeof(struct list_head)));
diff --git a/sheep/object_cache.c b/sheep/object_cache.c
index dcf6972..c2b2ccb 100644
--- a/sheep/object_cache.c
+++ b/sheep/object_cache.c
@@ -279,7 +279,8 @@ static void add_to_dirty_list(struct object_cache_entry *entry)
 	list_add_tail(&entry->dirty_list, &oc->dirty_head);
 	/* FIXME read sys->status atomically */
 	if (uatomic_add_return(&oc->dirty_count, 1) > MAX_DIRTY_OBJECT_COUNT
-	    && !uatomic_is_true(&oc->in_push) && sys->status == SD_STATUS_OK)
+	    && !uatomic_is_true(&oc->in_push)
+	    && sys->cinfo.status == SD_STATUS_OK)
 		kick_background_pusher(oc);
 }
 
diff --git a/sheep/ops.c b/sheep/ops.c
index 1f1f702..f0d89df 100644
--- a/sheep/ops.c
+++ b/sheep/ops.c
@@ -264,7 +264,7 @@ static int cluster_make_fs(const struct sd_req *req, struct sd_rsp *rsp,
 	if (ret)
 		return SD_RES_EIO;
 
-	sys->status = SD_STATUS_OK;
+	sys->cinfo.status = SD_STATUS_OK;
 
 	return SD_RES_SUCCESS;
 }
@@ -272,7 +272,7 @@ static int cluster_make_fs(const struct sd_req *req, struct sd_rsp *rsp,
 static int cluster_shutdown(const struct sd_req *req, struct sd_rsp *rsp,
 			    void *data)
 {
-	sys->status = SD_STATUS_SHUTDOWN;
+	sys->cinfo.status = SD_STATUS_SHUTDOWN;
 	return SD_RES_SUCCESS;
 }
 
@@ -435,7 +435,7 @@ static int local_stat_cluster(struct request *req)
 		epoch--;
 	}
 out:
-	switch (sys->status) {
+	switch (sys->cinfo.status) {
 	case SD_STATUS_OK:
 		return SD_RES_SUCCESS;
 	case SD_STATUS_WAIT:
@@ -488,7 +488,7 @@ static int cluster_force_recover_work(struct request *req)
 	 * 2) some nodes are physically down (same epoch condition).
 	 * In both case, the nodes(s) stat is WAIT_FOR_JOIN.
 	 */
-	if (sys->status != SD_STATUS_WAIT || req->vinfo == NULL)
+	if (sys->cinfo.status != SD_STATUS_WAIT || req->vinfo == NULL)
 		return SD_RES_FORCE_RECOVER;
 
 	old_vnode_info = get_vnode_info_epoch(epoch, req->vinfo);
@@ -539,7 +539,7 @@ static int cluster_force_recover_main(const struct sd_req *req,
 		/* initialize config file */
 		set_cluster_config(&sys->cinfo);
 
-	sys->status = SD_STATUS_OK;
+	sys->cinfo.status = SD_STATUS_OK;
 
 	vnode_info = get_vnode_info();
 	old_vnode_info = alloc_vnode_info(nodes, nr_nodes);
@@ -804,7 +804,7 @@ static int local_trace_read_buf(struct request *request)
 static int local_kill_node(const struct sd_req *req, struct sd_rsp *rsp,
 			   void *data)
 {
-	sys->status = SD_STATUS_KILLED;
+	sys->cinfo.status = SD_STATUS_KILLED;
 
 	return SD_RES_SUCCESS;
 }
diff --git a/sheep/request.c b/sheep/request.c
index ee1e987..386390b 100644
--- a/sheep/request.c
+++ b/sheep/request.c
@@ -344,9 +344,9 @@ static void queue_request(struct request *req)
 		goto done;
 	}
 
-	sd_dprintf("%s, %d", op_name(req->op), sys->status);
+	sd_dprintf("%s, %d", op_name(req->op), sys->cinfo.status);
 
-	switch (sys->status) {
+	switch (sys->cinfo.status) {
 	case SD_STATUS_KILLED:
 		rsp->result = SD_RES_KILLED;
 		goto done;
@@ -805,7 +805,7 @@ static void listen_handler(int listen_fd, int events, void *data)
 	struct client_info *ci;
 	bool is_inet_socket = *(bool *)data;
 
-	if (sys->status == SD_STATUS_SHUTDOWN) {
+	if (sys->cinfo.status == SD_STATUS_SHUTDOWN) {
 		sd_dprintf("unregistering connection %d", listen_fd);
 		unregister_event(listen_fd);
 		return;
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 39288a8..6a1efe5 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -212,7 +212,7 @@ static void signal_handler(int listen_fd, int events, void *data)
 	sd_dprintf("signal %d", siginfo.ssi_signo);
 	switch (siginfo.ssi_signo) {
 	case SIGTERM:
-		sys->status = SD_STATUS_KILLED;
+		sys->cinfo.status = SD_STATUS_KILLED;
 		break;
 	default:
 		sd_eprintf("signal %d unhandled", siginfo.ssi_signo);
@@ -874,8 +874,8 @@ int main(int argc, char **argv)
 		  PACKAGE_VERSION);
 
 	while (sys->nr_outstanding_reqs != 0 ||
-	       (sys->status != SD_STATUS_KILLED &&
-		sys->status != SD_STATUS_SHUTDOWN))
+	       (sys->cinfo.status != SD_STATUS_KILLED &&
+		sys->cinfo.status != SD_STATUS_SHUTDOWN))
 		event_loop(-1);
 
 	sd_printf(SDOG_INFO, "shutdown");
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 1652218..6c3cc50 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -94,7 +94,6 @@ struct system_info {
 	struct sd_node this_node;
 
 	struct cluster_info cinfo;
-	uint32_t status;
 
 	uint64_t disk_space;
 
-- 
1.7.9.5




More information about the sheepdog mailing list