[sheepdog] [PATCH v2 12/15] sheepfs: add a socket pool to speedup connection
MORITA Kazutaka
morita.kazutaka at lab.ntt.co.jp
Sun May 20 12:14:47 CEST 2012
At Mon, 14 May 2012 17:47:37 +0800,
Liu Yuan wrote:
>
> From: Liu Yuan <tailai.ly at taobao.com>
>
> Socket pool is used for FUSE read threads, which use threads to simulate async
> read. All sockets point to the same gateway.
>
> - add a read/write lock to be thread safe.
> - add an option to enable/disable page cache for volumes.
These changes should be split into separate patches.
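
(For reference, the per-vdi socket pool added by this patch boils down to
roughly the sketch below. Names here are illustrative and error handling is
omitted; uatomic_add_return(), uatomic_cmpxchg() and uatomic_dec() are the
userspace-rcu helpers the patch pulls in via <urcu/uatomic.h>. Each FUSE read
thread picks a slot round-robin, claims it with a compare-and-swap, and
releases it once exec_req() returns.)

#include <urcu/uatomic.h>

#define SOCKET_POOL_SIZE 31

struct sock_pool {
	int fds[SOCKET_POOL_SIZE];      /* all connected to the same gateway */
	char in_use[SOCKET_POOL_SIZE];  /* 1 means the slot is taken */
	unsigned next;                  /* round-robin cursor */
};

/* Pick the next slot round-robin; if it is busy, try the following one. */
static int sock_pool_get(struct sock_pool *p, int *idx)
{
retry:
	*idx = uatomic_add_return(&p->next, 1) % SOCKET_POOL_SIZE;
	/* claim the slot (0 -> 1); a non-zero return means someone holds it */
	if (uatomic_cmpxchg(&p->in_use[*idx], 0, 1))
		goto retry;
	return p->fds[*idx];
}

static void sock_pool_put(struct sock_pool *p, int idx)
{
	uatomic_dec(&p->in_use[idx]);
}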
>
> Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
> ---
> sheepfs/cluster.c | 1 +
> sheepfs/core.c | 12 ++--
> sheepfs/node.c | 1 +
> sheepfs/sheepfs.h | 1 +
> sheepfs/vdi.c | 1 +
> sheepfs/volume.c | 174 +++++++++++++++++++++++++++++++++++++++++++----------
> 6 files changed, 154 insertions(+), 36 deletions(-)
>
> diff --git a/sheepfs/cluster.c b/sheepfs/cluster.c
> index c4a481b..1f645ba 100644
> --- a/sheepfs/cluster.c
> +++ b/sheepfs/cluster.c
> @@ -55,5 +55,6 @@ size_t cluster_info_get_size(const char *path)
>
> len = shadow_file_write(path, buf->buf, buf->len);
> strbuf_release(buf);
> + free(buf);
> return len;
> }
> diff --git a/sheepfs/core.c b/sheepfs/core.c
> index 869baac..394fb16 100644
> --- a/sheepfs/core.c
> +++ b/sheepfs/core.c
> @@ -31,16 +31,17 @@ char sheepfs_shadow[PATH_MAX];
>
> static int sheepfs_debug;
> static int sheepfs_fg;
> +int sheepfs_page_cache = 0;
>
> static struct option const long_options[] = {
> {"debug", no_argument, NULL, 'd'},
> - {"directio", no_argument, NULL, 'D'},
If we remove this option in this patch, shouldn't we add it in the
previous one?
Thanks,
Kazutaka
> {"help", no_argument, NULL, 'h'},
> {"foreground", no_argument, NULL, 'f'},
> + {"pagecache", no_argument, NULL, 'k'},
> {NULL, 0, NULL, 0},
> };
>
> -static const char *short_options = "dDhf";
> +static const char *short_options = "dfhk";
>
> static struct sheepfs_file_operation {
> int (*read)(const char *path, char *buf, size_t size, off_t);
> @@ -254,8 +255,8 @@ static void usage(int inval)
> Usage: sheepfs [OPTION]... MOUNTPOINT\n\
> Options:\n\
> -d, --debug enable debug output (implies -f)\n\
> - -D, --directio use direct IO to access volume\n\
> -f, --foreground sheepfs run in the foreground\n\
> + -k, --pagecache use local kernel's page cache to access volume\n\
> -h, --help display this help and exit\n\
> ");
> exit(inval);
> @@ -274,14 +275,15 @@ int main(int argc, char **argv)
> case 'd':
> sheepfs_debug = 1;
> break;
> - case 'D':
> - break;
> case 'h':
> usage(0);
> break;
> case 'f':
> sheepfs_fg = 1;
> break;
> + case 'k':
> + sheepfs_page_cache = 1;
> + break;
> default:
> usage(1);
> }
> diff --git a/sheepfs/node.c b/sheepfs/node.c
> index fcc490f..328ee79 100644
> --- a/sheepfs/node.c
> +++ b/sheepfs/node.c
> @@ -62,6 +62,7 @@ size_t node_info_get_size(const char *path)
>
> len = shadow_file_write(path, buf->buf, buf->len);
> strbuf_release(buf);
> + free(buf);
> return len;
> }
>
> diff --git a/sheepfs/sheepfs.h b/sheepfs/sheepfs.h
> index a744c07..b8eb6cd 100644
> --- a/sheepfs/sheepfs.h
> +++ b/sheepfs/sheepfs.h
> @@ -15,6 +15,7 @@ enum sheepfs_opcode {
> };
>
> extern char sheepfs_shadow[];
> +extern int sheepfs_page_cache;
>
> extern struct strbuf *sheepfs_run_cmd(const char *command);
> extern int sheepfs_set_op(const char *path, unsigned opcode);
> diff --git a/sheepfs/vdi.c b/sheepfs/vdi.c
> index bd4f875..57c04fa 100644
> --- a/sheepfs/vdi.c
> +++ b/sheepfs/vdi.c
> @@ -67,6 +67,7 @@ size_t vdi_list_get_size(const char *path)
>
> len = shadow_file_write(path, buf->buf, buf->len);
> strbuf_release(buf);
> + free(buf);
> return len;
> }
>
> diff --git a/sheepfs/volume.c b/sheepfs/volume.c
> index f9b4dfc..eda5261 100644
> --- a/sheepfs/volume.c
> +++ b/sheepfs/volume.c
> @@ -20,6 +20,8 @@
> #include <time.h>
> #include <assert.h>
> #include <syslog.h>
> +#include <urcu/uatomic.h>
> +#include <pthread.h>
>
> #include "sheep.h"
> #include "strbuf.h"
> @@ -38,13 +40,28 @@
> #define VOLUME_READ 0
> #define VOLUME_WRITE 1
>
> +/* #define DEBUG */
> +
> struct vdi_inode {
> struct rb_node rb;
> uint32_t vid;
> struct sheepdog_inode *inode;
> +/* FIXME
> + * 1) Consider various VM request queue depths.
> + * 2) Most drives present 31 to Linux, so set it to 31 on the assumption
> + * that the VM's real queue depth never exceeds 31.
> + */
> +#define SOCKET_POOL_SIZE 31
> +/* Socket pool is used for FUSE read threads, which use threads
> + * to simulate async read. All sockets point to the same gateway.
> + */
> + int socket_pool[SOCKET_POOL_SIZE];
> + char socket_in_use[SOCKET_POOL_SIZE]; /* 1 means in use */
> + unsigned socket_poll_adder;
> };
>
> static struct rb_root vdi_inode_tree = RB_ROOT;
> +static pthread_rwlock_t vdi_inode_tree_lock = PTHREAD_RWLOCK_INITIALIZER;
>
> static struct vdi_inode *vdi_inode_tree_insert(struct vdi_inode *new)
> {
> @@ -52,6 +69,7 @@ static struct vdi_inode *vdi_inode_tree_insert(struct vdi_inode *new)
> struct rb_node *parent = NULL;
> struct vdi_inode *entry;
>
> + pthread_rwlock_wrlock(&vdi_inode_tree_lock);
> while (*p) {
> parent = *p;
> entry = rb_entry(parent, struct vdi_inode, rb);
> @@ -60,12 +78,15 @@ static struct vdi_inode *vdi_inode_tree_insert(struct vdi_inode *new)
> p = &(*p)->rb_left;
> else if (new->vid > entry->vid)
> p = &(*p)->rb_right;
> - else
> + else {
> + pthread_rwlock_unlock(&vdi_inode_tree_lock);
> return entry; /* already has this entry */
> + }
> }
> rb_link_node(&new->rb, parent, p);
> rb_insert_color(&new->rb, &vdi_inode_tree);
>
> + pthread_rwlock_unlock(&vdi_inode_tree_lock);
> return NULL; /* insert successfully */
> }
>
> @@ -74,6 +95,7 @@ static struct vdi_inode *vdi_inode_tree_search(uint32_t vid)
> struct rb_node *n = vdi_inode_tree.rb_node;
> struct vdi_inode *t;
>
> + pthread_rwlock_rdlock(&vdi_inode_tree_lock);
> while (n) {
> t = rb_entry(n, struct vdi_inode, rb);
>
> @@ -81,10 +103,13 @@ static struct vdi_inode *vdi_inode_tree_search(uint32_t vid)
> n = n->rb_left;
> else if (vid > t->vid)
> n = n->rb_right;
> - else
> + else {
> + pthread_rwlock_unlock(&vdi_inode_tree_lock);
> return t; /* found it */
> + }
> }
>
> + pthread_rwlock_unlock(&vdi_inode_tree_lock);
> return NULL;
> }
>
> @@ -95,12 +120,34 @@ int create_volume_layout(void)
> return 0;
> }
>
> +/* We must use get/put_socket_fd in pair */
> +static inline int get_socket_fd(struct vdi_inode *vdi, int *idx)
> +{
> + int sock_idx, fd;
> +
> +retry:
> + sock_idx = uatomic_add_return(&vdi->socket_poll_adder, 1) %
> + SOCKET_POOL_SIZE;
> + /* if socket_in_use[sock_idx] == 0, set it to 1, otherwise, retry */
> + if (uatomic_cmpxchg(&vdi->socket_in_use[sock_idx], 0, 1))
> + goto retry;
> + fd = vdi->socket_pool[sock_idx];
> + *idx = sock_idx;
> +
> + return fd;
> +}
> +
> +static inline void put_socket_fd(struct vdi_inode *vdi, int idx)
> +{
> + uatomic_dec(&vdi->socket_in_use[idx]);
> +}
> +
> static int volume_rw_object(char *buf, uint64_t oid, size_t size,
> off_t off, int rw)
> {
> struct sd_obj_req hdr = { 0 };
> struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
> - int ret;
> + int ret, fd, sock_idx;
> unsigned wlen = 0, rlen = 0;
> int create = 0;
> uint32_t vid = oid_to_vid(oid);
> @@ -135,12 +182,15 @@ static int volume_rw_object(char *buf, uint64_t oid, size_t size,
> hdr.offset = off;
> hdr.flags |= SD_FLAG_CMD_CACHE;
>
> - ret = exec_req(0, (struct sd_req *)&hdr, buf, &wlen, &rlen);
> + fd = get_socket_fd(vdi, &sock_idx);
> + ret = exec_req(fd, (struct sd_req *)&hdr, buf, &wlen, &rlen);
> + put_socket_fd(vdi, sock_idx);
>
> if (ret || rsp->result != SD_RES_SUCCESS) {
> - syslog(LOG_ERR, "failed to %s object %" PRIx64 " ret %d, res %d\n",
> - rw == VOLUME_READ ? "read" : "write", oid, ret,
> - rsp->result);
> + syslog(LOG_ERR,
> + "[%s] failed to %s object %" PRIx64 " ret %d, res %u\n",
> + __func__, rw == VOLUME_READ ? "read" : "write",
> + oid, ret, rsp->result);
> return -1;
> }
>
> @@ -179,10 +229,12 @@ static int volume_do_rw(const char *path, char *buf, size_t size,
> len = size;
>
> do {
> +#ifdef DEBUG
> syslog(LOG_INFO, "%s oid %"PRIx64", off %ju, len %zu,"
> " size %zu\n",
> rw == VOLUME_READ ? "read" : "write",
> oid, start, len, size);
> +#endif
> ret = volume_rw_object(buf, oid, len, start, rw);
>
> if (ret != len)
> @@ -227,13 +279,16 @@ static int volume_do_sync(uint32_t vid)
> {
> struct sd_obj_req hdr = { 0 };
> struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
> - int ret;
> + int ret, fd, idx;
> unsigned wlen = 0, rlen = 0;
> + struct vdi_inode *vdi = vdi_inode_tree_search(vid);
>
> hdr.opcode = SD_OP_FLUSH_VDI;
> hdr.oid = vid_to_vdi_oid(vid);
>
> - ret = exec_req(0, (struct sd_req *)&hdr, NULL, &wlen, &rlen);
> + fd = get_socket_fd(vdi, &idx);
> + ret = exec_req(fd, (struct sd_req *)&hdr, NULL, &wlen, &rlen);
> + put_socket_fd(vdi, idx);
>
> if (ret || rsp->result != SD_RES_SUCCESS) {
> syslog(LOG_ERR, "[%s] failed to flush vdi %"PRIx32"\n",
> @@ -259,17 +314,49 @@ int volume_sync(const char *path)
>
> int volume_open(const char *path, struct fuse_file_info *fi)
> {
> - //fi->direct_io = 1;
> + if (!sheepfs_page_cache)
> + fi->direct_io = 1;
> + return 0;
> +}
> +
> +static void destroy_socket_pool(int array[], int len)
> +{
> + int i;
> + for (i = 0; i < len; i++)
> + close(array[i]);
> +}
> +
> +static int setup_socket_pool(int array[], int len)
> +{
> + int fd, i, ret;
> +
> + for (i = 0; i < len; i++) {
> + fd = connect_to("localhost", 7000);
> + if (fd < 0) {
> + syslog(LOG_ERR, "[%s] connect_to %m\n", __func__);
> + destroy_socket_pool(array, --i);
> + return -1;
> + }
> +
> + ret = set_nodelay(fd);
> + if (ret) {
> + syslog(LOG_ERR, "[%s] %m\n", __func__);
> + destroy_socket_pool(array, i);
> + return -1;
> + }
> +
> + array[i] = fd;
> + }
> +
> return 0;
> }
>
> static int init_vdi_info(const char *entry, uint32_t *vid, size_t *size)
> {
> struct strbuf *buf;
> - void *inode_buf;
> - struct vdi_inode *inode;
> + void *inode_buf = NULL;
> + struct vdi_inode *inode = NULL;
> char command[256] = { 0 };
> - int ret = -1;
>
> sprintf(command, "%s %s\n", "collie vdi list -r", entry);
> buf = sheepfs_run_cmd(command);
> @@ -278,32 +365,41 @@ static int init_vdi_info(const char *entry, uint32_t *vid, size_t *size)
> if (sscanf(buf->buf, "%*s %*s %*d %zu %*s %*s %*s %"PRIx32,
> size, vid) < 2) {
> syslog(LOG_ERR, "[%s] %m\n", __func__);
> - goto out;
> + goto err;
> }
>
> inode_buf = malloc(SD_INODE_SIZE);
> if (!inode_buf) {
> syslog(LOG_ERR, "[%s] %m\n", __func__);
> - goto out;
> + goto err;
> }
>
> + inode = xzalloc(sizeof(*inode));
> + inode->vid = *vid;
> + if (setup_socket_pool(inode->socket_pool, SOCKET_POOL_SIZE) < 0) {
> + syslog(LOG_ERR, "[%s] failed to setup socket pool\n",
> + __func__);
> + goto err;
> + }
> + /* we need to insert the inode before calling volume_rw_object */
> + assert(!vdi_inode_tree_insert(inode));
> if (volume_rw_object(inode_buf, vid_to_vdi_oid(*vid), SD_INODE_SIZE,
> 0, VOLUME_READ) < 0) {
> - free(inode_buf);
> - goto out;
> + rb_erase(&inode->rb, &vdi_inode_tree);
> + syslog(LOG_ERR, "[%s] failed to read inode for %"PRIx32"\n",
> + __func__, *vid);
> + goto err;
> }
> -
> - inode = xzalloc(sizeof(*inode));
> - inode->vid = *vid;
> inode->inode = inode_buf;
> - if (vdi_inode_tree_insert(inode)) {
> - free(inode_buf);
> - free(inode);
> - }
> - ret = 0;
> -out:
> strbuf_release(buf);
> - return ret;
> + free(buf);
> + return 0;
> +err:
> + free(inode_buf);
> + free(inode);
> + strbuf_release(buf);
> + free(buf);
> + return -1;
> }
>
> int volume_create_entry(const char *entry)
> @@ -325,6 +421,7 @@ int volume_create_entry(const char *entry)
>
> if (init_vdi_info(entry, &vid, &size) < 0)
> return -1;
> +
> if (shadow_file_setxattr(path, SH_VID_NAME, &vid, SH_VID_SIZE) < 0) {
> shadow_file_delete(path);
> return -1;
> @@ -343,13 +440,16 @@ static int volume_sync_and_delete(uint32_t vid)
> {
> struct sd_obj_req hdr = { 0 };
> struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
> - int ret;
> + int ret, fd, idx;
> unsigned wlen = 0, rlen = 0;
> + struct vdi_inode *vdi = vdi_inode_tree_search(vid);
>
> hdr.opcode = SD_OP_FLUSH_DEL_CACHE;
> hdr.oid = vid_to_vdi_oid(vid);
>
> - ret = exec_req(0, (struct sd_req *)&hdr, NULL, &wlen, &rlen);
> + fd = get_socket_fd(vdi, &idx);
> + ret = exec_req(fd, (struct sd_req *)&hdr, NULL, &wlen, &rlen);
> + put_socket_fd(vdi, idx);
>
> if (ret || rsp->result != SD_RES_SUCCESS) {
> syslog(LOG_ERR, "[%s] failed to flush vdi %" PRIx32 "\n",
> @@ -364,6 +464,7 @@ int volume_remove_entry(const char *entry)
> {
> char path[PATH_MAX], *ch;
> uint32_t vid;
> + struct vdi_inode *vdi;
>
> ch = strchr(entry, '\n');
> if (ch != NULL)
> @@ -376,9 +477,20 @@ int volume_remove_entry(const char *entry)
> if (shadow_file_getxattr(path, SH_VID_NAME, &vid, SH_VID_SIZE) < 0)
> return -1;
>
> - if (volume_sync_and_delete(vid) < 0)
> - return -1;
> + /* No need to check the error code: even if the connected sheep crashed,
> + * we continue with the cleanup.
> + */
> + volume_sync_and_delete(vid);
> +
> + vdi = vdi_inode_tree_search(vid);
> + destroy_socket_pool(vdi->socket_pool, SOCKET_POOL_SIZE);
> +
> + pthread_rwlock_wrlock(&vdi_inode_tree_lock);
> + rb_erase(&vdi->rb, &vdi_inode_tree);
> + pthread_rwlock_unlock(&vdi_inode_tree_lock);
>
> + free(vdi->inode);
> + free(vdi);
> shadow_file_delete(path);
>
> return 0;
> --
> 1.7.8.2
>