[sheepdog] [PATCH 6/8] object cache: reclaim cached objects when cache reaches the max size

levin li levin108 at gmail.com
Mon Jul 9 08:29:20 CEST 2012


From: levin li <xingke.lwp at taobao.com>

This patch does the reclaiming work when the total size of cached objects
reaches the max size specified by the user. I did it in the following way
(a stand-alone sketch of steps 3 and 4 follows the list):

1. Check the object tree for the object entry to determine whether the
   cache entry exists and whether it is being reclaimed; if it is being
   reclaimed, sheep ignores the cache.
2. In object_cache_rw() we search for the cache entry; after it passes
   the sanity check, we increment its refcnt to tell the reclaiming worker
   that this entry is being referenced and must not be reclaimed now.
3. In add_to_object_cache(), when the cached size exceeds the max size,
   we start a reclaiming worker; only one such worker can run at a time.
4. In reclaim_work(), we reclaim cached objects until the cache size drops
   to 80% of the max size.
5. In reclaim_object(), we start to reclaim an object. Before doing so, we
   check whether the cache is flushing or the object's refcnt is non-zero;
   in either case we don't reclaim it. If the cached object is dirty, we
   flush it with push_cache_object() and then try to remove the object.
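
Here is a stand-alone C sketch of steps 3 and 4, the single-reclaimer gate
and the 80% low-water mark. It is not part of the patch and every *_stub
name is hypothetical. The patch gates with a uatomic_read()/uatomic_set()
pair and queues reclaim_work() on a dedicated work queue; to stay
self-contained, the sketch uses a C11 compare-and-swap for the same purpose
and runs the worker inline:

#include <inttypes.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

static atomic_uint_fast64_t cache_size;      /* total cached data, in KB */
static atomic_int reclaiming;                /* 1 while a reclaimer runs */
static const uint64_t max_cache_size = 1024; /* user-specified limit (KB) */

/* Stand-in for reclaim_object(): pretend to evict one 64 KB object. */
static uint64_t reclaim_one_object_stub(void)
{
	return 64;
}

/* Step 4: reclaim until the cache drops to 80% of the max size. */
static void reclaim_work_stub(void)
{
	while (atomic_load(&cache_size) > max_cache_size * 8 / 10)
		atomic_fetch_sub(&cache_size, reclaim_one_object_stub());
	atomic_store(&reclaiming, 0);        /* what reclaim_done() does */
}

/* Step 3: on insertion, start at most one reclaiming worker. */
static void add_to_object_cache_stub(uint64_t kb)
{
	int expected = 0;

	atomic_fetch_add(&cache_size, kb);
	if (atomic_load(&cache_size) > max_cache_size &&
	    atomic_compare_exchange_strong(&reclaiming, &expected, 1))
		reclaim_work_stub();         /* sheep queues this instead */
}

int main(void)
{
	for (int i = 0; i < 32; i++)
		add_to_object_cache_stub(64);
	printf("cache size after reclaim: %" PRIuFAST64 " KB\n",
	       atomic_load(&cache_size));
	return 0;
}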

Signed-off-by: levin li <xingke.lwp at taobao.com>
---
 include/sheepdog_proto.h |    2 +
 sheep/gateway.c          |   20 ++-
 sheep/object_cache.c     |  373 +++++++++++++++++++++++++++++++++++++++++-----
 sheep/sdnet.c            |    2 +-
 sheep/sheep.c            |    3 +-
 sheep/sheep_priv.h       |    3 +-
 sheep/store.c            |   12 ++-
 7 files changed, 368 insertions(+), 47 deletions(-)

diff --git a/include/sheepdog_proto.h b/include/sheepdog_proto.h
index e20254e..d7f66ee 100644
--- a/include/sheepdog_proto.h
+++ b/include/sheepdog_proto.h
@@ -66,6 +66,8 @@
 #define SD_RES_NO_SUPPORT       0x21 /* Operation is not supported by backend store */
 #define SD_RES_CLUSTER_RECOVERING 0x22 /* Cluster is recovering. */
 #define SD_RES_OBJ_RECOVERING     0x23 /* Object is recovering */
+#define SD_RES_NO_CACHE      0x24 /* No cache object found */
+#define SD_RES_CACHE_RECLAIMING 0x25 /* Object cache is being reclaimed */
 
 /*
  * Object ID rules
diff --git a/sheep/gateway.c b/sheep/gateway.c
index 7324b85..502da0c 100644
--- a/sheep/gateway.c
+++ b/sheep/gateway.c
@@ -31,8 +31,14 @@ int gateway_read_obj(struct request *req)
 	uint64_t oid = req->rq.obj.oid;
 	int nr_copies, j;
 
-	if (sys->enable_write_cache && !req->local && !bypass_object_cache(req))
-		return object_cache_handle_request(req);
+retry:
+	if (sys->enable_write_cache && !req->local &&
+	    !bypass_object_cache(req)) {
+		ret = object_cache_handle_request(req);
+		if (ret == SD_RES_NO_CACHE || ret == SD_RES_CACHE_RECLAIMING)
+			goto retry;
+		return ret;
+	}
 
 	nr_copies = get_nr_copies(req->vnodes);
 	oid_to_vnodes(req->vnodes, oid, nr_copies, obj_vnodes);
@@ -229,8 +235,14 @@ static int do_gateway_write_obj(struct request *req, bool create)
 
 	dprintf("%"PRIx64"\n", oid);
 
-	if (sys->enable_write_cache && !req->local && !bypass_object_cache(req))
-		return object_cache_handle_request(req);
+retry:
+	if (sys->enable_write_cache && !req->local &&
+	    !bypass_object_cache(req)) {
+		ret = object_cache_handle_request(req);
+		if (ret == SD_RES_NO_CACHE || ret == SD_RES_CACHE_RECLAIMING)
+			goto retry;
+		return ret;
+	}
 
 	write_info_init(&wi);
 	memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr));
diff --git a/sheep/object_cache.c b/sheep/object_cache.c
index 17ff190..cc39a90 100644
--- a/sheep/object_cache.c
+++ b/sheep/object_cache.c
@@ -40,6 +40,21 @@
 #define CACHE_VDI_BIT         (UINT32_C(1) << CACHE_VDI_SHIFT)
 #define CACHE_BLOCK_SIZE      ((UINT64_C(1) << 10) * 64) /* 64 KB */
 
+#define ENTRY_RECLAIM_BIT     1
+#define ENTRY_DIRTY_BIT       (1 << 1)
+
+#define SD_RES_CACHE_FLUSHING    1
+#define SD_RES_CACHE_REFERENCING 2
+
+#define list_for_each_entry_revert_safe_rcu(pos, n, head, member) \
+	for (pos = cds_list_entry(rcu_dereference((head)->prev), \
+				  typeof(*pos), member), \
+	     n = cds_list_entry(rcu_dereference(pos->member.prev),\
+				typeof(*pos), member); \
+				&pos->member != (head); \
+	     pos = n, n = cds_list_entry(rcu_dereference(pos->member.prev),\
+					 typeof(*pos), member))
+
 struct global_cache {
 	uint64_t cache_size;
 	int reclaiming;
@@ -57,6 +72,7 @@ struct object_cache_entry {
 
 struct object_cache {
 	uint32_t vid;
+	int flushing;
 	struct hlist_node hash;
 
 	struct list_head dirty_lists[2];
@@ -93,6 +109,56 @@ static pthread_mutex_t hashtable_lock[HASH_SIZE] = {
 
 static struct hlist_head cache_hashtable[HASH_SIZE];
 
+static int push_cache_object(uint32_t vid, uint32_t idx, uint64_t bmap,
+			     int create);
+
+static inline int cache_is_reclaiming(void)
+{
+	return uatomic_read(&sys_cache.reclaiming);
+}
+
+static inline int cache_is_flushing(struct object_cache *cache)
+{
+	return cache->flushing;
+}
+
+static inline int entry_is_reclaiming(struct object_cache_entry *entry)
+{
+	int flags = uatomic_read(&entry->flags);
+	if (flags & ENTRY_RECLAIM_BIT)
+		return 1;
+	return 0;
+}
+
+static inline int entry_is_dirty(struct object_cache_entry *entry)
+{
+	int flags = uatomic_read(&entry->flags);
+	if (flags & ENTRY_DIRTY_BIT)
+		return 1;
+	return 0;
+}
+
+static inline void entry_start_reclaiming(struct object_cache_entry *entry)
+{
+	int flags = uatomic_read(&entry->flags);
+	flags |= ENTRY_RECLAIM_BIT;
+	uatomic_set(&entry->flags, flags);
+}
+
+static inline void entry_set_dirty(struct object_cache_entry *entry)
+{
+	int flags = uatomic_read(&entry->flags);
+	flags |= ENTRY_DIRTY_BIT;
+	uatomic_set(&entry->flags, flags);
+}
+
+static inline void entry_clr_dirty(struct object_cache_entry *entry)
+{
+	int flags = uatomic_read(&entry->flags);
+	flags &= ~ENTRY_DIRTY_BIT;
+	uatomic_set(&entry->flags, flags);
+}
+
 static inline int hash(uint64_t vid)
 {
 	return hash_64(vid, HASH_BITS);
@@ -256,6 +322,7 @@ not_found:
 	if (create) {
 		cache = xzalloc(sizeof(*cache));
 		cache->vid = vid;
+		cache->object_tree = RB_ROOT;
 		create_dir_for(vid);
 
 		cache->dirty_trees[0] = RB_ROOT;
@@ -283,6 +350,14 @@ del_from_dirty_tree_and_list(struct dirty_cache_entry *entry,
 	list_del(&entry->list);
 }
 
+static inline void
+del_from_object_tree_and_list(struct object_cache_entry *entry,
+			      struct rb_root *object_tree)
+{
+	rb_erase(&entry->node, object_tree);
+	cds_list_del_rcu(&entry->lru_list);
+}
+
 static void switch_dirty_tree_and_list(struct object_cache *oc,
 				       struct rb_root **inactive_dirty_tree,
 				       struct list_head **inactive_dirty_list)
@@ -313,7 +388,7 @@ add_to_dirty_tree_and_list(struct object_cache *oc,
 	} else {
 		free(dirty_entry);
 
-		if (!merge) {
+		if (!merge && !cache_is_reclaiming()) {
 			struct object_cache_entry *entry;
 
 			entry = dirty_entry->sys_entry;
@@ -356,6 +431,152 @@ alloc_cache_entry(struct object_cache_entry *oc_entry, uint32_t idx,
 	return entry;
 }
 
+static int remove_cache_object(struct object_cache *oc, uint32_t idx)
+{
+	struct strbuf buf;
+	int ret = SD_RES_SUCCESS;
+
+	strbuf_init(&buf, PATH_MAX);
+	strbuf_addstr(&buf, cache_dir);
+	strbuf_addf(&buf, "/%06"PRIx32"/%08"PRIx32, oc->vid, idx);
+
+	dprintf("removing cache object %s\n", buf.buf);
+	if (unlink(buf.buf) < 0) {
+		ret = SD_RES_EIO;
+		eprintf("failed to remove cached object %m\n");
+		goto out;
+	}
+out:
+	strbuf_release(&buf);
+
+	return ret;
+}
+
+static int check_cache_status(struct object_cache *oc,
+			      struct object_cache_entry *entry)
+{
+	int refcnt = uatomic_read(&entry->refcnt);
+
+	if (cache_is_flushing(oc)) {
+		dprintf("cache %" PRIx32 " is flushing, don't reclaim it.\n",
+			oc->vid);
+		return SD_RES_CACHE_FLUSHING;
+	}
+
+	/* If entry is being accessed, we don't reclaim it */
+	if (refcnt > 0) {
+		dprintf("cache object %" PRIx32 "(%08" PRIx32 ") "
+			"can't be reclaimed, refcnt: %d\n",
+			oc->vid, entry->idx, refcnt);
+		return SD_RES_CACHE_REFERENCING;
+	}
+
+	return SD_RES_SUCCESS;
+}
+
+static int reclaim_object(struct object_cache *oc,
+			  struct object_cache_entry *entry)
+{
+	struct dirty_cache_entry *dirty_entry;
+	uint32_t idx = entry->idx;
+	int ret = SD_RES_SUCCESS;
+
+	pthread_mutex_lock(&oc->lock);
+	dprintf("reclaiming /%06"PRIx32"/%08"PRIx32", cache_size: %ld\n",
+		oc->vid, idx, uatomic_read(&sys_cache.cache_size));
+
+	ret = check_cache_status(oc, entry);
+	if (ret != SD_RES_SUCCESS)
+		goto out;
+
+	/* Now we're sure the inactive_dirty_tree is empty, because
+	 * cache of the VDI isn't flushing. */
+	dirty_entry = dirty_tree_search(oc->active_dirty_tree, idx);
+	if (!dirty_entry && entry_is_dirty(entry))
+		eprintf("bug\n");
+
+	if (dirty_entry) {
+		del_from_dirty_tree_and_list(dirty_entry,
+					     oc->active_dirty_tree);
+		entry_clr_dirty(entry);
+		pthread_mutex_unlock(&oc->lock);
+
+		ret = push_cache_object(oc->vid, dirty_entry->idx,
+					dirty_entry->bmap, dirty_entry->create);
+
+		pthread_mutex_lock(&oc->lock);
+		free(dirty_entry);
+		if (ret != SD_RES_SUCCESS) {
+			/* Rollback to the dirty state. */
+			entry_set_dirty(entry);
+			goto out;
+		}
+
+		/* Now we get lock again, check cache status again */
+		ret = check_cache_status(oc, entry);
+		if (ret != SD_RES_SUCCESS)
+			goto out;
+	}
+
+	/* If entry is still dirty, we don't reclaim it */
+	if (entry_is_dirty(entry)) {
+		dprintf("%08" PRIx32 " can not be reclaimed, is dirty.\n",
+			entry->idx);
+		ret = SD_RES_CACHE_REFERENCING;
+		goto out;
+	}
+
+	/* Mark the entry as being reclaimed */
+	entry_start_reclaiming(entry);
+
+	ret = remove_cache_object(oc, idx);
+	if (ret == SD_RES_SUCCESS)
+		del_from_object_tree_and_list(entry, &oc->object_tree);
+out:
+	pthread_mutex_unlock(&oc->lock);
+	return ret;
+}
+
+static void reclaim_work(struct work *work)
+{
+	struct object_cache_entry *entry, *n;
+	int ret;
+
+	if (node_in_recovery())
+		return;
+
+	list_for_each_entry_revert_safe_rcu(entry, n,
+		       &sys_cache.cache_lru_list, lru_list) {
+		struct object_cache *oc = entry->oc;
+
+		/* Reclaim cache to 80% of max size */
+		if (uatomic_read(&sys_cache.cache_size) <=
+		    sys->cache_size * 8 / 10)
+			break;
+
+		ret = reclaim_object(oc, entry);
+		if (ret == SD_RES_SUCCESS) {
+			unsigned data_length;
+
+			if (idx_has_vdi_bit(entry->idx))
+				data_length = SD_INODE_SIZE / 1024;
+			else
+				data_length = SD_DATA_OBJ_SIZE / 1024;
+
+			uatomic_sub(&sys_cache.cache_size, data_length);
+			free(entry);
+		} else if (ret == SD_RES_CACHE_FLUSHING)
+			/* If cache is flushing, stop reclaiming. */
+			break;
+	}
+}
+
+static void reclaim_done(struct work *work)
+{
+	uatomic_set(&sys_cache.reclaiming, 0);
+	free(work);
+}
+
 static struct object_cache_entry *
 add_to_object_cache(struct object_cache *oc, uint32_t idx)
 {
@@ -372,6 +593,9 @@ add_to_object_cache(struct object_cache *oc, uint32_t idx)
 	entry->idx = idx;
 	CDS_INIT_LIST_HEAD(&entry->lru_list);
 
+	dprintf("cache object for vdi %" PRIx32 ", idx %08" PRIx32 "added\n",
+		oc->vid, idx);
+
 	pthread_mutex_lock(&oc->lock);
 	old = object_cache_insert(&oc->object_tree, entry);
 	if (!old) {
@@ -383,51 +607,83 @@ add_to_object_cache(struct object_cache *oc, uint32_t idx)
 	}
 	pthread_mutex_unlock(&oc->lock);
 
+	if (sys->cache_size &&
+	    uatomic_read(&sys_cache.cache_size) > sys->cache_size &&
+	    !cache_is_reclaiming()) {
+		struct work *work = zalloc(sizeof(struct work));
+		uatomic_set(&sys_cache.reclaiming, 1);
+		work->fn = reclaim_work;
+		work->done = reclaim_done;
+		queue_work(sys->reclaim_wqueue, work);
+	}
+
 	return entry;
 }
 
+static inline int cache_sanity_check(struct object_cache *oc, uint32_t idx,
+				     struct object_cache_entry **entry)
+{
+	struct object_cache_entry *ent;
+
+	ent = object_tree_search(&oc->object_tree, idx);
+
+	if (!ent)
+		return SD_RES_NO_CACHE;
+
+	if (entry_is_reclaiming(ent))
+		return SD_RES_CACHE_RECLAIMING;
+
+	if (entry)
+		*entry = ent;
+
+	return SD_RES_SUCCESS;
+}
+
 static int object_cache_lookup(struct object_cache *oc, uint32_t idx,
 			       int create)
 {
 	struct strbuf buf;
-	int fd, ret = 0, flags = def_open_flags;
+	int fd, ret = SD_RES_SUCCESS, flags = def_open_flags;
 	struct object_cache_entry *entry = NULL;
 	struct dirty_cache_entry *dirty_entry;
+	unsigned data_length;
+
+	if (!create) {
+		pthread_mutex_lock(&oc->lock);
+		ret = cache_sanity_check(oc, idx, NULL);
+		pthread_mutex_unlock(&oc->lock);
+		return ret;
+	}
 
 	strbuf_init(&buf, PATH_MAX);
 	strbuf_addstr(&buf, cache_dir);
 	strbuf_addf(&buf, "/%06"PRIx32"/%08"PRIx32, oc->vid, idx);
 
-	if (create)
-		flags |= O_CREAT | O_TRUNC;
+	flags |= O_CREAT | O_TRUNC;
 
 	fd = open(buf.buf, flags, def_fmode);
 	if (fd < 0) {
-		ret = -1;
+		ret = SD_RES_NO_CACHE;
 		goto out;
 	}
 
-	if (create) {
-		unsigned data_length;
-
-		if (idx_has_vdi_bit(idx))
-			data_length = SD_INODE_SIZE;
-		else
-			data_length = SD_DATA_OBJ_SIZE;
+	if (idx_has_vdi_bit(idx))
+		data_length = SD_INODE_SIZE;
+	else
+		data_length = SD_DATA_OBJ_SIZE;
 
-		ret = prealloc(fd, data_length);
-		if (ret != SD_RES_SUCCESS)
-			ret = -1;
-		else {
-			uint64_t bmap = UINT64_MAX;
+	ret = prealloc(fd, data_length);
+	if (ret != SD_RES_SUCCESS)
+		ret = SD_RES_EIO;
+	else {
+		uint64_t bmap = UINT64_MAX;
 
-			entry = add_to_object_cache(oc, idx);
+		entry = add_to_object_cache(oc, idx);
 
-			dirty_entry = alloc_cache_entry(entry, idx, bmap, 1);
-			pthread_mutex_lock(&oc->lock);
-			add_to_dirty_tree_and_list(oc, dirty_entry, 0);
-			pthread_mutex_unlock(&oc->lock);
-		}
+		dirty_entry = alloc_cache_entry(entry, idx, bmap, 1);
+		pthread_mutex_lock(&oc->lock);
+		add_to_dirty_tree_and_list(oc, dirty_entry, 0);
+		pthread_mutex_unlock(&oc->lock);
 	}
 	close(fd);
 out:
@@ -531,14 +787,21 @@ static int object_cache_rw(struct object_cache *oc, uint32_t idx,
 {
 	struct sd_req *hdr = &req->rq;
 	uint64_t bmap = 0;
-	struct object_cache_entry *entry;
+	struct object_cache_entry *entry = NULL;
 	int ret;
 
 	dprintf("%08"PRIx32", len %"PRIu32", off %"PRIu64"\n", idx,
 		hdr->data_length, hdr->obj.offset);
 
 	pthread_mutex_lock(&oc->lock);
-	entry = object_tree_search(&oc->object_tree, idx);
+	ret = cache_sanity_check(oc, idx, &entry);
+	if (ret != SD_RES_SUCCESS) {
+		ret = SD_RES_NO_CACHE;
+		pthread_mutex_unlock(&oc->lock);
+		goto out;
+	}
+
+	uatomic_inc(&entry->refcnt);
 	pthread_mutex_unlock(&oc->lock);
 
 	if (hdr->flags & SD_FLAG_CMD_WRITE) {
@@ -547,25 +810,29 @@ static int object_cache_rw(struct object_cache *oc, uint32_t idx,
 		ret = write_cache_object(oc->vid, idx, req->data,
 					 hdr->data_length, hdr->obj.offset);
 		if (ret != SD_RES_SUCCESS)
-			goto out;
+			goto err;
 		bmap = calc_object_bmap(hdr->data_length, hdr->obj.offset);
 		dirty_entry = alloc_cache_entry(entry, idx, bmap, 0);
 		pthread_mutex_lock(&oc->lock);
 		add_to_dirty_tree_and_list(oc, dirty_entry, 0);
+		entry_set_dirty(entry);
 		pthread_mutex_unlock(&oc->lock);
 	} else {
 		ret = read_cache_object(oc->vid, idx, req->data,
 					hdr->data_length, hdr->obj.offset);
 		if (ret != SD_RES_SUCCESS)
-			goto out;
+			goto err;
 		req->rp.data_length = hdr->data_length;
 
-		if (entry) {
+		if (entry && !cache_is_reclaiming()) {
 			cds_list_del_rcu(&entry->lru_list);
 			cds_list_add_rcu(&entry->lru_list,
 					 &sys_cache.cache_lru_list);
 		}
 	}
+
+err:
+	uatomic_dec(&entry->refcnt);
 out:
 	return ret;
 }
@@ -750,6 +1017,7 @@ static int object_cache_push(struct object_cache *oc)
 		if (ret != SD_RES_SUCCESS)
 			goto push_failed;
 		del_from_dirty_tree_and_list(entry, inactive_dirty_tree);
+		entry_clr_dirty(entry->sys_entry);
 		free(entry);
 	}
 	return ret;
@@ -759,7 +1027,7 @@ push_failed:
 	return ret;
 }
 
-int object_is_cached(uint64_t oid)
+int object_is_cached(uint64_t oid, int inc_ref)
 {
 	uint32_t vid = oid_to_vid(oid);
 	uint32_t idx = object_cache_oid_to_idx(oid);
@@ -769,10 +1037,10 @@ int object_is_cached(uint64_t oid)
 	if (!cache)
 		return 0;
 
-	if (object_cache_lookup(cache, idx, 0) < 0)
-		return 0;
+	if (object_cache_lookup(cache, idx, 0) == SD_RES_SUCCESS)
+		return 1;
 	else
-		return 1; /* found it */
+		return 0;
 }
 
 void object_cache_delete(uint32_t vid)
@@ -804,6 +1072,20 @@ void object_cache_delete(uint32_t vid)
 
 }
 
+static void object_cache_flush_begin(struct object_cache *oc)
+{
+	pthread_mutex_lock(&oc->lock);
+	oc->flushing = 1;
+	pthread_mutex_unlock(&oc->lock);
+}
+
+static void object_cache_flush_end(struct object_cache *oc)
+{
+	pthread_mutex_lock(&oc->lock);
+	oc->flushing = 0;
+	pthread_mutex_unlock(&oc->lock);
+}
+
 static int object_cache_flush_and_delete(struct object_cache *oc)
 {
 	DIR *dir;
@@ -814,6 +1096,8 @@ static int object_cache_flush_and_delete(struct object_cache *oc)
 	struct strbuf p;
 	int ret = 0;
 
+	object_cache_flush_begin(oc);
+
 	strbuf_init(&p, PATH_MAX);
 	strbuf_addstr(&p, cache_dir);
 	strbuf_addf(&p, "/%06"PRIx32, vid);
@@ -842,6 +1126,8 @@ static int object_cache_flush_and_delete(struct object_cache *oc)
 	}
 
 	object_cache_delete(vid);
+
+	object_cache_flush_end(oc);
 out:
 	strbuf_release(&p);
 	return ret;
@@ -865,10 +1151,10 @@ int bypass_object_cache(struct request *req)
 			/* For read requet, we can read cache if any */
 			uint32_t idx = object_cache_oid_to_idx(oid);
 
-			if (object_cache_lookup(cache, idx, 0) < 0)
-				return 1;
-			else
+			if (object_cache_lookup(cache, idx, 0) == SD_RES_SUCCESS)
 				return 0;
+			else
+				return 1;
 		}
 	}
 
@@ -894,11 +1180,14 @@ int object_cache_handle_request(struct request *req)
 	if (req->rq.opcode == SD_OP_CREATE_AND_WRITE_OBJ)
 		create = 1;
 
-	if (object_cache_lookup(cache, idx, create) < 0) {
+	ret = object_cache_lookup(cache, idx, create);
+	if (ret == SD_RES_NO_CACHE) {
 		ret = object_cache_pull(cache, idx);
 		if (ret != SD_RES_SUCCESS)
 			return ret;
-	}
+	} else if (ret == SD_RES_CACHE_RECLAIMING)
+		return ret;
+
 	return object_cache_rw(cache, idx, req);
 }
 
@@ -971,12 +1260,17 @@ int object_cache_flush_vdi(struct request *req)
 {
 	uint32_t vid = oid_to_vid(req->rq.obj.oid);
 	struct object_cache *cache;
+	int ret;
 
 	cache = find_object_cache(vid, 0);
 	if (!cache)
 		return SD_RES_SUCCESS;
 
-	return object_cache_push(cache);
+	object_cache_flush_begin(cache);
+	ret = object_cache_push(cache);
+	object_cache_flush_end(cache);
+
+	return ret;
 }
 
 int object_cache_flush_and_del(struct request *req)
@@ -985,8 +1279,10 @@ int object_cache_flush_and_del(struct request *req)
 	struct object_cache *cache;
 
 	cache = find_object_cache(vid, 0);
+
 	if (cache && object_cache_flush_and_delete(cache) < 0)
 		return SD_RES_EIO;
+
 	return SD_RES_SUCCESS;
 }
 
@@ -1140,6 +1436,7 @@ int object_cache_init(const char *p)
 
 	CDS_INIT_LIST_HEAD(&sys_cache.cache_lru_list);
 	uatomic_set(&sys_cache.cache_size, 0);
+	uatomic_set(&sys_cache.reclaiming, 0);
 
 	ret = load_existing_cache();
 err:
diff --git a/sheep/sdnet.c b/sheep/sdnet.c
index c13cdb0..d30d647 100644
--- a/sheep/sdnet.c
+++ b/sheep/sdnet.c
@@ -271,7 +271,7 @@ static void queue_gateway_request(struct request *req)
 	 */
 	if (sys->enable_write_cache &&
 	    req->rq.flags & SD_FLAG_CMD_CACHE &&
-	    object_is_cached(req->rq.obj.oid))
+	    object_is_cached(req->rq.obj.oid, 0))
 		goto queue_work;
 
 	if (req->local_oid)
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 56a432e..c552952 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -266,8 +266,9 @@ int main(int argc, char **argv)
 	sys->recovery_wqueue = init_work_queue("recovery", true);
 	sys->deletion_wqueue = init_work_queue("deletion", true);
 	sys->block_wqueue = init_work_queue("block", true);
+	sys->reclaim_wqueue = init_work_queue("reclaim", true);
 	if (!sys->gateway_wqueue || !sys->io_wqueue ||!sys->recovery_wqueue ||
-	    !sys->deletion_wqueue || !sys->block_wqueue)
+	    !sys->deletion_wqueue || !sys->block_wqueue || !sys->reclaim_wqueue)
 		exit(1);
 
 	ret = init_signal();
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index de06417..ff5140c 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -128,6 +128,7 @@ struct cluster_info {
 	struct work_queue *deletion_wqueue;
 	struct work_queue *recovery_wqueue;
 	struct work_queue *block_wqueue;
+	struct work_queue *reclaim_wqueue;
 };
 
 struct siocb {
@@ -382,7 +383,7 @@ int peer_remove_obj(struct request *req);
 /* object_cache */
 
 int bypass_object_cache(struct request *req);
-int object_is_cached(uint64_t oid);
+int object_is_cached(uint64_t oid, int inc_ref);
 
 int object_cache_handle_request(struct request *req);
 int object_cache_write(uint64_t oid, char *data, unsigned int datalen,
diff --git a/sheep/store.c b/sheep/store.c
index 31ae4ef..c68b50f 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -491,9 +491,13 @@ int write_object(uint64_t oid, char *data, unsigned int datalen,
 	struct sd_req hdr;
 	int ret;
 
-	if (sys->enable_write_cache && object_is_cached(oid)) {
+retry:
+	if (sys->enable_write_cache && object_is_cached(oid, 1)) {
 		ret = object_cache_write(oid, data, datalen, offset,
 					 flags, create);
+		if (ret == SD_RES_NO_CACHE)
+			goto retry;
+
 		if (ret != 0) {
 			eprintf("write cache failed %"PRIx64" %"PRIx32"\n",
 				oid, ret);
@@ -526,8 +530,12 @@ int read_object(uint64_t oid, char *data, unsigned int datalen,
 	struct sd_req hdr;
 	int ret;
 
-	if (sys->enable_write_cache && object_is_cached(oid)) {
+retry:
+	if (sys->enable_write_cache && object_is_cached(oid, 1)) {
 		ret = object_cache_read(oid, data, datalen, offset);
+		if (ret == SD_RES_NO_CACHE)
+			goto retry;
+
 		if (ret != SD_RES_SUCCESS) {
 			eprintf("try forward read %"PRIx64" %"PRIx32"\n",
 				oid, ret);
-- 
1.7.1



