[Sheepdog] [PATCH v3 09/12] farm: the farm impelmentation proper
Liu Yuan
namei.unix at gmail.com
Fri Dec 23 15:39:27 CET 2011
From: Liu Yuan <tailai.ly at taobao.com>
todo:
- implement snapshot feature.
- more enchancements.
- add a doc for farm internls for dev.
- test slab is really needed.
Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
sheep/farm.h | 5 +
sheep/farm/farm.c | 425 +++++++++++++++++++++++++++++++++++++++++++++++++++++
sheep/store.c | 5 +-
3 files changed, 434 insertions(+), 1 deletions(-)
create mode 100644 sheep/farm/farm.c
diff --git a/sheep/farm.h b/sheep/farm.h
index 726ee4c..2669d55 100644
--- a/sheep/farm.h
+++ b/sheep/farm.h
@@ -55,6 +55,11 @@ struct snap_log {
};
extern char *epoch_path;
+extern char *obj_path;
+
+/* farm.c */
+extern char farm_dir[PATH_MAX];
+extern char farm_obj_dir[PATH_MAX];
/* sha1_file.c */
extern char *sha1_to_path(const unsigned char *sha1);
diff --git a/sheep/farm/farm.c b/sheep/farm/farm.c
new file mode 100644
index 0000000..1f2ad76
--- /dev/null
+++ b/sheep/farm/farm.c
@@ -0,0 +1,425 @@
+#include <dirent.h>
+
+#include "farm.h"
+#include "sheep_priv.h"
+
+char farm_obj_dir[PATH_MAX];
+char farm_dir[PATH_MAX];
+
+static int def_open_flags = O_DSYNC | O_RDWR;
+extern char *obj_path;
+extern mode_t def_fmode;
+
+static int create_directory(char *p)
+{
+ int i, ret = 0;
+ struct strbuf buf = STRBUF_INIT;
+
+ strbuf_addstr(&buf, p);
+ strbuf_addstr(&buf, "/.farm");
+ if (mkdir(buf.buf, 0755) < 0) {
+ if (errno != EEXIST) {
+ perror(buf.buf);
+ ret = -1;
+ goto err;
+ }
+ }
+
+ memcpy(farm_dir, buf.buf, buf.len);
+
+ strbuf_addstr(&buf, "/objects");
+ if (mkdir(buf.buf, 0755) < 0) {
+ if (errno != EEXIST) {
+ perror(buf.buf);
+ ret = -1;
+ goto err;
+ }
+ }
+ for (i = 0; i < 256; i++) {
+ strbuf_addf(&buf, "/%02x", i);
+ if (mkdir(buf.buf, 0755) < 0) {
+ if (errno != EEXIST) {
+ perror(buf.buf);
+ ret = -1;
+ goto err;
+ }
+ }
+ strbuf_remove(&buf, buf.len - 3, 3);
+ }
+
+ memcpy(farm_obj_dir, buf.buf, buf.len);
+err:
+ strbuf_release(&buf);
+ return ret;
+}
+
+static int farm_write(uint64_t oid, struct siocb *iocb)
+{
+ ssize_t size = xpwrite(iocb->fd, iocb->buf, iocb->length, iocb->offset);
+
+ if (size != iocb->length)
+ return SD_RES_EIO;
+
+ trunk_update_entry(oid);
+ return SD_RES_SUCCESS;
+}
+
+static int write_last_sector(int fd)
+{
+ const int size = SECTOR_SIZE;
+ char *buf;
+ int ret;
+ off_t off = SD_DATA_OBJ_SIZE - size;
+
+ buf = valloc(size);
+ if (!buf) {
+ eprintf("failed to allocate memory\n");
+ return SD_RES_NO_MEM;
+ }
+ memset(buf, 0, size);
+
+ ret = xpwrite(fd, buf, size, off);
+ if (ret != size)
+ ret = SD_RES_EIO;
+ else
+ ret = SD_RES_SUCCESS;
+ free(buf);
+
+ return ret;
+}
+
+static int err_to_sderr(uint64_t oid, int err)
+{
+ int ret;
+ if (err == ENOENT) {
+ struct stat s;
+
+ if (stat(obj_path, &s) < 0) {
+ eprintf("corrupted\n");
+ ret = SD_RES_EIO;
+ } else {
+ dprintf("object %016" PRIx64 " not found locally\n", oid);
+ ret = SD_RES_NO_OBJ;
+ }
+ } else {
+ eprintf("%m\n");
+ ret = SD_RES_UNKNOWN;
+ }
+ return ret;
+}
+
+/*
+ * Preallocate the whole object to get a better filesystem layout.
+ */
+static int prealloc(int fd)
+{
+ int ret = fallocate(fd, 0, 0, SD_DATA_OBJ_SIZE);
+ if (ret < 0) {
+ if (errno != ENOSYS && errno != EOPNOTSUPP)
+ ret = SD_RES_SYSTEM_ERROR;
+ else
+ ret = write_last_sector(fd);
+ } else
+ ret = SD_RES_SUCCESS;
+ return ret;
+}
+
+static int farm_open(uint64_t oid, struct siocb *iocb, int create)
+{
+ struct strbuf buf = STRBUF_INIT;
+ int ret = SD_RES_SUCCESS, fd;
+ int flags = def_open_flags;
+
+ if (iocb->epoch < sys->epoch)
+ goto out;
+
+ if (sys->use_directio && is_data_obj(oid))
+ flags |= O_DIRECT;
+
+ if (create)
+ flags |= O_CREAT | O_TRUNC;
+
+ strbuf_addstr(&buf, obj_path);
+ strbuf_addf(&buf, "%016" PRIx64, oid);
+ fd = open(buf.buf, flags, def_fmode);
+ if (fd < 0) {
+ ret = err_to_sderr(oid, errno);
+ goto out;
+ }
+ iocb->fd = fd;
+ ret = SD_RES_SUCCESS;
+ if (!(iocb->flags & SD_FLAG_CMD_COW) && create) {
+ ret = prealloc(fd);
+ if (ret != SD_RES_SUCCESS)
+ close(fd);
+ }
+out:
+ strbuf_release(&buf);
+ return ret;
+}
+
+static int farm_close(uint64_t oid, struct siocb *iocb)
+{
+ if (iocb->epoch < sys->epoch)
+ return SD_RES_SUCCESS;
+
+ if (close(iocb->fd) < 0)
+ return SD_RES_EIO;
+
+ return SD_RES_SUCCESS;
+}
+
+static int farm_init(char *p)
+{
+ int ret;
+
+ ret = create_directory(p);
+ if (ret)
+ goto err;
+
+ ret = trunk_init();
+ if (ret)
+ goto err;
+
+ ret = snap_init();
+ if (ret)
+ goto err;
+err:
+ return ret;
+}
+
+static int get_trunk_sha1(int epoch, unsigned char *outsha1, int user)
+{
+ int i, nr_logs = -1, ret = -1;
+ struct snap_log *log_buf, *log_free = NULL;
+ void *snap_buf = NULL;
+ struct sha1_file_hdr hdr;
+
+ log_free = log_buf = snap_log_read(&nr_logs, user);
+ dprintf("%d\n", nr_logs);
+ if (nr_logs < 0)
+ goto out;
+
+ for (i = 0; i < nr_logs; i++, log_buf++) {
+ if (log_buf->epoch != epoch)
+ continue;
+ snap_buf = snap_file_read(log_buf->sha1, &hdr);
+ if (!snap_buf)
+ goto out;
+ memcpy(outsha1, snap_buf, SHA1_LEN);
+ ret = 0;
+ break;
+ }
+out:
+ free(log_free);
+ free(snap_buf);
+ return ret;
+}
+
+static int farm_get_objlist(struct siocb *iocb)
+{
+ struct sha1_file_hdr hdr;
+ struct trunk_entry *trunk_buf, *trunk_free = NULL;
+ unsigned char trunk_sha1[SHA1_LEN];
+ uint64_t nr_trunks, i;
+ uint64_t *objlist = (uint64_t *)iocb->buf;
+ int ret = SD_RES_NO_TAG;
+
+ if (get_trunk_sha1(iocb->epoch, trunk_sha1, 0) < 0)
+ goto out;
+
+ trunk_free = trunk_buf = trunk_file_read(trunk_sha1, &hdr);
+ if (!trunk_buf)
+ goto out;
+
+ nr_trunks = hdr.priv;
+ for (i = 0; i < nr_trunks; i++, trunk_buf++)
+ objlist[iocb->length++] = trunk_buf->oid;
+
+ dprintf("%"PRIu32"\n", iocb->length);
+ ret = SD_RES_SUCCESS;
+out:
+ free(trunk_free);
+ return ret;
+}
+
+static void *retrieve_object_from_snap(uint64_t oid, int epoch, int user)
+{
+ struct sha1_file_hdr hdr;
+ struct trunk_entry *trunk_buf, *trunk_free = NULL;
+ unsigned char trunk_sha1[SHA1_LEN];
+ uint64_t nr_trunks, i;
+ void *buffer = NULL;
+
+ if (get_trunk_sha1(epoch, trunk_sha1, user) < 0)
+ goto out;
+
+ trunk_free = trunk_buf = trunk_file_read(trunk_sha1, &hdr);
+ if (!trunk_buf)
+ goto out;
+
+ nr_trunks = hdr.priv;
+ for (i = 0; i < nr_trunks; i++, trunk_buf++) {
+ struct sha1_file_hdr h;
+ if (trunk_buf->oid != oid)
+ continue;
+ buffer = sha1_file_read(trunk_buf->sha1, &h);
+ if (!buffer)
+ goto out;
+ break;
+ }
+out:
+ dprintf("oid %"PRIx64", epoch %d, %s\n", oid, epoch, buffer ? "succeed" : "fail");
+ free(trunk_free);
+ return buffer;
+}
+
+static int farm_read(uint64_t oid, struct siocb *iocb)
+{
+ if (iocb->epoch != sys->epoch) {
+ void *buffer = retrieve_object_from_snap(oid, iocb->epoch, 0);
+ if (!buffer)
+ return SD_RES_NO_OBJ;
+ memcpy(iocb->buf, buffer, iocb->length);
+ free(buffer);
+ } else {
+ ssize_t size = xpread(iocb->fd, iocb->buf, iocb->length, iocb->offset);
+
+ if (size != iocb->length)
+ return SD_RES_EIO;
+ }
+ return SD_RES_SUCCESS;
+}
+
+static int farm_atomic_put(uint64_t oid, struct siocb *iocb)
+{
+ char path[PATH_MAX], tmp_path[PATH_MAX];
+ int flags = def_open_flags | O_CREAT;
+ int ret = SD_RES_SYSTEM_ERROR, fd;
+ uint32_t len = iocb->length;
+
+ snprintf(path, sizeof(path), "%s%016" PRIx64, obj_path, oid);
+ snprintf(tmp_path, sizeof(tmp_path), "%s%016" PRIx64 ".tmp",
+ obj_path, oid);
+ fd = open(tmp_path, flags, def_fmode);
+ if (fd < 0) {
+ eprintf("failed to open %s: %m\n", tmp_path);
+ goto out;
+ }
+
+ ret = xwrite(fd, iocb->buf, len);
+ if (ret != len) {
+ eprintf("failed to write object. %m\n");
+ ret = SD_RES_SYSTEM_ERROR;
+ goto out_close;
+ }
+
+ ret = rename(tmp_path, path);
+ if (ret < 0) {
+ eprintf("failed to rename %s to %s: %m\n", tmp_path, path);
+ ret = SD_RES_SYSTEM_ERROR;
+ goto out_close;
+ }
+ dprintf("%"PRIx64"\n", oid);
+ ret = SD_RES_SUCCESS;
+out_close:
+ close(fd);
+out:
+ return ret;
+}
+
+static int farm_link(uint64_t oid, struct siocb *iocb, int tgt_epoch)
+{
+ int ret = SD_RES_SYSTEM_ERROR;
+ void *buf;
+ struct siocb io = { 0 };
+
+ dprintf("try link %"PRIx64" from snapshot with epoch %d\n", oid, tgt_epoch);
+ buf = retrieve_object_from_snap(oid, tgt_epoch, 0);
+ if (!buf)
+ goto fail;
+
+ io.length = SD_DATA_OBJ_SIZE;
+ io.buf = buf;
+ ret = farm_atomic_put(oid, &io);
+fail:
+ free(buf);
+ return ret;
+}
+
+static int farm_begin_recover(struct siocb *iocb)
+{
+ unsigned char snap_sha1[SHA1_LEN];
+ int epoch = iocb->epoch - 1;
+
+ dprintf("epoch %d\n", epoch);
+ if (snap_file_write(epoch, snap_sha1, 0) < 0)
+ return SD_RES_SYSTEM_ERROR;
+
+ if (snap_log_write(iocb->epoch - 1, snap_sha1, 0) < 0)
+ return SD_RES_SYSTEM_ERROR;
+
+ return SD_RES_SUCCESS;
+}
+
+static int oid_stale(uint64_t oid)
+{
+ int i, vidx;
+ struct sd_vnode *vnodes = sys->vnodes;
+
+ for (i = 0; i < sys->nr_sobjs; i++) {
+ vidx = obj_to_sheep(vnodes, sys->nr_vnodes, oid, i);
+ if (is_myself(vnodes[vidx].addr, vnodes[vidx].port))
+ return 0;
+ }
+ return 1;
+}
+
+static int farm_end_recover(struct siocb *iocb)
+{
+ DIR *dir;
+ struct dirent *d;
+ uint64_t oid;
+ int ret = SD_RES_SYSTEM_ERROR;
+
+ dprintf("%d\n", iocb->epoch);
+ dir = opendir(obj_path);
+ if (!dir)
+ goto out;
+
+ while ((d = readdir(dir))) {
+ if (!strncmp(d->d_name, ".", 1))
+ continue;
+ oid = strtoull(d->d_name, NULL, 16);
+ if (oid == 0)
+ continue;
+ if (oid_stale(oid)) {
+ char p[PATH_MAX];
+ snprintf(p, sizeof(p), "%s%s", obj_path, d->d_name);
+ if (unlink(p) < 0) {
+ eprintf("%s:%m\n", p);
+ goto out_close;
+ }
+ dprintf("remove oid %s\n", d->d_name);
+ }
+ }
+ ret = SD_RES_SUCCESS;
+out_close:
+ closedir(dir);
+out:
+ return ret;
+}
+
+struct store_driver farm = {
+ .driver_name = "farm",
+ .init = farm_init,
+ .open = farm_open,
+ .write = farm_write,
+ .read = farm_read,
+ .close = farm_close,
+ .get_objlist = farm_get_objlist,
+ .link = farm_link,
+ .atomic_put = farm_atomic_put,
+ .begin_recover = farm_begin_recover,
+ .end_recover = farm_end_recover,
+};
diff --git a/sheep/store.c b/sheep/store.c
index de1bf63..c175b44 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -34,6 +34,7 @@ struct sheepdog_config {
uint8_t pad[3];
};
+extern struct store_driver farm;
char *obj_path;
static char *epoch_path;
static char *mnt_path;
@@ -1570,7 +1571,8 @@ static void do_recover_main(struct work *work, int idx)
queue_work(sys->recovery_wqueue, &rw->work);
} else {
if (store.end_recover) {
- struct siocb iocb;
+ struct siocb iocb = { 0 };
+ iocb.epoch = sys->epoch;
store.end_recover(&iocb);
}
}
@@ -2013,6 +2015,7 @@ int init_store(const char *d)
if (ret)
return ret;
+ register_store_driver(&farm);
ret = store.init(obj_path);
if (ret)
return ret;
--
1.7.8.rc3
More information about the sheepdog
mailing list