[sheepdog] [PATCH v3] sheep/md: change process of for_each_object_in_wd() to multi-threads

Robin Dong robin.k.dong at gmail.com
Wed Feb 26 07:24:30 CET 2014


In our test environment, we uploaded 6TB of data and killed one node. The sheep
daemons on every server then tried to rename files from 'data' to '.stale', and
the rename operations took almost half an hour. During that half hour, the
whole cluster could be neither read nor written.

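To speed up the renames, we make for_each_object_in_wd() multi-threaded, using
one thread per disk. As a result, the callback passed to for_each_object_in_wd()
now runs concurrently on several threads and must be safe to call that way.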

Signed-off-by: Robin Dong <sanbai at taobao.com>
---
v2-->v3:
 1. optimize for_each_object_in_wd() instead of for_each_object_in_stale()

v1-->v2:
 1. support an unlimited number of disks
 2. panic() if thread creation fails
 3. change sd_warn to sd_err

 sheep/md.c | 68 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 65 insertions(+), 3 deletions(-)

diff --git a/sheep/md.c b/sheep/md.c
index e7e8ec2..2cf0e3d 100644
--- a/sheep/md.c
+++ b/sheep/md.c
@@ -354,18 +354,116 @@ const char *md_get_object_path(uint64_t oid)
 	return p;
 }
 
+struct process_path_arg {
+	const char *path;
+	int (*func)(uint64_t oid, const char *path, uint32_t epoch, void *arg);
+	bool cleanup;
+	void *opaque;
+	int result;
+};
+
+/*
+ * Thread entry point: walk every object under one disk path and store
+ * the SD_RES_* result of for_each_object_in_path() in parg->result.
+ */
+static void *thread_process_path(void *arg)
+{
+	struct process_path_arg *parg = arg;
+
+	parg->result = for_each_object_in_path(parg->path, parg->func,
+					       parg->cleanup, parg->opaque);
+	return NULL;
+}
+
+/*
+ * Start one thread per element of args[0..nr-1] and wait for all of them.
+ *
+ * Returns SD_RES_SUCCESS if every walk succeeded, otherwise the result of
+ * the first element (in array order) whose walk failed.  All threads have
+ * been joined when this returns, so the caller may then free args.
+ */
+static int process_paths_in_parallel(struct process_path_arg *args, int nr)
+{
+	pthread_t *threads = xmalloc(nr * sizeof(*threads));
+	int ret = SD_RES_SUCCESS, err, i;
+
+	for (i = 0; i < nr; i++) {
+		err = pthread_create(&threads[i], NULL, thread_process_path,
+				     &args[i]);
+		if (err) {
+			/*
+			 * Leaving a disk unprocessed would break data
+			 * consistency, so we cannot continue.
+			 */
+			panic("Failed to create thread for path %s, %s",
+			      args[i].path, strerror(err));
+		}
+	}
+
+	sd_debug("Create %d threads for all path", nr);
+
+	for (i = 0; i < nr; i++) {
+		err = pthread_join(threads[i], NULL);
+		if (err) {
+			/*
+			 * The thread may still be using args[i]; neither
+			 * freeing it nor releasing the caller's lock is safe.
+			 */
+			panic("Failed to join thread for path %s, %s",
+			      args[i].path, strerror(err));
+		}
+
+		if (args[i].result != SD_RES_SUCCESS) {
+			sd_err("Failed to run thread for path %s",
+			       args[i].path);
+			if (ret == SD_RES_SUCCESS)
+				ret = args[i].result;
+		}
+	}
+
+	free(threads);
+	return ret;
+}
+
+/*
+ * Call func for every object on every disk of the working directory.
+ *
+ * The disks are walked in parallel, one thread per disk, so func may run
+ * concurrently in several threads and must be safe for that, including any
+ * state it reaches through arg.  md.lock is held for reading until every
+ * thread has been joined (this assumes disks are only removed under the
+ * write lock, which keeps each disk->path valid during the walk).
+ *
+ * Returns SD_RES_SUCCESS, or the failing result of the first disk (in
+ * md.root order) whose walk failed.  A failure on one disk does not stop
+ * the walks on the other disks.
+ */
 int for_each_object_in_wd(int (*func)(uint64_t oid, const char *path,
 				      uint32_t epoch, void *arg),
 			  bool cleanup, void *arg)
 {
 	int ret = SD_RES_SUCCESS;
 	const struct disk *disk;
+	struct process_path_arg *thread_args;
+	int nr_thread = 0, idx = 0;
 
 	sd_read_lock(&md.lock);
 	rb_for_each_entry(disk, &md.root, rb) {
-		ret = for_each_object_in_path(disk->path, func, cleanup, arg);
-		if (ret != SD_RES_SUCCESS)
-			break;
+		nr_thread++;
+	}
+
+	if (nr_thread > 0) {
+		thread_args = xmalloc(nr_thread * sizeof(*thread_args));
+		rb_for_each_entry(disk, &md.root, rb) {
+			thread_args[idx].path = disk->path;
+			thread_args[idx].func = func;
+			thread_args[idx].cleanup = cleanup;
+			thread_args[idx].opaque = arg;
+			thread_args[idx].result = SD_RES_SUCCESS;
+			idx++;
+		}
+		ret = process_paths_in_parallel(thread_args, nr_thread);
+		free(thread_args);
 	}
 	sd_rw_unlock(&md.lock);
 	return ret;
-- 
1.7.12.4




More information about the sheepdog mailing list