[sheepdog] [PATCH stable-0.8 1/6] sheep/md: change process of for_each_object_in_wd() to multi-threads

Hitoshi Mitake mitake.hitoshi at lab.ntt.co.jp
Thu Mar 20 09:52:20 CET 2014


From: Robin Dong <robin.k.dong at gmail.com>

In our test environment, we uploaded 6TB of data and killed one node. The sheep
daemons on each server then tried to rename files from 'data' to '.stale', but
the rename operations took almost half an hour. During that half hour, the
whole cluster could not be read or written.

To speed up the renames, we make for_each_object_in_wd() multi-threaded,
using one thread per disk.

Signed-off-by: Robin Dong <sanbai at taobao.com>
Signed-off-by: Liu Yuan <namei.unix at gmail.com>
---
 sheep/md.c | 71 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 68 insertions(+), 3 deletions(-)

diff --git a/sheep/md.c b/sheep/md.c
index 0a2903f..dfce49a 100644
--- a/sheep/md.c
+++ b/sheep/md.c
@@ -354,20 +354,85 @@ const char *md_get_object_path(uint64_t oid)
 	return p;
 }
 
+struct process_path_arg {
+	const char *path;
+	int (*func)(uint64_t oid, const char *path, uint32_t epoch, void *arg);
+	bool cleanup;
+	void *opaque;
+	int result;
+};
+
+static void *thread_process_path(void *arg)
+{
+	int ret = SD_RES_SUCCESS;
+	struct process_path_arg *parg = (struct process_path_arg *)arg;
+
+	ret = for_each_object_in_path(parg->path, parg->func, parg->cleanup,
+				      parg->opaque);
+	if (ret != SD_RES_SUCCESS)
+		parg->result = ret;
+
+	return arg;
+}
+
 int for_each_object_in_wd(int (*func)(uint64_t oid, const char *path,
 				      uint32_t epoch, void *arg),
 			  bool cleanup, void *arg)
 {
 	int ret = SD_RES_SUCCESS;
 	const struct disk *disk;
+	struct process_path_arg *thread_args, *path_arg;
+	void *ret_arg;
+	pthread_t *thread_array;
+	int nr_thread = 0, idx = 0;
 
 	sd_read_lock(&md.lock);
+
 	rb_for_each_entry(disk, &md.root, rb) {
-		ret = for_each_object_in_path(disk->path, func, cleanup, arg);
-		if (ret != SD_RES_SUCCESS)
-			break;
+		nr_thread++;
+	}
+
+	thread_args = xmalloc(nr_thread * sizeof(struct process_path_arg));
+	thread_array = xmalloc(nr_thread * sizeof(pthread_t));
+
+	rb_for_each_entry(disk, &md.root, rb) {
+		thread_args[idx].path = disk->path;
+		thread_args[idx].func = func;
+		thread_args[idx].cleanup = cleanup;
+		thread_args[idx].opaque = arg;
+		thread_args[idx].result = SD_RES_SUCCESS;
+		ret = pthread_create(thread_array + idx, NULL,
+				     thread_process_path,
+				     (void *)(thread_args + idx));
+		if (ret) {
+			/*
+			 * If we can't create enough threads to process
+			 * files, the data-consistent will be broken if
+			 * we continued.
+			 */
+			panic("Failed to create thread for path %s",
+			      disk->path);
+		}
+		idx++;
+	}
+
+	sd_debug("Create %d threads for all path", nr_thread);
+	/* wait for all threads to exit */
+	for (idx = 0; idx < nr_thread; idx++) {
+		ret = pthread_join(thread_array[idx], &ret_arg);
+		if (ret)
+			sd_err("Failed to join thread");
+		if (ret_arg) {
+			path_arg = (struct process_path_arg *)ret_arg;
+			if (path_arg->result != SD_RES_SUCCESS)
+				sd_err("%s, %s", path_arg->path,
+				       sd_strerror(path_arg->result));
+		}
 	}
 	sd_rw_unlock(&md.lock);
+
+	free(thread_args);
+	free(thread_array);
 	return ret;
 }
 
-- 
1.8.1.2




More information about the sheepdog mailing list