[sheepdog] [PATCH v2] sheep/md: change process of for_each_object_in_stale() to multi-threads
Robin Dong
robin.k.dong at gmail.com
Wed Feb 26 04:43:28 CET 2014
In our test environment, we uploaded 6TB of data and killed one node. The sheep
daemons on each server then tried to rename files from 'data' to '.stale', but
the rename operations took almost half an hour. During that half hour, the
whole cluster could not be read or written.
To speed up the renames, we make for_each_object_in_stale() multi-threaded,
using one thread per disk.
Signed-off-by: Robin Dong <sanbai at taobao.com>
---
v1-->v2:
1. support an unlimited number of disks
2. panic() if thread creation fails
3. change sd_warn to sd_err
sheep/md.c | 71 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----
1 file changed, 66 insertions(+), 5 deletions(-)
diff --git a/sheep/md.c b/sheep/md.c
index e7e8ec2..07485ae 100644
--- a/sheep/md.c
+++ b/sheep/md.c
@@ -371,22 +371,83 @@ int for_each_object_in_wd(int (*func)(uint64_t oid, const char *path,
return ret;
}
+struct process_path_arg {
+ char path[PATH_MAX];
+ int (*func)(uint64_t oid, const char *path, uint32_t epoch, void *arg);
+ void *opaque;
+ int result;
+};
+
+static void *thread_process_path(void *arg)
+{
+ int ret = SD_RES_SUCCESS;
+ struct process_path_arg *parg = (struct process_path_arg *)arg;
+
+ ret = for_each_object_in_path(parg->path, parg->func, false,
+ parg->opaque);
+ if (ret != SD_RES_SUCCESS)
+ parg->result = ret;
+
+ return arg;
+}
+
int for_each_object_in_stale(int (*func)(uint64_t oid, const char *path,
uint32_t epoch, void *arg),
void *arg)
{
int ret = SD_RES_SUCCESS;
- char path[PATH_MAX];
const struct disk *disk;
+ struct process_path_arg *thread_args, *path_arg;
+ void *ret_arg;
+ pthread_t *thread_array;
+ int nr_thread = 0, idx = 0;
sd_read_lock(&md.lock);
+
rb_for_each_entry(disk, &md.root, rb) {
- snprintf(path, sizeof(path), "%s/.stale", disk->path);
- ret = for_each_object_in_path(path, func, false, arg);
- if (ret != SD_RES_SUCCESS)
- break;
+ nr_thread++;
+ }
+
+ thread_args = xmalloc(nr_thread * sizeof(struct process_path_arg));
+ thread_array = xmalloc(nr_thread * sizeof(pthread_t));
+
+ rb_for_each_entry(disk, &md.root, rb) {
+ snprintf(thread_args[idx].path, PATH_MAX, "%s/.stale",
+ disk->path);
+ thread_args[idx].func = func;
+ thread_args[idx].opaque = arg;
+ thread_args[idx].result = SD_RES_SUCCESS;
+ ret = pthread_create(thread_array + idx, NULL,
+ thread_process_path,
+ (void *)(thread_args + idx));
+ if (ret) {
+ /*
+ * If we can't create enough threads to process
+ * files, data consistency will be broken if
+ * we continue.
+ */
+ panic("Failed to create thread for path %s",
+ disk->path);
+ }
+ idx++;
+ }
+ sd_debug("Create %d threads for all path", nr_thread);
+ /* wait for all threads to exit */
+ for (idx = 0; idx < nr_thread; idx++) {
+ ret = pthread_join(thread_array[idx], &ret_arg);
+ if (ret)
+ sd_err("Failed to join thread");
+ if (ret_arg) {
+ path_arg = (struct process_path_arg *)ret_arg;
+ if (path_arg->result != SD_RES_SUCCESS)
+ sd_err("Failed to run thread for path %s",
+ path_arg->path);
+ }
}
sd_rw_unlock(&md.lock);
+
+ free(thread_args);
+ free(thread_array);
return ret;
}
--
1.7.12.4
More information about the sheepdog
mailing list