[sheepdog] [PATCH v2] sockfd cache: grow fds count dynamically
Liu Yuan
namei.unix at gmail.com
Mon Jul 23 11:41:27 CEST 2012
From: Liu Yuan <tailai.ly at taobao.com>
v2:
- fix memory leak in check_idx()
- remove extra whitespace
---------------------------------- >8
This will scale sheep daemon to serve more VMs on one node and do
it adoptively and automatically.
- fd count default to 16 instead of previous 8
Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
sheep/sheep.c | 1 +
sheep/sheep_priv.h | 1 +
sheep/sockfd_cache.c | 127 ++++++++++++++++++++++++++++++++++++++------------
3 files changed, 98 insertions(+), 31 deletions(-)
diff --git a/sheep/sheep.c b/sheep/sheep.c
index df28a94..380a129 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -344,6 +344,7 @@ int main(int argc, char **argv)
sys->recovery_wqueue = init_work_queue("recovery", true);
sys->deletion_wqueue = init_work_queue("deletion", true);
sys->block_wqueue = init_work_queue("block", true);
+ sys->sockfd_wqueue = init_work_queue("sockfd", true);
if (!sys->gateway_wqueue || !sys->io_wqueue ||!sys->recovery_wqueue ||
!sys->deletion_wqueue || !sys->block_wqueue)
exit(1);
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index e455d27..530fe14 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -122,6 +122,7 @@ struct cluster_info {
struct work_queue *deletion_wqueue;
struct work_queue *recovery_wqueue;
struct work_queue *block_wqueue;
+ struct work_queue *sockfd_wqueue;
};
struct siocb {
diff --git a/sheep/sockfd_cache.c b/sheep/sockfd_cache.c
index aa90299..c130ea6 100644
--- a/sheep/sockfd_cache.c
+++ b/sheep/sockfd_cache.c
@@ -48,23 +48,33 @@ static struct sockfd_cache sockfd_cache = {
.lock = PTHREAD_RWLOCK_INITIALIZER,
};
+/*
+ * Suppose request size from Guest is 512k, then 4M / 512k = 8, so at
+ * most 8 requests can be issued to the same sheep object. Based on this
+ * assumption, '16' would be effecient for servers that only host 2~4
+ * Guests.
+ *
+ * This fd count will be dynamically grown when the idx reaches watermark.
+ */
+#define DEFAULT_FDS_COUNT 16
+#define DEFAULT_FDS_WATERMARK 12
+
+/* How many FDs we cache for one node */
+static int fds_count = DEFAULT_FDS_COUNT;
+
+struct sockfd_cache_fd {
+ int fd;
+ uint8_t fd_in_use;
+};
+
struct sockfd_cache_entry {
struct rb_node rb;
struct node_id nid;
-#define SOCKFD_CACHE_MAX_FD 8 /* How many FDs we cache for one node */
- /*
- * FIXME: Make this macro configurable.
- *
- * Suppose request size from Guest is 512k, then 4M / 512k = 8, so at
- * most 8 requests can be issued to the same sheep object. Based on this
- * assumption, '8' would be effecient for servers that only host 4~8
- * Guests, but for powerful servers that can host dozens of Guests, we
- * might consider bigger value.
- */
- int fd[SOCKFD_CACHE_MAX_FD];
- uint8_t fd_in_use[SOCKFD_CACHE_MAX_FD];
+ struct sockfd_cache_fd *fds;
};
+#define fds_bytes (sizeof(struct sockfd_cache_fd) * fds_count)
+
static struct sockfd_cache_entry *
sockfd_cache_insert(struct sockfd_cache_entry *new)
{
@@ -118,8 +128,8 @@ static inline int get_free_slot(struct sockfd_cache_entry *entry)
{
int idx = -1, i;
- for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++) {
- if (uatomic_cmpxchg(&entry->fd_in_use[i], 0, 1))
+ for (i = 0; i < fds_count; i++) {
+ if (uatomic_cmpxchg(&entry->fds[i].fd_in_use, 0, 1))
continue;
idx = i;
break;
@@ -155,8 +165,8 @@ out:
static inline bool slots_all_free(struct sockfd_cache_entry *entry)
{
int i;
- for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
- if (uatomic_read(&entry->fd_in_use[i]))
+ for (i = 0; i < fds_count; i++)
+ if (uatomic_read(&entry->fds[i].fd_in_use))
return false;
return true;
}
@@ -164,9 +174,9 @@ static inline bool slots_all_free(struct sockfd_cache_entry *entry)
static inline void destroy_all_slots(struct sockfd_cache_entry *entry)
{
int i;
- for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
- if (entry->fd[i] != -1)
- close(entry->fd[i]);
+ for (i = 0; i < fds_count; i++)
+ if (entry->fds[i].fd != -1)
+ close(entry->fds[i].fd);
}
/*
@@ -220,11 +230,12 @@ void sockfd_cache_del(struct node_id *nid)
static void sockfd_cache_add_nolock(struct node_id *nid)
{
- struct sockfd_cache_entry *new = xzalloc(sizeof(*new));
+ struct sockfd_cache_entry *new = xmalloc(sizeof(*new));
int i;
- for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
- new->fd[i] = -1;
+ new->fds = xzalloc(fds_bytes);
+ for (i = 0; i < fds_count; i++)
+ new->fds[i].fd = -1;
memcpy(&new->nid, nid, sizeof(struct node_id));
if (sockfd_cache_insert(new)) {
@@ -251,15 +262,17 @@ void sockfd_cache_add_group(struct sd_node *nodes, int nr)
/* Add one node to the cache means we can do caching tricks on this node */
void sockfd_cache_add(struct node_id *nid)
{
- struct sockfd_cache_entry *new = xzalloc(sizeof(*new));
+ struct sockfd_cache_entry *new;
char name[INET6_ADDRSTRLEN];
int n, i;
- for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
- new->fd[i] = -1;
+ pthread_rwlock_rdlock(&sockfd_cache.lock);
+ new = xmalloc(sizeof(*new));
+ new->fds = xzalloc(fds_bytes);
+ for (i = 0; i < fds_count; i++)
+ new->fds[i].fd = -1;
memcpy(&new->nid, nid, sizeof(struct node_id));
- pthread_rwlock_rdlock(&sockfd_cache.lock);
if (sockfd_cache_insert(new)) {
free(new);
pthread_rwlock_unlock(&sockfd_cache.lock);
@@ -271,6 +284,56 @@ void sockfd_cache_add(struct node_id *nid)
dprintf("%s:%d, count %d\n", name, nid->port, n);
}
+static void do_grow_fds(struct work *work)
+{
+ struct sockfd_cache_entry *entry;
+ struct rb_node *p;
+ int old_fds_count, new_fds_count, new_size, i;
+
+ dprintf("%d\n", fds_count);
+ pthread_rwlock_wrlock(&sockfd_cache.lock);
+ old_fds_count = fds_count;
+ new_fds_count = fds_count * 2;
+ new_size = fds_bytes * 2;
+ for (p = rb_first(&sockfd_cache.root); p; p = rb_next(p)) {
+ entry = rb_entry(p, struct sockfd_cache_entry, rb);
+ entry->fds = xrealloc(entry->fds, new_size);
+ for (i = old_fds_count; i < new_fds_count; i++) {
+ entry->fds[i].fd = -1;
+ entry->fds[i].fd_in_use = 0;
+ }
+ }
+ pthread_rwlock_unlock(&sockfd_cache.lock);
+}
+
+static bool fds_in_grow;
+static int fds_high_watermark = DEFAULT_FDS_WATERMARK;
+
+static void grow_fds_done(struct work *work)
+{
+ fds_in_grow = false;
+ fds_count *= 2;
+ fds_high_watermark = fds_count * 3 / 4;
+ dprintf("fd count has been grown into %d\n", fds_count);
+ free(work);
+}
+
+static void inline check_idx(int idx)
+{
+ struct work *w;
+
+ if (idx <= fds_high_watermark)
+ return;
+ if (fds_in_grow)
+ return;
+
+ w = xmalloc(sizeof(*w));
+ w->fn = do_grow_fds;
+ w->done = grow_fds_done;
+ fds_in_grow = true;
+ queue_work(sys->sockfd_wqueue, w);
+}
+
static struct sockfd *sockfd_cache_get(struct node_id *nid, char *name)
{
struct sockfd_cache_entry *entry;
@@ -281,7 +344,9 @@ static struct sockfd *sockfd_cache_get(struct node_id *nid, char *name)
if (!entry)
return NULL;
- if (entry->fd[idx] != -1) {
+ check_idx(idx);
+
+ if (entry->fds[idx].fd != -1) {
dprintf("%s:%d, idx %d\n", name, nid->port, idx);
goto out;
}
@@ -290,14 +355,14 @@ static struct sockfd *sockfd_cache_get(struct node_id *nid, char *name)
dprintf("create connection %s:%d idx %d\n", name, nid->port, idx);
fd = connect_to(name, nid->port);
if (fd < 0) {
- uatomic_dec(&entry->fd_in_use[idx]);
+ uatomic_dec(&entry->fds[idx].fd_in_use);
return NULL;
}
- entry->fd[idx] = fd;
+ entry->fds[idx].fd = fd;
out:
sfd = xmalloc(sizeof(*sfd));
- sfd->fd = entry->fd[idx];
+ sfd->fd = entry->fds[idx].fd;
sfd->idx = idx;
return sfd;
}
@@ -316,7 +381,7 @@ static void sockfd_cache_put(struct node_id *nid, int idx)
pthread_rwlock_unlock(&sockfd_cache.lock);
assert(entry);
- refcnt = uatomic_cmpxchg(&entry->fd_in_use[idx], 1, 0);
+ refcnt = uatomic_cmpxchg(&entry->fds[idx].fd_in_use, 1, 0);
assert(refcnt == 1);
}
--
1.7.10.2
More information about the sheepdog
mailing list