On 07/25/2012 08:15 PM, levin li wrote:
> +#define list_for_each_entry_revert_safe_rcu(pos, n, head, member) \
> +	for (pos = cds_list_entry(rcu_dereference((head)->prev), \
> +			typeof(*pos), member), \
> +		n = cds_list_entry(rcu_dereference(pos->member.prev),\
> +			typeof(*pos), member); \
> +		&pos->member != (head); \
> +		pos = n, n = cds_list_entry(rcu_dereference(pos->member.prev),\
> +			typeof(*pos), member))
>

This should be placed in list.h.

> struct global_cache {
> 	uint64_t cache_size;
> @@ -50,6 +64,7 @@ struct global_cache {
>
> struct object_cache {
> 	uint32_t vid;
> +	int flushing;

uint8_t is enough.

> +
> +static int check_cache_status(struct object_cache *oc,
> +			      struct object_cache_entry *entry)
> +{
> +	int refcnt = uatomic_read(&entry->refcnt);
> +
> +	if (cache_is_flushing(oc)) {
> +		dprintf("cache %" PRIx32 " is flushing, don't reclaim it.\n",
> +			oc->vid);
> +		return SD_RES_CACHE_FLUSHING;
> +	}
> +
> +	/* If entry is being accessed, we don't reclaim it */
> +	if (refcnt > 0) {

The uatomic_read(&entry->refcnt) should be done inside the if clause.

> +		dprintf("cache object %" PRIx32 "(%08" PRIx32 ") "
> +			"can't be reclaimed, refcnt: %d\n",
> +			oc->vid, entry->idx, refcnt);
> +		return SD_RES_CACHE_REFERENCING;
> +	}
> +
> +	return SD_RES_SUCCESS;
> +}
> +
> +static int reclaim_object(struct object_cache_entry *entry)
> +{
> +	struct object_cache *oc = entry->oc;
> +	uint32_t idx = entry->idx;
> +	int ret = SD_RES_SUCCESS;
> +
> +	pthread_rwlock_wrlock(&oc->lock);
> +	dprintf("reclaiming /%06"PRIx32"/%08"PRIx32", cache_size: %ld\n",
> +		oc->vid, idx, uatomic_read(&sys_cache.cache_size));
> +
> +	ret = check_cache_status(oc, entry);
> +	if (ret != SD_RES_SUCCESS)
> +		goto out;
> +
> +	if (entry_is_dirty(entry)) {
> +		uint64_t bmap = entry->bmap;
> +		int create = entry->flags & ENTRY_CREATE_BIT;
> +
> +		entry->bmap = 0;
> +		del_from_dirty_tree_and_list(entry, &oc->dirty_tree);
> +		pthread_rwlock_unlock(&oc->lock);
> +
> +		ret = push_cache_object(oc->vid, idx, bmap, create);
> +
> +		pthread_rwlock_wrlock(&oc->lock);
> +		if (ret == SD_RES_SUCCESS) {
> +			entry->flags &= ~ENTRY_CREATE_BIT;
> +			/* dirty again */
> +			if (entry_is_dirty(entry)) {
> +				dprintf("object cache is dirty again %06" PRIx32 "\n", idx);
> +				ret = SD_RES_CACHE_REFERENCING;
> +				goto out;
> +			}
> +		} else {
> +			/* still dirty */
> +			entry->bmap |= bmap;
> +			ret = SD_RES_CACHE_REFERENCING;
> +			goto out;
> +		}
> +
> +		/* Now we get lock again, check cache status again */
> +		ret = check_cache_status(oc, entry);
> +		if (ret != SD_RES_SUCCESS)
> +			goto out;
> +	}
> +
> +	/* Mark the entry as being reclaimed */
> +	entry_start_reclaiming(entry);

Rename entry_start_reclaiming() to something like mark_entry_reclaiming() instead of adding another comment.

> +
> +	ret = remove_cache_object(oc, idx);
> +	if (ret == SD_RES_SUCCESS)
> +		del_from_object_tree_and_list(entry, &oc->object_tree);
> +out:
> +	pthread_rwlock_unlock(&oc->lock);
> +	return ret;
> +}
> +
> +static void reclaim_work(struct work *work)
> +{
> +	struct object_cache_entry *entry, *n;
> +	int ret;
> +
> +	if (node_in_recovery())
> +		return;
> +
> +	list_for_each_entry_revert_safe_rcu(entry, n,
> +		      &sys_cache.cache_lru_list, lru_list) {
> +		/* Reclaim cache to 80% of max size */
> +		if (uatomic_read(&sys_cache.cache_size) <=
> +		    sys->cache_size * 8 / 10)
> +			break;
> +
> +		ret = reclaim_object(entry);

Check for SD_RES_CACHE_FLUSHING first, then we don't need the else clause.
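I.e. something along these lines (untested, just to show the control flow; the success-path body stays as it is in your patch):

		ret = reclaim_object(entry);
		if (ret == SD_RES_CACHE_FLUSHING)
			/* cache is being flushed, stop reclaiming */
			break;

		if (ret == SD_RES_SUCCESS) {
			/* subtract the object size and free(entry) as you already do */
		}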
> +		if (ret == SD_RES_SUCCESS) {
> +			unsigned data_length;
> +			if (idx_has_vdi_bit(entry->idx))
> +				data_length = SD_INODE_SIZE / 1024;
> +			else
> +				data_length = SD_DATA_OBJ_SIZE / 1024;
> +

Why 1024? Can't we simply do cache_size - data_length?
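For instance (untested; this assumes sys_cache.cache_size and sys->cache_size are kept in the same unit, here plain bytes):

		if (ret == SD_RES_SUCCESS) {
			uint64_t data_length;

			if (idx_has_vdi_bit(entry->idx))
				data_length = SD_INODE_SIZE;
			else
				data_length = SD_DATA_OBJ_SIZE;

			/* cache_size is accounted in bytes, no division by 1024 */
			uatomic_sub(&sys_cache.cache_size, data_length);
			free(entry);
		}

Whatever unit you pick, the 80% check in reclaim_work() and the trigger in add_to_object_cache() have to compare against sys->cache_size in that same unit.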
> +			uatomic_sub(&sys_cache.cache_size, data_length);
> +			free(entry);
> +		} else if (ret == SD_RES_CACHE_FLUSHING)
> +			/* If cache is flushing, stop reclaiming. */
> +			break;
> +	}
> +
> +	dprintf("cache reclaim complete\n");
> +}
> +
> +static void reclaim_done(struct work *work)
> +{
> +	uatomic_set(&sys_cache.reclaiming, 0);
> +	free(work);
> +}
> +
> static struct object_cache_entry *
> dirty_tree_insert(struct object_cache *oc, uint32_t idx,
> 		  uint64_t bmap, int create)
> @@ -183,6 +406,8 @@ dirty_tree_insert(struct object_cache *oc, uint32_t idx,
> 		else {
> 			/* already has this entry, merge bmap */
> 			entry->bmap |= bmap;
> +			if (create)
> +				entry->flags |= ENTRY_CREATE_BIT;
> 			return entry;
> 		}
> 	}
> @@ -192,7 +417,8 @@ dirty_tree_insert(struct object_cache *oc, uint32_t idx,
> 		return NULL;
>
> 	entry->bmap |= bmap;
> -	entry->flags |= ENTRY_CREATE_BIT;
> +	if (create)
> +		entry->flags |= ENTRY_CREATE_BIT;
> 	rb_link_node(&entry->dirty_node, parent, p);
> 	rb_insert_color(&entry->dirty_node, &oc->dirty_tree);
> 	list_add(&entry->list, &oc->dirty_list);
> @@ -200,26 +426,6 @@ dirty_tree_insert(struct object_cache *oc, uint32_t idx,
> 	return entry;
> }
>
> -static struct object_cache_entry *dirty_tree_search(struct rb_root *root,
> -						     uint32_t idx)
> -{
> -	struct rb_node *n = root->rb_node;
> -	struct object_cache_entry *t;
> -
> -	while (n) {
> -		t = rb_entry(n, struct object_cache_entry, dirty_node);
> -
> -		if (idx < t->idx)
> -			n = n->rb_left;
> -		else if (idx > t->idx)
> -			n = n->rb_right;
> -		else
> -			return t; /* found it */
> -	}
> -
> -	return NULL;
> -}
> -
> static int create_dir_for(uint32_t vid)
> {
> 	int ret = 0;
> @@ -257,6 +463,7 @@ not_found:
> 	if (create) {
> 		cache = xzalloc(sizeof(*cache));
> 		cache->vid = vid;
> +		cache->object_tree = RB_ROOT;
> 		create_dir_for(vid);
>
> 		cache->dirty_tree = RB_ROOT;
> @@ -271,14 +478,6 @@ out:
> 	return cache;
> }
>
> -static inline void
> -del_from_dirty_tree_and_list(struct object_cache_entry *entry,
> -			     struct rb_root *dirty_tree)
> -{
> -	rb_erase(&entry->dirty_node, dirty_tree);
> -	list_del(&entry->list);
> -}
> -
> /* Caller should hold the oc->lock */
> static inline void
> add_to_dirty_tree_and_list(struct object_cache *oc, uint32_t idx,
> @@ -289,6 +488,9 @@ add_to_dirty_tree_and_list(struct object_cache *oc, uint32_t idx,
> 	if (!entry)
> 		panic("Can not find object entry %" PRIx32 "\n", idx);
>
> +	if (cache_is_reclaiming())

This should be renamed cache_in_reclaim().

> +		return;
> +
> 	/* If cache isn't in reclaiming, move it
> 	 * to the head of lru list */
> 	cds_list_del_rcu(&entry->lru_list);
> @@ -321,6 +523,9 @@ static void add_to_object_cache(struct object_cache *oc, uint32_t idx)
> 	entry->idx = idx;
> 	CDS_INIT_LIST_HEAD(&entry->lru_list);
> +
> +	dprintf("cache object for vdi %" PRIx32 ", idx %08" PRIx32 "added\n",
> +		oc->vid, idx);
> +
> 	pthread_rwlock_wrlock(&oc->lock);
> 	old = object_cache_insert(&oc->object_tree, entry);
> 	if (!old) {
> @@ -331,20 +536,55 @@ static void add_to_object_cache(struct object_cache *oc, uint32_t idx)
> 		entry = old;
> 	}
> 	pthread_rwlock_unlock(&oc->lock);
> +
> +	dprintf("sys_cache.cache_size %" PRIx64 ", sys->cache_size %" PRIx64 "\n",
> +		uatomic_read(&sys_cache.cache_size), sys->cache_size);
> +	if (sys->cache_size &&
> +	    uatomic_read(&sys_cache.cache_size) > sys->cache_size &&
> +	    !cache_is_reclaiming()) {
> +		struct work *work = xzalloc(sizeof(struct work));
> +		uatomic_set(&sys_cache.reclaiming, 1);
> +		work->fn = reclaim_work;
> +		work->done = reclaim_done;
> +		queue_work(sys->reclaim_wqueue, work);
> +	}
> +}
> +
> +static inline int cache_sanity_check(struct object_cache *oc, uint32_t idx,
> +				     struct object_cache_entry **entry)
> +{
> +	struct object_cache_entry *ent;
> +
> +	ent = object_tree_search(&oc->object_tree, idx);
> +
> +	if (!ent || entry_is_reclaiming(ent))
> +		return SD_RES_NO_CACHE;
> +
> +	if (entry)
> +		*entry = ent;
> +
> +	return SD_RES_SUCCESS;
> +}

This is better renamed get_cache_entry(), which increments the refcnt internally on success. Add two more helpers: find_cache_entry() to look up the entry, and put_cache_entry() to drop the refcnt (plus other teardown work, if any):

get_cache_entry()
{
	entry = find_cache_entry();
	... inc refcnt ...
}

Then find_cache_entry() can be used by object_cache_lookup() too.
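Something like this (untested sketch, helper names as suggested above; where exactly the locking sits is up to you):

static struct object_cache_entry *
find_cache_entry(struct object_cache *oc, uint32_t idx)
{
	struct object_cache_entry *entry;

	entry = object_tree_search(&oc->object_tree, idx);
	if (!entry || entry_is_reclaiming(entry))
		return NULL;

	return entry;
}

/* returns the entry with its refcnt held, or NULL if it is not cached */
static struct object_cache_entry *
get_cache_entry(struct object_cache *oc, uint32_t idx)
{
	struct object_cache_entry *entry;

	pthread_rwlock_rdlock(&oc->lock);
	entry = find_cache_entry(oc, idx);
	if (entry)
		uatomic_inc(&entry->refcnt);
	pthread_rwlock_unlock(&oc->lock);

	return entry;
}

static void put_cache_entry(struct object_cache_entry *entry)
{
	uatomic_dec(&entry->refcnt);
}

Then object_cache_lookup() uses find_cache_entry() for the !create case, and object_cache_access_begin()/end() simply become get_cache_entry()/put_cache_entry().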
>
> static int object_cache_lookup(struct object_cache *oc, uint32_t idx,
> 			       int create)
> {
> 	struct strbuf buf;
> -	int fd, ret = 0, flags = def_open_flags;
> +	int fd, ret = SD_RES_SUCCESS, flags = def_open_flags;
> +	unsigned data_length;
> +
> +	if (!create) {
> +		pthread_rwlock_wrlock(&oc->lock);
> +		ret = cache_sanity_check(oc, idx, NULL);
> +		pthread_rwlock_unlock(&oc->lock);
> +		return ret;
> +	}
>
> 	strbuf_init(&buf, PATH_MAX);
> 	strbuf_addstr(&buf, cache_dir);
> 	strbuf_addf(&buf, "/%06"PRIx32"/%08"PRIx32, oc->vid, idx);
>
> -	if (create)
> -		flags |= O_CREAT | O_TRUNC;
> +	flags |= O_CREAT | O_TRUNC;
>
> 	fd = open(buf.buf, flags, def_fmode);
> 	if (fd < 0) {
> @@ -352,26 +592,22 @@ static int object_cache_lookup(struct object_cache *oc, uint32_t idx,
> 		goto out;
> 	}
>
> -	if (create) {
> -		unsigned data_length;
> -
> -		if (idx_has_vdi_bit(idx))
> -			data_length = SD_INODE_SIZE;
> -		else
> -			data_length = SD_DATA_OBJ_SIZE;
> +	if (idx_has_vdi_bit(idx))
> +		data_length = SD_INODE_SIZE;
> +	else
> +		data_length = SD_DATA_OBJ_SIZE;
>
> -		ret = prealloc(fd, data_length);
> -		if (ret != SD_RES_SUCCESS)
> -			ret = -1;
> -		else {
> -			uint64_t bmap = UINT64_MAX;
>
> -			add_to_object_cache(oc, idx);
> +	ret = prealloc(fd, data_length);
> +	if (ret != SD_RES_SUCCESS)
> +		ret = -1;
> +	else {
> +		uint64_t bmap = UINT64_MAX;
>
> -			pthread_rwlock_wrlock(&oc->lock);
> -			add_to_dirty_tree_and_list(oc, idx, bmap, 1);
> -			pthread_rwlock_unlock(&oc->lock);
> -		}
> +		add_to_object_cache(oc, idx);
> +
> +		pthread_rwlock_wrlock(&oc->lock);
> +		add_to_dirty_tree_and_list(oc, idx, bmap, 1);
> +		pthread_rwlock_unlock(&oc->lock);
> 	}
> 	close(fd);
> out:
> @@ -651,20 +887,23 @@ static int object_cache_push(struct object_cache *oc)
> 		pthread_rwlock_rdlock(&oc->lock);
> 		bmap = entry->bmap;
> 		create = entry->flags & ENTRY_CREATE_BIT;
> +		entry->bmap = 0;
> 		pthread_rwlock_unlock(&oc->lock);
>
> 		ret = push_cache_object(oc->vid, entry->idx, bmap, create);
> -		if (ret != SD_RES_SUCCESS)
> -			goto push_failed;
>
> 		pthread_rwlock_wrlock(&oc->lock);
> +		if (ret != SD_RES_SUCCESS) {
> +			entry->bmap |= bmap;
> +			goto push_failed;
> +		}
> +		entry->flags &= ~ENTRY_CREATE_BIT;
> 		del_from_dirty_tree_and_list(entry, &oc->dirty_tree);
> 		pthread_rwlock_unlock(&oc->lock);
> 	}
> 	return ret;
>
> push_failed:
> -	pthread_rwlock_wrlock(&oc->lock);
> 	list_splice_init(&inactive_dirty_list, &oc->dirty_list);
> 	pthread_rwlock_unlock(&oc->lock);
>
> @@ -681,10 +920,10 @@ int object_is_cached(uint64_t oid)
> 	if (!cache)
> 		return 0;
>
> -	if (object_cache_lookup(cache, idx, 0) < 0)
> -		return 0;
> +	if (object_cache_lookup(cache, idx, 0) == SD_RES_SUCCESS)
> +		return 1;
> 	else
> -		return 1; /* found it */
> +		return 0;
> }
>
> void object_cache_delete(uint32_t vid)
> @@ -716,6 +955,52 @@ void object_cache_delete(uint32_t vid)
>
> }
>
> +static void object_cache_flush_begin(struct object_cache *oc)
> +{
> +	pthread_rwlock_wrlock(&oc->lock);
> +	oc->flushing = 1;
> +	pthread_rwlock_unlock(&oc->lock);
> +}
> +
> +static void object_cache_flush_end(struct object_cache *oc)
> +{
> +	pthread_rwlock_wrlock(&oc->lock);
> +	oc->flushing = 0;
> +	pthread_rwlock_unlock(&oc->lock);
> +}

Why can't flushing be atomic? Then these two helpers can be folded into their callers.
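For example (untested; assumes flushing is only ever read and written with the uatomic helpers, e.g. in cache_is_flushing()):

static inline int cache_is_flushing(struct object_cache *oc)
{
	return uatomic_read(&oc->flushing);
}

int object_cache_flush_vdi(struct request *req)
{
	uint32_t vid = oid_to_vid(req->rq.obj.oid);
	struct object_cache *cache;
	int ret;

	cache = find_object_cache(vid, 0);
	if (!cache)
		return SD_RES_SUCCESS;

	/* set/clear the flag directly, no begin/end helpers needed */
	uatomic_set(&cache->flushing, 1);
	ret = object_cache_push(cache);
	uatomic_set(&cache->flushing, 0);

	return ret;
}

object_cache_flush_and_delete() can do the same around its body.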
%d\n", ret); > + goto retry; > + } > > if (hdr->flags & SD_FLAG_CMD_WRITE) { > ret = write_cache_object(cache->vid, idx, req->data, > hdr->data_length, hdr->obj.offset); > if (ret != SD_RES_SUCCESS) > - goto out; > + goto err; > update_cache_entry(cache, idx, hdr->data_length, > hdr->obj.offset); > } else { > ret = read_cache_object(cache->vid, idx, req->data, > hdr->data_length, hdr->obj.offset); > if (ret != SD_RES_SUCCESS) > - goto out; > + goto err; > req->rp.data_length = hdr->data_length; > > - if (entry) { > + if (entry && !cache_is_reclaiming()) { > cds_list_del_rcu(&entry->lru_list); > cds_list_add_rcu(&entry->lru_list, > &sys_cache.cache_lru_list); > } > } > -out: > +err: > + object_cache_access_end(entry); > return ret; > } > > @@ -851,13 +1145,23 @@ int object_cache_write(uint64_t oid, char *data, unsigned int datalen, > uint32_t vid = oid_to_vid(oid); > uint32_t idx = object_cache_oid_to_idx(oid); > struct object_cache *cache; > + struct object_cache_entry *entry; > int ret; > > cache = find_object_cache(vid, 0); > > + dprintf("cache object write %" PRIx32 "\n", idx); > + > + ret = object_cache_access_begin(cache, idx, &entry); > + if (ret != SD_RES_SUCCESS) > + panic("cache object %" PRIx32 " doesn't exist\n", idx); > + > ret = write_cache_object(vid, idx, data, datalen, offset); > if (ret == SD_RES_SUCCESS) > update_cache_entry(cache, idx, datalen, offset); > + > + object_cache_access_end(entry); > + > return ret; > } > > @@ -866,20 +1170,40 @@ int object_cache_read(uint64_t oid, char *data, unsigned int datalen, > { > uint32_t vid = oid_to_vid(oid); > uint32_t idx = object_cache_oid_to_idx(oid); > + struct object_cache *cache; > + struct object_cache_entry *entry; > + int ret; > + > + cache = find_object_cache(vid, 0); > + > + dprintf("cache object read %" PRIx32 "\n", idx); > + > + ret = object_cache_access_begin(cache, idx, &entry); > + if (ret != SD_RES_SUCCESS) > + panic("cache object %" PRIx32 " doesn't exist\n", idx); > > - return read_cache_object(vid, idx, data, datalen, offset); > + ret = read_cache_object(vid, idx, data, datalen, offset); > + > + object_cache_access_end(entry); > + > + return ret; > } > > int object_cache_flush_vdi(struct request *req) > { > uint32_t vid = oid_to_vid(req->rq.obj.oid); > struct object_cache *cache; > + int ret; > > cache = find_object_cache(vid, 0); > if (!cache) > return SD_RES_SUCCESS; > > - return object_cache_push(cache); > + object_cache_flush_begin(cache); > + ret = object_cache_push(cache); > + object_cache_flush_end(cache); > + > + return ret; > } > > int object_cache_flush_and_del(struct request *req) > @@ -888,8 +1212,10 @@ int object_cache_flush_and_del(struct request *req) > struct object_cache *cache; > > cache = find_object_cache(vid, 0); > + > if (cache && object_cache_flush_and_delete(cache) < 0) > return SD_RES_EIO; > + > return SD_RES_SUCCESS; > } > > @@ -1003,6 +1329,7 @@ int object_cache_init(const char *p) > > CDS_INIT_LIST_HEAD(&sys_cache.cache_lru_list); > uatomic_set(&sys_cache.cache_size, 0); > + uatomic_set(&sys_cache.reclaiming, 0); > > ret = load_existing_cache(); > err: > diff --git a/sheep/sheep.c b/sheep/sheep.c > index 8b78669..29270e9 100644 > --- a/sheep/sheep.c > +++ b/sheep/sheep.c > @@ -359,8 +359,9 @@ int main(int argc, char **argv) > sys->deletion_wqueue = init_work_queue("deletion", true); > sys->block_wqueue = init_work_queue("block", true); > sys->sockfd_wqueue = init_work_queue("sockfd", true); > + sys->reclaim_wqueue = init_work_queue("reclaim", true); > if (!sys->gateway_wqueue || 
> +static void object_cache_access_end(struct object_cache_entry *entry)
> +{
> +	uatomic_dec(&entry->refcnt);
> +}
> +
> static int object_cache_flush_and_delete(struct object_cache *oc)
> {
> 	DIR *dir;
> @@ -726,6 +1011,8 @@ static int object_cache_flush_and_delete(struct object_cache *oc)
> 	struct strbuf p;
> 	int ret = 0;
>
> +	object_cache_flush_begin(oc);
> +
> 	strbuf_init(&p, PATH_MAX);
> 	strbuf_addstr(&p, cache_dir);
> 	strbuf_addf(&p, "/%06"PRIx32, vid);
> @@ -754,6 +1041,8 @@ static int object_cache_flush_and_delete(struct object_cache *oc)
> 	}
>
> 	object_cache_delete(vid);
> +
> +	object_cache_flush_end(oc);
> out:
> 	strbuf_release(&p);
> 	return ret;
> @@ -777,10 +1066,10 @@ int bypass_object_cache(struct request *req)
> 		/* For read requet, we can read cache if any */
> 		uint32_t idx = object_cache_oid_to_idx(oid);
>
> -		if (object_cache_lookup(cache, idx, 0) < 0)
> -			return 1;
> -		else
> +		if (object_cache_lookup(cache, idx, 0) == 0)
> 			return 0;
> +		else
> +			return 1;
> 	}
> }
>
> @@ -811,37 +1100,42 @@ int object_cache_handle_request(struct request *req)
> 	if (req->rq.opcode == SD_OP_CREATE_AND_WRITE_OBJ)
> 		create = 1;
>
> -	if (object_cache_lookup(cache, idx, create) < 0) {
> +retry:
> +	ret = object_cache_lookup(cache, idx, create);
> +	if (ret == SD_RES_NO_CACHE) {
> 		ret = object_cache_pull(cache, idx);
> 		if (ret != SD_RES_SUCCESS)
> 			return ret;
> 	}
>
> -	pthread_rwlock_rdlock(&cache->lock);
> -	entry = object_tree_search(&cache->object_tree, idx);
> -	pthread_rwlock_unlock(&cache->lock);
> +	ret = object_cache_access_begin(cache, idx, &entry);
> +	if (ret != SD_RES_SUCCESS) {
> +		dprintf("retrying... %d\n", ret);
> +		goto retry;
> +	}
>
> 	if (hdr->flags & SD_FLAG_CMD_WRITE) {
> 		ret = write_cache_object(cache->vid, idx, req->data,
> 					 hdr->data_length, hdr->obj.offset);
> 		if (ret != SD_RES_SUCCESS)
> -			goto out;
> +			goto err;
> 		update_cache_entry(cache, idx, hdr->data_length,
> 				   hdr->obj.offset);
> 	} else {
> 		ret = read_cache_object(cache->vid, idx, req->data,
> 					hdr->data_length, hdr->obj.offset);
> 		if (ret != SD_RES_SUCCESS)
> -			goto out;
> +			goto err;
> 		req->rp.data_length = hdr->data_length;
>
> -		if (entry) {
> +		if (entry && !cache_is_reclaiming()) {
> 			cds_list_del_rcu(&entry->lru_list);
> 			cds_list_add_rcu(&entry->lru_list,
> 					 &sys_cache.cache_lru_list);
> 		}
> 	}
> -out:
> +err:
> +	object_cache_access_end(entry);
> 	return ret;
> }
>
> @@ -851,13 +1145,23 @@ int object_cache_write(uint64_t oid, char *data, unsigned int datalen,
> 	uint32_t vid = oid_to_vid(oid);
> 	uint32_t idx = object_cache_oid_to_idx(oid);
> 	struct object_cache *cache;
> +	struct object_cache_entry *entry;
> 	int ret;
>
> 	cache = find_object_cache(vid, 0);
>
> +	dprintf("cache object write %" PRIx32 "\n", idx);
> +
> +	ret = object_cache_access_begin(cache, idx, &entry);
> +	if (ret != SD_RES_SUCCESS)
> +		panic("cache object %" PRIx32 " doesn't exist\n", idx);
> +
> 	ret = write_cache_object(vid, idx, data, datalen, offset);
> 	if (ret == SD_RES_SUCCESS)
> 		update_cache_entry(cache, idx, datalen, offset);
> +
> +	object_cache_access_end(entry);
> +
> 	return ret;
> }
>
> @@ -866,20 +1170,40 @@ int object_cache_read(uint64_t oid, char *data, unsigned int datalen,
> {
> 	uint32_t vid = oid_to_vid(oid);
> 	uint32_t idx = object_cache_oid_to_idx(oid);
> +	struct object_cache *cache;
> +	struct object_cache_entry *entry;
> +	int ret;
> +
> +	cache = find_object_cache(vid, 0);
> +
> +	dprintf("cache object read %" PRIx32 "\n", idx);
> +
> +	ret = object_cache_access_begin(cache, idx, &entry);
> +	if (ret != SD_RES_SUCCESS)
> +		panic("cache object %" PRIx32 " doesn't exist\n", idx);
>
> -	return read_cache_object(vid, idx, data, datalen, offset);
> +	ret = read_cache_object(vid, idx, data, datalen, offset);
> +
> +	object_cache_access_end(entry);
> +
> +	return ret;
> }
>
> int object_cache_flush_vdi(struct request *req)
> {
> 	uint32_t vid = oid_to_vid(req->rq.obj.oid);
> 	struct object_cache *cache;
> +	int ret;
>
> 	cache = find_object_cache(vid, 0);
> 	if (!cache)
> 		return SD_RES_SUCCESS;
>
> -	return object_cache_push(cache);
> +	object_cache_flush_begin(cache);
> +	ret = object_cache_push(cache);
> +	object_cache_flush_end(cache);
> +
> +	return ret;
> }
>
> int object_cache_flush_and_del(struct request *req)
> @@ -888,8 +1212,10 @@ int object_cache_flush_and_del(struct request *req)
> 	struct object_cache *cache;
>
> 	cache = find_object_cache(vid, 0);
> +
> 	if (cache && object_cache_flush_and_delete(cache) < 0)
> 		return SD_RES_EIO;
> +
> 	return SD_RES_SUCCESS;
> }
>
> @@ -1003,6 +1329,7 @@ int object_cache_init(const char *p)
>
> 	CDS_INIT_LIST_HEAD(&sys_cache.cache_lru_list);
> 	uatomic_set(&sys_cache.cache_size, 0);
> +	uatomic_set(&sys_cache.reclaiming, 0);
>
> 	ret = load_existing_cache();
> err:
> diff --git a/sheep/sheep.c b/sheep/sheep.c
> index 8b78669..29270e9 100644
> --- a/sheep/sheep.c
> +++ b/sheep/sheep.c
> @@ -359,8 +359,9 @@ int main(int argc, char **argv)
> 	sys->deletion_wqueue = init_work_queue("deletion", true);
> 	sys->block_wqueue = init_work_queue("block", true);
> 	sys->sockfd_wqueue = init_work_queue("sockfd", true);
> +	sys->reclaim_wqueue = init_work_queue("reclaim", true);
> 	if (!sys->gateway_wqueue || !sys->io_wqueue ||!sys->recovery_wqueue ||
> -	    !sys->deletion_wqueue || !sys->block_wqueue)
> +	    !sys->deletion_wqueue || !sys->block_wqueue || !sys->reclaim_wqueue)
> 		exit(1);
>
> 	ret = trace_init();
> diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
> index fe61411..2090d67 100644
> --- a/sheep/sheep_priv.h
> +++ b/sheep/sheep_priv.h
> @@ -125,6 +125,7 @@ struct cluster_info {
> 	struct work_queue *recovery_wqueue;
> 	struct work_queue *block_wqueue;
> 	struct work_queue *sockfd_wqueue;
> +	struct work_queue *reclaim_wqueue;
> };
>
> struct siocb {
> diff --git a/sheep/store.c b/sheep/store.c
> index a05822d..4a65c9e 100644
> --- a/sheep/store.c
> +++ b/sheep/store.c
> @@ -492,9 +492,13 @@ int write_object(uint64_t oid, char *data, unsigned int datalen,
> 	struct sd_req hdr;
> 	int ret;
>
> +retry:
> 	if (sys->enable_write_cache && object_is_cached(oid)) {
> 		ret = object_cache_write(oid, data, datalen, offset,
> 					 flags, create);
> +		if (ret == SD_RES_NO_CACHE)
> +			goto retry;
> +
> 		if (ret != 0) {
> 			eprintf("write cache failed %"PRIx64" %"PRIx32"\n",
> 				oid, ret);
> @@ -529,8 +533,12 @@ int read_object(uint64_t oid, char *data, unsigned int datalen,
> 	struct sd_req hdr;
> 	int ret;
>
> +retry:
> 	if (sys->enable_write_cache && object_is_cached(oid)) {
> 		ret = object_cache_read(oid, data, datalen, offset);
> +		if (ret == SD_RES_NO_CACHE)
> +			goto retry;
> +
> 		if (ret != SD_RES_SUCCESS) {
> 			eprintf("try forward read %"PRIx64" %"PRIx32"\n",
> 				oid, ret);