[Sheepdog] [PATCH] Journal support for atomic operations
Narendra Prasad Madanapalli
narendramind at gmail.com
Sun Dec 26 18:08:23 CET 2010
This patch makes updates/writes of VDI objects atomic.
Using the new journalling API, store_queue_request_local() now writes VDI
objects through a journal for the SD_OP_WRITE_OBJ and
SD_OP_CREATE_AND_WRITE_OBJ operations; data objects are still written directly.
Signed-off-by: Narendra <narendramind at gmail.com>
sheep/sheep.c | 2 +
sheep/sheep_priv.h | 63 ++++++++
sheep/store.c | 433 ++++++++++++++++++++++++++++++++++++++++++++++++++--
3 files changed, 486 insertions(+), 12 deletions(-)
diff --git a/sheep/sheep.c b/sheep/sheep.c
index dc9a320..d6c776d 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -112,6 +112,8 @@ int main(int argc, char **argv)
if (is_daemon && daemon(0, 0))
exit(1);
+ jrnl_recover();
+
ret = init_event(EPOLL_SIZE);
if (ret)
exit(1);
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 62924f2..abc4681 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -202,6 +202,69 @@ int remove_object(struct sheepdog_node_list_entry *e,
int get_sheep_fd(struct sheepdog_node_list_entry *e, int node_idx,
uint32_t epoch, int worker_idx);
+/* Journal */
+typedef uint32_t end_mark_t;
+typedef uint32_t jrnl_type_t;
+
+/* Journal types; each value indexes the handler table in store.c. */
+#define JRNL_TYPE_VDI 0
+#define JRNL_MAX_TYPES 1
+
+/* Values of the end mark written after the journalled data. */
+#define SET_END_MARK 1UL
+#define UNSET_END_MARK 0UL
+#define IS_END_MARK_SET(var) ((var) == SET_END_MARK)
+
+/*
+ * Journal header for VDI objects.  A journal file is laid out as
+ * [jrnl_vdi_head_t][jh_size bytes of data][end_mark_t end mark].
+ * jh_type must stay the first member: the journal type is read from the
+ * start of the header (in memory and on disk).
+ * NOTE(review): the struct is written to disk as-is, so the format depends
+ * on the host's padding and byte order.
+ */
+typedef struct jrnl_vdi_head {
+	jrnl_type_t jh_type;	/* JRNL_TYPE_VDI */
+	uint64_t jh_offset;	/* offset of the write in the target object */
+	uint64_t jh_size;	/* length of the journalled data */
+} jrnl_vdi_head_t;
+
+/* Identifies one journal file and the object it protects. */
+typedef struct jrnl_file_desc {
+	uint32_t jf_epoch;	/* epoch */
+	uint64_t jf_oid;	/* Object id */
+	int jf_fd;		/* Open journal file fd */
+	int jf_target_fd;	/* Open target object fd */
+} jrnl_file_desc_t;
+
+/* One journalled write in progress. */
+typedef struct jrnl_descriptor {
+	void *jd_head;		/* type-specific header, e.g. jrnl_vdi_head_t */
+	void *jd_data;		/* data to be written */
+	int jd_end_mark;	/* SET_END_MARK or UNSET_END_MARK */
+	jrnl_file_desc_t jd_jfd;
+#define jdf_epoch jd_jfd.jf_epoch
+#define jdf_oid jd_jfd.jf_oid
+#define jdf_fd jd_jfd.jf_fd
+#define jdf_target_fd jd_jfd.jf_target_fd
+} jrnl_desc_t;
+
+/* Per-journal-type operations. */
+typedef struct jrnl_handler {
+	int (*has_end_mark)(jrnl_desc_t *jd);
+	int (*write_header)(jrnl_desc_t *jd);
+	int (*write_data)(jrnl_desc_t *jd);
+	int (*write_end_mark)(jrnl_desc_t *jd);
+	int (*apply_to_target_object)(jrnl_file_desc_t *jfd);
+	int (*commit_data)(jrnl_desc_t *jd);
+} jrnl_handler_t;
+
+/*
+ * These are ordinary external functions defined in store.c.  They are
+ * deliberately not declared inline: under C99 rules an inline-only
+ * declaration here would stop store.c from emitting out-of-line
+ * definitions, breaking the link whenever a call is not inlined.
+ */
+jrnl_type_t jrnl_get_type(jrnl_desc_t *jd);
+int jrnl_get_type_from_file(jrnl_file_desc_t *jfd, jrnl_type_t *jrnl_type);
+int jrnl_exists(jrnl_file_desc_t *jfd);
+int jrnl_update_epoch_store(uint32_t epoch);
+int jrnl_open(jrnl_file_desc_t *jfd, int aflags);
+int jrnl_create(jrnl_file_desc_t *jfd);
+int jrnl_remove(jrnl_file_desc_t *jfd);
+int jrnl_close(jrnl_file_desc_t *jfd);
+
+int jrnl_has_end_mark(jrnl_desc_t *jd);
+int jrnl_write_header(jrnl_desc_t *jd);
+int jrnl_write_data(jrnl_desc_t *jd);
+int jrnl_write_end_mark(jrnl_desc_t *jd);
+int jrnl_apply_to_target_object(jrnl_file_desc_t *jfd);
+int jrnl_commit_data(jrnl_desc_t *jd);
+int jrnl_perform(jrnl_desc_t *jd);
+int jrnl_recover(void);
+
static inline int is_myself(struct sheepdog_node_list_entry *e)
{
return e->id == sys->this_node.id;
diff --git a/sheep/store.c b/sheep/store.c
index 8c688c1..bf44815 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -31,10 +31,32 @@
static char *obj_path;
static char *epoch_path;
static char *mnt_path;
+/* Journal base directory "<base_path>/journal/", set by init_jrnl_path(). */
+static char *jrnl_path;
static mode_t def_dmode = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IWGRP |
S_IXGRP;
static mode_t def_fmode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
+/* Journal internal data structures */
+/* Journal handlers for VDI objects (JRNL_TYPE_VDI) */
+static int jrnl_vdi_has_end_mark(jrnl_desc_t *jd);
+static int jrnl_vdi_write_header(jrnl_desc_t *jd);
+static int jrnl_vdi_write_data(jrnl_desc_t *jd);
+static int jrnl_vdi_write_end_mark(jrnl_desc_t *jd);
+static int jrnl_vdi_apply_to_target_object(jrnl_file_desc_t *jfd);
+static int jrnl_vdi_commit_data(jrnl_desc_t *jd);
+
+/*
+ * Per-type journal operations, indexed by the jrnl_type_t stored at the
+ * start of each journal header.  Designated indices keep every entry at
+ * its type's slot; the table is read-only after compilation.
+ */
+static const jrnl_handler_t jrnl_handlers[JRNL_MAX_TYPES] = {
+	[JRNL_TYPE_VDI] = {
+		.has_end_mark = jrnl_vdi_has_end_mark,
+		.write_header = jrnl_vdi_write_header,
+		.write_data = jrnl_vdi_write_data,
+		.write_end_mark = jrnl_vdi_write_end_mark,
+		.apply_to_target_object = jrnl_vdi_apply_to_target_object,
+		.commit_data = jrnl_vdi_commit_data,
+	},
+};
+
+
static int obj_cmp(const void *oid1, const void *oid2)
{
const uint64_t hval1 = fnv_64a_buf((void *)oid1, sizeof(uint64_t),
FNV1A_64_INIT);
@@ -556,6 +578,8 @@ static int store_queue_request_local(struct request
*req, uint32_t epoch)
uint64_t oid = hdr->oid;
uint32_t opcode = hdr->opcode;
char path[1024], *buf;
+ jrnl_desc_t jd;
+ jrnl_vdi_head_t jh;
switch (opcode) {
case SD_OP_CREATE_AND_WRITE_OBJ:
@@ -671,19 +695,30 @@ static int store_queue_request_local(struct request
*req, uint32_t epoch)
case SD_OP_WRITE_OBJ:
case SD_OP_CREATE_AND_WRITE_OBJ:
if (!is_data_obj(oid)) {
- /* FIXME: write data to journal */
- }
- ret = pwrite64(fd, req->data, hdr->data_length, hdr->offset);
- if (ret != hdr->data_length) {
- if (errno == ENOSPC)
- ret = SD_RES_NO_SPACE;
- else
- ret = SD_RES_EIO;
- goto out;
- }
+ jd.jdf_epoch = epoch;
+ jd.jdf_oid = oid;
+ jd.jdf_target_fd = fd;
- if (!is_data_obj(oid)) {
- /* FIXME: remove journal data */
+ jh.jh_type = JRNL_TYPE_VDI;
+ jh.jh_offset = hdr->offset;
+ jh.jh_size = hdr->data_length;
+
+ jd.jd_head = &jh;
+ jd.jd_data = req->data;
+ jd.jd_end_mark = SET_END_MARK;
+
+ ret = jrnl_perform(&jd);
+ if (ret)
+ goto out;
+ } else {
+ ret = pwrite64(fd, req->data, hdr->data_length, hdr->offset);
+ if (ret != hdr->data_length) {
+ if (errno == ENOSPC)
+ ret = SD_RES_NO_SPACE;
+ else
+ ret = SD_RES_EIO;
+ goto out;
+ }
}
ret = SD_RES_SUCCESS;
@@ -1739,6 +1774,27 @@ static int init_mnt_path(const char *base_path)
return 0;
}
+#define JRNL_PATH "/journal/"
+
+/*
+ * Set jrnl_path to "<base_path>/journal/" and create that directory if
+ * needed.  Returns 0 on success and non-zero on failure (allocation
+ * failure, or the error returned by init_path()).
+ */
+static int init_jrnl_path(const char *base_path)
+{
+	int new;
+	size_t len;
+
+	len = strlen(base_path) + strlen(JRNL_PATH) + 1;
+	jrnl_path = zalloc(len);
+	if (!jrnl_path) {
+		eprintf("failed to allocate memory\n");
+		return 1;
+	}
+	snprintf(jrnl_path, len, "%s" JRNL_PATH, base_path);
+
+	/*
+	 * 'new' tells whether the directory was just created; the journal
+	 * needs no extra setup in either case.
+	 */
+	return init_path(jrnl_path, &new);
+}
+
int init_store(const char *d)
{
int ret;
@@ -1759,6 +1815,10 @@ int init_store(const char *d)
if (ret)
return ret;
+ ret = init_jrnl_path(d);
+ if (ret)
+ return ret;
+
return ret;
}
@@ -1791,3 +1851,352 @@ int get_global_nr_copies(uint32_t *copies)
{
return attr(epoch_path, ANAME_COPIES, copies, sizeof(*copies), 0);
}
+
+/* Journal APIs */
+/*
+ * Look up the journal file of jfd->jf_oid in epoch jfd->jf_epoch.
+ * Returns 0 when stat(2) on the file succeeds and 1 when it fails.
+ * NOTE(review): 0 meaning "exists" reads inverted relative to the
+ * function name; confirm the intended convention before relying on it.
+ */
+int jrnl_exists(jrnl_file_desc_t *jfd)
+{
+	struct stat st;
+	char path[1024];
+
+	snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, jrnl_path,
+		 jfd->jf_epoch, jfd->jf_oid);
+
+	return stat(path, &st) ? 1 : 0;
+}
+
+/*
+ * Make sure the per-epoch journal directory "<jrnl_path><epoch>/" exists.
+ * Returns 0 if it exists or was created, SD_RES_EIO otherwise.
+ */
+int jrnl_update_epoch_store(uint32_t epoch)
+{
+	char new[1024];
+
+	snprintf(new, sizeof(new), "%s%08u/", jrnl_path, epoch);
+
+	/*
+	 * Create unconditionally and accept EEXIST instead of stat() followed
+	 * by mkdir(): there is no check-then-create window, and a real
+	 * failure is no longer silently ignored.
+	 */
+	if (mkdir(new, def_dmode) < 0 && errno != EEXIST) {
+		eprintf("failed to create %s, %s\n", new, strerror(errno));
+		return SD_RES_EIO;
+	}
+
+	return 0;
+}
+
+/*
+ * Open the journal file of jfd->jf_oid in epoch jfd->jf_epoch with the
+ * open(2) flags 'aflags'.  On success the descriptor is stored in
+ * jfd->jf_fd and SD_RES_SUCCESS is returned.  Returns SD_RES_NO_OBJ if the
+ * file does not exist and SD_RES_UNKNOWN for any other failure.
+ */
+int jrnl_open(jrnl_file_desc_t *jfd, int aflags)
+{
+	char path[1024];
+	int fd, err;
+
+	/* Only creating a journal needs the per-epoch directory to exist. */
+	if ((aflags & O_CREAT) && jrnl_update_epoch_store(jfd->jf_epoch))
+		return SD_RES_UNKNOWN;
+
+	snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, jrnl_path,
+		 jfd->jf_epoch, jfd->jf_oid);
+
+	fd = open(path, aflags, def_fmode);
+	if (fd < 0) {
+		/* Save errno first: the logging call may overwrite it. */
+		err = errno;
+		eprintf("failed to open %s, %s\n", path, strerror(err));
+		return err == ENOENT ? SD_RES_NO_OBJ : SD_RES_UNKNOWN;
+	}
+
+	jfd->jf_fd = fd;
+	return SD_RES_SUCCESS;
+}
+
+/*
+ * Close the journal descriptor jfd->jf_fd and mark it invalid (-1).
+ * Always returns 0.
+ */
+int jrnl_close(jrnl_file_desc_t *jfd)
+{
+	int fd = jfd->jf_fd;
+
+	jfd->jf_fd = -1;
+	close(fd);
+
+	return 0;
+}
+
+/*
+ * Create the journal file for jfd and open it read-write (see jrnl_open()).
+ * O_TRUNC discards anything an earlier journal for the same object left
+ * behind, so stale data or a stale end mark beyond the new data can never
+ * be read back as part of this journal.
+ */
+int jrnl_create(jrnl_file_desc_t *jfd)
+{
+	return jrnl_open(jfd, O_RDWR | O_CREAT | O_TRUNC);
+}
+
+/*
+ * Return the journal type stored in the in-memory header jd->jd_head.
+ * Assumes every journal header starts with a jrnl_type_t field (true for
+ * jrnl_vdi_head_t).  Not defined inline, so an external definition is
+ * always emitted for callers that do not inline it.
+ */
+jrnl_type_t jrnl_get_type(jrnl_desc_t *jd)
+{
+	return *((jrnl_type_t *) jd->jd_head);
+}
+
+/*
+ * Read the journal type from the first bytes of the journal file
+ * jfd->jf_fd into *jrnl_type.  Returns SD_RES_EIO if it cannot be read or
+ * is not a known type (>= JRNL_MAX_TYPES), SD_RES_SUCCESS otherwise.
+ */
+int jrnl_get_type_from_file(jrnl_file_desc_t *jfd, jrnl_type_t *jrnl_type)
+{
+	ssize_t retsize;
+
+	retsize = pread64(jfd->jf_fd, jrnl_type, sizeof(*jrnl_type), 0);
+	if (retsize != sizeof(*jrnl_type))
+		return SD_RES_EIO;
+
+	/* The value comes from disk; callers use it as an array index. */
+	if (*jrnl_type >= JRNL_MAX_TYPES)
+		return SD_RES_EIO;
+
+	return SD_RES_SUCCESS;
+}
+
+/*
+ * Delete the journal file of jfd->jf_oid in epoch jfd->jf_epoch.
+ * Returns SD_RES_SUCCESS, or SD_RES_EIO (after logging) if unlink fails.
+ */
+int jrnl_remove(jrnl_file_desc_t *jfd)
+{
+	char path[1024];
+
+	snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, jrnl_path,
+		 jfd->jf_epoch, jfd->jf_oid);
+
+	if (!unlink(path))
+		return SD_RES_SUCCESS;
+
+	eprintf("failed to remove %s, %s\n", path, strerror(errno));
+	return SD_RES_EIO;
+}
+
+/*
+ * Dispatch to the has_end_mark handler of jd's journal type.
+ * Not defined inline, so an out-of-line definition always exists.
+ */
+int jrnl_has_end_mark(jrnl_desc_t *jd)
+{
+	return jrnl_handlers[jrnl_get_type(jd)].has_end_mark(jd);
+}
+
+/*
+ * Dispatch to the write_header handler of jd's journal type.
+ * Not defined inline, so an out-of-line definition always exists.
+ */
+int jrnl_write_header(jrnl_desc_t *jd)
+{
+	return jrnl_handlers[jrnl_get_type(jd)].write_header(jd);
+}
+
+/*
+ * Dispatch to the write_data handler of jd's journal type.
+ * Not defined inline, so an out-of-line definition always exists.
+ */
+int jrnl_write_data(jrnl_desc_t *jd)
+{
+	return jrnl_handlers[jrnl_get_type(jd)].write_data(jd);
+}
+
+/*
+ * Dispatch to the write_end_mark handler of jd's journal type.
+ * Not defined inline, so an out-of-line definition always exists.
+ */
+int jrnl_write_end_mark(jrnl_desc_t *jd)
+{
+	return jrnl_handlers[jrnl_get_type(jd)].write_end_mark(jd);
+}
+
+/*
+ * Replay the journal open in jfd->jf_fd onto jfd->jf_target_fd, using the
+ * handler for the journal type recorded in the file.  Returns the
+ * handler's result, or SD_RES_EIO if the type cannot be read or is
+ * unknown.  Not defined inline, so an out-of-line definition always exists.
+ */
+int jrnl_apply_to_target_object(jrnl_file_desc_t *jfd)
+{
+	int ret;
+	jrnl_type_t jrnl_type;
+
+	ret = jrnl_get_type_from_file(jfd, &jrnl_type);
+	if (ret)
+		return ret;
+
+	/* The type comes from disk: never index the table with a bad value. */
+	if (jrnl_type >= JRNL_MAX_TYPES)
+		return SD_RES_EIO;
+
+	return jrnl_handlers[jrnl_type].apply_to_target_object(jfd);
+}
+
+/*
+ * Dispatch to the commit_data handler of jd's journal type.
+ * Not defined inline, so an out-of-line definition always exists.
+ */
+int jrnl_commit_data(jrnl_desc_t *jd)
+{
+	return jrnl_handlers[jrnl_get_type(jd)].commit_data(jd);
+}
+
+/*
+ * Perform the write described by jd through the journal:
+ *   1. write header, data and end mark to a fresh journal file,
+ *   2. flush the journal to stable storage,
+ *   3. apply the data to the target object and flush it,
+ *   4. close and delete the journal.
+ * If the journal cannot be completed, the target has not been touched and
+ * the partial journal is deleted.  If applying to the target fails, the
+ * complete journal is kept so that jrnl_recover() can replay it.
+ * Returns SD_RES_SUCCESS or an SD_RES_* error code.
+ * NOTE(review): the journal's directory entry is not fsync()ed, so a
+ * crash right after creation may still lose the journal file itself.
+ */
+int jrnl_perform(jrnl_desc_t *jd)
+{
+	int ret;
+
+	ret = jrnl_create(&jd->jd_jfd);
+	if (ret)
+		return ret;
+
+	ret = jrnl_write_header(jd);
+	if (ret)
+		goto out_discard;
+
+	ret = jrnl_write_data(jd);
+	if (ret)
+		goto out_discard;
+
+	ret = jrnl_write_end_mark(jd);
+	if (ret)
+		goto out_discard;
+
+	/* The journal must be durable before the target is modified. */
+	if (fdatasync(jd->jdf_fd)) {
+		eprintf("failed to sync the journal, %s\n", strerror(errno));
+		ret = SD_RES_EIO;
+		goto out_discard;
+	}
+
+	ret = jrnl_commit_data(jd);
+	if (ret)
+		goto out_keep;
+
+	/* The target must be durable before its journal is deleted. */
+	if (fdatasync(jd->jdf_target_fd)) {
+		eprintf("failed to sync the object, %s\n", strerror(errno));
+		ret = SD_RES_EIO;
+		goto out_keep;
+	}
+
+	jrnl_close(&jd->jd_jfd);
+	return jrnl_remove(&jd->jd_jfd);
+
+out_keep:
+	/* Complete journal: leave it on disk for recovery. */
+	jrnl_close(&jd->jd_jfd);
+	return ret;
+
+out_discard:
+	/* Incomplete journal: the target is untouched, drop the journal. */
+	jrnl_close(&jd->jd_jfd);
+	jrnl_remove(&jd->jd_jfd);
+	return ret;
+}
+
+/*
+ * Replay every journal left in the latest epoch's journal directory, i.e.
+ * writes whose journal was not removed by jrnl_perform().  Each journal is
+ * applied to its object, the object is flushed, and only then is the
+ * journal removed.  A journal that cannot be applied is kept and reported,
+ * so the data it holds is not lost.
+ * Returns 1 if there is no epoch yet, -1 if the journal path is not set or
+ * the directory cannot be opened, 0 otherwise.
+ */
+int jrnl_recover(void)
+{
+	DIR *dir;
+	struct dirent *d;
+	char jrnl_dir[1024],
+	     jrnl_file_path[1024],
+	     obj_file_path[1024];
+	int latest;
+	uint32_t epoch;
+
+	/* jrnl_path is set by init_store(); there is nothing to scan without it. */
+	if (!jrnl_path) {
+		eprintf("journal path is not initialized\n");
+		return -1;
+	}
+
+	latest = get_latest_epoch();
+	if (latest < 0)
+		return 1;
+	epoch = latest;
+	snprintf(jrnl_dir, sizeof(jrnl_dir), "%s%08u/", jrnl_path, epoch);
+
+	eprintf("Openning the directory%s.\n", jrnl_dir);
+	dir = opendir(jrnl_dir);
+	if (!dir)
+		return -1;
+
+	vprintf(SDOG_NOTICE "start jrnl_recovery.\n");
+	while ((d = readdir(dir))) {
+		int ret;
+		jrnl_file_desc_t jfd;
+
+		if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, ".."))
+			continue;
+
+		/* Journal files are named after the object id in hex. */
+		jfd.jf_epoch = epoch;
+		if (sscanf(d->d_name, "%" SCNx64, &jfd.jf_oid) != 1)
+			continue;
+		snprintf(jrnl_file_path, sizeof(jrnl_file_path), "%s%016" PRIx64,
+			 jrnl_dir, jfd.jf_oid);
+		snprintf(obj_file_path, sizeof(obj_file_path), "%s%08u/%016" PRIx64,
+			 obj_path, epoch, jfd.jf_oid);
+
+		ret = jrnl_open(&jfd, O_RDONLY);
+		if (ret) {
+			eprintf("Unable to open the journal file, %s, for reading.\n",
+				jrnl_file_path);
+			continue;
+		}
+
+		jfd.jf_target_fd = ob_open(epoch, jfd.jf_oid, 0, &ret);
+		if (ret) {
+			eprintf("Unable to open the object file, %s, to recover.\n",
+				obj_file_path);
+			jrnl_close(&jfd);
+			continue;
+		}
+
+		ret = jrnl_apply_to_target_object(&jfd);
+		/* The object must be durable before its journal is deleted. */
+		if (!ret && fdatasync(jfd.jf_target_fd)) {
+			eprintf("failed to sync %s, %s\n", obj_file_path,
+				strerror(errno));
+			ret = SD_RES_EIO;
+		}
+
+		close(jfd.jf_target_fd);
+		jfd.jf_target_fd = -1;
+		jrnl_close(&jfd);
+
+		if (ret) {
+			/* Keep the journal: it may be the only intact copy. */
+			eprintf("Unable to recover the object, %s.\n",
+				obj_file_path);
+			continue;
+		}
+
+		vprintf(SDOG_INFO "recovered the object in journal, %s\n",
+			jrnl_file_path);
+		jrnl_remove(&jfd);
+	}
+	closedir(dir);
+	vprintf(SDOG_NOTICE "end jrnl_recovery.\n");
+
+	return 0;
+}
+
+/* VDI data journalling functions */
+/*
+ * Return SET_END_MARK if the end mark that follows the header and
+ * head->jh_size bytes of data has been written, UNSET_END_MARK otherwise,
+ * including when it cannot be read.
+ */
+static int jrnl_vdi_has_end_mark(jrnl_desc_t *jd)
+{
+	ssize_t ret;
+	end_mark_t end_mark = UNSET_END_MARK;
+	jrnl_vdi_head_t *head = (jrnl_vdi_head_t *) jd->jd_head;
+
+	ret = pread64(jd->jdf_fd, &end_mark, sizeof(end_mark),
+		      sizeof(*head) + head->jh_size);
+	/* A failed or short read leaves end_mark meaningless: treat as unset. */
+	if (ret != sizeof(end_mark))
+		return UNSET_END_MARK;
+
+	return IS_END_MARK_SET(end_mark) ? SET_END_MARK : UNSET_END_MARK;
+}
+
+/*
+ * Write the VDI journal header at offset 0 of the journal file.
+ * Returns SD_RES_SUCCESS, SD_RES_NO_SPACE on ENOSPC, SD_RES_EIO otherwise.
+ */
+static int jrnl_vdi_write_header(jrnl_desc_t *jd)
+{
+	jrnl_vdi_head_t *hdr = jd->jd_head;
+	ssize_t written = pwrite64(jd->jdf_fd, hdr, sizeof(*hdr), 0);
+
+	if (written == sizeof(*hdr))
+		return SD_RES_SUCCESS;
+
+	return errno == ENOSPC ? SD_RES_NO_SPACE : SD_RES_EIO;
+}
+
+/*
+ * Write the jh_size bytes of jd->jd_data right after the header in the
+ * journal file.
+ * Returns SD_RES_SUCCESS, SD_RES_NO_SPACE on ENOSPC, SD_RES_EIO otherwise.
+ */
+static int jrnl_vdi_write_data(jrnl_desc_t *jd)
+{
+	jrnl_vdi_head_t *hdr = jd->jd_head;
+	ssize_t written = pwrite64(jd->jdf_fd, jd->jd_data, hdr->jh_size,
+				   sizeof(*hdr));
+
+	if (written == hdr->jh_size)
+		return SD_RES_SUCCESS;
+
+	return errno == ENOSPC ? SD_RES_NO_SPACE : SD_RES_EIO;
+}
+
+/*
+ * Write SET_END_MARK after the header and data, marking the journal as
+ * complete, and record the resulting state in jd->jd_end_mark.
+ * Returns SD_RES_SUCCESS, SD_RES_NO_SPACE on ENOSPC, SD_RES_EIO otherwise.
+ */
+static int jrnl_vdi_write_end_mark(jrnl_desc_t *jd)
+{
+	ssize_t retsize;
+	int ret;
+	end_mark_t end_mark = SET_END_MARK;
+	jrnl_vdi_head_t *head = (jrnl_vdi_head_t *) jd->jd_head;
+
+	retsize = pwrite64(jd->jdf_fd, &end_mark, sizeof(end_mark),
+			   sizeof(*head) + head->jh_size);
+
+	if (retsize != sizeof(end_mark)) {
+		if (errno == ENOSPC)
+			ret = SD_RES_NO_SPACE;
+		else
+			ret = SD_RES_EIO;
+		/* The mark did not reach the journal; do not claim it did. */
+		jd->jd_end_mark = UNSET_END_MARK;
+	} else {
+		ret = SD_RES_SUCCESS;
+		jd->jd_end_mark = SET_END_MARK;
+	}
+
+	return ret;
+}
+
+/*
+ * Replay a VDI journal: copy its jh_size data bytes to offset jh_offset
+ * of jfd->jf_target_fd.  jrnl_perform() only touches the target after the
+ * end mark is written, so a journal without a readable end mark is
+ * incomplete.  It is skipped and SD_RES_SUCCESS is returned.
+ * Returns SD_RES_EIO on read errors, SD_RES_NO_MEM if the buffer cannot
+ * be allocated, and SD_RES_NO_SPACE / SD_RES_EIO if the target write fails.
+ */
+static int jrnl_vdi_apply_to_target_object(jrnl_file_desc_t *jfd)
+{
+	char *buf;
+	int res = SD_RES_SUCCESS;
+	ssize_t retsize;
+	struct stat st;
+	jrnl_vdi_head_t jh;
+	end_mark_t end_mark = UNSET_END_MARK;
+	const uint64_t overhead = sizeof(jh) + sizeof(end_mark);
+
+	retsize = pread64(jfd->jf_fd, &jh, sizeof(jh), 0);
+	if (retsize < 0) {
+		eprintf("failed to read the journal, %s\n", strerror(errno));
+		return SD_RES_EIO;
+	}
+	if (retsize != sizeof(jh))
+		goto incomplete;
+
+	/*
+	 * jh_size comes from disk: bound it by the file size before using it
+	 * as an offset or an allocation size.
+	 */
+	if (fstat(jfd->jf_fd, &st) < 0) {
+		eprintf("failed to stat the journal, %s\n", strerror(errno));
+		return SD_RES_EIO;
+	}
+	if ((uint64_t)st.st_size < overhead ||
+	    jh.jh_size > (uint64_t)st.st_size - overhead ||
+	    (size_t)jh.jh_size != jh.jh_size)
+		goto incomplete;
+
+	retsize = pread64(jfd->jf_fd, &end_mark, sizeof(end_mark),
+			  sizeof(jh) + jh.jh_size);
+	if (retsize != sizeof(end_mark) || !IS_END_MARK_SET(end_mark))
+		goto incomplete;
+
+	if (!jh.jh_size)
+		return SD_RES_SUCCESS;
+
+	buf = zalloc(jh.jh_size);
+	if (!buf) {
+		eprintf("failed to allocate memory\n");
+		return SD_RES_NO_MEM;
+	}
+
+	retsize = pread64(jfd->jf_fd, buf, jh.jh_size, sizeof(jh));
+	if (retsize != (ssize_t)jh.jh_size) {
+		eprintf("failed to read the journal data\n");
+		res = SD_RES_EIO;
+		goto out;
+	}
+
+	/* Flush out journal to disk (vdi object) */
+	retsize = pwrite64(jfd->jf_target_fd, buf, jh.jh_size, jh.jh_offset);
+	if (retsize != (ssize_t)jh.jh_size) {
+		if (errno == ENOSPC)
+			res = SD_RES_NO_SPACE;
+		else
+			res = SD_RES_EIO;
+	}
+
+out:
+	free(buf);
+	return res;
+
+incomplete:
+	eprintf("ignoring an incomplete journal\n");
+	return SD_RES_SUCCESS;
+}
+
+/*
+ * Apply the journalled write to the target object: write jh_size bytes of
+ * jd->jd_data at offset jh_offset of jd->jdf_target_fd.
+ * Returns 0 on success, SD_RES_NO_SPACE on ENOSPC, SD_RES_EIO otherwise.
+ */
+static int jrnl_vdi_commit_data(jrnl_desc_t *jd)
+{
+	const jrnl_vdi_head_t *hdr = jd->jd_head;
+	ssize_t written;
+
+	written = pwrite64(jd->jdf_target_fd, jd->jd_data, hdr->jh_size,
+			   hdr->jh_offset);
+	if (written == hdr->jh_size)
+		return 0;
+
+	return errno == ENOSPC ? SD_RES_NO_SPACE : SD_RES_EIO;
+}
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.wpkg.org/pipermail/sheepdog/attachments/20101226/13cd5a75/attachment-0003.html>
More information about the sheepdog
mailing list