[sheepdog] [PATCH v3 2/4] sheepfs: make fetching data for cache become async operation

Robin Dong robin.k.dong at gmail.com
Fri Mar 7 10:48:57 CET 2014


From: Robin Dong <sanbai at taobao.com>

Currently, we have added a cache for the http interface of sheepfs. But it
will only fetch data via an http request when a fuse operation reads outside
the cached range.

For better performance, we use the 'double buffer' technique: fuse reads from
one buffer while a newly created thread reads upcoming data into another
buffer at the same time. This makes read operations smoother
and faster.

We use two pointers, 'ready' and 'prepare', to point to the double buffers
and use the classic 'consumer and producer' model to avoid race conditions.

Signed-off-by: Robin Dong <sanbai at taobao.com>
---
v1-->v2:                
  1. change 'struct cache_s' to 'struct read_cache'
  2. remove unused variable 'fetching'
  3. change 'goto again' to 'while' loop
  4. add comment for using sem_t instead of pthread_mutex_t

 sheepfs/http.c | 182 ++++++++++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 149 insertions(+), 33 deletions(-)

diff --git a/sheepfs/http.c b/sheepfs/http.c
index 2f6d0b8..630129c 100644
--- a/sheepfs/http.c
+++ b/sheepfs/http.c
@@ -19,6 +19,7 @@
 #include <stdio.h>
 #include <time.h>
 #include <curl/curl.h>
+#include <semaphore.h>
 
 #include "strbuf.h"
 #include "sheepfs.h"
@@ -151,14 +152,14 @@ static size_t curl_read_object(const char *url, char *buf, size_t size,
 					&content_length);
 		if (res != CURLE_OK) {
 			sheepfs_pr("Failed to getinfo res: %s",
-			       curl_easy_strerror(res));
+				   curl_easy_strerror(res));
 			size = 0;
 			goto out;
 		}
 		if ((size_t)content_length > size) {
 			sheepfs_pr("Failed to get correct CONTENT_LENGTH, "
-			       "content_length: %"PRIu64", get_size: %"PRIu64,
-			       (size_t)content_length, size);
+				   "content_length: %"PRIu64", get_size: %"
+				   PRIu64, (size_t)content_length, size);
 			size = 0;
 		} else {
 			sheepfs_pr("Read out %"PRIu64" data from %s",
@@ -167,7 +168,7 @@ static size_t curl_read_object(const char *url, char *buf, size_t size,
 		}
 	} else {
 		sheepfs_pr("Failed to call libcurl res: %s, url: %s",
-		       curl_easy_strerror(res), url);
+			   curl_easy_strerror(res), url);
 		size = 0;
 	}
 out:
@@ -235,19 +236,101 @@ out:
 /* no rationale */
 #define CACHE_SIZE	(64 * 1024 * 1024)
 
-struct cache_handle {
+struct read_cache {
 	char *mem;
 	off_t offset;
 	size_t size;
 };
 
+/*
+ * The pthread_mutex_t is very hard to be used in 'consumer and producer' model.
+ * For example:
+ *
+ *     (lock and unlock many times)
+ *     ....
+ *     pthread_mutex_unlock()
+ *     pthread_mutex_destroy()
+ *
+ * and
+ *
+ *     (lock and unlock many times)
+ *     ....
+ *     pthread_mutex_lock()
+ *     pthread_mutex_destroy()
+ *
+ * the pthread_mutex_destroy will return EBUSY and cause panic in both cases
+ * above.
+ *
+ * In "consumer and producer model", the consumer (or producer) would end in any
+ * condition, which means pthread_mutex_t could end in locked or unlocked status
+ * and we can't just use pthread_mutex_destroy() to release it.
+ *
+ * Attribute PTHREAD_MUTEX_ERRORCHECK for pthread_mutex_t does not allow one
+ * thread to lock the same mutex twice; a mutex with PTHREAD_MUTEX_RECURSIVE
+ * could be locked twice without waiting, which does not suit our model;
+ * and pthread_cond_t may lose signals. So, after all, the best choice is
+ * the standard semaphore - 'sem_t'.
+ *
+ *
+ *
+ * All "size" variables in object_read() and object_write() have type
+ * 'size_t'; we can't actually create a file larger than (size_t) in fuse,
+ * so we set the type of 'obj_size' to 'size_t'.
+ */
+struct cache_handle {
+	char			path[PATH_MAX];
+	struct read_cache	*ready;
+	struct read_cache	*prepare;
+	pthread_t		fetch_thread;
+	sem_t			ready_sem;
+	sem_t			prepare_sem;
+	bool			stop;
+	off_t			fetch_offset;
+	size_t			obj_size;
+};
+
+static void swap_cache(struct cache_handle *ch)
+{
+	struct read_cache *cache;
+	cache = ch->ready;
+	ch->ready = ch->prepare;
+	ch->prepare = cache;
+}
+
+static void *fetch_thread_run(void *arg)
+{
+	struct cache_handle *ch = (struct cache_handle *)arg;
+	char url[PATH_MAX];
+	char *pos = strstr(ch->path, PATH_HTTP) + strlen(PATH_HTTP);
+	int ret;
+
+	while (true) {
+		sem_wait(&ch->prepare_sem);
+		if (ch->stop)
+			break;
+		/* update cache */
+		ret = generate_url(pos, strlen(ch->path) - strlen(PATH_HTTP),
+				   url, PATH_MAX);
+		if (ret)
+			sheepfs_pr("failed to generate url for %s", ch->path);
+		else {
+			ret = curl_read_object(url, ch->prepare->mem,
+					       CACHE_SIZE, ch->fetch_offset);
+			ch->prepare->offset = ch->fetch_offset;
+			ch->prepare->size = ret;
+		}
+		sem_post(&ch->ready_sem);
+	}
+	return NULL;
+}
+
 int object_read(const char *path, char *buf, size_t size, off_t offset,
 		struct fuse_file_info *fi)
 {
-	char url[PATH_MAX];
+	struct cache_handle *ch;
+	struct read_cache *cache;
 	char *pos;
 	int ret;
-	struct cache_handle *ch;
 
 	pos = strstr(path, PATH_HTTP);
 	if (!pos) {
@@ -259,50 +342,79 @@ int object_read(const char *path, char *buf, size_t size, off_t offset,
 	ch = (struct cache_handle *)fi->fh;
 
 	while (true) {
+		cache = ch->ready;
 		/* try to read from cache first */
-		if (offset >= ch->offset && (ch->offset + ch->size) > offset) {
-			if ((ch->offset + ch->size) > (offset + size))
+		if (offset >= cache->offset &&
+		    (cache->offset + cache->size) > offset) {
+			if ((cache->offset + cache->size) > (offset + size))
 				ret = size;
 			else
-				ret = (ch->offset + ch->size) - offset;
-			memcpy(buf, ch->mem + (offset - ch->offset), ret);
+				ret = (cache->offset + cache->size) - offset;
+			memcpy(buf, cache->mem + (offset - cache->offset), ret);
 			break;
-		} else { /* update cache */
-			if (!ch->mem)
-				ch->mem = xmalloc(CACHE_SIZE);
-
-			pos += strlen(PATH_HTTP);
-			/* don't need '\n' at the end of 'path' */
-			ret = generate_url(pos,
-					   strlen(path) - strlen(PATH_HTTP),
-					   url, PATH_MAX);
-			if (ret)
-				goto out;
-
-			ret = curl_read_object(url, ch->mem, CACHE_SIZE,
-					       offset);
-			ch->offset = offset;
-			ch->size = ret;
-			sheepfs_pr("update cache offset %lu size %d",
-				   offset, ret);
-			if (ret <= 0)
+		} else if (offset >= ch->obj_size) {
+			ret = 0;
+			break;
+		} else {
+			sem_wait(&ch->ready_sem);
+			swap_cache(ch);
+			ch->fetch_offset = ch->ready->offset + ch->ready->size;
+			sem_post(&ch->prepare_sem);
+			if (ch->ready->size == 0) {
+				ret = 0;
 				break;
+			}
 		}
 	}
 out:
 	return ret;
 }
 
+static void release_cache_handle(struct cache_handle *ch)
+{
+	if (ch->ready)
+		free(ch->ready->mem);
+	if (ch->prepare)
+		free(ch->prepare->mem);
+	free(ch->ready);
+	free(ch->prepare);
+	free(ch);
+}
+
 int object_open(const char *path, struct fuse_file_info *fi)
 {
 	struct cache_handle *ch;
+	char *pos;
+	int ret;
+
+	pos = strstr(path, PATH_HTTP);
+	if (!pos) {
+		sheepfs_pr("Invalid Path %s", path);
+		return -EINVAL;
+	}
 
 	/* don't need page cache of fuse */
 	fi->direct_io = 1;
 
 	ch = xzalloc(sizeof(*ch));
+	ch->ready = xzalloc(sizeof(struct read_cache));
+	ch->ready->mem = xmalloc(CACHE_SIZE);
+	ch->prepare = xzalloc(sizeof(struct read_cache));
+	ch->prepare->mem = xmalloc(CACHE_SIZE);
+	ch->stop = false;
+	ch->fetch_offset = 0;
+	ch->obj_size = object_get_size(path);
 	fi->fh = (uint64_t)ch;
 
+	sem_init(&ch->ready_sem, 0, 0);
+	sem_init(&ch->prepare_sem, 0, 1);
+	strncpy(ch->path, path, PATH_MAX);
+	ret = pthread_create(&ch->fetch_thread, NULL, fetch_thread_run, ch);
+	if (ret != 0) {
+		sheepfs_pr("failed to create thread to fetch data");
+		release_cache_handle(ch);
+		return -1;
+	}
 	return 0;
 }
 
@@ -310,10 +422,14 @@ int object_release(const char *path, struct fuse_file_info *fi)
 {
 	struct cache_handle *ch = (struct cache_handle *)fi->fh;
 
-	free(ch->mem);
-	free(ch);
-	fi->fh = 0;
+	ch->stop = true;
+	sem_post(&ch->prepare_sem);
+	pthread_join(ch->fetch_thread, NULL);
+	sem_destroy(&ch->ready_sem);
+	sem_destroy(&ch->prepare_sem);
 
+	release_cache_handle(ch);
+	fi->fh = 0;
 	return 0;
 }
 
-- 
1.7.12.4




More information about the sheepdog mailing list