[sheepdog] [PATCH 7/9] sheep: add kvs feature for object storage
MORITA Kazutaka
morita.kazutaka at gmail.com
Thu Oct 31 08:49:05 CET 2013
From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
This implements backend kvs functions for object storage.
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
sheep/Makefile.am | 4 +-
sheep/kvs.c | 557 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
sheep/kvs.h | 44 +++++
3 files changed, 603 insertions(+), 2 deletions(-)
create mode 100644 sheep/kvs.c
create mode 100644 sheep/kvs.h
diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index 09311ff..a9497fc 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -30,7 +30,7 @@ sheep_SOURCES = sheep.c group.c request.c gateway.c store.c vdi.c \
plain_store.c config.c migrate.c md.c
if BUILD_HTTP
-sheep_SOURCES += http.c
+sheep_SOURCES += http.c kvs.c
endif
if BUILD_COROSYNC
sheep_SOURCES += cluster/corosync.c
@@ -51,7 +51,7 @@ sheep_LDADD = ../lib/libsheepdog.a -lpthread -lm\
sheep_DEPENDENCIES = ../lib/libsheepdog.a
-noinst_HEADERS = sheep_priv.h cluster.h http.h trace/trace.h
+noinst_HEADERS = sheep_priv.h cluster.h http.h kvs.h trace/trace.h
EXTRA_DIST =
diff --git a/sheep/kvs.c b/sheep/kvs.c
new file mode 100644
index 0000000..74230cd
--- /dev/null
+++ b/sheep/kvs.c
@@ -0,0 +1,557 @@
+/*
+ * Copyright (C) 2013 MORITA Kazutaka <morita.kazutaka at gmail.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+/* This file implements backend kvs functions for object storage. */
+
+#include "sheep_priv.h"
+#include "kvs.h"
+
+/*
+ * Loop helper over a VDI bitmap: expands to FOR_EACH_BIT with SD_NR_VDIS as
+ * its third argument.  NOTE(review): presumably nr takes the index of each
+ * bit set in vdis -- confirm against the FOR_EACH_BIT definition.
+ */
+#define FOR_EACH_VDI(nr, vdis) FOR_EACH_BIT(nr, vdis, SD_NR_VDIS)
+
+/*
+ * Resolve a bucket name to the id of the VDI that backs it.
+ *
+ * On success the id is stored in *vid and 0 is returned.  On failure
+ * http_response_header() is called on req (NOT_FOUND when vdi_lookup()
+ * reports SD_RES_NO_VDI, INTERNAL_SERVER_ERROR for any other result) and
+ * -1 is returned.
+ */
+static int lookup_bucket(struct http_request *req, const char *bucket,
+			 uint32_t *vid)
+{
+	struct vdi_info found = {};
+	struct vdi_iocb query = {
+		.name = bucket,
+		.data_len = strlen(bucket),
+	};
+	int res = vdi_lookup(&query, &found);
+
+	if (res == SD_RES_SUCCESS) {
+		*vid = found.vid;
+		return 0;
+	}
+
+	if (res == SD_RES_NO_VDI) {
+		sd_info("no such bucket %s", bucket);
+		http_response_header(req, NOT_FOUND);
+	} else {
+		sd_err("%s: bucket %s", sd_strerror(res), bucket);
+		http_response_header(req, INTERNAL_SERVER_ERROR);
+	}
+
+	return -1;
+}
+
+/* Bucket operations */
+
+/*
+ * Create the VDI that backs a bucket by executing a local SD_OP_NEW_VDI
+ * request whose payload is the bucket name.
+ *
+ * Responds with CREATED when the request succeeds and with ACCEPTED when it
+ * reports SD_RES_VDI_EXIST; both cases return 0.  Any other result responds
+ * with INTERNAL_SERVER_ERROR and returns -1.
+ */
+int kvs_create_bucket(struct http_request *req, const char *bucket)
+{
+	char vdi_name[SD_MAX_VDI_LEN] = {0};
+	struct sd_req hdr;
+	int res;
+
+	/* the payload is the name copied into a zero-filled fixed-size buffer */
+	pstrcpy(vdi_name, sizeof(vdi_name), bucket);
+
+	sd_init_req(&hdr, SD_OP_NEW_VDI);
+	hdr.flags = SD_FLAG_CMD_WRITE;
+	hdr.data_length = sizeof(vdi_name);
+	hdr.vdi.vdi_size = SD_MAX_VDI_SIZE;
+	hdr.vdi.copies = sys->cinfo.nr_copies;
+
+	res = exec_local_req(&hdr, vdi_name);
+	if (res == SD_RES_SUCCESS) {
+		http_response_header(req, CREATED);
+		return 0;
+	}
+	if (res == SD_RES_VDI_EXIST) {
+		http_response_header(req, ACCEPTED);
+		return 0;
+	}
+
+	sd_err("%s: bucket %s", sd_strerror(res), bucket);
+	http_response_header(req, INTERNAL_SERVER_ERROR);
+	return -1;
+}
+
+/*
+ * Placeholder: not implemented yet.  Always returns -1 and does not call
+ * http_response_header() on req.
+ */
+int kvs_read_bucket(struct http_request *req, const char *bucket)
+{
+ /* TODO: read metadata of the bucket */
+ return -1;
+}
+
+/*
+ * Placeholder: not implemented yet.  Always returns -1 and does not call
+ * http_response_header() on req.
+ */
+int kvs_update_bucket(struct http_request *req, const char *bucket)
+{
+ /* TODO: update metadata of the bucket */
+ return -1;
+}
+
+/*
+ * Delete a bucket.
+ *
+ * After resolving the bucket to its VDI, two local requests are executed in
+ * order: SD_OP_DELETE_CACHE on the VDI's inode object id, then SD_OP_DEL_VDI
+ * with the bucket name as payload.  Responds with NO_CONTENT and returns 0
+ * when both succeed; responds with INTERNAL_SERVER_ERROR and returns -1 when
+ * either fails.  A failed lookup returns whatever lookup_bucket() returned.
+ *
+ * TODO: return HTTP_CONFLICT when the bucket is not empty
+ */
+int kvs_delete_bucket(struct http_request *req, const char *bucket)
+{
+	char vdi_name[SD_MAX_VDI_LEN] = {0};
+	struct sd_req hdr;
+	uint32_t vid;
+	int res;
+
+	res = lookup_bucket(req, bucket, &vid);
+	if (res < 0)
+		return res;
+
+	sd_init_req(&hdr, SD_OP_DELETE_CACHE);
+	hdr.obj.oid = vid_to_vdi_oid(vid);
+	if (exec_local_req(&hdr, NULL) != SD_RES_SUCCESS) {
+		sd_err("failed to execute request");
+		http_response_header(req, INTERNAL_SERVER_ERROR);
+		return -1;
+	}
+
+	pstrcpy(vdi_name, sizeof(vdi_name), bucket);
+	sd_init_req(&hdr, SD_OP_DEL_VDI);
+	hdr.flags = SD_FLAG_CMD_WRITE;
+	hdr.data_length = sizeof(vdi_name);
+
+	res = exec_local_req(&hdr, vdi_name);
+	if (res != SD_RES_SUCCESS) {
+		sd_err("%s: bucket %s", sd_strerror(res), bucket);
+		http_response_header(req, INTERNAL_SERVER_ERROR);
+		return -1;
+	}
+
+	http_response_header(req, NO_CONTENT);
+	return 0;
+}
+
+/*
+ * Report every bucket to the caller through cb.
+ *
+ * Responds with OK first, then walks sys->vdi_inuse, reads the inode header
+ * of each VDI and invokes cb(req, name, opaque) for each one that has a
+ * non-empty name and is not a snapshot.  VDIs whose header cannot be read
+ * are logged and skipped.  Always returns 0.
+ */
+int kvs_list_buckets(struct http_request *req,
+		     void (*cb)(struct http_request *req, const char *bucket,
+				void *opaque),
+		     void *opaque)
+{
+	char hdr_buf[SD_INODE_HEADER_SIZE];
+	struct sd_inode *inode = (struct sd_inode *)hdr_buf;
+	unsigned long vid;
+
+	http_response_header(req, OK);
+
+	FOR_EACH_VDI(vid, sys->vdi_inuse) {
+		int res = read_object(vid_to_vdi_oid(vid), hdr_buf,
+				      sizeof(hdr_buf), 0);
+
+		if (res != SD_RES_SUCCESS) {
+			sd_err("Failed to read inode header");
+			continue;
+		}
+
+		/* an empty name means this VDI has been deleted */
+		if (inode->name[0] != '\0' && !vdi_is_snapshot(inode))
+			cb(req, inode->name, opaque);
+	}
+
+	return 0;
+}
+
+/* Object operations */
+
+/*
+ * 4 KB header of kvs object
+ *
+ * Written at offset 0 of every data object used by the kvs.  The anonymous
+ * union overlays the fields with a BLOCK_SIZE byte array, which pads the
+ * header to that size.
+ */
+struct kvs_object_hdr {
+ union {
+ struct {
+ /* object name; an all-zero name marks a deleted slot */
+ char name[SD_MAX_OBJECT_NAME];
+ /* number of payload bytes stored after the header */
+ uint64_t size;
+ /*
+ * timestamps as written by kvs_create_object() and
+ * do_kvs_update_object(): tv_sec shifted into the upper
+ * 32 bits, OR-ed with tv_usec * 1000
+ */
+ uint64_t ctime;
+ uint64_t mtime;
+
+ /*
+ * the index of the multi parted object
+ * (never assigned in this file, so it stays zero)
+ */
+ uint64_t segment;
+
+ /*
+ * a hash value for etag
+ * (never assigned in this file, so it stays zero)
+ */
+ uint8_t sha1[round_up(SHA1_DIGEST_SIZE, 8)];
+ };
+
+ uint8_t __pad[BLOCK_SIZE];
+ };
+};
+
+/*
+ * In-memory image of one kvs data object: the header followed by the
+ * payload.  The payload array is sized so that header plus data add up to
+ * SD_DATA_OBJ_SIZE bytes.
+ */
+struct kvs_object {
+ struct kvs_object_hdr hdr;
+ uint8_t data[SD_DATA_OBJ_SIZE - sizeof(struct kvs_object_hdr)];
+};
+
+/*
+ * Try to store obj in slot idx of the VDI that backs the bucket.
+ *
+ * Returns 0 without calling http_response_header() when the slot already
+ * holds an object with a different, non-empty name; the caller uses the
+ * untouched request status to decide to probe the next slot.  Otherwise
+ * responds with CREATED and returns 0, or responds with
+ * INTERNAL_SERVER_ERROR and returns -1 when a read or write fails.
+ */
+static int do_kvs_create_object(struct http_request *req, const char *obj_name,
+				struct kvs_object *obj, uint32_t vid,
+				uint32_t idx)
+{
+	uint64_t oid = vid_to_data_oid(vid, idx);
+	struct kvs_object_hdr hdr;
+	int ret;
+
+	ret = write_object(oid, (char *)obj, sizeof(obj->hdr) + obj->hdr.size,
+			   0, true);
+	if (ret != SD_RES_SUCCESS) {
+		sd_err("failed to create object, %" PRIx64, oid);
+		goto err;
+	}
+
+	/*
+	 * XXX: SD_OP_CREATE_AND_WRITE_OBJ returns success even if the object
+	 * already exists.  We need to confirm that the stored object is
+	 * actually what we wrote.
+	 */
+	ret = read_object(oid, (char *)&hdr, sizeof(hdr), 0);
+	if (ret != SD_RES_SUCCESS) {
+		sd_err("failed to read object, %" PRIx64, oid);
+		goto err;
+	}
+
+	/* the slot belongs to another object: let the caller try the next */
+	if (hdr.name[0] != '\0' && strcmp(hdr.name, obj->hdr.name) != 0) {
+		sd_debug("index %" PRIu32 " is already used", idx);
+		return 0;
+	}
+
+	if (memcmp(&hdr, &obj->hdr, sizeof(hdr)) == 0) {
+		/*
+		 * The stored header is the one we wrote: record the slot in
+		 * the inode by writing vid into data_vdi_id[idx].
+		 */
+		uint64_t offset = offsetof(struct sd_inode, data_vdi_id)
+			+ idx * sizeof(vid);
+		ret = write_object(vid_to_vdi_oid(vid), (char *)&vid,
+				   sizeof(vid), offset, false);
+		if (ret != SD_RES_SUCCESS) {
+			sd_err("failed to update inode, %" PRIx64,
+			       vid_to_vdi_oid(vid));
+			goto err;
+		}
+	} else {
+		sd_info("object %s already exists", obj_name);
+
+		/* write again without a create option */
+		ret = write_object(oid, (char *)obj,
+				   sizeof(obj->hdr) + obj->hdr.size, 0, false);
+		if (ret != SD_RES_SUCCESS) {
+			sd_err("failed to update object, %"PRIx64, oid);
+			goto err;
+		}
+	}
+
+	http_response_header(req, CREATED);
+	return 0;
+err:
+	http_response_header(req, INTERNAL_SERVER_ERROR);
+	return -1;
+}
+
+/*
+ * Create an object in a bucket from the body of the HTTP request.
+ *
+ * The body is read into a freshly allocated kvs_object, then slots of the
+ * bucket's VDI are probed linearly, starting at the hash of the object
+ * name, until do_kvs_create_object() has set a status on req.
+ *
+ * Returns 0 as soon as a probe has set a status on req (whatever that
+ * status is).  Returns -1 after responding with INTERNAL_SERVER_ERROR when
+ * the body cannot be read, or with SERVICE_UNAVAILABLE when no slot
+ * accepted the object.  A failed bucket lookup returns the value of
+ * lookup_bucket().
+ */
+int kvs_create_object(struct http_request *req, const char *bucket,
+		      const char *object)
+{
+	struct kvs_object *obj;
+	ssize_t size;
+	int ret;
+	uint64_t hval;
+	uint32_t vid;
+	struct timeval tv;
+
+	ret = lookup_bucket(req, bucket, &vid);
+	if (ret < 0)
+		return ret;
+
+	obj = xzalloc(sizeof(*obj));
+
+	gettimeofday(&tv, NULL);
+	pstrcpy(obj->hdr.name, sizeof(obj->hdr.name), object);
+	obj->hdr.ctime = (uint64_t) tv.tv_sec << 32 | tv.tv_usec * 1000;
+	obj->hdr.mtime = obj->hdr.ctime;
+
+	/* TODO: support multi parted object for large object */
+	size = http_request_read(req, obj->data, sizeof(obj->data));
+	if (size < 0) {
+		/* ret still holds the lookup result here, so don't print it */
+		sd_err("failed to read request body: bucket %s, object %s",
+		       bucket, object);
+		http_response_header(req, INTERNAL_SERVER_ERROR);
+		ret = -1;
+		goto out;
+	}
+
+	obj->hdr.size = size;
+
+	hval = sd_hash(object, strlen(object));
+	for (int i = 0; i < MAX_DATA_OBJS; i++) {
+		uint32_t idx = (hval + i) % MAX_DATA_OBJS;
+
+		do_kvs_create_object(req, object, obj, vid, idx);
+		if (req->status != UNKNOWN) {
+			ret = 0;
+			goto out;
+		}
+	}
+
+	/* no free space to create an object */
+	http_response_header(req, SERVICE_UNAVAILABLE);
+	ret = -1;
+out:
+	/* single exit so the buffer is released on every path */
+	free(obj);
+	return ret;
+}
+
+/*
+ * Read slot idx of the bucket's VDI into obj and, when the stored name
+ * equals obj_name, respond with OK and write the payload to req.
+ *
+ * Returns 0 without touching the request status when the slot holds a
+ * different name.  Responds with NOT_FOUND (read reports SD_RES_NO_OBJ) or
+ * INTERNAL_SERVER_ERROR (any other read failure) and returns -1 otherwise.
+ */
+static int do_kvs_read_object(struct http_request *req, const char *obj_name,
+			      struct kvs_object *obj, uint32_t vid,
+			      uint32_t idx)
+{
+	uint64_t oid = vid_to_data_oid(vid, idx);
+	int res = read_object(oid, (char *)obj, sizeof(*obj), 0);
+
+	if (res == SD_RES_NO_OBJ) {
+		sd_info("object %s doesn't exist", obj_name);
+		http_response_header(req, NOT_FOUND);
+		return -1;
+	}
+	if (res != SD_RES_SUCCESS) {
+		sd_err("failed to read %s, %s", req->uri, sd_strerror(res));
+		http_response_header(req, INTERNAL_SERVER_ERROR);
+		return -1;
+	}
+
+	/* some other object lives in this slot: nothing to report */
+	if (strcmp(obj->hdr.name, obj_name) != 0)
+		return 0;
+
+	http_response_header(req, OK);
+
+	/* TODO: support multi parted object for large object */
+	http_request_write(req, obj->data, obj->hdr.size);
+
+	return 0;
+}
+
+/*
+ * Send the contents of an object in a bucket to the client.
+ *
+ * Slots are probed linearly from the hash of the object name until
+ * do_kvs_read_object() has set a status on req; 0 is returned at that point
+ * regardless of which status it was.  If every slot was probed without a
+ * status being set, responds with NOT_FOUND and returns -1.  A failed
+ * bucket lookup returns the value of lookup_bucket().
+ */
+int kvs_read_object(struct http_request *req, const char *bucket,
+		    const char *object)
+{
+	struct kvs_object *buf;
+	uint64_t start;
+	uint32_t vid;
+	int res;
+
+	res = lookup_bucket(req, bucket, &vid);
+	if (res < 0)
+		return res;
+
+	buf = xzalloc(sizeof(*buf));
+	start = sd_hash(object, strlen(object));
+
+	for (int probe = 0; probe < MAX_DATA_OBJS; probe++) {
+		uint32_t slot = (start + probe) % MAX_DATA_OBJS;
+
+		do_kvs_read_object(req, object, buf, vid, slot);
+		if (req->status == UNKNOWN)
+			continue;
+
+		free(buf);
+		return 0;
+	}
+
+	free(buf);
+
+	http_response_header(req, NOT_FOUND);
+	return -1;
+}
+
+/*
+ * Overwrite the object stored in slot idx when its name equals obj_name.
+ *
+ * The header currently stored in the slot is read into obj->hdr; the
+ * payload in obj->data must already hold the size new bytes.  On a name
+ * match the mtime and size fields are refreshed and header plus payload
+ * are written back; responds with ACCEPTED on success.
+ *
+ * Returns 0 without touching the request status when the slot holds a
+ * different name.  Responds with NOT_FOUND (read reports SD_RES_NO_OBJ) or
+ * INTERNAL_SERVER_ERROR (any other read or write failure) and returns -1
+ * otherwise.
+ */
+static int do_kvs_update_object(struct http_request *req, const char *obj_name,
+				struct kvs_object *obj, uint32_t vid,
+				uint32_t idx, size_t size)
+{
+	uint64_t oid = vid_to_data_oid(vid, idx);
+	int ret;
+
+	ret = read_object(oid, (char *)&obj->hdr, sizeof(obj->hdr), 0);
+	switch (ret) {
+	case SD_RES_SUCCESS:
+		break;
+	/*
+	 * Same result code the read and delete helpers test for a missing
+	 * data object, so a missing slot ends the probe with NOT_FOUND here
+	 * as well.
+	 */
+	case SD_RES_NO_OBJ:
+		sd_info("object %s doesn't exist", obj_name);
+		http_response_header(req, NOT_FOUND);
+		return -1;
+	default:
+		sd_err("failed to read %s, %s", req->uri, sd_strerror(ret));
+		http_response_header(req, INTERNAL_SERVER_ERROR);
+		return -1;
+	}
+
+	if (strcmp(obj->hdr.name, obj_name) == 0) {
+		struct timeval tv;
+
+		gettimeofday(&tv, NULL);
+		obj->hdr.mtime = (uint64_t) tv.tv_sec << 32 | tv.tv_usec * 1000;
+		obj->hdr.size = size;
+
+		ret = write_object(oid, (char *)obj,
+				   sizeof(obj->hdr) + obj->hdr.size, 0, false);
+		if (ret == SD_RES_SUCCESS)
+			http_response_header(req, ACCEPTED);
+		else {
+			sd_err("failed to update object, %" PRIx64, oid);
+			http_response_header(req, INTERNAL_SERVER_ERROR);
+			return -1;
+		}
+	}
+
+	return 0;
+}
+
+/*
+ * Replace the contents of an existing object with the body of the HTTP
+ * request.
+ *
+ * The body is read into a freshly allocated kvs_object, then slots are
+ * probed linearly from the hash of the object name until
+ * do_kvs_update_object() has set a status on req.
+ *
+ * Returns 0 as soon as a probe has set a status on req (whatever that
+ * status is).  Returns -1 after responding with INTERNAL_SERVER_ERROR when
+ * the body cannot be read, or with NOT_FOUND when no slot matched.  A
+ * failed bucket lookup returns the value of lookup_bucket().
+ */
+int kvs_update_object(struct http_request *req, const char *bucket,
+		      const char *object)
+{
+	struct kvs_object *obj;
+	int ret;
+	uint64_t hval;
+	uint32_t vid;
+	ssize_t size;
+
+	ret = lookup_bucket(req, bucket, &vid);
+	if (ret < 0)
+		return ret;
+
+	obj = xzalloc(sizeof(*obj));
+
+	/* TODO: support multi parted object for large object */
+	size = http_request_read(req, obj->data, sizeof(obj->data));
+	if (size < 0) {
+		/* ret still holds the lookup result here, so don't print it */
+		sd_err("failed to read request body: bucket %s, object %s",
+		       bucket, object);
+		http_response_header(req, INTERNAL_SERVER_ERROR);
+		ret = -1;
+		goto out;
+	}
+
+	hval = sd_hash(object, strlen(object));
+	for (int i = 0; i < MAX_DATA_OBJS; i++) {
+		uint32_t idx = (hval + i) % MAX_DATA_OBJS;
+
+		do_kvs_update_object(req, object, obj, vid, idx, size);
+		if (req->status != UNKNOWN) {
+			ret = 0;
+			goto out;
+		}
+	}
+
+	http_response_header(req, NOT_FOUND);
+	ret = -1;
+out:
+	/* single exit so the buffer is released on every path */
+	free(obj);
+	return ret;
+}
+
+/*
+ * Delete the object in slot idx when its stored name equals obj_name.
+ *
+ * Only the name field at the start of the data object is read and, on a
+ * match, overwritten with zeros; responds with NO_CONTENT on success.
+ *
+ * Returns 0 without touching the request status when the slot holds a
+ * different name.  Responds with NOT_FOUND (read reports SD_RES_NO_OBJ) or
+ * INTERNAL_SERVER_ERROR (any other read or write failure) and returns -1
+ * otherwise.
+ */
+static int do_kvs_delete_object(struct http_request *req, const char *obj_name,
+				uint32_t vid, uint32_t idx)
+{
+	uint64_t oid = vid_to_data_oid(vid, idx);
+	char stored[SD_MAX_OBJECT_NAME];
+	int res = read_object(oid, stored, sizeof(stored), 0);
+
+	if (res == SD_RES_NO_OBJ) {
+		sd_info("object %s doesn't exist", obj_name);
+		http_response_header(req, NOT_FOUND);
+		return -1;
+	}
+	if (res != SD_RES_SUCCESS) {
+		sd_err("failed to read %s, %s", req->uri, sd_strerror(res));
+		http_response_header(req, INTERNAL_SERVER_ERROR);
+		return -1;
+	}
+
+	/* some other object lives in this slot: leave it alone */
+	if (strcmp(stored, obj_name) != 0)
+		return 0;
+
+	memset(stored, 0, sizeof(stored));
+	res = write_object(oid, stored, sizeof(stored), 0, false);
+	if (res != SD_RES_SUCCESS) {
+		sd_err("failed to update object, %" PRIx64,
+		       oid);
+		http_response_header(req, INTERNAL_SERVER_ERROR);
+		return -1;
+	}
+
+	http_response_header(req, NO_CONTENT);
+	return 0;
+}
+
+/*
+ * Delete an object from a bucket.
+ *
+ * Slots are probed linearly from the hash of the object name until
+ * do_kvs_delete_object() has set a status on req; 0 is returned at that
+ * point regardless of which status it was.  If every slot was probed
+ * without a status being set, responds with NOT_FOUND and returns -1.  A
+ * failed bucket lookup returns the value of lookup_bucket().
+ */
+int kvs_delete_object(struct http_request *req, const char *bucket,
+		      const char *object)
+{
+	uint64_t start;
+	uint32_t vid;
+	int res = lookup_bucket(req, bucket, &vid);
+
+	if (res < 0)
+		return res;
+
+	start = sd_hash(object, strlen(object));
+	for (int probe = 0; probe < MAX_DATA_OBJS; probe++) {
+		uint32_t slot = (start + probe) % MAX_DATA_OBJS;
+
+		do_kvs_delete_object(req, object, vid, slot);
+		if (req->status == UNKNOWN)
+			continue;
+
+		return 0;
+	}
+
+	http_response_header(req, NOT_FOUND);
+	return -1;
+}
+
+/*
+ * Report every object stored in a bucket to the caller through cb.
+ *
+ * The data_vdi_id table of the bucket's inode is read first; on failure
+ * the function responds with INTERNAL_SERVER_ERROR and returns -1.
+ * Otherwise it responds with OK, then for every non-zero table entry reads
+ * the name at the start of the matching data object and invokes
+ * cb(req, bucket, name, opaque) when the name is non-empty.  Data objects
+ * that cannot be read are logged and skipped.  Returns 0 after the walk.
+ * A failed bucket lookup returns the value of lookup_bucket().
+ */
+int kvs_list_objects(struct http_request *req, const char *bucket,
+		     void (*cb)(struct http_request *req, const char *bucket,
+				const char *object, void *opaque),
+		     void *opaque)
+{
+	int ret;
+	uint32_t vid;
+	struct sd_inode *inode;
+
+	ret = lookup_bucket(req, bucket, &vid);
+	if (ret < 0)
+		return ret;
+
+	inode = xzalloc(sizeof(*inode));
+	ret = read_object(vid_to_vdi_oid(vid), (char *)inode->data_vdi_id,
+			  sizeof(inode->data_vdi_id),
+			  offsetof(typeof(*inode), data_vdi_id));
+	if (ret != SD_RES_SUCCESS) {
+		sd_err("%s: bucket %s", sd_strerror(ret), bucket);
+		http_response_header(req, INTERNAL_SERVER_ERROR);
+		/* release the inode buffer on the error path too */
+		free(inode);
+		return -1;
+	}
+
+	http_response_header(req, OK);
+
+	for (uint32_t idx = 0; idx < MAX_DATA_OBJS; idx++) {
+		uint64_t oid;
+		char name[SD_MAX_OBJECT_NAME];
+
+		/* a zero entry means no data object was recorded for idx */
+		if (inode->data_vdi_id[idx] == 0)
+			continue;
+
+		oid = vid_to_data_oid(vid, idx);
+
+		ret = read_object(oid, name, sizeof(name), 0);
+		switch (ret) {
+		case SD_RES_SUCCESS:
+			/* an empty name marks a deleted object */
+			if (name[0] != '\0')
+				cb(req, bucket, name, opaque);
+			break;
+		default:
+			sd_err("%s: bucket %s", sd_strerror(ret), bucket);
+			break;
+		}
+	}
+
+	free(inode);
+
+	return 0;
+}
diff --git a/sheep/kvs.h b/sheep/kvs.h
new file mode 100644
index 0000000..5388a58
--- /dev/null
+++ b/sheep/kvs.h
@@ -0,0 +1,44 @@
+/*
+ * Copyright (C) 2013 MORITA Kazutaka <morita.kazutaka at gmail.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __SD_KVS_H__
+#define __SD_KVS_H__
+
+#include "http.h"
+
+/*
+ * NOTE(review): SD_MAX_BUCKET_NAME is not referenced by kvs.c in this
+ * patch -- presumably the intended limit for bucket names; confirm.
+ */
+#define SD_MAX_BUCKET_NAME 1024
+/* size in bytes of the name field in a stored object's header */
+#define SD_MAX_OBJECT_NAME 1024
+
+/*
+ * Bucket operations
+ *
+ * kvs_create_bucket(), kvs_delete_bucket() and kvs_list_buckets() report
+ * their outcome by calling http_response_header() on req.
+ * kvs_read_bucket() and kvs_update_bucket() are unimplemented stubs that
+ * return -1 without responding.  kvs_list_buckets() calls cb once for
+ * each bucket it finds, passing opaque through unchanged.
+ */
+int kvs_create_bucket(struct http_request *req, const char *bucket);
+int kvs_read_bucket(struct http_request *req, const char *bucket);
+int kvs_update_bucket(struct http_request *req, const char *bucket);
+int kvs_delete_bucket(struct http_request *req, const char *bucket);
+int kvs_list_buckets(struct http_request *req,
+ void (*cb)(struct http_request *req, const char *bucket,
+ void *opaque),
+ void *opaque);
+
+/*
+ * Object operations
+ *
+ * Each function first resolves bucket and returns -1 (after responding on
+ * req) when the lookup fails.  The outcome of the operation itself is
+ * reported by calling http_response_header() on req.  kvs_list_objects()
+ * calls cb once for each object it finds, passing opaque through unchanged.
+ */
+int kvs_create_object(struct http_request *req, const char *bucket,
+ const char *object);
+int kvs_read_object(struct http_request *req, const char *bucket,
+ const char *object);
+int kvs_update_object(struct http_request *req, const char *bucket,
+ const char *object);
+int kvs_delete_object(struct http_request *req, const char *bucket,
+ const char *object);
+int kvs_list_objects(struct http_request *req, const char *bucket,
+ void (*cb)(struct http_request *req, const char *bucket,
+ const char *object, void *opaque),
+ void *opaque);
+
+#endif /* __SD_KVS_H__ */
--
1.8.1.2
More information about the sheepdog
mailing list