On 2012年07月26日 11:27, Liu Yuan wrote: > On 07/25/2012 08:15 PM, levin li wrote: >> +#define list_for_each_entry_revert_safe_rcu(pos, n, head, member) \ >> + for (pos = cds_list_entry(rcu_dereference((head)->prev), \ >> + typeof(*pos), member), \ >> + n = cds_list_entry(rcu_dereference(pos->member.prev),\ >> + typeof(*pos), member); \ >> + &pos->member != (head); \ >> + pos = n, n = cds_list_entry(rcu_dereference(pos->member.prev),\ >> + typeof(*pos), member)) >> > > This should be placed in list.h > >> struct global_cache { >> uint64_t cache_size; >> @@ -50,6 +64,7 @@ struct global_cache { >> >> struct object_cache { >> uint32_t vid; >> + int flushing; > > uint8_t is enough. > >> + >> +static int check_cache_status(struct object_cache *oc, >> + struct object_cache_entry *entry) >> +{ >> + int refcnt = uatomic_read(&entry->refcnt); >> + >> + if (cache_is_flushing(oc)) { >> + dprintf("cache %" PRIx32 " is flushing, don't reclaim it.\n", >> + oc->vid); >> + return SD_RES_CACHE_FLUSHING; >> + } >> + >> + /* If entry is being accessed, we don't reclaim it */ >> + if (refcnt > 0) { > > uatomic_read(&entry->refcnt); should be placed in if clause. 
> >> + dprintf("cache object %" PRIx32 "(%08" PRIx32 ") " >> + "can't be reclaimed, refcnt: %d\n", >> + oc->vid, entry->idx, refcnt); >> + return SD_RES_CACHE_REFERENCING; >> + } >> + >> + return SD_RES_SUCCESS; >> +} >> + >> +static int reclaim_object(struct object_cache_entry *entry) >> +{ >> + struct object_cache *oc = entry->oc; >> + uint32_t idx = entry->idx; >> + int ret = SD_RES_SUCCESS; >> + >> + pthread_rwlock_wrlock(&oc->lock); >> + dprintf("reclaiming /%06"PRIx32"/%08"PRIx32", cache_size: %ld\n", >> + oc->vid, idx, uatomic_read(&sys_cache.cache_size)); >> + >> + ret = check_cache_status(oc, entry); >> + if (ret != SD_RES_SUCCESS) >> + goto out; >> + >> + if (entry_is_dirty(entry)) { >> + uint64_t bmap = entry->bmap; >> + int create = entry->flags & ENTRY_CREATE_BIT; >> + >> + entry->bmap = 0; >> + del_from_dirty_tree_and_list(entry, &oc->dirty_tree); >> + pthread_rwlock_unlock(&oc->lock); >> + >> + ret = push_cache_object(oc->vid, idx, bmap, create); >> + >> + pthread_rwlock_wrlock(&oc->lock); >> + if (ret == SD_RES_SUCCESS) { >> + entry->flags &= ~ENTRY_CREATE_BIT; >> + /* dirty again */ >> + if (entry_is_dirty(entry)) { >> + dprintf("object cache is dirty again %06" PRIx32 "\n", idx); >> + ret = SD_RES_CACHE_REFERENCING; >> + goto out; >> + } >> + } else { >> + /* still dirty */ >> + entry->bmap |= bmap; >> + ret = SD_RES_CACHE_REFERENCING; >> + goto out; >> + } >> + >> + /* Now we get lock again, check cache status again */ >> + ret = check_cache_status(oc, entry); >> + if (ret != SD_RES_SUCCESS) >> + goto out; >> + } >> + >> + /* Mark the entry as being reclaimed */ >> + entry_start_reclaiming(entry); > > rename entry_start_reclaiming as mark_entry_reclaiming() instead of adding > another comment. 
> >> + >> + ret = remove_cache_object(oc, idx); >> + if (ret == SD_RES_SUCCESS) >> + del_from_object_tree_and_list(entry, &oc->object_tree); >> +out: >> + pthread_rwlock_unlock(&oc->lock); >> + return ret; >> +} >> + >> +static void reclaim_work(struct work *work) >> +{ >> + struct object_cache_entry *entry, *n; >> + int ret; >> + >> + if (node_in_recovery()) >> + return; >> + >> + list_for_each_entry_revert_safe_rcu(entry, n, >> + &sys_cache.cache_lru_list, lru_list) { >> + /* Reclaim cache to 80% of max size */ >> + if (uatomic_read(&sys_cache.cache_size) <= >> + sys->cache_size * 8 / 10) >> + break; >> + >> + ret = reclaim_object(entry); > > check if (ret == SD_RES_CACHE_FLUSHING) first, then we don't need else > clause. > >> + if (ret == SD_RES_SUCCESS) { >> + unsigned data_length; >> + if (idx_has_vdi_bit(entry->idx)) >> + data_length = SD_INODE_SIZE / 1024; >> + else >> + data_length = SD_DATA_OBJ_SIZE / 1024; >> + > > Why 1024? can't we simply cache_size - data_length? > >> + uatomic_sub(&sys_cache.cache_size, data_length); >> + free(entry); >> + } else if (ret == SD_RES_CACHE_FLUSHING) >> + /* If cache is flushing, stop reclaiming. 
*/ >> + break; >> + } >> + >> + dprintf("cache reclaim complete\n"); >> +} >> + >> +static void reclaim_done(struct work *work) >> +{ >> + uatomic_set(&sys_cache.reclaiming, 0); >> + free(work); >> +} >> + >> static struct object_cache_entry * >> dirty_tree_insert(struct object_cache *oc, uint32_t idx, >> uint64_t bmap, int create) >> @@ -183,6 +406,8 @@ dirty_tree_insert(struct object_cache *oc, uint32_t idx, >> else { >> /* already has this entry, merge bmap */ >> entry->bmap |= bmap; >> + if (create) >> + entry->flags |= ENTRY_CREATE_BIT; >> return entry; >> } >> } >> @@ -192,7 +417,8 @@ dirty_tree_insert(struct object_cache *oc, uint32_t idx, >> return NULL; >> >> entry->bmap |= bmap; >> - entry->flags |= ENTRY_CREATE_BIT; >> + if (create) >> + entry->flags |= ENTRY_CREATE_BIT; >> rb_link_node(&entry->dirty_node, parent, p); >> rb_insert_color(&entry->dirty_node, &oc->dirty_tree); >> list_add(&entry->list, &oc->dirty_list); >> @@ -200,26 +426,6 @@ dirty_tree_insert(struct object_cache *oc, uint32_t idx, >> return entry; >> } >> >> -static struct object_cache_entry *dirty_tree_search(struct rb_root *root, >> - uint32_t idx) >> -{ >> - struct rb_node *n = root->rb_node; >> - struct object_cache_entry *t; >> - >> - while (n) { >> - t = rb_entry(n, struct object_cache_entry, dirty_node); >> - >> - if (idx < t->idx) >> - n = n->rb_left; >> - else if (idx > t->idx) >> - n = n->rb_right; >> - else >> - return t; /* found it */ >> - } >> - >> - return NULL; >> -} >> - >> static int create_dir_for(uint32_t vid) >> { >> int ret = 0; >> @@ -257,6 +463,7 @@ not_found: >> if (create) { >> cache = xzalloc(sizeof(*cache)); >> cache->vid = vid; >> + cache->object_tree = RB_ROOT; >> create_dir_for(vid); >> >> cache->dirty_tree = RB_ROOT; >> @@ -271,14 +478,6 @@ out: >> return cache; >> } >> >> -static inline void >> -del_from_dirty_tree_and_list(struct object_cache_entry *entry, >> - struct rb_root *dirty_tree) >> -{ >> - rb_erase(&entry->dirty_node, dirty_tree); >> - 
list_del(&entry->list); >> -} >> - >> /* Caller should hold the oc->lock */ >> static inline void >> add_to_dirty_tree_and_list(struct object_cache *oc, uint32_t idx, >> @@ -289,6 +488,9 @@ add_to_dirty_tree_and_list(struct object_cache *oc, uint32_t idx, >> if (!entry) >> panic("Can not find object entry %" PRIx32 "\n", idx); >> >> + if (cache_is_reclaiming()) > > should be renamed as cache_in_reclaim() > >> + return; >> + >> /* If cache isn't in reclaiming, move it >> * to the head of lru list */ >> cds_list_del_rcu(&entry->lru_list); >> @@ -321,6 +523,9 @@ static void add_to_object_cache(struct object_cache *oc, uint32_t idx) >> entry->idx = idx; >> CDS_INIT_LIST_HEAD(&entry->lru_list); >> >> + dprintf("cache object for vdi %" PRIx32 ", idx %08" PRIx32 "added\n", >> + oc->vid, idx); >> + >> pthread_rwlock_wrlock(&oc->lock); >> old = object_cache_insert(&oc->object_tree, entry); >> if (!old) { >> @@ -331,20 +536,55 @@ static void add_to_object_cache(struct object_cache *oc, uint32_t idx) >> entry = old; >> } >> pthread_rwlock_unlock(&oc->lock); >> + >> + dprintf("sys_cache.cache_size %" PRIx64 ", sys->cache_size %" PRIx64 "\n", >> + uatomic_read(&sys_cache.cache_size), sys->cache_size); >> + if (sys->cache_size && >> + uatomic_read(&sys_cache.cache_size) > sys->cache_size && >> + !cache_is_reclaiming()) { >> + struct work *work = xzalloc(sizeof(struct work)); >> + uatomic_set(&sys_cache.reclaiming, 1); >> + work->fn = reclaim_work; >> + work->done = reclaim_done; >> + queue_work(sys->reclaim_wqueue, work); >> + } >> +} >> + >> +static inline int cache_sanity_check(struct object_cache *oc, uint32_t idx, >> + struct object_cache_entry **entry) >> +{ >> + struct object_cache_entry *ent; >> + >> + ent = object_tree_search(&oc->object_tree, idx); >> + >> + if (!ent || entry_is_reclaiming(ent)) >> + return SD_RES_NO_CACHE; >> + >> + if (entry) >> + *entry = ent; >> + >> + return SD_RES_SUCCESS; >> } > > better renamed as get_cache_entry() which inc refcnt inside if > 
successful and add another helper find_cache_entry() to find the entry, > put_cache_entry() to dec the refcnt (other uninit work if any). > > get_cache_entry() { > entry = find_cache_entry(); > ... > inc refcnt.. > ... > } > > Then find_cache_entry can be used by object_cache_lookup() too. > You mean rename object_cache_access_begin/end to get{put}_cache_entry? >> >> static int object_cache_lookup(struct object_cache *oc, uint32_t idx, >> int create) >> { >> struct strbuf buf; >> - int fd, ret = 0, flags = def_open_flags; >> + int fd, ret = SD_RES_SUCCESS, flags = def_open_flags; >> + unsigned data_length; >> + >> + if (!create) { >> + pthread_rwlock_wrlock(&oc->lock); >> + ret = cache_sanity_check(oc, idx, NULL); >> + pthread_rwlock_unlock(&oc->lock); >> + return ret; >> + } >> >> strbuf_init(&buf, PATH_MAX); >> strbuf_addstr(&buf, cache_dir); >> strbuf_addf(&buf, "/%06"PRIx32"/%08"PRIx32, oc->vid, idx); >> >> - if (create) >> - flags |= O_CREAT | O_TRUNC; >> + flags |= O_CREAT | O_TRUNC; >> >> fd = open(buf.buf, flags, def_fmode); >> if (fd < 0) { >> @@ -352,26 +592,22 @@ static int object_cache_lookup(struct object_cache *oc, uint32_t idx, >> goto out; >> } >> >> - if (create) { >> - unsigned data_length; >> - >> - if (idx_has_vdi_bit(idx)) >> - data_length = SD_INODE_SIZE; >> - else >> - data_length = SD_DATA_OBJ_SIZE; >> + if (idx_has_vdi_bit(idx)) >> + data_length = SD_INODE_SIZE; >> + else >> + data_length = SD_DATA_OBJ_SIZE; >> >> - ret = prealloc(fd, data_length); >> - if (ret != SD_RES_SUCCESS) >> - ret = -1; >> - else { >> - uint64_t bmap = UINT64_MAX; >> + ret = prealloc(fd, data_length); >> + if (ret != SD_RES_SUCCESS) >> + ret = -1; >> + else { >> + uint64_t bmap = UINT64_MAX; >> >> - add_to_object_cache(oc, idx); >> + add_to_object_cache(oc, idx); >> >> - pthread_rwlock_wrlock(&oc->lock); >> - add_to_dirty_tree_and_list(oc, idx, bmap, 1); >> - pthread_rwlock_unlock(&oc->lock); >> - } >> + pthread_rwlock_wrlock(&oc->lock); >> + 
add_to_dirty_tree_and_list(oc, idx, bmap, 1); >> + pthread_rwlock_unlock(&oc->lock); >> } >> close(fd); >> out: >> @@ -651,20 +887,23 @@ static int object_cache_push(struct object_cache *oc) >> pthread_rwlock_rdlock(&oc->lock); >> bmap = entry->bmap; >> create = entry->flags & ENTRY_CREATE_BIT; >> + entry->bmap = 0; >> pthread_rwlock_unlock(&oc->lock); >> >> ret = push_cache_object(oc->vid, entry->idx, bmap, create); >> - if (ret != SD_RES_SUCCESS) >> - goto push_failed; >> >> pthread_rwlock_wrlock(&oc->lock); >> + if (ret != SD_RES_SUCCESS) { >> + entry->bmap |= bmap; >> + goto push_failed; >> + } >> + entry->flags &= ~ENTRY_CREATE_BIT; >> del_from_dirty_tree_and_list(entry, &oc->dirty_tree); >> pthread_rwlock_unlock(&oc->lock); >> } >> return ret; >> >> push_failed: >> - pthread_rwlock_wrlock(&oc->lock); >> list_splice_init(&inactive_dirty_list, &oc->dirty_list); >> pthread_rwlock_unlock(&oc->lock); >> >> @@ -681,10 +920,10 @@ int object_is_cached(uint64_t oid) >> if (!cache) >> return 0; >> >> - if (object_cache_lookup(cache, idx, 0) < 0) >> - return 0; >> + if (object_cache_lookup(cache, idx, 0) == SD_RES_SUCCESS) >> + return 1; >> else >> - return 1; /* found it */ >> + return 0; >> } >> >> void object_cache_delete(uint32_t vid) >> @@ -716,6 +955,52 @@ void object_cache_delete(uint32_t vid) >> >> } >> >> +static void object_cache_flush_begin(struct object_cache *oc) >> +{ >> + pthread_rwlock_wrlock(&oc->lock); >> + oc->flushing = 1; >> + pthread_rwlock_unlock(&oc->lock); >> +} >> + >> +static void object_cache_flush_end(struct object_cache *oc) >> +{ >> + pthread_rwlock_wrlock(&oc->lock); >> + oc->flushing = 0; >> + pthread_rwlock_unlock(&oc->lock); >> +} > > Why can't flushing be atomic? Better fold these two helpers into callers. 
> >> + >> +static int object_cache_access_begin(struct object_cache *cache, uint32_t idx, >> + struct object_cache_entry **entry) >> +{ >> + struct object_cache_entry *ent; >> + int ret = SD_RES_SUCCESS; >> + >> + if (entry) >> + *entry = NULL; >> + >> + pthread_rwlock_rdlock(&cache->lock); >> + ret = cache_sanity_check(cache, idx, &ent); >> + if (ret != SD_RES_SUCCESS) { >> + /* The cache entry may be reclaimed, so try again. */ >> + pthread_rwlock_unlock(&cache->lock); >> + dprintf("cache sanity error %d\n", ret); >> + return ret; >> + } >> + >> + uatomic_inc(&ent->refcnt); >> + pthread_rwlock_unlock(&cache->lock); >> + >> + if (entry) >> + *entry = ent; >> + >> + return ret; >> +} >> + > > It is cleaner to call access_begin/end() in read/write_cache_object(). > >> +static void object_cache_access_end(struct object_cache_entry *entry) >> +{ >> + uatomic_dec(&entry->refcnt); >> +} >> + >> static int object_cache_flush_and_delete(struct object_cache *oc) >> { >> DIR *dir; >> @@ -726,6 +1011,8 @@ static int object_cache_flush_and_delete(struct object_cache *oc) >> struct strbuf p; >> int ret = 0; >> >> + object_cache_flush_begin(oc); >> + >> strbuf_init(&p, PATH_MAX); >> strbuf_addstr(&p, cache_dir); >> strbuf_addf(&p, "/%06"PRIx32, vid); >> @@ -754,6 +1041,8 @@ static int object_cache_flush_and_delete(struct object_cache *oc) >> } >> >> object_cache_delete(vid); >> + >> + object_cache_flush_end(oc); >> out: >> strbuf_release(&p); >> return ret; >> @@ -777,10 +1066,10 @@ int bypass_object_cache(struct request *req) >> /* For read requet, we can read cache if any */ >> uint32_t idx = object_cache_oid_to_idx(oid); >> >> - if (object_cache_lookup(cache, idx, 0) < 0) >> - return 1; >> - else >> + if (object_cache_lookup(cache, idx, 0) == 0) >> return 0; >> + else >> + return 1; >> } >> } >> >> @@ -811,37 +1100,42 @@ int object_cache_handle_request(struct request *req) >> if (req->rq.opcode == SD_OP_CREATE_AND_WRITE_OBJ) >> create = 1; >> >> - if 
(object_cache_lookup(cache, idx, create) < 0) { >> +retry: >> + ret = object_cache_lookup(cache, idx, create); >> + if (ret == SD_RES_NO_CACHE) { >> ret = object_cache_pull(cache, idx); >> if (ret != SD_RES_SUCCESS) >> return ret; >> } >> >> - pthread_rwlock_rdlock(&cache->lock); >> - entry = object_tree_search(&cache->object_tree, idx); >> - pthread_rwlock_unlock(&cache->lock); >> + ret = object_cache_access_begin(cache, idx, &entry); >> + if (ret != SD_RES_SUCCESS) { >> + dprintf("retrying... %d\n", ret); >> + goto retry; >> + } >> >> if (hdr->flags & SD_FLAG_CMD_WRITE) { >> ret = write_cache_object(cache->vid, idx, req->data, >> hdr->data_length, hdr->obj.offset); >> if (ret != SD_RES_SUCCESS) >> - goto out; >> + goto err; >> update_cache_entry(cache, idx, hdr->data_length, >> hdr->obj.offset); >> } else { >> ret = read_cache_object(cache->vid, idx, req->data, >> hdr->data_length, hdr->obj.offset); >> if (ret != SD_RES_SUCCESS) >> - goto out; >> + goto err; >> req->rp.data_length = hdr->data_length; >> >> - if (entry) { >> + if (entry && !cache_is_reclaiming()) { >> cds_list_del_rcu(&entry->lru_list); >> cds_list_add_rcu(&entry->lru_list, >> &sys_cache.cache_lru_list); >> } >> } >> -out: >> +err: >> + object_cache_access_end(entry); >> return ret; >> } >> >> @@ -851,13 +1145,23 @@ int object_cache_write(uint64_t oid, char *data, unsigned int datalen, >> uint32_t vid = oid_to_vid(oid); >> uint32_t idx = object_cache_oid_to_idx(oid); >> struct object_cache *cache; >> + struct object_cache_entry *entry; >> int ret; >> >> cache = find_object_cache(vid, 0); >> >> + dprintf("cache object write %" PRIx32 "\n", idx); >> + >> + ret = object_cache_access_begin(cache, idx, &entry); >> + if (ret != SD_RES_SUCCESS) >> + panic("cache object %" PRIx32 " doesn't exist\n", idx); >> + >> ret = write_cache_object(vid, idx, data, datalen, offset); >> if (ret == SD_RES_SUCCESS) >> update_cache_entry(cache, idx, datalen, offset); >> + >> + object_cache_access_end(entry); >> + >> 
return ret; >> } >> >> @@ -866,20 +1170,40 @@ int object_cache_read(uint64_t oid, char *data, unsigned int datalen, >> { >> uint32_t vid = oid_to_vid(oid); >> uint32_t idx = object_cache_oid_to_idx(oid); >> + struct object_cache *cache; >> + struct object_cache_entry *entry; >> + int ret; >> + >> + cache = find_object_cache(vid, 0); >> + >> + dprintf("cache object read %" PRIx32 "\n", idx); >> + >> + ret = object_cache_access_begin(cache, idx, &entry); >> + if (ret != SD_RES_SUCCESS) >> + panic("cache object %" PRIx32 " doesn't exist\n", idx); >> >> - return read_cache_object(vid, idx, data, datalen, offset); >> + ret = read_cache_object(vid, idx, data, datalen, offset); >> + >> + object_cache_access_end(entry); >> + >> + return ret; >> } >> >> int object_cache_flush_vdi(struct request *req) >> { >> uint32_t vid = oid_to_vid(req->rq.obj.oid); >> struct object_cache *cache; >> + int ret; >> >> cache = find_object_cache(vid, 0); >> if (!cache) >> return SD_RES_SUCCESS; >> >> - return object_cache_push(cache); >> + object_cache_flush_begin(cache); >> + ret = object_cache_push(cache); >> + object_cache_flush_end(cache); >> + >> + return ret; >> } >> >> int object_cache_flush_and_del(struct request *req) >> @@ -888,8 +1212,10 @@ int object_cache_flush_and_del(struct request *req) >> struct object_cache *cache; >> >> cache = find_object_cache(vid, 0); >> + >> if (cache && object_cache_flush_and_delete(cache) < 0) >> return SD_RES_EIO; >> + >> return SD_RES_SUCCESS; >> } >> >> @@ -1003,6 +1329,7 @@ int object_cache_init(const char *p) >> >> CDS_INIT_LIST_HEAD(&sys_cache.cache_lru_list); >> uatomic_set(&sys_cache.cache_size, 0); >> + uatomic_set(&sys_cache.reclaiming, 0); >> >> ret = load_existing_cache(); >> err: >> diff --git a/sheep/sheep.c b/sheep/sheep.c >> index 8b78669..29270e9 100644 >> --- a/sheep/sheep.c >> +++ b/sheep/sheep.c >> @@ -359,8 +359,9 @@ int main(int argc, char **argv) >> sys->deletion_wqueue = init_work_queue("deletion", true); >> sys->block_wqueue = 
init_work_queue("block", true); >> sys->sockfd_wqueue = init_work_queue("sockfd", true); >> + sys->reclaim_wqueue = init_work_queue("reclaim", true); >> if (!sys->gateway_wqueue || !sys->io_wqueue ||!sys->recovery_wqueue || >> - !sys->deletion_wqueue || !sys->block_wqueue) >> + !sys->deletion_wqueue || !sys->block_wqueue || !sys->reclaim_wqueue) >> exit(1); >> >> ret = trace_init(); >> diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h >> index fe61411..2090d67 100644 >> --- a/sheep/sheep_priv.h >> +++ b/sheep/sheep_priv.h >> @@ -125,6 +125,7 @@ struct cluster_info { >> struct work_queue *recovery_wqueue; >> struct work_queue *block_wqueue; >> struct work_queue *sockfd_wqueue; >> + struct work_queue *reclaim_wqueue; >> }; >> >> struct siocb { >> diff --git a/sheep/store.c b/sheep/store.c >> index a05822d..4a65c9e 100644 >> --- a/sheep/store.c >> +++ b/sheep/store.c >> @@ -492,9 +492,13 @@ int write_object(uint64_t oid, char *data, unsigned int datalen, >> struct sd_req hdr; >> int ret; >> >> +retry: >> if (sys->enable_write_cache && object_is_cached(oid)) { >> ret = object_cache_write(oid, data, datalen, offset, >> flags, create); >> + if (ret == SD_RES_NO_CACHE) >> + goto retry; >> + >> if (ret != 0) { >> eprintf("write cache failed %"PRIx64" %"PRIx32"\n", >> oid, ret); >> @@ -529,8 +533,12 @@ int read_object(uint64_t oid, char *data, unsigned int datalen, >> struct sd_req hdr; >> int ret; >> >> +retry: >> if (sys->enable_write_cache && object_is_cached(oid)) { >> ret = object_cache_read(oid, data, datalen, offset); >> + if (ret == SD_RES_NO_CACHE) >> + goto retry; >> + >> if (ret != SD_RES_SUCCESS) { >> eprintf("try forward read %"PRIx64" %"PRIx32"\n", >> oid, ret); >> > |