From: Liu Yuan <tailai.ly at taobao.com>

This will scale the sheep daemon to serve more VMs on one node, and do
it adaptively and automatically.

- fd count defaults to 16 instead of the previous 8

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
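Notes (not part of the commit message): below is a minimal, self-contained
sketch of the growth policy this patch implements. Once a slot index passes
the high watermark, fds_count doubles and the watermark is reset to 3/4 of
the new count, as grow_fds_done() does. The grow() helper and the main()
driver are illustrative only and do not appear anywhere in the patch.

#include <stdio.h>

/* Mirrors DEFAULT_FDS_COUNT / DEFAULT_FDS_WATERMARK from the patch */
static int fds_count = 16;
static int fds_high_watermark = 12;

/*
 * What grow_fds_done() effectively does after do_grow_fds() has
 * reallocated the fd array of every cached entry.
 */
static void grow(void)
{
	fds_count *= 2;
	fds_high_watermark = fds_count * 3 / 4;
}

int main(void)
{
	int i;

	/* Expected progression: 16/12 -> 32/24 -> 64/48 */
	for (i = 0; i < 3; i++) {
		printf("fds_count %d, watermark %d\n",
		       fds_count, fds_high_watermark);
		grow();
	}
	return 0;
}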
 sheep/sheep.c        |    1 +
 sheep/sheep_priv.h   |    1 +
 sheep/sockfd_cache.c |  127 ++++++++++++++++++++++++++++++++++++++------------
 3 files changed, 98 insertions(+), 31 deletions(-)

diff --git a/sheep/sheep.c b/sheep/sheep.c
index df28a94..380a129 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -344,6 +344,7 @@ int main(int argc, char **argv)
 	sys->recovery_wqueue = init_work_queue("recovery", true);
 	sys->deletion_wqueue = init_work_queue("deletion", true);
 	sys->block_wqueue = init_work_queue("block", true);
+	sys->sockfd_wqueue = init_work_queue("sockfd", true);
 	if (!sys->gateway_wqueue || !sys->io_wqueue ||!sys->recovery_wqueue ||
-	    !sys->deletion_wqueue || !sys->block_wqueue)
+	    !sys->deletion_wqueue || !sys->block_wqueue || !sys->sockfd_wqueue)
 		exit(1);
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index e455d27..530fe14 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -122,6 +122,7 @@ struct cluster_info {
 	struct work_queue *deletion_wqueue;
 	struct work_queue *recovery_wqueue;
 	struct work_queue *block_wqueue;
+	struct work_queue *sockfd_wqueue;
 };
 
 struct siocb {
diff --git a/sheep/sockfd_cache.c b/sheep/sockfd_cache.c
index aa90299..c130ea6 100644
--- a/sheep/sockfd_cache.c
+++ b/sheep/sockfd_cache.c
@@ -48,23 +48,33 @@ static struct sockfd_cache sockfd_cache = {
 	.lock = PTHREAD_RWLOCK_INITIALIZER,
 };
 
+/*
+ * Suppose the request size from the Guest is 512k, then 4M / 512k = 8, so at
+ * most 8 requests can be issued to the same sheep object. Based on this
+ * assumption, '16' would be efficient for servers that only host 2~4
+ * Guests.
+ *
+ * This fd count will be dynamically grown when the idx reaches the watermark.
+ */
+#define DEFAULT_FDS_COUNT	16
+#define DEFAULT_FDS_WATERMARK	12
+
+/* How many FDs we cache for one node */
+static int fds_count = DEFAULT_FDS_COUNT;
+
+struct sockfd_cache_fd {
+	int fd;
+	uint8_t fd_in_use;
+};
+
 struct sockfd_cache_entry {
 	struct rb_node rb;
 	struct node_id nid;
-#define SOCKFD_CACHE_MAX_FD 8 /* How many FDs we cache for one node */
-	/*
-	 * FIXME: Make this macro configurable.
-	 *
-	 * Suppose request size from Guest is 512k, then 4M / 512k = 8, so at
-	 * most 8 requests can be issued to the same sheep object. Based on this
-	 * assumption, '8' would be effecient for servers that only host 4~8
-	 * Guests, but for powerful servers that can host dozens of Guests, we
-	 * might consider bigger value.
-	 */
-	int fd[SOCKFD_CACHE_MAX_FD];
-	uint8_t fd_in_use[SOCKFD_CACHE_MAX_FD];
+	struct sockfd_cache_fd *fds;
 };
 
+#define fds_bytes (sizeof(struct sockfd_cache_fd) * fds_count)
+
 static struct sockfd_cache_entry *
 sockfd_cache_insert(struct sockfd_cache_entry *new)
 {
@@ -118,8 +128,8 @@ static inline int get_free_slot(struct sockfd_cache_entry *entry)
 {
 	int idx = -1, i;
 
-	for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++) {
-		if (uatomic_cmpxchg(&entry->fd_in_use[i], 0, 1))
+	for (i = 0; i < fds_count; i++) {
+		if (uatomic_cmpxchg(&entry->fds[i].fd_in_use, 0, 1))
 			continue;
 		idx = i;
 		break;
@@ -155,8 +165,8 @@ out:
 static inline bool slots_all_free(struct sockfd_cache_entry *entry)
 {
 	int i;
-	for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
-		if (uatomic_read(&entry->fd_in_use[i]))
+	for (i = 0; i < fds_count; i++)
+		if (uatomic_read(&entry->fds[i].fd_in_use))
 			return false;
 	return true;
 }
@@ -164,9 +174,9 @@ static inline bool slots_all_free(struct sockfd_cache_entry *entry)
 static inline void destroy_all_slots(struct sockfd_cache_entry *entry)
 {
 	int i;
-	for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
-		if (entry->fd[i] != -1)
-			close(entry->fd[i]);
+	for (i = 0; i < fds_count; i++)
+		if (entry->fds[i].fd != -1)
+			close(entry->fds[i].fd);
 }
 
 /*
@@ -220,11 +230,12 @@ void sockfd_cache_del(struct node_id *nid)
 
 static void sockfd_cache_add_nolock(struct node_id *nid)
 {
-	struct sockfd_cache_entry *new = xzalloc(sizeof(*new));
+	struct sockfd_cache_entry *new = xmalloc(sizeof(*new));
 	int i;
 
-	for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
-		new->fd[i] = -1;
+	new->fds = xzalloc(fds_bytes);
+	for (i = 0; i < fds_count; i++)
+		new->fds[i].fd = -1;
 	memcpy(&new->nid, nid, sizeof(struct node_id));
 	if (sockfd_cache_insert(new)) {
 		free(new);
@@ -251,15 +262,17 @@ void sockfd_cache_add_group(struct sd_node *nodes, int nr)
 
 /* Add one node to the cache means we can do caching tricks on this node */
 void sockfd_cache_add(struct node_id *nid)
 {
-	struct sockfd_cache_entry *new = xzalloc(sizeof(*new));
+	struct sockfd_cache_entry *new;
 	char name[INET6_ADDRSTRLEN];
 	int n, i;
 
-	for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
-		new->fd[i] = -1;
+	pthread_rwlock_rdlock(&sockfd_cache.lock);
+	new = xmalloc(sizeof(*new));
+	new->fds = xzalloc(fds_bytes);
+	for (i = 0; i < fds_count; i++)
+		new->fds[i].fd = -1;
 	memcpy(&new->nid, nid, sizeof(struct node_id));
-	pthread_rwlock_rdlock(&sockfd_cache.lock);
 	if (sockfd_cache_insert(new)) {
 		free(new);
 		pthread_rwlock_unlock(&sockfd_cache.lock);
@@ -271,6 +284,56 @@ void sockfd_cache_add(struct node_id *nid)
 	dprintf("%s:%d, count %d\n", name, nid->port, n);
 }
 
+static void do_grow_fds(struct work *work)
+{
+	struct sockfd_cache_entry *entry;
+	struct rb_node *p;
+	int old_fds_count, new_fds_count, new_size, i;
+
+	dprintf("%d\n", fds_count);
+	pthread_rwlock_wrlock(&sockfd_cache.lock);
+	old_fds_count = fds_count;
+	new_fds_count = fds_count * 2;
+	new_size = fds_bytes * 2;
+	for (p = rb_first(&sockfd_cache.root); p; p = rb_next(p)) {
+		entry = rb_entry(p, struct sockfd_cache_entry, rb);
+		entry->fds = xrealloc(entry->fds, new_size);
+		for (i = old_fds_count; i < new_fds_count; i++) {
+			entry->fds[i].fd = -1;
+			entry->fds[i].fd_in_use = 0;
+		}
+	}
+	pthread_rwlock_unlock(&sockfd_cache.lock);
+}
+
+static bool fds_in_grow;
+static int fds_high_watermark = DEFAULT_FDS_WATERMARK;
+
+static void grow_fds_done(struct work *work)
+{
+	fds_in_grow = false;
+	fds_count *= 2;
+	fds_high_watermark = fds_count * 3 / 4;
+	dprintf("fd count has been grown to %d\n", fds_count);
+	free(work);
+}
+
+static inline void check_idx(int idx)
+{
+	struct work *w;
+
+	if (idx <= fds_high_watermark)
+		return;
+	if (fds_in_grow)
+		return;
+
+	w = xmalloc(sizeof(*w));
+	w->fn = do_grow_fds;
+	w->done = grow_fds_done;
+	fds_in_grow = true;
+	queue_work(sys->sockfd_wqueue, w);
+}
+
 static struct sockfd *sockfd_cache_get(struct node_id *nid, char *name)
 {
 	struct sockfd_cache_entry *entry;
@@ -281,7 +344,9 @@ static struct sockfd *sockfd_cache_get(struct node_id *nid, char *name)
 	if (!entry)
 		return NULL;
 
-	if (entry->fd[idx] != -1) {
+	check_idx(idx);
+
+	if (entry->fds[idx].fd != -1) {
 		dprintf("%s:%d, idx %d\n", name, nid->port, idx);
 		goto out;
 	}
@@ -290,14 +355,14 @@ static struct sockfd *sockfd_cache_get(struct node_id *nid, char *name)
 
 	dprintf("create connection %s:%d idx %d\n", name, nid->port, idx);
 	fd = connect_to(name, nid->port);
 	if (fd < 0) {
-		uatomic_dec(&entry->fd_in_use[idx]);
+		uatomic_dec(&entry->fds[idx].fd_in_use);
 		return NULL;
 	}
-	entry->fd[idx] = fd;
+	entry->fds[idx].fd = fd;
 out:
 	sfd = xmalloc(sizeof(*sfd));
-	sfd->fd = entry->fd[idx];
+	sfd->fd = entry->fds[idx].fd;
 	sfd->idx = idx;
 	return sfd;
 }
@@ -316,7 +381,7 @@ static void sockfd_cache_put(struct node_id *nid, int idx)
 	pthread_rwlock_unlock(&sockfd_cache.lock);
 
 	assert(entry);
-	refcnt = uatomic_cmpxchg(&entry->fd_in_use[idx], 1, 0);
+	refcnt = uatomic_cmpxchg(&entry->fds[idx].fd_in_use, 1, 0);
 	assert(refcnt == 1);
 }
 
-- 
1.7.10.2