[sheepdog] [PATCH 3/6] sheep, sockfd cache: cache more than one FD for each node

Liu Yuan namei.unix at gmail.com
Sun Jun 24 14:51:50 CEST 2012


From: Liu Yuan <tailai.ly at taobao.com>

This is inspired by the observation that each Guest can issue as much
as 4 requests in one go.

The complexity added to the code is seen outside of sockfd cache:
add one more parameter to the API: FD index. The underlying core needs this to
identify which FD belongs one node is actually used.

I think this trade-off is a good deal.

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 sheep/gateway.c      |   31 ++++---
 sheep/sheep_priv.h   |    6 +-
 sheep/sockfd_cache.c |  218 ++++++++++++++++++++++++++++++++++++++------------
 3 files changed, 188 insertions(+), 67 deletions(-)

diff --git a/sheep/gateway.c b/sheep/gateway.c
index c3e5f80..f126dfb 100644
--- a/sheep/gateway.c
+++ b/sheep/gateway.c
@@ -54,11 +54,13 @@ read_remote:
 	j = random();
 	for (i = 0; i < nr_copies; i++) {
 		int idx = (i + j) % nr_copies;
+		int sock_idx;
+
 		v = obj_vnodes[idx];
 		if (vnode_is_local(v))
 			continue;
 
-		fd = sheep_get_fd(v);
+		fd = sheep_get_fd(v, &sock_idx);
 		if (fd < 0) {
 			ret = SD_RES_NETWORK_ERROR;
 			continue;
@@ -70,11 +72,11 @@ read_remote:
 		ret = exec_req(fd, &fwd_hdr, req->data, &wlen, &rlen);
 
 		if (ret) { /* network errors */
-			sheep_del_fd(v, fd);
+			sheep_del_fd(v, fd, sock_idx);
 			ret = SD_RES_NETWORK_ERROR;
 			continue;
 		} else {
-			sheep_put_fd(v, fd);
+			sheep_put_fd(v, fd, sock_idx);
 			memcpy(&req->rp, rsp, sizeof(*rsp));
 			ret = rsp->result;
 			break;
@@ -86,6 +88,7 @@ read_remote:
 struct write_info {
 	struct pollfd pfds[SD_MAX_REDUNDANCY];
 	struct sd_vnode *vnodes[SD_MAX_REDUNDANCY];
+	int sock_idx[SD_MAX_REDUNDANCY];
 };
 
 int forward_write_obj_req(struct request *req)
@@ -104,9 +107,10 @@ int forward_write_obj_req(struct request *req)
 
 	dprintf("%"PRIx64"\n", oid);
 
-	memset(&wi, 0, sizeof(wi));
-	for (i = 0; i < SD_MAX_REDUNDANCY; i++)
+	for (i = 0; i < SD_MAX_REDUNDANCY; i++) {
 		wi.pfds[i].fd = -1;
+		wi.vnodes[i] = NULL;
+	}
 
 	memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr));
 	fwd_hdr.flags |= SD_FLAG_CMD_IO_LOCAL;
@@ -125,7 +129,7 @@ int forward_write_obj_req(struct request *req)
 			continue;
 		}
 
-		fd = sheep_get_fd(v);
+		fd = sheep_get_fd(v, &wi.sock_idx[nr_fds]);
 		if (fd < 0) {
 			ret = SD_RES_NETWORK_ERROR;
 			goto err;
@@ -133,7 +137,7 @@ int forward_write_obj_req(struct request *req)
 
 		ret = send_req(fd, &fwd_hdr, req->data, &wlen);
 		if (ret) { /* network errors */
-			sheep_del_fd(v, fd);
+			sheep_del_fd(v, fd, wi.sock_idx[nr_fds]);
 			ret = SD_RES_NETWORK_ERROR;
 			dprintf("fail %"PRIu32"\n", ret);
 			goto err;
@@ -174,7 +178,8 @@ again:
 		if (wi.pfds[i].revents & POLLERR ||
 		    wi.pfds[i].revents & POLLHUP ||
 		    wi.pfds[i].revents & POLLNVAL) {
-			sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd);
+			sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd,
+				     wi.sock_idx[i]);
 			ret = SD_RES_NETWORK_ERROR;
 			break;
 		}
@@ -184,7 +189,8 @@ again:
 
 		if (do_read(wi.pfds[i].fd, rsp, sizeof(*rsp))) {
 			eprintf("failed to read a response: %m\n");
-			sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd);
+			sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd,
+				     wi.sock_idx[i]);
 			ret = SD_RES_NETWORK_ERROR;
 			break;
 		}
@@ -193,8 +199,7 @@ again:
 			eprintf("fail %"PRIu32"\n", rsp->result);
 			ret = rsp->result;
 		}
-
-		sheep_put_fd(wi.vnodes[i], wi.pfds[i].fd);
+		sheep_put_fd(wi.vnodes[i], wi.pfds[i].fd, wi.sock_idx[i]);
 		break;
 	}
 	if (i < nr_fds) {
@@ -203,6 +208,8 @@ again:
 			sizeof(struct pollfd) * (nr_fds - i));
 		memmove(wi.vnodes + i, wi.vnodes + i + 1,
 			sizeof(struct sd_vnode *) * (nr_fds - i));
+		memmove(wi.sock_idx + i, wi.sock_idx + i + 1,
+			sizeof(int) * (nr_fds - i));
 	}
 
 	dprintf("%"PRIx64" %"PRIu32"\n", oid, nr_fds);
@@ -213,7 +220,7 @@ out:
 	return ret;
 err:
 	for (i = 0; i < nr_fds; i++)
-		sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd);
+		sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd, wi.sock_idx[i]);
 	return ret;
 }
 
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 83a42e3..747599c 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -408,8 +408,8 @@ void sockfd_cache_del(struct node_id *);
 void sockfd_cache_add(struct sd_node *);
 void sockfd_cache_add_group(struct sd_node *nodes, int nr);
 
-int sheep_get_fd(struct sd_vnode *vnode);
-void sheep_put_fd(struct sd_vnode *vnode, int fd);
-void sheep_del_fd(struct sd_vnode *vnode, int fd);
+int sheep_get_fd(struct sd_vnode *vnode, int *);
+void sheep_put_fd(struct sd_vnode *vnode, int fd, int);
+void sheep_del_fd(struct sd_vnode *vnode, int fd, int);
 
 #endif
diff --git a/sheep/sockfd_cache.c b/sheep/sockfd_cache.c
index 4f7ff5c..a756834 100644
--- a/sheep/sockfd_cache.c
+++ b/sheep/sockfd_cache.c
@@ -11,6 +11,20 @@
  * along with this program. If not, see <http://www.gnu.org/licenses/>.
  */
 
+/*
+ * The sockfd cache provides us long TCP connections connected to the nodes
+ * in the cluster to accerlater the data transfer, which has the following
+ * characteristics:
+ *    0 dynamically allocated/deallocated at node granularity.
+ *    1 cached fds are multiplexed by all threads.
+ *    2 each session (for e.g, forward_write_obj_req) can grab one fd at a time.
+ *    3 if there isn't any FD available from cache, use normal connect_to() and
+ *      close() internally.
+ *    4 FD are named by IP:PORT uniquely, hence no need of resetting at
+ *      membership change.
+ *    5 the total number of FDs is scalable to massive nodes.
+ *    6 total 3 APIs: sheep_{get,put,del}_fd().
+ */
 #include <urcu/uatomic.h>
 #include <pthread.h>
 #include <stdint.h>
@@ -42,9 +56,10 @@ static struct sockfd_cache sockfd_cache = {
 
 struct sockfd_cache_entry {
 	struct rb_node rb;
-	int fd;
-	uint8_t refcount;
 	struct node_id nid;
+#define SOCKFD_CACHE_MAX_FD	8 /* How many FDs we cache for one node */
+	int fd[SOCKFD_CACHE_MAX_FD];
+	uint8_t fd_in_use[SOCKFD_CACHE_MAX_FD];
 };
 
 static inline int node_id_cmp(const void *a, const void *b)
@@ -114,34 +129,105 @@ static struct sockfd_cache_entry *sockfd_cache_search(struct node_id *nid)
 	return NULL;
 }
 
-static struct sockfd_cache_entry *sockfd_cache_grab(struct node_id *nid)
+static inline int get_free_slot(struct sockfd_cache_entry *entry)
+{
+	int idx = -1, i;
+
+	for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++) {
+		if (uatomic_cmpxchg(&entry->fd_in_use[i], 0, 1))
+			continue;
+		idx = i;
+		break;
+	}
+	return idx;
+}
+
+/*
+ * Grab a free slot of the node and inc the refcount of the slot
+ *
+ * If no free slot available, this typically means we should use short FD.
+ */
+static struct sockfd_cache_entry *sockfd_cache_grab(struct node_id *nid,
+						    char *name, int *ret_idx)
 {
 	struct sockfd_cache_entry *entry;
 
 	pthread_rwlock_rdlock(&sockfd_cache.lock);
 	entry = sockfd_cache_search(nid);
-	pthread_rwlock_unlock(&sockfd_cache.lock);
-	assert(entry);
-	/* if refcount == 0, set it to 1, otherwise someone holds it */
-	if (uatomic_cmpxchg(&entry->refcount, 0, 1))
-		return NULL;
+	if (!entry){
+		dprintf("failed node %s:%d\n", name, nid->port);
+		goto out;
+	}
 
+	*ret_idx = get_free_slot(entry);
+	if (*ret_idx == -1)
+		entry = NULL;
+out:
+	pthread_rwlock_unlock(&sockfd_cache.lock);
 	return entry;
 }
 
-void sockfd_cache_del(struct node_id *nid)
+static inline bool slots_all_free(struct sockfd_cache_entry *entry)
+{
+	int i;
+	for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
+		if (uatomic_read(&entry->fd_in_use[i]))
+			return false;
+	return true;
+}
+
+static inline void destroy_all_slots(struct sockfd_cache_entry *entry)
+{
+	int i;
+	for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
+		if (entry->fd[i] != -1)
+			close(entry->fd[i]);
+}
+
+/*
+ * Destroy all the Cached FDs of the node
+ *
+ * We don't proceed if some other node grab one FD of the node. In this case,
+ * the victim node will finally find itself talking to a dead node and call
+ * sheep_del_fd() to delete this node from the cache.
+ */
+static bool sockfd_cache_destroy(struct node_id *nid)
 {
 	struct sockfd_cache_entry *entry;
+
+	pthread_rwlock_wrlock(&sockfd_cache.lock);
+	entry = sockfd_cache_search(nid);
+	if (!entry) {
+		dprintf("It is already destroyed\n");
+		goto false_out;
+	}
+
+	if (!slots_all_free(entry)) {
+		dprintf("Some victim still holds it\n");
+		goto false_out;
+	}
+
+	rb_erase(&entry->rb, &sockfd_cache.root);
+	pthread_rwlock_unlock(&sockfd_cache.lock);
+
+	destroy_all_slots(entry);
+	free(entry);
+
+	return true;
+false_out:
+	pthread_rwlock_unlock(&sockfd_cache.lock);
+	return false;
+}
+
+/* When node craches, we should delete it from the cache */
+void sockfd_cache_del(struct node_id *nid)
+{
 	char name[INET6_ADDRSTRLEN];
 	int n;
 
-	entry = sockfd_cache_grab(nid);
-	/* Hmmm, some victim still holds it, he is supposed to delete it */
-	if (!entry)
+	if (!sockfd_cache_destroy(nid))
 		return;
 
-	rb_erase(&entry->rb, &sockfd_cache.root);
-	free(entry);
 	n = uatomic_sub_return(&sockfd_cache.count, 1);
 	addr_to_str(name, sizeof(name), nid->addr, 0);
 	dprintf("%s:%d, count %d\n", name, nid->port, n);
@@ -150,8 +236,11 @@ void sockfd_cache_del(struct node_id *nid)
 static void sockfd_cache_add_nolock(struct node_id *nid)
 {
 	struct sockfd_cache_entry *new = xzalloc(sizeof(*new));
+	int i;
+
+	for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
+		new->fd[i] = -1;
 
-	new->fd = -1;
 	memcpy(&new->nid, nid, sizeof(struct node_id));
 	if (sockfd_cache_insert(new)) {
 		free(new);
@@ -160,6 +249,7 @@ static void sockfd_cache_add_nolock(struct node_id *nid)
        sockfd_cache.count++;
 }
 
+/* Add group of nodes to the cache */
 void sockfd_cache_add_group(struct sd_node *nodes, int nr)
 {
 	struct sd_node *p;
@@ -175,13 +265,16 @@ void sockfd_cache_add_group(struct sd_node *nodes, int nr)
 	pthread_rwlock_unlock(&sockfd_cache.lock);
 }
 
+/* Add one node to the cache means we can do caching tricks on this node */
 void sockfd_cache_add(struct sd_node *node)
 {
 	struct sockfd_cache_entry *new = xzalloc(sizeof(*new));
 	char name[INET6_ADDRSTRLEN];
-	int n;
+	int n, i;
+
+	for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
+		new->fd[i] = -1;
 
-	new->fd = -1;
 	memcpy(&new->nid, node, sizeof(struct node_id));
 	pthread_rwlock_rdlock(&sockfd_cache.lock);
 	if (sockfd_cache_insert(new)) {
@@ -195,58 +288,71 @@ void sockfd_cache_add(struct sd_node *node)
 	dprintf("%s:%d, count %d\n", name, node->port, n);
 }
 
-static int sockfd_cache_get(struct node_id *nid)
+static int sockfd_cache_get(struct node_id *nid, char *name, int *ret_idx)
 {
 	struct sockfd_cache_entry *entry;
-	char name[INET6_ADDRSTRLEN];
 	int fd;
 
-	entry = sockfd_cache_grab(nid);
+	entry = sockfd_cache_grab(nid, name, ret_idx);
 	if (!entry)
 		return -1;
 
-	if (entry->fd != -1)
-		return entry->fd;
+	if (entry->fd[*ret_idx] != -1) {
+		dprintf("%s:%d, idx %d\n", name, nid->port, *ret_idx);
+		return entry->fd[*ret_idx];
+	}
 
-	/* Create a new connection for this vnode */
-	addr_to_str(name, sizeof(name), nid->addr, 0);
-	dprintf("create connection %s:%d\n", name, nid->port);
+	/* Create a new cached connection for this vnode */
+	dprintf("create connection %s:%d idx %d\n", name, nid->port, *ret_idx);
 	fd = connect_to(name, nid->port);
 	if (fd < 0) {
-		uatomic_dec(&entry->refcount);
+		uatomic_dec(&entry->fd_in_use[*ret_idx]);
 		return -1;
 	}
-	entry->fd = fd;
+	entry->fd[*ret_idx] = fd;
 
 	return fd;
 }
 
-static void sockfd_cache_put(struct node_id *nid)
+static void sockfd_cache_put(struct node_id *nid, int idx)
 {
 	struct sockfd_cache_entry *entry;
 	char name[INET6_ADDRSTRLEN];
 	int refcnt;
 
 	addr_to_str(name, sizeof(name), nid->addr, 0);
-	dprintf("%s:%d\n", name, nid->port);
+	dprintf("%s:%d idx %d\n", name, nid->port, idx);
+
 	pthread_rwlock_rdlock(&sockfd_cache.lock);
 	entry = sockfd_cache_search(nid);
 	pthread_rwlock_unlock(&sockfd_cache.lock);
+
 	assert(entry);
-	refcnt = uatomic_cmpxchg(&entry->refcount, 1, 0);
+	refcnt = uatomic_cmpxchg(&entry->fd_in_use[idx], 1, 0);
 	assert(refcnt == 1);
 }
 
-int sheep_get_fd(struct sd_vnode *vnode)
+/*
+ * Return a FD connected to the vnode to the caller
+ *
+ * Try to get a 'long' FD as best, which is cached and never closed. If no FD
+ * available, we return a 'short' FD which is supposed to be closed by
+ * sheep_get_put().
+ *
+ * ret_idx is opaque to the caller, -1 indicates it is a short FD.
+ */
+int sheep_get_fd(struct sd_vnode *vnode, int *ret_idx)
 {
 	struct node_id *nid = (struct node_id *)vnode;
 	char name[INET6_ADDRSTRLEN];
-	int fd = sockfd_cache_get(nid);
+	int fd;
 
+	addr_to_str(name, sizeof(name), nid->addr, 0);
+	fd = sockfd_cache_get(nid, name, ret_idx);
 	if (fd != -1)
 		return fd;
 
-	addr_to_str(name, sizeof(name), nid->addr, 0);
+	/* Create a fd that is to be closed */
 	fd = connect_to(name, nid->port);
 	if (fd < 0) {
 		dprintf("failed connect to %s:%d\n", name, nid->port);
@@ -257,37 +363,45 @@ int sheep_get_fd(struct sd_vnode *vnode)
 	return fd;
 }
 
-void sheep_put_fd(struct sd_vnode *vnode, int fd)
+/*
+ * Rlease a FD connected to the vnode, which is acquired from sheep_get_fd()
+ *
+ * If it is a long FD, just decrease the refcount to make it available again.
+ * If it is a short FD, close it.
+ *
+ * sheep_put_fd() or sheep_del_fd() should be paired with sheep_get_fd()
+ */
+
+void sheep_put_fd(struct sd_vnode *vnode, int fd, int idx)
 {
 	struct node_id *nid = (struct node_id *)vnode;
-	struct sockfd_cache_entry *entry;
 
-	pthread_rwlock_rdlock(&sockfd_cache.lock);
-	entry = sockfd_cache_search(nid);
-	pthread_rwlock_unlock(&sockfd_cache.lock);
-	assert(entry);
-	if (entry->fd == fd) {
-		sockfd_cache_put(nid);
-	} else {
+	if (idx == -1) {
 		dprintf("%d\n", fd);
 		close(fd);
+		return;
 	}
+
+	sockfd_cache_put(nid, idx);
 }
 
-void sheep_del_fd(struct sd_vnode *vnode, int fd)
+/*
+ * Delete a FD connected to the vnode, when vnode is crashed.
+ *
+ * If it is a long FD, de-refcount it and tres to destroy all the cached FDs of
+ * this vnode in the cache.
+ * If it is a short FD, just close it.
+ */
+void sheep_del_fd(struct sd_vnode *vnode, int fd, int idx)
 {
 	struct node_id *nid = (struct node_id *)vnode;
-	struct sockfd_cache_entry *entry;
 
-	pthread_rwlock_rdlock(&sockfd_cache.lock);
-	entry = sockfd_cache_search(nid);
-	pthread_rwlock_unlock(&sockfd_cache.lock);
-	assert(entry);
-	if (entry->fd == fd) {
-		sockfd_cache_put(nid);
-		sockfd_cache_del(nid);
-	} else {
+	if (idx == -1) {
 		dprintf("%d\n", fd);
 		close(fd);
+		return;
 	}
+
+	sockfd_cache_put(nid, idx);
+	sockfd_cache_del(nid);
 }
-- 
1.7.10.2




More information about the sheepdog mailing list