[sheepdog] [PATCH 2/6] sheep: use new sockfd cache
Liu Yuan
namei.unix at gmail.com
Sun Jun 24 14:51:49 CEST 2012
From: Liu Yuan <tailai.ly at taobao.com>
Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
sheep/gateway.c | 55 ++++++++++++++++++++++++++++++++-----------------------
sheep/group.c | 9 ++++++++-
2 files changed, 40 insertions(+), 24 deletions(-)
diff --git a/sheep/gateway.c b/sheep/gateway.c
index 42f028a..c3e5f80 100644
--- a/sheep/gateway.c
+++ b/sheep/gateway.c
@@ -58,7 +58,7 @@ read_remote:
if (vnode_is_local(v))
continue;
- fd = get_sheep_fd(v->addr, v->port, v->node_idx, fwd_hdr.epoch);
+ fd = sheep_get_fd(v);
if (fd < 0) {
ret = SD_RES_NETWORK_ERROR;
continue;
@@ -70,10 +70,11 @@ read_remote:
ret = exec_req(fd, &fwd_hdr, req->data, &wlen, &rlen);
if (ret) { /* network errors */
- del_sheep_fd(fd);
+ sheep_del_fd(v, fd);
ret = SD_RES_NETWORK_ERROR;
continue;
} else {
+ sheep_put_fd(v, fd);
memcpy(&req->rp, rsp, sizeof(*rsp));
ret = rsp->result;
break;
@@ -82,6 +83,11 @@ read_remote:
return ret;
}
+struct write_info { /* pfds[i] and vnodes[i] describe the same target */
+ struct pollfd pfds[SD_MAX_REDUNDANCY]; /* sockets awaiting a reply */
+ struct sd_vnode *vnodes[SD_MAX_REDUNDANCY]; /* vnode of pfds[i].fd */
+};
+
int forward_write_obj_req(struct request *req)
{
int i, fd, ret, pollret;
@@ -93,15 +99,14 @@ int forward_write_obj_req(struct request *req)
struct sd_vnode *obj_vnodes[SD_MAX_COPIES];
uint64_t oid = req->rq.obj.oid;
int nr_copies;
- struct pollfd pfds[SD_MAX_REDUNDANCY];
- int nr_fds, local = 0;
+ int nr_fds = 0, local = 0;
+ struct write_info wi;
dprintf("%"PRIx64"\n", oid);
- nr_fds = 0;
- memset(pfds, 0, sizeof(pfds));
- for (i = 0; i < ARRAY_SIZE(pfds); i++)
- pfds[i].fd = -1;
+ memset(&wi, 0, sizeof(wi));
+ for (i = 0; i < SD_MAX_REDUNDANCY; i++)
+ wi.pfds[i].fd = -1;
memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr));
fwd_hdr.flags |= SD_FLAG_CMD_IO_LOCAL;
@@ -120,24 +125,23 @@ int forward_write_obj_req(struct request *req)
continue;
}
- fd = get_sheep_fd(v->addr, v->port, v->node_idx, fwd_hdr.epoch);
+ fd = sheep_get_fd(v);
if (fd < 0) {
- eprintf("failed to connect to %s:%"PRIu32"\n", name,
- v->port);
ret = SD_RES_NETWORK_ERROR;
goto err;
}
ret = send_req(fd, &fwd_hdr, req->data, &wlen);
if (ret) { /* network errors */
- del_sheep_fd(fd);
+ sheep_del_fd(v, fd);
ret = SD_RES_NETWORK_ERROR;
dprintf("fail %"PRIu32"\n", ret);
goto err;
}
- pfds[nr_fds].fd = fd;
- pfds[nr_fds].events = POLLIN;
+ wi.vnodes[nr_fds] = v;
+ wi.pfds[nr_fds].fd = fd;
+ wi.pfds[nr_fds].events = POLLIN;
nr_fds++;
}
@@ -157,7 +161,7 @@ int forward_write_obj_req(struct request *req)
ret = SD_RES_SUCCESS;
again:
- pollret = poll(pfds, nr_fds, -1);
+ pollret = poll(wi.pfds, nr_fds, -1);
if (pollret < 0) {
if (errno == EINTR)
goto again;
@@ -167,19 +171,20 @@ again:
}
for (i = 0; i < nr_fds; i++) {
- if (pfds[i].revents & POLLERR || pfds[i].revents & POLLHUP ||
- pfds[i].revents & POLLNVAL) {
- del_sheep_fd(pfds[i].fd);
+ if (wi.pfds[i].revents & POLLERR ||
+ wi.pfds[i].revents & POLLHUP ||
+ wi.pfds[i].revents & POLLNVAL) {
+ sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd);
ret = SD_RES_NETWORK_ERROR;
break;
}
- if (!(pfds[i].revents & POLLIN))
+ if (!(wi.pfds[i].revents & POLLIN))
continue;
- if (do_read(pfds[i].fd, rsp, sizeof(*rsp))) {
+ if (do_read(wi.pfds[i].fd, rsp, sizeof(*rsp))) {
eprintf("failed to read a response: %m\n");
- del_sheep_fd(pfds[i].fd);
+ sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd);
ret = SD_RES_NETWORK_ERROR;
break;
}
@@ -189,11 +194,15 @@ again:
ret = rsp->result;
}
+ sheep_put_fd(wi.vnodes[i], wi.pfds[i].fd);
break;
}
if (i < nr_fds) {
nr_fds--;
- memmove(pfds + i, pfds + i + 1, sizeof(*pfds) * (nr_fds - i));
+ memmove(wi.pfds + i, wi.pfds + i + 1,
+ sizeof(struct pollfd) * (nr_fds - i));
+ memmove(wi.vnodes + i, wi.vnodes + i + 1,
+ sizeof(struct sd_vnode *) * (nr_fds - i));
}
dprintf("%"PRIx64" %"PRIu32"\n", oid, nr_fds);
@@ -204,7 +213,7 @@ out:
return ret;
err:
for (i = 0; i < nr_fds; i++)
- del_sheep_fd(pfds[i].fd);
+ sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd);
return ret;
}
diff --git a/sheep/group.c b/sheep/group.c
index 6721025..ede59ef 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -804,10 +804,13 @@ static void finish_join(struct join_message *msg, struct sd_node *joined,
if (sd_store->purge_obj &&
sd_store->purge_obj() != SD_RES_SUCCESS)
eprintf("WARN: may have stale objects\n");
+
+ sockfd_cache_add_group(nodes, nr_nodes);
}
static void update_cluster_info(struct join_message *msg,
- struct sd_node *joined, struct sd_node *nodes, size_t nr_nodes)
+ struct sd_node *joined, struct sd_node *nodes,
+ size_t nr_nodes)
{
struct vnode_info *old_vnode_info;
@@ -868,6 +871,8 @@ static void update_cluster_info(struct join_message *msg,
if (current_vnode_info->nr_zones >= sys->nr_copies)
sys_stat_set(SD_STATUS_OK);
}
+
+ sockfd_cache_add(joined);
}
/*
@@ -1111,6 +1116,8 @@ void sd_leave_handler(struct sd_node *left, struct sd_node *members,
if (current_vnode_info->nr_zones < sys->nr_copies)
sys_stat_set(SD_STATUS_HALT);
}
+
+ sockfd_cache_del((struct node_id *)left);
}
int create_cluster(int port, int64_t zone, int nr_vnodes,
--
1.7.10.2
More information about the sheepdog mailing list