[sheepdog] [PATCH v4 1/2] lib: move sockfd cache from sheep to lib

Liu Yuan namei.unix at gmail.com
Thu Jul 25 09:16:38 CEST 2013


On Thu, Jul 25, 2013 at 03:43:54PM +0900, Hitoshi Mitake wrote:
> On some subcommands, collie also issues lots of requests to sheep
> daemons, so collie can benefit from sockfd caching too. For this
> purpose, this patch moves the sockfd cache from sheep to lib and
> generalizes its interfaces.
> 
> This patch doesn't change anything related to the IO NIC in the sockfd
> cache, because collie can know the address and port of the IO NIC and
> so can potentially use them.
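
For illustration only, a minimal sketch (not part of the patch) of how a
collie-side helper could use the generalized interface once it lives in lib.
The helper name, the retry callback, the header names and the retry count are
assumptions; only the sockfd_cache_*() calls and struct sockfd come from this
patch:

	#include "sheepdog_proto.h"
	#include "internal_proto.h"
	#include "sockfd_cache.h"
	#include "net.h"		/* exec_req() and friends, name assumed */

	/* collie has no cluster epoch to compare against, so never retry */
	static bool collie_need_retry(uint32_t epoch)
	{
		return false;
	}

	/* hypothetical collie-side request helper built on the lib sockfd cache */
	static int collie_exec_req(const struct node_id *nid, struct sd_req *hdr,
				   void *buf)
	{
		struct sd_rsp *rsp = (struct sd_rsp *)hdr;
		struct sockfd *sfd;
		int ret;

		sfd = sockfd_cache_get(nid);	/* cached long fd, or a short one */
		if (!sfd)
			return SD_RES_NETWORK_ERROR;

		ret = exec_req(sfd->fd, hdr, buf, collie_need_retry, hdr->epoch, 1);
		if (ret) {
			/* remote node looks dead, drop its cached fds */
			sockfd_cache_del(nid, sfd);
			return SD_RES_NETWORK_ERROR;
		}

		sockfd_cache_put(nid, sfd);	/* long fd: drop ref, short fd: close */
		return rsp->result;
	}
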
> 
> Signed-off-by: Hitoshi Mitake <mitake.hitoshi at lab.ntt.co.jp>
> ---
>  include/Makefile.am           |    3 +-
>  include/internal_proto.h      |    2 +
>  include/sockfd_cache.h        |   23 ++
>  lib/Makefile.am               |    2 +-
>  {sheep => lib}/sockfd_cache.c |  113 +++++----
>  sheep/gateway.c               |   10 +-
>  sheep/group.c                 |    2 +-
>  sheep/sheep.c                 |    2 +
>  sheep/sheep_priv.h            |   11 +-
>  sheep/sockfd_cache.c          |  509 +----------------------------------------
>  10 files changed, 93 insertions(+), 584 deletions(-)
>  create mode 100644 include/sockfd_cache.h
>  copy {sheep => lib}/sockfd_cache.c (90%)
> 
> diff --git a/include/Makefile.am b/include/Makefile.am
> index 0acb76e..4d0c229 100644
> --- a/include/Makefile.am
> +++ b/include/Makefile.am
> @@ -2,4 +2,5 @@ MAINTAINERCLEANFILES    = Makefile.in config.h.in
>  
>  noinst_HEADERS          = bitops.h event.h logger.h sheepdog_proto.h util.h \
>  			  list.h net.h sheep.h exits.h strbuf.h rbtree.h \
> -			  sha1.h option.h internal_proto.h shepherd.h work.h
> +			  sha1.h option.h internal_proto.h shepherd.h work.h \
> +			  sockfd_cache.h
> diff --git a/include/internal_proto.h b/include/internal_proto.h
> index 0463eae..0061007 100644
> --- a/include/internal_proto.h
> +++ b/include/internal_proto.h
> @@ -20,6 +20,8 @@
>  #include <stdint.h>
>  #include <netinet/in.h>
>  
> +#include "sheepdog_proto.h"
> +
>  #define SD_SHEEP_PROTO_VER 0x08
>  
>  #define SD_DEFAULT_COPIES 3
> diff --git a/include/sockfd_cache.h b/include/sockfd_cache.h
> new file mode 100644
> index 0000000..76154c9
> --- /dev/null
> +++ b/include/sockfd_cache.h
> @@ -0,0 +1,23 @@
> +#ifndef SOCKFD_CACHE_H
> +#define SOCKFD_CACHE_H
> +
> +#include "internal_proto.h"
> +#include "work.h"
> +
> +struct sockfd *sockfd_cache_get(const struct node_id *nid);
> +void do_sockfd_cache_put(const struct node_id *nid, int idx);
> +void sockfd_cache_put(const struct node_id *nid, struct sockfd *sfd);
> +void sockfd_node_del(const struct node_id *nid);
> +void sockfd_cache_del(const struct node_id *nid, struct sockfd *sfd);
> +void sockfd_cache_add(const struct node_id *nid);
> +void sockfd_cache_add_group(const struct sd_node *nodes, int nr);
> +
> +void init_sockfd(struct work_queue *wq);
> +
> +/* sockfd_cache */
> +struct sockfd {
> +	int fd;
> +	int idx;
> +};
> +
> +#endif	/* SOCKFD_CACHE_H */
> diff --git a/lib/Makefile.am b/lib/Makefile.am
> index a95ce4c..22edf6e 100644
> --- a/lib/Makefile.am
> +++ b/lib/Makefile.am
> @@ -5,7 +5,7 @@ INCLUDES                = -I$(top_builddir)/include -I$(top_srcdir)/include
>  noinst_LIBRARIES	= libsheepdog.a
>  
>  libsheepdog_a_SOURCES	= event.c logger.c net.c util.c rbtree.c strbuf.c \
> -			  sha1.c option.c work.c
> +			  sha1.c option.c work.c sockfd_cache.c
>  
>  # support for GNU Flymake
>  check-syntax:
> diff --git a/sheep/sockfd_cache.c b/lib/sockfd_cache.c
> similarity index 90%
> copy from sheep/sockfd_cache.c
> copy to lib/sockfd_cache.c
> index 13bb7f6..334ba16 100644
> --- a/sheep/sockfd_cache.c
> +++ b/lib/sockfd_cache.c
> @@ -27,7 +27,13 @@
>   *    7 support dual connections to a single node.
>   */
>  
> -#include "sheep_priv.h"
> +#include <pthread.h>
> +
> +#include "sockfd_cache.h"
> +#include "work.h"
> +#include "rbtree.h"
> +#include "util.h"
> +#include "sheep.h"
>  
>  struct sockfd_cache {
>  	struct rb_root root;
> @@ -208,20 +214,6 @@ false_out:
>  	return false;
>  }
>  
> -/* When node craches, we should delete it from the cache */
> -void sockfd_cache_del(const struct node_id *nid)
> -{
> -	char name[INET6_ADDRSTRLEN];
> -	int n;
> -
> -	if (!sockfd_cache_destroy(nid))
> -		return;
> -
> -	n = uatomic_sub_return(&sockfd_cache.count, 1);
> -	addr_to_str(name, sizeof(name), nid->addr, 0);
> -	sd_dprintf("%s:%d, count %d", name, nid->port, n);
> -}
> -
>  static void sockfd_cache_add_nolock(const struct node_id *nid)
>  {
>  	struct sockfd_cache_entry *new = xmalloc(sizeof(*new));
> @@ -282,6 +274,8 @@ void sockfd_cache_add(const struct node_id *nid)
>  static uatomic_bool fds_in_grow;
>  static int fds_high_watermark = FDS_WATERMARK(DEFAULT_FDS_COUNT);
>  
> +static struct work_queue *grow_wq;
> +
>  static void do_grow_fds(struct work *work)
>  {
>  	struct sockfd_cache_entry *entry;
> @@ -318,6 +312,15 @@ static inline void check_idx(int idx)
>  {
>  	struct work *w;
>  
> +	if (!grow_wq) {
> +		sd_dprintf("Request for growing fds is issued, but"
> +			" a work queue for this work is not registered yet.");
> +		sd_dprintf("Initializing work queue in earlier stage"
> +			" is suggested.");
> +
> +		return;
> +	}
> +
>  	if (idx <= fds_high_watermark)
>  		return;
>  	if (!uatomic_set_true(&fds_in_grow))
> @@ -326,7 +329,7 @@ static inline void check_idx(int idx)
>  	w = xmalloc(sizeof(*w));
>  	w->fn = do_grow_fds;
>  	w->done = grow_fds_done;
> -	queue_work(sys->sockfd_wqueue, w);
> +	queue_work(grow_wq, w);
>  }
>  
>  /* Add the node back if it is still alive */
> @@ -350,7 +353,7 @@ alive:
>  }
>  
>  /* Try to create/get cached IO connection. If failed, fallback to non-IO one */
> -static struct sockfd *sockfd_cache_get(const struct node_id *nid)
> +static struct sockfd *do_sockfd_cache_get(const struct node_id *nid)
>  {
>  	struct sockfd_cache_entry *entry;
>  	struct sockfd *sfd;
> @@ -402,7 +405,7 @@ out:
>  	return sfd;
>  }
>  
> -static void sockfd_cache_put(const struct node_id *nid, int idx)
> +void do_sockfd_cache_put(const struct node_id *nid, int idx)
>  {
>  	bool use_io = nid->io_port ? true : false;
>  	const uint8_t *addr = use_io ? nid->io_addr : nid->addr;
> @@ -442,20 +445,29 @@ static void sockfd_cache_close(const struct node_id *nid, int idx)
>  }
>  
>  /*
> + * set work queue for growing fds
> + * before this function called, growing cannot be done
> + */
> +void init_sockfd(struct work_queue *wq)
> +{
> +	grow_wq = wq;
> +}
> +
> +/*
>   * Return a sockfd connected to the node to the caller
>   *
>   * Try to get a 'long' FD as best, which is cached and never closed. If no FD
>   * available, we return a 'short' FD which is supposed to be closed by
> - * sheep_put_sockfd().
> + * put_sockfd().
>   *
>   * ret_idx is opaque to the caller, -1 indicates it is a short FD.
>   */
> -struct sockfd *sheep_get_sockfd(const struct node_id *nid)
> +struct sockfd *sockfd_cache_get(const struct node_id *nid)
>  {
>  	struct sockfd *sfd;
>  	int fd;
>  
> -	sfd = sockfd_cache_get(nid);
> +	sfd = do_sockfd_cache_get(nid);
>  	if (sfd)
>  		return sfd;
>  
> @@ -473,7 +485,7 @@ struct sockfd *sheep_get_sockfd(const struct node_id *nid)
>  
>  /*
>   * Release a sockfd connected to the node, which is acquired from
> - * sheep_get_sockfd()
> + * sockfd_cache_get()
>   *
>   * If it is a long FD, just decrease the refcount to make it available again.
>   * If it is a short FD, close it.
> @@ -481,8 +493,7 @@ struct sockfd *sheep_get_sockfd(const struct node_id *nid)
>   * sheep_put_sockfd() or sheep_del_sockfd() should be paired with
>   * sheep_get_sockfd()
>   */
> -
> -void sheep_put_sockfd(const struct node_id *nid, struct sockfd *sfd)
> +void sockfd_cache_put(const struct node_id *nid, struct sockfd *sfd)
>  {
>  	if (sfd->idx == -1) {
>  		sd_dprintf("%d", sfd->fd);
> @@ -491,18 +502,32 @@ void sheep_put_sockfd(const struct node_id *nid, struct sockfd *sfd)
>  		return;
>  	}
>  
> -	sockfd_cache_put(nid, sfd->idx);
> +	do_sockfd_cache_put(nid, sfd->idx);
>  	free(sfd);
>  }
>  
> +/* Delete all sockfd connected to the node, when node is crashed. */
> +void sockfd_node_del(const struct node_id *nid)
> +{
> +	char name[INET6_ADDRSTRLEN];
> +	int n;
> +
> +	if (!sockfd_cache_destroy(nid))
> +		return;
> +
> +	n = uatomic_sub_return(&sockfd_cache.count, 1);
> +	addr_to_str(name, sizeof(name), nid->addr, 0);
> +	sd_dprintf("%s:%d, count %d", name, nid->port, n);
> +}
> +
>  /*
> - * Delete a sockfd connected to the node, when node is crashed.
> + * Delete a sockfd connected to the node.
>   *
>   * If it is a long FD, de-refcount it and tres to destroy all the cached FDs of
>   * this node in the cache.
>   * If it is a short FD, just close it.
>   */
> -void sheep_del_sockfd(const struct node_id *nid, struct sockfd *sfd)
> +void sockfd_cache_del(const struct node_id *nid, struct sockfd *sfd)
>  {
>  	if (sfd->idx == -1) {
>  		sd_dprintf("%d", sfd->fd);
> @@ -512,38 +537,6 @@ void sheep_del_sockfd(const struct node_id *nid, struct sockfd *sfd)
>  	}
>  
>  	sockfd_cache_close(nid, sfd->idx);
> -	sockfd_cache_del(nid);
> +	do_sockfd_cache_put(nid, sfd->idx);
>  	free(sfd);
>  }
> -
> -int sheep_exec_req(const struct node_id *nid, struct sd_req *hdr, void *buf)
> -{
> -	struct sd_rsp *rsp = (struct sd_rsp *)hdr;
> -	struct sockfd *sfd;
> -	int ret;
> -
> -	assert(is_worker_thread());
> -
> -	sfd = sheep_get_sockfd(nid);
> -	if (!sfd)
> -		return SD_RES_NETWORK_ERROR;
> -
> -	ret = exec_req(sfd->fd, hdr, buf, sheep_need_retry, hdr->epoch,
> -		       MAX_RETRY_COUNT);
> -	if (ret) {
> -		sd_dprintf("remote node might have gone away");
> -		sheep_del_sockfd(nid, sfd);
> -		return SD_RES_NETWORK_ERROR;
> -	}
> -	ret = rsp->result;
> -	if (ret != SD_RES_SUCCESS)
> -		sd_eprintf("failed %s", sd_strerror(ret));
> -
> -	sheep_put_sockfd(nid, sfd);
> -	return ret;
> -}
> -
> -bool sheep_need_retry(uint32_t epoch)
> -{
> -	return sys_epoch() == epoch;
> -}
> diff --git a/sheep/gateway.c b/sheep/gateway.c
> index 59d0127..a4278c9 100644
> --- a/sheep/gateway.c
> +++ b/sheep/gateway.c
> @@ -114,13 +114,13 @@ static inline void write_info_update(struct write_info *wi, int pos)
>  
>  static inline void finish_one_write(struct write_info *wi, int i)
>  {
> -	sheep_put_sockfd(wi->ent[i].nid, wi->ent[i].sfd);
> +	sockfd_cache_put(wi->ent[i].nid, wi->ent[i].sfd);
>  	write_info_update(wi, i);
>  }
>  
>  static inline void finish_one_write_err(struct write_info *wi, int i)
>  {
> -	sheep_del_sockfd(wi->ent[i].nid, wi->ent[i].sfd);
> +	sockfd_cache_del(wi->ent[i].nid, wi->ent[i].sfd);
>  	write_info_update(wi, i);
>  }
>  
> @@ -176,7 +176,7 @@ again:
>  		nr_sent = wi->nr_sent;
>  		/* XXX Blinedly close all the connections */
>  		for (i = 0; i < nr_sent; i++)
> -			sheep_del_sockfd(wi->ent[i].nid, wi->ent[i].sfd);
> +			sockfd_cache_del(wi->ent[i].nid, wi->ent[i].sfd);
>  
>  		return SD_RES_NETWORK_ERROR;
>  	}
> @@ -278,7 +278,7 @@ static int gateway_forward_request(struct request *req)
>  		}
>  
>  		nid = &target_nodes[i]->nid;
> -		sfd = sheep_get_sockfd(nid);
> +		sfd = sockfd_cache_get(nid);
>  		if (!sfd) {
>  			err_ret = SD_RES_NETWORK_ERROR;
>  			break;
> @@ -288,7 +288,7 @@ static int gateway_forward_request(struct request *req)
>  			       sheep_need_retry, req->rq.epoch,
>  			       MAX_RETRY_COUNT);
>  		if (ret) {
> -			sheep_del_sockfd(nid, sfd);
> +			sockfd_node_del(nid);
>  			err_ret = SD_RES_NETWORK_ERROR;
>  			sd_dprintf("fail %d", ret);
>  			break;
> diff --git a/sheep/group.c b/sheep/group.c
> index 4a2a83b..2508474 100644
> --- a/sheep/group.c
> +++ b/sheep/group.c
> @@ -983,7 +983,7 @@ void sd_leave_handler(const struct sd_node *left, const struct sd_node *members,
>  
>  	put_vnode_info(old_vnode_info);
>  
> -	sockfd_cache_del(&left->nid);
> +	sockfd_node_del(&left->nid);
>  }
>  
>  static void update_node_size(struct sd_node *node)
> diff --git a/sheep/sheep.c b/sheep/sheep.c
> index c8d4478..8db000f 100644
> --- a/sheep/sheep.c
> +++ b/sheep/sheep.c
> @@ -443,6 +443,8 @@ static int create_work_queues(void)
>  	    !sys->deletion_wqueue || !sys->block_wqueue ||
>  	    !sys->sockfd_wqueue || !sys->md_wqueue)
>  			return -1;
> +
> +	init_sockfd(sys->sockfd_wqueue);
>  	return 0;
>  }
>  
> diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
> index bc95cd1..25caf63 100644
> --- a/sheep/sheep_priv.h
> +++ b/sheep/sheep_priv.h
> @@ -42,6 +42,7 @@
>  #include "strbuf.h"
>  #include "sha1.h"
>  #include "config.h"
> +#include "sockfd_cache.h"
>  
>  struct client_info {
>  	struct connection conn;
> @@ -399,16 +400,6 @@ int object_cache_remove(uint64_t oid);
>  /* store layout migration */
>  int sd_migrate_store(int from, int to);
>  
> -/* sockfd_cache */
> -struct sockfd {
> -	int fd;
> -	int idx;
> -};
> -
> -void sockfd_cache_del(const struct node_id *);
> -void sockfd_cache_add(const struct node_id *);
> -void sockfd_cache_add_group(const struct sd_node *nodes, int nr);
> -
>  struct sockfd *sheep_get_sockfd(const struct node_id *);
>  void sheep_put_sockfd(const struct node_id *, struct sockfd *);
>  void sheep_del_sockfd(const struct node_id *, struct sockfd *);
> diff --git a/sheep/sockfd_cache.c b/sheep/sockfd_cache.c
> index 13bb7f6..c580344 100644
> --- a/sheep/sockfd_cache.c
> +++ b/sheep/sockfd_cache.c
> @@ -11,511 +11,8 @@
>   * along with this program. If not, see <http://www.gnu.org/licenses/>.
>   */
>  
> -/*
> - * The sockfd cache provides us long TCP connections connected to the nodes
> - * in the cluster to accerlater the data transfer, which has the following
> - * characteristics:
> - *    0 dynamically allocated/deallocated at node granularity.
> - *    1 cached fds are multiplexed by all threads.
> - *    2 each session (for e.g, forward_write_obj_req) can grab one fd at a time.
> - *    3 if there isn't any FD available from cache, use normal connect_to() and
> - *      close() internally.
> - *    4 FD are named by IP:PORT uniquely, hence no need of resetting at
> - *      membership change.
> - *    5 the total number of FDs is scalable to massive nodes.
> - *    6 total 3 APIs: sheep_{get,put,del}_sockfd().
> - *    7 support dual connections to a single node.
> - */
> -
>  #include "sheep_priv.h"
>  
> -struct sockfd_cache {
> -	struct rb_root root;
> -	pthread_rwlock_t lock;
> -	int count;
> -};
> -
> -static struct sockfd_cache sockfd_cache = {
> -	.root = RB_ROOT,
> -	.lock = PTHREAD_RWLOCK_INITIALIZER,
> -};
> -
> -/*
> - * Suppose request size from Guest is 512k, then 4M / 512k = 8, so at
> - * most 8 requests can be issued to the same sheep object. Based on this
> - * assumption, '8' would be effecient for servers that only host 2~4
> - * Guests.
> - *
> - * This fd count will be dynamically grown when the idx reaches watermark which
> - * is calculated by FDS_WATERMARK
> - */
> -#define FDS_WATERMARK(x) ((x) * 3 / 4)
> -#define DEFAULT_FDS_COUNT	8
> -
> -/* How many FDs we cache for one node */
> -static int fds_count = DEFAULT_FDS_COUNT;
> -
> -struct sockfd_cache_fd {
> -	int fd;
> -	uatomic_bool in_use;
> -};
> -
> -struct sockfd_cache_entry {
> -	struct rb_node rb;
> -	struct node_id nid;
> -	struct sockfd_cache_fd *fds;
> -};
> -
> -static struct sockfd_cache_entry *
> -sockfd_cache_insert(struct sockfd_cache_entry *new)
> -{
> -	struct rb_node **p = &sockfd_cache.root.rb_node;
> -	struct rb_node *parent = NULL;
> -	struct sockfd_cache_entry *entry;
> -
> -	while (*p) {
> -		int cmp;
> -
> -		parent = *p;
> -		entry = rb_entry(parent, struct sockfd_cache_entry, rb);
> -		cmp = node_id_cmp(&new->nid, &entry->nid);
> -
> -		if (cmp < 0)
> -			p = &(*p)->rb_left;
> -		else if (cmp > 0)
> -			p = &(*p)->rb_right;
> -		else
> -			return entry;
> -	}
> -	rb_link_node(&new->rb, parent, p);
> -	rb_insert_color(&new->rb, &sockfd_cache.root);
> -
> -	return NULL; /* insert successfully */
> -}
> -
> -static struct sockfd_cache_entry *sockfd_cache_search(const struct node_id *nid)
> -{
> -	struct rb_node *n = sockfd_cache.root.rb_node;
> -	struct sockfd_cache_entry *t;
> -
> -	while (n) {
> -		int cmp;
> -
> -		t = rb_entry(n, struct sockfd_cache_entry, rb);
> -		cmp = node_id_cmp(nid, &t->nid);
> -
> -		if (cmp < 0)
> -			n = n->rb_left;
> -		else if (cmp > 0)
> -			n = n->rb_right;
> -		else
> -			return t; /* found it */
> -	}
> -
> -	return NULL;
> -}
> -
> -static inline int get_free_slot(struct sockfd_cache_entry *entry)
> -{
> -	int idx = -1, i;
> -
> -	for (i = 0; i < fds_count; i++) {
> -		if (!uatomic_set_true(&entry->fds[i].in_use))
> -			continue;
> -		idx = i;
> -		break;
> -	}
> -	return idx;
> -}
> -
> -/*
> - * Grab a free slot of the node and inc the refcount of the slot
> - *
> - * If no free slot available, this typically means we should use short FD.
> - */
> -static struct sockfd_cache_entry *sockfd_cache_grab(const struct node_id *nid,
> -						    int *ret_idx)
> -{
> -	struct sockfd_cache_entry *entry;
> -
> -	pthread_rwlock_rdlock(&sockfd_cache.lock);
> -	entry = sockfd_cache_search(nid);
> -	if (!entry) {
> -		char name[INET6_ADDRSTRLEN];
> -
> -		addr_to_str(name, sizeof(name), nid->addr, 0);
> -		sd_dprintf("failed node %s:%d", name, nid->port);
> -		goto out;
> -	}
> -
> -	*ret_idx = get_free_slot(entry);
> -	if (*ret_idx == -1)
> -		entry = NULL;
> -out:
> -	pthread_rwlock_unlock(&sockfd_cache.lock);
> -	return entry;
> -}
> -
> -static inline bool slots_all_free(struct sockfd_cache_entry *entry)
> -{
> -	int i;
> -	for (i = 0; i < fds_count; i++)
> -		if (uatomic_is_true(&entry->fds[i].in_use))
> -			return false;
> -	return true;
> -}
> -
> -static inline void destroy_all_slots(struct sockfd_cache_entry *entry)
> -{
> -	int i;
> -	for (i = 0; i < fds_count; i++)
> -		if (entry->fds[i].fd != -1)
> -			close(entry->fds[i].fd);
> -}
> -
> -/*
> - * Destroy all the Cached FDs of the node
> - *
> - * We don't proceed if some other node grab one FD of the node. In this case,
> - * the victim node will finally find itself talking to a dead node and call
> - * sheep_del_fd() to delete this node from the cache.
> - */
> -static bool sockfd_cache_destroy(const struct node_id *nid)
> -{
> -	struct sockfd_cache_entry *entry;
> -
> -	pthread_rwlock_wrlock(&sockfd_cache.lock);
> -	entry = sockfd_cache_search(nid);
> -	if (!entry) {
> -		sd_dprintf("It is already destroyed");
> -		goto false_out;
> -	}
> -
> -	if (!slots_all_free(entry)) {
> -		sd_dprintf("Some victim still holds it");
> -		goto false_out;
> -	}
> -
> -	rb_erase(&entry->rb, &sockfd_cache.root);
> -	pthread_rwlock_unlock(&sockfd_cache.lock);
> -
> -	destroy_all_slots(entry);
> -	free(entry);
> -
> -	return true;
> -false_out:
> -	pthread_rwlock_unlock(&sockfd_cache.lock);
> -	return false;
> -}
> -
> -/* When node craches, we should delete it from the cache */
> -void sockfd_cache_del(const struct node_id *nid)
> -{
> -	char name[INET6_ADDRSTRLEN];
> -	int n;
> -
> -	if (!sockfd_cache_destroy(nid))
> -		return;
> -
> -	n = uatomic_sub_return(&sockfd_cache.count, 1);
> -	addr_to_str(name, sizeof(name), nid->addr, 0);
> -	sd_dprintf("%s:%d, count %d", name, nid->port, n);
> -}
> -
> -static void sockfd_cache_add_nolock(const struct node_id *nid)
> -{
> -	struct sockfd_cache_entry *new = xmalloc(sizeof(*new));
> -	int i;
> -
> -	new->fds = xzalloc(sizeof(struct sockfd_cache_fd) * fds_count);
> -	for (i = 0; i < fds_count; i++)
> -		new->fds[i].fd = -1;
> -
> -	memcpy(&new->nid, nid, sizeof(struct node_id));
> -	if (sockfd_cache_insert(new)) {
> -		free(new);
> -		return;
> -	}
> -	sockfd_cache.count++;
> -}
> -
> -/* Add group of nodes to the cache */
> -void sockfd_cache_add_group(const struct sd_node *nodes, int nr)
> -{
> -	const struct sd_node *p;
> -
> -	sd_dprintf("%d", nr);
> -	pthread_rwlock_wrlock(&sockfd_cache.lock);
> -	while (nr--) {
> -		p = nodes + nr;
> -		sockfd_cache_add_nolock(&p->nid);
> -	}
> -	pthread_rwlock_unlock(&sockfd_cache.lock);
> -}
> -
> -/* Add one node to the cache means we can do caching tricks on this node */
> -void sockfd_cache_add(const struct node_id *nid)
> -{
> -	struct sockfd_cache_entry *new;
> -	char name[INET6_ADDRSTRLEN];
> -	int n, i;
> -
> -	pthread_rwlock_wrlock(&sockfd_cache.lock);
> -	new = xmalloc(sizeof(*new));
> -	new->fds = xzalloc(sizeof(struct sockfd_cache_fd) * fds_count);
> -	for (i = 0; i < fds_count; i++)
> -		new->fds[i].fd = -1;
> -
> -	memcpy(&new->nid, nid, sizeof(struct node_id));
> -	if (sockfd_cache_insert(new)) {
> -		free(new->fds);
> -		free(new);
> -		pthread_rwlock_unlock(&sockfd_cache.lock);
> -		return;
> -	}
> -	pthread_rwlock_unlock(&sockfd_cache.lock);
> -	n = uatomic_add_return(&sockfd_cache.count, 1);
> -	addr_to_str(name, sizeof(name), nid->addr, 0);
> -	sd_dprintf("%s:%d, count %d", name, nid->port, n);
> -}
> -
> -static uatomic_bool fds_in_grow;
> -static int fds_high_watermark = FDS_WATERMARK(DEFAULT_FDS_COUNT);
> -
> -static void do_grow_fds(struct work *work)
> -{
> -	struct sockfd_cache_entry *entry;
> -	struct rb_node *p;
> -	int old_fds_count, new_fds_count, new_size, i;
> -
> -	sd_dprintf("%d", fds_count);
> -	pthread_rwlock_wrlock(&sockfd_cache.lock);
> -	old_fds_count = fds_count;
> -	new_fds_count = fds_count * 2;
> -	new_size = sizeof(struct sockfd_cache_fd) * fds_count * 2;
> -	for (p = rb_first(&sockfd_cache.root); p; p = rb_next(p)) {
> -		entry = rb_entry(p, struct sockfd_cache_entry, rb);
> -		entry->fds = xrealloc(entry->fds, new_size);
> -		for (i = old_fds_count; i < new_fds_count; i++) {
> -			entry->fds[i].fd = -1;
> -			uatomic_set_false(&entry->fds[i].in_use);
> -		}
> -	}
> -
> -	fds_count *= 2;
> -	fds_high_watermark = FDS_WATERMARK(fds_count);
> -	pthread_rwlock_unlock(&sockfd_cache.lock);
> -}
> -
> -static void grow_fds_done(struct work *work)
> -{
> -	sd_dprintf("fd count has been grown into %d", fds_count);
> -	uatomic_set_false(&fds_in_grow);
> -	free(work);
> -}
> -
> -static inline void check_idx(int idx)
> -{
> -	struct work *w;
> -
> -	if (idx <= fds_high_watermark)
> -		return;
> -	if (!uatomic_set_true(&fds_in_grow))
> -		return;
> -
> -	w = xmalloc(sizeof(*w));
> -	w->fn = do_grow_fds;
> -	w->done = grow_fds_done;
> -	queue_work(sys->sockfd_wqueue, w);
> -}
> -
> -/* Add the node back if it is still alive */
> -static inline int revalidate_node(const struct node_id *nid)
> -{
> -	bool use_io = nid->io_port ? true : false;
> -	int fd;
> -
> -	if (use_io) {
> -		fd = connect_to_addr(nid->io_addr, nid->io_port);
> -		if (fd >= 0)
> -			goto alive;
> -	}
> -	fd = connect_to_addr(nid->addr, nid->port);
> -	if (fd < 0)
> -		return false;
> -alive:
> -	close(fd);
> -	sockfd_cache_add(nid);
> -	return true;
> -}
> -
> -/* Try to create/get cached IO connection. If failed, fallback to non-IO one */
> -static struct sockfd *sockfd_cache_get(const struct node_id *nid)
> -{
> -	struct sockfd_cache_entry *entry;
> -	struct sockfd *sfd;
> -	bool use_io = nid->io_port ? true : false;
> -	const uint8_t *addr = use_io ? nid->io_addr : nid->addr;
> -	char name[INET6_ADDRSTRLEN];
> -	int fd, idx, port = use_io ? nid->io_port : nid->port;
> -
> -	addr_to_str(name, sizeof(name), addr, 0);
> -grab:
> -	entry = sockfd_cache_grab(nid, &idx);
> -	if (!entry) {
> -		/*
> -		 * The node is deleted, but someone askes us to grab it.
> -		 * The nid is not in the sockfd cache but probably it might be
> -		 * still alive due to broken network connection or was just too
> -		 * busy to serve any request that makes other nodes deleted it
> -		 * from the sockfd cache. In such cases, we need to add it back.
> -		 */
> -		if (!revalidate_node(nid))
> -			return NULL;
> -		goto grab;
> -	}
> -	check_idx(idx);
> -	if (entry->fds[idx].fd != -1) {
> -		sd_dprintf("%s:%d, idx %d", name, port, idx);
> -		goto out;
> -	}
> -
> -	/* Create a new cached connection for this node */
> -	sd_dprintf("create cache connection %s:%d idx %d", name, port, idx);
> -	fd = connect_to(name, port);
> -	if (fd < 0) {
> -		if (use_io) {
> -			sd_eprintf("fallback to non-io connection");
> -			fd = connect_to_addr(nid->addr, nid->port);
> -			if (fd >= 0)
> -				goto new;
> -		}
> -		uatomic_set_false(&entry->fds[idx].in_use);
> -		return NULL;
> -	}
> -new:
> -	entry->fds[idx].fd = fd;
> -out:
> -	sfd = xmalloc(sizeof(*sfd));
> -	sfd->fd = entry->fds[idx].fd;
> -	sfd->idx = idx;
> -	return sfd;
> -}
> -
> -static void sockfd_cache_put(const struct node_id *nid, int idx)
> -{
> -	bool use_io = nid->io_port ? true : false;
> -	const uint8_t *addr = use_io ? nid->io_addr : nid->addr;
> -	int port = use_io ? nid->io_port : nid->port;
> -	struct sockfd_cache_entry *entry;
> -	char name[INET6_ADDRSTRLEN];
> -
> -	addr_to_str(name, sizeof(name), addr, 0);
> -	sd_dprintf("%s:%d idx %d", name, port, idx);
> -
> -	pthread_rwlock_rdlock(&sockfd_cache.lock);
> -	entry = sockfd_cache_search(nid);
> -	if (entry)
> -		uatomic_set_false(&entry->fds[idx].in_use);
> -	pthread_rwlock_unlock(&sockfd_cache.lock);
> -}
> -
> -static void sockfd_cache_close(const struct node_id *nid, int idx)
> -{
> -	bool use_io = nid->io_port ? true : false;
> -	const uint8_t *addr = use_io ? nid->io_addr : nid->addr;
> -	int port = use_io ? nid->io_port : nid->port;
> -	struct sockfd_cache_entry *entry;
> -	char name[INET6_ADDRSTRLEN];
> -
> -	addr_to_str(name, sizeof(name), addr, 0);
> -	sd_dprintf("%s:%d idx %d", name, port, idx);
> -
> -	pthread_rwlock_wrlock(&sockfd_cache.lock);
> -	entry = sockfd_cache_search(nid);
> -	if (entry) {
> -		close(entry->fds[idx].fd);
> -		entry->fds[idx].fd = -1;
> -		uatomic_set_false(&entry->fds[idx].in_use);
> -	}
> -	pthread_rwlock_unlock(&sockfd_cache.lock);
> -}
> -
> -/*
> - * Return a sockfd connected to the node to the caller
> - *
> - * Try to get a 'long' FD as best, which is cached and never closed. If no FD
> - * available, we return a 'short' FD which is supposed to be closed by
> - * sheep_put_sockfd().
> - *
> - * ret_idx is opaque to the caller, -1 indicates it is a short FD.
> - */
> -struct sockfd *sheep_get_sockfd(const struct node_id *nid)
> -{
> -	struct sockfd *sfd;
> -	int fd;
> -
> -	sfd = sockfd_cache_get(nid);
> -	if (sfd)
> -		return sfd;
> -
> -	/* Fallback on a non-io connection that is to be closed shortly */
> -	fd = connect_to_addr(nid->addr, nid->port);
> -	if (fd < 0)
> -		return NULL;
> -
> -	sfd = xmalloc(sizeof(*sfd));
> -	sfd->idx = -1;
> -	sfd->fd = fd;
> -	sd_dprintf("%d", fd);
> -	return sfd;
> -}
> -
> -/*
> - * Release a sockfd connected to the node, which is acquired from
> - * sheep_get_sockfd()
> - *
> - * If it is a long FD, just decrease the refcount to make it available again.
> - * If it is a short FD, close it.
> - *
> - * sheep_put_sockfd() or sheep_del_sockfd() should be paired with
> - * sheep_get_sockfd()
> - */
> -
> -void sheep_put_sockfd(const struct node_id *nid, struct sockfd *sfd)
> -{
> -	if (sfd->idx == -1) {
> -		sd_dprintf("%d", sfd->fd);
> -		close(sfd->fd);
> -		free(sfd);
> -		return;
> -	}
> -
> -	sockfd_cache_put(nid, sfd->idx);
> -	free(sfd);
> -}
> -
> -/*
> - * Delete a sockfd connected to the node, when node is crashed.
> - *
> - * If it is a long FD, de-refcount it and tres to destroy all the cached FDs of
> - * this node in the cache.
> - * If it is a short FD, just close it.
> - */
> -void sheep_del_sockfd(const struct node_id *nid, struct sockfd *sfd)
> -{
> -	if (sfd->idx == -1) {
> -		sd_dprintf("%d", sfd->fd);
> -		close(sfd->fd);
> -		free(sfd);
> -		return;
> -	}
> -
> -	sockfd_cache_close(nid, sfd->idx);
> -	sockfd_cache_del(nid);
> -	free(sfd);
> -}
> -
>  int sheep_exec_req(const struct node_id *nid, struct sd_req *hdr, void *buf)
>  {
>  	struct sd_rsp *rsp = (struct sd_rsp *)hdr;
> @@ -524,7 +21,7 @@ int sheep_exec_req(const struct node_id *nid, struct sd_req *hdr, void *buf)
>  
>  	assert(is_worker_thread());
>  
> -	sfd = sheep_get_sockfd(nid);
> +	sfd = sockfd_cache_get(nid);
>  	if (!sfd)
>  		return SD_RES_NETWORK_ERROR;
>  
> @@ -532,14 +29,14 @@ int sheep_exec_req(const struct node_id *nid, struct sd_req *hdr, void *buf)
>  		       MAX_RETRY_COUNT);
>  	if (ret) {
>  		sd_dprintf("remote node might have gone away");
> -		sheep_del_sockfd(nid, sfd);
> +		sockfd_cache_del(nid, sfd);
>  		return SD_RES_NETWORK_ERROR;
>  	}
>  	ret = rsp->result;
>  	if (ret != SD_RES_SUCCESS)
>  		sd_eprintf("failed %s", sd_strerror(ret));
>  
> -	sheep_put_sockfd(nid, sfd);
> +	do_sockfd_cache_put(nid, sfd->idx);

This is wrong. Use sockfd_cache_put() here; the fd can be a short fd.

Using do_sockfd_cache_put() as a public sockfd function is wrong; do_xxx
helpers should not be used as generic function names.
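
To make that concrete, the tail of sheep_exec_req() would presumably become
something like this (sketch only, following the comment above):

	ret = rsp->result;
	if (ret != SD_RES_SUCCESS)
		sd_eprintf("failed %s", sd_strerror(ret));

	sockfd_cache_put(nid, sfd);	/* handles both long and short fds */
	return ret;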

Thanks
Yuan


