<br>This patch makes vdi object and checksum updates atomic by journalling them<br>before they are applied.<br><br>Using the new journalling API, store_queue_request_local() now updates the<br>vdi object and its checksum atomically for the operations SD_OP_WRITE_OBJ and<br>
SD_OP_CREATE_AND_WRITE_OBJ. At startup, jrnl_recover() replays any journal files<br>left in the latest epoch's journal directory.<br><br><br>Signed-off-by: Narendra <<a href="mailto:narendramind@gmail.com">narendramind@gmail.com</a>><br><br>diff --git a/sheep/sheep.c b/sheep/sheep.c<br>
index dc9a320..d6c776d 100644<br>--- a/sheep/sheep.c<br>+++ b/sheep/sheep.c<br>@@ -112,6 +112,8 @@ int main(int argc, char **argv)<br>     if (is_daemon && daemon(0, 0))<br>         exit(1);<br> <br>+    jrnl_recover();<br>
+<br>     ret = init_event(EPOLL_SIZE);<br>     if (ret)<br>         exit(1);<br>diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h<br>index c66baf4..80c75b3 100644<br>--- a/sheep/sheep_priv.h<br>+++ b/sheep/sheep_priv.h<br>
@@ -128,6 +128,7 @@ struct cluster_info {<br> <br> extern struct cluster_info *sys;<br> <br>+<br> int create_listen_port(int port, void *data);<br> <br> int is_io_request(unsigned op);<br>@@ -190,6 +191,81 @@ int remove_object(struct sheepdog_node_list_entry *e,<br>
           int nodes, uint32_t node_version,<br>           uint64_t oid, int nr);<br> <br>+/* Journal */<br>+typedef uint32_t end_mark_t;<br>+typedef uint32_t jrnl_type_t;<br>+<br>+#define JRNL_TYPE_VDI        0<br>+#define JRNL_TYPE_CKSUM      1<br>
+#define JRNL_MAX_TYPES       2<br>+<br>+#define SET_END_MARK            1UL<br>+#define UNSET_END_MARK            0UL<br>+#define IS_END_MARK_SET(var)    (var == 1UL)<br>+<br>+<br>+/* Different Journal headers */<br>+typedef struct jrnl_cksum_head {<br>
+    jrnl_type_t  jh_type;<br>+    uint64_t  jh_size;<br>+}  jrnl_cksum_head_t;<br>+<br>+typedef struct jrnl_vdi_head {<br>+    jrnl_type_t  jh_type;<br>+    uint64_t  jh_offset;<br>+    uint64_t  jh_size;<br>+}  jrnl_vdi_head_t;<br>
+<br>+typedef struct jrnl_cksum_data {<br>+    char  aname_cksum[64];<br>+    uint64_t aval_cksum;    <br>+} jrnl_cksum_data_t;<br>+<br>+typedef struct jrnl_file_desc {<br>+    uint32_t  jf_epoch;    /* epoch */<br>+    uint64_t  jf_oid;     /* Object id */<br>
+    int       jf_fd;      /* Open file fd */<br>+    int       jf_target_fd;<br>+} jrnl_file_desc_t;<br>+<br>+typedef struct jrnl_descriptor {<br>+    void                *jd_head;<br>+    void                *jd_data;<br>
+    int                 jd_end_mark;<br>+    jrnl_file_desc_t    jd_jfd;<br>+#define jdf_epoch          jd_jfd.jf_epoch<br>+#define jdf_oid            jd_jfd.jf_oid<br>+#define jdf_fd             jd_jfd.jf_fd<br>+#define jdf_target_fd      jd_jfd.jf_target_fd<br>
+} jrnl_desc_t;<br>+<br>+typedef struct jrnl_handler {<br>+    int  (*has_end_mark)(jrnl_desc_t *jd);<br>+    int  (*write_header)(jrnl_desc_t  *jd);<br>+    int  (*write_data)(jrnl_desc_t  *jd);<br>+    int  (*write_end_mark)(jrnl_desc_t  *jd);<br>
+    int  (*apply_to_target_object)(jrnl_file_desc_t *jfd);<br>+    int  (*commit_data)(jrnl_desc_t  *jd);<br>+} jrnl_handler_t;<br>+<br>+inline jrnl_type_t jrnl_get_type(jrnl_desc_t  *jd);<br>+int  jrnl_get_type_from_file(jrnl_file_desc_t *jfd, jrnl_type_t *jrnl_type);<br>
+int  jrnl_exists(jrnl_file_desc_t  *jfd);<br>+int  jrnl_update_epoch_store(uint32_t epoch);<br>+int  jrnl_open(jrnl_file_desc_t  *jfd, int  aflags);<br>+int  jrnl_create(jrnl_file_desc_t  *jfd);<br>+int  jrnl_remove(jrnl_file_desc_t *jfd);<br>
+inline int  jrnl_close(jrnl_file_desc_t  *jfd);<br>+<br>+inline int  jrnl_has_end_mark(jrnl_desc_t  *jd);<br>+inline int  jrnl_write_header(jrnl_desc_t  *jd);<br>+inline int  jrnl_write_data(jrnl_desc_t  *jd);<br>+inline int  jrnl_write_end_mark(jrnl_desc_t  *jd);<br>
+inline int  jrnl_apply_to_target_object(jrnl_file_desc_t *jfd);  /* replay an open journal file onto jfd->jf_target_fd */<br>+inline int  jrnl_commit_data(jrnl_desc_t  *jd);  /* apply jd's data to jd's target fd */<br>+int  jrnl_perform(jrnl_desc_t  *jd);  /* write the journal, commit to the target, remove the journal */<br>+int  jrnl_recover(void);  /* replay the journal files of the latest epoch */<br>+<br> static inline int is_myself(struct sheepdog_node_list_entry *e)<br>
 {<br>     return e->id == sys-><a href="http://this_node.id">this_node.id</a>;<br>diff --git a/sheep/store.c b/sheep/store.c<br>index a4d6155..0eac4d2 100644<br>--- a/sheep/store.c<br>+++ b/sheep/store.c<br>@@ -32,10 +32,45 @@<br>
 static char *obj_path;<br> static char *epoch_path;<br> static char *mnt_path;<br>+static char *jrnl_path;<br> <br> static mode_t def_dmode = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IWGRP | S_IXGRP;<br> static mode_t def_fmode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;<br>
 <br>+/* Journal internal data structures */<br>+static int  jrnl_vdi_has_end_mark(jrnl_desc_t  *jd);<br>+static int  jrnl_vdi_write_header(jrnl_desc_t  *jd);<br>+static int  jrnl_vdi_write_data(jrnl_desc_t  *jd);<br>+static int  jrnl_vdi_write_end_mark(jrnl_desc_t  *jd);<br>
+static int  jrnl_vdi_apply_to_target_object(jrnl_file_desc_t *jfd);<br>+static int  jrnl_vdi_commit_data(jrnl_desc_t  *jd);<br>+<br>+static int  jrnl_cksum_has_end_mark(jrnl_desc_t  *jd);<br>+static int  jrnl_cksum_write_header(jrnl_desc_t  *jd);<br>
+static int  jrnl_cksum_write_data(jrnl_desc_t  *jd);<br>+static int  jrnl_cksum_write_end_mark(jrnl_desc_t  *jd);<br>+static int  jrnl_cksum_apply_to_target_object(jrnl_file_desc_t *jfd);<br>+static int  jrnl_cksum_commit_data(jrnl_desc_t  *jd);<br>
+<br>+static jrnl_handler_t  jrnl_handlers[JRNL_MAX_TYPES] = {<br>+    {<br>+        .has_end_mark = jrnl_vdi_has_end_mark,<br>+        .write_header = jrnl_vdi_write_header,<br>+        .write_data = jrnl_vdi_write_data,<br>
+        .write_end_mark = jrnl_vdi_write_end_mark,<br>+        .apply_to_target_object = jrnl_vdi_apply_to_target_object,<br>+        .commit_data = jrnl_vdi_commit_data<br>+    },<br>+    {<br>+        .has_end_mark = jrnl_cksum_has_end_mark,<br>
+        .write_header = jrnl_cksum_write_header,<br>+        .write_data = jrnl_cksum_write_data,<br>+        .write_end_mark = jrnl_cksum_write_end_mark,<br>+        .apply_to_target_object = jrnl_cksum_apply_to_target_object,<br>
+        .commit_data = jrnl_cksum_commit_data<br>+    }<br>+};<br>+<br> static int obj_cmp(const void *oid1, const void *oid2)<br> {<br>     const uint64_t hval1 = fnv_64a_buf((void *)oid1, sizeof(uint64_t), FNV1A_64_INIT);<br>
@@ -98,7 +133,8 @@ static int is_obj_in_range(uint64_t oid, uint64_t start, uint64_t end)<br>         return (start < hval || hval <= end);<br> }<br> <br>-static int verify_object(int fd, char *buf, size_t len, int set_chksum)<br>
+/* When set_cksum is set, epoch & oid are required to perform journalling. */<br>+static int verify_object(int fd, char *buf, size_t len, int set_chksum, uint32_t  epoch, uint64_t oid)<br> {<br>     int ret;<br>     uint64_t checksum;<br>
@@ -128,11 +164,30 @@ static int verify_object(int fd, char *buf, size_t len, int set_chksum)<br>     }<br> <br>     if (set_chksum) {<br>-        checksum = fnv_64a_buf(buf, len, FNV1A_64_INIT);<br>-        ret = fsetxattr(fd, ANAME_CHECKSUM, &checksum, sizeof(checksum), 0);<br>
-        if (ret < 0) {<br>-            eprintf("failed to set xattr, %m\n");<br>-            goto err;<br>+        if (epoch && oid) {<br>+            jrnl_desc_t  jd;<br>+            jrnl_cksum_head_t  head;<br>
+            jrnl_cksum_data_t  data;<br>+<br>+            head.jh_type = JRNL_TYPE_CKSUM;<br>+            head.jh_size = sizeof(data);<br>+            strcpy(data.aname_cksum, ANAME_CHECKSUM);<br>+            data.aval_cksum = fnv_64a_buf(buf, len, FNV1A_64_INIT);<br>
+            jd.jd_head = &head;<br>+            jd.jd_data = &data;<br>+            jd.jdf_epoch = epoch;<br>+            jd.jdf_oid = oid;<br>+            jd.jdf_target_fd = fd;<br>+            ret = jrnl_perform(&jd);<br>
+            if (ret)<br>+                goto err;<br>+        } else {<br>+            checksum = fnv_64a_buf(buf, len, FNV1A_64_INIT);<br>+            ret = fsetxattr(fd, ANAME_CHECKSUM, &checksum, sizeof(checksum), 0);<br>
+            if (ret < 0) {<br>+                eprintf("failed to set xattr, %m\n");<br>+                goto err;<br>+            }<br>         }<br>     } else {<br>         ret = fgetxattr(fd, ANAME_CHECKSUM, &checksum, sizeof(checksum));<br>
@@ -199,7 +254,7 @@ static int get_obj_list(struct request *req)<br>     }<br>     obj_nr = read(fd, buf, buf_len);<br> <br>-    ret = verify_object(fd, buf, obj_nr, 0);<br>+    ret = verify_object(fd, buf, obj_nr, 0, 0, 0);<br>
     if (ret < 0) {<br>         eprintf("verification failed, %s, %m\n", path);<br>         close(fd);<br>@@ -633,6 +688,8 @@ static int store_queue_request_local(struct request *req, uint32_t epoch)<br>     uint64_t oid = hdr->oid;<br>
     uint32_t opcode = hdr->opcode;<br>     char path[1024], *buf;<br>+    jrnl_desc_t  jd;<br>+    jrnl_vdi_head_t jh;<br> <br>     switch (opcode) {<br>     case SD_OP_CREATE_AND_WRITE_OBJ:<br>@@ -766,24 +823,31 @@ static int store_queue_request_local(struct request *req, uint32_t epoch)<br>
 /*             } */<br>         }<br>         /* fall through */<br>-    case SD_OP_CREATE_AND_WRITE_OBJ:<br>-        ret = pwrite64(fd, req->data, hdr->data_length, hdr->offset);<br>-        if (ret != hdr->data_length) {<br>
-            if (errno == ENOSPC)<br>-                ret = SD_RES_NO_SPACE;<br>-            else<br>-                ret = SD_RES_EIO;<br>+    case SD_OP_CREATE_AND_WRITE_OBJ: <br>+<br>+        jd.jdf_epoch = epoch;<br>+        jd.jdf_oid = oid;<br>
+        jd.jdf_target_fd = fd;<br>+<br>+        jh.jh_type = JRNL_TYPE_VDI;<br>+        jh.jh_offset = hdr->offset;<br>+        jh.jh_size = hdr->data_length;<br>+<br>+        jd.jd_head = &jh;<br>+        jd.jd_data = req->data;<br>
+        jd.jd_end_mark = SET_END_MARK;<br>+<br>+        ret = jrnl_perform(&jd);<br>+        if (ret)<br>             goto out;<br>-        }<br> <br>         if (!is_data_obj(oid)) {<br>-            /* FIXME: need to update atomically */<br>
-/*             ret = verify_object(fd, NULL, 0, 1); */<br>-/*             if (ret < 0) { */<br>-/*                 eprintf("failed to set checksum, %"PRIx64"\n", oid); */<br>-/*                 ret = SD_RES_EIO; */<br>
-/*                 goto out; */<br>-/*             } */<br>+             ret = verify_object(fd, NULL, 0, 1, epoch, oid);<br>+             if (ret < 0) {<br>+                 eprintf("failed to set checksum, %"PRIx64"\n", oid);<br>
+                 ret = SD_RES_EIO;<br>+                 goto out;<br>+             }<br>         }<br> <br>         ret = SD_RES_SUCCESS;<br>@@ -1556,7 +1620,7 @@ static void __start_recovery(struct work *work, int idx)<br>
     write(fd, rw->buf, sizeof(uint64_t) * rw->count);<br>     fsync(fd);<br> <br>-    ret = verify_object(fd, rw->buf, sizeof(uint64_t) * rw->count, 1);<br>+    ret = verify_object(fd, rw->buf, sizeof(uint64_t) * rw->count, 1, 0, 0);<br>
     if (ret < 0) {<br>         eprintf("failed to set check sum, %s, %m\n", path);<br>         close(fd);<br>@@ -1786,6 +1850,27 @@ static int init_mnt_path(const char *base_path)<br>     return 0;<br> }<br>
 <br>+#define JRNL_PATH "/journal/"<br>+<br>+static int  init_jrnl_path(const char *base_path)<br>+{<br>+    int new, ret;<br>+<br>+    /* Create journal directory */<br>+    jrnl_path = zalloc(strlen(base_path) + strlen(JRNL_PATH) + 1);<br>
+    sprintf(jrnl_path, "%s" JRNL_PATH, base_path);<br>+<br>+    ret = init_path(jrnl_path, &new);<br>+        /* Error during directory creation */<br>+    if (ret)<br>+        return ret;<br>+    /* If journal is newly created */<br>
+    if (new)<br>+        return 0;<br>+<br>+    return 0;<br>+}<br>+<br> int init_store(const char *d)<br> {<br>     int ret;<br>@@ -1806,6 +1891,10 @@ int init_store(const char *d)<br>     if (ret)<br>         return ret;<br>
 <br>+    ret = init_jrnl_path(d);<br>+    if (ret)<br>+        return ret;<br>+<br>     return ret;<br> }<br> <br>@@ -1838,3 +1927,464 @@ int get_global_nr_copies(uint32_t *copies)<br> {<br>     return attr(epoch_path, ANAME_COPIES, copies, sizeof(*copies), 0);<br>
 }<br>+<br>+/* Journal APIs */<br>+int  jrnl_exists(jrnl_file_desc_t  *jfd)<br>+{<br>+    int ret;<br>+    char path[1024];<br>+    struct stat s;<br>+<br>+    snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, jrnl_path, jfd->jf_epoch, jfd->jf_oid);<br>
+<br>+    ret = stat(path, &s);<br>+    if (ret)<br>+        return 1;<br>+<br>+    return 0;<br>+}<br>+<br>+int jrnl_update_epoch_store(uint32_t epoch)<br>+{<br>+    char new[1024];<br>+    struct stat s;<br>+<br>+    snprintf(new, sizeof(new), "%s%08u/", jrnl_path, epoch);<br>
+    if (stat(new, &s) < 0)<br>+        if (errno == ENOENT)<br>+            mkdir(new, def_dmode);<br>+<br>+    return 0;<br>+}<br>+<br>+int  jrnl_open(jrnl_file_desc_t  *jfd, int  aflags)<br>+{<br>+    char path[1024];<br>
+    int flags = aflags;<br>+    int fd, ret;<br>+<br>+<br>+    jrnl_update_epoch_store(jfd->jf_epoch);<br>+    snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, jrnl_path, jfd->jf_epoch, jfd->jf_oid);<br>
+<br>+    fd = open(path, flags, def_fmode);<br>+    if (fd < 0) {<br>+        eprintf("failed to open %s, %s\n", path, strerror(errno));<br>+        if (errno == ENOENT)<br>+            ret = SD_RES_NO_OBJ;<br>
+        else<br>+            ret = SD_RES_UNKNOWN;<br>+    } else {<br>+        jfd->jf_fd = fd;<br>+        ret = SD_RES_SUCCESS;<br>+    }<br>+<br>+    return ret;<br>+}<br>+<br>+int  jrnl_close(jrnl_file_desc_t  *jfd)<br>
+{<br>+    close(jfd->jf_fd);<br>+    jfd->jf_fd = -1;<br>+<br>+    return 0;<br>+}<br>+<br>+int  jrnl_create(jrnl_file_desc_t  *jfd)<br>+{<br>+    return jrnl_open(jfd, O_RDWR | O_CREAT);<br>+}<br>+<br>+inline uint32_t jrnl_get_type(jrnl_desc_t  *jd)<br>
+{<br>+    return *((uint32_t *) jd->jd_head);<br>+}<br>+<br>+int jrnl_get_type_from_file(jrnl_file_desc_t *jfd, jrnl_type_t *jrnl_type)<br>+{<br>+    ssize_t retsize;<br>+<br>+    retsize = pread64(jfd->jf_fd, jrnl_type, sizeof(*jrnl_type), 0);<br>
+<br>+    if (retsize != sizeof(*jrnl_type))<br>+        return SD_RES_EIO;<br>+    else<br>+        return SD_RES_SUCCESS;<br>+}<br>+<br>+<br>+int  jrnl_remove(jrnl_file_desc_t *jfd)<br>+{<br>+    char path[1024];<br>+    int ret;<br>
+<br>+    snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, jrnl_path, jfd->jf_epoch, jfd->jf_oid);<br>+    ret = unlink(path);<br>+    if (ret) {<br>+        eprintf("failed to remove %s, %s\n", path, strerror(errno));<br>
+        ret = SD_RES_EIO;<br>+    } else<br>+        ret = SD_RES_SUCCESS;<br>+<br>+    return ret;<br>+}<br>+<br>+int  jrnl_has_end_mark(jrnl_desc_t *jd)<br>+{<br>+    return jrnl_handlers[jrnl_get_type(jd)].has_end_mark(jd);<br>
+}<br>+<br>+int  jrnl_write_header(jrnl_desc_t  *jd)<br>+{<br>+    return jrnl_handlers[jrnl_get_type(jd)].write_header(jd);<br>+}<br>+<br>+int  jrnl_write_data(jrnl_desc_t  *jd)<br>+{<br>+    return jrnl_handlers[jrnl_get_type(jd)].write_data(jd);<br>
+}<br>+/* Dispatch to the write_end_mark handler selected by the type in jd's header. */<br>+int  jrnl_write_end_mark(jrnl_desc_t  *jd)<br>+{<br>+    return jrnl_handlers[jrnl_get_type(jd)].write_end_mark(jd);<br>+}<br>+/* Replay an open journal file via the handler selected by the type stored in the file. */<br>+int  jrnl_apply_to_target_object(jrnl_file_desc_t *jfd)<br>+{<br>+    jrnl_type_t  jrnl_type;<br>
+<br>+    /* An unreadable or out-of-range type must never index jrnl_handlers. */<br>+    if (jrnl_get_type_from_file(jfd, &jrnl_type) || jrnl_type >= JRNL_MAX_TYPES)<br>+        return SD_RES_EIO;<br>+    return jrnl_handlers[jrnl_type].apply_to_target_object(jfd);<br>+}<br>+/* Dispatch to the commit_data handler selected by the type in jd's header. */<br>+int  jrnl_commit_data(jrnl_desc_t  *jd)<br>
+{<br>+    return jrnl_handlers[jrnl_get_type(jd)].commit_data(jd);<br>+}<br>+/* Journal one update, apply it to the target, then remove the journal (kept on failure); returns an SD_RES_* code. */<br>+int  jrnl_perform(jrnl_desc_t  *jd)<br>+{<br>+    int  ret;<br>+<br>+    ret = jrnl_create(&jd->jd_jfd);<br>+    if (ret)<br>+        return ret;    /* nothing was opened, so nothing to close */<br>
+<br>+    ret = jrnl_write_header(jd);<br>+    if (ret)<br>+        goto out;<br>+<br>+    ret = jrnl_write_data(jd);<br>+    if (ret)<br>+        goto out;<br>+<br>+    ret = jrnl_write_end_mark(jd);<br>+    if (ret)<br>
+        goto out;<br>+<br>+    ret = jrnl_commit_data(jd);<br>+    if (ret)<br>+        goto out;<br>+<br>+    ret = jrnl_remove(&jd->jd_jfd);<br>+out:<br>+    jrnl_close(&jd->jd_jfd);    /* jrnl_create() left jf_fd open */<br>+    return ret;<br>+}<br>+<br>+int  jrnl_recover(void)<br>
+{<br>+    DIR *dir;<br>+    struct dirent *d;<br>+    char jrnl_dir[1024], <br>+         jrnl_file_path[1024],<br>+         obj_file_path[1024];<br>+    int  epoch;<br>+<br>+    epoch = get_latest_epoch();<br>+    if (epoch < 0) {<br>
+        return 1;<br>+    }<br>+    snprintf(jrnl_dir, sizeof(jrnl_dir), "%s%08u/", jrnl_path, epoch);<br>+<br>+    eprintf("Openning the directory%s.\n", jrnl_dir);<br>+    dir = opendir(jrnl_dir);<br>
+    if (!dir)<br>+        return -1;<br>+<br>+    vprintf(SDOG_NOTICE "start jrnl_recovery.\n");<br>+    while ((d = readdir(dir))) {<br>+        int  ret;<br>+        jrnl_file_desc_t  jfd;<br>+<br>+        if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, ".."))<br>
+            continue;<br>+<br>+        jfd.jf_epoch = epoch;<br>+        sscanf(d->d_name, "%" PRIx64, &jfd.jf_oid);<br>+        snprintf(jrnl_file_path, sizeof(jrnl_file_path), "%s%016" PRIx64,<br>
+                                                        jrnl_dir, jfd.jf_oid);<br>+        snprintf(obj_file_path, sizeof(obj_file_path), "%s%08u/%016" PRIx64,<br>+                                                        obj_path, epoch, jfd.jf_oid);<br>
+        ret = jrnl_open(&jfd, O_RDONLY);<br>+        if (ret) {<br>+            eprintf("Unable to open the journal file, %s, for reading.\n", jrnl_file_path);<br>+            goto end_while_3;<br>+        }<br>
+        jfd.jf_target_fd = ob_open(epoch, jfd.jf_oid, 0, &ret);<br>+        if (ret) {<br>+            eprintf("Unable to open the object file, %s, to recover.\n", obj_file_path);<br>+            goto end_while_2;<br>
+        }<br>+        ret = jrnl_apply_to_target_object(&jfd);<br>+        if (ret)<br>+            eprintf("Unable to recover the object, %s.\n", obj_file_path);<br>+<br>+        close(jfd.jf_target_fd);<br>
+        jfd.jf_target_fd = -1;<br>+    end_while_2:<br>+        jrnl_close(&jfd);<br>+    end_while_3:<br>+        vprintf(SDOG_INFO "recovered the object in journal, %s\n", jrnl_file_path);<br>+        jrnl_remove(&jfd);<br>
+    }<br>+    closedir(dir);<br>+    vprintf(SDOG_NOTICE "end jrnl_recovery.\n");<br>+<br>+    return 0;<br>+}<br>+<br>+/* VDI data journalling functions */<br>+static int  jrnl_vdi_has_end_mark(jrnl_desc_t *jd)<br>
+{<br>+    ssize_t  ret;<br>+    end_mark_t  end_mark = UNSET_END_MARK;<br>+    jrnl_vdi_head_t  *head = (jrnl_vdi_head_t *) jd->jd_head;<br>+<br>+    ret = pread64(jd->jdf_fd, &end_mark, sizeof(end_mark),<br>+                   sizeof(*head) + head->jh_size);<br>
+<br>+    return (IS_END_MARK_SET(end_mark)? SET_END_MARK: UNSET_END_MARK);<br>+}<br>+<br>+int  jrnl_vdi_write_header(jrnl_desc_t  *jd)<br>+{<br>+    ssize_t  ret;<br>+    jrnl_vdi_head_t  *head = (jrnl_vdi_head_t *) jd->jd_head;<br>
+<br>+    ret = pwrite64(jd->jdf_fd, head, sizeof(*head), 0);<br>+<br>+    if (ret != sizeof(*head)) {<br>+        if (errno == ENOSPC)<br>+            ret = SD_RES_NO_SPACE;<br>+        else<br>+            ret = SD_RES_EIO;<br>
+    } else<br>+        ret = SD_RES_SUCCESS;<br>+<br>+    return ret;<br>+}<br>+<br>+int  jrnl_vdi_write_data(jrnl_desc_t  *jd)<br>+{<br>+    ssize_t  ret;<br>+    jrnl_vdi_head_t  *head = (jrnl_vdi_head_t *) jd->jd_head;<br>
+<br>+    ret = pwrite64(jd->jdf_fd, jd->jd_data, head->jh_size, sizeof(*head));<br>+<br>+    if (ret != head->jh_size) {<br>+        if (errno == ENOSPC)<br>+            ret = SD_RES_NO_SPACE;<br>+        else<br>
+            ret = SD_RES_EIO;<br>+    } else<br>+        ret = SD_RES_SUCCESS;<br>+<br>+    return ret;<br>+}<br>+<br>+int  jrnl_vdi_write_end_mark(jrnl_desc_t  *jd)<br>+{<br>+    ssize_t  retsize;<br>+    int  ret;<br>+    end_mark_t  end_mark = SET_END_MARK;<br>
+    jrnl_vdi_head_t  *head = (jrnl_vdi_head_t *) jd->jd_head;<br>+<br>+    retsize = pwrite64(jd->jdf_fd, &end_mark, sizeof(end_mark),<br>+                   sizeof(*head) + head->jh_size);<br>+<br>+    if (retsize != sizeof(end_mark)) {<br>
+        if (errno == ENOSPC)<br>+            ret = SD_RES_NO_SPACE;<br>+        else<br>+            ret = SD_RES_EIO;<br>+    } else<br>+        ret = SD_RES_SUCCESS;<br>+<br>+    jd->jd_end_mark= end_mark;<br>+<br>+    return ret;<br>
+}<br>+/* Replay a VDI journal file: copy jh_size bytes from the journal to offset jh_offset of jfd->jf_target_fd. */<br>+int  jrnl_vdi_apply_to_target_object(jrnl_file_desc_t *jfd)<br>+{<br>+    char *buf;<br>+    int res = SD_RES_EIO;<br>+    jrnl_vdi_head_t jh;<br>+    const uint64_t buf_len = (1 << 22);    /* FIXME: handle larger size */<br>+<br>+    /* The size comes from the file: reject a short header or a size the buffer cannot hold. */<br>
+    if (pread64(jfd->jf_fd, &jh, sizeof(jh), 0) != sizeof(jh) ||<br>+        jh.jh_size > buf_len)<br>+        return SD_RES_EIO;<br>+<br>+    buf = malloc(buf_len);<br>+    if (!buf) {<br>+        eprintf("failed to allocate memory\n");<br>+        return SD_RES_NO_MEM;<br>+    }<br>+<br>
+    /* Flush out journal to disk (vdi object) */<br>+    if (pread64(jfd->jf_fd, buf, jh.jh_size, sizeof(jh)) != (ssize_t) jh.jh_size)<br>+        goto out;<br>+    if (pwrite64(jfd->jf_target_fd, buf, jh.jh_size, jh.jh_offset) == (ssize_t) jh.jh_size)<br>+        res = SD_RES_SUCCESS;<br>+    else if (errno == ENOSPC)<br>
+        res = SD_RES_NO_SPACE;<br>+<br>+out:<br>+    /* Clean up */<br>+    free(buf);<br>+    return res;<br>+}<br>+<br>+static int  jrnl_vdi_commit_data(jrnl_desc_t  *jd)<br>
+{<br>+    int  ret = 0;<br>+    ssize_t  retsize;<br>+    jrnl_vdi_head_t  *head = (jrnl_vdi_head_t *) jd->jd_head;<br>+<br>+    retsize = pwrite64(jd->jdf_target_fd, jd->jd_data, head->jh_size, head->jh_offset);<br>
+    if (retsize != head->jh_size) {<br>+        if (errno == ENOSPC)<br>+            ret = SD_RES_NO_SPACE;<br>+        else<br>+            ret = SD_RES_EIO;<br>+    }<br>+<br>+    return ret;<br>+}<br>+<br>+/* VDI check sum  journalling functions */<br>
+<br>+/* FIXME: Implment this function */<br>+<br>+static int  jrnl_cksum_has_end_mark(jrnl_desc_t *jd)<br>+{<br>+    ssize_t  ret;<br>+    end_mark_t  end_mark = UNSET_END_MARK;<br>+    jrnl_cksum_head_t  *head = (jrnl_cksum_head_t *) jd->jd_head;<br>
+<br>+    ret = pread64(jd->jdf_fd, &end_mark, sizeof(end_mark),<br>+                   sizeof(*head) + head->jh_size);<br>+<br>+    return (IS_END_MARK_SET(end_mark)? SET_END_MARK: UNSET_END_MARK);<br>+}<br>+<br>
+int  jrnl_cksum_write_header(jrnl_desc_t  *jd)<br>+{<br>+    ssize_t  ret;<br>+    jrnl_cksum_head_t  *head = (jrnl_cksum_head_t *) jd->jd_head;<br>+<br>+    ret = pwrite64(jd->jdf_fd, head, sizeof(*head), 0);<br>+<br>
+    if (ret != sizeof(*head)) {<br>+        if (errno == ENOSPC)<br>+            ret = SD_RES_NO_SPACE;<br>+        else<br>+            ret = SD_RES_EIO;<br>+    } else<br>+        ret = SD_RES_SUCCESS;<br>+<br>+    return ret;<br>
+}<br>+<br>+int  jrnl_cksum_write_data(jrnl_desc_t  *jd)<br>+{<br>+    ssize_t  ret;<br>+    jrnl_cksum_head_t  *head = (jrnl_cksum_head_t *) jd->jd_head;<br>+    jrnl_cksum_data_t  *data = (jrnl_cksum_data_t *) jd->jd_data;<br>
+<br>+    ret = pwrite64(jd->jdf_fd, data, head->jh_size, sizeof(*head));<br>+    if (ret != head->jh_size) {<br>+        if (errno == ENOSPC)<br>+            ret = SD_RES_NO_SPACE;<br>+        else<br>+            ret = SD_RES_EIO;<br>
+    } else<br>+        ret = SD_RES_SUCCESS;<br>+<br>+    return ret;<br>+}<br>+<br>+int  jrnl_cksum_write_end_mark(jrnl_desc_t  *jd)<br>+{<br>+    ssize_t  retsize;<br>+    int  ret;<br>+    end_mark_t  end_mark = SET_END_MARK;<br>
+    jrnl_cksum_head_t  *head = (jrnl_cksum_head_t *) jd->jd_head;<br>+<br>+    retsize = pwrite64(jd->jdf_fd, &end_mark, sizeof(end_mark),<br>+                   sizeof(*head) + head->jh_size);<br>+<br>+    if (retsize != sizeof(end_mark)) {<br>
+        if (errno == ENOSPC)<br>+            ret = SD_RES_NO_SPACE;<br>+        else<br>+            ret = SD_RES_EIO;<br>+    } else<br>+        ret = SD_RES_SUCCESS;<br>+<br>+    jd->jd_end_mark = end_mark;<br>+<br>
+    return ret;<br>+}<br>+<br>+int  jrnl_cksum_apply_to_target_object(jrnl_file_desc_t *jfd)<br>+{<br>+    int  ret;<br>+    ssize_t  retsize;<br>+    jrnl_cksum_head_t  head;<br>+    jrnl_cksum_data_t  data;<br>+<br>+    /* Flush out journal to disk (vdi object) */<br>
+    retsize = pread64(jfd->jf_fd, &head, sizeof(head), 0);<br>+    retsize = pread64(jfd->jf_fd, &data, head.jh_size, sizeof(head));<br>+<br>+    ret = fsetxattr(jfd->jf_target_fd, data.aname_cksum, &data.aval_cksum,<br>
+                                                sizeof(data.aval_cksum), 0);<br>+    if (ret < 0) {<br>+        eprintf("failed to set xattr, %m\n");<br>+        return SD_RES_EIO;<br>+    }<br>+<br>+    return SD_RES_SUCCESS;<br>
+}<br>+<br>+static int  jrnl_cksum_commit_data(jrnl_desc_t  *jd)<br>+{<br>+    int  ret;<br>+    jrnl_cksum_data_t  *data = (jrnl_cksum_data_t *) jd->jd_data;<br>+<br>+    ret = fsetxattr(jd->jdf_target_fd, data->aname_cksum, &data->aval_cksum,<br>
+                                                sizeof(data->aval_cksum), 0);<br>+    if (ret < 0) {<br>+        eprintf("failed to set xattr, %m\n");<br>+        return SD_RES_EIO;<br>+    }<br>+<br>+    return SD_RES_SUCCESS;<br>
+}<br>+<br>