[sheepdog] [PATCH v5 UPDATE] sheep: introduce journal file to boost IO performance
Liu Yuan
namei.unix at gmail.com
Wed Nov 14 03:12:16 CET 2012
From: Liu Yuan <tailai.ly at taobao.com>
The basic idea is very simple: use a dedicated device to log all the IO
operations in a sequential manner; we are then safe to change the backend IO
from O_DSYNC & O_DIRECT to plain O_RDWR (buffered IO), which benefits both
read and write performance a lot.
Usage:
$ sheep -j dir=/path/to/dir,size=256 # enable external journaling with the size 256M
$ sheep -j dir=/path/to/dir,size=256,skip # like above, but skip recovery at startup
$ sheep -j size=512 # enable internal journaling with the size 512M
Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
include/util.h | 1 +
sheep/Makefile.am | 2 +-
sheep/journal_file.c | 366 ++++++++++++++++++++++++++++++++++++++++++++++++++
sheep/ops.c | 43 ++----
sheep/plain_store.c | 28 +++-
sheep/sheep.c | 59 +++++++-
sheep/sheep_priv.h | 7 +-
sheep/store.c | 6 +-
8 files changed, 461 insertions(+), 51 deletions(-)
create mode 100644 sheep/journal_file.c
diff --git a/include/util.h b/include/util.h
index 5fb19c2..7422dbf 100644
--- a/include/util.h
+++ b/include/util.h
@@ -38,6 +38,7 @@
#endif
#define notrace __attribute__((no_instrument_function))
+#define __packed __attribute((packed))
#define uninitialized_var(x) (x = x)
diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index e7b4f53..0ae19de 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -27,7 +27,7 @@ sbin_PROGRAMS = sheep
sheep_SOURCES = sheep.c group.c request.c gateway.c store.c vdi.c work.c \
journal.c ops.c recovery.c cluster/local.c \
object_cache.c object_list_cache.c sockfd_cache.c \
- plain_store.c config.c migrate.c
+ plain_store.c config.c migrate.c journal_file.c
if BUILD_COROSYNC
sheep_SOURCES += cluster/corosync.c
diff --git a/sheep/journal_file.c b/sheep/journal_file.c
new file mode 100644
index 0000000..343fad1
--- /dev/null
+++ b/sheep/journal_file.c
@@ -0,0 +1,366 @@
+/*
+ * Copyright (C) 2012 Taobao Inc.
+ *
+ * Liu Yuan <namei.unix at gmail.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <errno.h>
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <stdint.h>
+#include <sys/mman.h>
+
+#include "sheep_priv.h"
+
+struct journal_file {
+ int fd;
+ off_t pos;
+ int commit_fd;
+ uatomic_bool in_commit;
+};
+
+struct journal_descriptor {
+ uint32_t magic;
+ uint32_t reserved;
+ uint64_t oid;
+ uint64_t offset;
+ uint64_t size;
+ uint8_t create;
+ uint8_t pad[475];
+} __packed;
+
+/* JOURNAL_DESC + JOURNAL_MARKER must be 512-byte aligned for DIO */
+#define JOURNAL_DESC_MAGIC 0xfee1900d
+#define JOURNAL_DESC_SIZE 508
+#define JOURNAL_MARKER_SIZE 4 /* Use marker to detect partial write */
+#define JOURNAL_META_SIZE (JOURNAL_DESC_SIZE + JOURNAL_MARKER_SIZE)
+
+#define JOURNAL_END_MARKER 0xdeadbeef
+
+static const char *jfile_name[2] = { "journal_file0", "journal_file1", };
+static int jfile_fds[2];
+static size_t jfile_size;
+
+static struct journal_file jfile;
+static pthread_spinlock_t jfile_lock;
+
+static int create_journal_file(const char *root, const char *name)
+{
+ int fd, flags = O_DSYNC | O_RDWR | O_TRUNC | O_CREAT | O_DIRECT;
+ char path[PATH_MAX];
+
+ sprintf(path, "%s/%s", root, name);
+ fd = open(path, flags, 0644);
+ if (fd < 0) {
+ eprintf("open %s %m\n", name);
+ return -1;
+ }
+ if (prealloc(fd, jfile_size) < 0) {
+ eprintf("prealloc %s %m\n", name);
+ return -1;
+ }
+
+ return fd;
+}
+
+/* We should have two valid FDs, otherwise something goes wrong */
+static int get_old_new_jfile(const char *p, int *old, int *new)
+{
+ int fd1, fd2;
+ int flags = O_RDONLY;
+ char path[PATH_MAX];
+ struct stat st1, st2;
+
+ sprintf(path, "%s/%s", p, jfile_name[0]);
+ fd1 = open(path, flags);
+ if (fd1 < 0) {
+ if (errno != EEXIST)
+ return 0;
+
+ eprintf("open1 %m\n");
+ return -1;
+ }
+ sprintf(path, "%s/%s", p, jfile_name[1]);
+ fd2 = open(path, flags);
+ if (fd2 < 0) {
+ eprintf("open2 %m\n");
+ close(fd1);
+ return -1;
+ }
+
+ if (fstat(fd1, &st1) < 0 || fstat(fd2, &st2) < 0) {
+ eprintf("stat %m\n");
+ goto out;
+ }
+
+ if (st1.st_mtime < st2.st_mtime) {
+ *old = fd1;
+ *new = fd2;
+ } else {
+ *old = fd2;
+ *new = fd1;
+ }
+
+ return 0;
+out:
+ close(fd1);
+ close(fd2);
+ return -1;
+}
+
+static bool journal_entry_full_write(struct journal_descriptor *jd)
+{
+ char *end = (char *)jd +
+ roundup(jd->size, SECTOR_SIZE) + JOURNAL_META_SIZE;
+ uint32_t marker = *(((uint32_t *)end) - 1);
+
+ if (marker != JOURNAL_END_MARKER)
+ return false;
+ return true;
+}
+
+static int replay_journal_entry(struct journal_descriptor *jd)
+{
+ char path[PATH_MAX];
+ ssize_t size;
+ int fd, flags = O_WRONLY, ret = 0;
+ void *buf;
+ char *p = (char *)jd;
+
+ dprintf("%"PRIx64", size %"PRIu64", off %"PRIu64", %d\n",
+ jd->oid, jd->size, jd->offset, jd->create);
+
+ if (jd->create)
+ flags |= O_CREAT;
+ sprintf(path, "%s%016" PRIx64, obj_path, jd->oid);
+ fd = open(path, flags, def_fmode);
+ if (fd < 0) {
+ eprintf("open %m\n");
+ return -1;
+ }
+
+ buf = xmalloc(jd->size);
+ p += JOURNAL_DESC_SIZE;
+ memcpy(buf, p, jd->size);
+ size = xpwrite(fd, buf, jd->size, jd->offset);
+ if (size != jd->size) {
+ eprintf("write %zd, size %zu, errno %m\n", size, jd->size);
+ ret = -1;
+ goto out;
+ }
+out:
+ close(fd);
+ return ret;
+}
+
+static int do_recover(int fd)
+{
+ struct journal_descriptor *jd;
+ void *map;
+ char *p, *end;
+ struct stat st;
+
+ if (fstat(fd, &st) < 0) {
+ eprintf("fstat %m\n");
+ return -1;
+ }
+
+ map = mmap(NULL, st.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
+ close(fd);
+ if (map == MAP_FAILED) {
+ eprintf("%m\n");
+ return -1;
+ }
+
+ end = (char *)map + st.st_size;
+ for (p = map; p < end;) {
+ jd = (struct journal_descriptor *)p;
+ if (jd->magic != JOURNAL_DESC_MAGIC) {
+ /* Empty area */
+ p += SECTOR_SIZE;
+ continue;
+ }
+ /* We skip partial write because it is not acked back to VM */
+ if (!journal_entry_full_write(jd))
+ goto skip;
+
+ if (replay_journal_entry(jd) < 0)
+ return -1;
+skip:
+ p += JOURNAL_META_SIZE + roundup(jd->size, SECTOR_SIZE);
+ }
+ munmap(map, st.st_size);
+ /* Do a final sync() to assure data is reached to the disk */
+ sync();
+ return 0;
+}
+
+/*
+ * We recover the journal files in order of wall time to handle the corner case
+ * where sheep crashes in the middle of journal committing. In most cases we
+ * actually only recover one jfile; the other would be empty. This process is
+ * fast with buffered IO and takes only several seconds at most.
+ */
+static int check_recover_journal_file(const char *p)
+{
+ int old = 0, new = 0;
+
+ if (get_old_new_jfile(p, &old, &new) < 0)
+ return -1;
+
+ /* No journal file found */
+ if (old == 0)
+ return 0;
+
+ if (do_recover(old) < 0)
+ return -1;
+ if (do_recover(new) < 0)
+ return -1;
+
+ return 0;
+}
+
+int journal_file_init(const char *path, size_t size, bool skip)
+{
+ int fd;
+
+ if (!skip && check_recover_journal_file(path) < 0)
+ return -1;
+
+ jfile_size = (size * 1024 * 1024) / 2;
+
+ fd = create_journal_file(path, jfile_name[0]);
+ if (fd < 0)
+ return -1;
+ jfile.fd = jfile_fds[0] = fd;
+
+ fd = create_journal_file(path, jfile_name[1]);
+ jfile_fds[1] = fd;
+
+ pthread_spin_init(&jfile_lock, PTHREAD_PROCESS_PRIVATE);
+ return 0;
+}
+
+static inline bool jfile_enough_space(size_t size)
+{
+ if (jfile.pos + size > jfile_size)
+ return false;
+ return true;
+}
+
+/*
+ * We rely on the kernel's page cache to cache data objects to 1) boost read
+ * performance and 2) simplify the read path, so that data committing is simply
+ * a sync() operation. We do it in a dedicated thread to avoid blocking the
+ * writer, switching back and forth between the two journal files.
+ */
+static void *commit_data(void *ignored)
+{
+ int err;
+
+ /* Tell runtime to release resources after termination */
+ err = pthread_detach(pthread_self());
+ if (err)
+ panic("%s\n", strerror(err));
+
+ sync();
+ if (ftruncate(jfile.commit_fd, 0) < 0)
+ panic("truncate %m\n");
+ if (prealloc(jfile.commit_fd, jfile_size) < 0)
+ panic("prealloc\n");
+
+ uatomic_set_false(&jfile.in_commit);
+
+ pthread_exit(NULL);
+}
+
+/* FIXME: Try not to sleep inside the lock */
+static void switch_journal_file(void)
+{
+ int old = jfile.fd, err;
+ pthread_t thread;
+
+retry:
+ if (!uatomic_set_true(&jfile.in_commit)) {
+ eprintf("journal file in committing, "
+ "you might need enlarge jfile size\n");
+ usleep(100000); /* Wait until committing is finished */
+ goto retry;
+ }
+
+ if (old == jfile_fds[0])
+ jfile.fd = jfile_fds[1];
+ else
+ jfile.fd = jfile_fds[0];
+ jfile.commit_fd = old;
+ jfile.pos = 0;
+
+ err = pthread_create(&thread, NULL, commit_data, NULL);
+ if (err)
+ panic("%s\n", strerror(err));
+}
+
+int journal_file_write(uint64_t oid, const char *buf, size_t size,
+ off_t offset, bool create)
+{
+ uint32_t marker = JOURNAL_END_MARKER;
+ int ret = SD_RES_SUCCESS;
+ ssize_t written, rusize = roundup(size, SECTOR_SIZE),
+ wsize = JOURNAL_META_SIZE + rusize;
+ off_t woff;
+ char *wbuffer, *p;
+ struct journal_descriptor jd = {
+ .magic = JOURNAL_DESC_MAGIC,
+ .offset = offset,
+ .size = size,
+ .oid = oid,
+ .create = create,
+ };
+
+ pthread_spin_lock(&jfile_lock);
+ if (!jfile_enough_space(wsize))
+ switch_journal_file();
+ woff = jfile.pos;
+ jfile.pos += wsize;
+ pthread_spin_unlock(&jfile_lock);
+
+ p = wbuffer = valloc(wsize);
+ if (!wbuffer)
+ panic("%m\n");
+ memcpy(p, &jd, JOURNAL_DESC_SIZE);
+ p += JOURNAL_DESC_SIZE;
+ memcpy(p, buf, size);
+ p += size;
+ if (size < rusize) {
+ memset(p, 0, rusize - size);
+ p += rusize - size;
+ }
+ memcpy(p, &marker, JOURNAL_MARKER_SIZE);
+
+ dprintf("oid %lx, pos %zu, wsize %zu\n", oid, jfile.pos, wsize);
+ /*
+ * Concurrent writes with the same FD is okay because we don't have any
+ * critical sections that need lock inside kernel write path, since we
+ * a) bypass page cache, b) don't modify i_size of this inode.
+ *
+ * Feel free to correct me If I am wrong.
+ */
+ written = xpwrite(jfile.fd, wbuffer, wsize, woff);
+ if (written != wsize) {
+ eprintf("failed, written %zd, len %zu\n", written, wsize);
+ ret = err_to_sderr(oid, errno);
+ goto out;
+ }
+out:
+ free(wbuffer);
+ return ret;
+}
diff --git a/sheep/ops.c b/sheep/ops.c
index e196dae..f99dff6 100644
--- a/sheep/ops.c
+++ b/sheep/ops.c
@@ -801,46 +801,19 @@ static int do_create_and_write_obj(struct siocb *iocb, struct sd_req *hdr,
return sd_store->create_and_write(hdr->obj.oid, iocb);
}
-static int do_write_obj(struct siocb *iocb, struct sd_req *hdr, uint32_t epoch,
- void *data)
-{
- uint64_t oid = hdr->obj.oid;
- int ret = SD_RES_SUCCESS;
- void *jd = NULL;
-
- iocb->buf = data;
- iocb->length = hdr->data_length;
- iocb->offset = hdr->obj.offset;
-
- if (is_vdi_obj(oid) && sys->use_journal) {
- struct strbuf buf = STRBUF_INIT;
-
- strbuf_addf(&buf, "%s%016" PRIx64, obj_path, oid);
- jd = jrnl_begin(data, hdr->data_length, hdr->obj.offset,
- buf.buf, jrnl_path);
- if (!jd) {
- strbuf_release(&buf);
- return SD_RES_EIO;
- }
- ret = sd_store->write(oid, iocb);
- jrnl_end(jd);
- strbuf_release(&buf);
- } else
- ret = sd_store->write(oid, iocb);
-
- return ret;
-}
-
int peer_write_obj(struct request *req)
{
struct sd_req *hdr = &req->rq;
- uint32_t epoch = hdr->epoch;
- struct siocb iocb;
+ struct siocb iocb = { };
+ uint64_t oid = hdr->obj.oid;
- memset(&iocb, 0, sizeof(iocb));
- iocb.epoch = epoch;
+ iocb.epoch = hdr->epoch;
iocb.flags = hdr->flags;
- return do_write_obj(&iocb, hdr, epoch, req->data);
+ iocb.buf = req->data;
+ iocb.length = hdr->data_length;
+ iocb.offset = hdr->obj.offset;
+
+ return sd_store->write(oid, &iocb);
}
int peer_create_and_write_obj(struct request *req)
diff --git a/sheep/plain_store.c b/sheep/plain_store.c
index 908f761..9acae3d 100644
--- a/sheep/plain_store.c
+++ b/sheep/plain_store.c
@@ -24,12 +24,10 @@ static int get_open_flags(uint64_t oid, bool create, int fl)
{
int flags = O_DSYNC | O_RDWR;
- if (fl & SD_FLAG_CMD_CACHE && is_disk_cache_enabled())
+ if ((fl & SD_FLAG_CMD_CACHE && is_disk_cache_enabled()) ||
+ uatomic_is_true(&sys->use_journal))
flags &= ~O_DSYNC;
- if (is_data_obj(oid))
- flags |= O_DIRECT;
-
if (create)
flags |= O_CREAT | O_EXCL;
@@ -108,7 +106,7 @@ bool default_exist(uint64_t oid)
return true;
}
-static int err_to_sderr(uint64_t oid, int err)
+int err_to_sderr(uint64_t oid, int err)
{
struct stat s;
@@ -143,6 +141,17 @@ int default_write(uint64_t oid, const struct siocb *iocb)
}
get_obj_path(oid, path);
+
+ if (uatomic_is_true(&sys->use_journal) &&
+ journal_file_write(oid, iocb->buf, iocb->length, iocb->offset,
+ false)
+ != SD_RES_SUCCESS) {
+ eprintf("turn off journaling\n");
+ uatomic_set_false(&sys->use_journal);
+ flags |= O_DSYNC;
+ sync();
+ }
+
fd = open(path, flags, def_fmode);
if (fd < 0)
return err_to_sderr(oid, errno);
@@ -305,6 +314,15 @@ int default_create_and_write(uint64_t oid, const struct siocb *iocb)
get_obj_path(oid, path);
get_tmp_obj_path(oid, tmp_path);
+ if (uatomic_is_true(&sys->use_journal) &&
+ journal_file_write(oid, iocb->buf, iocb->length, iocb->offset, true)
+ != SD_RES_SUCCESS) {
+ eprintf("turn off journaling\n");
+ uatomic_set_false(&sys->use_journal);
+ flags |= O_DSYNC;
+ sync();
+ }
+
fd = open(tmp_path, flags, def_fmode);
if (fd < 0) {
if (errno == EEXIST) {
diff --git a/sheep/sheep.c b/sheep/sheep.c
index af8da4f..ab15526 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -46,7 +46,7 @@ static struct option const long_options[] = {
{"foreground", no_argument, NULL, 'f'},
{"gateway", no_argument, NULL, 'g'},
{"help", no_argument, NULL, 'h'},
- {"journal", no_argument, NULL, 'j'},
+ {"journal", required_argument, NULL, 'j'},
{"loglevel", required_argument, NULL, 'l'},
{"myaddr", required_argument, NULL, 'y'},
{"stdout", no_argument, NULL, 'o'},
@@ -59,7 +59,7 @@ static struct option const long_options[] = {
{NULL, 0, NULL, 0},
};
-static const char *short_options = "b:c:dDfghjl:op:P:s:uw:y:z:";
+static const char *short_options = "b:c:dDfghj:l:op:P:s:uw:y:z:";
static void usage(int status)
{
@@ -77,7 +77,7 @@ Options:\n\
-f, --foreground make the program run in the foreground\n\
-g, --gateway make the progam run as a gateway mode\n\
-h, --help display this help and exit\n\
- -j, --journal use jouranl to update vdi objects\n\
+ -j, --journal use jouranl file to log all the write operations\n\
-l, --loglevel specify the level of logging detail\n\
-o, --stdout log to stdout instead of shared logger\n\
-p, --port specify the TCP port on which to listen\n\
@@ -311,6 +311,36 @@ static void init_cache_type(char *arg)
}
}
+static char jpath[PATH_MAX];
+static bool jskip;
+static ssize_t jsize;
+#define MIN_JOURNAL_SIZE (64) /* 64M */
+
+static void init_journal_arg(char *arg)
+{
+ const char *d = "dir=", *sz = "size=", *sp = "skip";
+ int dl = strlen(d), szl = strlen(sz), spl = strlen(sp);
+
+ if (!strncmp(d, arg, dl)) {
+ arg += dl;
+ sprintf(jpath, "%s", arg);
+ } else if (!strncmp(sz, arg, szl)) {
+ arg += szl;
+ jsize = strtoll(arg, NULL, 10);
+ if (jsize < MIN_JOURNAL_SIZE || jsize == LLONG_MAX) {
+ fprintf(stderr, "invalid size %s, "
+ "must be bigger than %u(M)\n", arg,
+ MIN_JOURNAL_SIZE);
+ exit(1);
+ }
+ } else if (!strncmp(sp, arg, spl)) {
+ jskip = true;
+ } else {
+ fprintf(stderr, "invalid paramters %s\n", arg);
+ exit(1);
+ }
+}
+
int main(int argc, char **argv)
{
int ch, longindex;
@@ -426,7 +456,13 @@ int main(int argc, char **argv)
init_cache_type(optarg);
break;
case 'j':
- sys->use_journal = true;
+ uatomic_set_true(&sys->use_journal);
+ parse_arg(optarg, ",", init_journal_arg);
+ if (!jsize) {
+ fprintf(stderr,
+ "you must specify size for journal\n");
+ exit(1);
+ }
break;
case 'b':
/* validate provided address using inet_pton */
@@ -471,6 +507,21 @@ int main(int argc, char **argv)
if (ret)
exit(1);
+ ret = init_obj_path(dir);
+ if (ret)
+ exit(1);
+
+ /* We should init journal file before backend init */
+ if (uatomic_is_true(&sys->use_journal)) {
+ if (!strlen(jpath))
+ /* internal journal */
+ memcpy(jpath, dir, strlen(dir));
+ dprintf("%s, %zu, %d\n", jpath, jsize, jskip);
+ ret = journal_file_init(jpath, jsize, jskip);
+ if (ret)
+ exit(1);
+ }
+
ret = init_store(dir);
if (ret)
exit(1);
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 9bec91e..6deaf64 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -122,7 +122,7 @@ struct cluster_info {
uint32_t object_cache_size;
bool object_cache_directio;
- bool use_journal;
+ uatomic_bool use_journal;
bool upgrade; /* upgrade data layout before starting service
* if necessary*/
};
@@ -183,6 +183,7 @@ int default_remove_object(uint64_t oid);
int default_purge_obj(void);
int for_each_object_in_wd(int (*func)(uint64_t oid, void *arg), bool cleanup,
void *arg);
+int err_to_sderr(uint64_t oid, int err);
extern struct list_head store_drivers;
#define add_store_driver(driver) \
@@ -221,6 +222,7 @@ int init_unix_domain_socket(const char *dir);
int init_store(const char *dir);
int init_base_path(const char *dir);
+int init_obj_path(const char *base_path);
int fill_vdi_copy_list(void *data);
int get_vdi_copy_number(uint32_t vid);
@@ -418,4 +420,7 @@ static inline bool is_disk_cache_enabled(void)
return !!(sys->enabled_cache_type & CACHE_TYPE_DISK);
}
+/* journal_file.c */
+int journal_file_init(const char *path, size_t size, bool skip);
+int journal_file_write(uint64_t oid, const char *buf, size_t size, off_t, bool);
#endif
diff --git a/sheep/store.c b/sheep/store.c
index 653fff5..7dae5de 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -247,7 +247,7 @@ int init_base_path(const char *d)
#define OBJ_PATH "/obj/"
-static int init_obj_path(const char *base_path)
+int init_obj_path(const char *base_path)
{
int len;
@@ -406,10 +406,6 @@ int init_store(const char *d)
{
int ret;
- ret = init_obj_path(d);
- if (ret)
- return ret;
-
ret = init_epoch_path(d);
if (ret)
return ret;
--
1.7.9.5
More information about the sheepdog
mailing list