From: levin li <xingke.lwp at taobao.com> When a requested object is still being recovered, we should put the request into the wait_obj_queue so that it waits until the recovery work has recovered the object. Signed-off-by: levin li <xingke.lwp at taobao.com> --- sheep/group.c | 1 + sheep/recovery.c | 6 +++++- sheep/sdnet.c | 25 ++++++++++++++++++++----- sheep/sheep_priv.h | 2 ++ 4 files changed, 28 insertions(+), 6 deletions(-) diff --git a/sheep/group.c b/sheep/group.c index 8673b7a..50a53c1 100644 --- a/sheep/group.c +++ b/sheep/group.c @@ -1382,6 +1382,7 @@ int create_cluster(int port, int64_t zone, int nr_vnodes) INIT_LIST_HEAD(&sys->request_queue); INIT_LIST_HEAD(&sys->event_queue); INIT_LIST_HEAD(&sys->wait_rw_queue); + INIT_LIST_HEAD(&sys->wait_obj_queue); ret = send_join_request(&sys->this_node); if (ret != 0) diff --git a/sheep/recovery.c b/sheep/recovery.c index 72c74c7..0708eeb 100644 --- a/sheep/recovery.c +++ b/sheep/recovery.c @@ -410,10 +410,11 @@ static void resume_wait_recovery_requests(void) static void do_recover_main(struct work *work) { struct recovery_work *rw = container_of(work, struct recovery_work, work); - uint64_t oid; + uint64_t oid, recovered_oid = rw->oids[rw->done]; if (rw->state == RW_INIT) { rw->state = RW_RUN; + recovered_oid = 0; resume_wait_recovery_requests(); } else if (!rw->retry) { rw->done++; @@ -423,6 +424,9 @@ static void do_recover_main(struct work *work) oid = rw->oids[rw->done]; + if (recovered_oid) + resume_retry_requests(recovered_oid); + if (rw->retry && !next_rw) { rw->retry = 0; diff --git a/sheep/sdnet.c b/sheep/sdnet.c index 7f7e761..fa205b4 100644 --- a/sheep/sdnet.c +++ b/sheep/sdnet.c @@ -217,11 +217,9 @@ static int check_request(struct request *req) req->rp.result = SD_RES_OBJ_RECOVERING; list_add_tail(&req->request_list, &sys->wait_rw_queue); - } else { - req->rp.result = SD_RES_NEW_NODE_VER; - sys->nr_outstanding_io++; - req->work.done(&req->work); - } + } else + list_add_tail(&req->request_list, + 
&sys->wait_obj_queue); } else { /* Gateway request */ list_add_tail(&req->request_list, &sys->req_wait_for_obj_list); @@ -282,6 +280,23 @@ void resume_wait_epoch_requests(void) process_request_event_queues(); } +void resume_retry_requests(uint64_t oid) +{ + struct request *req, *t; + + list_for_each_entry_safe(req, t, &sys->wait_obj_queue, + request_list) { + /* the object requested by a pending request has been + * recovered, notify the pending request. */ + if (req->local_oid == oid) { + dprintf("retry %" PRIx64 "\n", req->local_oid); + list_del(&req->request_list); + list_add_tail(&req->request_list, &sys->request_queue); + } + } + process_request_event_queues(); +} + static void queue_request(struct request *req) { struct sd_req *hdr = &req->rq; diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h index b9ae438..67fbf1a 100644 --- a/sheep/sheep_priv.h +++ b/sheep/sheep_priv.h @@ -138,6 +138,7 @@ struct cluster_info { struct list_head request_queue; struct list_head event_queue; struct list_head wait_rw_queue; + struct list_head wait_obj_queue; struct event_struct *cur_cevent; int nr_outstanding_io; int nr_outstanding_reqs; @@ -269,6 +270,7 @@ int is_access_to_busy_objects(uint64_t oid); void resume_pending_requests(void); void resume_wait_epoch_requests(void); +void resume_retry_requests(uint64_t oid); int create_cluster(int port, int64_t zone, int nr_vnodes); int leave_cluster(void); -- 1.7.10 |