[sheepdog] [PATCH v4 1/3] sheep: rework update node logic

Liu Yuan namei.unix at gmail.com
Wed Jul 10 10:15:01 CEST 2013


The new logic is:
1. A node whose size has changed calls cdrv->update_node() to send a
   non-blocking message to all nodes.
2. Every node in the cluster then
 - updates the sender's size in the cluster driver's internal state, if the
   driver keeps any, and
 - has its cluster driver call sd_update_node_handler() to update the sender's
   size in sheep.

SD_OP_UPDATE_SIZE is no longer used and is therefore removed.

Signed-off-by: Liu Yuan <namei.unix at gmail.com>
---
 include/internal_proto.h  |    1 -
 sheep/cluster.h           |    9 +++++++--
 sheep/cluster/corosync.c  |   42 +++++++++++++++++++++++++++++----------
 sheep/cluster/local.c     |   13 ++++++++----
 sheep/cluster/shepherd.c  |    5 ++---
 sheep/cluster/zookeeper.c |   37 ++++++++++++++++++++++------------
 sheep/group.c             |   11 ++++++++---
 sheep/ops.c               |   48 +--------------------------------------------
 sheep/sheep_priv.h        |    2 --
 9 files changed, 83 insertions(+), 85 deletions(-)

diff --git a/include/internal_proto.h b/include/internal_proto.h
index 149f8f8..317096e 100644
--- a/include/internal_proto.h
+++ b/include/internal_proto.h
@@ -74,7 +74,6 @@
 #define SD_OP_MD_UNPLUG 0xB3
 #define SD_OP_GET_HASH       0xB4
 #define SD_OP_REWEIGHT       0xB5
-#define SD_OP_UPDATE_SIZE    0xB6
 
 /* internal flags for hdr.flags, must be above 0x80 */
 #define SD_FLAG_CMD_RECOVERY 0x0080
diff --git a/sheep/cluster.h b/sheep/cluster.h
index 4851290..3760001 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -105,8 +105,12 @@ struct cluster_driver {
 	 */
 	int (*unblock)(void *msg, size_t msg_len);
 
-	/* Update the specific node in the driver's private copy of nodes */
-	void (*update_node)(struct sd_node *);
+	/*
+	 * Update the specific node in the driver's private copy of nodes
+	 *
+	 * Returns SD_RES_XXX
+	 */
+	int (*update_node)(struct sd_node *);
 
 	struct list_head list;
 };
@@ -161,6 +165,7 @@ void sd_leave_handler(const struct sd_node *left, const struct sd_node *members,
 void sd_notify_handler(const struct sd_node *sender, void *msg, size_t msg_len);
 bool sd_block_handler(const struct sd_node *sender);
 int sd_reconnect_handler(void);
+void sd_update_node_handler(struct sd_node *);
 enum cluster_join_result sd_check_join_cb(const struct sd_node *joining,
 					  const struct sd_node *nodes,
 					  size_t nr_nodes, void *opaque);
diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
index 9b59126..a9c2806 100644
--- a/sheep/cluster/corosync.c
+++ b/sheep/cluster/corosync.c
@@ -51,6 +51,7 @@ enum corosync_event_type {
 	COROSYNC_EVENT_TYPE_LEAVE,
 	COROSYNC_EVENT_TYPE_BLOCK,
 	COROSYNC_EVENT_TYPE_NOTIFY,
+	COROSYNC_EVENT_TYPE_UPDATE_NODE,
 };
 
 /* multicast message type */
@@ -61,6 +62,7 @@ enum corosync_message_type {
 	COROSYNC_MSG_TYPE_NOTIFY,
 	COROSYNC_MSG_TYPE_BLOCK,
 	COROSYNC_MSG_TYPE_UNBLOCK,
+	COROSYNC_MSG_TYPE_UPDATE_NODE,
 };
 
 struct corosync_event {
@@ -279,8 +281,9 @@ static void build_node_list(const struct cpg_node *nodes, size_t nr_nodes,
 static bool __corosync_dispatch_one(struct corosync_event *cevent)
 {
 	enum cluster_join_result res;
-	struct sd_node entries[SD_MAX_NODES];
+	struct sd_node entries[SD_MAX_NODES], *node;
 	struct cpg_node *n;
+	int idx;
 
 	switch (cevent->type) {
 	case COROSYNC_EVENT_TYPE_JOIN_REQUEST:
@@ -343,6 +346,17 @@ static bool __corosync_dispatch_one(struct corosync_event *cevent)
 		sd_notify_handler(&cevent->sender.ent, cevent->msg,
 						 cevent->msg_len);
 		break;
+	case COROSYNC_EVENT_TYPE_UPDATE_NODE:
+		node = &cevent->sender.ent;
+
+		if (node_eq(node, &this_node.ent))
+			this_node.ent = *node;
+
+		idx = find_sd_node(cpg_nodes, nr_cpg_nodes, node);
+		assert(idx >= 0);
+		cpg_nodes[idx].ent = *node;
+		sd_update_node_handler(node);
+		break;
 	}
 
 	return true;
@@ -486,11 +500,19 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,
 		/* fall through */
 	case COROSYNC_MSG_TYPE_BLOCK:
 	case COROSYNC_MSG_TYPE_NOTIFY:
+	case COROSYNC_MSG_TYPE_UPDATE_NODE:
 		cevent = xzalloc(sizeof(*cevent));
-		if (cmsg->type == COROSYNC_MSG_TYPE_BLOCK)
+		switch (cmsg->type) {
+		case COROSYNC_MSG_TYPE_BLOCK:
 			cevent->type = COROSYNC_EVENT_TYPE_BLOCK;
-		else
+			break;
+		case COROSYNC_MSG_TYPE_UPDATE_NODE:
+			cevent->type = COROSYNC_EVENT_TYPE_UPDATE_NODE;
+			break;
+		default:
 			cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
+			break;
+		}
 
 		cevent->sender = cmsg->sender;
 		cevent->msg_len = cmsg->msg_len;
@@ -544,7 +566,7 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,
 }
 
 static void build_cpg_node_list(struct cpg_node *nodes,
-		const struct cpg_address *list, size_t nr)
+				const struct cpg_address *list, size_t nr)
 {
 	int i;
 
@@ -804,14 +826,14 @@ again:
 	return 0;
 }
 
-static void corosync_update_node(struct sd_node *node)
+static int corosync_update_node(struct sd_node *node)
 {
-	int idx;
+	struct cpg_node cnode = {
+		.ent = *node,
+	};
 
-	idx = find_sd_node(cpg_nodes, nr_cpg_nodes, node);
-	if (idx < 0)
-		return;
-	cpg_nodes[idx].ent = *node;
+	return send_message(COROSYNC_MSG_TYPE_UPDATE_NODE, 0, &cnode,
+			    NULL, 0, NULL, 0);
 }
 
 static struct cluster_driver cdrv_corosync = {
diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
index e6fa149..80228b3 100644
--- a/sheep/cluster/local.c
+++ b/sheep/cluster/local.c
@@ -478,6 +478,10 @@ static bool local_process_event(void)
 		sd_notify_handler(&ev->sender.node, ev->buf, ev->buf_len);
 		break;
 	case EVENT_UPDATE_NODE:
+		if (node_eq(&ev->sender.node, &this_node.node))
+			this_node.node = ev->sender.node;
+
+		sd_update_node_handler(&ev->sender.node);
 		break;
 	}
 out:
@@ -555,18 +559,19 @@ static int local_init(const char *option)
 	return 0;
 }
 
-/* FIXME: we have to call nr of nodes times to update nodes information */
-static void local_update_node(struct sd_node *node)
+static int local_update_node(struct sd_node *node)
 {
-	struct local_node n = {
+	struct local_node lnode = {
 		.node = *node,
 	};
 
 	shm_queue_lock();
 
-	add_event(EVENT_UPDATE_NODE, &n, NULL, 0);
+	add_event(EVENT_UPDATE_NODE, &lnode, NULL, 0);
 
 	shm_queue_unlock();
+
+	return SD_RES_SUCCESS;
 }
 
 static struct cluster_driver cdrv_local = {
diff --git a/sheep/cluster/shepherd.c b/sheep/cluster/shepherd.c
index df8737f..3aebfc1 100644
--- a/sheep/cluster/shepherd.c
+++ b/sheep/cluster/shepherd.c
@@ -666,10 +666,9 @@ static int shepherd_unblock(void *msg, size_t msg_len)
 }
 
 /* FIXME: shepherd server also has to udpate node information */
-static void shepherd_update_node(struct sd_node *node)
+static int shepherd_update_node(struct sd_node *node)
 {
-	struct sd_node *n = xlfind(node, nodes, nr_nodes, node_cmp);
-	*n = *node;
+	return SD_RES_NO_SUPPORT;
 }
 
 static struct cluster_driver cdrv_shepherd = {
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index ac6347a..cb86024 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -47,6 +47,7 @@ enum zk_event_type {
 	EVENT_BLOCK,
 	EVENT_UNBLOCK,
 	EVENT_NOTIFY,
+	EVENT_UPDATE_NODE,
 };
 
 struct zk_node {
@@ -992,6 +993,25 @@ static void zk_handle_notify(struct zk_event *ev)
 	sd_notify_handler(&ev->sender.node, ev->buf, ev->buf_len);
 }
 
+static void zk_handle_update_node(struct zk_event *ev)
+{
+	struct zk_node *t;
+	struct sd_node *snode = &ev->sender.node;
+
+	sd_dprintf("%s", node_to_str(snode));
+
+	if (node_eq(snode, &this_node.node))
+		this_node.node = *snode;
+
+	pthread_rwlock_rdlock(&zk_tree_lock);
+	t = zk_tree_search_nolock(&snode->nid);
+	assert(t);
+	t->node = *snode;
+	build_node_list();
+	pthread_rwlock_unlock(&zk_tree_lock);
+	sd_update_node_handler(snode);
+}
+
 static void (*const zk_event_handlers[])(struct zk_event *ev) = {
 	[EVENT_JOIN_REQUEST]	= zk_handle_join_request,
 	[EVENT_JOIN_RESPONSE]	= zk_handle_join_response,
@@ -999,6 +1019,7 @@ static void (*const zk_event_handlers[])(struct zk_event *ev) = {
 	[EVENT_BLOCK]		= zk_handle_block,
 	[EVENT_UNBLOCK]		= zk_handle_unblock,
 	[EVENT_NOTIFY]		= zk_handle_notify,
+	[EVENT_UPDATE_NODE]	= zk_handle_update_node,
 };
 
 static const int zk_max_event_handlers = ARRAY_SIZE(zk_event_handlers);
@@ -1132,22 +1153,12 @@ static int zk_init(const char *option)
 	return 0;
 }
 
-static void zk_update_node(struct sd_node *node)
+static int zk_update_node(struct sd_node *node)
 {
-	struct zk_node n = {
+	struct zk_node znode = {
 		.node = *node,
 	};
-	struct zk_node *t;
-
-	sd_dprintf("%s", node_to_str(&n.node));
-
-	pthread_rwlock_rdlock(&zk_tree_lock);
-	t = zk_tree_search_nolock(&n.node.nid);
-	if (t) {
-		t->node = n.node;
-		build_node_list();
-	}
-	pthread_rwlock_unlock(&zk_tree_lock);
+	return add_event(EVENT_UPDATE_NODE, &znode, NULL, 0);
 }
 
 static struct cluster_driver cdrv_zookeeper = {
diff --git a/sheep/group.c b/sheep/group.c
index 370c625..936fc88 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -1090,16 +1090,15 @@ void sd_leave_handler(const struct sd_node *left, const struct sd_node *members,
 	sockfd_cache_del(&left->nid);
 }
 
-void update_node_size(struct sd_node *node)
+static void update_node_size(struct sd_node *node)
 {
 	struct vnode_info *cur_vinfo = main_thread_get(current_vnode_info);
 	int idx = get_node_idx(cur_vinfo, node);
 	assert(idx != -1);
 	cur_vinfo->nodes[idx].space = node->space;
-	sys->cdrv->update_node(node);
 }
 
-void kick_node_recover(void)
+static void kick_node_recover(void)
 {
 	struct vnode_info *old = main_thread_get(current_vnode_info);
 
@@ -1111,6 +1110,12 @@ void kick_node_recover(void)
 	put_vnode_info(old);
 }
 
+void sd_update_node_handler(struct sd_node *node)
+{
+	update_node_size(node);
+	kick_node_recover();
+}
+
 int create_cluster(int port, int64_t zone, int nr_vnodes,
 		   bool explicit_addr)
 {
diff --git a/sheep/ops.c b/sheep/ops.c
index bf975bf..55c2deb 100644
--- a/sheep/ops.c
+++ b/sheep/ops.c
@@ -662,35 +662,6 @@ static int cluster_recovery_completion(const struct sd_req *req,
 	return SD_RES_SUCCESS;
 }
 
-static void do_reweight(struct work *work)
-{
-	struct sd_req hdr;
-	int ret;
-
-	sd_init_req(&hdr, SD_OP_UPDATE_SIZE);
-	hdr.flags = SD_FLAG_CMD_WRITE;
-	hdr.data_length = sizeof(sys->this_node);
-
-	ret = exec_local_req(&hdr, &sys->this_node);
-	if (ret != SD_RES_SUCCESS)
-		sd_eprintf("failed to update node size");
-}
-
-static void reweight_done(struct work *work)
-{
-	free(work);
-}
-
-static void reweight_node(void)
-{
-	struct work *rw = xzalloc(sizeof(*rw));
-
-	rw->fn = do_reweight;
-	rw->done = reweight_done;
-
-	queue_work(sys->recovery_wqueue, rw);
-}
-
 static bool node_size_varied(void)
 {
 	uint64_t new, used, old = sys->this_node.space;
@@ -724,18 +695,7 @@ static int cluster_reweight(const struct sd_req *req, struct sd_rsp *rsp,
 			    void *data)
 {
 	if (node_size_varied())
-		reweight_node();
-
-	return SD_RES_SUCCESS;
-}
-
-static int cluster_update_size(const struct sd_req *req, struct sd_rsp *rsp,
-			       void *data)
-{
-	struct sd_node *node = (struct sd_node *)data;
-
-	update_node_size(node);
-	kick_node_recover();
+		return sys->cdrv->update_node(&sys->this_node);
 
 	return SD_RES_SUCCESS;
 }
@@ -1091,12 +1051,6 @@ static struct sd_op_template sd_ops[] = {
 		.process_main = cluster_reweight,
 	},
 
-	[SD_OP_UPDATE_SIZE] = {
-		.name = "UPDATE_SIZE",
-		.type = SD_OP_TYPE_CLUSTER,
-		.process_main = cluster_update_size,
-	},
-
 	[SD_OP_ENABLE_RECOVER] = {
 		.name = "ENABLE_RECOVER",
 		.type = SD_OP_TYPE_CLUSTER,
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 1f002bf..127ee5f 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -433,8 +433,6 @@ uint32_t md_get_info(struct sd_md_info *info);
 int md_plug_disks(char *disks);
 int md_unplug_disks(char *disks);
 uint64_t md_get_size(uint64_t *used);
-void kick_node_recover(void);
-void update_node_size(struct sd_node *node);
 
 /* http.c */
 #ifdef HAVE_HTTP
-- 
1.7.9.5




More information about the sheepdog mailing list