[sheepdog] [PATCH v1] sheep/md: change process of for_each_object_in_stale() to multi-threads
Robin Dong
robin.k.dong at gmail.com
Tue Feb 25 08:48:35 CET 2014
In our test environment, we upload 6TB data and kill one node, then the sheep
daemons on each server will try to rename files from 'data' to '.stale', but
the rename operations cost almost half an hour. And in this half an hour, the
whole cluster can't be read or write.
To accelerate the speed of renames, we change for_each_object_in_stale() to
multi-threads, which is one thread for one disk.
Signed-off-by: Robin Dong <sanbai at taobao.com>
---
sheep/md.c | 56 +++++++++++++++++++++++++++++++++++++++++++++++++++-----
1 file changed, 51 insertions(+), 5 deletions(-)
diff --git a/sheep/md.c b/sheep/md.c
index e7e8ec2..ccac4b0 100644
--- a/sheep/md.c
+++ b/sheep/md.c
@@ -371,20 +371,66 @@ int for_each_object_in_wd(int (*func)(uint64_t oid, const char *path,
return ret;
}
+struct process_path_arg {
+ char path[PATH_MAX];
+ int (*func)(uint64_t oid, const char *path, uint32_t epoch, void *arg);
+ void *opaque;
+ int result;
+};
+
+static void *thread_process_path(void *arg)
+{
+ int ret = SD_RES_SUCCESS;
+ struct process_path_arg *parg = (struct process_path_arg *)arg;
+
+ ret = for_each_object_in_path(parg->path, parg->func, false,
+ parg->opaque);
+ if (ret != SD_RES_SUCCESS)
+ parg->result = ret;
+
+ return arg;
+}
+
int for_each_object_in_stale(int (*func)(uint64_t oid, const char *path,
uint32_t epoch, void *arg),
void *arg)
{
int ret = SD_RES_SUCCESS;
- char path[PATH_MAX];
const struct disk *disk;
+ struct process_path_arg thread_args[MD_MAX_DISK], *path_arg;
+ void *ret_arg;
+ pthread_t thread_array[MD_MAX_DISK];
+ int nr_thread = 0, i;
sd_read_lock(&md.lock);
rb_for_each_entry(disk, &md.root, rb) {
- snprintf(path, sizeof(path), "%s/.stale", disk->path);
- ret = for_each_object_in_path(path, func, false, arg);
- if (ret != SD_RES_SUCCESS)
- break;
+ snprintf(thread_args[nr_thread].path, PATH_MAX, "%s/.stale",
+ disk->path);
+ thread_args[nr_thread].func = func;
+ thread_args[nr_thread].opaque = arg;
+ thread_args[nr_thread].result = SD_RES_SUCCESS;
+ ret = pthread_create(thread_array + nr_thread, NULL,
+ thread_process_path,
+ (void *)(thread_args + nr_thread));
+ if (ret)
+ sd_warn("Failed to create thread for path %s",
+ disk->path);
+ nr_thread++;
+ if (nr_thread >= MD_MAX_DISK)
+ panic("Too many disks for md of sheep!");
+ }
+ sd_debug("Create %d threads for all path", nr_thread);
+ /* wait for all threads to exit */
+ for (i = 0; i < nr_thread; i++) {
+ ret = pthread_join(thread_array[i], &ret_arg);
+ if (ret)
+ sd_warn("Failed to join thread");
+ if (ret_arg) {
+ path_arg = (struct process_path_arg *)ret_arg;
+ if (path_arg->result != SD_RES_SUCCESS)
+ sd_err("Failed to run thread for path %s",
+ path_arg->path);
+ }
}
sd_rw_unlock(&md.lock);
return ret;
--
1.7.12.4
More information about the sheepdog
mailing list