From: Liu Yuan <tailai.ly at taobao.com> Socket pool is used for FUSE read threads, which use threads to simulate async read. All sockets point to the same gateway. - add a read/write lock to be thread-safe. - add an option to dis/enable page cache for volumes. Signed-off-by: Liu Yuan <tailai.ly at taobao.com> --- sheepfs/cluster.c | 1 + sheepfs/core.c | 8 ++- sheepfs/node.c | 1 + sheepfs/sheepfs.h | 1 + sheepfs/vdi.c | 1 + sheepfs/volume.c | 174 ++++++++++++++++++++++++++++++++++++++++++++--------- 6 files changed, 158 insertions(+), 28 deletions(-) diff --git a/sheepfs/cluster.c b/sheepfs/cluster.c index c4a481b..1f645ba 100644 --- a/sheepfs/cluster.c +++ b/sheepfs/cluster.c @@ -55,5 +55,6 @@ size_t cluster_info_get_size(const char *path) len = shadow_file_write(path, buf->buf, buf->len); strbuf_release(buf); + free(buf); return len; } diff --git a/sheepfs/core.c b/sheepfs/core.c index a24d0c2..3a56333 100644 --- a/sheepfs/core.c +++ b/sheepfs/core.c @@ -31,15 +31,17 @@ char sheepfs_shadow[PATH_MAX]; static int sheepfs_debug; static int sheepfs_fg; +int sheepfs_page_cache = 0; static struct option const long_options[] = { {"debug", no_argument, NULL, 'd'}, {"help", no_argument, NULL, 'h'}, {"foreground", no_argument, NULL, 'f'}, + {"pagecache", no_argument, NULL, 'k'}, {NULL, 0, NULL, 0}, }; -static const char *short_options = "dhf"; +static const char *short_options = "dfhk"; static struct sheepfs_file_operation { int (*read)(const char *path, char *buf, size_t size, off_t); @@ -254,6 +256,7 @@ Usage: sheepfs [OPTION]... 
MOUNTPOINT\n\ Options:\n\ -d, --debug enable debug output (implies -f)\n\ -f, --foreground sheepfs run in the foreground\n\ + -k, --pagecache use local kernel's page cache to access volume\n\ -h, --help display this help and exit\n\ "); exit(inval); @@ -278,6 +281,9 @@ int main(int argc, char **argv) case 'f': sheepfs_fg = 1; break; + case 'k': + sheepfs_page_cache = 1; + break; default: usage(1); } diff --git a/sheepfs/node.c b/sheepfs/node.c index fcc490f..328ee79 100644 --- a/sheepfs/node.c +++ b/sheepfs/node.c @@ -62,6 +62,7 @@ size_t node_info_get_size(const char *path) len = shadow_file_write(path, buf->buf, buf->len); strbuf_release(buf); + free(buf); return len; } diff --git a/sheepfs/sheepfs.h b/sheepfs/sheepfs.h index a744c07..b8eb6cd 100644 --- a/sheepfs/sheepfs.h +++ b/sheepfs/sheepfs.h @@ -15,6 +15,7 @@ enum sheepfs_opcode { }; extern char sheepfs_shadow[]; +extern int sheepfs_page_cache; extern struct strbuf *sheepfs_run_cmd(const char *command); extern int sheepfs_set_op(const char *path, unsigned opcode); diff --git a/sheepfs/vdi.c b/sheepfs/vdi.c index e7888da..ccb241c 100644 --- a/sheepfs/vdi.c +++ b/sheepfs/vdi.c @@ -67,6 +67,7 @@ size_t vdi_list_get_size(const char *path) len = shadow_file_write(path, buf->buf, buf->len); strbuf_release(buf); + free(buf); return len; } diff --git a/sheepfs/volume.c b/sheepfs/volume.c index e51b1f7..8bce6e5 100644 --- a/sheepfs/volume.c +++ b/sheepfs/volume.c @@ -20,6 +20,8 @@ #include <time.h> #include <assert.h> #include <syslog.h> +#include <urcu/uatomic.h> +#include <pthread.h> #include "sheep.h" #include "strbuf.h" @@ -38,13 +40,28 @@ #define VOLUME_READ 0 #define VOLUME_WRITE 1 +/* #define DEBUG */ + struct vdi_inode { struct rb_node rb; uint32_t vid; struct sheepdog_inode *inode; +/* FIXME + * 1) Consider various VM request queue depth. 
+ * 2) Most drive presents 31 to Linux, I set it as 31 to expect that VM's + * real queue depth never exceed 31 + */ +#define SOCKET_POOL_SIZE 31 +/* Socket pool is used for FUSE read threads, which use threads + * to simulate aysnc read. All sockets point to the same gateway + */ + int socket_pool[SOCKET_POOL_SIZE]; + char socket_in_use[SOCKET_POOL_SIZE]; /* 1 means in use */ + unsigned socket_poll_adder; }; static struct rb_root vdi_inode_tree = RB_ROOT; +static pthread_rwlock_t vdi_inode_tree_lock = PTHREAD_RWLOCK_INITIALIZER; static struct vdi_inode *vdi_inode_tree_insert(struct vdi_inode *new) { @@ -95,18 +112,44 @@ int create_volume_layout(void) return 0; } +/* We must use get/put_socket_fd in pair */ +static inline int get_socket_fd(struct vdi_inode *vdi, int *idx) +{ + int sock_idx, fd; + +retry: + sock_idx = uatomic_add_return(&vdi->socket_poll_adder, 1) % + SOCKET_POOL_SIZE; + /* if socket_in_use[sock_idx] == 0, set it to 1, otherwise, retry */ + if (uatomic_cmpxchg(&vdi->socket_in_use[sock_idx], 0, 1)) + goto retry; + fd = vdi->socket_pool[sock_idx]; + *idx = sock_idx; + + return fd; +} + +static inline void put_socket_fd(struct vdi_inode *vdi, int idx) +{ + uatomic_dec(&vdi->socket_in_use[idx]); +} + static int volume_rw_object(char *buf, uint64_t oid, size_t size, off_t off, int rw) { struct sd_req hdr = { 0 }; struct sd_rsp *rsp = (struct sd_rsp *)&hdr; - int ret; + int ret, fd, sock_idx; unsigned wlen = 0, rlen = 0; int create = 0; uint32_t vid = oid_to_vid(oid); - struct vdi_inode *vdi = vdi_inode_tree_search(vid); + struct vdi_inode *vdi; unsigned long idx = 0; + pthread_rwlock_rdlock(&vdi_inode_tree_lock); + vdi = vdi_inode_tree_search(vid); + pthread_rwlock_unlock(&vdi_inode_tree_lock); + if (is_data_obj(oid)) { if (off % SECTOR_SIZE || size % SECTOR_SIZE) { syslog(LOG_ERR, "offset or size not aligned\n"); @@ -140,12 +183,15 @@ static int volume_rw_object(char *buf, uint64_t oid, size_t size, hdr.data_length = size; hdr.flags |= 
SD_FLAG_CMD_CACHE; - ret = exec_req(0, &hdr, buf, &wlen, &rlen); + fd = get_socket_fd(vdi, &sock_idx); + ret = exec_req(fd, &hdr, buf, &wlen, &rlen); + put_socket_fd(vdi, sock_idx); if (ret || rsp->result != SD_RES_SUCCESS) { - syslog(LOG_ERR, "failed to %s object %" PRIx64 " ret %d, res %d\n", - rw == VOLUME_READ ? "read" : "write", oid, ret, - rsp->result); + syslog(LOG_ERR, + "[%s] failed to %s object %" PRIx64 " ret %d, res %u\n", + __func__, rw == VOLUME_READ ? "read" : "write", + oid, ret, rsp->result); return -1; } @@ -184,10 +230,12 @@ static int volume_do_rw(const char *path, char *buf, size_t size, len = size; do { +#ifdef DEBUG syslog(LOG_INFO, "%s oid %"PRIx64", off %ju, len %zu," " size %zu\n", rw == VOLUME_READ ? "read" : "write", oid, start, len, size); +#endif ret = volume_rw_object(buf, oid, len, start, rw); if (ret != len) @@ -232,13 +280,20 @@ static int volume_do_sync(uint32_t vid) { struct sd_req hdr = { 0 }; struct sd_rsp *rsp = (struct sd_rsp *)&hdr; - int ret; + int ret, fd, idx; unsigned wlen = 0, rlen = 0; + struct vdi_inode *vdi; + + pthread_rwlock_rdlock(&vdi_inode_tree_lock); + vdi = vdi_inode_tree_search(vid); + pthread_rwlock_unlock(&vdi_inode_tree_lock); hdr.opcode = SD_OP_FLUSH_VDI; hdr.obj.oid = vid_to_vdi_oid(vid); - ret = exec_req(0, &hdr, NULL, &wlen, &rlen); + fd = get_socket_fd(vdi, &idx); + ret = exec_req(fd, &hdr, NULL, &wlen, &rlen); + put_socket_fd(vdi, idx); if (ret || rsp->result != SD_RES_SUCCESS) { syslog(LOG_ERR, "[%s] failed to flush vdi %"PRIx32"\n", @@ -264,16 +319,49 @@ int volume_sync(const char *path) int volume_open(const char *path, struct fuse_file_info *fi) { + if (!sheepfs_page_cache) + fi->direct_io = 1; + return 0; +} + +static void destroy_socket_pool(int array[], int len) +{ + int i; + for (i = 0; i < len; i++) + close(array[i]); +} + +static int setup_socket_pool(int array[], int len) +{ + int fd, i, ret; + + for (i = 0; i < len; i++) { + fd = connect_to("localhost", 7000); + if (fd < 0) { + 
syslog(LOG_ERR, "[%s] connect_to %m\n", __func__); + destroy_socket_pool(array, --i); + return -1; + } + + ret = set_nodelay(fd); + if (ret) { + syslog(LOG_ERR, "[%s] %m\n", __func__); + destroy_socket_pool(array, i); + return -1; + } + + array[i] = fd; + } + return 0; } static int init_vdi_info(const char *entry, uint32_t *vid, size_t *size) { struct strbuf *buf; - void *inode_buf; - struct vdi_inode *inode; + void *inode_buf = NULL; + struct vdi_inode *inode = NULL, *dummy; char command[256] = { 0 }; - int ret = -1; sprintf(command, "%s %s\n", "collie vdi list -r", entry); buf = sheepfs_run_cmd(command); @@ -282,32 +370,45 @@ static int init_vdi_info(const char *entry, uint32_t *vid, size_t *size) if (sscanf(buf->buf, "%*s %*s %*d %zu %*s %*s %*s %"PRIx32, size, vid) < 2) { syslog(LOG_ERR, "[%s] failed to sscanf %s\n", __func__, entry); - goto out; + goto err; } inode_buf = malloc(SD_INODE_SIZE); if (!inode_buf) { syslog(LOG_ERR, "[%s] %m\n", __func__); - goto out; + goto err; } + inode = xzalloc(sizeof(*inode)); + inode->vid = *vid; + if (setup_socket_pool(inode->socket_pool, SOCKET_POOL_SIZE) < 0) { + syslog(LOG_ERR, "[%s] failed to setup socket pool\n", + __func__); + goto err; + } + /* we need insert inode before calling volume_rw_object */ + pthread_rwlock_wrlock(&vdi_inode_tree_lock); + dummy = vdi_inode_tree_insert(inode); + pthread_rwlock_unlock(&vdi_inode_tree_lock); + if (dummy) + goto err; if (volume_rw_object(inode_buf, vid_to_vdi_oid(*vid), SD_INODE_SIZE, 0, VOLUME_READ) < 0) { - free(inode_buf); - goto out; + rb_erase(&inode->rb, &vdi_inode_tree); + syslog(LOG_ERR, "[%s] failed to read inode for %"PRIx32"\n", + __func__, *vid); + goto err; } - - inode = xzalloc(sizeof(*inode)); - inode->vid = *vid; inode->inode = inode_buf; - if (vdi_inode_tree_insert(inode)) { - free(inode_buf); - free(inode); - } - ret = 0; -out: strbuf_release(buf); - return ret; + free(buf); + return 0; +err: + free(inode_buf); + free(inode); + strbuf_release(buf); + free(buf); 
+ return -1; } int volume_create_entry(const char *entry) @@ -348,13 +449,20 @@ static int volume_sync_and_delete(uint32_t vid) { struct sd_req hdr = { 0 }; struct sd_rsp *rsp = (struct sd_rsp *)&hdr; - int ret; + int ret, fd, idx; unsigned wlen = 0, rlen = 0; + struct vdi_inode *vdi; + + pthread_rwlock_rdlock(&vdi_inode_tree_lock); + vdi = vdi_inode_tree_search(vid); + pthread_rwlock_unlock(&vdi_inode_tree_lock); hdr.opcode = SD_OP_FLUSH_DEL_CACHE; hdr.obj.oid = vid_to_vdi_oid(vid); - ret = exec_req(0, &hdr, NULL, &wlen, &rlen); + fd = get_socket_fd(vdi, &idx); + ret = exec_req(fd, &hdr, NULL, &wlen, &rlen); + put_socket_fd(vdi, idx); if (ret || rsp->result != SD_RES_SUCCESS) { syslog(LOG_ERR, "[%s] failed to flush vdi %" PRIx32 "\n", @@ -369,6 +477,7 @@ int volume_remove_entry(const char *entry) { char path[PATH_MAX], *ch; uint32_t vid; + struct vdi_inode *vdi; ch = strchr(entry, '\n'); if (ch != NULL) @@ -384,6 +493,17 @@ int volume_remove_entry(const char *entry) if (volume_sync_and_delete(vid) < 0) return -1; + pthread_rwlock_rdlock(&vdi_inode_tree_lock); + vdi = vdi_inode_tree_search(vid); + pthread_rwlock_unlock(&vdi_inode_tree_lock); + destroy_socket_pool(vdi->socket_pool, SOCKET_POOL_SIZE); + + pthread_rwlock_wrlock(&vdi_inode_tree_lock); + rb_erase(&vdi->rb, &vdi_inode_tree); + pthread_rwlock_unlock(&vdi_inode_tree_lock); + + free(vdi->inode); + free(vdi); shadow_file_delete(path); return 0; -- 1.7.10.2 |