[sheepdog] [PATCH V4 2/2] collie: optimize 'collie vdi check' command

Yunkai Zhang yunkai.me at gmail.com
Sun Sep 2 14:59:45 CEST 2012


From: Yunkai Zhang <qiushu.zyk at taobao.com>

V4:
- share forward_request_concurrently() in this patch
- move sha1_to_hex to sha1.c
- use sockfd cache

V3:
- use local sheep as gateway, not just a proxy
- update commit log

V2:
- use '-R, --repair' instead of '-F, --force_repair'
- do not connect to the target sheep directly
- define a member name instead of using __pad.
- update this commit log
---------------------------------------------- >8

Reading all of a vdi's objects from the cluster in order to check them wastes
a lot of network bandwidth. Instead, let's calculate the checksum of each
object in the backend and send only the checksum result to the collie client.

I also think that repairing objects automatically is dangerous, as we don't
know which replica is correct. To give the user a chance to inspect the copies
first, I add a new option: '-R, --repair'. By default, this command only
checks and does not repair (as the command name implies).

After adding the '-R' flag, the help looks like:
$ collie vdi check
Usage: collie vdi check [-R] [-s snapshot] [-a address] [-p port] [-h] <vdiname>
Options:
  -R, --repair            force repair object's copies (dangerous)
  -s, --snapshot          specify a snapshot id or tag name
  -a, --address           specify the daemon address (default: localhost)
  -p, --port              specify the daemon port
  -h, --help              display this help and exit

Here are some examples of running this command:
* Success:
$ collie vdi check test.img
CHECKING VDI:test.img ...
PASSED

* Failure (by default not repair):
$ collie vdi check test.img
CHECKING VDI:test.img ...
Failed oid: 9c5e6800000001
>> copy[0], sha1: c78ca69c4be7401b6d1f11a37a4cec4226e736cd, from: 127.0.0.1:7000
>> copy[1], sha1: 46dbc769de60a508faf134c6d51926741c0e38fa, from: 127.0.0.1:7001
>> copy[2], sha1: c78ca69c4be7401b6d1f11a37a4cec4226e736cd, from: 127.0.0.1:7004
FAILED

The user can force a repair by specifying the -R or --repair flag:
* Force repair:
$ collie vdi check -R test.img
CHECKING VDI:test.img ...
Failed oid: 9c5e6800000001
>> copy[0], sha1: c78ca69c4be7401b6d1f11a37a4cec4226e736cd, from: 127.0.0.1:7000
>> copy[1], sha1: 46dbc769de60a508faf134c6d51926741c0e38fa, from: 127.0.0.1:7001
>> copy[2], sha1: c78ca69c4be7401b6d1f11a37a4cec4226e736cd, from: 127.0.0.1:7004
>> Repaired successfully!
REPAIRED

Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com>
---
 collie/common.c          |   6 +-
 collie/vdi.c             | 186 ++++++++++++++++++++++++--------------------
 include/internal_proto.h |  24 +++++-
 include/sha1.h           |   1 +
 lib/sha1.c               |  17 +++++
 sheep/farm/farm.h        |   1 -
 sheep/farm/sha1_file.c   |  15 ----
 sheep/gateway.c          |  69 +++++++++++++++++
 sheep/ops.c              | 195 +++++++++++++++++++++++++++++++++++++++--------
 sheep/sheep_priv.h       |   4 +
 10 files changed, 383 insertions(+), 135 deletions(-)

diff --git a/collie/common.c b/collie/common.c
index 8bfab30..fbc9449 100644
--- a/collie/common.c
+++ b/collie/common.c
@@ -211,8 +211,7 @@ int send_light_req_get_response(struct sd_req *hdr, const char *host, int port)
 	ret = exec_req(fd, hdr, NULL, &wlen, &rlen);
 	close(fd);
 	if (ret) {
-		fprintf(stderr, "failed to connect to  %s:%d\n",
-			host, port);
+		fprintf(stderr, "failed to connect to  %s:%d\n", host, port);
 		return -1;
 	}
 
@@ -233,8 +232,7 @@ int send_light_req(struct sd_req *hdr, const char *host, int port)
 		return -1;
 
 	if (ret != SD_RES_SUCCESS) {
-		fprintf(stderr, "Response's result: %s\n",
-			sd_strerror(ret));
+		fprintf(stderr, "Response's result: %s\n", sd_strerror(ret));
 		return -1;
 	}
 
diff --git a/collie/vdi.c b/collie/vdi.c
index 966fb9f..467bc3b 100644
--- a/collie/vdi.c
+++ b/collie/vdi.c
@@ -15,6 +15,7 @@
 
 #include "collie.h"
 #include "treeview.h"
+#include "sha1.h"
 
 static struct sd_option vdi_options[] = {
 	{'P', "prealloc", 0, "preallocate all the data objects"},
@@ -24,6 +25,8 @@ static struct sd_option vdi_options[] = {
 	{'d', "delete", 0, "delete a key"},
 	{'w', "writeback", 0, "use writeback mode"},
 	{'c', "copies", 1, "specify the data redundancy (number of copies)"},
+	{'R', "repair", 0, "force repair object's copies"},
+
 	{ 0, NULL, 0, NULL },
 };
 
@@ -36,6 +39,8 @@ struct vdi_cmd_data {
 	int prealloc;
 	int nr_copies;
 	bool writeback;
+	int cache;
+	int repair;
 } vdi_cmd_data = { ~0, };
 
 struct get_vdi_info {
@@ -1323,126 +1328,129 @@ out:
 	return ret;
 }
 
-static void *read_object_from(struct sd_vnode *vnode, uint64_t oid)
+static int get_object_checksum(uint64_t oid,
+			       unsigned char sha1[SD_MAX_COPIES][SHA1_LEN],
+			       struct node_id nids[SD_MAX_COPIES])
 {
 	struct sd_req hdr;
 	struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
-	int fd, ret;
-	unsigned wlen = 0, rlen = SD_DATA_OBJ_SIZE;
-	char name[128];
+	unsigned rlen, wlen;
+	int i, fd, ret, offset, nr_copies;
 	void *buf;
 
-	buf = malloc(SD_DATA_OBJ_SIZE);
-	if (!buf) {
-		fprintf(stderr, "Failed to allocate memory\n");
-		exit(EXIT_SYSFAIL);
-	}
-
-	addr_to_str(name, sizeof(name), vnode->nid.addr, 0);
-	fd = connect_to(name, vnode->nid.port);
+	fd = connect_to(sdhost, sdport);
 	if (fd < 0) {
-		fprintf(stderr, "failed to connect to %s:%"PRIu32"\n",
-			name, vnode->nid.port);
-		exit(EXIT_FAILURE);
+		fprintf(stderr, "Failed to connect\n");
+		return -1;
 	}
 
-	sd_init_req(&hdr, SD_OP_READ_PEER);
-	hdr.epoch = sd_epoch;
-	hdr.flags = 0;
-	hdr.data_length = rlen;
+	sd_init_req(&hdr, SD_OP_CALC_CHKSUM);
+
+	rlen = SD_MAX_COPIES * (SHA1_LEN + sizeof(*nids));
+	wlen = 0;
 
+	buf = xmalloc(rlen);
+	hdr.epoch = sd_epoch;
 	hdr.obj.oid = oid;
+	hdr.data_length = rlen;
+	hdr.flags = 0;
 
 	ret = exec_req(fd, &hdr, buf, &wlen, &rlen);
 	close(fd);
-
 	if (ret) {
-		fprintf(stderr, "Failed to execute request\n");
-		exit(EXIT_FAILURE);
+		fprintf(stderr, "Failed to get checksum, oid:%"PRIx64"\n", oid);
+		return -1;
 	}
-
 	if (rsp->result != SD_RES_SUCCESS) {
-		fprintf(stderr, "Failed to read, %s\n",
-			sd_strerror(rsp->result));
-		exit(EXIT_FAILURE);
+		fprintf(stderr, "Failed to get checksum oid:%"PRIx64", %s\n",
+			oid, sd_strerror(rsp->result));
+		return -1;
 	}
-	return buf;
+
+	offset = 0;
+	nr_copies = rsp->obj.copies;
+	for (i = 0; i < nr_copies; i++) {
+		memcpy(&sha1[i], (char *)buf + offset, SHA1_LEN);
+		memcpy(&nids[i], (char *)buf + offset + SHA1_LEN,
+		       sizeof(*nids));
+		offset += SHA1_LEN + sizeof(*nids);
+	}
+
+	return nr_copies;
 }
 
-static void write_object_to(struct sd_vnode *vnode, uint64_t oid, void *buf)
+static int do_repair(uint64_t oid)
 {
 	struct sd_req hdr;
 	struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
-	int fd, ret;
-	unsigned wlen = SD_DATA_OBJ_SIZE, rlen = 0;
-	char name[128];
+	int ret;
 
-	addr_to_str(name, sizeof(name), vnode->nid.addr, 0);
-	fd = connect_to(name, vnode->nid.port);
-	if (fd < 0) {
-		fprintf(stderr, "failed to connect to %s:%"PRIu32"\n",
-			name, vnode->nid.port);
-		exit(EXIT_FAILURE);
-	}
+	sd_init_req(&hdr, SD_OP_REPAIR_OBJ);
 
-	sd_init_req(&hdr, SD_OP_WRITE_PEER);
 	hdr.epoch = sd_epoch;
-	hdr.flags = SD_FLAG_CMD_WRITE;
-	hdr.data_length = wlen;
-
 	hdr.obj.oid = oid;
 
-	ret = exec_req(fd, &hdr, buf, &wlen, &rlen);
-	close(fd);
-
+	ret = send_light_req(&hdr, sdhost, sdport);
 	if (ret) {
-		fprintf(stderr, "Failed to execute request\n");
-		exit(EXIT_FAILURE);
+		fprintf(stderr, "Failed to repair oid:%"PRIx64"\n", oid);
+		return SD_RES_EIO;
 	}
-
 	if (rsp->result != SD_RES_SUCCESS) {
-		fprintf(stderr, "Failed to read, %s\n",
-			sd_strerror(rsp->result));
-		exit(EXIT_FAILURE);
+		fprintf(stderr, "Failed to repair oid:%"PRIx64", %s\n",
+			oid, sd_strerror(rsp->result));
+		return rsp->result;
 	}
+
+	return SD_RES_SUCCESS;
 }
 
-/*
- * Fix consistency of the replica of oid.
- *
- * XXX: The fix is rather dumb, just read the first copy and write it
- * to other replica.
- */
-static void do_check_repair(uint64_t oid, int nr_copies)
+static int do_check_repair(uint64_t oid)
 {
-	struct sd_vnode *tgt_vnodes[nr_copies];
-	void *buf, *buf_cmp;
-	int i;
-
-	oid_to_vnodes(sd_vnodes, sd_vnodes_nr, oid, nr_copies, tgt_vnodes);
-	buf = read_object_from(tgt_vnodes[0], oid);
-	for (i = 1; i < nr_copies; i++) {
-		buf_cmp = read_object_from(tgt_vnodes[i], oid);
-		if (memcmp(buf, buf_cmp, SD_DATA_OBJ_SIZE)) {
-			free(buf_cmp);
-			goto fix_consistency;
+	unsigned char sha1[SD_MAX_COPIES][SHA1_LEN];
+	struct node_id nids[SD_MAX_COPIES];
+	int i, j, ret, nr_copies;
+	char host[128];
+
+	nr_copies = get_object_checksum(oid, sha1, nids);
+	for (i = 0; i < nr_copies; i++) {
+		for (j = (i + 1); j < nr_copies; j++) {
+			if (memcmp(sha1[i], sha1[j], SHA1_LEN))
+				goto diff;
 		}
-		free(buf_cmp);
 	}
-	free(buf);
-	return;
+	return 0;
 
-fix_consistency:
-	for (i = 1; i < nr_copies; i++)
-		write_object_to(tgt_vnodes[i], oid, buf);
-	fprintf(stdout, "fix %"PRIx64" success\n", oid);
-	free(buf);
+diff:
+	fprintf(stderr, "Failed oid: %"PRIx64"\n", oid);
+	for (i = 0; i < nr_copies; i++) {
+		addr_to_str(host, sizeof(host), nids[i].addr, 0);
+		fprintf(stderr, ">> copy[%d], sha1: %s, from: %s:%d\n",
+			i, sha1_to_hex(sha1[i]), host, nids[i].port);
+	}
+
+	if (!vdi_cmd_data.repair)
+		return -1;
+
+	/*
+	 * Force repair the consistency of oid's replica
+	 *
+	 * FIXME: this fix is rather dumb, it just read the
+	 * first copy and write it to other replica,
+	 */
+	ret = do_repair(oid);
+	if (ret == SD_RES_SUCCESS) {
+		fprintf(stderr, ">> Repaired successfully!\n");
+		return -1;
+	}
+
+	fprintf(stderr, ">> Failed to repair!\n");
+	return -1;
 }
 
 static int check_repair_vdi(uint32_t vid)
 {
 	struct sheepdog_inode *inode;
-	int ret;
+	int ret, failed = 0;
 	uint64_t total, done = 0, oid;
 	uint32_t idx = 0, dvid;
 
@@ -1451,22 +1459,29 @@ static int check_repair_vdi(uint32_t vid)
 		fprintf(stderr, "Failed to allocate memory\n");
 		return EXIT_SYSFAIL;
 	}
+
 	ret = sd_read_object(vid_to_vdi_oid(vid), inode, SD_INODE_SIZE, 0);
 	if (ret != SD_RES_SUCCESS) {
 		fprintf(stderr, "Failed to read an inode\n");
 		return EXIT_FAILURE;
 	}
 
+	if (do_check_repair(vid_to_vdi_oid(vid)))
+		failed = 1;
+
 	total = inode->vdi_size;
 	while(done < total) {
 		dvid = inode->data_vdi_id[idx];
 		if (dvid) {
 			oid = vid_to_data_oid(dvid, idx);
-			do_check_repair(oid, inode->nr_copies);
+			if (do_check_repair(oid))
+				failed = 1;
 		}
 		done += SD_DATA_OBJ_SIZE;
 		idx++;
 	}
+	if (failed)
+		return EXIT_FAILURE;
 
 	return EXIT_SUCCESS;
 }
@@ -1485,11 +1500,15 @@ static int vdi_check(int argc, char **argv)
 		goto out;
 	}
 
+	printf("CHECKING VDI:%s ...\n", vdiname);
 	ret = check_repair_vdi(vid);
-	if (ret != EXIT_SUCCESS)
+	if (ret != EXIT_SUCCESS) {
+		printf("%s\n",
+		       vdi_cmd_data.repair ? "REPAIRED" : "FAILED");
 		goto out;
+	}
 
-	fprintf(stdout, "finish check&repair %s\n", vdiname);
+	printf("PASSED\n");
 	return EXIT_SUCCESS;
 out:
 	return ret;
@@ -1522,7 +1541,7 @@ out:
 }
 
 static struct subcommand vdi_cmd[] = {
-	{"check", "<vdiname>", "saph", "check and repair image's consistency",
+	{"check", "<vdiname>", "Rsaph", "check and repair image's consistency",
 	 NULL, SUBCMD_FLAG_NEED_NODELIST|SUBCMD_FLAG_NEED_THIRD_ARG,
 	 vdi_check, vdi_options},
 	{"create", "<vdiname> <size>", "Pcaph", "create an image",
@@ -1611,6 +1630,9 @@ static int vdi_parser(int ch, char *opt)
 			exit(EXIT_FAILURE);
 		}
 		vdi_cmd_data.nr_copies = nr_copies;
+	case 'R':
+		vdi_cmd_data.repair = 1;
+		break;
 	}
 
 	return 0;
diff --git a/include/internal_proto.h b/include/internal_proto.h
index 5288823..7817fee 100644
--- a/include/internal_proto.h
+++ b/include/internal_proto.h
@@ -61,10 +61,14 @@
 #define SD_OP_REMOVE_PEER    0xA6
 #define SD_OP_SET_CACHE_SIZE 0xA7
 #define SD_OP_ENABLE_RECOVER 0xA8
-#define SD_OP_DISABLE_RECOVER 0xA9
-#define SD_OP_INFO_RECOVER 0xAA
-#define SD_OP_GET_VDI_COPIES 0xAB
+#define SD_OP_DISABLE_RECOVER  0xA9
+#define SD_OP_INFO_RECOVER     0xAA
+#define SD_OP_GET_VDI_COPIES   0xAB
 #define SD_OP_COMPLETE_RECOVERY 0xAC
+#define SD_OP_CALC_CHKSUM      0xAD
+#define SD_OP_CALC_CHKSUM_PEER 0xAE
+#define SD_OP_REPAIR_OBJ       0xAF
+#define SD_OP_REPAIR_OBJ_PEER  0xB0
 
 /* internal flags for hdr.flags, must be above 0x80 */
 #define SD_FLAG_CMD_RECOVERY 0x0080
@@ -169,6 +173,20 @@ struct sd_node_rsp {
 	uint64_t	store_free;
 };
 
+struct sd_checksum_rsp {
+	uint8_t		proto_ver;
+	uint8_t		opcode;
+	uint16_t	flags;
+	uint32_t	epoch;
+	uint32_t	id;
+	uint32_t	data_length;
+	uint32_t	result;
+	union {
+		uint8_t		sha1[SHA1_LEN];
+		uint32_t	__pad[7];
+	};
+};
+
 struct node_id {
 	uint8_t addr[16];
 	uint16_t port;
diff --git a/include/sha1.h b/include/sha1.h
index cc60dc1..1a3c14d 100644
--- a/include/sha1.h
+++ b/include/sha1.h
@@ -23,5 +23,6 @@ struct sha1_ctx {
 void sha1_init(void *ctx);
 void sha1_update(void *ctx, const uint8_t *data, unsigned int len);
 void sha1_final(void* ctx, uint8_t *out);
+char *sha1_to_hex(const unsigned char *sha1);
 
 #endif
diff --git a/lib/sha1.c b/lib/sha1.c
index 5cf444a..e417098 100644
--- a/lib/sha1.c
+++ b/lib/sha1.c
@@ -18,6 +18,7 @@
  *
  */
 #include <arpa/inet.h>
+#include "sheepdog_proto.h"
 #include "sha1.h"
 
 #define SHA1_DIGEST_SIZE	20
@@ -166,3 +167,19 @@ void sha1_final(void* ctx, uint8_t *out)
 	/* Wipe context */
 	memset(sctx, 0, sizeof *sctx);
 }
+
+char *sha1_to_hex(const unsigned char *sha1)
+{
+	static char buffer[50];
+	static const char hex[] = "0123456789abcdef";
+	char *buf = buffer;
+	int i;
+
+	for (i = 0; i < SHA1_LEN; i++) {
+		unsigned int val = *sha1++;
+		*buf++ = hex[val >> 4];
+		*buf++ = hex[val & 0xf];
+	}
+	buffer[2 * SHA1_LEN] = 0;
+	return buffer;
+}
diff --git a/sheep/farm/farm.h b/sheep/farm/farm.h
index 1a0081e..da79159 100644
--- a/sheep/farm/farm.h
+++ b/sheep/farm/farm.h
@@ -46,7 +46,6 @@ extern char farm_obj_dir[PATH_MAX];
 extern char *sha1_to_path(const unsigned char *sha1);
 extern int sha1_file_write(unsigned char *buf, unsigned len, unsigned char *);
 extern void *sha1_file_read(const unsigned char *sha1, struct sha1_file_hdr *);
-extern char *sha1_to_hex(const unsigned char *sha1);
 extern int get_sha1_hex(const char *hex, unsigned char *sha1);
 extern int sha1_file_try_delete(const unsigned char *sha1);
 
diff --git a/sheep/farm/sha1_file.c b/sheep/farm/sha1_file.c
index b0abc16..3cd4f40 100644
--- a/sheep/farm/sha1_file.c
+++ b/sheep/farm/sha1_file.c
@@ -257,18 +257,3 @@ int get_sha1_hex(const char *hex, unsigned char *sha1)
 	}
 	return 0;
 }
-
-char *sha1_to_hex(const unsigned char *sha1)
-{
-	static char buffer[50];
-	static const char hex[] = "0123456789abcdef";
-	char *buf = buffer;
-	int i;
-
-	for (i = 0; i < SHA1_LEN; i++) {
-		unsigned int val = *sha1++;
-		*buf++ = hex[val >> 4];
-		*buf++ = hex[val & 0xf];
-	}
-	return buffer;
-}
diff --git a/sheep/gateway.c b/sheep/gateway.c
index 94ec6f9..fb91c28 100644
--- a/sheep/gateway.c
+++ b/sheep/gateway.c
@@ -346,3 +346,72 @@ int gateway_remove_obj(struct request *req)
 {
 	return gateway_forward_request(req);
 }
+
+int gateway_calc_obj_chksum(struct request *req)
+{
+	struct sd_req fwdhdr, *hdr = &req->rq;
+	struct sd_vnode *vnodes[SD_MAX_COPIES];
+	struct sd_rsp rsps[SD_MAX_COPIES];
+	unsigned int wlen;
+	uint64_t oid = hdr->obj.oid;
+	int i, ret, offset, nr_objs;
+
+	nr_objs = get_obj_copy_number(oid, req->vinfo->nr_zones);
+
+	oid_to_vnodes(req->vinfo->vnodes, req->vinfo->nr_vnodes,
+		      oid, nr_objs, vnodes);
+
+	memcpy(&fwdhdr, hdr, sizeof(fwdhdr));
+	fwdhdr.opcode = gateway_to_peer_opcode(hdr->opcode);
+	fwdhdr.flags = 0;
+	fwdhdr.data_length = 0;
+	wlen = 0;
+
+	ret = forward_request_concurrently(&fwdhdr, NULL, &wlen,
+					   vnodes, rsps, NULL, nr_objs);
+	if (ret != SD_RES_SUCCESS)
+		return ret;
+
+	for (i = 0, offset = 0; i < nr_objs; i++) {
+		struct sd_checksum_rsp *rsp;
+
+		rsp = (struct sd_checksum_rsp *)&rsps[i];
+		memcpy((char *)req->data + offset, rsp->sha1, SHA1_LEN);
+		memcpy((char *)req->data + offset + SHA1_LEN, &vnodes[i]->nid,
+		       sizeof(vnodes[i]->nid));
+		offset += SHA1_LEN + sizeof(vnodes[i]->nid);
+	}
+
+	req->rp.obj.copies = nr_objs;
+	req->rp.data_length = offset;
+
+	return SD_RES_SUCCESS;
+}
+
+int gateway_repair_obj(struct request *req)
+{
+	struct sd_req fwdhdr, *hdr = &req->rq;
+	struct sd_vnode *vnodes[SD_MAX_COPIES];
+	struct sd_rsp rsps[SD_MAX_COPIES];
+	uint64_t oid = hdr->obj.oid;
+	struct node_id *src;
+	unsigned wlen;
+	int nr_objs;
+
+	nr_objs = get_obj_copy_number(oid, req->vinfo->nr_zones);
+
+	oid_to_vnodes(req->vinfo->vnodes, req->vinfo->nr_vnodes,
+		      oid, nr_objs, vnodes);
+
+	src = &vnodes[0]->nid;
+	wlen = sizeof(*src);
+
+	memcpy(&fwdhdr, hdr, sizeof(fwdhdr));
+	fwdhdr.opcode = gateway_to_peer_opcode(hdr->opcode);
+	fwdhdr.data_length = wlen;
+	fwdhdr.flags = SD_FLAG_CMD_WRITE;
+
+	return forward_request_concurrently(&fwdhdr, src, &wlen,
+					    &vnodes[1], rsps, NULL,
+					    nr_objs - 1);
+}
diff --git a/sheep/ops.c b/sheep/ops.c
index b4fa77f..9817388 100644
--- a/sheep/ops.c
+++ b/sheep/ops.c
@@ -26,6 +26,7 @@
 #include "strbuf.h"
 #include "trace/trace.h"
 #include "util.h"
+#include "sha1.h"
 
 enum sd_op_type {
 	SD_OP_TYPE_CLUSTER = 1, /* cluster operations */
@@ -219,7 +220,6 @@ static int cluster_make_fs(const struct sd_req *req, struct sd_rsp *rsp,
 	int i, ret;
 	uint32_t latest_epoch;
 	uint64_t created_time;
-	struct siocb iocb = { 0 };
 	struct store_driver *driver;
 	char *store_name = data;
 
@@ -229,7 +229,6 @@ static int cluster_make_fs(const struct sd_req *req, struct sd_rsp *rsp,
 
 	sd_store = driver;
 	latest_epoch = get_latest_epoch();
-	iocb.epoch = latest_epoch;
 
 	ret = sd_store->format();
 	if (ret != SD_RES_SUCCESS)
@@ -690,6 +689,53 @@ static int local_kill_node(const struct sd_req *req, struct sd_rsp *rsp,
 	return SD_RES_SUCCESS;
 }
 
+static inline int read_object_from(struct node_id *src, uint32_t epoch,
+				   uint64_t oid, char *buf)
+{
+	struct sd_req hdr;
+	struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
+	struct sockfd *sfd;
+	unsigned wlen, rlen;
+	char host[128];
+	int ret;
+
+	sfd = sheep_get_sockfd(src);
+	if (!sfd) {
+		addr_to_str(host, sizeof(host), src->addr, 0);
+		dprintf("failed to connect to %s:%"PRIu32"\n",
+			host, src->port);
+		return SD_RES_NETWORK_ERROR;
+	}
+
+	rlen = SD_DATA_OBJ_SIZE;
+	if (is_vdi_obj(oid))
+		rlen = SD_INODE_SIZE;
+	wlen = 0;
+
+	sd_init_req(&hdr, SD_OP_READ_PEER);
+	hdr.epoch = epoch;
+	hdr.data_length = rlen;
+
+	hdr.obj.oid = oid;
+	hdr.obj.offset = 0;
+
+	ret = exec_req(sfd->fd, &hdr, buf, &wlen, &rlen);
+	if (ret) {
+		dprintf("Failed to execute request\n");
+		sheep_del_sockfd(src, sfd);
+		return SD_RES_NETWORK_ERROR;
+	}
+
+	sheep_put_sockfd(src, sfd);
+
+	if (rsp->result != SD_RES_SUCCESS) {
+		dprintf("Failed to read, %s\n", sd_strerror(rsp->result));
+		return rsp->result;
+	}
+
+	return SD_RES_SUCCESS;
+}
+
 static int read_copy_from_replica(struct vnode_info *vnodes, uint32_t epoch,
 				  uint64_t oid, char *buf)
 {
@@ -731,9 +777,6 @@ static int read_copy_from_replica(struct vnode_info *vnodes, uint32_t epoch,
 	rounded_rand = random() % nr_copies;
 
 	for (i = 0; i < nr_copies; i++) {
-		unsigned wlen, rlen;
-		int fd;
-
 		j = (i + rounded_rand) % nr_copies;
 
 		/* bypass the local copy */
@@ -741,32 +784,9 @@ static int read_copy_from_replica(struct vnode_info *vnodes, uint32_t epoch,
 			continue;
 
 		v = obj_vnodes[j];
-		addr_to_str(name, sizeof(name), v->nid.addr, 0);
-
-		fd = connect_to(name, v->nid.port);
-		if (fd < 0)
-			continue;
-
-		rlen = SD_DATA_OBJ_SIZE;
-		wlen = 0;
-
-		sd_init_req(&hdr, SD_OP_READ_PEER);
-		hdr.epoch = epoch;
-		hdr.data_length = rlen;
-
-		hdr.obj.oid = oid;
-		hdr.obj.offset = 0;
-
-		ret = exec_req(fd, &hdr, buf, &wlen, &rlen);
-
-		close(fd);
 
-		if (ret) {
-			dprintf("%x, %x\n", ret, rsp->result);
-			continue;
-		}
-
-		if (rsp->result == SD_RES_SUCCESS)
+		ret = read_object_from(&v->nid, epoch, oid, buf);
+		if (ret == SD_RES_SUCCESS)
 			break;
 	}
 
@@ -817,6 +837,50 @@ out:
 	return ret;
 }
 
+int peer_calc_obj_chksum(struct request *req)
+{
+	struct sd_req *hdr = &req->rq;
+	struct sd_checksum_rsp *rsp = (struct sd_checksum_rsp *)&req->rp;
+	uint32_t epoch = hdr->epoch;
+	unsigned char sha1[SHA1_LEN];
+	struct siocb iocb;
+	struct sha1_ctx ctx;
+	void *buf;
+	int ret;
+
+	if (sys->gateway_only)
+		return SD_RES_NO_OBJ;
+
+	hdr->data_length = SD_DATA_OBJ_SIZE;
+	if (is_vdi_obj(hdr->obj.oid))
+		hdr->data_length = SD_INODE_SIZE;
+
+	buf = valloc(hdr->data_length);
+	hdr->obj.offset = 0;
+
+	memset(&iocb, 0, sizeof(iocb));
+	iocb.epoch = epoch;
+	iocb.flags = hdr->flags;
+	iocb.buf = buf;
+	iocb.length = hdr->data_length;
+	iocb.offset = hdr->obj.offset;
+	ret = sd_store->read(hdr->obj.oid, &iocb);
+	if (ret != SD_RES_SUCCESS)
+		goto out;
+
+	sha1_init(&ctx);
+	sha1_update(&ctx, buf, hdr->data_length);
+	sha1_final(&ctx, sha1);
+	memcpy(&rsp->sha1, sha1, SHA1_LEN);
+out:
+	req->data = NULL;
+	req->data_length = 0;
+	hdr->data_length = 0;
+	rsp->data_length = 0;
+	free(buf);
+	return ret;
+}
+
 static int do_write_obj(struct siocb *iocb, struct sd_req *hdr, uint32_t epoch,
 		void *data, int create)
 {
@@ -846,6 +910,50 @@ static int do_write_obj(struct siocb *iocb, struct sd_req *hdr, uint32_t epoch,
 	return ret;
 }
 
+int peer_repair_obj(struct request *req)
+{
+	struct sd_req whdr, *hdr = &req->rq;
+	uint32_t epoch = hdr->epoch;
+	uint64_t oid = hdr->obj.oid;
+	struct node_id *src;
+	struct siocb iocb;
+	char host[128];
+	int ret, len;
+	void *buf;
+
+	if (sys->gateway_only)
+		return SD_RES_NO_OBJ;
+
+	src = (struct node_id *)req->data;
+	addr_to_str(host, sizeof(host), src->addr, src->port);
+
+	dprintf("oid:%"PRIx64", from:%s\n", oid, host);
+
+	len = SD_DATA_OBJ_SIZE;
+	if (is_vdi_obj(hdr->obj.oid))
+		len = SD_INODE_SIZE;
+
+	buf = valloc(len);
+	if (!buf) {
+		eprintf("can not allocate memory\n");
+		return SD_RES_NO_SPACE;
+	}
+
+	ret = read_object_from(src, epoch, oid, buf);
+	if (ret != SD_RES_SUCCESS)
+		return ret;
+
+	memset(&whdr, 0, sizeof(whdr));
+	memset(&iocb, 0, sizeof(iocb));
+
+	whdr.data_length = len;
+	whdr.obj.offset = 0;
+	whdr.obj.oid = oid;
+	whdr.epoch = epoch;
+	iocb.epoch = epoch;
+	return do_write_obj(&iocb, &whdr, epoch, buf, 1);
+}
+
 int peer_write_obj(struct request *req)
 {
 	struct sd_req *hdr = &req->rq;
@@ -1167,11 +1275,36 @@ static struct sd_op_template sd_ops[] = {
 		.type = SD_OP_TYPE_CLUSTER,
 		.process_main = cluster_disable_recover,
 	},
+
 	[SD_OP_INFO_RECOVER] = {
 		.name = "INFO_RECOVER",
 		.type = SD_OP_TYPE_LOCAL,
 		.process_main = local_info_recover,
 	},
+
+	[SD_OP_CALC_CHKSUM] = {
+		.name = "CALC_CHKSUM",
+		.type = SD_OP_TYPE_GATEWAY,
+		.process_work = gateway_calc_obj_chksum,
+	},
+
+	[SD_OP_CALC_CHKSUM_PEER] = {
+		.name = "CALC_CHKSUM_PEER",
+		.type = SD_OP_TYPE_PEER,
+		.process_work = peer_calc_obj_chksum,
+	},
+
+	[SD_OP_REPAIR_OBJ] = {
+		.name = "REPAIR_OBJ",
+		.type = SD_OP_TYPE_GATEWAY,
+		.process_work = gateway_repair_obj,
+	},
+
+	[SD_OP_REPAIR_OBJ_PEER] = {
+		.name = "REPAIR_OBJ_PEER",
+		.type = SD_OP_TYPE_PEER,
+		.process_work = peer_repair_obj,
+	},
 };
 
 struct sd_op_template *get_sd_op(uint8_t opcode)
@@ -1257,6 +1390,8 @@ static int map_table[] = {
 	[SD_OP_READ_OBJ] = SD_OP_READ_PEER,
 	[SD_OP_WRITE_OBJ] = SD_OP_WRITE_PEER,
 	[SD_OP_REMOVE_OBJ] = SD_OP_REMOVE_PEER,
+	[SD_OP_CALC_CHKSUM] = SD_OP_CALC_CHKSUM_PEER,
+	[SD_OP_REPAIR_OBJ] = SD_OP_REPAIR_OBJ_PEER,
 };
 
 int gateway_to_peer_opcode(int opcode)
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 5969c26..4e8bae6 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -348,12 +348,16 @@ int gateway_read_obj(struct request *req);
 int gateway_write_obj(struct request *req);
 int gateway_create_and_write_obj(struct request *req);
 int gateway_remove_obj(struct request *req);
+int gateway_calc_obj_chksum(struct request *req);
+int gateway_repair_obj(struct request *req);
 
 /* backend store */
 int peer_read_obj(struct request *req);
 int peer_write_obj(struct request *req);
 int peer_create_and_write_obj(struct request *req);
 int peer_remove_obj(struct request *req);
+int peer_calc_obj_chksum(struct request *req);
+int peer_repair_obj(struct request *req);
 
 /* object_cache */
 
-- 
1.7.11.4




More information about the sheepdog mailing list