[sheepdog] [PATCH v1 2/3] sheepfs: make fetching data for cache become async operation

Robin Dong robin.k.dong at gmail.com
Thu Mar 6 08:47:20 CET 2014


From: Robin Dong <sanbai at taobao.com>

Currently, we have added a cache for the HTTP interface of sheepfs, but
it only fetches data with an HTTP request when a FUSE read falls outside
the cached range.

For better performance, we use the 'double buffer' technique: FUSE reads
from one buffer while a newly created thread reads upcoming data into the
other buffer at the same time. This makes read operations smoother and
faster.

We use two pointers, 'ready' and 'prepare', to point to the two buffers,
and use the classic 'producer and consumer' model to avoid race conditions.

Signed-off-by: Robin Dong <sanbai at taobao.com>
---
 sheepfs/http.c | 165 +++++++++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 124 insertions(+), 41 deletions(-)

diff --git a/sheepfs/http.c b/sheepfs/http.c
index 7df05ad..5610110 100644
--- a/sheepfs/http.c
+++ b/sheepfs/http.c
@@ -19,6 +19,7 @@
 #include <stdio.h>
 #include <time.h>
 #include <curl/curl.h>
+#include <semaphore.h>
 
 #include "strbuf.h"
 #include "sheepfs.h"
@@ -157,16 +158,17 @@ static size_t curl_read_object(const char *url, char *buf, size_t size,
 		}
 		if ((size_t)content_length > size) {
 			sheepfs_pr("Failed to get correct CONTENT_LENGTH, "
-			       "content_length: %"PRIu64", get_size: %"PRIu64,
-			       (size_t)content_length, size);
+				   "content_length: %"PRIu64", get_size: %"
+				   PRIu64, (size_t)content_length, size);
 			size = 0;
 		} else {
-			sd_debug("Read out %"PRIu64" data from %s", size, url);
+			sheepfs_pr("Read out %"PRIu64" data from %s", size,
+				   url);
 			size = (size_t)content_length;
 		}
 	} else {
 		sheepfs_pr("Failed to call libcurl res: %s, url: %s",
-		       curl_easy_strerror(res), url);
+			   curl_easy_strerror(res), url);
 		size = 0;
 	}
 out:
@@ -234,19 +236,69 @@ out:
 /* no rationale */
 #define CACHE_SIZE	(64 * 1024 * 1024)
 
-struct cache_handle {
+struct cache_s {
 	char *mem;
 	off_t offset;
 	size_t size;
 };
 
+struct cache_handle {
+	char            path[PATH_MAX];
+	struct cache_s  *ready;
+	struct cache_s  *prepare;
+	pthread_t       fetch_thread;
+	sem_t		ready_sem;
+	sem_t		prepare_sem;
+	bool            stop;
+	uatomic_bool	fetching;
+	off_t		fetch_offset;
+	size_t		obj_size;
+};
+
+static void swap_cache(struct cache_handle *ch)
+{
+	struct cache_s *cache;
+	cache = ch->ready;
+	ch->ready = ch->prepare;
+	ch->prepare = cache;
+}
+
+static void *fetch_thread_run(void *arg)
+{
+	struct cache_handle *ch = (struct cache_handle *)arg;
+	char url[PATH_MAX];
+	char *pos = strstr(ch->path, PATH_HTTP) + strlen(PATH_HTTP);
+	int ret;
+
+	while (true) {
+		sem_wait(&ch->prepare_sem);
+		if (ch->stop)
+			break;
+		uatomic_set_true(&ch->fetching);
+		/* update cache */
+		ret = generate_url(pos, strlen(ch->path) - strlen(PATH_HTTP),
+				   url, PATH_MAX);
+		if (ret)
+			sheepfs_pr("failed to generate url for %s", ch->path);
+		else {
+			ret = curl_read_object(url, ch->prepare->mem,
+					       CACHE_SIZE, ch->fetch_offset);
+			ch->prepare->offset = ch->fetch_offset;
+			ch->prepare->size = ret;
+		}
+		uatomic_set_false(&ch->fetching);
+		sem_post(&ch->ready_sem);
+	}
+	return NULL;
+}
+
 int object_read(const char *path, char *buf, size_t size, off_t offset,
 		struct fuse_file_info *fi)
 {
-	char url[PATH_MAX];
+	struct cache_handle *ch;
+	struct cache_s *cache;
 	char *pos;
 	int ret;
-	struct cache_handle *ch;
 
 	pos = strstr(path, PATH_HTTP);
 	if (!pos) {
@@ -256,52 +308,79 @@ int object_read(const char *path, char *buf, size_t size, off_t offset,
 	}
 
 	ch = (struct cache_handle *)fi->fh;
-
-	while (true) {
-		/* try to read from cache first */
-		if (offset >= ch->offset && (ch->offset + ch->size) > offset) {
-			if ((ch->offset + ch->size) > (offset + size))
-				ret = size;
-			else
-				ret = (ch->offset + ch->size) - offset;
-			memcpy(buf, ch->mem + (offset - ch->offset), ret);
-			break;
-		} else { /* update cache */
-			if (!ch->mem)
-				ch->mem = xmalloc(CACHE_SIZE);
-
-			pos += strlen(PATH_HTTP);
-			/* don't need '\n' at the end of 'path' */
-			ret = generate_url(pos,
-					   strlen(path) - strlen(PATH_HTTP),
-					   url, PATH_MAX);
-			if (ret)
-				goto out;
-
-			ret = curl_read_object(url, ch->mem, CACHE_SIZE,
-					       offset);
-			ch->offset = offset;
-			ch->size = ret;
-			sd_debug("update cache offset %lu size %d",
-				 offset, ret);
-			if (ret <= 0)
-				break;
-		}
+again:
+	cache = ch->ready;
+	/* try to read from cache first */
+	if (offset >= cache->offset && (cache->offset + cache->size) > offset) {
+		if ((cache->offset + cache->size) > (offset + size))
+			ret = size;
+		else
+			ret = (cache->offset + cache->size) - offset;
+		memcpy(buf, cache->mem + (offset - cache->offset), ret);
+	} else if (offset >= ch->obj_size) {
+		ret = 0;
+		goto out;
+	} else {
+		sem_wait(&ch->ready_sem);
+		swap_cache(ch);
+		ch->fetch_offset = ch->ready->offset + ch->ready->size;
+		sem_post(&ch->prepare_sem);
+		if (ch->ready->size == 0) {
+			ret = 0;
+			goto out;
+		} else
+			goto again;
 	}
 out:
 	return ret;
 }
 
+static void release_cache_handle(struct cache_handle *ch)
+{
+	if (ch->ready)
+		free(ch->ready->mem);
+	if (ch->prepare)
+		free(ch->prepare->mem);
+	free(ch->ready);
+	free(ch->prepare);
+	free(ch);
+}
+
 int object_open(const char *path, struct fuse_file_info *fi)
 {
 	struct cache_handle *ch;
+	char *pos;
+	int ret;
+
+	pos = strstr(path, PATH_HTTP);
+	if (!pos) {
+		sheepfs_pr("Invalid Path %s", path);
+		return -EINVAL;
+	}
 
 	/* don't need page cache of fuse */
 	fi->direct_io = 1;
 
 	ch = xzalloc(sizeof(*ch));
+	ch->ready = xzalloc(sizeof(struct cache_s));
+	ch->ready->mem = xmalloc(CACHE_SIZE);
+	ch->prepare = xzalloc(sizeof(struct cache_s));
+	ch->prepare->mem = xmalloc(CACHE_SIZE);
+	ch->stop = false;
+	ch->fetch_offset = 0;
+	ch->obj_size = object_get_size(path);
 	fi->fh = (uint64_t)ch;
 
+	sem_init(&ch->ready_sem, 0, 0);
+	sem_init(&ch->prepare_sem, 0, 1);
+	strncpy(ch->path, path, PATH_MAX);
+	uatomic_set_false(&ch->fetching);
+	ret = pthread_create(&ch->fetch_thread, NULL, fetch_thread_run, ch);
+	if (ret != 0) {
+		sheepfs_pr("failed to create thread to fetch data");
+		release_cache_handle(ch);
+		return -1;
+	}
 	return 0;
 }
 
@@ -309,10 +388,14 @@ int object_release(const char *path, struct fuse_file_info *fi)
 {
 	struct cache_handle *ch = (struct cache_handle *)fi->fh;
 
-	free(ch->mem);
-	free(ch);
-	fi->fh = 0;
+	ch->stop = true;
+	sem_post(&ch->prepare_sem);
+	pthread_join(ch->fetch_thread, NULL);
+	sem_destroy(&ch->ready_sem);
+	sem_destroy(&ch->prepare_sem);
 
+	release_cache_handle(ch);
+	fi->fh = 0;
 	return 0;
 }
 
-- 
1.7.12.4




More information about the sheepdog mailing list