From: Liu Yuan <tailai.ly at taobao.com>

Now sheep supports more than just the corosync cluster driver, so we'd
better remove the cpg* naming from the sheep core code.

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 sheep/group.c      |  140 +++++++++++++++++++++++++++-------------------------
 sheep/sdnet.c      |   26 +++++-----
 sheep/sheep.c      |    4 +-
 sheep/sheep_priv.h |   30 ++++++------
 4 files changed, 102 insertions(+), 98 deletions(-)

diff --git a/sheep/group.c b/sheep/group.c
index 9f42d24..79600a5 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -58,7 +58,7 @@ struct vdi_op_message {
 };
 
 struct work_notify {
-	struct cpg_event cev;
+	struct event_struct cev;
 
 	struct sd_node sender;
 
@@ -67,7 +67,7 @@ struct work_notify {
 };
 
 struct work_join {
-	struct cpg_event cev;
+	struct event_struct cev;
 
 	struct sd_node *member_list;
 	size_t member_list_entries;
@@ -77,7 +77,7 @@ struct work_join {
 };
 
 struct work_leave {
-	struct cpg_event cev;
+	struct event_struct cev;
 
 	struct sd_node *member_list;
 	size_t member_list_entries;
@@ -97,7 +97,7 @@ struct work_leave {
 	}						\
 })
 
-static int cpg_event_running;
+static int event_running;
 
 static size_t get_join_message_size(struct join_message *jm)
 {
@@ -611,11 +611,11 @@ join_finished:
 	return;
 }
 
-static void __sd_notify(struct cpg_event *cevent)
+static void __sd_notify(struct event_struct *cevent)
 {
 }
 
-static void __sd_notify_done(struct cpg_event *cevent)
+static void __sd_notify_done(struct event_struct *cevent)
 {
 	struct work_notify *w = container_of(cevent, struct work_notify, cev);
 	struct vdi_op_message *msg = w->msg;
@@ -640,7 +640,7 @@ static void __sd_notify_done(struct cpg_event *cevent)
 static void sd_notify_handler(struct sd_node *sender,
 			      void *msg, size_t msg_len)
 {
-	struct cpg_event *cevent;
+	struct event_struct *cevent;
 	struct work_notify *w;
 
 	dprintf("size: %zd, from: %s\n", msg_len, node_to_str(sender));
@@ -650,7 +650,7 @@ static void sd_notify_handler(struct sd_node *sender,
 		return;
 
 	cevent = &w->cev;
-	cevent->ctype = CPG_EVENT_NOTIFY;
+	cevent->ctype = EVENT_NOTIFY;
 
 	vprintf(SDOG_DEBUG, "allow new deliver %p\n", cevent);
 
@@ -669,9 +669,9 @@ static void sd_notify_handler(struct sd_node *sender,
 		list_del(&w->req->pending_list);
 	}
 
-	list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_queue);
+	list_add_tail(&cevent->event_list, &sys->event_queue);
 
-	start_cpg_event_work();
+	process_request_event_queues();
 
 	unregister_event(cdrv_fd);
 }
@@ -708,7 +708,7 @@ static int check_majority(struct sd_node *nodes, int nr_nodes)
 	return 0;
 }
 
-static void __sd_join(struct cpg_event *cevent)
+static void __sd_join(struct event_struct *cevent)
 {
 	struct work_join *w = container_of(cevent, struct work_join, cev);
 	struct join_message *msg = w->jm;
@@ -726,7 +726,7 @@ static void __sd_join(struct cpg_event *cevent)
 		get_vdi_bitmap_from(w->member_list + i);
 }
 
-static void __sd_leave(struct cpg_event *cevent)
+static void __sd_leave(struct event_struct *cevent)
 {
 	struct work_leave *w = container_of(cevent, struct work_leave, cev);
 
@@ -823,7 +823,7 @@ static int send_join_request(struct sd_node *ent)
 	return ret;
 }
 
-static void __sd_join_done(struct cpg_event *cevent)
+static void __sd_join_done(struct event_struct *cevent)
 {
 	struct work_join *w = container_of(cevent, struct work_join, cev);
 	struct join_message *jm = w->jm;
@@ -850,7 +850,7 @@ static void __sd_join_done(struct cpg_event *cevent)
 	vprintf(SDOG_DEBUG, "join Sheepdog cluster\n");
 }
 
-static void __sd_leave_done(struct cpg_event *cevent)
+static void __sd_leave_done(struct event_struct *cevent)
 {
 	struct work_leave *w = container_of(cevent, struct work_leave, cev);
 
@@ -878,22 +878,22 @@ static void __sd_leave_done(struct cpg_event *cevent)
 	}
 }
 
-static void cpg_event_free(struct cpg_event *cevent)
+static void event_free(struct event_struct *cevent)
 {
 	switch (cevent->ctype) {
-	case CPG_EVENT_JOIN: {
+	case EVENT_JOIN: {
 		struct work_join *w = container_of(cevent, struct work_join, cev);
 		free(w->member_list);
 		free(w);
 		break;
 	}
-	case CPG_EVENT_LEAVE: {
+	case EVENT_LEAVE: {
 		struct work_leave *w = container_of(cevent, struct work_leave, cev);
 		free(w->member_list);
 		free(w);
 		break;
 	}
-	case CPG_EVENT_NOTIFY: {
+	case EVENT_NOTIFY: {
 		struct work_notify *w = container_of(cevent, struct work_notify, cev);
 		free(w->msg);
 		free(w);
@@ -904,28 +904,28 @@ static void cpg_event_free(struct cpg_event *cevent)
 	}
 }
 
-static struct work cpg_event_work;
+static struct work event_work;
 
-static void cpg_event_fn(struct work *work)
+static void event_fn(struct work *work)
 {
-	struct cpg_event *cevent = sys->cur_cevent;
+	struct event_struct *cevent = sys->cur_cevent;
 
 	/*
-	 * we can't touch sys->cpg_event_siblings because of a race
+	 * we can't touch sys->event_queue because of a race
	 * with sd_deliver() and sd_confchg()...
 	 */
 
 	switch (cevent->ctype) {
-	case CPG_EVENT_JOIN:
+	case EVENT_JOIN:
 		__sd_join(cevent);
 		break;
-	case CPG_EVENT_LEAVE:
+	case EVENT_LEAVE:
 		__sd_leave(cevent);
 		break;
-	case CPG_EVENT_NOTIFY:
+	case EVENT_NOTIFY:
 		__sd_notify(cevent);
 		break;
-	case CPG_EVENT_REQUEST:
+	case EVENT_REQUEST:
 		vprintf(SDOG_ERR, "should not happen\n");
 		break;
 	default:
@@ -933,9 +933,9 @@ static void cpg_event_fn(struct work *work)
 	}
 }
 
-static void cpg_event_done(struct work *work)
+static void event_done(struct work *work)
 {
-	struct cpg_event *cevent;
+	struct event_struct *cevent;
 
 	if (!sys->cur_cevent)
 		vprintf(SDOG_ERR, "bug\n");
@@ -946,16 +946,16 @@ static void cpg_event_done(struct work *work)
 	vprintf(SDOG_DEBUG, "%p\n", cevent);
 
 	switch (cevent->ctype) {
-	case CPG_EVENT_JOIN:
+	case EVENT_JOIN:
 		__sd_join_done(cevent);
 		break;
-	case CPG_EVENT_LEAVE:
+	case EVENT_LEAVE:
 		__sd_leave_done(cevent);
 		break;
-	case CPG_EVENT_NOTIFY:
+	case EVENT_NOTIFY:
 		__sd_notify_done(cevent);
 		break;
-	case CPG_EVENT_REQUEST:
+	case EVENT_REQUEST:
 		vprintf(SDOG_ERR, "should not happen\n");
 		break;
 	default:
@@ -963,12 +963,12 @@ static void cpg_event_done(struct work *work)
 	}
 	vprintf(SDOG_DEBUG, "free %p\n", cevent);
-	cpg_event_free(cevent);
-	cpg_event_running = 0;
+	event_free(cevent);
+	event_running = 0;
 	register_event(cdrv_fd, group_handler, NULL);
 
-	if (!list_empty(&sys->cpg_event_queue))
-		start_cpg_event_work();
+	if (!list_empty(&sys->event_queue))
+		process_request_event_queues();
 }
 
 int is_access_to_busy_objects(uint64_t oid)
 {
@@ -1027,13 +1027,13 @@ static inline void set_consistency_check(struct request *req, uint64_t oid)
 
 static void process_request_queue(void)
 {
-	struct cpg_event *cevent, *n;
+	struct event_struct *cevent, *n;
 
-	list_for_each_entry_safe(cevent, n, &sys->cpg_request_queue, cpg_event_list) {
+	list_for_each_entry_safe(cevent, n, &sys->request_queue, event_list) {
 		struct request *req = container_of(cevent, struct request, cev);
 		struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
 
-		list_del(&cevent->cpg_event_list);
+		list_del(&cevent->event_list);
 
 		if (is_io_op(req->op)) {
 			int copies = sys->nr_sobjs;
@@ -1065,31 +1065,35 @@ static void process_request_queue(void)
 	}
 }
 
-/* can be called only by the main process */
-void start_cpg_event_work(void)
+static inline void process_event_queue(void)
 {
+	struct event_struct *cevent;
+	/*
+	 * we need to serialize events so we don't call queue_work
+	 * if one event is running by executing event_fn() or event_done().
+	 */
+	if (event_running || sys->nr_outstanding_io)
+		return;
 
-	if (!list_empty(&sys->cpg_event_queue)) {
-		struct cpg_event *cevent;
-		/*
-		 * we need to serialize cpg events so we don't call queue_work
-		 * if a thread is still running for a cpg event; executing
-		 * cpg_event_fn() or cpg_event_done().
-		 */
-		if (cpg_event_running || sys->nr_outstanding_io)
-			return;
+	cevent = list_first_entry(&sys->event_queue,
+				  struct event_struct, event_list);
+	list_del(&cevent->event_list);
+	sys->cur_cevent = cevent;
+
+	event_running = 1;
 
-		cevent = list_first_entry(&sys->cpg_event_queue,
-					  struct cpg_event, cpg_event_list);
-		list_del(&cevent->cpg_event_list);
-		sys->cur_cevent = cevent;
+	event_work.fn = event_fn;
+	event_work.done = event_done;
 
-		cpg_event_running = 1;
+	queue_work(sys->event_wqueue, &event_work);
+}
 
-		cpg_event_work.fn = cpg_event_fn;
-		cpg_event_work.done = cpg_event_done;
+/* can be called only by the main process */
+void process_request_event_queues(void)
+{
 
-		queue_work(sys->cpg_wqueue, &cpg_event_work);
+	if (!list_empty(&sys->event_queue)) {
+		process_event_queue();
 	} else
 		process_request_queue();
 }
@@ -1099,7 +1103,7 @@ static void sd_join_handler(struct sd_node *joined,
 			    size_t nr_members,
 			    enum cluster_join_result result, void *opaque)
 {
-	struct cpg_event *cevent;
+	struct event_struct *cevent;
 	struct work_join *w = NULL;
 	int i, size;
 	int nr, nr_local, nr_leave;
@@ -1133,7 +1137,7 @@ static void sd_join_handler(struct sd_node *joined,
 			panic("failed to allocate memory");
 
 		cevent = &w->cev;
-		cevent->ctype = CPG_EVENT_JOIN;
+		cevent->ctype = EVENT_JOIN;
 
 		vprintf(SDOG_DEBUG, "allow new confchg %p\n", cevent);
 
@@ -1153,8 +1157,8 @@ static void sd_join_handler(struct sd_node *joined,
 			panic("failed to allocate memory\n");
 		memcpy(w->jm, opaque, size);
 
-		list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_queue);
-		start_cpg_event_work();
+		list_add_tail(&cevent->event_list, &sys->event_queue);
+		process_request_event_queues();
 		unregister_event(cdrv_fd);
 		break;
 	case CJ_RES_FAIL:
@@ -1240,7 +1244,7 @@ static void sd_leave_handler(struct sd_node *left,
 			     struct sd_node *members,
 			     size_t nr_members)
 {
-	struct cpg_event *cevent;
+	struct event_struct *cevent;
 	struct work_leave *w = NULL;
 	int i, size;
 
@@ -1256,7 +1260,7 @@ static void sd_leave_handler(struct sd_node *left,
 		goto oom;
 
 	cevent = &w->cev;
-	cevent->ctype = CPG_EVENT_LEAVE;
+	cevent->ctype = EVENT_LEAVE;
 
 	vprintf(SDOG_DEBUG, "allow new confchg %p\n", cevent);
 
@@ -1270,8 +1274,8 @@ static void sd_leave_handler(struct sd_node *left,
 	w->left = *left;
 
-	list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_queue);
-	start_cpg_event_work();
+	list_add_tail(&cevent->event_list, &sys->event_queue);
+	process_request_event_queues();
 
 	unregister_event(cdrv_fd);
 	return;
 
@@ -1330,8 +1334,8 @@ int create_cluster(int port, int64_t zone, int nr_vnodes)
 	INIT_LIST_HEAD(&sys->consistent_obj_list);
 	INIT_LIST_HEAD(&sys->blocking_conn_list);
 
-	INIT_LIST_HEAD(&sys->cpg_request_queue);
-	INIT_LIST_HEAD(&sys->cpg_event_queue);
+	INIT_LIST_HEAD(&sys->request_queue);
+	INIT_LIST_HEAD(&sys->event_queue);
 
 	ret = register_event(cdrv_fd, group_handler, NULL);
 	if (ret) {
diff --git a/sheep/sdnet.c b/sheep/sdnet.c
index d258bbd..74d42f9 100644
--- a/sheep/sdnet.c
+++ b/sheep/sdnet.c
@@ -25,14 +25,14 @@ void resume_pending_requests(void)
 
 	list_for_each_entry_safe(next, tmp, &sys->req_wait_for_obj_list,
 				 r_wlist) {
-		struct cpg_event *cevent = &next->cev;
+		struct event_struct *cevent = &next->cev;
 
 		list_del(&next->r_wlist);
-		list_add_tail(&cevent->cpg_event_list, &sys->cpg_request_queue);
+		list_add_tail(&cevent->event_list, &sys->request_queue);
 	}
 
-	if (!list_empty(&sys->cpg_request_queue))
-		start_cpg_event_work();
+	if (!list_empty(&sys->request_queue))
+		process_request_event_queues();
 }
 
 int is_access_local(struct sd_vnode *e, int nr_nodes,
@@ -83,7 +83,7 @@ static void setup_access_to_local_objects(struct request *req)
 
 static void io_op_done(struct work *work)
 {
 	struct request *req = container_of(work, struct request, work);
-	struct cpg_event *cevent = &req->cev;
+	struct event_struct *cevent = &req->cev;
 	int again = 0;
 	int copies = sys->nr_sobjs;
@@ -96,7 +96,7 @@ static void io_op_done(struct work *work)
 	/*
 	 * TODO: if the request failed due to epoch unmatch,
 	 * we should retry here (adds this request to the tail
-	 * of sys->cpg_event_siblings.
+	 * of sys->request_queue.
 	 */
 	if (!(req->rq.flags & SD_FLAG_CMD_IO_LOCAL) &&
 	    (req->rp.result == SD_RES_OLD_NODE_VER ||
@@ -109,7 +109,7 @@ static void io_op_done(struct work *work)
 		setup_ordered_sd_vnode_list(req);
 		setup_access_to_local_objects(req);
 
-		list_add_tail(&cevent->cpg_event_list, &sys->cpg_request_queue);
+		list_add_tail(&cevent->event_list, &sys->request_queue);
 		again = 1;
 	} else if (req->rp.result == SD_RES_SUCCESS && req->check_consistency) {
 		struct sd_obj_req *obj_hdr = (struct sd_obj_req *)&req->rq;
@@ -159,7 +159,7 @@ static void io_op_done(struct work *work)
 			setup_ordered_sd_vnode_list(req);
 			setup_access_to_local_objects(req);
 
-			list_add_tail(&cevent->cpg_event_list, &sys->cpg_request_queue);
+			list_add_tail(&cevent->event_list, &sys->request_queue);
 			again = 1;
 		}
 	}
@@ -260,7 +260,7 @@ static int check_request(struct request *req)
 
 static void queue_request(struct request *req)
 {
-	struct cpg_event *cevent = &req->cev;
+	struct event_struct *cevent = &req->cev;
 	struct sd_req *hdr = (struct sd_req *)&req->rq;
 	struct sd_rsp *rsp = (struct sd_rsp *)&req->rp;
 
@@ -319,7 +319,7 @@ static void queue_request(struct request *req)
 	/*
 	 * we set epoch for non direct requests here. Note that we
 	 * can't access to sys->epoch after calling
-	 * start_cpg_event_work(that is, passing requests to work
+	 * process_request_event_queues(that is, passing requests to work
 	 * threads).
 	 */
 	if (!(hdr->flags & SD_FLAG_CMD_IO_LOCAL))
@@ -332,9 +332,9 @@ static void queue_request(struct request *req)
 	setup_ordered_sd_vnode_list(req);
 
-	cevent->ctype = CPG_EVENT_REQUEST;
-	list_add_tail(&cevent->cpg_event_list, &sys->cpg_request_queue);
-	start_cpg_event_work();
+	cevent->ctype = EVENT_REQUEST;
+	list_add_tail(&cevent->event_list, &sys->request_queue);
+	process_request_event_queues();
 
 	return;
 done:
 	req->done(req);
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 92f9760..6d500ae 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -216,13 +216,13 @@ int main(int argc, char **argv)
 		exit(1);
 	}
 
-	sys->cpg_wqueue = init_work_queue(1);
+	sys->event_wqueue = init_work_queue(1);
 	sys->gateway_wqueue = init_work_queue(NR_GW_WORKER_THREAD);
 	sys->io_wqueue = init_work_queue(NR_IO_WORKER_THREAD);
 	sys->recovery_wqueue = init_work_queue(1);
 	sys->deletion_wqueue = init_work_queue(1);
 	sys->flush_wqueue = init_work_queue(1);
 
-	if (!sys->cpg_wqueue || !sys->gateway_wqueue || !sys->io_wqueue ||
+	if (!sys->event_wqueue || !sys->gateway_wqueue || !sys->io_wqueue ||
 	    !sys->recovery_wqueue || !sys->deletion_wqueue || !sys->flush_wqueue)
 		exit(1);
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index d59606d..c8d9bb4 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -34,19 +34,19 @@
 
 #define SD_RES_NETWORK_ERROR 0x81 /* Network error between sheep */
 
-enum cpg_event_type {
-	CPG_EVENT_JOIN,
-	CPG_EVENT_LEAVE,
-	CPG_EVENT_NOTIFY,
-	CPG_EVENT_REQUEST,
+enum event_type {
+	EVENT_JOIN,
+	EVENT_LEAVE,
+	EVENT_NOTIFY,
+	EVENT_REQUEST,
 };
 
 #define is_membership_change_event(x) \
-	((x) == CPG_EVENT_JOIN || (x) == CPG_EVENT_LEAVE)
+	((x) == EVENT_JOIN || (x) == EVENT_LEAVE)
 
-struct cpg_event {
-	enum cpg_event_type ctype;
-	struct list_head cpg_event_list;
+struct event_struct {
+	enum event_type ctype;
+	struct list_head event_list;
 };
 
 struct client_info {
@@ -67,7 +67,7 @@ struct request;
 typedef void (*req_end_t) (struct request *);
 
 struct request {
-	struct cpg_event cev;
+	struct event_struct cev;
 
 	struct sd_req rq;
 	struct sd_rsp rp;
@@ -140,9 +140,9 @@ struct cluster_info {
 	uint32_t nr_sobjs;
 	int nr_zones;
 
-	struct list_head cpg_request_queue;
-	struct list_head cpg_event_queue;
-	struct cpg_event *cur_cevent;
+	struct list_head request_queue;
+	struct list_head event_queue;
+	struct event_struct *cur_cevent;
 	int nr_outstanding_io;
 	int nr_outstanding_reqs;
 	unsigned int outstanding_data_size;
@@ -152,7 +152,7 @@ struct cluster_info {
 	int use_directio;
 	uint8_t sync_flush;
 
-	struct work_queue *cpg_wqueue;
+	struct work_queue *event_wqueue;
 	struct work_queue *gateway_wqueue;
 	struct work_queue *io_wqueue;
 	struct work_queue *deletion_wqueue;
@@ -245,7 +245,7 @@ void resume_pending_requests(void);
 int create_cluster(int port, int64_t zone, int nr_vnodes);
 int leave_cluster(void);
 
-void start_cpg_event_work(void);
+void process_request_event_queues(void);
 void do_io_request(struct work *work);
 int write_object_local(uint64_t oid, char *data, unsigned int datalen,
 		       uint64_t offset, uint16_t flags, int copies,
-- 
1.7.8.2