[Sheepdog] [PATCH] Journal support for atomic operations

Narendra Prasad Madanapalli narendramind at gmail.com
Sun Dec 26 18:08:23 CET 2010


This patch makes the update/write operation on a vdi object atomic.

Using the new journalling API, store_queue_request_local() now updates
the vdi object atomically for the operations SD_OP_WRITE_OBJ and
SD_OP_CREATE_AND_WRITE_OBJ.

Signed-off-by: Narendra <narendramind at gmail.com>

 sheep/sheep.c      |    2 +
 sheep/sheep_priv.h |   63 ++++++++
 sheep/store.c      |  433
++++++++++++++++++++++++++++++++++++++++++++++++++--
 3 files changed, 486 insertions(+), 12 deletions(-)

diff --git a/sheep/sheep.c b/sheep/sheep.c
index dc9a320..d6c776d 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -112,6 +112,8 @@ int main(int argc, char **argv)
     if (is_daemon && daemon(0, 0))
         exit(1);

+    jrnl_recover();
+
     ret = init_event(EPOLL_SIZE);
     if (ret)
         exit(1);
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 62924f2..abc4681 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -202,6 +202,69 @@ int remove_object(struct sheepdog_node_list_entry *e,
 int get_sheep_fd(struct sheepdog_node_list_entry *e, int node_idx,
          uint32_t epoch, int worker_idx);

+/* Journal */
+typedef uint32_t end_mark_t;
+typedef uint32_t jrnl_type_t;
+
+#define JRNL_TYPE_VDI        0
+#define JRNL_MAX_TYPES       1
+
+#define SET_END_MARK            1UL
+#define UNSET_END_MARK            0UL
+/* True when var holds the end-mark value; the argument is parenthesized so compound expressions compare as a whole. */
+#define IS_END_MARK_SET(var)    ((var) == SET_END_MARK)
+
+/* Journal header for data object */
+typedef struct jrnl_vdi_head {    /* written verbatim at offset 0 of the journal file */
+    jrnl_type_t  jh_type;    /* journal type; must stay first, it is read back from file offset 0 */
+    uint64_t  jh_offset;    /* offset in the target object at which the data is written */
+    uint64_t  jh_size;    /* number of data bytes stored right after this header */
+}  jrnl_vdi_head_t;
+
+typedef struct jrnl_file_desc {
+    uint32_t  jf_epoch;    /* epoch; selects the journal sub-directory */
+    uint64_t  jf_oid;     /* object id; used as the journal file name */
+    int       jf_fd;      /* open journal file fd, set by jrnl_open() */
+    int       jf_target_fd;    /* fd of the object the journal data is written to */
+} jrnl_file_desc_t;
+
+typedef struct jrnl_descriptor {
+    void                *jd_head;    /* type-specific header; its first 32 bits are the journal type */
+    void                *jd_data;    /* payload to journal and then write to the target */
+    int                 jd_end_mark;    /* SET_END_MARK or UNSET_END_MARK */
+    jrnl_file_desc_t    jd_jfd;    /* journal/target files; the jdf_* macros below alias its fields */
+#define jdf_epoch          jd_jfd.jf_epoch
+#define jdf_oid            jd_jfd.jf_oid
+#define jdf_fd             jd_jfd.jf_fd
+#define jdf_target_fd      jd_jfd.jf_target_fd
+} jrnl_desc_t;
+
+typedef struct jrnl_handler {    /* per-journal-type operations, indexed by journal type */
+    int  (*has_end_mark)(jrnl_desc_t *jd);    /* is the end mark present in the journal file? */
+    int  (*write_header)(jrnl_desc_t  *jd);    /* write jd_head to the journal file */
+    int  (*write_data)(jrnl_desc_t  *jd);    /* write jd_data to the journal file */
+    int  (*write_end_mark)(jrnl_desc_t  *jd);    /* write the end mark after the data */
+    int  (*apply_to_target_object)(jrnl_file_desc_t *jfd);    /* replay a journal file onto the target (recovery) */
+    int  (*commit_data)(jrnl_desc_t  *jd);    /* write jd_data to the target object */
+} jrnl_handler_t;
+
+/*
+ * Journal API, defined in sheep/store.c.
+ *
+ * The prototypes are plain external declarations: no function body is
+ * visible here, so an "inline" specifier would only make the linkage of
+ * the definitions depend on the inline dialect in use.
+ */
+jrnl_type_t jrnl_get_type(jrnl_desc_t  *jd);
+int  jrnl_get_type_from_file(jrnl_file_desc_t *jfd, jrnl_type_t *jrnl_type);
+int  jrnl_exists(jrnl_file_desc_t  *jfd);
+int  jrnl_update_epoch_store(uint32_t epoch);
+int  jrnl_open(jrnl_file_desc_t  *jfd, int  aflags);
+int  jrnl_create(jrnl_file_desc_t  *jfd);
+int  jrnl_remove(jrnl_file_desc_t *jfd);
+int  jrnl_close(jrnl_file_desc_t  *jfd);
+
+/* Operations dispatched through the per-type handler table. */
+int  jrnl_has_end_mark(jrnl_desc_t  *jd);
+int  jrnl_write_header(jrnl_desc_t  *jd);
+int  jrnl_write_data(jrnl_desc_t  *jd);
+int  jrnl_write_end_mark(jrnl_desc_t  *jd);
+int  jrnl_apply_to_target_object(jrnl_file_desc_t *jfd);
+int  jrnl_commit_data(jrnl_desc_t  *jd);
+
+/* Journal one update and apply it; replay leftover journals at start-up. */
+int  jrnl_perform(jrnl_desc_t  *jd);
+int  jrnl_recover(void);
+
 static inline int is_myself(struct sheepdog_node_list_entry *e)
 {
     return e->id == sys->this_node.id;
diff --git a/sheep/store.c b/sheep/store.c
index 8c688c1..bf44815 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -31,10 +31,32 @@
 static char *obj_path;
 static char *epoch_path;
 static char *mnt_path;
+static char *jrnl_path;

 static mode_t def_dmode = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IWGRP |
S_IXGRP;
 static mode_t def_fmode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;

+/* Journal internal data structures */
+/* Journal Handlers for Data Object */
+static int  jrnl_vdi_has_end_mark(jrnl_desc_t  *jd);
+static int  jrnl_vdi_write_header(jrnl_desc_t  *jd);
+static int  jrnl_vdi_write_data(jrnl_desc_t  *jd);
+static int  jrnl_vdi_write_end_mark(jrnl_desc_t  *jd);
+static int  jrnl_vdi_apply_to_target_object(jrnl_file_desc_t *jfd);
+static int  jrnl_vdi_commit_data(jrnl_desc_t  *jd);
+
+/* Handler table, indexed by journal type. */
+static jrnl_handler_t  jrnl_handlers[JRNL_MAX_TYPES] = {
+    [JRNL_TYPE_VDI] = {
+        .write_header = jrnl_vdi_write_header,
+        .write_data = jrnl_vdi_write_data,
+        .write_end_mark = jrnl_vdi_write_end_mark,
+        .has_end_mark = jrnl_vdi_has_end_mark,
+        .commit_data = jrnl_vdi_commit_data,
+        .apply_to_target_object = jrnl_vdi_apply_to_target_object,
+    },
+};
+
+
 static int obj_cmp(const void *oid1, const void *oid2)
 {
     const uint64_t hval1 = fnv_64a_buf((void *)oid1, sizeof(uint64_t),
FNV1A_64_INIT);
@@ -556,6 +578,8 @@ static int store_queue_request_local(struct request
*req, uint32_t epoch)
     uint64_t oid = hdr->oid;
     uint32_t opcode = hdr->opcode;
     char path[1024], *buf;
+    jrnl_desc_t  jd;
+    jrnl_vdi_head_t jh;

     switch (opcode) {
     case SD_OP_CREATE_AND_WRITE_OBJ:
@@ -671,19 +695,30 @@ static int store_queue_request_local(struct request
*req, uint32_t epoch)
     case SD_OP_WRITE_OBJ:
     case SD_OP_CREATE_AND_WRITE_OBJ:
         if (!is_data_obj(oid)) {
-            /* FIXME: write data to journal */
-        }
-        ret = pwrite64(fd, req->data, hdr->data_length, hdr->offset);
-        if (ret != hdr->data_length) {
-            if (errno == ENOSPC)
-                ret = SD_RES_NO_SPACE;
-            else
-                ret = SD_RES_EIO;
-            goto out;
-        }
+            jd.jdf_epoch = epoch;
+            jd.jdf_oid = oid;
+            jd.jdf_target_fd = fd;

-        if (!is_data_obj(oid)) {
-            /* FIXME: remove journal data */
+            jh.jh_type = JRNL_TYPE_VDI;
+            jh.jh_offset = hdr->offset;
+            jh.jh_size = hdr->data_length;
+
+            jd.jd_head = &jh;
+            jd.jd_data = req->data;
+            jd.jd_end_mark = SET_END_MARK;
+
+            ret = jrnl_perform(&jd);
+            if (ret)
+                goto out;
+        } else {
+            ret = pwrite64(fd, req->data, hdr->data_length, hdr->offset);
+            if (ret != hdr->data_length) {
+                if (errno == ENOSPC)
+                    ret = SD_RES_NO_SPACE;
+                else
+                    ret = SD_RES_EIO;
+                goto out;
+            }
         }

         ret = SD_RES_SUCCESS;
@@ -1739,6 +1774,27 @@ static int init_mnt_path(const char *base_path)
     return 0;
 }

+#define JRNL_PATH "/journal/"
+
+/*
+ * Build the journal directory path, "<base_path>/journal/", store it in
+ * jrnl_path and hand it to init_path().
+ *
+ * Returns 0 on success and non-zero on failure (allocation failure or
+ * whatever init_path() reports).
+ */
+static int  init_jrnl_path(const char *base_path)
+{
+    int new;
+
+    jrnl_path = zalloc(strlen(base_path) + strlen(JRNL_PATH) + 1);
+    if (!jrnl_path) {
+        eprintf("failed to allocate memory\n");
+        return 1;
+    }
+    sprintf(jrnl_path, "%s" JRNL_PATH, base_path);
+
+    /*
+     * init_path() reports through "new" whether the directory was just
+     * created; a fresh journal directory needs no extra setup, so the
+     * flag is not acted upon.
+     */
+    return init_path(jrnl_path, &new);
+}
+
 int init_store(const char *d)
 {
     int ret;
@@ -1759,6 +1815,10 @@ int init_store(const char *d)
     if (ret)
         return ret;

+    ret = init_jrnl_path(d);
+    if (ret)
+        return ret;
+
     return ret;
 }

@@ -1791,3 +1851,352 @@ int get_global_nr_copies(uint32_t *copies)
 {
     return attr(epoch_path, ANAME_COPIES, copies, sizeof(*copies), 0);
 }
+
+/* Journal APIs */
+/*
+ * Tell whether the journal file for (jf_epoch, jf_oid) is present.
+ *
+ * Returns 1 when stat() succeeds on the journal file and 0 otherwise.
+ * The original returned the opposite values, contradicting the name.
+ */
+int  jrnl_exists(jrnl_file_desc_t  *jfd)
+{
+    char path[1024];
+    struct stat s;
+
+    snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, jrnl_path, jfd->jf_epoch, jfd->jf_oid);
+
+    return stat(path, &s) == 0;
+}
+
+/*
+ * Make sure the journal sub-directory of the given epoch exists,
+ * creating it when stat() reports ENOENT.
+ *
+ * Returns 0 when the directory is present afterwards, SD_RES_EIO when
+ * it could neither be examined nor created.
+ */
+int jrnl_update_epoch_store(uint32_t epoch)
+{
+    char new[1024];
+    struct stat s;
+
+    snprintf(new, sizeof(new), "%s%08u/", jrnl_path, epoch);
+    if (stat(new, &s) == 0)
+        return 0;
+
+    if (errno != ENOENT) {
+        eprintf("failed to stat %s, %s\n", new, strerror(errno));
+        return SD_RES_EIO;
+    }
+
+    /* EEXIST means somebody else created it in the meantime: fine. */
+    if (mkdir(new, def_dmode) < 0 && errno != EEXIST) {
+        eprintf("failed to create %s, %s\n", new, strerror(errno));
+        return SD_RES_EIO;
+    }
+
+    return 0;
+}
+
+/*
+ * Open the journal file of (jf_epoch, jf_oid) with the given open(2)
+ * flags and store the descriptor in jfd->jf_fd.
+ *
+ * Returns SD_RES_SUCCESS, SD_RES_NO_OBJ when the file does not exist,
+ * or SD_RES_UNKNOWN on any other open() failure.
+ */
+int  jrnl_open(jrnl_file_desc_t  *jfd, int  aflags)
+{
+    char path[1024];
+    int fd, err;
+
+    /* Result not checked: a missing directory makes open() fail below. */
+    jrnl_update_epoch_store(jfd->jf_epoch);
+    snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, jrnl_path, jfd->jf_epoch, jfd->jf_oid);
+
+    fd = open(path, aflags, def_fmode);
+    if (fd < 0) {
+        /* Save errno first: logging may overwrite it. */
+        err = errno;
+        eprintf("failed to open %s, %s\n", path, strerror(err));
+        return (err == ENOENT) ? SD_RES_NO_OBJ : SD_RES_UNKNOWN;
+    }
+
+    jfd->jf_fd = fd;
+    return SD_RES_SUCCESS;
+}
+
+/* Close the journal file and mark the descriptor as unused; always returns 0. */
+int  jrnl_close(jrnl_file_desc_t  *jfd)
+{
+    int journal_fd = jfd->jf_fd;
+
+    jfd->jf_fd = -1;
+    close(journal_fd);
+
+    return 0;
+}
+
+/*
+ * Create (or reuse) the journal file for writing.
+ *
+ * O_TRUNC discards whatever an earlier, failed write of the same object
+ * left in the file, so stale bytes - in particular a stale end mark -
+ * can never be mistaken for part of the new journal.
+ */
+int  jrnl_create(jrnl_file_desc_t  *jfd)
+{
+    return jrnl_open(jfd, O_RDWR | O_CREAT | O_TRUNC);
+}
+
+/* Return the journal type, i.e. the first 32-bit word of the header jd->jd_head points to. */
+inline uint32_t jrnl_get_type(jrnl_desc_t  *jd)
+{
+    uint32_t *type_field = (uint32_t *) jd->jd_head;
+
+    return *type_field;
+}
+
+/*
+ * Read the journal type stored at offset 0 of the open journal file
+ * into *jrnl_type.  Returns SD_RES_SUCCESS, or SD_RES_EIO when the
+ * full field could not be read.
+ */
+int jrnl_get_type_from_file(jrnl_file_desc_t *jfd, jrnl_type_t *jrnl_type)
+{
+    ssize_t nread = pread64(jfd->jf_fd, jrnl_type, sizeof(*jrnl_type), 0);
+
+    if (nread == sizeof(*jrnl_type))
+        return SD_RES_SUCCESS;
+
+    return SD_RES_EIO;
+}
+
+/* Unlink the journal file of (jf_epoch, jf_oid); returns SD_RES_SUCCESS or SD_RES_EIO. */
+int  jrnl_remove(jrnl_file_desc_t *jfd)
+{
+    char path[1024];
+
+    snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, jrnl_path, jfd->jf_epoch, jfd->jf_oid);
+
+    if (!unlink(path))
+        return SD_RES_SUCCESS;
+
+    eprintf("failed to remove %s, %s\n", path, strerror(errno));
+    return SD_RES_EIO;
+}
+
+/* Dispatch has_end_mark to the handler of jd's journal type. */
+inline int  jrnl_has_end_mark(jrnl_desc_t *jd)
+{
+    jrnl_handler_t *handler = &jrnl_handlers[jrnl_get_type(jd)];
+
+    return handler->has_end_mark(jd);
+}
+
+/* Dispatch write_header to the handler of jd's journal type. */
+inline int  jrnl_write_header(jrnl_desc_t  *jd)
+{
+    jrnl_handler_t *handler = &jrnl_handlers[jrnl_get_type(jd)];
+
+    return handler->write_header(jd);
+}
+
+/* Dispatch write_data to the handler of jd's journal type. */
+inline int  jrnl_write_data(jrnl_desc_t  *jd)
+{
+    jrnl_handler_t *handler = &jrnl_handlers[jrnl_get_type(jd)];
+
+    return handler->write_data(jd);
+}
+
+/* Dispatch write_end_mark to the handler of jd's journal type. */
+inline int  jrnl_write_end_mark(jrnl_desc_t  *jd)
+{
+    jrnl_handler_t *handler = &jrnl_handlers[jrnl_get_type(jd)];
+
+    return handler->write_end_mark(jd);
+}
+
+/*
+ * Replay an open journal file onto its target object, using the handler
+ * selected by the type stored in the file.
+ *
+ * Returns the handler's result, the read error when the type cannot be
+ * read, or SD_RES_EIO when the stored type is not a known journal type.
+ */
+inline int  jrnl_apply_to_target_object(jrnl_file_desc_t *jfd)
+{
+    int ret;
+    jrnl_type_t  jrnl_type;
+
+    ret = jrnl_get_type_from_file(jfd, &jrnl_type);
+    if (ret)
+        return ret;
+
+    /* The type comes from disk: never index the table with it unchecked. */
+    if (jrnl_type >= JRNL_MAX_TYPES) {
+        eprintf("unknown journal type, %u\n", (unsigned int) jrnl_type);
+        return SD_RES_EIO;
+    }
+
+    return jrnl_handlers[jrnl_type].apply_to_target_object(jfd);
+}
+
+/* Dispatch commit_data to the handler of jd's journal type. */
+inline int  jrnl_commit_data(jrnl_desc_t  *jd)
+{
+    jrnl_handler_t *handler = &jrnl_handlers[jrnl_get_type(jd)];
+
+    return handler->commit_data(jd);
+}
+
+/*
+ * Perform one journalled update: write header, data and end mark to the
+ * journal file, write the data to the target object, then remove the
+ * journal.
+ *
+ * The journal fd is closed on every path once the file was opened.
+ * If the journal itself cannot be written completely it is removed,
+ * since the target has not been touched yet and a partial journal must
+ * not be replayed.  If writing the target fails, the complete journal
+ * is left in place for jrnl_recover().
+ *
+ * Returns 0 (SD_RES_SUCCESS) on success or the SD_RES_* code of the
+ * first step that failed.
+ */
+int  jrnl_perform(jrnl_desc_t  *jd)
+{
+    int  ret;
+
+    ret = jrnl_create(&jd->jd_jfd);
+    if (ret)
+        return ret;
+
+    ret = jrnl_write_header(jd);
+    if (ret)
+        goto discard;
+
+    ret = jrnl_write_data(jd);
+    if (ret)
+        goto discard;
+
+    ret = jrnl_write_end_mark(jd);
+    if (ret)
+        goto discard;
+
+    ret = jrnl_commit_data(jd);
+    if (ret) {
+        /* Keep the complete journal so recovery can redo the write. */
+        jrnl_close(&jd->jd_jfd);
+        return ret;
+    }
+
+    jrnl_close(&jd->jd_jfd);
+    return jrnl_remove(&jd->jd_jfd);
+
+discard:
+    jrnl_close(&jd->jd_jfd);
+    /* Report the original failure, not the result of the clean-up. */
+    jrnl_remove(&jd->jd_jfd);
+    return ret;
+}
+
+/*
+ * Replay the journal files found in the journal directory of the latest
+ * epoch onto their objects, removing each journal afterwards.
+ *
+ * Directory entries whose name is not a hexadecimal object id are
+ * skipped and left alone.  A journal that cannot be opened or applied
+ * is logged and still removed, as before.
+ *
+ * Returns 0 when the directory was processed, 1 when the latest epoch
+ * cannot be determined and -1 when the journal directory cannot be
+ * opened.
+ */
+int  jrnl_recover(void)
+{
+    DIR *dir;
+    struct dirent *d;
+    char jrnl_dir[1024],
+         jrnl_file_path[1024],
+         obj_file_path[1024];
+    int  epoch;
+
+    epoch = get_latest_epoch();
+    if (epoch < 0)
+        return 1;
+    snprintf(jrnl_dir, sizeof(jrnl_dir), "%s%08u/", jrnl_path, (unsigned int) epoch);
+
+    eprintf("Openning the directory%s.\n", jrnl_dir);
+    dir = opendir(jrnl_dir);
+    if (!dir)
+        return -1;
+
+    vprintf(SDOG_NOTICE "start jrnl_recovery.\n");
+    while ((d = readdir(dir))) {
+        int  ret;
+        jrnl_file_desc_t  jfd;
+
+        if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, ".."))
+            continue;
+
+        /* Without this check jf_oid would be used uninitialized. */
+        if (sscanf(d->d_name, "%" SCNx64, &jfd.jf_oid) != 1) {
+            eprintf("Skipping unexpected file, %s, in the journal directory.\n", d->d_name);
+            continue;
+        }
+        jfd.jf_epoch = epoch;
+
+        snprintf(jrnl_file_path, sizeof(jrnl_file_path), "%s%016" PRIx64, jrnl_dir, jfd.jf_oid);
+        snprintf(obj_file_path, sizeof(obj_file_path), "%s%08u/%016" PRIx64, obj_path, (unsigned int) epoch, jfd.jf_oid);
+
+        ret = jrnl_open(&jfd, O_RDONLY);
+        if (ret) {
+            eprintf("Unable to open the journal file, %s, for reading.\n", jrnl_file_path);
+            goto remove_jrnl;
+        }
+        jfd.jf_target_fd = ob_open(epoch, jfd.jf_oid, 0, &ret);
+        if (ret) {
+            eprintf("Unable to open the object file, %s, to recover.\n", obj_file_path);
+            goto close_jrnl;
+        }
+
+        ret = jrnl_apply_to_target_object(&jfd);
+        if (ret)
+            eprintf("Unable to recover the object, %s.\n", obj_file_path);
+        else
+            vprintf(SDOG_INFO "recovered the object in journal, %s\n", jrnl_file_path);
+
+        close(jfd.jf_target_fd);
+        jfd.jf_target_fd = -1;
+    close_jrnl:
+        jrnl_close(&jfd);
+    remove_jrnl:
+        jrnl_remove(&jfd);
+    }
+    closedir(dir);
+    vprintf(SDOG_NOTICE "end jrnl_recovery.\n");
+
+    return 0;
+}
+
+/* VDI data journalling functions */
+/*
+ * Check whether the journal file carries the end mark, which is stored
+ * right after the header and the data.
+ *
+ * Returns SET_END_MARK when the mark was read completely and is set,
+ * UNSET_END_MARK otherwise (including a failed or short read).
+ */
+static int  jrnl_vdi_has_end_mark(jrnl_desc_t *jd)
+{
+    ssize_t  ret;
+    end_mark_t  end_mark = UNSET_END_MARK;
+    jrnl_vdi_head_t  *head = (jrnl_vdi_head_t *) jd->jd_head;
+
+    ret = pread64(jd->jdf_fd, &end_mark, sizeof(end_mark),
+                   sizeof(*head) + head->jh_size);
+    if (ret != sizeof(end_mark))
+        return UNSET_END_MARK;
+
+    return (IS_END_MARK_SET(end_mark)? SET_END_MARK: UNSET_END_MARK);
+}
+
+/*
+ * Write the vdi journal header at offset 0 of the journal file.
+ * Returns SD_RES_SUCCESS, SD_RES_NO_SPACE (errno is ENOSPC) or
+ * SD_RES_EIO.
+ */
+static int  jrnl_vdi_write_header(jrnl_desc_t  *jd)
+{
+    jrnl_vdi_head_t  *hdr = (jrnl_vdi_head_t *) jd->jd_head;
+    ssize_t  written = pwrite64(jd->jdf_fd, hdr, sizeof(*hdr), 0);
+
+    if (written == sizeof(*hdr))
+        return SD_RES_SUCCESS;
+
+    return (errno == ENOSPC) ? SD_RES_NO_SPACE : SD_RES_EIO;
+}
+
+/*
+ * Write the payload into the journal file, directly after the header.
+ * Returns SD_RES_SUCCESS, SD_RES_NO_SPACE (errno is ENOSPC) or
+ * SD_RES_EIO.
+ */
+static int  jrnl_vdi_write_data(jrnl_desc_t  *jd)
+{
+    jrnl_vdi_head_t  *hdr = (jrnl_vdi_head_t *) jd->jd_head;
+    ssize_t  written = pwrite64(jd->jdf_fd, jd->jd_data, hdr->jh_size, sizeof(*hdr));
+
+    if (written == hdr->jh_size)
+        return SD_RES_SUCCESS;
+
+    return (errno == ENOSPC) ? SD_RES_NO_SPACE : SD_RES_EIO;
+}
+
+/*
+ * Write the end mark into the journal file, right after the header and
+ * the data, and record it in jd->jd_end_mark.
+ *
+ * jd_end_mark is updated only when the mark really reached the file, so
+ * the descriptor never claims a mark that was not written.
+ *
+ * Returns SD_RES_SUCCESS, SD_RES_NO_SPACE (errno is ENOSPC) or
+ * SD_RES_EIO.
+ */
+static int  jrnl_vdi_write_end_mark(jrnl_desc_t  *jd)
+{
+    ssize_t  retsize;
+    end_mark_t  end_mark = SET_END_MARK;
+    jrnl_vdi_head_t  *head = (jrnl_vdi_head_t *) jd->jd_head;
+
+    retsize = pwrite64(jd->jdf_fd, &end_mark, sizeof(end_mark),
+                   sizeof(*head) + head->jh_size);
+    if (retsize != sizeof(end_mark))
+        return (errno == ENOSPC) ? SD_RES_NO_SPACE : SD_RES_EIO;
+
+    jd->jd_end_mark = end_mark;
+
+    return SD_RES_SUCCESS;
+}
+
+/*
+ * Replay a vdi journal file onto its target object (used by recovery).
+ *
+ * The header is read and validated first.  A journal without a
+ * complete end mark is skipped: jrnl_perform() writes the end mark
+ * before it touches the target, so such a journal means the target was
+ * never modified and the partial data must not be applied.
+ *
+ * Returns SD_RES_SUCCESS (also for a skipped, incomplete journal),
+ * SD_RES_NO_MEM, SD_RES_NO_SPACE or SD_RES_EIO.
+ */
+static int  jrnl_vdi_apply_to_target_object(jrnl_file_desc_t *jfd)
+{
+    char *buf = NULL;
+    int buf_len, res = 0;
+    ssize_t retsize;
+    end_mark_t end_mark = UNSET_END_MARK;
+    jrnl_vdi_head_t jh;
+
+    /* FIXME: handle larger size */
+    buf_len = (1 << 22);
+
+    retsize = pread64(jfd->jf_fd, &jh, sizeof(jh), 0);
+    if (retsize != sizeof(jh)) {
+        eprintf("failed to read the journal header\n");
+        return SD_RES_EIO;
+    }
+
+    /* jh_size comes from disk; it must fit the replay buffer. */
+    if (jh.jh_size > (uint64_t) buf_len) {
+        eprintf("journal data too large, %" PRIu64 "\n", jh.jh_size);
+        return SD_RES_EIO;
+    }
+
+    retsize = pread64(jfd->jf_fd, &end_mark, sizeof(end_mark),
+                   sizeof(jh) + jh.jh_size);
+    if (retsize != sizeof(end_mark) || !IS_END_MARK_SET(end_mark)) {
+        eprintf("incomplete journal, not applied\n");
+        return SD_RES_SUCCESS;
+    }
+
+    buf = zalloc(buf_len);
+    if (!buf) {
+        eprintf("failed to allocate memory\n");
+        return SD_RES_NO_MEM;
+    }
+
+    retsize = pread64(jfd->jf_fd, buf, jh.jh_size, sizeof(jh));
+    if (retsize != jh.jh_size) {
+        eprintf("failed to read the journal data\n");
+        res = SD_RES_EIO;
+        goto out;
+    }
+
+    /* Flush out journal to disk (vdi object) */
+    retsize = pwrite64(jfd->jf_target_fd, buf, jh.jh_size, jh.jh_offset);
+    if (retsize != jh.jh_size) {
+        if (errno == ENOSPC)
+            res = SD_RES_NO_SPACE;
+        else
+            res = SD_RES_EIO;
+    }
+
+out:
+    /* Clean up */
+    free(buf);
+
+    return res;
+}
+
+/*
+ * Write the payload to the target object at the offset recorded in the
+ * header.  Returns 0 on success, SD_RES_NO_SPACE (errno is ENOSPC) or
+ * SD_RES_EIO.
+ */
+static int  jrnl_vdi_commit_data(jrnl_desc_t  *jd)
+{
+    jrnl_vdi_head_t  *hdr = (jrnl_vdi_head_t *) jd->jd_head;
+    ssize_t  written = pwrite64(jd->jdf_target_fd, jd->jd_data, hdr->jh_size, hdr->jh_offset);
+
+    if (written == hdr->jh_size)
+        return 0;
+
+    return (errno == ENOSPC) ? SD_RES_NO_SPACE : SD_RES_EIO;
+}
+
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.wpkg.org/pipermail/sheepdog/attachments/20101226/13cd5a75/attachment-0003.html>


More information about the sheepdog mailing list