From: levin li <xingke.lwp at taobao.com> This patch does the reclaiming work when the total size of cached objects reaches the max size specified by the user. I did it in the following way: 1. Check the object tree for the object entry to determine whether the cache entry exists and whether it is being reclaimed; if it is being reclaimed, we make sheep ignore the cache. 2. In object_cache_rw() we search for the cache entry; after it passes the sanity check, we increment its refcnt to tell the reclaiming worker that this entry is being referenced and should not be reclaimed now. 3. In add_to_object_cache(), when the cached size reaches the max size, we start a reclaiming thread; only one such thread can be running at a time. 4. In reclaim_work(), we reclaim cached objects until the cache size is reduced to 80% of the max size. 5. In reclaim_object(), we start to reclaim an object. Before this, we check whether the cache is flushing — if so, we don't reclaim it — and if the refcnt of the object is not zero, we also don't reclaim it. If the cached object is dirty, we flush it with push_cache_object(), and then try to remove the object. 
Signed-off-by: levin li <xingke.lwp at taobao.com> --- include/internal_proto.h | 1 + include/list.h | 10 + sheep/object_cache.c | 757 +++++++++++++++++++++++++++++++--------------- sheep/sheep.c | 3 +- sheep/sheep_priv.h | 1 + sheep/store.c | 4 + 6 files changed, 534 insertions(+), 242 deletions(-) diff --git a/include/internal_proto.h b/include/internal_proto.h index abc14dc..a296217 100644 --- a/include/internal_proto.h +++ b/include/internal_proto.h @@ -76,6 +76,7 @@ #define SD_RES_INVALID_CTIME 0x84 /* Creation time of sheepdog is different */ #define SD_RES_INVALID_EPOCH 0x85 /* Invalid epoch */ #define SD_RES_NETWORK_ERROR 0x86 /* Network error between sheep */ +#define SD_RES_NO_CACHE 0x87 /* No cache object found */ #define SD_FLAG_NOHALT 0x0004 /* Serve the IO rquest even lack of nodes */ #define SD_FLAG_QUORUM 0x0008 /* Serve the IO rquest as long we are quorate */ diff --git a/include/list.h b/include/list.h index 30ee3c4..e1d645d 100644 --- a/include/list.h +++ b/include/list.h @@ -259,3 +259,13 @@ static inline void hlist_add_after(struct hlist_node *n, pos = n) #endif + + +#define list_for_each_entry_revert_safe_rcu(pos, n, head, member) \ + for (pos = cds_list_entry(rcu_dereference((head)->prev), \ + typeof(*pos), member), \ + n = cds_list_entry(rcu_dereference(pos->member.prev), \ + typeof(*pos), member); \ + &pos->member != (head); \ + pos = n, n = cds_list_entry(rcu_dereference(pos->member.prev),\ + typeof(*pos), member)) diff --git a/sheep/object_cache.c b/sheep/object_cache.c index 0ef94be..b86b523 100644 --- a/sheep/object_cache.c +++ b/sheep/object_cache.c @@ -40,7 +40,13 @@ #define CACHE_VDI_BIT (UINT32_C(1) << CACHE_VDI_SHIFT) #define CACHE_BLOCK_SIZE ((UINT64_C(1) << 10) * 64) /* 64 KB */ -#define ENTRY_CREATE_BIT (1) +#define CACHE_RECLAIM_SHIFT 27 +#define CACHE_RECLAIM_BIT (UINT32_C(1) << CACHE_RECLAIM_SHIFT) + +#define CACHE_CREATE_SHIFT 26 +#define CACHE_CREATE_BIT (UINT32_C(1) << CACHE_CREATE_SHIFT) + +#define CACHE_INDEX_MASK 
(CACHE_RECLAIM_BIT | CACHE_CREATE_BIT) struct global_cache { uint64_t cache_size; @@ -50,6 +56,7 @@ struct global_cache { struct object_cache { uint32_t vid; + uint8_t in_flush; struct hlist_node hash; struct list_head dirty_list; @@ -64,7 +71,6 @@ struct object_cache_entry { uint64_t bmap; /* each bit represents one dirty * block which should be flushed */ int refcnt; - int flags; struct rb_node node; struct rb_node dirty_node; struct object_cache *oc; @@ -85,11 +91,29 @@ static pthread_mutex_t hashtable_lock[HASH_SIZE] = { static struct hlist_head cache_hashtable[HASH_SIZE]; +static inline int cache_in_reclaim(int start) +{ + if (start) + return uatomic_cmpxchg(&sys_cache.reclaiming, 0, 1); + else + return uatomic_read(&sys_cache.reclaiming); +} + +static inline int entry_is_dirty(struct object_cache_entry *entry) +{ + return !!entry->bmap; +} + static inline int hash(uint64_t vid) { return hash_64(vid, HASH_BITS); } +static inline uint32_t idx_mask(uint32_t idx) +{ + return idx &= ~CACHE_INDEX_MASK; +} + static inline uint32_t object_cache_oid_to_idx(uint64_t oid) { uint32_t idx = data_oid_to_idx(oid); @@ -124,14 +148,15 @@ object_cache_insert(struct rb_root *root, struct object_cache_entry *new) struct rb_node **p = &root->rb_node; struct rb_node *parent = NULL; struct object_cache_entry *entry; + uint32_t idx = idx_mask(new->idx); while (*p) { parent = *p; entry = rb_entry(parent, struct object_cache_entry, node); - if (new->idx < entry->idx) + if (idx < idx_mask(entry->idx)) p = &(*p)->rb_left; - else if (new->idx > entry->idx) + else if (idx > idx_mask(entry->idx)) p = &(*p)->rb_right; else { /* already has this entry */ @@ -149,13 +174,35 @@ static struct object_cache_entry *object_tree_search(struct rb_root *root, { struct rb_node *n = root->rb_node; struct object_cache_entry *t; + idx = idx_mask(idx); while (n) { t = rb_entry(n, struct object_cache_entry, node); - if (idx < t->idx) + if (idx < idx_mask(t->idx)) + n = n->rb_left; + else if (idx > 
idx_mask(t->idx)) + n = n->rb_right; + else + return t; /* found it */ + } + + return NULL; +} + +static struct object_cache_entry *dirty_tree_search(struct rb_root *root, + uint32_t idx) +{ + struct rb_node *n = root->rb_node; + struct object_cache_entry *t; + idx = idx_mask(idx); + + while (n) { + t = rb_entry(n, struct object_cache_entry, dirty_node); + + if (idx < idx_mask(t->idx)) n = n->rb_left; - else if (idx > t->idx) + else if (idx > idx_mask(t->idx)) n = n->rb_right; else return t; /* found it */ @@ -164,6 +211,319 @@ static struct object_cache_entry *object_tree_search(struct rb_root *root, return NULL; } +static inline void +del_from_dirty_tree_and_list(struct object_cache_entry *entry, + struct rb_root *dirty_tree) +{ + rb_erase(&entry->dirty_node, dirty_tree); + list_del(&entry->list); +} + +static inline void +del_from_object_tree_and_list(struct object_cache_entry *entry, + struct rb_root *object_tree) +{ + rb_erase(&entry->node, object_tree); + cds_list_del_rcu(&entry->lru_list); +} + +static uint64_t cache_vid_to_data_oid(uint32_t vid, uint32_t idx) +{ + idx = idx_mask(idx); + + return vid_to_data_oid(vid, idx); +} + +static uint64_t idx_to_oid(uint32_t vid, uint32_t idx) +{ + if (idx_has_vdi_bit(idx)) + return vid_to_vdi_oid(vid); + else + return cache_vid_to_data_oid(vid, idx); +} + +static int remove_cache_object(struct object_cache *oc, uint32_t idx) +{ + struct strbuf buf; + int ret = SD_RES_SUCCESS; + + idx = idx_mask(idx); + + strbuf_init(&buf, PATH_MAX); + strbuf_addstr(&buf, cache_dir); + strbuf_addf(&buf, "/%06"PRIx32"/%08"PRIx32, oc->vid, idx); + + dprintf("removing cache object %s\n", buf.buf); + if (unlink(buf.buf) < 0) { + ret = SD_RES_EIO; + eprintf("failed to remove cached object %m\n"); + goto out; + } +out: + strbuf_release(&buf); + + return ret; +} + +static int write_cache_object(uint32_t vid, uint32_t idx, void *buf, + size_t count, off_t offset) +{ + size_t size; + int fd, flags = def_open_flags, ret = SD_RES_SUCCESS; + 
struct strbuf p; + + strbuf_init(&p, PATH_MAX); + strbuf_addstr(&p, cache_dir); + strbuf_addf(&p, "/%06"PRIx32"/%08"PRIx32, vid, idx); + + if (sys->use_directio && !idx_has_vdi_bit(idx)) + flags |= O_DIRECT; + + fd = open(p.buf, flags, def_fmode); + if (fd < 0) { + eprintf("%m\n"); + ret = SD_RES_EIO; + goto out; + } + + if (flock(fd, LOCK_EX) < 0) { + ret = SD_RES_EIO; + eprintf("%m\n"); + goto out_close; + } + size = xpwrite(fd, buf, count, offset); + if (flock(fd, LOCK_UN) < 0) { + ret = SD_RES_EIO; + eprintf("%m\n"); + goto out_close; + } + + if (size != count) { + eprintf("size %zu, count:%zu, offset %jd %m\n", + size, count, (intmax_t)offset); + ret = SD_RES_EIO; + } +out_close: + close(fd); +out: + strbuf_release(&p); + return ret; +} + +static int read_cache_object(uint32_t vid, uint32_t idx, void *buf, + size_t count, off_t offset) +{ + size_t size; + int fd, flags = def_open_flags, ret = SD_RES_SUCCESS; + struct strbuf p; + + strbuf_init(&p, PATH_MAX); + strbuf_addstr(&p, cache_dir); + strbuf_addf(&p, "/%06"PRIx32"/%08"PRIx32, vid, idx); + + if (sys->use_directio && !idx_has_vdi_bit(idx)) + flags |= O_DIRECT; + + fd = open(p.buf, flags, def_fmode); + if (fd < 0) { + eprintf("%m\n"); + ret = SD_RES_EIO; + goto out; + } + + if (flock(fd, LOCK_SH) < 0) { + ret = SD_RES_EIO; + eprintf("%m\n"); + goto out_close; + } + size = xpread(fd, buf, count, offset); + if (flock(fd, LOCK_UN) < 0) { + ret = SD_RES_EIO; + eprintf("%m\n"); + goto out_close; + } + + if (size != count) { + eprintf("size %zu, count:%zu, offset %jd %m\n", + size, count, (intmax_t)offset); + ret = SD_RES_EIO; + } + +out_close: + close(fd); +out: + strbuf_release(&p); + return ret; +} + +static int push_cache_object(uint32_t vid, uint32_t idx, uint64_t bmap, + int create) +{ + struct sd_req hdr; + void *buf; + off_t offset; + unsigned data_length; + int ret = SD_RES_NO_MEM; + uint64_t oid = idx_to_oid(vid, idx); + int first_bit, last_bit; + + dprintf("%"PRIx64", create %d\n", oid, create); + + 
idx = idx_mask(idx); + + if (!bmap) { + dprintf("WARN: nothing to flush\n"); + return SD_RES_SUCCESS; + } + + first_bit = ffsll(bmap) - 1; + last_bit = fls64(bmap) - 1; + + dprintf("bmap:0x%"PRIx64", first_bit:%d, last_bit:%d\n", + bmap, first_bit, last_bit); + offset = first_bit * CACHE_BLOCK_SIZE; + data_length = (last_bit - first_bit + 1) * CACHE_BLOCK_SIZE; + + /* + * CACHE_BLOCK_SIZE may not be divisible by SD_INODE_SIZE, + * so (offset + data_length) could larger than SD_INODE_SIZE + */ + if (is_vdi_obj(oid) && (offset + data_length) > SD_INODE_SIZE) + data_length = SD_INODE_SIZE - offset; + + buf = valloc(data_length); + if (buf == NULL) { + eprintf("failed to allocate memory\n"); + goto out; + } + + ret = read_cache_object(vid, idx, buf, data_length, offset); + if (ret != SD_RES_SUCCESS) + goto out; + + if (create) + sd_init_req(&hdr, SD_OP_CREATE_AND_WRITE_OBJ); + else + sd_init_req(&hdr, SD_OP_WRITE_OBJ); + hdr.flags = SD_FLAG_CMD_WRITE; + hdr.data_length = data_length; + hdr.obj.oid = oid; + hdr.obj.offset = offset; + + ret = exec_local_req(&hdr, buf); + if (ret != SD_RES_SUCCESS) + eprintf("failed to push object %x\n", ret); + +out: + free(buf); + return ret; +} + +static int reclaim_object(struct object_cache_entry *entry) +{ + struct object_cache *oc = entry->oc; + int ret = SD_RES_SUCCESS; + + pthread_rwlock_wrlock(&oc->lock); + dprintf("reclaiming /%06"PRIx32"/%08"PRIx32", cache_size: %ld\n", + oc->vid, entry->idx, uatomic_read(&sys_cache.cache_size)); + + if (uatomic_read(&entry->refcnt) > 0) { + ret = -1; + goto out; + } + + if (entry_is_dirty(entry)) { + uint64_t bmap = entry->bmap; + int create = entry->idx & CACHE_CREATE_BIT; + + if (oc->in_flush) { + ret = -1; + goto out; + } + + entry->bmap = 0; + del_from_dirty_tree_and_list(entry, &oc->dirty_tree); + pthread_rwlock_unlock(&oc->lock); + + ret = push_cache_object(oc->vid, entry->idx, bmap, create); + + pthread_rwlock_wrlock(&oc->lock); + if (ret != SD_RES_SUCCESS) { + /* still dirty */ + 
entry->bmap |= bmap; + ret = -1; + goto out; + } + + entry->idx &= ~CACHE_CREATE_BIT; + /* dirty again */ + if (entry_is_dirty(entry)) { + dprintf("object cache is dirty again %06" PRIx32 "\n", + entry->idx); + ret = -1; + goto out; + } + + if (oc->in_flush) { + ret = -1; + goto out; + } + + if (uatomic_read(&entry->refcnt) > 0) { + ret = -1; + goto out; + } + } + + entry->idx |= CACHE_RECLAIM_BIT; + + ret = remove_cache_object(oc, entry->idx); + if (ret == SD_RES_SUCCESS) + del_from_object_tree_and_list(entry, &oc->object_tree); +out: + pthread_rwlock_unlock(&oc->lock); + return ret; +} + +static void reclaim_work(struct work *work) +{ + struct object_cache_entry *entry, *n; + int ret; + + /* TODO confirm whether this check is necessary */ + if (node_in_recovery()) + return; + + list_for_each_entry_revert_safe_rcu(entry, n, + &sys_cache.cache_lru_list, lru_list) { + unsigned data_length; + /* Reclaim cache to 80% of max size */ + if (uatomic_read(&sys_cache.cache_size) <= + sys->cache_size * 8 / 10) + break; + + ret = reclaim_object(entry); + if (ret != SD_RES_SUCCESS) + continue; + if (idx_has_vdi_bit(entry->idx)) + data_length = SD_INODE_SIZE; + else + data_length = SD_DATA_OBJ_SIZE; + + uatomic_sub(&sys_cache.cache_size, data_length); + free(entry); + } + + dprintf("cache reclaim complete\n"); +} + +static void reclaim_done(struct work *work) +{ + uatomic_set(&sys_cache.reclaiming, 0); + free(work); +} + static struct object_cache_entry * dirty_tree_insert(struct object_cache *oc, uint32_t idx, uint64_t bmap, int create) @@ -171,18 +531,21 @@ dirty_tree_insert(struct object_cache *oc, uint32_t idx, struct rb_node **p = &oc->dirty_tree.rb_node; struct rb_node *parent = NULL; struct object_cache_entry *entry; + idx = idx_mask(idx); while (*p) { parent = *p; entry = rb_entry(parent, struct object_cache_entry, dirty_node); - if (idx < entry->idx) + if (idx < idx_mask(entry->idx)) p = &(*p)->rb_left; - else if (idx > entry->idx) + else if (idx > 
idx_mask(entry->idx)) p = &(*p)->rb_right; else { /* already has this entry, merge bmap */ entry->bmap |= bmap; + if (create) + entry->idx |= CACHE_CREATE_BIT; return entry; } } @@ -192,7 +555,8 @@ dirty_tree_insert(struct object_cache *oc, uint32_t idx, return NULL; entry->bmap |= bmap; - entry->flags |= ENTRY_CREATE_BIT; + if (create) + entry->idx |= CACHE_CREATE_BIT; rb_link_node(&entry->dirty_node, parent, p); rb_insert_color(&entry->dirty_node, &oc->dirty_tree); list_add(&entry->list, &oc->dirty_list); @@ -200,26 +564,6 @@ dirty_tree_insert(struct object_cache *oc, uint32_t idx, return entry; } -static struct object_cache_entry *dirty_tree_search(struct rb_root *root, - uint32_t idx) -{ - struct rb_node *n = root->rb_node; - struct object_cache_entry *t; - - while (n) { - t = rb_entry(n, struct object_cache_entry, dirty_node); - - if (idx < t->idx) - n = n->rb_left; - else if (idx > t->idx) - n = n->rb_right; - else - return t; /* found it */ - } - - return NULL; -} - static int create_dir_for(uint32_t vid) { int ret = 0; @@ -257,6 +601,7 @@ not_found: if (create) { cache = xzalloc(sizeof(*cache)); cache->vid = vid; + cache->object_tree = RB_ROOT; create_dir_for(vid); cache->dirty_tree = RB_ROOT; @@ -271,14 +616,6 @@ out: return cache; } -static inline void -del_from_dirty_tree_and_list(struct object_cache_entry *entry, - struct rb_root *dirty_tree) -{ - rb_erase(&entry->dirty_node, dirty_tree); - list_del(&entry->list); -} - /* Caller should hold the oc->lock */ static inline void add_to_dirty_tree_and_list(struct object_cache *oc, uint32_t idx, @@ -289,6 +626,9 @@ add_to_dirty_tree_and_list(struct object_cache *oc, uint32_t idx, if (!entry) panic("Can not find object entry %" PRIx32 "\n", idx); + if (cache_in_reclaim(0)) + return; + /* If cache isn't in reclaiming, move it * to the head of lru list */ cds_list_del_rcu(&entry->lru_list); @@ -321,6 +661,9 @@ static void add_to_object_cache(struct object_cache *oc, uint32_t idx) entry->idx = idx; 
CDS_INIT_LIST_HEAD(&entry->lru_list); + dprintf("cache object for vdi %" PRIx32 ", idx %08" PRIx32 "added\n", + oc->vid, idx); + pthread_rwlock_wrlock(&oc->lock); old = object_cache_insert(&oc->object_tree, entry); if (!old) { @@ -331,142 +674,76 @@ static void add_to_object_cache(struct object_cache *oc, uint32_t idx) entry = old; } pthread_rwlock_unlock(&oc->lock); + + if (sys->cache_size && + uatomic_read(&sys_cache.cache_size) > sys->cache_size && + !cache_in_reclaim(1)) { + struct work *work = xzalloc(sizeof(struct work)); + work->fn = reclaim_work; + work->done = reclaim_done; + queue_work(sys->reclaim_wqueue, work); + } +} + +static inline struct object_cache_entry * +find_cache_entry(struct object_cache *oc, uint32_t idx) +{ + struct object_cache_entry *entry; + + entry = object_tree_search(&oc->object_tree, idx); + if (!entry || entry->idx & CACHE_RECLAIM_BIT) + return NULL; + + return entry; } static int object_cache_lookup(struct object_cache *oc, uint32_t idx, int create) { struct strbuf buf; - int fd, ret = 0, flags = def_open_flags; + int fd, ret = SD_RES_SUCCESS, flags = def_open_flags; + unsigned data_length; + + if (!create) { + pthread_rwlock_wrlock(&oc->lock); + if (!find_cache_entry(oc, idx)) + ret = SD_RES_NO_CACHE; + pthread_rwlock_unlock(&oc->lock); + return ret; + } strbuf_init(&buf, PATH_MAX); strbuf_addstr(&buf, cache_dir); strbuf_addf(&buf, "/%06"PRIx32"/%08"PRIx32, oc->vid, idx); - if (create) - flags |= O_CREAT | O_TRUNC; + flags |= O_CREAT | O_TRUNC; fd = open(buf.buf, flags, def_fmode); if (fd < 0) { - ret = -1; - goto out; - } - - if (create) { - unsigned data_length; - - if (idx_has_vdi_bit(idx)) - data_length = SD_INODE_SIZE; - else - data_length = SD_DATA_OBJ_SIZE; - - ret = prealloc(fd, data_length); - if (ret != SD_RES_SUCCESS) - ret = -1; - else { - uint64_t bmap = UINT64_MAX; - - add_to_object_cache(oc, idx); - - pthread_rwlock_wrlock(&oc->lock); - add_to_dirty_tree_and_list(oc, idx, bmap, 1); - 
pthread_rwlock_unlock(&oc->lock); - } - } - close(fd); -out: - strbuf_release(&buf); - return ret; -} - -static int write_cache_object(uint32_t vid, uint32_t idx, void *buf, - size_t count, off_t offset) -{ - size_t size; - int fd, flags = def_open_flags, ret = SD_RES_SUCCESS; - struct strbuf p; - - strbuf_init(&p, PATH_MAX); - strbuf_addstr(&p, cache_dir); - strbuf_addf(&p, "/%06"PRIx32"/%08"PRIx32, vid, idx); - - if (sys->use_directio && !idx_has_vdi_bit(idx)) - flags |= O_DIRECT; - - fd = open(p.buf, flags, def_fmode); - if (fd < 0) { - eprintf("%m\n"); ret = SD_RES_EIO; goto out; } - if (flock(fd, LOCK_EX) < 0) { - ret = SD_RES_EIO; - eprintf("%m\n"); - goto out_close; - } - size = xpwrite(fd, buf, count, offset); - if (flock(fd, LOCK_UN) < 0) { - ret = SD_RES_EIO; - eprintf("%m\n"); - goto out_close; - } - - if (size != count) { - eprintf("size %zu, count:%zu, offset %jd %m\n", - size, count, (intmax_t)offset); - ret = SD_RES_EIO; - } -out_close: - close(fd); -out: - strbuf_release(&p); - return ret; -} - -static int read_cache_object(uint32_t vid, uint32_t idx, void *buf, - size_t count, off_t offset) -{ - size_t size; - int fd, flags = def_open_flags, ret = SD_RES_SUCCESS; - struct strbuf p; - - strbuf_init(&p, PATH_MAX); - strbuf_addstr(&p, cache_dir); - strbuf_addf(&p, "/%06"PRIx32"/%08"PRIx32, vid, idx); - - if (sys->use_directio && !idx_has_vdi_bit(idx)) - flags |= O_DIRECT; + if (idx_has_vdi_bit(idx)) + data_length = SD_INODE_SIZE; + else + data_length = SD_DATA_OBJ_SIZE; - fd = open(p.buf, flags, def_fmode); - if (fd < 0) { - eprintf("%m\n"); + ret = prealloc(fd, data_length); + if (ret != SD_RES_SUCCESS) ret = SD_RES_EIO; - goto out; - } + else { + uint64_t bmap = UINT64_MAX; - if (flock(fd, LOCK_SH) < 0) { - ret = SD_RES_EIO; - eprintf("%m\n"); - goto out_close; - } - size = xpread(fd, buf, count, offset); - if (flock(fd, LOCK_UN) < 0) { - ret = SD_RES_EIO; - eprintf("%m\n"); - goto out_close; - } + add_to_object_cache(oc, idx); - if (size != count) 
{ - eprintf("size %zu, count:%zu, offset %jd %m\n", - size, count, (intmax_t)offset); - ret = SD_RES_EIO; + pthread_rwlock_wrlock(&oc->lock); + add_to_dirty_tree_and_list(oc, idx, bmap, 1); + pthread_rwlock_unlock(&oc->lock); } - -out_close: close(fd); out: - strbuf_release(&p); + strbuf_release(&buf); return ret; } @@ -531,7 +808,7 @@ static int object_cache_pull(struct object_cache *oc, uint32_t idx) oid = vid_to_vdi_oid(oc->vid); data_length = SD_INODE_SIZE; } else { - oid = vid_to_data_oid(oc->vid, idx); + oid = cache_vid_to_data_oid(oc->vid, idx); data_length = SD_DATA_OBJ_SIZE; } @@ -558,75 +835,6 @@ out: return ret; } -static uint64_t idx_to_oid(uint32_t vid, uint32_t idx) -{ - if (idx_has_vdi_bit(idx)) - return vid_to_vdi_oid(vid); - else - return vid_to_data_oid(vid, idx); -} - -static int push_cache_object(uint32_t vid, uint32_t idx, uint64_t bmap, - int create) -{ - struct sd_req hdr; - void *buf; - off_t offset; - unsigned data_length; - int ret = SD_RES_NO_MEM; - uint64_t oid = idx_to_oid(vid, idx); - int first_bit, last_bit; - - dprintf("%"PRIx64", create %d\n", oid, create); - - if (!bmap) { - dprintf("WARN: nothing to flush\n"); - return SD_RES_SUCCESS; - } - - first_bit = ffsll(bmap) - 1; - last_bit = fls64(bmap) - 1; - - dprintf("bmap:0x%"PRIx64", first_bit:%d, last_bit:%d\n", - bmap, first_bit, last_bit); - offset = first_bit * CACHE_BLOCK_SIZE; - data_length = (last_bit - first_bit + 1) * CACHE_BLOCK_SIZE; - - /* - * CACHE_BLOCK_SIZE may not be divisible by SD_INODE_SIZE, - * so (offset + data_length) could larger than SD_INODE_SIZE - */ - if (is_vdi_obj(oid) && (offset + data_length) > SD_INODE_SIZE) - data_length = SD_INODE_SIZE - offset; - - buf = valloc(data_length); - if (buf == NULL) { - eprintf("failed to allocate memory\n"); - goto out; - } - - ret = read_cache_object(vid, idx, buf, data_length, offset); - if (ret != SD_RES_SUCCESS) - goto out; - - if (create) - sd_init_req(&hdr, SD_OP_CREATE_AND_WRITE_OBJ); - else - sd_init_req(&hdr, 
SD_OP_WRITE_OBJ); - hdr.flags = SD_FLAG_CMD_WRITE; - hdr.data_length = data_length; - hdr.obj.oid = oid; - hdr.obj.offset = offset; - - ret = exec_local_req(&hdr, buf); - if (ret != SD_RES_SUCCESS) - eprintf("failed to push object %x\n", ret); - -out: - free(buf); - return ret; -} - /* Push back all the dirty objects to sheep cluster storage */ static int object_cache_push(struct object_cache *oc) { @@ -640,27 +848,33 @@ static int object_cache_push(struct object_cache *oc) return SD_RES_SUCCESS; pthread_rwlock_wrlock(&oc->lock); + oc->in_flush = 1; list_splice_init(&oc->dirty_list, &inactive_dirty_list); pthread_rwlock_unlock(&oc->lock); list_for_each_entry_safe(entry, t, &inactive_dirty_list, list) { - pthread_rwlock_rdlock(&oc->lock); + pthread_rwlock_wrlock(&oc->lock); bmap = entry->bmap; - create = entry->flags & ENTRY_CREATE_BIT; + create = entry->idx & CACHE_CREATE_BIT; + entry->bmap = 0; + del_from_dirty_tree_and_list(entry, &oc->dirty_tree); pthread_rwlock_unlock(&oc->lock); ret = push_cache_object(oc->vid, entry->idx, bmap, create); - if (ret != SD_RES_SUCCESS) - goto push_failed; pthread_rwlock_wrlock(&oc->lock); - del_from_dirty_tree_and_list(entry, &oc->dirty_tree); + if (ret != SD_RES_SUCCESS) { + entry->bmap |= bmap; + goto push_failed; + } + entry->idx &= ~CACHE_CREATE_BIT; pthread_rwlock_unlock(&oc->lock); } + oc->in_flush = 0; return ret; push_failed: - pthread_rwlock_wrlock(&oc->lock); + oc->in_flush = 0; list_splice_init(&inactive_dirty_list, &oc->dirty_list); pthread_rwlock_unlock(&oc->lock); @@ -677,10 +891,7 @@ int object_is_cached(uint64_t oid) if (!cache) return 0; - if (object_cache_lookup(cache, idx, 0) < 0) - return 0; - else - return 1; /* found it */ + return (object_cache_lookup(cache, idx, 0) == SD_RES_SUCCESS); } void object_cache_delete(uint32_t vid) @@ -712,6 +923,30 @@ void object_cache_delete(uint32_t vid) } +static struct object_cache_entry * +get_cache_entry(struct object_cache *cache, uint32_t idx) +{ + struct 
object_cache_entry *entry; + + pthread_rwlock_rdlock(&cache->lock); + entry = find_cache_entry(cache, idx); + if (!entry) { + /* The cache entry may be reclaimed, so try again. */ + pthread_rwlock_unlock(&cache->lock); + return NULL; + } + + uatomic_inc(&entry->refcnt); + pthread_rwlock_unlock(&cache->lock); + + return entry; +} + +static void put_cache_entry(struct object_cache_entry *entry) +{ + uatomic_dec(&entry->refcnt); +} + static int object_cache_flush_and_delete(struct object_cache *oc) { DIR *dir; @@ -750,6 +985,7 @@ static int object_cache_flush_and_delete(struct object_cache *oc) } object_cache_delete(vid); + out: strbuf_release(&p); return ret; @@ -773,10 +1009,10 @@ int bypass_object_cache(struct request *req) /* For read requet, we can read cache if any */ uint32_t idx = object_cache_oid_to_idx(oid); - if (object_cache_lookup(cache, idx, 0) < 0) - return 1; - else + if (object_cache_lookup(cache, idx, 0) == 0) return 0; + else + return 1; } } @@ -807,37 +1043,44 @@ int object_cache_handle_request(struct request *req) if (req->rq.opcode == SD_OP_CREATE_AND_WRITE_OBJ) create = 1; - if (object_cache_lookup(cache, idx, create) < 0) { +retry: + ret = object_cache_lookup(cache, idx, create); + if (ret == SD_RES_NO_CACHE) { ret = object_cache_pull(cache, idx); if (ret != SD_RES_SUCCESS) return ret; + } else if (ret == SD_RES_EIO) + return ret; + + entry = get_cache_entry(cache, idx); + if (!entry) { + dprintf("cache entry %"PRIx32"/%"PRIx32" may be reclaimed\n", + vid, idx); + goto retry; } - pthread_rwlock_rdlock(&cache->lock); - entry = object_tree_search(&cache->object_tree, idx); - pthread_rwlock_unlock(&cache->lock); - if (hdr->flags & SD_FLAG_CMD_WRITE) { ret = write_cache_object(cache->vid, idx, req->data, hdr->data_length, hdr->obj.offset); if (ret != SD_RES_SUCCESS) - goto out; + goto err; update_cache_entry(cache, idx, hdr->data_length, hdr->obj.offset); } else { ret = read_cache_object(cache->vid, idx, req->data, hdr->data_length, 
hdr->obj.offset); if (ret != SD_RES_SUCCESS) - goto out; + goto err; req->rp.data_length = hdr->data_length; - if (entry) { + if (entry && !cache_in_reclaim(0)) { cds_list_del_rcu(&entry->lru_list); cds_list_add_rcu(&entry->lru_list, &sys_cache.cache_lru_list); } } -out: +err: + put_cache_entry(entry); return ret; } @@ -847,13 +1090,25 @@ int object_cache_write(uint64_t oid, char *data, unsigned int datalen, uint32_t vid = oid_to_vid(oid); uint32_t idx = object_cache_oid_to_idx(oid); struct object_cache *cache; + struct object_cache_entry *entry; int ret; cache = find_object_cache(vid, 0); + dprintf("cache object write %" PRIx32 "\n", idx); + + entry = get_cache_entry(cache, idx); + if (!entry) { + panic("cache object %" PRIx32 " doesn't exist\n", idx); + return SD_RES_NO_CACHE; + } + ret = write_cache_object(vid, idx, data, datalen, offset); if (ret == SD_RES_SUCCESS) update_cache_entry(cache, idx, datalen, offset); + + put_cache_entry(entry); + return ret; } @@ -862,8 +1117,25 @@ int object_cache_read(uint64_t oid, char *data, unsigned int datalen, { uint32_t vid = oid_to_vid(oid); uint32_t idx = object_cache_oid_to_idx(oid); + struct object_cache *cache; + struct object_cache_entry *entry; + int ret; + + cache = find_object_cache(vid, 0); - return read_cache_object(vid, idx, data, datalen, offset); + dprintf("cache object read %" PRIx32 "\n", idx); + + entry = get_cache_entry(cache, idx); + if (!entry) { + panic("cache object %" PRIx32 " doesn't exist\n", idx); + return SD_RES_NO_CACHE; + } + + ret = read_cache_object(vid, idx, data, datalen, offset); + + put_cache_entry(entry); + + return ret; } int object_cache_flush_vdi(struct request *req) @@ -884,8 +1156,10 @@ int object_cache_flush_and_del(struct request *req) struct object_cache *cache; cache = find_object_cache(vid, 0); + if (cache && object_cache_flush_and_delete(cache) < 0) return SD_RES_EIO; + return SD_RES_SUCCESS; } @@ -999,6 +1273,7 @@ int object_cache_init(const char *p) 
CDS_INIT_LIST_HEAD(&sys_cache.cache_lru_list); uatomic_set(&sys_cache.cache_size, 0); + uatomic_set(&sys_cache.reclaiming, 0); ret = load_existing_cache(); err: diff --git a/sheep/sheep.c b/sheep/sheep.c index 9f371ba..d17884d 100644 --- a/sheep/sheep.c +++ b/sheep/sheep.c @@ -356,8 +356,9 @@ int main(int argc, char **argv) sys->deletion_wqueue = init_work_queue("deletion", true); sys->block_wqueue = init_work_queue("block", true); sys->sockfd_wqueue = init_work_queue("sockfd", true); + sys->reclaim_wqueue = init_work_queue("reclaim", true); if (!sys->gateway_wqueue || !sys->io_wqueue ||!sys->recovery_wqueue || - !sys->deletion_wqueue || !sys->block_wqueue) + !sys->deletion_wqueue || !sys->block_wqueue || !sys->reclaim_wqueue) exit(1); ret = trace_init(); diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h index fe61411..2090d67 100644 --- a/sheep/sheep_priv.h +++ b/sheep/sheep_priv.h @@ -125,6 +125,7 @@ struct cluster_info { struct work_queue *recovery_wqueue; struct work_queue *block_wqueue; struct work_queue *sockfd_wqueue; + struct work_queue *reclaim_wqueue; }; struct siocb { diff --git a/sheep/store.c b/sheep/store.c index a05822d..ddb9511 100644 --- a/sheep/store.c +++ b/sheep/store.c @@ -495,6 +495,9 @@ int write_object(uint64_t oid, char *data, unsigned int datalen, if (sys->enable_write_cache && object_is_cached(oid)) { ret = object_cache_write(oid, data, datalen, offset, flags, create); + if (ret == SD_RES_NO_CACHE) + goto forward_write; + if (ret != 0) { eprintf("write cache failed %"PRIx64" %"PRIx32"\n", oid, ret); @@ -502,6 +505,7 @@ int write_object(uint64_t oid, char *data, unsigned int datalen, } } +forward_write: if (create) sd_init_req(&hdr, SD_OP_CREATE_AND_WRITE_OBJ); else -- 1.7.10 |