[Sheepdog] [PATCH v2 12/15] sheepfs: add a socket pool to speed up connections
Liu Yuan
namei.unix at gmail.com
Mon May 14 11:47:37 CEST 2012
From: Liu Yuan <tailai.ly at taobao.com>
The socket pool is used by the FUSE read threads, which use multiple threads to
simulate async reads. All sockets point to the same gateway (a rough sketch of
the get/put pattern follows below).
- add a read/write lock to make the vdi inode tree thread safe.
- add an option to enable/disable the page cache for volumes.
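For reference, the round-robin get/put pattern around the pool is roughly the
following sketch. It is only an illustration: the names (struct socket_pool,
pool_get, pool_put) are made up for this message, and it uses C11 atomics to
stay self-contained, whereas the patch itself relies on liburcu's uatomic_*
helpers.

#include <stdatomic.h>

#define SOCKET_POOL_SIZE 31

struct socket_pool {
	int fds[SOCKET_POOL_SIZE];           /* all connected to the same gateway */
	atomic_int in_use[SOCKET_POOL_SIZE]; /* 1 means a read thread holds the fd */
	atomic_uint next;                    /* round-robin cursor */
};

/* Claim a free fd: bump the cursor atomically, then try to take the slot
 * with a compare-and-swap; on collision with another thread, retry. */
static int pool_get(struct socket_pool *p, int *idx)
{
	for (;;) {
		unsigned i = atomic_fetch_add(&p->next, 1) % SOCKET_POOL_SIZE;
		int expected = 0;

		if (atomic_compare_exchange_strong(&p->in_use[i], &expected, 1)) {
			*idx = i;
			return p->fds[i];
		}
	}
}

/* Release the slot so other FUSE read threads can reuse the connection. */
static void pool_put(struct socket_pool *p, int idx)
{
	atomic_store(&p->in_use[idx], 0);
}

Each request path then brackets exec_req() with the get/put pair, which is
what get_socket_fd()/put_socket_fd() do in the patch below.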
Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
sheepfs/cluster.c | 1 +
sheepfs/core.c | 12 ++--
sheepfs/node.c | 1 +
sheepfs/sheepfs.h | 1 +
sheepfs/vdi.c | 1 +
sheepfs/volume.c | 174 +++++++++++++++++++++++++++++++++++++++++++----------
6 files changed, 154 insertions(+), 36 deletions(-)
diff --git a/sheepfs/cluster.c b/sheepfs/cluster.c
index c4a481b..1f645ba 100644
--- a/sheepfs/cluster.c
+++ b/sheepfs/cluster.c
@@ -55,5 +55,6 @@ size_t cluster_info_get_size(const char *path)
len = shadow_file_write(path, buf->buf, buf->len);
strbuf_release(buf);
+ free(buf);
return len;
}
diff --git a/sheepfs/core.c b/sheepfs/core.c
index 869baac..394fb16 100644
--- a/sheepfs/core.c
+++ b/sheepfs/core.c
@@ -31,16 +31,17 @@ char sheepfs_shadow[PATH_MAX];
static int sheepfs_debug;
static int sheepfs_fg;
+int sheepfs_page_cache = 0;
static struct option const long_options[] = {
{"debug", no_argument, NULL, 'd'},
- {"directio", no_argument, NULL, 'D'},
{"help", no_argument, NULL, 'h'},
{"foreground", no_argument, NULL, 'f'},
+ {"pagecache", no_argument, NULL, 'k'},
{NULL, 0, NULL, 0},
};
-static const char *short_options = "dDhf";
+static const char *short_options = "dfhk";
static struct sheepfs_file_operation {
int (*read)(const char *path, char *buf, size_t size, off_t);
@@ -254,8 +255,8 @@ static void usage(int inval)
Usage: sheepfs [OPTION]... MOUNTPOINT\n\
Options:\n\
-d, --debug enable debug output (implies -f)\n\
- -D, --directio use direct IO to access volume\n\
-f, --foreground sheepfs run in the foreground\n\
+ -k, --pagecache use the kernel's page cache to access volumes\n\
-h, --help display this help and exit\n\
");
exit(inval);
@@ -274,14 +275,15 @@ int main(int argc, char **argv)
case 'd':
sheepfs_debug = 1;
break;
- case 'D':
- break;
case 'h':
usage(0);
break;
case 'f':
sheepfs_fg = 1;
break;
+ case 'k':
+ sheepfs_page_cache = 1;
+ break;
default:
usage(1);
}
diff --git a/sheepfs/node.c b/sheepfs/node.c
index fcc490f..328ee79 100644
--- a/sheepfs/node.c
+++ b/sheepfs/node.c
@@ -62,6 +62,7 @@ size_t node_info_get_size(const char *path)
len = shadow_file_write(path, buf->buf, buf->len);
strbuf_release(buf);
+ free(buf);
return len;
}
diff --git a/sheepfs/sheepfs.h b/sheepfs/sheepfs.h
index a744c07..b8eb6cd 100644
--- a/sheepfs/sheepfs.h
+++ b/sheepfs/sheepfs.h
@@ -15,6 +15,7 @@ enum sheepfs_opcode {
};
extern char sheepfs_shadow[];
+extern int sheepfs_page_cache;
extern struct strbuf *sheepfs_run_cmd(const char *command);
extern int sheepfs_set_op(const char *path, unsigned opcode);
diff --git a/sheepfs/vdi.c b/sheepfs/vdi.c
index bd4f875..57c04fa 100644
--- a/sheepfs/vdi.c
+++ b/sheepfs/vdi.c
@@ -67,6 +67,7 @@ size_t vdi_list_get_size(const char *path)
len = shadow_file_write(path, buf->buf, buf->len);
strbuf_release(buf);
+ free(buf);
return len;
}
diff --git a/sheepfs/volume.c b/sheepfs/volume.c
index f9b4dfc..eda5261 100644
--- a/sheepfs/volume.c
+++ b/sheepfs/volume.c
@@ -20,6 +20,8 @@
#include <time.h>
#include <assert.h>
#include <syslog.h>
+#include <urcu/uatomic.h>
+#include <pthread.h>
#include "sheep.h"
#include "strbuf.h"
@@ -38,13 +40,28 @@
#define VOLUME_READ 0
#define VOLUME_WRITE 1
+/* #define DEBUG */
+
struct vdi_inode {
struct rb_node rb;
uint32_t vid;
struct sheepdog_inode *inode;
+/* FIXME
+ * 1) Consider varying VM request queue depths.
+ * 2) Most drives present a queue depth of 31 to Linux, so set it to 31
+ *    in the expectation that a VM's real queue depth never exceeds 31.
+ */
+#define SOCKET_POOL_SIZE 31
+/* The socket pool is used by the FUSE read threads, which use threads
+ * to simulate async reads. All sockets point to the same gateway.
+ */
+ int socket_pool[SOCKET_POOL_SIZE];
+ char socket_in_use[SOCKET_POOL_SIZE]; /* 1 means in use */
unsigned socket_pool_adder;
};
static struct rb_root vdi_inode_tree = RB_ROOT;
+static pthread_rwlock_t vdi_inode_tree_lock = PTHREAD_RWLOCK_INITIALIZER;
static struct vdi_inode *vdi_inode_tree_insert(struct vdi_inode *new)
{
@@ -52,6 +69,7 @@ static struct vdi_inode *vdi_inode_tree_insert(struct vdi_inode *new)
struct rb_node *parent = NULL;
struct vdi_inode *entry;
+ pthread_rwlock_wrlock(&vdi_inode_tree_lock);
while (*p) {
parent = *p;
entry = rb_entry(parent, struct vdi_inode, rb);
@@ -60,12 +78,15 @@ static struct vdi_inode *vdi_inode_tree_insert(struct vdi_inode *new)
p = &(*p)->rb_left;
else if (new->vid > entry->vid)
p = &(*p)->rb_right;
- else
+ else {
+ pthread_rwlock_unlock(&vdi_inode_tree_lock);
return entry; /* already has this entry */
+ }
}
rb_link_node(&new->rb, parent, p);
rb_insert_color(&new->rb, &vdi_inode_tree);
+ pthread_rwlock_unlock(&vdi_inode_tree_lock);
return NULL; /* insert successfully */
}
@@ -74,6 +95,7 @@ static struct vdi_inode *vdi_inode_tree_search(uint32_t vid)
struct rb_node *n = vdi_inode_tree.rb_node;
struct vdi_inode *t;
+ pthread_rwlock_rdlock(&vdi_inode_tree_lock);
while (n) {
t = rb_entry(n, struct vdi_inode, rb);
@@ -81,10 +103,13 @@ static struct vdi_inode *vdi_inode_tree_search(uint32_t vid)
n = n->rb_left;
else if (vid > t->vid)
n = n->rb_right;
- else
+ else {
+ pthread_rwlock_unlock(&vdi_inode_tree_lock);
return t; /* found it */
+ }
}
+ pthread_rwlock_unlock(&vdi_inode_tree_lock);
return NULL;
}
@@ -95,12 +120,34 @@ int create_volume_layout(void)
return 0;
}
+/* get_socket_fd and put_socket_fd must always be used in pairs */
+static inline int get_socket_fd(struct vdi_inode *vdi, int *idx)
+{
+ int sock_idx, fd;
+
+retry:
+ sock_idx = uatomic_add_return(&vdi->socket_pool_adder, 1) %
+ SOCKET_POOL_SIZE;
+ /* if socket_in_use[sock_idx] == 0, set it to 1, otherwise, retry */
+ if (uatomic_cmpxchg(&vdi->socket_in_use[sock_idx], 0, 1))
+ goto retry;
+ fd = vdi->socket_pool[sock_idx];
+ *idx = sock_idx;
+
+ return fd;
+}
+
+static inline void put_socket_fd(struct vdi_inode *vdi, int idx)
+{
+ uatomic_dec(&vdi->socket_in_use[idx]);
+}
+
static int volume_rw_object(char *buf, uint64_t oid, size_t size,
off_t off, int rw)
{
struct sd_obj_req hdr = { 0 };
struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
- int ret;
+ int ret, fd, sock_idx;
unsigned wlen = 0, rlen = 0;
int create = 0;
uint32_t vid = oid_to_vid(oid);
@@ -135,12 +182,15 @@ static int volume_rw_object(char *buf, uint64_t oid, size_t size,
hdr.offset = off;
hdr.flags |= SD_FLAG_CMD_CACHE;
- ret = exec_req(0, (struct sd_req *)&hdr, buf, &wlen, &rlen);
+ fd = get_socket_fd(vdi, &sock_idx);
+ ret = exec_req(fd, (struct sd_req *)&hdr, buf, &wlen, &rlen);
+ put_socket_fd(vdi, sock_idx);
if (ret || rsp->result != SD_RES_SUCCESS) {
- syslog(LOG_ERR, "failed to %s object %" PRIx64 " ret %d, res %d\n",
- rw == VOLUME_READ ? "read" : "write", oid, ret,
- rsp->result);
+ syslog(LOG_ERR,
+ "[%s] failed to %s object %" PRIx64 " ret %d, res %u\n",
+ __func__, rw == VOLUME_READ ? "read" : "write",
+ oid, ret, rsp->result);
return -1;
}
@@ -179,10 +229,12 @@ static int volume_do_rw(const char *path, char *buf, size_t size,
len = size;
do {
+#ifdef DEBUG
syslog(LOG_INFO, "%s oid %"PRIx64", off %ju, len %zu,"
" size %zu\n",
rw == VOLUME_READ ? "read" : "write",
oid, start, len, size);
+#endif
ret = volume_rw_object(buf, oid, len, start, rw);
if (ret != len)
@@ -227,13 +279,16 @@ static int volume_do_sync(uint32_t vid)
{
struct sd_obj_req hdr = { 0 };
struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
- int ret;
+ int ret, fd, idx;
unsigned wlen = 0, rlen = 0;
+ struct vdi_inode *vdi = vdi_inode_tree_search(vid);
hdr.opcode = SD_OP_FLUSH_VDI;
hdr.oid = vid_to_vdi_oid(vid);
- ret = exec_req(0, (struct sd_req *)&hdr, NULL, &wlen, &rlen);
+ fd = get_socket_fd(vdi, &idx);
+ ret = exec_req(fd, (struct sd_req *)&hdr, NULL, &wlen, &rlen);
+ put_socket_fd(vdi, idx);
if (ret || rsp->result != SD_RES_SUCCESS) {
syslog(LOG_ERR, "[%s] failed to flush vdi %"PRIx32"\n",
@@ -259,17 +314,49 @@ int volume_sync(const char *path)
int volume_open(const char *path, struct fuse_file_info *fi)
{
- //fi->direct_io = 1;
+ if (!sheepfs_page_cache)
+ fi->direct_io = 1;
+ return 0;
+}
+
+static void destroy_socket_pool(int array[], int len)
+{
+ int i;
+ for (i = 0; i < len; i++)
+ close(array[i]);
+}
+
+static int setup_socket_pool(int array[], int len)
+{
+ int fd, i, ret;
+
+ for (i = 0; i < len; i++) {
+ fd = connect_to("localhost", 7000);
+ if (fd < 0) {
+ syslog(LOG_ERR, "[%s] connect_to %m\n", __func__);
+ destroy_socket_pool(array, i);
+ return -1;
+ }
+
+ ret = set_nodelay(fd);
+ if (ret) {
+ syslog(LOG_ERR, "[%s] %m\n", __func__);
+ destroy_socket_pool(array, i);
+ return -1;
+ }
+
+ array[i] = fd;
+ }
+
return 0;
}
static int init_vdi_info(const char *entry, uint32_t *vid, size_t *size)
{
struct strbuf *buf;
- void *inode_buf;
- struct vdi_inode *inode;
+ void *inode_buf = NULL;
+ struct vdi_inode *inode = NULL;
char command[256] = { 0 };
- int ret = -1;
sprintf(command, "%s %s\n", "collie vdi list -r", entry);
buf = sheepfs_run_cmd(command);
@@ -278,32 +365,41 @@ static int init_vdi_info(const char *entry, uint32_t *vid, size_t *size)
if (sscanf(buf->buf, "%*s %*s %*d %zu %*s %*s %*s %"PRIx32,
size, vid) < 2) {
syslog(LOG_ERR, "[%s] %m\n", __func__);
- goto out;
+ goto err;
}
inode_buf = malloc(SD_INODE_SIZE);
if (!inode_buf) {
syslog(LOG_ERR, "[%s] %m\n", __func__);
- goto out;
+ goto err;
}
+ inode = xzalloc(sizeof(*inode));
+ inode->vid = *vid;
+ if (setup_socket_pool(inode->socket_pool, SOCKET_POOL_SIZE) < 0) {
+ syslog(LOG_ERR, "[%s] failed to setup socket pool\n",
+ __func__);
+ goto err;
+ }
+ /* we need to insert the inode before calling volume_rw_object */
+ assert(!vdi_inode_tree_insert(inode));
if (volume_rw_object(inode_buf, vid_to_vdi_oid(*vid), SD_INODE_SIZE,
0, VOLUME_READ) < 0) {
- free(inode_buf);
- goto out;
+ rb_erase(&inode->rb, &vdi_inode_tree);
+ syslog(LOG_ERR, "[%s] failed to read inode for %"PRIx32"\n",
+ __func__, *vid);
+ goto err;
}
-
- inode = xzalloc(sizeof(*inode));
- inode->vid = *vid;
inode->inode = inode_buf;
- if (vdi_inode_tree_insert(inode)) {
- free(inode_buf);
- free(inode);
- }
- ret = 0;
-out:
strbuf_release(buf);
- return ret;
+ free(buf);
+ return 0;
+err:
+ free(inode_buf);
+ free(inode);
+ strbuf_release(buf);
+ free(buf);
+ return -1;
}
int volume_create_entry(const char *entry)
@@ -325,6 +421,7 @@ int volume_create_entry(const char *entry)
if (init_vdi_info(entry, &vid, &size) < 0)
return -1;
+
if (shadow_file_setxattr(path, SH_VID_NAME, &vid, SH_VID_SIZE) < 0) {
shadow_file_delete(path);
return -1;
@@ -343,13 +440,16 @@ static int volume_sync_and_delete(uint32_t vid)
{
struct sd_obj_req hdr = { 0 };
struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
- int ret;
+ int ret, fd, idx;
unsigned wlen = 0, rlen = 0;
+ struct vdi_inode *vdi = vdi_inode_tree_search(vid);
hdr.opcode = SD_OP_FLUSH_DEL_CACHE;
hdr.oid = vid_to_vdi_oid(vid);
- ret = exec_req(0, (struct sd_req *)&hdr, NULL, &wlen, &rlen);
+ fd = get_socket_fd(vdi, &idx);
+ ret = exec_req(fd, (struct sd_req *)&hdr, NULL, &wlen, &rlen);
+ put_socket_fd(vdi, idx);
if (ret || rsp->result != SD_RES_SUCCESS) {
syslog(LOG_ERR, "[%s] failed to flush vdi %" PRIx32 "\n",
@@ -364,6 +464,7 @@ int volume_remove_entry(const char *entry)
{
char path[PATH_MAX], *ch;
uint32_t vid;
+ struct vdi_inode *vdi;
ch = strchr(entry, '\n');
if (ch != NULL)
@@ -376,9 +477,20 @@ int volume_remove_entry(const char *entry)
if (shadow_file_getxattr(path, SH_VID_NAME, &vid, SH_VID_SIZE) < 0)
return -1;
- if (volume_sync_and_delete(vid) < 0)
- return -1;
+ /* No need to check the error code: even if the connected sheep has
+ * crashed, we continue with the cleanup.
+ */
+ volume_sync_and_delete(vid);
+
+ vdi = vdi_inode_tree_search(vid);
+ destroy_socket_pool(vdi->socket_pool, SOCKET_POOL_SIZE);
+
+ pthread_rwlock_wrlock(&vdi_inode_tree_lock);
+ rb_erase(&vdi->rb, &vdi_inode_tree);
+ pthread_rwlock_unlock(&vdi_inode_tree_lock);
+ free(vdi->inode);
+ free(vdi);
shadow_file_delete(path);
return 0;
--
1.7.8.2