[sheepdog] [PATCH v2] sockfd cache: grow fds count dynamically

Liu Yuan namei.unix at gmail.com
Mon Jul 23 11:41:27 CEST 2012


From: Liu Yuan <tailai.ly at taobao.com>

v2:
 - fix memory leak in check_idx()
 - remove extra whitespace
---------------------------------- >8

This will scale sheep daemon to serve more VMs on one node and do
it adoptively and automatically.

- fd count default to 16 instead of previous 8

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 sheep/sheep.c        |    1 +
 sheep/sheep_priv.h   |    1 +
 sheep/sockfd_cache.c |  127 ++++++++++++++++++++++++++++++++++++++------------
 3 files changed, 98 insertions(+), 31 deletions(-)

diff --git a/sheep/sheep.c b/sheep/sheep.c
index df28a94..380a129 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -344,6 +344,7 @@ int main(int argc, char **argv)
 	sys->recovery_wqueue = init_work_queue("recovery", true);
 	sys->deletion_wqueue = init_work_queue("deletion", true);
 	sys->block_wqueue = init_work_queue("block", true);
+	sys->sockfd_wqueue = init_work_queue("sockfd", true);
 	if (!sys->gateway_wqueue || !sys->io_wqueue ||!sys->recovery_wqueue ||
 	    !sys->deletion_wqueue || !sys->block_wqueue)
 		exit(1);
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index e455d27..530fe14 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -122,6 +122,7 @@ struct cluster_info {
 	struct work_queue *deletion_wqueue;
 	struct work_queue *recovery_wqueue;
 	struct work_queue *block_wqueue;
+	struct work_queue *sockfd_wqueue;
 };
 
 struct siocb {
diff --git a/sheep/sockfd_cache.c b/sheep/sockfd_cache.c
index aa90299..c130ea6 100644
--- a/sheep/sockfd_cache.c
+++ b/sheep/sockfd_cache.c
@@ -48,23 +48,33 @@ static struct sockfd_cache sockfd_cache = {
 	.lock = PTHREAD_RWLOCK_INITIALIZER,
 };
 
+/*
+ * Suppose request size from Guest is 512k, then 4M / 512k = 8, so at
+ * most 8 requests can be issued to the same sheep object. Based on this
+ * assumption, '16' would be effecient for servers that only host 2~4
+ * Guests.
+ *
+ * This fd count will be dynamically grown when the idx reaches watermark.
+ */
+#define DEFAULT_FDS_COUNT	16
+#define DEFAULT_FDS_WATERMARK	12
+
+/* How many FDs we cache for one node */
+static int fds_count = DEFAULT_FDS_COUNT;
+
+struct sockfd_cache_fd {
+	int fd;
+	uint8_t fd_in_use;
+};
+
 struct sockfd_cache_entry {
 	struct rb_node rb;
 	struct node_id nid;
-#define SOCKFD_CACHE_MAX_FD	8 /* How many FDs we cache for one node */
-	/*
-	 * FIXME: Make this macro configurable.
-	 *
-	 * Suppose request size from Guest is 512k, then 4M / 512k = 8, so at
-	 * most 8 requests can be issued to the same sheep object. Based on this
-	 * assumption, '8' would be effecient for servers that only host 4~8
-	 * Guests, but for powerful servers that can host dozens of Guests, we
-	 * might consider bigger value.
-	 */
-	int fd[SOCKFD_CACHE_MAX_FD];
-	uint8_t fd_in_use[SOCKFD_CACHE_MAX_FD];
+	struct sockfd_cache_fd *fds;
 };
 
+#define fds_bytes (sizeof(struct sockfd_cache_fd) * fds_count)
+
 static struct sockfd_cache_entry *
 sockfd_cache_insert(struct sockfd_cache_entry *new)
 {
@@ -118,8 +128,8 @@ static inline int get_free_slot(struct sockfd_cache_entry *entry)
 {
 	int idx = -1, i;
 
-	for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++) {
-		if (uatomic_cmpxchg(&entry->fd_in_use[i], 0, 1))
+	for (i = 0; i < fds_count; i++) {
+		if (uatomic_cmpxchg(&entry->fds[i].fd_in_use, 0, 1))
 			continue;
 		idx = i;
 		break;
@@ -155,8 +165,8 @@ out:
 static inline bool slots_all_free(struct sockfd_cache_entry *entry)
 {
 	int i;
-	for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
-		if (uatomic_read(&entry->fd_in_use[i]))
+	for (i = 0; i < fds_count; i++)
+		if (uatomic_read(&entry->fds[i].fd_in_use))
 			return false;
 	return true;
 }
@@ -164,9 +174,9 @@ static inline bool slots_all_free(struct sockfd_cache_entry *entry)
 static inline void destroy_all_slots(struct sockfd_cache_entry *entry)
 {
 	int i;
-	for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
-		if (entry->fd[i] != -1)
-			close(entry->fd[i]);
+	for (i = 0; i < fds_count; i++)
+		if (entry->fds[i].fd != -1)
+			close(entry->fds[i].fd);
 }
 
 /*
@@ -220,11 +230,12 @@ void sockfd_cache_del(struct node_id *nid)
 
 static void sockfd_cache_add_nolock(struct node_id *nid)
 {
-	struct sockfd_cache_entry *new = xzalloc(sizeof(*new));
+	struct sockfd_cache_entry *new = xmalloc(sizeof(*new));
 	int i;
 
-	for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
-		new->fd[i] = -1;
+	new->fds = xzalloc(fds_bytes);
+	for (i = 0; i < fds_count; i++)
+		new->fds[i].fd = -1;
 
 	memcpy(&new->nid, nid, sizeof(struct node_id));
 	if (sockfd_cache_insert(new)) {
@@ -251,15 +262,17 @@ void sockfd_cache_add_group(struct sd_node *nodes, int nr)
 /* Add one node to the cache means we can do caching tricks on this node */
 void sockfd_cache_add(struct node_id *nid)
 {
-	struct sockfd_cache_entry *new = xzalloc(sizeof(*new));
+	struct sockfd_cache_entry *new;
 	char name[INET6_ADDRSTRLEN];
 	int n, i;
 
-	for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
-		new->fd[i] = -1;
+	pthread_rwlock_rdlock(&sockfd_cache.lock);
+	new = xmalloc(sizeof(*new));
+	new->fds = xzalloc(fds_bytes);
+	for (i = 0; i < fds_count; i++)
+		new->fds[i].fd = -1;
 
 	memcpy(&new->nid, nid, sizeof(struct node_id));
-	pthread_rwlock_rdlock(&sockfd_cache.lock);
 	if (sockfd_cache_insert(new)) {
 		free(new);
 		pthread_rwlock_unlock(&sockfd_cache.lock);
@@ -271,6 +284,56 @@ void sockfd_cache_add(struct node_id *nid)
 	dprintf("%s:%d, count %d\n", name, nid->port, n);
 }
 
+static void do_grow_fds(struct work *work)
+{
+	struct sockfd_cache_entry *entry;
+	struct rb_node *p;
+	int old_fds_count, new_fds_count, new_size, i;
+
+	dprintf("%d\n", fds_count);
+	pthread_rwlock_wrlock(&sockfd_cache.lock);
+	old_fds_count = fds_count;
+	new_fds_count = fds_count * 2;
+	new_size = fds_bytes * 2;
+	for (p = rb_first(&sockfd_cache.root); p; p = rb_next(p)) {
+		entry = rb_entry(p, struct sockfd_cache_entry, rb);
+		entry->fds = xrealloc(entry->fds, new_size);
+		for (i = old_fds_count; i < new_fds_count; i++) {
+			entry->fds[i].fd = -1;
+			entry->fds[i].fd_in_use = 0;
+		}
+	}
+	pthread_rwlock_unlock(&sockfd_cache.lock);
+}
+
+static bool fds_in_grow;
+static int fds_high_watermark = DEFAULT_FDS_WATERMARK;
+
+static void grow_fds_done(struct work *work)
+{
+	fds_in_grow = false;
+	fds_count *= 2;
+	fds_high_watermark = fds_count * 3 / 4;
+	dprintf("fd count has been grown into %d\n", fds_count);
+	free(work);
+}
+
+static void inline check_idx(int idx)
+{
+	struct work *w;
+
+	if (idx <= fds_high_watermark)
+		return;
+	if (fds_in_grow)
+		return;
+
+	w = xmalloc(sizeof(*w));
+	w->fn = do_grow_fds;
+	w->done = grow_fds_done;
+	fds_in_grow = true;
+	queue_work(sys->sockfd_wqueue, w);
+}
+
 static struct sockfd *sockfd_cache_get(struct node_id *nid, char *name)
 {
 	struct sockfd_cache_entry *entry;
@@ -281,7 +344,9 @@ static struct sockfd *sockfd_cache_get(struct node_id *nid, char *name)
 	if (!entry)
 		return NULL;
 
-	if (entry->fd[idx] != -1) {
+	check_idx(idx);
+
+	if (entry->fds[idx].fd != -1) {
 		dprintf("%s:%d, idx %d\n", name, nid->port, idx);
 		goto out;
 	}
@@ -290,14 +355,14 @@ static struct sockfd *sockfd_cache_get(struct node_id *nid, char *name)
 	dprintf("create connection %s:%d idx %d\n", name, nid->port, idx);
 	fd = connect_to(name, nid->port);
 	if (fd < 0) {
-		uatomic_dec(&entry->fd_in_use[idx]);
+		uatomic_dec(&entry->fds[idx].fd_in_use);
 		return NULL;
 	}
-	entry->fd[idx] = fd;
+	entry->fds[idx].fd = fd;
 
 out:
 	sfd = xmalloc(sizeof(*sfd));
-	sfd->fd = entry->fd[idx];
+	sfd->fd = entry->fds[idx].fd;
 	sfd->idx = idx;
 	return sfd;
 }
@@ -316,7 +381,7 @@ static void sockfd_cache_put(struct node_id *nid, int idx)
 	pthread_rwlock_unlock(&sockfd_cache.lock);
 
 	assert(entry);
-	refcnt = uatomic_cmpxchg(&entry->fd_in_use[idx], 1, 0);
+	refcnt = uatomic_cmpxchg(&entry->fds[idx].fd_in_use, 1, 0);
 	assert(refcnt == 1);
 }
 
-- 
1.7.10.2




More information about the sheepdog mailing list