This patch makes vdi data object update/write operations atomic.<br><br>Using the journalling API, store_queue_request_local() now updates the<br>vdi object atomically for the<br>
operations SD_OP_WRITE_OBJ & SD_OP_CREATE_AND_WRITE_OBJ.<br><br>Signed-off-by: Narendra <narendramind at gmail.com><br><br> sheep/sheep.c | 2 +<br> sheep/sheep_priv.h | 63 ++++++++<br>
sheep/store.c | 433 ++++++++++++++++++++++++++++++++++++++++++++++++++--<br> 3 files changed, 486 insertions(+), 12 deletions(-)<br><br>diff --git a/sheep/sheep.c b/sheep/sheep.c<br>index dc9a320..d6c776d 100644<br>
--- a/sheep/sheep.c<br>+++ b/sheep/sheep.c<br>@@ -112,6 +112,8 @@ int main(int argc, char **argv)<br> if (is_daemon && daemon(0, 0))<br> exit(1);<br> <br>+ jrnl_recover();<br>+<br> ret = init_event(EPOLL_SIZE);<br>
if (ret)<br> exit(1);<br>diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h<br>index 62924f2..abc4681 100644<br>--- a/sheep/sheep_priv.h<br>+++ b/sheep/sheep_priv.h<br>@@ -202,6 +202,69 @@ int remove_object(struct sheepdog_node_list_entry *e,<br>
int get_sheep_fd(struct sheepdog_node_list_entry *e, int node_idx,<br> uint32_t epoch, int worker_idx);<br> <br>+/* Journal */<br>+typedef uint32_t end_mark_t;<br>+typedef uint32_t jrnl_type_t;<br>+<br>+#define JRNL_TYPE_VDI 0<br>
+#define JRNL_MAX_TYPES 1<br>+<br>+#define SET_END_MARK 1UL<br>+#define UNSET_END_MARK 0UL<br>+#define IS_END_MARK_SET(var) (var == 1UL)<br>+<br>+/* Journal header for data object */<br>+typedef struct jrnl_vdi_head {<br>
+ jrnl_type_t jh_type;<br>+ uint64_t jh_offset;<br>+ uint64_t jh_size;<br>+} jrnl_vdi_head_t;<br>+<br>+typedef struct jrnl_file_desc {<br>+ uint32_t jf_epoch; /* epoch */<br>+ uint64_t jf_oid; /* Object id */<br>
+ int jf_fd; /* Open file fd */<br>+ int jf_target_fd;<br>+} jrnl_file_desc_t;<br>+<br>+typedef struct jrnl_descriptor {<br>+ void *jd_head;<br>+ void *jd_data;<br>
+ int jd_end_mark;<br>+ jrnl_file_desc_t jd_jfd;<br>+#define jdf_epoch jd_jfd.jf_epoch<br>+#define jdf_oid jd_jfd.jf_oid<br>+#define jdf_fd jd_jfd.jf_fd<br>+#define jdf_target_fd jd_jfd.jf_target_fd<br>
+} jrnl_desc_t;<br>+<br>+typedef struct jrnl_handler {<br>+ int (*has_end_mark)(jrnl_desc_t *jd);<br>+ int (*write_header)(jrnl_desc_t *jd);<br>+ int (*write_data)(jrnl_desc_t *jd);<br>+ int (*write_end_mark)(jrnl_desc_t *jd);<br>
+ int (*apply_to_target_object)(jrnl_file_desc_t *jfd);<br>+ int (*commit_data)(jrnl_desc_t *jd);<br>+} jrnl_handler_t;<br>+<br>+inline jrnl_type_t jrnl_get_type(jrnl_desc_t *jd);<br>+int jrnl_get_type_from_file(jrnl_file_desc_t *jfd, jrnl_type_t *jrnl_type);<br>
+int jrnl_exists(jrnl_file_desc_t *jfd);<br>+int jrnl_update_epoch_store(uint32_t epoch);<br>+int jrnl_open(jrnl_file_desc_t *jfd, int aflags);<br>+int jrnl_create(jrnl_file_desc_t *jfd);<br>+int jrnl_remove(jrnl_file_desc_t *jfd);<br>
+inline int jrnl_close(jrnl_file_desc_t *jfd);<br>+<br>+inline int jrnl_has_end_mark(jrnl_desc_t *jd);<br>+inline int jrnl_write_header(jrnl_desc_t *jd);<br>+inline int jrnl_write_data(jrnl_desc_t *jd);<br>+inline int jrnl_write_end_mark(jrnl_desc_t *jd);<br>
+inline int jrnl_apply_to_targe_object(jrnl_file_desc_t *jfd);<br>+inline int jrnl_commit_data(jrnl_desc_t *jd);<br>+int jrnl_perform(jrnl_desc_t *jd);<br>+int jrnl_recover(void);<br>+<br> static inline int is_myself(struct sheepdog_node_list_entry *e)<br>
{<br> return e->id == sys->this_node.id;<br>diff --git a/sheep/store.c b/sheep/store.c<br>index 8c688c1..bf44815 100644<br>--- a/sheep/store.c<br>+++ b/sheep/store.c<br>@@ -31,10 +31,32 @@<br>
static char *obj_path;<br> static char *epoch_path;<br> static char *mnt_path;<br>+static char *jrnl_path;<br> <br> static mode_t def_dmode = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IWGRP | S_IXGRP;<br> static mode_t def_fmode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;<br>
<br>+/* Journal internal data structures */<br>+/* Journal Handlers for Data Object */<br>+static int jrnl_vdi_has_end_mark(jrnl_desc_t *jd);<br>+static int jrnl_vdi_write_header(jrnl_desc_t *jd);<br>+static int jrnl_vdi_write_data(jrnl_desc_t *jd);<br>
+static int jrnl_vdi_write_end_mark(jrnl_desc_t *jd);<br>+static int jrnl_vdi_apply_to_target_object(jrnl_file_desc_t *jfd);<br>+static int jrnl_vdi_commit_data(jrnl_desc_t *jd);<br>+<br>+static jrnl_handler_t jrnl_handlers[JRNL_MAX_TYPES] = {<br>
+ {<br>+ .has_end_mark = jrnl_vdi_has_end_mark,<br>+ .write_header = jrnl_vdi_write_header,<br>+ .write_data = jrnl_vdi_write_data,<br>+ .write_end_mark = jrnl_vdi_write_end_mark,<br>+ .apply_to_target_object = jrnl_vdi_apply_to_target_object,<br>
+ .commit_data = jrnl_vdi_commit_data<br>+ }<br>+};<br>+<br>+<br> static int obj_cmp(const void *oid1, const void *oid2)<br> {<br> const uint64_t hval1 = fnv_64a_buf((void *)oid1, sizeof(uint64_t), FNV1A_64_INIT);<br>
@@ -556,6 +578,8 @@ static int store_queue_request_local(struct request *req, uint32_t epoch)<br> uint64_t oid = hdr->oid;<br> uint32_t opcode = hdr->opcode;<br> char path[1024], *buf;<br>+ jrnl_desc_t jd;<br>
+ jrnl_vdi_head_t jh;<br> <br> switch (opcode) {<br> case SD_OP_CREATE_AND_WRITE_OBJ:<br>@@ -671,19 +695,30 @@ static int store_queue_request_local(struct request *req, uint32_t epoch)<br> case SD_OP_WRITE_OBJ:<br>
case SD_OP_CREATE_AND_WRITE_OBJ:<br> if (!is_data_obj(oid)) {<br>- /* FIXME: write data to journal */<br>- }<br>- ret = pwrite64(fd, req->data, hdr->data_length, hdr->offset);<br>
- if (ret != hdr->data_length) {<br>- if (errno == ENOSPC)<br>- ret = SD_RES_NO_SPACE;<br>- else<br>- ret = SD_RES_EIO;<br>- goto out;<br>- }<br>
+ jd.jdf_epoch = epoch;<br>+ jd.jdf_oid = oid;<br>+ jd.jdf_target_fd = fd;<br> <br>- if (!is_data_obj(oid)) {<br>- /* FIXME: remove journal data */<br>+ jh.jh_type = JRNL_TYPE_VDI;<br>
+ jh.jh_offset = hdr->offset;<br>+ jh.jh_size = hdr->data_length;<br>+<br>+ jd.jd_head = &jh;<br>+ jd.jd_data = req->data;<br>+ jd.jd_end_mark = SET_END_MARK;<br>
+<br>+ ret = jrnl_perform(&jd);<br>+ if (ret)<br>+ goto out;<br>+ } else {<br>+ ret = pwrite64(fd, req->data, hdr->data_length, hdr->offset);<br>+ if (ret != hdr->data_length) {<br>
+ if (errno == ENOSPC)<br>+ ret = SD_RES_NO_SPACE;<br>+ else<br>+ ret = SD_RES_EIO;<br>+ goto out;<br>+ }<br> }<br> <br>
ret = SD_RES_SUCCESS;<br>@@ -1739,6 +1774,27 @@ static int init_mnt_path(const char *base_path)<br> return 0;<br> }<br> <br>+#define JRNL_PATH "/journal/"<br>+<br>+static int init_jrnl_path(const char *base_path)<br>
+{<br>+ int new, ret;<br>+<br>+ /* Create journal directory */<br>+ jrnl_path = zalloc(strlen(base_path) + strlen(JRNL_PATH) + 1);<br>+ sprintf(jrnl_path, "%s" JRNL_PATH, base_path);<br>+<br>+ ret = init_path(jrnl_path, &new);<br>
+ /* Error during directory creation */<br>+ if (ret)<br>+ return ret;<br>+ /* If journal is newly created */<br>+ if (new)<br>+ return 0;<br>+<br>+ return 0;<br>+}<br>+<br> int init_store(const char *d)<br>
{<br> int ret;<br>@@ -1759,6 +1815,10 @@ int init_store(const char *d)<br> if (ret)<br> return ret;<br> <br>+ ret = init_jrnl_path(d);<br>+ if (ret)<br>+ return ret;<br>+<br> return ret;<br>
}<br> <br>@@ -1791,3 +1851,352 @@ int get_global_nr_copies(uint32_t *copies)<br> {<br> return attr(epoch_path, ANAME_COPIES, copies, sizeof(*copies), 0);<br> }<br>+<br>+/* Journal APIs */<br>+int jrnl_exists(jrnl_file_desc_t *jfd)<br>
+{<br>+ int ret;<br>+ char path[1024];<br>+ struct stat s;<br>+<br>+ snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, jrnl_path, jfd->jf_epoch, jfd->jf_oid);<br>+<br>+ ret = stat(path, &s);<br>
+ if (ret)<br>+ return 1;<br>+<br>+ return 0;<br>+}<br>+<br>+int jrnl_update_epoch_store(uint32_t epoch)<br>+{<br>+ char new[1024];<br>+ struct stat s;<br>+<br>+ snprintf(new, sizeof(new), "%s%08u/", jrnl_path, epoch);<br>
+ if (stat(new, &s) < 0)<br>+ if (errno == ENOENT)<br>+ mkdir(new, def_dmode);<br>+<br>+ return 0;<br>+}<br>+<br>+int jrnl_open(jrnl_file_desc_t *jfd, int aflags)<br>+{<br>+ char path[1024];<br>
+ int flags = aflags;<br>+ int fd, ret;<br>+<br>+<br>+ jrnl_update_epoch_store(jfd->jf_epoch);<br>+ snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, jrnl_path, jfd->jf_epoch, jfd->jf_oid);<br>
+<br>+ fd = open(path, flags, def_fmode);<br>+ if (fd < 0) {<br>+ eprintf("failed to open %s, %s\n", path, strerror(errno));<br>+ if (errno == ENOENT)<br>+ ret = SD_RES_NO_OBJ;<br>
+ else<br>+ ret = SD_RES_UNKNOWN;<br>+ } else {<br>+ jfd->jf_fd = fd;<br>+ ret = SD_RES_SUCCESS;<br>+ }<br>+<br>+ return ret;<br>+}<br>+<br>+int jrnl_close(jrnl_file_desc_t *jfd)<br>
+{<br>+ close(jfd->jf_fd);<br>+ jfd->jf_fd = -1;<br>+<br>+ return 0;<br>+}<br>+<br>+int jrnl_create(jrnl_file_desc_t *jfd)<br>+{<br>+ return jrnl_open(jfd, O_RDWR | O_CREAT);<br>+}<br>+<br>+inline uint32_t jrnl_get_type(jrnl_desc_t *jd)<br>
+{<br>+ return *((uint32_t *) jd->jd_head);<br>+}<br>+<br>+int jrnl_get_type_from_file(jrnl_file_desc_t *jfd, jrnl_type_t *jrnl_type)<br>+{<br>+ ssize_t retsize;<br>+<br>+ retsize = pread64(jfd->jf_fd, jrnl_type, sizeof(*jrnl_type), 0);<br>
+<br>+ if (retsize != sizeof(*jrnl_type))<br>+ return SD_RES_EIO;<br>+ else<br>+ return SD_RES_SUCCESS;<br>+}<br>+<br>+int jrnl_remove(jrnl_file_desc_t *jfd)<br>+{<br>+ char path[1024];<br>+ int ret;<br>
+<br>+ snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, jrnl_path, jfd->jf_epoch, jfd->jf_oid);<br>+ ret = unlink(path);<br>+ if (ret) {<br>+ eprintf("failed to remove %s, %s\n", path, strerror(errno));<br>
+ ret = SD_RES_EIO;<br>+ } else<br>+ ret = SD_RES_SUCCESS;<br>+<br>+ return ret;<br>+}<br>+<br>+inline int jrnl_has_end_mark(jrnl_desc_t *jd)<br>+{<br>+ return jrnl_handlers[jrnl_get_type(jd)].has_end_mark(jd);<br>
+}<br>+<br>+inline int jrnl_write_header(jrnl_desc_t *jd)<br>+{<br>+ return jrnl_handlers[jrnl_get_type(jd)].write_header(jd);<br>+}<br>+<br>+inline int jrnl_write_data(jrnl_desc_t *jd)<br>+{<br>+ return jrnl_handlers[jrnl_get_type(jd)].write_data(jd);<br>
+}<br>+<br>+inline int jrnl_write_end_mark(jrnl_desc_t *jd)<br>+{<br>+ return jrnl_handlers[jrnl_get_type(jd)].write_end_mark(jd);<br>+}<br>+<br>+inline int jrnl_apply_to_target_object(jrnl_file_desc_t *jfd)<br>+{<br>
+ int ret;<br>+ jrnl_type_t jrnl_type;<br>+<br>+ ret = jrnl_get_type_from_file(jfd, &jrnl_type);<br>+<br>+ return jrnl_handlers[jrnl_type].apply_to_target_object(jfd);<br>+}<br>+<br>+inline int jrnl_commit_data(jrnl_desc_t *jd)<br>
+{<br>+ return jrnl_handlers[jrnl_get_type(jd)].commit_data(jd);<br>+}<br>+<br>+int jrnl_perform(jrnl_desc_t *jd)<br>+{<br>+ int ret;<br>+<br>+ ret = jrnl_create(&jd->jd_jfd);<br>+ if (ret)<br>+ goto out;<br>
+<br>+ ret = jrnl_write_header(jd);<br>+ if (ret)<br>+ goto out;<br>+<br>+ ret = jrnl_write_data(jd);<br>+ if (ret)<br>+ goto out;<br>+<br>+ ret = jrnl_write_end_mark(jd);<br>+ if (ret)<br>
+ goto out;<br>+<br>+ ret = jrnl_commit_data(jd);<br>+ if (ret)<br>+ goto out;<br>+<br>+ ret = jrnl_remove(&jd->jd_jfd);<br>+<br>+out:<br>+ return ret;<br>+}<br>+<br>+int jrnl_recover(void)<br>
+{<br>+ DIR *dir;<br>+ struct dirent *d;<br>+ char jrnl_dir[1024], <br>+ jrnl_file_path[1024],<br>+ obj_file_path[1024];<br>+ int epoch;<br>+<br>+ epoch = get_latest_epoch();<br>+ if (epoch < 0) {<br>
+ return 1;<br>+ }<br>+ snprintf(jrnl_dir, sizeof(jrnl_dir), "%s%08u/", jrnl_path, epoch);<br>+<br>+ eprintf("Openning the directory%s.\n", jrnl_dir);<br>+ dir = opendir(jrnl_dir);<br>
+ if (!dir)<br>+ return -1;<br>+<br>+ vprintf(SDOG_NOTICE "start jrnl_recovery.\n");<br>+ while ((d = readdir(dir))) {<br>+ int ret;<br>+ jrnl_file_desc_t jfd;<br>+<br>+ if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, ".."))<br>
+ continue;<br>+<br>+ jfd.jf_epoch = epoch;<br>+ sscanf(d->d_name, "%" PRIx64, &jfd.jf_oid);<br>+ snprintf(jrnl_file_path, sizeof(jrnl_file_path), "%s%016" PRIx64,<br>
+ jrnl_dir, jfd.jf_oid);<br>+ snprintf(obj_file_path, sizeof(obj_file_path), "%s%08u/%016" PRIx64,<br>+ obj_path, epoch, jfd.jf_oid);<br>
+ ret = jrnl_open(&jfd, O_RDONLY);<br>+ if (ret) {<br>+ eprintf("Unable to open the journal file, %s, for reading.\n", jrnl_file_path);<br>+ goto end_while_3;<br>+ }<br>
+ jfd.jf_target_fd = ob_open(epoch, jfd.jf_oid, 0, &ret);<br>+ if (ret) {<br>+ eprintf("Unable to open the object file, %s, to recover.\n", obj_file_path);<br>+ goto end_while_2;<br>
+ }<br>+ ret = jrnl_apply_to_target_object(&jfd);<br>+ if (ret)<br>+ eprintf("Unable to recover the object, %s.\n", obj_file_path);<br>+<br>+ close(jfd.jf_target_fd);<br>
+ jfd.jf_target_fd = -1;<br>+ end_while_2:<br>+ jrnl_close(&jfd);<br>+ end_while_3:<br>+ vprintf(SDOG_INFO "recovered the object in journal, %s\n", jrnl_file_path);<br>+ jrnl_remove(&jfd);<br>
+ }<br>+ closedir(dir);<br>+ vprintf(SDOG_NOTICE "end jrnl_recovery.\n");<br>+<br>+ return 0;<br>+}<br>+<br>+/* VDI data journalling functions */<br>+static int jrnl_vdi_has_end_mark(jrnl_desc_t *jd)<br>
+{<br>+ ssize_t ret;<br>+ end_mark_t end_mark = UNSET_END_MARK;<br>+ jrnl_vdi_head_t *head = (jrnl_vdi_head_t *) jd->jd_head;<br>+<br>+ ret = pread64(jd->jdf_fd, &end_mark, sizeof(end_mark),<br>+ sizeof(*head) + head->jh_size);<br>
+<br>+ return (IS_END_MARK_SET(end_mark)? SET_END_MARK: UNSET_END_MARK);<br>+}<br>+<br>+static int jrnl_vdi_write_header(jrnl_desc_t *jd)<br>+{<br>+ ssize_t ret;<br>+ jrnl_vdi_head_t *head = (jrnl_vdi_head_t *) jd->jd_head;<br>
+<br>+ ret = pwrite64(jd->jdf_fd, head, sizeof(*head), 0);<br>+<br>+ if (ret != sizeof(*head)) {<br>+ if (errno == ENOSPC)<br>+ ret = SD_RES_NO_SPACE;<br>+ else<br>+ ret = SD_RES_EIO;<br>
+ } else<br>+ ret = SD_RES_SUCCESS;<br>+<br>+ return ret;<br>+}<br>+<br>+static int jrnl_vdi_write_data(jrnl_desc_t *jd)<br>+{<br>+ ssize_t ret;<br>+ jrnl_vdi_head_t *head = (jrnl_vdi_head_t *) jd->jd_head;<br>
+<br>+ ret = pwrite64(jd->jdf_fd, jd->jd_data, head->jh_size, sizeof(*head));<br>+<br>+ if (ret != head->jh_size) {<br>+ if (errno == ENOSPC)<br>+ ret = SD_RES_NO_SPACE;<br>+ else<br>
+ ret = SD_RES_EIO;<br>+ } else<br>+ ret = SD_RES_SUCCESS;<br>+<br>+ return ret;<br>+}<br>+<br>+static int jrnl_vdi_write_end_mark(jrnl_desc_t *jd)<br>+{<br>+ ssize_t retsize;<br>+ int ret;<br>
+ end_mark_t end_mark = SET_END_MARK;<br>+ jrnl_vdi_head_t *head = (jrnl_vdi_head_t *) jd->jd_head;<br>+<br>+ retsize = pwrite64(jd->jdf_fd, &end_mark, sizeof(end_mark),<br>+ sizeof(*head) + head->jh_size);<br>
+<br>+ if (retsize != sizeof(end_mark)) {<br>+ if (errno == ENOSPC)<br>+ ret = SD_RES_NO_SPACE;<br>+ else<br>+ ret = SD_RES_EIO;<br>+ } else<br>+ ret = SD_RES_SUCCESS;<br>+<br>
+ jd->jd_end_mark= end_mark;<br>+<br>+ return ret;<br>+}<br>+<br>+static int jrnl_vdi_apply_to_target_object(jrnl_file_desc_t *jfd)<br>+{<br>+ char *buf = NULL;<br>+ int buf_len, res = 0;<br>+ ssize_t retsize;<br>
+ jrnl_vdi_head_t jh;<br>+<br>+ /* FIXME: handle larger size */<br>+ buf_len = (1 << 22);<br>+ buf = zalloc(buf_len);<br>+ if (!buf) {<br>+ eprintf("failed to allocate memory\n");<br>
+ return SD_RES_NO_MEM;<br>+ }<br>+<br>+ /* Flush out journal to disk (vdi object) */<br>+ retsize = pread64(jfd->jf_fd, &jh, sizeof(jh), 0);<br>+ retsize = pread64(jfd->jf_fd, buf, jh.jh_size, sizeof(jh));<br>
+ retsize = pwrite64(jfd->jf_target_fd, buf, jh.jh_size, jh.jh_offset);<br>+ if (retsize != jh.jh_size) {<br>+ if (errno == ENOSPC)<br>+ res = SD_RES_NO_SPACE;<br>+ else<br>+ res = SD_RES_EIO;<br>
+ }<br>+<br>+ /* Clean up */<br>+ free(buf);<br>+<br>+ return res;<br>+}<br>+<br>+static int jrnl_vdi_commit_data(jrnl_desc_t *jd)<br>+{<br>+ int ret = 0;<br>+ ssize_t retsize;<br>+ jrnl_vdi_head_t *head = (jrnl_vdi_head_t *) jd->jd_head;<br>
+<br>+ retsize = pwrite64(jd->jdf_target_fd, jd->jd_data, head->jh_size, head->jh_offset);<br>+ if (retsize != head->jh_size) {<br>+ if (errno == ENOSPC)<br>+ ret = SD_RES_NO_SPACE;<br>
+ else<br>+ ret = SD_RES_EIO;<br>+ }<br>+<br>+ return ret;<br>+}<br>+<br>