[sheepdog] [PATCH 2/3] sheep: add a helper to send req on top of sockfd cache

Liu Yuan namei.unix at gmail.com
Wed Sep 12 10:07:13 CEST 2012


From: Liu Yuan <tailai.ly at taobao.com>

This replaces the duplicated, lengthy get/exec/put sockfd sequences in the callers with a single helper, sheep_exec_req().

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 sheep/gateway.c      |   37 ++++++---------------
 sheep/group.c        |   33 ++++---------------
 sheep/recovery.c     |   87 +++++++++++---------------------------------------
 sheep/sheep_priv.h   |    2 ++
 sheep/sockfd_cache.c |   25 +++++++++++++++
 sheep/store.c        |   23 ++-----------
 6 files changed, 64 insertions(+), 143 deletions(-)

diff --git a/sheep/gateway.c b/sheep/gateway.c
index 2231804..3fd49a5 100644
--- a/sheep/gateway.c
+++ b/sheep/gateway.c
@@ -56,44 +56,27 @@ int gateway_read_obj(struct request *req)
 	 */
 	j = random();
 	for (i = 0; i < nr_copies; i++) {
-		struct sockfd *sfd;
 		int idx = (i + j) % nr_copies;
 
-		memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr));
-		fwd_hdr.opcode = SD_OP_READ_PEER;
-		fwd_hdr.proto_ver = SD_SHEEP_PROTO_VER;
-
 		v = obj_vnodes[idx];
 		if (vnode_is_local(v))
 			continue;
 
-		sfd = sheep_get_sockfd(&v->nid);
-		if (!sfd) {
-			ret = SD_RES_NETWORK_ERROR;
-			continue;
-		}
-
+		memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr));
+		fwd_hdr.opcode = SD_OP_READ_PEER;
+		fwd_hdr.proto_ver = SD_SHEEP_PROTO_VER;
 		wlen = 0;
 		rlen = fwd_hdr.data_length;
 
-		ret = exec_req(sfd->fd, &fwd_hdr, req->data, &wlen, &rlen);
+		ret = sheep_exec_req(&v->nid, &fwd_hdr, req->data, &wlen,
+				     &rlen);
 
-		if (!ret && rsp->result == SD_RES_SUCCESS) {
-			memcpy(&req->rp, rsp, sizeof(*rsp));
-			ret = rsp->result;
-			sheep_put_sockfd(&v->nid, sfd);
-			break; /* Read success */
-		}
+		if (ret != SD_RES_SUCCESS)
+			continue;
 
-		if (ret) {
-			dprintf("remote node might have gone away\n");
-			sheep_del_sockfd(&v->nid, sfd);
-			ret = SD_RES_NETWORK_ERROR;
-		} else {
-			ret = rsp->result;
-			eprintf("remote read fail %x\n", ret);
-			sheep_put_sockfd(&v->nid, sfd);
-		}
+		/* Read success */
+		memcpy(&req->rp, rsp, sizeof(*rsp));
+		break;
 	}
 	return ret;
 }
diff --git a/sheep/group.c b/sheep/group.c
index 56dee6a..ae04a6d 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -623,7 +623,6 @@ static int get_vdis_from(struct sd_node *node)
 	struct sd_req hdr;
 	struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
 	struct vdi_copy *vc = NULL;
-	struct sockfd *sfd;
 	int i, ret = SD_RES_SUCCESS;
 	unsigned int rlen, wlen;
 	int count;
@@ -631,18 +630,7 @@ static int get_vdis_from(struct sd_node *node)
 	if (node_is_local(node))
 		goto out;
 
-	sfd = sheep_get_sockfd(&node->nid);
-	if (!sfd) {
-		ret = SD_RES_NETWORK_ERROR;
-		goto out;
-	}
-
 	rlen = SD_DATA_OBJ_SIZE; /* FIXME */
-	sd_init_req(&hdr, SD_OP_GET_VDI_COPIES);
-	hdr.epoch = sys->epoch;
-	hdr.data_length = rlen;
-	wlen = 0;
-
 	vc = zalloc(rlen);
 	if (!vc) {
 		vprintf(SDOG_ERR, "unable to allocate memory\n");
@@ -650,28 +638,19 @@ static int get_vdis_from(struct sd_node *node)
 		goto out;
 	}
 
-	ret = exec_req(sfd->fd, &hdr, (char *)vc, &wlen, &rlen);
-
-	if (ret) {
-		dprintf("remote node might have gone away\n");
-		sheep_del_sockfd(&node->nid, sfd);
-		ret = SD_RES_NETWORK_ERROR;
-		goto out;
-	}
-	if (rsp->result != SD_RES_SUCCESS) {
-		ret = rsp->result;
-		eprintf("failed %x\n", ret);
-		sheep_put_sockfd(&node->nid, sfd);
+	sd_init_req(&hdr, SD_OP_GET_VDI_COPIES);
+	hdr.epoch = sys->epoch;
+	hdr.data_length = rlen;
+	wlen = 0;
+	ret = sheep_exec_req(&node->nid, &hdr, (char *)vc, &wlen, &rlen);
+	if (ret != SD_RES_SUCCESS)
 		goto out;
-	}
 
-	sheep_put_sockfd(&node->nid, sfd);
 	count = rsp->data_length / sizeof(*vc);
 	for (i = 0; i < count; i++) {
 		set_bit(vc[i].vid, sys->vdi_inuse);
 		add_vdi_copy_number(vc[i].vid, vc[i].nr_copies);
 	}
-	ret = SD_RES_SUCCESS;
 out:
 	free(vc);
 	return ret;
diff --git a/sheep/recovery.c b/sheep/recovery.c
index ae36768..aaa144b 100644
--- a/sheep/recovery.c
+++ b/sheep/recovery.c
@@ -45,8 +45,10 @@ static struct recovery_work *recovering_work;
 
 static int obj_cmp(const void *oid1, const void *oid2)
 {
-	const uint64_t hval1 = fnv_64a_buf((void *)oid1, sizeof(uint64_t), FNV1A_64_INIT);
-	const uint64_t hval2 = fnv_64a_buf((void *)oid2, sizeof(uint64_t), FNV1A_64_INIT);
+	const uint64_t hval1 = fnv_64a_buf((void *)oid1, sizeof(uint64_t),
+					   FNV1A_64_INIT);
+	const uint64_t hval2 = fnv_64a_buf((void *)oid2, sizeof(uint64_t),
+					   FNV1A_64_INIT);
 
 	if (hval1 < hval2)
 		return -1;
@@ -59,15 +61,12 @@ static int recover_object_from_replica(uint64_t oid, struct sd_vnode *vnode,
 				       uint32_t epoch, uint32_t tgt_epoch)
 {
 	struct sd_req hdr;
-	struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
 	unsigned wlen = 0, rlen;
-	int ret = -1;
+	int ret = SD_RES_NO_MEM;
 	void *buf;
 	struct siocb iocb = { 0 };
-	struct sockfd *sfd;
 
 	rlen = get_objsize(oid);
-
 	buf = valloc(rlen);
 	if (!buf) {
 		eprintf("%m\n");
@@ -78,19 +77,6 @@ static int recover_object_from_replica(uint64_t oid, struct sd_vnode *vnode,
 		iocb.epoch = epoch;
 		iocb.length = rlen;
 		ret = sd_store->link(oid, &iocb, tgt_epoch);
-		if (ret == SD_RES_SUCCESS) {
-			ret = 0;
-			goto done;
-		} else {
-			ret = -1;
-			goto out;
-		}
-	}
-
-
-	sfd = sheep_get_sockfd(&vnode->nid);
-	if (!sfd) {
-		ret = -1;
 		goto out;
 	}
 
@@ -98,41 +84,22 @@ static int recover_object_from_replica(uint64_t oid, struct sd_vnode *vnode,
 	hdr.epoch = epoch;
 	hdr.flags = SD_FLAG_CMD_RECOVERY;
 	hdr.data_length = rlen;
-
 	hdr.obj.oid = oid;
 	hdr.obj.tgt_epoch = tgt_epoch;
 
-	ret = exec_req(sfd->fd, &hdr, buf, &wlen, &rlen);
-
-	if (ret != 0) {
-		eprintf("res: %"PRIx32"\n", rsp->result);
-		ret = -1;
-		sheep_del_sockfd(&vnode->nid, sfd);
-		goto out;
-	}
-
-	if (rsp->result == SD_RES_SUCCESS) {
-		sheep_put_sockfd(&vnode->nid, sfd);
-
-		iocb.epoch = epoch;
-		iocb.length = rlen;
-		iocb.buf = buf;
-		ret = sd_store->atomic_put(oid, &iocb);
-		if (ret != SD_RES_SUCCESS) {
-			ret = -1;
-			goto out;
-		}
-	} else {
-		eprintf("failed, res: %"PRIx32"\n", rsp->result);
-		ret = rsp->result;
-		sheep_del_sockfd(&vnode->nid, sfd);
+	ret = sheep_exec_req(&vnode->nid, &hdr, buf, &wlen, &rlen);
+	if (ret != SD_RES_SUCCESS)
 		goto out;
-	}
-done:
-	dprintf("recovered oid %"PRIx64" from %d to epoch %d\n", oid, tgt_epoch, epoch);
+	iocb.epoch = epoch;
+	iocb.length = rlen;
+	iocb.buf = buf;
+	ret = sd_store->atomic_put(oid, &iocb);
 out:
-	if (ret == SD_RES_SUCCESS)
+	if (ret == SD_RES_SUCCESS) {
+		dprintf("recovered oid %"PRIx64" from %d to epoch %d\n", oid,
+			tgt_epoch, epoch);
 		objlist_cache_insert(oid);
+	}
 	free(buf);
 	return ret;
 }
@@ -180,7 +147,7 @@ again:
 			continue;
 		ret = recover_object_from_replica(oid, tgt_vnode,
 						  epoch, tgt_epoch);
-		if (ret == 0) {
+		if (ret == SD_RES_SUCCESS) {
 			/* Succeed */
 			break;
 		} else if (SD_RES_OLD_NODE_VER == ret) {
@@ -499,19 +466,12 @@ static int fetch_object_list(struct sd_node *e, uint32_t epoch,
 	char name[128];
 	struct sd_list_req hdr;
 	struct sd_list_rsp *rsp = (struct sd_list_rsp *)&hdr;
-	struct sockfd *sfd;
 	int ret;
 
 	addr_to_str(name, sizeof(name), e->nid.addr, 0);
 
 	dprintf("%s %"PRIu32"\n", name, e->nid.port);
 
-	sfd = sheep_get_sockfd(&e->nid);
-	if (!sfd) {
-		eprintf("%s %"PRIu32"\n", name, e->nid.port);
-		return -1;
-	}
-
 	wlen = 0;
 	rlen = buf_size;
 
@@ -520,21 +480,10 @@ static int fetch_object_list(struct sd_node *e, uint32_t epoch,
 	hdr.flags = 0;
 	hdr.data_length = rlen;
 
-	ret = exec_req(sfd->fd, (struct sd_req *)&hdr, buf, &wlen, &rlen);
+	ret = sheep_exec_req(&e->nid, (struct sd_req *)&hdr, buf, &wlen, &rlen);
 
-	if (ret) {
-		dprintf("remote node might have gone away\n");
-		sheep_del_sockfd(&e->nid, sfd);
-		return -1;
-	}
-	if (rsp->result != SD_RES_SUCCESS) {
-		ret = rsp->result;
-		eprintf("failed %x\n", ret);
-		sheep_put_sockfd(&e->nid, sfd);
+	if (ret != SD_RES_SUCCESS)
 		return -1;
-	}
-
-	sheep_put_sockfd(&e->nid, sfd);
 
 	dprintf("%zu\n", rsp->data_length / sizeof(uint64_t));
 
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 6eb7e2c..9c7c88f 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -403,6 +403,8 @@ void sockfd_cache_add_group(struct sd_node *nodes, int nr);
 struct sockfd *sheep_get_sockfd(struct node_id *);
 void sheep_put_sockfd(struct node_id *, struct sockfd *);
 void sheep_del_sockfd(struct node_id *, struct sockfd *);
+int sheep_exec_req(struct node_id *nid, struct sd_req *hdr, void *data,
+		   unsigned int *wlen, unsigned int *rlen);
 
 static inline bool is_object_cache_enabled(void)
 {
diff --git a/sheep/sockfd_cache.c b/sheep/sockfd_cache.c
index 1e957eb..ac1c9ce 100644
--- a/sheep/sockfd_cache.c
+++ b/sheep/sockfd_cache.c
@@ -462,3 +462,28 @@ void sheep_del_sockfd(struct node_id *nid, struct sockfd *sfd)
 	sockfd_cache_del(nid);
 	free(sfd);
 }
+
+/* Send hdr/buf to nid over a cached sockfd; return an SD_RES_* code. */
+int sheep_exec_req(struct node_id *nid, struct sd_req *hdr, void *buf,
+		   unsigned int *wlen, unsigned int *rlen)
+{
+	struct sd_rsp *rsp = (struct sd_rsp *)hdr;
+	struct sockfd *sfd = sheep_get_sockfd(nid);
+	int ret;
+
+	if (!sfd)
+		return SD_RES_NETWORK_ERROR;
+
+	if (exec_req(sfd->fd, hdr, buf, wlen, rlen) != 0) {
+		/* I/O failure: the peer may be gone, so evict this fd. */
+		dprintf("remote node might have gone away\n");
+		sheep_del_sockfd(nid, sfd);
+		return SD_RES_NETWORK_ERROR;
+	}
+	ret = rsp->result;
+	if (ret != SD_RES_SUCCESS)
+		eprintf("failed %x\n", ret);
+	/* An error reply still arrived intact: release, don't evict. */
+	sheep_put_sockfd(nid, sfd);
+	return ret;
+}
diff --git a/sheep/store.c b/sheep/store.c
index d89e875..7f9ea29 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -102,37 +102,20 @@ int epoch_log_read_remote(uint32_t epoch, struct sd_node *nodes, int len)
 	for (i = 0; i < nr; i++) {
 		struct sd_req hdr;
 		struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
-		struct sockfd *sfd;
 		unsigned int rlen, wlen;
 
 		if (node_is_local(&local_nodes[i]))
 			continue;
 
-		sfd = sheep_get_sockfd(&local_nodes[i].nid);
-		if (!sfd) {
-			continue;
-		}
-
 		sd_init_req(&hdr, SD_OP_GET_EPOCH);
 		hdr.data_length = rlen = len;
 		hdr.obj.tgt_epoch = epoch;
-
 		wlen = 0;
-
-		ret = exec_req(sfd->fd, &hdr, nodes, &wlen, &rlen);
-		if (ret) {
-			dprintf("remote node might have gone away\n");
-			sheep_del_sockfd(&local_nodes[i].nid, sfd);
-			continue;
-		}
-		if (rsp->result != SD_RES_SUCCESS) {
-			ret = rsp->result;
-			eprintf("failed %x\n", ret);
-			sheep_put_sockfd(&local_nodes[i].nid, sfd);
+		ret = sheep_exec_req(&local_nodes[i].nid, &hdr, nodes, &wlen,
+				     &rlen);
+		if (ret != SD_RES_SUCCESS)
 			continue;
-		}
 
-		sheep_put_sockfd(&local_nodes[i].nid, sfd);
 		return rsp->data_length / sizeof(*nodes);
 	}
 
-- 
1.7.10.2




More information about the sheepdog mailing list