[sheepdog] [RFC PATCH 3/3] sheep: introduce journal file to boost IO performance

Liu Yuan namei.unix at gmail.com
Sat Nov 3 16:09:47 CET 2012


From: Liu Yuan <tailai.ly at taobao.com>

The basic idea is very simple: use a dedicated device to log all the IO
operations sequentially, so that it is safe to switch the backend IO
operations from O_DSYNC & O_DIRECT to O_RDWR (buffered IO), which will
greatly benefit both read and write performance.

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 include/util.h       |    5 +
 sheep/Makefile.am    |    2 +-
 sheep/journal_file.c |  386 ++++++++++++++++++++++++++++++++++++++++++++++++++
 sheep/plain_store.c  |   26 +++-
 sheep/sheep.c        |   15 +-
 sheep/sheep_priv.h   |    8 +-
 sheep/store.c        |    9 +-
 7 files changed, 438 insertions(+), 13 deletions(-)
 create mode 100644 sheep/journal_file.c

diff --git a/include/util.h b/include/util.h
index 5fb19c2..a0d8186 100644
--- a/include/util.h
+++ b/include/util.h
@@ -38,6 +38,7 @@
 #endif
 
 #define notrace __attribute__((no_instrument_function))
+/* Lay out a struct with no padding between its members */
+#define __packed __attribute((packed))
 
 #define uninitialized_var(x) (x = x)
 
@@ -68,6 +69,10 @@ static inline void *zalloc(size_t size)
 	return calloc(1, size);
 }
 
+/*
+ * Round x up/down to a multiple of y. y must be a power of two, because the
+ * rounding is done by OR-ing / masking with y - 1. The mask is cast to the
+ * type of x.
+ */
+#define __round_mask(x, y) ((__typeof__(x))((y)-1))
+#define round_up(x, y) ((((x)-1) | __round_mask(x, y))+1)
+#define round_down(x, y) ((x) & ~__round_mask(x, y))
+
 typedef void (*try_to_free_t)(size_t);
 extern try_to_free_t set_try_to_free_routine(try_to_free_t);
 
diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index e7b4f53..0ae19de 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -27,7 +27,7 @@ sbin_PROGRAMS		= sheep
 sheep_SOURCES		= sheep.c group.c request.c gateway.c store.c vdi.c work.c \
 			  journal.c ops.c recovery.c cluster/local.c \
 			  object_cache.c object_list_cache.c sockfd_cache.c \
-			  plain_store.c config.c migrate.c
+			  plain_store.c config.c migrate.c journal_file.c
 
 if BUILD_COROSYNC
 sheep_SOURCES		+= cluster/corosync.c
diff --git a/sheep/journal_file.c b/sheep/journal_file.c
new file mode 100644
index 0000000..110a034
--- /dev/null
+++ b/sheep/journal_file.c
@@ -0,0 +1,386 @@
+/*
+ * Copyright (C) 2012 Taobao Inc.
+ *
+ * Liu Yuan <namei.unix at gmail.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <errno.h>
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <stdint.h>
+#include <sys/mman.h>
+
+#include "sheep_priv.h"
+
+/* State of the journal file that new entries are appended to */
+struct journal_file {
+	int fd;			/* journal file receiving new entries */
+	off_t pos;		/* next free offset in fd; updated under jfile_lock */
+	int commit_fd;		/* retired journal file zeroed by the commit thread */
+	uatomic_bool in_commit;	/* set by switch_journal_file(), cleared when
+				 * journal_file_commit() finishes */
+};
+
+/*
+ * On-disk header of one journal entry. An entry is laid out as
+ *   [descriptor: JOURNAL_DESC_SIZE][data padded to SECTOR_SIZE][marker]
+ * so descriptor + marker together occupy exactly JOURNAL_META_SIZE bytes.
+ */
+struct journal_descriptor {
+	uint32_t magic;		/* JOURNAL_DESC_MAGIC */
+	uint64_t oid;		/* object the data is written to */
+	uint64_t offset;	/* offset of the data inside the object */
+	uint64_t size;		/* unpadded length of the data */
+	uint8_t create;		/* 1 if replay must create the object */
+	uint8_t pad[479];	/* pads the struct up to JOURNAL_DESC_SIZE */
+} __packed;
+
+/* JOURNAL_DESC + JOURNAL_MARKER must be 512 aligned for DIO */
+#define JOURNAL_DESC_MAGIC 0xfee1900d
+#define JOURNAL_DESC_SIZE 508
+#define JOURNAL_MARKER_SIZE 4 /* Use marker to detect partial write */
+#define JOURNAL_META_SIZE SECTOR_SIZE
+
+/*
+ * journal_file_write() copies exactly JOURNAL_DESC_SIZE bytes out of a
+ * struct journal_descriptor, and entry sizes assume descriptor + marker
+ * fill one sector; catch any drift at compile time.
+ */
+_Static_assert(sizeof(struct journal_descriptor) == JOURNAL_DESC_SIZE,
+	       "journal descriptor size mismatch");
+_Static_assert(JOURNAL_DESC_SIZE + JOURNAL_MARKER_SIZE == JOURNAL_META_SIZE,
+	       "journal meta size mismatch");
+
+#define JOURNAL_FILE_SIZE (1024*1024*256) /* 256M */
+#define JOURNAL_END_MARKER 0xdeadbeef
+
+static const char *jfile_name[2] = { "journal_file0", "journal_file1", };
+static int jfile_fds[2];
+
+static struct journal_file jfile;
+static pthread_spinlock_t jfile_lock;
+
+/*
+ * Overwrite the whole JOURNAL_FILE_SIZE bytes of the journal file with zeros.
+ *
+ * Returns 0 on success, -1 if the buffer cannot be allocated or the write
+ * fails.
+ */
+static int zero_out_jfile(int fd)
+{
+	char *buf;
+	ssize_t wlen;
+	int ret = 0;
+
+	/* Page-aligned buffer: journal files are opened with O_DIRECT */
+	buf = valloc(JOURNAL_FILE_SIZE);
+	if (!buf) {
+		eprintf("WARN: failed to allocate buffer, %m\n");
+		return -1;
+	}
+	memset(buf, 0, JOURNAL_FILE_SIZE);
+	wlen = xpwrite(fd, buf, JOURNAL_FILE_SIZE, 0);
+	if (wlen != JOURNAL_FILE_SIZE) {
+		eprintf("WARN: failed, %m\n");
+		ret = -1;
+	}
+
+	free(buf);
+	return ret;
+}
+
+/*
+ * Create (or truncate) root/name as a preallocated, zero-filled journal file
+ * opened with O_DSYNC | O_DIRECT.
+ *
+ * Returns the open fd on success, -1 on failure (no fd is left open).
+ */
+static int create_journal_file(const char *root, const char *name)
+{
+	int fd, flags = O_DSYNC | O_RDWR | O_TRUNC | O_CREAT | O_DIRECT;
+	char path[PATH_MAX];
+
+	snprintf(path, sizeof(path), "%s/%s", root, name);
+	fd = open(path, flags, 0644);
+	if (fd < 0) {
+		eprintf("open %s %m\n", name);
+		return -1;
+	}
+	if (prealloc(fd, JOURNAL_FILE_SIZE) < 0) {
+		eprintf("prealloc %s %m\n", name);
+		close(fd);
+		return -1;
+	}
+
+	/* Turn unwritten extents of FS into written ones for faster write */
+	if (zero_out_jfile(fd) < 0) {
+		close(fd);
+		return -1;
+	}
+	return fd;
+}
+
+/*
+ * Open both journal files read-only and order them by modification time.
+ *
+ * Returns 0 without touching *old/*new when journal_file0 does not exist
+ * (nothing to recover). Returns 0 with *old/*new set to the older/newer fd
+ * when both files exist. Returns -1 on any other failure.
+ *
+ * We should have two valid FDs, otherwise something goes wrong.
+ */
+static int get_old_new_jfile(const char *p, int *old, int *new)
+{
+	int fd1, fd2;
+	int flags = O_RDONLY;
+	char path[PATH_MAX];
+	struct stat st1, st2;
+
+	snprintf(path, sizeof(path), "%s/%s", p, jfile_name[0]);
+	fd1 = open(path, flags);
+	if (fd1 < 0) {
+		/*
+		 * Only a missing journal means "nothing to recover"; any other
+		 * error must not silently discard a journal that exists.
+		 */
+		if (errno == ENOENT)
+			return 0;
+
+		eprintf("open1 %m\n");
+		return -1;
+	}
+	snprintf(path, sizeof(path), "%s/%s", p, jfile_name[1]);
+	fd2 = open(path, flags);
+	if (fd2 < 0) {
+		eprintf("open2 %m\n");
+		close(fd1);
+		return -1;
+	}
+
+	if (fstat(fd1, &st1) < 0 || fstat(fd2, &st2) < 0) {
+		eprintf("stat %m\n");
+		goto out;
+	}
+
+	if (st1.st_mtime < st2.st_mtime) {
+		*old = fd1;
+		*new = fd2;
+	} else {
+		*old = fd2;
+		*new = fd1;
+	}
+
+	return 0;
+out:
+	close(fd1);
+	close(fd2);
+	return -1;
+}
+
+/*
+ * Tell whether the entry starting at jd reached the disk in full: its last
+ * JOURNAL_MARKER_SIZE bytes must hold JOURNAL_END_MARKER.
+ */
+static bool journal_entry_full_write(struct journal_descriptor *jd)
+{
+	const char *entry_end = (const char *)jd +
+		round_up(jd->size, SECTOR_SIZE) + JOURNAL_META_SIZE;
+	uint32_t tail;
+
+	/* The marker occupies the final bytes of the entry */
+	memcpy(&tail, entry_end - JOURNAL_MARKER_SIZE, sizeof(tail));
+	return tail == JOURNAL_END_MARKER;
+}
+
+/*
+ * Re-apply one journal entry: write jd->size bytes, which directly follow
+ * the JOURNAL_DESC_SIZE-byte descriptor, at jd->offset of the object file
+ * under obj_path (created first when jd->create is set).
+ *
+ * Returns 0 on success, -1 on failure.
+ */
+static int replay_journal_entry(struct journal_descriptor *jd)
+{
+	char path[PATH_MAX];
+	ssize_t size;
+	int fd, flags = O_WRONLY, ret = 0;
+	char *data = (char *)jd + JOURNAL_DESC_SIZE;
+
+	dprintf("%"PRIx64", size %"PRIu64", off %"PRIu64", %d\n",
+		jd->oid, jd->size, jd->offset, jd->create);
+
+	if (jd->create)
+		flags |= O_CREAT;
+	snprintf(path, sizeof(path), "%s%016" PRIx64, obj_path, jd->oid);
+	fd = open(path, flags, def_fmode);
+	if (fd < 0) {
+		eprintf("open %m\n");
+		return -1;
+	}
+
+	/* No bounce buffer needed: this fd is not opened with O_DIRECT */
+	size = xpwrite(fd, data, jd->size, jd->offset);
+	if (size < 0 || (uint64_t)size != jd->size) {
+		eprintf("write %zd, size %"PRIu64", errno %m\n", size, jd->size);
+		ret = -1;
+	}
+
+	close(fd);
+	return ret;
+}
+
+/*
+ * Sanity-check a candidate descriptor found in a journal file: it must carry
+ * JOURNAL_DESC_MAGIC, a non-zero oid, a size no larger than SD_INODE_SIZE
+ * and a create flag of 0 or 1.
+ */
+static bool journal_descriptor_valid(struct journal_descriptor *jd)
+{
+	return jd->magic == JOURNAL_DESC_MAGIC &&
+	       jd->oid != 0 &&
+	       jd->size <= SD_INODE_SIZE &&
+	       jd->create <= 1;
+}
+
+/*
+ * Replay every complete entry of one journal file, then sync().
+ *
+ * fd is consumed: it is closed right after mmap(), on every path.
+ * Descriptors are searched for one SECTOR_SIZE step at a time. A valid
+ * entry is skipped as a whole, and is replayed only if its end marker is
+ * intact.
+ *
+ * Returns 0 on success, -1 on failure.
+ */
+static int do_recover(int fd)
+{
+	struct journal_descriptor *jd;
+	void *map;
+	char *p, *end;
+	int ret = 0;
+
+	map = mmap(NULL, JOURNAL_FILE_SIZE, PROT_READ, MAP_PRIVATE, fd, 0);
+	/* The mapping keeps its own reference to the file */
+	close(fd);
+	if (map == MAP_FAILED) {
+		eprintf("%m\n");
+		return -1;
+	}
+
+	end = (char *)map + JOURNAL_FILE_SIZE;
+	for (p = map; p < end;) {
+		size_t entry_len;
+
+		jd = (struct journal_descriptor *)p;
+		if (!journal_descriptor_valid(jd)) {
+			/* Empty area */
+			p += SECTOR_SIZE;
+			continue;
+		}
+
+		entry_len = JOURNAL_META_SIZE + round_up(jd->size, SECTOR_SIZE);
+		/* An entry running past the end of the file cannot be complete */
+		if (entry_len > (size_t)(end - p))
+			break;
+
+		/* We skip partial write because it is not acked back to VM */
+		if (journal_entry_full_write(jd) &&
+		    replay_journal_entry(jd) < 0) {
+			ret = -1;
+			break;
+		}
+		p += entry_len;
+	}
+	munmap(map, JOURNAL_FILE_SIZE);
+	if (ret < 0)
+		return ret;
+
+	/* Do a final sync() to assure data is reached to the disk */
+	sync();
+	return 0;
+}
+
+/*
+ * We recover the journal file in order of wall time in the corner case that
+ * sheep crashes while in the middle of journal committing. For most of cases,
+ * we actually only recover one jfile, the other would be empty. This process
+ * is fast with buffered IO that only takes several seconds at most.
+ *
+ * Returns 0 when there is nothing to recover or recovery succeeded, -1 on
+ * failure.
+ */
+static int check_recover_journal_file(const char *p)
+{
+	int old = 0, new = 0;
+
+	if (get_old_new_jfile(p, &old, &new) < 0)
+		return -1;
+
+	/* No journal file found */
+	if (old == 0)
+		return 0;
+
+	/* do_recover() closes the fd it is given; release the other one too */
+	if (do_recover(old) < 0) {
+		close(new);
+		return -1;
+	}
+	if (do_recover(new) < 0)
+		return -1;
+
+	return 0;
+}
+
+/*
+ * Replay any journal left behind in path, then create two fresh journal
+ * files there and make journal_file0 the current one.
+ *
+ * Returns 0 on success, -1 on failure.
+ */
+int journal_file_init(const char *path)
+{
+	int fd;
+
+	if (check_recover_journal_file(path) < 0)
+		return -1;
+
+	fd = create_journal_file(path, jfile_name[0]);
+	if (fd < 0)
+		return -1;
+	jfile.fd = jfile_fds[0] = fd;
+
+	/* Both files are required: switch_journal_file() alternates them */
+	fd = create_journal_file(path, jfile_name[1]);
+	if (fd < 0) {
+		close(jfile_fds[0]);
+		return -1;
+	}
+	jfile_fds[1] = fd;
+
+	pthread_spin_init(&jfile_lock, PTHREAD_PROCESS_PRIVATE);
+	return 0;
+}
+
+/* Can size more bytes be appended to the current journal file? */
+static bool jfile_enough_space(size_t size)
+{
+	return jfile.pos + size <= JOURNAL_FILE_SIZE;
+}
+
+/*
+ * We rely on the kernel's page cache to cache data objects to 1) boost read
+ * performance 2) simplify read path so that journal file committing is simply
+ * a sync() operation, and we do it in a dedicated thread to avoid blocking
+ * the writer by switching back and forth between two journal files.
+ *
+ * Thread body: sync(), zero the retired journal (jfile.commit_fd), then
+ * clear jfile.in_commit. Any failure is fatal.
+ */
+static void *journal_file_commit(void *ignored)
+{
+	/* Tell runtime to release resources after termination */
+	int rc = pthread_detach(pthread_self());
+
+	if (rc)
+		panic("%s\n", strerror(rc));
+
+	sync();
+
+	if (zero_out_jfile(jfile.commit_fd) < 0)
+		panic("failed to zero journal file\n");
+
+	uatomic_set_false(&jfile.in_commit);
+	pthread_exit(NULL);
+}
+
+/*
+ * Retire the current journal file, make the other one current and start a
+ * journal_file_commit() thread on the retired one. journal_file_write()
+ * calls this with jfile_lock held.
+ *
+ * Polls every 100ms until jfile.in_commit can be set, i.e. until the
+ * previous commit thread has cleared it (presumably uatomic_set_true()
+ * fails while the flag is already set).
+ *
+ * FIXME: Try not sleep inside lock
+ */
+static void switch_journal_file(void)
+{
+	int retired = jfile.fd, err;
+	pthread_t committer;
+
+	/* Wait until committing is finished */
+	while (!uatomic_set_true(&jfile.in_commit)) {
+		eprintf("journal file in committing, "
+			"you might need enlarge jfile size\n");
+		usleep(100000);
+	}
+
+	jfile.fd = (retired == jfile_fds[0]) ? jfile_fds[1] : jfile_fds[0];
+	jfile.commit_fd = retired;
+	jfile.pos = 0;
+
+	err = pthread_create(&committer, NULL, journal_file_commit, NULL);
+	if (err)
+		panic("%s\n", strerror(err));
+}
+
+/*
+ * Append one entry recording a write of size bytes at offset of object oid.
+ * If the entry does not fit, the current journal file is switched first.
+ *
+ * Entry layout: descriptor (JOURNAL_DESC_SIZE) | data, zero-padded to a
+ * SECTOR_SIZE multiple | JOURNAL_END_MARKER (JOURNAL_MARKER_SIZE).
+ *
+ * Returns SD_RES_SUCCESS, or err_to_sderr() of the failed write's errno.
+ */
+int journal_file_write(uint64_t oid, const char *buf, size_t size,
+		       off_t offset, bool create)
+{
+	struct journal_descriptor jd;
+	uint32_t marker = JOURNAL_END_MARKER;
+	int ret = SD_RES_SUCCESS, wfd;
+	ssize_t written, rusize = round_up(size, SECTOR_SIZE),
+		wsize = JOURNAL_META_SIZE + rusize;
+	off_t woff;
+	char *wbuffer, *p;
+
+	/* Zero jd.pad so no stale stack bytes end up on disk */
+	memset(&jd, 0, sizeof(jd));
+	jd.magic = JOURNAL_DESC_MAGIC;
+	jd.offset = offset;
+	jd.size = size;
+	jd.oid = oid;
+	jd.create = create;
+
+	/*
+	 * Reserve [woff, woff + wsize) and remember which file it belongs to.
+	 * Reading jfile.fd after unlocking could pick up the other file after a
+	 * concurrent switch, where woff may already be owned by another writer.
+	 */
+	pthread_spin_lock(&jfile_lock);
+	if (!jfile_enough_space(wsize))
+		switch_journal_file();
+	wfd = jfile.fd;
+	woff = jfile.pos;
+	jfile.pos += wsize;
+	pthread_spin_unlock(&jfile_lock);
+
+	/* Aligned buffer: journal files are opened with O_DIRECT */
+	p = wbuffer = valloc(wsize);
+	if (!wbuffer)
+		panic("%m\n");
+	memcpy(p, &jd, JOURNAL_DESC_SIZE);
+	p += JOURNAL_DESC_SIZE;
+	/* buf only holds size bytes; zero the padding up to rusize */
+	memcpy(p, buf, size);
+	memset(p + size, 0, rusize - size);
+	p += rusize;
+	memcpy(p, &marker, JOURNAL_MARKER_SIZE);
+
+	dprintf("oid %"PRIx64", woff %"PRId64", wsize %zd\n", oid,
+		(int64_t)woff, wsize);
+	/*
+	 * Concurrent writes with the same FD is okay because we don't have any
+	 * critical sections that need lock inside kernel write path, since we
+	 * a) bypass page cache, b) don't modify i_size of this inode.
+	 *
+	 * Feel free to correct me If I am wrong.
+	 */
+	written = xpwrite(wfd, wbuffer, wsize, woff);
+	if (written != wsize) {
+		int err = errno;	/* eprintf() may clobber errno */
+
+		eprintf("failed, written %zd, len %zd\n", written, wsize);
+		ret = err_to_sderr(oid, err);
+	}
+
+	free(wbuffer);
+	return ret;
+}
diff --git a/sheep/plain_store.c b/sheep/plain_store.c
index 908f761..834ecb0 100644
--- a/sheep/plain_store.c
+++ b/sheep/plain_store.c
@@ -24,10 +24,11 @@ static int get_open_flags(uint64_t oid, bool create, int fl)
 {
 	int flags = O_DSYNC | O_RDWR;
 
-	if (fl & SD_FLAG_CMD_CACHE && is_disk_cache_enabled())
+	if ((fl & SD_FLAG_CMD_CACHE && is_disk_cache_enabled()) ||
+	    uatomic_is_true(&sys->use_journal))
 		flags &= ~O_DSYNC;
 
-	if (is_data_obj(oid))
+	if (is_data_obj(oid) && !uatomic_is_true(&sys->use_journal))
 		flags |= O_DIRECT;
 
 	if (create)
@@ -108,7 +109,7 @@ bool default_exist(uint64_t oid)
 	return true;
 }
 
-static int err_to_sderr(uint64_t oid, int err)
+int err_to_sderr(uint64_t oid, int err)
 {
 	struct stat s;
 
@@ -143,6 +144,16 @@ int default_write(uint64_t oid, const struct siocb *iocb)
 	}
 
 	get_obj_path(oid, path);
+
+	if (uatomic_is_true(&sys->use_journal) &&
+	    journal_file_write(oid, iocb->buf, iocb->length, iocb->offset, 0)
+	    != SD_RES_SUCCESS) {
+		eprintf("turn off journaling\n");
+		uatomic_set_false(&sys->use_journal);
+		flags |= O_DSYNC | O_DIRECT;
+		sync();
+	}
+
 	fd = open(path, flags, def_fmode);
 	if (fd < 0)
 		return err_to_sderr(oid, errno);
@@ -305,6 +316,15 @@ int default_create_and_write(uint64_t oid, const struct siocb *iocb)
 	get_obj_path(oid, path);
 	get_tmp_obj_path(oid, tmp_path);
 
+	if (uatomic_is_true(&sys->use_journal) &&
+	    journal_file_write(oid, iocb->buf, iocb->length, iocb->offset, 1)
+	    != SD_RES_SUCCESS) {
+		eprintf("turn off journaling\n");
+		uatomic_set_false(&sys->use_journal);
+		flags |= O_DSYNC | O_DIRECT;
+		sync();
+	}
+
 	fd = open(tmp_path, flags, def_fmode);
 	if (fd < 0) {
 		if (errno == EEXIST) {
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 3ec2c4d..376cbad 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -46,7 +46,7 @@ static struct option const long_options[] = {
 	{"foreground", no_argument, NULL, 'f'},
 	{"gateway", no_argument, NULL, 'g'},
 	{"help", no_argument, NULL, 'h'},
-	{"journal", no_argument, NULL, 'j'},
+	{"journal", required_argument, NULL, 'j'},
 	{"loglevel", required_argument, NULL, 'l'},
 	{"myaddr", required_argument, NULL, 'y'},
 	{"stdout", no_argument, NULL, 'o'},
@@ -59,7 +59,7 @@ static struct option const long_options[] = {
 	{NULL, 0, NULL, 0},
 };
 
-static const char *short_options = "b:c:dDfghjl:op:P:s:uw:y:z:";
+static const char *short_options = "b:c:dDfghj:l:op:P:s:uw:y:z:";
 
 static void usage(int status)
 {
@@ -77,7 +77,7 @@ Options:\n\
   -f, --foreground        make the program run in the foreground\n\
   -g, --gateway           make the progam run as a gateway mode\n\
   -h, --help              display this help and exit\n\
-  -j, --journal           use jouranl to update vdi objects\n\
+  -j, --journal           use jouranl file to log all the write operations\n\
   -l, --loglevel          specify the level of logging detail\n\
   -o, --stdout            log to stdout instead of shared logger\n\
   -p, --port              specify the TCP port on which to listen\n\
@@ -332,6 +332,7 @@ int main(int argc, char **argv)
 	unsigned char buf[sizeof(struct in6_addr)];
 	int ipv4 = 0;
 	int ipv6 = 0;
+	char journal_path[PATH_MAX];
 
 	signal(SIGPIPE, SIG_IGN);
 
@@ -411,7 +412,8 @@ int main(int argc, char **argv)
 		case 'c':
 			sys->cdrv = find_cdrv(optarg);
 			if (!sys->cdrv) {
-				fprintf(stderr, "Invalid cluster driver '%s'\n", optarg);
+				fprintf(stderr, "Invalid cluster driver '%s'\n",
+					optarg);
 				fprintf(stderr, "Supported drivers:");
 				FOR_EACH_CLUSTER_DRIVER(cdrv) {
 					fprintf(stderr, " %s", cdrv->name);
@@ -426,7 +428,8 @@ int main(int argc, char **argv)
 			init_cache_type(optarg);
 			break;
 		case 'j':
-			sys->use_journal = true;
+			uatomic_set_true(&sys->use_journal);
+			sprintf(journal_path, "%s", optarg);
 			break;
 		case 'b':
 			/* validate provided address using inet_pton */
@@ -471,7 +474,7 @@ int main(int argc, char **argv)
 	if (ret)
 		exit(1);
 
-	ret = init_store(dir);
+	ret = init_store(dir, journal_path);
 	if (ret)
 		exit(1);
 
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 0480e26..f849400 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -122,7 +122,7 @@ struct cluster_info {
 	uint32_t object_cache_size;
 	bool object_cache_directio;
 
-	bool use_journal;
+	uatomic_bool use_journal;
 	bool upgrade; /* upgrade data layout before starting service
 		       * if necessary*/
 };
@@ -183,6 +183,7 @@ int default_remove_object(uint64_t oid);
 int default_purge_obj(void);
 int for_each_object_in_wd(int (*func)(uint64_t oid, void *arg), bool cleanup,
 			  void *arg);
+int err_to_sderr(uint64_t oid, int err);
 
 extern struct list_head store_drivers;
 #define add_store_driver(driver)                                 \
@@ -218,7 +219,7 @@ static inline uint32_t sys_epoch(void)
 
 int create_listen_port(char *bindaddr, int port, void *data);
 
-int init_store(const char *dir);
+int init_store(const char *dir, const char *journal);
 int init_base_path(const char *dir);
 
 int fill_vdi_copy_list(void *data);
@@ -417,4 +418,7 @@ static inline bool is_disk_cache_enabled(void)
 	return !!(sys->enabled_cache_type & CACHE_TYPE_DISK);
 }
 
+/* journal_file.c */
+int journal_file_init(const char *p);
+int journal_file_write(uint64_t oid, const char *buf, size_t size,
+		       off_t offset, bool create);
 #endif
diff --git a/sheep/store.c b/sheep/store.c
index 653fff5..e212965 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -402,7 +402,7 @@ out:
 	return ret;
 }
 
-int init_store(const char *d)
+int init_store(const char *d, const char *j)
 {
 	int ret;
 
@@ -430,6 +430,13 @@ int init_store(const char *d)
 	if (ret)
 		return ret;
 
+	/* We should init journal file before backend init */
+	if (uatomic_is_true(&sys->use_journal)) {
+		ret = journal_file_init(j);
+		if (ret)
+			return ret;
+	}
+
 	if (!sys->gateway_only) {
 		ret = init_store_driver();
 		if (ret)
-- 
1.7.9.5




More information about the sheepdog mailing list