[sheepdog] [PATCH] object cache: optimize push phase of dirty objects

Liu Yuan namei.unix at gmail.com
Sun Jan 27 05:27:53 CET 2013


From: Liu Yuan <tailai.ly at taobao.com>

- Don't hold the cache lock tightly, so we can serve RW requests while pushing.
  It is okay to allow subsequent RW after FLUSH because we only need to
  guarantee that the objects dirtied before FLUSH are pushed.
- Use threaded AIO to hugely boost push performance, such as fsync(2) from VM.
- Fix a nasty long-standing bug: the pthread rwlock was not destroyed after VDI deletion.
- Clean up {get,put}_cache_entry helpers and introduce lock helpers.
- Prefix the object cache work queues with 'oc_'

Some test results:
mkfs time drops by roughly 33%
new OS installation time drops by roughly 36%

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 sheep/object_cache.c |  362 ++++++++++++++++++++++++++++++++------------------
 sheep/sheep.c        |    5 +-
 sheep/sheep_priv.h   |    3 +-
 3 files changed, 241 insertions(+), 129 deletions(-)

diff --git a/sheep/object_cache.c b/sheep/object_cache.c
index f0ac231..caed7a3 100644
--- a/sheep/object_cache.c
+++ b/sheep/object_cache.c
@@ -22,6 +22,7 @@
 #include <sys/file.h>
 #include <dirent.h>
 #include <urcu/uatomic.h>
+#include <sys/eventfd.h>
 
 #include "sheep_priv.h"
 #include "util.h"
@@ -60,15 +61,18 @@ struct object_cache_entry {
 	struct rb_node node;
 	struct list_head dirty_list;
 	struct list_head lru_list;
+
+	pthread_rwlock_t lock;
 };
 
 struct object_cache {
 	uint32_t vid;
+	uint32_t push_count;
 	struct hlist_node hash;
-
 	struct rb_root lru_tree;
 	struct list_head lru_head; /* Per VDI LRU list for reclaimer */
 	struct list_head dirty_head;
+	int push_efd;
 
 	pthread_rwlock_t lock;
 };
@@ -115,7 +119,7 @@ static inline bool idx_has_vdi_bit(uint32_t idx)
 	return !!(idx & CACHE_VDI_BIT);
 }
 
-static uint64_t calc_object_bmap(size_t len, off_t offset)
+static inline uint64_t calc_object_bmap(size_t len, off_t offset)
 {
 	int start, end, nr;
 	unsigned long bmap = 0;
@@ -130,6 +134,63 @@ static uint64_t calc_object_bmap(size_t len, off_t offset)
 	return (uint64_t)bmap;
 }
 
+static inline void get_cache_entry(struct object_cache_entry *entry)
+{
+	uatomic_inc(&entry->refcnt);
+}
+
+static inline void put_cache_entry(struct object_cache_entry *entry)
+{
+	uatomic_dec(&entry->refcnt);
+}
+
+static inline bool entry_in_use(struct object_cache_entry *entry)
+{
+	return uatomic_read(&entry->refcnt) > 0;
+}
+
+/*
+ * Mutual exclusion strategy:
+ *
+ * reader and writer:            no need to protect since it is okay to read
+ *                               unacked stale data.
+ * reader, writer and pusher:    cache lock and entry lock and refcnt.
+ * reader, writer and reclaimer: cache lock and entry refcnt.
+ * pusher and reclaimer:         cache lock and entry refcnt.
+ *
+ * entry->bmap is protected mostly by entry lock, sometimes by cache lock.
+ * dirty list is protected by cache lock.
+ */
+static inline void read_lock_cache(struct object_cache *oc)
+{
+	pthread_rwlock_rdlock(&oc->lock);
+}
+
+static inline void write_lock_cache(struct object_cache *oc)
+{
+	pthread_rwlock_wrlock(&oc->lock);
+}
+
+static inline void unlock_cache(struct object_cache *oc)
+{
+	pthread_rwlock_unlock(&oc->lock);
+}
+
+static inline void read_lock_entry(struct object_cache_entry *entry)
+{
+	pthread_rwlock_rdlock(&entry->lock);
+}
+
+static inline void write_lock_entry(struct object_cache_entry *entry)
+{
+	pthread_rwlock_wrlock(&entry->lock);
+}
+
+static inline void unlock_entry(struct object_cache_entry *entry)
+{
+	pthread_rwlock_unlock(&entry->lock);
+}
+
 static struct object_cache_entry *
 lru_tree_insert(struct rb_root *root, struct object_cache_entry *new)
 {
@@ -158,7 +219,7 @@ lru_tree_insert(struct rb_root *root, struct object_cache_entry *new)
 }
 
 static struct object_cache_entry *lru_tree_search(struct rb_root *root,
-						     uint32_t idx)
+						  uint32_t idx)
 {
 	struct rb_node *n = root->rb_node;
 	struct object_cache_entry *t;
@@ -178,13 +239,15 @@ static struct object_cache_entry *lru_tree_search(struct rb_root *root,
 }
 
 static inline void
-free_cache_entry(struct object_cache_entry *entry,
-	      struct rb_root *lru_tree)
+free_cache_entry(struct object_cache_entry *entry)
 {
-	rb_erase(&entry->node, lru_tree);
+	struct object_cache *oc = entry->oc;
+
+	rb_erase(&entry->node, &oc->lru_tree);
 	list_del_init(&entry->lru_list);
 	if (!list_empty(&entry->dirty_list))
 		list_del_init(&entry->dirty_list);
+	pthread_rwlock_destroy(&entry->lock);
 	free(entry);
 }
 
@@ -203,8 +266,7 @@ static int remove_cache_object(struct object_cache *oc, uint32_t idx)
 
 	sprintf(path, "%s/%06"PRIx32"/%08"PRIx32, object_cache_dir,
 		oc->vid, idx);
-	sd_dprintf("removing cache object %"PRIx64"\n",
-		idx_to_oid(oc->vid, idx));
+	sd_dprintf("%"PRIx64"\n", idx_to_oid(oc->vid, idx));
 	if (unlink(path) < 0) {
 		sd_eprintf("failed to remove cached object %m\n");
 		if (errno == ENOENT)
@@ -239,7 +301,7 @@ static int read_cache_object_noupdate(uint32_t vid, uint32_t idx, void *buf,
 
 	if (size != count) {
 		sd_eprintf("size %zu, count:%zu, offset %jd %m\n",
-			size, count, (intmax_t)offset);
+			   size, count, (intmax_t)offset);
 		ret = SD_RES_EIO;
 		goto out_close;
 	}
@@ -272,7 +334,7 @@ static int write_cache_object_noupdate(uint32_t vid, uint32_t idx, void *buf,
 
 	if (size != count) {
 		sd_eprintf("size %zu, count:%zu, offset %jd %m\n",
-			size, count, (intmax_t)offset);
+			   size, count, (intmax_t)offset);
 		ret = SD_RES_EIO;
 		goto out_close;
 	}
@@ -293,57 +355,10 @@ static int read_cache_object(struct object_cache_entry *entry, void *buf,
 	ret = read_cache_object_noupdate(vid, idx, buf, count, offset);
 
 	if (ret == SD_RES_SUCCESS) {
-		pthread_rwlock_wrlock(&oc->lock);
+		write_lock_cache(oc);
 		list_move_tail(&entry->lru_list, &oc->lru_head);
-		pthread_rwlock_unlock(&oc->lock);
-	}
-	return ret;
-}
-
-static int write_cache_object(struct object_cache_entry *entry, void *buf,
-			      size_t count, off_t offset, bool create,
-			      bool writeback)
-{
-	uint32_t vid = entry->oc->vid, idx = entry_idx(entry);
-	uint64_t oid = idx_to_oid(vid, idx);
-	struct object_cache *oc = entry->oc;
-	struct sd_req hdr;
-	int ret;
-
-	ret = write_cache_object_noupdate(vid, idx, buf, count, offset);
-	if (ret != SD_RES_SUCCESS)
-		return ret;
-
-	pthread_rwlock_wrlock(&oc->lock);
-	if (writeback) {
-		entry->bmap |= calc_object_bmap(count, offset);
-		if (list_empty(&entry->dirty_list))
-			list_add_tail(&entry->dirty_list,
-				      &oc->dirty_head);
+		unlock_cache(oc);
 	}
-	list_move_tail(&entry->lru_list, &oc->lru_head);
-	pthread_rwlock_unlock(&oc->lock);
-
-	if (writeback)
-		goto out;
-
-	if (create)
-		sd_init_req(&hdr, SD_OP_CREATE_AND_WRITE_OBJ);
-	else
-		sd_init_req(&hdr, SD_OP_WRITE_OBJ);
-	hdr.flags =  SD_FLAG_CMD_WRITE;
-	hdr.data_length = count;
-
-	hdr.obj.oid = oid;
-	hdr.obj.offset = offset;
-
-	ret = exec_local_req(&hdr, buf);
-	if (ret != SD_RES_SUCCESS) {
-		sd_eprintf("failed to write object %" PRIx64 ", %x\n",
-			oid, ret);
-		return ret;
-	}
-out:
 	return ret;
 }
 
@@ -369,7 +384,7 @@ static int push_cache_object(uint32_t vid, uint32_t idx, uint64_t bmap,
 	last_bit = fls64(bmap) - 1;
 
 	sd_dprintf("bmap:0x%"PRIx64", first_bit:%d, last_bit:%d\n",
-		bmap, first_bit, last_bit);
+		   bmap, first_bit, last_bit);
 	offset = first_bit * CACHE_BLOCK_SIZE;
 	data_length = (last_bit - first_bit + 1) * CACHE_BLOCK_SIZE;
 
@@ -381,10 +396,8 @@ static int push_cache_object(uint32_t vid, uint32_t idx, uint64_t bmap,
 		data_length = SD_INODE_SIZE - offset;
 
 	buf = valloc(data_length);
-	if (buf == NULL) {
-		sd_eprintf("failed to allocate memory\n");
-		goto out;
-	}
+	if (buf == NULL)
+		panic("failed to allocate memory\n");
 
 	ret = read_cache_object_noupdate(vid, idx, buf, data_length, offset);
 	if (ret != SD_RES_SUCCESS)
@@ -407,6 +420,62 @@ out:
 	return ret;
 }
 
+struct push_work {
+	struct work work;
+	struct object_cache_entry *entry;
+};
+
+static int write_cache_object(struct object_cache_entry *entry, void *buf,
+			      size_t count, off_t offset, bool create,
+			      bool writeback)
+{
+	uint32_t vid = entry->oc->vid, idx = entry_idx(entry);
+	uint64_t oid = idx_to_oid(vid, idx);
+	struct object_cache *oc = entry->oc;
+	struct sd_req hdr;
+	int ret;
+
+	write_lock_entry(entry);
+
+	ret = write_cache_object_noupdate(vid, idx, buf, count, offset);
+	if (ret != SD_RES_SUCCESS) {
+		unlock_entry(entry);
+		return ret;
+	}
+	write_lock_cache(oc);
+	if (writeback) {
+		entry->bmap |= calc_object_bmap(count, offset);
+		if (list_empty(&entry->dirty_list))
+			list_add_tail(&entry->dirty_list, &oc->dirty_head);
+	}
+	list_move_tail(&entry->lru_list, &oc->lru_head);
+	unlock_cache(oc);
+
+	unlock_entry(entry);
+
+	if (writeback)
+		goto out;
+
+	if (create)
+		sd_init_req(&hdr, SD_OP_CREATE_AND_WRITE_OBJ);
+	else
+		sd_init_req(&hdr, SD_OP_WRITE_OBJ);
+	hdr.flags =  SD_FLAG_CMD_WRITE;
+	hdr.data_length = count;
+
+	hdr.obj.oid = oid;
+	hdr.obj.offset = offset;
+
+	ret = exec_local_req(&hdr, buf);
+	if (ret != SD_RES_SUCCESS) {
+		sd_eprintf("failed to write object %" PRIx64 ", %x\n",
+			   oid, ret);
+		return ret;
+	}
+out:
+	return ret;
+}
+
 /*
  * The reclaim algorithm is similar to Linux kernel's page cache:
  *  - only tries to reclaim 'clean' object, which doesn't has any dirty updates,
@@ -427,34 +496,27 @@ static void do_reclaim_object(struct object_cache *oc)
 	uint64_t oid;
 	uint32_t cap;
 
-	pthread_rwlock_wrlock(&oc->lock);
+	write_lock_cache(oc);
 	list_for_each_entry_safe(entry, t, &oc->lru_head, lru_list) {
 		oid = idx_to_oid(oc->vid, entry_idx(entry));
-		if (uatomic_read(&entry->refcnt) > 0) {
-			sd_dprintf("%"PRIx64" is in operation, skip...\n", oid);
+		if (entry_in_use(entry)) {
+			sd_dprintf("%"PRIx64" is in use, skip...\n", oid);
 			continue;
 		}
 		if (entry_is_dirty(entry)) {
 			sd_dprintf("%"PRIx64" is dirty, skip...\n", oid);
 			continue;
 		}
-		if (remove_cache_object(oc, entry_idx(entry))
-		    != SD_RES_SUCCESS)
+		if (remove_cache_object(oc, entry_idx(entry)) != SD_RES_SUCCESS)
 			continue;
-		free_cache_entry(entry, &oc->lru_tree);
+		free_cache_entry(entry);
 		cap = uatomic_sub_return(&gcache.capacity, CACHE_OBJECT_SIZE);
 		sd_dprintf("%"PRIx64" reclaimed. capacity:%"PRId32"\n",
-			oid, cap);
+			   oid, cap);
 		if (cap <= HIGH_WATERMARK)
 			break;
 	}
-	pthread_rwlock_unlock(&oc->lock);
-	/*
-	 * Reclaimer grabs a write lock, which will blocks all the IO thread of
-	 * this VDI. We call pthread_yield() to expect that other threads can
-	 * grab the lock more often.
-	 */
-	pthread_yield();
+	unlock_cache(oc);
 }
 
 struct reclaim_work {
@@ -486,7 +548,7 @@ static void do_reclaim(struct work *work)
 			if (cap <= HIGH_WATERMARK) {
 				pthread_rwlock_unlock(&hashtable_lock[idx]);
 				sd_dprintf("complete, capacity %"PRIu32"\n",
-					cap);
+					   cap);
 				return;
 			}
 		}
@@ -505,18 +567,16 @@ static void reclaim_done(struct work *work)
 static int create_dir_for(uint32_t vid)
 {
 	int ret = 0;
-	struct strbuf buf = STRBUF_INIT;
+	char p[PATH_MAX];
 
-	strbuf_addstr(&buf, object_cache_dir);
-	strbuf_addf(&buf, "/%06"PRIx32, vid);
-	if (mkdir(buf.buf, def_dmode) < 0)
+	sprintf(p, "%s/%06"PRIx32, object_cache_dir, vid);
+	if (mkdir(p, def_dmode) < 0)
 		if (errno != EEXIST) {
-			sd_eprintf("%s, %m\n", buf.buf);
+			sd_eprintf("%s, %m\n", p);
 			ret = -1;
 			goto err;
 		}
 err:
-	strbuf_release(&buf);
 	return ret;
 }
 
@@ -545,6 +605,7 @@ not_found:
 		cache->vid = vid;
 		cache->lru_tree = RB_ROOT;
 		create_dir_for(vid);
+		cache->push_efd = eventfd(0, 0);
 
 		INIT_LIST_HEAD(&cache->dirty_head);
 		INIT_LIST_HEAD(&cache->lru_head);
@@ -577,7 +638,7 @@ void object_cache_try_to_reclaim(int delay)
 	rw->delay = delay;
 	rw->work.fn = do_reclaim;
 	rw->work.done = reclaim_done;
-	queue_work(sys->reclaim_wqueue, &rw->work);
+	queue_work(sys->oc_reclaim_wqueue, &rw->work);
 }
 
 static inline struct object_cache_entry *
@@ -590,6 +651,7 @@ alloc_cache_entry(struct object_cache *oc, uint32_t idx)
 	entry->idx = idx;
 	INIT_LIST_HEAD(&entry->dirty_list);
 	INIT_LIST_HEAD(&entry->lru_list);
+	pthread_rwlock_init(&entry->lock, NULL);
 
 	return entry;
 }
@@ -600,16 +662,18 @@ static void add_to_lru_cache(struct object_cache *oc, uint32_t idx, bool create)
 
 	sd_dprintf("oid %"PRIx64" added\n", idx_to_oid(oc->vid, idx));
 
-	pthread_rwlock_wrlock(&oc->lock);
-	assert(!lru_tree_insert(&oc->lru_tree, entry));
+	write_lock_cache(oc);
+	if (lru_tree_insert(&oc->lru_tree, entry))
+		panic("the object already exist\n");
 	uatomic_add(&gcache.capacity, CACHE_OBJECT_SIZE);
 	list_add_tail(&entry->lru_list, &oc->lru_head);
 	if (create) {
+		/* The cache lock ensures this does not race with the pusher */
 		entry->bmap = UINT64_MAX;
 		entry->idx |= CACHE_CREATE_BIT;
-		list_add(&entry->dirty_list, &oc->dirty_head);
+		list_add_tail(&entry->dirty_list, &oc->dirty_head);
 	}
-	pthread_rwlock_unlock(&oc->lock);
+	unlock_cache(oc);
 }
 
 static inline int lookup_path(char *path)
@@ -693,7 +757,7 @@ static int create_cache_object(struct object_cache *oc, uint32_t idx,
 	if (ret != buf_size) {
 		ret = SD_RES_EIO;
 		sd_eprintf("failed, vid %"PRIx32", idx %"PRIx32"\n",
-			oc->vid, idx);
+			   oc->vid, idx);
 		goto out_close;
 	}
 	/* This is intended to take care of partial write due to crash */
@@ -766,26 +830,75 @@ err:
 	return ret;
 }
 
-/* Push back all the dirty objects to sheep cluster storage */
+static void do_push_object(struct work *work)
+{
+	struct push_work *pw = container_of(work, struct push_work, work);
+	struct object_cache_entry *entry = pw->entry;
+	struct object_cache *oc = entry->oc;
+	uint64_t oid = idx_to_oid(oc->vid, entry_idx(entry));
+
+	sd_dprintf("%"PRIx64"\n", oid);
+
+	read_lock_entry(entry);
+	if (push_cache_object(oc->vid, entry_idx(entry), entry->bmap,
+			      !!(entry->idx & CACHE_CREATE_BIT))
+	    != SD_RES_SUCCESS)
+		panic("push failed but should never fail\n");
+	if (uatomic_sub_return(&oc->push_count, 1) == 0)
+		eventfd_write(oc->push_efd, 1);
+	entry->idx &= ~CACHE_CREATE_BIT;
+	entry->bmap = 0;
+	unlock_entry(entry);
+
+	sd_dprintf("%"PRIx64" done\n", oid);
+	put_cache_entry(entry);
+}
+
+static void push_object_done(struct work *work)
+{
+	struct push_work *pw = container_of(work, struct push_work, work);
+	free(pw);
+}
+
+/*
+ * Push back all the dirty objects before the FLUSH request to sheep replicated
+ * storage synchronously.
+ *
+ * 1. Don't hold the cache lock tightly, so we can serve RW requests while
+ *    pushing. It is okay to allow subsequent RW after FLUSH because we only
+ *    need to guarantee that the objects dirtied before FLUSH are pushed.
+ * 2. Use threaded AIO to boost push performance, such as fsync(2) from VM.
+ */
 static int object_cache_push(struct object_cache *oc)
 {
 	struct object_cache_entry *entry, *t;
+	eventfd_t value;
 
-	int ret = SD_RES_SUCCESS;
-
-	pthread_rwlock_wrlock(&oc->lock);
+	write_lock_cache(oc);
+	if (list_empty(&oc->dirty_head)) {
+		unlock_cache(oc);
+		return SD_RES_SUCCESS;
+	}
 	list_for_each_entry_safe(entry, t, &oc->dirty_head, dirty_list) {
-		ret = push_cache_object(oc->vid, entry_idx(entry), entry->bmap,
-					!!(entry->idx & CACHE_CREATE_BIT));
-		if (ret != SD_RES_SUCCESS)
-			goto push_failed;
-		entry->idx &= ~CACHE_CREATE_BIT;
-		entry->bmap = 0;
+		struct push_work *pw;
+
+		get_cache_entry(entry);
+		uatomic_inc(&oc->push_count);
+		pw = xzalloc(sizeof(struct push_work));
+		pw->work.fn = do_push_object;
+		pw->work.done = push_object_done;
+		pw->entry = entry;
+		queue_work(sys->oc_push_wqueue, &pw->work);
 		list_del_init(&entry->dirty_list);
 	}
-push_failed:
-	pthread_rwlock_unlock(&oc->lock);
-	return ret;
+	unlock_cache(oc);
+reread:
+	if (eventfd_read(oc->push_efd, &value) < 0) {
+		sd_eprintf("eventfd read failed, %m\n");
+		goto reread;
+	}
+	sd_dprintf("%"PRIx32" completed\n", oc->vid);
+	return SD_RES_SUCCESS;
 }
 
 bool object_is_cached(uint64_t oid)
@@ -817,12 +930,14 @@ void object_cache_delete(uint32_t vid)
 	hlist_del(&cache->hash);
 	pthread_rwlock_unlock(&hashtable_lock[h]);
 
-	pthread_rwlock_wrlock(&cache->lock);
+	write_lock_cache(cache);
 	list_for_each_entry_safe(entry, t, &cache->lru_head, lru_list) {
-		free_cache_entry(entry, &cache->lru_tree);
+		free_cache_entry(entry);
 		uatomic_sub(&gcache.capacity, CACHE_OBJECT_SIZE);
 	}
-	pthread_rwlock_unlock(&cache->lock);
+	unlock_cache(cache);
+	pthread_rwlock_destroy(&cache->lock);
+	close(cache->push_efd);
 	free(cache);
 
 	/* Then we free disk */
@@ -831,27 +946,22 @@ void object_cache_delete(uint32_t vid)
 }
 
 static struct object_cache_entry *
-get_cache_entry(struct object_cache *cache, uint32_t idx)
+get_cache_entry_from(struct object_cache *cache, uint32_t idx)
 {
 	struct object_cache_entry *entry;
 
-	pthread_rwlock_rdlock(&cache->lock);
+	read_lock_cache(cache);
 	entry = lru_tree_search(&cache->lru_tree, idx);
 	if (!entry) {
 		/* The cache entry may be reclaimed, so try again. */
-		pthread_rwlock_unlock(&cache->lock);
+		unlock_cache(cache);
 		return NULL;
 	}
-	uatomic_inc(&entry->refcnt);
-	pthread_rwlock_unlock(&cache->lock);
+	get_cache_entry(entry);
+	unlock_cache(cache);
 	return entry;
 }
 
-static void put_cache_entry(struct object_cache_entry *entry)
-{
-	uatomic_dec(&entry->refcnt);
-}
-
 static int object_cache_flush_and_delete(struct object_cache *oc)
 {
 	DIR *dir;
@@ -885,9 +995,9 @@ static int object_cache_flush_and_delete(struct object_cache *oc)
 		if (idx == ULLONG_MAX)
 			continue;
 		if (push_cache_object(vid, idx, all, true) !=
-				SD_RES_SUCCESS) {
+		    SD_RES_SUCCESS) {
 			sd_dprintf("failed to push %"PRIx64"\n",
-				idx_to_oid(vid, idx));
+				   idx_to_oid(vid, idx));
 			ret = -1;
 			goto out_close_dir;
 		}
@@ -946,7 +1056,7 @@ int object_cache_handle_request(struct request *req)
 	bool create = false;
 
 	sd_dprintf("%08"PRIx32", len %"PRIu32", off %"PRIu64"\n", idx,
-		hdr->data_length, hdr->obj.offset);
+		   hdr->data_length, hdr->obj.offset);
 
 	cache = find_object_cache(vid, true);
 
@@ -965,7 +1075,7 @@ retry:
 		return ret;
 	}
 
-	entry = get_cache_entry(cache, idx);
+	entry = get_cache_entry_from(cache, idx);
 	if (!entry) {
 		sd_dprintf("retry oid %"PRIx64"\n", oid);
 		/*
@@ -1006,7 +1116,7 @@ int object_cache_write(uint64_t oid, char *data, unsigned int datalen,
 
 	sd_dprintf("%" PRIx64 "\n", oid);
 	cache = find_object_cache(vid, false);
-	entry = get_cache_entry(cache, idx);
+	entry = get_cache_entry_from(cache, idx);
 	if (!entry) {
 		sd_dprintf("%" PRIx64 " doesn't exist\n", oid);
 		return SD_RES_NO_CACHE;
@@ -1028,7 +1138,7 @@ int object_cache_read(uint64_t oid, char *data, unsigned int datalen,
 
 	sd_dprintf("%" PRIx64 "\n", oid);
 	cache = find_object_cache(vid, false);
-	entry = get_cache_entry(cache, idx);
+	entry = get_cache_entry_from(cache, idx);
 	if (!entry) {
 		sd_dprintf("%" PRIx64 " doesn't exist\n", oid);
 		return SD_RES_NO_CACHE;
@@ -1077,13 +1187,13 @@ void object_cache_remove(uint64_t oid)
 	if (!oc)
 		return;
 
-	pthread_rwlock_wrlock(&oc->lock);
+	write_lock_cache(oc);
 	entry = lru_tree_search(&oc->lru_tree, idx);
 	if (!entry)
 		goto out;
-	free_cache_entry(entry, &oc->lru_tree);
+	free_cache_entry(entry);
 out:
-	pthread_rwlock_unlock(&oc->lock);
+	unlock_cache(oc);
 	return;
 }
 
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 61e166c..58b93f8 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -374,8 +374,9 @@ static int init_work_queues(void)
 	sys->block_wqueue = init_work_queue("block", true);
 	sys->sockfd_wqueue = init_work_queue("sockfd", true);
 	if (is_object_cache_enabled()) {
-		sys->reclaim_wqueue = init_work_queue("reclaim", true);
-		if (!sys->reclaim_wqueue)
+		sys->oc_reclaim_wqueue = init_work_queue("reclaim", true);
+		sys->oc_push_wqueue = init_work_queue("push", false);
+		if (!sys->oc_reclaim_wqueue || !sys->oc_push_wqueue)
 			return -1;
 	}
 	if (!sys->gateway_wqueue || !sys->io_wqueue || !sys->recovery_wqueue ||
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index b19ca03..c48b686 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -113,7 +113,8 @@ struct cluster_info {
 	struct work_queue *recovery_notify_wqueue;
 	struct work_queue *block_wqueue;
 	struct work_queue *sockfd_wqueue;
-	struct work_queue *reclaim_wqueue;
+	struct work_queue *oc_reclaim_wqueue;
+	struct work_queue *oc_push_wqueue;
 
 #define CACHE_TYPE_OBJECT 0x1
 #define CACHE_TYPE_DISK   0x2
-- 
1.7.9.5




More information about the sheepdog mailing list