<br>This patch adds atomicity to operations such as vdi object and checksum<br>updates.<br><br>Using the journalling API, it updates the vdi object and its checksum<br>atomically in store_queue_request_local() for the<br>
operations SD_OP_WRITE_OBJ and SD_OP_CREATE_AND_WRITE_OBJ.<br><br><br>Signed-off-by: Narendra <<a href="mailto:narendramind@gmail.com">narendramind@gmail.com</a>><br><br>diff --git a/sheep/sheep.c b/sheep/sheep.c<br>
index dc9a320..d6c776d 100644<br>--- a/sheep/sheep.c<br>+++ b/sheep/sheep.c<br>@@ -112,6 +112,8 @@ int main(int argc, char **argv)<br> if (is_daemon && daemon(0, 0))<br> exit(1);<br> <br>+ jrnl_recover();<br>
+<br> ret = init_event(EPOLL_SIZE);<br> if (ret)<br> exit(1);<br>diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h<br>index c66baf4..80c75b3 100644<br>--- a/sheep/sheep_priv.h<br>+++ b/sheep/sheep_priv.h<br>
@@ -128,6 +128,7 @@ struct cluster_info {<br> <br> extern struct cluster_info *sys;<br> <br>+<br> int create_listen_port(int port, void *data);<br> <br> int is_io_request(unsigned op);<br>@@ -190,6 +191,81 @@ int remove_object(struct sheepdog_node_list_entry *e,<br>
int nodes, uint32_t node_version,<br> uint64_t oid, int nr);<br> <br>+/* Journal */<br>+typedef uint32_t end_mark_t;<br>+typedef uint32_t jrnl_type_t;<br>+<br>+#define JRNL_TYPE_VDI 0<br>+#define JRNL_TYPE_CKSUM 1<br>
+#define JRNL_MAX_TYPES 2<br>+<br>+#define SET_END_MARK 1UL<br>+#define UNSET_END_MARK 0UL<br>+#define IS_END_MARK_SET(var) (var == 1UL)<br>+<br>+<br>+/* Different Journal headers */<br>+typedef struct jrnl_cksum_head {<br>
+ jrnl_type_t jh_type;<br>+ uint64_t jh_size;<br>+} jrnl_cksum_head_t;<br>+<br>+typedef struct jrnl_vdi_head {<br>+ jrnl_type_t jh_type;<br>+ uint64_t jh_offset;<br>+ uint64_t jh_size;<br>+} jrnl_vdi_head_t;<br>
+<br>+typedef struct jrnl_cksum_data {<br>+ char aname_cksum[64];<br>+ uint64_t aval_cksum; <br>+} jrnl_cksum_data_t;<br>+<br>+typedef struct jrnl_file_desc {<br>+ uint32_t jf_epoch; /* epoch */<br>+ uint64_t jf_oid; /* Object id */<br>
+ int jf_fd; /* Open file fd */<br>+ int jf_target_fd;<br>+} jrnl_file_desc_t;<br>+<br>+typedef struct jrnl_descriptor {<br>+ void *jd_head;<br>+ void *jd_data;<br>
+ int jd_end_mark;<br>+ jrnl_file_desc_t jd_jfd;<br>+#define jdf_epoch jd_jfd.jf_epoch<br>+#define jdf_oid jd_jfd.jf_oid<br>+#define jdf_fd jd_jfd.jf_fd<br>+#define jdf_target_fd jd_jfd.jf_target_fd<br>
+} jrnl_desc_t;<br>+<br>+typedef struct jrnl_handler {<br>+ int (*has_end_mark)(jrnl_desc_t *jd);<br>+ int (*write_header)(jrnl_desc_t *jd);<br>+ int (*write_data)(jrnl_desc_t *jd);<br>+ int (*write_end_mark)(jrnl_desc_t *jd);<br>
+ int (*apply_to_target_object)(jrnl_file_desc_t *jfd);<br>+ int (*commit_data)(jrnl_desc_t *jd);<br>+} jrnl_handler_t;<br>+<br>+inline jrnl_type_t jrnl_get_type(jrnl_desc_t *jd);<br>+int jrnl_get_type_from_file(jrnl_file_desc_t *jfd, jrnl_type_t *jrnl_type);<br>
+int jrnl_exists(jrnl_file_desc_t *jfd);<br>+int jrnl_update_epoch_store(uint32_t epoch);<br>+int jrnl_open(jrnl_file_desc_t *jfd, int aflags);<br>+int jrnl_create(jrnl_file_desc_t *jfd);<br>+int jrnl_remove(jrnl_file_desc_t *jfd);<br>
+inline int jrnl_close(jrnl_file_desc_t *jfd);<br>+<br>+inline int jrnl_has_end_mark(jrnl_desc_t *jd);<br>+inline int jrnl_write_header(jrnl_desc_t *jd);<br>+inline int jrnl_write_data(jrnl_desc_t *jd);<br>+inline int jrnl_write_end_mark(jrnl_desc_t *jd);<br>
+inline int jrnl_apply_to_targe_object(jrnl_file_desc_t *jfd);<br>+inline int jrnl_commit_data(jrnl_desc_t *jd);<br>+int jrnl_perform(jrnl_desc_t *jd);<br>+int jrnl_recover(void);<br>+<br> static inline int is_myself(struct sheepdog_node_list_entry *e)<br>
{<br> return e->id == sys-><a href="http://this_node.id">this_node.id</a>;<br>diff --git a/sheep/store.c b/sheep/store.c<br>index a4d6155..0eac4d2 100644<br>--- a/sheep/store.c<br>+++ b/sheep/store.c<br>@@ -32,10 +32,45 @@<br>
static char *obj_path;<br> static char *epoch_path;<br> static char *mnt_path;<br>+static char *jrnl_path;<br> <br> static mode_t def_dmode = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IWGRP | S_IXGRP;<br> static mode_t def_fmode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;<br>
<br>+/* Journal internal data structures */<br>+static int jrnl_vdi_has_end_mark(jrnl_desc_t *jd);<br>+static int jrnl_vdi_write_header(jrnl_desc_t *jd);<br>+static int jrnl_vdi_write_data(jrnl_desc_t *jd);<br>+static int jrnl_vdi_write_end_mark(jrnl_desc_t *jd);<br>
+static int jrnl_vdi_apply_to_target_object(jrnl_file_desc_t *jfd);<br>+static int jrnl_vdi_commit_data(jrnl_desc_t *jd);<br>+<br>+static int jrnl_cksum_has_end_mark(jrnl_desc_t *jd);<br>+static int jrnl_cksum_write_header(jrnl_desc_t *jd);<br>
+static int jrnl_cksum_write_data(jrnl_desc_t *jd);<br>+static int jrnl_cksum_write_end_mark(jrnl_desc_t *jd);<br>+static int jrnl_cksum_apply_to_target_object(jrnl_file_desc_t *jfd);<br>+static int jrnl_cksum_commit_data(jrnl_desc_t *jd);<br>
+<br>+static jrnl_handler_t jrnl_handlers[JRNL_MAX_TYPES] = {<br>+ {<br>+ .has_end_mark = jrnl_vdi_has_end_mark,<br>+ .write_header = jrnl_vdi_write_header,<br>+ .write_data = jrnl_vdi_write_data,<br>
+ .write_end_mark = jrnl_vdi_write_end_mark,<br>+ .apply_to_target_object = jrnl_vdi_apply_to_target_object,<br>+ .commit_data = jrnl_vdi_commit_data<br>+ },<br>+ {<br>+ .has_end_mark = jrnl_cksum_has_end_mark,<br>
+ .write_header = jrnl_cksum_write_header,<br>+ .write_data = jrnl_cksum_write_data,<br>+ .write_end_mark = jrnl_cksum_write_end_mark,<br>+ .apply_to_target_object = jrnl_cksum_apply_to_target_object,<br>
+ .commit_data = jrnl_cksum_commit_data<br>+ }<br>+};<br>+<br> static int obj_cmp(const void *oid1, const void *oid2)<br> {<br> const uint64_t hval1 = fnv_64a_buf((void *)oid1, sizeof(uint64_t), FNV1A_64_INIT);<br>
@@ -98,7 +133,8 @@ static int is_obj_in_range(uint64_t oid, uint64_t start, uint64_t end)<br> return (start < hval || hval <= end);<br> }<br> <br>-static int verify_object(int fd, char *buf, size_t len, int set_chksum)<br>
+/* When set_cksum is set, epoch & oid are required to perform journalling. */<br>+static int verify_object(int fd, char *buf, size_t len, int set_chksum, uint32_t epoch, uint64_t oid)<br> {<br> int ret;<br> uint64_t checksum;<br>
@@ -128,11 +164,30 @@ static int verify_object(int fd, char *buf, size_t len, int set_chksum)<br> }<br> <br> if (set_chksum) {<br>- checksum = fnv_64a_buf(buf, len, FNV1A_64_INIT);<br>- ret = fsetxattr(fd, ANAME_CHECKSUM, &checksum, sizeof(checksum), 0);<br>
- if (ret < 0) {<br>- eprintf("failed to set xattr, %m\n");<br>- goto err;<br>+ if (epoch && oid) {<br>+ jrnl_desc_t jd;<br>+ jrnl_cksum_head_t head;<br>
+ jrnl_cksum_data_t data;<br>+<br>+ head.jh_type = JRNL_TYPE_CKSUM;<br>+ head.jh_size = sizeof(data);<br>+ strcpy(data.aname_cksum, ANAME_CHECKSUM);<br>+ data.aval_cksum = fnv_64a_buf(buf, len, FNV1A_64_INIT);<br>
+ jd.jd_head = &head;<br>+ jd.jd_data = &data;<br>+ jd.jdf_epoch = epoch;<br>+ jd.jdf_oid = oid;<br>+ jd.jdf_target_fd = fd;<br>+ ret = jrnl_perform(&jd);<br>
+ if (ret)<br>+ goto err;<br>+ } else {<br>+ checksum = fnv_64a_buf(buf, len, FNV1A_64_INIT);<br>+ ret = fsetxattr(fd, ANAME_CHECKSUM, &checksum, sizeof(checksum), 0);<br>
+ if (ret < 0) {<br>+ eprintf("failed to set xattr, %m\n");<br>+ goto err;<br>+ }<br> }<br> } else {<br> ret = fgetxattr(fd, ANAME_CHECKSUM, &checksum, sizeof(checksum));<br>
@@ -199,7 +254,7 @@ static int get_obj_list(struct request *req)<br> }<br> obj_nr = read(fd, buf, buf_len);<br> <br>- ret = verify_object(fd, buf, obj_nr, 0);<br>+ ret = verify_object(fd, buf, obj_nr, 0, 0, 0);<br>
if (ret < 0) {<br> eprintf("verification failed, %s, %m\n", path);<br> close(fd);<br>@@ -633,6 +688,8 @@ static int store_queue_request_local(struct request *req, uint32_t epoch)<br> uint64_t oid = hdr->oid;<br>
uint32_t opcode = hdr->opcode;<br> char path[1024], *buf;<br>+ jrnl_desc_t jd;<br>+ jrnl_vdi_head_t jh;<br> <br> switch (opcode) {<br> case SD_OP_CREATE_AND_WRITE_OBJ:<br>@@ -766,24 +823,31 @@ static int store_queue_request_local(struct request *req, uint32_t epoch)<br>
/* } */<br> }<br> /* fall through */<br>- case SD_OP_CREATE_AND_WRITE_OBJ:<br>- ret = pwrite64(fd, req->data, hdr->data_length, hdr->offset);<br>- if (ret != hdr->data_length) {<br>
- if (errno == ENOSPC)<br>- ret = SD_RES_NO_SPACE;<br>- else<br>- ret = SD_RES_EIO;<br>+ case SD_OP_CREATE_AND_WRITE_OBJ: <br>+<br>+ jd.jdf_epoch = epoch;<br>+ jd.jdf_oid = oid;<br>
+ jd.jdf_target_fd = fd;<br>+<br>+ jh.jh_type = JRNL_TYPE_VDI;<br>+ jh.jh_offset = hdr->offset;<br>+ jh.jh_size = hdr->data_length;<br>+<br>+ jd.jd_head = &jh;<br>+ jd.jd_data = req->data;<br>
+ jd.jd_end_mark = SET_END_MARK;<br>+<br>+ ret = jrnl_perform(&jd);<br>+ if (ret)<br> goto out;<br>- }<br> <br> if (!is_data_obj(oid)) {<br>- /* FIXME: need to update atomically */<br>
-/* ret = verify_object(fd, NULL, 0, 1); */<br>-/* if (ret < 0) { */<br>-/* eprintf("failed to set checksum, %"PRIx64"\n", oid); */<br>-/* ret = SD_RES_EIO; */<br>
-/* goto out; */<br>-/* } */<br>+ ret = verify_object(fd, NULL, 0, 1, epoch, oid);<br>+ if (ret < 0) {<br>+ eprintf("failed to set checksum, %"PRIx64"\n", oid);<br>
+ ret = SD_RES_EIO;<br>+ goto out;<br>+ }<br> }<br> <br> ret = SD_RES_SUCCESS;<br>@@ -1556,7 +1620,7 @@ static void __start_recovery(struct work *work, int idx)<br>
write(fd, rw->buf, sizeof(uint64_t) * rw->count);<br> fsync(fd);<br> <br>- ret = verify_object(fd, rw->buf, sizeof(uint64_t) * rw->count, 1);<br>+ ret = verify_object(fd, rw->buf, sizeof(uint64_t) * rw->count, 1, 0, 0);<br>
if (ret < 0) {<br> eprintf("failed to set check sum, %s, %m\n", path);<br> close(fd);<br>@@ -1786,6 +1850,27 @@ static int init_mnt_path(const char *base_path)<br> return 0;<br> }<br>
<br>+#define JRNL_PATH "/journal/"<br>+<br>+static int init_jrnl_path(const char *base_path)<br>+{<br>+ int new, ret;<br>+<br>+ /* Create journal directory */<br>+ jrnl_path = zalloc(strlen(base_path) + strlen(JRNL_PATH) + 1);<br>
+ sprintf(jrnl_path, "%s" JRNL_PATH, base_path);<br>+<br>+ ret = init_path(jrnl_path, &new);<br>+ /* Error during directory creation */<br>+ if (ret)<br>+ return ret;<br>+ /* If journal is newly created */<br>
+ if (new)<br>+ return 0;<br>+<br>+ return 0;<br>+}<br>+<br> int init_store(const char *d)<br> {<br> int ret;<br>@@ -1806,6 +1891,10 @@ int init_store(const char *d)<br> if (ret)<br> return ret;<br>
<br>+ ret = init_jrnl_path(d);<br>+ if (ret)<br>+ return ret;<br>+<br> return ret;<br> }<br> <br>@@ -1838,3 +1927,464 @@ int get_global_nr_copies(uint32_t *copies)<br> {<br> return attr(epoch_path, ANAME_COPIES, copies, sizeof(*copies), 0);<br>
}<br>+<br>+/* Journal APIs */<br>+int jrnl_exists(jrnl_file_desc_t *jfd)<br>+{<br>+ int ret;<br>+ char path[1024];<br>+ struct stat s;<br>+<br>+ snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, jrnl_path, jfd->jf_epoch, jfd->jf_oid);<br>
+<br>+ ret = stat(path, &s);<br>+ if (ret)<br>+ return 1;<br>+<br>+ return 0;<br>+}<br>+<br>+int jrnl_update_epoch_store(uint32_t epoch)<br>+{<br>+ char new[1024];<br>+ struct stat s;<br>+<br>+ snprintf(new, sizeof(new), "%s%08u/", jrnl_path, epoch);<br>
+ if (stat(new, &s) < 0)<br>+ if (errno == ENOENT)<br>+ mkdir(new, def_dmode);<br>+<br>+ return 0;<br>+}<br>+<br>+int jrnl_open(jrnl_file_desc_t *jfd, int aflags)<br>+{<br>+ char path[1024];<br>
+ int flags = aflags;<br>+ int fd, ret;<br>+<br>+<br>+ jrnl_update_epoch_store(jfd->jf_epoch);<br>+ snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, jrnl_path, jfd->jf_epoch, jfd->jf_oid);<br>
+<br>+ fd = open(path, flags, def_fmode);<br>+ if (fd < 0) {<br>+ eprintf("failed to open %s, %s\n", path, strerror(errno));<br>+ if (errno == ENOENT)<br>+ ret = SD_RES_NO_OBJ;<br>
+ else<br>+ ret = SD_RES_UNKNOWN;<br>+ } else {<br>+ jfd->jf_fd = fd;<br>+ ret = SD_RES_SUCCESS;<br>+ }<br>+<br>+ return ret;<br>+}<br>+<br>+int jrnl_close(jrnl_file_desc_t *jfd)<br>
+{<br>+ close(jfd->jf_fd);<br>+ jfd->jf_fd = -1;<br>+<br>+ return 0;<br>+}<br>+<br>+int jrnl_create(jrnl_file_desc_t *jfd)<br>+{<br>+ return jrnl_open(jfd, O_RDWR | O_CREAT);<br>+}<br>+<br>+inline uint32_t jrnl_get_type(jrnl_desc_t *jd)<br>
+{<br>+ return *((uint32_t *) jd->jd_head);<br>+}<br>+<br>+int jrnl_get_type_from_file(jrnl_file_desc_t *jfd, jrnl_type_t *jrnl_type)<br>+{<br>+ ssize_t retsize;<br>+<br>+ retsize = pread64(jfd->jf_fd, jrnl_type, sizeof(*jrnl_type), 0);<br>
+<br>+ if (retsize != sizeof(*jrnl_type))<br>+ return SD_RES_EIO;<br>+ else<br>+ return SD_RES_SUCCESS;<br>+}<br>+<br>+<br>+int jrnl_remove(jrnl_file_desc_t *jfd)<br>+{<br>+ char path[1024];<br>+ int ret;<br>
+<br>+ snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, jrnl_path, jfd->jf_epoch, jfd->jf_oid);<br>+ ret = unlink(path);<br>+ if (ret) {<br>+ eprintf("failed to remove %s, %s\n", path, strerror(errno));<br>
+ ret = SD_RES_EIO;<br>+ } else<br>+ ret = SD_RES_SUCCESS;<br>+<br>+ return ret;<br>+}<br>+<br>+int jrnl_has_end_mark(jrnl_desc_t *jd)<br>+{<br>+ return jrnl_handlers[jrnl_get_type(jd)].has_end_mark(jd);<br>
+}<br>+<br>+int jrnl_write_header(jrnl_desc_t *jd)<br>+{<br>+ return jrnl_handlers[jrnl_get_type(jd)].write_header(jd);<br>+}<br>+<br>+int jrnl_write_data(jrnl_desc_t *jd)<br>+{<br>+ return jrnl_handlers[jrnl_get_type(jd)].write_data(jd);<br>
+}<br>+<br>+int jrnl_write_end_mark(jrnl_desc_t *jd)<br>+{<br>+ return jrnl_handlers[jrnl_get_type(jd)].write_end_mark(jd);<br>+}<br>+<br>+int jrnl_apply_to_target_object(jrnl_file_desc_t *jfd)<br>+{<br>+ int ret;<br>
+ jrnl_type_t jrnl_type;<br>+<br>+ ret = jrnl_get_type_from_file(jfd, &jrnl_type);<br>+<br>+ return jrnl_handlers[jrnl_type].apply_to_target_object(jfd);<br>+}<br>+<br>+int jrnl_commit_data(jrnl_desc_t *jd)<br>
+{<br>+ return jrnl_handlers[jrnl_get_type(jd)].commit_data(jd);<br>+}<br>+<br>+int jrnl_perform(jrnl_desc_t *jd)<br>+{<br>+ int ret;<br>+<br>+ ret = jrnl_create(&jd->jd_jfd);<br>+ if (ret)<br>+ goto out;<br>
+<br>+ ret = jrnl_write_header(jd);<br>+ if (ret)<br>+ goto out;<br>+<br>+ ret = jrnl_write_data(jd);<br>+ if (ret)<br>+ goto out;<br>+<br>+ ret = jrnl_write_end_mark(jd);<br>+ if (ret)<br>
+ goto out;<br>+<br>+ ret = jrnl_commit_data(jd);<br>+ if (ret)<br>+ goto out;<br>+<br>+ ret = jrnl_remove(&jd->jd_jfd);<br>+<br>+out:<br>+ return ret;<br>+}<br>+<br>+int jrnl_recover(void)<br>
+{<br>+ DIR *dir;<br>+ struct dirent *d;<br>+ char jrnl_dir[1024], <br>+ jrnl_file_path[1024],<br>+ obj_file_path[1024];<br>+ int epoch;<br>+<br>+ epoch = get_latest_epoch();<br>+ if (epoch < 0) {<br>
+ return 1;<br>+ }<br>+ snprintf(jrnl_dir, sizeof(jrnl_dir), "%s%08u/", jrnl_path, epoch);<br>+<br>+ eprintf("Openning the directory%s.\n", jrnl_dir);<br>+ dir = opendir(jrnl_dir);<br>
+ if (!dir)<br>+ return -1;<br>+<br>+ vprintf(SDOG_NOTICE "start jrnl_recovery.\n");<br>+ while ((d = readdir(dir))) {<br>+ int ret;<br>+ jrnl_file_desc_t jfd;<br>+<br>+ if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, ".."))<br>
+ continue;<br>+<br>+ jfd.jf_epoch = epoch;<br>+ sscanf(d->d_name, "%" PRIx64, &jfd.jf_oid);<br>+ snprintf(jrnl_file_path, sizeof(jrnl_file_path), "%s%016" PRIx64,<br>
+ jrnl_dir, jfd.jf_oid);<br>+ snprintf(obj_file_path, sizeof(obj_file_path), "%s%08u/%016" PRIx64,<br>+ obj_path, epoch, jfd.jf_oid);<br>
+ ret = jrnl_open(&jfd, O_RDONLY);<br>+ if (ret) {<br>+ eprintf("Unable to open the journal file, %s, for reading.\n", jrnl_file_path);<br>+ goto end_while_3;<br>+ }<br>
+ jfd.jf_target_fd = ob_open(epoch, jfd.jf_oid, 0, &ret);<br>+ if (ret) {<br>+ eprintf("Unable to open the object file, %s, to recover.\n", obj_file_path);<br>+ goto end_while_2;<br>
+ }<br>+ ret = jrnl_apply_to_target_object(&jfd);<br>+ if (ret)<br>+ eprintf("Unable to recover the object, %s.\n", obj_file_path);<br>+<br>+ close(jfd.jf_target_fd);<br>
+ jfd.jf_target_fd = -1;<br>+ end_while_2:<br>+ jrnl_close(&jfd);<br>+ end_while_3:<br>+ vprintf(SDOG_INFO "recovered the object in journal, %s\n", jrnl_file_path);<br>+ jrnl_remove(&jfd);<br>
+ }<br>+ closedir(dir);<br>+ vprintf(SDOG_NOTICE "end jrnl_recovery.\n");<br>+<br>+ return 0;<br>+}<br>+<br>+/* VDI data journalling functions */<br>+static int jrnl_vdi_has_end_mark(jrnl_desc_t *jd)<br>
+{<br>+ ssize_t ret;<br>+ end_mark_t end_mark = UNSET_END_MARK;<br>+ jrnl_vdi_head_t *head = (jrnl_vdi_head_t *) jd->jd_head;<br>+<br>+ ret = pread64(jd->jdf_fd, &end_mark, sizeof(end_mark),<br>+ sizeof(*head) + head->jh_size);<br>
+<br>+ return (IS_END_MARK_SET(end_mark)? SET_END_MARK: UNSET_END_MARK);<br>+}<br>+<br>+int jrnl_vdi_write_header(jrnl_desc_t *jd)<br>+{<br>+ ssize_t ret;<br>+ jrnl_vdi_head_t *head = (jrnl_vdi_head_t *) jd->jd_head;<br>
+<br>+ ret = pwrite64(jd->jdf_fd, head, sizeof(*head), 0);<br>+<br>+ if (ret != sizeof(*head)) {<br>+ if (errno == ENOSPC)<br>+ ret = SD_RES_NO_SPACE;<br>+ else<br>+ ret = SD_RES_EIO;<br>
+ } else<br>+ ret = SD_RES_SUCCESS;<br>+<br>+ return ret;<br>+}<br>+<br>+int jrnl_vdi_write_data(jrnl_desc_t *jd)<br>+{<br>+ ssize_t ret;<br>+ jrnl_vdi_head_t *head = (jrnl_vdi_head_t *) jd->jd_head;<br>
+<br>+ ret = pwrite64(jd->jdf_fd, jd->jd_data, head->jh_size, sizeof(*head));<br>+<br>+ if (ret != head->jh_size) {<br>+ if (errno == ENOSPC)<br>+ ret = SD_RES_NO_SPACE;<br>+ else<br>
+ ret = SD_RES_EIO;<br>+ } else<br>+ ret = SD_RES_SUCCESS;<br>+<br>+ return ret;<br>+}<br>+<br>+int jrnl_vdi_write_end_mark(jrnl_desc_t *jd)<br>+{<br>+ ssize_t retsize;<br>+ int ret;<br>+ end_mark_t end_mark = SET_END_MARK;<br>
+ jrnl_vdi_head_t *head = (jrnl_vdi_head_t *) jd->jd_head;<br>+<br>+ retsize = pwrite64(jd->jdf_fd, &end_mark, sizeof(end_mark),<br>+ sizeof(*head) + head->jh_size);<br>+<br>+ if (retsize != sizeof(end_mark)) {<br>
+ if (errno == ENOSPC)<br>+ ret = SD_RES_NO_SPACE;<br>+ else<br>+ ret = SD_RES_EIO;<br>+ } else<br>+ ret = SD_RES_SUCCESS;<br>+<br>+ jd->jd_end_mark= end_mark;<br>+<br>+ return ret;<br>
+}<br>+<br>+int jrnl_vdi_apply_to_target_object(jrnl_file_desc_t *jfd)<br>+{<br>+ char *buf;<br>+ int buf_len, res = 0;<br>+ ssize_t retsize;<br>+ jrnl_vdi_head_t jh;<br>+<br>+ /* FIXME: handle larger size */<br>
+ buf_len = (1 << 22);<br>+ buf = (char *) malloc(buf_len);<br>+ if (!buf) {<br>+ eprintf("failed to allocate memory\n");<br>+ return SD_RES_NO_MEM;<br>+ }<br>+<br>+ /* Flush out journal to disk (vdi object) */<br>
+ retsize = pread64(jfd->jf_fd, &jh, sizeof(jh), 0);<br>+ retsize = pread64(jfd->jf_fd, buf, jh.jh_size, sizeof(jh));<br>+ retsize = pwrite64(jfd->jf_target_fd, buf, jh.jh_size, jh.jh_offset);<br>+ if (retsize != jh.jh_size) {<br>
+ if (errno == ENOSPC)<br>+ res = SD_RES_NO_SPACE;<br>+ else<br>+ res = SD_RES_EIO;<br>+ }<br>+<br>+ /* Clean up */<br>+ free(buf);<br>+<br>+ return res;<br>+}<br>+<br>+static int jrnl_vdi_commit_data(jrnl_desc_t *jd)<br>
+{<br>+ int ret = 0;<br>+ ssize_t retsize;<br>+ jrnl_vdi_head_t *head = (jrnl_vdi_head_t *) jd->jd_head;<br>+<br>+ retsize = pwrite64(jd->jdf_target_fd, jd->jd_data, head->jh_size, head->jh_offset);<br>
+ if (retsize != head->jh_size) {<br>+ if (errno == ENOSPC)<br>+ ret = SD_RES_NO_SPACE;<br>+ else<br>+ ret = SD_RES_EIO;<br>+ }<br>+<br>+ return ret;<br>+}<br>+<br>+/* VDI check sum journalling functions */<br>
+<br>+/* FIXME: Implement this function */<br>+<br>+static int jrnl_cksum_has_end_mark(jrnl_desc_t *jd)<br>+{<br>+ ssize_t ret;<br>+ end_mark_t end_mark = UNSET_END_MARK;<br>+ jrnl_cksum_head_t *head = (jrnl_cksum_head_t *) jd->jd_head;<br>
+<br>+ ret = pread64(jd->jdf_fd, &end_mark, sizeof(end_mark),<br>+ sizeof(*head) + head->jh_size);<br>+<br>+ return (IS_END_MARK_SET(end_mark)? SET_END_MARK: UNSET_END_MARK);<br>+}<br>+<br>
+int jrnl_cksum_write_header(jrnl_desc_t *jd)<br>+{<br>+ ssize_t ret;<br>+ jrnl_cksum_head_t *head = (jrnl_cksum_head_t *) jd->jd_head;<br>+<br>+ ret = pwrite64(jd->jdf_fd, head, sizeof(*head), 0);<br>+<br>
+ if (ret != sizeof(*head)) {<br>+ if (errno == ENOSPC)<br>+ ret = SD_RES_NO_SPACE;<br>+ else<br>+ ret = SD_RES_EIO;<br>+ } else<br>+ ret = SD_RES_SUCCESS;<br>+<br>+ return ret;<br>
+}<br>+<br>+int jrnl_cksum_write_data(jrnl_desc_t *jd)<br>+{<br>+ ssize_t ret;<br>+ jrnl_cksum_head_t *head = (jrnl_cksum_head_t *) jd->jd_head;<br>+ jrnl_cksum_data_t *data = (jrnl_cksum_data_t *) jd->jd_data;<br>
+<br>+ ret = pwrite64(jd->jdf_fd, data, head->jh_size, sizeof(*head));<br>+ if (ret != head->jh_size) {<br>+ if (errno == ENOSPC)<br>+ ret = SD_RES_NO_SPACE;<br>+ else<br>+ ret = SD_RES_EIO;<br>
+ } else<br>+ ret = SD_RES_SUCCESS;<br>+<br>+ return ret;<br>+}<br>+<br>+int jrnl_cksum_write_end_mark(jrnl_desc_t *jd)<br>+{<br>+ ssize_t retsize;<br>+ int ret;<br>+ end_mark_t end_mark = SET_END_MARK;<br>
+ jrnl_cksum_head_t *head = (jrnl_cksum_head_t *) jd->jd_head;<br>+<br>+ retsize = pwrite64(jd->jdf_fd, &end_mark, sizeof(end_mark),<br>+ sizeof(*head) + head->jh_size);<br>+<br>+ if (retsize != sizeof(end_mark)) {<br>
+ if (errno == ENOSPC)<br>+ ret = SD_RES_NO_SPACE;<br>+ else<br>+ ret = SD_RES_EIO;<br>+ } else<br>+ ret = SD_RES_SUCCESS;<br>+<br>+ jd->jd_end_mark = end_mark;<br>+<br>
+ return ret;<br>+}<br>+<br>+int jrnl_cksum_apply_to_target_object(jrnl_file_desc_t *jfd)<br>+{<br>+ int ret;<br>+ ssize_t retsize;<br>+ jrnl_cksum_head_t head;<br>+ jrnl_cksum_data_t data;<br>+<br>+ /* Flush out journal to disk (vdi object) */<br>
+ retsize = pread64(jfd->jf_fd, &head, sizeof(head), 0);<br>+ retsize = pread64(jfd->jf_fd, &data, head.jh_size, sizeof(head));<br>+<br>+ ret = fsetxattr(jfd->jf_target_fd, data.aname_cksum, &data.aval_cksum,<br>
+ sizeof(data.aval_cksum), 0);<br>+ if (ret < 0) {<br>+ eprintf("failed to set xattr, %m\n");<br>+ return SD_RES_EIO;<br>+ }<br>+<br>+ return SD_RES_SUCCESS;<br>
+}<br>+<br>+static int jrnl_cksum_commit_data(jrnl_desc_t *jd)<br>+{<br>+ int ret;<br>+ jrnl_cksum_data_t *data = (jrnl_cksum_data_t *) jd->jd_data;<br>+<br>+ ret = fsetxattr(jd->jdf_target_fd, data->aname_cksum, &data->aval_cksum,<br>
+ sizeof(data->aval_cksum), 0);<br>+ if (ret < 0) {<br>+ eprintf("failed to set xattr, %m\n");<br>+ return SD_RES_EIO;<br>+ }<br>+<br>+ return SD_RES_SUCCESS;<br>
+}<br>+<br>