[sheepdog] [PATCH 8/8] sockfd cache: group idx and fd as a struct
Liu Yuan
namei.unix at gmail.com
Wed Jun 27 09:25:39 CEST 2012
From: Liu Yuan <tailai.ly at taobao.com>
- We get new APIs for manipulating sockfd:
struct sockfd *sheep_get_sockfd(struct node_id *);
void sheep_put_sockfd(struct node_id *, struct sockfd *);
void sheep_del_sockfd(struct node_id *, struct sockfd *);
- change write_info accordingly
- add a sub-structure, struct write_info_entry, that groups the pollfd, node id and sockfd of each forwarded write
- add a new struct pfd_info holding the pollfd array that is rebuilt from write_info before each poll system call
Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
sheep/gateway.c | 108 ++++++++++++++++++++++++++++++--------------------
sheep/sheep_priv.h | 11 +++--
sheep/sockfd_cache.c | 75 ++++++++++++++++++++---------------
3 files changed, 116 insertions(+), 78 deletions(-)
diff --git a/sheep/gateway.c b/sheep/gateway.c
index ccc0e5b..f8a60ea 100644
--- a/sheep/gateway.c
+++ b/sheep/gateway.c
@@ -22,7 +22,7 @@
*/
int forward_read_obj_req(struct request *req)
{
- int i, fd, ret = SD_RES_SUCCESS;
+ int i, ret = SD_RES_SUCCESS;
unsigned wlen, rlen;
struct sd_req fwd_hdr;
struct sd_rsp *rsp = (struct sd_rsp *)&fwd_hdr;
@@ -59,15 +59,15 @@ read_remote:
*/
j = random();
for (i = 0; i < nr_copies; i++) {
+ struct sockfd *sfd;
int idx = (i + j) % nr_copies;
- int sock_idx;
v = obj_vnodes[idx];
if (vnode_is_local(v))
continue;
- fd = sheep_get_fd(&v->nid, &sock_idx);
- if (fd < 0) {
+ sfd = sheep_get_sockfd(&v->nid);
+ if (!sfd) {
ret = SD_RES_NETWORK_ERROR;
continue;
}
@@ -75,23 +75,23 @@ read_remote:
wlen = 0;
rlen = fwd_hdr.data_length;
- ret = exec_req(fd, &fwd_hdr, req->data, &wlen, &rlen);
+ ret = exec_req(sfd->fd, &fwd_hdr, req->data, &wlen, &rlen);
if (!ret && rsp->result == SD_RES_SUCCESS) {
memcpy(&req->rp, rsp, sizeof(*rsp));
ret = rsp->result;
- sheep_put_fd(&v->nid, fd, sock_idx);
+ sheep_put_sockfd(&v->nid, sfd);
break; /* Read success */
}
if (ret) {
dprintf("remote node might have gone away");
- sheep_del_fd(&v->nid, fd, sock_idx);
+ sheep_del_sockfd(&v->nid, sfd);
ret = SD_RES_NETWORK_ERROR;
} else {
ret = rsp->result;
eprintf("remote read fail %x\n", ret);
- sheep_put_fd(&v->nid, fd, sock_idx);
+ sheep_put_sockfd(&v->nid, sfd);
}
/* Reset the hdr for next read */
memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr));
@@ -100,37 +100,48 @@ read_remote:
return ret;
}
+struct write_info_entry {
+ struct pollfd pfd;
+ struct node_id *nid;
+ struct sockfd *sfd;
+};
+
struct write_info {
- struct pollfd pfds[SD_MAX_REDUNDANCY];
- struct sd_vnode *vnodes[SD_MAX_REDUNDANCY];
- int sock_idx[SD_MAX_REDUNDANCY];
+ struct write_info_entry ent[SD_MAX_REDUNDANCY];
int nr_sent;
};
-static inline void update_write_info(struct write_info *wi, int pos)
+static inline void write_info_update(struct write_info *wi, int pos)
{
dprintf("%d, %d\n", wi->nr_sent, pos);
wi->nr_sent--;
- memmove(wi->pfds + pos, wi->pfds + pos + 1,
- sizeof(struct pollfd) * (wi->nr_sent - pos));
- memmove(wi->vnodes + pos, wi->vnodes + pos + 1,
- sizeof(struct sd_vnode *) * (wi->nr_sent - pos));
- memmove(wi->sock_idx + pos, wi->sock_idx + pos + 1,
- sizeof(int) * (wi->nr_sent - pos));
+ memmove(wi->ent + pos, wi->ent + pos + 1,
+ sizeof(struct write_info_entry) * (wi->nr_sent - pos));
}
static inline void finish_one_write(struct write_info *wi, int i)
{
- sheep_put_fd(&wi->vnodes[i]->nid, wi->pfds[i].fd,
- wi->sock_idx[i]);
- update_write_info(wi, i);
+ sheep_put_sockfd(wi->ent[i].nid, wi->ent[i].sfd);
+ write_info_update(wi, i);
}
static inline void finish_one_write_err(struct write_info *wi, int i)
{
- sheep_del_fd(&wi->vnodes[i]->nid, wi->pfds[i].fd,
- wi->sock_idx[i]);
- update_write_info(wi, i);
+ sheep_del_sockfd(wi->ent[i].nid, wi->ent[i].sfd);
+ write_info_update(wi, i);
+}
+
+struct pfd_info {
+ struct pollfd pfds[SD_MAX_REDUNDANCY];
+ int nr;
+};
+
+static inline void pfd_info_init(struct write_info *wi, struct pfd_info *pi)
+{
+ int i;
+ for (i = 0; i < wi->nr_sent; i++)
+ pi->pfds[i] = wi->ent[i].pfd;
+ pi->nr = wi->nr_sent;
}
/*
@@ -144,8 +155,10 @@ static inline void finish_one_write_err(struct write_info *wi, int i)
static int wait_forward_write(struct write_info *wi, struct sd_rsp *rsp)
{
int nr_sent, err_ret = SD_RES_SUCCESS, ret, pollret, i;
+ struct pfd_info pi;;
again:
- pollret = poll(wi->pfds, wi->nr_sent, -1);
+ pfd_info_init(wi, &pi);
+ pollret = poll(pi.pfds, pi.nr, -1);
if (pollret < 0) {
if (errno == EINTR)
goto again;
@@ -155,16 +168,16 @@ again:
nr_sent = wi->nr_sent;
for (i = 0; i < nr_sent; i++)
- if (wi->pfds[i].revents & POLLIN)
+ if (pi.pfds[i].revents & POLLIN)
break;
if (i < nr_sent) {
- int re = wi->pfds[i].revents;
+ int re = pi.pfds[i].revents;
dprintf("%d, revents %x\n", i, re);
if (re & (POLLERR | POLLHUP | POLLNVAL)) {
err_ret = SD_RES_NETWORK_ERROR;
finish_one_write_err(wi, i);
} else if (re & POLLIN) {
- if (do_read(wi->pfds[i].fd, rsp, sizeof(*rsp))) {
+ if (do_read(pi.pfds[i].fd, rsp, sizeof(*rsp))) {
eprintf("remote node might have gone away\n");
err_ret = SD_RES_NETWORK_ERROR;
finish_one_write_err(wi, i);
@@ -188,19 +201,28 @@ finish_write:
return err_ret;
}
-static void init_write_info(struct write_info *wi)
+static inline void write_info_init(struct write_info *wi)
{
int i;
- for (i = 0; i < SD_MAX_REDUNDANCY; i++) {
- wi->pfds[i].fd = -1;
- wi->vnodes[i] = NULL;
- }
+ for (i = 0; i < SD_MAX_REDUNDANCY; i++)
+ wi->ent[i].pfd.fd = -1;
wi->nr_sent = 0;
}
+static inline void
+write_info_advance(struct write_info *wi, struct sd_vnode *v,
+ struct sockfd *sfd)
+{
+ wi->ent[wi->nr_sent].nid = &v->nid;
+ wi->ent[wi->nr_sent].pfd.fd = sfd->fd;
+ wi->ent[wi->nr_sent].pfd.events = POLLIN;
+ wi->ent[wi->nr_sent].sfd = sfd;
+ wi->nr_sent++;
+}
+
int forward_write_obj_req(struct request *req)
{
- int i, fd, err_ret = SD_RES_SUCCESS, ret, local = -1;
+ int i, err_ret = SD_RES_SUCCESS, ret, local = -1;
unsigned wlen;
struct sd_req fwd_hdr;
struct sd_rsp *rsp = (struct sd_rsp *)&req->rp;
@@ -212,7 +234,7 @@ int forward_write_obj_req(struct request *req)
dprintf("%"PRIx64"\n", oid);
- init_write_info(&wi);
+ write_info_init(&wi);
memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr));
fwd_hdr.flags |= SD_FLAG_CMD_IO_LOCAL;
@@ -222,30 +244,28 @@ int forward_write_obj_req(struct request *req)
oid_to_vnodes(req->vnodes, oid, nr_copies, obj_vnodes);
for (i = 0; i < nr_copies; i++) {
+ struct sockfd *sfd;
+
v = obj_vnodes[i];
if (vnode_is_local(v)) {
local = i;
continue;
}
- fd = sheep_get_fd(&v->nid, &wi.sock_idx[wi.nr_sent]);
- if (fd < 0) {
+ sfd = sheep_get_sockfd(&v->nid);
+ if (!sfd) {
err_ret = SD_RES_NETWORK_ERROR;
break;
}
- ret = send_req(fd, &fwd_hdr, req->data, &wlen);
+ ret = send_req(sfd->fd, &fwd_hdr, req->data, &wlen);
if (ret) {
- sheep_del_fd(&v->nid, fd, wi.sock_idx[wi.nr_sent]);
+ sheep_del_sockfd(&v->nid, sfd);
err_ret = SD_RES_NETWORK_ERROR;
dprintf("fail %d\n", ret);
break;
}
-
- wi.vnodes[wi.nr_sent] = v;
- wi.pfds[wi.nr_sent].fd = fd;
- wi.pfds[wi.nr_sent].events = POLLIN;
- wi.nr_sent++;
+ write_info_advance(&wi, v, sfd);
}
if (local != -1 && err_ret == SD_RES_SUCCESS) {
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index a5b0399..a497b24 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -399,12 +399,17 @@ void object_cache_delete(uint32_t vid);
int object_cache_init(const char *p);
/* sockfd_cache */
+struct sockfd {
+ int fd;
+ int idx;
+};
+
void sockfd_cache_del(struct node_id *);
void sockfd_cache_add(struct node_id *);
void sockfd_cache_add_group(struct sd_node *nodes, int nr);
-int sheep_get_fd(struct node_id *, int *);
-void sheep_put_fd(struct node_id *, int fd, int);
-void sheep_del_fd(struct node_id *, int fd, int);
+struct sockfd *sheep_get_sockfd(struct node_id *);
+void sheep_put_sockfd(struct node_id *, struct sockfd *);
+void sheep_del_sockfd(struct node_id *, struct sockfd *);
#endif
diff --git a/sheep/sockfd_cache.c b/sheep/sockfd_cache.c
index c27d99b..51e0c02 100644
--- a/sheep/sockfd_cache.c
+++ b/sheep/sockfd_cache.c
@@ -272,30 +272,35 @@ void sockfd_cache_add(struct node_id *nid)
dprintf("%s:%d, count %d\n", name, nid->port, n);
}
-static int sockfd_cache_get(struct node_id *nid, char *name, int *ret_idx)
+static struct sockfd *sockfd_cache_get(struct node_id *nid, char *name)
{
struct sockfd_cache_entry *entry;
- int fd;
+ struct sockfd *sfd;
+ int fd, idx;
- entry = sockfd_cache_grab(nid, name, ret_idx);
+ entry = sockfd_cache_grab(nid, name, &idx);
if (!entry)
- return -1;
+ return NULL;
- if (entry->fd[*ret_idx] != -1) {
- dprintf("%s:%d, idx %d\n", name, nid->port, *ret_idx);
- return entry->fd[*ret_idx];
+ if (entry->fd[idx] != -1) {
+ dprintf("%s:%d, idx %d\n", name, nid->port, idx);
+ goto out;
}
/* Create a new cached connection for this vnode */
- dprintf("create connection %s:%d idx %d\n", name, nid->port, *ret_idx);
+ dprintf("create connection %s:%d idx %d\n", name, nid->port, idx);
fd = connect_to(name, nid->port);
if (fd < 0) {
- uatomic_dec(&entry->fd_in_use[*ret_idx]);
- return -1;
+ uatomic_dec(&entry->fd_in_use[idx]);
+ return NULL;
}
- entry->fd[*ret_idx] = fd;
+ entry->fd[idx] = fd;
- return fd;
+out:
+ sfd = xmalloc(sizeof(*sfd));
+ sfd->fd = entry->fd[idx];
+ sfd->idx = idx;
+ return sfd;
}
static void sockfd_cache_put(struct node_id *nid, int idx)
@@ -317,7 +322,7 @@ static void sockfd_cache_put(struct node_id *nid, int idx)
}
/*
- * Return a FD connected to the vnode to the caller
+ * Return a sockfd connected to the vnode to the caller
*
* Try to get a 'long' FD as best, which is cached and never closed. If no FD
* available, we return a 'short' FD which is supposed to be closed by
@@ -325,29 +330,33 @@ static void sockfd_cache_put(struct node_id *nid, int idx)
*
* ret_idx is opaque to the caller, -1 indicates it is a short FD.
*/
-int sheep_get_fd(struct node_id *nid, int *ret_idx)
+struct sockfd *sheep_get_sockfd(struct node_id *nid)
{
char name[INET6_ADDRSTRLEN];
+ struct sockfd *sfd;
int fd;
addr_to_str(name, sizeof(name), nid->addr, 0);
- fd = sockfd_cache_get(nid, name, ret_idx);
- if (fd != -1)
- return fd;
+ sfd = sockfd_cache_get(nid, name);
+ if (sfd)
+ return sfd;
/* Create a fd that is to be closed */
fd = connect_to(name, nid->port);
if (fd < 0) {
dprintf("failed connect to %s:%d\n", name, nid->port);
- return -1;
+ return NULL;
}
+ sfd = xmalloc(sizeof(*sfd));
+ sfd->idx = -1;
+ sfd->fd = fd;
dprintf("%d\n", fd);
- return fd;
+ return sfd;
}
/*
- * Rlease a FD connected to the vnode, which is acquired from sheep_get_fd()
+ * Rlease a sockfd connected to the vnode, which is acquired from sheep_get_fd()
*
* If it is a long FD, just decrease the refcount to make it available again.
* If it is a short FD, close it.
@@ -355,32 +364,36 @@ int sheep_get_fd(struct node_id *nid, int *ret_idx)
* sheep_put_fd() or sheep_del_fd() should be paired with sheep_get_fd()
*/
-void sheep_put_fd(struct node_id *nid, int fd, int idx)
+void sheep_put_sockfd(struct node_id *nid, struct sockfd *sfd)
{
- if (idx == -1) {
- dprintf("%d\n", fd);
- close(fd);
+ if (sfd->idx == -1) {
+ dprintf("%d\n", sfd->fd);
+ close(sfd->fd);
+ free(sfd);
return;
}
- sockfd_cache_put(nid, idx);
+ sockfd_cache_put(nid, sfd->idx);
+ free(sfd);
}
/*
- * Delete a FD connected to the vnode, when vnode is crashed.
+ * Delete a sockfd connected to the vnode, when vnode is crashed.
*
* If it is a long FD, de-refcount it and tres to destroy all the cached FDs of
* this vnode in the cache.
* If it is a short FD, just close it.
*/
-void sheep_del_fd(struct node_id *nid, int fd, int idx)
+void sheep_del_sockfd(struct node_id *nid, struct sockfd *sfd)
{
- if (idx == -1) {
- dprintf("%d\n", fd);
- close(fd);
+ if (sfd->idx == -1) {
+ dprintf("%d\n", sfd->fd);
+ close(sfd->fd);
+ free(sfd);
return;
}
- sockfd_cache_put(nid, idx);
+ sockfd_cache_put(nid, sfd->idx);
sockfd_cache_del(nid);
+ free(sfd);
}
--
1.7.10.2
More information about the sheepdog
mailing list