[sheepdog] [PATCH v2 12/15] sheepfs: add a socket pool to speedup connection

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Sun May 20 12:14:47 CEST 2012


At Mon, 14 May 2012 17:47:37 +0800,
Liu Yuan wrote:
> 
> From: Liu Yuan <tailai.ly at taobao.com>
> 
> Socket pool is used for FUSE read threads, which use threads to simulate async
> read. All sockets point to the same gateway.
> 
> - add a read/write lock to be thread safe.
> - add an option to dis/enable page cache for volumes.

These changes should be added in different patches.


> 
> Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
> ---
>  sheepfs/cluster.c |    1 +
>  sheepfs/core.c    |   12 ++--
>  sheepfs/node.c    |    1 +
>  sheepfs/sheepfs.h |    1 +
>  sheepfs/vdi.c     |    1 +
>  sheepfs/volume.c  |  174 +++++++++++++++++++++++++++++++++++++++++++----------
>  6 files changed, 154 insertions(+), 36 deletions(-)
> 
> diff --git a/sheepfs/cluster.c b/sheepfs/cluster.c
> index c4a481b..1f645ba 100644
> --- a/sheepfs/cluster.c
> +++ b/sheepfs/cluster.c
> @@ -55,5 +55,6 @@ size_t cluster_info_get_size(const char *path)
>  
>  	len = shadow_file_write(path, buf->buf, buf->len);
>  	strbuf_release(buf);
> +	free(buf);
>  	return len;
>  }
> diff --git a/sheepfs/core.c b/sheepfs/core.c
> index 869baac..394fb16 100644
> --- a/sheepfs/core.c
> +++ b/sheepfs/core.c
> @@ -31,16 +31,17 @@ char sheepfs_shadow[PATH_MAX];
>  
>  static int sheepfs_debug;
>  static int sheepfs_fg;
> +int sheepfs_page_cache = 0;
>  
>  static struct option const long_options[] = {
>  	{"debug", no_argument, NULL, 'd'},
> -	{"directio", no_argument, NULL, 'D'},

If we remove this option in this patch, shouldn't we add it in the
previous one?

Thanks,

Kazutaka

>  	{"help", no_argument, NULL, 'h'},
>  	{"foreground", no_argument, NULL, 'f'},
> +	{"pagecache", no_argument, NULL, 'k'},
>  	{NULL, 0, NULL, 0},
>  };
>  
> -static const char *short_options = "dDhf";
> +static const char *short_options = "dfhk";
>  
>  static struct sheepfs_file_operation {
>  	int (*read)(const char *path, char *buf, size_t size, off_t);
> @@ -254,8 +255,8 @@ static void usage(int inval)
>  Usage: sheepfs [OPTION]... MOUNTPOINT\n\
>  Options:\n\
>    -d, --debug             enable debug output (implies -f)\n\
> -  -D, --directio          use direct IO to access volume\n\
>    -f, --foreground        sheepfs run in the foreground\n\
> +  -k, --pagecache         use local kernel's page cache to access volume\n\
>    -h, --help              display this help and exit\n\
>  ");
>  	exit(inval);
> @@ -274,14 +275,15 @@ int main(int argc, char **argv)
>  			case 'd':
>  				sheepfs_debug = 1;
>  				break;
> -			case 'D':
> -				break;
>  			case 'h':
>  				usage(0);
>  				break;
>  			case 'f':
>  				sheepfs_fg = 1;
>  				break;
> +			case 'k':
> +				sheepfs_page_cache = 1;
> +				break;
>  			default:
>  				usage(1);
>  		}
> diff --git a/sheepfs/node.c b/sheepfs/node.c
> index fcc490f..328ee79 100644
> --- a/sheepfs/node.c
> +++ b/sheepfs/node.c
> @@ -62,6 +62,7 @@ size_t node_info_get_size(const char *path)
>  
>  	len = shadow_file_write(path, buf->buf, buf->len);
>  	strbuf_release(buf);
> +	free(buf);
>  	return len;
>  }
>  
> diff --git a/sheepfs/sheepfs.h b/sheepfs/sheepfs.h
> index a744c07..b8eb6cd 100644
> --- a/sheepfs/sheepfs.h
> +++ b/sheepfs/sheepfs.h
> @@ -15,6 +15,7 @@ enum sheepfs_opcode {
>  };
>  
>  extern char sheepfs_shadow[];
> +extern int sheepfs_page_cache;
>  
>  extern struct strbuf *sheepfs_run_cmd(const char *command);
>  extern int sheepfs_set_op(const char *path, unsigned opcode);
> diff --git a/sheepfs/vdi.c b/sheepfs/vdi.c
> index bd4f875..57c04fa 100644
> --- a/sheepfs/vdi.c
> +++ b/sheepfs/vdi.c
> @@ -67,6 +67,7 @@ size_t vdi_list_get_size(const char *path)
>  
>  	len = shadow_file_write(path, buf->buf, buf->len);
>  	strbuf_release(buf);
> +	free(buf);
>  	return len;
>  }
>  
> diff --git a/sheepfs/volume.c b/sheepfs/volume.c
> index f9b4dfc..eda5261 100644
> --- a/sheepfs/volume.c
> +++ b/sheepfs/volume.c
> @@ -20,6 +20,8 @@
>  #include <time.h>
>  #include <assert.h>
>  #include <syslog.h>
> +#include <urcu/uatomic.h>
> +#include <pthread.h>
>  
>  #include "sheep.h"
>  #include "strbuf.h"
> @@ -38,13 +40,28 @@
>  #define VOLUME_READ   0
>  #define VOLUME_WRITE  1
>  
> +/* #define DEBUG */
> +
>  struct vdi_inode {
>  	struct rb_node rb;
>  	uint32_t vid;
>  	struct sheepdog_inode *inode;
> +/* FIXME
> + * 1) Consider various VM request queue depth.
> + * 2) Most drive presents 31 to Linux, I set it as 31 to expect that VM's
> + *    real queue depth never exceed 31
> + */
> +#define SOCKET_POOL_SIZE  31
> +/* Socket pool is used for FUSE read threads, which use threads
> + * to simulate aysnc read. All sockets point to the same gateway
> + */
> +	int socket_pool[SOCKET_POOL_SIZE];
> +	char socket_in_use[SOCKET_POOL_SIZE]; /* 1 means in use */
> +	unsigned socket_poll_adder;
>  };
>  
>  static struct rb_root vdi_inode_tree = RB_ROOT;
> +static pthread_rwlock_t vdi_inode_tree_lock = PTHREAD_RWLOCK_INITIALIZER;
>  
>  static struct vdi_inode *vdi_inode_tree_insert(struct vdi_inode *new)
>  {
> @@ -52,6 +69,7 @@ static struct vdi_inode *vdi_inode_tree_insert(struct vdi_inode *new)
>  	struct rb_node *parent = NULL;
>  	struct vdi_inode *entry;
>  
> +	pthread_rwlock_wrlock(&vdi_inode_tree_lock);
>  	while (*p) {
>  		parent = *p;
>  		entry = rb_entry(parent, struct vdi_inode, rb);
> @@ -60,12 +78,15 @@ static struct vdi_inode *vdi_inode_tree_insert(struct vdi_inode *new)
>  			p = &(*p)->rb_left;
>  		else if (new->vid > entry->vid)
>  			p = &(*p)->rb_right;
> -		else
> +		else {
> +			pthread_rwlock_unlock(&vdi_inode_tree_lock);
>  			return entry; /* already has this entry */
> +		}
>  	}
>  	rb_link_node(&new->rb, parent, p);
>  	rb_insert_color(&new->rb, &vdi_inode_tree);
>  
> +	pthread_rwlock_unlock(&vdi_inode_tree_lock);
>  	return NULL; /* insert successfully */
>  }
>  
> @@ -74,6 +95,7 @@ static struct vdi_inode *vdi_inode_tree_search(uint32_t vid)
>  	struct rb_node *n = vdi_inode_tree.rb_node;
>  	struct vdi_inode *t;
>  
> +	pthread_rwlock_rdlock(&vdi_inode_tree_lock);
>  	while (n) {
>  		t = rb_entry(n, struct vdi_inode, rb);
>  
> @@ -81,10 +103,13 @@ static struct vdi_inode *vdi_inode_tree_search(uint32_t vid)
>  			n = n->rb_left;
>  		else if (vid > t->vid)
>  			n = n->rb_right;
> -		else
> +		else {
> +			pthread_rwlock_unlock(&vdi_inode_tree_lock);
>  			return t; /* found it */
> +		}
>  	}
>  
> +	pthread_rwlock_unlock(&vdi_inode_tree_lock);
>  	return NULL;
>  }
>  
> @@ -95,12 +120,34 @@ int create_volume_layout(void)
>  	return 0;
>  }
>  
> +/* We must use get/put_socket_fd in pair */
> +static inline int get_socket_fd(struct vdi_inode *vdi, int *idx)
> +{
> +	int sock_idx, fd;
> +
> +retry:
> +	sock_idx = uatomic_add_return(&vdi->socket_poll_adder, 1) %
> +		   SOCKET_POOL_SIZE;
> +	/* if socket_in_use[sock_idx] == 0, set it to 1, otherwise, retry */
> +	if (uatomic_cmpxchg(&vdi->socket_in_use[sock_idx], 0, 1))
> +		goto retry;
> +	fd = vdi->socket_pool[sock_idx];
> +	*idx = sock_idx;
> +
> +	return fd;
> +}
> +
> +static inline void put_socket_fd(struct vdi_inode *vdi, int idx)
> +{
> +	uatomic_dec(&vdi->socket_in_use[idx]);
> +}
> +
>  static int volume_rw_object(char *buf, uint64_t oid, size_t size,
>  			     off_t off, int rw)
>  {
>  	struct sd_obj_req hdr = { 0 };
>  	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
> -	int ret;
> +	int ret, fd, sock_idx;
>  	unsigned wlen = 0, rlen = 0;
>  	int create = 0;
>  	uint32_t vid = oid_to_vid(oid);
> @@ -135,12 +182,15 @@ static int volume_rw_object(char *buf, uint64_t oid, size_t size,
>  	hdr.offset = off;
>  	hdr.flags |= SD_FLAG_CMD_CACHE;
>  
> -	ret = exec_req(0, (struct sd_req *)&hdr, buf, &wlen, &rlen);
> +	fd = get_socket_fd(vdi, &sock_idx);
> +	ret = exec_req(fd, (struct sd_req *)&hdr, buf, &wlen, &rlen);
> +	put_socket_fd(vdi, sock_idx);
>  
>  	if (ret || rsp->result != SD_RES_SUCCESS) {
> -		syslog(LOG_ERR, "failed to %s object %" PRIx64 " ret %d, res %d\n",
> -			rw == VOLUME_READ ? "read" : "write", oid, ret,
> -			rsp->result);
> +		syslog(LOG_ERR,
> +			"[%s] failed to %s object %" PRIx64 " ret %d, res %u\n",
> +			__func__, rw == VOLUME_READ ? "read" : "write",
> +			oid, ret, rsp->result);
>  		return -1;
>  	}
>  
> @@ -179,10 +229,12 @@ static int volume_do_rw(const char *path, char *buf, size_t size,
>  		len = size;
>  
>  	do {
> +#ifdef DEBUG
>  		syslog(LOG_INFO, "%s oid %"PRIx64", off %ju, len %zu,"
>  			" size %zu\n",
>  			rw == VOLUME_READ ? "read" : "write",
>  			oid, start, len, size);
> +#endif
>  		ret = volume_rw_object(buf, oid, len, start, rw);
>  
>  		if (ret != len)
> @@ -227,13 +279,16 @@ static int volume_do_sync(uint32_t vid)
>  {
>  	struct sd_obj_req hdr = { 0 };
>  	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
> -	int ret;
> +	int ret, fd, idx;
>  	unsigned wlen = 0, rlen = 0;
> +	struct vdi_inode *vdi = vdi_inode_tree_search(vid);
>  
>  	hdr.opcode = SD_OP_FLUSH_VDI;
>  	hdr.oid = vid_to_vdi_oid(vid);
>  
> -	ret = exec_req(0, (struct sd_req *)&hdr, NULL, &wlen, &rlen);
> +	fd = get_socket_fd(vdi, &idx);
> +	ret = exec_req(fd, (struct sd_req *)&hdr, NULL, &wlen, &rlen);
> +	put_socket_fd(vdi, idx);
>  
>  	if (ret || rsp->result != SD_RES_SUCCESS) {
>  		syslog(LOG_ERR, "[%s] failed to flush vdi %"PRIx32"\n",
> @@ -259,17 +314,49 @@ int volume_sync(const char *path)
>  
>  int volume_open(const char *path, struct fuse_file_info *fi)
>  {
> -	//fi->direct_io = 1;
> +	if (!sheepfs_page_cache)
> +		fi->direct_io = 1;
> +	return 0;
> +}
> +
> +static void destroy_socket_pool(int array[], int len)
> +{
> +	int i;
> +	for (i = 0; i < len; i++)
> +		close(array[i]);
> +}
> +
> +static int setup_socket_pool(int array[], int len)
> +{
> +	int fd, i, ret;
> +
> +	for (i = 0; i < len; i++) {
> +		fd = connect_to("localhost", 7000);
> +		if (fd < 0) {
> +			syslog(LOG_ERR, "[%s] connect_to %m\n", __func__);
> +			destroy_socket_pool(array, --i);
> +			return -1;
> +		}
> +
> +		ret = set_nodelay(fd);
> +		if (ret) {
> +			syslog(LOG_ERR, "[%s] %m\n", __func__);
> +			destroy_socket_pool(array, i);
> +			return -1;
> +		}
> +
> +		array[i] = fd;
> +	}
> +
>  	return 0;
>  }
>  
>  static int init_vdi_info(const char *entry, uint32_t *vid, size_t *size)
>  {
>  	struct strbuf *buf;
> -	void *inode_buf;
> -	struct vdi_inode *inode;
> +	void *inode_buf = NULL;
> +	struct vdi_inode *inode = NULL;
>  	char command[256] = { 0 };
> -	int ret = -1;
>  
>  	sprintf(command, "%s %s\n", "collie vdi list -r", entry);
>  	buf = sheepfs_run_cmd(command);
> @@ -278,32 +365,41 @@ static int init_vdi_info(const char *entry, uint32_t *vid, size_t *size)
>  	if (sscanf(buf->buf, "%*s %*s %*d %zu %*s %*s %*s %"PRIx32,
>  	    size, vid) < 2) {
>  		syslog(LOG_ERR, "[%s] %m\n", __func__);
> -		goto out;
> +		goto err;
>  	}
>  
>  	inode_buf = malloc(SD_INODE_SIZE);
>  	if (!inode_buf) {
>  		syslog(LOG_ERR, "[%s] %m\n", __func__);
> -		goto out;
> +		goto err;
>  	}
>  
> +	inode = xzalloc(sizeof(*inode));
> +	inode->vid = *vid;
> +	if (setup_socket_pool(inode->socket_pool, SOCKET_POOL_SIZE) < 0) {
> +		syslog(LOG_ERR, "[%s] failed to setup socket pool\n",
> +			__func__);
> +		goto err;
> +	}
> +	/* we need insert inode before calling volume_rw_object */
> +	assert(!vdi_inode_tree_insert(inode));
>  	if (volume_rw_object(inode_buf, vid_to_vdi_oid(*vid), SD_INODE_SIZE,
>  			     0, VOLUME_READ) < 0) {
> -		free(inode_buf);
> -		goto out;
> +		rb_erase(&inode->rb, &vdi_inode_tree);
> +		syslog(LOG_ERR, "[%s] failed to read inode for %"PRIx32"\n",
> +			__func__, *vid);
> +		goto err;
>  	}
> -
> -	inode = xzalloc(sizeof(*inode));
> -	inode->vid = *vid;
>  	inode->inode = inode_buf;
> -	if (vdi_inode_tree_insert(inode)) {
> -		free(inode_buf);
> -		free(inode);
> -	}
> -	ret = 0;
> -out:
>  	strbuf_release(buf);
> -	return ret;
> +	free(buf);
> +	return 0;
> +err:
> +	free(inode_buf);
> +	free(inode);
> +	strbuf_release(buf);
> +	free(buf);
> +	return -1;
>  }
>  
>  int volume_create_entry(const char *entry)
> @@ -325,6 +421,7 @@ int volume_create_entry(const char *entry)
>  
>  	if (init_vdi_info(entry, &vid, &size) < 0)
>  		return -1;
> +
>  	if (shadow_file_setxattr(path, SH_VID_NAME, &vid, SH_VID_SIZE) < 0) {
>  		shadow_file_delete(path);
>  		return -1;
> @@ -343,13 +440,16 @@ static int volume_sync_and_delete(uint32_t vid)
>  {
>  	struct sd_obj_req hdr = { 0 };
>  	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
> -	int ret;
> +	int ret, fd, idx;
>  	unsigned wlen = 0, rlen = 0;
> +	struct vdi_inode *vdi = vdi_inode_tree_search(vid);
>  
>  	hdr.opcode = SD_OP_FLUSH_DEL_CACHE;
>  	hdr.oid = vid_to_vdi_oid(vid);
>  
> -	ret = exec_req(0, (struct sd_req *)&hdr, NULL, &wlen, &rlen);
> +	fd = get_socket_fd(vdi, &idx);
> +	ret = exec_req(fd, (struct sd_req *)&hdr, NULL, &wlen, &rlen);
> +	put_socket_fd(vdi, idx);
>  
>  	if (ret || rsp->result != SD_RES_SUCCESS) {
>  		syslog(LOG_ERR, "[%s] failed to flush vdi %" PRIx32 "\n",
> @@ -364,6 +464,7 @@ int volume_remove_entry(const char *entry)
>  {
>  	char path[PATH_MAX], *ch;
>  	uint32_t vid;
> +	struct vdi_inode *vdi;
>  
>  	ch = strchr(entry, '\n');
>  	if (ch != NULL)
> @@ -376,9 +477,20 @@ int volume_remove_entry(const char *entry)
>  	if (shadow_file_getxattr(path, SH_VID_NAME, &vid, SH_VID_SIZE) < 0)
>  		return -1;
>  
> -	if (volume_sync_and_delete(vid) < 0)
> -		return -1;
> +	/* No need to check error code, for case of connected sheep crashed,
> +	 * we continue to do cleanup.
> +	 */
> +	volume_sync_and_delete(vid);
> +
> +	vdi = vdi_inode_tree_search(vid);
> +	destroy_socket_pool(vdi->socket_pool, SOCKET_POOL_SIZE);
> +
> +	pthread_rwlock_wrlock(&vdi_inode_tree_lock);
> +	rb_erase(&vdi->rb, &vdi_inode_tree);
> +	pthread_rwlock_unlock(&vdi_inode_tree_lock);
>  
> +	free(vdi->inode);
> +	free(vdi);
>  	shadow_file_delete(path);
>  
>  	return 0;
> -- 
> 1.7.8.2
> 



More information about the sheepdog mailing list