[Sheepdog] [PATCH] sheep: cache socket descriptors
MORITA Kazutaka
morita.kazutaka at lab.ntt.co.jp
Mon Oct 25 07:12:55 CEST 2010
This patch caches socket descriptors and reuses them when forwarding data
object requests to other sheep, which improves latency.
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
sheep/sdnet.c | 94 ++++++++++++++++++++++++++++++++++++++++++++++++++++
sheep/sheep_priv.h | 3 ++
sheep/store.c | 23 +++---------
3 files changed, 103 insertions(+), 17 deletions(-)
diff --git a/sheep/sdnet.c b/sheep/sdnet.c
index c8cbd87..9ad0bc7 100644
--- a/sheep/sdnet.c
+++ b/sheep/sdnet.c
@@ -13,6 +13,7 @@
#include <unistd.h>
#include <netinet/tcp.h>
#include <sys/epoll.h>
+#include <fcntl.h>
#include "sheep_priv.h"
@@ -670,3 +671,96 @@ int remove_object(struct sheepdog_node_list_entry *e,
return 0;
}
+
+/*
+ * Put @fd into non-blocking mode.
+ *
+ * Returns 0 on success and -1 on failure; the failing fcntl() call has
+ * already been logged.  @fd is never closed here: the caller owns it and
+ * closes it on failure, so closing it here too would be a double close.
+ */
+static int set_nonblocking(int fd)
+{
+	int flags;
+
+	flags = fcntl(fd, F_GETFL);
+	if (flags < 0) {
+		eprintf("can't fcntl (F_GETFL), %m\n");
+		return -1;
+	}
+
+	if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
+		eprintf("can't fcntl (O_NONBLOCK), %m\n");
+		return -1;
+	}
+
+	return 0;
+}
+
+/*
+ * Enable TCP_NODELAY (disable Nagle's algorithm) on @fd so that small
+ * writes are sent without delay.
+ *
+ * Returns the result of setsockopt(): 0 on success, -1 on error.
+ */
+static int set_nodelay(int fd)
+{
+	int on = 1;
+
+	return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on));
+}
+
+/*
+ * get_sheep_fd - return a connected socket to node @node_idx of @e.
+ * @e:          node list the caller resolved @node_idx against
+ * @node_idx:   index of the target node in @e
+ * @epoch:      epoch of the request being forwarded
+ * @worker_idx: index of the calling worker thread
+ *
+ * Connections are cached per (worker, node index) and reused by later
+ * requests.  @node_idx is only meaningful for the node list of one epoch,
+ * so when a request with a newer epoch arrives, every connection this
+ * worker cached for the older epoch is closed; they are re-opened on
+ * demand.
+ *
+ * Each worker owns one row of the cache together with its own epoch and
+ * init flag, and only ever touches that row.  With a single shared epoch,
+ * one worker could close descriptors another worker was still using, and
+ * all workers mutated the shared state without locking.  Concurrent
+ * callers must therefore pass distinct @worker_idx values.
+ *
+ * The returned descriptor belongs to the cache; callers must not close it.
+ *
+ * Returns the descriptor, or -1 if an index is out of range, @epoch is
+ * older than the last epoch seen by this worker, or connecting fails.
+ *
+ * NOTE(review): there is no way to evict an entry after a network error,
+ * so a broken (or partially read) connection stays cached until the epoch
+ * changes.  An eviction helper for the store.c callers may be needed.
+ */
+int get_sheep_fd(struct sheepdog_node_list_entry *e, int node_idx,
+		 uint32_t epoch, int worker_idx)
+{
+	static int cached_fds[NR_WORKER_THREAD][SD_MAX_NODES];
+	static uint32_t cached_epoch[NR_WORKER_THREAD];
+	static int initialized[NR_WORKER_THREAD];
+	int *fds;
+	int i, fd;
+	char name[INET6_ADDRSTRLEN];
+
+	if (worker_idx < 0 || worker_idx >= NR_WORKER_THREAD ||
+	    node_idx < 0 || node_idx >= SD_MAX_NODES) {
+		eprintf("invalid index, worker %d, node %d\n",
+			worker_idx, node_idx);
+		return -1;
+	}
+
+	fds = cached_fds[worker_idx];
+
+	/*
+	 * Static storage starts zeroed, but 0 is a valid descriptor, so mark
+	 * every slot empty (-1) on this worker's first call.  A separate flag
+	 * is used because any epoch value, including 0, may be passed in.
+	 */
+	if (!initialized[worker_idx]) {
+		for (i = 0; i < SD_MAX_NODES; i++)
+			fds[i] = -1;
+		cached_epoch[worker_idx] = epoch;
+		initialized[worker_idx] = 1;
+	}
+
+	if (before(epoch, cached_epoch[worker_idx])) {
+		eprintf("requested epoch is smaller than the previous one, %u %u\n",
+			cached_epoch[worker_idx], epoch);
+		return -1;
+	}
+
+	if (after(epoch, cached_epoch[worker_idx])) {
+		/* node indices of the old epoch may now refer to other nodes */
+		for (i = 0; i < SD_MAX_NODES; i++) {
+			if (fds[i] >= 0)
+				close(fds[i]);
+			fds[i] = -1;
+		}
+		cached_epoch[worker_idx] = epoch;
+	}
+
+	fd = fds[node_idx];
+	dprintf("%u, %d\n", epoch, fd);
+
+	if (fd >= 0) {
+		dprintf("use a cached fd %d\n", fd);
+		return fd;
+	}
+
+	addr_to_str(name, sizeof(name), e[node_idx].addr, 0);
+
+	fd = connect_to(name, e[node_idx].port);
+	if (fd < 0)
+		return -1;
+
+	/*
+	 * NOTE(review): the socket becomes O_NONBLOCK here, while callers
+	 * pass it to exec_req(); confirm exec_req() copes with EAGAIN.
+	 */
+	if (set_nonblocking(fd) < 0 || set_nodelay(fd) < 0) {
+		eprintf("%m\n");
+		close(fd);
+		return -1;
+	}
+
+	fds[node_idx] = fd;
+
+	return fd;
+}
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index c66baf4..a8af306 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -190,6 +190,9 @@ int remove_object(struct sheepdog_node_list_entry *e,
int nodes, uint32_t node_version,
uint64_t oid, int nr);
+int get_sheep_fd(struct sheepdog_node_list_entry *e, int node_idx,
+ uint32_t epoch, int worker_idx);
+
static inline int is_myself(struct sheepdog_node_list_entry *e)
{
return e->id == sys->this_node.id;
diff --git a/sheep/store.c b/sheep/store.c
index 06e35e7..6043054 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -391,11 +391,10 @@ static int read_from_other_sheeps(struct request *req, uint32_t epoch,
static int store_queue_request_local(struct request *req, uint32_t epoch);
-static int forward_read_obj_req(struct request *req)
+static int forward_read_obj_req(struct request *req, int idx)
{
int i, n, nr, fd, ret;
unsigned wlen, rlen;
- char name[128];
struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)hdr;
struct sheepdog_node_list_entry *e;
@@ -427,9 +426,7 @@ static int forward_read_obj_req(struct request *req)
n = obj_to_sheep(e, nr, oid, 0);
- addr_to_str(name, sizeof(name), e[n].addr, 0);
-
- fd = connect_to(name, e[n].port);
+ fd = get_sheep_fd(e, n, hdr->epoch, idx);
if (fd < 0) {
ret = SD_RES_NETWORK_ERROR;
goto out;
@@ -440,8 +437,6 @@ static int forward_read_obj_req(struct request *req)
ret = exec_req(fd, (struct sd_req *)hdr, req->data, &wlen, &rlen);
- close(fd);
-
if (ret) /* network errors */
ret = SD_RES_NETWORK_ERROR;
else {
@@ -455,7 +450,7 @@ out:
return ret;
}
-static int forward_write_obj_req(struct request *req)
+static int forward_write_obj_req(struct request *req, int idx)
{
int i, n, nr, fd, ret;
unsigned wlen, rlen;
@@ -501,7 +496,7 @@ static int forward_write_obj_req(struct request *req)
continue;
}
- fd = connect_to(name, e[n].port);
+ fd = get_sheep_fd(e, n, hdr->epoch, idx);
if (fd < 0) {
eprintf("failed to connect to %s:%"PRIu32"\n", name, e[n].port);
ret = SD_RES_NETWORK_ERROR;
@@ -582,12 +577,6 @@ again:
ret = SD_RES_SUCCESS;
out:
-
- for (i = 0; i < ARRAY_SIZE(pfds); i++){
- if (pfds[i].fd >= 0)
- close(pfds[i].fd);
- }
-
hdr->flags &= ~SD_FLAG_CMD_DIRECT;
return ret;
@@ -831,9 +820,9 @@ void store_queue_request(struct work *work, int idx)
if (!(hdr->flags & SD_FLAG_CMD_DIRECT)) {
if (hdr->flags & SD_FLAG_CMD_WRITE)
- ret = forward_write_obj_req(req);
+ ret = forward_write_obj_req(req, idx);
else
- ret = forward_read_obj_req(req);
+ ret = forward_read_obj_req(req, idx);
goto out;
}
--
1.5.6.5
More information about the sheepdog
mailing list