[sheepdog] [PATCH v4] sheep/md: change process of for_each_object_in_wd() to multi-threads
Liu Yuan
namei.unix at gmail.com
Wed Feb 26 07:54:36 CET 2014
On Wed, Feb 26, 2014 at 02:43:33PM +0800, Robin Dong wrote:
> In our test environment, we upload 6TB data and kill one node, then the sheep
> daemons on each server will try to rename files from 'data' to '.stale', but
> the rename operations cost almost half an hour. And in this half an hour, the
> whole cluster can't be read or write.
>
> To accelerate the speed of renames, we change for_each_object_in_wd() to
> multi-threads, which is one thread for one disk.
>
> Signed-off-by: Robin Dong <sanbai at taobao.com>
> ---
> v3-->v4:
> 1. add free() for thread_args and thread_array
>
> v2-->v3:
> 1. optimize for_each_object_in_wd() instead of for_each_object_in_stale()
>
> v1-->v2:
> 1. support unlimited number of disk
> 2. panic() if create thread fail
> 3. change sd_warn to sd_err
>
> sheep/md.c | 71 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
> 1 file changed, 68 insertions(+), 3 deletions(-)
>
> diff --git a/sheep/md.c b/sheep/md.c
> index e7e8ec2..03e3d37 100644
> --- a/sheep/md.c
> +++ b/sheep/md.c
> @@ -354,20 +354,85 @@ const char *md_get_object_path(uint64_t oid)
> return p;
> }
>
> +struct process_path_arg {
> + const char *path;
> + int (*func)(uint64_t oid, const char *path, uint32_t epoch, void *arg);
> + bool cleanup;
> + void *opaque;
> + int result;
> +};
> +
> +static void *thread_process_path(void *arg)
> +{
> + int ret = SD_RES_SUCCESS;
> + struct process_path_arg *parg = (struct process_path_arg *)arg;
> +
> + ret = for_each_object_in_path(parg->path, parg->func, parg->cleanup,
> + parg->opaque);
> + if (ret != SD_RES_SUCCESS)
> + parg->result = ret;
> +
> + return arg;
> +}
> +
> int for_each_object_in_wd(int (*func)(uint64_t oid, const char *path,
> uint32_t epoch, void *arg),
> bool cleanup, void *arg)
> {
> int ret = SD_RES_SUCCESS;
> const struct disk *disk;
> + struct process_path_arg *thread_args, *path_arg;
> + void *ret_arg;
> + pthread_t *thread_array;
> + int nr_thread = 0, idx = 0;
>
> sd_read_lock(&md.lock);
> +
> rb_for_each_entry(disk, &md.root, rb) {
> - ret = for_each_object_in_path(disk->path, func, cleanup, arg);
> - if (ret != SD_RES_SUCCESS)
> - break;
> + nr_thread++;
> + }
> +
> + thread_args = xmalloc(nr_thread * sizeof(struct process_path_arg));
> + thread_array = xmalloc(nr_thread * sizeof(pthread_t));
> +
> + rb_for_each_entry(disk, &md.root, rb) {
> + thread_args[idx].path = disk->path;
> + thread_args[idx].func = func;
> + thread_args[idx].cleanup = cleanup;
> + thread_args[idx].opaque = arg;
> + thread_args[idx].result = SD_RES_SUCCESS;
> + ret = pthread_create(thread_array + idx, NULL,
> + thread_process_path,
> + (void *)(thread_args + idx));
> + if (ret) {
> + /*
> + * If we can't create enough threads to process
> + * files, the data-consistent will be broken if
> + * we continued.
> + */
> + panic("Failed to create thread for path %s",
> + disk->path);
> + }
> + idx++;
> + }
> +
> + sd_debug("Create %d threads for all path", nr_thread);
> + /* wait for all threads to exit */
> + for (idx = 0; idx < nr_thread; idx++) {
> + ret = pthread_join(thread_array[idx], &ret_arg);
> + if (ret)
> + sd_err("Failed to join thread");
> + if (ret_arg) {
> + path_arg = (struct process_path_arg *)ret_arg;
> + if (path_arg->result != SD_RES_SUCCESS)
> + sd_err("Failed to run thread for path %s",
> + path_arg->path);
> + }
> }
> sd_rw_unlock(&md.lock);
> +
> + free(thread_args);
> + free(thread_array);
> return ret;
> }
>
> --
> 1.7.12.4
>
> --
> sheepdog mailing list
> sheepdog at lists.wpkg.org
> http://lists.wpkg.org/mailman/listinfo/sheepdog
Applied thanks
Yuan
More information about the sheepdog
mailing list