[sheepdog] [PATCH v2] sheep/http: fix error in bucket_delete

Bingpeng Zhu nkuzbp at foxmail.com
Wed Aug 6 14:26:46 CEST 2014


In current implementation, When we create a bucket, we decide
the bnode's location in account VDI using sd_hash(bucket_name)
as key. We handle hash conflict by linear probing hash table.
Here is the bug:
When we delete a bucket, we can't discard its bnode. Because
bnode_lookup() need it to find if some bucket exists or not
by checking adjacent bnodes. Therefore, we just zero its
bnode.name when client want to delete a bucket. When we create
a bucket later, we can reuse the zero bnode if they hash to
the same location in account VDI. So, we need to change the
implementation of bnode lookup and bnode create function, too.

Signed-off-by: Bingpeng Zhu <bingpeng.zbp at alibaba-inc.com>
---
 sheep/http/kv.c |   74 ++++++++++++++++++++++++++++++++++++++++++------------
 1 files changed, 57 insertions(+), 17 deletions(-)

diff --git a/sheep/http/kv.c b/sheep/http/kv.c
index 5c4b386..246d4b5 100644
--- a/sheep/http/kv.c
+++ b/sheep/http/kv.c
@@ -247,18 +247,21 @@ int kv_delete_account(struct http_request *req, const char *account)
  */
 
 static int bnode_do_create(struct kv_bnode *bnode, struct sd_inode *inode,
-			   uint32_t idx)
+			   uint32_t idx, bool create)
 {
 	uint32_t vid = inode->vdi_id;
 	uint64_t oid = vid_to_data_oid(vid, idx);
 	int ret;
 
 	bnode->oid = oid;
-	ret = sd_write_object(oid, (char *)bnode, sizeof(*bnode), 0, true);
+	ret = sd_write_object(oid, (char *)bnode, sizeof(*bnode), 0, create);
 	if (ret != SD_RES_SUCCESS) {
 		sd_err("failed to create object, %" PRIx64, oid);
 		goto out;
 	}
+	if (!create)
+		goto out;
+
 	sd_inode_set_vid(inode, idx, vid);
 	ret = sd_inode_write_vid(inode, idx, vid, vid, 0, false, false);
 	if (ret != SD_RES_SUCCESS) {
@@ -276,6 +279,7 @@ static int bnode_create(struct kv_bnode *bnode, uint32_t account_vid)
 	uint32_t tmp_vid, idx;
 	uint64_t hval, i;
 	int ret;
+	bool create = true;
 
 	ret = sd_read_object(vid_to_vdi_oid(account_vid), (char *)inode,
 			     sizeof(*inode), 0);
@@ -289,16 +293,27 @@ static int bnode_create(struct kv_bnode *bnode, uint32_t account_vid)
 	for (i = 0; i < MAX_DATA_OBJS; i++) {
 		idx = (hval + i) % MAX_DATA_OBJS;
 		tmp_vid = sd_inode_get_vid(inode, idx);
-		if (tmp_vid)
-			continue;
-		else
+		if (tmp_vid) {
+			uint64_t oid = vid_to_data_oid(account_vid, idx);
+			char name[SD_MAX_BUCKET_NAME] = { };
+
+			ret = sd_read_object(oid, name, sizeof(name), 0);
+			if (ret != SD_RES_SUCCESS)
+				goto out;
+			if (name[0] == 0) {
+				create = false;
+				goto create;
+			}
+		} else
 			break;
 	}
 	if (i == MAX_DATA_OBJS) {
 		ret = SD_RES_NO_SPACE;
 		goto out;
 	}
-	ret = bnode_do_create(bnode, inode, idx);
+
+create:
+	ret = bnode_do_create(bnode, inode, idx, create);
 out:
 	free(inode);
 	return ret;
@@ -349,24 +364,43 @@ err:
 
 static int bnode_lookup(struct kv_bnode *bnode, uint32_t vid, const char *name)
 {
+	struct sd_inode *inode = xmalloc(sizeof(struct sd_inode));
+	uint32_t tmp_vid, idx;
 	uint64_t hval, i;
 	int ret;
 
+	ret = sd_read_object(vid_to_vdi_oid(vid), (char *)inode,
+			     sizeof(*inode), 0);
+	if (ret != SD_RES_SUCCESS) {
+		sd_err("failed to read %" PRIx32 " %s", vid,
+		       sd_strerror(ret));
+		goto out;
+	}
+
 	hval = sd_hash(name, strlen(name));
 	for (i = 0; i < MAX_DATA_OBJS; i++) {
-		uint32_t idx = (hval + i) % MAX_DATA_OBJS;
-		uint64_t oid = vid_to_data_oid(vid, idx);
-
-		ret = sd_read_object(oid, (char *)bnode, sizeof(*bnode), 0);
-		if (ret != SD_RES_SUCCESS)
-			goto out;
-		if (strcmp(bnode->name, name) == 0)
+		idx = (hval + i) % MAX_DATA_OBJS;
+		tmp_vid = sd_inode_get_vid(inode, idx);
+		if (tmp_vid) {
+			uint64_t oid = vid_to_data_oid(vid, idx);
+			ret = sd_read_object(oid, (char *)bnode,
+							sizeof(*bnode), 0);
+			if (ret != SD_RES_SUCCESS)
+				goto out;
+			if (strcmp(bnode->name, name) == 0)
+				break;
+		} else {
+			ret = SD_RES_NO_OBJ;
 			break;
+		}
 	}
-
-	if (i == MAX_DATA_OBJS)
+	if (i == MAX_DATA_OBJS) {
 		ret = SD_RES_NO_OBJ;
+		goto out;
+	}
+
 out:
+	free(inode);
 	return ret;
 }
 
@@ -423,6 +457,7 @@ static int bucket_delete(const char *account, uint32_t avid, const char *bucket)
 	struct kv_bnode bnode;
 	char onode_name[SD_MAX_VDI_LEN];
 	char alloc_name[SD_MAX_VDI_LEN];
+	char name[SD_MAX_BUCKET_NAME] = {};
 	int ret;
 
 	snprintf(onode_name, SD_MAX_VDI_LEN, "%s/%s", account, bucket);
@@ -436,9 +471,14 @@ static int bucket_delete(const char *account, uint32_t avid, const char *bucket)
 	if (bnode.object_count > 0)
 		return SD_RES_VDI_NOT_EMPTY;
 
-	ret = sd_discard_object(bnode.oid);
+	/*
+	 * We can't discard bnode because bnode_lookup() need it to find
+	 * if some bucket exists or not by checking adjacent bnodes.
+	 * So we just zero bnode.name to indicate a deleted bucket.
+	 */
+	ret = sd_write_object(bnode.oid, name, sizeof(name), 0, false);
 	if (ret != SD_RES_SUCCESS) {
-		sd_err("failed to discard bnode for %s", bucket);
+		sd_err("failed to zero bnode for %s", bucket);
 		return ret;
 	}
 	sd_delete_vdi(onode_name);
-- 
1.7.1




More information about the sheepdog mailing list