[sheepdog] [PATCH 8/8] sockfd cache: group idx and fd as a struct

Liu Yuan namei.unix at gmail.com
Wed Jun 27 09:25:39 CEST 2012


From: Liu Yuan <tailai.ly at taobao.com>

- We get new APIs for manipulating sockfd:

struct sockfd *sheep_get_sockfd(struct node_id *);
void sheep_put_sockfd(struct node_id *, struct sockfd *);
void sheep_del_sockfd(struct node_id *, struct sockfd *);

- change write_info accordingly
  - add a sub-structure (struct write_info_entry)
  - add a new pfd_info for the poll system call

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 sheep/gateway.c      |  108 ++++++++++++++++++++++++++++++--------------------
 sheep/sheep_priv.h   |   11 +++--
 sheep/sockfd_cache.c |   75 ++++++++++++++++++++---------------
 3 files changed, 116 insertions(+), 78 deletions(-)

diff --git a/sheep/gateway.c b/sheep/gateway.c
index ccc0e5b..f8a60ea 100644
--- a/sheep/gateway.c
+++ b/sheep/gateway.c
@@ -22,7 +22,7 @@
  */
 int forward_read_obj_req(struct request *req)
 {
-	int i, fd, ret = SD_RES_SUCCESS;
+	int i, ret = SD_RES_SUCCESS;
 	unsigned wlen, rlen;
 	struct sd_req fwd_hdr;
 	struct sd_rsp *rsp = (struct sd_rsp *)&fwd_hdr;
@@ -59,15 +59,15 @@ read_remote:
 	 */
 	j = random();
 	for (i = 0; i < nr_copies; i++) {
+		struct sockfd *sfd;
 		int idx = (i + j) % nr_copies;
-		int sock_idx;
 
 		v = obj_vnodes[idx];
 		if (vnode_is_local(v))
 			continue;
 
-		fd = sheep_get_fd(&v->nid, &sock_idx);
-		if (fd < 0) {
+		sfd = sheep_get_sockfd(&v->nid);
+		if (!sfd) {
 			ret = SD_RES_NETWORK_ERROR;
 			continue;
 		}
@@ -75,23 +75,23 @@ read_remote:
 		wlen = 0;
 		rlen = fwd_hdr.data_length;
 
-		ret = exec_req(fd, &fwd_hdr, req->data, &wlen, &rlen);
+		ret = exec_req(sfd->fd, &fwd_hdr, req->data, &wlen, &rlen);
 
 		if (!ret && rsp->result == SD_RES_SUCCESS) {
 			memcpy(&req->rp, rsp, sizeof(*rsp));
 			ret = rsp->result;
-			sheep_put_fd(&v->nid, fd, sock_idx);
+			sheep_put_sockfd(&v->nid, sfd);
 			break; /* Read success */
 		}
 
 		if (ret) {
 			dprintf("remote node might have gone away");
-			sheep_del_fd(&v->nid, fd, sock_idx);
+			sheep_del_sockfd(&v->nid, sfd);
 			ret = SD_RES_NETWORK_ERROR;
 		} else {
 			ret = rsp->result;
 			eprintf("remote read fail %x\n", ret);
-			sheep_put_fd(&v->nid, fd, sock_idx);
+			sheep_put_sockfd(&v->nid, sfd);
 		}
 		/* Reset the hdr for next read */
 		memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr));
@@ -100,37 +100,48 @@ read_remote:
 	return ret;
 }
 
+struct write_info_entry {
+	struct pollfd pfd;
+	struct node_id *nid;
+	struct sockfd *sfd;
+};
+
 struct write_info {
-	struct pollfd pfds[SD_MAX_REDUNDANCY];
-	struct sd_vnode *vnodes[SD_MAX_REDUNDANCY];
-	int sock_idx[SD_MAX_REDUNDANCY];
+	struct write_info_entry ent[SD_MAX_REDUNDANCY];
 	int nr_sent;
 };
 
-static inline void update_write_info(struct write_info *wi, int pos)
+static inline void write_info_update(struct write_info *wi, int pos)
 {
 	dprintf("%d, %d\n", wi->nr_sent, pos);
 	wi->nr_sent--;
-	memmove(wi->pfds + pos, wi->pfds + pos + 1,
-		sizeof(struct pollfd) * (wi->nr_sent - pos));
-	memmove(wi->vnodes + pos, wi->vnodes + pos + 1,
-		sizeof(struct sd_vnode *) * (wi->nr_sent - pos));
-	memmove(wi->sock_idx + pos, wi->sock_idx + pos + 1,
-		sizeof(int) * (wi->nr_sent - pos));
+	memmove(wi->ent + pos, wi->ent + pos + 1,
+		sizeof(struct write_info_entry) * (wi->nr_sent - pos));
 }
 
 static inline void finish_one_write(struct write_info *wi, int i)
 {
-	sheep_put_fd(&wi->vnodes[i]->nid, wi->pfds[i].fd,
-		     wi->sock_idx[i]);
-	update_write_info(wi, i);
+	sheep_put_sockfd(wi->ent[i].nid, wi->ent[i].sfd);
+	write_info_update(wi, i);
 }
 
 static inline void finish_one_write_err(struct write_info *wi, int i)
 {
-	sheep_del_fd(&wi->vnodes[i]->nid, wi->pfds[i].fd,
-		     wi->sock_idx[i]);
-	update_write_info(wi, i);
+	sheep_del_sockfd(wi->ent[i].nid, wi->ent[i].sfd);
+	write_info_update(wi, i);
+}
+
+struct pfd_info {
+	struct pollfd pfds[SD_MAX_REDUNDANCY];
+	int nr;
+};
+
+static inline void pfd_info_init(struct write_info *wi, struct pfd_info *pi)
+{
+	int i;
+	for (i = 0; i < wi->nr_sent; i++)
+		pi->pfds[i] = wi->ent[i].pfd;
+	pi->nr = wi->nr_sent;
 }
 
 /*
@@ -144,8 +155,10 @@ static inline void finish_one_write_err(struct write_info *wi, int i)
 static int wait_forward_write(struct write_info *wi, struct sd_rsp *rsp)
 {
 	int nr_sent, err_ret = SD_RES_SUCCESS, ret, pollret, i;
+	struct pfd_info pi;;
 again:
-	pollret = poll(wi->pfds, wi->nr_sent, -1);
+	pfd_info_init(wi, &pi);
+	pollret = poll(pi.pfds, pi.nr, -1);
 	if (pollret < 0) {
 		if (errno == EINTR)
 			goto again;
@@ -155,16 +168,16 @@ again:
 
 	nr_sent = wi->nr_sent;
 	for (i = 0; i < nr_sent; i++)
-		if (wi->pfds[i].revents & POLLIN)
+		if (pi.pfds[i].revents & POLLIN)
 			break;
 	if (i < nr_sent) {
-		int re = wi->pfds[i].revents;
+		int re = pi.pfds[i].revents;
 		dprintf("%d, revents %x\n", i, re);
 		if (re & (POLLERR | POLLHUP | POLLNVAL)) {
 			err_ret = SD_RES_NETWORK_ERROR;
 			finish_one_write_err(wi, i);
 		} else if (re & POLLIN) {
-			if (do_read(wi->pfds[i].fd, rsp, sizeof(*rsp))) {
+			if (do_read(pi.pfds[i].fd, rsp, sizeof(*rsp))) {
 				eprintf("remote node might have gone away\n");
 				err_ret = SD_RES_NETWORK_ERROR;
 				finish_one_write_err(wi, i);
@@ -188,19 +201,28 @@ finish_write:
 	return err_ret;
 }
 
-static void init_write_info(struct write_info *wi)
+static inline void write_info_init(struct write_info *wi)
 {
 	int i;
-	for (i = 0; i < SD_MAX_REDUNDANCY; i++) {
-		wi->pfds[i].fd = -1;
-		wi->vnodes[i] = NULL;
-	}
+	for (i = 0; i < SD_MAX_REDUNDANCY; i++)
+		wi->ent[i].pfd.fd = -1;
 	wi->nr_sent = 0;
 }
 
+static inline void
+write_info_advance(struct write_info *wi, struct sd_vnode *v,
+		   struct sockfd *sfd)
+{
+	wi->ent[wi->nr_sent].nid = &v->nid;
+	wi->ent[wi->nr_sent].pfd.fd = sfd->fd;
+	wi->ent[wi->nr_sent].pfd.events = POLLIN;
+	wi->ent[wi->nr_sent].sfd = sfd;
+	wi->nr_sent++;
+}
+
 int forward_write_obj_req(struct request *req)
 {
-	int i, fd, err_ret = SD_RES_SUCCESS, ret, local = -1;
+	int i, err_ret = SD_RES_SUCCESS, ret, local = -1;
 	unsigned wlen;
 	struct sd_req fwd_hdr;
 	struct sd_rsp *rsp = (struct sd_rsp *)&req->rp;
@@ -212,7 +234,7 @@ int forward_write_obj_req(struct request *req)
 
 	dprintf("%"PRIx64"\n", oid);
 
-	init_write_info(&wi);
+	write_info_init(&wi);
 	memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr));
 	fwd_hdr.flags |= SD_FLAG_CMD_IO_LOCAL;
 
@@ -222,30 +244,28 @@ int forward_write_obj_req(struct request *req)
 	oid_to_vnodes(req->vnodes, oid, nr_copies, obj_vnodes);
 
 	for (i = 0; i < nr_copies; i++) {
+		struct sockfd *sfd;
+
 		v = obj_vnodes[i];
 		if (vnode_is_local(v)) {
 			local = i;
 			continue;
 		}
 
-		fd = sheep_get_fd(&v->nid, &wi.sock_idx[wi.nr_sent]);
-		if (fd < 0) {
+		sfd = sheep_get_sockfd(&v->nid);
+		if (!sfd) {
 			err_ret = SD_RES_NETWORK_ERROR;
 			break;
 		}
 
-		ret = send_req(fd, &fwd_hdr, req->data, &wlen);
+		ret = send_req(sfd->fd, &fwd_hdr, req->data, &wlen);
 		if (ret) {
-			sheep_del_fd(&v->nid, fd, wi.sock_idx[wi.nr_sent]);
+			sheep_del_sockfd(&v->nid, sfd);
 			err_ret = SD_RES_NETWORK_ERROR;
 			dprintf("fail %d\n", ret);
 			break;
 		}
-
-		wi.vnodes[wi.nr_sent] = v;
-		wi.pfds[wi.nr_sent].fd = fd;
-		wi.pfds[wi.nr_sent].events = POLLIN;
-		wi.nr_sent++;
+		write_info_advance(&wi, v, sfd);
 	}
 
 	if (local != -1 && err_ret == SD_RES_SUCCESS) {
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index a5b0399..a497b24 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -399,12 +399,17 @@ void object_cache_delete(uint32_t vid);
 int object_cache_init(const char *p);
 
 /* sockfd_cache */
+struct sockfd {
+	int fd;
+	int idx;
+};
+
 void sockfd_cache_del(struct node_id *);
 void sockfd_cache_add(struct node_id *);
 void sockfd_cache_add_group(struct sd_node *nodes, int nr);
 
-int sheep_get_fd(struct node_id *, int *);
-void sheep_put_fd(struct node_id *, int fd, int);
-void sheep_del_fd(struct node_id *, int fd, int);
+struct sockfd *sheep_get_sockfd(struct node_id *);
+void sheep_put_sockfd(struct node_id *, struct sockfd *);
+void sheep_del_sockfd(struct node_id *, struct sockfd *);
 
 #endif
diff --git a/sheep/sockfd_cache.c b/sheep/sockfd_cache.c
index c27d99b..51e0c02 100644
--- a/sheep/sockfd_cache.c
+++ b/sheep/sockfd_cache.c
@@ -272,30 +272,35 @@ void sockfd_cache_add(struct node_id *nid)
 	dprintf("%s:%d, count %d\n", name, nid->port, n);
 }
 
-static int sockfd_cache_get(struct node_id *nid, char *name, int *ret_idx)
+static struct sockfd *sockfd_cache_get(struct node_id *nid, char *name)
 {
 	struct sockfd_cache_entry *entry;
-	int fd;
+	struct sockfd *sfd;
+	int fd, idx;
 
-	entry = sockfd_cache_grab(nid, name, ret_idx);
+	entry = sockfd_cache_grab(nid, name, &idx);
 	if (!entry)
-		return -1;
+		return NULL;
 
-	if (entry->fd[*ret_idx] != -1) {
-		dprintf("%s:%d, idx %d\n", name, nid->port, *ret_idx);
-		return entry->fd[*ret_idx];
+	if (entry->fd[idx] != -1) {
+		dprintf("%s:%d, idx %d\n", name, nid->port, idx);
+		goto out;
 	}
 
 	/* Create a new cached connection for this vnode */
-	dprintf("create connection %s:%d idx %d\n", name, nid->port, *ret_idx);
+	dprintf("create connection %s:%d idx %d\n", name, nid->port, idx);
 	fd = connect_to(name, nid->port);
 	if (fd < 0) {
-		uatomic_dec(&entry->fd_in_use[*ret_idx]);
-		return -1;
+		uatomic_dec(&entry->fd_in_use[idx]);
+		return NULL;
 	}
-	entry->fd[*ret_idx] = fd;
+	entry->fd[idx] = fd;
 
-	return fd;
+out:
+	sfd = xmalloc(sizeof(*sfd));
+	sfd->fd = entry->fd[idx];
+	sfd->idx = idx;
+	return sfd;
 }
 
 static void sockfd_cache_put(struct node_id *nid, int idx)
@@ -317,7 +322,7 @@ static void sockfd_cache_put(struct node_id *nid, int idx)
 }
 
 /*
- * Return a FD connected to the vnode to the caller
+ * Return a sockfd connected to the vnode to the caller
  *
  * Try to get a 'long' FD as best, which is cached and never closed. If no FD
  * available, we return a 'short' FD which is supposed to be closed by
@@ -325,29 +330,33 @@ static void sockfd_cache_put(struct node_id *nid, int idx)
  *
  * ret_idx is opaque to the caller, -1 indicates it is a short FD.
  */
-int sheep_get_fd(struct node_id *nid, int *ret_idx)
+struct sockfd *sheep_get_sockfd(struct node_id *nid)
 {
 	char name[INET6_ADDRSTRLEN];
+	struct sockfd *sfd;
 	int fd;
 
 	addr_to_str(name, sizeof(name), nid->addr, 0);
-	fd = sockfd_cache_get(nid, name, ret_idx);
-	if (fd != -1)
-		return fd;
+	sfd = sockfd_cache_get(nid, name);
+	if (sfd)
+		return sfd;
 
 	/* Create a fd that is to be closed */
 	fd = connect_to(name, nid->port);
 	if (fd < 0) {
 		dprintf("failed connect to %s:%d\n", name, nid->port);
-		return -1;
+		return NULL;
 	}
 
+	sfd = xmalloc(sizeof(*sfd));
+	sfd->idx = -1;
+	sfd->fd = fd;
 	dprintf("%d\n", fd);
-	return fd;
+	return sfd;
 }
 
 /*
- * Rlease a FD connected to the vnode, which is acquired from sheep_get_fd()
+ * Release a sockfd connected to the vnode, which is acquired from sheep_get_sockfd()
  *
  * If it is a long FD, just decrease the refcount to make it available again.
  * If it is a short FD, close it.
@@ -355,32 +364,36 @@ int sheep_get_fd(struct node_id *nid, int *ret_idx)
  * sheep_put_fd() or sheep_del_fd() should be paired with sheep_get_fd()
  */
 
-void sheep_put_fd(struct node_id *nid, int fd, int idx)
+void sheep_put_sockfd(struct node_id *nid, struct sockfd *sfd)
 {
-	if (idx == -1) {
-		dprintf("%d\n", fd);
-		close(fd);
+	if (sfd->idx == -1) {
+		dprintf("%d\n", sfd->fd);
+		close(sfd->fd);
+		free(sfd);
 		return;
 	}
 
-	sockfd_cache_put(nid, idx);
+	sockfd_cache_put(nid, sfd->idx);
+	free(sfd);
 }
 
 /*
- * Delete a FD connected to the vnode, when vnode is crashed.
+ * Delete a sockfd connected to the vnode, when vnode is crashed.
  *
  * If it is a long FD, de-refcount it and tres to destroy all the cached FDs of
  * this vnode in the cache.
  * If it is a short FD, just close it.
  */
-void sheep_del_fd(struct node_id *nid, int fd, int idx)
+void sheep_del_sockfd(struct node_id *nid, struct sockfd *sfd)
 {
-	if (idx == -1) {
-		dprintf("%d\n", fd);
-		close(fd);
+	if (sfd->idx == -1) {
+		dprintf("%d\n", sfd->fd);
+		close(sfd->fd);
+		free(sfd);
 		return;
 	}
 
-	sockfd_cache_put(nid, idx);
+	sockfd_cache_put(nid, sfd->idx);
 	sockfd_cache_del(nid);
+	free(sfd);
 }
-- 
1.7.10.2




More information about the sheepdog mailing list