This patch makes updates/writes of VDI objects atomic.

Using the new journaling API, store_queue_request_local() now updates VDI
(non-data) objects atomically for SD_OP_WRITE_OBJ and
SD_OP_CREATE_AND_WRITE_OBJ: the write is first recorded in a journal file
(header, data, end mark) and flushed to disk, then applied to the target
object, and finally the journal is removed. At startup, jrnl_recover()
replays the complete journals (those carrying an end mark) of the latest
epoch and discards incomplete ones.

Signed-off-by: Narendra <narendramind at gmail.com>
---
 sheep/sheep.c      |    2 +
 sheep/sheep_priv.h |   71 +++++++
 sheep/store.c      |  528 ++++++++++++++++++++++++++++++++++++++++++++++++++--
 3 files changed, 589 insertions(+), 12 deletions(-)

diff --git a/sheep/sheep.c b/sheep/sheep.c
index dc9a320..d6c776d 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -112,6 +112,8 @@ int main(int argc, char **argv)
 	if (is_daemon && daemon(0, 0))
 		exit(1);
 
+	jrnl_recover();
+
 	ret = init_event(EPOLL_SIZE);
 	if (ret)
 		exit(1);
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 62924f2..abc4681 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -202,6 +202,77 @@ int remove_object(struct sheepdog_node_list_entry *e,
 int get_sheep_fd(struct sheepdog_node_list_entry *e, int node_idx,
 		 uint32_t epoch, int worker_idx);
 
+/* Journal */
+typedef uint32_t end_mark_t;
+typedef uint32_t jrnl_type_t;
+
+#define JRNL_TYPE_VDI 0
+#define JRNL_MAX_TYPES 1
+
+#define SET_END_MARK 1UL
+#define UNSET_END_MARK 0UL
+#define IS_END_MARK_SET(var) ((var) == SET_END_MARK)
+
+/*
+ * On-disk journal header for a write to a VDI (non-data) object.  A journal
+ * file holds this header, jh_size bytes of data and then an end_mark_t.
+ */
+typedef struct jrnl_vdi_head {
+	jrnl_type_t jh_type;	/* must stay first, see jrnl_get_type() */
+	uint64_t jh_offset;	/* write offset in the target object */
+	uint64_t jh_size;	/* number of data bytes after the header */
+} jrnl_vdi_head_t;
+
+typedef struct jrnl_file_desc {
+	uint32_t jf_epoch;	/* epoch */
+	uint64_t jf_oid;	/* Object id */
+	int jf_fd;		/* Open journal file fd */
+	int jf_target_fd;	/* Open target object fd */
+} jrnl_file_desc_t;
+
+typedef struct jrnl_descriptor {
+	void *jd_head;
+	void *jd_data;
+	int jd_end_mark;
+	jrnl_file_desc_t jd_jfd;
+#define jdf_epoch jd_jfd.jf_epoch
+#define jdf_oid jd_jfd.jf_oid
+#define jdf_fd jd_jfd.jf_fd
+#define jdf_target_fd jd_jfd.jf_target_fd
+} jrnl_desc_t;
+
+typedef struct jrnl_handler {
+	int (*has_end_mark)(jrnl_desc_t *jd);
+	int (*write_header)(jrnl_desc_t *jd);
+	int (*write_data)(jrnl_desc_t *jd);
+	int (*write_end_mark)(jrnl_desc_t *jd);
+	int (*apply_to_target_object)(jrnl_file_desc_t *jfd);
+	int (*commit_data)(jrnl_desc_t *jd);
+} jrnl_handler_t;
+
+/*
+ * Defined in store.c.  Not declared 'inline': under C99 inline semantics
+ * an inline-only declaration would suppress the external definition and
+ * calls could fail to link.
+ */
+jrnl_type_t jrnl_get_type(jrnl_desc_t *jd);
+int jrnl_get_type_from_file(jrnl_file_desc_t *jfd, jrnl_type_t *jrnl_type);
+int jrnl_exists(jrnl_file_desc_t *jfd);
+int jrnl_update_epoch_store(uint32_t epoch);
+int jrnl_open(jrnl_file_desc_t *jfd, int aflags);
+int jrnl_create(jrnl_file_desc_t *jfd);
+int jrnl_remove(jrnl_file_desc_t *jfd);
+int jrnl_close(jrnl_file_desc_t *jfd);
+
+int jrnl_has_end_mark(jrnl_desc_t *jd);
+int jrnl_write_header(jrnl_desc_t *jd);
+int jrnl_write_data(jrnl_desc_t *jd);
+int jrnl_write_end_mark(jrnl_desc_t *jd);
+int jrnl_apply_to_target_object(jrnl_file_desc_t *jfd);
+int jrnl_commit_data(jrnl_desc_t *jd);
+int jrnl_perform(jrnl_desc_t *jd);
+int jrnl_recover(void);
+
 static inline int is_myself(struct sheepdog_node_list_entry *e)
 {
 	return e->id == sys->this_node.id;
diff --git a/sheep/store.c b/sheep/store.c
index 8c688c1..bf44815 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -31,10 +31,30 @@
 static char *obj_path;
 static char *epoch_path;
 static char *mnt_path;
+static char *jrnl_path;
 
 static mode_t def_dmode = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IWGRP | S_IXGRP;
 static mode_t def_fmode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
 
+/* Journal handlers, indexed by jrnl_type_t */
+static int jrnl_vdi_has_end_mark(jrnl_desc_t *jd);
+static int jrnl_vdi_write_header(jrnl_desc_t *jd);
+static int jrnl_vdi_write_data(jrnl_desc_t *jd);
+static int jrnl_vdi_write_end_mark(jrnl_desc_t *jd);
+static int jrnl_vdi_apply_to_target_object(jrnl_file_desc_t *jfd);
+static int jrnl_vdi_commit_data(jrnl_desc_t *jd);
+
+static jrnl_handler_t jrnl_handlers[JRNL_MAX_TYPES] = {
+	[JRNL_TYPE_VDI] = {
+		.has_end_mark = jrnl_vdi_has_end_mark,
+		.write_header = jrnl_vdi_write_header,
+		.write_data = jrnl_vdi_write_data,
+		.write_end_mark = jrnl_vdi_write_end_mark,
+		.apply_to_target_object = jrnl_vdi_apply_to_target_object,
+		.commit_data = jrnl_vdi_commit_data,
+	},
+};
+
 static int obj_cmp(const void *oid1, const void *oid2)
 {
 	const uint64_t hval1 = fnv_64a_buf((void *)oid1, sizeof(uint64_t), FNV1A_64_INIT);
@@ -556,6 +576,8 @@ static int store_queue_request_local(struct request *req, uint32_t epoch)
 	uint64_t oid = hdr->oid;
 	uint32_t opcode = hdr->opcode;
 	char path[1024], *buf;
+	jrnl_desc_t jd;
+	jrnl_vdi_head_t jh;
 
 	switch (opcode) {
 	case SD_OP_CREATE_AND_WRITE_OBJ:
@@ -671,19 +693,30 @@ static int store_queue_request_local(struct request *req, uint32_t epoch)
 	case SD_OP_WRITE_OBJ:
 	case SD_OP_CREATE_AND_WRITE_OBJ:
 		if (!is_data_obj(oid)) {
-			/* FIXME: write data to journal */
-		}
-		ret = pwrite64(fd, req->data, hdr->data_length, hdr->offset);
-		if (ret != hdr->data_length) {
-			if (errno == ENOSPC)
-				ret = SD_RES_NO_SPACE;
-			else
-				ret = SD_RES_EIO;
-			goto out;
-		}
+			jd.jdf_epoch = epoch;
+			jd.jdf_oid = oid;
+			jd.jdf_target_fd = fd;
 
-		if (!is_data_obj(oid)) {
-			/* FIXME: remove journal data */
+			jh.jh_type = JRNL_TYPE_VDI;
+			jh.jh_offset = hdr->offset;
+			jh.jh_size = hdr->data_length;
+
+			jd.jd_head = &jh;
+			jd.jd_data = req->data;
+			jd.jd_end_mark = SET_END_MARK;
+
+			ret = jrnl_perform(&jd);
+			if (ret)
+				goto out;
+		} else {
+			ret = pwrite64(fd, req->data, hdr->data_length, hdr->offset);
+			if (ret != hdr->data_length) {
+				if (errno == ENOSPC)
+					ret = SD_RES_NO_SPACE;
+				else
+					ret = SD_RES_EIO;
+				goto out;
+			}
 		}
 
 		ret = SD_RES_SUCCESS;
@@ -1739,6 +1772,25 @@ static int init_mnt_path(const char *base_path)
 	return 0;
 }
 
+#define JRNL_PATH "/journal/"
+
+/* Set jrnl_path to <base_path>/journal/ and create that directory. */
+static int init_jrnl_path(const char *base_path)
+{
+	int new;
+	size_t len = strlen(base_path) + strlen(JRNL_PATH) + 1;
+
+	jrnl_path = zalloc(len);
+	if (!jrnl_path) {
+		eprintf("failed to allocate memory\n");
+		return 1;
+	}
+	snprintf(jrnl_path, len, "%s" JRNL_PATH, base_path);
+
+	/* whether the directory was newly created does not matter here */
+	return init_path(jrnl_path, &new);
+}
+
 int init_store(const char *d)
 {
 	int ret;
@@ -1759,6 +1811,10 @@ int init_store(const char *d)
 	if (ret)
 		return ret;
 
+	ret = init_jrnl_path(d);
+	if (ret)
+		return ret;
+
 	return ret;
 }
 
@@ -1791,3 +1847,451 @@ int get_global_nr_copies(uint32_t *copies)
 {
 	return attr(epoch_path, ANAME_COPIES, copies, sizeof(*copies), 0);
 }
+
+/* Journal APIs */
+
+/* Format the path of the journal file of jfd: <jrnl_path><epoch>/<oid> */
+static void jrnl_get_path(jrnl_file_desc_t *jfd, char *buf, size_t len)
+{
+	snprintf(buf, len, "%s%08u/%016" PRIx64, jrnl_path, jfd->jf_epoch,
+		 jfd->jf_oid);
+}
+
+/* Return 1 if the journal file of jfd exists, 0 otherwise. */
+int jrnl_exists(jrnl_file_desc_t *jfd)
+{
+	char path[1024];
+	struct stat s;
+
+	jrnl_get_path(jfd, path, sizeof(path));
+
+	return stat(path, &s) == 0;
+}
+
+/* Make sure the journal directory of the given epoch exists. */
+int jrnl_update_epoch_store(uint32_t epoch)
+{
+	char new[1024];
+
+	snprintf(new, sizeof(new), "%s%08u/", jrnl_path, epoch);
+	if (mkdir(new, def_dmode) < 0 && errno != EEXIST) {
+		eprintf("failed to create %s, %s\n", new, strerror(errno));
+		return 1;
+	}
+
+	return 0;
+}
+
+/*
+ * Open the journal file of jfd with the open(2) flags aflags and store the
+ * descriptor in jfd->jf_fd (-1 on failure).  Returns SD_RES_SUCCESS,
+ * SD_RES_NO_OBJ if the journal does not exist, or another SD_RES_* error.
+ */
+int jrnl_open(jrnl_file_desc_t *jfd, int aflags)
+{
+	char path[1024];
+	int fd;
+
+	jfd->jf_fd = -1;
+
+	if (jrnl_update_epoch_store(jfd->jf_epoch))
+		return SD_RES_EIO;
+
+	jrnl_get_path(jfd, path, sizeof(path));
+
+	fd = open(path, aflags, def_fmode);
+	if (fd < 0) {
+		/* logging may change errno, so save it first */
+		int err = errno;
+
+		eprintf("failed to open %s, %s\n", path, strerror(err));
+		return err == ENOENT ? SD_RES_NO_OBJ : SD_RES_UNKNOWN;
+	}
+
+	jfd->jf_fd = fd;
+	return SD_RES_SUCCESS;
+}
+
+/* Close the journal file of jfd if it is open. */
+int jrnl_close(jrnl_file_desc_t *jfd)
+{
+	if (jfd->jf_fd >= 0)
+		close(jfd->jf_fd);
+	jfd->jf_fd = -1;
+
+	return 0;
+}
+
+/*
+ * Create the journal file of jfd for writing.  O_TRUNC guarantees that no
+ * bytes of an older journal of the same object (e.g. a stale end mark)
+ * survive into the new one.
+ */
+int jrnl_create(jrnl_file_desc_t *jfd)
+{
+	return jrnl_open(jfd, O_RDWR | O_CREAT | O_TRUNC);
+}
+
+/* Every journal header starts with its jrnl_type_t. */
+jrnl_type_t jrnl_get_type(jrnl_desc_t *jd)
+{
+	return *((jrnl_type_t *) jd->jd_head);
+}
+
+/* Read the journal type stored at the start of the journal file of jfd. */
+int jrnl_get_type_from_file(jrnl_file_desc_t *jfd, jrnl_type_t *jrnl_type)
+{
+	ssize_t retsize;
+
+	retsize = pread64(jfd->jf_fd, jrnl_type, sizeof(*jrnl_type), 0);
+
+	if (retsize != sizeof(*jrnl_type))
+		return SD_RES_EIO;
+	else
+		return SD_RES_SUCCESS;
+}
+
+/* Remove the journal file of jfd. */
+int jrnl_remove(jrnl_file_desc_t *jfd)
+{
+	char path[1024];
+
+	jrnl_get_path(jfd, path, sizeof(path));
+	if (unlink(path)) {
+		eprintf("failed to remove %s, %s\n", path, strerror(errno));
+		return SD_RES_EIO;
+	}
+
+	return SD_RES_SUCCESS;
+}
+
+/* Dispatch to the handler of the journal type stored in jd->jd_head. */
+int jrnl_has_end_mark(jrnl_desc_t *jd)
+{
+	return jrnl_handlers[jrnl_get_type(jd)].has_end_mark(jd);
+}
+
+int jrnl_write_header(jrnl_desc_t *jd)
+{
+	return jrnl_handlers[jrnl_get_type(jd)].write_header(jd);
+}
+
+int jrnl_write_data(jrnl_desc_t *jd)
+{
+	return jrnl_handlers[jrnl_get_type(jd)].write_data(jd);
+}
+
+int jrnl_write_end_mark(jrnl_desc_t *jd)
+{
+	return jrnl_handlers[jrnl_get_type(jd)].write_end_mark(jd);
+}
+
+int jrnl_apply_to_target_object(jrnl_file_desc_t *jfd)
+{
+	int ret;
+	jrnl_type_t jrnl_type;
+
+	ret = jrnl_get_type_from_file(jfd, &jrnl_type);
+	if (ret)
+		return ret;
+
+	/* The type is read from disk, so validate it before indexing. */
+	if (jrnl_type >= JRNL_MAX_TYPES) {
+		eprintf("unknown journal type %" PRIu32 "\n", jrnl_type);
+		return SD_RES_EIO;
+	}
+
+	return jrnl_handlers[jrnl_type].apply_to_target_object(jfd);
+}
+
+int jrnl_commit_data(jrnl_desc_t *jd)
+{
+	return jrnl_handlers[jrnl_get_type(jd)].commit_data(jd);
+}
+
+/*
+ * Write jd->jd_data to the target object through the journal:
+ *   1. write the header and data to the journal and flush them,
+ *   2. write the end mark and flush it; the journal is now complete,
+ *   3. write the data to the target object and flush it,
+ *   4. remove the journal.
+ * jrnl_recover() discards journals interrupted before step 2 and replays
+ * the complete ones.  Returns SD_RES_SUCCESS or an SD_RES_* error.
+ */
+int jrnl_perform(jrnl_desc_t *jd)
+{
+	int ret;
+
+	ret = jrnl_create(&jd->jd_jfd);
+	if (ret)
+		return ret;
+
+	ret = jrnl_write_header(jd);
+	if (ret)
+		goto out;
+
+	ret = jrnl_write_data(jd);
+	if (ret)
+		goto out;
+
+	/* header and data must be on disk before the end mark can be */
+	if (fdatasync(jd->jdf_fd)) {
+		ret = SD_RES_EIO;
+		goto out;
+	}
+
+	ret = jrnl_write_end_mark(jd);
+	if (ret)
+		goto out;
+
+	/* the journal must be complete on disk before the target changes */
+	if (fdatasync(jd->jdf_fd)) {
+		ret = SD_RES_EIO;
+		goto out;
+	}
+
+	ret = jrnl_commit_data(jd);
+	if (ret)
+		goto out;
+
+	/* the journal may only go away once the target data is durable */
+	if (fdatasync(jd->jdf_target_fd)) {
+		ret = SD_RES_EIO;
+		goto out;
+	}
+
+	jrnl_close(&jd->jd_jfd);
+	return jrnl_remove(&jd->jd_jfd);
+
+out:
+	/* the journal is left behind; jrnl_recover() discards or replays it */
+	jrnl_close(&jd->jd_jfd);
+	return ret;
+}
+
+/*
+ * Process the journals of the latest epoch: complete journals are replayed
+ * onto their objects, incomplete ones are discarded, and every journal file
+ * is removed afterwards.  The store paths must be set up (init_store()).
+ */
+int jrnl_recover(void)
+{
+	DIR *dir;
+	struct dirent *d;
+	char jrnl_dir[1024],
+		jrnl_file_path[1024],
+		obj_file_path[1024];
+	int epoch;
+
+	if (!jrnl_path) {
+		eprintf("journal path is not initialized, skipping recovery\n");
+		return 1;
+	}
+
+	epoch = get_latest_epoch();
+	if (epoch < 0)
+		return 1;
+	snprintf(jrnl_dir, sizeof(jrnl_dir), "%s%08u/", jrnl_path, epoch);
+
+	eprintf("Openning the directory%s.\n", jrnl_dir);
+	dir = opendir(jrnl_dir);
+	if (!dir)
+		return -1;
+
+	vprintf(SDOG_NOTICE "start jrnl_recovery.\n");
+	while ((d = readdir(dir))) {
+		int ret;
+		jrnl_file_desc_t jfd;
+
+		if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, ".."))
+			continue;
+
+		jfd.jf_epoch = epoch;
+		jfd.jf_fd = -1;
+		jfd.jf_target_fd = -1;
+		if (sscanf(d->d_name, "%" SCNx64, &jfd.jf_oid) != 1) {
+			eprintf("skipping unknown file %s%s\n", jrnl_dir, d->d_name);
+			continue;
+		}
+		snprintf(jrnl_file_path, sizeof(jrnl_file_path), "%s%016" PRIx64,
+			 jrnl_dir, jfd.jf_oid);
+		snprintf(obj_file_path, sizeof(obj_file_path), "%s%08u/%016" PRIx64,
+			 obj_path, epoch, jfd.jf_oid);
+		ret = jrnl_open(&jfd, O_RDONLY);
+		if (ret) {
+			eprintf("Unable to open the journal file, %s, for reading.\n", jrnl_file_path);
+			goto end_while_3;
+		}
+		jfd.jf_target_fd = ob_open(epoch, jfd.jf_oid, 0, &ret);
+		if (ret) {
+			eprintf("Unable to open the object file, %s, to recover.\n", obj_file_path);
+			goto end_while_2;
+		}
+		ret = jrnl_apply_to_target_object(&jfd);
+		if (ret)
+			eprintf("Unable to recover the object, %s.\n", obj_file_path);
+		else
+			vprintf(SDOG_INFO "recovered the object in journal, %s\n", jrnl_file_path);
+
+		close(jfd.jf_target_fd);
+		jfd.jf_target_fd = -1;
+	end_while_2:
+		jrnl_close(&jfd);
+	end_while_3:
+		jrnl_remove(&jfd);
+	}
+	closedir(dir);
+	vprintf(SDOG_NOTICE "end jrnl_recovery.\n");
+
+	return 0;
+}
+
+/* VDI data journalling functions */
+
+/*
+ * Return SET_END_MARK if a valid end mark follows the journal data,
+ * UNSET_END_MARK otherwise (also when it cannot be read completely).
+ */
+static int jrnl_vdi_has_end_mark(jrnl_desc_t *jd)
+{
+	ssize_t retsize;
+	end_mark_t end_mark = UNSET_END_MARK;
+	jrnl_vdi_head_t *head = (jrnl_vdi_head_t *) jd->jd_head;
+
+	retsize = pread64(jd->jdf_fd, &end_mark, sizeof(end_mark),
+			  sizeof(*head) + head->jh_size);
+	if (retsize != sizeof(end_mark))
+		return UNSET_END_MARK;
+
+	return IS_END_MARK_SET(end_mark) ? SET_END_MARK : UNSET_END_MARK;
+}
+
+static int jrnl_vdi_write_header(jrnl_desc_t *jd)
+{
+	ssize_t ret;
+	jrnl_vdi_head_t *head = (jrnl_vdi_head_t *) jd->jd_head;
+
+	ret = pwrite64(jd->jdf_fd, head, sizeof(*head), 0);
+
+	if (ret != sizeof(*head)) {
+		if (errno == ENOSPC)
+			ret = SD_RES_NO_SPACE;
+		else
+			ret = SD_RES_EIO;
+	} else
+		ret = SD_RES_SUCCESS;
+
+	return ret;
+}
+
+static int jrnl_vdi_write_data(jrnl_desc_t *jd)
+{
+	ssize_t ret;
+	jrnl_vdi_head_t *head = (jrnl_vdi_head_t *) jd->jd_head;
+
+	ret = pwrite64(jd->jdf_fd, jd->jd_data, head->jh_size, sizeof(*head));
+
+	if (ret != head->jh_size) {
+		if (errno == ENOSPC)
+			ret = SD_RES_NO_SPACE;
+		else
+			ret = SD_RES_EIO;
+	} else
+		ret = SD_RES_SUCCESS;
+
+	return ret;
+}
+
+static int jrnl_vdi_write_end_mark(jrnl_desc_t *jd)
+{
+	ssize_t retsize;
+	end_mark_t end_mark = SET_END_MARK;
+	jrnl_vdi_head_t *head = (jrnl_vdi_head_t *) jd->jd_head;
+
+	retsize = pwrite64(jd->jdf_fd, &end_mark, sizeof(end_mark),
+			   sizeof(*head) + head->jh_size);
+	if (retsize != sizeof(end_mark)) {
+		if (errno == ENOSPC)
+			return SD_RES_NO_SPACE;
+		return SD_RES_EIO;
+	}
+
+	/* record the end mark only once it has actually been written */
+	jd->jd_end_mark = end_mark;
+	return SD_RES_SUCCESS;
+}
+
+/*
+ * Replay the journal open in jfd->jf_fd onto jfd->jf_target_fd.  A journal
+ * without a complete header and end mark was interrupted before the target
+ * object was written, so it is discarded and the target is left unchanged.
+ */
+static int jrnl_vdi_apply_to_target_object(jrnl_file_desc_t *jfd)
+{
+	char *buf;
+	int res = SD_RES_SUCCESS;
+	ssize_t retsize;
+	jrnl_vdi_head_t jh;
+	jrnl_desc_t jd;
+
+	retsize = pread64(jfd->jf_fd, &jh, sizeof(jh), 0);
+	jd.jd_head = &jh;
+	jd.jd_jfd = *jfd;
+	if (retsize != sizeof(jh) ||
+	    !IS_END_MARK_SET(jrnl_vdi_has_end_mark(&jd))) {
+		eprintf("discarding incomplete journal of %016" PRIx64 "\n",
+			jfd->jf_oid);
+		return SD_RES_SUCCESS;
+	}
+
+	if (!jh.jh_size)
+		return SD_RES_SUCCESS;
+
+	/* size the buffer from the header; a fixed 4 MB buffer could overflow */
+	buf = zalloc(jh.jh_size);
+	if (!buf) {
+		eprintf("failed to allocate memory\n");
+		return SD_RES_NO_MEM;
+	}
+
+	/* Flush out journal to disk (vdi object) */
+	retsize = pread64(jfd->jf_fd, buf, jh.jh_size, sizeof(jh));
+	if (retsize != jh.jh_size) {
+		res = SD_RES_EIO;
+		goto out;
+	}
+
+	retsize = pwrite64(jfd->jf_target_fd, buf, jh.jh_size, jh.jh_offset);
+	if (retsize != jh.jh_size) {
+		if (errno == ENOSPC)
+			res = SD_RES_NO_SPACE;
+		else
+			res = SD_RES_EIO;
+		goto out;
+	}
+
+	/* the journal is removed next, so the replayed data must be durable */
+	if (fdatasync(jfd->jf_target_fd))
+		res = SD_RES_EIO;
+out:
+	free(buf);
+	return res;
+}
+
+static int jrnl_vdi_commit_data(jrnl_desc_t *jd)
+{
+	int ret = 0;
+	ssize_t retsize;
+	jrnl_vdi_head_t *head = (jrnl_vdi_head_t *) jd->jd_head;
+
+	retsize = pwrite64(jd->jdf_target_fd, jd->jd_data, head->jh_size, head->jh_offset);
+	if (retsize != head->jh_size) {
+		if (errno == ENOSPC)
+			ret = SD_RES_NO_SPACE;
+		else
+			ret = SD_RES_EIO;
+	}
+
+	return ret;
+}
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.wpkg.org/pipermail/sheepdog/attachments/20101226/13cd5a75/attachment.html>