TODO: - forward directory read/write operations - move objects when node map is changed - prevent forwarded requests from using too many worker threads Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp> --- collie/store.c | 116 ++++++++++++++++++++++++++++++++++----------- collie/vdi.c | 9 ++-- collie/work.h | 2 +- include/sheepdog_proto.h | 1 + 4 files changed, 94 insertions(+), 34 deletions(-) diff --git a/collie/store.c b/collie/store.c index 7c0f6d1..b2cf355 100644 --- a/collie/store.c +++ b/collie/store.c @@ -29,7 +29,7 @@ static char *obj_dir; static char *mnt_dir; static char *zero_block; -static int stat_sheep(uint64_t *store_size, uint64_t *store_free) +static int stat_sheep(uint64_t *store_size, uint32_t epoch, uint64_t *store_free) { struct statvfs vs; int ret; @@ -43,7 +43,8 @@ static int stat_sheep(uint64_t *store_size, uint64_t *store_free) if (ret) return SD_RES_EIO; - dir = opendir(obj_dir); + snprintf(path, sizeof(path), "%s/obj/%x", obj_dir, epoch); + dir = opendir(path); if (!dir) return SD_RES_EIO; @@ -51,7 +52,8 @@ static int stat_sheep(uint64_t *store_size, uint64_t *store_free) if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, "..")) continue; - snprintf(path, sizeof(path), "%s/%s", obj_dir, d->d_name); + snprintf(path, sizeof(path), "%s/obj/%x/%s", obj_dir, epoch, + d->d_name); ret = stat(path, &s); if (ret) @@ -66,27 +68,26 @@ static int stat_sheep(uint64_t *store_size, uint64_t *store_free) return SD_RES_SUCCESS; } -static int read_from_others(struct cluster_info *cluster, uint64_t oid, - unsigned *rlen, void *buf, uint64_t offset, +static int read_from_others(struct sheepdog_node_list_entry *e, int nr, + uint64_t oid, unsigned *rlen, void *buf, + uint64_t offset, uint16_t flags, uint32_t epoch, int copies) { - int i, n, nr, fd, ret; + int i, n, fd, ret; unsigned wlen; char name[128]; - struct sheepdog_node_list_entry *e; struct sd_obj_req hdr; struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr; - e = zalloc(SD_MAX_NODES * sizeof(struct sheepdog_node_list_entry)); again: - nr = build_node_list(&cluster->node_list, e); - for (i = 0; i < copies; i++) { n = obj_to_sheep(e, nr, oid, i); snprintf(name, sizeof(name), "%d.%d.%d.%d", e[n].addr[12], e[n].addr[13], e[n].addr[14], e[n].addr[15]); + dprintf("read %" PRIx64 " from %s, %d, %d\n", oid, name, i, + copies); fd = connect_to(name, e[n].port); if (fd < 0) @@ -96,9 +97,9 @@ again: hdr.opcode = SD_OP_READ_OBJ; hdr.copies = copies; hdr.oid = oid; - hdr.epoch = cluster->epoch; + hdr.epoch = epoch; - hdr.flags = 0; + hdr.flags = flags; hdr.data_length = *rlen; hdr.offset = offset; @@ -160,15 +161,21 @@ void store_queue_request(struct work *work, int idx) uint32_t opcode = hdr->opcode; uint32_t epoch = cluster->epoch; uint32_t req_epoch = hdr->epoch; + uint32_t cow_epoch = req_epoch; + uint16_t cow_flags = 0; struct sd_node_rsp *nrsp = (struct sd_node_rsp *)&req->rp; int copies; unsigned rlen; + struct sheepdog_node_list_entry e[SD_MAX_NODES]; + int nr; /* use le_to_cpu */ - snprintf(path, sizeof(path), "%s/%" PRIx64, obj_dir, oid); + snprintf(path, sizeof(path), "%s/obj/%x/%" PRIx64, obj_dir, + req_epoch, oid); - dprintf("%d, %x, %s, %u, %u\n", idx, opcode, path, epoch, req_epoch); + dprintf("%d, %x, %s, %u, %u, %d\n", idx, opcode, path, epoch, req_epoch, + hdr->copies); if (list_empty(&cluster->node_list)) { /* we haven't got SD_OP_GET_NODE_LIST response yet. */ @@ -176,12 +183,13 @@ void store_queue_request(struct work *work, int idx) goto out; } - if (opcode != SD_OP_GET_NODE_LIST) { + if (opcode != SD_OP_GET_NODE_LIST && !(hdr->flags & SD_FLAG_CMD_FORWARD)) { ret = check_epoch(cluster, req); if (ret != SD_RES_SUCCESS) goto out; } +retry: switch (opcode) { case SD_OP_CREATE_AND_WRITE_OBJ: case SD_OP_WRITE_OBJ: @@ -192,12 +200,38 @@ void store_queue_request(struct work *work, int idx) fd = open(path, flags, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); if (fd < 0) { - if (errno == ENOENT) - ret = SD_RES_NO_OBJ; - else + if (errno != ENOENT) { ret = SD_RES_UNKNOWN; + goto out; + } - goto out; + if (opcode == SD_OP_WRITE_OBJ) { + if (!(hdr->flags & SD_FLAG_CMD_COW)) { + dprintf("force cow %" PRIx64 " %d\n", + oid, req_epoch); + opcode = SD_OP_CREATE_AND_WRITE_OBJ; + hdr->flags |= SD_FLAG_CMD_COW; + hdr->cow_oid = oid; + cow_epoch = req_epoch - 1; + cow_flags = SD_FLAG_CMD_FORWARD; + goto retry; + } + } else { + dprintf("forward request %d\n", opcode); + nr = read_node_list(req_epoch - 1, e, + SD_MAX_NODES); + if (nr < 0) { + ret = SD_RES_EIO; + goto out; + } + ret = read_from_others(e, nr, oid, + &hdr->data_length, + req->data, hdr->offset, + SD_FLAG_CMD_FORWARD, + req_epoch - 1, + hdr->copies); + goto out; + } } if (opcode != SD_OP_CREATE_AND_WRITE_OBJ) @@ -223,15 +257,23 @@ void store_queue_request(struct work *work, int idx) goto out; } - if (!is_data_obj(oid)) - break; - + rlen = SD_DATA_OBJ_SIZE; if (hdr->flags & SD_FLAG_CMD_COW) { dprintf("%" PRIu64 "\n", hdr->cow_oid); - rlen = SD_DATA_OBJ_SIZE; - ret = read_from_others(cluster, hdr->cow_oid, &rlen, buf, - 0, hdr->copies); + if (epoch == cow_epoch) + nr = build_node_list(&cluster->node_list, e); + else + nr = read_node_list(cow_epoch, e, SD_MAX_NODES); + + if (nr < 0) { + ret = SD_RES_EIO; + goto out; + } + + ret = read_from_others(e, nr, hdr->cow_oid, &rlen, buf, + 0, cow_flags, cow_epoch, + hdr->copies); if (ret) { ret = 1; @@ -239,13 +281,15 @@ void store_queue_request(struct work *work, int idx) } } else { dprintf("%" PRIu64 "\n", oid); - memset(buf, 0, SD_DATA_OBJ_SIZE); + if (rlen > hdr->data_length) + rlen = hdr->data_length; + memset(buf, 0, rlen); } dprintf("%" PRIu64 "\n", oid); - ret = pwrite64(fd, buf, SD_DATA_OBJ_SIZE, 0); - if (ret != SD_DATA_OBJ_SIZE) { + ret = pwrite64(fd, buf, rlen, 0); + if (ret != rlen) { ret = SD_RES_EIO; goto out; } @@ -299,7 +343,8 @@ void store_queue_request(struct work *work, int idx) } break; case SD_OP_STAT_SHEEP: - ret = stat_sheep(&nrsp->store_size, &nrsp->store_free); + ret = stat_sheep(&nrsp->store_size, req_epoch, + &nrsp->store_free); break; } out: @@ -649,6 +694,13 @@ int init_store(char *dir) if (ret && errno != EEXIST) return 1; + strcpy(path, dir); + strcat(path, "/obj"); + ret = mkdir(path, S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | + S_IWGRP | S_IXGRP); + if (ret && errno != EEXIST) + return 1; + fp = setmntent(MOUNTED, "r"); if (!fp) return 1; @@ -718,5 +770,11 @@ int write_node_list(uint32_t epoch, struct sheepdog_node_list_entry *entries, close(fd); + snprintf(path, sizeof(path), "%s/obj/%x", obj_dir, epoch); + ret = mkdir(path, S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | + S_IWGRP | S_IXGRP); + if (ret && errno != EEXIST) + return -1; + return nr_nodes; } diff --git a/collie/vdi.c b/collie/vdi.c index 31567d0..2a19ad0 100644 --- a/collie/vdi.c +++ b/collie/vdi.c @@ -84,7 +84,7 @@ int add_vdi(struct cluster_info *ci, char *name, int len, uint64_t size, uint64_t *added_oid, uint64_t base_oid, uint32_t tag) { struct sheepdog_node_list_entry entries[SD_MAX_NODES]; - int nr_nodes; + int nr_nodes, nr_reqs; uint64_t oid = 0; int ret; int copies; @@ -100,15 +100,16 @@ int add_vdi(struct cluster_info *ci, char *name, int len, uint64_t size, /* todo */ /* copies = sb->default_nr_copies; */ copies = 3; - if (copies > nr_nodes) - copies = nr_nodes; + nr_reqs = copies; + if (nr_reqs > nr_nodes) + nr_reqs = nr_nodes; req.opcode = SD_OP_SO_NEW_VDI; req.copies = copies; req.tag = tag; ret = exec_reqs(entries, nr_nodes, ci->epoch, - SD_DIR_OID, (struct sd_req *)&req, name, len, 0, copies); + SD_DIR_OID, (struct sd_req *)&req, name, len, 0, nr_reqs); /* todo: error handling */ diff --git a/collie/work.h b/collie/work.h index 81d8c56..7020991 100644 --- a/collie/work.h +++ b/collie/work.h @@ -1,7 +1,7 @@ #ifndef __WORK_H__ #define __WORK_H__ -#define NR_WORKER_THREAD 4 +#define NR_WORKER_THREAD 16 struct work; diff --git a/include/sheepdog_proto.h b/include/sheepdog_proto.h index 4bfb4e5..affc332 100644 --- a/include/sheepdog_proto.h +++ b/include/sheepdog_proto.h @@ -53,6 +53,7 @@ #define SD_FLAG_CMD_WRITE 0x01 #define SD_FLAG_CMD_COW 0x02 +#define SD_FLAG_CMD_FORWARD 0x04 #define SD_RES_SUCCESS 0x00 /* Success */ #define SD_RES_UNKNOWN 0x01 /* Unknown error */ -- 1.6.5 |