[sheepdog] [PATCH 4/4] sockfd cache: support dual connections for a single node

Liu Yuan namei.unix at gmail.com
Sun Jan 13 16:17:01 CET 2013


From: Liu Yuan <tailai.ly at taobao.com>

This patch adds dual network card support.

- use one dedicated connection for IO requests
- if IO connection fails, then fallback to non-IO one

Redundant connection is supposed to make Sheepdog more reliable.

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 sheep/sockfd_cache.c |   69 ++++++++++++++++++++++++++++++++------------------
 1 file changed, 45 insertions(+), 24 deletions(-)

diff --git a/sheep/sockfd_cache.c b/sheep/sockfd_cache.c
index 3ae084b..269f7ec 100644
--- a/sheep/sockfd_cache.c
+++ b/sheep/sockfd_cache.c
@@ -24,6 +24,7 @@
  *      membership change.
  *    5 the total number of FDs is scalable to massive nodes.
  *    6 total 3 APIs: sheep_{get,put,del}_sockfd().
+ *    7 support dual connections to a single node.
  */
 #include <urcu/uatomic.h>
 #include <pthread.h>
@@ -336,30 +337,45 @@ static inline void check_idx(int idx)
 	queue_work(sys->sockfd_wqueue, w);
 }
 
+static inline int connect_to_addr(const uint8_t *addr, int port)
+{
+	char name[INET6_ADDRSTRLEN];
+
+	addr_to_str(name, sizeof(name), addr, 0);
+	return connect_to(name, port);
+}
+
 /* Add the node back if it is still alive */
 static inline int revalidate_node(const struct node_id *nid)
 {
-	char name[INET6_ADDRSTRLEN];
+	bool use_io = nid->io_port ? true : false;
 	int fd;
 
-	addr_to_str(name, sizeof(name), nid->addr, 0);
-	fd = connect_to(name, nid->port);
+	if (use_io) {
+		fd = connect_to_addr(nid->io_addr, nid->io_port);
+		if (fd >= 0)
+			goto alive;
+	}
+	fd = connect_to_addr(nid->addr, nid->port);
 	if (fd < 0)
-		return -1;
+		return false;
+alive:
 	close(fd);
 	sockfd_cache_add(nid);
-
-	return 0;
+	return true;
 }
 
+/* Try to create/get cached IO connection. If failed, fallback to non-IO one */
 static struct sockfd *sockfd_cache_get(const struct node_id *nid)
 {
 	struct sockfd_cache_entry *entry;
 	struct sockfd *sfd;
+	bool use_io = nid->io_port ? true : false;
+	const uint8_t *addr = use_io ? nid->io_addr : nid->addr;
 	char name[INET6_ADDRSTRLEN];
-	int fd, idx;
+	int fd, idx, port = use_io ? nid->io_port : nid->port;
 
-	addr_to_str(name, sizeof(name), nid->addr, 0);
+	addr_to_str(name, sizeof(name), addr, 0);
 grab:
 	entry = sockfd_cache_grab(nid, &idx);
 	if (!entry) {
@@ -370,23 +386,30 @@ grab:
 		 * busy to serve any request that makes other nodes deleted it
 		 * from the sockfd cache. In such cases, we need to add it back.
 		 */
-		if (revalidate_node(nid) < 0)
+		if (!revalidate_node(nid))
 			return NULL;
 		goto grab;
 	}
 	check_idx(idx);
 	if (entry->fds[idx].fd != -1) {
-		dprintf("%s:%d, idx %d\n", name, nid->port, idx);
+		dprintf("%s:%d, idx %d\n", name, port, idx);
 		goto out;
 	}
 
 	/* Create a new cached connection for this vnode */
-	dprintf("create connection %s:%d idx %d\n", name, nid->port, idx);
-	fd = connect_to(name, nid->port);
+	dprintf("create cache connection %s:%d idx %d\n", name, port, idx);
+	fd = connect_to(name, port);
 	if (fd < 0) {
+		if (use_io) {
+			eprintf("fallback to non-io connection\n");
+			fd = connect_to_addr(nid->addr, nid->port);
+			if (fd >= 0)
+				goto new;
+		}
 		uatomic_set_false(&entry->fds[idx].in_use);
 		return NULL;
 	}
+new:
 	entry->fds[idx].fd = fd;
 out:
 	sfd = xmalloc(sizeof(*sfd));
@@ -397,18 +420,20 @@ out:
 
 static void sockfd_cache_put(const struct node_id *nid, int idx)
 {
+	bool use_io = nid->io_port ? true : false;
+	const uint8_t *addr = use_io ? nid->io_addr : nid->addr;
+	int port = use_io ? nid->io_port : nid->port;
 	struct sockfd_cache_entry *entry;
 	char name[INET6_ADDRSTRLEN];
 
-	addr_to_str(name, sizeof(name), nid->addr, 0);
-	dprintf("%s:%d idx %d\n", name, nid->port, idx);
+	addr_to_str(name, sizeof(name), addr, 0);
+	dprintf("%s:%d idx %d\n", name, port, idx);
 
 	pthread_rwlock_rdlock(&sockfd_cache.lock);
 	entry = sockfd_cache_search(nid);
+	if (entry)
+		uatomic_set_false(&entry->fds[idx].in_use);
 	pthread_rwlock_unlock(&sockfd_cache.lock);
-
-	assert(entry);
-	uatomic_set_false(&entry->fds[idx].in_use);
 }
 
 /*
@@ -422,7 +447,6 @@ static void sockfd_cache_put(const struct node_id *nid, int idx)
  */
 struct sockfd *sheep_get_sockfd(const struct node_id *nid)
 {
-	char name[INET6_ADDRSTRLEN];
 	struct sockfd *sfd;
 	int fd;
 
@@ -430,13 +454,10 @@ struct sockfd *sheep_get_sockfd(const struct node_id *nid)
 	if (sfd)
 		return sfd;
 
-	/* Create a fd that is to be closed */
-	addr_to_str(name, sizeof(name), nid->addr, 0);
-	fd = connect_to(name, nid->port);
-	if (fd < 0) {
-		dprintf("failed connect to %s:%d\n", name, nid->port);
+	/* Fallback on a non-io connection that is to be closed shortly */
+	fd = connect_to_addr(nid->addr, nid->port);
+	if (fd < 0)
 		return NULL;
-	}
 
 	sfd = xmalloc(sizeof(*sfd));
 	sfd->idx = -1;
-- 
1.7.9.5




More information about the sheepdog mailing list