[sheepdog] [PATCH UPDATE] object cache: reclaim cached objects when cache reaches the max size

levin li levin108 at gmail.com
Fri Jul 27 11:10:31 CEST 2012


From: levin li <xingke.lwp at taobao.com>

This patch does the reclaiming work when the total size of cached objects
reaches the max size specified by the user. I did it in the following way:

1. Check the object tree for the object entry to determine whether the
   cache entry exists and whether it is being reclaimed; if it is being
   reclaimed, we make sheep ignore the cache.
2. In object_cache_rw() we search for the cache entry; after it passes the
   sanity check, we increment its refcnt to tell the reclaiming worker that
   this entry is being referenced and should not be reclaimed now.
3. In add_to_object_cache(), when the cache size reaches the max size, we
   start a reclaiming thread; only one such thread can be running at a
   time (see the sketch after this list).
4. In reclaim_work(), we reclaim cached objects until the cache size is
   reduced to 80% of the max size.
5. In reclaim_object(), we start to reclaim an object. Before doing so, we
   check whether the cache is being flushed; if it is, we don't reclaim the
   object, and if the object's refcnt is non-zero, we don't reclaim it
   either.
   If the cached object is dirty, we flush it with push_cache_object() and
   then try to remove the object.
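
Below is a minimal, self-contained sketch of the gating logic described in
points 2, 3 and 5. It is illustrative only, not the patch code: struct
entry, start_reclaimer() and try_reclaim_entry() are hypothetical names,
while uatomic_cmpxchg()/uatomic_read() are the userspace-RCU atomics the
patch actually uses.

#include <stdio.h>
#include <stdint.h>
#include <urcu/uatomic.h>

struct entry {
	int refcnt;    /* requests currently holding the entry (point 2) */
	uint64_t bmap; /* non-zero means dirty blocks await flushing */
};

static int reclaiming; /* 0: idle, 1: a reclaim worker is running */

/* Point 3: only the caller that flips 0 -> 1 may queue the worker. */
static int start_reclaimer(void)
{
	return uatomic_cmpxchg(&reclaiming, 0, 1) == 0;
}

/* Points 2 and 5: referenced or flush-owned entries must be skipped. */
static int try_reclaim_entry(struct entry *e, int in_flush)
{
	if (uatomic_read(&e->refcnt) > 0)
		return -1; /* a request still holds it; try again later */
	if (e->bmap && in_flush)
		return -1; /* the flusher owns the dirty state right now */
	/* otherwise: flush if dirty, then unlink and free the object */
	return 0;
}

int main(void)
{
	struct entry e = { .refcnt = 0, .bmap = 0 };

	if (start_reclaimer())
		printf("reclaimable: %s\n",
		       try_reclaim_entry(&e, 0) == 0 ? "yes" : "no");
	return 0;
}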

Signed-off-by: levin li <xingke.lwp at taobao.com>
---
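
A note on the new list.h helper below: list_for_each_entry_revert_safe_rcu()
walks an RCU-protected cds_list from the tail toward the head, so
reclaim_work() visits the coldest (least recently used) entries first, and
fetching the previous element into n before pos is processed makes it safe
to unlink pos during the walk. A hypothetical usage sketch follows (struct
obj and the lru head are illustrative names; it assumes the macro from this
patch's include/list.h, and a stricter RCU user would defer the free past a
grace period instead of calling free() directly, as the sketch and the
patch both do):

#include <stdlib.h>
#include <urcu.h>
#include <urcu/rculist.h>

struct obj {
	struct cds_list_head lru_list;
};

static CDS_LIST_HEAD(lru); /* hottest at head, coldest at tail */

static void reclaim_coldest(void)
{
	struct obj *pos, *n;

	/* tail-first and deletion-safe: n already points at the
	 * next-coldest entry when pos is unlinked below */
	list_for_each_entry_revert_safe_rcu(pos, n, &lru, lru_list) {
		cds_list_del_rcu(&pos->lru_list);
		free(pos); /* sketch only; see the grace-period caveat above */
	}
}

int main(void)
{
	struct obj *o = calloc(1, sizeof(*o));

	cds_list_add_rcu(&o->lru_list, &lru);
	reclaim_coldest();
	return 0;
}
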
 include/list.h           |   10 +
 include/sheepdog_proto.h |    1 +
 sheep/object_cache.c     |  754 +++++++++++++++++++++++++++++++---------------
 sheep/sheep.c            |    3 +-
 sheep/sheep_priv.h       |    1 +
 sheep/store.c            |    8 +
 6 files changed, 535 insertions(+), 242 deletions(-)

diff --git a/include/list.h b/include/list.h
index 30ee3c4..e1d645d 100644
--- a/include/list.h
+++ b/include/list.h
@@ -259,3 +259,13 @@ static inline void hlist_add_after(struct hlist_node *n,
              pos = n)
 
 #endif
+
+
+#define list_for_each_entry_revert_safe_rcu(pos, n, head, member)          \
+	for (pos = cds_list_entry(rcu_dereference((head)->prev),           \
+				  typeof(*pos), member),                   \
+	     n = cds_list_entry(rcu_dereference(pos->member.prev),         \
+				typeof(*pos), member);                     \
+				&pos->member != (head);                    \
+	     pos = n, n = cds_list_entry(rcu_dereference(pos->member.prev),\
+					 typeof(*pos), member))
diff --git a/include/sheepdog_proto.h b/include/sheepdog_proto.h
index 45a4b81..05597fb 100644
--- a/include/sheepdog_proto.h
+++ b/include/sheepdog_proto.h
@@ -68,6 +68,7 @@
 #define SD_RES_CLUSTER_RECOVERING 0x22 /* Cluster is recovering. */
 #define SD_RES_OBJ_RECOVERING     0x23 /* Object is recovering */
 #define SD_RES_KILLED           0x24 /* Node is killed */
+#define SD_RES_NO_CACHE      0x25 /* No cache object found */
 
 /* errors above 0x80 are sheepdog-internal */
 
diff --git a/sheep/object_cache.c b/sheep/object_cache.c
index 19480c2..7711bef 100644
--- a/sheep/object_cache.c
+++ b/sheep/object_cache.c
@@ -40,7 +40,13 @@
 #define CACHE_VDI_BIT         (UINT32_C(1) << CACHE_VDI_SHIFT)
 #define CACHE_BLOCK_SIZE      ((UINT64_C(1) << 10) * 64) /* 64 KB */
 
-#define ENTRY_CREATE_BIT      (1)
+#define CACHE_RECLAIM_SHIFT   27
+#define CACHE_RECLAIM_BIT     (UINT32_C(1) << CACHE_RECLAIM_SHIFT)
+
+#define CACHE_CREATE_SHIFT    26
+#define CACHE_CREATE_BIT      (UINT32_C(1) << CACHE_CREATE_SHIFT)
+
+#define CACHE_INDEX_MASK      (CACHE_RECLAIM_BIT | CACHE_CREATE_BIT)
 
 struct global_cache {
 	uint64_t cache_size;
@@ -50,6 +56,7 @@ struct global_cache {
 
 struct object_cache {
 	uint32_t vid;
+	uint8_t in_flush;
 	struct hlist_node hash;
 
 	struct list_head dirty_list;
@@ -64,7 +71,6 @@ struct object_cache_entry {
 	uint64_t bmap; /* each bit represents one dirty
 			* block which should be flushed */
 	int refcnt;
-	int flags;
 	struct rb_node node;
 	struct rb_node dirty_node;
 	struct object_cache *oc;
@@ -85,11 +91,29 @@ static pthread_mutex_t hashtable_lock[HASH_SIZE] = {
 
 static struct hlist_head cache_hashtable[HASH_SIZE];
 
+static inline int cache_in_reclaim(int start)
+{
+	if (start)
+		return uatomic_cmpxchg(&sys_cache.reclaiming, 0, 1);
+	else
+		return uatomic_read(&sys_cache.reclaiming);
+}
+
+static inline int entry_is_dirty(struct object_cache_entry *entry)
+{
+	return !!entry->bmap;
+}
+
 static inline int hash(uint64_t vid)
 {
 	return hash_64(vid, HASH_BITS);
 }
 
+static inline uint32_t idx_mask(uint32_t idx)
+{
+	return idx & ~CACHE_INDEX_MASK;
+}
+
 static inline uint32_t object_cache_oid_to_idx(uint64_t oid)
 {
 	uint32_t idx = data_oid_to_idx(oid);
@@ -124,14 +148,15 @@ object_cache_insert(struct rb_root *root, struct object_cache_entry *new)
 	struct rb_node **p = &root->rb_node;
 	struct rb_node *parent = NULL;
 	struct object_cache_entry *entry;
+	uint32_t idx = idx_mask(new->idx);
 
 	while (*p) {
 		parent = *p;
 		entry = rb_entry(parent, struct object_cache_entry, node);
 
-		if (new->idx < entry->idx)
+		if (idx < idx_mask(entry->idx))
 			p = &(*p)->rb_left;
-		else if (new->idx > entry->idx)
+		else if (idx > idx_mask(entry->idx))
 			p = &(*p)->rb_right;
 		else {
 			/* already has this entry */
@@ -149,13 +174,14 @@ static struct object_cache_entry *object_tree_search(struct rb_root *root,
 {
 	struct rb_node *n = root->rb_node;
 	struct object_cache_entry *t;
+	idx = idx_mask(idx);
 
 	while (n) {
 		t = rb_entry(n, struct object_cache_entry, node);
 
-		if (idx < t->idx)
+		if (idx < idx_mask(t->idx))
 			n = n->rb_left;
-		else if (idx > t->idx)
+		else if (idx > idx_mask(t->idx))
 			n = n->rb_right;
 		else
 			return t; /* found it */
@@ -164,6 +190,339 @@ static struct object_cache_entry *object_tree_search(struct rb_root *root,
 	return NULL;
 }
 
+static struct object_cache_entry *dirty_tree_search(struct rb_root *root,
+						    uint32_t idx)
+{
+	struct rb_node *n = root->rb_node;
+	struct object_cache_entry *t;
+	idx = idx_mask(idx);
+
+	while (n) {
+		t = rb_entry(n, struct object_cache_entry, dirty_node);
+
+		if (idx < idx_mask(t->idx))
+			n = n->rb_left;
+		else if (idx > idx_mask(t->idx))
+			n = n->rb_right;
+		else
+			return t; /* found it */
+	}
+
+	return NULL;
+}
+
+static inline void
+del_from_dirty_tree_and_list(struct object_cache_entry *entry,
+			     struct rb_root *dirty_tree)
+{
+	rb_erase(&entry->dirty_node, dirty_tree);
+	list_del(&entry->list);
+}
+
+static inline void
+del_from_object_tree_and_list(struct object_cache_entry *entry,
+			      struct rb_root *object_tree)
+{
+	rb_erase(&entry->node, object_tree);
+	cds_list_del_rcu(&entry->lru_list);
+}
+
+static uint64_t cache_vid_to_data_oid(uint32_t vid, uint32_t idx)
+{
+	idx = idx_mask(idx);
+
+	return vid_to_data_oid(vid, idx);
+}
+
+static uint64_t idx_to_oid(uint32_t vid, uint32_t idx)
+{
+	if (idx_has_vdi_bit(idx))
+		return vid_to_vdi_oid(vid);
+	else
+		return cache_vid_to_data_oid(vid, idx);
+}
+
+static int remove_cache_object(struct object_cache *oc, uint32_t idx)
+{
+	struct strbuf buf;
+	int ret = SD_RES_SUCCESS;
+
+	idx = idx_mask(idx);
+
+	strbuf_init(&buf, PATH_MAX);
+	strbuf_addstr(&buf, cache_dir);
+	strbuf_addf(&buf, "/%06"PRIx32"/%08"PRIx32, oc->vid, idx);
+
+	dprintf("removing cache object %s\n", buf.buf);
+	if (unlink(buf.buf) < 0) {
+		ret = SD_RES_EIO;
+		eprintf("failed to remove cached object %m\n");
+		goto out;
+	}
+out:
+	strbuf_release(&buf);
+
+	return ret;
+}
+
+static int write_cache_object(uint32_t vid, uint32_t idx, void *buf,
+			      size_t count, off_t offset)
+{
+	size_t size;
+	int fd, flags = def_open_flags, ret = SD_RES_SUCCESS;
+	struct strbuf p;
+
+	strbuf_init(&p, PATH_MAX);
+	strbuf_addstr(&p, cache_dir);
+	strbuf_addf(&p, "/%06"PRIx32"/%08"PRIx32, vid, idx);
+
+	if (sys->use_directio && !idx_has_vdi_bit(idx))
+		flags |= O_DIRECT;
+
+	fd = open(p.buf, flags, def_fmode);
+	if (fd < 0) {
+		eprintf("%m\n");
+		ret = SD_RES_EIO;
+		goto out;
+	}
+
+	if (flock(fd, LOCK_EX) < 0) {
+		ret = SD_RES_EIO;
+		eprintf("%m\n");
+		goto out_close;
+	}
+	size = xpwrite(fd, buf, count, offset);
+	if (flock(fd, LOCK_UN) < 0) {
+		ret = SD_RES_EIO;
+		eprintf("%m\n");
+		goto out_close;
+	}
+
+	if (size != count) {
+		eprintf("size %zu, count:%zu, offset %jd %m\n",
+			size, count, (intmax_t)offset);
+		ret = SD_RES_EIO;
+	}
+out_close:
+	close(fd);
+out:
+	strbuf_release(&p);
+	return ret;
+}
+
+static int read_cache_object(uint32_t vid, uint32_t idx, void *buf,
+			     size_t count, off_t offset)
+{
+	size_t size;
+	int fd, flags = def_open_flags, ret = SD_RES_SUCCESS;
+	struct strbuf p;
+
+	strbuf_init(&p, PATH_MAX);
+	strbuf_addstr(&p, cache_dir);
+	strbuf_addf(&p, "/%06"PRIx32"/%08"PRIx32, vid, idx);
+
+	if (sys->use_directio && !idx_has_vdi_bit(idx))
+		flags |= O_DIRECT;
+
+	fd = open(p.buf, flags, def_fmode);
+	if (fd < 0) {
+		eprintf("%m\n");
+		ret = SD_RES_EIO;
+		goto out;
+	}
+
+	if (flock(fd, LOCK_SH) < 0) {
+		ret = SD_RES_EIO;
+		eprintf("%m\n");
+		goto out_close;
+	}
+	size = xpread(fd, buf, count, offset);
+	if (flock(fd, LOCK_UN) < 0) {
+		ret = SD_RES_EIO;
+		eprintf("%m\n");
+		goto out_close;
+	}
+
+	if (size != count) {
+		eprintf("size %zu, count:%zu, offset %jd %m\n",
+			size, count, (intmax_t)offset);
+		ret = SD_RES_EIO;
+	}
+
+out_close:
+	close(fd);
+out:
+	strbuf_release(&p);
+	return ret;
+}
+
+static int push_cache_object(uint32_t vid, uint32_t idx, uint64_t bmap,
+			     int create)
+{
+	struct sd_req hdr;
+	void *buf;
+	off_t offset;
+	unsigned data_length;
+	int ret = SD_RES_NO_MEM;
+	uint64_t oid = idx_to_oid(vid, idx);
+	int first_bit, last_bit;
+
+	dprintf("%"PRIx64", create %d\n", oid, create);
+
+	idx = idx_mask(idx);
+
+	if (!bmap) {
+		dprintf("WARN: nothing to flush\n");
+		return SD_RES_SUCCESS;
+	}
+
+	first_bit = ffsll(bmap) - 1;
+	last_bit = fls64(bmap) - 1;
+
+	dprintf("bmap:0x%"PRIx64", first_bit:%d, last_bit:%d\n",
+		bmap, first_bit, last_bit);
+	offset = first_bit * CACHE_BLOCK_SIZE;
+	data_length = (last_bit - first_bit + 1) * CACHE_BLOCK_SIZE;
+
+	/*
+	 * CACHE_BLOCK_SIZE may not be divisible by SD_INODE_SIZE,
+	 * so (offset + data_length) could be larger than SD_INODE_SIZE
+	 */
+	if (is_vdi_obj(oid) && (offset + data_length) > SD_INODE_SIZE)
+		data_length = SD_INODE_SIZE - offset;
+
+	buf = valloc(data_length);
+	if (buf == NULL) {
+		eprintf("failed to allocate memory\n");
+		goto out;
+	}
+
+	ret = read_cache_object(vid, idx, buf, data_length, offset);
+	if (ret != SD_RES_SUCCESS)
+		goto out;
+
+	if (create)
+		sd_init_req(&hdr, SD_OP_CREATE_AND_WRITE_OBJ);
+	else
+		sd_init_req(&hdr, SD_OP_WRITE_OBJ);
+	hdr.flags = SD_FLAG_CMD_WRITE;
+	hdr.data_length = data_length;
+	hdr.obj.oid = oid;
+	hdr.obj.offset = offset;
+
+	ret = exec_local_req(&hdr, buf);
+	if (ret != SD_RES_SUCCESS)
+		eprintf("failed to push object %x\n", ret);
+
+out:
+	free(buf);
+	return ret;
+}
+
+static int reclaim_object(struct object_cache_entry *entry)
+{
+	struct object_cache *oc = entry->oc;
+	int ret = SD_RES_SUCCESS;
+
+	pthread_rwlock_wrlock(&oc->lock);
+	dprintf("reclaiming /%06"PRIx32"/%08"PRIx32", cache_size: %ld\n",
+		oc->vid, entry->idx, uatomic_read(&sys_cache.cache_size));
+
+	if (uatomic_read(&entry->refcnt) > 0) {
+		ret = -1;
+		goto out;
+	}
+
+	if (entry_is_dirty(entry)) {
+		uint64_t bmap = entry->bmap;
+		int create = entry->idx & CACHE_CREATE_BIT;
+
+		if (oc->in_flush) {
+			ret = -1;
+			goto out;
+		}
+
+		entry->bmap = 0;
+		del_from_dirty_tree_and_list(entry, &oc->dirty_tree);
+		pthread_rwlock_unlock(&oc->lock);
+
+		ret = push_cache_object(oc->vid, entry->idx, bmap, create);
+
+		pthread_rwlock_wrlock(&oc->lock);
+		if (ret != SD_RES_SUCCESS) {
+			/* still dirty */
+			entry->bmap |= bmap;
+			ret = -1;
+			goto out;
+		}
+
+		entry->idx &= ~CACHE_CREATE_BIT;
+		/* dirty again */
+		if (entry_is_dirty(entry)) {
+			dprintf("object cache is dirty again %06" PRIx32 "\n",
+				entry->idx);
+			ret = -1;
+			goto out;
+		}
+
+		if (oc->in_flush) {
+			ret = -1;
+			goto out;
+		}
+
+		if (uatomic_read(&entry->refcnt) > 0) {
+			ret = -1;
+			goto out;
+		}
+	}
+
+	entry->idx |= CACHE_RECLAIM_BIT;
+
+	ret = remove_cache_object(oc, entry->idx);
+	if (ret == SD_RES_SUCCESS)
+		del_from_object_tree_and_list(entry, &oc->object_tree);
+out:
+	pthread_rwlock_unlock(&oc->lock);
+	return ret;
+}
+
+static void reclaim_work(struct work *work)
+{
+	struct object_cache_entry *entry, *n;
+	int ret;
+
+	if (node_in_recovery())
+		return;
+
+	list_for_each_entry_revert_safe_rcu(entry, n,
+		       &sys_cache.cache_lru_list, lru_list) {
+		unsigned data_length;
+		/* Reclaim cache to 80% of max size */
+		if (uatomic_read(&sys_cache.cache_size) <=
+		    sys->cache_size * 8 / 10)
+			break;
+
+		ret = reclaim_object(entry);
+		if (ret != SD_RES_SUCCESS)
+			continue;
+		if (idx_has_vdi_bit(entry->idx))
+			data_length = SD_INODE_SIZE;
+		else
+			data_length = SD_DATA_OBJ_SIZE;
+
+		uatomic_sub(&sys_cache.cache_size, data_length);
+		free(entry);
+	}
+
+	dprintf("cache reclaim complete\n");
+}
+
+static void reclaim_done(struct work *work)
+{
+	uatomic_set(&sys_cache.reclaiming, 0);
+	free(work);
+}
+
 static struct object_cache_entry *
 dirty_tree_insert(struct object_cache *oc, uint32_t idx,
 		  uint64_t bmap, int create)
@@ -171,18 +530,21 @@ dirty_tree_insert(struct object_cache *oc, uint32_t idx,
 	struct rb_node **p = &oc->dirty_tree.rb_node;
 	struct rb_node *parent = NULL;
 	struct object_cache_entry *entry;
+	idx = idx_mask(idx);
 
 	while (*p) {
 		parent = *p;
 		entry = rb_entry(parent, struct object_cache_entry, dirty_node);
 
-		if (idx < entry->idx)
+		if (idx < idx_mask(entry->idx))
 			p = &(*p)->rb_left;
-		else if (idx > entry->idx)
+		else if (idx > idx_mask(entry->idx))
 			p = &(*p)->rb_right;
 		else {
 			/* already has this entry, merge bmap */
 			entry->bmap |= bmap;
+			if (create)
+				entry->idx |= CACHE_CREATE_BIT;
 			return entry;
 		}
 	}
@@ -192,7 +554,8 @@ dirty_tree_insert(struct object_cache *oc, uint32_t idx,
 		return NULL;
 
 	entry->bmap |= bmap;
-	entry->flags |= ENTRY_CREATE_BIT;
+	if (create)
+		entry->idx |= CACHE_CREATE_BIT;
 	rb_link_node(&entry->dirty_node, parent, p);
 	rb_insert_color(&entry->dirty_node, &oc->dirty_tree);
 	list_add(&entry->list, &oc->dirty_list);
@@ -200,26 +563,6 @@ dirty_tree_insert(struct object_cache *oc, uint32_t idx,
 	return entry;
 }
 
-static struct object_cache_entry *dirty_tree_search(struct rb_root *root,
-						    uint32_t idx)
-{
-	struct rb_node *n = root->rb_node;
-	struct object_cache_entry *t;
-
-	while (n) {
-		t = rb_entry(n, struct object_cache_entry, dirty_node);
-
-		if (idx < t->idx)
-			n = n->rb_left;
-		else if (idx > t->idx)
-			n = n->rb_right;
-		else
-			return t; /* found it */
-	}
-
-	return NULL;
-}
-
 static int create_dir_for(uint32_t vid)
 {
 	int ret = 0;
@@ -257,6 +600,7 @@ not_found:
 	if (create) {
 		cache = xzalloc(sizeof(*cache));
 		cache->vid = vid;
+		cache->object_tree = RB_ROOT;
 		create_dir_for(vid);
 
 		cache->dirty_tree = RB_ROOT;
@@ -271,14 +615,6 @@ out:
 	return cache;
 }
 
-static inline void
-del_from_dirty_tree_and_list(struct object_cache_entry *entry,
-			     struct rb_root *dirty_tree)
-{
-	rb_erase(&entry->dirty_node, dirty_tree);
-	list_del(&entry->list);
-}
-
 /* Caller should hold the oc->lock */
 static inline void
 add_to_dirty_tree_and_list(struct object_cache *oc, uint32_t idx,
@@ -289,6 +625,9 @@ add_to_dirty_tree_and_list(struct object_cache *oc, uint32_t idx,
 	if (!entry)
 		panic("Can not find object entry %" PRIx32 "\n", idx);
 
+	if (cache_in_reclaim(0))
+		return;
+
 	/* If cache isn't in reclaiming, move it
 	 * to the head of lru list */
 	cds_list_del_rcu(&entry->lru_list);
@@ -312,15 +651,18 @@ static void add_to_object_cache(struct object_cache *oc, uint32_t idx)
 	uint32_t data_length;
 
 	if (idx_has_vdi_bit(idx))
-		data_length = SD_INODE_SIZE / 1024;
+		data_length = SD_INODE_SIZE;
 	else
-		data_length = SD_DATA_OBJ_SIZE / 1024;
+		data_length = SD_DATA_OBJ_SIZE;
 
 	entry = xzalloc(sizeof(*entry));
 	entry->oc = oc;
 	entry->idx = idx;
 	CDS_INIT_LIST_HEAD(&entry->lru_list);
 
+	dprintf("cache object for vdi %" PRIx32 ", idx %08" PRIx32 " added\n",
+		oc->vid, idx);
+
 	pthread_rwlock_wrlock(&oc->lock);
 	old = object_cache_insert(&oc->object_tree, entry);
 	if (!old) {
@@ -331,20 +673,49 @@ static void add_to_object_cache(struct object_cache *oc, uint32_t idx)
 		entry = old;
 	}
 	pthread_rwlock_unlock(&oc->lock);
+
+	if (sys->cache_size &&
+	    uatomic_read(&sys_cache.cache_size) > sys->cache_size &&
+	    !cache_in_reclaim(1)) {
+		struct work *work = xzalloc(sizeof(struct work));
+		work->fn = reclaim_work;
+		work->done = reclaim_done;
+		queue_work(sys->reclaim_wqueue, work);
+	}
+}
+
+static inline struct object_cache_entry *
+find_cache_entry(struct object_cache *oc, uint32_t idx)
+{
+	struct object_cache_entry *entry;
+
+	entry = object_tree_search(&oc->object_tree, idx);
+	if (!entry || entry->idx & CACHE_RECLAIM_BIT)
+		return NULL;
+
+	return entry;
 }
 
 static int object_cache_lookup(struct object_cache *oc, uint32_t idx,
 			       int create)
 {
 	struct strbuf buf;
-	int fd, ret = 0, flags = def_open_flags;
+	int fd, ret = SD_RES_SUCCESS, flags = def_open_flags;
+	unsigned data_length;
+
+	if (!create) {
+		pthread_rwlock_wrlock(&oc->lock);
+		if (!find_cache_entry(oc, idx))
+			ret = SD_RES_NO_CACHE;
+		pthread_rwlock_unlock(&oc->lock);
+		return ret;
+	}
 
 	strbuf_init(&buf, PATH_MAX);
 	strbuf_addstr(&buf, cache_dir);
 	strbuf_addf(&buf, "/%06"PRIx32"/%08"PRIx32, oc->vid, idx);
 
-	if (create)
-		flags |= O_CREAT | O_TRUNC;
+	flags |= O_CREAT | O_TRUNC;
 
 	fd = open(buf.buf, flags, def_fmode);
 	if (fd < 0) {
@@ -352,26 +723,22 @@ static int object_cache_lookup(struct object_cache *oc, uint32_t idx,
 		goto out;
 	}
 
-	if (create) {
-		unsigned data_length;
-
-		if (idx_has_vdi_bit(idx))
-			data_length = SD_INODE_SIZE;
-		else
-			data_length = SD_DATA_OBJ_SIZE;
+	if (idx_has_vdi_bit(idx))
+		data_length = SD_INODE_SIZE;
+	else
+		data_length = SD_DATA_OBJ_SIZE;
 
-		ret = prealloc(fd, data_length);
-		if (ret != SD_RES_SUCCESS)
-			ret = -1;
-		else {
-			uint64_t bmap = UINT64_MAX;
+	ret = prealloc(fd, data_length);
+	if (ret != SD_RES_SUCCESS)
+		ret = -1;
+	else {
+		uint64_t bmap = UINT64_MAX;
 
-			add_to_object_cache(oc, idx);
+		add_to_object_cache(oc, idx);
 
-			pthread_rwlock_wrlock(&oc->lock);
-			add_to_dirty_tree_and_list(oc, idx, bmap, 1);
-			pthread_rwlock_unlock(&oc->lock);
-		}
+		pthread_rwlock_wrlock(&oc->lock);
+		add_to_dirty_tree_and_list(oc, idx, bmap, 1);
+		pthread_rwlock_unlock(&oc->lock);
 	}
 	close(fd);
 out:
@@ -379,97 +746,6 @@ out:
 	return ret;
 }
 
-static int write_cache_object(uint32_t vid, uint32_t idx, void *buf,
-			      size_t count, off_t offset)
-{
-	size_t size;
-	int fd, flags = def_open_flags, ret = SD_RES_SUCCESS;
-	struct strbuf p;
-
-	strbuf_init(&p, PATH_MAX);
-	strbuf_addstr(&p, cache_dir);
-	strbuf_addf(&p, "/%06"PRIx32"/%08"PRIx32, vid, idx);
-
-	if (sys->use_directio && !idx_has_vdi_bit(idx))
-		flags |= O_DIRECT;
-
-	fd = open(p.buf, flags, def_fmode);
-	if (fd < 0) {
-		eprintf("%m\n");
-		ret = SD_RES_EIO;
-		goto out;
-	}
-
-	if (flock(fd, LOCK_EX) < 0) {
-		ret = SD_RES_EIO;
-		eprintf("%m\n");
-		goto out_close;
-	}
-	size = xpwrite(fd, buf, count, offset);
-	if (flock(fd, LOCK_UN) < 0) {
-		ret = SD_RES_EIO;
-		eprintf("%m\n");
-		goto out_close;
-	}
-
-	if (size != count) {
-		eprintf("size %zu, count:%zu, offset %jd %m\n",
-			size, count, (intmax_t)offset);
-		ret = SD_RES_EIO;
-	}
-out_close:
-	close(fd);
-out:
-	strbuf_release(&p);
-	return ret;
-}
-
-static int read_cache_object(uint32_t vid, uint32_t idx, void *buf,
-			     size_t count, off_t offset)
-{
-	size_t size;
-	int fd, flags = def_open_flags, ret = SD_RES_SUCCESS;
-	struct strbuf p;
-
-	strbuf_init(&p, PATH_MAX);
-	strbuf_addstr(&p, cache_dir);
-	strbuf_addf(&p, "/%06"PRIx32"/%08"PRIx32, vid, idx);
-
-	if (sys->use_directio && !idx_has_vdi_bit(idx))
-		flags |= O_DIRECT;
-
-	fd = open(p.buf, flags, def_fmode);
-	if (fd < 0) {
-		eprintf("%m\n");
-		ret = SD_RES_EIO;
-		goto out;
-	}
-
-	if (flock(fd, LOCK_SH) < 0) {
-		ret = SD_RES_EIO;
-		eprintf("%m\n");
-		goto out_close;
-	}
-	size = xpread(fd, buf, count, offset);
-	if (flock(fd, LOCK_UN) < 0) {
-		ret = SD_RES_EIO;
-		eprintf("%m\n");
-		goto out_close;
-	}
-
-	if (size != count) {
-		eprintf("size %zu, count:%zu, offset %jd %m\n",
-			size, count, (intmax_t)offset);
-		ret = SD_RES_EIO;
-	}
-
-out_close:
-	close(fd);
-out:
-	strbuf_release(&p);
-	return ret;
-}
-
 static int create_cache_object(struct object_cache *oc, uint32_t idx,
 			       void *buffer, size_t buf_size)
 {
@@ -531,7 +807,7 @@ static int object_cache_pull(struct object_cache *oc, uint32_t idx)
 		oid = vid_to_vdi_oid(oc->vid);
 		data_length = SD_INODE_SIZE;
 	} else {
-		oid = vid_to_data_oid(oc->vid, idx);
+		oid = cache_vid_to_data_oid(oc->vid, idx);
 		data_length = SD_DATA_OBJ_SIZE;
 	}
 
@@ -558,75 +834,6 @@ out:
 	return ret;
 }
 
-static uint64_t idx_to_oid(uint32_t vid, uint32_t idx)
-{
-	if (idx_has_vdi_bit(idx))
-		return vid_to_vdi_oid(vid);
-	else
-		return vid_to_data_oid(vid, idx);
-}
-
-static int push_cache_object(uint32_t vid, uint32_t idx, uint64_t bmap,
-			     int create)
-{
-	struct sd_req hdr;
-	void *buf;
-	off_t offset;
-	unsigned data_length;
-	int ret = SD_RES_NO_MEM;
-	uint64_t oid = idx_to_oid(vid, idx);
-	int first_bit, last_bit;
-
-	dprintf("%"PRIx64", create %d\n", oid, create);
-
-	if (!bmap) {
-		dprintf("WARN: nothing to flush\n");
-		return SD_RES_SUCCESS;
-	}
-
-	first_bit = ffsll(bmap) - 1;
-	last_bit = fls64(bmap) - 1;
-
-	dprintf("bmap:0x%"PRIx64", first_bit:%d, last_bit:%d\n",
-		bmap, first_bit, last_bit);
-	offset = first_bit * CACHE_BLOCK_SIZE;
-	data_length = (last_bit - first_bit + 1) * CACHE_BLOCK_SIZE;
-
-	/*
-	 * CACHE_BLOCK_SIZE may not be divisible by SD_INODE_SIZE,
-	 * so (offset + data_length) could larger than SD_INODE_SIZE
-	 */
-	if (is_vdi_obj(oid) && (offset + data_length) > SD_INODE_SIZE)
-		data_length = SD_INODE_SIZE - offset;
-
-	buf = valloc(data_length);
-	if (buf == NULL) {
-		eprintf("failed to allocate memory\n");
-		goto out;
-	}
-
-	ret = read_cache_object(vid, idx, buf, data_length, offset);
-	if (ret != SD_RES_SUCCESS)
-		goto out;
-
-	if (create)
-		sd_init_req(&hdr, SD_OP_CREATE_AND_WRITE_OBJ);
-	else
-		sd_init_req(&hdr, SD_OP_WRITE_OBJ);
-	hdr.flags = SD_FLAG_CMD_WRITE;
-	hdr.data_length = data_length;
-	hdr.obj.oid = oid;
-	hdr.obj.offset = offset;
-
-	ret = exec_local_req(&hdr, buf);
-	if (ret != SD_RES_SUCCESS)
-		eprintf("failed to push object %x\n", ret);
-
-out:
-	free(buf);
-	return ret;
-}
-
 /* Push back all the dirty objects to sheep cluster storage */
 static int object_cache_push(struct object_cache *oc)
 {
@@ -640,27 +847,33 @@ static int object_cache_push(struct object_cache *oc)
 		return SD_RES_SUCCESS;
 
 	pthread_rwlock_wrlock(&oc->lock);
+	oc->in_flush = 1;
 	list_splice_init(&oc->dirty_list, &inactive_dirty_list);
 	pthread_rwlock_unlock(&oc->lock);
 
 	list_for_each_entry_safe(entry, t, &inactive_dirty_list, list) {
-		pthread_rwlock_rdlock(&oc->lock);
+		pthread_rwlock_wrlock(&oc->lock);
 		bmap = entry->bmap;
-		create = entry->flags & ENTRY_CREATE_BIT;
+		create = entry->idx & CACHE_CREATE_BIT;
+		entry->bmap = 0;
+		del_from_dirty_tree_and_list(entry, &oc->dirty_tree);
 		pthread_rwlock_unlock(&oc->lock);
 
 		ret = push_cache_object(oc->vid, entry->idx, bmap, create);
-		if (ret != SD_RES_SUCCESS)
-			goto push_failed;
 
 		pthread_rwlock_wrlock(&oc->lock);
-		del_from_dirty_tree_and_list(entry, &oc->dirty_tree);
+		if (ret != SD_RES_SUCCESS) {
+			entry->bmap |= bmap;
+			goto push_failed;
+		}
+		entry->idx &= ~CACHE_CREATE_BIT;
 		pthread_rwlock_unlock(&oc->lock);
 	}
+	oc->in_flush = 0;
 	return ret;
 
 push_failed:
-	pthread_rwlock_wrlock(&oc->lock);
+	oc->in_flush = 0;
 	list_splice_init(&inactive_dirty_list, &oc->dirty_list);
 	pthread_rwlock_unlock(&oc->lock);
 
@@ -677,10 +890,10 @@ int object_is_cached(uint64_t oid)
 	if (!cache)
 		return 0;
 
-	if (object_cache_lookup(cache, idx, 0) < 0)
-		return 0;
+	if (object_cache_lookup(cache, idx, 0) == SD_RES_SUCCESS)
+		return 1;
 	else
-		return 1; /* found it */
+		return 0;
 }
 
 void object_cache_delete(uint32_t vid)
@@ -712,6 +925,30 @@ void object_cache_delete(uint32_t vid)
 
 }
 
+static struct object_cache_entry *
+get_cache_entry(struct object_cache *cache, uint32_t idx)
+{
+	struct object_cache_entry *entry;
+
+	pthread_rwlock_rdlock(&cache->lock);
+	entry = find_cache_entry(cache, idx);
+	if (!entry) {
+		/* The cache entry may be reclaimed, so try again. */
+		pthread_rwlock_unlock(&cache->lock);
+		return NULL;
+	}
+
+	uatomic_inc(&entry->refcnt);
+	pthread_rwlock_unlock(&cache->lock);
+
+	return entry;
+}
+
+static void put_cache_entry(struct object_cache_entry *entry)
+{
+	uatomic_dec(&entry->refcnt);
+}
+
 static int object_cache_flush_and_delete(struct object_cache *oc)
 {
 	DIR *dir;
@@ -750,6 +987,7 @@ static int object_cache_flush_and_delete(struct object_cache *oc)
 	}
 
 	object_cache_delete(vid);
+
 out:
 	strbuf_release(&p);
 	return ret;
@@ -773,10 +1011,10 @@ int bypass_object_cache(struct request *req)
 			/* For read request, we can read cache if any */
 			uint32_t idx = object_cache_oid_to_idx(oid);
 
-			if (object_cache_lookup(cache, idx, 0) < 0)
-				return 1;
-			else
+			if (object_cache_lookup(cache, idx, 0) == 0)
 				return 0;
+			else
+				return 1;
 		}
 	}
 
@@ -807,37 +1045,43 @@ int object_cache_handle_request(struct request *req)
 	if (req->rq.opcode == SD_OP_CREATE_AND_WRITE_OBJ)
 		create = 1;
 
-	if (object_cache_lookup(cache, idx, create) < 0) {
+retry:
+	ret = object_cache_lookup(cache, idx, create);
+	if (ret == SD_RES_NO_CACHE) {
 		ret = object_cache_pull(cache, idx);
 		if (ret != SD_RES_SUCCESS)
 			return ret;
 	}
 
-	pthread_rwlock_rdlock(&cache->lock);
-	entry = object_tree_search(&cache->object_tree, idx);
-	pthread_rwlock_unlock(&cache->lock);
+	entry = get_cache_entry(cache, idx);
+	if (!entry) {
+		dprintf("cache entry %"PRIx32"/%"PRIx32" may be reclaimed\n",
+			vid, idx);
+		goto retry;
+	}
 
 	if (hdr->flags & SD_FLAG_CMD_WRITE) {
 		ret = write_cache_object(cache->vid, idx, req->data,
 					 hdr->data_length, hdr->obj.offset);
 		if (ret != SD_RES_SUCCESS)
-			goto out;
+			goto err;
 		update_cache_entry(cache, idx, hdr->data_length,
 				   hdr->obj.offset);
 	} else {
 		ret = read_cache_object(cache->vid, idx, req->data,
 					hdr->data_length, hdr->obj.offset);
 		if (ret != SD_RES_SUCCESS)
-			goto out;
+			goto err;
 		req->rp.data_length = hdr->data_length;
 
-		if (entry) {
+		if (entry && !cache_in_reclaim(0)) {
 			cds_list_del_rcu(&entry->lru_list);
 			cds_list_add_rcu(&entry->lru_list,
 					 &sys_cache.cache_lru_list);
 		}
 	}
-out:
+err:
+	put_cache_entry(entry);
 	return ret;
 }
 
@@ -847,13 +1091,23 @@ int object_cache_write(uint64_t oid, char *data, unsigned int datalen,
 	uint32_t vid = oid_to_vid(oid);
 	uint32_t idx = object_cache_oid_to_idx(oid);
 	struct object_cache *cache;
+	struct object_cache_entry *entry;
 	int ret;
 
 	cache = find_object_cache(vid, 0);
 
+	dprintf("cache object write %" PRIx32 "\n", idx);
+
+	entry = get_cache_entry(cache, idx);
+	if (!entry)
+		panic("cache object %" PRIx32 " doesn't exist\n", idx);
+
 	ret = write_cache_object(vid, idx, data, datalen, offset);
 	if (ret == SD_RES_SUCCESS)
 		update_cache_entry(cache, idx, datalen, offset);
+
+	put_cache_entry(entry);
+
 	return ret;
 }
 
@@ -862,8 +1116,23 @@ int object_cache_read(uint64_t oid, char *data, unsigned int datalen,
 {
 	uint32_t vid = oid_to_vid(oid);
 	uint32_t idx = object_cache_oid_to_idx(oid);
+	struct object_cache *cache;
+	struct object_cache_entry *entry;
+	int ret;
+
+	cache = find_object_cache(vid, 0);
+
+	dprintf("cache object read %" PRIx32 "\n", idx);
+
+	entry = get_cache_entry(cache, idx);
+	if (!entry)
+		panic("cache object %" PRIx32 " doesn't exist\n", idx);
+
+	ret = read_cache_object(vid, idx, data, datalen, offset);
 
-	return read_cache_object(vid, idx, data, datalen, offset);
+	put_cache_entry(entry);
+
+	return ret;
 }
 
 int object_cache_flush_vdi(struct request *req)
@@ -884,8 +1153,10 @@ int object_cache_flush_and_del(struct request *req)
 	struct object_cache *cache;
 
 	cache = find_object_cache(vid, 0);
+
 	if (cache && object_cache_flush_and_delete(cache) < 0)
 		return SD_RES_EIO;
+
 	return SD_RES_SUCCESS;
 }
 
@@ -999,6 +1270,7 @@ int object_cache_init(const char *p)
 
 	CDS_INIT_LIST_HEAD(&sys_cache.cache_lru_list);
 	uatomic_set(&sys_cache.cache_size, 0);
+	uatomic_set(&sys_cache.reclaiming, 0);
 
 	ret = load_existing_cache();
 err:
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 9f371ba..d17884d 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -356,8 +356,9 @@ int main(int argc, char **argv)
 	sys->deletion_wqueue = init_work_queue("deletion", true);
 	sys->block_wqueue = init_work_queue("block", true);
 	sys->sockfd_wqueue = init_work_queue("sockfd", true);
+	sys->reclaim_wqueue = init_work_queue("reclaim", true);
 	if (!sys->gateway_wqueue || !sys->io_wqueue ||!sys->recovery_wqueue ||
-	    !sys->deletion_wqueue || !sys->block_wqueue)
+	    !sys->deletion_wqueue || !sys->block_wqueue || !sys->reclaim_wqueue)
 		exit(1);
 
 	ret = trace_init();
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index fe61411..2090d67 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -125,6 +125,7 @@ struct cluster_info {
 	struct work_queue *recovery_wqueue;
 	struct work_queue *block_wqueue;
 	struct work_queue *sockfd_wqueue;
+	struct work_queue *reclaim_wqueue;
 };
 
 struct siocb {
diff --git a/sheep/store.c b/sheep/store.c
index a05822d..4a65c9e 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -492,9 +492,13 @@ int write_object(uint64_t oid, char *data, unsigned int datalen,
 	struct sd_req hdr;
 	int ret;
 
+retry:
 	if (sys->enable_write_cache && object_is_cached(oid)) {
 		ret = object_cache_write(oid, data, datalen, offset,
 					 flags, create);
+		if (ret == SD_RES_NO_CACHE)
+			goto retry;
+
 		if (ret != 0) {
 			eprintf("write cache failed %"PRIx64" %"PRIx32"\n",
 				oid, ret);
@@ -529,8 +533,12 @@ int read_object(uint64_t oid, char *data, unsigned int datalen,
 	struct sd_req hdr;
 	int ret;
 
+retry:
 	if (sys->enable_write_cache && object_is_cached(oid)) {
 		ret = object_cache_read(oid, data, datalen, offset);
+		if (ret == SD_RES_NO_CACHE)
+			goto retry;
+
 		if (ret != SD_RES_SUCCESS) {
 			eprintf("try forward read %"PRIx64" %"PRIx32"\n",
 				oid, ret);
-- 
1.7.1



