[sheepdog] [PATCH 1/6] sheep: redesign a new cached sockfd pool

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Mon Jun 25 07:22:05 CEST 2012


At Sun, 24 Jun 2012 20:51:48 +0800,
Liu Yuan wrote:
> 
> From: Liu Yuan <tailai.ly at taobao.com>
> 
> Old sockfd pool has following defect:
>  0 statically allocated.
>  1 use too many fds per sheep, not scalable
>  2 implemented per thread, can't be shared between threads
>  3 need resetting at every membership change
> 
> The new sockfd cache aims to address these problems yet remain as effecient as
> old one:
>  0 dynamically allocated/deallocated at node granularity.
>  1 cached fds are multiplexed by all threads.
>  2 each session (for e.g, forward_write_obj_req) can grab one fd at a time
>  3 if there isn't any FD available from cache, use normal connect_to() and close()
>    internally
>  4 FD are named by IP:PORT uniquely, hence no need of resetting at membership
>    change
>  5 the total number of FDs shrinks from (nr_gateway + nr_io) * nr_nodes to nr_nodes
>  6 just add one more API, totally 3 APIs: sheep_{get,put,del}_fd()
> 
> Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
> ---
>  include/sheep.h      |    2 +-
>  sheep/Makefile.am    |    2 +-
>  sheep/sdnet.c        |   66 +-----------
>  sheep/sheep_priv.h   |   14 ++-
>  sheep/sockfd_cache.c |  293 ++++++++++++++++++++++++++++++++++++++++++++++++++
>  5 files changed, 307 insertions(+), 70 deletions(-)
>  create mode 100644 sheep/sockfd_cache.c
> 
> diff --git a/include/sheep.h b/include/sheep.h
> index ac9179c..01aa202 100644
> --- a/include/sheep.h
> +++ b/include/sheep.h
> @@ -155,9 +155,9 @@ struct sd_node {
>  };
>  
>  struct sd_vnode {
> -	uint64_t        id;
>  	uint8_t         addr[16];
>  	uint16_t        port;
> +	uint64_t        id;
>  	uint16_t	node_idx;
>  	uint32_t	zone;
>  };

It looks better to change this structure so that the size of sd_vnode
doesn't increase:

  struct sd_vnode {
      uint8_t  addr[16];
      uint16_t port;
      uint16_t node_idx;
      uint32_t zone;
      uint64_t id;
  };

> diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
> index afdaad8..83a42e3 100644
> --- a/sheep/sheep_priv.h
> +++ b/sheep/sheep_priv.h
> @@ -292,9 +292,6 @@ int read_object(struct vnode_info *vnodes, uint32_t node_version,
>  int remove_object(struct vnode_info *vnodes, uint32_t node_version,
>  		  uint64_t oid, int nr);
>  
> -void del_sheep_fd(int fd);
> -int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx, uint32_t epoch);
> -
>  int prealloc(int fd, uint32_t size);
>  
>  int objlist_cache_insert(uint64_t oid);
> @@ -404,4 +401,15 @@ void object_cache_delete(uint32_t vid);
>  
>  int object_cache_init(const char *p);
>  
> +/* sockfd_cache */
> +struct node_id;
> +
> +void sockfd_cache_del(struct node_id *);

How about changing this prototype to

  void sockfd_cache_del(struct sd_node *);

and avoiding exporting struct node_id?

> +void sockfd_cache_add(struct sd_node *);
> +void sockfd_cache_add_group(struct sd_node *nodes, int nr);
> +
> +int sheep_get_fd(struct sd_vnode *vnode);
> +void sheep_put_fd(struct sd_vnode *vnode, int fd);
> +void sheep_del_fd(struct sd_vnode *vnode, int fd);
> +
>  #endif
> diff --git a/sheep/sockfd_cache.c b/sheep/sockfd_cache.c
> new file mode 100644
> index 0000000..4f7ff5c
> --- /dev/null
> +++ b/sheep/sockfd_cache.c
> @@ -0,0 +1,293 @@
> +/*
> + * Copyright (C) 2012 Taobao Inc.
> + *
> + * Liu Yuan <namei.unix at gmail.com>
> + *
> + * This program is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU General Public License version
> + * 2 as published by the Free Software Foundation.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program. If not, see <http://www.gnu.org/licenses/>.
> + */
> +
> +#include <urcu/uatomic.h>
> +#include <pthread.h>
> +#include <stdint.h>
> +#include <stdlib.h>
> +#include <stdio.h>
> +
> +#include "sheep.h"
> +#include "sheep_priv.h"
> +#include "list.h"
> +#include "rbtree.h"
> +#include "logger.h"
> +#include "util.h"
> +
> +struct node_id {
> +	uint8_t addr[16];
> +	uint16_t port;
> +};
> +
> +struct sockfd_cache {
> +	struct rb_root root;
> +	pthread_rwlock_t lock;
> +	int count;
> +};
> +
> +static struct sockfd_cache sockfd_cache = {
> +	.root = RB_ROOT,
> +	.lock = PTHREAD_RWLOCK_INITIALIZER,
> +};
> +
> +struct sockfd_cache_entry {
> +	struct rb_node rb;
> +	int fd;
> +	uint8_t refcount;
> +	struct node_id nid;
> +};
> +
> +static inline int node_id_cmp(const void *a, const void *b)
> +{
> +	const struct node_id *node1 = a;
> +	const struct node_id *node2 = b;
> +	int cmp;
> +
> +	cmp = memcmp(node1->addr, node2->addr, sizeof(node1->addr));
> +	if (cmp != 0)
> +		return cmp;
> +
> +	if (node1->port < node2->port)
> +		return -1;
> +	if (node1->port > node2->port)
> +		return 1;
> +	return 0;
> +

Remove the redundant blank line.

> +}
> +
> +static struct sockfd_cache_entry *
> +sockfd_cache_insert(struct sockfd_cache_entry *new)
> +{
> +	struct rb_node **p = &sockfd_cache.root.rb_node;
> +	struct rb_node *parent = NULL;
> +	struct sockfd_cache_entry *entry;
> +
> +	while (*p) {
> +		int cmp;
> +
> +		parent = *p;
> +		entry = rb_entry(parent, struct sockfd_cache_entry, rb);
> +		cmp = node_id_cmp(&new->nid, &entry->nid);
> +
> +		if (cmp < 0)
> +			p = &(*p)->rb_left;
> +		else if (cmp > 0)
> +			p = &(*p)->rb_right;
> +		else
> +			return entry;
> +	}
> +	rb_link_node(&new->rb, parent, p);
> +	rb_insert_color(&new->rb, &sockfd_cache.root);
> +
> +	return NULL; /* insert successfully */
> +}
> +
> +static struct sockfd_cache_entry *sockfd_cache_search(struct node_id *nid)
> +{
> +	struct rb_node *n = sockfd_cache.root.rb_node;
> +	struct sockfd_cache_entry *t;
> +
> +	while (n) {
> +		int cmp;
> +
> +		t = rb_entry(n, struct sockfd_cache_entry, rb);
> +		cmp = node_id_cmp(nid, &t->nid);
> +
> +		if (cmp < 0)
> +			n = n->rb_left;
> +		else if (cmp > 0)
> +			n = n->rb_right;
> +		else
> +			return t; /* found it */
> +	}
> +
> +	return NULL;
> +}
> +
> +static struct sockfd_cache_entry *sockfd_cache_grab(struct node_id *nid)
> +{
> +	struct sockfd_cache_entry *entry;
> +
> +	pthread_rwlock_rdlock(&sockfd_cache.lock);
> +	entry = sockfd_cache_search(nid);
> +	pthread_rwlock_unlock(&sockfd_cache.lock);
> +	assert(entry);
> +	/* if refcount == 0, set it to 1, otherwise someone holds it */
> +	if (uatomic_cmpxchg(&entry->refcount, 0, 1))
> +		return NULL;
> +
> +	return entry;
> +}
> +
> +void sockfd_cache_del(struct node_id *nid)
> +{
> +	struct sockfd_cache_entry *entry;
> +	char name[INET6_ADDRSTRLEN];
> +	int n;
> +
> +	entry = sockfd_cache_grab(nid);
> +	/* Hmmm, some victim still holds it, he is supposed to delete it */
> +	if (!entry)
> +		return;
> +
> +	rb_erase(&entry->rb, &sockfd_cache.root);
> +	free(entry);
> +	n = uatomic_sub_return(&sockfd_cache.count, 1);
> +	addr_to_str(name, sizeof(name), nid->addr, 0);
> +	dprintf("%s:%d, count %d\n", name, nid->port, n);
> +}
> +
> +static void sockfd_cache_add_nolock(struct node_id *nid)
> +{
> +	struct sockfd_cache_entry *new = xzalloc(sizeof(*new));
> +
> +	new->fd = -1;
> +	memcpy(&new->nid, nid, sizeof(struct node_id));
> +	if (sockfd_cache_insert(new)) {
> +		free(new);
> +		return;
> +	}
> +       sockfd_cache.count++;

Use a tab instead of spaces.

Thanks,

Kazutaka



More information about the sheepdog mailing list