On Sun, 2012-06-24 at 20:51 +0800, Liu Yuan wrote: > From: Liu Yuan <tailai.ly at taobao.com> > > Old sockfd pool has following defect: > 0 statically allocated. > 1 use too many fds per sheep, not scalable > 2 implemented per thread, can't be shared between threads > 3 need resetting at every membership change > > The new sockfd cache aims to address these problems yet remain as effecient as > old one: > 0 dynamically allocated/deallocated at node granularity. > 1 cached fds are multiplexed by all threads. > 2 each session (for e.g, forward_write_obj_req) can grab one fd at a time > 3 if there isn't any FD available from cache, use normal connect_to() and close() > internally > 4 FD are named by IP:PORT uniquely, hence no need of resetting at membership > change > 5 the total number of FDs shrinks from (nr_gateway + nr_io) * nr_nodes to nr_nodes > 6 just add one more API, totally 3 APIs: sheep_{get,put,del}_fd() > > Signed-off-by: Liu Yuan <tailai.ly at taobao.com> > --- > include/sheep.h | 2 +- > sheep/Makefile.am | 2 +- > sheep/sdnet.c | 66 +----------- > sheep/sheep_priv.h | 14 ++- > sheep/sockfd_cache.c | 293 ++++++++++++++++++++++++++++++++++++++++++++++++++ > 5 files changed, 307 insertions(+), 70 deletions(-) > create mode 100644 sheep/sockfd_cache.c > > diff --git a/include/sheep.h b/include/sheep.h > index ac9179c..01aa202 100644 > --- a/include/sheep.h > +++ b/include/sheep.h > @@ -155,9 +155,9 @@ struct sd_node { > }; > > struct sd_vnode { > - uint64_t id; > uint8_t addr[16]; > uint16_t port; > + uint64_t id; > uint16_t node_idx; > uint32_t zone; > }; > diff --git a/sheep/Makefile.am b/sheep/Makefile.am > index 1cb2ebf..0a874c6 100644 > --- a/sheep/Makefile.am > +++ b/sheep/Makefile.am > @@ -26,7 +26,7 @@ sbin_PROGRAMS = sheep > > sheep_SOURCES = sheep.c group.c sdnet.c gateway.c store.c vdi.c work.c \ > journal.c ops.c recovery.c cluster/local.c \ > - object_cache.c object_list_cache.c > + object_cache.c object_list_cache.c sockfd_cache.c > > if 
BUILD_COROSYNC > sheep_SOURCES += cluster/corosync.c > diff --git a/sheep/sdnet.c b/sheep/sdnet.c > index 34d65cf..e7da711 100644 > --- a/sheep/sdnet.c > +++ b/sheep/sdnet.c > @@ -19,6 +19,7 @@ > #include <fcntl.h> > > #include "sheep_priv.h" > +#include "rbtree.h" > > static void requeue_request(struct request *req); > > @@ -740,68 +741,3 @@ int create_listen_port(int port, void *data) > return create_listen_ports(port, create_listen_port_fn, data); > } > > -static __thread int cached_fds[SD_MAX_NODES]; > -static __thread uint32_t cached_epoch = 0; > - > -void del_sheep_fd(int fd) > -{ > - int i; > - > - for (i = 0; i < SD_MAX_NODES; i++) { > - if (cached_fds[i] == fd) { > - if (fd >= 0) > - close(fd); > - > - cached_fds[i] = -1; > - > - return; > - } > - } > -} > - > -int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx, uint32_t epoch) > -{ > - int i, fd; > - char name[INET6_ADDRSTRLEN]; > - > - if (cached_epoch == 0) { > - /* initialize */ > - for (i = 0; i < SD_MAX_NODES; i++) > - cached_fds[i] = -1; > - > - cached_epoch = epoch; > - } > - > - if (before(epoch, cached_epoch)) { > - eprintf("requested epoch is smaller than the previous one: %d < %d\n", > - epoch, cached_epoch); > - return -1; > - } > - if (after(epoch, cached_epoch)) { > - for (i = 0; i < SD_MAX_NODES; i++) { > - if (cached_fds[i] >= 0) > - close(cached_fds[i]); > - > - cached_fds[i] = -1; > - } > - cached_epoch = epoch; > - } > - > - fd = cached_fds[node_idx]; > - dprintf("%d, %d\n", epoch, fd); > - > - if (cached_epoch == epoch && fd >= 0) { > - dprintf("using the cached fd %d\n", fd); > - return fd; > - } > - > - addr_to_str(name, sizeof(name), addr, 0); > - > - fd = connect_to(name, port); > - if (fd < 0) > - return -1; > - > - cached_fds[node_idx] = fd; > - > - return fd; > -} > diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h > index afdaad8..83a42e3 100644 > --- a/sheep/sheep_priv.h > +++ b/sheep/sheep_priv.h > @@ -292,9 +292,6 @@ int read_object(struct vnode_info *vnodes, 
uint32_t node_version, > int remove_object(struct vnode_info *vnodes, uint32_t node_version, > uint64_t oid, int nr); > > -void del_sheep_fd(int fd); > -int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx, uint32_t epoch); > - > int prealloc(int fd, uint32_t size); > > int objlist_cache_insert(uint64_t oid); > @@ -404,4 +401,15 @@ void object_cache_delete(uint32_t vid); > > int object_cache_init(const char *p); > > +/* sockfd_cache */ > +struct node_id; > + > +void sockfd_cache_del(struct node_id *); > +void sockfd_cache_add(struct sd_node *); > +void sockfd_cache_add_group(struct sd_node *nodes, int nr); > + > +int sheep_get_fd(struct sd_vnode *vnode); > +void sheep_put_fd(struct sd_vnode *vnode, int fd); > +void sheep_del_fd(struct sd_vnode *vnode, int fd); > + > #endif > diff --git a/sheep/sockfd_cache.c b/sheep/sockfd_cache.c > new file mode 100644 > index 0000000..4f7ff5c > --- /dev/null > +++ b/sheep/sockfd_cache.c > @@ -0,0 +1,293 @@ > +/* > + * Copyright (C) 2012 Taobao Inc. > + * > + * Liu Yuan <namei.unix at gmail.com> > + * > + * This program is free software; you can redistribute it and/or > + * modify it under the terms of the GNU General Public License version > + * 2 as published by the Free Software Foundation. > + * > + * You should have received a copy of the GNU General Public License > + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
> + */ > + > +#include <urcu/uatomic.h> > +#include <pthread.h> > +#include <stdint.h> > +#include <stdlib.h> > +#include <stdio.h> > + > +#include "sheep.h" > +#include "sheep_priv.h" > +#include "list.h" > +#include "rbtree.h" > +#include "logger.h" > +#include "util.h" > + > +struct node_id { > + uint8_t addr[16]; > + uint16_t port; > +}; > + > +struct sockfd_cache { > + struct rb_root root; > + pthread_rwlock_t lock; > + int count; > +}; > + > +static struct sockfd_cache sockfd_cache = { > + .root = RB_ROOT, > + .lock = PTHREAD_RWLOCK_INITIALIZER, > +}; > + > +struct sockfd_cache_entry { > + struct rb_node rb; > + int fd; > + uint8_t refcount; > + struct node_id nid; > +}; > + > +static inline int node_id_cmp(const void *a, const void *b) > +{ > + const struct node_id *node1 = a; > + const struct node_id *node2 = b; > + int cmp; > + > + cmp = memcmp(node1->addr, node2->addr, sizeof(node1->addr)); > + if (cmp != 0) > + return cmp; > + > + if (node1->port < node2->port) > + return -1; > + if (node1->port > node2->port) > + return 1; > + return 0; > + > +} > + > +static struct sockfd_cache_entry * > +sockfd_cache_insert(struct sockfd_cache_entry *new) > +{ > + struct rb_node **p = &sockfd_cache.root.rb_node; > + struct rb_node *parent = NULL; > + struct sockfd_cache_entry *entry; > + > + while (*p) { > + int cmp; > + > + parent = *p; > + entry = rb_entry(parent, struct sockfd_cache_entry, rb); > + cmp = node_id_cmp(&new->nid, &entry->nid); > + > + if (cmp < 0) > + p = &(*p)->rb_left; > + else if (cmp > 0) > + p = &(*p)->rb_right; > + else > + return entry; > + } > + rb_link_node(&new->rb, parent, p); > + rb_insert_color(&new->rb, &sockfd_cache.root); > + > + return NULL; /* insert successfully */ > +} > + > +static struct sockfd_cache_entry *sockfd_cache_search(struct node_id *nid) > +{ > + struct rb_node *n = sockfd_cache.root.rb_node; > + struct sockfd_cache_entry *t; > + > + while (n) { > + int cmp; > + > + t = rb_entry(n, struct sockfd_cache_entry, rb); > + 
cmp = node_id_cmp(nid, &t->nid); > + > + if (cmp < 0) > + n = n->rb_left; > + else if (cmp > 0) > + n = n->rb_right; > + else > + return t; /* found it */ > + } > + > + return NULL; > +} > + > +static struct sockfd_cache_entry *sockfd_cache_grab(struct node_id *nid) > +{ > + struct sockfd_cache_entry *entry; > + > + pthread_rwlock_rdlock(&sockfd_cache.lock); > + entry = sockfd_cache_search(nid); > + pthread_rwlock_unlock(&sockfd_cache.lock); > + assert(entry); > + /* if refcount == 0, set it to 1, otherwise someone holds it */ > + if (uatomic_cmpxchg(&entry->refcount, 0, 1)) > + return NULL; > + > + return entry; > +} > + > +void sockfd_cache_del(struct node_id *nid) > +{ > + struct sockfd_cache_entry *entry; > + char name[INET6_ADDRSTRLEN]; > + int n; > + > + entry = sockfd_cache_grab(nid); > + /* Hmmm, some victim still holds it, he is supposed to delete it */ > + if (!entry) > + return; > + > + rb_erase(&entry->rb, &sockfd_cache.root); > + free(entry); > + n = uatomic_sub_return(&sockfd_cache.count, 1); > + addr_to_str(name, sizeof(name), nid->addr, 0); > + dprintf("%s:%d, count %d\n", name, nid->port, n); > +} > + > +static void sockfd_cache_add_nolock(struct node_id *nid) > +{ > + struct sockfd_cache_entry *new = xzalloc(sizeof(*new)); > + > + new->fd = -1; > + memcpy(&new->nid, nid, sizeof(struct node_id)); > + if (sockfd_cache_insert(new)) { > + free(new); > + return; > + } > + sockfd_cache.count++; > +} > + > +void sockfd_cache_add_group(struct sd_node *nodes, int nr) > +{ > + struct sd_node *p; > + struct node_id *nid; > + > + dprintf("%d\n", nr); > + pthread_rwlock_wrlock(&sockfd_cache.lock); > + while (nr--) { > + p = nodes + nr; > + nid = (struct node_id *)p; > + sockfd_cache_add_nolock(nid); > + } > + pthread_rwlock_unlock(&sockfd_cache.lock); > +} > + > +void sockfd_cache_add(struct sd_node *node) > +{ > + struct sockfd_cache_entry *new = xzalloc(sizeof(*new)); > + char name[INET6_ADDRSTRLEN]; > + int n; > + > + new->fd = -1; > + memcpy(&new->nid, 
node, sizeof(struct node_id)); > + pthread_rwlock_rdlock(&sockfd_cache.lock); > + if (sockfd_cache_insert(new)) { > + free(new); > + pthread_rwlock_unlock(&sockfd_cache.lock); > + return; > + } > + pthread_rwlock_unlock(&sockfd_cache.lock); > + n = uatomic_add_return(&sockfd_cache.count, 1); > + addr_to_str(name, sizeof(name), node->addr, 0); > + dprintf("%s:%d, count %d\n", name, node->port, n); > +} > + > +static int sockfd_cache_get(struct node_id *nid) > +{ > + struct sockfd_cache_entry *entry; > + char name[INET6_ADDRSTRLEN]; > + int fd; > + > + entry = sockfd_cache_grab(nid); > + if (!entry) > + return -1; > + > + if (entry->fd != -1) > + return entry->fd; > + > + /* Create a new connection for this vnode */ > + addr_to_str(name, sizeof(name), nid->addr, 0); > + dprintf("create connection %s:%d\n", name, nid->port); > + fd = connect_to(name, nid->port); > + if (fd < 0) { > + uatomic_dec(&entry->refcount); > + return -1; > + } > + entry->fd = fd; > + > + return fd; > +} > + > +static void sockfd_cache_put(struct node_id *nid) > +{ > + struct sockfd_cache_entry *entry; > + char name[INET6_ADDRSTRLEN]; > + int refcnt; > + > + addr_to_str(name, sizeof(name), nid->addr, 0); > + dprintf("%s:%d\n", name, nid->port); > + pthread_rwlock_rdlock(&sockfd_cache.lock); > + entry = sockfd_cache_search(nid); > + pthread_rwlock_unlock(&sockfd_cache.lock); > + assert(entry); > + refcnt = uatomic_cmpxchg(&entry->refcount, 1, 0); > + assert(refcnt == 1); > +} > + > +int sheep_get_fd(struct sd_vnode *vnode) > +{ > + struct node_id *nid = (struct node_id *)vnode; > + char name[INET6_ADDRSTRLEN]; > + int fd = sockfd_cache_get(nid); > + > + if (fd != -1) > + return fd; > + > + addr_to_str(name, sizeof(name), nid->addr, 0); > + fd = connect_to(name, nid->port); > + if (fd < 0) { > + dprintf("failed connect to %s:%d\n", name, nid->port); > + return -1; > + } > + > + dprintf("%d\n", fd); > + return fd; > +} > + > +void sheep_put_fd(struct sd_vnode *vnode, int fd) > +{ > + struct 
node_id *nid = (struct node_id *)vnode; > + struct sockfd_cache_entry *entry; > + > + pthread_rwlock_rdlock(&sockfd_cache.lock); > + entry = sockfd_cache_search(nid); > + pthread_rwlock_unlock(&sockfd_cache.lock); > + assert(entry); > + if (entry->fd == fd) { > + sockfd_cache_put(nid); > + } else { > + dprintf("%d\n", fd); > + close(fd); I suggest that, when the cached fd is already in use by another thread, we put this fd into sockfd_cache instead of closing it, so that the next call to sheep_get_fd() does not need to call connect_to() again. > + } > +} > + > +void sheep_del_fd(struct sd_vnode *vnode, int fd) > +{ > + struct node_id *nid = (struct node_id *)vnode; > + struct sockfd_cache_entry *entry; > + > + pthread_rwlock_rdlock(&sockfd_cache.lock); > + entry = sockfd_cache_search(nid); > + pthread_rwlock_unlock(&sockfd_cache.lock); > + assert(entry); > + if (entry->fd == fd) { > + sockfd_cache_put(nid); > + sockfd_cache_del(nid); > + } else { > + dprintf("%d\n", fd); > + close(fd); > + } > +} > -- > 1.7.10.2 > ________________________________ This email (including any attachments) is confidential and may be legally privileged. If you received this email in error, please delete it immediately and do not copy it or use it for any purpose or disclose its contents to any other person. Thank you. 本电邮(包括任何附件)可能含有机密资料并受法律保护。如您不是正确的收件人,请您立即删除本邮件。请不要将本电邮进行复制并用作任何其他用途、或透露本邮件之内容。谢谢。 |