[Sheepdog] [PATCH 3/3] add vdi attributes support

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Tue Feb 15 18:28:42 CET 2011


It is useful to store metadata associated with virtual disks.  This
patch adds support for vdi attributes.

Usage:
  $ collie vdi getattr VDINAME KEY                      # get value
  $ collie vdi setattr VDINAME KEY [-x] VALUE           # set value
  $ collie vdi setattr VDINAME KEY [-x] < VALUEFILE     # set value from stdin
  $ collie vdi setattr VDINAME KEY -d                   # delete attribute

-x is an exclusive option; if you set the option and the key already
exists, the operation fails.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 collie/collie.c               |  257 ++++++++++++++++++++++++++++++++++++++++-
 include/sheep.h               |    5 +-
 include/sheepdog_proto.h      |   20 +++-
 script/bash_completion_collie |   22 ++++-
 sheep/group.c                 |   14 ++-
 sheep/sdnet.c                 |    2 +
 sheep/sheep_priv.h            |    3 +
 sheep/store.c                 |    7 +-
 sheep/vdi.c                   |   58 +++++++++
 9 files changed, 377 insertions(+), 11 deletions(-)

diff --git a/collie/collie.c b/collie/collie.c
index 5ed6bcd..bb0be69 100644
--- a/collie/collie.c
+++ b/collie/collie.c
@@ -56,7 +56,7 @@ static void usage(int status)
 Command syntax:\n\
   cluster (info|format|shutdown)\n\
   node (info|list)\n\
-  vdi (list|tree|graph|delete|object)\n\
+  vdi (list|tree|graph|delete|object|setattr|getattr)\n\
 \n\
 Common parameters:\n\
   -a, --address           specify the daemon address (default: localhost)\n\
@@ -167,6 +167,8 @@ struct vdi_cmd_data {
 	unsigned int index;
 	int snapshot_id;
 	char snapshot_tag[SD_MAX_VDI_TAG_LEN];
+	int exclusive;
+	int delete;
 } vdi_cmd_data = { ~0, };
 
 static int cluster_format(int argc, char **argv)
@@ -817,12 +819,255 @@ static int vdi_object(int argc, char **argv)
 	return 0;
 }
 
+/*
+ * Resolve the attribute object of 'key' for a vdi by sending
+ * SD_OP_GET_VDI_ATTR to the daemon at sdhost:sdport.  'creat' and 'excl' are
+ * forwarded as SD_FLAG_CMD_CREAT / SD_FLAG_CMD_EXCL.  On success *vid, *oid
+ * and *nr_copies are filled in and SD_RES_SUCCESS is returned; otherwise an
+ * SD_RES_* error code is returned.
+ */
+static int find_vdi_attr_oid(char *vdiname, char *tag, uint32_t snapid,
+			     char *key, uint32_t *vid, uint64_t *oid,
+			     unsigned int *nr_copies, int creat, int excl)
+{
+	struct sd_vdi_req hdr;
+	struct sd_vdi_rsp *rsp = (struct sd_vdi_rsp *)&hdr;
+	int fd, ret;
+	unsigned int wlen = SD_ATTR_HEADER_SIZE, rlen = 0;
+	char buf[SD_ATTR_HEADER_SIZE];
+
+	/* request body: vdi name | snapshot tag | snapshot id | attribute key */
+	memset(buf, 0, sizeof(buf));
+	strncpy(buf, vdiname, SD_MAX_VDI_LEN);
+	strncpy(buf + SD_MAX_VDI_LEN, tag, SD_MAX_VDI_TAG_LEN);
+	memcpy(buf + SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN,
+	       &snapid, sizeof(snapid));
+	strncpy(buf + SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN + sizeof(uint32_t),
+		key, SD_MAX_VDI_ATTR_KEY_LEN);
+
+	fd = connect_to(sdhost, sdport);
+	if (fd < 0) {
+		fprintf(stderr, "failed to connect\n\n");
+		return SD_RES_EIO;
+	}
+
+	memset(&hdr, 0, sizeof(hdr));
+	hdr.opcode = SD_OP_GET_VDI_ATTR;
+	hdr.proto_ver = SD_PROTO_VER;
+	hdr.data_length = wlen;
+	hdr.snapid = snapid;
+	hdr.flags = SD_FLAG_CMD_WRITE;
+	if (creat)
+		hdr.flags |= SD_FLAG_CMD_CREAT;
+	if (excl)
+		hdr.flags |= SD_FLAG_CMD_EXCL;
+
+	/* rsp points at hdr, so the reply is read back from the same buffer */
+	if (exec_req(fd, (struct sd_req *)&hdr, buf, &wlen, &rlen))
+		ret = SD_RES_EIO;
+	else
+		ret = rsp->result;
+	close(fd);
+
+	if (ret == SD_RES_SUCCESS) {
+		*vid = rsp->vdi_id;
+		*oid = vid_to_attr_oid(rsp->vdi_id, rsp->attr_id);
+		*nr_copies = rsp->copies;
+	}
+
+	return ret;
+}
+
+/*
+ * "collie vdi setattr VDINAME KEY [VALUE]": write VALUE (stdin when omitted)
+ * to KEY's attribute object on each copy; with -d, write a single NUL byte at
+ * offsetof(struct sheepdog_inode, name) instead.  Returns 0 on success, else 1.
+ */
+static int vdi_setattr(int argc, char **argv)
+{
+	struct sd_obj_req hdr;
+	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
+	char name[128];
+	int i = 0, n, fd, ret, rc = 1;
+	uint64_t attr_oid = 0, offset = 0;
+	uint32_t vid = 0, nr_copies = 0;
+	char *vdiname = argv[optind++], *key, *value, *buf = NULL;
+	unsigned int wlen = 0, rlen = 0;
+
+	key = argv[optind++];
+	if (!key) {
+		fprintf(stderr, "please specify the name of key\n");
+		return 1;
+	}
+
+	value = argv[optind++];
+	/* -d takes no value, so only fall back to stdin when setting */
+	if (!value && !vdi_cmd_data.delete) {
+		/* one spare byte so the data can always be NUL-terminated */
+		buf = malloc(SD_MAX_VDI_ATTR_VALUE_LEN + 1);
+		if (!buf) {
+			fprintf(stderr, "failed to allocate memory\n");
+			return 1;
+		}
+		do {
+			ret = read(STDIN_FILENO, buf + offset,
+				   SD_MAX_VDI_ATTR_VALUE_LEN - offset);
+			if (ret < 0) {
+				fprintf(stderr, "failed to read from stdin, %m\n");
+				goto out;
+			}
+			offset += ret;
+		} while (ret > 0);
+		buf[offset] = '\0';	/* strlen() below relies on this */
+		value = buf;
+	}
+
+	ret = find_vdi_attr_oid(vdiname, vdi_cmd_data.snapshot_tag,
+				vdi_cmd_data.snapshot_id, key, &vid, &attr_oid,
+				&nr_copies, !vdi_cmd_data.delete,
+				vdi_cmd_data.exclusive);
+	if (ret) {
+		if (ret == SD_RES_VDI_EXIST) {
+			fprintf(stderr, "the attribute already exists, %s\n", key);
+		} else if (ret == SD_RES_NO_OBJ) {
+			fprintf(stderr, "no such attribute, %s\n", key);
+		} else
+			fprintf(stderr, "failed to find attr oid, %s\n",
+				sd_strerror(ret));
+		goto out;
+	}
+
+	for (i = 0; i < nr_copies; i++) {
+		rlen = 0;
+		wlen = vdi_cmd_data.delete ? 1 : strlen(value);
+
+		n = obj_to_sheep(node_list_entries, nr_nodes, attr_oid, i);
+		addr_to_str(name, sizeof(name), node_list_entries[n].addr, 0);
+
+		fd = connect_to(name, node_list_entries[n].port);
+		if (fd < 0) {
+			printf("%s(%d): %s, %m\n", __func__, __LINE__,
+			       name);
+			goto out;	/* an unreachable copy is a failure */
+		}
+
+		memset(&hdr, 0, sizeof(hdr));
+		hdr.epoch = node_list_version;
+		hdr.opcode = SD_OP_WRITE_OBJ;
+		hdr.oid = attr_oid;
+		hdr.data_length = wlen;
+		if (vdi_cmd_data.delete) {
+			hdr.flags =  SD_FLAG_CMD_DIRECT | SD_FLAG_CMD_WRITE;
+			hdr.offset = offsetof(struct sheepdog_inode, name);
+			value = (char *)"";
+		} else {
+			hdr.flags =  SD_FLAG_CMD_DIRECT | SD_FLAG_CMD_WRITE |
+				SD_FLAG_CMD_TRUNCATE;
+			hdr.offset = SD_ATTR_HEADER_SIZE;
+		}
+
+		ret = exec_req(fd, (struct sd_req *)&hdr, value, &wlen, &rlen);
+		close(fd);
+		if (ret) {
+			fprintf(stderr, "failed to set attribute\n");
+			goto out;
+		}
+		if (rsp->result != SD_RES_SUCCESS) {
+			fprintf(stderr, "failed to set attribute, %s\n",
+				sd_strerror(rsp->result));
+			goto out;
+		}
+	}
+
+	rc = 0;
+out:
+	free(buf);
+	return rc;
+}
+
+/*
+ * "collie vdi getattr VDINAME KEY": print the value stored under KEY.  The
+ * copies of the attribute object are read in turn and the first successful
+ * read is printed; a failed connect aborts.  Returns 0 on success, else 1.
+ */
+static int vdi_getattr(int argc, char **argv)
+{
+	struct sd_obj_req hdr;
+	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
+	char name[128];
+	int i = 0, n, fd, ret;
+	uint64_t attr_oid = 0;
+	uint32_t vid = 0, nr_copies = 0;
+	char *vdiname = argv[optind++], *key, *value;
+	unsigned int wlen = 0, rlen = 0;
+
+	key = argv[optind++];
+	if (!key) {
+		fprintf(stderr, "please specify the name of key\n");
+		return 1;
+	}
+
+	ret = find_vdi_attr_oid(vdiname, vdi_cmd_data.snapshot_tag,
+				vdi_cmd_data.snapshot_id, key, &vid, &attr_oid,
+				&nr_copies, 0, 0);
+	if (ret == SD_RES_NO_OBJ) {
+		fprintf(stderr, "no such attribute, %s\n", key);
+		return 1;
+	} else if (ret) {
+		fprintf(stderr, "failed to find attr oid, %s\n",
+			sd_strerror(ret));
+		return 1;
+	}
+
+	/* zero-filled + 1 spare byte: a read of at most rlen bytes stays terminated */
+	value = calloc(1, SD_MAX_VDI_ATTR_VALUE_LEN + 1);
+	if (!value) {
+		fprintf(stderr, "failed to allocate memory\n");
+		return 1;
+	}
+	for (i = 0; i < nr_copies; i++) {
+		rlen = SD_MAX_VDI_ATTR_VALUE_LEN;
+		wlen = 0;
+
+		n = obj_to_sheep(node_list_entries, nr_nodes, attr_oid, i);
+		addr_to_str(name, sizeof(name), node_list_entries[n].addr, 0);
+
+		fd = connect_to(name, node_list_entries[n].port);
+		if (fd < 0) {
+			printf("%s(%d): %s, %m\n", __func__, __LINE__,
+			       name);
+			goto out;
+		}
+
+		memset(&hdr, 0, sizeof(hdr));
+		hdr.epoch = node_list_version;
+		hdr.opcode = SD_OP_READ_OBJ;
+		hdr.oid = attr_oid;
+		hdr.data_length = rlen;
+		hdr.flags =  SD_FLAG_CMD_DIRECT;
+		hdr.offset = SD_ATTR_HEADER_SIZE;
+
+		ret = exec_req(fd, (struct sd_req *)&hdr, value, &wlen, &rlen);
+		close(fd);
+		if (!ret && rsp->result == SD_RES_SUCCESS) {
+			printf("%s", value);
+			free(value);
+			return 0;
+		}
+	}
+out:
+	free(value);
+	return 1;
+}
+
 static struct subcommand vdi_cmd[] = {
 	{"delete", SUBCMD_FLAG_NEED_NOEDLIST|SUBCMD_FLAG_NEED_THIRD_ARG, vdi_delete},
 	{"list", SUBCMD_FLAG_NEED_NOEDLIST, vdi_list},
 	{"tree", SUBCMD_FLAG_NEED_NOEDLIST, vdi_tree},
 	{"graph", SUBCMD_FLAG_NEED_NOEDLIST, vdi_graph},
 	{"object", SUBCMD_FLAG_NEED_NOEDLIST|SUBCMD_FLAG_NEED_THIRD_ARG, vdi_object},
+	{"setattr", SUBCMD_FLAG_NEED_NOEDLIST|SUBCMD_FLAG_NEED_THIRD_ARG, vdi_setattr},
+	{"getattr", SUBCMD_FLAG_NEED_NOEDLIST|SUBCMD_FLAG_NEED_THIRD_ARG, vdi_getattr},
 	{NULL,},
 };
 
@@ -831,6 +1076,8 @@ static struct option vdi_long_options[] =
 	COMMON_LONG_OPTIONS
 	{"index", required_argument, NULL, 'i'},
 	{"snapshot", required_argument, NULL, 's'},
+	{"exclusive", no_argument, NULL, 'x'},
+	{"delete", no_argument, NULL, 'd'},
 	{NULL, 0, NULL, 0},
 };
 
@@ -846,6 +1093,12 @@ static int vdi_parser(int ch, char *opt)
 			strncpy(vdi_cmd_data.snapshot_tag, opt,
 				sizeof(vdi_cmd_data.snapshot_tag));
 		break;
+	case 'x':
+		vdi_cmd_data.exclusive = 1;
+		break;
+	case 'd':
+		vdi_cmd_data.delete = 1;
+		break;
 	}
 
 	return 0;
@@ -954,7 +1207,7 @@ static struct {
 } commands[] = {
 	{"vdi", vdi_cmd,
 	 vdi_long_options,
-	 COMMON_SHORT_OPTIONS "i:s:",
+	 COMMON_SHORT_OPTIONS "i:s:xd",
 	 vdi_parser,},
 	{"node", node_cmd,},
 	{"cluster", cluster_cmd,
diff --git a/include/sheep.h b/include/sheep.h
index 551a9ce..5b2257f 100644
--- a/include/sheep.h
+++ b/include/sheep.h
@@ -16,7 +16,7 @@
 #include "list.h"
 #include "net.h"
 
-#define SD_SHEEP_PROTO_VER 0x01
+#define SD_SHEEP_PROTO_VER 0x02
 
 #define SD_MAX_NODES 1024
 #define SD_MAX_VMS   4096 /* FIXME: should be removed */
@@ -30,9 +30,12 @@
 #define SD_OP_STAT_SHEEP     0x86
 #define SD_OP_STAT_CLUSTER   0x87
 #define SD_OP_KILL_NODE      0x88
+#define SD_OP_GET_VDI_ATTR   0x89
 
 #define SD_FLAG_CMD_DIRECT   0x10
 #define SD_FLAG_CMD_RECOVERY 0x20
+#define SD_FLAG_CMD_CREAT    0x40
+#define SD_FLAG_CMD_EXCL     0x80
 
 #define SD_RES_OLD_NODE_VER  0x41 /* Remote node has an old epoch */
 #define SD_RES_NEW_NODE_VER  0x42 /* Remote node has a new epoch */
diff --git a/include/sheepdog_proto.h b/include/sheepdog_proto.h
index 5c37618..d43988f 100644
--- a/include/sheepdog_proto.h
+++ b/include/sheepdog_proto.h
@@ -31,6 +31,7 @@
 
 #define SD_FLAG_CMD_WRITE    0x01
 #define SD_FLAG_CMD_COW      0x02
+#define SD_FLAG_CMD_TRUNCATE 0x04
 
 #define SD_RES_SUCCESS       0x00 /* Success */
 #define SD_RES_UNKNOWN       0x01 /* Unknown error */
@@ -71,16 +72,21 @@
 #define VDI_SPACE_SHIFT   32
 #define VDI_BIT (UINT64_C(1) << 63)
 #define VMSTATE_BIT (UINT64_C(1) << 62)
+#define VDI_ATTR_BIT (UINT64_C(1) << 61)
 #define MAX_DATA_OBJS (1ULL << 20)
-#define MAX_CHILDREN 1024
-#define SD_MAX_VDI_LEN 256
-#define SD_MAX_VDI_TAG_LEN 256
+#define MAX_CHILDREN 1024U
+#define SD_MAX_VDI_LEN 256U
+#define SD_MAX_VDI_TAG_LEN 256U
+#define SD_MAX_VDI_ATTR_KEY_LEN 256U
+#define SD_MAX_VDI_ATTR_VALUE_LEN (UINT64_C(1) << 22)
 #define SD_NR_VDIS   (1U << 24)
 #define SD_DATA_OBJ_SIZE (UINT64_C(1) << 22)
 
 #define SD_INODE_SIZE (sizeof(struct sheepdog_inode))
 #define SD_INODE_HEADER_SIZE (sizeof(struct sheepdog_inode) - \
 			      sizeof(uint32_t) * MAX_DATA_OBJS)
+#define SD_ATTR_HEADER_SIZE (SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN + \
+			     sizeof(uint32_t) + SD_MAX_VDI_ATTR_KEY_LEN)
 #define CURRENT_VDI_ID 0
 
 struct sd_req {
@@ -154,8 +160,9 @@ struct sd_vdi_rsp {
 	uint32_t        result;
 	uint32_t        rsvd;
 	uint32_t        vdi_id;
+	uint32_t        attr_id;
 	uint32_t        copies;
-	uint32_t        pad[4];
+	uint32_t        pad[3];
 };
 
 struct sheepdog_inode {
@@ -226,4 +233,9 @@ static inline uint32_t oid_to_vid(uint64_t oid)
 	return (~VDI_BIT & oid) >> VDI_SPACE_SHIFT;
 }
 
+static inline uint64_t vid_to_attr_oid(uint32_t vid, uint32_t attrid)
+{	/* oid layout: VDI_ATTR_BIT flag | vdi id << VDI_SPACE_SHIFT | attr id */
+	return VDI_ATTR_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT) | attrid;
+}
+
 #endif
diff --git a/script/bash_completion_collie b/script/bash_completion_collie
index d934ed0..8a8c0cc 100644
--- a/script/bash_completion_collie
+++ b/script/bash_completion_collie
@@ -55,6 +55,21 @@ _collie_vdi_object()
     esac
 }
 
+_collie_vdi_setattr()  # completion for the options of "collie vdi setattr"
+{
+    local cur
+    cur="${COMP_WORDS[COMP_CWORD]}"  # word currently being completed
+
+    case "$cur" in
+        -*)  # only words starting with '-' get candidates, appended to COMPREPLY
+            COMPREPLY=(${COMPREPLY[@]} \
+                $( compgen \
+                -W "-d --delete -x --exclusive" \
+                -- ${cur} ))
+            ;;
+    esac
+}
+
 _collie_cluster()
 {
     local opts
@@ -103,7 +118,7 @@ _collie_node()
 _collie_vdi()
 {
     local opts
-    opts="list tree graph delete object"
+    opts="list tree graph delete object setattr getattr"
 
     case "$1" in
         list)
@@ -118,6 +133,11 @@ _collie_vdi()
         object)
             _collie_vdi_object
             ;;
+        setattr)
+            _collie_vdi_setattr
+            ;;
+        getattr)
+            ;;
         "")
             COMPREPLY=($( compgen \
                 -W "${opts}" \
diff --git a/sheep/group.c b/sheep/group.c
index 1731a97..7051698 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -632,7 +632,7 @@ static void vdi_op(struct vdi_op_message *msg)
 	struct sd_vdi_rsp *rsp = &msg->rsp;
 	void *data = msg->data;
 	int ret = SD_RES_SUCCESS;
-	uint32_t vid = 0, nr_copies = sys->nr_sobjs;
+	uint32_t vid = 0, attrid = 0, nr_copies = sys->nr_sobjs;
 
 	switch (hdr->opcode) {
 	case SD_OP_NEW_VDI:
@@ -655,6 +655,16 @@ static void vdi_op(struct vdi_op_message *msg)
 		if (ret != SD_RES_SUCCESS)
 			break;
 		break;
+	case SD_OP_GET_VDI_ATTR:
+		ret = lookup_vdi(hdr->epoch, data,
+				 min(SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN, hdr->data_length),
+				 &vid, hdr->snapid, &nr_copies);
+		if (ret != SD_RES_SUCCESS)
+			break;
+		ret = get_vdi_attr(hdr->epoch, data, hdr->data_length, vid,
+				   &attrid, hdr->flags & SD_FLAG_CMD_CREAT,
+				   hdr->flags & SD_FLAG_CMD_EXCL);
+		break;
 	case SD_OP_RELEASE_VDI:
 		break;
 	case SD_OP_MAKE_FS:
@@ -669,6 +679,7 @@ static void vdi_op(struct vdi_op_message *msg)
 	}
 
 	rsp->vdi_id = vid;
+	rsp->attr_id = attrid;
 	rsp->copies = nr_copies;
 	rsp->result = ret;
 }
@@ -700,6 +711,7 @@ static void vdi_op_done(struct vdi_op_message *msg)
 	case SD_OP_LOCK_VDI:
 	case SD_OP_RELEASE_VDI:
 	case SD_OP_GET_VDI_INFO:
+	case SD_OP_GET_VDI_ATTR:
 		break;
 	case SD_OP_MAKE_FS:
 		sys->nr_sobjs = ((struct sd_so_req *)hdr)->copies;
diff --git a/sheep/sdnet.c b/sheep/sdnet.c
index 3b1a554..c7be51c 100644
--- a/sheep/sdnet.c
+++ b/sheep/sdnet.c
@@ -106,6 +106,7 @@ static void __done(struct work *work, int idx)
 	case SD_OP_GET_VDI_INFO:
 	case SD_OP_MAKE_FS:
 	case SD_OP_SHUTDOWN:
+	case SD_OP_GET_VDI_ATTR:
 		/* request is forwarded to cpg group */
 		return;
 	}
@@ -232,6 +233,7 @@ static void queue_request(struct request *req)
 	case SD_OP_MAKE_FS:
 	case SD_OP_SHUTDOWN:
 	case SD_OP_STAT_CLUSTER:
+	case SD_OP_GET_VDI_ATTR:
 		req->work.fn = cluster_queue_request;
 		break;
 	case SD_OP_READ_VDIS:
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 00e0c34..7f24dc9 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -154,6 +154,9 @@ int lookup_vdi(uint32_t epoch, char *data, int data_len, uint32_t *vid,
 
 int read_vdis(char *data, int len, unsigned int *rsp_len);
 
+int get_vdi_attr(uint32_t epoch, char *data, int data_len, uint32_t vid,
+		 uint32_t *attrid, int creat, int excl);
+
 int setup_ordered_sd_node_list(struct request *req);
 int get_ordered_sd_node_list(struct sheepdog_node_list_entry *entries);
 int is_access_to_busy_objects(uint64_t oid);
diff --git a/sheep/store.c b/sheep/store.c
index 7ff134d..bcac0b4 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -606,10 +606,10 @@ int read_object_local(uint64_t oid, char *data, unsigned int datalen,
 	ret = store_queue_request_local(&req, epoch);
 
 	if (ret != 0)
-		return -SD_RES_EIO;
+		return -ret;
 
 	if (rsp->data_length != datalen)
-		return -rsp->result;
+		return -SD_RES_EIO;
 
 	return rsp->data_length;
 }
@@ -741,6 +741,9 @@ static int store_queue_request_local(struct request *req, uint32_t epoch)
 		break;
 	case SD_OP_WRITE_OBJ:
 	case SD_OP_CREATE_AND_WRITE_OBJ:
+		if (hdr->flags & SD_FLAG_CMD_TRUNCATE)
+			ftruncate(fd, hdr->offset + hdr->data_length);
+
 		if (!is_data_obj(oid)) {
 			jd.jdf_epoch = epoch;
 			jd.jdf_oid = oid;
diff --git a/sheep/vdi.c b/sheep/vdi.c
index 7cc0114..114576f 100644
--- a/sheep/vdi.c
+++ b/sheep/vdi.c
@@ -535,3 +535,61 @@ int start_deletion(uint32_t vid, uint32_t epoch)
 
 	return SD_RES_SUCCESS;
 }
+
+/*
+ * Look up the attribute object of 'vid' whose header equals 'data' and store
+ * its id in *attrid.  Ids are probed linearly from the fnv_64a_buf() hash of
+ * 'data': with 'creat' the first missing object is created from 'data', with
+ * 'excl' an existing match yields SD_RES_VDI_EXIST.  Returns an SD_RES_* code.
+ */
+int get_vdi_attr(uint32_t epoch, char *data, int data_len, uint32_t vid,
+		 uint32_t *attrid, int creat, int excl)
+{
+	struct sheepdog_node_list_entry entries[SD_MAX_NODES];
+	char attr_buf[SD_ATTR_HEADER_SIZE], inode_buf[SD_INODE_HEADER_SIZE];
+	uint64_t oid;
+	uint32_t end;
+	int ret, nr_nodes, copies;
+
+	if (data_len != SD_ATTR_HEADER_SIZE)
+		return SD_RES_INVALID_PARMS;
+
+	nr_nodes = get_ordered_sd_node_list(entries);
+
+	ret = read_object(entries, nr_nodes, epoch, vid_to_vdi_oid(vid),
+			  inode_buf, sizeof(inode_buf), 0, sys->nr_sobjs);
+	if (ret != SD_INODE_HEADER_SIZE) {
+		eprintf("failed to read vdi object, %"PRIx32"\n", vid);
+		/* a non-negative ret is a short read, not an error code */
+		return ret < 0 ? -ret : SD_RES_EIO;
+	}
+
+	copies = ((struct sheepdog_inode *)inode_buf)->nr_copies;
+
+	*attrid = fnv_64a_buf(data, data_len, FNV1A_64_INIT);
+	*attrid &= (UINT64_C(1) << VDI_SPACE_SHIFT) - 1;
+
+	end = *attrid - 1;
+	while (*attrid != end) {
+		oid = vid_to_attr_oid(vid, *attrid);
+		ret = read_object(entries, nr_nodes, epoch, oid, attr_buf,
+				  sizeof(attr_buf), 0, copies);
+
+		if (ret == -SD_RES_NO_OBJ && creat) {
+			ret = write_object(entries, nr_nodes, epoch, oid, data,
+					   data_len, 0, copies, 1);
+			return ret ? SD_RES_EIO : SD_RES_SUCCESS;
+		}
+
+		if (ret < 0)
+			return -ret;
+
+		if (memcmp(attr_buf, data, sizeof(attr_buf)) == 0)
+			return excl ? SD_RES_VDI_EXIST : SD_RES_SUCCESS;
+
+		(*attrid)++;
+	}
+
+	dprintf("there is no space for new vdi attributes\n");
+	return SD_RES_FULL_VDI;
+}
-- 
1.5.6.5




More information about the sheepdog mailing list