From: Liu Yuan <tailai.ly at taobao.com> This is inspired by the observation that each guest can issue as many as 4 requests in one go, while the cache previously held only one FD per node, so concurrent requests to the same node had to fall back to short-lived connections. The only complexity this adds outside of the sockfd cache is one more API parameter: the FD index. The underlying core needs it to identify which of a node's cached FDs is actually in use. I think this trade-off is a good deal. Signed-off-by: Liu Yuan <tailai.ly at taobao.com> --- sheep/gateway.c | 31 ++++--- sheep/sheep_priv.h | 6 +- sheep/sockfd_cache.c | 218 ++++++++++++++++++++++++++++++++++++++------------ 3 files changed, 188 insertions(+), 67 deletions(-) diff --git a/sheep/gateway.c b/sheep/gateway.c index c3e5f80..f126dfb 100644 --- a/sheep/gateway.c +++ b/sheep/gateway.c @@ -54,11 +54,13 @@ read_remote: j = random(); for (i = 0; i < nr_copies; i++) { int idx = (i + j) % nr_copies; + int sock_idx; + v = obj_vnodes[idx]; if (vnode_is_local(v)) continue; - fd = sheep_get_fd(v); + fd = sheep_get_fd(v, &sock_idx); if (fd < 0) { ret = SD_RES_NETWORK_ERROR; continue; @@ -70,11 +72,11 @@ read_remote: ret = exec_req(fd, &fwd_hdr, req->data, &wlen, &rlen); if (ret) { /* network errors */ - sheep_del_fd(v, fd); + sheep_del_fd(v, fd, sock_idx); ret = SD_RES_NETWORK_ERROR; continue; } else { - sheep_put_fd(v, fd); + sheep_put_fd(v, fd, sock_idx); memcpy(&req->rp, rsp, sizeof(*rsp)); ret = rsp->result; break; @@ -86,6 +88,7 @@ read_remote: struct write_info { struct pollfd pfds[SD_MAX_REDUNDANCY]; struct sd_vnode *vnodes[SD_MAX_REDUNDANCY]; + int sock_idx[SD_MAX_REDUNDANCY]; }; int forward_write_obj_req(struct request *req) @@ -104,9 +107,10 @@ int forward_write_obj_req(struct request *req) dprintf("%"PRIx64"\n", oid); - memset(&wi, 0, sizeof(wi)); - for (i = 0; i < SD_MAX_REDUNDANCY; i++) + for (i = 0; i < SD_MAX_REDUNDANCY; i++) { wi.pfds[i].fd = -1; + wi.vnodes[i] = NULL; + } memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr)); fwd_hdr.flags |= SD_FLAG_CMD_IO_LOCAL; @@ -125,7 +129,7 @@ int forward_write_obj_req(struct request *req)
continue; } - fd = sheep_get_fd(v); + fd = sheep_get_fd(v, &wi.sock_idx[nr_fds]); if (fd < 0) { ret = SD_RES_NETWORK_ERROR; goto err; @@ -133,7 +137,7 @@ int forward_write_obj_req(struct request *req) ret = send_req(fd, &fwd_hdr, req->data, &wlen); if (ret) { /* network errors */ - sheep_del_fd(v, fd); + sheep_del_fd(v, fd, wi.sock_idx[nr_fds]); ret = SD_RES_NETWORK_ERROR; dprintf("fail %"PRIu32"\n", ret); goto err; @@ -174,7 +178,8 @@ again: if (wi.pfds[i].revents & POLLERR || wi.pfds[i].revents & POLLHUP || wi.pfds[i].revents & POLLNVAL) { - sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd); + sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd, + wi.sock_idx[i]); ret = SD_RES_NETWORK_ERROR; break; } @@ -184,7 +189,8 @@ again: if (do_read(wi.pfds[i].fd, rsp, sizeof(*rsp))) { eprintf("failed to read a response: %m\n"); - sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd); + sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd, + wi.sock_idx[i]); ret = SD_RES_NETWORK_ERROR; break; } @@ -193,8 +199,7 @@ again: eprintf("fail %"PRIu32"\n", rsp->result); ret = rsp->result; } - - sheep_put_fd(wi.vnodes[i], wi.pfds[i].fd); + sheep_put_fd(wi.vnodes[i], wi.pfds[i].fd, wi.sock_idx[i]); break; } if (i < nr_fds) { @@ -203,6 +208,8 @@ again: sizeof(struct pollfd) * (nr_fds - i)); memmove(wi.vnodes + i, wi.vnodes + i + 1, sizeof(struct sd_vnode *) * (nr_fds - i)); + memmove(wi.sock_idx + i, wi.sock_idx + i + 1, + sizeof(int) * (nr_fds - i)); } dprintf("%"PRIx64" %"PRIu32"\n", oid, nr_fds); @@ -213,7 +220,7 @@ out: return ret; err: for (i = 0; i < nr_fds; i++) - sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd); + sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd, wi.sock_idx[i]); return ret; } diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h index 83a42e3..747599c 100644 --- a/sheep/sheep_priv.h +++ b/sheep/sheep_priv.h @@ -408,8 +408,8 @@ void sockfd_cache_del(struct node_id *); void sockfd_cache_add(struct sd_node *); void sockfd_cache_add_group(struct sd_node *nodes, int nr); -int sheep_get_fd(struct sd_vnode *vnode); -void 
sheep_put_fd(struct sd_vnode *vnode, int fd); -void sheep_del_fd(struct sd_vnode *vnode, int fd); +int sheep_get_fd(struct sd_vnode *vnode, int *); +void sheep_put_fd(struct sd_vnode *vnode, int fd, int); +void sheep_del_fd(struct sd_vnode *vnode, int fd, int); #endif diff --git a/sheep/sockfd_cache.c b/sheep/sockfd_cache.c index 4f7ff5c..a756834 100644 --- a/sheep/sockfd_cache.c +++ b/sheep/sockfd_cache.c @@ -11,6 +11,20 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ +/* + * The sockfd cache provides long-lived TCP connections to the nodes + * in the cluster to accelerate data transfer. It has the following + * characteristics: + * 0 dynamically allocated/deallocated at node granularity. + * 1 cached fds are multiplexed by all threads. + * 2 each session (e.g. forward_write_obj_req) can grab one fd at a time. + * 3 if there isn't any FD available from cache, use normal connect_to() and + * close() internally. + * 4 FDs are uniquely named by IP:PORT, hence no need to reset them on + * membership change. + * 5 the total number of FDs scales to a massive number of nodes. + * 6 three APIs in total: sheep_{get,put,del}_fd().
+ */ #include <urcu/uatomic.h> #include <pthread.h> #include <stdint.h> @@ -42,9 +56,10 @@ static struct sockfd_cache sockfd_cache = { struct sockfd_cache_entry { struct rb_node rb; - int fd; - uint8_t refcount; struct node_id nid; +#define SOCKFD_CACHE_MAX_FD 8 /* How many FDs we cache for one node */ + int fd[SOCKFD_CACHE_MAX_FD]; + uint8_t fd_in_use[SOCKFD_CACHE_MAX_FD]; }; static inline int node_id_cmp(const void *a, const void *b) @@ -114,34 +129,105 @@ static struct sockfd_cache_entry *sockfd_cache_search(struct node_id *nid) return NULL; } -static struct sockfd_cache_entry *sockfd_cache_grab(struct node_id *nid) +static inline int get_free_slot(struct sockfd_cache_entry *entry) +{ + int idx = -1, i; + + for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++) { + if (uatomic_cmpxchg(&entry->fd_in_use[i], 0, 1)) + continue; + idx = i; + break; + } + return idx; +} + +/* + * Grab a free slot of the node and inc the refcount of the slot + * + * If no free slot available, this typically means we should use short FD. 
+ */ +static struct sockfd_cache_entry *sockfd_cache_grab(struct node_id *nid, + char *name, int *ret_idx) { struct sockfd_cache_entry *entry; pthread_rwlock_rdlock(&sockfd_cache.lock); entry = sockfd_cache_search(nid); - pthread_rwlock_unlock(&sockfd_cache.lock); - assert(entry); - /* if refcount == 0, set it to 1, otherwise someone holds it */ - if (uatomic_cmpxchg(&entry->refcount, 0, 1)) - return NULL; + if (!entry){ + dprintf("failed node %s:%d\n", name, nid->port); + goto out; + } + *ret_idx = get_free_slot(entry); + if (*ret_idx == -1) + entry = NULL; +out: + pthread_rwlock_unlock(&sockfd_cache.lock); return entry; } -void sockfd_cache_del(struct node_id *nid) +static inline bool slots_all_free(struct sockfd_cache_entry *entry) +{ + int i; + for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++) + if (uatomic_read(&entry->fd_in_use[i])) + return false; + return true; +} + +static inline void destroy_all_slots(struct sockfd_cache_entry *entry) +{ + int i; + for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++) + if (entry->fd[i] != -1) + close(entry->fd[i]); +} + +/* + * Destroy all the cached FDs of the node + * + * We don't proceed if anyone still holds one of the node's FDs. In this case, + * the holder will eventually find itself talking to a dead node and call + * sheep_del_fd() to delete this node from the cache.
+ */ +static bool sockfd_cache_destroy(struct node_id *nid) { struct sockfd_cache_entry *entry; + + pthread_rwlock_wrlock(&sockfd_cache.lock); + entry = sockfd_cache_search(nid); + if (!entry) { + dprintf("It is already destroyed\n"); + goto false_out; + } + + if (!slots_all_free(entry)) { + dprintf("Some victim still holds it\n"); + goto false_out; + } + + rb_erase(&entry->rb, &sockfd_cache.root); + pthread_rwlock_unlock(&sockfd_cache.lock); + + destroy_all_slots(entry); + free(entry); + + return true; +false_out: + pthread_rwlock_unlock(&sockfd_cache.lock); + return false; +} + +/* When a node crashes, we should delete it from the cache */ +void sockfd_cache_del(struct node_id *nid) +{ char name[INET6_ADDRSTRLEN]; int n; - entry = sockfd_cache_grab(nid); - /* Hmmm, some victim still holds it, he is supposed to delete it */ - if (!entry) + if (!sockfd_cache_destroy(nid)) return; - rb_erase(&entry->rb, &sockfd_cache.root); - free(entry); n = uatomic_sub_return(&sockfd_cache.count, 1); addr_to_str(name, sizeof(name), nid->addr, 0); dprintf("%s:%d, count %d\n", name, nid->port, n); @@ -150,8 +236,11 @@ void sockfd_cache_del(struct node_id *nid) static void sockfd_cache_add_nolock(struct node_id *nid) { struct sockfd_cache_entry *new = xzalloc(sizeof(*new)); + int i; + + for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++) + new->fd[i] = -1; - new->fd = -1; memcpy(&new->nid, nid, sizeof(struct node_id)); if (sockfd_cache_insert(new)) { free(new); @@ -160,6 +249,7 @@ static void sockfd_cache_add_nolock(struct node_id *nid) sockfd_cache.count++; } +/* Add a group of nodes to the cache */ void sockfd_cache_add_group(struct sd_node *nodes, int nr) { struct sd_node *p; @@ -175,13 +265,16 @@ void sockfd_cache_add_group(struct sd_node *nodes, int nr) pthread_rwlock_unlock(&sockfd_cache.lock); } +/* Add one node to the cache so that connections to it can be cached */ void sockfd_cache_add(struct sd_node *node) { struct sockfd_cache_entry *new = xzalloc(sizeof(*new)); char
name[INET6_ADDRSTRLEN]; - int n; + int n, i; + + for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++) + new->fd[i] = -1; - new->fd = -1; memcpy(&new->nid, node, sizeof(struct node_id)); pthread_rwlock_rdlock(&sockfd_cache.lock); if (sockfd_cache_insert(new)) { @@ -195,58 +288,71 @@ void sockfd_cache_add(struct sd_node *node) dprintf("%s:%d, count %d\n", name, node->port, n); } -static int sockfd_cache_get(struct node_id *nid) +static int sockfd_cache_get(struct node_id *nid, char *name, int *ret_idx) { struct sockfd_cache_entry *entry; - char name[INET6_ADDRSTRLEN]; int fd; - entry = sockfd_cache_grab(nid); + entry = sockfd_cache_grab(nid, name, ret_idx); if (!entry) return -1; - if (entry->fd != -1) - return entry->fd; + if (entry->fd[*ret_idx] != -1) { + dprintf("%s:%d, idx %d\n", name, nid->port, *ret_idx); + return entry->fd[*ret_idx]; + } - /* Create a new connection for this vnode */ - addr_to_str(name, sizeof(name), nid->addr, 0); - dprintf("create connection %s:%d\n", name, nid->port); + /* Create a new cached connection for this vnode */ + dprintf("create connection %s:%d idx %d\n", name, nid->port, *ret_idx); fd = connect_to(name, nid->port); if (fd < 0) { - uatomic_dec(&entry->refcount); + uatomic_dec(&entry->fd_in_use[*ret_idx]); return -1; } - entry->fd = fd; + entry->fd[*ret_idx] = fd; return fd; } -static void sockfd_cache_put(struct node_id *nid) +static void sockfd_cache_put(struct node_id *nid, int idx) { struct sockfd_cache_entry *entry; char name[INET6_ADDRSTRLEN]; int refcnt; addr_to_str(name, sizeof(name), nid->addr, 0); - dprintf("%s:%d\n", name, nid->port); + dprintf("%s:%d idx %d\n", name, nid->port, idx); + pthread_rwlock_rdlock(&sockfd_cache.lock); entry = sockfd_cache_search(nid); pthread_rwlock_unlock(&sockfd_cache.lock); + assert(entry); - refcnt = uatomic_cmpxchg(&entry->refcount, 1, 0); + refcnt = uatomic_cmpxchg(&entry->fd_in_use[idx], 1, 0); assert(refcnt == 1); } -int sheep_get_fd(struct sd_vnode *vnode) +/* + * Return a FD connected to the 
vnode to the caller + * + * Try our best to get a 'long' FD, which is cached and never closed. If no cached FD is + * available, we return a 'short' FD which is supposed to be closed by + * sheep_put_fd() (or sheep_del_fd()). + * + * *ret_idx is opaque to the caller and must be passed back unchanged to + * sheep_put_fd()/sheep_del_fd(); -1 indicates a short FD. + */ +int sheep_get_fd(struct sd_vnode *vnode, int *ret_idx) { struct node_id *nid = (struct node_id *)vnode; char name[INET6_ADDRSTRLEN]; - int fd = sockfd_cache_get(nid); + int fd; + addr_to_str(name, sizeof(name), nid->addr, 0); + fd = sockfd_cache_get(nid, name, ret_idx); if (fd != -1) return fd; - addr_to_str(name, sizeof(name), nid->addr, 0); + /* Create a fd that is to be closed */ fd = connect_to(name, nid->port); if (fd < 0) { dprintf("failed connect to %s:%d\n", name, nid->port); @@ -257,37 +363,45 @@ int sheep_get_fd(struct sd_vnode *vnode) return fd; } -void sheep_put_fd(struct sd_vnode *vnode, int fd) +/* + * Release an FD connected to the vnode that was acquired from sheep_get_fd() + * + * If it is a long FD, just decrease the refcount to make it available again. + * If it is a short FD, close it. + * + * Every sheep_get_fd() must be paired with sheep_put_fd() or sheep_del_fd() + */ + +void sheep_put_fd(struct sd_vnode *vnode, int fd, int idx) { struct node_id *nid = (struct node_id *)vnode; - struct sockfd_cache_entry *entry; - pthread_rwlock_rdlock(&sockfd_cache.lock); - entry = sockfd_cache_search(nid); - pthread_rwlock_unlock(&sockfd_cache.lock); - assert(entry); - if (entry->fd == fd) { - sockfd_cache_put(nid); - } else { + if (idx == -1) { dprintf("%d\n", fd); close(fd); + return; } + + sockfd_cache_put(nid, idx); } -void sheep_del_fd(struct sd_vnode *vnode, int fd) +/* + * Delete an FD connected to the vnode when the vnode has crashed. + * + * If it is a long FD, release its slot and try to destroy all the cached FDs of + * this vnode in the cache. + * If it is a short FD, just close it.
+ */ +void sheep_del_fd(struct sd_vnode *vnode, int fd, int idx) { struct node_id *nid = (struct node_id *)vnode; - struct sockfd_cache_entry *entry; - pthread_rwlock_rdlock(&sockfd_cache.lock); - entry = sockfd_cache_search(nid); - pthread_rwlock_unlock(&sockfd_cache.lock); - assert(entry); - if (entry->fd == fd) { - sockfd_cache_put(nid); - sockfd_cache_del(nid); - } else { + if (idx == -1) { dprintf("%d\n", fd); close(fd); + return; } + + sockfd_cache_put(nid, idx); + sockfd_cache_del(nid); } -- 1.7.10.2 |