[Sheepdog] [PATCH v2 12/15] sheepfs: add a socket pool to speedup connection

Liu Yuan namei.unix at gmail.com
Mon May 14 11:47:37 CEST 2012


From: Liu Yuan <tailai.ly at taobao.com>

Socket pool is used for FUSE read threads, which use threads to simulate async
read. All sockets point to the same gateway.

- add a read/write lock to be thread safe.
- add an option to dis/enable page cache for volumes.

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 sheepfs/cluster.c |    1 +
 sheepfs/core.c    |   12 ++--
 sheepfs/node.c    |    1 +
 sheepfs/sheepfs.h |    1 +
 sheepfs/vdi.c     |    1 +
 sheepfs/volume.c  |  174 +++++++++++++++++++++++++++++++++++++++++++----------
 6 files changed, 154 insertions(+), 36 deletions(-)

diff --git a/sheepfs/cluster.c b/sheepfs/cluster.c
index c4a481b..1f645ba 100644
--- a/sheepfs/cluster.c
+++ b/sheepfs/cluster.c
@@ -55,5 +55,6 @@ size_t cluster_info_get_size(const char *path)
 
 	len = shadow_file_write(path, buf->buf, buf->len);
 	strbuf_release(buf);
+	free(buf);
 	return len;
 }
diff --git a/sheepfs/core.c b/sheepfs/core.c
index 869baac..394fb16 100644
--- a/sheepfs/core.c
+++ b/sheepfs/core.c
@@ -31,16 +31,17 @@ char sheepfs_shadow[PATH_MAX];
 
 static int sheepfs_debug;
 static int sheepfs_fg;
+int sheepfs_page_cache = 0;
 
 static struct option const long_options[] = {
 	{"debug", no_argument, NULL, 'd'},
-	{"directio", no_argument, NULL, 'D'},
 	{"help", no_argument, NULL, 'h'},
 	{"foreground", no_argument, NULL, 'f'},
+	{"pagecache", no_argument, NULL, 'k'},
 	{NULL, 0, NULL, 0},
 };
 
-static const char *short_options = "dDhf";
+static const char *short_options = "dfhk";
 
 static struct sheepfs_file_operation {
 	int (*read)(const char *path, char *buf, size_t size, off_t);
@@ -254,8 +255,8 @@ static void usage(int inval)
 Usage: sheepfs [OPTION]... MOUNTPOINT\n\
 Options:\n\
   -d, --debug             enable debug output (implies -f)\n\
-  -D, --directio          use direct IO to access volume\n\
   -f, --foreground        sheepfs run in the foreground\n\
+  -k, --pagecache         use local kernel's page cache to access volume\n\
   -h, --help              display this help and exit\n\
 ");
 	exit(inval);
@@ -274,14 +275,15 @@ int main(int argc, char **argv)
 			case 'd':
 				sheepfs_debug = 1;
 				break;
-			case 'D':
-				break;
 			case 'h':
 				usage(0);
 				break;
 			case 'f':
 				sheepfs_fg = 1;
 				break;
+			case 'k':
+				sheepfs_page_cache = 1;
+				break;
 			default:
 				usage(1);
 		}
diff --git a/sheepfs/node.c b/sheepfs/node.c
index fcc490f..328ee79 100644
--- a/sheepfs/node.c
+++ b/sheepfs/node.c
@@ -62,6 +62,7 @@ size_t node_info_get_size(const char *path)
 
 	len = shadow_file_write(path, buf->buf, buf->len);
 	strbuf_release(buf);
+	free(buf);
 	return len;
 }
 
diff --git a/sheepfs/sheepfs.h b/sheepfs/sheepfs.h
index a744c07..b8eb6cd 100644
--- a/sheepfs/sheepfs.h
+++ b/sheepfs/sheepfs.h
@@ -15,6 +15,7 @@ enum sheepfs_opcode {
 };
 
 extern char sheepfs_shadow[];
+extern int sheepfs_page_cache;
 
 extern struct strbuf *sheepfs_run_cmd(const char *command);
 extern int sheepfs_set_op(const char *path, unsigned opcode);
diff --git a/sheepfs/vdi.c b/sheepfs/vdi.c
index bd4f875..57c04fa 100644
--- a/sheepfs/vdi.c
+++ b/sheepfs/vdi.c
@@ -67,6 +67,7 @@ size_t vdi_list_get_size(const char *path)
 
 	len = shadow_file_write(path, buf->buf, buf->len);
 	strbuf_release(buf);
+	free(buf);
 	return len;
 }
 
diff --git a/sheepfs/volume.c b/sheepfs/volume.c
index f9b4dfc..eda5261 100644
--- a/sheepfs/volume.c
+++ b/sheepfs/volume.c
@@ -20,6 +20,8 @@
 #include <time.h>
 #include <assert.h>
 #include <syslog.h>
+#include <urcu/uatomic.h>
+#include <pthread.h>
 
 #include "sheep.h"
 #include "strbuf.h"
@@ -38,13 +40,28 @@
 #define VOLUME_READ   0
 #define VOLUME_WRITE  1
 
+/* #define DEBUG */
+
 struct vdi_inode {
 	struct rb_node rb;
 	uint32_t vid;
 	struct sheepdog_inode *inode;
+/* FIXME
+ * 1) Consider various VM request queue depth.
+ * 2) Most drives present 31 to Linux; I set it to 31 expecting that a VM's
+ *    real queue depth never exceeds 31
+ */
+#define SOCKET_POOL_SIZE  31
+/* Socket pool is used for FUSE read threads, which use threads
+ * to simulate async read. All sockets point to the same gateway
+ */
+	int socket_pool[SOCKET_POOL_SIZE];
+	char socket_in_use[SOCKET_POOL_SIZE]; /* 1 means in use */
+	unsigned socket_poll_adder;
 };
 
 static struct rb_root vdi_inode_tree = RB_ROOT;
+static pthread_rwlock_t vdi_inode_tree_lock = PTHREAD_RWLOCK_INITIALIZER;
 
 static struct vdi_inode *vdi_inode_tree_insert(struct vdi_inode *new)
 {
@@ -52,6 +69,7 @@ static struct vdi_inode *vdi_inode_tree_insert(struct vdi_inode *new)
 	struct rb_node *parent = NULL;
 	struct vdi_inode *entry;
 
+	pthread_rwlock_wrlock(&vdi_inode_tree_lock);
 	while (*p) {
 		parent = *p;
 		entry = rb_entry(parent, struct vdi_inode, rb);
@@ -60,12 +78,15 @@ static struct vdi_inode *vdi_inode_tree_insert(struct vdi_inode *new)
 			p = &(*p)->rb_left;
 		else if (new->vid > entry->vid)
 			p = &(*p)->rb_right;
-		else
+		else {
+			pthread_rwlock_unlock(&vdi_inode_tree_lock);
 			return entry; /* already has this entry */
+		}
 	}
 	rb_link_node(&new->rb, parent, p);
 	rb_insert_color(&new->rb, &vdi_inode_tree);
 
+	pthread_rwlock_unlock(&vdi_inode_tree_lock);
 	return NULL; /* insert successfully */
 }
 
@@ -74,6 +95,7 @@ static struct vdi_inode *vdi_inode_tree_search(uint32_t vid)
 	struct rb_node *n = vdi_inode_tree.rb_node;
 	struct vdi_inode *t;
 
+	pthread_rwlock_rdlock(&vdi_inode_tree_lock);
 	while (n) {
 		t = rb_entry(n, struct vdi_inode, rb);
 
@@ -81,10 +103,13 @@ static struct vdi_inode *vdi_inode_tree_search(uint32_t vid)
 			n = n->rb_left;
 		else if (vid > t->vid)
 			n = n->rb_right;
-		else
+		else {
+			pthread_rwlock_unlock(&vdi_inode_tree_lock);
 			return t; /* found it */
+		}
 	}
 
+	pthread_rwlock_unlock(&vdi_inode_tree_lock);
 	return NULL;
 }
 
@@ -95,12 +120,34 @@ int create_volume_layout(void)
 	return 0;
 }
 
+/* We must use get/put_socket_fd in pair */
+static inline int get_socket_fd(struct vdi_inode *vdi, int *idx)
+{
+	int sock_idx, fd;
+
+retry:
+	sock_idx = uatomic_add_return(&vdi->socket_poll_adder, 1) %
+		   SOCKET_POOL_SIZE;
+	/* if socket_in_use[sock_idx] == 0, set it to 1, otherwise, retry */
+	if (uatomic_cmpxchg(&vdi->socket_in_use[sock_idx], 0, 1))
+		goto retry;
+	fd = vdi->socket_pool[sock_idx];
+	*idx = sock_idx;
+
+	return fd;
+}
+
+static inline void put_socket_fd(struct vdi_inode *vdi, int idx)
+{
+	uatomic_dec(&vdi->socket_in_use[idx]);
+}
+
 static int volume_rw_object(char *buf, uint64_t oid, size_t size,
 			     off_t off, int rw)
 {
 	struct sd_obj_req hdr = { 0 };
 	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
-	int ret;
+	int ret, fd, sock_idx;
 	unsigned wlen = 0, rlen = 0;
 	int create = 0;
 	uint32_t vid = oid_to_vid(oid);
@@ -135,12 +182,15 @@ static int volume_rw_object(char *buf, uint64_t oid, size_t size,
 	hdr.offset = off;
 	hdr.flags |= SD_FLAG_CMD_CACHE;
 
-	ret = exec_req(0, (struct sd_req *)&hdr, buf, &wlen, &rlen);
+	fd = get_socket_fd(vdi, &sock_idx);
+	ret = exec_req(fd, (struct sd_req *)&hdr, buf, &wlen, &rlen);
+	put_socket_fd(vdi, sock_idx);
 
 	if (ret || rsp->result != SD_RES_SUCCESS) {
-		syslog(LOG_ERR, "failed to %s object %" PRIx64 " ret %d, res %d\n",
-			rw == VOLUME_READ ? "read" : "write", oid, ret,
-			rsp->result);
+		syslog(LOG_ERR,
+			"[%s] failed to %s object %" PRIx64 " ret %d, res %u\n",
+			__func__, rw == VOLUME_READ ? "read" : "write",
+			oid, ret, rsp->result);
 		return -1;
 	}
 
@@ -179,10 +229,12 @@ static int volume_do_rw(const char *path, char *buf, size_t size,
 		len = size;
 
 	do {
+#ifdef DEBUG
 		syslog(LOG_INFO, "%s oid %"PRIx64", off %ju, len %zu,"
 			" size %zu\n",
 			rw == VOLUME_READ ? "read" : "write",
 			oid, start, len, size);
+#endif
 		ret = volume_rw_object(buf, oid, len, start, rw);
 
 		if (ret != len)
@@ -227,13 +279,16 @@ static int volume_do_sync(uint32_t vid)
 {
 	struct sd_obj_req hdr = { 0 };
 	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
-	int ret;
+	int ret, fd, idx;
 	unsigned wlen = 0, rlen = 0;
+	struct vdi_inode *vdi = vdi_inode_tree_search(vid);
 
 	hdr.opcode = SD_OP_FLUSH_VDI;
 	hdr.oid = vid_to_vdi_oid(vid);
 
-	ret = exec_req(0, (struct sd_req *)&hdr, NULL, &wlen, &rlen);
+	fd = get_socket_fd(vdi, &idx);
+	ret = exec_req(fd, (struct sd_req *)&hdr, NULL, &wlen, &rlen);
+	put_socket_fd(vdi, idx);
 
 	if (ret || rsp->result != SD_RES_SUCCESS) {
 		syslog(LOG_ERR, "[%s] failed to flush vdi %"PRIx32"\n",
@@ -259,17 +314,49 @@ int volume_sync(const char *path)
 
 int volume_open(const char *path, struct fuse_file_info *fi)
 {
-	//fi->direct_io = 1;
+	if (!sheepfs_page_cache)
+		fi->direct_io = 1;
+	return 0;
+}
+
+static void destroy_socket_pool(int array[], int len)
+{
+	int i;
+	for (i = 0; i < len; i++)
+		close(array[i]);
+}
+
+static int setup_socket_pool(int array[], int len)
+{
+	int fd, i, ret;
+
+	for (i = 0; i < len; i++) {
+		fd = connect_to("localhost", 7000);
+		if (fd < 0) {
+			syslog(LOG_ERR, "[%s] connect_to %m\n", __func__);
+			destroy_socket_pool(array, --i);
+			return -1;
+		}
+
+		ret = set_nodelay(fd);
+		if (ret) {
+			syslog(LOG_ERR, "[%s] %m\n", __func__);
+			destroy_socket_pool(array, i);
+			return -1;
+		}
+
+		array[i] = fd;
+	}
+
 	return 0;
 }
 
 static int init_vdi_info(const char *entry, uint32_t *vid, size_t *size)
 {
 	struct strbuf *buf;
-	void *inode_buf;
-	struct vdi_inode *inode;
+	void *inode_buf = NULL;
+	struct vdi_inode *inode = NULL;
 	char command[256] = { 0 };
-	int ret = -1;
 
 	sprintf(command, "%s %s\n", "collie vdi list -r", entry);
 	buf = sheepfs_run_cmd(command);
@@ -278,32 +365,41 @@ static int init_vdi_info(const char *entry, uint32_t *vid, size_t *size)
 	if (sscanf(buf->buf, "%*s %*s %*d %zu %*s %*s %*s %"PRIx32,
 	    size, vid) < 2) {
 		syslog(LOG_ERR, "[%s] %m\n", __func__);
-		goto out;
+		goto err;
 	}
 
 	inode_buf = malloc(SD_INODE_SIZE);
 	if (!inode_buf) {
 		syslog(LOG_ERR, "[%s] %m\n", __func__);
-		goto out;
+		goto err;
 	}
 
+	inode = xzalloc(sizeof(*inode));
+	inode->vid = *vid;
+	if (setup_socket_pool(inode->socket_pool, SOCKET_POOL_SIZE) < 0) {
+		syslog(LOG_ERR, "[%s] failed to setup socket pool\n",
+			__func__);
+		goto err;
+	}
+	/* we need insert inode before calling volume_rw_object */
+	assert(!vdi_inode_tree_insert(inode));
 	if (volume_rw_object(inode_buf, vid_to_vdi_oid(*vid), SD_INODE_SIZE,
 			     0, VOLUME_READ) < 0) {
-		free(inode_buf);
-		goto out;
+		rb_erase(&inode->rb, &vdi_inode_tree);
+		syslog(LOG_ERR, "[%s] failed to read inode for %"PRIx32"\n",
+			__func__, *vid);
+		goto err;
 	}
-
-	inode = xzalloc(sizeof(*inode));
-	inode->vid = *vid;
 	inode->inode = inode_buf;
-	if (vdi_inode_tree_insert(inode)) {
-		free(inode_buf);
-		free(inode);
-	}
-	ret = 0;
-out:
 	strbuf_release(buf);
-	return ret;
+	free(buf);
+	return 0;
+err:
+	free(inode_buf);
+	free(inode);
+	strbuf_release(buf);
+	free(buf);
+	return -1;
 }
 
 int volume_create_entry(const char *entry)
@@ -325,6 +421,7 @@ int volume_create_entry(const char *entry)
 
 	if (init_vdi_info(entry, &vid, &size) < 0)
 		return -1;
+
 	if (shadow_file_setxattr(path, SH_VID_NAME, &vid, SH_VID_SIZE) < 0) {
 		shadow_file_delete(path);
 		return -1;
@@ -343,13 +440,16 @@ static int volume_sync_and_delete(uint32_t vid)
 {
 	struct sd_obj_req hdr = { 0 };
 	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
-	int ret;
+	int ret, fd, idx;
 	unsigned wlen = 0, rlen = 0;
+	struct vdi_inode *vdi = vdi_inode_tree_search(vid);
 
 	hdr.opcode = SD_OP_FLUSH_DEL_CACHE;
 	hdr.oid = vid_to_vdi_oid(vid);
 
-	ret = exec_req(0, (struct sd_req *)&hdr, NULL, &wlen, &rlen);
+	fd = get_socket_fd(vdi, &idx);
+	ret = exec_req(fd, (struct sd_req *)&hdr, NULL, &wlen, &rlen);
+	put_socket_fd(vdi, idx);
 
 	if (ret || rsp->result != SD_RES_SUCCESS) {
 		syslog(LOG_ERR, "[%s] failed to flush vdi %" PRIx32 "\n",
@@ -364,6 +464,7 @@ int volume_remove_entry(const char *entry)
 {
 	char path[PATH_MAX], *ch;
 	uint32_t vid;
+	struct vdi_inode *vdi;
 
 	ch = strchr(entry, '\n');
 	if (ch != NULL)
@@ -376,9 +477,20 @@ int volume_remove_entry(const char *entry)
 	if (shadow_file_getxattr(path, SH_VID_NAME, &vid, SH_VID_SIZE) < 0)
 		return -1;
 
-	if (volume_sync_and_delete(vid) < 0)
-		return -1;
+	/* No need to check error code, for case of connected sheep crashed,
+	 * we continue to do cleanup.
+	 */
+	volume_sync_and_delete(vid);
+
+	vdi = vdi_inode_tree_search(vid);
+	destroy_socket_pool(vdi->socket_pool, SOCKET_POOL_SIZE);
+
+	pthread_rwlock_wrlock(&vdi_inode_tree_lock);
+	rb_erase(&vdi->rb, &vdi_inode_tree);
+	pthread_rwlock_unlock(&vdi_inode_tree_lock);
 
+	free(vdi->inode);
+	free(vdi);
 	shadow_file_delete(path);
 
 	return 0;
-- 
1.7.8.2




More information about the sheepdog mailing list