[sheepdog] [PATCH v4] sheep/md: change process of for_each_object_in_wd() to multi-threads

Liu Yuan namei.unix at gmail.com
Wed Feb 26 07:54:36 CET 2014


On Wed, Feb 26, 2014 at 02:43:33PM +0800, Robin Dong wrote:
> In our test environment, we uploaded 6TB of data and killed one node. The sheep
> daemons on each server then try to rename files from 'data' to '.stale', but
> the rename operations take almost half an hour, and during that half hour the
> whole cluster can be neither read nor written.
> 
> To speed up the renames, we change for_each_object_in_wd() to be
> multi-threaded, using one thread per disk.
> 
> Signed-off-by: Robin Dong <sanbai at taobao.com>
> ---
> v3-->v4:
>  1. add free() for thread_args and thread_array
> 
> v2-->v3:
>  1. optimize for_each_object_in_wd() instead of for_each_object_in_stale()
> 
> v1-->v2:
>  1. support an unlimited number of disks
>  2. panic() if thread creation fails
>  3. change sd_warn to sd_err
> 
>  sheep/md.c | 71 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
>  1 file changed, 68 insertions(+), 3 deletions(-)
> 
> diff --git a/sheep/md.c b/sheep/md.c
> index e7e8ec2..03e3d37 100644
> --- a/sheep/md.c
> +++ b/sheep/md.c
> @@ -354,20 +354,85 @@ const char *md_get_object_path(uint64_t oid)
>  	return p;
>  }
>  
> +struct process_path_arg {
> +	const char *path;
> +	int (*func)(uint64_t oid, const char *path, uint32_t epoch, void *arg);
> +	bool cleanup;
> +	void *opaque;
> +	int result;
> +};
> +
> +static void *thread_process_path(void *arg)
> +{
> +	int ret = SD_RES_SUCCESS;
> +	struct process_path_arg *parg = (struct process_path_arg *)arg;
> +
> +	ret = for_each_object_in_path(parg->path, parg->func, parg->cleanup,
> +				      parg->opaque);
> +	if (ret != SD_RES_SUCCESS)
> +		parg->result = ret;
> +
> +	return arg;
> +}
> +
>  int for_each_object_in_wd(int (*func)(uint64_t oid, const char *path,
>  				      uint32_t epoch, void *arg),
>  			  bool cleanup, void *arg)
>  {
>  	int ret = SD_RES_SUCCESS;
>  	const struct disk *disk;
> +	struct process_path_arg *thread_args, *path_arg;
> +	void *ret_arg;
> +	pthread_t *thread_array;
> +	int nr_thread = 0, idx = 0;
>  
>  	sd_read_lock(&md.lock);
> +
>  	rb_for_each_entry(disk, &md.root, rb) {
> -		ret = for_each_object_in_path(disk->path, func, cleanup, arg);
> -		if (ret != SD_RES_SUCCESS)
> -			break;
> +		nr_thread++;
> +	}
> +
> +	thread_args = xmalloc(nr_thread * sizeof(struct process_path_arg));
> +	thread_array = xmalloc(nr_thread * sizeof(pthread_t));
> +
> +	rb_for_each_entry(disk, &md.root, rb) {
> +		thread_args[idx].path = disk->path;
> +		thread_args[idx].func = func;
> +		thread_args[idx].cleanup = cleanup;
> +		thread_args[idx].opaque = arg;
> +		thread_args[idx].result = SD_RES_SUCCESS;
> +		ret = pthread_create(thread_array + idx, NULL,
> +				     thread_process_path,
> +				     (void *)(thread_args + idx));
> +		if (ret) {
> +			/*
> +			 * If we can't create enough threads to process
> +			 * files, the data-consistent will be broken if
> +			 * we continued.
> +			 */
> +			panic("Failed to create thread for path %s",
> +			      disk->path);
> +		}
> +		idx++;
> +	}
> +
> +	sd_debug("Create %d threads for all path", nr_thread);
> +	/* wait for all threads to exit */
> +	for (idx = 0; idx < nr_thread; idx++) {
> +		ret = pthread_join(thread_array[idx], &ret_arg);
> +		if (ret)
> +			sd_err("Failed to join thread");
> +		if (ret_arg) {
> +			path_arg = (struct process_path_arg *)ret_arg;
> +			if (path_arg->result != SD_RES_SUCCESS)
> +				sd_err("Failed to run thread for path %s",
> +				       path_arg->path);
> +		}
>  	}
>  	sd_rw_unlock(&md.lock);
> +
> +	free(thread_args);
> +	free(thread_array);
>  	return ret;
>  }
>  
> -- 
> 1.7.12.4
> 
> -- 
> sheepdog mailing list
> sheepdog at lists.wpkg.org
> http://lists.wpkg.org/mailman/listinfo/sheepdog

Applied, thanks.

Yuan



More information about the sheepdog mailing list