[sheepdog] [PATCH v3 12/15] sheepfs: add a socket pool to speedup connection

Liu Yuan namei.unix at gmail.com
Mon May 21 17:25:56 CEST 2012


From: Liu Yuan <tailai.ly at taobao.com>

The socket pool is used by FUSE read threads, which use threads to simulate
async reads. All sockets point to the same gateway.

- add a read/write lock to be thread-safe.
- add an option to dis/enable page cache for volumes.

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 sheepfs/cluster.c |    1 +
 sheepfs/core.c    |    8 ++-
 sheepfs/node.c    |    1 +
 sheepfs/sheepfs.h |    1 +
 sheepfs/vdi.c     |    1 +
 sheepfs/volume.c  |  174 ++++++++++++++++++++++++++++++++++++++++++++---------
 6 files changed, 158 insertions(+), 28 deletions(-)

diff --git a/sheepfs/cluster.c b/sheepfs/cluster.c
index c4a481b..1f645ba 100644
--- a/sheepfs/cluster.c
+++ b/sheepfs/cluster.c
@@ -55,5 +55,6 @@ size_t cluster_info_get_size(const char *path)
 
 	len = shadow_file_write(path, buf->buf, buf->len);
 	strbuf_release(buf);
+	free(buf);
 	return len;
 }
diff --git a/sheepfs/core.c b/sheepfs/core.c
index a24d0c2..3a56333 100644
--- a/sheepfs/core.c
+++ b/sheepfs/core.c
@@ -31,15 +31,17 @@ char sheepfs_shadow[PATH_MAX];
 
 static int sheepfs_debug;
 static int sheepfs_fg;
+int sheepfs_page_cache = 0;
 
 static struct option const long_options[] = {
 	{"debug", no_argument, NULL, 'd'},
 	{"help", no_argument, NULL, 'h'},
 	{"foreground", no_argument, NULL, 'f'},
+	{"pagecache", no_argument, NULL, 'k'},
 	{NULL, 0, NULL, 0},
 };
 
-static const char *short_options = "dhf";
+static const char *short_options = "dfhk";
 
 static struct sheepfs_file_operation {
 	int (*read)(const char *path, char *buf, size_t size, off_t);
@@ -254,6 +256,7 @@ Usage: sheepfs [OPTION]... MOUNTPOINT\n\
 Options:\n\
   -d, --debug             enable debug output (implies -f)\n\
   -f, --foreground        sheepfs run in the foreground\n\
+  -k, --pagecache         use local kernel's page cache to access volume\n\
   -h, --help              display this help and exit\n\
 ");
 	exit(inval);
@@ -278,6 +281,9 @@ int main(int argc, char **argv)
 		case 'f':
 			sheepfs_fg = 1;
 			break;
+		case 'k':
+			sheepfs_page_cache = 1;
+			break;
 		default:
 			usage(1);
 		}
diff --git a/sheepfs/node.c b/sheepfs/node.c
index fcc490f..328ee79 100644
--- a/sheepfs/node.c
+++ b/sheepfs/node.c
@@ -62,6 +62,7 @@ size_t node_info_get_size(const char *path)
 
 	len = shadow_file_write(path, buf->buf, buf->len);
 	strbuf_release(buf);
+	free(buf);
 	return len;
 }
 
diff --git a/sheepfs/sheepfs.h b/sheepfs/sheepfs.h
index a744c07..b8eb6cd 100644
--- a/sheepfs/sheepfs.h
+++ b/sheepfs/sheepfs.h
@@ -15,6 +15,7 @@ enum sheepfs_opcode {
 };
 
 extern char sheepfs_shadow[];
+extern int sheepfs_page_cache;
 
 extern struct strbuf *sheepfs_run_cmd(const char *command);
 extern int sheepfs_set_op(const char *path, unsigned opcode);
diff --git a/sheepfs/vdi.c b/sheepfs/vdi.c
index e7888da..ccb241c 100644
--- a/sheepfs/vdi.c
+++ b/sheepfs/vdi.c
@@ -67,6 +67,7 @@ size_t vdi_list_get_size(const char *path)
 
 	len = shadow_file_write(path, buf->buf, buf->len);
 	strbuf_release(buf);
+	free(buf);
 	return len;
 }
 
diff --git a/sheepfs/volume.c b/sheepfs/volume.c
index e51b1f7..8bce6e5 100644
--- a/sheepfs/volume.c
+++ b/sheepfs/volume.c
@@ -20,6 +20,8 @@
 #include <time.h>
 #include <assert.h>
 #include <syslog.h>
+#include <urcu/uatomic.h>
+#include <pthread.h>
 
 #include "sheep.h"
 #include "strbuf.h"
@@ -38,13 +40,28 @@
 #define VOLUME_READ   0
 #define VOLUME_WRITE  1
 
+/* #define DEBUG */
+
 struct vdi_inode {
 	struct rb_node rb;
 	uint32_t vid;
 	struct sheepdog_inode *inode;
+/* FIXME
+ * 1) Consider the various VM request queue depths.
+ * 2) Most drives present 31 to Linux, so it is set to 31 in the expectation
+ *    that a VM's real queue depth never exceeds 31.
+ */
+#define SOCKET_POOL_SIZE  31
+/* Socket pool is used for FUSE read threads, which use threads
+ * to simulate async read. All sockets point to the same gateway.
+ */
+	int socket_pool[SOCKET_POOL_SIZE];
+	char socket_in_use[SOCKET_POOL_SIZE]; /* 1 means in use */
+	unsigned socket_poll_adder;
 };
 
 static struct rb_root vdi_inode_tree = RB_ROOT;
+static pthread_rwlock_t vdi_inode_tree_lock = PTHREAD_RWLOCK_INITIALIZER;
 
 static struct vdi_inode *vdi_inode_tree_insert(struct vdi_inode *new)
 {
@@ -95,18 +112,44 @@ int create_volume_layout(void)
 	return 0;
 }
 
+/* We must use get/put_socket_fd in pair */
+static inline int get_socket_fd(struct vdi_inode *vdi, int *idx)
+{
+	int sock_idx, fd;
+
+retry:
+	sock_idx = uatomic_add_return(&vdi->socket_poll_adder, 1) %
+		   SOCKET_POOL_SIZE;
+	/* if socket_in_use[sock_idx] == 0, set it to 1, otherwise, retry */
+	if (uatomic_cmpxchg(&vdi->socket_in_use[sock_idx], 0, 1))
+		goto retry;
+	fd = vdi->socket_pool[sock_idx];
+	*idx = sock_idx;
+
+	return fd;
+}
+
+static inline void put_socket_fd(struct vdi_inode *vdi, int idx)
+{
+	uatomic_dec(&vdi->socket_in_use[idx]);
+}
+
 static int volume_rw_object(char *buf, uint64_t oid, size_t size,
 			    off_t off, int rw)
 {
 	struct sd_req hdr = { 0 };
 	struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
-	int ret;
+	int ret, fd, sock_idx;
 	unsigned wlen = 0, rlen = 0;
 	int create = 0;
 	uint32_t vid = oid_to_vid(oid);
-	struct vdi_inode *vdi = vdi_inode_tree_search(vid);
+	struct vdi_inode *vdi;
 	unsigned long idx = 0;
 
+	pthread_rwlock_rdlock(&vdi_inode_tree_lock);
+	vdi = vdi_inode_tree_search(vid);
+	pthread_rwlock_unlock(&vdi_inode_tree_lock);
+
 	if (is_data_obj(oid)) {
 		if (off % SECTOR_SIZE || size % SECTOR_SIZE) {
 			syslog(LOG_ERR, "offset or size not aligned\n");
@@ -140,12 +183,15 @@ static int volume_rw_object(char *buf, uint64_t oid, size_t size,
 	hdr.data_length = size;
 	hdr.flags |= SD_FLAG_CMD_CACHE;
 
-	ret = exec_req(0, &hdr, buf, &wlen, &rlen);
+	fd = get_socket_fd(vdi, &sock_idx);
+	ret = exec_req(fd, &hdr, buf, &wlen, &rlen);
+	put_socket_fd(vdi, sock_idx);
 
 	if (ret || rsp->result != SD_RES_SUCCESS) {
-		syslog(LOG_ERR, "failed to %s object %" PRIx64 " ret %d, res %d\n",
-			rw == VOLUME_READ ? "read" : "write", oid, ret,
-			rsp->result);
+		syslog(LOG_ERR,
+			"[%s] failed to %s object %" PRIx64 " ret %d, res %u\n",
+			__func__, rw == VOLUME_READ ? "read" : "write",
+			oid, ret, rsp->result);
 		return -1;
 	}
 
@@ -184,10 +230,12 @@ static int volume_do_rw(const char *path, char *buf, size_t size,
 		len = size;
 
 	do {
+#ifdef DEBUG
 		syslog(LOG_INFO, "%s oid %"PRIx64", off %ju, len %zu,"
 			" size %zu\n",
 			rw == VOLUME_READ ? "read" : "write",
 			oid, start, len, size);
+#endif
 		ret = volume_rw_object(buf, oid, len, start, rw);
 
 		if (ret != len)
@@ -232,13 +280,20 @@ static int volume_do_sync(uint32_t vid)
 {
 	struct sd_req hdr = { 0 };
 	struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
-	int ret;
+	int ret, fd, idx;
 	unsigned wlen = 0, rlen = 0;
+	struct vdi_inode *vdi;
+
+	pthread_rwlock_rdlock(&vdi_inode_tree_lock);
+	vdi = vdi_inode_tree_search(vid);
+	pthread_rwlock_unlock(&vdi_inode_tree_lock);
 
 	hdr.opcode = SD_OP_FLUSH_VDI;
 	hdr.obj.oid = vid_to_vdi_oid(vid);
 
-	ret = exec_req(0, &hdr, NULL, &wlen, &rlen);
+	fd = get_socket_fd(vdi, &idx);
+	ret = exec_req(fd, &hdr, NULL, &wlen, &rlen);
+	put_socket_fd(vdi, idx);
 
 	if (ret || rsp->result != SD_RES_SUCCESS) {
 		syslog(LOG_ERR, "[%s] failed to flush vdi %"PRIx32"\n",
@@ -264,16 +319,49 @@ int volume_sync(const char *path)
 
 int volume_open(const char *path, struct fuse_file_info *fi)
 {
+	if (!sheepfs_page_cache)
+		fi->direct_io = 1;
+	return 0;
+}
+
+static void destroy_socket_pool(int array[], int len)
+{
+	int i;
+	for (i = 0; i < len; i++)
+		close(array[i]);
+}
+
+/* Fill array[] with len gateway sockets; on failure close all opened fds. */
+static int setup_socket_pool(int array[], int len)
+{
+	int fd, i, ret;
+
+	for (i = 0; i < len; i++) {
+		fd = connect_to("localhost", 7000);
+		if (fd < 0) {
+			syslog(LOG_ERR, "[%s] connect_to %m\n", __func__);
+			destroy_socket_pool(array, i);
+			return -1;
+		}
+		/* Record fd before set_nodelay so the error path closes it too. */
+		array[i] = fd;
+		ret = set_nodelay(fd);
+		if (ret) {
+			syslog(LOG_ERR, "[%s] %m\n", __func__);
+			destroy_socket_pool(array, i + 1);
+			return -1;
+		}
+	}
+
 	return 0;
 }
 
 static int init_vdi_info(const char *entry, uint32_t *vid, size_t *size)
 {
 	struct strbuf *buf;
-	void *inode_buf;
-	struct vdi_inode *inode;
+	void *inode_buf = NULL;
+	struct vdi_inode *inode = NULL, *dummy;
 	char command[256] = { 0 };
-	int ret = -1;
 
 	sprintf(command, "%s %s\n", "collie vdi list -r", entry);
 	buf = sheepfs_run_cmd(command);
@@ -282,32 +370,45 @@ static int init_vdi_info(const char *entry, uint32_t *vid, size_t *size)
 	if (sscanf(buf->buf, "%*s %*s %*d %zu %*s %*s %*s %"PRIx32,
 	    size, vid) < 2) {
 		syslog(LOG_ERR, "[%s] failed to sscanf %s\n", __func__, entry);
-		goto out;
+		goto err;
 	}
 
 	inode_buf = malloc(SD_INODE_SIZE);
 	if (!inode_buf) {
 		syslog(LOG_ERR, "[%s] %m\n", __func__);
-		goto out;
+		goto err;
 	}
 
+	inode = xzalloc(sizeof(*inode));
+	inode->vid = *vid;
+	if (setup_socket_pool(inode->socket_pool, SOCKET_POOL_SIZE) < 0) {
+		syslog(LOG_ERR, "[%s] failed to setup socket pool\n",
+			__func__);
+		goto err;
+	}
+	/* we need to insert the inode before calling volume_rw_object */
+	pthread_rwlock_wrlock(&vdi_inode_tree_lock);
+	dummy = vdi_inode_tree_insert(inode);
+	pthread_rwlock_unlock(&vdi_inode_tree_lock);
+	if (dummy)
+		goto err;
 	if (volume_rw_object(inode_buf, vid_to_vdi_oid(*vid), SD_INODE_SIZE,
 			     0, VOLUME_READ) < 0) {
-		free(inode_buf);
-		goto out;
+		rb_erase(&inode->rb, &vdi_inode_tree);
+		syslog(LOG_ERR, "[%s] failed to read inode for %"PRIx32"\n",
+			__func__, *vid);
+		goto err;
 	}
-
-	inode = xzalloc(sizeof(*inode));
-	inode->vid = *vid;
 	inode->inode = inode_buf;
-	if (vdi_inode_tree_insert(inode)) {
-		free(inode_buf);
-		free(inode);
-	}
-	ret = 0;
-out:
 	strbuf_release(buf);
-	return ret;
+	free(buf);
+	return 0;
+err:
+	free(inode_buf);
+	free(inode);
+	strbuf_release(buf);
+	free(buf);
+	return -1;
 }
 
 int volume_create_entry(const char *entry)
@@ -348,13 +449,20 @@ static int volume_sync_and_delete(uint32_t vid)
 {
 	struct sd_req hdr = { 0 };
 	struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
-	int ret;
+	int ret, fd, idx;
 	unsigned wlen = 0, rlen = 0;
+	struct vdi_inode *vdi;
+
+	pthread_rwlock_rdlock(&vdi_inode_tree_lock);
+	vdi = vdi_inode_tree_search(vid);
+	pthread_rwlock_unlock(&vdi_inode_tree_lock);
 
 	hdr.opcode = SD_OP_FLUSH_DEL_CACHE;
 	hdr.obj.oid = vid_to_vdi_oid(vid);
 
-	ret = exec_req(0, &hdr, NULL, &wlen, &rlen);
+	fd = get_socket_fd(vdi, &idx);
+	ret = exec_req(fd, &hdr, NULL, &wlen, &rlen);
+	put_socket_fd(vdi, idx);
 
 	if (ret || rsp->result != SD_RES_SUCCESS) {
 		syslog(LOG_ERR, "[%s] failed to flush vdi %" PRIx32 "\n",
@@ -369,6 +477,7 @@ int volume_remove_entry(const char *entry)
 {
 	char path[PATH_MAX], *ch;
 	uint32_t vid;
+	struct vdi_inode *vdi;
 
 	ch = strchr(entry, '\n');
 	if (ch != NULL)
@@ -384,6 +493,17 @@ int volume_remove_entry(const char *entry)
 	if (volume_sync_and_delete(vid) < 0)
 		return -1;
 
+	pthread_rwlock_rdlock(&vdi_inode_tree_lock);
+	vdi = vdi_inode_tree_search(vid);
+	pthread_rwlock_unlock(&vdi_inode_tree_lock);
+	destroy_socket_pool(vdi->socket_pool, SOCKET_POOL_SIZE);
+
+	pthread_rwlock_wrlock(&vdi_inode_tree_lock);
+	rb_erase(&vdi->rb, &vdi_inode_tree);
+	pthread_rwlock_unlock(&vdi_inode_tree_lock);
+
+	free(vdi->inode);
+	free(vdi);
 	shadow_file_delete(path);
 
 	return 0;
-- 
1.7.10.2




More information about the sheepdog mailing list