[sheepdog] [PATCH v2 1/2] sheep: introduce journal file to boost IO performance

Liu Yuan namei.unix at gmail.com
Thu Nov 8 14:26:10 CET 2012


From: Liu Yuan <tailai.ly at taobao.com>

The basic idea is very simple: use a dedicated device to log all the IO
operations in a sequential manner and then we are safe to change the backend IO
operations from O_DSYNC & O_DIRECT into O_RDWR (buffered IO), which will
benefit us both read & write performance a lot.

Usage:
 $ sheep -j dir=/path/to/dir,size=256, # enable external journaling with the size 256M
 $ sheep -j dir=/path/to/dir,size=256,skip #like above, but skip recovery at startup
 $ sheep -j # enable internal journaling with the default size 512M

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 include/util.h       |   1 +
 sheep/Makefile.am    |   2 +-
 sheep/journal_file.c | 401 +++++++++++++++++++++++++++++++++++++++++++++++++++
 sheep/ops.c          |  43 +-----
 sheep/plain_store.c  |  27 +++-
 sheep/sheep.c        |  50 ++++++-
 sheep/sheep_priv.h   |   7 +-
 sheep/store.c        |   6 +-
 8 files changed, 487 insertions(+), 50 deletions(-)
 create mode 100644 sheep/journal_file.c

diff --git a/include/util.h b/include/util.h
index 5fb19c2..7422dbf 100644
--- a/include/util.h
+++ b/include/util.h
@@ -38,6 +38,7 @@
 #endif
 
 #define notrace __attribute__((no_instrument_function))
+#define __packed __attribute((packed))
 
 #define uninitialized_var(x) (x = x)
 
diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index e7b4f53..0ae19de 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -27,7 +27,7 @@ sbin_PROGRAMS		= sheep
 sheep_SOURCES		= sheep.c group.c request.c gateway.c store.c vdi.c work.c \
 			  journal.c ops.c recovery.c cluster/local.c \
 			  object_cache.c object_list_cache.c sockfd_cache.c \
-			  plain_store.c config.c migrate.c
+			  plain_store.c config.c migrate.c journal_file.c
 
 if BUILD_COROSYNC
 sheep_SOURCES		+= cluster/corosync.c
diff --git a/sheep/journal_file.c b/sheep/journal_file.c
new file mode 100644
index 0000000..563b216
--- /dev/null
+++ b/sheep/journal_file.c
@@ -0,0 +1,401 @@
+/*
+ * Copyright (C) 2012 Taobao Inc.
+ *
+ * Liu Yuan <namei.unix at gmail.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <errno.h>
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <stdint.h>
+#include <sys/mman.h>
+
+#include "sheep_priv.h"
+
+struct journal_file {
+	int fd;
+	off_t pos;
+	int commit_fd;
+	uatomic_bool in_commit;
+};
+
+struct journal_descriptor {
+	uint32_t magic;
+	uint32_t reserved;
+	uint64_t oid;
+	uint64_t offset;
+	uint64_t size;
+	uint8_t create;
+	uint8_t pad[475];
+} __packed;
+
+/* JOURNAL_DESC + JOURNAL_MARKER must be 512 aligned for DIO */
+#define JOURNAL_DESC_MAGIC 0xfee1900d
+#define JOURNAL_DESC_SIZE 508
+#define JOURNAL_MARKER_SIZE 4 /* Use marker to detect partial write */
+#define JOURNAL_META_SIZE SECTOR_SIZE
+
+#define JOURNAL_DEFAULT_SIZE (1024*1024*256) /* 256M */
+#define JOURNAL_END_MARKER 0xdeadbeef
+
+static const char *jfile_name[2] = { "journal_file0", "journal_file1", };
+static int jfile_fds[2];
+static size_t jfile_size = JOURNAL_DEFAULT_SIZE;
+
+static struct journal_file jfile;
+static pthread_spinlock_t jfile_lock;
+
+/* Overwrite the whole journal file with zeros; 0 on success, -1 on failure */
+static int zero_out_jfile(int fd)
+{
+	char *buf = valloc(jfile_size);
+	ssize_t wlen;
+
+	if (!buf)
+		panic("%m\n");
+	memset(buf, 0, jfile_size);
+	wlen = xpwrite(fd, buf, jfile_size, 0);
+	if (wlen != jfile_size) {
+		eprintf("WARN: failed, %m\n");
+		free(buf); /* this path used to leak the buffer */
+		return -1;
+	}
+	free(buf);
+	return 0;
+}
+
+/* Create, preallocate and zero one journal file; returns its fd or -1 */
+static int create_journal_file(const char *root, const char *name)
+{
+	int fd, flags = O_DSYNC | O_RDWR | O_TRUNC | O_CREAT | O_DIRECT;
+	char path[PATH_MAX];
+	snprintf(path, sizeof(path), "%s/%s", root, name);
+	fd = open(path, flags, 0644);
+	if (fd < 0) {
+		eprintf("open %s %m\n", name);
+		return -1;
+	}
+	if (prealloc(fd, jfile_size) < 0) {
+		eprintf("prealloc %s %m\n", name);
+		close(fd); /* this path used to leak the fd */
+		return -1;
+	}
+	/* Turn unwritten extents of FS into written ones for faster write */
+	if (zero_out_jfile(fd) < 0) {
+		close(fd);
+		return -1;
+	}
+	return fd;
+}
+
+/* Open both journal files, older one in *old; 0 too if the first is absent */
+static int get_old_new_jfile(const char *p, int *old, int *new)
+{
+	int fd1, fd2;
+	int flags = O_RDONLY;
+	char path[PATH_MAX];
+	struct stat st1, st2;
+
+	sprintf(path, "%s/%s", p, jfile_name[0]);
+	fd1 = open(path, flags);
+	if (fd1 < 0) {
+		if (errno == ENOENT) /* no journal yet: nothing to recover */
+			return 0;
+
+		eprintf("open1 %m\n");
+		return -1;
+	}
+	sprintf(path, "%s/%s", p, jfile_name[1]);
+	fd2 = open(path, flags);
+	if (fd2 < 0) {
+		eprintf("open2 %m\n");
+		close(fd1);
+		return -1;
+	}
+
+	if (fstat(fd1, &st1) < 0 || fstat(fd2, &st2) < 0) {
+		eprintf("stat %m\n");
+		goto out;
+	}
+
+	if (st1.st_mtime < st2.st_mtime) {
+		*old = fd1;
+		*new = fd2;
+	} else {
+		*old = fd2;
+		*new = fd1;
+	}
+
+	return 0;
+out:
+	close(fd1);
+	close(fd2);
+	return -1;
+}
+
+/*
+ * Tell whether the entry was written completely: the last four bytes of
+ * a finished entry hold JOURNAL_END_MARKER.
+ */
+static bool journal_entry_full_write(struct journal_descriptor *jd)
+{
+	uint64_t span = roundup(jd->size, SECTOR_SIZE) + JOURNAL_META_SIZE;
+	uint32_t *tail = (uint32_t *)((char *)jd + span);
+	return tail[-1] == JOURNAL_END_MARKER;
+}
+
+/* Write one complete journal entry back to its object; 0 on success, else -1 */
+static int replay_journal_entry(struct journal_descriptor *jd)
+{
+	char path[PATH_MAX];
+	ssize_t size;
+	int fd, flags = O_WRONLY, ret = 0;
+	void *buf;
+	char *p = (char *)jd;
+
+	dprintf("%"PRIx64", size %"PRIu64", off %"PRIu64", %d\n",
+		jd->oid, jd->size, jd->offset, jd->create);
+
+	if (jd->create)
+		flags |= O_CREAT;
+	sprintf(path, "%s%016" PRIx64, obj_path, jd->oid);
+	fd = open(path, flags, def_fmode);
+	if (fd < 0) {
+		eprintf("open %m\n");
+		return -1;
+	}
+	buf = xmalloc(jd->size);
+	p += JOURNAL_DESC_SIZE; /* payload starts right after the descriptor */
+	memcpy(buf, p, jd->size);
+	size = xpwrite(fd, buf, jd->size, jd->offset);
+	if (size != jd->size) {
+		eprintf("write %zd, size %"PRIu64", errno %m\n",
+			size, jd->size);
+		ret = -1;
+	}
+	free(buf); /* previously leaked on every replayed entry */
+	close(fd);
+	return ret;
+}
+
+static bool journal_descriptor_valid(struct journal_descriptor *jd)
+{
+	if (jd->magic != JOURNAL_DESC_MAGIC)
+		return false;
+	if (jd->oid == 0)
+		return false;
+	if (jd->size > SD_INODE_SIZE)
+		return false;
+	if (jd->offset > SD_INODE_SIZE)
+		return false;
+	if (jd->create > 1)
+		return false;
+	return true;
+}
+
+/* Replay every complete entry found in journal file @fd; always closes @fd */
+static int do_recover(int fd)
+{
+	struct journal_descriptor *jd;
+	void *map;
+	char *p, *end;
+	struct stat st;
+
+	if (fstat(fd, &st) < 0) {
+		eprintf("fstat %m\n");
+		close(fd);
+		return -1;
+	}
+	map = mmap(NULL, st.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
+	close(fd); /* the mapping stays usable after the fd is closed */
+	if (map == MAP_FAILED) {
+		eprintf("%m\n");
+		return -1;
+	}
+	end = (char *)map + st.st_size;
+	for (p = map; p < end;) {
+		jd = (struct journal_descriptor *)p;
+		if (!journal_descriptor_valid(jd)) {
+			/* Empty area */
+			p += SECTOR_SIZE;
+			continue;
+		}
+		/* We skip partial write because it is not acked back to VM */
+		if (!journal_entry_full_write(jd))
+			goto skip;
+		if (replay_journal_entry(jd) < 0) {
+			munmap(map, st.st_size);
+			return -1;
+		}
+skip:
+		p += JOURNAL_META_SIZE + roundup(jd->size, SECTOR_SIZE);
+	}
+	munmap(map, st.st_size);
+	/* Do a final sync() to assure data is reached to the disk */
+	sync();
+	return 0;
+}
+
+/*
+ * We recover the journal file in order of wall time in the corner case that
+ * sheep crashes while in the middle of journal committing. For most of cases,
+ * we actually only recover one jfile, the other would be empty. This process
+ * is fast with buffered IO and takes only several seconds at most.
+ */
+static int check_recover_journal_file(const char *p)
+{
+	int old = 0, new = 0;
+
+	if (get_old_new_jfile(p, &old, &new) < 0)
+		return -1;
+
+	/* No journal file found */
+	if (old == 0)
+		return 0;
+
+	if (do_recover(old) < 0)
+		return -1;
+	if (do_recover(new) < 0)
+		return -1;
+
+	return 0;
+}
+
+/* Replay old journals unless @skip, then create the two journal files */
+int journal_file_init(const char *path, size_t size, bool skip)
+{
+	int fd;
+
+	if (!skip && check_recover_journal_file(path) < 0)
+		return -1;
+	if (size) /* @size is in MB and is split across the two files */
+		jfile_size = (size * 1024 * 1024) / 2;
+
+	fd = create_journal_file(path, jfile_name[0]);
+	if (fd < 0)
+		return -1;
+	jfile.fd = jfile_fds[0] = fd;
+	fd = create_journal_file(path, jfile_name[1]);
+	if (fd < 0) /* used to go unchecked, leaving -1 in jfile_fds[1] */
+		return -1;
+	jfile_fds[1] = fd;
+	pthread_spin_init(&jfile_lock, PTHREAD_PROCESS_PRIVATE);
+	return 0;
+}
+
+/* Report whether @size more bytes still fit in the active journal file. */
+static bool jfile_enough_space(size_t size)
+{
+	/* jfile.pos is the next free offset in the active file */
+	return !(jfile.pos + size > jfile_size);
+}
+
+/*
+ * We rely on the kernel's page cache to cache data objects to 1) boost read
+ * performance 2) simplify read path so that data committing is simply a
+ * sync() operation and we do it in a dedicated thread to avoid blocking
+ * the writer by switching back and forth between two journal files.
+ */
+static void *commit_data(void *ignored)
+{
+	int err;
+
+	/* Tell runtime to release resources after termination */
+	err = pthread_detach(pthread_self());
+	if (err)
+		panic("%s\n", strerror(err));
+
+	sync();
+	if (zero_out_jfile(jfile.commit_fd) < 0)
+		panic("failed to zero journal file\n");
+	uatomic_set_false(&jfile.in_commit);
+
+	pthread_exit(NULL);
+}
+
+/* FIXME: Try not sleep inside lock */
+static void switch_journal_file(void)
+{
+	int old = jfile.fd, err;
+	pthread_t thread;
+
+retry:
+	if (!uatomic_set_true(&jfile.in_commit)) {
+		eprintf("journal file in committing, "
+			"you might need enlarge jfile size\n");
+		usleep(100000); /* Wait until committing is finished */
+		goto retry;
+	}
+
+	if (old == jfile_fds[0])
+		jfile.fd = jfile_fds[1];
+	else
+		jfile.fd = jfile_fds[0];
+	jfile.commit_fd = old;
+	jfile.pos = 0;
+
+	err = pthread_create(&thread, NULL, commit_data, NULL);
+	if (err)
+		panic("%s\n", strerror(err));
+}
+
+/* Append one write (descriptor + padded payload + end marker) to the journal */
+int journal_file_write(uint64_t oid, const char *buf, size_t size,
+		       off_t offset, bool create)
+{
+	struct journal_descriptor jd = { 0 }; /* no stack garbage in pad */
+	uint32_t marker = JOURNAL_END_MARKER;
+	int fd, ret = SD_RES_SUCCESS;
+	ssize_t written, rusize = roundup(size, SECTOR_SIZE),
+		wsize = JOURNAL_META_SIZE + rusize;
+	off_t woff;
+	char *wbuffer, *p;
+
+	jd.magic = JOURNAL_DESC_MAGIC;
+	jd.offset = offset;
+	jd.size = size;
+	jd.oid = oid;
+	jd.create = create;
+
+	pthread_spin_lock(&jfile_lock);
+	if (!jfile_enough_space(wsize))
+		switch_journal_file();
+	fd = jfile.fd; /* pair fd with woff; a later switch changes jfile.fd */
+	woff = jfile.pos;
+	jfile.pos += wsize;
+	pthread_spin_unlock(&jfile_lock);
+
+	p = wbuffer = valloc(wsize);
+	if (!wbuffer)
+		panic("%m\n");
+	memcpy(p, &jd, JOURNAL_DESC_SIZE);
+	p += JOURNAL_DESC_SIZE;
+	memcpy(p, buf, size); /* buf holds only size bytes, not rusize */
+	memset(p + size, 0, rusize - size); /* zero the sector padding */
+	p += rusize;
+	memcpy(p, &marker, JOURNAL_MARKER_SIZE);
+	dprintf("oid %lx, pos %zu, wsize %zu\n", oid, jfile.pos, wsize);
+	/*
+	 * Concurrent writes with the same FD is okay because we don't have any
+	 * critical sections that need lock inside kernel write path, since we
+	 * a) bypass page cache, b) don't modify i_size of this inode.
+	 *
+	 * Feel free to correct me If I am wrong.
+	 */
+	written = xpwrite(fd, wbuffer, wsize, woff);
+	if (written != wsize) {
+		eprintf("failed, written %zd, len %zu\n", written, wsize);
+		ret = err_to_sderr(oid, errno);
+	}
+	free(wbuffer);
+	return ret;
+}
diff --git a/sheep/ops.c b/sheep/ops.c
index e196dae..f99dff6 100644
--- a/sheep/ops.c
+++ b/sheep/ops.c
@@ -801,46 +801,19 @@ static int do_create_and_write_obj(struct siocb *iocb, struct sd_req *hdr,
 	return sd_store->create_and_write(hdr->obj.oid, iocb);
 }
 
-static int do_write_obj(struct siocb *iocb, struct sd_req *hdr, uint32_t epoch,
-			void *data)
-{
-	uint64_t oid = hdr->obj.oid;
-	int ret = SD_RES_SUCCESS;
-	void *jd = NULL;
-
-	iocb->buf = data;
-	iocb->length = hdr->data_length;
-	iocb->offset = hdr->obj.offset;
-
-	if (is_vdi_obj(oid) && sys->use_journal) {
-		struct strbuf buf = STRBUF_INIT;
-
-		strbuf_addf(&buf, "%s%016" PRIx64, obj_path, oid);
-		jd = jrnl_begin(data, hdr->data_length, hdr->obj.offset,
-				buf.buf, jrnl_path);
-		if (!jd) {
-			strbuf_release(&buf);
-			return SD_RES_EIO;
-		}
-		ret = sd_store->write(oid, iocb);
-		jrnl_end(jd);
-		strbuf_release(&buf);
-	} else
-		ret = sd_store->write(oid, iocb);
-
-	return ret;
-}
-
 int peer_write_obj(struct request *req)
 {
 	struct sd_req *hdr = &req->rq;
-	uint32_t epoch = hdr->epoch;
-	struct siocb iocb;
+	struct siocb iocb = { };
+	uint64_t oid = hdr->obj.oid;
 
-	memset(&iocb, 0, sizeof(iocb));
-	iocb.epoch = epoch;
+	iocb.epoch = hdr->epoch;
 	iocb.flags = hdr->flags;
-	return do_write_obj(&iocb, hdr, epoch, req->data);
+	iocb.buf = req->data;
+	iocb.length = hdr->data_length;
+	iocb.offset = hdr->obj.offset;
+
+	return sd_store->write(oid, &iocb);
 }
 
 int peer_create_and_write_obj(struct request *req)
diff --git a/sheep/plain_store.c b/sheep/plain_store.c
index 908f761..6c319db 100644
--- a/sheep/plain_store.c
+++ b/sheep/plain_store.c
@@ -24,12 +24,10 @@ static int get_open_flags(uint64_t oid, bool create, int fl)
 {
 	int flags = O_DSYNC | O_RDWR;
 
-	if (fl & SD_FLAG_CMD_CACHE && is_disk_cache_enabled())
+	if ((fl & SD_FLAG_CMD_CACHE && is_disk_cache_enabled()) ||
+	    uatomic_is_true(&sys->use_journal))
 		flags &= ~O_DSYNC;
 
-	if (is_data_obj(oid))
-		flags |= O_DIRECT;
-
 	if (create)
 		flags |= O_CREAT | O_EXCL;
 
@@ -108,7 +106,7 @@ bool default_exist(uint64_t oid)
 	return true;
 }
 
-static int err_to_sderr(uint64_t oid, int err)
+int err_to_sderr(uint64_t oid, int err)
 {
 	struct stat s;
 
@@ -143,6 +141,16 @@ int default_write(uint64_t oid, const struct siocb *iocb)
 	}
 
 	get_obj_path(oid, path);
+
+	if (uatomic_is_true(&sys->use_journal) &&
+	    journal_file_write(oid, iocb->buf, iocb->length, iocb->offset, 0)
+	    != SD_RES_SUCCESS) {
+		eprintf("turn off journaling\n");
+		uatomic_set_false(&sys->use_journal);
+		flags |= O_DSYNC | O_DIRECT;
+		sync();
+	}
+
 	fd = open(path, flags, def_fmode);
 	if (fd < 0)
 		return err_to_sderr(oid, errno);
@@ -305,6 +313,15 @@ int default_create_and_write(uint64_t oid, const struct siocb *iocb)
 	get_obj_path(oid, path);
 	get_tmp_obj_path(oid, tmp_path);
 
+	if (uatomic_is_true(&sys->use_journal) &&
+	    journal_file_write(oid, iocb->buf, iocb->length, iocb->offset, 1)
+	    != SD_RES_SUCCESS) {
+		eprintf("turn off journaling\n");
+		uatomic_set_false(&sys->use_journal);
+		flags |= O_DSYNC | O_DIRECT;
+		sync();
+	}
+
 	fd = open(tmp_path, flags, def_fmode);
 	if (fd < 0) {
 		if (errno == EEXIST) {
diff --git a/sheep/sheep.c b/sheep/sheep.c
index af8da4f..91325b4 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -59,7 +59,7 @@ static struct option const long_options[] = {
 	{NULL, 0, NULL, 0},
 };
 
-static const char *short_options = "b:c:dDfghjl:op:P:s:uw:y:z:";
+static const char *short_options = "b:c:dDfghj:l:op:P:s:uw:y:z:";
 
 static void usage(int status)
 {
@@ -77,7 +77,7 @@ Options:\n\
   -f, --foreground        make the program run in the foreground\n\
   -g, --gateway           make the progam run as a gateway mode\n\
   -h, --help              display this help and exit\n\
-  -j, --journal           use jouranl to update vdi objects\n\
+  -j, --journal           use jouranl file to log all the write operations\n\
   -l, --loglevel          specify the level of logging detail\n\
   -o, --stdout            log to stdout instead of shared logger\n\
   -p, --port              specify the TCP port on which to listen\n\
@@ -311,6 +311,34 @@ static void init_cache_type(char *arg)
 	}
 }
 
+static char jpath[PATH_MAX];
+static bool jskip;
+static ssize_t jsize;
+#define MIN_JOURNAL_SIZE (64) /* 64M */
+
+/* Handle one token of the -j argument: "dir=PATH", "size=N" or "skip" */
+static void init_journal_arg(char *arg)
+{
+	const char *d = "dir=", *sz = "size=", *sp = "skip";
+	int dl = strlen(d), szl = strlen(sz), spl = strlen(sp);
+
+	if (!strncmp(d, arg, dl)) {
+		snprintf(jpath, sizeof(jpath), "%s", arg + dl); /* bounded */
+	} else if (!strncmp(sz, arg, szl)) {
+		arg += szl;
+		jsize = strtoll(arg, NULL, 10);
+		if (jsize < MIN_JOURNAL_SIZE || jsize == LLONG_MAX) {
+			fprintf(stderr, "invalid size %s", arg);
+			exit(1);
+		}
+	} else if (!strncmp(sp, arg, spl)) {
+		jskip = true;
+	} else {
+		fprintf(stderr, "invalid paramters %s\n", arg);
+		exit(1);
+	}
+}
+
 int main(int argc, char **argv)
 {
 	int ch, longindex;
@@ -426,7 +454,8 @@ int main(int argc, char **argv)
 			init_cache_type(optarg);
 			break;
 		case 'j':
-			sys->use_journal = true;
+			uatomic_set_true(&sys->use_journal);
+			parse_arg(optarg, ",", init_journal_arg);
 			break;
 		case 'b':
 			/* validate provided address using inet_pton */
@@ -471,6 +500,21 @@ int main(int argc, char **argv)
 	if (ret)
 		exit(1);
 
+	ret = init_obj_path(dir);
+	if (ret)
+		exit(1);
+
+	/* We should init journal file before backend init */
+	if (uatomic_is_true(&sys->use_journal)) {
+		if (!strlen(jpath))
+			/* internal journal */
+			memcpy(jpath, dir, strlen(dir));
+		dprintf("%s, %zu, %d\n", jpath, jsize, jskip);
+		ret = journal_file_init(jpath, jsize, jskip);
+		if (ret)
+			exit(1);
+	}
+
 	ret = init_store(dir);
 	if (ret)
 		exit(1);
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 9bec91e..6deaf64 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -122,7 +122,7 @@ struct cluster_info {
 	uint32_t object_cache_size;
 	bool object_cache_directio;
 
-	bool use_journal;
+	uatomic_bool use_journal;
 	bool upgrade; /* upgrade data layout before starting service
 		       * if necessary*/
 };
@@ -183,6 +183,7 @@ int default_remove_object(uint64_t oid);
 int default_purge_obj(void);
 int for_each_object_in_wd(int (*func)(uint64_t oid, void *arg), bool cleanup,
 			  void *arg);
+int err_to_sderr(uint64_t oid, int err);
 
 extern struct list_head store_drivers;
 #define add_store_driver(driver)                                 \
@@ -221,6 +222,7 @@ int init_unix_domain_socket(const char *dir);
 
 int init_store(const char *dir);
 int init_base_path(const char *dir);
+int init_obj_path(const char *base_path);
 
 int fill_vdi_copy_list(void *data);
 int get_vdi_copy_number(uint32_t vid);
@@ -418,4 +420,7 @@ static inline bool is_disk_cache_enabled(void)
 	return !!(sys->enabled_cache_type & CACHE_TYPE_DISK);
 }
 
+/* journal_file.c */
+int journal_file_init(const char *path, size_t size, bool skip);
+int journal_file_write(uint64_t oid, const char *buf, size_t size, off_t, bool);
 #endif
diff --git a/sheep/store.c b/sheep/store.c
index 653fff5..7dae5de 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -247,7 +247,7 @@ int init_base_path(const char *d)
 
 #define OBJ_PATH "/obj/"
 
-static int init_obj_path(const char *base_path)
+int init_obj_path(const char *base_path)
 {
 	int len;
 
@@ -406,10 +406,6 @@ int init_store(const char *d)
 {
 	int ret;
 
-	ret = init_obj_path(d);
-	if (ret)
-		return ret;
-
 	ret = init_epoch_path(d);
 	if (ret)
 		return ret;
-- 
1.7.12.84.gefa6462




More information about the sheepdog mailing list