From: Liu Yuan <tailai.ly at taobao.com>

The basic idea is very simple: use a dedicated device to log all the IO
operations sequentially, so that we can safely change the backend IO
operations from O_DSYNC & O_DIRECT to O_RDWR (buffered IO), which greatly
benefits both read and write performance.

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 include/util.h       |    5 +
 sheep/Makefile.am    |    2 +-
 sheep/journal_file.c |  469 ++++++++++++++++++++++++++++++++++++++++++++++++++
 sheep/plain_store.c  |   26 +++-
 sheep/sheep.c        |   15 +-
 sheep/sheep_priv.h   |    8 +-
 sheep/store.c        |    9 +-
 7 files changed, 521 insertions(+), 13 deletions(-)
 create mode 100644 sheep/journal_file.c

diff --git a/include/util.h b/include/util.h
index 5fb19c2..a0d8186 100644
--- a/include/util.h
+++ b/include/util.h
@@ -38,6 +38,7 @@
 #endif
 
 #define notrace __attribute__((no_instrument_function))
+#define __packed __attribute__((packed))
 
 #define uninitialized_var(x) (x = x)
 
@@ -68,6 +69,10 @@ static inline void *zalloc(size_t size)
 	return calloc(1, size);
 }
 
+#define __round_mask(x, y) ((__typeof__(x))((y)-1))
+#define round_up(x, y) ((((x)-1) | __round_mask(x, y))+1)
+#define round_down(x, y) ((x) & ~__round_mask(x, y))
+
 typedef void (*try_to_free_t)(size_t);
 extern try_to_free_t set_try_to_free_routine(try_to_free_t);
 
diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index e7b4f53..0ae19de 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -27,7 +27,7 @@ sbin_PROGRAMS = sheep
 sheep_SOURCES		= sheep.c group.c request.c gateway.c store.c vdi.c work.c \
 			  journal.c ops.c recovery.c cluster/local.c \
 			  object_cache.c object_list_cache.c sockfd_cache.c \
-			  plain_store.c config.c migrate.c
+			  plain_store.c config.c migrate.c journal_file.c
 
 if BUILD_COROSYNC
 sheep_SOURCES		+= cluster/corosync.c
diff --git a/sheep/journal_file.c b/sheep/journal_file.c
new file mode 100644
index 0000000..110a034
--- /dev/null
+++ b/sheep/journal_file.c
@@ -0,0 +1,469 @@
+/*
+ * Copyright (C) 2012 Taobao Inc.
+ *
+ * Liu Yuan <namei.unix at gmail.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <errno.h>
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <stdint.h>
+#include <sys/mman.h>
+
+#include "sheep_priv.h"
+
+struct journal_file {
+	int fd;
+	off_t pos;
+	int commit_fd;
+	uatomic_bool in_commit;
+};
+
+/*
+ * On-disk layout of one journal entry, which always starts on a SECTOR_SIZE
+ * boundary of the journal file:
+ *
+ *   | descriptor | data, zero-padded to SECTOR_SIZE | end marker |
+ *
+ * The descriptor (JOURNAL_DESC_SIZE) plus the end marker
+ * (JOURNAL_MARKER_SIZE) fill exactly JOURNAL_META_SIZE bytes, so every
+ * entry is a whole number of sectors as O_DIRECT requires.
+ */
+struct journal_descriptor {
+	uint32_t magic;
+	uint64_t oid;
+	uint64_t offset;
+	uint64_t size;
+	uint8_t create;
+	uint8_t pad[479];
+} __packed;
+
+/* JOURNAL_DESC + JOURNAL_MARKER must be 512 aligned for DIO */
+#define JOURNAL_DESC_MAGIC 0xfee1900d
+#define JOURNAL_DESC_SIZE 508
+#define JOURNAL_MARKER_SIZE 4 /* Use marker to detect partial write */
+#define JOURNAL_META_SIZE SECTOR_SIZE
+
+#define JOURNAL_FILE_SIZE (1024*1024*256) /* 256M */
+#define JOURNAL_END_MARKER 0xdeadbeef
+
+static const char *jfile_name[2] = { "journal_file0", "journal_file1", };
+static int jfile_fds[2];
+
+static struct journal_file jfile;
+static pthread_spinlock_t jfile_lock;
+
+/* Overwrite the whole journal file with zeros. Returns 0 or -1. */
+static int zero_out_jfile(int fd)
+{
+	char *buf;
+	ssize_t wlen;
+	int ret = 0;
+
+	buf = valloc(JOURNAL_FILE_SIZE);
+	if (!buf) {
+		eprintf("failed to allocate buffer, %m\n");
+		return -1;
+	}
+	memset(buf, 0, JOURNAL_FILE_SIZE);
+	wlen = xpwrite(fd, buf, JOURNAL_FILE_SIZE, 0);
+	if (wlen != JOURNAL_FILE_SIZE) {
+		eprintf("WARN: failed, %m\n");
+		ret = -1;
+	}
+
+	free(buf);
+	return ret;
+}
+
+/* Create (or truncate) root/name, preallocate and zero it; returns the fd */
+static int create_journal_file(const char *root, const char *name)
+{
+	int fd, flags = O_DSYNC | O_RDWR | O_TRUNC | O_CREAT | O_DIRECT;
+	char path[PATH_MAX];
+
+	snprintf(path, sizeof(path), "%s/%s", root, name);
+	fd = open(path, flags, 0644);
+	if (fd < 0) {
+		eprintf("open %s %m\n", name);
+		return -1;
+	}
+	if (prealloc(fd, JOURNAL_FILE_SIZE) < 0) {
+		eprintf("prealloc %s %m\n", name);
+		close(fd);
+		return -1;
+	}
+
+	/* Turn unwritten extents of FS into written ones for faster write */
+	if (zero_out_jfile(fd) < 0) {
+		close(fd);
+		return -1;
+	}
+	return fd;
+}
+
+/*
+ * Open both journal files read-only and tell which one was modified last.
+ *
+ * Returns 0 without touching *old and *new when no journal file exists yet,
+ * 0 with both set when the two files were opened, and -1 on any other error.
+ */
+static int get_old_new_jfile(const char *p, int *old, int *new)
+{
+	int fd1, fd2;
+	int flags = O_RDONLY;
+	char path[PATH_MAX];
+	struct stat st1, st2;
+
+	snprintf(path, sizeof(path), "%s/%s", p, jfile_name[0]);
+	fd1 = open(path, flags);
+	if (fd1 < 0) {
+		/* open() reports a missing file as ENOENT, never EEXIST */
+		if (errno == ENOENT)
+			return 0;
+
+		eprintf("open1 %m\n");
+		return -1;
+	}
+	snprintf(path, sizeof(path), "%s/%s", p, jfile_name[1]);
+	fd2 = open(path, flags);
+	if (fd2 < 0) {
+		eprintf("open2 %m\n");
+		close(fd1);
+		return -1;
+	}
+
+	if (fstat(fd1, &st1) < 0 || fstat(fd2, &st2) < 0) {
+		eprintf("stat %m\n");
+		goto out;
+	}
+
+	if (st1.st_mtime < st2.st_mtime) {
+		*old = fd1;
+		*new = fd2;
+	} else {
+		*old = fd2;
+		*new = fd1;
+	}
+
+	return 0;
+out:
+	close(fd1);
+	close(fd2);
+	return -1;
+}
+
+/* Check the end marker to tell whether the whole entry reached the disk */
+static bool journal_entry_full_write(struct journal_descriptor *jd)
+{
+	char *end = (char *)jd +
+		round_up(jd->size, SECTOR_SIZE) + JOURNAL_META_SIZE;
+	uint32_t marker;
+
+	/* memcpy() instead of a pointer cast avoids type-punning the buffer */
+	memcpy(&marker, end - JOURNAL_MARKER_SIZE, sizeof(marker));
+	return marker == JOURNAL_END_MARKER;
+}
+
+/* Write the payload of a journal entry back into its object file */
+static int replay_journal_entry(struct journal_descriptor *jd)
+{
+	char path[PATH_MAX];
+	ssize_t size;
+	int fd, flags = O_WRONLY, ret = 0;
+	char *p = (char *)jd + JOURNAL_DESC_SIZE;
+
+	dprintf("%"PRIx64", size %"PRIu64", off %"PRIu64", %d\n",
+		jd->oid, jd->size, jd->offset, jd->create);
+
+	if (jd->create)
+		flags |= O_CREAT;
+	snprintf(path, sizeof(path), "%s%016" PRIx64, obj_path, jd->oid);
+	fd = open(path, flags, def_fmode);
+	if (fd < 0) {
+		eprintf("open %m\n");
+		return -1;
+	}
+
+	/* Write straight from the mapping; the payload needs no bounce copy */
+	size = xpwrite(fd, p, jd->size, jd->offset);
+	if (size < 0 || (uint64_t)size != jd->size) {
+		eprintf("write %zd, size %" PRIu64 ", errno %m\n", size,
+			jd->size);
+		ret = -1;
+	}
+
+	close(fd);
+	return ret;
+}
+
+static bool journal_descriptor_valid(struct journal_descriptor *jd)
+{
+	if (jd->magic != JOURNAL_DESC_MAGIC)
+		return false;
+	if (jd->oid == 0)
+		return false;
+	if (jd->size > SD_INODE_SIZE)
+		return false;
+	if (jd->create > 1)
+		return false;
+	return true;
+}
+
+/*
+ * Replay every fully written entry of journal file 'fd' into the object
+ * store. 'fd' is always closed before returning.
+ */
+static int do_recover(int fd)
+{
+	struct journal_descriptor *jd;
+	struct stat st;
+	size_t map_size;
+	void *map;
+	char *p, *end;
+	int ret = 0;
+
+	if (fstat(fd, &st) < 0) {
+		eprintf("%m\n");
+		close(fd);
+		return -1;
+	}
+	/*
+	 * A crash while the jfile was being created can leave it shorter than
+	 * JOURNAL_FILE_SIZE, and touching a mapping past EOF raises SIGBUS.
+	 */
+	map_size = round_down((size_t)st.st_size, SECTOR_SIZE);
+	if (map_size == 0) {
+		close(fd);
+		return 0;
+	}
+
+	map = mmap(NULL, map_size, PROT_READ, MAP_PRIVATE, fd, 0);
+	close(fd);
+	if (map == MAP_FAILED) {
+		eprintf("%m\n");
+		return -1;
+	}
+
+	end = (char *)map + map_size;
+	for (p = map; p < end;) {
+		size_t entry_size;
+
+		jd = (struct journal_descriptor *)p;
+		if (!journal_descriptor_valid(jd)) {
+			/* Empty area */
+			p += SECTOR_SIZE;
+			continue;
+		}
+
+		entry_size = JOURNAL_META_SIZE + round_up(jd->size, SECTOR_SIZE);
+		/* An entry running past the end of the file can only be torn */
+		if (entry_size > (size_t)(end - p))
+			break;
+
+		/* We skip partial write because it is not acked back to VM */
+		if (journal_entry_full_write(jd) &&
+		    replay_journal_entry(jd) < 0) {
+			ret = -1;
+			break;
+		}
+		p += entry_size;
+	}
+	munmap(map, map_size);
+
+	/* Do a final sync() to assure data is reached to the disk */
+	if (ret == 0)
+		sync();
+	return ret;
+}
+
+/*
+ * We recover the journal files in order of wall time in the corner case that
+ * sheep crashes while in the middle of journal committing. In most cases
+ * we actually only recover one jfile, the other would be empty. With
+ * buffered IO this process takes several seconds at most.
+ */
+static int check_recover_journal_file(const char *p)
+{
+	int old = -1, new = -1;
+
+	if (get_old_new_jfile(p, &old, &new) < 0)
+		return -1;
+
+	/* No journal file found */
+	if (old < 0)
+		return 0;
+
+	if (do_recover(old) < 0) {
+		close(new);
+		return -1;
+	}
+	if (do_recover(new) < 0)
+		return -1;
+
+	return 0;
+}
+
+/*
+ * Replay any journal left over from a previous run, then create the two
+ * journal files under 'path'. Returns 0 on success, -1 on failure.
+ */
+int journal_file_init(const char *path)
+{
+	int fd;
+
+	if (check_recover_journal_file(path) < 0)
+		return -1;
+
+	fd = create_journal_file(path, jfile_name[0]);
+	if (fd < 0)
+		return -1;
+	jfile.fd = jfile_fds[0] = fd;
+
+	fd = create_journal_file(path, jfile_name[1]);
+	if (fd < 0) {
+		close(jfile_fds[0]);
+		return -1;
+	}
+	jfile_fds[1] = fd;
+
+	pthread_spin_init(&jfile_lock, PTHREAD_PROCESS_PRIVATE);
+	return 0;
+}
+
+static bool jfile_enough_space(size_t size)
+{
+	if (jfile.pos + size > JOURNAL_FILE_SIZE)
+		return false;
+	return true;
+}
+
+/*
+ * We rely on the kernel's page cache to cache data objects to 1) boost read
+ * performance 2) simplify read path so that journal file committing is
+ * simply a sync() operation. We do it in a dedicated thread to avoid
+ * blocking the writer while switching back and forth between two journal
+ * files.
+ *
+ * FIXME: writers that reserved space in the committed jfile right before the
+ * switch may not have finished their journal or backend writes yet, so this
+ * sync() + zero-out can race with them.
+ */
+static void *journal_file_commit(void *ignored)
+{
+	int err;
+
+	/* Tell runtime to release resources after termination */
+	err = pthread_detach(pthread_self());
+	if (err)
+		panic("%s\n", strerror(err));
+
+	sync();
+	if (zero_out_jfile(jfile.commit_fd) < 0)
+		panic("failed to zero journal file\n");
+	uatomic_set_false(&jfile.in_commit);
+
+	pthread_exit(NULL);
+}
+
+/* FIXME: Try not to sleep inside the spinlock */
+static void switch_journal_file(void)
+{
+	int old = jfile.fd, err;
+	pthread_t thread;
+
+retry:
+	if (!uatomic_set_true(&jfile.in_commit)) {
+		eprintf("journal file in committing, "
+			"you might need enlarge jfile size\n");
+		usleep(100000); /* Wait until committing is finished */
+		goto retry;
+	}
+
+	if (old == jfile_fds[0])
+		jfile.fd = jfile_fds[1];
+	else
+		jfile.fd = jfile_fds[0];
+	jfile.commit_fd = old;
+	jfile.pos = 0;
+
+	err = pthread_create(&thread, NULL, journal_file_commit, NULL);
+	if (err)
+		panic("%s\n", strerror(err));
+}
+
+/*
+ * Log one object write to the current journal file. Returns SD_RES_SUCCESS
+ * once the entry is on disk, or an SD_RES_* error otherwise.
+ */
+int journal_file_write(uint64_t oid, const char *buf, size_t size,
+		       off_t offset, bool create)
+{
+	struct journal_descriptor jd;
+	uint32_t marker = JOURNAL_END_MARKER;
+	int fd, err, ret = SD_RES_SUCCESS;
+	ssize_t written, rusize = round_up(size, SECTOR_SIZE),
+		wsize = JOURNAL_META_SIZE + rusize;
+	off_t woff;
+	char *wbuffer, *p;
+
+	/* Zero the padding too so that no stack garbage reaches the disk */
+	memset(&jd, 0, sizeof(jd));
+	jd.magic = JOURNAL_DESC_MAGIC;
+	jd.offset = offset;
+	jd.size = size;
+	jd.oid = oid;
+	jd.create = create;
+
+	/*
+	 * Sample the fd together with the offset: once the lock is dropped,
+	 * another writer may switch jfile.fd to the other journal file.
+	 */
+	pthread_spin_lock(&jfile_lock);
+	if (!jfile_enough_space(wsize))
+		switch_journal_file();
+	fd = jfile.fd;
+	woff = jfile.pos;
+	jfile.pos += wsize;
+	pthread_spin_unlock(&jfile_lock);
+
+	p = wbuffer = valloc(wsize);
+	if (!wbuffer)
+		panic("%m\n");
+	memcpy(p, &jd, JOURNAL_DESC_SIZE);
+	p += JOURNAL_DESC_SIZE;
+	/* 'buf' only holds 'size' bytes; zero-fill up to the sector boundary */
+	memcpy(p, buf, size);
+	memset(p + size, 0, rusize - size);
+	p += rusize;
+	memcpy(p, &marker, JOURNAL_MARKER_SIZE);
+
+	dprintf("oid %" PRIx64 ", pos %jd, wsize %zd\n", oid, (intmax_t)woff,
+		wsize);
+	/*
+	 * Concurrent writes with the same FD is okay because we don't have any
+	 * critical sections that need lock inside kernel write path, since we
+	 * a) bypass page cache, b) don't modify i_size of this inode.
+	 *
+	 * Feel free to correct me If I am wrong.
+	 */
+	written = xpwrite(fd, wbuffer, wsize, woff);
+	if (written != wsize) {
+		/* Save errno before eprintf() gets a chance to clobber it */
+		err = errno;
+		eprintf("failed, written %zd, len %zd\n", written, wsize);
+		ret = err_to_sderr(oid, err);
+	}
+
+	free(wbuffer);
+	return ret;
+}
diff --git a/sheep/plain_store.c b/sheep/plain_store.c
index 908f761..834ecb0 100644
--- a/sheep/plain_store.c
+++ b/sheep/plain_store.c
@@ -24,10 +24,11 @@ static int get_open_flags(uint64_t oid, bool create, int fl)
 {
 	int flags = O_DSYNC | O_RDWR;
 
-	if (fl & SD_FLAG_CMD_CACHE && is_disk_cache_enabled())
+	if ((fl & SD_FLAG_CMD_CACHE && is_disk_cache_enabled()) ||
+	    uatomic_is_true(&sys->use_journal))
 		flags &= ~O_DSYNC;
 
-	if (is_data_obj(oid))
+	if (is_data_obj(oid) && !uatomic_is_true(&sys->use_journal))
 		flags |= O_DIRECT;
 
 	if (create)
@@ -108,7 +109,7 @@ bool default_exist(uint64_t oid)
 	return true;
 }
 
-static int err_to_sderr(uint64_t oid, int err)
+int err_to_sderr(uint64_t oid, int err)
 {
 	struct stat s;
 
@@ -143,6 +144,16 @@ int default_write(uint64_t oid, const struct siocb *iocb)
 	}
 
 	get_obj_path(oid, path);
+
+	if (uatomic_is_true(&sys->use_journal) &&
+	    journal_file_write(oid, iocb->buf, iocb->length, iocb->offset, 0)
+	    != SD_RES_SUCCESS) {
+		eprintf("turn off journaling\n");
+		uatomic_set_false(&sys->use_journal);
+		flags |= O_DSYNC | (is_data_obj(oid) ? O_DIRECT : 0);
+		sync();
+	}
+
 	fd = open(path, flags, def_fmode);
 	if (fd < 0)
 		return err_to_sderr(oid, errno);
@@ -305,6 +316,15 @@ int default_create_and_write(uint64_t oid, const struct siocb *iocb)
 	get_obj_path(oid, path);
 	get_tmp_obj_path(oid, tmp_path);
 
+	if (uatomic_is_true(&sys->use_journal) &&
+	    journal_file_write(oid, iocb->buf, iocb->length, iocb->offset, 1)
+	    != SD_RES_SUCCESS) {
+		eprintf("turn off journaling\n");
+		uatomic_set_false(&sys->use_journal);
+		flags |= O_DSYNC | (is_data_obj(oid) ? O_DIRECT : 0);
+		sync();
+	}
+
 	fd = open(tmp_path, flags, def_fmode);
 	if (fd < 0) {
 		if (errno == EEXIST) {
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 3ec2c4d..376cbad 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -46,7 +46,7 @@ static struct option const long_options[] = {
 	{"foreground", no_argument, NULL, 'f'},
 	{"gateway", no_argument, NULL, 'g'},
 	{"help", no_argument, NULL, 'h'},
-	{"journal", no_argument, NULL, 'j'},
+	{"journal", required_argument, NULL, 'j'},
 	{"loglevel", required_argument, NULL, 'l'},
 	{"myaddr", required_argument, NULL, 'y'},
 	{"stdout", no_argument, NULL, 'o'},
@@ -59,7 +59,7 @@ static struct option const long_options[] = {
 	{NULL, 0, NULL, 0},
 };
 
-static const char *short_options = "b:c:dDfghjl:op:P:s:uw:y:z:";
+static const char *short_options = "b:c:dDfghj:l:op:P:s:uw:y:z:";
 
 static void usage(int status)
 {
@@ -77,7 +77,7 @@ Options:\n\
   -f, --foreground        make the program run in the foreground\n\
   -g, --gateway           make the progam run as a gateway mode\n\
   -h, --help              display this help and exit\n\
-  -j, --journal           use jouranl to update vdi objects\n\
+  -j, --journal           use journal file to log all the write operations\n\
   -l, --loglevel          specify the level of logging detail\n\
   -o, --stdout            log to stdout instead of shared logger\n\
   -p, --port              specify the TCP port on which to listen\n\
@@ -332,6 +332,7 @@ int main(int argc, char **argv)
 	unsigned char buf[sizeof(struct in6_addr)];
 	int ipv4 = 0;
 	int ipv6 = 0;
+	char journal_path[PATH_MAX] = "";
 
 	signal(SIGPIPE, SIG_IGN);
 
@@ -411,7 +412,8 @@ int main(int argc, char **argv)
 		case 'c':
 			sys->cdrv = find_cdrv(optarg);
 			if (!sys->cdrv) {
-				fprintf(stderr, "Invalid cluster driver '%s'\n", optarg);
+				fprintf(stderr, "Invalid cluster driver '%s'\n",
+					optarg);
 				fprintf(stderr, "Supported drivers:");
 				FOR_EACH_CLUSTER_DRIVER(cdrv) {
 					fprintf(stderr, " %s", cdrv->name);
@@ -426,7 +428,8 @@ int main(int argc, char **argv)
 			init_cache_type(optarg);
 			break;
 		case 'j':
-			sys->use_journal = true;
+			uatomic_set_true(&sys->use_journal);
+			snprintf(journal_path, sizeof(journal_path), "%s", optarg);
 			break;
 		case 'b':
 			/* validate provided address using inet_pton */
@@ -471,7 +474,7 @@ int main(int argc, char **argv)
 	if (ret)
 		exit(1);
 
-	ret = init_store(dir);
+	ret = init_store(dir, journal_path);
 	if (ret)
 		exit(1);
 
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 0480e26..f849400 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -122,7 +122,7 @@ struct cluster_info {
 
 	uint32_t object_cache_size;
 	bool object_cache_directio;
-	bool use_journal;
+	uatomic_bool use_journal;
 	bool upgrade; /* upgrade data layout before starting service
 		       * if necessary*/
 };
@@ -183,6 +183,7 @@ int default_remove_object(uint64_t oid);
 int default_purge_obj(void);
 int for_each_object_in_wd(int (*func)(uint64_t oid, void *arg), bool cleanup,
 			  void *arg);
+int err_to_sderr(uint64_t oid, int err);
 
 extern struct list_head store_drivers;
 #define add_store_driver(driver)				\
@@ -218,7 +219,7 @@ static inline uint32_t sys_epoch(void)
 
 int create_listen_port(char *bindaddr, int port, void *data);
 
-int init_store(const char *dir);
+int init_store(const char *dir, const char *journal);
 int init_base_path(const char *dir);
 
 int fill_vdi_copy_list(void *data);
@@ -417,4 +418,7 @@ static inline bool is_disk_cache_enabled(void)
 	return !!(sys->enabled_cache_type & CACHE_TYPE_DISK);
 }
 
+/* journal_file.c */
+int journal_file_init(const char *p);
+int journal_file_write(uint64_t oid, const char *buf, size_t size, off_t, bool);
 #endif
diff --git a/sheep/store.c b/sheep/store.c
index 653fff5..e212965 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -402,7 +402,7 @@ out:
 	return ret;
 }
 
-int init_store(const char *d)
+int init_store(const char *d, const char *j)
 {
 	int ret;
 
@@ -430,6 +430,13 @@ int init_store(const char *d)
 	if (ret)
 		return ret;
 
+	/* We should init journal file before backend init */
+	if (uatomic_is_true(&sys->use_journal)) {
+		ret = journal_file_init(j);
+		if (ret)
+			return ret;
+	}
+
 	if (!sys->gateway_only) {
 		ret = init_store_driver();
 		if (ret)
-- 
1.7.9.5