[sheepdog] [PATCH v1 2/3] sheepfs: make fetching data for cache become async operation
Robin Dong
robin.k.dong at gmail.com
Thu Mar 6 08:47:20 CET 2014
From: Robin Dong <sanbai at taobao.com>
Currently, we have added a cache for the HTTP interface of sheepfs. But it
only fetches data through an HTTP request when a fuse read operation falls
outside the cached range.
For better performance, we use the 'double buffer' technique: fuse reads
from one buffer while a newly created thread reads upcoming data into the
other buffer at the same time. This makes read operations smoother
and faster.
We use two pointers, 'ready' and 'prepare', to point to the two buffers,
and use the classic 'producer-consumer model' to avoid race conditions.
Signed-off-by: Robin Dong <sanbai at taobao.com>
---
sheepfs/http.c | 165 +++++++++++++++++++++++++++++++++++++++++++--------------
1 file changed, 124 insertions(+), 41 deletions(-)
diff --git a/sheepfs/http.c b/sheepfs/http.c
index 7df05ad..5610110 100644
--- a/sheepfs/http.c
+++ b/sheepfs/http.c
@@ -19,6 +19,7 @@
#include <stdio.h>
#include <time.h>
#include <curl/curl.h>
+#include <semaphore.h>
#include "strbuf.h"
#include "sheepfs.h"
@@ -157,16 +158,17 @@ static size_t curl_read_object(const char *url, char *buf, size_t size,
}
if ((size_t)content_length > size) {
sheepfs_pr("Failed to get correct CONTENT_LENGTH, "
- "content_length: %"PRIu64", get_size: %"PRIu64,
- (size_t)content_length, size);
+ "content_length: %"PRIu64", get_size: %"
+ PRIu64, (size_t)content_length, size);
size = 0;
} else {
- sd_debug("Read out %"PRIu64" data from %s", size, url);
+ sheepfs_pr("Read out %"PRIu64" data from %s", size,
+ url);
size = (size_t)content_length;
}
} else {
sheepfs_pr("Failed to call libcurl res: %s, url: %s",
- curl_easy_strerror(res), url);
+ curl_easy_strerror(res), url);
size = 0;
}
out:
@@ -234,19 +236,69 @@ out:
/* no rationale */
#define CACHE_SIZE (64 * 1024 * 1024)
-struct cache_handle {
+struct cache_s {
char *mem;
off_t offset;
size_t size;
};
+struct cache_handle {
+ char path[PATH_MAX];
+ struct cache_s *ready;
+ struct cache_s *prepare;
+ pthread_t fetch_thread;
+ sem_t ready_sem;
+ sem_t prepare_sem;
+ bool stop;
+ uatomic_bool fetching;
+ off_t fetch_offset;
+ size_t obj_size;
+};
+
+static void swap_cache(struct cache_handle *ch)
+{
+ struct cache_s *cache;
+ cache = ch->ready;
+ ch->ready = ch->prepare;
+ ch->prepare = cache;
+}
+
+static void *fetch_thread_run(void *arg)
+{
+ struct cache_handle *ch = (struct cache_handle *)arg;
+ char url[PATH_MAX];
+ char *pos = strstr(ch->path, PATH_HTTP) + strlen(PATH_HTTP);
+ int ret;
+
+ while (true) {
+ sem_wait(&ch->prepare_sem);
+ if (ch->stop)
+ break;
+ uatomic_set_true(&ch->fetching);
+ /* update cache */
+ ret = generate_url(pos, strlen(ch->path) - strlen(PATH_HTTP),
+ url, PATH_MAX);
+ if (ret)
+ sheepfs_pr("failed to generate url for %s", ch->path);
+ else {
+ ret = curl_read_object(url, ch->prepare->mem,
+ CACHE_SIZE, ch->fetch_offset);
+ ch->prepare->offset = ch->fetch_offset;
+ ch->prepare->size = ret;
+ }
+ uatomic_set_false(&ch->fetching);
+ sem_post(&ch->ready_sem);
+ }
+ return NULL;
+}
+
int object_read(const char *path, char *buf, size_t size, off_t offset,
struct fuse_file_info *fi)
{
- char url[PATH_MAX];
+ struct cache_handle *ch;
+ struct cache_s *cache;
char *pos;
int ret;
- struct cache_handle *ch;
pos = strstr(path, PATH_HTTP);
if (!pos) {
@@ -256,52 +308,79 @@ int object_read(const char *path, char *buf, size_t size, off_t offset,
}
ch = (struct cache_handle *)fi->fh;
-
- while (true) {
- /* try to read from cache first */
- if (offset >= ch->offset && (ch->offset + ch->size) > offset) {
- if ((ch->offset + ch->size) > (offset + size))
- ret = size;
- else
- ret = (ch->offset + ch->size) - offset;
- memcpy(buf, ch->mem + (offset - ch->offset), ret);
- break;
- } else { /* update cache */
- if (!ch->mem)
- ch->mem = xmalloc(CACHE_SIZE);
-
- pos += strlen(PATH_HTTP);
- /* don't need '\n' at the end of 'path' */
- ret = generate_url(pos,
- strlen(path) - strlen(PATH_HTTP),
- url, PATH_MAX);
- if (ret)
- goto out;
-
- ret = curl_read_object(url, ch->mem, CACHE_SIZE,
- offset);
- ch->offset = offset;
- ch->size = ret;
- sd_debug("update cache offset %lu size %d",
- offset, ret);
- if (ret <= 0)
- break;
- }
+again:
+ cache = ch->ready;
+ /* try to read from cache first */
+ if (offset >= cache->offset && (cache->offset + cache->size) > offset) {
+ if ((cache->offset + cache->size) > (offset + size))
+ ret = size;
+ else
+ ret = (cache->offset + cache->size) - offset;
+ memcpy(buf, cache->mem + (offset - cache->offset), ret);
+ } else if (offset >= ch->obj_size) {
+ ret = 0;
+ goto out;
+ } else {
+ sem_wait(&ch->ready_sem);
+ swap_cache(ch);
+ ch->fetch_offset = ch->ready->offset + ch->ready->size;
+ sem_post(&ch->prepare_sem);
+ if (ch->ready->size == 0) {
+ ret = 0;
+ goto out;
+ } else
+ goto again;
}
out:
return ret;
}
+static void release_cache_handle(struct cache_handle *ch)
+{
+ if (ch->ready)
+ free(ch->ready->mem);
+ if (ch->prepare)
+ free(ch->prepare->mem);
+ free(ch->ready);
+ free(ch->prepare);
+ free(ch);
+}
+
int object_open(const char *path, struct fuse_file_info *fi)
{
struct cache_handle *ch;
+ char *pos;
+ int ret;
+
+ pos = strstr(path, PATH_HTTP);
+ if (!pos) {
+ sheepfs_pr("Invalid Path %s", path);
+ return -EINVAL;
+ }
/* don't need page cache of fuse */
fi->direct_io = 1;
ch = xzalloc(sizeof(*ch));
+ ch->ready = xzalloc(sizeof(struct cache_s));
+ ch->ready->mem = xmalloc(CACHE_SIZE);
+ ch->prepare = xzalloc(sizeof(struct cache_s));
+ ch->prepare->mem = xmalloc(CACHE_SIZE);
+ ch->stop = false;
+ ch->fetch_offset = 0;
+ ch->obj_size = object_get_size(path);
fi->fh = (uint64_t)ch;
+ sem_init(&ch->ready_sem, 0, 0);
+ sem_init(&ch->prepare_sem, 0, 1);
+ strncpy(ch->path, path, PATH_MAX);
+ uatomic_set_false(&ch->fetching);
+ ret = pthread_create(&ch->fetch_thread, NULL, fetch_thread_run, ch);
+ if (ret != 0) {
+ sheepfs_pr("failed to create thread to fetch data");
+ release_cache_handle(ch);
+ return -1;
+ }
return 0;
}
@@ -309,10 +388,14 @@ int object_release(const char *path, struct fuse_file_info *fi)
{
struct cache_handle *ch = (struct cache_handle *)fi->fh;
- free(ch->mem);
- free(ch);
- fi->fh = 0;
+ ch->stop = true;
+ sem_post(&ch->prepare_sem);
+ pthread_join(ch->fetch_thread, NULL);
+ sem_destroy(&ch->ready_sem);
+ sem_destroy(&ch->prepare_sem);
+ release_cache_handle(ch);
+ fi->fh = 0;
return 0;
}
--
1.7.12.4
More information about the sheepdog
mailing list