[Sheepdog] [PATCH 3/6] collie: add disk I/O manager

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Tue Dec 1 19:35:20 CET 2009


This is originally a part of sheep.
Currently, only btrfs is supported as a local file system, but
we think of removing this restriction.
Data recovery will be also supported soon.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 collie/store.c |  366 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 366 insertions(+), 0 deletions(-)
 create mode 100644 collie/store.c

diff --git a/collie/store.c b/collie/store.c
new file mode 100644
index 0000000..6b66a92
--- /dev/null
+++ b/collie/store.c
@@ -0,0 +1,366 @@
+/*
+ * Copyright (C) 2009 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <dirent.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <mntent.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/xattr.h>
+#include <sys/statvfs.h>
+
+#include "collie.h"
+#include "meta.h"
+
+static char *obj_dir;
+static char *mnt_dir;
+static char *zero_block;
+
+static int stat_sheep(uint64_t *store_size, uint64_t *store_free)
+{
+	struct statvfs vs;
+	int ret;
+	DIR *dir;
+	struct dirent *d;
+	uint64_t used = 0;
+	struct stat s;
+	char path[1024];
+
+	ret = statvfs(mnt_dir, &vs);
+	if (ret)
+		return SD_RES_EIO;
+
+	dir = opendir(obj_dir);
+	if (!dir)
+		return SD_RES_EIO;
+
+	while ((d = readdir(dir))) {
+		if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, ".."))
+			continue;
+
+		snprintf(path, sizeof(path), "%s/%s", obj_dir, d->d_name);
+
+		ret = stat(path, &s);
+		if (ret)
+			continue;
+
+		used += s.st_size;
+	}
+
+	*store_size = vs.f_frsize * vs.f_bfree;
+	*store_free = vs.f_frsize * vs.f_bfree - used;
+
+	return SD_RES_SUCCESS;
+}
+
+static int read_from_one(struct cluster_info *cluster, uint64_t oid,
+			 unsigned *rlen, void *buf, uint64_t offset)
+{
+	int i, n, nr, fd, ret;
+	unsigned wlen;
+	char name[128];
+	struct sheepdog_node_list_entry *e;
+	struct sd_obj_req hdr;
+	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
+
+	e = zalloc(SD_MAX_NODES * sizeof(struct sheepdog_node_list_entry));
+again:
+	nr = build_node_list(&cluster->node_list, e);
+
+	for (i = 0; i < nr; i++) {
+		n = obj_to_sheep(e, nr, oid, i);
+
+		snprintf(name, sizeof(name), "%d.%d.%d.%d",
+			 e[n].addr[12], e[n].addr[13],
+			 e[n].addr[14], e[n].addr[15]);
+
+		fd = connect_to(name, e[n].port);
+		if (fd < 0)
+			continue;
+
+		memset(&hdr, 0, sizeof(hdr));
+		hdr.opcode = SD_OP_READ_OBJ;
+		hdr.oid = oid;
+		hdr.epoch = cluster->epoch;
+
+		hdr.flags = 0;
+		hdr.data_length = *rlen;
+		hdr.offset = offset;
+
+		ret = exec_req(fd, (struct sd_req *)&hdr, buf, &wlen, rlen);
+
+		close(fd);
+
+		if (ret)
+			continue;
+
+		switch (rsp->result) {
+		case SD_RES_SUCCESS:
+			return 0;
+		case SD_RES_OLD_NODE_VER:
+		case SD_RES_NEW_NODE_VER:
+			/* waits for the node list timer */
+			sleep(2);
+			goto again;
+			break;
+		default:
+			;
+		}
+	}
+
+	return -1;
+}
+
+static int read_from_other_sheeps(struct cluster_info *cluster,
+				  uint64_t oid, char *buf, int copies)
+{
+	int ret;
+	unsigned int rlen;
+
+	rlen = SD_DATA_OBJ_SIZE;
+
+	ret = read_from_one(cluster, oid, &rlen, buf, 0);
+
+	return ret;
+}
+
+void store_queue_request(struct work *work, int idx)
+{
+	struct request *req = container_of(work, struct request, work);
+	struct cluster_info *cluster = req->ci->cluster;
+	char path[1024];
+	int fd = -1, ret = SD_RES_SUCCESS;
+	int flags = O_RDWR;
+	char *buf = zero_block + idx * SD_DATA_OBJ_SIZE;
+	char aname[] = "user.sheepdog.copies";
+	struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
+	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&req->rp;
+	uint64_t oid = hdr->oid;
+	uint32_t opcode = hdr->opcode;
+	uint32_t epoch = cluster->epoch;
+	uint32_t req_epoch = hdr->epoch;
+	struct sd_node_rsp *nrsp = (struct sd_node_rsp *)&req->rp;
+	int copies;
+
+	/* use le_to_cpu */
+
+	snprintf(path, sizeof(path), "%s/%" PRIx64, obj_dir, oid);
+
+	dprintf("%d, %x, %s, %u, %u\n", idx, opcode, path, epoch, req_epoch);
+
+	if (list_empty(&cluster->node_list)) {
+		/* we haven't got SD_OP_GET_NODE_LIST response yet. */
+		ret = SD_RES_SYSTEM_ERROR;
+		goto out;
+	}
+
+	if (opcode != SD_OP_GET_NODE_LIST) {
+		if (before(req_epoch, epoch)) {
+			ret = SD_RES_OLD_NODE_VER;
+			eprintf("old node version %u %u, %x %" PRIx64 "\n",
+				epoch, req_epoch, opcode, oid);
+			goto out;
+		} else if (after(req_epoch, epoch)) {
+			ret = SD_RES_NEW_NODE_VER;
+			eprintf("new node version %u %u %x %" PRIx64 "\n",
+				epoch, req_epoch, opcode, oid);
+			goto out;
+		}
+	}
+
+	switch (opcode) {
+	case SD_OP_CREATE_AND_WRITE_OBJ:
+	case SD_OP_WRITE_OBJ:
+	case SD_OP_READ_OBJ:
+	case SD_OP_SYNC_OBJ:
+		if (opcode == SD_OP_CREATE_AND_WRITE_OBJ)
+			flags |= O_CREAT;
+
+		fd = open(path, flags, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
+		if (fd < 0) {
+			if (errno == ENOENT)
+				ret = SD_RES_NO_OBJ;
+			else
+				ret = SD_RES_UNKNOWN;
+
+			goto out;
+		}
+
+		if (opcode != SD_OP_CREATE_AND_WRITE_OBJ)
+			break;
+
+		if (!hdr->copies) {
+			eprintf("zero copies is invalid\n");
+			ret = SD_RES_INVALID_PARMS;
+			goto out;
+		}
+
+		ret = ftruncate(fd, 0);
+		if (ret) {
+			ret = SD_RES_EIO;
+			goto out;
+		}
+
+		ret = fsetxattr(fd, aname, &hdr->copies,
+				sizeof(hdr->copies), 0);
+		if (ret) {
+			eprintf("use 'user_xattr' option?\n");
+			ret = SD_RES_SYSTEM_ERROR;
+			goto out;
+		}
+
+		if (!is_data_obj(oid))
+			break;
+
+		if (hdr->flags & SD_FLAG_CMD_COW) {
+			dprintf("%" PRIu64 "\n", hdr->cow_oid);
+
+			ret = read_from_other_sheeps(cluster,
+						     hdr->cow_oid, buf,
+						     hdr->copies);
+			if (ret) {
+				ret = 1;
+				goto out;
+			}
+		} else {
+			dprintf("%" PRIu64 "\n", oid);
+			memset(buf, 0, SD_DATA_OBJ_SIZE);
+		}
+
+		dprintf("%" PRIu64 "\n", oid);
+
+		ret = pwrite64(fd, buf, SD_DATA_OBJ_SIZE, 0);
+		if (ret != SD_DATA_OBJ_SIZE) {
+			ret = SD_RES_EIO;
+			goto out;
+		}
+	default:
+		break;
+	}
+
+	switch (opcode) {
+	case SD_OP_REMOVE_OBJ:
+		ret = unlink(path);
+		if (ret)
+			ret = 1;
+		break;
+	case SD_OP_READ_OBJ:
+		/*
+		 * TODO: should be optional (we can use the flags) for
+		 * performance; qemu doesn't always need the copies.
+		 */
+		copies = 0;
+		ret = fgetxattr(fd, aname, &copies, sizeof(copies));
+		if (ret != sizeof(copies)) {
+			ret = SD_RES_SYSTEM_ERROR;
+			goto out;
+		}
+
+		ret = pread64(fd, req->data, hdr->data_length, hdr->offset);
+		if (ret < 0)
+			ret = SD_RES_EIO;
+		else {
+			rsp->data_length = ret;
+			rsp->copies = copies;
+			ret = SD_RES_SUCCESS;
+		}
+		break;
+	case SD_OP_CREATE_AND_WRITE_OBJ:
+	case SD_OP_WRITE_OBJ:
+		ret = pwrite64(fd, req->data, hdr->data_length, hdr->offset);
+		if (ret != hdr->data_length) {
+			ret = SD_RES_EIO;
+			goto out;
+		} else
+			ret = SD_RES_SUCCESS;
+		break;
+	case SD_OP_SYNC_OBJ:
+		ret = fsync(fd);
+		if (ret) {
+			if (errno == EIO)
+				ret = SD_RES_EIO;
+			else
+				ret = SD_RES_UNKNOWN;
+		}
+		break;
+	case SD_OP_STAT_SHEEP:
+		ret = stat_sheep(&nrsp->store_size, &nrsp->store_free);
+		break;
+	}
+out:
+	if (ret != SD_RES_SUCCESS) {
+		dprintf("failed, %d, %d, %x, %s, %u, %u\n", ret, idx, opcode,
+			path, epoch, req_epoch);
+
+		rsp->result = ret;
+	}
+
+	if (fd != -1)
+		close(fd);
+}
+
+int init_store(char *dir)
+{
+	int ret;
+	struct mntent *mnt;
+	struct stat s, ms;
+	FILE *fp;
+
+	ret = stat(dir, &s);
+	if (ret) {
+		if (errno == ENOENT) {
+			ret = mkdir(dir, S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP |
+				    S_IWGRP | S_IXGRP);
+			if (ret) {
+				eprintf("can't create the object dir %s, %m\n",
+					dir);
+				return 1;
+			} else {
+				ret = stat(dir, &s);
+				if (ret)
+					return 1;
+
+				eprintf("created the object dir %s\n", dir);
+			}
+		} else {
+			eprintf("can't handle the object dir %s, %m\n", dir);
+			return 1;
+		}
+	} else if (!S_ISDIR(s.st_mode)) {
+		eprintf("%s is not a directory\n", dir);
+		return 1;
+	}
+
+	obj_dir = dir;
+
+	fp = setmntent(MOUNTED, "r");
+	if (!fp)
+		return 1;
+
+	while ((mnt = getmntent(fp))) {
+		ret = stat(mnt->mnt_dir, &ms);
+		if (ret)
+			continue;
+
+		if (ms.st_dev == s.st_dev) {
+			mnt_dir = strdup(mnt->mnt_dir);
+			break;
+		}
+	}
+
+	endmntent(fp);
+
+	zero_block = zalloc(SD_DATA_OBJ_SIZE * NR_WORKER_THREAD);
+	if (!zero_block)
+		return 1;
+
+	return ret;
+}
-- 
1.5.6.5




More information about the sheepdog mailing list