Signed-off-by: Kai Zhang <kyle at zelin.io> --- collie/farm/farm.c | 184 ++++++++++++++++++++++++++++++++------------ collie/farm/farm.h | 12 +-- collie/farm/object_tree.c | 15 +--- collie/farm/trunk.c | 15 +--- 4 files changed, 146 insertions(+), 80 deletions(-) diff --git a/collie/farm/farm.c b/collie/farm/farm.c index cb4aaa9..bb7b36c 100644 --- a/collie/farm/farm.c +++ b/collie/farm/farm.c @@ -29,6 +29,16 @@ struct vdi_entry { }; static LIST_HEAD(last_vdi_list); +struct snapshot_work { + uint64_t oid; + int nr_copies; + unsigned char sha1[SHA1_LEN]; + struct strbuf *trunk_entries; + struct work work; +}; +struct work_queue *wq; +static uatomic_bool work_error; + static struct vdi_entry *find_vdi(const char *name) { struct vdi_entry *vdi; @@ -186,39 +196,6 @@ static int notify_vdi_add(uint32_t vdi_id, uint32_t nr_copies) return ret; } -static int fill_trunk_entry(uint64_t oid, int nr_copies, - void *buf, size_t size, void *data) -{ - int ret = -1; - - struct strbuf *trunk_entries = data; - struct trunk_entry new_entry = {}; - struct sha1_file_hdr hdr = { .priv = 0 }; - struct strbuf object_strbuf = STRBUF_INIT; - - memcpy(hdr.tag, TAG_DATA, TAG_LEN); - hdr.size = size; - - strbuf_add(&object_strbuf, buf, size); - strbuf_insert(&object_strbuf, 0, &hdr, sizeof(hdr)); - - if (sha1_file_write((void *)object_strbuf.buf, - object_strbuf.len, - new_entry.sha1) != 0) - goto out; - - new_entry.oid = oid; - new_entry.nr_copies = nr_copies; - strbuf_add(trunk_entries, &new_entry, sizeof(struct trunk_entry)); - - ret = 0; -out: - if (ret) - fprintf(stderr, "Fail to fill trunk entry\n."); - strbuf_release(&object_strbuf); - return ret; -} - int farm_init(const char *path) { int ret = -1; @@ -240,14 +217,78 @@ bool farm_contain_snapshot(uint32_t idx, const char *tag) return (get_trunk_sha1(idx, tag, trunk_sha1) == 0); } +static void save_object_main(struct work *work) +{ + void *sha1_buf, *data_buf; + size_t sha1_size, data_size; + struct snapshot_work *sw; + struct sha1_file_hdr *hdr; + + if (uatomic_is_true(&work_error)) + return; + + sw = container_of(work, struct snapshot_work, work); + data_size = get_objsize(sw->oid); + sha1_size = data_size + sizeof(struct sha1_file_hdr); + hdr = sha1_buf = xmalloc(sha1_size); + data_buf = (char *)sha1_buf + sizeof(struct sha1_file_hdr); + + if (sd_read_object(sw->oid, data_buf, data_size, 0, true) < 0) + goto error; + + memcpy(hdr->tag, TAG_DATA, TAG_LEN); + hdr->size = data_size; + + if (sha1_file_write(sha1_buf, sha1_size, sw->sha1) < 0) + goto error; + + goto out; +error: + fprintf(stderr, "Fail to save object, oid %"PRIu64"\n", + sw->oid); + uatomic_set_true(&work_error); +out: + free(sha1_buf); +} + +static void save_object_done(struct work *work) +{ + struct trunk_entry entry; + struct snapshot_work *sw = container_of(work, struct snapshot_work, + work); + + if (uatomic_is_true(&work_error)) + goto out; + + entry.oid = sw->oid; + entry.nr_copies = sw->nr_copies; + memcpy(entry.sha1, sw->sha1, SHA1_LEN); + strbuf_add(sw->trunk_entries, &entry, sizeof(struct trunk_entry)); +out: + free(sw); +} + +static int queue_save_snapshot_work(uint64_t oid, int nr_copies, void *data) +{ + struct snapshot_work *sw = xzalloc(sizeof(struct snapshot_work)); + struct strbuf *trunk_entries = data; + sw->oid = oid; + sw->nr_copies = nr_copies; + sw->trunk_entries = trunk_entries; + sw->work.fn = save_object_main; + sw->work.done = save_object_done; + queue_work(wq, &sw->work); + + return 0; +} + int farm_save_snapshot(const char *tag) { unsigned char snap_sha1[SHA1_LEN]; unsigned char trunk_sha1[SHA1_LEN]; struct strbuf trunk_entries = STRBUF_INIT; void *snap_log = NULL; - int log_nr, idx; - int ret = -1; + int log_nr, idx, ret = -1; snap_log = snap_log_read(&log_nr); if (!snap_log) @@ -255,7 +296,13 @@ int farm_save_snapshot(const char *tag) idx = log_nr + 1; - if (for_each_object_in_tree(fill_trunk_entry, &trunk_entries) < 0) + wq = create_work_queue("save snapshot", WQ_DYNAMIC); + if (for_each_object_in_tree(queue_save_snapshot_work, + (void *)&trunk_entries) < 0) + goto out; + + work_queue_wait(wq); + if (uatomic_is_true(&work_error)) goto out; if (trunk_file_write(trunk_sha1, &trunk_entries) < 0) @@ -264,7 +311,7 @@ int farm_save_snapshot(const char *tag) if (snap_file_write(idx, trunk_sha1, snap_sha1) < 0) goto out; - if (snap_log_write(idx, tag, snap_sha1) != 0) + if (snap_log_write(idx, tag, snap_sha1) < 0) goto out; ret = 0; @@ -274,26 +321,57 @@ out: return ret; } -static int restore_object(uint64_t oid, int nr_copies, - void *buffer, size_t size, void *data) +static void load_object_main(struct work *work) { - int ret = -1; + void *buffer = NULL; + struct sha1_file_hdr hdr; + struct snapshot_work *sw; - if (sd_write_object(oid, 0, buffer, size, 0, 0, - nr_copies, true, true) != 0) - goto out; + if (uatomic_is_true(&work_error)) + return; - if (is_vdi_obj(oid)) { - if (notify_vdi_add(oid_to_vid(oid), nr_copies) < 0) - goto out; + sw = container_of(work, struct snapshot_work, work); + buffer = sha1_file_read(sw->sha1, &hdr); + if (!buffer) + goto error; + + if (sd_write_object(sw->oid, 0, buffer, hdr.size, 0, 0, + sw->nr_copies, true, true) != 0) + goto error; + + if (is_vdi_obj(sw->oid)) { + if (notify_vdi_add(oid_to_vid(sw->oid), sw->nr_copies) < 0) + goto error; insert_vdi(buffer); } - ret = 0; + goto out; +error: + fprintf(stderr, "Fail to load object, oid %"PRIu64"\n", sw->oid); + uatomic_set_true(&work_error); out: - if (ret) - fprintf(stderr, "Fail to restore object, oid %"PRIu64"\n", oid); + free(buffer); + return; +} + +static void load_object_done(struct work *work) +{ + struct snapshot_work *sw = container_of(work, struct snapshot_work, + work); + free(sw); +} + +static int queue_load_snapshot_work(struct trunk_entry *entry, void *data) +{ + struct snapshot_work *sw = xzalloc(sizeof(struct snapshot_work)); + sw->oid = entry->oid; + sw->nr_copies = entry->nr_copies; + memcpy(sw->sha1, entry->sha1, SHA1_LEN); + sw->work.fn = load_object_main; + sw->work.done = load_object_done; + queue_work(wq, &sw->work); + return 0; } @@ -305,7 +383,13 @@ int farm_load_snapshot(uint32_t idx, const char *tag) if (get_trunk_sha1(idx, tag, trunk_sha1) < 0) goto out; - if (for_each_object_in_trunk(trunk_sha1, restore_object, NULL) < 0) + wq = create_work_queue("load snapshot", WQ_DYNAMIC); + if (for_each_entry_in_trunk(trunk_sha1, queue_load_snapshot_work, + NULL) < 0) + goto out; + + work_queue_wait(wq); + if (uatomic_is_true(&work_error)) goto out; if (create_active_vdis() < 0) diff --git a/collie/farm/farm.h b/collie/farm/farm.h index 77ac59e..ebedfb1 100644 --- a/collie/farm/farm.h +++ b/collie/farm/farm.h @@ -38,9 +38,6 @@ struct sha1_file_hdr { uint64_t reserved; }; -typedef int (*object_handler_func_t)(uint64_t oid, int nr_copies, void *buf, - size_t size, void *data); - /* farm.c */ int farm_init(const char *path); bool farm_contain_snapshot(uint32_t idx, const char *tag); @@ -52,8 +49,9 @@ char *get_object_directory(void); int trunk_init(void); int trunk_file_write(unsigned char *trunk_sha1, struct strbuf *trunk_entries); void *trunk_file_read(unsigned char *sha1, struct sha1_file_hdr *); -int for_each_object_in_trunk(unsigned char *trunk_sha1, - object_handler_func_t func, void *data); +int for_each_entry_in_trunk(unsigned char *trunk_sha1, + int (*func)(struct trunk_entry *entry, void *data), + void *data); /* snap.c */ int snap_init(const char *path); @@ -74,6 +72,6 @@ int object_tree_size(void); void object_tree_insert(uint64_t oid, int nr_copies); void object_tree_free(void); void object_tree_print(void); -int for_each_object_in_tree(object_handler_func_t func, void *data); - +int for_each_object_in_tree(int (*func)(uint64_t oid, int nr_copies, + void *data), void *data); #endif diff --git a/collie/farm/object_tree.c b/collie/farm/object_tree.c index f719af6..97cb3e7 100644 --- a/collie/farm/object_tree.c +++ b/collie/farm/object_tree.c @@ -103,31 +103,22 @@ int object_tree_size() return tree.nr_objs; } -int for_each_object_in_tree(object_handler_func_t func, void *data) +int for_each_object_in_tree(int (*func)(uint64_t oid, int nr_copies, + void *data), void *data) { struct rb_node *p = rb_first(&tree.root); struct object_tree_entry *entry; - uint64_t oid; - size_t size; - void *buf = xmalloc(max(SD_INODE_SIZE, SD_DATA_OBJ_SIZE)); int ret = -1; while (p) { entry = rb_entry(p, struct object_tree_entry, node); - oid = entry->oid; - size = get_objsize(oid); - - if (sd_read_object(oid, buf, size, 0, true) < 0) - goto out; - if (func(oid, entry->nr_copies, - buf, size, data) < 0) + if (func(entry->oid, entry->nr_copies, data) < 0) goto out; p = rb_next(p); } ret = 0; out: - free(buf); return ret; } diff --git a/collie/farm/trunk.c b/collie/farm/trunk.c index c0b416f..c2f5bbf 100644 --- a/collie/farm/trunk.c +++ b/collie/farm/trunk.c @@ -72,8 +72,9 @@ void *trunk_file_read(unsigned char *sha1, struct sha1_file_hdr *outhdr) return buffer; } -int for_each_object_in_trunk(unsigned char *trunk_sha1, - object_handler_func_t func, void *data) +int for_each_entry_in_trunk(unsigned char *trunk_sha1, + int (*func)(struct trunk_entry *entry, void *data), + void *data) { struct trunk_entry *trunk_entry, *trunk_free = NULL; struct sha1_file_hdr trunk_hdr; @@ -87,15 +88,7 @@ int for_each_object_in_trunk(unsigned char *trunk_sha1, nr_trunks = trunk_hdr.priv; for (uint64_t i = 0; i < nr_trunks; i++, trunk_entry++) { - struct sha1_file_hdr hdr; - void *buffer = NULL; - - buffer = sha1_file_read(trunk_entry->sha1, &hdr); - if (!buffer) - goto out; - - if (func(trunk_entry->oid, trunk_entry->nr_copies, - buffer, hdr.size, data) < 0) + if (func(trunk_entry, data) < 0) goto out; } -- 1.7.1 |