From: Liu Yuan <tailai.ly at taobao.com>

We need to split cpg_queue because:

1) IO requests shouldn't block node change events: a succeeding
   membership event replaces the previous one, which mitigates the
   overhead of unnecessary IOs caused by recovery.

2) IO requests must not block notify events such as SHUTDOWN: otherwise
   nodes that have already shut down may be mistaken by the blocked
   nodes for nodes that simply left the cluster.

This patch splits the queue with the following characteristics:

 - local requests and IO requests are placed on the request queue
 - confchg and notify events are placed on the event queue

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 sheep/group.c      |   86 +++++++++++++++++++++-------------------------
 sheep/sdnet.c      |   10 +++---
 sheep/sheep_priv.h |    3 +-
 3 files changed, 42 insertions(+), 57 deletions(-)

diff --git a/sheep/group.c b/sheep/group.c
index e850c9b..088aa5f 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -670,7 +670,7 @@ static void sd_notify_handler(struct sd_node *sender,
                 list_del(&w->req->pending_list);
         }

-        list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings);
+        list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_queue);

         start_cpg_event_work();

@@ -972,7 +972,7 @@ static void cpg_event_done(struct work *work)

         coroutine_enter(cdrv_co, NULL);

-        if (!list_empty(&sys->cpg_event_siblings))
+        if (!list_empty(&sys->cpg_event_queue))
                 start_cpg_event_work();
 }

@@ -1058,37 +1058,16 @@ static int need_consistency_check(uint8_t opcode, uint16_t flags)
         return 1;
 }

-/* can be called only by the main process */
-void start_cpg_event_work(void)
+static void process_request_queue(void)
 {
         struct cpg_event *cevent, *n;
         LIST_HEAD(failed_req_list);
-        int retry;
-
-        if (list_empty(&sys->cpg_event_siblings))
-                vprintf(SDOG_ERR, "bug\n");
-
-        cevent = list_first_entry(&sys->cpg_event_siblings,
-                                  struct cpg_event, cpg_event_list);
-        /*
-         * we need to serialize cpg events so we don't call queue_work
-         * if a thread is still running for a cpg event; executing
-         * cpg_event_fn() or cpg_event_done().
-         */
-        if (cpg_event_running && is_membership_change_event(cevent->ctype))
-                return;
-do_retry:
-        retry = 0;
-
-        list_for_each_entry_safe(cevent, n, &sys->cpg_event_siblings, cpg_event_list) {
+retry:
+        list_for_each_entry_safe(cevent, n, &sys->cpg_request_queue, cpg_event_list) {
                 struct request *req = container_of(cevent, struct request, cev);
                 struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;

-                if (cevent->ctype == CPG_EVENT_NOTIFY)
-                        continue;
-                if (is_membership_change_event(cevent->ctype))
-                        break;
-
                 list_del(&cevent->cpg_event_list);

                 if (is_io_op(req->op)) {
@@ -1102,7 +1081,8 @@ do_retry:
                         /* If we have cache of it we are at its service. */
                         list_add_tail(&req->r_wlist, &sys->outstanding_req_list);
                         sys->nr_outstanding_io++;
-                        goto gateway_work;
+                        queue_work(sys->gateway_wqueue, &req->work);
+                        continue;
                 }

                 if (__is_access_to_recoverying_objects(req)) {
@@ -1158,40 +1138,43 @@ do_retry:
                 else if (req->rq.flags & SD_FLAG_CMD_IO_LOCAL)
                         queue_work(sys->io_wqueue, &req->work);
                 else
-gateway_work:
                         queue_work(sys->gateway_wqueue, &req->work);
         }
-
         while (!list_empty(&failed_req_list)) {
                 struct request *req = list_first_entry(&failed_req_list,
                                                        struct request, r_wlist);
                 req->work.done(&req->work);
-
-                retry = 1;
+                goto retry;
         }
+}

-        if (retry)
-                goto do_retry;
-
-        if (cpg_event_running || list_empty(&sys->cpg_event_siblings))
-                return;
-
-        cevent = list_first_entry(&sys->cpg_event_siblings,
-                                  struct cpg_event, cpg_event_list);
+/* can be called only by the main process */
+void start_cpg_event_work(void)
+{
-        if (is_membership_change_event(cevent->ctype) && sys->nr_outstanding_io)
-                return;
+        if (!list_empty(&sys->cpg_event_queue)) {
+                struct cpg_event *cevent;
+                /*
+                 * we need to serialize cpg events so we don't call queue_work
+                 * if a thread is still running for a cpg event; executing
+                 * cpg_event_fn() or cpg_event_done().
+                 */
+                if (cpg_event_running || sys->nr_outstanding_io)
+                        return;

-        list_del(&cevent->cpg_event_list);
-        sys->cur_cevent = cevent;
+                cevent = list_first_entry(&sys->cpg_event_queue,
+                                          struct cpg_event, cpg_event_list);
+                list_del(&cevent->cpg_event_list);
+                sys->cur_cevent = cevent;

-        cpg_event_running = 1;
+                cpg_event_running = 1;

-        INIT_LIST_HEAD(&cpg_event_work.w_list);
-        cpg_event_work.fn = cpg_event_fn;
-        cpg_event_work.done = cpg_event_done;
+                cpg_event_work.fn = cpg_event_fn;
+                cpg_event_work.done = cpg_event_done;

-        queue_work(sys->cpg_wqueue, &cpg_event_work);
+                queue_work(sys->cpg_wqueue, &cpg_event_work);
+        } else
+                process_request_queue();
 }

 static void sd_join_handler(struct sd_node *joined,
@@ -1253,7 +1236,7 @@ static void sd_join_handler(struct sd_node *joined,
                         panic("failed to allocate memory\n");
                 memcpy(w->jm, opaque, size);

-                list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings);
+                list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_queue);
                 start_cpg_event_work();

                 unregister_event(cdrv_fd);
@@ -1374,7 +1357,7 @@ static void sd_leave_handler(struct sd_node *left,

         w->left = *left;

-        list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings);
+        list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_queue);
         start_cpg_event_work();

         unregister_event(cdrv_fd);
@@ -1437,7 +1420,8 @@ int create_cluster(int port, int64_t zone, int nr_vnodes)
         INIT_LIST_HEAD(&sys->consistent_obj_list);
         INIT_LIST_HEAD(&sys->blocking_conn_list);

-        INIT_LIST_HEAD(&sys->cpg_event_siblings);
+        INIT_LIST_HEAD(&sys->cpg_request_queue);
+        INIT_LIST_HEAD(&sys->cpg_event_queue);

         ret = register_event(cdrv_fd, group_handler, NULL);
         if (ret) {
diff --git a/sheep/sdnet.c b/sheep/sdnet.c
index 5db9f29..fb75c89 100644
--- a/sheep/sdnet.c
+++ b/sheep/sdnet.c
@@ -28,10 +28,10 @@ void resume_pending_requests(void)
                 struct cpg_event *cevent = &next->cev;

                 list_del(&next->r_wlist);
-                list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings);
+                list_add_tail(&cevent->cpg_event_list, &sys->cpg_request_queue);
         }

-        if (!list_empty(&sys->cpg_event_siblings))
+        if (!list_empty(&sys->cpg_request_queue))
                 start_cpg_event_work();
 }

@@ -105,7 +105,7 @@ static void io_op_done(struct work *work)
                         setup_ordered_sd_vnode_list(req);
                         setup_access_to_local_objects(req);

-                        list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings);
+                        list_add_tail(&cevent->cpg_event_list, &sys->cpg_request_queue);
                         again = 1;
                 } else if (req->rp.result == SD_RES_SUCCESS && req->check_consistency) {
                         struct sd_obj_req *obj_hdr = (struct sd_obj_req *)&req->rq;
@@ -155,7 +155,7 @@ static void io_op_done(struct work *work)
                         setup_ordered_sd_vnode_list(req);
                         setup_access_to_local_objects(req);

-                        list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings);
+                        list_add_tail(&cevent->cpg_event_list, &sys->cpg_request_queue);
                         again = 1;
                 }
         }
@@ -270,7 +270,7 @@ static void queue_request(struct request *req)
         setup_access_to_local_objects(req);

         cevent->ctype = CPG_EVENT_REQUEST;
-        list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings);
+        list_add_tail(&cevent->cpg_event_list, &sys->cpg_request_queue);
         start_cpg_event_work();
         return;
 done:
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index a9e8440..a5d8cad 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -139,7 +139,8 @@ struct cluster_info {
         uint32_t nr_sobjs;
         int nr_zones;

-        struct list_head cpg_event_siblings;
+        struct list_head cpg_request_queue;
+        struct list_head cpg_event_queue;
         struct cpg_event *cur_cevent;
         int nr_outstanding_io;
         int nr_outstanding_reqs;
--
1.7.8.2
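
For readers following the changelog rather than the diff, the fragment below
is a minimal, standalone sketch of the two-queue dispatch described above.
It is not the sheepdog implementation: the queue type and the names
(ev_queue, req_queue, dispatch, event_running, outstanding_io) are invented
here purely for illustration, whereas the real code keeps using the
kernel-style list heads and work queues shown in the patch.

#include <stdio.h>

#define QSIZE 16

/* Toy FIFO standing in for the kernel-style list_head queues. */
struct queue {
        const char *items[QSIZE];
        int head, tail;
};

static int q_empty(struct queue *q)
{
        return q->head == q->tail;
}

static void q_push(struct queue *q, const char *s)
{
        if (q->tail < QSIZE)
                q->items[q->tail++] = s;
}

static const char *q_pop(struct queue *q)
{
        return q_empty(q) ? NULL : q->items[q->head++];
}

/* Two queues instead of one, mirroring the split made by this patch. */
static struct queue ev_queue;   /* confchg/notify events: one at a time */
static struct queue req_queue;  /* local and IO requests: drained in bulk */

static int event_running;       /* an event handler is still in flight */
static int outstanding_io;      /* IO requests that have not completed yet */

/*
 * Shape of the reworked start_cpg_event_work(): events are strictly
 * serialized and dispatched ahead of any queued request; requests are
 * processed only while the event queue is empty.
 */
static void dispatch(void)
{
        if (!q_empty(&ev_queue)) {
                /* Wait for in-flight work to quiesce, never for queued requests. */
                if (event_running || outstanding_io)
                        return;
                event_running = 1;
                printf("event:   %s\n", q_pop(&ev_queue));
                event_running = 0;      /* in the real code this is cleared asynchronously */
        } else {
                while (!q_empty(&req_queue))
                        printf("request: %s\n", q_pop(&req_queue));
        }
}

int main(void)
{
        q_push(&req_queue, "read obj A");
        q_push(&ev_queue, "node joined");
        q_push(&req_queue, "write obj B");

        dispatch();     /* prints the membership event first */
        dispatch();     /* then drains both queued requests */
        return 0;
}

The invariant this tries to capture is the one stated in the changelog:
membership-change and notify events wait only for in-flight IO to quiesce,
never for requests still sitting in the queue, while plain requests are
drained only when no event is pending.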