[sheepdog] [PATCH] farm: parallize cluster-wide snapshot by work queue

Kai Zhang kyle at zelin.io
Thu May 30 14:23:32 CEST 2013


Signed-off-by: Kai Zhang <kyle at zelin.io>
---
 collie/farm/farm.c        |  184 ++++++++++++++++++++++++++++++++------------
 collie/farm/farm.h        |   12 +--
 collie/farm/object_tree.c |   15 +---
 collie/farm/trunk.c       |   15 +---
 4 files changed, 146 insertions(+), 80 deletions(-)

diff --git a/collie/farm/farm.c b/collie/farm/farm.c
index cb4aaa9..bb7b36c 100644
--- a/collie/farm/farm.c
+++ b/collie/farm/farm.c
@@ -29,6 +29,16 @@ struct vdi_entry {
 };
 static LIST_HEAD(last_vdi_list);
 
+struct snapshot_work {
+	uint64_t oid;
+	int nr_copies;
+	unsigned char sha1[SHA1_LEN];
+	struct strbuf *trunk_entries;
+	struct work work;
+};
+struct work_queue *wq;
+static uatomic_bool work_error;
+
 static struct vdi_entry *find_vdi(const char *name)
 {
 	struct vdi_entry *vdi;
@@ -186,39 +196,6 @@ static int notify_vdi_add(uint32_t vdi_id, uint32_t nr_copies)
 	return ret;
 }
 
-static int fill_trunk_entry(uint64_t oid, int nr_copies,
-			    void *buf, size_t size, void *data)
-{
-	int ret = -1;
-
-	struct strbuf *trunk_entries = data;
-	struct trunk_entry new_entry = {};
-	struct sha1_file_hdr hdr = { .priv = 0 };
-	struct strbuf object_strbuf = STRBUF_INIT;
-
-	memcpy(hdr.tag, TAG_DATA, TAG_LEN);
-	hdr.size = size;
-
-	strbuf_add(&object_strbuf, buf, size);
-	strbuf_insert(&object_strbuf, 0, &hdr, sizeof(hdr));
-
-	if (sha1_file_write((void *)object_strbuf.buf,
-			    object_strbuf.len,
-			    new_entry.sha1) != 0)
-		goto out;
-
-	new_entry.oid = oid;
-	new_entry.nr_copies = nr_copies;
-	strbuf_add(trunk_entries, &new_entry, sizeof(struct trunk_entry));
-
-	ret = 0;
-out:
-	if (ret)
-		fprintf(stderr, "Fail to fill trunk entry\n.");
-	strbuf_release(&object_strbuf);
-	return ret;
-}
-
 int farm_init(const char *path)
 {
 	int ret = -1;
@@ -240,14 +217,78 @@ bool farm_contain_snapshot(uint32_t idx, const char *tag)
 	return (get_trunk_sha1(idx, tag, trunk_sha1) == 0);
 }
 
+static void save_object_main(struct work *work)
+{
+	void *sha1_buf, *data_buf;
+	size_t sha1_size, data_size;
+	struct snapshot_work *sw;
+	struct sha1_file_hdr *hdr;
+
+	if (uatomic_is_true(&work_error))
+		return;
+
+	sw = container_of(work, struct snapshot_work, work);
+	data_size = get_objsize(sw->oid);
+	sha1_size = data_size + sizeof(struct sha1_file_hdr);
+	hdr = sha1_buf = xmalloc(sha1_size);
+	data_buf = (char *)sha1_buf + sizeof(struct sha1_file_hdr);
+
+	if (sd_read_object(sw->oid, data_buf, data_size, 0, true) < 0)
+		goto error;
+
+	memcpy(hdr->tag, TAG_DATA, TAG_LEN);
+	hdr->size = data_size;
+
+	if (sha1_file_write(sha1_buf, sha1_size, sw->sha1) < 0)
+		goto error;
+
+	goto out;
+error:
+	fprintf(stderr, "Fail to save object, oid %"PRIu64"\n",
+		sw->oid);
+	uatomic_set_true(&work_error);
+out:
+	free(sha1_buf);
+}
+
+static void save_object_done(struct work *work)
+{
+	struct trunk_entry entry;
+	struct snapshot_work *sw = container_of(work, struct snapshot_work,
+						work);
+
+	if (uatomic_is_true(&work_error))
+		goto out;
+
+	entry.oid = sw->oid;
+	entry.nr_copies = sw->nr_copies;
+	memcpy(entry.sha1, sw->sha1, SHA1_LEN);
+	strbuf_add(sw->trunk_entries, &entry, sizeof(struct trunk_entry));
+out:
+	free(sw);
+}
+
+static int queue_save_snapshot_work(uint64_t oid, int nr_copies, void *data)
+{
+	struct snapshot_work *sw = xzalloc(sizeof(struct snapshot_work));
+	struct strbuf *trunk_entries = data;
+	sw->oid = oid;
+	sw->nr_copies = nr_copies;
+	sw->trunk_entries = trunk_entries;
+	sw->work.fn = save_object_main;
+	sw->work.done = save_object_done;
+	queue_work(wq, &sw->work);
+
+	return 0;
+}
+
 int farm_save_snapshot(const char *tag)
 {
 	unsigned char snap_sha1[SHA1_LEN];
 	unsigned char trunk_sha1[SHA1_LEN];
 	struct strbuf trunk_entries = STRBUF_INIT;
 	void *snap_log = NULL;
-	int log_nr, idx;
-	int ret = -1;
+	int log_nr, idx, ret = -1;
 
 	snap_log = snap_log_read(&log_nr);
 	if (!snap_log)
@@ -255,7 +296,13 @@ int farm_save_snapshot(const char *tag)
 
 	idx = log_nr + 1;
 
-	if (for_each_object_in_tree(fill_trunk_entry, &trunk_entries) < 0)
+	wq = create_work_queue("save snapshot", WQ_DYNAMIC);
+	if (for_each_object_in_tree(queue_save_snapshot_work,
+				    (void *)&trunk_entries) < 0)
+		goto out;
+
+	work_queue_wait(wq);
+	if (uatomic_is_true(&work_error))
 		goto out;
 
 	if (trunk_file_write(trunk_sha1, &trunk_entries) < 0)
@@ -264,7 +311,7 @@ int farm_save_snapshot(const char *tag)
 	if (snap_file_write(idx, trunk_sha1, snap_sha1) < 0)
 		goto out;
 
-	if (snap_log_write(idx, tag, snap_sha1) != 0)
+	if (snap_log_write(idx, tag, snap_sha1) < 0)
 		goto out;
 
 	ret = 0;
@@ -274,26 +321,57 @@ out:
 	return ret;
 }
 
-static int restore_object(uint64_t oid, int nr_copies,
-			void *buffer, size_t size, void *data)
+static void load_object_main(struct work *work)
 {
-	int ret = -1;
+	void *buffer = NULL;
+	struct sha1_file_hdr hdr;
+	struct snapshot_work *sw;
 
-	if (sd_write_object(oid, 0, buffer, size, 0, 0,
-			    nr_copies, true, true) != 0)
-		goto out;
+	if (uatomic_is_true(&work_error))
+		return;
 
-	if (is_vdi_obj(oid)) {
-		if (notify_vdi_add(oid_to_vid(oid), nr_copies) < 0)
-			goto out;
+	sw = container_of(work, struct snapshot_work, work);
+	buffer = sha1_file_read(sw->sha1, &hdr);
+	if (!buffer)
+		goto error;
+
+	if (sd_write_object(sw->oid, 0, buffer, hdr.size, 0, 0,
+			    sw->nr_copies, true, true) != 0)
+		goto error;
+
+	if (is_vdi_obj(sw->oid)) {
+		if (notify_vdi_add(oid_to_vid(sw->oid), sw->nr_copies) < 0)
+			goto error;
 
 		insert_vdi(buffer);
 	}
 
-	ret = 0;
+	goto out;
+error:
+	fprintf(stderr, "Fail to load object, oid %"PRIu64"\n", sw->oid);
+	uatomic_set_true(&work_error);
 out:
-	if (ret)
-		fprintf(stderr, "Fail to restore object, oid %"PRIu64"\n", oid);
+	free(buffer);
+	return;
+}
+
+static void load_object_done(struct work *work)
+{
+	struct snapshot_work *sw = container_of(work, struct snapshot_work,
+						work);
+	free(sw);
+}
+
+static int queue_load_snapshot_work(struct trunk_entry *entry, void *data)
+{
+	struct snapshot_work *sw = xzalloc(sizeof(struct snapshot_work));
+	sw->oid = entry->oid;
+	sw->nr_copies = entry->nr_copies;
+	memcpy(sw->sha1, entry->sha1, SHA1_LEN);
+	sw->work.fn = load_object_main;
+	sw->work.done = load_object_done;
+	queue_work(wq, &sw->work);
+
 	return 0;
 }
 
@@ -305,7 +383,13 @@ int farm_load_snapshot(uint32_t idx, const char *tag)
 	if (get_trunk_sha1(idx, tag, trunk_sha1) < 0)
 		goto out;
 
-	if (for_each_object_in_trunk(trunk_sha1, restore_object, NULL) < 0)
+	wq = create_work_queue("load snapshot", WQ_DYNAMIC);
+	if (for_each_entry_in_trunk(trunk_sha1, queue_load_snapshot_work,
+				    NULL) < 0)
+		goto out;
+
+	work_queue_wait(wq);
+	if (uatomic_is_true(&work_error))
 		goto out;
 
 	if (create_active_vdis() < 0)
diff --git a/collie/farm/farm.h b/collie/farm/farm.h
index 77ac59e..ebedfb1 100644
--- a/collie/farm/farm.h
+++ b/collie/farm/farm.h
@@ -38,9 +38,6 @@ struct sha1_file_hdr {
 	uint64_t reserved;
 };
 
-typedef int (*object_handler_func_t)(uint64_t oid, int nr_copies, void *buf,
-				     size_t size, void *data);
-
 /* farm.c */
 int farm_init(const char *path);
 bool farm_contain_snapshot(uint32_t idx, const char *tag);
@@ -52,8 +49,9 @@ char *get_object_directory(void);
 int trunk_init(void);
 int trunk_file_write(unsigned char *trunk_sha1, struct strbuf *trunk_entries);
 void *trunk_file_read(unsigned char *sha1, struct sha1_file_hdr *);
-int for_each_object_in_trunk(unsigned char *trunk_sha1,
-			     object_handler_func_t func, void *data);
+int for_each_entry_in_trunk(unsigned char *trunk_sha1,
+			    int (*func)(struct trunk_entry *entry, void *data),
+			    void *data);
 
 /* snap.c */
 int snap_init(const char *path);
@@ -74,6 +72,6 @@ int object_tree_size(void);
 void object_tree_insert(uint64_t oid, int nr_copies);
 void object_tree_free(void);
 void object_tree_print(void);
-int for_each_object_in_tree(object_handler_func_t func, void *data);
-
+int for_each_object_in_tree(int (*func)(uint64_t oid, int nr_copies,
+					void *data), void *data);
 #endif
diff --git a/collie/farm/object_tree.c b/collie/farm/object_tree.c
index f719af6..97cb3e7 100644
--- a/collie/farm/object_tree.c
+++ b/collie/farm/object_tree.c
@@ -103,31 +103,22 @@ int object_tree_size()
 	return tree.nr_objs;
 }
 
-int for_each_object_in_tree(object_handler_func_t func, void *data)
+int for_each_object_in_tree(int (*func)(uint64_t oid, int nr_copies,
+					void *data), void *data)
 {
 	struct rb_node *p = rb_first(&tree.root);
 	struct object_tree_entry *entry;
-	uint64_t oid;
-	size_t size;
-	void *buf = xmalloc(max(SD_INODE_SIZE, SD_DATA_OBJ_SIZE));
 	int ret = -1;
 
 	while (p) {
 		entry = rb_entry(p, struct object_tree_entry, node);
-		oid = entry->oid;
-		size = get_objsize(oid);
-
-		if (sd_read_object(oid, buf, size, 0, true) < 0)
-			goto out;
 
-		if (func(oid, entry->nr_copies,
-			 buf, size, data) < 0)
+		if (func(entry->oid, entry->nr_copies, data) < 0)
 			goto out;
 
 		p = rb_next(p);
 	}
 	ret = 0;
 out:
-	free(buf);
 	return ret;
 }
diff --git a/collie/farm/trunk.c b/collie/farm/trunk.c
index c0b416f..c2f5bbf 100644
--- a/collie/farm/trunk.c
+++ b/collie/farm/trunk.c
@@ -72,8 +72,9 @@ void *trunk_file_read(unsigned char *sha1, struct sha1_file_hdr *outhdr)
 	return buffer;
 }
 
-int for_each_object_in_trunk(unsigned char *trunk_sha1,
-			     object_handler_func_t func, void *data)
+int for_each_entry_in_trunk(unsigned char *trunk_sha1,
+			    int (*func)(struct trunk_entry *entry, void *data),
+			    void *data)
 {
 	struct trunk_entry *trunk_entry, *trunk_free = NULL;
 	struct sha1_file_hdr trunk_hdr;
@@ -87,15 +88,7 @@ int for_each_object_in_trunk(unsigned char *trunk_sha1,
 
 	nr_trunks = trunk_hdr.priv;
 	for (uint64_t i = 0; i < nr_trunks; i++, trunk_entry++) {
-		struct sha1_file_hdr hdr;
-		void *buffer = NULL;
-
-		buffer = sha1_file_read(trunk_entry->sha1, &hdr);
-		if (!buffer)
-			goto out;
-
-		if (func(trunk_entry->oid, trunk_entry->nr_copies,
-			 buffer, hdr.size, data) < 0)
+		if (func(trunk_entry, data) < 0)
 			goto out;
 	}
 
-- 
1.7.1




More information about the sheepdog mailing list