At Mon, 14 May 2012 17:47:37 +0800, Liu Yuan wrote: > > From: Liu Yuan <tailai.ly at taobao.com> > > Socket pool is used for FUSE read threads, which use threads to simulate async > read. All sockets point to the same gateway. > > - add a read/write lock to be thread safe. > - add an option to enable or disable the page cache for volumes. These changes should be split into separate patches. > > Signed-off-by: Liu Yuan <tailai.ly at taobao.com> > --- > sheepfs/cluster.c | 1 + > sheepfs/core.c | 12 ++-- > sheepfs/node.c | 1 + > sheepfs/sheepfs.h | 1 + > sheepfs/vdi.c | 1 + > sheepfs/volume.c | 174 +++++++++++++++++++++++++++++++++++++++++++---------- > 6 files changed, 154 insertions(+), 36 deletions(-) > > diff --git a/sheepfs/cluster.c b/sheepfs/cluster.c > index c4a481b..1f645ba 100644 > --- a/sheepfs/cluster.c > +++ b/sheepfs/cluster.c > @@ -55,5 +55,6 @@ size_t cluster_info_get_size(const char *path) > > len = shadow_file_write(path, buf->buf, buf->len); > strbuf_release(buf); > + free(buf); > return len; > } > diff --git a/sheepfs/core.c b/sheepfs/core.c > index 869baac..394fb16 100644 > --- a/sheepfs/core.c > +++ b/sheepfs/core.c > @@ -31,16 +31,17 @@ char sheepfs_shadow[PATH_MAX]; > > static int sheepfs_debug; > static int sheepfs_fg; > +int sheepfs_page_cache = 0; > > static struct option const long_options[] = { > {"debug", no_argument, NULL, 'd'}, > - {"directio", no_argument, NULL, 'D'}, If we remove this option in this patch, shouldn't we add it in the previous one? Thanks, Kazutaka > {"help", no_argument, NULL, 'h'}, > {"foreground", no_argument, NULL, 'f'}, > + {"pagecache", no_argument, NULL, 'k'}, > {NULL, 0, NULL, 0}, > }; > > -static const char *short_options = "dDhf"; > +static const char *short_options = "dfhk"; > > static struct sheepfs_file_operation { > int (*read)(const char *path, char *buf, size_t size, off_t); > @@ -254,8 +255,8 @@ static void usage(int inval) > Usage: sheepfs [OPTION]... 
MOUNTPOINT\n\ > Options:\n\ > -d, --debug enable debug output (implies -f)\n\ > - -D, --directio use direct IO to access volume\n\ > -f, --foreground sheepfs run in the foreground\n\ > + -k, --pagecache use local kernel's page cache to access volume\n\ > -h, --help display this help and exit\n\ > "); > exit(inval); > @@ -274,14 +275,15 @@ int main(int argc, char **argv) > case 'd': > sheepfs_debug = 1; > break; > - case 'D': > - break; > case 'h': > usage(0); > break; > case 'f': > sheepfs_fg = 1; > break; > + case 'k': > + sheepfs_page_cache = 1; > + break; > default: > usage(1); > } > diff --git a/sheepfs/node.c b/sheepfs/node.c > index fcc490f..328ee79 100644 > --- a/sheepfs/node.c > +++ b/sheepfs/node.c > @@ -62,6 +62,7 @@ size_t node_info_get_size(const char *path) > > len = shadow_file_write(path, buf->buf, buf->len); > strbuf_release(buf); > + free(buf); > return len; > } > > diff --git a/sheepfs/sheepfs.h b/sheepfs/sheepfs.h > index a744c07..b8eb6cd 100644 > --- a/sheepfs/sheepfs.h > +++ b/sheepfs/sheepfs.h > @@ -15,6 +15,7 @@ enum sheepfs_opcode { > }; > > extern char sheepfs_shadow[]; > +extern int sheepfs_page_cache; > > extern struct strbuf *sheepfs_run_cmd(const char *command); > extern int sheepfs_set_op(const char *path, unsigned opcode); > diff --git a/sheepfs/vdi.c b/sheepfs/vdi.c > index bd4f875..57c04fa 100644 > --- a/sheepfs/vdi.c > +++ b/sheepfs/vdi.c > @@ -67,6 +67,7 @@ size_t vdi_list_get_size(const char *path) > > len = shadow_file_write(path, buf->buf, buf->len); > strbuf_release(buf); > + free(buf); > return len; > } > > diff --git a/sheepfs/volume.c b/sheepfs/volume.c > index f9b4dfc..eda5261 100644 > --- a/sheepfs/volume.c > +++ b/sheepfs/volume.c > @@ -20,6 +20,8 @@ > #include <time.h> > #include <assert.h> > #include <syslog.h> > +#include <urcu/uatomic.h> > +#include <pthread.h> > > #include "sheep.h" > #include "strbuf.h" > @@ -38,13 +40,28 @@ > #define VOLUME_READ 0 > #define VOLUME_WRITE 1 > > +/* #define DEBUG */ > + > struct 
vdi_inode { > struct rb_node rb; > uint32_t vid; > struct sheepdog_inode *inode; > +/* FIXME > + * 1) Consider various VM request queue depth. > + * 2) Most drive presents 31 to Linux, I set it as 31 to expect that VM's > + * real queue depth never exceed 31 > + */ > +#define SOCKET_POOL_SIZE 31 > +/* Socket pool is used for FUSE read threads, which use threads > + * to simulate aysnc read. All sockets point to the same gateway > + */ > + int socket_pool[SOCKET_POOL_SIZE]; > + char socket_in_use[SOCKET_POOL_SIZE]; /* 1 means in use */ > + unsigned socket_poll_adder; > }; > > static struct rb_root vdi_inode_tree = RB_ROOT; > +static pthread_rwlock_t vdi_inode_tree_lock = PTHREAD_RWLOCK_INITIALIZER; > > static struct vdi_inode *vdi_inode_tree_insert(struct vdi_inode *new) > { > @@ -52,6 +69,7 @@ static struct vdi_inode *vdi_inode_tree_insert(struct vdi_inode *new) > struct rb_node *parent = NULL; > struct vdi_inode *entry; > > + pthread_rwlock_wrlock(&vdi_inode_tree_lock); > while (*p) { > parent = *p; > entry = rb_entry(parent, struct vdi_inode, rb); > @@ -60,12 +78,15 @@ static struct vdi_inode *vdi_inode_tree_insert(struct vdi_inode *new) > p = &(*p)->rb_left; > else if (new->vid > entry->vid) > p = &(*p)->rb_right; > - else > + else { > + pthread_rwlock_unlock(&vdi_inode_tree_lock); > return entry; /* already has this entry */ > + } > } > rb_link_node(&new->rb, parent, p); > rb_insert_color(&new->rb, &vdi_inode_tree); > > + pthread_rwlock_unlock(&vdi_inode_tree_lock); > return NULL; /* insert successfully */ > } > > @@ -74,6 +95,7 @@ static struct vdi_inode *vdi_inode_tree_search(uint32_t vid) > struct rb_node *n = vdi_inode_tree.rb_node; > struct vdi_inode *t; > > + pthread_rwlock_rdlock(&vdi_inode_tree_lock); > while (n) { > t = rb_entry(n, struct vdi_inode, rb); > > @@ -81,10 +103,13 @@ static struct vdi_inode *vdi_inode_tree_search(uint32_t vid) > n = n->rb_left; > else if (vid > t->vid) > n = n->rb_right; > - else > + else { > + 
pthread_rwlock_unlock(&vdi_inode_tree_lock); > return t; /* found it */ > + } > } > > + pthread_rwlock_unlock(&vdi_inode_tree_lock); > return NULL; > } > > @@ -95,12 +120,34 @@ int create_volume_layout(void) > return 0; > } > > +/* We must use get/put_socket_fd in pair */ > +static inline int get_socket_fd(struct vdi_inode *vdi, int *idx) > +{ > + int sock_idx, fd; > + > +retry: > + sock_idx = uatomic_add_return(&vdi->socket_poll_adder, 1) % > + SOCKET_POOL_SIZE; > + /* if socket_in_use[sock_idx] == 0, set it to 1, otherwise, retry */ > + if (uatomic_cmpxchg(&vdi->socket_in_use[sock_idx], 0, 1)) > + goto retry; > + fd = vdi->socket_pool[sock_idx]; > + *idx = sock_idx; > + > + return fd; > +} > + > +static inline void put_socket_fd(struct vdi_inode *vdi, int idx) > +{ > + uatomic_dec(&vdi->socket_in_use[idx]); > +} > + > static int volume_rw_object(char *buf, uint64_t oid, size_t size, > off_t off, int rw) > { > struct sd_obj_req hdr = { 0 }; > struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr; > - int ret; > + int ret, fd, sock_idx; > unsigned wlen = 0, rlen = 0; > int create = 0; > uint32_t vid = oid_to_vid(oid); > @@ -135,12 +182,15 @@ static int volume_rw_object(char *buf, uint64_t oid, size_t size, > hdr.offset = off; > hdr.flags |= SD_FLAG_CMD_CACHE; > > - ret = exec_req(0, (struct sd_req *)&hdr, buf, &wlen, &rlen); > + fd = get_socket_fd(vdi, &sock_idx); > + ret = exec_req(fd, (struct sd_req *)&hdr, buf, &wlen, &rlen); > + put_socket_fd(vdi, sock_idx); > > if (ret || rsp->result != SD_RES_SUCCESS) { > - syslog(LOG_ERR, "failed to %s object %" PRIx64 " ret %d, res %d\n", > - rw == VOLUME_READ ? "read" : "write", oid, ret, > - rsp->result); > + syslog(LOG_ERR, > + "[%s] failed to %s object %" PRIx64 " ret %d, res %u\n", > + __func__, rw == VOLUME_READ ? 
"read" : "write", > + oid, ret, rsp->result); > return -1; > } > > @@ -179,10 +229,12 @@ static int volume_do_rw(const char *path, char *buf, size_t size, > len = size; > > do { > +#ifdef DEBUG > syslog(LOG_INFO, "%s oid %"PRIx64", off %ju, len %zu," > " size %zu\n", > rw == VOLUME_READ ? "read" : "write", > oid, start, len, size); > +#endif > ret = volume_rw_object(buf, oid, len, start, rw); > > if (ret != len) > @@ -227,13 +279,16 @@ static int volume_do_sync(uint32_t vid) > { > struct sd_obj_req hdr = { 0 }; > struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr; > - int ret; > + int ret, fd, idx; > unsigned wlen = 0, rlen = 0; > + struct vdi_inode *vdi = vdi_inode_tree_search(vid); > > hdr.opcode = SD_OP_FLUSH_VDI; > hdr.oid = vid_to_vdi_oid(vid); > > - ret = exec_req(0, (struct sd_req *)&hdr, NULL, &wlen, &rlen); > + fd = get_socket_fd(vdi, &idx); > + ret = exec_req(fd, (struct sd_req *)&hdr, NULL, &wlen, &rlen); > + put_socket_fd(vdi, idx); > > if (ret || rsp->result != SD_RES_SUCCESS) { > syslog(LOG_ERR, "[%s] failed to flush vdi %"PRIx32"\n", > @@ -259,17 +314,49 @@ int volume_sync(const char *path) > > int volume_open(const char *path, struct fuse_file_info *fi) > { > - //fi->direct_io = 1; > + if (!sheepfs_page_cache) > + fi->direct_io = 1; > + return 0; > +} > + > +static void destroy_socket_pool(int array[], int len) > +{ > + int i; > + for (i = 0; i < len; i++) > + close(array[i]); > +} > + > +static int setup_socket_pool(int array[], int len) > +{ > + int fd, i, ret; > + > + for (i = 0; i < len; i++) { > + fd = connect_to("localhost", 7000); > + if (fd < 0) { > + syslog(LOG_ERR, "[%s] connect_to %m\n", __func__); > + destroy_socket_pool(array, --i); > + return -1; > + } > + > + ret = set_nodelay(fd); > + if (ret) { > + syslog(LOG_ERR, "[%s] %m\n", __func__); > + destroy_socket_pool(array, i); > + return -1; > + } > + > + array[i] = fd; > + } > + > return 0; > } > > static int init_vdi_info(const char *entry, uint32_t *vid, size_t *size) > { > struct 
strbuf *buf; > - void *inode_buf; > - struct vdi_inode *inode; > + void *inode_buf = NULL; > + struct vdi_inode *inode = NULL; > char command[256] = { 0 }; > - int ret = -1; > > sprintf(command, "%s %s\n", "collie vdi list -r", entry); > buf = sheepfs_run_cmd(command); > @@ -278,32 +365,41 @@ static int init_vdi_info(const char *entry, uint32_t *vid, size_t *size) > if (sscanf(buf->buf, "%*s %*s %*d %zu %*s %*s %*s %"PRIx32, > size, vid) < 2) { > syslog(LOG_ERR, "[%s] %m\n", __func__); > - goto out; > + goto err; > } > > inode_buf = malloc(SD_INODE_SIZE); > if (!inode_buf) { > syslog(LOG_ERR, "[%s] %m\n", __func__); > - goto out; > + goto err; > } > > + inode = xzalloc(sizeof(*inode)); > + inode->vid = *vid; > + if (setup_socket_pool(inode->socket_pool, SOCKET_POOL_SIZE) < 0) { > + syslog(LOG_ERR, "[%s] failed to setup socket pool\n", > + __func__); > + goto err; > + } > + /* we need insert inode before calling volume_rw_object */ > + assert(!vdi_inode_tree_insert(inode)); > if (volume_rw_object(inode_buf, vid_to_vdi_oid(*vid), SD_INODE_SIZE, > 0, VOLUME_READ) < 0) { > - free(inode_buf); > - goto out; > + rb_erase(&inode->rb, &vdi_inode_tree); > + syslog(LOG_ERR, "[%s] failed to read inode for %"PRIx32"\n", > + __func__, *vid); > + goto err; > } > - > - inode = xzalloc(sizeof(*inode)); > - inode->vid = *vid; > inode->inode = inode_buf; > - if (vdi_inode_tree_insert(inode)) { > - free(inode_buf); > - free(inode); > - } > - ret = 0; > -out: > strbuf_release(buf); > - return ret; > + free(buf); > + return 0; > +err: > + free(inode_buf); > + free(inode); > + strbuf_release(buf); > + free(buf); > + return -1; > } > > int volume_create_entry(const char *entry) > @@ -325,6 +421,7 @@ int volume_create_entry(const char *entry) > > if (init_vdi_info(entry, &vid, &size) < 0) > return -1; > + > if (shadow_file_setxattr(path, SH_VID_NAME, &vid, SH_VID_SIZE) < 0) { > shadow_file_delete(path); > return -1; > @@ -343,13 +440,16 @@ static int volume_sync_and_delete(uint32_t vid) > 
{ > struct sd_obj_req hdr = { 0 }; > struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr; > - int ret; > + int ret, fd, idx; > unsigned wlen = 0, rlen = 0; > + struct vdi_inode *vdi = vdi_inode_tree_search(vid); > > hdr.opcode = SD_OP_FLUSH_DEL_CACHE; > hdr.oid = vid_to_vdi_oid(vid); > > - ret = exec_req(0, (struct sd_req *)&hdr, NULL, &wlen, &rlen); > + fd = get_socket_fd(vdi, &idx); > + ret = exec_req(fd, (struct sd_req *)&hdr, NULL, &wlen, &rlen); > + put_socket_fd(vdi, idx); > > if (ret || rsp->result != SD_RES_SUCCESS) { > syslog(LOG_ERR, "[%s] failed to flush vdi %" PRIx32 "\n", > @@ -364,6 +464,7 @@ int volume_remove_entry(const char *entry) > { > char path[PATH_MAX], *ch; > uint32_t vid; > + struct vdi_inode *vdi; > > ch = strchr(entry, '\n'); > if (ch != NULL) > @@ -376,9 +477,20 @@ int volume_remove_entry(const char *entry) > if (shadow_file_getxattr(path, SH_VID_NAME, &vid, SH_VID_SIZE) < 0) > return -1; > > - if (volume_sync_and_delete(vid) < 0) > - return -1; > + /* No need to check error code, for case of connected sheep crashed, > + * we continue to do cleanup. > + */ > + volume_sync_and_delete(vid); > + > + vdi = vdi_inode_tree_search(vid); > + destroy_socket_pool(vdi->socket_pool, SOCKET_POOL_SIZE); > + > + pthread_rwlock_wrlock(&vdi_inode_tree_lock); > + rb_erase(&vdi->rb, &vdi_inode_tree); > + pthread_rwlock_unlock(&vdi_inode_tree_lock); > > + free(vdi->inode); > + free(vdi); > shadow_file_delete(path); > > return 0; > -- > 1.7.8.2 > |