[sheepdog] [PATCH 5/7 v4 UPDATE] sheep: rename journal_file.c as journal.c
Liu Yuan
namei.unix at gmail.com
Thu Apr 4 17:23:39 CEST 2013
From: Liu Yuan <tailai.ly at taobao.com>
Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
sheep/Makefile.am | 2 +-
sheep/journal.c | 573 +++++++++++++++++++++++++++++++-------------------
sheep/journal_file.c | 429 -------------------------------------
sheep/ops.c | 8 +-
sheep/sheep_priv.h | 7 -
sheep/store.c | 30 ---
6 files changed, 357 insertions(+), 692 deletions(-)
delete mode 100644 sheep/journal_file.c
diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index 7f87616..2592528 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -27,7 +27,7 @@ sbin_PROGRAMS = sheep
sheep_SOURCES = sheep.c group.c request.c gateway.c store.c vdi.c work.c \
journal.c ops.c recovery.c cluster/local.c \
object_cache.c object_list_cache.c sockfd_cache.c \
- plain_store.c config.c migrate.c journal_file.c md.c
+ plain_store.c config.c migrate.c md.c
if BUILD_COROSYNC
sheep_SOURCES += cluster/corosync.c
diff --git a/sheep/journal.c b/sheep/journal.c
index fafb8f9..e9d91b6 100644
--- a/sheep/journal.c
+++ b/sheep/journal.c
@@ -1,5 +1,7 @@
/*
- * Copyright (C) 2009-2011 Nippon Telegraph and Telephone Corporation.
+ * Copyright (C) 2012 Taobao Inc.
+ *
+ * Liu Yuan <namei.unix at gmail.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License version
@@ -8,285 +10,420 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <fcntl.h>
-#include <dirent.h>
#include <errno.h>
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <stdint.h>
+#include <sys/mman.h>
#include "sheep_priv.h"
-#define JRNL_END_MARK 0x87654321UL
+struct journal_file {
+ int fd;
+ off_t pos;
+ int commit_fd;
+ uatomic_bool in_commit;
+};
-/* Journal header for data object */
-struct jrnl_head {
+struct journal_descriptor {
+ uint32_t magic;
+ uint16_t flag;
+ uint16_t reserved;
+ union {
+ uint32_t epoch;
+ uint64_t oid;
+ };
uint64_t offset;
uint64_t size;
- char target_path[256];
-};
+ uint8_t create;
+ uint8_t pad[475];
+} __packed;
-struct jrnl_descriptor {
- struct jrnl_head head;
- const void *data;
- int fd; /* Open file fd */
- int target_fd;
- char path[256];
-};
+/* JOURNAL_DESC + JOURNAL_MARKER must be 512-byte aligned for DIO */
+#define JOURNAL_DESC_MAGIC 0xfee1900d
+#define JOURNAL_DESC_SIZE 508
+#define JOURNAL_MARKER_SIZE 4 /* Use marker to detect partial write */
+#define JOURNAL_META_SIZE (JOURNAL_DESC_SIZE + JOURNAL_MARKER_SIZE)
-/* Journal APIs */
-static int jrnl_open(struct jrnl_descriptor *jd, const char *path)
+#define JOURNAL_END_MARKER 0xdeadbeef
+
+#define JF_STORE 0
+#define JF_EPOCH 1
+#define JF_CONFI 2
+
+static const char *jfile_name[2] = { "journal_file0", "journal_file1", };
+static int jfile_fds[2];
+static size_t jfile_size;
+
+static struct journal_file jfile;
+static pthread_spinlock_t jfile_lock;
+
+/*
+ * Create (truncating any existing file) root/name for direct, data-synced
+ * writes and reserve jfile_size bytes with prealloc(). Returns the fd, or -1.
+ */
+static int create_journal_file(const char *root, const char *name)
{
- pstrcpy(jd->path, sizeof(jd->path), path);
- jd->fd = open(path, O_RDONLY);
+ int fd, flags = O_DSYNC | O_RDWR | O_TRUNC | O_CREAT | O_DIRECT;
+ char path[PATH_MAX];
- if (jd->fd < 0) {
- sd_eprintf("failed to open %s: %m", jd->path);
- if (errno == ENOENT)
- return SD_RES_NO_OBJ;
- else
- return SD_RES_UNKNOWN;
+ snprintf(path, sizeof(path), "%s/%s", root, name);
+ fd = open(path, flags, 0644);
+ if (fd < 0) {
+ sd_eprintf("open %s %m", name);
+ return -1;
+ }
+ if (prealloc(fd, jfile_size) < 0) {
+ sd_eprintf("prealloc %s %m", name);
+ /* Don't leak the descriptor on the error path */
+ close(fd);
+ return -1;
}
- return SD_RES_SUCCESS;
+ return fd;
}
-static int jrnl_close(struct jrnl_descriptor *jd)
+/* We should have two valid FDs, otherwise something goes wrong */
+static int get_old_new_jfile(const char *p, int *old, int *new)
{
- close(jd->fd);
- jd->fd = -1;
+ int fd1, fd2;
+ int flags = O_RDONLY;
+ char path[PATH_MAX];
+ struct stat st1, st2;
+
+ snprintf(path, sizeof(path), "%s/%s", p, jfile_name[0]);
+ fd1 = open(path, flags);
+ if (fd1 < 0) {
+ if (errno == ENOENT)
+ return 0;
+
+ sd_eprintf("open1 %m");
+ return -1;
+ }
+ snprintf(path, sizeof(path), "%s/%s", p, jfile_name[1]);
+ fd2 = open(path, flags);
+ if (fd2 < 0) {
+ sd_eprintf("open2 %m");
+ close(fd1);
+ return -1;
+ }
+
+ if (fstat(fd1, &st1) < 0 || fstat(fd2, &st2) < 0) {
+ sd_eprintf("stat %m");
+ goto out;
+ }
+
+ if (st1.st_mtime < st2.st_mtime) {
+ *old = fd1;
+ *new = fd2;
+ } else {
+ *old = fd2;
+ *new = fd1;
+ }
return 0;
+out:
+ close(fd1);
+ close(fd2);
+ return -1;
}
-static int jrnl_create(struct jrnl_descriptor *jd, const char *jrnl_dir)
+static bool journal_entry_full_write(struct journal_descriptor *jd)
{
- snprintf(jd->path, sizeof(jd->path), "%sXXXXXX", jrnl_dir);
- jd->fd = mkostemp(jd->path, O_DSYNC);
+ char *end = (char *)jd +
+ roundup(jd->size, SECTOR_SIZE) + JOURNAL_META_SIZE;
+ uint32_t marker = *(((uint32_t *)end) - 1);
- if (jd->fd < 0) {
- sd_eprintf("failed to create %s: %m", jd->path);
- return SD_RES_UNKNOWN;
- }
-
- return SD_RES_SUCCESS;
+ if (marker != JOURNAL_END_MARKER)
+ return false;
+ return true;
}
-static int jrnl_remove(struct jrnl_descriptor *jd)
+/*
+ * Write the target file path of journal entry @jd into @path (PATH_MAX
+ * bytes) and log the entry. An unknown flag yields an empty path, so the
+ * caller's open() fails cleanly instead of using an uninitialized buffer.
+ */
+static void journal_get_path(struct journal_descriptor *jd, char *path)
{
- int ret;
+ switch (jd->flag) {
+ case JF_STORE:
+ snprintf(path, PATH_MAX, "%s/%016"PRIx64,
+ get_object_path(jd->oid), jd->oid);
+ sd_iprintf("%s, size %"PRIu64", off %"PRIu64", %d",
+ path, jd->size, jd->offset, jd->create);
+ break;
+ case JF_EPOCH:
+ snprintf(path, PATH_MAX, "%s/%08"PRIu32, epoch_path, jd->epoch);
+ sd_iprintf("%s, %"PRIu32" size %"PRIu64,
+ path, jd->epoch, jd->size);
+ break;
+ case JF_CONFI:
+ snprintf(path, PATH_MAX, "%s", config_path);
+ sd_iprintf("%s, size %"PRIu64, path, jd->size);
+ break;
+ default:
+ path[0] = '\0';
+ sd_eprintf("unknown journal entry flag %"PRIu16, jd->flag);
+ break;
+ }
+}
- ret = unlink(jd->path);
- if (ret) {
- sd_eprintf("failed to remove %s: %m", jd->path);
- ret = SD_RES_EIO;
- } else
- ret = SD_RES_SUCCESS;
+/*
+ * Re-apply one complete journal entry to its target file. The payload sits
+ * right after the descriptor in the mapped journal, so it is written out
+ * directly with no intermediate copy. Returns 0 on success, negative on error.
+ */
+static int replay_journal_entry(struct journal_descriptor *jd)
+{
+ char path[PATH_MAX];
+ ssize_t size;
+ int fd, flags = O_WRONLY, ret = 0;
+ const char *buf = (const char *)jd + JOURNAL_DESC_SIZE;
+
+ if (jd->create)
+ flags |= O_CREAT;
+
+ journal_get_path(jd, path);
+ fd = open(path, flags, def_fmode);
+ if (fd < 0) {
+ sd_eprintf("open %m");
+ return -1;
+ }
+ if (jd->create && jd->flag == JF_STORE) {
+ ret = prealloc(fd, get_objsize(jd->oid));
+ if (ret < 0)
+ goto out;
+ }
+ size = xpwrite(fd, buf, jd->size, jd->offset);
+ if (size != jd->size) {
+ sd_eprintf("write %zd, size %"PRIu64", errno %m", size,
+ jd->size);
+ ret = -1;
+ goto out;
+ }
+out:
+ close(fd);
return ret;
}
-static int jrnl_write_header(struct jrnl_descriptor *jd)
+/*
+ * Replay every complete entry of the journal file @fd; @fd is always
+ * closed. Sectors without the descriptor magic are skipped as empty, and
+ * entries without an end marker (partial writes) are not replayed.
+ * Returns 0 on success, -1 on failure.
+ */
+static int do_recover(int fd)
{
- ssize_t ret;
- struct jrnl_head *head = (struct jrnl_head *) &jd->head;
+ struct journal_descriptor *jd;
+ void *map;
+ char *p, *end;
+ struct stat st;
+ uint64_t rusize;
+ int ret = 0;
- ret = xpwrite(jd->fd, head, sizeof(*head), 0);
+ if (fstat(fd, &st) < 0) {
+ sd_eprintf("fstat %m");
+ close(fd);
+ return -1;
+ }
+ /* mmap() rejects a zero length; an empty file has nothing to replay */
+ if (st.st_size == 0) {
+ close(fd);
+ return 0;
+ }
- if (ret != sizeof(*head)) {
- if (errno == ENOSPC)
- ret = SD_RES_NO_SPACE;
- else
- ret = SD_RES_EIO;
- } else
- ret = SD_RES_SUCCESS;
+ map = mmap(NULL, st.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
+ close(fd);
+ if (map == MAP_FAILED) {
+ sd_eprintf("%m");
+ return -1;
+ }
- return ret;
+ end = (char *)map + st.st_size;
+ /* Every entry needs at least a descriptor plus an end marker */
+ for (p = map; end - p >= JOURNAL_META_SIZE;) {
+ jd = (struct journal_descriptor *)p;
+ if (jd->magic != JOURNAL_DESC_MAGIC) {
+ /* Empty area */
+ p += SECTOR_SIZE;
+ continue;
+ }
+ /* An entry running past the end of the file can't be complete */
+ if (jd->size > (uint64_t)(end - p - JOURNAL_META_SIZE))
+ break;
+ rusize = roundup(jd->size, SECTOR_SIZE);
+ if (rusize > (uint64_t)(end - p - JOURNAL_META_SIZE))
+ break;
+ /* We skip partial write because it is not acked back to VM */
+ if (journal_entry_full_write(jd) && replay_journal_entry(jd) < 0) {
+ ret = -1;
+ break;
+ }
+ p += JOURNAL_META_SIZE + rusize;
+ }
+ munmap(map, st.st_size);
+ if (ret < 0)
+ return ret;
+ /* Do a final sync() to assure data is reached to the disk */
+ sync();
+ return 0;
}
-static int jrnl_write_data(struct jrnl_descriptor *jd)
+/*
+ * We recover the journal files in order of modification time (oldest
+ * first) to cover the corner case where sheep crashes in the middle of a
+ * journal commit. In most cases only one jfile actually needs recovery; the
+ * other is empty. Recovery uses buffered IO and takes several seconds at most.
+ */
+static int check_recover_journal_file(const char *p)
{
- ssize_t ret;
- struct jrnl_head *head = (struct jrnl_head *) &jd->head;
+ /* -1 rather than 0 as "not found": 0 is a valid descriptor number */
+ int old = -1, new = -1;
- ret = xpwrite(jd->fd, jd->data, head->size, sizeof(*head));
+ if (get_old_new_jfile(p, &old, &new) < 0)
+ return -1;
- if (ret != head->size) {
- if (errno == ENOSPC)
- ret = SD_RES_NO_SPACE;
- else
- ret = SD_RES_EIO;
- } else
- ret = SD_RES_SUCCESS;
+ /* No journal file found */
+ if (old < 0)
+ return 0;
- return ret;
+ if (do_recover(old) < 0) {
+ /* do_recover() consumes only the fd it is given */
+ close(new);
+ return -1;
+ }
+ if (do_recover(new) < 0)
+ return -1;
+
+ return 0;
}
-static int jrnl_write_end_mark(struct jrnl_descriptor *jd)
+/*
+ * Create the two journal files under @path, each getting half of the @size
+ * MiB budget. Unless @skip is set, journal files left by a previous run are
+ * replayed first. Returns 0 on success, -1 on failure.
+ */
+int journal_file_init(const char *path, size_t size, bool skip)
{
- ssize_t retsize;
- int ret;
- uint32_t end_mark = JRNL_END_MARK;
- struct jrnl_head *head = (struct jrnl_head *) &jd->head;
-
- retsize = xpwrite(jd->fd, &end_mark, sizeof(end_mark),
- sizeof(*head) + head->size);
-
- if (retsize != sizeof(end_mark)) {
- if (errno == ENOSPC)
- ret = SD_RES_NO_SPACE;
- else
- ret = SD_RES_EIO;
- } else
- ret = SD_RES_SUCCESS;
+ int fd;
- return ret;
-}
+ if (!skip && check_recover_journal_file(path) < 0)
+ return -1;
-static int jrnl_apply_to_target_object(struct jrnl_descriptor *jd)
-{
- char *buf = NULL;
- int res = 0;
- ssize_t retsize;
-
- /* Flush out journal to disk (VDI object) */
- retsize = xpread(jd->fd, &jd->head, sizeof(jd->head), 0);
- buf = xzalloc(jd->head.size);
- retsize = xpread(jd->fd, buf, jd->head.size, sizeof(jd->head));
- retsize = xpwrite(jd->target_fd, buf, jd->head.size, jd->head.offset);
- if (retsize != jd->head.size) {
- if (errno == ENOSPC)
- res = SD_RES_NO_SPACE;
- else
- res = SD_RES_EIO;
- }
+ jfile_size = (size * 1024 * 1024) / 2;
- /* Clean up */
- free(buf);
+ fd = create_journal_file(path, jfile_name[0]);
+ if (fd < 0)
+ return -1;
+ jfile.fd = jfile_fds[0] = fd;
- return res;
-}
+ fd = create_journal_file(path, jfile_name[1]);
+ /* Both files are required: the writer switches between them */
+ if (fd < 0)
+ return -1;
+ jfile_fds[1] = fd;
-/* We cannot use this function for concurrent write operations */
-struct jrnl_descriptor *jrnl_begin(const void *buf, size_t count, off_t offset,
- const char *path, const char *jrnl_dir)
-{
- int ret;
- struct jrnl_descriptor *jd = xzalloc(sizeof(*jd));
-
- jd->head.offset = offset;
- jd->head.size = count;
- pstrcpy(jd->head.target_path, sizeof(jd->head.target_path), path);
-
- jd->data = buf;
-
- ret = jrnl_create(jd, jrnl_dir);
- if (ret)
- goto err;
-
- ret = jrnl_write_header(jd);
- if (ret)
- goto err;
-
- ret = jrnl_write_data(jd);
- if (ret)
- goto err;
-
- ret = jrnl_write_end_mark(jd);
- if (ret)
- goto err;
- return jd;
-err:
- free(jd);
- return NULL;
+ pthread_spin_init(&jfile_lock, PTHREAD_PROCESS_PRIVATE);
+ return 0;
}
-int jrnl_end(struct jrnl_descriptor *jd)
+static inline bool jfile_enough_space(size_t size)
{
- int ret = 0;
- if (!jd)
- return ret;
-
- ret = jrnl_close(jd);
- if (ret)
- goto err;
-
- ret = jrnl_remove(jd);
-err:
- free(jd);
- return ret;
+ if (jfile.pos + size > jfile_size)
+ return false;
+ return true;
}
-int jrnl_recover(const char *jrnl_dir)
+/*
+ * We rely on the kernel's page cache to cache data objects to 1) boost read
+ * performance and 2) simplify the read path, so that committing data is
+ * simply a sync() operation. We do the commit in a dedicated thread, switching
+ * back and forth between two journal files, to avoid blocking the writer.
+ */
+static void *commit_data(void *ignored)
{
- DIR *dir;
- struct dirent *d;
- char jrnl_file_path[PATH_MAX];
+ int err;
- sd_eprintf("opening the directory %s", jrnl_dir);
- dir = opendir(jrnl_dir);
- if (!dir)
- return -1;
+ /* Tell runtime to release resources after termination */
+ err = pthread_detach(pthread_self());
+ if (err)
+ panic("%s", strerror(err));
- sd_printf(SDOG_NOTICE, "starting journal recovery");
- while ((d = readdir(dir))) {
- struct jrnl_descriptor jd;
- uint32_t end_mark = 0;
- int ret;
+ sync();
+ if (ftruncate(jfile.commit_fd, 0) < 0)
+ panic("truncate %m");
+ if (prealloc(jfile.commit_fd, jfile_size) < 0)
+ panic("prealloc");
- if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, ".."))
- continue;
+ uatomic_set_false(&jfile.in_commit);
- snprintf(jrnl_file_path, sizeof(jrnl_file_path), "%s%s",
- jrnl_dir, d->d_name);
- ret = jrnl_open(&jd, jrnl_file_path);
- if (ret) {
- sd_eprintf("unable to open the journal file %s for"
- " reading", jrnl_file_path);
- goto end_while_3;
- }
+ pthread_exit(NULL);
+}
- ret = xpread(jd.fd, &jd.head, sizeof(jd.head), 0);
- if (ret != sizeof(jd.head)) {
- sd_eprintf("can't read journal head");
- goto end_while_2;
- }
+/* FIXME: Try not sleep inside lock */
+static void switch_journal_file(void)
+{
+ int old = jfile.fd, err;
+ pthread_t thread;
+
+retry:
+ if (!uatomic_set_true(&jfile.in_commit)) {
+ sd_eprintf("journal file in committing, "
+ "you might need enlarge jfile size");
+ usleep(100000); /* Wait until committing is finished */
+ goto retry;
+ }
- ret = xpread(jd.fd, &end_mark, sizeof(end_mark),
- sizeof(jd.head) + jd.head.size);
- if (ret != sizeof(end_mark)) {
- sd_eprintf("can't read journal end mark for object %s",
- jd.head.target_path);
- goto end_while_2;
- }
+ if (old == jfile_fds[0])
+ jfile.fd = jfile_fds[1];
+ else
+ jfile.fd = jfile_fds[0];
+ jfile.commit_fd = old;
+ jfile.pos = 0;
- if (end_mark != JRNL_END_MARK)
- goto end_while_2;
+ err = pthread_create(&thread, NULL, commit_data, NULL);
+ if (err)
+ panic("%s", strerror(err));
+}
- jd.target_fd = open(jd.head.target_path, O_DSYNC | O_RDWR);
- if (jd.target_fd < 0) {
- sd_eprintf("unable to open the object file %s for"
- " recovery", jd.head.target_path);
- goto end_while_2;
- }
- ret = jrnl_apply_to_target_object(&jd);
- if (ret)
- sd_eprintf("unable to recover the object %s",
- jd.head.target_path);
-
- close(jd.target_fd);
- jd.target_fd = -1;
-end_while_2:
- jrnl_close(&jd);
-end_while_3:
- sd_printf(SDOG_INFO, "recovered the object %s from the journal",
- jrnl_file_path);
- jrnl_remove(&jd);
+/*
+ * Append one entry (descriptor, sector-padded payload, end marker) to the
+ * active journal file. Returns SD_RES_SUCCESS or SD_RES_EIO.
+ */
+static int journal_file_write(struct journal_descriptor *jd, const char *buf)
+{
+ uint32_t marker = JOURNAL_END_MARKER;
+ int ret = SD_RES_SUCCESS, fd;
+ uint64_t size = jd->size;
+ ssize_t written, rusize = roundup(size, SECTOR_SIZE),
+ wsize = JOURNAL_META_SIZE + rusize;
+ off_t woff;
+ char *wbuffer, *p;
+
+ /*
+ * Pick the file and reserve the range under the same lock: reading
+ * jfile.fd after unlocking could observe a concurrent switch and write
+ * at an offset that was reserved in the other journal file.
+ *
+ * NOTE(review): a range reserved just before a switch may still be
+ * written after commit_data() has synced and truncated that file;
+ * confirm this window is acceptable.
+ */
+ pthread_spin_lock(&jfile_lock);
+ if (!jfile_enough_space(wsize))
+ switch_journal_file();
+ fd = jfile.fd;
+ woff = jfile.pos;
+ jfile.pos += wsize;
+ pthread_spin_unlock(&jfile_lock);
+
+ p = wbuffer = xvalloc(wsize);
+ memcpy(p, jd, JOURNAL_DESC_SIZE);
+ p += JOURNAL_DESC_SIZE;
+ memcpy(p, buf, size);
+ p += size;
+ if (size < rusize) {
+ memset(p, 0, rusize - size);
+ p += rusize - size;
}
- closedir(dir);
- sd_printf(SDOG_NOTICE, "journal recovery complete");
+ memcpy(p, &marker, JOURNAL_MARKER_SIZE);
+ /*
+ * Concurrent writes with the same FD is okay because we don't have any
+ * critical sections that need lock inside kernel write path, since we
+ * a) bypass page cache, b) don't modify i_size of this inode.
+ *
+ * Feel free to correct me If I am wrong.
+ */
+ written = xpwrite(fd, wbuffer, wsize, woff);
+ if (written != wsize) {
+ sd_eprintf("failed, written %zd, len %zd", written, wsize);
+ /* FIXME: teach journal file handle EIO gracefully */
+ ret = SD_RES_EIO;
+ goto out;
+ }
+out:
+ free(wbuffer);
+ return ret;
+}
- return 0;
+/*
+ * Journal a write of @size bytes at @offset into object @oid. @create makes
+ * replay create and preallocate the object file.
+ */
+int journal_write_store(uint64_t oid, const char *buf, size_t size,
+ off_t offset, bool create)
+{
+ struct journal_descriptor jd = {
+ .magic = JOURNAL_DESC_MAGIC,
+ .flag = JF_STORE,
+ .offset = offset,
+ .size = size,
+ .create = create,
+ };
+ /*
+ * Anonymous union members can't be named in a designated initializer
+ * by all GCC versions, so oid is set by plain assignment only.
+ */
+ jd.oid = oid;
+ return journal_file_write(&jd, buf);
+}
+
+/* Journal @size bytes of the log for @epoch at offset 0, created if missing */
+int journal_write_epoch(const char *buf, size_t size, uint32_t epoch)
+{
+ struct journal_descriptor jd = {
+ .magic = JOURNAL_DESC_MAGIC,
+ .flag = JF_EPOCH,
+ .offset = 0,
+ .size = size,
+ .create = true,
+ };
+ /* Union member: assigned for the same GCC reason as oid above */
+ jd.epoch = epoch;
+ return journal_file_write(&jd, buf);
+}
+
+/* Journal @size bytes of the config file at offset 0, created if missing */
+int journal_write_confi(const char *buf, size_t size)
+{
+ struct journal_descriptor jd = {
+ .magic = JOURNAL_DESC_MAGIC,
+ .flag = JF_CONFI,
+ .offset = 0,
+ .size = size,
+ .create = true,
+ };
+ return journal_file_write(&jd, buf);
}
diff --git a/sheep/journal_file.c b/sheep/journal_file.c
deleted file mode 100644
index e9d91b6..0000000
--- a/sheep/journal_file.c
+++ /dev/null
@@ -1,429 +0,0 @@
-/*
- * Copyright (C) 2012 Taobao Inc.
- *
- * Liu Yuan <namei.unix at gmail.com>
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License version
- * 2 as published by the Free Software Foundation.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-#include <errno.h>
-#include <fcntl.h>
-#include <sys/stat.h>
-#include <sys/types.h>
-#include <unistd.h>
-#include <pthread.h>
-#include <stdint.h>
-#include <sys/mman.h>
-
-#include "sheep_priv.h"
-
-struct journal_file {
- int fd;
- off_t pos;
- int commit_fd;
- uatomic_bool in_commit;
-};
-
-struct journal_descriptor {
- uint32_t magic;
- uint16_t flag;
- uint16_t reserved;
- union {
- uint32_t epoch;
- uint64_t oid;
- };
- uint64_t offset;
- uint64_t size;
- uint8_t create;
- uint8_t pad[475];
-} __packed;
-
-/* JOURNAL_DESC + JOURNAL_MARKER must be 512 algined for DIO */
-#define JOURNAL_DESC_MAGIC 0xfee1900d
-#define JOURNAL_DESC_SIZE 508
-#define JOURNAL_MARKER_SIZE 4 /* Use marker to detect partial write */
-#define JOURNAL_META_SIZE (JOURNAL_DESC_SIZE + JOURNAL_MARKER_SIZE)
-
-#define JOURNAL_END_MARKER 0xdeadbeef
-
-#define JF_STORE 0
-#define JF_EPOCH 1
-#define JF_CONFI 2
-
-static const char *jfile_name[2] = { "journal_file0", "journal_file1", };
-static int jfile_fds[2];
-static size_t jfile_size;
-
-static struct journal_file jfile;
-static pthread_spinlock_t jfile_lock;
-
-static int create_journal_file(const char *root, const char *name)
-{
- int fd, flags = O_DSYNC | O_RDWR | O_TRUNC | O_CREAT | O_DIRECT;
- char path[PATH_MAX];
-
- snprintf(path, sizeof(path), "%s/%s", root, name);
- fd = open(path, flags, 0644);
- if (fd < 0) {
- sd_eprintf("open %s %m", name);
- return -1;
- }
- if (prealloc(fd, jfile_size) < 0) {
- sd_eprintf("prealloc %s %m", name);
- return -1;
- }
-
- return fd;
-}
-
-/* We should have two valid FDs, otherwise something goes wrong */
-static int get_old_new_jfile(const char *p, int *old, int *new)
-{
- int fd1, fd2;
- int flags = O_RDONLY;
- char path[PATH_MAX];
- struct stat st1, st2;
-
- snprintf(path, sizeof(path), "%s/%s", p, jfile_name[0]);
- fd1 = open(path, flags);
- if (fd1 < 0) {
- if (errno == ENOENT)
- return 0;
-
- sd_eprintf("open1 %m");
- return -1;
- }
- snprintf(path, sizeof(path), "%s/%s", p, jfile_name[1]);
- fd2 = open(path, flags);
- if (fd2 < 0) {
- sd_eprintf("open2 %m");
- close(fd1);
- return -1;
- }
-
- if (fstat(fd1, &st1) < 0 || fstat(fd2, &st2) < 0) {
- sd_eprintf("stat %m");
- goto out;
- }
-
- if (st1.st_mtime < st2.st_mtime) {
- *old = fd1;
- *new = fd2;
- } else {
- *old = fd2;
- *new = fd1;
- }
-
- return 0;
-out:
- close(fd1);
- close(fd2);
- return -1;
-}
-
-static bool journal_entry_full_write(struct journal_descriptor *jd)
-{
- char *end = (char *)jd +
- roundup(jd->size, SECTOR_SIZE) + JOURNAL_META_SIZE;
- uint32_t marker = *(((uint32_t *)end) - 1);
-
- if (marker != JOURNAL_END_MARKER)
- return false;
- return true;
-}
-
-static void journal_get_path(struct journal_descriptor *jd, char *path)
-{
- switch (jd->flag) {
- case JF_STORE:
- snprintf(path, PATH_MAX, "%s/%016"PRIx64,
- get_object_path(jd->oid), jd->oid);
- sd_iprintf("%s, size %"PRIu64", off %"PRIu64", %d",
- path, jd->size, jd->offset, jd->create);
- break;
- case JF_EPOCH:
- snprintf(path, PATH_MAX, "%s/%08"PRIu32, epoch_path, jd->epoch);
- sd_iprintf("%s, %"PRIu32" size %"PRIu64,
- path, jd->epoch, jd->size);
- break;
- case JF_CONFI:
- snprintf(path, PATH_MAX, "%s", config_path);
- sd_iprintf("%s, size %"PRIu64, path, jd->size);
- break;
- }
-}
-
-static int replay_journal_entry(struct journal_descriptor *jd)
-{
- char path[PATH_MAX];
- ssize_t size;
- int fd, flags = O_WRONLY, ret = 0;
- void *buf;
- char *p = (char *)jd;
-
- if (jd->create)
- flags |= O_CREAT;
-
- journal_get_path(jd, path);
- fd = open(path, flags, def_fmode);
- if (fd < 0) {
- sd_eprintf("open %m");
- return -1;
- }
-
- if (jd->create && jd->flag == JF_STORE) {
- ret = prealloc(fd, get_objsize(jd->oid));
- if (ret < 0)
- goto out;
- }
- buf = xmalloc(jd->size);
- p += JOURNAL_DESC_SIZE;
- memcpy(buf, p, jd->size);
- size = xpwrite(fd, buf, jd->size, jd->offset);
- if (size != jd->size) {
- sd_eprintf("write %zd, size %zu, errno %m", size, jd->size);
- ret = -1;
- goto out;
- }
-out:
- close(fd);
- return ret;
-}
-
-static int do_recover(int fd)
-{
- struct journal_descriptor *jd;
- void *map;
- char *p, *end;
- struct stat st;
-
- if (fstat(fd, &st) < 0) {
- sd_eprintf("fstat %m");
- return -1;
- }
-
- map = mmap(NULL, st.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
- close(fd);
- if (map == MAP_FAILED) {
- sd_eprintf("%m");
- return -1;
- }
-
- end = (char *)map + st.st_size;
- for (p = map; p < end;) {
- jd = (struct journal_descriptor *)p;
- if (jd->magic != JOURNAL_DESC_MAGIC) {
- /* Empty area */
- p += SECTOR_SIZE;
- continue;
- }
- /* We skip partial write because it is not acked back to VM */
- if (!journal_entry_full_write(jd))
- goto skip;
-
- if (replay_journal_entry(jd) < 0)
- return -1;
-skip:
- p += JOURNAL_META_SIZE + roundup(jd->size, SECTOR_SIZE);
- }
- munmap(map, st.st_size);
- /* Do a final sync() to assure data is reached to the disk */
- sync();
- return 0;
-}
-
-/*
- * We recover the journal file in order of wall time in the corner case that
- * sheep crashes while in the middle of journal committing. For most of cases,
- * we actually only recover one jfile, the other would be empty. This process
- * is fast with buffered IO that only take several secends at most.
- */
-static int check_recover_journal_file(const char *p)
-{
- int old = 0, new = 0;
-
- if (get_old_new_jfile(p, &old, &new) < 0)
- return -1;
-
- /* No journal file found */
- if (old == 0)
- return 0;
-
- if (do_recover(old) < 0)
- return -1;
- if (do_recover(new) < 0)
- return -1;
-
- return 0;
-}
-
-int journal_file_init(const char *path, size_t size, bool skip)
-{
- int fd;
-
- if (!skip && check_recover_journal_file(path) < 0)
- return -1;
-
- jfile_size = (size * 1024 * 1024) / 2;
-
- fd = create_journal_file(path, jfile_name[0]);
- if (fd < 0)
- return -1;
- jfile.fd = jfile_fds[0] = fd;
-
- fd = create_journal_file(path, jfile_name[1]);
- jfile_fds[1] = fd;
-
- pthread_spin_init(&jfile_lock, PTHREAD_PROCESS_PRIVATE);
- return 0;
-}
-
-static inline bool jfile_enough_space(size_t size)
-{
- if (jfile.pos + size > jfile_size)
- return false;
- return true;
-}
-
-/*
- * We rely on the kernel's page cache to cache data objects to 1) boost read
- * perfmance 2) simplify read path so that data commiting is simply a
- * sync() operation and We do it in a dedicated thread to avoid blocking
- * the writer by switch back and forth between two journal files.
- */
-static void *commit_data(void *ignored)
-{
- int err;
-
- /* Tell runtime to release resources after termination */
- err = pthread_detach(pthread_self());
- if (err)
- panic("%s", strerror(err));
-
- sync();
- if (ftruncate(jfile.commit_fd, 0) < 0)
- panic("truncate %m");
- if (prealloc(jfile.commit_fd, jfile_size) < 0)
- panic("prealloc");
-
- uatomic_set_false(&jfile.in_commit);
-
- pthread_exit(NULL);
-}
-
-/* FIXME: Try not sleep inside lock */
-static void switch_journal_file(void)
-{
- int old = jfile.fd, err;
- pthread_t thread;
-
-retry:
- if (!uatomic_set_true(&jfile.in_commit)) {
- sd_eprintf("journal file in committing, "
- "you might need enlarge jfile size");
- usleep(100000); /* Wait until committing is finished */
- goto retry;
- }
-
- if (old == jfile_fds[0])
- jfile.fd = jfile_fds[1];
- else
- jfile.fd = jfile_fds[0];
- jfile.commit_fd = old;
- jfile.pos = 0;
-
- err = pthread_create(&thread, NULL, commit_data, NULL);
- if (err)
- panic("%s", strerror(err));
-}
-
-static int journal_file_write(struct journal_descriptor *jd, const char *buf)
-{
- uint32_t marker = JOURNAL_END_MARKER;
- int ret = SD_RES_SUCCESS;
- uint64_t size = jd->size;
- ssize_t written, rusize = roundup(size, SECTOR_SIZE),
- wsize = JOURNAL_META_SIZE + rusize;
- off_t woff;
- char *wbuffer, *p;
-
- pthread_spin_lock(&jfile_lock);
- if (!jfile_enough_space(wsize))
- switch_journal_file();
- woff = jfile.pos;
- jfile.pos += wsize;
- pthread_spin_unlock(&jfile_lock);
-
- p = wbuffer = xvalloc(wsize);
- memcpy(p, jd, JOURNAL_DESC_SIZE);
- p += JOURNAL_DESC_SIZE;
- memcpy(p, buf, size);
- p += size;
- if (size < rusize) {
- memset(p, 0, rusize - size);
- p += rusize - size;
- }
- memcpy(p, &marker, JOURNAL_MARKER_SIZE);
- /*
- * Concurrent writes with the same FD is okay because we don't have any
- * critical sections that need lock inside kernel write path, since we
- * a) bypass page cache, b) don't modify i_size of this inode.
- *
- * Feel free to correct me If I am wrong.
- */
- written = xpwrite(jfile.fd, wbuffer, wsize, woff);
- if (written != wsize) {
- sd_eprintf("failed, written %zd, len %zu", written, wsize);
- /* FIXME: teach journal file handle EIO gracefully */
- ret = SD_RES_EIO;
- goto out;
- }
-out:
- free(wbuffer);
- return ret;
-}
-
-int journal_write_store(uint64_t oid, const char *buf, size_t size,
- off_t offset, bool create)
-{
- struct journal_descriptor jd = {
- .magic = JOURNAL_DESC_MAGIC,
- .flag = JF_STORE,
- .offset = offset,
- .size = size,
- .oid = oid,
- .create = create,
- };
- /* We have to explicitly do assignment to get all GCC compatible */
- jd.oid = oid;
- return journal_file_write(&jd, buf);
-}
-
-int journal_write_epoch(const char *buf, size_t size, uint32_t epoch)
-{
- struct journal_descriptor jd = {
- .magic = JOURNAL_DESC_MAGIC,
- .flag = JF_EPOCH,
- .offset = 0,
- .size = size,
- .create = true,
- };
- jd.epoch = epoch;
- return journal_file_write(&jd, buf);
-}
-
-int journal_write_confi(const char *buf, size_t size)
-{
- struct journal_descriptor jd = {
- .magic = JOURNAL_DESC_MAGIC,
- .flag = JF_CONFI,
- .offset = 0,
- .size = size,
- .create = true,
- };
- return journal_file_write(&jd, buf);
-}
diff --git a/sheep/ops.c b/sheep/ops.c
index 9c0a830..a0502b2 100644
--- a/sheep/ops.c
+++ b/sheep/ops.c
@@ -223,13 +223,7 @@ static int remove_epoch(uint32_t epoch)
return SD_RES_EIO;
}
- snprintf(path, sizeof(path), "%s%08u/", jrnl_path, epoch);
- ret = rmdir_r(path);
- if (ret && ret != -ENOENT) {
- sd_eprintf("failed to remove %s: %s", path, strerror(-ret));
- return SD_RES_EIO;
- }
- return 0;
+ return SD_RES_SUCCESS;
}
static int cluster_make_fs(const struct sd_req *req, struct sd_rsp *rsp,
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 24b4014..0d1b064 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -205,7 +205,6 @@ static inline struct store_driver *find_store_driver(const char *name)
extern struct cluster_info *sys;
extern struct store_driver *sd_store;
extern char *obj_path;
-extern char *jrnl_path;
extern char *epoch_path;
extern mode_t def_fmode;
extern mode_t def_dmode;
@@ -336,12 +335,6 @@ int do_process_main(const struct sd_op_template *op, const struct sd_req *req,
int sheep_do_op_work(const struct sd_op_template *op, struct request *req);
int gateway_to_peer_opcode(int opcode);
-/* Journal */
-struct jrnl_descriptor *jrnl_begin(const void *buf, size_t count, off_t offset,
- const char *path, const char *jrnl_dir);
-int jrnl_end(struct jrnl_descriptor *jd);
-int jrnl_recover(const char *jrnl_dir);
-
static inline bool is_myself(const uint8_t *addr, uint16_t port)
{
return (memcmp(addr, sys->this_node.nid.addr,
diff --git a/sheep/store.c b/sheep/store.c
index 4a28cdc..5b1e457 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -29,7 +29,6 @@
#include "farm/farm.h"
char *obj_path;
-char *jrnl_path;
char *epoch_path;
mode_t def_dmode = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IWGRP | S_IXGRP;
@@ -283,31 +282,6 @@ static int init_epoch_path(const char *base_path)
return init_path(epoch_path, NULL);
}
-#define JRNL_PATH "/journal/"
-
-static int init_jrnl_path(const char *base_path)
-{
- int ret, len = strlen(base_path) + strlen(JRNL_PATH) + 1;
- bool new;
-
- /* Create journal directory */
- jrnl_path = xzalloc(len);
- snprintf(jrnl_path, len, "%s" JRNL_PATH, base_path);
-
- ret = init_path(jrnl_path, &new);
- /* Error during directory creation */
- if (ret)
- return ret;
-
- /* If journal is newly created */
- if (new)
- return 0;
-
- jrnl_recover(jrnl_path);
-
- return 0;
-}
-
/*
* If the node is gateway, this function only finds the store driver.
* Otherwise, this function initializes the backend store
@@ -407,10 +381,6 @@ int init_global_pathnames(const char *d, char *argp)
if (ret)
return ret;
- ret = init_jrnl_path(d);
- if (ret)
- return ret;
-
init_config_path(d);
return 0;
--
1.7.9.5
More information about the sheepdog
mailing list