[sheepdog] [PATCH 1/6] sheep: redesign a new cached sockfd pool

Liu Yuan namei.unix at gmail.com
Sun Jun 24 14:51:48 CEST 2012


From: Liu Yuan <tailai.ly at taobao.com>

The old sockfd pool has the following defects:
 0 it is statically allocated.
 1 it uses too many fds per sheep, so it is not scalable
 2 it is implemented per thread, so fds can't be shared between threads
 3 it needs resetting at every membership change

The new sockfd cache aims to address these problems while remaining as efficient
as the old one:
 0 dynamically allocated/deallocated at node granularity.
 1 cached fds are multiplexed by all threads.
 2 each session (e.g., forward_write_obj_req) can grab one fd at a time
 3 if there isn't any fd available from the cache, use normal connect_to() and
   close() internally
 4 fds are named uniquely by IP:PORT, hence no need for resetting at membership
   change
 5 the total number of fds shrinks from (nr_gateway + nr_io) * nr_nodes to nr_nodes
 6 just one more API is added, 3 APIs in total: sheep_{get,put,del}_fd()

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 include/sheep.h      |    2 +-
 sheep/Makefile.am    |    2 +-
 sheep/sdnet.c        |   66 +-----------
 sheep/sheep_priv.h   |   14 ++-
 sheep/sockfd_cache.c |  293 ++++++++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 307 insertions(+), 70 deletions(-)
 create mode 100644 sheep/sockfd_cache.c

diff --git a/include/sheep.h b/include/sheep.h
index ac9179c..01aa202 100644
--- a/include/sheep.h
+++ b/include/sheep.h
@@ -155,9 +155,9 @@ struct sd_node {
 };
 
 struct sd_vnode {
-	uint64_t        id;
 	uint8_t         addr[16];
 	uint16_t        port;
+	uint64_t        id;
 	uint16_t	node_idx;
 	uint32_t	zone;
 };
diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index 1cb2ebf..0a874c6 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -26,7 +26,7 @@ sbin_PROGRAMS		= sheep
 
 sheep_SOURCES		= sheep.c group.c sdnet.c gateway.c store.c vdi.c work.c \
 			  journal.c ops.c recovery.c cluster/local.c \
-			  object_cache.c object_list_cache.c
+			  object_cache.c object_list_cache.c sockfd_cache.c
 
 if BUILD_COROSYNC
 sheep_SOURCES		+= cluster/corosync.c
diff --git a/sheep/sdnet.c b/sheep/sdnet.c
index 34d65cf..e7da711 100644
--- a/sheep/sdnet.c
+++ b/sheep/sdnet.c
@@ -19,6 +19,7 @@
 #include <fcntl.h>
 
 #include "sheep_priv.h"
+#include "rbtree.h"
 
 static void requeue_request(struct request *req);
 
@@ -740,68 +741,3 @@ int create_listen_port(int port, void *data)
 	return create_listen_ports(port, create_listen_port_fn, data);
 }
 
-static __thread int cached_fds[SD_MAX_NODES];
-static __thread uint32_t cached_epoch = 0;
-
-void del_sheep_fd(int fd)
-{
-	int i;
-
-	for (i = 0; i < SD_MAX_NODES; i++) {
-		if (cached_fds[i] == fd) {
-			if (fd >= 0)
-				close(fd);
-
-			cached_fds[i] = -1;
-
-			return;
-		}
-	}
-}
-
-int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx, uint32_t epoch)
-{
-	int i, fd;
-	char name[INET6_ADDRSTRLEN];
-
-	if (cached_epoch == 0) {
-		/* initialize */
-		for (i = 0; i < SD_MAX_NODES; i++)
-			cached_fds[i] = -1;
-
-		cached_epoch = epoch;
-	}
-
-	if (before(epoch, cached_epoch)) {
-		eprintf("requested epoch is smaller than the previous one: %d < %d\n",
-			epoch, cached_epoch);
-		return -1;
-	}
-	if (after(epoch, cached_epoch)) {
-		for (i = 0; i < SD_MAX_NODES; i++) {
-			if (cached_fds[i] >= 0)
-				close(cached_fds[i]);
-
-			cached_fds[i] = -1;
-		}
-		cached_epoch = epoch;
-	}
-
-	fd = cached_fds[node_idx];
-	dprintf("%d, %d\n", epoch, fd);
-
-	if (cached_epoch == epoch && fd >= 0) {
-		dprintf("using the cached fd %d\n", fd);
-		return fd;
-	}
-
-	addr_to_str(name, sizeof(name), addr, 0);
-
-	fd = connect_to(name, port);
-	if (fd < 0)
-		return -1;
-
-	cached_fds[node_idx] = fd;
-
-	return fd;
-}
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index afdaad8..83a42e3 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -292,9 +292,6 @@ int read_object(struct vnode_info *vnodes, uint32_t node_version,
 int remove_object(struct vnode_info *vnodes, uint32_t node_version,
 		  uint64_t oid, int nr);
 
-void del_sheep_fd(int fd);
-int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx, uint32_t epoch);
-
 int prealloc(int fd, uint32_t size);
 
 int objlist_cache_insert(uint64_t oid);
@@ -404,4 +401,15 @@ void object_cache_delete(uint32_t vid);
 
 int object_cache_init(const char *p);
 
+/* sockfd_cache */
+struct node_id;
+
+void sockfd_cache_del(struct node_id *);
+void sockfd_cache_add(struct sd_node *);
+void sockfd_cache_add_group(struct sd_node *nodes, int nr);
+
+int sheep_get_fd(struct sd_vnode *vnode);
+void sheep_put_fd(struct sd_vnode *vnode, int fd);
+void sheep_del_fd(struct sd_vnode *vnode, int fd);
+
 #endif
diff --git a/sheep/sockfd_cache.c b/sheep/sockfd_cache.c
new file mode 100644
index 0000000..4f7ff5c
--- /dev/null
+++ b/sheep/sockfd_cache.c
@@ -0,0 +1,293 @@
+/*
+ * Copyright (C) 2012 Taobao Inc.
+ *
+ * Liu Yuan <namei.unix at gmail.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <urcu/uatomic.h>
+#include <pthread.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <stdio.h>
+
+#include "sheep.h"
+#include "sheep_priv.h"
+#include "list.h"
+#include "rbtree.h"
+#include "logger.h"
+#include "util.h"
+
+struct node_id { /* cache key; sd_node and sd_vnode pointers are cast to this type in this file */
+	uint8_t addr[16]; /* raw address bytes, compared with memcmp() */
+	uint16_t port; /* compared only when the addresses are equal */
+};
+
+struct sockfd_cache {
+	struct rb_root root; /* tree of struct sockfd_cache_entry, ordered by node_id_cmp() */
+	pthread_rwlock_t lock; /* taken around lookups and insertions on the tree */
+	int count; /* number of entries; bumped on add, dropped on delete */
+};
+
+static struct sockfd_cache sockfd_cache = { /* the one cache used by every function in this file */
+	.root = RB_ROOT,
+	.lock = PTHREAD_RWLOCK_INITIALIZER,
+};
+
+struct sockfd_cache_entry {
+	struct rb_node rb; /* linkage into sockfd_cache.root */
+	int fd; /* cached socket, or -1 while no connection has been made */
+	uint8_t refcount; /* 0 = free, 1 = held; flipped with uatomic_cmpxchg() */
+	struct node_id nid; /* key this entry is stored under */
+};
+
+/* Order two node ids by address bytes first, then by port; 0 means equal. */
+static inline int node_id_cmp(const void *a, const void *b)
+{
+	const struct node_id *lhs = a;
+	const struct node_id *rhs = b;
+	int addr_diff = memcmp(lhs->addr, rhs->addr, sizeof(lhs->addr));
+
+	/* Different addresses decide the order on their own. */
+	if (addr_diff != 0)
+		return addr_diff;
+
+	/* Same address: fall back to comparing ports. */
+	if (lhs->port != rhs->port)
+		return lhs->port < rhs->port ? -1 : 1;
+
+	return 0;
+}
+
+/*
+ * Link 'new' into the tree keyed by its nid.  Returns NULL when the entry was
+ * inserted, or the already present entry with an equal nid (tree unchanged).
+ */
+static struct sockfd_cache_entry *
+sockfd_cache_insert(struct sockfd_cache_entry *new)
+{
+	struct rb_node **link = &sockfd_cache.root.rb_node;
+	struct rb_node *parent = NULL;
+
+	while (*link) {
+		struct sockfd_cache_entry *cur;
+		int order;
+
+		parent = *link;
+		cur = rb_entry(parent, struct sockfd_cache_entry, rb);
+		order = node_id_cmp(&new->nid, &cur->nid);
+		if (order == 0)
+			return cur; /* duplicate key, 'new' was not linked */
+		link = order < 0 ? &parent->rb_left : &parent->rb_right;
+	}
+	rb_link_node(&new->rb, parent, link);
+	rb_insert_color(&new->rb, &sockfd_cache.root);
+
+	return NULL;
+}
+
+/*
+ * Look up the entry whose nid equals 'nid'.  Returns the entry, or NULL when
+ * the tree holds no such key.  Takes no lock itself.
+ */
+static struct sockfd_cache_entry *sockfd_cache_search(struct node_id *nid)
+{
+	struct rb_node *pos;
+
+	for (pos = sockfd_cache.root.rb_node; pos; ) {
+		struct sockfd_cache_entry *cur =
+			rb_entry(pos, struct sockfd_cache_entry, rb);
+		int order = node_id_cmp(nid, &cur->nid);
+
+		if (order == 0)
+			return cur;
+		/* smaller keys live in the left subtree, larger in the right */
+		pos = order < 0 ? pos->rb_left : pos->rb_right;
+	}
+
+	return NULL;
+}
+
+/*
+ * Claim nid's entry: returns it with refcount moved 0 -> 1, or NULL if busy.
+ */
+static struct sockfd_cache_entry *sockfd_cache_grab(struct node_id *nid)
+{
+	struct sockfd_cache_entry *found;
+
+	pthread_rwlock_rdlock(&sockfd_cache.lock);
+	found = sockfd_cache_search(nid);
+	pthread_rwlock_unlock(&sockfd_cache.lock);
+	assert(found); /* the node must already have an entry in the cache */
+	/* a non-zero old value means somebody else already holds the entry */
+	return uatomic_cmpxchg(&found->refcount, 0, 1) ? NULL : found;
+}
+
+/*
+ * Remove the entry of 'nid' from the cache and close its cached socket.
+ * Nothing is done while another user holds the entry; that user is expected
+ * to delete it.
+ */
+void sockfd_cache_del(struct node_id *nid)
+{
+	struct sockfd_cache_entry *entry;
+	char name[INET6_ADDRSTRLEN];
+	int n;
+
+	entry = sockfd_cache_grab(nid);
+	/* Hmmm, some victim still holds it, he is supposed to delete it */
+	if (!entry)
+		return;
+
+	/* rb_erase() rewrites the tree, so keep searchers and inserters out */
+	pthread_rwlock_wrlock(&sockfd_cache.lock);
+	rb_erase(&entry->rb, &sockfd_cache.root);
+	pthread_rwlock_unlock(&sockfd_cache.lock);
+
+	/* The grab above made us the sole holder: the socket goes with the entry */
+	if (entry->fd != -1)
+		close(entry->fd);
+	free(entry);
+
+	n = uatomic_sub_return(&sockfd_cache.count, 1);
+	addr_to_str(name, sizeof(name), nid->addr, 0);
+	dprintf("%s:%d, count %d\n", name, nid->port, n);
+}
+
+/* Add an idle entry for nid; caller holds sockfd_cache.lock for writing. */
+static void sockfd_cache_add_nolock(struct node_id *nid)
+{
+	struct sockfd_cache_entry *new = xzalloc(sizeof(*new));
+
+	new->fd = -1; /* no connection yet; sockfd_cache_get() makes it on demand */
+	memcpy(&new->nid, nid, sizeof(new->nid));
+	if (!sockfd_cache_insert(new))
+		uatomic_inc(&sockfd_cache.count); /* other paths update it atomically */
+	else
+		free(new); /* nid is already cached */
+}
+
+/*
+ * Add cache entries for the 'nr' nodes of the 'nodes' array in one go, taking
+ * the write lock only once.  Each struct sd_node is passed on as a struct
+ * node_id through a pointer cast.
+ */
+void sockfd_cache_add_group(struct sd_node *nodes, int nr)
+{
+	dprintf("%d\n", nr);
+
+	pthread_rwlock_wrlock(&sockfd_cache.lock);
+	while (nr--) /* walks the array from its last node to its first */
+		sockfd_cache_add_nolock((struct node_id *)(nodes + nr));
+	pthread_rwlock_unlock(&sockfd_cache.lock);
+}
+
+/*
+ * Add a cache entry for 'node' with no connection yet.  Does nothing when the
+ * node is already cached.
+ */
+void sockfd_cache_add(struct sd_node *node)
+{
+	struct sockfd_cache_entry *new = xzalloc(sizeof(*new));
+	struct sockfd_cache_entry *existing;
+	char name[INET6_ADDRSTRLEN];
+	int n;
+
+	new->fd = -1;
+	/* copies the leading bytes of struct sd_node as a struct node_id */
+	memcpy(&new->nid, node, sizeof(struct node_id));
+
+	/* Insertion rewrites the tree: it needs the write lock, not the read lock */
+	pthread_rwlock_wrlock(&sockfd_cache.lock);
+	existing = sockfd_cache_insert(new);
+	pthread_rwlock_unlock(&sockfd_cache.lock);
+	if (existing) {
+		free(new);
+		return;
+	}
+
+	n = uatomic_add_return(&sockfd_cache.count, 1);
+	addr_to_str(name, sizeof(name), node->addr, 0);
+	dprintf("%s:%d, count %d\n", name, node->port, n);
+}
+
+/*
+ * Claim the cached socket of 'nid', connecting to the node on first use.
+ * Returns the fd, or -1 with nothing held when busy or unreachable.
+ */
+static int sockfd_cache_get(struct node_id *nid)
+{
+	struct sockfd_cache_entry *entry = sockfd_cache_grab(nid);
+	char name[INET6_ADDRSTRLEN];
+	int fd;
+
+	if (!entry)
+		return -1;
+	if (entry->fd != -1)
+		return entry->fd; /* connection was made by an earlier call */
+
+	addr_to_str(name, sizeof(name), nid->addr, 0);
+	dprintf("create connection %s:%d\n", name, nid->port);
+	fd = connect_to(name, nid->port);
+	if (fd >= 0) {
+		entry->fd = fd;
+		return fd;
+	}
+	uatomic_dec(&entry->refcount); /* undo the grab so others can retry */
+	return -1;
+}
+
+/* Release the entry of 'nid' (refcount 1 -> 0); the caller must be holding it. */
+static void sockfd_cache_put(struct node_id *nid)
+{
+	struct sockfd_cache_entry *held;
+	char name[INET6_ADDRSTRLEN];
+	int old;
+
+	addr_to_str(name, sizeof(name), nid->addr, 0);
+	dprintf("%s:%d\n", name, nid->port);
+	pthread_rwlock_rdlock(&sockfd_cache.lock);
+	held = sockfd_cache_search(nid);
+	pthread_rwlock_unlock(&sockfd_cache.lock);
+	assert(held);
+	/* kept outside assert() so the release still happens under NDEBUG */
+	old = uatomic_cmpxchg(&held->refcount, 1, 0);
+	assert(old == 1);
+}
+
+/*
+ * Get a connected socket to the node of 'vnode': the cached one when
+ * sockfd_cache_get() can provide it, otherwise a fresh private connection.
+ * Returns the fd, or -1 when no connection could be made.
+ */
+int sheep_get_fd(struct sd_vnode *vnode)
+{
+	struct node_id *nid = (struct node_id *)vnode;
+	char name[INET6_ADDRSTRLEN];
+	int cached, fresh;
+
+	cached = sockfd_cache_get(nid);
+	if (cached != -1)
+		return cached;
+
+	/* Cache unavailable: fall back to a connection of our own */
+	addr_to_str(name, sizeof(name), nid->addr, 0);
+	fresh = connect_to(name, nid->port);
+	if (fresh >= 0) {
+		dprintf("%d\n", fresh);
+		return fresh;
+	}
+
+	dprintf("failed connect to %s:%d\n", name, nid->port);
+	return -1;
+}
+
+/*
+ * Give back an fd obtained from sheep_get_fd().  The cached fd of the node is
+ * released for the next user; any other fd was a private connection and is
+ * closed.
+ */
+void sheep_put_fd(struct sd_vnode *vnode, int fd)
+{
+	struct node_id *nid = (struct node_id *)vnode;
+	struct sockfd_cache_entry *entry;
+
+	pthread_rwlock_rdlock(&sockfd_cache.lock);
+	entry = sockfd_cache_search(nid);
+	pthread_rwlock_unlock(&sockfd_cache.lock);
+	/*
+	 * A private fd does not hold the entry, so sockfd_cache_del() may have
+	 * removed the entry in the meantime; the fd is then not the cached one
+	 * and is simply closed instead of tripping an assertion.
+	 */
+	if (entry && entry->fd == fd) {
+		sockfd_cache_put(nid);
+	} else {
+		dprintf("%d\n", fd);
+		close(fd);
+	}
+}
+
+/*
+ * Dispose of an fd obtained from sheep_get_fd() together with its cache entry.
+ * For the cached fd the entry is released and then removed from the cache;
+ * any other fd was a private connection and is closed.
+ */
+void sheep_del_fd(struct sd_vnode *vnode, int fd)
+{
+	struct node_id *nid = (struct node_id *)vnode;
+	struct sockfd_cache_entry *entry;
+
+	pthread_rwlock_rdlock(&sockfd_cache.lock);
+	entry = sockfd_cache_search(nid);
+	pthread_rwlock_unlock(&sockfd_cache.lock);
+	/*
+	 * A private fd does not hold the entry, so sockfd_cache_del() may have
+	 * removed the entry in the meantime; the fd is then not the cached one
+	 * and is simply closed instead of tripping an assertion.
+	 */
+	if (entry && entry->fd == fd) {
+		sockfd_cache_put(nid);
+		sockfd_cache_del(nid);
+	} else {
+		dprintf("%d\n", fd);
+		close(fd);
+	}
+}
-- 
1.7.10.2




More information about the sheepdog mailing list