[Sheepdog] [PATCH] collie: add asynchronous write support
MORITA Kazutaka
morita.kazutaka at lab.ntt.co.jp
Sat Apr 17 16:34:26 CEST 2010
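This patch makes collie use poll() when it forwards write requests to
multiple nodes, so the requests to all copies are sent before any reply
is awaited instead of one request/reply round trip at a time.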
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
collie/store.c | 211 ++++++++++++++++++++++++++++++++++++----------
include/sheepdog_proto.h | 1 +
2 files changed, 169 insertions(+), 43 deletions(-)
diff --git a/collie/store.c b/collie/store.c
index 72c079c..0fa711e 100644
--- a/collie/store.c
+++ b/collie/store.c
@@ -15,6 +15,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
+#include <poll.h>
#include <sys/xattr.h>
#include <sys/statvfs.h>
@@ -323,21 +324,109 @@ static int read_from_other_sheeps(uint64_t oid, char *buf, int copies)
static int store_queue_request_local(struct request *req, char *buf, uint32_t epoch);
-static int forward_obj_req(struct request *req, char *buf)
+/*
+ * Forward a read of the object in req->rq. If this node holds one of
+ * the copies the read is served locally; otherwise the request is sent
+ * to the node holding copy 0 and its reply is copied into req->rp.
+ * Returns an SD_RES_* code.
+ */
+static int forward_read_obj_req(struct request *req, char *buf)
{
int i, n, nr, fd, ret;
unsigned wlen, rlen;
char name[128];
struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
+ struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)hdr;
struct sheepdog_node_list_entry *e;
- struct sd_obj_req hdr2;
- struct sd_rsp *rsp = (struct sd_rsp *)&hdr2;
uint64_t oid = hdr->oid;
int copies;
uint32_t epoch;
e = zalloc(SD_MAX_NODES * sizeof(struct sheepdog_node_list_entry));
-again:
+ if (!e)
+ return SD_RES_NO_MEM;
+
+ nr = get_ordered_sd_node_list(e);
+
+ copies = hdr->copies;
+
+ /* temporary hack */
+ if (!copies)
+ copies = sys->nr_sobjs;
+
+ hdr->flags |= SD_FLAG_CMD_FORWARD;
+ hdr->epoch = sys->epoch;
+
+ /* TODO: we can do better; we need to check this first */
+ for (i = 0; i < copies; i++) {
+ n = obj_to_sheep(e, nr, oid, i);
+
+ if (is_myself(&e[n])) {
+ /* recovery requests are served at their target epoch */
+ epoch = (hdr->flags & SD_FLAG_CMD_RECOVERY) ?
+ hdr->tgt_epoch : sys->epoch;
+ ret = store_queue_request_local(req, buf, epoch);
+ goto out;
+ }
+ }
+
+ /* no local copy: read from the node holding copy 0 */
+ n = obj_to_sheep(e, nr, oid, 0);
+
+ addr_to_str(name, sizeof(name), e[n].addr, 0);
+
+ fd = connect_to(name, e[n].port);
+ if (fd < 0) {
+ ret = SD_RES_EIO;
+ goto out;
+ }
+
+ wlen = 0;
+ rlen = hdr->data_length;
+
+ ret = exec_req(fd, (struct sd_req *)hdr, req->data, &wlen, &rlen);
+
+ close(fd);
+
+ if (ret) /* network errors */
+ ret = SD_RES_EIO;
+ else {
+ memcpy(&req->rp, rsp, sizeof(*rsp));
+ ret = rsp->result;
+ }
+
+out:
+ free(e);
+
+ return ret;
+}
+
+/*
+ * Forward a write of the object in req->rq to every node holding a copy.
+ * Requests to the remote copies are sent first, the local copy (if any)
+ * is then written while they are in flight, and the remote replies are
+ * collected with poll() into req->rp. Returns SD_RES_SUCCESS only if
+ * every copy succeeded, otherwise an SD_RES_* error code.
+ */
+static int forward_write_obj_req(struct request *req, char *buf)
+{
+ int i, n, nr, fd, ret;
+ unsigned wlen, rlen;
+ char name[128];
+ struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
+ struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&req->rp;
+ struct sheepdog_node_list_entry *e;
+ uint64_t oid = hdr->oid;
+ int copies;
+ uint32_t epoch;
+ struct pollfd pfds[SD_MAX_REDUNDANCY];
+ int done, nr_fds, local = 0;
+
+ dprintf("%llx\n", (unsigned long long)oid);
+ e = zalloc(SD_MAX_NODES * sizeof(struct sheepdog_node_list_entry));
+ if (!e)
+ return SD_RES_NO_MEM;
+
nr = get_ordered_sd_node_list(e);
copies = hdr->copies;
@@ -346,65 +435,132 @@ again:
if (!copies)
copies = sys->nr_sobjs;
+ nr_fds = 0;
+ done = 0;
+ memset(pfds, 0, sizeof(pfds));
+ for (i = 0; i < ARRAY_SIZE(pfds); i++)
+ pfds[i].fd = -1;
+
+ hdr->flags |= SD_FLAG_CMD_FORWARD;
+ hdr->epoch = sys->epoch;
+
+ wlen = hdr->data_length;
+ rlen = 0;
+
for (i = 0; i < copies; i++) {
n = obj_to_sheep(e, nr, oid, i);
addr_to_str(name, sizeof(name), e[n].addr, 0);
- /* TODO: we can do better; we need to chech this first */
if (is_myself(&e[n])) {
- if (hdr->flags & SD_FLAG_CMD_RECOVERY)
- epoch = hdr->tgt_epoch;
- else
- epoch = sys->epoch;
- ret = store_queue_request_local(req, buf, epoch);
- memcpy(rsp, &req->rp, sizeof(*rsp));
- rsp->result = ret;
- goto done;
+ local = 1;
+ continue;
}
+ /* pfds[] only has room for SD_MAX_REDUNDANCY remote copies */
+ if (nr_fds >= ARRAY_SIZE(pfds)) {
+ eprintf("too many copies, %d\n", copies);
+ ret = SD_RES_EIO;
+ goto out;
+ }
+
fd = connect_to(name, e[n].port);
- if (fd < 0)
- goto again;
-
- memcpy(&hdr2, hdr, sizeof(hdr2));
+ if (fd < 0) {
+ eprintf("failed to connect to %s:%d\n", name, e[n].port);
+ ret = SD_RES_EIO;
+ goto out;
+ }
- if (hdr->flags & SD_FLAG_CMD_WRITE) {
- wlen = hdr->data_length;
- rlen = 0;
- } else {
- wlen = 0;
- rlen = hdr->data_length;
+ ret = send_req(fd, (struct sd_req *)hdr, req->data, &wlen);
+ if (ret) { /* network errors */
+ close(fd);
+ ret = SD_RES_EIO;
+ dprintf("fail %d\n", ret);
+ goto out;
}
- hdr2.flags |= SD_FLAG_CMD_FORWARD;
- hdr2.epoch = sys->epoch;
+ pfds[nr_fds].fd = fd;
+ pfds[nr_fds].events = POLLIN;
+ nr_fds++;
+ }
- ret = exec_req(fd, (struct sd_req *)&hdr2, req->data, &wlen, &rlen);
+ if (local) {
+ /* recovery requests are served at their target epoch */
+ epoch = (hdr->flags & SD_FLAG_CMD_RECOVERY) ?
+ hdr->tgt_epoch : sys->epoch;
+ ret = store_queue_request_local(req, buf, epoch);
+ rsp->result = ret;
- close(fd);
+ if (nr_fds == 0) {
+ dprintf("exit %d\n", ret);
+ goto out;
+ }
+
+ if (rsp->result != SD_RES_SUCCESS) {
+ eprintf("fail %d\n", ret);
+ goto out;
+ }
+ }
- if (ret) /* network errors */
+again:
+ ret = poll(pfds, nr_fds, -1);
+
+ if (ret < 0) {
+ if (errno == EINTR)
goto again;
- done:
- if (hdr->flags & SD_FLAG_CMD_WRITE) {
- if (rsp->result != SD_RES_SUCCESS) {
- free(e);
- return rsp->result;
- }
- } else {
- if (rsp->result == SD_RES_SUCCESS) {
- memcpy(&req->rp, rsp, sizeof(req->rp));
- free(e);
- return SD_RES_SUCCESS;
- }
+ ret = SD_RES_EIO;
+ goto out;
+ }
+
+ for (i = 0; i < nr_fds; i++) {
+ if (pfds[i].fd < 0)
+ continue;
+
+ if (pfds[i].revents & POLLERR || pfds[i].revents & POLLHUP) {
+ ret = SD_RES_EIO;
+ goto out;
}
+
+ if (!(pfds[i].revents & POLLIN))
+ continue;
+
+ ret = do_read(pfds[i].fd, rsp, sizeof(*rsp));
+
+ if (ret) {
+ eprintf("failed to get a rsp, %m\n");
+ ret = SD_RES_EIO;
+ goto out;
+ }
+
+ /* this copy has replied: stop polling it so it is counted once */
+ close(pfds[i].fd);
+ pfds[i].fd = -1;
+
+ if (rsp->result != SD_RES_SUCCESS) {
+ eprintf("fail %d\n", rsp->result);
+ ret = rsp->result;
+ goto out;
+ }
+
+ done++;
}
+ dprintf("%llx %d %d\n", (unsigned long long)oid, nr_fds, done);
+
+ if (done != nr_fds)
+ goto again;
+
+ ret = SD_RES_SUCCESS;
+out:
free(e);
- return (hdr->flags & SD_FLAG_CMD_WRITE) ? SD_RES_SUCCESS: rsp->result;
+ for (i = 0; i < ARRAY_SIZE(pfds); i++) {
+ if (pfds[i].fd >= 0)
+ close(pfds[i].fd);
+ }
+
+ return ret;
}
static int check_epoch(struct request *req)
@@ -638,7 +760,10 @@ void store_queue_request(struct work *work, int idx)
}
if (!(hdr->flags & SD_FLAG_CMD_FORWARD)) {
- ret = forward_obj_req(req, buf);
+ if (hdr->flags & SD_FLAG_CMD_WRITE)
+ ret = forward_write_obj_req(req, buf);
+ else
+ ret = forward_read_obj_req(req, buf);
goto out;
}
diff --git a/include/sheepdog_proto.h b/include/sheepdog_proto.h
index b2c0fb8..c66f8bb 100644
--- a/include/sheepdog_proto.h
+++ b/include/sheepdog_proto.h
@@ -20,6 +20,7 @@
#define SD_MAX_NODES 1024
#define SD_MAX_VMS 4096
#define SD_MAX_VDI_LEN 256
+#define SD_MAX_REDUNDANCY 8 /* sizes the per-copy pollfd array in collie's forward_write_obj_req() */
/* -> vmon */
--
1.5.6.5
More information about the sheepdog
mailing list