This patch makes vdi data object update/write operations atomic.<br><br>Using the journalling API, store_queue_request_local() now updates the<br>vdi object atomically for the<br>
operations SD_OP_WRITE_OBJ & SD_OP_CREATE_AND_WRITE_OBJ.<br><br>Signed-off-by: Narendra <narendramind at <a href="http://gmail.com">gmail.com</a>><br><br> sheep/sheep.c      |    2 +<br> sheep/sheep_priv.h |   63 ++++++++<br>
 sheep/store.c      |  433 ++++++++++++++++++++++++++++++++++++++++++++++++++--<br> 3 files changed, 486 insertions(+), 12 deletions(-)<br><br>diff --git a/sheep/sheep.c b/sheep/sheep.c<br>index dc9a320..d6c776d 100644<br>
--- a/sheep/sheep.c<br>+++ b/sheep/sheep.c<br>@@ -112,6 +112,8 @@ int main(int argc, char **argv)<br>     if (is_daemon && daemon(0, 0))<br>         exit(1);<br> <br>+    jrnl_recover();<br>+<br>     ret = init_event(EPOLL_SIZE);<br>
     if (ret)<br>         exit(1);<br>diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h<br>index 62924f2..abc4681 100644<br>--- a/sheep/sheep_priv.h<br>+++ b/sheep/sheep_priv.h<br>@@ -202,6 +202,69 @@ int remove_object(struct sheepdog_node_list_entry *e,<br>
 int get_sheep_fd(struct sheepdog_node_list_entry *e, int node_idx,<br>          uint32_t epoch, int worker_idx);<br> <br>+/* Journal */<br>+typedef uint32_t end_mark_t;<br>+typedef uint32_t jrnl_type_t;<br>+<br>+#define JRNL_TYPE_VDI        0<br>
+#define JRNL_MAX_TYPES       1<br>+<br>+#define SET_END_MARK            1UL<br>+#define UNSET_END_MARK            0UL<br>+#define IS_END_MARK_SET(var)    (var == 1UL)<br>+<br>+/* Journal header for data object */<br>+typedef struct jrnl_vdi_head {<br>
+    jrnl_type_t  jh_type;<br>+    uint64_t  jh_offset;<br>+    uint64_t  jh_size;<br>+}  jrnl_vdi_head_t;<br>+<br>+typedef struct jrnl_file_desc {<br>+    uint32_t  jf_epoch;    /* epoch */<br>+    uint64_t  jf_oid;     /* Object id */<br>
+    int       jf_fd;      /* Open file fd */<br>+    int       jf_target_fd;<br>+} jrnl_file_desc_t;<br>+<br>+typedef struct jrnl_descriptor {<br>+    void                *jd_head;<br>+    void                *jd_data;<br>
+    int                 jd_end_mark;<br>+    jrnl_file_desc_t    jd_jfd;<br>+#define jdf_epoch          jd_jfd.jf_epoch<br>+#define jdf_oid            jd_jfd.jf_oid<br>+#define jdf_fd             jd_jfd.jf_fd<br>+#define jdf_target_fd      jd_jfd.jf_target_fd<br>
+} jrnl_desc_t;<br>+<br>+typedef struct jrnl_handler {<br>+    int  (*has_end_mark)(jrnl_desc_t *jd);<br>+    int  (*write_header)(jrnl_desc_t  *jd);<br>+    int  (*write_data)(jrnl_desc_t  *jd);<br>+    int  (*write_end_mark)(jrnl_desc_t  *jd);<br>
+    int  (*apply_to_target_object)(jrnl_file_desc_t *jfd);<br>+    int  (*commit_data)(jrnl_desc_t  *jd);<br>+} jrnl_handler_t;<br>+<br>+inline jrnl_type_t jrnl_get_type(jrnl_desc_t  *jd);<br>+int  jrnl_get_type_from_file(jrnl_file_desc_t *jfd, jrnl_type_t *jrnl_type);<br>
+int  jrnl_exists(jrnl_file_desc_t  *jfd);<br>+int  jrnl_update_epoch_store(uint32_t epoch);<br>+int  jrnl_open(jrnl_file_desc_t  *jfd, int  aflags);<br>+int  jrnl_create(jrnl_file_desc_t  *jfd);<br>+int  jrnl_remove(jrnl_file_desc_t *jfd);<br>
+inline int  jrnl_close(jrnl_file_desc_t  *jfd);<br>+<br>+inline int  jrnl_has_end_mark(jrnl_desc_t  *jd);<br>+inline int  jrnl_write_header(jrnl_desc_t  *jd);<br>+inline int  jrnl_write_data(jrnl_desc_t  *jd);<br>+inline int  jrnl_write_end_mark(jrnl_desc_t  *jd);<br>
+inline int  jrnl_apply_to_target_object(jrnl_file_desc_t *jfd); /* dispatches on the journal type read from the file */<br>+inline int  jrnl_commit_data(jrnl_desc_t  *jd);<br>+int  jrnl_perform(jrnl_desc_t  *jd);<br>+int  jrnl_recover(void);<br>+<br> static inline int is_myself(struct sheepdog_node_list_entry *e)<br>
 {<br>     return e->id == sys-><a href="http://this_node.id">this_node.id</a>;<br>diff --git a/sheep/store.c b/sheep/store.c<br>index 8c688c1..bf44815 100644<br>--- a/sheep/store.c<br>+++ b/sheep/store.c<br>@@ -31,10 +31,32 @@<br>
 static char *obj_path;<br> static char *epoch_path;<br> static char *mnt_path;<br>+static char *jrnl_path;<br> <br> static mode_t def_dmode = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IWGRP | S_IXGRP;<br> static mode_t def_fmode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;<br>
 <br>+/* Journal internal data structures */<br>+/* Journal Handlers for Data Object */<br>+static int  jrnl_vdi_has_end_mark(jrnl_desc_t  *jd);<br>+static int  jrnl_vdi_write_header(jrnl_desc_t  *jd);<br>+static int  jrnl_vdi_write_data(jrnl_desc_t  *jd);<br>
+static int  jrnl_vdi_write_end_mark(jrnl_desc_t  *jd);<br>+static int  jrnl_vdi_apply_to_target_object(jrnl_file_desc_t *jfd);<br>+static int  jrnl_vdi_commit_data(jrnl_desc_t  *jd);<br>+<br>+static jrnl_handler_t  jrnl_handlers[JRNL_MAX_TYPES] = {<br>
+    {<br>+        .has_end_mark = jrnl_vdi_has_end_mark,<br>+        .write_header = jrnl_vdi_write_header,<br>+        .write_data = jrnl_vdi_write_data,<br>+        .write_end_mark = jrnl_vdi_write_end_mark,<br>+        .apply_to_target_object = jrnl_vdi_apply_to_target_object,<br>
+        .commit_data = jrnl_vdi_commit_data<br>+    }<br>+};<br>+<br>+<br> static int obj_cmp(const void *oid1, const void *oid2)<br> {<br>     const uint64_t hval1 = fnv_64a_buf((void *)oid1, sizeof(uint64_t), FNV1A_64_INIT);<br>
@@ -556,6 +578,8 @@ static int store_queue_request_local(struct request *req, uint32_t epoch)<br>     uint64_t oid = hdr->oid;<br>     uint32_t opcode = hdr->opcode;<br>     char path[1024], *buf;<br>+    jrnl_desc_t  jd;<br>
+    jrnl_vdi_head_t jh;<br> <br>     switch (opcode) {<br>     case SD_OP_CREATE_AND_WRITE_OBJ:<br>@@ -671,19 +695,30 @@ static int store_queue_request_local(struct request *req, uint32_t epoch)<br>     case SD_OP_WRITE_OBJ:<br>
     case SD_OP_CREATE_AND_WRITE_OBJ:<br>         if (!is_data_obj(oid)) {<br>-            /* FIXME: write data to journal */<br>-        }<br>-        ret = pwrite64(fd, req->data, hdr->data_length, hdr->offset);<br>
-        if (ret != hdr->data_length) {<br>-            if (errno == ENOSPC)<br>-                ret = SD_RES_NO_SPACE;<br>-            else<br>-                ret = SD_RES_EIO;<br>-            goto out;<br>-        }<br>
+            jd.jdf_epoch = epoch;<br>+            jd.jdf_oid = oid;<br>+            jd.jdf_target_fd = fd;<br> <br>-        if (!is_data_obj(oid)) {<br>-            /* FIXME: remove journal data */<br>+            jh.jh_type = JRNL_TYPE_VDI;<br>
+            jh.jh_offset = hdr->offset;<br>+            jh.jh_size = hdr->data_length;<br>+<br>+            jd.jd_head = &jh;<br>+            jd.jd_data = req->data;<br>+            jd.jd_end_mark = SET_END_MARK;<br>
+<br>+            ret = jrnl_perform(&jd);<br>+            if (ret)<br>+                goto out;<br>+        } else {<br>+            ret = pwrite64(fd, req->data, hdr->data_length, hdr->offset);<br>+            if (ret != hdr->data_length) {<br>
+                if (errno == ENOSPC)<br>+                    ret = SD_RES_NO_SPACE;<br>+                else<br>+                    ret = SD_RES_EIO;<br>+                goto out;<br>+            }<br>         }<br> <br>
         ret = SD_RES_SUCCESS;<br>@@ -1739,6 +1774,27 @@ static int init_mnt_path(const char *base_path)<br>     return 0;<br> }<br> <br>+#define JRNL_PATH "/journal/"<br>+<br>+static int  init_jrnl_path(const char *base_path)<br>
+{<br>+    int new, ret;<br>+<br>+    /* Create journal directory */<br>+    jrnl_path = zalloc(strlen(base_path) + strlen(JRNL_PATH) + 1);<br>+    sprintf(jrnl_path, "%s" JRNL_PATH, base_path);<br>+<br>+    ret = init_path(jrnl_path, &new);<br>
+        /* Error during directory creation */<br>+    if (ret)<br>+        return ret;<br>+    /* If journal is newly created */<br>+    if (new)<br>+        return 0;<br>+<br>+    return 0;<br>+}<br>+<br> int init_store(const char *d)<br>
 {<br>     int ret;<br>@@ -1759,6 +1815,10 @@ int init_store(const char *d)<br>     if (ret)<br>         return ret;<br> <br>+    ret = init_jrnl_path(d);<br>+    if (ret)<br>+        return ret;<br>+<br>     return ret;<br>
 }<br> <br>@@ -1791,3 +1851,352 @@ int get_global_nr_copies(uint32_t *copies)<br> {<br>     return attr(epoch_path, ANAME_COPIES, copies, sizeof(*copies), 0);<br> }<br>+<br>+/* Journal APIs */<br>+int  jrnl_exists(jrnl_file_desc_t  *jfd)<br>
+{<br>+    int ret;<br>+    char path[1024];<br>+    struct stat s;<br>+<br>+    snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, jrnl_path, jfd->jf_epoch, jfd->jf_oid);<br>+<br>+    ret = stat(path, &s);<br>
+    if (ret)<br>+        return 1;<br>+<br>+    return 0;<br>+}<br>+<br>+int jrnl_update_epoch_store(uint32_t epoch)<br>+{<br>+    char new[1024];<br>+    struct stat s;<br>+<br>+    snprintf(new, sizeof(new), "%s%08u/", jrnl_path, epoch);<br>
+    if (stat(new, &s) < 0)<br>+        if (errno == ENOENT)<br>+            mkdir(new, def_dmode);<br>+<br>+    return 0;<br>+}<br>+<br>+int  jrnl_open(jrnl_file_desc_t  *jfd, int  aflags)<br>+{<br>+    char path[1024];<br>
+    int flags = aflags;<br>+    int fd, ret;<br>+<br>+<br>+    jrnl_update_epoch_store(jfd->jf_epoch);<br>+    snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, jrnl_path, jfd->jf_epoch, jfd->jf_oid);<br>
+<br>+    fd = open(path, flags, def_fmode);<br>+    if (fd < 0) {<br>+        eprintf("failed to open %s, %s\n", path, strerror(errno));<br>+        if (errno == ENOENT)<br>+            ret = SD_RES_NO_OBJ;<br>
+        else<br>+            ret = SD_RES_UNKNOWN;<br>+    } else {<br>+        jfd->jf_fd = fd;<br>+        ret = SD_RES_SUCCESS;<br>+    }<br>+<br>+    return ret;<br>+}<br>+<br>+int  jrnl_close(jrnl_file_desc_t  *jfd)<br>
+{<br>+    close(jfd->jf_fd);<br>+    jfd->jf_fd = -1;<br>+<br>+    return 0;<br>+}<br>+<br>+int  jrnl_create(jrnl_file_desc_t  *jfd)<br>+{<br>+    return jrnl_open(jfd, O_RDWR | O_CREAT);<br>+}<br>+<br>+inline uint32_t jrnl_get_type(jrnl_desc_t  *jd)<br>
+{<br>+    return *((uint32_t *) jd->jd_head);<br>+}<br>+<br>+int jrnl_get_type_from_file(jrnl_file_desc_t *jfd, jrnl_type_t *jrnl_type)<br>+{<br>+    ssize_t retsize;<br>+<br>+    retsize = pread64(jfd->jf_fd, jrnl_type, sizeof(*jrnl_type), 0);<br>
+<br>+    if (retsize != sizeof(*jrnl_type))<br>+        return SD_RES_EIO;<br>+    else<br>+        return SD_RES_SUCCESS;<br>+}<br>+<br>+int  jrnl_remove(jrnl_file_desc_t *jfd)<br>+{<br>+    char path[1024];<br>+    int ret;<br>
+<br>+    snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, jrnl_path, jfd->jf_epoch, jfd->jf_oid);<br>+    ret = unlink(path);<br>+    if (ret) {<br>+        eprintf("failed to remove %s, %s\n", path, strerror(errno));<br>
+        ret = SD_RES_EIO;<br>+    } else<br>+        ret = SD_RES_SUCCESS;<br>+<br>+    return ret;<br>+}<br>+<br>+inline int  jrnl_has_end_mark(jrnl_desc_t *jd)<br>+{<br>+    return jrnl_handlers[jrnl_get_type(jd)].has_end_mark(jd);<br>
+}<br>+<br>+inline int  jrnl_write_header(jrnl_desc_t  *jd)<br>+{<br>+    return jrnl_handlers[jrnl_get_type(jd)].write_header(jd);<br>+}<br>+<br>+inline int  jrnl_write_data(jrnl_desc_t  *jd)<br>+{<br>+    return jrnl_handlers[jrnl_get_type(jd)].write_data(jd);<br>
+}<br>+<br>+inline int  jrnl_write_end_mark(jrnl_desc_t  *jd)<br>+{<br>+    return jrnl_handlers[jrnl_get_type(jd)].write_end_mark(jd);<br>+}<br>+<br>+inline int  jrnl_apply_to_target_object(jrnl_file_desc_t *jfd)<br>+{<br>
+    int ret;<br>+    jrnl_type_t  jrnl_type;<br>+<br>+    ret = jrnl_get_type_from_file(jfd, &jrnl_type);<br>+<br>+    return jrnl_handlers[jrnl_type].apply_to_target_object(jfd);<br>+}<br>+<br>+inline int  jrnl_commit_data(jrnl_desc_t  *jd)<br>
+{<br>+    return jrnl_handlers[jrnl_get_type(jd)].commit_data(jd);<br>+}<br>+<br>+int  jrnl_perform(jrnl_desc_t  *jd)<br>+{<br>+    int  ret;<br>+<br>+    ret = jrnl_create(&jd->jd_jfd);<br>+        if (ret)<br>+            goto out;<br>
+<br>+    ret = jrnl_write_header(jd);<br>+    if (ret)<br>+        goto out;<br>+<br>+    ret = jrnl_write_data(jd);<br>+    if (ret)<br>+        goto out;<br>+<br>+    ret = jrnl_write_end_mark(jd);<br>+    if (ret)<br>
+        goto out;<br>+<br>+    ret = jrnl_commit_data(jd);<br>+    if (ret)<br>+        goto out;<br>+<br>+    ret = jrnl_remove(&jd->jd_jfd);<br>+<br>+out:<br>+    return ret;<br>+}<br>+<br>+int  jrnl_recover(void)<br>
+{<br>+    DIR *dir;<br>+    struct dirent *d;<br>+    char jrnl_dir[1024], <br>+         jrnl_file_path[1024],<br>+         obj_file_path[1024];<br>+    int  epoch;<br>+<br>+    epoch = get_latest_epoch();<br>+    if (epoch < 0) {<br>
+        return 1;<br>+    }<br>+    snprintf(jrnl_dir, sizeof(jrnl_dir), "%s%08u/", jrnl_path, epoch);<br>+<br>+    eprintf("Openning the directory%s.\n", jrnl_dir);<br>+    dir = opendir(jrnl_dir);<br>
+    if (!dir)<br>+        return -1;<br>+<br>+    vprintf(SDOG_NOTICE "start jrnl_recovery.\n");<br>+    while ((d = readdir(dir))) {<br>+        int  ret;<br>+        jrnl_file_desc_t  jfd;<br>+<br>+        if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, ".."))<br>
+            continue;<br>+<br>+        jfd.jf_epoch = epoch;<br>+        sscanf(d->d_name, "%" PRIx64, &jfd.jf_oid);<br>+        snprintf(jrnl_file_path, sizeof(jrnl_file_path), "%s%016" PRIx64,<br>
+                                                        jrnl_dir, jfd.jf_oid);<br>+        snprintf(obj_file_path, sizeof(obj_file_path), "%s%08u/%016" PRIx64,<br>+                                                        obj_path, epoch, jfd.jf_oid);<br>
+        ret = jrnl_open(&jfd, O_RDONLY);<br>+        if (ret) {<br>+            eprintf("Unable to open the journal file, %s, for reading.\n", jrnl_file_path);<br>+            goto end_while_3;<br>+        }<br>
+        jfd.jf_target_fd = ob_open(epoch, jfd.jf_oid, 0, &ret);<br>+        if (ret) {<br>+            eprintf("Unable to open the object file, %s, to recover.\n", obj_file_path);<br>+            goto end_while_2;<br>
+        }<br>+        ret = jrnl_apply_to_target_object(&jfd);<br>+        if (ret)<br>+            eprintf("Unable to recover the object, %s.\n", obj_file_path);<br>+<br>+        close(jfd.jf_target_fd);<br>
+        jfd.jf_target_fd = -1;<br>+    end_while_2:<br>+        jrnl_close(&jfd);<br>+    end_while_3:<br>+        vprintf(SDOG_INFO "recovered the object in journal, %s\n", jrnl_file_path);<br>+        jrnl_remove(&jfd);<br>
+    }<br>+    closedir(dir);<br>+    vprintf(SDOG_NOTICE "end jrnl_recovery.\n");<br>+<br>+    return 0;<br>+}<br>+<br>+/* VDI data journalling functions */<br>+static int  jrnl_vdi_has_end_mark(jrnl_desc_t *jd)<br>
+{<br>+    ssize_t  ret;<br>+    end_mark_t  end_mark = UNSET_END_MARK;<br>+    jrnl_vdi_head_t  *head = (jrnl_vdi_head_t *) jd->jd_head;<br>+<br>+    ret = pread64(jd->jdf_fd, &end_mark, sizeof(end_mark),<br>+                   sizeof(*head) + head->jh_size);<br>
+<br>+    return (IS_END_MARK_SET(end_mark)? SET_END_MARK: UNSET_END_MARK);<br>+}<br>+<br>+static int  jrnl_vdi_write_header(jrnl_desc_t  *jd)<br>+{<br>+    ssize_t  ret;<br>+    jrnl_vdi_head_t  *head = (jrnl_vdi_head_t *) jd->jd_head;<br>
+<br>+    ret = pwrite64(jd->jdf_fd, head, sizeof(*head), 0);<br>+<br>+    if (ret != sizeof(*head)) {<br>+        if (errno == ENOSPC)<br>+            ret = SD_RES_NO_SPACE;<br>+        else<br>+            ret = SD_RES_EIO;<br>
+    } else<br>+        ret = SD_RES_SUCCESS;<br>+<br>+    return ret;<br>+}<br>+<br>+static int  jrnl_vdi_write_data(jrnl_desc_t  *jd)<br>+{<br>+    ssize_t  ret;<br>+    jrnl_vdi_head_t  *head = (jrnl_vdi_head_t *) jd->jd_head;<br>
+<br>+    ret = pwrite64(jd->jdf_fd, jd->jd_data, head->jh_size, sizeof(*head));<br>+<br>+    if (ret != head->jh_size) {<br>+        if (errno == ENOSPC)<br>+            ret = SD_RES_NO_SPACE;<br>+        else<br>
+            ret = SD_RES_EIO;<br>+    } else<br>+        ret = SD_RES_SUCCESS;<br>+<br>+    return ret;<br>+}<br>+<br>+static int  jrnl_vdi_write_end_mark(jrnl_desc_t  *jd)<br>+{<br>+    ssize_t  retsize;<br>+    int  ret;<br>
+    end_mark_t  end_mark = SET_END_MARK;<br>+    jrnl_vdi_head_t  *head = (jrnl_vdi_head_t *) jd->jd_head;<br>+<br>+    retsize = pwrite64(jd->jdf_fd, &end_mark, sizeof(end_mark),<br>+                   sizeof(*head) + head->jh_size);<br>
+<br>+    if (retsize != sizeof(end_mark)) {<br>+        if (errno == ENOSPC)<br>+            ret = SD_RES_NO_SPACE;<br>+        else<br>+            ret = SD_RES_EIO;<br>+    } else<br>+        ret = SD_RES_SUCCESS;<br>+<br>
+    jd->jd_end_mark= end_mark;<br>+<br>+    return ret;<br>+}<br>+<br>+/*<br>+ * Replay a VDI journal file onto its target object.<br>+ *<br>+ * Reads the header at offset 0 and the end mark stored after the payload,<br>+ * then copies jh_size payload bytes to jf_target_fd at jh_offset.  A journal<br>+ * whose header or end mark is short or unset was never completed, so it is<br>+ * skipped without touching the target.<br>+ *<br>+ * Returns 0 when the payload was applied or the journal was incomplete,<br>+ * SD_RES_NO_MEM, SD_RES_NO_SPACE or SD_RES_EIO on failure.<br>+ */<br>+static int  jrnl_vdi_apply_to_target_object(jrnl_file_desc_t *jfd)<br>+{<br>+    char *buf = NULL;<br>+    int buf_len, res = 0;<br>+    ssize_t retsize;<br>
+    jrnl_vdi_head_t jh;<br>+    end_mark_t end_mark = UNSET_END_MARK;<br>+<br>+    /* FIXME: handle larger size */<br>+    buf_len = (1 << 22);<br>+<br>+    retsize = pread64(jfd->jf_fd, &jh, sizeof(jh), 0);<br>+    if (retsize < 0)<br>+        return SD_RES_EIO;<br>+    if (retsize != sizeof(jh))<br>+        goto incomplete;<br>+<br>+    /* jh_size comes from disk; it must fit the replay buffer. */<br>+    if (jh.jh_size > (uint64_t)buf_len) {<br>+        eprintf("journal payload is larger than the replay buffer\n");<br>+        return SD_RES_EIO;<br>+    }<br>
+<br>+    /* Only a journal that carries its end mark was written completely. */<br>+    retsize = pread64(jfd->jf_fd, &end_mark, sizeof(end_mark),<br>+                      sizeof(jh) + jh.jh_size);<br>+    if (retsize < 0)<br>+        return SD_RES_EIO;<br>+    if (retsize != sizeof(end_mark) || !IS_END_MARK_SET(end_mark))<br>+        goto incomplete;<br>+<br>+    buf = zalloc(buf_len);<br>+    if (!buf) {<br>+        eprintf("failed to allocate memory\n");<br>+        return SD_RES_NO_MEM;<br>+    }<br>+<br>+    retsize = pread64(jfd->jf_fd, buf, jh.jh_size, sizeof(jh));<br>+    if (retsize != jh.jh_size) {<br>+        res = SD_RES_EIO;<br>+        goto out;<br>+    }<br>+<br>
+    /* Flush out journal to disk (vdi object) */<br>+    retsize = pwrite64(jfd->jf_target_fd, buf, jh.jh_size, jh.jh_offset);<br>+    if (retsize != jh.jh_size) {<br>+        if (errno == ENOSPC)<br>+            res = SD_RES_NO_SPACE;<br>+        else<br>+            res = SD_RES_EIO;<br>
+    }<br>+<br>+out:<br>+    /* Clean up */<br>+    free(buf);<br>+<br>+    return res;<br>+<br>+incomplete:<br>+    eprintf("incomplete journal, not applied\n");<br>+    return 0;<br>+}<br>+<br>+static int  jrnl_vdi_commit_data(jrnl_desc_t  *jd)<br>+{<br>+    int  ret = 0;<br>+    ssize_t  retsize;<br>+    jrnl_vdi_head_t  *head = (jrnl_vdi_head_t *) jd->jd_head;<br>
+<br>+    retsize = pwrite64(jd->jdf_target_fd, jd->jd_data, head->jh_size, head->jh_offset);<br>+    if (retsize != head->jh_size) {<br>+        if (errno == ENOSPC)<br>+            ret = SD_RES_NO_SPACE;<br>
+        else<br>+            ret = SD_RES_EIO;<br>+    }<br>+<br>+    return ret;<br>+}<br>+<br>