[sheepdog] [PATCH] change the gateway mode behavior to only do the request forwarding

Wang Zhengyong wangzhengyong at cmss.chinamobile.com
Wed Apr 22 08:26:08 CEST 2015


In the current master branch, there are two issues with gateway mode:
1. Object writes are synchronous: a write does not return success until
all of the redundant copies have been written to disk, which blocks other IO requests
waiting for the previous IO operation to finish. When a gateway node serves multiple
dog requests, this mechanism reduces IO efficiency.

2. Gateway nodes currently synchronize cluster information, which increases code complexity
and network IO.

To solve the above issues, we should change the gateway node mechanism as follows:
1. The only job of a gateway node is to forward requests: each dog request is
forwarded to the sheep cluster. Gateway nodes need not check the cluster status, opcode, or epoch;
those checks are left to the sheep nodes the request is delivered to.

2. Separate gateway nodes from the cluster. Gateway nodes need not respond to cluster
messages, except for node join/leave events.

This patch is an alpha version; there are still some issues to solve:
1. The code currently supports only the zookeeper and local drivers, not corosync.
2. The target sheep is selected at random; load balancing is not supported.

Your valuable opinions are welcome.

Signed-off-by: Wang Zhengyong <wangzhengyong at cmss.chinamobile.com>
---
 sheep/cluster.h           |   10 +++++
 sheep/cluster/local.c     |   36 ++++++++++++++++++
 sheep/cluster/zookeeper.c |   47 +++++++++++++++++++++--
 sheep/group.c             |   36 ++++++++++++++++++
 sheep/request.c           |   90 +++++++++++++++++++++++++++++++++++++++++++++
 sheep/sheep.c             |   15 +++++++-
 sheep/sheep_priv.h        |    1 +
 7 files changed, 230 insertions(+), 5 deletions(-)

diff --git a/sheep/cluster.h b/sheep/cluster.h
index e70c8b6..2c408d7 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -144,6 +144,16 @@ struct cluster_driver {
 	 */
 	int (*update_node)(struct sd_node *);
 
+	/*
+	 * Refresh this gateway's copy of the cluster node list
+	 *
+	 * Only called in gateway mode.  Optional: a driver without gateway
+	 * support leaves it NULL, so callers must check before calling it.
+	 *
+	 * Returns 0 on success, non-zero on failure
+	 */
+	int (*update_nodes)(void);
+
 	struct list_node list;
 };
 
diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
index e447822..9760b53 100644
--- a/sheep/cluster/local.c
+++ b/sheep/cluster/local.c
@@ -54,6 +54,8 @@ static int shmfd;
 static int sigfd;
 static int block_event_pos;
 static int nonblock_event_pos;
+static size_t last_nr_lnodes;
+static bool gateway_mode;
 static struct local_node this_node;
 static bool joined;
 
@@ -343,19 +345,67 @@ static int add_event_lock(enum local_event_type type, struct local_node *lnode,
 	return ret;
 }
 
+/*
+ * Publish the current non-gateway members to the gateway's node list.
+ *
+ * Works on a private copy of the member list so that node_insert() does
+ * not write rb-tree links into the mmap'ed shm queue.  The caller must
+ * hold the shm queue lock.
+ */
+static void local_publish_gateway_nodes(void)
+{
+	struct local_node lnodes[LOCAL_MAX_NODES];
+	struct rb_root root = RB_ROOT;
+	size_t i, nr, nr_storage = 0;
+
+	nr = get_nodes(lnodes);
+	for (i = 0; i < nr; i++) {
+		/* gateways only forward requests, so never target one */
+		if (lnodes[i].gateway)
+			continue;
+		node_insert(&lnodes[i].node, &root);
+		nr_storage++;
+	}
+
+	gateway_update_nodes(&root, nr_storage);
+}
+
+/*
+ * cluster_driver->update_nodes hook, called by create_gateway().
+ *
+ * Switches the driver into gateway mode, so that check_pids() keeps the
+ * node list up to date from now on, and publishes the initial list.
+ */
+static int local_update_nodes(void)
+{
+	gateway_mode = true;
+
+	shm_queue_lock();
+	local_publish_gateway_nodes();
+	shm_queue_unlock();
+
+	return 0;
+}
+
 static void check_pids(void *arg)
 {
 	int i;
 	size_t nr;
+	bool update = false;
 	struct local_node lnodes[LOCAL_MAX_NODES];
 	struct local_event *ev;
 
 	shm_queue_lock();
 
 	nr = get_nodes(lnodes);
+	if (last_nr_lnodes != nr) {
+		last_nr_lnodes = nr;
+		update = true;
+	}
 
 	for (i = 0; i < nr; i++)
 		if (!process_exists(lnodes[i].pid)) {
+			update = true;
 			add_event(EVENT_LEAVE, lnodes + i, NULL, 0);
 
 			/* unblock blocking event if sender has gone */
@@ -364,8 +414,12 @@ static void check_pids(void *arg)
 				ev->removed = true;
 				msync(ev, sizeof(*ev), MS_SYNC);
 			}
 		}
 
+	/* in gateway mode, republish the node list after membership changes */
+	if (update && gateway_mode)
+		local_publish_gateway_nodes();
+
 	shm_queue_unlock();
 
 	add_timer(arg, PROCESS_CHECK_INTERVAL);
@@ -741,6 +795,7 @@ static struct cluster_driver cdrv_local = {
 	.lock		= local_lock,
 	.unlock		= local_unlock,
 	.update_node    = local_update_node,
+	.update_nodes	= local_update_nodes,
 };
 
 cdrv_register(cdrv_local);
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 4ad3ef9..2e79890 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -290,12 +290,12 @@ static inline ZOOAPI int zk_node_exists(const char *path)
 	return rc;
 }
 
-static inline ZOOAPI int zk_get_children(const char *path,
+static inline ZOOAPI int zk_get_children(const char *path, int watch,
 					 struct String_vector *strings)
 {
 	int rc;
 	do {
-		rc = zoo_get_children(zhandle, path, 1, strings);
+		rc = zoo_get_children(zhandle, path, watch, strings);
 	} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
 	CHECK_ZK_RC(rc, path);
 
@@ -639,6 +639,16 @@ static inline void zk_tree_destroy(void)
 	sd_rw_unlock(&zk_tree_lock);
 }
 
+/* Reset both node trees and the node count to empty. */
+static inline void zk_tree_init(void)
+{
+	sd_write_lock(&zk_tree_lock);
+	INIT_RB_ROOT(&zk_node_root);
+	nr_sd_nodes = 0;
+	INIT_RB_ROOT(&sd_node_root);
+	sd_rw_unlock(&zk_tree_lock);
+}
+
 static inline void build_node_list(void)
 {
 	struct zk_node *zk;
@@ -700,6 +710,45 @@ static int add_event(enum zk_event_type type, struct zk_node *znode, void *buf,
 	}
 }
 
+/*
+ * Rebuild the zk node trees from the children of MEMBER_ZNODE and pass
+ * the result to gateway_update_nodes().  The child watch on MEMBER_ZNODE
+ * is re-armed, so the next membership change triggers another update.
+ *
+ * NOTE(review): zk_watcher() runs this on the ZooKeeper client thread,
+ * but gateway_update_nodes() is marked main_fn -- confirm that is safe,
+ * or defer the update to the main thread through efd.
+ *
+ * Returns ZOK on success, or the error from zk_get_children().
+ */
+static int zk_update_nodes(void)
+{
+	struct zk_node znode;
+	struct String_vector strs;
+	char path[MAX_NODE_STR_LEN], *p;
+
+	/* rebuild the zk tree from scratch */
+	zk_tree_destroy();
+	zk_tree_init();
+
+	RETURN_IF_ERROR(zk_get_children(MEMBER_ZNODE, 1, &strs), "");
+
+	FOR_EACH_ZNODE(MEMBER_ZNODE, path, &strs) {
+		/* str_to_node() fills only .node; zero the rest */
+		memset(&znode, 0, sizeof(znode));
+		p = strrchr(path, '/');
+		str_to_node(p + 1, &znode.node);
+		zk_tree_add(&znode);
+	}
+
+	/* sd_node_root is guarded by zk_tree_lock */
+	sd_read_lock(&zk_tree_lock);
+	gateway_update_nodes(&sd_node_root, nr_sd_nodes);
+	sd_rw_unlock(&zk_tree_lock);
+
+	return ZOK;
+}
+
 /*
  * Type value:
  * -1 SESSION_EVENT, use State to indicate what kind of sub-event
@@ -746,6 +795,10 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
 			zk_node_exists(path);
 		/* kick off the event handler */
 		eventfd_xwrite(efd, 1);
+	} else if (type == ZOO_CHILD_EVENT) {
+		/* membership changed: re-read MEMBER_ZNODE, re-arm the watch */
+		zk_update_nodes();
+		return;
 	} else if (type == ZOO_DELETED_EVENT) {
 		struct zk_node *n;
 
@@ -828,7 +881,7 @@ static int zk_get_least_seq(const char *parent, char *least_seq_path,
 		 */
 		least_seq = INT_MAX;
 
-		RETURN_IF_ERROR(zk_get_children(parent, &strs), "");
+		RETURN_IF_ERROR(zk_get_children(parent, 0, &strs), "");
 
 		FOR_EACH_ZNODE(parent, path, &strs) {
 			p = strrchr(path, '/');
@@ -1085,7 +1138,7 @@ static void watch_all_nodes(void)
 	struct String_vector strs;
 	char path[MAX_NODE_STR_LEN];
 
-	RETURN_VOID_IF_ERROR(zk_get_children(MEMBER_ZNODE, &strs), "");
+	RETURN_VOID_IF_ERROR(zk_get_children(MEMBER_ZNODE, 0, &strs), "");
 
 	FOR_EACH_ZNODE(MEMBER_ZNODE, path, &strs) {
 		RETURN_VOID_IF_ERROR(zk_node_exists(path), "");
diff --git a/sheep/group.c b/sheep/group.c
index 7c7cb6d..2f3b239 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -1305,6 +1305,50 @@ main_fn void sd_update_node_handler(struct sd_node *node)
 	kick_node_recover();
 }
 
+/*
+ * Set up a gateway-only sheep: initialize the cluster driver and load the
+ * current node list through the driver's update_nodes hook.
+ *
+ * Returns 0 on success, a negative value on failure.
+ */
+int create_gateway(int port, bool explicit_addr)
+{
+	int ret;
+
+	if (!sys->cdrv) {
+		sys->cdrv = find_cdrv(DEFAULT_CLUSTER_DRIVER);
+		sd_debug("use %s cluster driver as default",
+			 DEFAULT_CLUSTER_DRIVER);
+	}
+
+	/* update_nodes is optional; without it there is no gateway support */
+	if (!sys->cdrv->update_nodes) {
+		sd_err("%s cluster driver does not support gateway mode",
+		       sys->cdrv->name);
+		return -1;
+	}
+
+	ret = sys->cdrv->init(sys->cdrv_option);
+	if (ret < 0)
+		return ret;
+
+	if (!explicit_addr) {
+		ret = sys->cdrv->get_local_addr(sys->this_node.nid.addr);
+		if (ret < 0)
+			return ret;
+	}
+
+	ret = sys->cdrv->update_nodes();
+	if (ret) {
+		sd_err("failed to get the node list from the cluster driver");
+		return -1;
+	}
+
+	sys->this_node.nid.port = port;
+
+	return 0;
+}
+
 int create_cluster(int port, int64_t zone, int nr_vnodes,
 		   bool explicit_addr)
 {
@@ -1403,3 +1447,16 @@ int leave_cluster(void)
 	left = true;
 	return sys->cdrv->leave();
 }
+
+/*
+ * Replace the node list in sys->cinfo with the @nr_nodes nodes in @nroot.
+ *
+ * Called by the cluster drivers in gateway mode; do_gw_process_work()
+ * picks the node to forward each request to from this list.
+ */
+main_fn void gateway_update_nodes(const struct rb_root *nroot, size_t nr_nodes)
+{
+	sys->cinfo.nr_nodes = nr_nodes;
+
+	nodes_to_buffer(nroot, sys->cinfo.nodes);
+}
diff --git a/sheep/request.c b/sheep/request.c
index f12ca6b..e4877a1 100644
--- a/sheep/request.c
+++ b/sheep/request.c
@@ -321,6 +321,88 @@ static bool has_enough_zones(struct request *req)
 	return req->vinfo->nr_zones >= get_vdi_copy_number(oid_to_vid(oid));
 }
 
+/*
+ * Forward a request unchanged to one storage node (gateway-only mode).
+ *
+ * Nodes in sys->cinfo.nodes are probed from a random index, wrapping
+ * around, and the first one with a usable connection in the sockfd cache
+ * is used.  The reply header is copied into req->rp and any reply data
+ * is received directly into req->data.
+ */
+static void do_gw_process_work(struct work *work)
+{
+	struct request *req = container_of(work, struct request, work);
+	struct cluster_info *cinfo = &sys->cinfo;
+	/* read once: the node list can be replaced while we run */
+	size_t nr_nodes = cinfo->nr_nodes;
+	const struct node_id *nid = NULL;
+	struct sd_req hdr;
+	struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
+	struct sockfd *sfd;
+	size_t start, k, idx = 0;
+	int ret;
+
+	if (nr_nodes == 0) {
+		sd_err("no sheep node to forward the request to");
+		req->rp.result = SD_RES_NETWORK_ERROR;
+		return;
+	}
+
+	start = random() % nr_nodes;
+	for (k = 0; k < nr_nodes; k++) {
+		idx = (start + k) % nr_nodes;
+		sfd = sockfd_cache_get(&cinfo->nodes[idx].nid);
+		if (!sfd)
+			continue;
+		nid = &cinfo->nodes[idx].nid;
+		/* only a reachability probe; sheep_exec_req() takes just nid */
+		sockfd_cache_put(nid, sfd);
+		break;
+	}
+
+	if (!nid) {
+		sd_err("network error");
+		req->rp.result = SD_RES_NETWORK_ERROR;
+		return;
+	}
+
+	sd_debug("opcode: %x, node: %s", req->rq.opcode,
+		 node_to_str(cinfo->nodes + idx));
+
+	memcpy(&hdr, &req->rq, sizeof(hdr));
+	ret = sheep_exec_req(nid, &hdr, req->data);
+	if (ret != SD_RES_SUCCESS) {
+		sd_err("request gw request error");
+		req->rp.result = ret;
+		return;
+	}
+
+	memcpy(&req->rp, rsp, sizeof(*rsp));
+}
+
+/*
+ * Completion for a forwarded request: log a failed result, then release
+ * the request with put_request().
+ */
+static void gw_op_done(struct work *work)
+{
+	struct request *req = container_of(work, struct request, work);
+
+	if (req->rp.result != SD_RES_SUCCESS)
+		sd_debug("gw operation error %s",
+			 sd_strerror(req->rp.result));
+
+	put_request(req);
+}
+
+/* Queue @req on the gateway work queue to be forwarded as-is. */
+static void queue_gw_request(struct request *req)
+{
+	req->work.fn = do_gw_process_work;
+	req->work.done = gw_op_done;
+	queue_work(sys->gateway_wqueue, &req->work);
+}
+
 static void queue_gateway_request(struct request *req)
 {
 	struct sd_req *hdr = &req->rq;
@@ -486,6 +568,12 @@ static void queue_request(struct request *req)
 		goto done;
 	}
 
+	/* a gateway-only sheep just forwards the request to the cluster */
+	if (sys->gateway_only) {
+		queue_gw_request(req);
+		return;
+	}
+
 	sd_debug("%s, %d", op_name(req->op), sys->cinfo.status);
 
 	switch (sys->cinfo.status) {
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 6763a83..4bf7f30 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -483,6 +483,13 @@ static int create_work_queues(void)
 	if (init_work_queue(get_nr_nodes))
 		return -1;
 
+	/* gateway-only mode creates just the net and gway queues */
+	if (sys->gateway_only) {
+		sys->net_wqueue = create_work_queue("net", WQ_UNLIMITED);
+		sys->gateway_wqueue = create_work_queue("gway", WQ_UNLIMITED);
+		return (sys->net_wqueue && sys->gateway_wqueue) ? 0 : -1;
+	}
+
 	sys->net_wqueue = create_work_queue("net", WQ_UNLIMITED);
 	sys->gateway_wqueue = create_work_queue("gway", WQ_UNLIMITED);
 	sys->io_wqueue = create_work_queue("io", WQ_UNLIMITED);
@@ -961,7 +968,9 @@ int main(int argc, char **argv)
 	if (ret)
 		goto cleanup_log;
 
-	ret = create_cluster(port, zone, nr_vnodes, explicit_addr);
+	ret = sys->gateway_only ?
+		create_gateway(port, explicit_addr) :
+		create_cluster(port, zone, nr_vnodes, explicit_addr);
 	if (ret) {
 		sd_err("failed to create sheepdog cluster");
 		goto cleanup_log;
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 3876b31..92ed0d2 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -396,6 +396,8 @@ void wakeup_requests_on_oid(uint64_t oid);
 void wakeup_all_requests(void);
 void resume_suspended_recovery(void);
 
+int create_gateway(int port, bool explicit_addr);
+void gateway_update_nodes(const struct rb_root *nroot, size_t nr_nodes);
 int create_cluster(int port, int64_t zone, int nr_vnodes,
 		   bool explicit_addr);
 int leave_cluster(void);
-- 
1.7.1






More information about the sheepdog mailing list