[Sheepdog] [PATCH 3/6] collie: add disk I/O manager
MORITA Kazutaka
morita.kazutaka at lab.ntt.co.jp
Tue Dec 1 19:35:20 CET 2009
This was originally part of sheep.
Currently, only btrfs is supported as the local file system, but
we plan to remove this restriction.
Data recovery will also be supported soon.
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
collie/store.c | 366 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 366 insertions(+), 0 deletions(-)
create mode 100644 collie/store.c
diff --git a/collie/store.c b/collie/store.c
new file mode 100644
index 0000000..6b66a92
--- /dev/null
+++ b/collie/store.c
@@ -0,0 +1,366 @@
+/*
+ * Copyright (C) 2009 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <dirent.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <mntent.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/stat.h>
+#include <sys/statvfs.h>
+#include <sys/xattr.h>
+
+#include "collie.h"
+#include "meta.h"
+
+/* Directory that holds one file per object; set by init_store(). */
+static char *obj_dir;
+/* Mount point found for obj_dir's file system; queried with statvfs(). */
+static char *mnt_dir;
+/*
+ * Scratch memory of NR_WORKER_THREAD * SD_DATA_OBJ_SIZE bytes allocated by
+ * init_store(); store_queue_request() uses the slice at
+ * idx * SD_DATA_OBJ_SIZE for the request's worker index.
+ */
+static char *zero_block;
+
+/*
+ * Report the capacity of this node's object store.
+ *
+ * @store_size: set to the bytes used by the files in obj_dir plus the
+ *              free bytes left on the underlying file system
+ * @store_free: set to the free bytes left on the underlying file system
+ *
+ * Returns SD_RES_SUCCESS, or SD_RES_EIO if the file system cannot be
+ * queried or obj_dir cannot be opened.
+ */
+static int stat_sheep(uint64_t *store_size, uint64_t *store_free)
+{
+	struct statvfs vs;
+	DIR *dir;
+	struct dirent *d;
+	struct stat s;
+	char path[1024];
+	uint64_t used = 0, fs_free;
+	int len;
+
+	/* statvfs() accepts any path on the file system, so obj_dir works too */
+	if (statvfs(mnt_dir ? mnt_dir : obj_dir, &vs))
+		return SD_RES_EIO;
+
+	dir = opendir(obj_dir);
+	if (!dir)
+		return SD_RES_EIO;
+
+	while ((d = readdir(dir))) {
+		if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, ".."))
+			continue;
+
+		len = snprintf(path, sizeof(path), "%s/%s", obj_dir, d->d_name);
+		if (len < 0 || (size_t)len >= sizeof(path))
+			continue;	/* truncated path would name another file */
+
+		/* entries that vanish or cannot be stat'ed are not counted */
+		if (stat(path, &s))
+			continue;
+
+		used += s.st_size;
+	}
+
+	closedir(dir);
+
+	/* widen before multiplying: the statvfs fields may be 32 bits wide */
+	fs_free = (uint64_t)vs.f_frsize * vs.f_bfree;
+
+	/*
+	 * Blocks holding object data are already excluded from f_bfree, so
+	 * the object bytes are added to the free space to get the store size
+	 * instead of being subtracted a second time (which could underflow).
+	 */
+	*store_size = fs_free + used;
+	*store_free = fs_free;
+
+	return SD_RES_SUCCESS;
+}
+
+/*
+ * Read part of object @oid from the first node in the cluster that serves it.
+ *
+ * Candidate nodes are tried in the order returned by obj_to_sheep() over
+ * the current node list.  If a node answers SD_RES_OLD_NODE_VER or
+ * SD_RES_NEW_NODE_VER, sleep, rebuild the node list and start over.
+ * NOTE(review): that retry has no upper bound.
+ *
+ * @rlen:   in: number of bytes to read; out: as updated by exec_req()
+ * @buf:    destination buffer for the data
+ * @offset: byte offset inside the object
+ *
+ * Returns 0 on success, -1 if no node could serve the read.
+ */
+static int read_from_one(struct cluster_info *cluster, uint64_t oid,
+			 unsigned *rlen, void *buf, uint64_t offset)
+{
+	int i, n, nr, fd, ret;
+	unsigned wlen, len = *rlen;
+	char name[128];
+	struct sheepdog_node_list_entry *e;
+	struct sd_obj_req hdr;
+	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
+
+	e = zalloc(SD_MAX_NODES * sizeof(struct sheepdog_node_list_entry));
+	if (!e)
+		return -1;
+again:
+	nr = build_node_list(&cluster->node_list, e);
+
+	for (i = 0; i < nr; i++) {
+		n = obj_to_sheep(e, nr, oid, i);
+
+		/* only the last four address bytes are used (IPv4 dotted quad) */
+		snprintf(name, sizeof(name), "%d.%d.%d.%d",
+			 e[n].addr[12], e[n].addr[13],
+			 e[n].addr[14], e[n].addr[15]);
+
+		fd = connect_to(name, e[n].port);
+		if (fd < 0)
+			continue;
+
+		memset(&hdr, 0, sizeof(hdr));
+		hdr.opcode = SD_OP_READ_OBJ;
+		hdr.oid = oid;
+		hdr.epoch = cluster->epoch;
+		hdr.flags = 0;
+		hdr.data_length = len;
+		hdr.offset = offset;
+
+		/*
+		 * exec_req() receives wlen and rlen by pointer, so both must
+		 * hold defined values: a read sends no payload (wlen = 0,
+		 * presumably the outgoing data length -- confirm against
+		 * exec_req()), and rlen is reset in case a previous attempt
+		 * rewrote it.
+		 */
+		wlen = 0;
+		*rlen = len;
+		ret = exec_req(fd, (struct sd_req *)&hdr, buf, &wlen, rlen);
+
+		close(fd);
+
+		if (ret)
+			continue;
+
+		switch (rsp->result) {
+		case SD_RES_SUCCESS:
+			free(e);
+			return 0;
+		case SD_RES_OLD_NODE_VER:
+		case SD_RES_NEW_NODE_VER:
+			/* waits for the node list timer */
+			sleep(2);
+			goto again;
+		default:
+			break;
+		}
+	}
+
+	free(e);
+	return -1;
+}
+
+/*
+ * Fetch a whole data object (SD_DATA_OBJ_SIZE bytes starting at offset 0)
+ * from another node into @buf.  @copies is accepted but not used yet.
+ *
+ * Returns what read_from_one() returns: 0 on success, -1 on failure.
+ */
+static int read_from_other_sheeps(struct cluster_info *cluster,
+				  uint64_t oid, char *buf, int copies)
+{
+	unsigned int len = SD_DATA_OBJ_SIZE;
+
+	return read_from_one(cluster, oid, &len, buf, 0);
+}
+
+/*
+ * Execute one request against this node's local object store.
+ *
+ * @work: embedded in the struct request being processed
+ * @idx:  selects the SD_DATA_OBJ_SIZE slice of zero_block used as scratch
+ *        space when a data object is created (zero-filled, or filled from
+ *        hdr->cow_oid when SD_FLAG_CMD_COW is set)
+ *
+ * Requests whose epoch differs from the cluster epoch are rejected with
+ * SD_RES_OLD_NODE_VER / SD_RES_NEW_NODE_VER, except SD_OP_GET_NODE_LIST.
+ * An object's copy count is stored in its "user.sheepdog.copies" xattr on
+ * creation and returned in rsp->copies on read.
+ *
+ * On failure the SD_RES_* code is written to rsp->result; on success
+ * rsp->result is not touched here.
+ */
+void store_queue_request(struct work *work, int idx)
+{
+	struct request *req = container_of(work, struct request, work);
+	struct cluster_info *cluster = req->ci->cluster;
+	char path[1024];
+	int fd = -1, ret = SD_RES_SUCCESS;
+	int flags = O_RDWR;
+	char *buf = zero_block + idx * SD_DATA_OBJ_SIZE;
+	char aname[] = "user.sheepdog.copies";
+	struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
+	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&req->rp;
+	uint64_t oid = hdr->oid;
+	uint32_t opcode = hdr->opcode;
+	uint32_t epoch = cluster->epoch;
+	uint32_t req_epoch = hdr->epoch;
+	struct sd_node_rsp *nrsp = (struct sd_node_rsp *)&req->rp;
+	ssize_t size;
+	int copies;
+
+	/* TODO: header fields should be converted with le_to_cpu */
+
+	snprintf(path, sizeof(path), "%s/%" PRIx64, obj_dir, oid);
+
+	dprintf("%d, %x, %s, %u, %u\n", idx, opcode, path, epoch, req_epoch);
+
+	if (list_empty(&cluster->node_list)) {
+		/* we haven't got SD_OP_GET_NODE_LIST response yet. */
+		ret = SD_RES_SYSTEM_ERROR;
+		goto out;
+	}
+
+	if (opcode != SD_OP_GET_NODE_LIST) {
+		if (before(req_epoch, epoch)) {
+			ret = SD_RES_OLD_NODE_VER;
+			eprintf("old node version %u %u, %x %" PRIx64 "\n",
+				epoch, req_epoch, opcode, oid);
+			goto out;
+		} else if (after(req_epoch, epoch)) {
+			ret = SD_RES_NEW_NODE_VER;
+			eprintf("new node version %u %u %x %" PRIx64 "\n",
+				epoch, req_epoch, opcode, oid);
+			goto out;
+		}
+	}
+
+	/* Step 1: open the object file (and initialize it on creation). */
+	switch (opcode) {
+	case SD_OP_CREATE_AND_WRITE_OBJ:
+	case SD_OP_WRITE_OBJ:
+	case SD_OP_READ_OBJ:
+	case SD_OP_SYNC_OBJ:
+		if (opcode == SD_OP_CREATE_AND_WRITE_OBJ) {
+			/* validate before O_CREAT so a bad request leaves no file */
+			if (!hdr->copies) {
+				eprintf("zero copies is invalid\n");
+				ret = SD_RES_INVALID_PARMS;
+				goto out;
+			}
+			flags |= O_CREAT;
+		}
+
+		fd = open(path, flags, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
+		if (fd < 0) {
+			if (errno == ENOENT)
+				ret = SD_RES_NO_OBJ;
+			else
+				ret = SD_RES_UNKNOWN;
+
+			goto out;
+		}
+
+		if (opcode != SD_OP_CREATE_AND_WRITE_OBJ)
+			break;
+
+		if (ftruncate(fd, 0)) {
+			ret = SD_RES_EIO;
+			goto out;
+		}
+
+		if (fsetxattr(fd, aname, &hdr->copies,
+			      sizeof(hdr->copies), 0)) {
+			eprintf("use 'user_xattr' option?\n");
+			ret = SD_RES_SYSTEM_ERROR;
+			goto out;
+		}
+
+		if (!is_data_obj(oid))
+			break;
+
+		/* fill the whole data object before applying the write */
+		if (hdr->flags & SD_FLAG_CMD_COW) {
+			dprintf("%" PRIu64 "\n", hdr->cow_oid);
+
+			if (read_from_other_sheeps(cluster, hdr->cow_oid, buf,
+						   hdr->copies)) {
+				ret = SD_RES_EIO;
+				goto out;
+			}
+		} else {
+			dprintf("%" PRIu64 "\n", oid);
+			memset(buf, 0, SD_DATA_OBJ_SIZE);
+		}
+
+		dprintf("%" PRIu64 "\n", oid);
+
+		size = pwrite64(fd, buf, SD_DATA_OBJ_SIZE, 0);
+		if (size != SD_DATA_OBJ_SIZE) {
+			ret = SD_RES_EIO;
+			goto out;
+		}
+		break;
+	default:
+		break;
+	}
+
+	/* Step 2: perform the operation itself. */
+	switch (opcode) {
+	case SD_OP_REMOVE_OBJ:
+		if (unlink(path)) {
+			/* same errno mapping as the open() failure above */
+			if (errno == ENOENT)
+				ret = SD_RES_NO_OBJ;
+			else
+				ret = SD_RES_EIO;
+		}
+		break;
+	case SD_OP_READ_OBJ:
+		/*
+		 * TODO: should be optional (we can use the flags) for
+		 * performance; qemu doesn't always need the copies.
+		 */
+		copies = 0;
+		size = fgetxattr(fd, aname, &copies, sizeof(copies));
+		if (size != (ssize_t)sizeof(copies)) {
+			ret = SD_RES_SYSTEM_ERROR;
+			goto out;
+		}
+
+		size = pread64(fd, req->data, hdr->data_length, hdr->offset);
+		if (size < 0) {
+			ret = SD_RES_EIO;
+			break;
+		}
+		rsp->data_length = size;
+		rsp->copies = copies;
+		ret = SD_RES_SUCCESS;
+		break;
+	case SD_OP_CREATE_AND_WRITE_OBJ:
+	case SD_OP_WRITE_OBJ:
+		size = pwrite64(fd, req->data, hdr->data_length, hdr->offset);
+		/* a short write is treated as an I/O error */
+		if (size < 0 || (uint64_t)size != hdr->data_length) {
+			ret = SD_RES_EIO;
+			goto out;
+		}
+		ret = SD_RES_SUCCESS;
+		break;
+	case SD_OP_SYNC_OBJ:
+		if (fsync(fd)) {
+			if (errno == EIO)
+				ret = SD_RES_EIO;
+			else
+				ret = SD_RES_UNKNOWN;
+		}
+		break;
+	case SD_OP_STAT_SHEEP:
+		ret = stat_sheep(&nrsp->store_size, &nrsp->store_free);
+		break;
+	default:
+		break;
+	}
+out:
+	if (ret != SD_RES_SUCCESS) {
+		dprintf("failed, %d, %d, %x, %s, %u, %u\n", ret, idx, opcode,
+			path, epoch, req_epoch);
+
+		rsp->result = ret;
+	}
+
+	if (fd != -1)
+		close(fd);
+}
+
+/*
+ * Prepare @dir as the local object store.
+ *
+ * Creates @dir with mode 0770 (subject to the umask) if it does not exist,
+ * records it as obj_dir, looks up the mount point of the file system that
+ * holds it for stat_sheep(), and allocates NR_WORKER_THREAD scratch
+ * buffers of SD_DATA_OBJ_SIZE bytes each.
+ *
+ * @dir is stored as-is (not copied), so it must outlive the store.
+ *
+ * Returns 0 on success, 1 on failure.
+ */
+int init_store(char *dir)
+{
+	struct mntent *mnt;
+	struct stat s, ms;
+	FILE *fp;
+
+	if (stat(dir, &s)) {
+		if (errno != ENOENT) {
+			eprintf("can't handle the object dir %s, %m\n", dir);
+			return 1;
+		}
+
+		if (mkdir(dir, S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP |
+			  S_IWGRP | S_IXGRP)) {
+			eprintf("can't create the object dir %s, %m\n", dir);
+			return 1;
+		}
+
+		if (stat(dir, &s))
+			return 1;
+
+		eprintf("created the object dir %s\n", dir);
+	} else if (!S_ISDIR(s.st_mode)) {
+		eprintf("%s is not a directory\n", dir);
+		return 1;
+	}
+
+	obj_dir = dir;
+
+	fp = setmntent(MOUNTED, "r");
+	if (!fp)
+		return 1;
+
+	/* the mount point is the entry on the same device as dir */
+	while ((mnt = getmntent(fp))) {
+		if (stat(mnt->mnt_dir, &ms))
+			continue;
+
+		if (ms.st_dev == s.st_dev) {
+			mnt_dir = strdup(mnt->mnt_dir);
+			break;
+		}
+	}
+
+	endmntent(fp);
+
+	if (!mnt_dir) {
+		/*
+		 * No mount entry matched dir's st_dev (or strdup() failed).
+		 * statvfs() accepts any path on the file system, so dir itself
+		 * is used instead of leaving mnt_dir NULL.
+		 */
+		mnt_dir = strdup(dir);
+		if (!mnt_dir)
+			return 1;
+	}
+
+	zero_block = zalloc(SD_DATA_OBJ_SIZE * NR_WORKER_THREAD);
+	if (!zero_block)
+		return 1;
+
+	/* explicit success: ret from the mount loop's stat() is meaningless */
+	return 0;
+}
--
1.5.6.5
More information about the sheepdog
mailing list