[sheepdog] [PATCH v3 6/8] object cache: reclaim cached objects when cache reaches the max size

levin li levin108 at gmail.com
Thu Jul 26 09:17:05 CEST 2012


From: levin li <xingke.lwp at taobao.com>

This patch reclaims cached objects when the total size of cached objects
reaches the max size specified by the user. It works in the following way:

1. Check the object tree for the object entry to determine whether the
   cache entry exists and whether it is being reclaimed; if it is being
   reclaimed, we make sheep ignore the cache.
2. In object_cache_rw() we search for the cache entry; after it passes the
   sanity check, we increment its refcnt to tell the reclaiming worker that
   this entry is being referenced and should not be reclaimed now.
3. In add_to_object_cache(), when the cached size reaches the max size,
   we start a reclaiming thread; only one such thread can be running at
   a time.
4. In reclaim_work(), we reclaim cached objects until the cache size is
   reduced to 80% of the max size.
5. In reclaim_object(), before reclaiming an object we check whether the
   cache is flushing, in which case we don't reclaim it; if the refcnt of
   the object is not zero, we also don't reclaim it.
   If the cached object is dirty, we flush it with push_cache_object() and
   then try to remove the object.

Signed-off-by: levin li <xingke.lwp at taobao.com>
---
 include/list.h           |   10 +
 include/sheepdog_proto.h |    1 +
 sheep/object_cache.c     |  766 ++++++++++++++++++++++++++++++++--------------
 sheep/sheep.c            |    3 +-
 sheep/sheep_priv.h       |    1 +
 sheep/store.c            |    8 +
 6 files changed, 558 insertions(+), 231 deletions(-)

diff --git a/include/list.h b/include/list.h
index 30ee3c4..e1d645d 100644
--- a/include/list.h
+++ b/include/list.h
@@ -259,3 +259,13 @@ static inline void hlist_add_after(struct hlist_node *n,
              pos = n)
 
 #endif
+
+
+#define list_for_each_entry_revert_safe_rcu(pos, n, head, member)          \
+	for (pos = cds_list_entry(rcu_dereference((head)->prev),           \
+				  typeof(*pos), member),                   \
+	     n = cds_list_entry(rcu_dereference(pos->member.prev),         \
+				typeof(*pos), member);                     \
+				&pos->member != (head);                    \
+	     pos = n, n = cds_list_entry(rcu_dereference(pos->member.prev),\
+					 typeof(*pos), member))
diff --git a/include/sheepdog_proto.h b/include/sheepdog_proto.h
index 45a4b81..05597fb 100644
--- a/include/sheepdog_proto.h
+++ b/include/sheepdog_proto.h
@@ -68,6 +68,7 @@
 #define SD_RES_CLUSTER_RECOVERING 0x22 /* Cluster is recovering. */
 #define SD_RES_OBJ_RECOVERING     0x23 /* Object is recovering */
 #define SD_RES_KILLED           0x24 /* Node is killed */
+#define SD_RES_NO_CACHE      0x25 /* No cache object found */
 
 /* errors above 0x80 are sheepdog-internal */
 
diff --git a/sheep/object_cache.c b/sheep/object_cache.c
index 19480c2..d9750a0 100644
--- a/sheep/object_cache.c
+++ b/sheep/object_cache.c
@@ -41,6 +41,11 @@
 #define CACHE_BLOCK_SIZE      ((UINT64_C(1) << 10) * 64) /* 64 KB */
 
 #define ENTRY_CREATE_BIT      (1)
+#define ENTRY_RECLAIM_BIT     (1 << 1)
+
+
+#define SD_RES_CACHE_FLUSHING    1
+#define SD_RES_CACHE_REFERENCING 2
 
 struct global_cache {
 	uint64_t cache_size;
@@ -50,6 +55,7 @@ struct global_cache {
 
 struct object_cache {
 	uint32_t vid;
+	uint8_t in_flush;
 	struct hlist_node hash;
 
 	struct list_head dirty_list;
@@ -85,6 +91,39 @@ static pthread_mutex_t hashtable_lock[HASH_SIZE] = {
 
 static struct hlist_head cache_hashtable[HASH_SIZE];
 
+static inline int cache_in_reclaim(int start)
+{
+	if (start)
+		return uatomic_cmpxchg(&sys_cache.reclaiming, 0, 1);
+	else
+		return uatomic_read(&sys_cache.reclaiming);
+}
+
+static inline int cache_is_flushing(struct object_cache *cache)
+{
+	return cache->in_flush;
+}
+
+static inline int entry_is_reclaiming(struct object_cache_entry *entry)
+{
+	int flags = uatomic_read(&entry->flags);
+	if (flags & ENTRY_RECLAIM_BIT)
+		return 1;
+	return 0;
+}
+
+static inline int entry_is_dirty(struct object_cache_entry *entry)
+{
+	return !!entry->bmap;
+}
+
+static inline void mark_entry_reclaiming(struct object_cache_entry *entry)
+{
+	int flags = uatomic_read(&entry->flags);
+	flags |= ENTRY_RECLAIM_BIT;
+	uatomic_set(&entry->flags, flags);
+}
+
 static inline int hash(uint64_t vid)
 {
 	return hash_64(vid, HASH_BITS);
@@ -164,6 +203,340 @@ static struct object_cache_entry *object_tree_search(struct rb_root *root,
 	return NULL;
 }
 
+static struct object_cache_entry *dirty_tree_search(struct rb_root *root,
+						    uint32_t idx)
+{
+	struct rb_node *n = root->rb_node;
+	struct object_cache_entry *t;
+
+	while (n) {
+		t = rb_entry(n, struct object_cache_entry, dirty_node);
+
+		if (idx < t->idx)
+			n = n->rb_left;
+		else if (idx > t->idx)
+			n = n->rb_right;
+		else
+			return t; /* found it */
+	}
+
+	return NULL;
+}
+
+static inline void
+del_from_dirty_tree_and_list(struct object_cache_entry *entry,
+			     struct rb_root *dirty_tree)
+{
+	rb_erase(&entry->dirty_node, dirty_tree);
+	list_del(&entry->list);
+}
+
+static inline void
+del_from_object_tree_and_list(struct object_cache_entry *entry,
+			      struct rb_root *object_tree)
+{
+	rb_erase(&entry->node, object_tree);
+	cds_list_del_rcu(&entry->lru_list);
+}
+
+static uint64_t idx_to_oid(uint32_t vid, uint32_t idx)
+{
+	if (idx_has_vdi_bit(idx))
+		return vid_to_vdi_oid(vid);
+	else
+		return vid_to_data_oid(vid, idx);
+}
+
+static int remove_cache_object(struct object_cache *oc, uint32_t idx)
+{
+	struct strbuf buf;
+	int ret = SD_RES_SUCCESS;
+
+	strbuf_init(&buf, PATH_MAX);
+	strbuf_addstr(&buf, cache_dir);
+	strbuf_addf(&buf, "/%06"PRIx32"/%08"PRIx32, oc->vid, idx);
+
+	dprintf("removing cache object %s\n", buf.buf);
+	if (unlink(buf.buf) < 0) {
+		ret = SD_RES_EIO;
+		eprintf("failed to remove cached object %m\n");
+		goto out;
+	}
+out:
+	strbuf_release(&buf);
+
+	return ret;
+}
+
+static int write_cache_object(uint32_t vid, uint32_t idx, void *buf,
+			      size_t count, off_t offset)
+{
+	size_t size;
+	int fd, flags = def_open_flags, ret = SD_RES_SUCCESS;
+	struct strbuf p;
+
+	strbuf_init(&p, PATH_MAX);
+	strbuf_addstr(&p, cache_dir);
+	strbuf_addf(&p, "/%06"PRIx32"/%08"PRIx32, vid, idx);
+
+	if (sys->use_directio && !idx_has_vdi_bit(idx))
+		flags |= O_DIRECT;
+
+	fd = open(p.buf, flags, def_fmode);
+	if (fd < 0) {
+		eprintf("%m\n");
+		ret = SD_RES_EIO;
+		goto out;
+	}
+
+	if (flock(fd, LOCK_EX) < 0) {
+		ret = SD_RES_EIO;
+		eprintf("%m\n");
+		goto out_close;
+	}
+	size = xpwrite(fd, buf, count, offset);
+	if (flock(fd, LOCK_UN) < 0) {
+		ret = SD_RES_EIO;
+		eprintf("%m\n");
+		goto out_close;
+	}
+
+	if (size != count) {
+		eprintf("size %zu, count:%zu, offset %jd %m\n",
+			size, count, (intmax_t)offset);
+		ret = SD_RES_EIO;
+	}
+out_close:
+	close(fd);
+out:
+	strbuf_release(&p);
+	return ret;
+}
+
+static int read_cache_object(uint32_t vid, uint32_t idx, void *buf,
+			     size_t count, off_t offset)
+{
+	size_t size;
+	int fd, flags = def_open_flags, ret = SD_RES_SUCCESS;
+	struct strbuf p;
+
+	strbuf_init(&p, PATH_MAX);
+	strbuf_addstr(&p, cache_dir);
+	strbuf_addf(&p, "/%06"PRIx32"/%08"PRIx32, vid, idx);
+
+	if (sys->use_directio && !idx_has_vdi_bit(idx))
+		flags |= O_DIRECT;
+
+	fd = open(p.buf, flags, def_fmode);
+	if (fd < 0) {
+		eprintf("%m\n");
+		ret = SD_RES_EIO;
+		goto out;
+	}
+
+	if (flock(fd, LOCK_SH) < 0) {
+		ret = SD_RES_EIO;
+		eprintf("%m\n");
+		goto out_close;
+	}
+	size = xpread(fd, buf, count, offset);
+	if (flock(fd, LOCK_UN) < 0) {
+		ret = SD_RES_EIO;
+		eprintf("%m\n");
+		goto out_close;
+	}
+
+	if (size != count) {
+		eprintf("size %zu, count:%zu, offset %jd %m\n",
+			size, count, (intmax_t)offset);
+		ret = SD_RES_EIO;
+	}
+
+out_close:
+	close(fd);
+out:
+	strbuf_release(&p);
+	return ret;
+}
+
+static int push_cache_object(uint32_t vid, uint32_t idx, uint64_t bmap,
+			     int create)
+{
+	struct sd_req hdr;
+	void *buf;
+	off_t offset;
+	unsigned data_length;
+	int ret = SD_RES_NO_MEM;
+	uint64_t oid = idx_to_oid(vid, idx);
+	int first_bit, last_bit;
+
+	dprintf("%"PRIx64", create %d\n", oid, create);
+
+	if (!bmap) {
+		dprintf("WARN: nothing to flush\n");
+		return SD_RES_SUCCESS;
+	}
+
+	first_bit = ffsll(bmap) - 1;
+	last_bit = fls64(bmap) - 1;
+
+	dprintf("bmap:0x%"PRIx64", first_bit:%d, last_bit:%d\n",
+		bmap, first_bit, last_bit);
+	offset = first_bit * CACHE_BLOCK_SIZE;
+	data_length = (last_bit - first_bit + 1) * CACHE_BLOCK_SIZE;
+
+	/*
+	 * CACHE_BLOCK_SIZE may not be divisible by SD_INODE_SIZE,
+	 * so (offset + data_length) could larger than SD_INODE_SIZE
+	 */
+	if (is_vdi_obj(oid) && (offset + data_length) > SD_INODE_SIZE)
+		data_length = SD_INODE_SIZE - offset;
+
+	buf = valloc(data_length);
+	if (buf == NULL) {
+		eprintf("failed to allocate memory\n");
+		goto out;
+	}
+
+	ret = read_cache_object(vid, idx, buf, data_length, offset);
+	if (ret != SD_RES_SUCCESS)
+		goto out;
+
+	if (create)
+		sd_init_req(&hdr, SD_OP_CREATE_AND_WRITE_OBJ);
+	else
+		sd_init_req(&hdr, SD_OP_WRITE_OBJ);
+	hdr.flags = SD_FLAG_CMD_WRITE;
+	hdr.data_length = data_length;
+	hdr.obj.oid = oid;
+	hdr.obj.offset = offset;
+
+	ret = exec_local_req(&hdr, buf);
+	if (ret != SD_RES_SUCCESS)
+		eprintf("failed to push object %x\n", ret);
+
+out:
+	free(buf);
+	return ret;
+}
+
+static int check_cache_status(struct object_cache *oc,
+			      struct object_cache_entry *entry)
+{
+	if (cache_is_flushing(oc)) {
+		dprintf("cache %" PRIx32 " is flushing, don't reclaim it.\n",
+			oc->vid);
+		return SD_RES_CACHE_FLUSHING;
+	}
+
+	/* If entry is being accessed, we don't reclaim it */
+	if (uatomic_read(&entry->refcnt) > 0) {
+		dprintf("cache object %" PRIx32 "(%08" PRIx32 ") "
+			"can't be reclaimed, refcnt: %d\n",
+			oc->vid, entry->idx, uatomic_read(&entry->refcnt));
+		return SD_RES_CACHE_REFERENCING;
+	}
+
+	return SD_RES_SUCCESS;
+}
+
+static int reclaim_object(struct object_cache_entry *entry)
+{
+	struct object_cache *oc = entry->oc;
+	uint32_t idx = entry->idx;
+	int ret = SD_RES_SUCCESS;
+
+	pthread_rwlock_wrlock(&oc->lock);
+	dprintf("reclaiming /%06"PRIx32"/%08"PRIx32", cache_size: %ld\n",
+		oc->vid, idx, uatomic_read(&sys_cache.cache_size));
+
+	ret = check_cache_status(oc, entry);
+	if (ret != SD_RES_SUCCESS)
+		goto out;
+
+	if (entry_is_dirty(entry)) {
+		uint64_t bmap = entry->bmap;
+		int create = entry->flags & ENTRY_CREATE_BIT;
+
+		entry->bmap = 0;
+		del_from_dirty_tree_and_list(entry, &oc->dirty_tree);
+		pthread_rwlock_unlock(&oc->lock);
+
+		ret = push_cache_object(oc->vid, idx, bmap, create);
+
+		pthread_rwlock_wrlock(&oc->lock);
+		if (ret == SD_RES_SUCCESS) {
+			entry->flags &= ~ENTRY_CREATE_BIT;
+			/* dirty again */
+			if (entry_is_dirty(entry)) {
+				dprintf("object cache is dirty again %06" PRIx32 "\n", idx);
+				ret = SD_RES_CACHE_REFERENCING;
+				goto out;
+			}
+		} else {
+			/* still dirty */
+			entry->bmap |= bmap;
+			ret = SD_RES_CACHE_REFERENCING;
+			goto out;
+		}
+
+		/* Now we get lock again, check cache status again */
+		ret = check_cache_status(oc, entry);
+		if (ret != SD_RES_SUCCESS)
+			goto out;
+	}
+
+	mark_entry_reclaiming(entry);
+
+	ret = remove_cache_object(oc, idx);
+	if (ret == SD_RES_SUCCESS)
+		del_from_object_tree_and_list(entry, &oc->object_tree);
+out:
+	pthread_rwlock_unlock(&oc->lock);
+	return ret;
+}
+
+static void reclaim_work(struct work *work)
+{
+	struct object_cache_entry *entry, *n;
+	int ret;
+
+	if (node_in_recovery())
+		return;
+
+	list_for_each_entry_revert_safe_rcu(entry, n,
+		       &sys_cache.cache_lru_list, lru_list) {
+		/* Reclaim cache to 80% of max size */
+		if (uatomic_read(&sys_cache.cache_size) <=
+		    sys->cache_size * 8 / 10)
+			break;
+
+		ret = reclaim_object(entry);
+		if (ret == SD_RES_CACHE_FLUSHING)
+			/* If cache is flushing, stop reclaiming. */
+			break;
+
+		if (ret == SD_RES_SUCCESS) {
+			unsigned data_length;
+			if (idx_has_vdi_bit(entry->idx))
+				data_length = SD_INODE_SIZE;
+			else
+				data_length = SD_DATA_OBJ_SIZE;
+
+			uatomic_sub(&sys_cache.cache_size, data_length);
+			free(entry);
+		}
+	}
+
+	dprintf("cache reclaim complete\n");
+}
+
+static void reclaim_done(struct work *work)
+{
+	uatomic_set(&sys_cache.reclaiming, 0);
+	free(work);
+}
+
 static struct object_cache_entry *
 dirty_tree_insert(struct object_cache *oc, uint32_t idx,
 		  uint64_t bmap, int create)
@@ -183,6 +556,8 @@ dirty_tree_insert(struct object_cache *oc, uint32_t idx,
 		else {
 			/* already has this entry, merge bmap */
 			entry->bmap |= bmap;
+			if (create)
+				entry->flags |= ENTRY_CREATE_BIT;
 			return entry;
 		}
 	}
@@ -192,7 +567,8 @@ dirty_tree_insert(struct object_cache *oc, uint32_t idx,
 		return NULL;
 
 	entry->bmap |= bmap;
-	entry->flags |= ENTRY_CREATE_BIT;
+	if (create)
+		entry->flags |= ENTRY_CREATE_BIT;
 	rb_link_node(&entry->dirty_node, parent, p);
 	rb_insert_color(&entry->dirty_node, &oc->dirty_tree);
 	list_add(&entry->list, &oc->dirty_list);
@@ -200,26 +576,6 @@ dirty_tree_insert(struct object_cache *oc, uint32_t idx,
 	return entry;
 }
 
-static struct object_cache_entry *dirty_tree_search(struct rb_root *root,
-						    uint32_t idx)
-{
-	struct rb_node *n = root->rb_node;
-	struct object_cache_entry *t;
-
-	while (n) {
-		t = rb_entry(n, struct object_cache_entry, dirty_node);
-
-		if (idx < t->idx)
-			n = n->rb_left;
-		else if (idx > t->idx)
-			n = n->rb_right;
-		else
-			return t; /* found it */
-	}
-
-	return NULL;
-}
-
 static int create_dir_for(uint32_t vid)
 {
 	int ret = 0;
@@ -257,6 +613,7 @@ not_found:
 	if (create) {
 		cache = xzalloc(sizeof(*cache));
 		cache->vid = vid;
+		cache->object_tree = RB_ROOT;
 		create_dir_for(vid);
 
 		cache->dirty_tree = RB_ROOT;
@@ -271,14 +628,6 @@ out:
 	return cache;
 }
 
-static inline void
-del_from_dirty_tree_and_list(struct object_cache_entry *entry,
-			     struct rb_root *dirty_tree)
-{
-	rb_erase(&entry->dirty_node, dirty_tree);
-	list_del(&entry->list);
-}
-
 /* Caller should hold the oc->lock */
 static inline void
 add_to_dirty_tree_and_list(struct object_cache *oc, uint32_t idx,
@@ -289,6 +638,9 @@ add_to_dirty_tree_and_list(struct object_cache *oc, uint32_t idx,
 	if (!entry)
 		panic("Can not find object entry %" PRIx32 "\n", idx);
 
+	if (cache_in_reclaim(0))
+		return;
+
 	/* If cache isn't in reclaiming, move it
 	 * to the head of lru list */
 	cds_list_del_rcu(&entry->lru_list);
@@ -312,15 +664,18 @@ static void add_to_object_cache(struct object_cache *oc, uint32_t idx)
 	uint32_t data_length;
 
 	if (idx_has_vdi_bit(idx))
-		data_length = SD_INODE_SIZE / 1024;
+		data_length = SD_INODE_SIZE;
 	else
-		data_length = SD_DATA_OBJ_SIZE / 1024;
+		data_length = SD_DATA_OBJ_SIZE;
 
 	entry = xzalloc(sizeof(*entry));
 	entry->oc = oc;
 	entry->idx = idx;
 	CDS_INIT_LIST_HEAD(&entry->lru_list);
 
+	dprintf("cache object for vdi %" PRIx32 ", idx %08" PRIx32 "added\n",
+		oc->vid, idx);
+
 	pthread_rwlock_wrlock(&oc->lock);
 	old = object_cache_insert(&oc->object_tree, entry);
 	if (!old) {
@@ -331,20 +686,51 @@ static void add_to_object_cache(struct object_cache *oc, uint32_t idx)
 		entry = old;
 	}
 	pthread_rwlock_unlock(&oc->lock);
+
+	dprintf("sys_cache.cache_size %" PRIx64 ", sys->cache_size %" PRIx64 "\n",
+		uatomic_read(&sys_cache.cache_size), sys->cache_size);
+	if (sys->cache_size &&
+	    uatomic_read(&sys_cache.cache_size) > sys->cache_size &&
+	    !cache_in_reclaim(1)) {
+		struct work *work = xzalloc(sizeof(struct work));
+		work->fn = reclaim_work;
+		work->done = reclaim_done;
+		queue_work(sys->reclaim_wqueue, work);
+	}
+}
+
+static inline struct object_cache_entry *
+find_cache_entry(struct object_cache *oc, uint32_t idx)
+{
+	struct object_cache_entry *entry;
+
+	entry = object_tree_search(&oc->object_tree, idx);
+	if (!entry || entry_is_reclaiming(entry))
+		return NULL;
+
+	return entry;
 }
 
 static int object_cache_lookup(struct object_cache *oc, uint32_t idx,
 			       int create)
 {
 	struct strbuf buf;
-	int fd, ret = 0, flags = def_open_flags;
+	int fd, ret = SD_RES_SUCCESS, flags = def_open_flags;
+	unsigned data_length;
+
+	if (!create) {
+		pthread_rwlock_wrlock(&oc->lock);
+		if (!find_cache_entry(oc, idx))
+			ret = SD_RES_NO_CACHE;
+		pthread_rwlock_unlock(&oc->lock);
+		return ret;
+	}
 
 	strbuf_init(&buf, PATH_MAX);
 	strbuf_addstr(&buf, cache_dir);
 	strbuf_addf(&buf, "/%06"PRIx32"/%08"PRIx32, oc->vid, idx);
 
-	if (create)
-		flags |= O_CREAT | O_TRUNC;
+	flags |= O_CREAT | O_TRUNC;
 
 	fd = open(buf.buf, flags, def_fmode);
 	if (fd < 0) {
@@ -352,26 +738,22 @@ static int object_cache_lookup(struct object_cache *oc, uint32_t idx,
 		goto out;
 	}
 
-	if (create) {
-		unsigned data_length;
-
-		if (idx_has_vdi_bit(idx))
-			data_length = SD_INODE_SIZE;
-		else
-			data_length = SD_DATA_OBJ_SIZE;
+	if (idx_has_vdi_bit(idx))
+		data_length = SD_INODE_SIZE;
+	else
+		data_length = SD_DATA_OBJ_SIZE;
 
-		ret = prealloc(fd, data_length);
-		if (ret != SD_RES_SUCCESS)
-			ret = -1;
-		else {
-			uint64_t bmap = UINT64_MAX;
+	ret = prealloc(fd, data_length);
+	if (ret != SD_RES_SUCCESS)
+		ret = -1;
+	else {
+		uint64_t bmap = UINT64_MAX;
 
-			add_to_object_cache(oc, idx);
+		add_to_object_cache(oc, idx);
 
-			pthread_rwlock_wrlock(&oc->lock);
-			add_to_dirty_tree_and_list(oc, idx, bmap, 1);
-			pthread_rwlock_unlock(&oc->lock);
-		}
+		pthread_rwlock_wrlock(&oc->lock);
+		add_to_dirty_tree_and_list(oc, idx, bmap, 1);
+		pthread_rwlock_unlock(&oc->lock);
 	}
 	close(fd);
 out:
@@ -379,97 +761,6 @@ out:
 	return ret;
 }
 
-static int write_cache_object(uint32_t vid, uint32_t idx, void *buf,
-			      size_t count, off_t offset)
-{
-	size_t size;
-	int fd, flags = def_open_flags, ret = SD_RES_SUCCESS;
-	struct strbuf p;
-
-	strbuf_init(&p, PATH_MAX);
-	strbuf_addstr(&p, cache_dir);
-	strbuf_addf(&p, "/%06"PRIx32"/%08"PRIx32, vid, idx);
-
-	if (sys->use_directio && !idx_has_vdi_bit(idx))
-		flags |= O_DIRECT;
-
-	fd = open(p.buf, flags, def_fmode);
-	if (fd < 0) {
-		eprintf("%m\n");
-		ret = SD_RES_EIO;
-		goto out;
-	}
-
-	if (flock(fd, LOCK_EX) < 0) {
-		ret = SD_RES_EIO;
-		eprintf("%m\n");
-		goto out_close;
-	}
-	size = xpwrite(fd, buf, count, offset);
-	if (flock(fd, LOCK_UN) < 0) {
-		ret = SD_RES_EIO;
-		eprintf("%m\n");
-		goto out_close;
-	}
-
-	if (size != count) {
-		eprintf("size %zu, count:%zu, offset %jd %m\n",
-			size, count, (intmax_t)offset);
-		ret = SD_RES_EIO;
-	}
-out_close:
-	close(fd);
-out:
-	strbuf_release(&p);
-	return ret;
-}
-
-static int read_cache_object(uint32_t vid, uint32_t idx, void *buf,
-			     size_t count, off_t offset)
-{
-	size_t size;
-	int fd, flags = def_open_flags, ret = SD_RES_SUCCESS;
-	struct strbuf p;
-
-	strbuf_init(&p, PATH_MAX);
-	strbuf_addstr(&p, cache_dir);
-	strbuf_addf(&p, "/%06"PRIx32"/%08"PRIx32, vid, idx);
-
-	if (sys->use_directio && !idx_has_vdi_bit(idx))
-		flags |= O_DIRECT;
-
-	fd = open(p.buf, flags, def_fmode);
-	if (fd < 0) {
-		eprintf("%m\n");
-		ret = SD_RES_EIO;
-		goto out;
-	}
-
-	if (flock(fd, LOCK_SH) < 0) {
-		ret = SD_RES_EIO;
-		eprintf("%m\n");
-		goto out_close;
-	}
-	size = xpread(fd, buf, count, offset);
-	if (flock(fd, LOCK_UN) < 0) {
-		ret = SD_RES_EIO;
-		eprintf("%m\n");
-		goto out_close;
-	}
-
-	if (size != count) {
-		eprintf("size %zu, count:%zu, offset %jd %m\n",
-			size, count, (intmax_t)offset);
-		ret = SD_RES_EIO;
-	}
-
-out_close:
-	close(fd);
-out:
-	strbuf_release(&p);
-	return ret;
-}
-
 static int create_cache_object(struct object_cache *oc, uint32_t idx,
 			       void *buffer, size_t buf_size)
 {
@@ -558,75 +849,6 @@ out:
 	return ret;
 }
 
-static uint64_t idx_to_oid(uint32_t vid, uint32_t idx)
-{
-	if (idx_has_vdi_bit(idx))
-		return vid_to_vdi_oid(vid);
-	else
-		return vid_to_data_oid(vid, idx);
-}
-
-static int push_cache_object(uint32_t vid, uint32_t idx, uint64_t bmap,
-			     int create)
-{
-	struct sd_req hdr;
-	void *buf;
-	off_t offset;
-	unsigned data_length;
-	int ret = SD_RES_NO_MEM;
-	uint64_t oid = idx_to_oid(vid, idx);
-	int first_bit, last_bit;
-
-	dprintf("%"PRIx64", create %d\n", oid, create);
-
-	if (!bmap) {
-		dprintf("WARN: nothing to flush\n");
-		return SD_RES_SUCCESS;
-	}
-
-	first_bit = ffsll(bmap) - 1;
-	last_bit = fls64(bmap) - 1;
-
-	dprintf("bmap:0x%"PRIx64", first_bit:%d, last_bit:%d\n",
-		bmap, first_bit, last_bit);
-	offset = first_bit * CACHE_BLOCK_SIZE;
-	data_length = (last_bit - first_bit + 1) * CACHE_BLOCK_SIZE;
-
-	/*
-	 * CACHE_BLOCK_SIZE may not be divisible by SD_INODE_SIZE,
-	 * so (offset + data_length) could larger than SD_INODE_SIZE
-	 */
-	if (is_vdi_obj(oid) && (offset + data_length) > SD_INODE_SIZE)
-		data_length = SD_INODE_SIZE - offset;
-
-	buf = valloc(data_length);
-	if (buf == NULL) {
-		eprintf("failed to allocate memory\n");
-		goto out;
-	}
-
-	ret = read_cache_object(vid, idx, buf, data_length, offset);
-	if (ret != SD_RES_SUCCESS)
-		goto out;
-
-	if (create)
-		sd_init_req(&hdr, SD_OP_CREATE_AND_WRITE_OBJ);
-	else
-		sd_init_req(&hdr, SD_OP_WRITE_OBJ);
-	hdr.flags = SD_FLAG_CMD_WRITE;
-	hdr.data_length = data_length;
-	hdr.obj.oid = oid;
-	hdr.obj.offset = offset;
-
-	ret = exec_local_req(&hdr, buf);
-	if (ret != SD_RES_SUCCESS)
-		eprintf("failed to push object %x\n", ret);
-
-out:
-	free(buf);
-	return ret;
-}
-
 /* Push back all the dirty objects to sheep cluster storage */
 static int object_cache_push(struct object_cache *oc)
 {
@@ -647,20 +869,23 @@ static int object_cache_push(struct object_cache *oc)
 		pthread_rwlock_rdlock(&oc->lock);
 		bmap = entry->bmap;
 		create = entry->flags & ENTRY_CREATE_BIT;
+		entry->bmap = 0;
 		pthread_rwlock_unlock(&oc->lock);
 
 		ret = push_cache_object(oc->vid, entry->idx, bmap, create);
-		if (ret != SD_RES_SUCCESS)
-			goto push_failed;
 
 		pthread_rwlock_wrlock(&oc->lock);
+		if (ret != SD_RES_SUCCESS) {
+			entry->bmap |= bmap;
+			goto push_failed;
+		}
+		entry->flags &= ~ENTRY_CREATE_BIT;
 		del_from_dirty_tree_and_list(entry, &oc->dirty_tree);
 		pthread_rwlock_unlock(&oc->lock);
 	}
 	return ret;
 
 push_failed:
-	pthread_rwlock_wrlock(&oc->lock);
 	list_splice_init(&inactive_dirty_list, &oc->dirty_list);
 	pthread_rwlock_unlock(&oc->lock);
 
@@ -677,10 +902,10 @@ int object_is_cached(uint64_t oid)
 	if (!cache)
 		return 0;
 
-	if (object_cache_lookup(cache, idx, 0) < 0)
-		return 0;
+	if (object_cache_lookup(cache, idx, 0) == SD_RES_SUCCESS)
+		return 1;
 	else
-		return 1; /* found it */
+		return 0;
 }
 
 void object_cache_delete(uint32_t vid)
@@ -712,6 +937,44 @@ void object_cache_delete(uint32_t vid)
 
 }
 
+static void object_cache_flush_begin(struct object_cache *oc)
+{
+	pthread_rwlock_wrlock(&oc->lock);
+	oc->in_flush = 1;
+	pthread_rwlock_unlock(&oc->lock);
+}
+
+static void object_cache_flush_end(struct object_cache *oc)
+{
+	pthread_rwlock_wrlock(&oc->lock);
+	oc->in_flush = 0;
+	pthread_rwlock_unlock(&oc->lock);
+}
+
+static struct object_cache_entry *
+get_cache_entry(struct object_cache *cache, uint32_t idx)
+{
+	struct object_cache_entry *entry;
+
+	pthread_rwlock_rdlock(&cache->lock);
+	entry = find_cache_entry(cache, idx);
+	if (!entry) {
+		/* The cache entry may be reclaimed, so try again. */
+		pthread_rwlock_unlock(&cache->lock);
+		return NULL;
+	}
+
+	uatomic_inc(&entry->refcnt);
+	pthread_rwlock_unlock(&cache->lock);
+
+	return entry;
+}
+
+static void put_cache_entry(struct object_cache_entry *entry)
+{
+	uatomic_dec(&entry->refcnt);
+}
+
 static int object_cache_flush_and_delete(struct object_cache *oc)
 {
 	DIR *dir;
@@ -722,6 +985,8 @@ static int object_cache_flush_and_delete(struct object_cache *oc)
 	struct strbuf p;
 	int ret = 0;
 
+	object_cache_flush_begin(oc);
+
 	strbuf_init(&p, PATH_MAX);
 	strbuf_addstr(&p, cache_dir);
 	strbuf_addf(&p, "/%06"PRIx32, vid);
@@ -750,6 +1015,8 @@ static int object_cache_flush_and_delete(struct object_cache *oc)
 	}
 
 	object_cache_delete(vid);
+
+	object_cache_flush_end(oc);
 out:
 	strbuf_release(&p);
 	return ret;
@@ -773,10 +1040,10 @@ int bypass_object_cache(struct request *req)
 			/* For read requet, we can read cache if any */
 			uint32_t idx = object_cache_oid_to_idx(oid);
 
-			if (object_cache_lookup(cache, idx, 0) < 0)
-				return 1;
-			else
+			if (object_cache_lookup(cache, idx, 0) == 0)
 				return 0;
+			else
+				return 1;
 		}
 	}
 
@@ -807,37 +1074,43 @@ int object_cache_handle_request(struct request *req)
 	if (req->rq.opcode == SD_OP_CREATE_AND_WRITE_OBJ)
 		create = 1;
 
-	if (object_cache_lookup(cache, idx, create) < 0) {
+retry:
+	ret = object_cache_lookup(cache, idx, create);
+	if (ret == SD_RES_NO_CACHE) {
 		ret = object_cache_pull(cache, idx);
 		if (ret != SD_RES_SUCCESS)
 			return ret;
 	}
 
-	pthread_rwlock_rdlock(&cache->lock);
-	entry = object_tree_search(&cache->object_tree, idx);
-	pthread_rwlock_unlock(&cache->lock);
+	entry = get_cache_entry(cache, idx);
+	if (!entry) {
+		dprintf("cache entry %"PRIx32"/%"PRIx32" may be reclaimed\n",
+			vid, idx);
+		goto retry;
+	}
 
 	if (hdr->flags & SD_FLAG_CMD_WRITE) {
 		ret = write_cache_object(cache->vid, idx, req->data,
 					 hdr->data_length, hdr->obj.offset);
 		if (ret != SD_RES_SUCCESS)
-			goto out;
+			goto err;
 		update_cache_entry(cache, idx, hdr->data_length,
 				   hdr->obj.offset);
 	} else {
 		ret = read_cache_object(cache->vid, idx, req->data,
 					hdr->data_length, hdr->obj.offset);
 		if (ret != SD_RES_SUCCESS)
-			goto out;
+			goto err;
 		req->rp.data_length = hdr->data_length;
 
-		if (entry) {
+		if (entry && !cache_in_reclaim(0)) {
 			cds_list_del_rcu(&entry->lru_list);
 			cds_list_add_rcu(&entry->lru_list,
 					 &sys_cache.cache_lru_list);
 		}
 	}
-out:
+err:
+	put_cache_entry(entry);
 	return ret;
 }
 
@@ -847,13 +1120,23 @@ int object_cache_write(uint64_t oid, char *data, unsigned int datalen,
 	uint32_t vid = oid_to_vid(oid);
 	uint32_t idx = object_cache_oid_to_idx(oid);
 	struct object_cache *cache;
+	struct object_cache_entry *entry;
 	int ret;
 
 	cache = find_object_cache(vid, 0);
 
+	dprintf("cache object write %" PRIx32 "\n", idx);
+
+	entry = get_cache_entry(cache, idx);
+	if (!entry)
+		panic("cache object %" PRIx32 " doesn't exist\n", idx);
+
 	ret = write_cache_object(vid, idx, data, datalen, offset);
 	if (ret == SD_RES_SUCCESS)
 		update_cache_entry(cache, idx, datalen, offset);
+
+	put_cache_entry(entry);
+
 	return ret;
 }
 
@@ -862,20 +1145,40 @@ int object_cache_read(uint64_t oid, char *data, unsigned int datalen,
 {
 	uint32_t vid = oid_to_vid(oid);
 	uint32_t idx = object_cache_oid_to_idx(oid);
+	struct object_cache *cache;
+	struct object_cache_entry *entry;
+	int ret;
+
+	cache = find_object_cache(vid, 0);
+
+	dprintf("cache object read %" PRIx32 "\n", idx);
+
+	entry = get_cache_entry(cache, idx);
+	if (!entry)
+		panic("cache object %" PRIx32 " doesn't exist\n", idx);
+
+	ret = read_cache_object(vid, idx, data, datalen, offset);
+
+	put_cache_entry(entry);
 
-	return read_cache_object(vid, idx, data, datalen, offset);
+	return ret;
 }
 
 int object_cache_flush_vdi(struct request *req)
 {
 	uint32_t vid = oid_to_vid(req->rq.obj.oid);
 	struct object_cache *cache;
+	int ret;
 
 	cache = find_object_cache(vid, 0);
 	if (!cache)
 		return SD_RES_SUCCESS;
 
-	return object_cache_push(cache);
+	object_cache_flush_begin(cache);
+	ret = object_cache_push(cache);
+	object_cache_flush_end(cache);
+
+	return ret;
 }
 
 int object_cache_flush_and_del(struct request *req)
@@ -884,8 +1187,10 @@ int object_cache_flush_and_del(struct request *req)
 	struct object_cache *cache;
 
 	cache = find_object_cache(vid, 0);
+
 	if (cache && object_cache_flush_and_delete(cache) < 0)
 		return SD_RES_EIO;
+
 	return SD_RES_SUCCESS;
 }
 
@@ -999,6 +1304,7 @@ int object_cache_init(const char *p)
 
 	CDS_INIT_LIST_HEAD(&sys_cache.cache_lru_list);
 	uatomic_set(&sys_cache.cache_size, 0);
+	uatomic_set(&sys_cache.reclaiming, 0);
 
 	ret = load_existing_cache();
 err:
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 9f371ba..d17884d 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -356,8 +356,9 @@ int main(int argc, char **argv)
 	sys->deletion_wqueue = init_work_queue("deletion", true);
 	sys->block_wqueue = init_work_queue("block", true);
 	sys->sockfd_wqueue = init_work_queue("sockfd", true);
+	sys->reclaim_wqueue = init_work_queue("reclaim", true);
 	if (!sys->gateway_wqueue || !sys->io_wqueue ||!sys->recovery_wqueue ||
-	    !sys->deletion_wqueue || !sys->block_wqueue)
+	    !sys->deletion_wqueue || !sys->block_wqueue || !sys->reclaim_wqueue)
 		exit(1);
 
 	ret = trace_init();
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index fe61411..2090d67 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -125,6 +125,7 @@ struct cluster_info {
 	struct work_queue *recovery_wqueue;
 	struct work_queue *block_wqueue;
 	struct work_queue *sockfd_wqueue;
+	struct work_queue *reclaim_wqueue;
 };
 
 struct siocb {
diff --git a/sheep/store.c b/sheep/store.c
index a05822d..4a65c9e 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -492,9 +492,13 @@ int write_object(uint64_t oid, char *data, unsigned int datalen,
 	struct sd_req hdr;
 	int ret;
 
+retry:
 	if (sys->enable_write_cache && object_is_cached(oid)) {
 		ret = object_cache_write(oid, data, datalen, offset,
 					 flags, create);
+		if (ret == SD_RES_NO_CACHE)
+			goto retry;
+
 		if (ret != 0) {
 			eprintf("write cache failed %"PRIx64" %"PRIx32"\n",
 				oid, ret);
@@ -529,8 +533,12 @@ int read_object(uint64_t oid, char *data, unsigned int datalen,
 	struct sd_req hdr;
 	int ret;
 
+retry:
 	if (sys->enable_write_cache && object_is_cached(oid)) {
 		ret = object_cache_read(oid, data, datalen, offset);
+		if (ret == SD_RES_NO_CACHE)
+			goto retry;
+
 		if (ret != SD_RES_SUCCESS) {
 			eprintf("try forward read %"PRIx64" %"PRIx32"\n",
 				oid, ret);
-- 
1.7.1




More information about the sheepdog mailing list