[sheepdog] [PATCH 5/5] sheep: rename journal_file.c as journal.c

Liu Yuan namei.unix at gmail.com
Tue Apr 2 11:23:02 CEST 2013


From: Liu Yuan <tailai.ly at taobao.com>

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 sheep/Makefile.am    |    2 +-
 sheep/journal.c      |  571 +++++++++++++++++++++++++++++++-------------------
 sheep/journal_file.c |  427 -------------------------------------
 sheep/ops.c          |    8 +-
 sheep/sheep_priv.h   |    7 -
 sheep/store.c        |   30 ---
 6 files changed, 355 insertions(+), 690 deletions(-)
 delete mode 100644 sheep/journal_file.c

diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index 7f87616..2592528 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -27,7 +27,7 @@ sbin_PROGRAMS		= sheep
 sheep_SOURCES		= sheep.c group.c request.c gateway.c store.c vdi.c work.c \
 			  journal.c ops.c recovery.c cluster/local.c \
 			  object_cache.c object_list_cache.c sockfd_cache.c \
-			  plain_store.c config.c migrate.c journal_file.c md.c
+			  plain_store.c config.c migrate.c md.c
 
 if BUILD_COROSYNC
 sheep_SOURCES		+= cluster/corosync.c
diff --git a/sheep/journal.c b/sheep/journal.c
index fafb8f9..694cc9f 100644
--- a/sheep/journal.c
+++ b/sheep/journal.c
@@ -1,5 +1,7 @@
 /*
- * Copyright (C) 2009-2011 Nippon Telegraph and Telephone Corporation.
+ * Copyright (C) 2012 Taobao Inc.
+ *
+ * Liu Yuan <namei.unix at gmail.com>
  *
  * This program is free software; you can redistribute it and/or
  * modify it under the terms of the GNU General Public License version
@@ -8,285 +10,418 @@
  * You should have received a copy of the GNU General Public License
  * along with this program. If not, see <http://www.gnu.org/licenses/>.
  */
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <fcntl.h>
-#include <dirent.h>
 #include <errno.h>
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <stdint.h>
+#include <sys/mman.h>
 
 #include "sheep_priv.h"
 
-#define JRNL_END_MARK           0x87654321UL
+struct journal_file {
+	int fd;
+	off_t pos;
+	int commit_fd;
+	uatomic_bool in_commit;
+};
 
-/* Journal header for data object */
-struct jrnl_head {
+struct journal_descriptor {
+	uint32_t magic;
+	uint16_t flag;
+	uint16_t reserved;
+	union {
+		uint32_t epoch;
+		uint64_t oid;
+	};
 	uint64_t offset;
 	uint64_t size;
-	char target_path[256];
-};
+	uint8_t create;
+	uint8_t pad[475];
+} __packed;
 
-struct jrnl_descriptor {
-	struct jrnl_head head;
-	const void *data;
-	int fd;      /* Open file fd */
-	int target_fd;
-	char path[256];
-};
+/* JOURNAL_DESC + JOURNAL_MARKER must be 512 algined for DIO */
+#define JOURNAL_DESC_MAGIC 0xfee1900d
+#define JOURNAL_DESC_SIZE 508
+#define JOURNAL_MARKER_SIZE 4 /* Use marker to detect partial write */
+#define JOURNAL_META_SIZE (JOURNAL_DESC_SIZE + JOURNAL_MARKER_SIZE)
 
-/* Journal APIs */
-static int jrnl_open(struct jrnl_descriptor *jd, const char *path)
+#define JOURNAL_END_MARKER 0xdeadbeef
+
+#define JF_STORE 0
+#define JF_EPOCH 1
+#define JF_CONFI 2
+
+static const char *jfile_name[2] = { "journal_file0", "journal_file1", };
+static int jfile_fds[2];
+static size_t jfile_size;
+
+static struct journal_file jfile;
+static pthread_spinlock_t jfile_lock;
+
+static int create_journal_file(const char *root, const char *name)
 {
-	pstrcpy(jd->path, sizeof(jd->path), path);
-	jd->fd = open(path, O_RDONLY);
+	int fd, flags = O_DSYNC | O_RDWR | O_TRUNC | O_CREAT | O_DIRECT;
+	char path[PATH_MAX];
 
-	if (jd->fd < 0) {
-		sd_eprintf("failed to open %s: %m", jd->path);
-		if (errno == ENOENT)
-			return SD_RES_NO_OBJ;
-		else
-			return SD_RES_UNKNOWN;
+	snprintf(path, sizeof(path), "%s/%s", root, name);
+	fd = open(path, flags, 0644);
+	if (fd < 0) {
+		sd_eprintf("open %s %m", name);
+		return -1;
+	}
+	if (prealloc(fd, jfile_size) < 0) {
+		sd_eprintf("prealloc %s %m", name);
+		return -1;
 	}
 
-	return SD_RES_SUCCESS;
+	return fd;
 }
 
-static int jrnl_close(struct jrnl_descriptor *jd)
+/* We should have two valid FDs, otherwise something goes wrong */
+static int get_old_new_jfile(const char *p, int *old, int *new)
 {
-	close(jd->fd);
-	jd->fd = -1;
+	int fd1, fd2;
+	int flags = O_RDONLY;
+	char path[PATH_MAX];
+	struct stat st1, st2;
+
+	snprintf(path, sizeof(path), "%s/%s", p, jfile_name[0]);
+	fd1 = open(path, flags);
+	if (fd1 < 0) {
+		if (errno == ENOENT)
+			return 0;
+
+		sd_eprintf("open1 %m");
+		return -1;
+	}
+	snprintf(path, sizeof(path), "%s/%s", p, jfile_name[1]);
+	fd2 = open(path, flags);
+	if (fd2 < 0) {
+		sd_eprintf("open2 %m");
+		close(fd1);
+		return -1;
+	}
+
+	if (fstat(fd1, &st1) < 0 || fstat(fd2, &st2) < 0) {
+		sd_eprintf("stat %m");
+		goto out;
+	}
+
+	if (st1.st_mtime < st2.st_mtime) {
+		*old = fd1;
+		*new = fd2;
+	} else {
+		*old = fd2;
+		*new = fd1;
+	}
 
 	return 0;
+out:
+	close(fd1);
+	close(fd2);
+	return -1;
 }
 
-static int jrnl_create(struct jrnl_descriptor *jd, const char *jrnl_dir)
+static bool journal_entry_full_write(struct journal_descriptor *jd)
 {
-	snprintf(jd->path, sizeof(jd->path), "%sXXXXXX", jrnl_dir);
-	jd->fd = mkostemp(jd->path, O_DSYNC);
+	char *end = (char *)jd +
+		roundup(jd->size, SECTOR_SIZE) + JOURNAL_META_SIZE;
+	uint32_t marker = *(((uint32_t *)end) - 1);
 
-	if (jd->fd < 0) {
-		sd_eprintf("failed to create %s: %m", jd->path);
-		return SD_RES_UNKNOWN;
-	}
-
-	return SD_RES_SUCCESS;
+	if (marker != JOURNAL_END_MARKER)
+		return false;
+	return true;
 }
 
-static int jrnl_remove(struct jrnl_descriptor *jd)
+static void journal_get_path(struct journal_descriptor *jd, char *path)
 {
-	int ret;
+	switch (jd->flag) {
+	case JF_STORE:
+		snprintf(path, PATH_MAX, "%s/%016"PRIx64,
+			 get_object_path(jd->oid), jd->oid);
+		sd_iprintf("%s, size %"PRIu64", off %"PRIu64", %d",
+			   path, jd->size, jd->offset, jd->create);
+		break;
+	case JF_EPOCH:
+		snprintf(path, PATH_MAX, "%s/%08"PRIu32, epoch_path, jd->epoch);
+		sd_iprintf("%s, %"PRIu32" size %"PRIu64,
+			   path, jd->epoch, jd->size);
+		break;
+	case JF_CONFI:
+		snprintf(path, PATH_MAX, "%s", config_path);
+		sd_iprintf("%s, size %"PRIu64, path, jd->size);
+		break;
+	}
+}
 
-	ret = unlink(jd->path);
-	if (ret) {
-		sd_eprintf("failed to remove %s: %m", jd->path);
-		ret = SD_RES_EIO;
-	} else
-		ret = SD_RES_SUCCESS;
+static int replay_journal_entry(struct journal_descriptor *jd)
+{
+	char path[PATH_MAX];
+	ssize_t size;
+	int fd, flags = O_WRONLY, ret = 0;
+	void *buf;
+	char *p = (char *)jd;
+
+	if (jd->create)
+		flags |= O_CREAT;
+
+	journal_get_path(jd, path);
+	fd = open(path, flags, def_fmode);
+	if (fd < 0) {
+		sd_eprintf("open %m");
+		return -1;
+	}
 
+	if (jd->create && jd->flag == JF_STORE) {
+		ret = prealloc(fd, get_objsize(jd->oid));
+		if (ret < 0)
+			goto out;
+	}
+	buf = xmalloc(jd->size);
+	p += JOURNAL_DESC_SIZE;
+	memcpy(buf, p, jd->size);
+	size = xpwrite(fd, buf, jd->size, jd->offset);
+	if (size != jd->size) {
+		sd_eprintf("write %zd, size %zu, errno %m", size, jd->size);
+		ret = -1;
+		goto out;
+	}
+out:
+	close(fd);
 	return ret;
 }
 
-static int jrnl_write_header(struct jrnl_descriptor *jd)
+static int do_recover(int fd)
 {
-	ssize_t ret;
-	struct jrnl_head *head = (struct jrnl_head *) &jd->head;
+	struct journal_descriptor *jd;
+	void *map;
+	char *p, *end;
+	struct stat st;
 
-	ret = xpwrite(jd->fd, head, sizeof(*head), 0);
+	if (fstat(fd, &st) < 0) {
+		sd_eprintf("fstat %m");
+		return -1;
+	}
 
-	if (ret != sizeof(*head)) {
-		if (errno == ENOSPC)
-			ret = SD_RES_NO_SPACE;
-		else
-			ret = SD_RES_EIO;
-	} else
-		ret = SD_RES_SUCCESS;
+	map = mmap(NULL, st.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
+	close(fd);
+	if (map == MAP_FAILED) {
+		sd_eprintf("%m");
+		return -1;
+	}
 
-	return ret;
+	end = (char *)map + st.st_size;
+	for (p = map; p < end;) {
+		jd = (struct journal_descriptor *)p;
+		if (jd->magic != JOURNAL_DESC_MAGIC) {
+			/* Empty area */
+			p += SECTOR_SIZE;
+			continue;
+		}
+		/* We skip partial write because it is not acked back to VM */
+		if (!journal_entry_full_write(jd))
+			goto skip;
+
+		if (replay_journal_entry(jd) < 0)
+			return -1;
+skip:
+		p += JOURNAL_META_SIZE + roundup(jd->size, SECTOR_SIZE);
+	}
+	munmap(map, st.st_size);
+	/* Do a final sync() to assure data is reached to the disk */
+	sync();
+	return 0;
 }
 
-static int jrnl_write_data(struct jrnl_descriptor *jd)
+/*
+ * We recover the journal file in order of wall time in the corner case that
+ * sheep crashes while in the middle of journal committing. For most of cases,
+ * we actually only recover one jfile, the other would be empty. This process
+ * is fast with buffered IO that only take several secends at most.
+ */
+static int check_recover_journal_file(const char *p)
 {
-	ssize_t ret;
-	struct jrnl_head *head = (struct jrnl_head *) &jd->head;
+	int old = 0, new = 0;
 
-	ret = xpwrite(jd->fd, jd->data, head->size, sizeof(*head));
+	if (get_old_new_jfile(p, &old, &new) < 0)
+		return -1;
 
-	if (ret != head->size) {
-		if (errno == ENOSPC)
-			ret = SD_RES_NO_SPACE;
-		else
-			ret = SD_RES_EIO;
-	} else
-		ret = SD_RES_SUCCESS;
+	/* No journal file found */
+	if (old == 0)
+		return 0;
 
-	return ret;
+	if (do_recover(old) < 0)
+		return -1;
+	if (do_recover(new) < 0)
+		return -1;
+
+	return 0;
 }
 
-static int jrnl_write_end_mark(struct jrnl_descriptor *jd)
+int journal_file_init(const char *path, size_t size, bool skip)
 {
-	ssize_t retsize;
-	int ret;
-	uint32_t end_mark = JRNL_END_MARK;
-	struct jrnl_head *head = (struct jrnl_head *) &jd->head;
-
-	retsize = xpwrite(jd->fd, &end_mark, sizeof(end_mark),
-			   sizeof(*head) + head->size);
-
-	if (retsize != sizeof(end_mark)) {
-		if (errno == ENOSPC)
-			ret = SD_RES_NO_SPACE;
-		else
-			ret = SD_RES_EIO;
-	} else
-		ret = SD_RES_SUCCESS;
+	int fd;
 
-	return ret;
-}
+	if (!skip && check_recover_journal_file(path) < 0)
+		return -1;
 
-static int jrnl_apply_to_target_object(struct jrnl_descriptor *jd)
-{
-	char *buf = NULL;
-	int res = 0;
-	ssize_t retsize;
-
-	/* Flush out journal to disk (VDI object) */
-	retsize = xpread(jd->fd, &jd->head, sizeof(jd->head), 0);
-	buf = xzalloc(jd->head.size);
-	retsize = xpread(jd->fd, buf, jd->head.size, sizeof(jd->head));
-	retsize = xpwrite(jd->target_fd, buf, jd->head.size, jd->head.offset);
-	if (retsize != jd->head.size) {
-		if (errno == ENOSPC)
-			res = SD_RES_NO_SPACE;
-		else
-			res = SD_RES_EIO;
-	}
+	jfile_size = (size * 1024 * 1024) / 2;
 
-	/* Clean up */
-	free(buf);
+	fd = create_journal_file(path, jfile_name[0]);
+	if (fd < 0)
+		return -1;
+	jfile.fd = jfile_fds[0] = fd;
 
-	return res;
-}
+	fd = create_journal_file(path, jfile_name[1]);
+	jfile_fds[1] = fd;
 
-/* We cannot use this function for concurrent write operations */
-struct jrnl_descriptor *jrnl_begin(const void *buf, size_t count, off_t offset,
-		 const char *path, const char *jrnl_dir)
-{
-	int ret;
-	struct jrnl_descriptor *jd = xzalloc(sizeof(*jd));
-
-	jd->head.offset = offset;
-	jd->head.size = count;
-	pstrcpy(jd->head.target_path, sizeof(jd->head.target_path), path);
-
-	jd->data = buf;
-
-	ret = jrnl_create(jd, jrnl_dir);
-	if (ret)
-		goto err;
-
-	ret = jrnl_write_header(jd);
-	if (ret)
-		goto err;
-
-	ret = jrnl_write_data(jd);
-	if (ret)
-		goto err;
-
-	ret = jrnl_write_end_mark(jd);
-	if (ret)
-		goto err;
-	return jd;
-err:
-	free(jd);
-	return NULL;
+	pthread_spin_init(&jfile_lock, PTHREAD_PROCESS_PRIVATE);
+	return 0;
 }
 
-int jrnl_end(struct jrnl_descriptor *jd)
+static inline bool jfile_enough_space(size_t size)
 {
-	int ret = 0;
-	if (!jd)
-		return ret;
-
-	ret = jrnl_close(jd);
-	if (ret)
-		goto err;
-
-	ret = jrnl_remove(jd);
-err:
-	free(jd);
-	return ret;
+	if (jfile.pos + size > jfile_size)
+		return false;
+	return true;
 }
 
-int jrnl_recover(const char *jrnl_dir)
+/*
+ * We rely on the kernel's page cache to cache data objects to 1) boost read
+ * perfmance 2) simplify read path so that data commiting is simply a
+ * sync() operation and We do it in a dedicated thread to avoid blocking
+ * the writer by switch back and forth between two journal files.
+ */
+static void *commit_data(void *ignored)
 {
-	DIR *dir;
-	struct dirent *d;
-	char jrnl_file_path[PATH_MAX];
+	int err;
 
-	sd_eprintf("opening the directory %s", jrnl_dir);
-	dir = opendir(jrnl_dir);
-	if (!dir)
-		return -1;
+	/* Tell runtime to release resources after termination */
+	err = pthread_detach(pthread_self());
+	if (err)
+		panic("%s", strerror(err));
 
-	sd_printf(SDOG_NOTICE, "starting journal recovery");
-	while ((d = readdir(dir))) {
-		struct jrnl_descriptor jd;
-		uint32_t end_mark = 0;
-		int ret;
+	sync();
+	if (ftruncate(jfile.commit_fd, 0) < 0)
+		panic("truncate %m");
+	if (prealloc(jfile.commit_fd, jfile_size) < 0)
+		panic("prealloc");
 
-		if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, ".."))
-			continue;
+	uatomic_set_false(&jfile.in_commit);
 
-		snprintf(jrnl_file_path, sizeof(jrnl_file_path), "%s%s",
-			 jrnl_dir, d->d_name);
-		ret = jrnl_open(&jd, jrnl_file_path);
-		if (ret) {
-			sd_eprintf("unable to open the journal file %s for"
-				   " reading", jrnl_file_path);
-			goto end_while_3;
-		}
+	pthread_exit(NULL);
+}
 
-		ret = xpread(jd.fd, &jd.head, sizeof(jd.head), 0);
-		if (ret != sizeof(jd.head)) {
-			sd_eprintf("can't read journal head");
-			goto end_while_2;
-		}
+/* FIXME: Try not sleep inside lock */
+static void switch_journal_file(void)
+{
+	int old = jfile.fd, err;
+	pthread_t thread;
+
+retry:
+	if (!uatomic_set_true(&jfile.in_commit)) {
+		sd_eprintf("journal file in committing, "
+			   "you might need enlarge jfile size");
+		usleep(100000); /* Wait until committing is finished */
+		goto retry;
+	}
 
-		ret = xpread(jd.fd, &end_mark, sizeof(end_mark),
-				sizeof(jd.head) + jd.head.size);
-		if (ret != sizeof(end_mark)) {
-			sd_eprintf("can't read journal end mark for object %s",
-				   jd.head.target_path);
-			goto end_while_2;
-		}
+	if (old == jfile_fds[0])
+		jfile.fd = jfile_fds[1];
+	else
+		jfile.fd = jfile_fds[0];
+	jfile.commit_fd = old;
+	jfile.pos = 0;
 
-		if (end_mark != JRNL_END_MARK)
-			goto end_while_2;
+	err = pthread_create(&thread, NULL, commit_data, NULL);
+	if (err)
+		panic("%s", strerror(err));
+}
 
-		jd.target_fd = open(jd.head.target_path, O_DSYNC | O_RDWR);
-		if (jd.target_fd < 0) {
-			sd_eprintf("unable to open the object file %s for"
-				   " recovery", jd.head.target_path);
-			goto end_while_2;
-		}
-		ret = jrnl_apply_to_target_object(&jd);
-		if (ret)
-			sd_eprintf("unable to recover the object %s",
-				   jd.head.target_path);
-
-		close(jd.target_fd);
-		jd.target_fd = -1;
-end_while_2:
-		jrnl_close(&jd);
-end_while_3:
-		sd_printf(SDOG_INFO, "recovered the object %s from the journal",
-			  jrnl_file_path);
-		jrnl_remove(&jd);
+static int journal_file_write(struct journal_descriptor *jd, const char *buf)
+{
+	uint32_t marker = JOURNAL_END_MARKER;
+	int ret = SD_RES_SUCCESS;
+	uint64_t size = jd->size;
+	ssize_t written, rusize = roundup(size, SECTOR_SIZE),
+		wsize = JOURNAL_META_SIZE + rusize;
+	off_t woff;
+	char *wbuffer, *p;
+
+	pthread_spin_lock(&jfile_lock);
+	if (!jfile_enough_space(wsize))
+		switch_journal_file();
+	woff = jfile.pos;
+	jfile.pos += wsize;
+	pthread_spin_unlock(&jfile_lock);
+
+	p = wbuffer = xvalloc(wsize);
+	memcpy(p, jd, JOURNAL_DESC_SIZE);
+	p += JOURNAL_DESC_SIZE;
+	memcpy(p, buf, size);
+	p += size;
+	if (size < rusize) {
+		memset(p, 0, rusize - size);
+		p += rusize - size;
 	}
-	closedir(dir);
-	sd_printf(SDOG_NOTICE, "journal recovery complete");
+	memcpy(p, &marker, JOURNAL_MARKER_SIZE);
+	/*
+	 * Concurrent writes with the same FD is okay because we don't have any
+	 * critical sections that need lock inside kernel write path, since we
+	 * a) bypass page cache, b) don't modify i_size of this inode.
+	 *
+	 * Feel free to correct me If I am wrong.
+	 */
+	written = xpwrite(jfile.fd, wbuffer, wsize, woff);
+	if (written != wsize) {
+		sd_eprintf("failed, written %zd, len %zu", written, wsize);
+		/* FIXME: teach journal file handle EIO gracefully */
+		ret = SD_RES_EIO;
+		goto out;
+	}
+out:
+	free(wbuffer);
+	return ret;
+}
 
-	return 0;
+int journal_write_store(uint64_t oid, const char *buf, size_t size,
+			off_t offset,bool create)
+{
+	struct journal_descriptor jd = {
+		.magic = JOURNAL_DESC_MAGIC,
+		.flag = JF_STORE,
+		.offset = offset,
+		.size = size,
+		.oid = oid,
+		.create = create,
+	};
+	return journal_file_write(&jd, buf);
+}
+
+int journal_write_epoch(const char *buf, size_t size, uint32_t epoch)
+{
+	struct journal_descriptor jd = {
+		.magic = JOURNAL_DESC_MAGIC,
+		.flag = JF_EPOCH,
+		.offset = 0,
+		.size = size,
+		.epoch = epoch,
+		.create = true,
+	};
+	return journal_file_write(&jd, buf);
+}
+
+int journal_write_confi(const char *buf, size_t size)
+{
+	struct journal_descriptor jd = {
+		.magic = JOURNAL_DESC_MAGIC,
+		.flag = JF_CONFI,
+		.offset = 0,
+		.size = size,
+		.create = true,
+	};
+	return journal_file_write(&jd, buf);
 }
diff --git a/sheep/journal_file.c b/sheep/journal_file.c
deleted file mode 100644
index 694cc9f..0000000
--- a/sheep/journal_file.c
+++ /dev/null
@@ -1,427 +0,0 @@
-/*
- * Copyright (C) 2012 Taobao Inc.
- *
- * Liu Yuan <namei.unix at gmail.com>
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License version
- * 2 as published by the Free Software Foundation.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-#include <errno.h>
-#include <fcntl.h>
-#include <sys/stat.h>
-#include <sys/types.h>
-#include <unistd.h>
-#include <pthread.h>
-#include <stdint.h>
-#include <sys/mman.h>
-
-#include "sheep_priv.h"
-
-struct journal_file {
-	int fd;
-	off_t pos;
-	int commit_fd;
-	uatomic_bool in_commit;
-};
-
-struct journal_descriptor {
-	uint32_t magic;
-	uint16_t flag;
-	uint16_t reserved;
-	union {
-		uint32_t epoch;
-		uint64_t oid;
-	};
-	uint64_t offset;
-	uint64_t size;
-	uint8_t create;
-	uint8_t pad[475];
-} __packed;
-
-/* JOURNAL_DESC + JOURNAL_MARKER must be 512 algined for DIO */
-#define JOURNAL_DESC_MAGIC 0xfee1900d
-#define JOURNAL_DESC_SIZE 508
-#define JOURNAL_MARKER_SIZE 4 /* Use marker to detect partial write */
-#define JOURNAL_META_SIZE (JOURNAL_DESC_SIZE + JOURNAL_MARKER_SIZE)
-
-#define JOURNAL_END_MARKER 0xdeadbeef
-
-#define JF_STORE 0
-#define JF_EPOCH 1
-#define JF_CONFI 2
-
-static const char *jfile_name[2] = { "journal_file0", "journal_file1", };
-static int jfile_fds[2];
-static size_t jfile_size;
-
-static struct journal_file jfile;
-static pthread_spinlock_t jfile_lock;
-
-static int create_journal_file(const char *root, const char *name)
-{
-	int fd, flags = O_DSYNC | O_RDWR | O_TRUNC | O_CREAT | O_DIRECT;
-	char path[PATH_MAX];
-
-	snprintf(path, sizeof(path), "%s/%s", root, name);
-	fd = open(path, flags, 0644);
-	if (fd < 0) {
-		sd_eprintf("open %s %m", name);
-		return -1;
-	}
-	if (prealloc(fd, jfile_size) < 0) {
-		sd_eprintf("prealloc %s %m", name);
-		return -1;
-	}
-
-	return fd;
-}
-
-/* We should have two valid FDs, otherwise something goes wrong */
-static int get_old_new_jfile(const char *p, int *old, int *new)
-{
-	int fd1, fd2;
-	int flags = O_RDONLY;
-	char path[PATH_MAX];
-	struct stat st1, st2;
-
-	snprintf(path, sizeof(path), "%s/%s", p, jfile_name[0]);
-	fd1 = open(path, flags);
-	if (fd1 < 0) {
-		if (errno == ENOENT)
-			return 0;
-
-		sd_eprintf("open1 %m");
-		return -1;
-	}
-	snprintf(path, sizeof(path), "%s/%s", p, jfile_name[1]);
-	fd2 = open(path, flags);
-	if (fd2 < 0) {
-		sd_eprintf("open2 %m");
-		close(fd1);
-		return -1;
-	}
-
-	if (fstat(fd1, &st1) < 0 || fstat(fd2, &st2) < 0) {
-		sd_eprintf("stat %m");
-		goto out;
-	}
-
-	if (st1.st_mtime < st2.st_mtime) {
-		*old = fd1;
-		*new = fd2;
-	} else {
-		*old = fd2;
-		*new = fd1;
-	}
-
-	return 0;
-out:
-	close(fd1);
-	close(fd2);
-	return -1;
-}
-
-static bool journal_entry_full_write(struct journal_descriptor *jd)
-{
-	char *end = (char *)jd +
-		roundup(jd->size, SECTOR_SIZE) + JOURNAL_META_SIZE;
-	uint32_t marker = *(((uint32_t *)end) - 1);
-
-	if (marker != JOURNAL_END_MARKER)
-		return false;
-	return true;
-}
-
-static void journal_get_path(struct journal_descriptor *jd, char *path)
-{
-	switch (jd->flag) {
-	case JF_STORE:
-		snprintf(path, PATH_MAX, "%s/%016"PRIx64,
-			 get_object_path(jd->oid), jd->oid);
-		sd_iprintf("%s, size %"PRIu64", off %"PRIu64", %d",
-			   path, jd->size, jd->offset, jd->create);
-		break;
-	case JF_EPOCH:
-		snprintf(path, PATH_MAX, "%s/%08"PRIu32, epoch_path, jd->epoch);
-		sd_iprintf("%s, %"PRIu32" size %"PRIu64,
-			   path, jd->epoch, jd->size);
-		break;
-	case JF_CONFI:
-		snprintf(path, PATH_MAX, "%s", config_path);
-		sd_iprintf("%s, size %"PRIu64, path, jd->size);
-		break;
-	}
-}
-
-static int replay_journal_entry(struct journal_descriptor *jd)
-{
-	char path[PATH_MAX];
-	ssize_t size;
-	int fd, flags = O_WRONLY, ret = 0;
-	void *buf;
-	char *p = (char *)jd;
-
-	if (jd->create)
-		flags |= O_CREAT;
-
-	journal_get_path(jd, path);
-	fd = open(path, flags, def_fmode);
-	if (fd < 0) {
-		sd_eprintf("open %m");
-		return -1;
-	}
-
-	if (jd->create && jd->flag == JF_STORE) {
-		ret = prealloc(fd, get_objsize(jd->oid));
-		if (ret < 0)
-			goto out;
-	}
-	buf = xmalloc(jd->size);
-	p += JOURNAL_DESC_SIZE;
-	memcpy(buf, p, jd->size);
-	size = xpwrite(fd, buf, jd->size, jd->offset);
-	if (size != jd->size) {
-		sd_eprintf("write %zd, size %zu, errno %m", size, jd->size);
-		ret = -1;
-		goto out;
-	}
-out:
-	close(fd);
-	return ret;
-}
-
-static int do_recover(int fd)
-{
-	struct journal_descriptor *jd;
-	void *map;
-	char *p, *end;
-	struct stat st;
-
-	if (fstat(fd, &st) < 0) {
-		sd_eprintf("fstat %m");
-		return -1;
-	}
-
-	map = mmap(NULL, st.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
-	close(fd);
-	if (map == MAP_FAILED) {
-		sd_eprintf("%m");
-		return -1;
-	}
-
-	end = (char *)map + st.st_size;
-	for (p = map; p < end;) {
-		jd = (struct journal_descriptor *)p;
-		if (jd->magic != JOURNAL_DESC_MAGIC) {
-			/* Empty area */
-			p += SECTOR_SIZE;
-			continue;
-		}
-		/* We skip partial write because it is not acked back to VM */
-		if (!journal_entry_full_write(jd))
-			goto skip;
-
-		if (replay_journal_entry(jd) < 0)
-			return -1;
-skip:
-		p += JOURNAL_META_SIZE + roundup(jd->size, SECTOR_SIZE);
-	}
-	munmap(map, st.st_size);
-	/* Do a final sync() to assure data is reached to the disk */
-	sync();
-	return 0;
-}
-
-/*
- * We recover the journal file in order of wall time in the corner case that
- * sheep crashes while in the middle of journal committing. For most of cases,
- * we actually only recover one jfile, the other would be empty. This process
- * is fast with buffered IO that only take several secends at most.
- */
-static int check_recover_journal_file(const char *p)
-{
-	int old = 0, new = 0;
-
-	if (get_old_new_jfile(p, &old, &new) < 0)
-		return -1;
-
-	/* No journal file found */
-	if (old == 0)
-		return 0;
-
-	if (do_recover(old) < 0)
-		return -1;
-	if (do_recover(new) < 0)
-		return -1;
-
-	return 0;
-}
-
-int journal_file_init(const char *path, size_t size, bool skip)
-{
-	int fd;
-
-	if (!skip && check_recover_journal_file(path) < 0)
-		return -1;
-
-	jfile_size = (size * 1024 * 1024) / 2;
-
-	fd = create_journal_file(path, jfile_name[0]);
-	if (fd < 0)
-		return -1;
-	jfile.fd = jfile_fds[0] = fd;
-
-	fd = create_journal_file(path, jfile_name[1]);
-	jfile_fds[1] = fd;
-
-	pthread_spin_init(&jfile_lock, PTHREAD_PROCESS_PRIVATE);
-	return 0;
-}
-
-static inline bool jfile_enough_space(size_t size)
-{
-	if (jfile.pos + size > jfile_size)
-		return false;
-	return true;
-}
-
-/*
- * We rely on the kernel's page cache to cache data objects to 1) boost read
- * perfmance 2) simplify read path so that data commiting is simply a
- * sync() operation and We do it in a dedicated thread to avoid blocking
- * the writer by switch back and forth between two journal files.
- */
-static void *commit_data(void *ignored)
-{
-	int err;
-
-	/* Tell runtime to release resources after termination */
-	err = pthread_detach(pthread_self());
-	if (err)
-		panic("%s", strerror(err));
-
-	sync();
-	if (ftruncate(jfile.commit_fd, 0) < 0)
-		panic("truncate %m");
-	if (prealloc(jfile.commit_fd, jfile_size) < 0)
-		panic("prealloc");
-
-	uatomic_set_false(&jfile.in_commit);
-
-	pthread_exit(NULL);
-}
-
-/* FIXME: Try not sleep inside lock */
-static void switch_journal_file(void)
-{
-	int old = jfile.fd, err;
-	pthread_t thread;
-
-retry:
-	if (!uatomic_set_true(&jfile.in_commit)) {
-		sd_eprintf("journal file in committing, "
-			   "you might need enlarge jfile size");
-		usleep(100000); /* Wait until committing is finished */
-		goto retry;
-	}
-
-	if (old == jfile_fds[0])
-		jfile.fd = jfile_fds[1];
-	else
-		jfile.fd = jfile_fds[0];
-	jfile.commit_fd = old;
-	jfile.pos = 0;
-
-	err = pthread_create(&thread, NULL, commit_data, NULL);
-	if (err)
-		panic("%s", strerror(err));
-}
-
-static int journal_file_write(struct journal_descriptor *jd, const char *buf)
-{
-	uint32_t marker = JOURNAL_END_MARKER;
-	int ret = SD_RES_SUCCESS;
-	uint64_t size = jd->size;
-	ssize_t written, rusize = roundup(size, SECTOR_SIZE),
-		wsize = JOURNAL_META_SIZE + rusize;
-	off_t woff;
-	char *wbuffer, *p;
-
-	pthread_spin_lock(&jfile_lock);
-	if (!jfile_enough_space(wsize))
-		switch_journal_file();
-	woff = jfile.pos;
-	jfile.pos += wsize;
-	pthread_spin_unlock(&jfile_lock);
-
-	p = wbuffer = xvalloc(wsize);
-	memcpy(p, jd, JOURNAL_DESC_SIZE);
-	p += JOURNAL_DESC_SIZE;
-	memcpy(p, buf, size);
-	p += size;
-	if (size < rusize) {
-		memset(p, 0, rusize - size);
-		p += rusize - size;
-	}
-	memcpy(p, &marker, JOURNAL_MARKER_SIZE);
-	/*
-	 * Concurrent writes with the same FD is okay because we don't have any
-	 * critical sections that need lock inside kernel write path, since we
-	 * a) bypass page cache, b) don't modify i_size of this inode.
-	 *
-	 * Feel free to correct me If I am wrong.
-	 */
-	written = xpwrite(jfile.fd, wbuffer, wsize, woff);
-	if (written != wsize) {
-		sd_eprintf("failed, written %zd, len %zu", written, wsize);
-		/* FIXME: teach journal file handle EIO gracefully */
-		ret = SD_RES_EIO;
-		goto out;
-	}
-out:
-	free(wbuffer);
-	return ret;
-}
-
-int journal_write_store(uint64_t oid, const char *buf, size_t size,
-			off_t offset,bool create)
-{
-	struct journal_descriptor jd = {
-		.magic = JOURNAL_DESC_MAGIC,
-		.flag = JF_STORE,
-		.offset = offset,
-		.size = size,
-		.oid = oid,
-		.create = create,
-	};
-	return journal_file_write(&jd, buf);
-}
-
-int journal_write_epoch(const char *buf, size_t size, uint32_t epoch)
-{
-	struct journal_descriptor jd = {
-		.magic = JOURNAL_DESC_MAGIC,
-		.flag = JF_EPOCH,
-		.offset = 0,
-		.size = size,
-		.epoch = epoch,
-		.create = true,
-	};
-	return journal_file_write(&jd, buf);
-}
-
-int journal_write_confi(const char *buf, size_t size)
-{
-	struct journal_descriptor jd = {
-		.magic = JOURNAL_DESC_MAGIC,
-		.flag = JF_CONFI,
-		.offset = 0,
-		.size = size,
-		.create = true,
-	};
-	return journal_file_write(&jd, buf);
-}
diff --git a/sheep/ops.c b/sheep/ops.c
index 84cb79b..aa2b38d 100644
--- a/sheep/ops.c
+++ b/sheep/ops.c
@@ -223,13 +223,7 @@ static int remove_epoch(uint32_t epoch)
 		return SD_RES_EIO;
 	}
 
-	snprintf(path, sizeof(path), "%s%08u/", jrnl_path, epoch);
-	ret = rmdir_r(path);
-	if (ret && ret != -ENOENT) {
-		sd_eprintf("failed to remove %s: %s", path, strerror(-ret));
-		return SD_RES_EIO;
-	}
-	return 0;
+	return SD_RES_EIO;
 }
 
 static int cluster_make_fs(const struct sd_req *req, struct sd_rsp *rsp,
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 24b4014..0d1b064 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -205,7 +205,6 @@ static inline struct store_driver *find_store_driver(const char *name)
 extern struct cluster_info *sys;
 extern struct store_driver *sd_store;
 extern char *obj_path;
-extern char *jrnl_path;
 extern char *epoch_path;
 extern mode_t def_fmode;
 extern mode_t def_dmode;
@@ -336,12 +335,6 @@ int do_process_main(const struct sd_op_template *op, const struct sd_req *req,
 int sheep_do_op_work(const struct sd_op_template *op, struct request *req);
 int gateway_to_peer_opcode(int opcode);
 
-/* Journal */
-struct jrnl_descriptor *jrnl_begin(const void *buf, size_t count, off_t offset,
-				   const char *path, const char *jrnl_dir);
-int jrnl_end(struct jrnl_descriptor *jd);
-int jrnl_recover(const char *jrnl_dir);
-
 static inline bool is_myself(const uint8_t *addr, uint16_t port)
 {
 	return (memcmp(addr, sys->this_node.nid.addr,
diff --git a/sheep/store.c b/sheep/store.c
index 4a28cdc..5b1e457 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -29,7 +29,6 @@
 #include "farm/farm.h"
 
 char *obj_path;
-char *jrnl_path;
 char *epoch_path;
 
 mode_t def_dmode = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IWGRP | S_IXGRP;
@@ -283,31 +282,6 @@ static int init_epoch_path(const char *base_path)
 	return init_path(epoch_path, NULL);
 }
 
-#define JRNL_PATH "/journal/"
-
-static int init_jrnl_path(const char *base_path)
-{
-	int ret, len = strlen(base_path) + strlen(JRNL_PATH) + 1;
-	bool new;
-
-	/* Create journal directory */
-	jrnl_path = xzalloc(len);
-	snprintf(jrnl_path, len, "%s" JRNL_PATH, base_path);
-
-	ret = init_path(jrnl_path, &new);
-	/* Error during directory creation */
-	if (ret)
-		return ret;
-
-	/* If journal is newly created */
-	if (new)
-		return 0;
-
-	jrnl_recover(jrnl_path);
-
-	return 0;
-}
-
 /*
  * If the node is gateway, this function only finds the store driver.
  * Otherwise, this function initializes the backend store
@@ -407,10 +381,6 @@ int init_global_pathnames(const char *d, char *argp)
 	if (ret)
 		return ret;
 
-	ret = init_jrnl_path(d);
-	if (ret)
-		return ret;
-
 	init_config_path(d);
 
 	return 0;
-- 
1.7.9.5




More information about the sheepdog mailing list