[sheepdog] [PATCH v2 1/6] farm: do cluster-wide snapshot by work queue
Kai Zhang
kyle at zelin.io
Sat Jun 8 04:02:28 CEST 2013
Currently, loading a snapshot uses a dynamic work queue, whereas saving a snapshot uses an ordered work queue.
This is because saving a snapshot in parallel risks slowing down the cluster's performance.
We can easily parallelize saving a snapshot by using a dynamic queue instead.
Signed-off-by: Kai Zhang <kyle at zelin.io>
---
collie/farm/farm.c | 188 +++++++++++++++++++++++++++++++++------------
collie/farm/farm.h | 11 +--
collie/farm/object_tree.c | 15 +---
collie/farm/sha1_file.c | 2 +-
collie/farm/trunk.c | 15 +---
5 files changed, 150 insertions(+), 81 deletions(-)
diff --git a/collie/farm/farm.c b/collie/farm/farm.c
index cb4aaa9..072ee16 100644
--- a/collie/farm/farm.c
+++ b/collie/farm/farm.c
@@ -13,12 +13,15 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <pthread.h>
+
#include "farm.h"
#include "list.h"
static char farm_object_dir[PATH_MAX];
static char farm_dir[PATH_MAX];
+static pthread_rwlock_t vdi_list_lock = PTHREAD_RWLOCK_INITIALIZER;
struct vdi_entry {
char name[SD_MAX_VDI_LEN];
uint64_t vdi_size;
@@ -29,6 +32,14 @@ struct vdi_entry {
};
static LIST_HEAD(last_vdi_list);
+struct snapshot_work {
+ struct trunk_entry entry;
+ struct strbuf *trunk_buf;
+ struct work work;
+};
+struct work_queue *wq;
+static uatomic_bool work_error;
+
static struct vdi_entry *find_vdi(const char *name)
{
struct vdi_entry *vdi;
@@ -186,39 +197,6 @@ static int notify_vdi_add(uint32_t vdi_id, uint32_t nr_copies)
return ret;
}
-static int fill_trunk_entry(uint64_t oid, int nr_copies,
- void *buf, size_t size, void *data)
-{
- int ret = -1;
-
- struct strbuf *trunk_entries = data;
- struct trunk_entry new_entry = {};
- struct sha1_file_hdr hdr = { .priv = 0 };
- struct strbuf object_strbuf = STRBUF_INIT;
-
- memcpy(hdr.tag, TAG_DATA, TAG_LEN);
- hdr.size = size;
-
- strbuf_add(&object_strbuf, buf, size);
- strbuf_insert(&object_strbuf, 0, &hdr, sizeof(hdr));
-
- if (sha1_file_write((void *)object_strbuf.buf,
- object_strbuf.len,
- new_entry.sha1) != 0)
- goto out;
-
- new_entry.oid = oid;
- new_entry.nr_copies = nr_copies;
- strbuf_add(trunk_entries, &new_entry, sizeof(struct trunk_entry));
-
- ret = 0;
-out:
- if (ret)
- fprintf(stderr, "Fail to fill trunk entry\n.");
- strbuf_release(&object_strbuf);
- return ret;
-}
-
int farm_init(const char *path)
{
int ret = -1;
@@ -240,14 +218,75 @@ bool farm_contain_snapshot(uint32_t idx, const char *tag)
return (get_trunk_sha1(idx, tag, trunk_sha1) == 0);
}
+static void do_save_object(struct work *work)
+{
+ void *sha1_buf, *data_buf;
+ size_t sha1_size, data_size;
+ struct snapshot_work *sw;
+ struct sha1_file_hdr *hdr;
+
+ if (uatomic_is_true(&work_error))
+ return;
+
+ sw = container_of(work, struct snapshot_work, work);
+ data_size = get_objsize(sw->entry.oid);
+ sha1_size = data_size + sizeof(struct sha1_file_hdr);
+ hdr = sha1_buf = xmalloc(sha1_size);
+ data_buf = (char *)sha1_buf + sizeof(struct sha1_file_hdr);
+
+ if (sd_read_object(sw->entry.oid, data_buf, data_size, 0, true) < 0)
+ goto error;
+
+ memcpy(hdr->tag, TAG_DATA, TAG_LEN);
+ hdr->size = data_size;
+
+ if (sha1_file_write(sha1_buf, sha1_size, sw->entry.sha1) < 0)
+ goto error;
+
+ free(sha1_buf);
+ return;
+error:
+ free(sha1_buf);
+ fprintf(stderr, "Fail to save object, oid %"PRIu64"\n",
+ sw->entry.oid);
+ uatomic_set_true(&work_error);
+}
+
+static void save_object_done(struct work *work)
+{
+ struct snapshot_work *sw = container_of(work, struct snapshot_work,
+ work);
+
+ if (uatomic_is_true(&work_error))
+ goto out;
+
+ strbuf_add(sw->trunk_buf, &sw->entry, sizeof(struct trunk_entry));
+out:
+ free(sw);
+}
+
+static int queue_save_snapshot_work(uint64_t oid, int nr_copies, void *data)
+{
+ struct snapshot_work *sw = xzalloc(sizeof(struct snapshot_work));
+ struct strbuf *trunk_buf = data;
+
+ sw->entry.oid = oid;
+ sw->entry.nr_copies = nr_copies;
+ sw->trunk_buf = trunk_buf;
+ sw->work.fn = do_save_object;
+ sw->work.done = save_object_done;
+ queue_work(wq, &sw->work);
+
+ return 0;
+}
+
int farm_save_snapshot(const char *tag)
{
unsigned char snap_sha1[SHA1_LEN];
unsigned char trunk_sha1[SHA1_LEN];
struct strbuf trunk_entries = STRBUF_INIT;
void *snap_log = NULL;
- int log_nr, idx;
- int ret = -1;
+ int log_nr, idx, ret = -1;
snap_log = snap_log_read(&log_nr);
if (!snap_log)
@@ -255,7 +294,13 @@ int farm_save_snapshot(const char *tag)
idx = log_nr + 1;
- if (for_each_object_in_tree(fill_trunk_entry, &trunk_entries) < 0)
+ wq = create_work_queue("save snapshot", WQ_ORDERED);
+ if (for_each_object_in_tree(queue_save_snapshot_work,
+ (void *)&trunk_entries) < 0)
+ goto out;
+
+ work_queue_wait(wq);
+ if (uatomic_is_true(&work_error))
goto out;
if (trunk_file_write(trunk_sha1, &trunk_entries) < 0)
@@ -264,7 +309,7 @@ int farm_save_snapshot(const char *tag)
if (snap_file_write(idx, trunk_sha1, snap_sha1) < 0)
goto out;
- if (snap_log_write(idx, tag, snap_sha1) != 0)
+ if (snap_log_write(idx, tag, snap_sha1) < 0)
goto out;
ret = 0;
@@ -274,26 +319,61 @@ out:
return ret;
}
-static int restore_object(uint64_t oid, int nr_copies,
- void *buffer, size_t size, void *data)
+static void do_load_object(struct work *work)
{
- int ret = -1;
+ void *buffer = NULL;
+ struct sha1_file_hdr hdr;
+ struct snapshot_work *sw;
- if (sd_write_object(oid, 0, buffer, size, 0, 0,
- nr_copies, true, true) != 0)
- goto out;
+ if (uatomic_is_true(&work_error))
+ return;
- if (is_vdi_obj(oid)) {
- if (notify_vdi_add(oid_to_vid(oid), nr_copies) < 0)
- goto out;
+ sw = container_of(work, struct snapshot_work, work);
+
+ buffer = sha1_file_read(sw->entry.sha1, &hdr);
+
+ if (!buffer)
+ goto error;
+
+ if (sd_write_object(sw->entry.oid, 0, buffer, hdr.size, 0, 0,
+ sw->entry.nr_copies, true, true) != 0)
+ goto error;
+
+ if (is_vdi_obj(sw->entry.oid)) {
+ if (notify_vdi_add(oid_to_vid(sw->entry.oid),
+ sw->entry.nr_copies) < 0)
+ goto error;
+ pthread_rwlock_wrlock(&vdi_list_lock);
insert_vdi(buffer);
+ pthread_rwlock_unlock(&vdi_list_lock);
}
- ret = 0;
-out:
- if (ret)
- fprintf(stderr, "Fail to restore object, oid %"PRIu64"\n", oid);
+ free(buffer);
+ return;
+error:
+ free(buffer);
+ fprintf(stderr, "Fail to load object, oid %"PRIu64"\n", sw->entry.oid);
+ uatomic_set_true(&work_error);
+}
+
+static void load_object_done(struct work *work)
+{
+ struct snapshot_work *sw = container_of(work, struct snapshot_work,
+ work);
+
+ free(sw);
+}
+
+static int queue_load_snapshot_work(struct trunk_entry *entry, void *data)
+{
+ struct snapshot_work *sw = xzalloc(sizeof(struct snapshot_work));
+
+ memcpy(&sw->entry, entry, sizeof(struct trunk_entry));
+ sw->work.fn = do_load_object;
+ sw->work.done = load_object_done;
+ queue_work(wq, &sw->work);
+
return 0;
}
@@ -305,7 +385,13 @@ int farm_load_snapshot(uint32_t idx, const char *tag)
if (get_trunk_sha1(idx, tag, trunk_sha1) < 0)
goto out;
- if (for_each_object_in_trunk(trunk_sha1, restore_object, NULL) < 0)
+ wq = create_work_queue("load snapshot", WQ_DYNAMIC);
+ if (for_each_entry_in_trunk(trunk_sha1, queue_load_snapshot_work,
+ NULL) < 0)
+ goto out;
+
+ work_queue_wait(wq);
+ if (uatomic_is_true(&work_error))
goto out;
if (create_active_vdis() < 0)
diff --git a/collie/farm/farm.h b/collie/farm/farm.h
index 77ac59e..10ba597 100644
--- a/collie/farm/farm.h
+++ b/collie/farm/farm.h
@@ -38,9 +38,6 @@ struct sha1_file_hdr {
uint64_t reserved;
};
-typedef int (*object_handler_func_t)(uint64_t oid, int nr_copies, void *buf,
- size_t size, void *data);
-
/* farm.c */
int farm_init(const char *path);
bool farm_contain_snapshot(uint32_t idx, const char *tag);
@@ -52,8 +49,9 @@ char *get_object_directory(void);
int trunk_init(void);
int trunk_file_write(unsigned char *trunk_sha1, struct strbuf *trunk_entries);
void *trunk_file_read(unsigned char *sha1, struct sha1_file_hdr *);
-int for_each_object_in_trunk(unsigned char *trunk_sha1,
- object_handler_func_t func, void *data);
+int for_each_entry_in_trunk(unsigned char *trunk_sha1,
+ int (*func)(struct trunk_entry *entry, void *data),
+ void *data);
/* snap.c */
int snap_init(const char *path);
@@ -74,6 +72,7 @@ int object_tree_size(void);
void object_tree_insert(uint64_t oid, int nr_copies);
void object_tree_free(void);
void object_tree_print(void);
-int for_each_object_in_tree(object_handler_func_t func, void *data);
+int for_each_object_in_tree(int (*func)(uint64_t oid, int nr_copies,
+ void *data), void *data);
#endif
diff --git a/collie/farm/object_tree.c b/collie/farm/object_tree.c
index f719af6..97cb3e7 100644
--- a/collie/farm/object_tree.c
+++ b/collie/farm/object_tree.c
@@ -103,31 +103,22 @@ int object_tree_size()
return tree.nr_objs;
}
-int for_each_object_in_tree(object_handler_func_t func, void *data)
+int for_each_object_in_tree(int (*func)(uint64_t oid, int nr_copies,
+ void *data), void *data)
{
struct rb_node *p = rb_first(&tree.root);
struct object_tree_entry *entry;
- uint64_t oid;
- size_t size;
- void *buf = xmalloc(max(SD_INODE_SIZE, SD_DATA_OBJ_SIZE));
int ret = -1;
while (p) {
entry = rb_entry(p, struct object_tree_entry, node);
- oid = entry->oid;
- size = get_objsize(oid);
-
- if (sd_read_object(oid, buf, size, 0, true) < 0)
- goto out;
- if (func(oid, entry->nr_copies,
- buf, size, data) < 0)
+ if (func(entry->oid, entry->nr_copies, data) < 0)
goto out;
p = rb_next(p);
}
ret = 0;
out:
- free(buf);
return ret;
}
diff --git a/collie/farm/sha1_file.c b/collie/farm/sha1_file.c
index 2d94673..a577c1e 100644
--- a/collie/farm/sha1_file.c
+++ b/collie/farm/sha1_file.c
@@ -42,7 +42,7 @@ static void fill_sha1_path(char *pathbuf, const unsigned char *sha1)
static char *sha1_to_path(const unsigned char *sha1)
{
- static char buf[PATH_MAX];
+ static __thread char buf[PATH_MAX];
const char *objdir;
int len;
diff --git a/collie/farm/trunk.c b/collie/farm/trunk.c
index c0b416f..c2f5bbf 100644
--- a/collie/farm/trunk.c
+++ b/collie/farm/trunk.c
@@ -72,8 +72,9 @@ void *trunk_file_read(unsigned char *sha1, struct sha1_file_hdr *outhdr)
return buffer;
}
-int for_each_object_in_trunk(unsigned char *trunk_sha1,
- object_handler_func_t func, void *data)
+int for_each_entry_in_trunk(unsigned char *trunk_sha1,
+ int (*func)(struct trunk_entry *entry, void *data),
+ void *data)
{
struct trunk_entry *trunk_entry, *trunk_free = NULL;
struct sha1_file_hdr trunk_hdr;
@@ -87,15 +88,7 @@ int for_each_object_in_trunk(unsigned char *trunk_sha1,
nr_trunks = trunk_hdr.priv;
for (uint64_t i = 0; i < nr_trunks; i++, trunk_entry++) {
- struct sha1_file_hdr hdr;
- void *buffer = NULL;
-
- buffer = sha1_file_read(trunk_entry->sha1, &hdr);
- if (!buffer)
- goto out;
-
- if (func(trunk_entry->oid, trunk_entry->nr_copies,
- buffer, hdr.size, data) < 0)
+ if (func(trunk_entry, data) < 0)
goto out;
}
--
1.7.1
More information about the sheepdog
mailing list