[Sheepdog] [PATCH 3/3] collie: support request forwarding
MORITA Kazutaka
morita.kazutaka at lab.ntt.co.jp
Wed Dec 23 19:25:34 CET 2009
TODO:
- forward directory read/write operations
- move objects when the node map changes
- prevent forwarded requests from using too many worker threads
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
collie/store.c | 116 ++++++++++++++++++++++++++++++++++-----------
collie/vdi.c | 9 ++--
collie/work.h | 2 +-
include/sheepdog_proto.h | 1 +
4 files changed, 94 insertions(+), 34 deletions(-)
diff --git a/collie/store.c b/collie/store.c
index 7c0f6d1..b2cf355 100644
--- a/collie/store.c
+++ b/collie/store.c
@@ -29,7 +29,7 @@ static char *obj_dir;
static char *mnt_dir;
static char *zero_block;
-static int stat_sheep(uint64_t *store_size, uint64_t *store_free)
+static int stat_sheep(uint64_t *store_size, uint32_t epoch, uint64_t *store_free)
{
struct statvfs vs;
int ret;
@@ -43,7 +43,8 @@ static int stat_sheep(uint64_t *store_size, uint64_t *store_free)
if (ret)
return SD_RES_EIO;
- dir = opendir(obj_dir);
+ snprintf(path, sizeof(path), "%s/obj/%x", obj_dir, epoch);
+ dir = opendir(path);
if (!dir)
return SD_RES_EIO;
@@ -51,7 +52,8 @@ static int stat_sheep(uint64_t *store_size, uint64_t *store_free)
if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, ".."))
continue;
- snprintf(path, sizeof(path), "%s/%s", obj_dir, d->d_name);
+ snprintf(path, sizeof(path), "%s/obj/%x/%s", obj_dir, epoch,
+ d->d_name);
ret = stat(path, &s);
if (ret)
@@ -66,27 +68,26 @@ static int stat_sheep(uint64_t *store_size, uint64_t *store_free)
return SD_RES_SUCCESS;
}
-static int read_from_others(struct cluster_info *cluster, uint64_t oid,
- unsigned *rlen, void *buf, uint64_t offset,
+static int read_from_others(struct sheepdog_node_list_entry *e, int nr,
+ uint64_t oid, unsigned *rlen, void *buf,
+ uint64_t offset, uint16_t flags, uint32_t epoch,
int copies)
{
- int i, n, nr, fd, ret;
+ int i, n, fd, ret;
unsigned wlen;
char name[128];
- struct sheepdog_node_list_entry *e;
struct sd_obj_req hdr;
struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
- e = zalloc(SD_MAX_NODES * sizeof(struct sheepdog_node_list_entry));
again:
- nr = build_node_list(&cluster->node_list, e);
-
for (i = 0; i < copies; i++) {
n = obj_to_sheep(e, nr, oid, i);
snprintf(name, sizeof(name), "%d.%d.%d.%d",
e[n].addr[12], e[n].addr[13],
e[n].addr[14], e[n].addr[15]);
+ dprintf("read %" PRIx64 " from %s, %d, %d\n", oid, name, i,
+ copies);
fd = connect_to(name, e[n].port);
if (fd < 0)
@@ -96,9 +97,9 @@ again:
hdr.opcode = SD_OP_READ_OBJ;
hdr.copies = copies;
hdr.oid = oid;
- hdr.epoch = cluster->epoch;
+ hdr.epoch = epoch;
- hdr.flags = 0;
+ hdr.flags = flags;
hdr.data_length = *rlen;
hdr.offset = offset;
@@ -160,15 +161,21 @@ void store_queue_request(struct work *work, int idx)
uint32_t opcode = hdr->opcode;
uint32_t epoch = cluster->epoch;
uint32_t req_epoch = hdr->epoch;
+ uint32_t cow_epoch = req_epoch;
+ uint16_t cow_flags = 0;
struct sd_node_rsp *nrsp = (struct sd_node_rsp *)&req->rp;
int copies;
unsigned rlen;
+ struct sheepdog_node_list_entry e[SD_MAX_NODES];
+ int nr;
/* use le_to_cpu */
- snprintf(path, sizeof(path), "%s/%" PRIx64, obj_dir, oid);
+ snprintf(path, sizeof(path), "%s/obj/%x/%" PRIx64, obj_dir,
+ req_epoch, oid);
- dprintf("%d, %x, %s, %u, %u\n", idx, opcode, path, epoch, req_epoch);
+ dprintf("%d, %x, %s, %u, %u, %d\n", idx, opcode, path, epoch, req_epoch,
+ hdr->copies);
if (list_empty(&cluster->node_list)) {
/* we haven't got SD_OP_GET_NODE_LIST response yet. */
@@ -176,12 +183,13 @@ void store_queue_request(struct work *work, int idx)
goto out;
}
- if (opcode != SD_OP_GET_NODE_LIST) {
+ if (opcode != SD_OP_GET_NODE_LIST && !(hdr->flags & SD_FLAG_CMD_FORWARD)) {
ret = check_epoch(cluster, req);
if (ret != SD_RES_SUCCESS)
goto out;
}
+retry:
switch (opcode) {
case SD_OP_CREATE_AND_WRITE_OBJ:
case SD_OP_WRITE_OBJ:
@@ -192,12 +200,38 @@ void store_queue_request(struct work *work, int idx)
fd = open(path, flags, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
if (fd < 0) {
- if (errno == ENOENT)
- ret = SD_RES_NO_OBJ;
- else
+ if (errno != ENOENT) {
ret = SD_RES_UNKNOWN;
+ goto out;
+ }
- goto out;
+ if (opcode == SD_OP_WRITE_OBJ) {
+ if (!(hdr->flags & SD_FLAG_CMD_COW)) {
+ dprintf("force cow %" PRIx64 " %d\n",
+ oid, req_epoch);
+ opcode = SD_OP_CREATE_AND_WRITE_OBJ;
+ hdr->flags |= SD_FLAG_CMD_COW;
+ hdr->cow_oid = oid;
+ cow_epoch = req_epoch - 1;
+ cow_flags = SD_FLAG_CMD_FORWARD;
+ goto retry;
+ }
+ } else {
+ dprintf("forward request %d\n", opcode);
+ nr = read_node_list(req_epoch - 1, e,
+ SD_MAX_NODES);
+ if (nr < 0) {
+ ret = SD_RES_EIO;
+ goto out;
+ }
+ ret = read_from_others(e, nr, oid,
+ &hdr->data_length,
+ req->data, hdr->offset,
+ SD_FLAG_CMD_FORWARD,
+ req_epoch - 1,
+ hdr->copies);
+ goto out;
+ }
}
if (opcode != SD_OP_CREATE_AND_WRITE_OBJ)
@@ -223,15 +257,23 @@ void store_queue_request(struct work *work, int idx)
goto out;
}
- if (!is_data_obj(oid))
- break;
-
+ rlen = SD_DATA_OBJ_SIZE;
if (hdr->flags & SD_FLAG_CMD_COW) {
dprintf("%" PRIu64 "\n", hdr->cow_oid);
- rlen = SD_DATA_OBJ_SIZE;
- ret = read_from_others(cluster, hdr->cow_oid, &rlen, buf,
- 0, hdr->copies);
+ if (epoch == cow_epoch)
+ nr = build_node_list(&cluster->node_list, e);
+ else
+ nr = read_node_list(cow_epoch, e, SD_MAX_NODES);
+
+ if (nr < 0) {
+ ret = SD_RES_EIO;
+ goto out;
+ }
+
+ ret = read_from_others(e, nr, hdr->cow_oid, &rlen, buf,
+ 0, cow_flags, cow_epoch,
+ hdr->copies);
if (ret) {
ret = 1;
@@ -239,13 +281,15 @@ void store_queue_request(struct work *work, int idx)
}
} else {
dprintf("%" PRIu64 "\n", oid);
- memset(buf, 0, SD_DATA_OBJ_SIZE);
+ if (rlen > hdr->data_length)
+ rlen = hdr->data_length;
+ memset(buf, 0, rlen);
}
dprintf("%" PRIu64 "\n", oid);
- ret = pwrite64(fd, buf, SD_DATA_OBJ_SIZE, 0);
- if (ret != SD_DATA_OBJ_SIZE) {
+ ret = pwrite64(fd, buf, rlen, 0);
+ if (ret != rlen) {
ret = SD_RES_EIO;
goto out;
}
@@ -299,7 +343,8 @@ void store_queue_request(struct work *work, int idx)
}
break;
case SD_OP_STAT_SHEEP:
- ret = stat_sheep(&nrsp->store_size, &nrsp->store_free);
+ ret = stat_sheep(&nrsp->store_size, req_epoch,
+ &nrsp->store_free);
break;
}
out:
@@ -649,6 +694,13 @@ int init_store(char *dir)
if (ret && errno != EEXIST)
return 1;
+ strcpy(path, dir);
+ strcat(path, "/obj");
+ ret = mkdir(path, S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP |
+ S_IWGRP | S_IXGRP);
+ if (ret && errno != EEXIST)
+ return 1;
+
fp = setmntent(MOUNTED, "r");
if (!fp)
return 1;
@@ -718,5 +770,11 @@ int write_node_list(uint32_t epoch, struct sheepdog_node_list_entry *entries,
close(fd);
+ snprintf(path, sizeof(path), "%s/obj/%x", obj_dir, epoch);
+ ret = mkdir(path, S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP |
+ S_IWGRP | S_IXGRP);
+ if (ret && errno != EEXIST)
+ return -1;
+
return nr_nodes;
}
diff --git a/collie/vdi.c b/collie/vdi.c
index 31567d0..2a19ad0 100644
--- a/collie/vdi.c
+++ b/collie/vdi.c
@@ -84,7 +84,7 @@ int add_vdi(struct cluster_info *ci, char *name, int len, uint64_t size,
uint64_t *added_oid, uint64_t base_oid, uint32_t tag)
{
struct sheepdog_node_list_entry entries[SD_MAX_NODES];
- int nr_nodes;
+ int nr_nodes, nr_reqs;
uint64_t oid = 0;
int ret;
int copies;
@@ -100,15 +100,16 @@ int add_vdi(struct cluster_info *ci, char *name, int len, uint64_t size,
/* todo */
/* copies = sb->default_nr_copies; */
copies = 3;
- if (copies > nr_nodes)
- copies = nr_nodes;
+ nr_reqs = copies;
+ if (nr_reqs > nr_nodes)
+ nr_reqs = nr_nodes;
req.opcode = SD_OP_SO_NEW_VDI;
req.copies = copies;
req.tag = tag;
ret = exec_reqs(entries, nr_nodes, ci->epoch,
- SD_DIR_OID, (struct sd_req *)&req, name, len, 0, copies);
+ SD_DIR_OID, (struct sd_req *)&req, name, len, 0, nr_reqs);
/* todo: error handling */
diff --git a/collie/work.h b/collie/work.h
index 81d8c56..7020991 100644
--- a/collie/work.h
+++ b/collie/work.h
@@ -1,7 +1,7 @@
#ifndef __WORK_H__
#define __WORK_H__
-#define NR_WORKER_THREAD 4
+#define NR_WORKER_THREAD 16
struct work;
diff --git a/include/sheepdog_proto.h b/include/sheepdog_proto.h
index 4bfb4e5..affc332 100644
--- a/include/sheepdog_proto.h
+++ b/include/sheepdog_proto.h
@@ -53,6 +53,7 @@
#define SD_FLAG_CMD_WRITE 0x01
#define SD_FLAG_CMD_COW 0x02
+#define SD_FLAG_CMD_FORWARD 0x04
#define SD_RES_SUCCESS 0x00 /* Success */
#define SD_RES_UNKNOWN 0x01 /* Unknown error */
--
1.6.5
More information about the sheepdog mailing list