[sheepdog] [PATCH] farm: parallelize cluster-wide snapshot with a work queue
Kai Zhang
kyle at zelin.io
Thu May 30 14:23:32 CEST 2013
Signed-off-by: Kai Zhang <kyle at zelin.io>
---
collie/farm/farm.c | 184 ++++++++++++++++++++++++++++++++------------
collie/farm/farm.h | 12 +--
collie/farm/object_tree.c | 15 +---
collie/farm/trunk.c | 15 +---
4 files changed, 146 insertions(+), 80 deletions(-)
diff --git a/collie/farm/farm.c b/collie/farm/farm.c
index cb4aaa9..bb7b36c 100644
--- a/collie/farm/farm.c
+++ b/collie/farm/farm.c
@@ -29,6 +29,16 @@ struct vdi_entry {
};
static LIST_HEAD(last_vdi_list);
+struct snapshot_work {
+ uint64_t oid;
+ int nr_copies;
+ unsigned char sha1[SHA1_LEN];
+ struct strbuf *trunk_entries;
+ struct work work;
+};
+struct work_queue *wq;
+static uatomic_bool work_error;
+
static struct vdi_entry *find_vdi(const char *name)
{
struct vdi_entry *vdi;
@@ -186,39 +196,6 @@ static int notify_vdi_add(uint32_t vdi_id, uint32_t nr_copies)
return ret;
}
-static int fill_trunk_entry(uint64_t oid, int nr_copies,
- void *buf, size_t size, void *data)
-{
- int ret = -1;
-
- struct strbuf *trunk_entries = data;
- struct trunk_entry new_entry = {};
- struct sha1_file_hdr hdr = { .priv = 0 };
- struct strbuf object_strbuf = STRBUF_INIT;
-
- memcpy(hdr.tag, TAG_DATA, TAG_LEN);
- hdr.size = size;
-
- strbuf_add(&object_strbuf, buf, size);
- strbuf_insert(&object_strbuf, 0, &hdr, sizeof(hdr));
-
- if (sha1_file_write((void *)object_strbuf.buf,
- object_strbuf.len,
- new_entry.sha1) != 0)
- goto out;
-
- new_entry.oid = oid;
- new_entry.nr_copies = nr_copies;
- strbuf_add(trunk_entries, &new_entry, sizeof(struct trunk_entry));
-
- ret = 0;
-out:
- if (ret)
- fprintf(stderr, "Fail to fill trunk entry\n.");
- strbuf_release(&object_strbuf);
- return ret;
-}
-
int farm_init(const char *path)
{
int ret = -1;
@@ -240,14 +217,78 @@ bool farm_contain_snapshot(uint32_t idx, const char *tag)
return (get_trunk_sha1(idx, tag, trunk_sha1) == 0);
}
+static void save_object_main(struct work *work)
+{
+ void *sha1_buf, *data_buf;
+ size_t sha1_size, data_size;
+ struct snapshot_work *sw;
+ struct sha1_file_hdr *hdr;
+
+ if (uatomic_is_true(&work_error))
+ return;
+
+ sw = container_of(work, struct snapshot_work, work);
+ data_size = get_objsize(sw->oid);
+ sha1_size = data_size + sizeof(struct sha1_file_hdr);
+ hdr = sha1_buf = xmalloc(sha1_size);
+ data_buf = (char *)sha1_buf + sizeof(struct sha1_file_hdr);
+
+ if (sd_read_object(sw->oid, data_buf, data_size, 0, true) < 0)
+ goto error;
+
+ memcpy(hdr->tag, TAG_DATA, TAG_LEN);
+ hdr->size = data_size;
+
+ if (sha1_file_write(sha1_buf, sha1_size, sw->sha1) < 0)
+ goto error;
+
+ goto out;
+error:
+ fprintf(stderr, "Fail to save object, oid %"PRIu64"\n",
+ sw->oid);
+ uatomic_set_true(&work_error);
+out:
+ free(sha1_buf);
+}
+
+static void save_object_done(struct work *work)
+{
+ struct trunk_entry entry;
+ struct snapshot_work *sw = container_of(work, struct snapshot_work,
+ work);
+
+ if (uatomic_is_true(&work_error))
+ goto out;
+
+ entry.oid = sw->oid;
+ entry.nr_copies = sw->nr_copies;
+ memcpy(entry.sha1, sw->sha1, SHA1_LEN);
+ strbuf_add(sw->trunk_entries, &entry, sizeof(struct trunk_entry));
+out:
+ free(sw);
+}
+
+static int queue_save_snapshot_work(uint64_t oid, int nr_copies, void *data)
+{
+ struct snapshot_work *sw = xzalloc(sizeof(struct snapshot_work));
+ struct strbuf *trunk_entries = data;
+ sw->oid = oid;
+ sw->nr_copies = nr_copies;
+ sw->trunk_entries = trunk_entries;
+ sw->work.fn = save_object_main;
+ sw->work.done = save_object_done;
+ queue_work(wq, &sw->work);
+
+ return 0;
+}
+
int farm_save_snapshot(const char *tag)
{
unsigned char snap_sha1[SHA1_LEN];
unsigned char trunk_sha1[SHA1_LEN];
struct strbuf trunk_entries = STRBUF_INIT;
void *snap_log = NULL;
- int log_nr, idx;
- int ret = -1;
+ int log_nr, idx, ret = -1;
snap_log = snap_log_read(&log_nr);
if (!snap_log)
@@ -255,7 +296,13 @@ int farm_save_snapshot(const char *tag)
idx = log_nr + 1;
- if (for_each_object_in_tree(fill_trunk_entry, &trunk_entries) < 0)
+ wq = create_work_queue("save snapshot", WQ_DYNAMIC);
+ if (for_each_object_in_tree(queue_save_snapshot_work,
+ (void *)&trunk_entries) < 0)
+ goto out;
+
+ work_queue_wait(wq);
+ if (uatomic_is_true(&work_error))
goto out;
if (trunk_file_write(trunk_sha1, &trunk_entries) < 0)
@@ -264,7 +311,7 @@ int farm_save_snapshot(const char *tag)
if (snap_file_write(idx, trunk_sha1, snap_sha1) < 0)
goto out;
- if (snap_log_write(idx, tag, snap_sha1) != 0)
+ if (snap_log_write(idx, tag, snap_sha1) < 0)
goto out;
ret = 0;
@@ -274,26 +321,57 @@ out:
return ret;
}
-static int restore_object(uint64_t oid, int nr_copies,
- void *buffer, size_t size, void *data)
+static void load_object_main(struct work *work)
{
- int ret = -1;
+ void *buffer = NULL;
+ struct sha1_file_hdr hdr;
+ struct snapshot_work *sw;
- if (sd_write_object(oid, 0, buffer, size, 0, 0,
- nr_copies, true, true) != 0)
- goto out;
+ if (uatomic_is_true(&work_error))
+ return;
- if (is_vdi_obj(oid)) {
- if (notify_vdi_add(oid_to_vid(oid), nr_copies) < 0)
- goto out;
+ sw = container_of(work, struct snapshot_work, work);
+ buffer = sha1_file_read(sw->sha1, &hdr);
+ if (!buffer)
+ goto error;
+
+ if (sd_write_object(sw->oid, 0, buffer, hdr.size, 0, 0,
+ sw->nr_copies, true, true) != 0)
+ goto error;
+
+ if (is_vdi_obj(sw->oid)) {
+ if (notify_vdi_add(oid_to_vid(sw->oid), sw->nr_copies) < 0)
+ goto error;
insert_vdi(buffer);
}
- ret = 0;
+ goto out;
+error:
+ fprintf(stderr, "Fail to load object, oid %"PRIu64"\n", sw->oid);
+ uatomic_set_true(&work_error);
out:
- if (ret)
- fprintf(stderr, "Fail to restore object, oid %"PRIu64"\n", oid);
+ free(buffer);
+ return;
+}
+
+static void load_object_done(struct work *work)
+{
+ struct snapshot_work *sw = container_of(work, struct snapshot_work,
+ work);
+ free(sw);
+}
+
+static int queue_load_snapshot_work(struct trunk_entry *entry, void *data)
+{
+ struct snapshot_work *sw = xzalloc(sizeof(struct snapshot_work));
+ sw->oid = entry->oid;
+ sw->nr_copies = entry->nr_copies;
+ memcpy(sw->sha1, entry->sha1, SHA1_LEN);
+ sw->work.fn = load_object_main;
+ sw->work.done = load_object_done;
+ queue_work(wq, &sw->work);
+
return 0;
}
@@ -305,7 +383,13 @@ int farm_load_snapshot(uint32_t idx, const char *tag)
if (get_trunk_sha1(idx, tag, trunk_sha1) < 0)
goto out;
- if (for_each_object_in_trunk(trunk_sha1, restore_object, NULL) < 0)
+ wq = create_work_queue("load snapshot", WQ_DYNAMIC);
+ if (for_each_entry_in_trunk(trunk_sha1, queue_load_snapshot_work,
+ NULL) < 0)
+ goto out;
+
+ work_queue_wait(wq);
+ if (uatomic_is_true(&work_error))
goto out;
if (create_active_vdis() < 0)
diff --git a/collie/farm/farm.h b/collie/farm/farm.h
index 77ac59e..ebedfb1 100644
--- a/collie/farm/farm.h
+++ b/collie/farm/farm.h
@@ -38,9 +38,6 @@ struct sha1_file_hdr {
uint64_t reserved;
};
-typedef int (*object_handler_func_t)(uint64_t oid, int nr_copies, void *buf,
- size_t size, void *data);
-
/* farm.c */
int farm_init(const char *path);
bool farm_contain_snapshot(uint32_t idx, const char *tag);
@@ -52,8 +49,9 @@ char *get_object_directory(void);
int trunk_init(void);
int trunk_file_write(unsigned char *trunk_sha1, struct strbuf *trunk_entries);
void *trunk_file_read(unsigned char *sha1, struct sha1_file_hdr *);
-int for_each_object_in_trunk(unsigned char *trunk_sha1,
- object_handler_func_t func, void *data);
+int for_each_entry_in_trunk(unsigned char *trunk_sha1,
+ int (*func)(struct trunk_entry *entry, void *data),
+ void *data);
/* snap.c */
int snap_init(const char *path);
@@ -74,6 +72,6 @@ int object_tree_size(void);
void object_tree_insert(uint64_t oid, int nr_copies);
void object_tree_free(void);
void object_tree_print(void);
-int for_each_object_in_tree(object_handler_func_t func, void *data);
-
+int for_each_object_in_tree(int (*func)(uint64_t oid, int nr_copies,
+ void *data), void *data);
#endif
diff --git a/collie/farm/object_tree.c b/collie/farm/object_tree.c
index f719af6..97cb3e7 100644
--- a/collie/farm/object_tree.c
+++ b/collie/farm/object_tree.c
@@ -103,31 +103,22 @@ int object_tree_size()
return tree.nr_objs;
}
-int for_each_object_in_tree(object_handler_func_t func, void *data)
+int for_each_object_in_tree(int (*func)(uint64_t oid, int nr_copies,
+ void *data), void *data)
{
struct rb_node *p = rb_first(&tree.root);
struct object_tree_entry *entry;
- uint64_t oid;
- size_t size;
- void *buf = xmalloc(max(SD_INODE_SIZE, SD_DATA_OBJ_SIZE));
int ret = -1;
while (p) {
entry = rb_entry(p, struct object_tree_entry, node);
- oid = entry->oid;
- size = get_objsize(oid);
-
- if (sd_read_object(oid, buf, size, 0, true) < 0)
- goto out;
- if (func(oid, entry->nr_copies,
- buf, size, data) < 0)
+ if (func(entry->oid, entry->nr_copies, data) < 0)
goto out;
p = rb_next(p);
}
ret = 0;
out:
- free(buf);
return ret;
}
diff --git a/collie/farm/trunk.c b/collie/farm/trunk.c
index c0b416f..c2f5bbf 100644
--- a/collie/farm/trunk.c
+++ b/collie/farm/trunk.c
@@ -72,8 +72,9 @@ void *trunk_file_read(unsigned char *sha1, struct sha1_file_hdr *outhdr)
return buffer;
}
-int for_each_object_in_trunk(unsigned char *trunk_sha1,
- object_handler_func_t func, void *data)
+int for_each_entry_in_trunk(unsigned char *trunk_sha1,
+ int (*func)(struct trunk_entry *entry, void *data),
+ void *data)
{
struct trunk_entry *trunk_entry, *trunk_free = NULL;
struct sha1_file_hdr trunk_hdr;
@@ -87,15 +88,7 @@ int for_each_object_in_trunk(unsigned char *trunk_sha1,
nr_trunks = trunk_hdr.priv;
for (uint64_t i = 0; i < nr_trunks; i++, trunk_entry++) {
- struct sha1_file_hdr hdr;
- void *buffer = NULL;
-
- buffer = sha1_file_read(trunk_entry->sha1, &hdr);
- if (!buffer)
- goto out;
-
- if (func(trunk_entry->oid, trunk_entry->nr_copies,
- buffer, hdr.size, data) < 0)
+ if (func(trunk_entry, data) < 0)
goto out;
}
--
1.7.1
More information about the sheepdog mailing list