[sheepdog] [PATCH 1/8] sheep: redesign a new cached sockfd pool
Liu Yuan
namei.unix at gmail.com
Wed Jun 27 09:25:32 CEST 2012
From: Liu Yuan <tailai.ly at taobao.com>
The old sockfd pool has the following defects:
0 statically allocated.
1 uses too many fds per sheep, which doesn't scale.
2 implemented per thread, so fds can't be shared between threads.
3 needs resetting at every membership change.
The new sockfd cache aims to address these problems while remaining as
efficient as the old one:
0 dynamically allocated/deallocated at node granularity.
1 cached fds are multiplexed by all threads.
2 each session (e.g. forward_write_obj_req) grabs one fd at a time.
3 if no fd is available from the cache, falls back to plain connect_to()
and close() internally.
4 fds are keyed uniquely by IP:PORT, hence no resetting is needed at
membership changes.
5 the total number of fds shrinks from (nr_gateway + nr_io) * nr_nodes
to nr_nodes.
6 adds just one more API, for a total of 3: sheep_{get,put,del}_fd()
(see the sketch below).
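A minimal sketch of a session with the new API, modeled on the forward
read path in this patch (error handling trimmed; v is the target vnode):

	fd = sheep_get_fd(v);
	if (fd < 0)
		return SD_RES_NETWORK_ERROR;

	ret = exec_req(fd, &fwd_hdr, req->data, &wlen, &rlen);
	if (ret)
		sheep_del_fd(v, fd);	/* network error: drop the fd */
	else
		sheep_put_fd(v, fd);	/* success: hand the fd back */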
Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
include/sheep.h | 2 +-
sheep/Makefile.am | 2 +-
sheep/gateway.c | 55 ++++++----
sheep/group.c | 9 +-
sheep/sdnet.c | 65 -----------
sheep/sheep_priv.h | 14 ++-
sheep/sockfd_cache.c | 292 ++++++++++++++++++++++++++++++++++++++++++++++++++
7 files changed, 345 insertions(+), 94 deletions(-)
create mode 100644 sheep/sockfd_cache.c
diff --git a/include/sheep.h b/include/sheep.h
index 06bd93c..a188d18 100644
--- a/include/sheep.h
+++ b/include/sheep.h
@@ -158,11 +158,11 @@ struct sd_node {
};
struct sd_vnode {
- uint64_t id;
uint8_t addr[16];
uint16_t port;
uint16_t node_idx;
uint32_t zone;
+ uint64_t id;
};
struct epoch_log {
diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index 1cb2ebf..0a874c6 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -26,7 +26,7 @@ sbin_PROGRAMS = sheep
sheep_SOURCES = sheep.c group.c sdnet.c gateway.c store.c vdi.c work.c \
journal.c ops.c recovery.c cluster/local.c \
- object_cache.c object_list_cache.c
+ object_cache.c object_list_cache.c sockfd_cache.c
if BUILD_COROSYNC
sheep_SOURCES += cluster/corosync.c
diff --git a/sheep/gateway.c b/sheep/gateway.c
index 42f028a..c3e5f80 100644
--- a/sheep/gateway.c
+++ b/sheep/gateway.c
@@ -58,7 +58,7 @@ read_remote:
if (vnode_is_local(v))
continue;
- fd = get_sheep_fd(v->addr, v->port, v->node_idx, fwd_hdr.epoch);
+ fd = sheep_get_fd(v);
if (fd < 0) {
ret = SD_RES_NETWORK_ERROR;
continue;
@@ -70,10 +70,11 @@ read_remote:
ret = exec_req(fd, &fwd_hdr, req->data, &wlen, &rlen);
if (ret) { /* network errors */
- del_sheep_fd(fd);
+ sheep_del_fd(v, fd);
ret = SD_RES_NETWORK_ERROR;
continue;
} else {
+ sheep_put_fd(v, fd);
memcpy(&req->rp, rsp, sizeof(*rsp));
ret = rsp->result;
break;
@@ -82,6 +83,11 @@ read_remote:
return ret;
}
+struct write_info {
+ struct pollfd pfds[SD_MAX_REDUNDANCY];
+ struct sd_vnode *vnodes[SD_MAX_REDUNDANCY];
+};
+
int forward_write_obj_req(struct request *req)
{
int i, fd, ret, pollret;
@@ -93,15 +99,14 @@ int forward_write_obj_req(struct request *req)
struct sd_vnode *obj_vnodes[SD_MAX_COPIES];
uint64_t oid = req->rq.obj.oid;
int nr_copies;
- struct pollfd pfds[SD_MAX_REDUNDANCY];
- int nr_fds, local = 0;
+ int nr_fds = 0, local = 0;
+ struct write_info wi;
dprintf("%"PRIx64"\n", oid);
- nr_fds = 0;
- memset(pfds, 0, sizeof(pfds));
- for (i = 0; i < ARRAY_SIZE(pfds); i++)
- pfds[i].fd = -1;
+ memset(&wi, 0, sizeof(wi));
+ for (i = 0; i < SD_MAX_REDUNDANCY; i++)
+ wi.pfds[i].fd = -1;
memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr));
fwd_hdr.flags |= SD_FLAG_CMD_IO_LOCAL;
@@ -120,24 +125,23 @@ int forward_write_obj_req(struct request *req)
continue;
}
- fd = get_sheep_fd(v->addr, v->port, v->node_idx, fwd_hdr.epoch);
+ fd = sheep_get_fd(v);
if (fd < 0) {
- eprintf("failed to connect to %s:%"PRIu32"\n", name,
- v->port);
ret = SD_RES_NETWORK_ERROR;
goto err;
}
ret = send_req(fd, &fwd_hdr, req->data, &wlen);
if (ret) { /* network errors */
- del_sheep_fd(fd);
+ sheep_del_fd(v, fd);
ret = SD_RES_NETWORK_ERROR;
dprintf("fail %"PRIu32"\n", ret);
goto err;
}
- pfds[nr_fds].fd = fd;
- pfds[nr_fds].events = POLLIN;
+ wi.vnodes[nr_fds] = v;
+ wi.pfds[nr_fds].fd = fd;
+ wi.pfds[nr_fds].events = POLLIN;
nr_fds++;
}
@@ -157,7 +161,7 @@ int forward_write_obj_req(struct request *req)
ret = SD_RES_SUCCESS;
again:
- pollret = poll(pfds, nr_fds, -1);
+ pollret = poll(wi.pfds, nr_fds, -1);
if (pollret < 0) {
if (errno == EINTR)
goto again;
@@ -167,19 +171,20 @@ again:
}
for (i = 0; i < nr_fds; i++) {
- if (pfds[i].revents & POLLERR || pfds[i].revents & POLLHUP ||
- pfds[i].revents & POLLNVAL) {
- del_sheep_fd(pfds[i].fd);
+ if (wi.pfds[i].revents & POLLERR ||
+ wi.pfds[i].revents & POLLHUP ||
+ wi.pfds[i].revents & POLLNVAL) {
+ sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd);
ret = SD_RES_NETWORK_ERROR;
break;
}
- if (!(pfds[i].revents & POLLIN))
+ if (!(wi.pfds[i].revents & POLLIN))
continue;
- if (do_read(pfds[i].fd, rsp, sizeof(*rsp))) {
+ if (do_read(wi.pfds[i].fd, rsp, sizeof(*rsp))) {
eprintf("failed to read a response: %m\n");
- del_sheep_fd(pfds[i].fd);
+ sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd);
ret = SD_RES_NETWORK_ERROR;
break;
}
@@ -189,11 +194,15 @@ again:
ret = rsp->result;
}
+ sheep_put_fd(wi.vnodes[i], wi.pfds[i].fd);
break;
}
if (i < nr_fds) {
nr_fds--;
- memmove(pfds + i, pfds + i + 1, sizeof(*pfds) * (nr_fds - i));
+ memmove(wi.pfds + i, wi.pfds + i + 1,
+ sizeof(struct pollfd) * (nr_fds - i));
+ memmove(wi.vnodes + i, wi.vnodes + i + 1,
+ sizeof(struct sd_vnode *) * (nr_fds - i));
}
dprintf("%"PRIx64" %"PRIu32"\n", oid, nr_fds);
@@ -204,7 +213,7 @@ out:
return ret;
err:
for (i = 0; i < nr_fds; i++)
- del_sheep_fd(pfds[i].fd);
+ sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd);
return ret;
}
diff --git a/sheep/group.c b/sheep/group.c
index 1dec931..681672c 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -803,10 +803,13 @@ static void finish_join(struct join_message *msg, struct sd_node *joined,
if (sd_store->purge_obj &&
sd_store->purge_obj() != SD_RES_SUCCESS)
eprintf("WARN: may have stale objects\n");
+
+ sockfd_cache_add_group(nodes, nr_nodes);
}
static void update_cluster_info(struct join_message *msg,
- struct sd_node *joined, struct sd_node *nodes, size_t nr_nodes)
+ struct sd_node *joined, struct sd_node *nodes,
+ size_t nr_nodes)
{
struct vnode_info *old_vnode_info;
@@ -867,6 +870,8 @@ static void update_cluster_info(struct join_message *msg,
if (current_vnode_info->nr_zones >= sys->nr_copies)
sys_stat_set(SD_STATUS_OK);
}
+
+ sockfd_cache_add(joined);
}
/*
@@ -1110,6 +1115,8 @@ void sd_leave_handler(struct sd_node *left, struct sd_node *members,
if (current_vnode_info->nr_zones < sys->nr_copies)
sys_stat_set(SD_STATUS_HALT);
}
+
+ sockfd_cache_del((struct node_id *)left);
}
int create_cluster(int port, int64_t zone, int nr_vnodes,
diff --git a/sheep/sdnet.c b/sheep/sdnet.c
index 978c8d0..3bcccce 100644
--- a/sheep/sdnet.c
+++ b/sheep/sdnet.c
@@ -749,68 +749,3 @@ int create_listen_port(int port, void *data)
return create_listen_ports(port, create_listen_port_fn, data);
}
-static __thread int cached_fds[SD_MAX_NODES];
-static __thread uint32_t cached_epoch = 0;
-
-void del_sheep_fd(int fd)
-{
- int i;
-
- for (i = 0; i < SD_MAX_NODES; i++) {
- if (cached_fds[i] == fd) {
- if (fd >= 0)
- close(fd);
-
- cached_fds[i] = -1;
-
- return;
- }
- }
-}
-
-int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx, uint32_t epoch)
-{
- int i, fd;
- char name[INET6_ADDRSTRLEN];
-
- if (cached_epoch == 0) {
- /* initialize */
- for (i = 0; i < SD_MAX_NODES; i++)
- cached_fds[i] = -1;
-
- cached_epoch = epoch;
- }
-
- if (before(epoch, cached_epoch)) {
- eprintf("requested epoch is smaller than the previous one: %d < %d\n",
- epoch, cached_epoch);
- return -1;
- }
- if (after(epoch, cached_epoch)) {
- for (i = 0; i < SD_MAX_NODES; i++) {
- if (cached_fds[i] >= 0)
- close(cached_fds[i]);
-
- cached_fds[i] = -1;
- }
- cached_epoch = epoch;
- }
-
- fd = cached_fds[node_idx];
- dprintf("%d, %d\n", epoch, fd);
-
- if (cached_epoch == epoch && fd >= 0) {
- dprintf("using the cached fd %d\n", fd);
- return fd;
- }
-
- addr_to_str(name, sizeof(name), addr, 0);
-
- fd = connect_to(name, port);
- if (fd < 0)
- return -1;
-
- cached_fds[node_idx] = fd;
-
- return fd;
-}
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index dfafeae..f7db3ed 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -289,9 +289,6 @@ int read_object(struct vnode_info *vnodes, uint32_t node_version,
int remove_object(struct vnode_info *vnodes, uint32_t node_version,
uint64_t oid, int nr);
-void del_sheep_fd(int fd);
-int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx, uint32_t epoch);
-
int prealloc(int fd, uint32_t size);
int objlist_cache_insert(uint64_t oid);
@@ -401,4 +398,15 @@ void object_cache_delete(uint32_t vid);
int object_cache_init(const char *p);
+/* sockfd_cache */
+struct node_id;
+
+void sockfd_cache_del(struct node_id *);
+void sockfd_cache_add(struct sd_node *);
+void sockfd_cache_add_group(struct sd_node *nodes, int nr);
+
+int sheep_get_fd(struct sd_vnode *vnode);
+void sheep_put_fd(struct sd_vnode *vnode, int fd);
+void sheep_del_fd(struct sd_vnode *vnode, int fd);
+
#endif
diff --git a/sheep/sockfd_cache.c b/sheep/sockfd_cache.c
new file mode 100644
index 0000000..00095c2
--- /dev/null
+++ b/sheep/sockfd_cache.c
@@ -0,0 +1,292 @@
+/*
+ * Copyright (C) 2012 Taobao Inc.
+ *
+ * Liu Yuan <namei.unix at gmail.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <urcu/uatomic.h>
+#include <pthread.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <stdio.h>
+
+#include "sheep.h"
+#include "sheep_priv.h"
+#include "list.h"
+#include "rbtree.h"
+#include "logger.h"
+#include "util.h"
+
+struct node_id {
+ uint8_t addr[16];
+ uint16_t port;
+};
+
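+/* The cache proper: an rbtree of per-node entries guarded by an rwlock. */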
+struct sockfd_cache {
+ struct rb_root root;
+ pthread_rwlock_t lock;
+ int count;
+};
+
+static struct sockfd_cache sockfd_cache = {
+ .root = RB_ROOT,
+ .lock = PTHREAD_RWLOCK_INITIALIZER,
+};
+
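+/*
+ * One entry per node, keyed by IP:PORT.  'refcount' acts as a try-lock:
+ * 0 means the fd is free, non-zero means some session owns it.
+ */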
+struct sockfd_cache_entry {
+ struct rb_node rb;
+ int fd;
+ uint8_t refcount;
+ struct node_id nid;
+};
+
+static inline int node_id_cmp(const void *a, const void *b)
+{
+ const struct node_id *node1 = a;
+ const struct node_id *node2 = b;
+ int cmp;
+
+ cmp = memcmp(node1->addr, node2->addr, sizeof(node1->addr));
+ if (cmp != 0)
+ return cmp;
+
+ if (node1->port < node2->port)
+ return -1;
+ if (node1->port > node2->port)
+ return 1;
+ return 0;
+}
+
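+/*
+ * Insert 'new' into the rbtree.  Returns the existing entry if one with
+ * the same node_id is already present, or NULL on successful insertion.
+ */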
+static struct sockfd_cache_entry *
+sockfd_cache_insert(struct sockfd_cache_entry *new)
+{
+ struct rb_node **p = &sockfd_cache.root.rb_node;
+ struct rb_node *parent = NULL;
+ struct sockfd_cache_entry *entry;
+
+ while (*p) {
+ int cmp;
+
+ parent = *p;
+ entry = rb_entry(parent, struct sockfd_cache_entry, rb);
+ cmp = node_id_cmp(&new->nid, &entry->nid);
+
+ if (cmp < 0)
+ p = &(*p)->rb_left;
+ else if (cmp > 0)
+ p = &(*p)->rb_right;
+ else
+ return entry;
+ }
+ rb_link_node(&new->rb, parent, p);
+ rb_insert_color(&new->rb, &sockfd_cache.root);
+
+ return NULL; /* inserted successfully */
+}
+
+static struct sockfd_cache_entry *sockfd_cache_search(struct node_id *nid)
+{
+ struct rb_node *n = sockfd_cache.root.rb_node;
+ struct sockfd_cache_entry *t;
+
+ while (n) {
+ int cmp;
+
+ t = rb_entry(n, struct sockfd_cache_entry, rb);
+ cmp = node_id_cmp(nid, &t->nid);
+
+ if (cmp < 0)
+ n = n->rb_left;
+ else if (cmp > 0)
+ n = n->rb_right;
+ else
+ return t; /* found it */
+ }
+
+ return NULL;
+}
+
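+/*
+ * Try to take exclusive ownership of the node's entry.  Returns NULL if
+ * another session already holds it; the caller then falls back to a
+ * plain connect_to()/close() pair.
+ */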
+static struct sockfd_cache_entry *sockfd_cache_grab(struct node_id *nid)
+{
+ struct sockfd_cache_entry *entry;
+
+ pthread_rwlock_rdlock(&sockfd_cache.lock);
+ entry = sockfd_cache_search(nid);
+ pthread_rwlock_unlock(&sockfd_cache.lock);
+ assert(entry);
+ /* if refcount == 0, set it to 1, otherwise someone holds it */
+ if (uatomic_cmpxchg(&entry->refcount, 0, 1))
+ return NULL;
+
+ return entry;
+}
+
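+/* Remove the node's entry; no-op if some session still holds the fd. */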
+void sockfd_cache_del(struct node_id *nid)
+{
+ struct sockfd_cache_entry *entry;
+ char name[INET6_ADDRSTRLEN];
+ int n;
+
+ entry = sockfd_cache_grab(nid);
+ /* some session still holds it and is expected to delete the entry */
+ if (!entry)
+ return;
+
+ rb_erase(&entry->rb, &sockfd_cache.root);
+ free(entry);
+ n = uatomic_sub_return(&sockfd_cache.count, 1);
+ addr_to_str(name, sizeof(name), nid->addr, 0);
+ dprintf("%s:%d, count %d\n", name, nid->port, n);
+}
+
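+/* Caller must hold sockfd_cache.lock for writing. */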
+static void sockfd_cache_add_nolock(struct node_id *nid)
+{
+ struct sockfd_cache_entry *new = xzalloc(sizeof(*new));
+
+ new->fd = -1;
+ memcpy(&new->nid, nid, sizeof(struct node_id));
+ if (sockfd_cache_insert(new)) {
+ free(new);
+ return;
+ }
+ sockfd_cache.count++;
+}
+
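+/* Add a whole node list under a single write lock. */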
+void sockfd_cache_add_group(struct sd_node *nodes, int nr)
+{
+ struct sd_node *p;
+ struct node_id *nid;
+
+ dprintf("%d\n", nr);
+ pthread_rwlock_wrlock(&sockfd_cache.lock);
+ while (nr--) {
+ p = nodes + nr;
+ nid = (struct node_id *)p;
+ sockfd_cache_add_nolock(nid);
+ }
+ pthread_rwlock_unlock(&sockfd_cache.lock);
+}
+
+void sockfd_cache_add(struct sd_node *node)
+{
+ struct sockfd_cache_entry *new = xzalloc(sizeof(*new));
+ char name[INET6_ADDRSTRLEN];
+ int n;
+
+ new->fd = -1;
+ memcpy(&new->nid, node, sizeof(struct node_id));
+ pthread_rwlock_rdlock(&sockfd_cache.lock);
+ if (sockfd_cache_insert(new)) {
+ free(new);
+ pthread_rwlock_unlock(&sockfd_cache.lock);
+ return;
+ }
+ pthread_rwlock_unlock(&sockfd_cache.lock);
+ n = uatomic_add_return(&sockfd_cache.count, 1);
+ addr_to_str(name, sizeof(name), node->addr, 0);
+ dprintf("%s:%d, count %d\n", name, node->port, n);
+}
+
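+/* Grab the node's cached fd, connecting lazily on first use. */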
+static int sockfd_cache_get(struct node_id *nid)
+{
+ struct sockfd_cache_entry *entry;
+ char name[INET6_ADDRSTRLEN];
+ int fd;
+
+ entry = sockfd_cache_grab(nid);
+ if (!entry)
+ return -1;
+
+ if (entry->fd != -1)
+ return entry->fd;
+
+ /* Create a new connection for this vnode */
+ addr_to_str(name, sizeof(name), nid->addr, 0);
+ dprintf("create connection %s:%d\n", name, nid->port);
+ fd = connect_to(name, nid->port);
+ if (fd < 0) {
+ uatomic_dec(&entry->refcount);
+ return -1;
+ }
+ entry->fd = fd;
+
+ return fd;
+}
+
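+/* Release ownership taken by sockfd_cache_grab(); the fd stays cached. */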
+static void sockfd_cache_put(struct node_id *nid)
+{
+ struct sockfd_cache_entry *entry;
+ char name[INET6_ADDRSTRLEN];
+ int refcnt;
+
+ addr_to_str(name, sizeof(name), nid->addr, 0);
+ dprintf("%s:%d\n", name, nid->port);
+ pthread_rwlock_rdlock(&sockfd_cache.lock);
+ entry = sockfd_cache_search(nid);
+ pthread_rwlock_unlock(&sockfd_cache.lock);
+ assert(entry);
+ refcnt = uatomic_cmpxchg(&entry->refcount, 1, 0);
+ assert(refcnt == 1);
+}
+
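+/*
+ * Get an fd connected to the vnode.  When the cached fd is busy, fall
+ * back to a one-shot connection that sheep_put_fd()/sheep_del_fd() will
+ * simply close.
+ */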
+int sheep_get_fd(struct sd_vnode *vnode)
+{
+ struct node_id *nid = (struct node_id *)vnode;
+ char name[INET6_ADDRSTRLEN];
+ int fd = sockfd_cache_get(nid);
+
+ if (fd != -1)
+ return fd;
+
+ addr_to_str(name, sizeof(name), nid->addr, 0);
+ fd = connect_to(name, nid->port);
+ if (fd < 0) {
+ dprintf("failed connect to %s:%d\n", name, nid->port);
+ return -1;
+ }
+
+ dprintf("%d\n", fd);
+ return fd;
+}
+
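+/*
+ * Hand back an fd from sheep_get_fd(): release a cached fd, or close a
+ * one-shot fallback fd.
+ */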
+void sheep_put_fd(struct sd_vnode *vnode, int fd)
+{
+ struct node_id *nid = (struct node_id *)vnode;
+ struct sockfd_cache_entry *entry;
+
+ pthread_rwlock_rdlock(&sockfd_cache.lock);
+ entry = sockfd_cache_search(nid);
+ pthread_rwlock_unlock(&sockfd_cache.lock);
+ assert(entry);
+ if (entry->fd == fd) {
+ sockfd_cache_put(nid);
+ } else {
+ dprintf("%d\n", fd);
+ close(fd);
+ }
+}
+
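+/*
+ * Drop a broken connection: release the fd and remove the node's cache
+ * entry; a one-shot fallback fd is simply closed.
+ */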
+void sheep_del_fd(struct sd_vnode *vnode, int fd)
+{
+ struct node_id *nid = (struct node_id *)vnode;
+ struct sockfd_cache_entry *entry;
+
+ pthread_rwlock_rdlock(&sockfd_cache.lock);
+ entry = sockfd_cache_search(nid);
+ pthread_rwlock_unlock(&sockfd_cache.lock);
+ assert(entry);
+ if (entry->fd == fd) {
+ sockfd_cache_put(nid);
+ sockfd_cache_del(nid);
+ } else {
+ dprintf("%d\n", fd);
+ close(fd);
+ }
+}
--
1.7.10.2