[sheepdog] [PATCH 3/6] sheep, sockfd cache: cache more than one FD for each node
Liu Yuan
namei.unix at gmail.com
Sun Jun 24 14:51:50 CEST 2012
From: Liu Yuan <tailai.ly at taobao.com>
This is inspired by the observation that each guest can issue as many
as 4 requests in one go.
The complexity added to the code is visible outside of the sockfd cache:
the API takes one more parameter, an FD index, which the underlying core
needs to identify which of a node's cached FDs is actually in use.
I think this trade-off is a good deal.
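For illustration, here is a minimal self-contained sketch of the per-node
slot idea (my own rendering, not code from the patch: C11 atomics stand in
for liburcu's uatomic_cmpxchg(), and all names except SOCKFD_CACHE_MAX_FD
are hypothetical):

	#include <stdatomic.h>

	#define SOCKFD_CACHE_MAX_FD 8	/* FDs cached per node, as in the patch */

	struct fd_slots {
		int fd[SOCKFD_CACHE_MAX_FD];
		atomic_uchar in_use[SOCKFD_CACHE_MAX_FD];
	};

	/* Claim a free slot: the 0 -> 1 flip succeeds for exactly one thread */
	static int grab_slot(struct fd_slots *s)
	{
		for (int i = 0; i < SOCKFD_CACHE_MAX_FD; i++) {
			unsigned char expected = 0;
			if (atomic_compare_exchange_strong(&s->in_use[i],
							   &expected, 1))
				return i;
		}
		return -1;	/* no slot free: caller falls back to a short FD */
	}

	/* Release a slot claimed by grab_slot() */
	static void release_slot(struct fd_slots *s, int idx)
	{
		atomic_store(&s->in_use[idx], 0);
	}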
Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
sheep/gateway.c | 31 ++++---
sheep/sheep_priv.h | 6 +-
sheep/sockfd_cache.c | 218 ++++++++++++++++++++++++++++++++++++++------------
3 files changed, 188 insertions(+), 67 deletions(-)
diff --git a/sheep/gateway.c b/sheep/gateway.c
index c3e5f80..f126dfb 100644
--- a/sheep/gateway.c
+++ b/sheep/gateway.c
@@ -54,11 +54,13 @@ read_remote:
j = random();
for (i = 0; i < nr_copies; i++) {
int idx = (i + j) % nr_copies;
+ int sock_idx;
+
v = obj_vnodes[idx];
if (vnode_is_local(v))
continue;
- fd = sheep_get_fd(v);
+ fd = sheep_get_fd(v, &sock_idx);
if (fd < 0) {
ret = SD_RES_NETWORK_ERROR;
continue;
@@ -70,11 +72,11 @@ read_remote:
ret = exec_req(fd, &fwd_hdr, req->data, &wlen, &rlen);
if (ret) { /* network errors */
- sheep_del_fd(v, fd);
+ sheep_del_fd(v, fd, sock_idx);
ret = SD_RES_NETWORK_ERROR;
continue;
} else {
- sheep_put_fd(v, fd);
+ sheep_put_fd(v, fd, sock_idx);
memcpy(&req->rp, rsp, sizeof(*rsp));
ret = rsp->result;
break;
@@ -86,6 +88,7 @@ read_remote:
struct write_info {
struct pollfd pfds[SD_MAX_REDUNDANCY];
struct sd_vnode *vnodes[SD_MAX_REDUNDANCY];
+ int sock_idx[SD_MAX_REDUNDANCY];
};
int forward_write_obj_req(struct request *req)
@@ -104,9 +107,10 @@ int forward_write_obj_req(struct request *req)
dprintf("%"PRIx64"\n", oid);
- memset(&wi, 0, sizeof(wi));
- for (i = 0; i < SD_MAX_REDUNDANCY; i++)
+ for (i = 0; i < SD_MAX_REDUNDANCY; i++) {
wi.pfds[i].fd = -1;
+ wi.vnodes[i] = NULL;
+ }
memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr));
fwd_hdr.flags |= SD_FLAG_CMD_IO_LOCAL;
@@ -125,7 +129,7 @@ int forward_write_obj_req(struct request *req)
continue;
}
- fd = sheep_get_fd(v);
+ fd = sheep_get_fd(v, &wi.sock_idx[nr_fds]);
if (fd < 0) {
ret = SD_RES_NETWORK_ERROR;
goto err;
@@ -133,7 +137,7 @@ int forward_write_obj_req(struct request *req)
ret = send_req(fd, &fwd_hdr, req->data, &wlen);
if (ret) { /* network errors */
- sheep_del_fd(v, fd);
+ sheep_del_fd(v, fd, wi.sock_idx[nr_fds]);
ret = SD_RES_NETWORK_ERROR;
dprintf("fail %"PRIu32"\n", ret);
goto err;
@@ -174,7 +178,8 @@ again:
if (wi.pfds[i].revents & POLLERR ||
wi.pfds[i].revents & POLLHUP ||
wi.pfds[i].revents & POLLNVAL) {
- sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd);
+ sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd,
+ wi.sock_idx[i]);
ret = SD_RES_NETWORK_ERROR;
break;
}
@@ -184,7 +189,8 @@ again:
if (do_read(wi.pfds[i].fd, rsp, sizeof(*rsp))) {
eprintf("failed to read a response: %m\n");
- sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd);
+ sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd,
+ wi.sock_idx[i]);
ret = SD_RES_NETWORK_ERROR;
break;
}
@@ -193,8 +199,7 @@ again:
eprintf("fail %"PRIu32"\n", rsp->result);
ret = rsp->result;
}
-
- sheep_put_fd(wi.vnodes[i], wi.pfds[i].fd);
+ sheep_put_fd(wi.vnodes[i], wi.pfds[i].fd, wi.sock_idx[i]);
break;
}
if (i < nr_fds) {
@@ -203,6 +208,8 @@ again:
sizeof(struct pollfd) * (nr_fds - i));
memmove(wi.vnodes + i, wi.vnodes + i + 1,
sizeof(struct sd_vnode *) * (nr_fds - i));
+ memmove(wi.sock_idx + i, wi.sock_idx + i + 1,
+ sizeof(int) * (nr_fds - i));
}
dprintf("%"PRIx64" %"PRIu32"\n", oid, nr_fds);
@@ -213,7 +220,7 @@ out:
return ret;
err:
for (i = 0; i < nr_fds; i++)
- sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd);
+ sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd, wi.sock_idx[i]);
return ret;
}
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 83a42e3..747599c 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -408,8 +408,8 @@ void sockfd_cache_del(struct node_id *);
void sockfd_cache_add(struct sd_node *);
void sockfd_cache_add_group(struct sd_node *nodes, int nr);
-int sheep_get_fd(struct sd_vnode *vnode);
-void sheep_put_fd(struct sd_vnode *vnode, int fd);
-void sheep_del_fd(struct sd_vnode *vnode, int fd);
+int sheep_get_fd(struct sd_vnode *vnode, int *);
+void sheep_put_fd(struct sd_vnode *vnode, int fd, int);
+void sheep_del_fd(struct sd_vnode *vnode, int fd, int);
#endif
diff --git a/sheep/sockfd_cache.c b/sheep/sockfd_cache.c
index 4f7ff5c..a756834 100644
--- a/sheep/sockfd_cache.c
+++ b/sheep/sockfd_cache.c
@@ -11,6 +11,20 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+/*
+ * The sockfd cache provides long-lived TCP connections to the nodes in the
+ * cluster to accelerate data transfer. It has the following characteristics:
+ *    0 dynamically allocated/deallocated at node granularity.
+ *    1 cached FDs are multiplexed by all threads.
+ *    2 each session (e.g., forward_write_obj_req) can grab one FD at a time.
+ *    3 if no FD is available from the cache, fall back to a normal
+ *      connect_to() and close() internally.
+ *    4 FDs are named by IP:PORT uniquely, hence no need to reset them at
+ *      membership change.
+ *    5 the total number of FDs scales to massive numbers of nodes.
+ *    6 in total 3 APIs: sheep_{get,put,del}_fd().
+ */
#include <urcu/uatomic.h>
#include <pthread.h>
#include <stdint.h>
@@ -42,9 +56,10 @@ static struct sockfd_cache sockfd_cache = {
struct sockfd_cache_entry {
struct rb_node rb;
- int fd;
- uint8_t refcount;
struct node_id nid;
+#define SOCKFD_CACHE_MAX_FD 8 /* How many FDs we cache for one node */
+ int fd[SOCKFD_CACHE_MAX_FD];
+ uint8_t fd_in_use[SOCKFD_CACHE_MAX_FD];
};
static inline int node_id_cmp(const void *a, const void *b)
@@ -114,34 +129,105 @@ static struct sockfd_cache_entry *sockfd_cache_search(struct node_id *nid)
return NULL;
}
-static struct sockfd_cache_entry *sockfd_cache_grab(struct node_id *nid)
+static inline int get_free_slot(struct sockfd_cache_entry *entry)
+{
+ int idx = -1, i;
+
+ for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++) {
+ if (uatomic_cmpxchg(&entry->fd_in_use[i], 0, 1))
+ continue;
+ idx = i;
+ break;
+ }
+ return idx;
+}
+
+/*
+ * Grab a free slot of the node and increase the refcount of the slot
+ *
+ * If no free slot is available, it typically means we should use a short FD.
+ */
+static struct sockfd_cache_entry *sockfd_cache_grab(struct node_id *nid,
+ char *name, int *ret_idx)
{
struct sockfd_cache_entry *entry;
pthread_rwlock_rdlock(&sockfd_cache.lock);
entry = sockfd_cache_search(nid);
- pthread_rwlock_unlock(&sockfd_cache.lock);
- assert(entry);
- /* if refcount == 0, set it to 1, otherwise someone holds it */
- if (uatomic_cmpxchg(&entry->refcount, 0, 1))
- return NULL;
+ if (!entry) {
+ dprintf("failed node %s:%d\n", name, nid->port);
+ goto out;
+ }
+ *ret_idx = get_free_slot(entry);
+ if (*ret_idx == -1)
+ entry = NULL;
+out:
+ pthread_rwlock_unlock(&sockfd_cache.lock);
return entry;
}
-void sockfd_cache_del(struct node_id *nid)
+static inline bool slots_all_free(struct sockfd_cache_entry *entry)
+{
+ int i;
+ for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
+ if (uatomic_read(&entry->fd_in_use[i]))
+ return false;
+ return true;
+}
+
+static inline void destroy_all_slots(struct sockfd_cache_entry *entry)
+{
+ int i;
+ for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
+ if (entry->fd[i] != -1)
+ close(entry->fd[i]);
+}
+
+/*
+ * Destroy all the cached FDs of the node
+ *
+ * We don't proceed if some other thread still grabs an FD of the node. In
+ * that case, the victim will eventually find itself talking to a dead node
+ * and call sheep_del_fd() to delete this node from the cache.
+ */
+static bool sockfd_cache_destroy(struct node_id *nid)
{
struct sockfd_cache_entry *entry;
+
+ pthread_rwlock_wrlock(&sockfd_cache.lock);
+ entry = sockfd_cache_search(nid);
+ if (!entry) {
+ dprintf("It is already destroyed\n");
+ goto false_out;
+ }
+
+ if (!slots_all_free(entry)) {
+ dprintf("Some victim still holds it\n");
+ goto false_out;
+ }
+
+ rb_erase(&entry->rb, &sockfd_cache.root);
+ pthread_rwlock_unlock(&sockfd_cache.lock);
+
+ destroy_all_slots(entry);
+ free(entry);
+
+ return true;
+false_out:
+ pthread_rwlock_unlock(&sockfd_cache.lock);
+ return false;
+}
+
+/* When a node crashes, we should delete it from the cache */
+void sockfd_cache_del(struct node_id *nid)
+{
char name[INET6_ADDRSTRLEN];
int n;
- entry = sockfd_cache_grab(nid);
- /* Hmmm, some victim still holds it, he is supposed to delete it */
- if (!entry)
+ if (!sockfd_cache_destroy(nid))
return;
- rb_erase(&entry->rb, &sockfd_cache.root);
- free(entry);
n = uatomic_sub_return(&sockfd_cache.count, 1);
addr_to_str(name, sizeof(name), nid->addr, 0);
dprintf("%s:%d, count %d\n", name, nid->port, n);
@@ -150,8 +236,11 @@ void sockfd_cache_del(struct node_id *nid)
static void sockfd_cache_add_nolock(struct node_id *nid)
{
struct sockfd_cache_entry *new = xzalloc(sizeof(*new));
+ int i;
+
+ for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
+ new->fd[i] = -1;
- new->fd = -1;
memcpy(&new->nid, nid, sizeof(struct node_id));
if (sockfd_cache_insert(new)) {
free(new);
@@ -160,6 +249,7 @@ static void sockfd_cache_add_nolock(struct node_id *nid)
sockfd_cache.count++;
}
+/* Add a group of nodes to the cache */
void sockfd_cache_add_group(struct sd_node *nodes, int nr)
{
struct sd_node *p;
@@ -175,13 +265,16 @@ void sockfd_cache_add_group(struct sd_node *nodes, int nr)
pthread_rwlock_unlock(&sockfd_cache.lock);
}
+/* Adding one node to the cache means we can do caching tricks on it */
void sockfd_cache_add(struct sd_node *node)
{
struct sockfd_cache_entry *new = xzalloc(sizeof(*new));
char name[INET6_ADDRSTRLEN];
- int n;
+ int n, i;
+
+ for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
+ new->fd[i] = -1;
- new->fd = -1;
memcpy(&new->nid, node, sizeof(struct node_id));
pthread_rwlock_rdlock(&sockfd_cache.lock);
if (sockfd_cache_insert(new)) {
@@ -195,58 +288,71 @@ void sockfd_cache_add(struct sd_node *node)
dprintf("%s:%d, count %d\n", name, node->port, n);
}
-static int sockfd_cache_get(struct node_id *nid)
+static int sockfd_cache_get(struct node_id *nid, char *name, int *ret_idx)
{
struct sockfd_cache_entry *entry;
- char name[INET6_ADDRSTRLEN];
int fd;
- entry = sockfd_cache_grab(nid);
+ entry = sockfd_cache_grab(nid, name, ret_idx);
if (!entry)
return -1;
- if (entry->fd != -1)
- return entry->fd;
+ if (entry->fd[*ret_idx] != -1) {
+ dprintf("%s:%d, idx %d\n", name, nid->port, *ret_idx);
+ return entry->fd[*ret_idx];
+ }
- /* Create a new connection for this vnode */
- addr_to_str(name, sizeof(name), nid->addr, 0);
- dprintf("create connection %s:%d\n", name, nid->port);
+ /* Create a new cached connection for this vnode */
+ dprintf("create connection %s:%d idx %d\n", name, nid->port, *ret_idx);
fd = connect_to(name, nid->port);
if (fd < 0) {
- uatomic_dec(&entry->refcount);
+ uatomic_dec(&entry->fd_in_use[*ret_idx]);
return -1;
}
- entry->fd = fd;
+ entry->fd[*ret_idx] = fd;
return fd;
}
-static void sockfd_cache_put(struct node_id *nid)
+static void sockfd_cache_put(struct node_id *nid, int idx)
{
struct sockfd_cache_entry *entry;
char name[INET6_ADDRSTRLEN];
int refcnt;
addr_to_str(name, sizeof(name), nid->addr, 0);
- dprintf("%s:%d\n", name, nid->port);
+ dprintf("%s:%d idx %d\n", name, nid->port, idx);
+
pthread_rwlock_rdlock(&sockfd_cache.lock);
entry = sockfd_cache_search(nid);
pthread_rwlock_unlock(&sockfd_cache.lock);
+
assert(entry);
- refcnt = uatomic_cmpxchg(&entry->refcount, 1, 0);
+ refcnt = uatomic_cmpxchg(&entry->fd_in_use[idx], 1, 0);
assert(refcnt == 1);
}
-int sheep_get_fd(struct sd_vnode *vnode)
+/*
+ * Return an FD connected to the vnode to the caller
+ *
+ * Try to get a 'long' FD first, which is cached and never closed. If no FD
+ * is available, we return a 'short' FD, which is supposed to be closed by
+ * sheep_put_fd().
+ *
+ * ret_idx is opaque to the caller; -1 indicates a short FD.
+ */
+int sheep_get_fd(struct sd_vnode *vnode, int *ret_idx)
{
struct node_id *nid = (struct node_id *)vnode;
char name[INET6_ADDRSTRLEN];
- int fd = sockfd_cache_get(nid);
+ int fd;
+ addr_to_str(name, sizeof(name), nid->addr, 0);
+ fd = sockfd_cache_get(nid, name, ret_idx);
if (fd != -1)
return fd;
- addr_to_str(name, sizeof(name), nid->addr, 0);
+ /* Create a short FD that is to be closed */
fd = connect_to(name, nid->port);
if (fd < 0) {
dprintf("failed connect to %s:%d\n", name, nid->port);
@@ -257,37 +363,45 @@ int sheep_get_fd(struct sd_vnode *vnode)
return fd;
}
-void sheep_put_fd(struct sd_vnode *vnode, int fd)
+/*
+ * Release an FD connected to the vnode, which was acquired from sheep_get_fd()
+ *
+ * If it is a long FD, just decrease the refcount to make it available again.
+ * If it is a short FD, close it.
+ *
+ * sheep_put_fd() or sheep_del_fd() should be paired with sheep_get_fd()
+ */
+void sheep_put_fd(struct sd_vnode *vnode, int fd, int idx)
{
struct node_id *nid = (struct node_id *)vnode;
- struct sockfd_cache_entry *entry;
- pthread_rwlock_rdlock(&sockfd_cache.lock);
- entry = sockfd_cache_search(nid);
- pthread_rwlock_unlock(&sockfd_cache.lock);
- assert(entry);
- if (entry->fd == fd) {
- sockfd_cache_put(nid);
- } else {
+ if (idx == -1) {
dprintf("%d\n", fd);
close(fd);
+ return;
}
+
+ sockfd_cache_put(nid, idx);
}
-void sheep_del_fd(struct sd_vnode *vnode, int fd)
+/*
+ * Delete an FD connected to the vnode, when the vnode has crashed.
+ *
+ * If it is a long FD, drop its refcount and try to destroy all the cached
+ * FDs of this vnode in the cache.
+ * If it is a short FD, just close it.
+ */
+void sheep_del_fd(struct sd_vnode *vnode, int fd, int idx)
{
struct node_id *nid = (struct node_id *)vnode;
- struct sockfd_cache_entry *entry;
- pthread_rwlock_rdlock(&sockfd_cache.lock);
- entry = sockfd_cache_search(nid);
- pthread_rwlock_unlock(&sockfd_cache.lock);
- assert(entry);
- if (entry->fd == fd) {
- sockfd_cache_put(nid);
- sockfd_cache_del(nid);
- } else {
+ if (idx == -1) {
dprintf("%d\n", fd);
close(fd);
+ return;
}
+
+ sockfd_cache_put(nid, idx);
+ sockfd_cache_del(nid);
}
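For reference, the calling convention the gateway follows after this patch
(a schematic sketch distilled from the gateway.c hunks above; v, fwd_hdr,
req and exec_req() belong to the surrounding sheep code):

	int sock_idx, fd, ret;

	fd = sheep_get_fd(v, &sock_idx);  /* long FD if cached, else short */
	if (fd < 0)
		return SD_RES_NETWORK_ERROR;

	ret = exec_req(fd, &fwd_hdr, req->data, &wlen, &rlen);
	if (ret)  /* network error: drop the FD and the dead node's cache */
		sheep_del_fd(v, fd, sock_idx);
	else      /* success: recycle the long FD or close the short one */
		sheep_put_fd(v, fd, sock_idx);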
--
1.7.10.2