[sheepdog] [PATCH v2 08/10] collie: use hash for collie check

MORITA Kazutaka morita.kazutaka at gmail.com
Mon May 13 17:11:16 CEST 2013


From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>

This patch compares the hash values of the target objects and skips the
check and repair when there is no difference between them.  It also
uses a work queue to boost the vdi check performance.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 collie/collie.c |   15 +++++
 collie/vdi.c    |  195 ++++++++++++++++++++++++++++++++++++++++++++-----------
 2 files changed, 172 insertions(+), 38 deletions(-)

diff --git a/collie/collie.c b/collie/collie.c
index 6d54829..05031fd 100644
--- a/collie/collie.c
+++ b/collie/collie.c
@@ -18,6 +18,8 @@
 #include "collie.h"
 #include "util.h"
 
+#define EPOLL_SIZE 4096
+
 static const char program_name[] = "collie";
 const char *sdhost = "127.0.0.1";
 int sdport = SD_LISTEN_PORT;
@@ -322,6 +324,11 @@ static void crash_handler(int signo)
 	reraise_crash_signal(signo, EXIT_SYSFAIL);
 }
 
+static size_t get_nr_nodes(void)
+{
+	return sd_nodes_nr;
+}
+
 int main(int argc, char **argv)
 {
 	int ch, longindex, ret;
@@ -393,5 +400,13 @@ int main(int argc, char **argv)
 	if (flags & SUBCMD_FLAG_NEED_ARG && argc == optind)
 		subcommand_usage(argv[1], argv[2], EXIT_USAGE);
 
+	if (init_event(EPOLL_SIZE) < 0)
+		exit(EXIT_SYSFAIL);
+
+	if (init_work_queue(get_nr_nodes, NULL, NULL) != 0) {
+		fprintf(stderr, "Failed to init work queue\n");
+		exit(EXIT_SYSFAIL);
+	}
+
 	return command_fn(argc, argv);
 }
diff --git a/collie/vdi.c b/collie/vdi.c
index 91ed671..3520e2f 100644
--- a/collie/vdi.c
+++ b/collie/vdi.c
@@ -1439,50 +1439,164 @@ static void write_object_to(const struct sd_vnode *vnode, uint64_t oid,
 	}
 }
 
-/*
- * Fix consistency of the replica of oid.
- *
- * XXX: The fix is rather dumb, just read the random copy and write it
- * to other replica.
- */
-static void do_check_repair(uint64_t oid, int nr_copies)
+struct vdi_check_work {
+	struct vdi_check_info *info;
+	const struct sd_vnode *vnode;
+	uint8_t hash[SHA1_LEN];
+	bool object_found;
+	struct work work;
+};
+
+struct vdi_check_info {
+	uint64_t oid;
+	int nr_copies;
+	uint64_t total;
+	uint64_t *done;
+	int refcnt;
+	struct work_queue *wq;
+	struct vdi_check_work *base;
+	struct vdi_check_work vcw[0];
+};
+
+static void free_vdi_check_info(struct vdi_check_info *info)
 {
-	const struct sd_vnode *tgt_vnodes[SD_MAX_COPIES];
-	size_t size = get_objsize(oid);
-	void *buf = xmalloc(size), *buf_cmp;
-	int i, ret;
+	if (info->done) {
+		*info->done += SD_DATA_OBJ_SIZE;
+		show_progress(*info->done, info->total);
+	}
+	free(info);
+}
 
-	ret = sd_read_object(oid, buf, size, 0, true);
-	if (ret != SD_RES_SUCCESS) {
-		fprintf(stderr, "FATAL: read %"PRIx64" failed\n", oid);
+static void vdi_repair_work(struct work *work)
+{
+	struct vdi_check_work *vcw = container_of(work, struct vdi_check_work,
+						  work);
+	struct vdi_check_info *info = vcw->info;
+	void *buf;
+
+	buf = read_object_from(info->base->vnode, info->oid);
+	write_object_to(vcw->vnode, info->oid, buf, !vcw->object_found);
+	free(buf);
+}
+
+static void vdi_repair_main(struct work *work)
+{
+	struct vdi_check_work *vcw = container_of(work, struct vdi_check_work,
+						  work);
+	struct vdi_check_info *info = vcw->info;
+
+	if (vcw->object_found)
+		fprintf(stdout, "fixed replica %"PRIx64"\n", info->oid);
+	else
+		fprintf(stdout, "fixed missing %"PRIx64"\n", info->oid);
+
+	info->refcnt--;
+	if (info->refcnt == 0)
+		free_vdi_check_info(info);
+}
+
+static void vdi_hash_check_work(struct work *work)
+{
+	struct vdi_check_work *vcw = container_of(work, struct vdi_check_work,
+						  work);
+	struct vdi_check_info *info = vcw->info;
+	char host[HOST_NAME_MAX];
+	int ret;
+	struct sd_req hdr;
+	struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
+
+	sd_init_req(&hdr, SD_OP_GET_HASH);
+	hdr.obj.oid = info->oid;
+	hdr.obj.tgt_epoch = sd_epoch;
+
+	addr_to_str(host, sizeof(host), vcw->vnode->nid.addr, 0);
+	ret = collie_exec_req(host, vcw->vnode->nid.port, &hdr, NULL);
+	if (ret < 0)
+		exit(EXIT_SYSFAIL);
+
+	switch (ret) {
+	case SD_RES_SUCCESS:
+		vcw->object_found = true;
+		memcpy(vcw->hash, rsp->hash.digest, sizeof(vcw->hash));
+		uatomic_set(&info->base, vcw);
+		break;
+	case SD_RES_NO_OBJ:
+		vcw->object_found = false;
+		break;
+	default:
+		fprintf(stderr, "failed to read %"PRIx64" from %s:%d, %s\n",
+			info->oid, host, vcw->vnode->nid.port,
+			sd_strerror(ret));
 		exit(EXIT_FAILURE);
 	}
+}
 
-	oid_to_vnodes(sd_vnodes, sd_vnodes_nr, oid, nr_copies, tgt_vnodes);
-	for (i = 0; i < nr_copies; i++) {
-		buf_cmp = read_object_from(tgt_vnodes[i], oid);
-		if (!buf_cmp) {
-			write_object_to(tgt_vnodes[i], oid, buf, true);
-			fprintf(stdout, "fixed missing %"PRIx64"\n", oid);
+static void vdi_hash_check_main(struct work *work)
+{
+	struct vdi_check_work *vcw = container_of(work, struct vdi_check_work,
+						  work);
+	struct vdi_check_info *info = vcw->info;
+
+	info->refcnt--;
+	if (info->refcnt > 0)
+		return;
+
+	if (info->base  == NULL) {
+		fprintf(stderr, "no node has %"PRIx64"\n", info->oid);
+		exit(EXIT_FAILURE);
+	}
+
+	for (int i = 0; i < info->nr_copies; i++) {
+		if (&info->vcw[i] == info->base)
 			continue;
+		/* need repair when object not found or consistency broken */
+		if (!info->vcw[i].object_found ||
+		    memcmp(info->base->hash, info->vcw[i].hash,
+			   sizeof(info->base->hash)) != 0) {
+			info->vcw[i].work.fn = vdi_repair_work;
+			info->vcw[i].work.done = vdi_repair_main;
+			info->refcnt++;
+			queue_work(info->wq, &info->vcw[i].work);
 		}
-		if (memcmp(buf, buf_cmp, size)) {
-			write_object_to(tgt_vnodes[i], oid, buf, false);
-			fprintf(stdout, "fixed replica %"PRIx64"\n", oid);
-		}
-		free(buf_cmp);
 	}
-	free(buf);
-	return;
+
+	if (info->refcnt == 0)
+		free_vdi_check_info(info);
+}
+
+static void queue_vdi_check_work(struct sd_inode *inode, uint64_t oid,
+				 uint64_t *done, struct work_queue *wq)
+{
+	struct vdi_check_info *info;
+	const struct sd_vnode *tgt_vnodes[SD_MAX_COPIES];
+	int nr_copies = inode->nr_copies;
+
+	info = xzalloc(sizeof(*info) + sizeof(info->vcw[0]) * nr_copies);
+	info->oid = oid;
+	info->nr_copies = nr_copies;
+	info->total = inode->vdi_size;
+	info->done = done;
+	info->wq = wq;
+
+	oid_to_vnodes(sd_vnodes, sd_vnodes_nr, oid, nr_copies, tgt_vnodes);
+	for (int i = 0; i < nr_copies; i++) {
+		info->vcw[i].info = info;
+		info->vcw[i].vnode = tgt_vnodes[i];
+		info->vcw[i].work.fn = vdi_hash_check_work;
+		info->vcw[i].work.done = vdi_hash_check_main;
+		info->refcnt++;
+		queue_work(info->wq, &info->vcw[i].work);
+	}
 }
 
 static int vdi_check(int argc, char **argv)
 {
 	const char *vdiname = argv[optind++];
-	int ret;
-	uint64_t total, done = 0, oid;
-	uint32_t idx = 0, vid;
+	int ret, max_idx;
+	uint64_t done = 0, oid;
+	uint32_t vid;
 	struct sd_inode *inode = xmalloc(sizeof(*inode));
+	struct work_queue *wq;
 
 	ret = read_vdi_obj(vdiname, vdi_cmd_data.snapshot_id,
 			   vdi_cmd_data.snapshot_tag, &vid, inode,
@@ -1497,19 +1611,24 @@ static int vdi_check(int argc, char **argv)
 		return EXIT_FAILURE;
 	}
 
-	do_check_repair(vid_to_vdi_oid(vid), inode->nr_copies);
-	total = inode->vdi_size;
-	while (done < total) {
-		show_progress(done, total);
+	wq = create_work_queue("vdi check", WQ_DYNAMIC);
+
+	queue_vdi_check_work(inode, vid_to_vdi_oid(vid), NULL, wq);
+
+	max_idx = DIV_ROUND_UP(inode->vdi_size, SD_DATA_OBJ_SIZE);
+	show_progress(done, inode->vdi_size);
+	for (int idx = 0; idx < max_idx; idx++) {
 		vid = inode->data_vdi_id[idx];
 		if (vid) {
 			oid = vid_to_data_oid(vid, idx);
-			do_check_repair(oid, inode->nr_copies);
+			queue_vdi_check_work(inode, oid, &done, wq);
+		} else {
+			done += SD_DATA_OBJ_SIZE;
+			show_progress(done, inode->vdi_size);
 		}
-		done += SD_DATA_OBJ_SIZE;
-		idx++;
 	}
-	show_progress(done, total);
+
+	work_queue_wait(wq);
 
 	fprintf(stdout, "finish check&repair %s\n", vdiname);
 	return EXIT_SUCCESS;
-- 
1.7.9.5




More information about the sheepdog mailing list