From: Liu Yuan <tailai.ly at taobao.com> The old sockfd pool has the following defects: 0 statically allocated. 1 use too many fds per sheep, not scalable 2 implemented per thread, can't be shared between threads 3 need resetting at every membership change The new sockfd cache aims to address these problems yet remaining as efficient as the old one: 0 dynamically allocated/deallocated at node granularity. 1 cached fds are multiplexed by all threads. 2 each session (e.g., forward_write_obj_req) can grab one fd at a time 3 if there isn't any FD available from cache, use normal connect_to() and close() internally 4 FD are named by IP:PORT uniquely, hence no need of resetting at membership change 5 the total number of FDs shrinks from (nr_gateway + nr_io) * nr_nodes to nr_nodes 6 just add one more API, totally 3 APIs: sheep_{get,put,del}_fd() Signed-off-by: Liu Yuan <tailai.ly at taobao.com> --- include/sheep.h | 2 +- sheep/Makefile.am | 2 +- sheep/gateway.c | 55 ++++++---- sheep/group.c | 9 +- sheep/sdnet.c | 65 ----------- sheep/sheep_priv.h | 14 ++- sheep/sockfd_cache.c | 292 ++++++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 345 insertions(+), 94 deletions(-) create mode 100644 sheep/sockfd_cache.c diff --git a/include/sheep.h b/include/sheep.h index 06bd93c..a188d18 100644 --- a/include/sheep.h +++ b/include/sheep.h @@ -158,11 +158,11 @@ struct sd_node { }; struct sd_vnode { - uint64_t id; uint8_t addr[16]; uint16_t port; uint16_t node_idx; uint32_t zone; + uint64_t id; }; struct epoch_log { diff --git a/sheep/Makefile.am b/sheep/Makefile.am index 1cb2ebf..0a874c6 100644 --- a/sheep/Makefile.am +++ b/sheep/Makefile.am @@ -26,7 +26,7 @@ sbin_PROGRAMS = sheep sheep_SOURCES = sheep.c group.c sdnet.c gateway.c store.c vdi.c work.c \ journal.c ops.c recovery.c cluster/local.c \ - object_cache.c object_list_cache.c + object_cache.c object_list_cache.c sockfd_cache.c if BUILD_COROSYNC sheep_SOURCES += cluster/corosync.c diff --git a/sheep/gateway.c b/sheep/gateway.c 
index 42f028a..c3e5f80 100644 --- a/sheep/gateway.c +++ b/sheep/gateway.c @@ -58,7 +58,7 @@ read_remote: if (vnode_is_local(v)) continue; - fd = get_sheep_fd(v->addr, v->port, v->node_idx, fwd_hdr.epoch); + fd = sheep_get_fd(v); if (fd < 0) { ret = SD_RES_NETWORK_ERROR; continue; @@ -70,10 +70,11 @@ read_remote: ret = exec_req(fd, &fwd_hdr, req->data, &wlen, &rlen); if (ret) { /* network errors */ - del_sheep_fd(fd); + sheep_del_fd(v, fd); ret = SD_RES_NETWORK_ERROR; continue; } else { + sheep_put_fd(v, fd); memcpy(&req->rp, rsp, sizeof(*rsp)); ret = rsp->result; break; @@ -82,6 +83,11 @@ read_remote: return ret; } +struct write_info { + struct pollfd pfds[SD_MAX_REDUNDANCY]; + struct sd_vnode *vnodes[SD_MAX_REDUNDANCY]; +}; + int forward_write_obj_req(struct request *req) { int i, fd, ret, pollret; @@ -93,15 +99,14 @@ int forward_write_obj_req(struct request *req) struct sd_vnode *obj_vnodes[SD_MAX_COPIES]; uint64_t oid = req->rq.obj.oid; int nr_copies; - struct pollfd pfds[SD_MAX_REDUNDANCY]; - int nr_fds, local = 0; + int nr_fds = 0, local = 0; + struct write_info wi; dprintf("%"PRIx64"\n", oid); - nr_fds = 0; - memset(pfds, 0, sizeof(pfds)); - for (i = 0; i < ARRAY_SIZE(pfds); i++) - pfds[i].fd = -1; + memset(&wi, 0, sizeof(wi)); + for (i = 0; i < SD_MAX_REDUNDANCY; i++) + wi.pfds[i].fd = -1; memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr)); fwd_hdr.flags |= SD_FLAG_CMD_IO_LOCAL; @@ -120,24 +125,23 @@ int forward_write_obj_req(struct request *req) continue; } - fd = get_sheep_fd(v->addr, v->port, v->node_idx, fwd_hdr.epoch); + fd = sheep_get_fd(v); if (fd < 0) { - eprintf("failed to connect to %s:%"PRIu32"\n", name, - v->port); ret = SD_RES_NETWORK_ERROR; goto err; } ret = send_req(fd, &fwd_hdr, req->data, &wlen); if (ret) { /* network errors */ - del_sheep_fd(fd); + sheep_del_fd(v, fd); ret = SD_RES_NETWORK_ERROR; dprintf("fail %"PRIu32"\n", ret); goto err; } - pfds[nr_fds].fd = fd; - pfds[nr_fds].events = POLLIN; + wi.vnodes[nr_fds] = v; + wi.pfds[nr_fds].fd = fd; 
+ wi.pfds[nr_fds].events = POLLIN; nr_fds++; } @@ -157,7 +161,7 @@ int forward_write_obj_req(struct request *req) ret = SD_RES_SUCCESS; again: - pollret = poll(pfds, nr_fds, -1); + pollret = poll(wi.pfds, nr_fds, -1); if (pollret < 0) { if (errno == EINTR) goto again; @@ -167,19 +171,20 @@ again: } for (i = 0; i < nr_fds; i++) { - if (pfds[i].revents & POLLERR || pfds[i].revents & POLLHUP || - pfds[i].revents & POLLNVAL) { - del_sheep_fd(pfds[i].fd); + if (wi.pfds[i].revents & POLLERR || + wi.pfds[i].revents & POLLHUP || + wi.pfds[i].revents & POLLNVAL) { + sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd); ret = SD_RES_NETWORK_ERROR; break; } - if (!(pfds[i].revents & POLLIN)) + if (!(wi.pfds[i].revents & POLLIN)) continue; - if (do_read(pfds[i].fd, rsp, sizeof(*rsp))) { + if (do_read(wi.pfds[i].fd, rsp, sizeof(*rsp))) { eprintf("failed to read a response: %m\n"); - del_sheep_fd(pfds[i].fd); + sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd); ret = SD_RES_NETWORK_ERROR; break; } @@ -189,11 +194,15 @@ again: ret = rsp->result; } + sheep_put_fd(wi.vnodes[i], wi.pfds[i].fd); break; } if (i < nr_fds) { nr_fds--; - memmove(pfds + i, pfds + i + 1, sizeof(*pfds) * (nr_fds - i)); + memmove(wi.pfds + i, wi.pfds + i + 1, + sizeof(struct pollfd) * (nr_fds - i)); + memmove(wi.vnodes + i, wi.vnodes + i + 1, + sizeof(struct sd_vnode *) * (nr_fds - i)); } dprintf("%"PRIx64" %"PRIu32"\n", oid, nr_fds); @@ -204,7 +213,7 @@ out: return ret; err: for (i = 0; i < nr_fds; i++) - del_sheep_fd(pfds[i].fd); + sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd); return ret; } diff --git a/sheep/group.c b/sheep/group.c index 1dec931..681672c 100644 --- a/sheep/group.c +++ b/sheep/group.c @@ -803,10 +803,13 @@ static void finish_join(struct join_message *msg, struct sd_node *joined, if (sd_store->purge_obj && sd_store->purge_obj() != SD_RES_SUCCESS) eprintf("WARN: may have stale objects\n"); + + sockfd_cache_add_group(nodes, nr_nodes); } static void update_cluster_info(struct join_message *msg, - struct sd_node 
*joined, struct sd_node *nodes, size_t nr_nodes) + struct sd_node *joined, struct sd_node *nodes, + size_t nr_nodes) { struct vnode_info *old_vnode_info; @@ -867,6 +870,8 @@ static void update_cluster_info(struct join_message *msg, if (current_vnode_info->nr_zones >= sys->nr_copies) sys_stat_set(SD_STATUS_OK); } + + sockfd_cache_add(joined); } /* @@ -1110,6 +1115,8 @@ void sd_leave_handler(struct sd_node *left, struct sd_node *members, if (current_vnode_info->nr_zones < sys->nr_copies) sys_stat_set(SD_STATUS_HALT); } + + sockfd_cache_del((struct node_id *)left); } int create_cluster(int port, int64_t zone, int nr_vnodes, diff --git a/sheep/sdnet.c b/sheep/sdnet.c index 978c8d0..3bcccce 100644 --- a/sheep/sdnet.c +++ b/sheep/sdnet.c @@ -749,68 +749,3 @@ int create_listen_port(int port, void *data) return create_listen_ports(port, create_listen_port_fn, data); } -static __thread int cached_fds[SD_MAX_NODES]; -static __thread uint32_t cached_epoch = 0; - -void del_sheep_fd(int fd) -{ - int i; - - for (i = 0; i < SD_MAX_NODES; i++) { - if (cached_fds[i] == fd) { - if (fd >= 0) - close(fd); - - cached_fds[i] = -1; - - return; - } - } -} - -int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx, uint32_t epoch) -{ - int i, fd; - char name[INET6_ADDRSTRLEN]; - - if (cached_epoch == 0) { - /* initialize */ - for (i = 0; i < SD_MAX_NODES; i++) - cached_fds[i] = -1; - - cached_epoch = epoch; - } - - if (before(epoch, cached_epoch)) { - eprintf("requested epoch is smaller than the previous one: %d < %d\n", - epoch, cached_epoch); - return -1; - } - if (after(epoch, cached_epoch)) { - for (i = 0; i < SD_MAX_NODES; i++) { - if (cached_fds[i] >= 0) - close(cached_fds[i]); - - cached_fds[i] = -1; - } - cached_epoch = epoch; - } - - fd = cached_fds[node_idx]; - dprintf("%d, %d\n", epoch, fd); - - if (cached_epoch == epoch && fd >= 0) { - dprintf("using the cached fd %d\n", fd); - return fd; - } - - addr_to_str(name, sizeof(name), addr, 0); - - fd = connect_to(name, port); - 
if (fd < 0) - return -1; - - cached_fds[node_idx] = fd; - - return fd; -} diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h index dfafeae..f7db3ed 100644 --- a/sheep/sheep_priv.h +++ b/sheep/sheep_priv.h @@ -289,9 +289,6 @@ int read_object(struct vnode_info *vnodes, uint32_t node_version, int remove_object(struct vnode_info *vnodes, uint32_t node_version, uint64_t oid, int nr); -void del_sheep_fd(int fd); -int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx, uint32_t epoch); - int prealloc(int fd, uint32_t size); int objlist_cache_insert(uint64_t oid); @@ -401,4 +398,15 @@ void object_cache_delete(uint32_t vid); int object_cache_init(const char *p); +/* sockfd_cache */ +struct node_id; + +void sockfd_cache_del(struct node_id *); +void sockfd_cache_add(struct sd_node *); +void sockfd_cache_add_group(struct sd_node *nodes, int nr); + +int sheep_get_fd(struct sd_vnode *vnode); +void sheep_put_fd(struct sd_vnode *vnode, int fd); +void sheep_del_fd(struct sd_vnode *vnode, int fd); + #endif diff --git a/sheep/sockfd_cache.c b/sheep/sockfd_cache.c new file mode 100644 index 0000000..00095c2 --- /dev/null +++ b/sheep/sockfd_cache.c @@ -0,0 +1,292 @@ +/* + * Copyright (C) 2012 Taobao Inc. + * + * Liu Yuan <namei.unix at gmail.com> + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License version + * 2 as published by the Free Software Foundation. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ + +#include <urcu/uatomic.h> +#include <pthread.h> +#include <stdint.h> +#include <stdlib.h> +#include <stdio.h> + +#include "sheep.h" +#include "sheep_priv.h" +#include "list.h" +#include "rbtree.h" +#include "logger.h" +#include "util.h" + +struct node_id { + uint8_t addr[16]; + uint16_t port; +}; + +struct sockfd_cache { + struct rb_root root; + pthread_rwlock_t lock; + int count; +}; + +static struct sockfd_cache sockfd_cache = { + .root = RB_ROOT, + .lock = PTHREAD_RWLOCK_INITIALIZER, +}; + +struct sockfd_cache_entry { + struct rb_node rb; + int fd; + uint8_t refcount; + struct node_id nid; +}; + +static inline int node_id_cmp(const void *a, const void *b) +{ + const struct node_id *node1 = a; + const struct node_id *node2 = b; + int cmp; + + cmp = memcmp(node1->addr, node2->addr, sizeof(node1->addr)); + if (cmp != 0) + return cmp; + + if (node1->port < node2->port) + return -1; + if (node1->port > node2->port) + return 1; + return 0; +} + +static struct sockfd_cache_entry * +sockfd_cache_insert(struct sockfd_cache_entry *new) +{ + struct rb_node **p = &sockfd_cache.root.rb_node; + struct rb_node *parent = NULL; + struct sockfd_cache_entry *entry; + + while (*p) { + int cmp; + + parent = *p; + entry = rb_entry(parent, struct sockfd_cache_entry, rb); + cmp = node_id_cmp(&new->nid, &entry->nid); + + if (cmp < 0) + p = &(*p)->rb_left; + else if (cmp > 0) + p = &(*p)->rb_right; + else + return entry; + } + rb_link_node(&new->rb, parent, p); + rb_insert_color(&new->rb, &sockfd_cache.root); + + return NULL; /* insert successfully */ +} + +static struct sockfd_cache_entry *sockfd_cache_search(struct node_id *nid) +{ + struct rb_node *n = sockfd_cache.root.rb_node; + struct sockfd_cache_entry *t; + + while (n) { + int cmp; + + t = rb_entry(n, struct sockfd_cache_entry, rb); + cmp = node_id_cmp(nid, &t->nid); + + if (cmp < 0) + n = n->rb_left; + else if (cmp > 0) + n = n->rb_right; + else + return t; /* found it */ + } + + return NULL; +} + +static struct 
sockfd_cache_entry *sockfd_cache_grab(struct node_id *nid) +{ + struct sockfd_cache_entry *entry; + + pthread_rwlock_rdlock(&sockfd_cache.lock); + entry = sockfd_cache_search(nid); + pthread_rwlock_unlock(&sockfd_cache.lock); + assert(entry); + /* if refcount == 0, set it to 1, otherwise someone holds it */ + if (uatomic_cmpxchg(&entry->refcount, 0, 1)) + return NULL; + + return entry; +} + +void sockfd_cache_del(struct node_id *nid) +{ + struct sockfd_cache_entry *entry; + char name[INET6_ADDRSTRLEN]; + int n; + + entry = sockfd_cache_grab(nid); + /* Hmmm, some victim still holds it, he is supposed to delete it */ + if (!entry) + return; + + rb_erase(&entry->rb, &sockfd_cache.root); + free(entry); + n = uatomic_sub_return(&sockfd_cache.count, 1); + addr_to_str(name, sizeof(name), nid->addr, 0); + dprintf("%s:%d, count %d\n", name, nid->port, n); +} + +static void sockfd_cache_add_nolock(struct node_id *nid) +{ + struct sockfd_cache_entry *new = xzalloc(sizeof(*new)); + + new->fd = -1; + memcpy(&new->nid, nid, sizeof(struct node_id)); + if (sockfd_cache_insert(new)) { + free(new); + return; + } + sockfd_cache.count++; +} + +void sockfd_cache_add_group(struct sd_node *nodes, int nr) +{ + struct sd_node *p; + struct node_id *nid; + + dprintf("%d\n", nr); + pthread_rwlock_wrlock(&sockfd_cache.lock); + while (nr--) { + p = nodes + nr; + nid = (struct node_id *)p; + sockfd_cache_add_nolock(nid); + } + pthread_rwlock_unlock(&sockfd_cache.lock); +} + +void sockfd_cache_add(struct sd_node *node) +{ + struct sockfd_cache_entry *new = xzalloc(sizeof(*new)); + char name[INET6_ADDRSTRLEN]; + int n; + + new->fd = -1; + memcpy(&new->nid, node, sizeof(struct node_id)); + pthread_rwlock_rdlock(&sockfd_cache.lock); + if (sockfd_cache_insert(new)) { + free(new); + pthread_rwlock_unlock(&sockfd_cache.lock); + return; + } + pthread_rwlock_unlock(&sockfd_cache.lock); + n = uatomic_add_return(&sockfd_cache.count, 1); + addr_to_str(name, sizeof(name), node->addr, 0); + dprintf("%s:%d, 
count %d\n", name, node->port, n); +} + +static int sockfd_cache_get(struct node_id *nid) +{ + struct sockfd_cache_entry *entry; + char name[INET6_ADDRSTRLEN]; + int fd; + + entry = sockfd_cache_grab(nid); + if (!entry) + return -1; + + if (entry->fd != -1) + return entry->fd; + + /* Create a new connection for this vnode */ + addr_to_str(name, sizeof(name), nid->addr, 0); + dprintf("create connection %s:%d\n", name, nid->port); + fd = connect_to(name, nid->port); + if (fd < 0) { + uatomic_dec(&entry->refcount); + return -1; + } + entry->fd = fd; + + return fd; +} + +static void sockfd_cache_put(struct node_id *nid) +{ + struct sockfd_cache_entry *entry; + char name[INET6_ADDRSTRLEN]; + int refcnt; + + addr_to_str(name, sizeof(name), nid->addr, 0); + dprintf("%s:%d\n", name, nid->port); + pthread_rwlock_rdlock(&sockfd_cache.lock); + entry = sockfd_cache_search(nid); + pthread_rwlock_unlock(&sockfd_cache.lock); + assert(entry); + refcnt = uatomic_cmpxchg(&entry->refcount, 1, 0); + assert(refcnt == 1); +} + +int sheep_get_fd(struct sd_vnode *vnode) +{ + struct node_id *nid = (struct node_id *)vnode; + char name[INET6_ADDRSTRLEN]; + int fd = sockfd_cache_get(nid); + + if (fd != -1) + return fd; + + addr_to_str(name, sizeof(name), nid->addr, 0); + fd = connect_to(name, nid->port); + if (fd < 0) { + dprintf("failed connect to %s:%d\n", name, nid->port); + return -1; + } + + dprintf("%d\n", fd); + return fd; +} + +void sheep_put_fd(struct sd_vnode *vnode, int fd) +{ + struct node_id *nid = (struct node_id *)vnode; + struct sockfd_cache_entry *entry; + + pthread_rwlock_rdlock(&sockfd_cache.lock); + entry = sockfd_cache_search(nid); + pthread_rwlock_unlock(&sockfd_cache.lock); + assert(entry); + if (entry->fd == fd) { + sockfd_cache_put(nid); + } else { + dprintf("%d\n", fd); + close(fd); + } +} + +void sheep_del_fd(struct sd_vnode *vnode, int fd) +{ + struct node_id *nid = (struct node_id *)vnode; + struct sockfd_cache_entry *entry; + + 
pthread_rwlock_rdlock(&sockfd_cache.lock); + entry = sockfd_cache_search(nid); + pthread_rwlock_unlock(&sockfd_cache.lock); + assert(entry); + if (entry->fd == fd) { + sockfd_cache_put(nid); + sockfd_cache_del(nid); + } else { + dprintf("%d\n", fd); + close(fd); + } +} -- 1.7.10.2 |