[sheepdog] [RFC PATCH 3/3] sheep: introduce journal file to boost IO performance
Liu Yuan
namei.unix at gmail.com
Sat Nov 3 16:09:47 CET 2012
From: Liu Yuan <tailai.ly at taobao.com>
The basic idea is very simple: use a dedicated device to log all the IO
operations in a sequential manner, and then we are safe to change the backend IO
operations from O_DSYNC & O_DIRECT into O_RDWR (buffered IO), which will
benefit both read & write performance a lot.
Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
include/util.h | 5 +
sheep/Makefile.am | 2 +-
sheep/journal_file.c | 386 ++++++++++++++++++++++++++++++++++++++++++++++++++
sheep/plain_store.c | 26 +++-
sheep/sheep.c | 15 +-
sheep/sheep_priv.h | 8 +-
sheep/store.c | 9 +-
7 files changed, 438 insertions(+), 13 deletions(-)
create mode 100644 sheep/journal_file.c
diff --git a/include/util.h b/include/util.h
index 5fb19c2..a0d8186 100644
--- a/include/util.h
+++ b/include/util.h
@@ -38,6 +38,7 @@
#endif
#define notrace __attribute__((no_instrument_function))
+#define __packed __attribute((packed))
#define uninitialized_var(x) (x = x)
@@ -68,6 +69,10 @@ static inline void *zalloc(size_t size)
return calloc(1, size);
}
+#define __round_mask(x, y) ((__typeof__(x))((y)-1))
+#define round_up(x, y) ((((x)-1) | __round_mask(x, y))+1)
+#define round_down(x, y) ((x) & ~__round_mask(x, y))
+
typedef void (*try_to_free_t)(size_t);
extern try_to_free_t set_try_to_free_routine(try_to_free_t);
diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index e7b4f53..0ae19de 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -27,7 +27,7 @@ sbin_PROGRAMS = sheep
sheep_SOURCES = sheep.c group.c request.c gateway.c store.c vdi.c work.c \
journal.c ops.c recovery.c cluster/local.c \
object_cache.c object_list_cache.c sockfd_cache.c \
- plain_store.c config.c migrate.c
+ plain_store.c config.c migrate.c journal_file.c
if BUILD_COROSYNC
sheep_SOURCES += cluster/corosync.c
diff --git a/sheep/journal_file.c b/sheep/journal_file.c
new file mode 100644
index 0000000..110a034
--- /dev/null
+++ b/sheep/journal_file.c
@@ -0,0 +1,386 @@
+/*
+ * Copyright (C) 2012 Taobao Inc.
+ *
+ * Liu Yuan <namei.unix at gmail.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <errno.h>
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <stdint.h>
+#include <sys/mman.h>
+
+#include "sheep_priv.h"
+
+/* Run-time state of the journal that is currently being appended to. */
+struct journal_file {
+ int fd; /* journal file that new entries are written to */
+ off_t pos; /* offset in fd reserved for the next entry */
+ int commit_fd; /* journal file handed over to the commit thread */
+ uatomic_bool in_commit; /* set while a commit thread is running */
+};
+
+/*
+ * Header written in front of the payload of every journal entry.
+ * Packed, the members add up to 508 bytes (4 + 8 + 8 + 8 + 1 + 479).
+ */
+struct journal_descriptor {
+ uint32_t magic; /* JOURNAL_DESC_MAGIC for a valid entry */
+ uint64_t oid; /* object the logged write targets */
+ uint64_t offset; /* offset of the write inside the object */
+ uint64_t size; /* payload length in bytes, before sector padding */
+ uint8_t create; /* non-zero: replay opens the object with O_CREAT */
+ uint8_t pad[479];
+} __packed;
+
+/* JOURNAL_DESC + JOURNAL_MARKER must be 512 aligned for DIO */
+#define JOURNAL_DESC_MAGIC 0xfee1900d
+#define JOURNAL_DESC_SIZE 508
+#define JOURNAL_MARKER_SIZE 4 /* Use marker to detect partial write */
+#define JOURNAL_META_SIZE SECTOR_SIZE
+
+#define JOURNAL_FILE_SIZE (1024*1024*256) /* 256M */
+#define JOURNAL_END_MARKER 0xdeadbeef
+
+/* Names of the two journal files, created under the journal directory */
+static const char *jfile_name[2] = { "journal_file0", "journal_file1", };
+/* Descriptors of the two journal files, in the same order as jfile_name */
+static int jfile_fds[2];
+
+static struct journal_file jfile;
+/* Taken in journal_file_write() around the space reservation in jfile */
+static pthread_spinlock_t jfile_lock;
+
+/*
+ * Overwrite the first JOURNAL_FILE_SIZE bytes of @fd with zeros.
+ *
+ * Returns 0 on success, -1 if the buffer cannot be allocated or the write
+ * comes up short.  The temporary buffer is released on every path.
+ */
+static int zero_out_jfile(int fd)
+{
+	char *buf;
+	ssize_t wlen;
+	int ret = 0;
+
+	/* Page-aligned buffer: the journal files are opened with O_DIRECT */
+	buf = valloc(JOURNAL_FILE_SIZE);
+	if (!buf) {
+		eprintf("WARN: failed, %m\n");
+		return -1;
+	}
+
+	memset(buf, 0, JOURNAL_FILE_SIZE);
+	wlen = xpwrite(fd, buf, JOURNAL_FILE_SIZE, 0);
+	if (wlen != JOURNAL_FILE_SIZE) {
+		eprintf("WARN: failed, %m\n");
+		ret = -1;
+	}
+
+	free(buf);
+	return ret;
+}
+
+/*
+ * Create (or truncate) the journal file @name under directory @root,
+ * preallocate JOURNAL_FILE_SIZE bytes for it and fill it with zeros.
+ *
+ * Returns the open descriptor on success and -1 on failure; no descriptor
+ * is left open when -1 is returned.
+ */
+static int create_journal_file(const char *root, const char *name)
+{
+	int fd, flags = O_DSYNC | O_RDWR | O_TRUNC | O_CREAT | O_DIRECT;
+	char path[PATH_MAX];
+	int len;
+
+	len = snprintf(path, sizeof(path), "%s/%s", root, name);
+	if (len < 0 || (size_t)len >= sizeof(path)) {
+		eprintf("path of %s is too long\n", name);
+		return -1;
+	}
+
+	fd = open(path, flags, 0644);
+	if (fd < 0) {
+		eprintf("open %s %m\n", name);
+		return -1;
+	}
+	if (prealloc(fd, JOURNAL_FILE_SIZE) < 0) {
+		/* Log before close() so that %m still reports prealloc's errno */
+		eprintf("prealloc %s %m\n", name);
+		close(fd);
+		return -1;
+	}
+
+	/* Turn unwritten extents of FS into written ones for faster write */
+	if (zero_out_jfile(fd) < 0) {
+		close(fd);
+		return -1;
+	}
+	return fd;
+}
+
+/*
+ * Open both journal files under directory @p read-only and order them by
+ * modification time: *old receives the descriptor of the file with the
+ * smaller st_mtime, *new the other one.
+ *
+ * Returns 0 with *old and *new left untouched when the first journal file
+ * does not exist, 0 with both set when both files could be opened, and -1
+ * on any other failure.  We should have two valid FDs, otherwise something
+ * goes wrong.
+ */
+static int get_old_new_jfile(const char *p, int *old, int *new)
+{
+	int fd1, fd2;
+	int flags = O_RDONLY;
+	char path[PATH_MAX];
+	struct stat st1, st2;
+
+	sprintf(path, "%s/%s", p, jfile_name[0]);
+	fd1 = open(path, flags);
+	if (fd1 < 0) {
+		/*
+		 * A missing file is reported as ENOENT; open() without
+		 * O_CREAT never fails with EEXIST.
+		 */
+		if (errno == ENOENT)
+			return 0;
+
+		eprintf("open1 %m\n");
+		return -1;
+	}
+	sprintf(path, "%s/%s", p, jfile_name[1]);
+	fd2 = open(path, flags);
+	if (fd2 < 0) {
+		eprintf("open2 %m\n");
+		close(fd1);
+		return -1;
+	}
+
+	if (fstat(fd1, &st1) < 0 || fstat(fd2, &st2) < 0) {
+		eprintf("stat %m\n");
+		goto out;
+	}
+
+	if (st1.st_mtime < st2.st_mtime) {
+		*old = fd1;
+		*new = fd2;
+	} else {
+		*old = fd2;
+		*new = fd1;
+	}
+
+	return 0;
+out:
+	close(fd1);
+	close(fd2);
+	return -1;
+}
+
+/*
+ * Report whether the entry starting at @jd was written completely, i.e.
+ * whether the last four bytes of the entry (descriptor sector plus the
+ * sector-rounded payload) hold JOURNAL_END_MARKER.
+ *
+ * NOTE(review): the entry length is taken from jd->size without checking
+ * it against the size of the buffer @jd points into.
+ */
+static bool journal_entry_full_write(struct journal_descriptor *jd)
+{
+	char *entry_end = (char *)jd +
+		round_up(jd->size, SECTOR_SIZE) + JOURNAL_META_SIZE;
+	uint32_t *tail = (uint32_t *)entry_end;
+
+	return tail[-1] == JOURNAL_END_MARKER;
+}
+
+/*
+ * Apply one journal entry to its backing object file.
+ *
+ * The object is opened under obj_path (with O_CREAT when jd->create is
+ * set) and the jd->size payload bytes that follow the descriptor are
+ * written at jd->offset.  The payload is written straight from the journal
+ * buffer, so no temporary copy has to be allocated or freed.
+ *
+ * Returns 0 on success, -1 if the object cannot be opened or the write is
+ * short.
+ */
+static int replay_journal_entry(struct journal_descriptor *jd)
+{
+	char path[PATH_MAX];
+	ssize_t size;
+	int fd, flags = O_WRONLY, ret = 0;
+	char *payload = (char *)jd + JOURNAL_DESC_SIZE;
+
+	dprintf("%"PRIx64", size %"PRIu64", off %"PRIu64", %d\n",
+		jd->oid, jd->size, jd->offset, jd->create);
+
+	if (jd->create)
+		flags |= O_CREAT;
+	sprintf(path, "%s%016" PRIx64, obj_path, jd->oid);
+	fd = open(path, flags, def_fmode);
+	if (fd < 0) {
+		eprintf("open %m\n");
+		return -1;
+	}
+
+	size = xpwrite(fd, payload, jd->size, jd->offset);
+	if (size < 0 || (uint64_t)size != jd->size) {
+		/* jd->size is uint64_t, so it needs PRIu64 rather than %zu */
+		eprintf("write %zd, size %"PRIu64", errno %m\n",
+			size, jd->size);
+		ret = -1;
+	}
+
+	close(fd);
+	return ret;
+}
+
+/*
+ * Sanity-check a descriptor read back from a journal file: the magic must
+ * match, the oid must be non-zero, the size must not exceed SD_INODE_SIZE
+ * and the create flag must be 0 or 1.
+ */
+static bool journal_descriptor_valid(struct journal_descriptor *jd)
+{
+	return jd->magic == JOURNAL_DESC_MAGIC &&
+	       jd->oid != 0 &&
+	       jd->size <= SD_INODE_SIZE &&
+	       jd->create <= 1;
+}
+
+/*
+ * Replay every complete entry found in the journal file @fd.
+ *
+ * The file is mapped read-only and scanned sector by sector.  Sectors that
+ * do not start with a valid descriptor are skipped; entries without the end
+ * marker are skipped as a whole.  @fd is closed exactly once, whatever the
+ * outcome, and the mapping is always released.
+ *
+ * Returns 0 on success, -1 if the mapping or the replay of an entry fails.
+ */
+static int do_recover(int fd)
+{
+	struct journal_descriptor *jd;
+	void *map;
+	char *p, *end;
+	int ret = 0;
+
+	map = mmap(NULL, JOURNAL_FILE_SIZE, PROT_READ, MAP_PRIVATE, fd, 0);
+	if (map == MAP_FAILED) {
+		eprintf("%m\n");
+		close(fd);
+		return -1;
+	}
+	/* The mapping outlives the descriptor, which is not needed any more */
+	close(fd);
+
+	end = (char *)map + JOURNAL_FILE_SIZE;
+	for (p = map; p < end;) {
+		uint64_t len;
+
+		jd = (struct journal_descriptor *)p;
+		if (!journal_descriptor_valid(jd)) {
+			/* Empty area */
+			p += SECTOR_SIZE;
+			continue;
+		}
+
+		len = JOURNAL_META_SIZE + round_up(jd->size, SECTOR_SIZE);
+		/* An entry claiming to run past the mapping cannot be complete */
+		if (len > (uint64_t)(end - p))
+			break;
+
+		/* We skip partial write because it is not acked back to VM */
+		if (journal_entry_full_write(jd) &&
+		    replay_journal_entry(jd) < 0) {
+			ret = -1;
+			break;
+		}
+		p += len;
+	}
+	munmap(map, JOURNAL_FILE_SIZE);
+
+	/* Do a final sync() to assure data is reached to the disk */
+	if (ret == 0)
+		sync();
+	return ret;
+}
+
+/*
+ * We recover the journal file in order of wall time in the corner case that
+ * sheep crashes while in the middle of journal committing. For most of cases,
+ * we actually only recover one jfile, the other would be empty. This process
+ * is fast with buffered IO that only takes several seconds at most.
+ *
+ * Returns 0 when there is nothing to recover or both files were replayed,
+ * -1 on failure.
+ */
+static int check_recover_journal_file(const char *p)
+{
+	/* -1 rather than 0 as "not opened": 0 is a valid file descriptor */
+	int old = -1, new = -1;
+
+	if (get_old_new_jfile(p, &old, &new) < 0)
+		return -1;
+
+	/* No journal file found */
+	if (old < 0)
+		return 0;
+
+	if (do_recover(old) < 0) {
+		/* do_recover() only closes the descriptor it was given */
+		close(new);
+		return -1;
+	}
+	if (do_recover(new) < 0)
+		return -1;
+
+	return 0;
+}
+
+/*
+ * Set up journaling in directory @path: replay whatever journal files a
+ * previous run left behind, then create two fresh journal files and start
+ * writing to the first one.
+ *
+ * Returns 0 on success, -1 if recovery fails or either journal file cannot
+ * be created.
+ */
+int journal_file_init(const char *path)
+{
+	int fd0, fd1;
+
+	if (check_recover_journal_file(path) < 0)
+		return -1;
+
+	fd0 = create_journal_file(path, jfile_name[0]);
+	if (fd0 < 0)
+		return -1;
+
+	/* Both files are needed: switch_journal_file() alternates between them */
+	fd1 = create_journal_file(path, jfile_name[1]);
+	if (fd1 < 0) {
+		close(fd0);
+		return -1;
+	}
+
+	jfile.fd = jfile_fds[0] = fd0;
+	jfile_fds[1] = fd1;
+
+	pthread_spin_init(&jfile_lock, PTHREAD_PROCESS_PRIVATE);
+	return 0;
+}
+
+/* True when @size more bytes still fit into the current journal file. */
+static bool jfile_enough_space(size_t size)
+{
+	return !(jfile.pos + size > JOURNAL_FILE_SIZE);
+}
+
+/*
+ * Thread body that commits the journal file stored in jfile.commit_fd.
+ *
+ * We rely on the kernel's page cache to cache data objects to 1) boost read
+ * performance and 2) simplify the read path, so committing a journal file is
+ * simply a sync() followed by zeroing the file.  It runs in a dedicated
+ * thread so that the writer is not blocked: it keeps going on the other
+ * journal file in the meantime.
+ */
+static void *journal_file_commit(void *ignored)
+{
+	/* Tell runtime to release resources after termination */
+	int rc = pthread_detach(pthread_self());
+
+	if (rc != 0)
+		panic("%s\n", strerror(rc));
+
+	sync();
+	if (zero_out_jfile(jfile.commit_fd) < 0)
+		panic("failed to zero journal file\n");
+
+	/* Let switch_journal_file() start the next commit */
+	uatomic_set_false(&jfile.in_commit);
+
+	pthread_exit(NULL);
+}
+
+/*
+ * Make the other journal file the active one, reset the write position and
+ * start a thread that commits the file just left behind.  While a previous
+ * commit is still marked in progress, poll every 100ms until it is not.
+ *
+ * FIXME: Try not sleep inside lock (the caller holds jfile_lock)
+ */
+static void switch_journal_file(void)
+{
+	pthread_t tid;
+	int prev = jfile.fd;
+	int rc;
+
+	while (!uatomic_set_true(&jfile.in_commit)) {
+		eprintf("journal file in committing, "
+			"you might need enlarge jfile size\n");
+		usleep(100000); /* Wait until committing is finished */
+	}
+
+	jfile.fd = (prev == jfile_fds[0]) ? jfile_fds[1] : jfile_fds[0];
+	jfile.commit_fd = prev;
+	jfile.pos = 0;
+
+	rc = pthread_create(&tid, NULL, journal_file_commit, NULL);
+	if (rc != 0)
+		panic("%s\n", strerror(rc));
+}
+
+/*
+ * Append one write operation to the journal.
+ *
+ * An entry consists of the descriptor, @size payload bytes from @buf padded
+ * with zeros up to a multiple of SECTOR_SIZE, and the end marker in the
+ * last four bytes.
+ *
+ * @oid:    object the write targets
+ * @buf:    payload, @size bytes long
+ * @size:   payload length in bytes
+ * @offset: offset of the write inside the object
+ * @create: stored in the descriptor; replay creates the object when set
+ *
+ * Returns SD_RES_SUCCESS, or the result of err_to_sderr() when the journal
+ * write is short.
+ */
+int journal_file_write(uint64_t oid, const char *buf, size_t size,
+		       off_t offset, bool create)
+{
+	struct journal_descriptor jd;
+	uint32_t marker = JOURNAL_END_MARKER;
+	int ret = SD_RES_SUCCESS, fd;
+	size_t rusize = round_up(size, SECTOR_SIZE),
+	       wsize = JOURNAL_META_SIZE + rusize;
+	ssize_t written;
+	off_t woff;
+	char *wbuffer, *p;
+
+	/* Zero the descriptor so that no stack garbage reaches the disk */
+	memset(&jd, 0, sizeof(jd));
+	jd.magic = JOURNAL_DESC_MAGIC;
+	jd.offset = offset;
+	jd.size = size;
+	jd.oid = oid;
+	jd.create = create;
+
+	pthread_spin_lock(&jfile_lock);
+	if (!jfile_enough_space(wsize))
+		switch_journal_file();
+	/*
+	 * Take the descriptor together with the offset: once the lock is
+	 * dropped another writer may switch jfile.fd to the other file.
+	 */
+	fd = jfile.fd;
+	woff = jfile.pos;
+	jfile.pos += wsize;
+	pthread_spin_unlock(&jfile_lock);
+
+	p = wbuffer = valloc(wsize);
+	if (!wbuffer)
+		panic("%m\n");
+	memcpy(p, &jd, JOURNAL_DESC_SIZE);
+	p += JOURNAL_DESC_SIZE;
+	/* @buf holds only @size bytes; the sector padding is zero-filled */
+	memcpy(p, buf, size);
+	memset(p + size, 0, rusize - size);
+	p += rusize;
+	memcpy(p, &marker, JOURNAL_MARKER_SIZE);
+
+	dprintf("oid %"PRIx64", pos %"PRIu64", wsize %zu\n",
+		oid, (uint64_t)woff, wsize);
+	/*
+	 * Concurrent writes with the same FD is okay because we don't have any
+	 * critical sections that need lock inside kernel write path, since we
+	 * a) bypass page cache, b) don't modify i_size of this inode.
+	 *
+	 * Feel free to correct me If I am wrong.
+	 */
+	written = xpwrite(fd, wbuffer, wsize, woff);
+	if (written < 0 || (size_t)written != wsize) {
+		int err = errno; /* logging below may overwrite errno */
+
+		eprintf("failed, written %zd, len %zu\n", written, wsize);
+		ret = err_to_sderr(oid, err);
+	}
+
+	free(wbuffer);
+	return ret;
+}
diff --git a/sheep/plain_store.c b/sheep/plain_store.c
index 908f761..834ecb0 100644
--- a/sheep/plain_store.c
+++ b/sheep/plain_store.c
@@ -24,10 +24,11 @@ static int get_open_flags(uint64_t oid, bool create, int fl)
{
int flags = O_DSYNC | O_RDWR;
- if (fl & SD_FLAG_CMD_CACHE && is_disk_cache_enabled())
+ if ((fl & SD_FLAG_CMD_CACHE && is_disk_cache_enabled()) ||
+ uatomic_is_true(&sys->use_journal))
flags &= ~O_DSYNC;
- if (is_data_obj(oid))
+ if (is_data_obj(oid) && !uatomic_is_true(&sys->use_journal))
flags |= O_DIRECT;
if (create)
@@ -108,7 +109,7 @@ bool default_exist(uint64_t oid)
return true;
}
-static int err_to_sderr(uint64_t oid, int err)
+int err_to_sderr(uint64_t oid, int err)
{
struct stat s;
@@ -143,6 +144,16 @@ int default_write(uint64_t oid, const struct siocb *iocb)
}
get_obj_path(oid, path);
+
+ if (uatomic_is_true(&sys->use_journal) &&
+ journal_file_write(oid, iocb->buf, iocb->length, iocb->offset, 0)
+ != SD_RES_SUCCESS) {
+ eprintf("turn off journaling\n");
+ uatomic_set_false(&sys->use_journal);
+ flags |= O_DSYNC | O_DIRECT;
+ sync();
+ }
+
fd = open(path, flags, def_fmode);
if (fd < 0)
return err_to_sderr(oid, errno);
@@ -305,6 +316,15 @@ int default_create_and_write(uint64_t oid, const struct siocb *iocb)
get_obj_path(oid, path);
get_tmp_obj_path(oid, tmp_path);
+ if (uatomic_is_true(&sys->use_journal) &&
+ journal_file_write(oid, iocb->buf, iocb->length, iocb->offset, 1)
+ != SD_RES_SUCCESS) {
+ eprintf("turn off journaling\n");
+ uatomic_set_false(&sys->use_journal);
+ flags |= O_DSYNC | O_DIRECT;
+ sync();
+ }
+
fd = open(tmp_path, flags, def_fmode);
if (fd < 0) {
if (errno == EEXIST) {
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 3ec2c4d..376cbad 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -46,7 +46,7 @@ static struct option const long_options[] = {
{"foreground", no_argument, NULL, 'f'},
{"gateway", no_argument, NULL, 'g'},
{"help", no_argument, NULL, 'h'},
- {"journal", no_argument, NULL, 'j'},
+ {"journal", required_argument, NULL, 'j'},
{"loglevel", required_argument, NULL, 'l'},
{"myaddr", required_argument, NULL, 'y'},
{"stdout", no_argument, NULL, 'o'},
@@ -59,7 +59,7 @@ static struct option const long_options[] = {
{NULL, 0, NULL, 0},
};
-static const char *short_options = "b:c:dDfghjl:op:P:s:uw:y:z:";
+static const char *short_options = "b:c:dDfghj:l:op:P:s:uw:y:z:";
static void usage(int status)
{
@@ -77,7 +77,7 @@ Options:\n\
-f, --foreground make the program run in the foreground\n\
-g, --gateway make the progam run as a gateway mode\n\
-h, --help display this help and exit\n\
- -j, --journal use jouranl to update vdi objects\n\
+ -j, --journal use jouranl file to log all the write operations\n\
-l, --loglevel specify the level of logging detail\n\
-o, --stdout log to stdout instead of shared logger\n\
-p, --port specify the TCP port on which to listen\n\
@@ -332,6 +332,7 @@ int main(int argc, char **argv)
unsigned char buf[sizeof(struct in6_addr)];
int ipv4 = 0;
int ipv6 = 0;
+ char journal_path[PATH_MAX];
signal(SIGPIPE, SIG_IGN);
@@ -411,7 +412,8 @@ int main(int argc, char **argv)
case 'c':
sys->cdrv = find_cdrv(optarg);
if (!sys->cdrv) {
- fprintf(stderr, "Invalid cluster driver '%s'\n", optarg);
+ fprintf(stderr, "Invalid cluster driver '%s'\n",
+ optarg);
fprintf(stderr, "Supported drivers:");
FOR_EACH_CLUSTER_DRIVER(cdrv) {
fprintf(stderr, " %s", cdrv->name);
@@ -426,7 +428,8 @@ int main(int argc, char **argv)
init_cache_type(optarg);
break;
case 'j':
- sys->use_journal = true;
+ uatomic_set_true(&sys->use_journal);
+ sprintf(journal_path, "%s", optarg);
break;
case 'b':
/* validate provided address using inet_pton */
@@ -471,7 +474,7 @@ int main(int argc, char **argv)
if (ret)
exit(1);
- ret = init_store(dir);
+ ret = init_store(dir, journal_path);
if (ret)
exit(1);
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 0480e26..f849400 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -122,7 +122,7 @@ struct cluster_info {
uint32_t object_cache_size;
bool object_cache_directio;
- bool use_journal;
+ uatomic_bool use_journal;
bool upgrade; /* upgrade data layout before starting service
* if necessary*/
};
@@ -183,6 +183,7 @@ int default_remove_object(uint64_t oid);
int default_purge_obj(void);
int for_each_object_in_wd(int (*func)(uint64_t oid, void *arg), bool cleanup,
void *arg);
+int err_to_sderr(uint64_t oid, int err);
extern struct list_head store_drivers;
#define add_store_driver(driver) \
@@ -218,7 +219,7 @@ static inline uint32_t sys_epoch(void)
int create_listen_port(char *bindaddr, int port, void *data);
-int init_store(const char *dir);
+int init_store(const char *dir, const char *journal);
int init_base_path(const char *dir);
int fill_vdi_copy_list(void *data);
@@ -417,4 +418,7 @@ static inline bool is_disk_cache_enabled(void)
return !!(sys->enabled_cache_type & CACHE_TYPE_DISK);
}
+/* journal_file.c */
+int journal_file_init(const char *p);
+int journal_file_write(uint64_t oid, const char *buf, size_t size, off_t, bool);
#endif
diff --git a/sheep/store.c b/sheep/store.c
index 653fff5..e212965 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -402,7 +402,7 @@ out:
return ret;
}
-int init_store(const char *d)
+int init_store(const char *d, const char *j)
{
int ret;
@@ -430,6 +430,13 @@ int init_store(const char *d)
if (ret)
return ret;
+ /* We should init journal file before backend init */
+ if (uatomic_is_true(&sys->use_journal)) {
+ ret = journal_file_init(j);
+ if (ret)
+ return ret;
+ }
+
if (!sys->gateway_only) {
ret = init_store_driver();
if (ret)
--
1.7.9.5
More information about the sheepdog
mailing list