This patch passes vdi_op() to notify() as a blocking notification callback, so we no longer need to handle other notifications (vdi operations or membership changes) being delivered while a vdi operation is in progress. Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp> --- sheep/group.c | 89 +++++++++++--------------------------------------------- 1 files changed, 18 insertions(+), 71 deletions(-) diff --git a/sheep/group.c b/sheep/group.c index a25f8bf..7d79051 100644 --- a/sheep/group.c +++ b/sheep/group.c @@ -121,7 +121,6 @@ struct work_leave { enum cpg_event_work_bits { CPG_EVENT_WORK_RUNNING = 1, - CPG_EVENT_WORK_SUSPENDED, CPG_EVENT_WORK_JOINING, }; @@ -141,7 +140,6 @@ static void cpg_event_set_##name(void) \ } CPG_EVENT_WORK_FNS(RUNNING, running) -CPG_EVENT_WORK_FNS(SUSPENDED, suspended) CPG_EVENT_WORK_FNS(JOINING, joining) static inline int join_message(struct message_header *m) @@ -276,6 +274,8 @@ static int get_epoch(struct sd_obj_req *req, return ret; } +static void vdi_op(void *arg); + void cluster_queue_request(struct work *work, int idx) { struct request *req = container_of(work, struct request, work); @@ -356,7 +356,7 @@ forward: } msg->header.op = SD_MSG_VDI_OP; - msg->header.state = DM_INIT; + msg->header.state = DM_FIN; msg->header.msg_length = sizeof(*msg) + hdr->data_length; msg->header.from = sys->this_node; msg->req = *((struct sd_vdi_req *)&req->rq); @@ -366,7 +366,7 @@ forward: list_add(&req->pending_list, &sys->pending_list); - sys->cdrv->notify(msg, msg->header.msg_length, NULL); + sys->cdrv->notify(msg, msg->header.msg_length, vdi_op); free(msg); } @@ -846,8 +846,9 @@ join_finished: return; } -static void vdi_op(struct vdi_op_message *msg) +static void vdi_op(void *arg) { + struct vdi_op_message *msg = arg; const struct sd_vdi_req *hdr = &msg->req; struct sd_vdi_rsp *rsp = &msg->rsp; void *data = msg->data; @@ -1019,19 +1020,6 @@ static void __sd_notify(struct cpg_event *cevent) node->ent = m->from; } - if (m->state == DM_INIT && is_master()) { - 
switch (m->op) { - case SD_MSG_JOIN: - break; - case SD_MSG_VDI_OP: - vdi_op((struct vdi_op_message *)m); - break; - default: - eprintf("unknown message %d\n", m->op); - break; - } - } - if (m->state == DM_FIN) { switch (m->op) { case SD_MSG_JOIN: @@ -1171,10 +1159,6 @@ static void __sd_notify_done(struct cpg_event *cevent) case SD_MSG_JOIN: send_join_response(w); break; - case SD_MSG_VDI_OP: - m->state = DM_FIN; - sys->cdrv->notify(m, m->msg_length, NULL); - break; default: eprintf("unknown message %d\n", m->op); break; @@ -1215,11 +1199,9 @@ static void sd_notify_handler(struct sheepid *sender, void *msg, size_t msg_len) return; memcpy(w->msg, msg, msg_len); - if (cpg_event_suspended() && m->state == DM_FIN) { + if (cpg_event_joining() && m->state == DM_FIN) { list_add(&cevent->cpg_event_list, &sys->cpg_event_siblings); - cpg_event_clear_suspended(); - if (join_message(m)) - cpg_event_clear_joining(); + cpg_event_clear_joining(); } else list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings); @@ -1500,7 +1482,7 @@ static void cpg_event_done(struct work *work, int idx) vprintf(SDOG_DEBUG "%p\n", cevent); - if (cpg_event_suspended()) + if (cpg_event_joining()) goto out; if (cevent->skip) @@ -1517,39 +1499,13 @@ static void cpg_event_done(struct work *work, int idx) { struct work_notify *w = container_of(cevent, struct work_notify, cev); - if (w->msg->state == DM_FIN && vdi_op_message(w->msg)) - vdi_op_done((struct vdi_op_message *)w->msg); + if (sys->join_finished && w->msg->state == DM_INIT) + cpg_event_set_joining(); - /* - * if we are in the process of the JOIN, we will not - * be suspended. So sd_deliver() links events to - * cpg_event_siblings in order. The events except for - * JOIN with DM_CONT and DM_FIN are skipped. 
- */ - if (sys->join_finished && w->msg->state == DM_INIT) { - struct cpg_event *f_cevent; - - list_for_each_entry(f_cevent, &sys->cpg_event_siblings, - cpg_event_list) { - struct work_notify *fw = - container_of(f_cevent, struct work_notify, cev); - if (f_cevent->ctype == CPG_EVENT_NOTIFY && - fw->msg->state == DM_FIN) { - vprintf("already got fin %p\n", - f_cevent); - - list_del(&f_cevent->cpg_event_list); - list_add(&f_cevent->cpg_event_list, - &sys->cpg_event_siblings); - goto got_fin; - } - } - cpg_event_set_suspended(); - if (join_message(w->msg)) - cpg_event_set_joining(); - } - got_fin: - __sd_notify_done(cevent); + if (vdi_op_message(w->msg)) + vdi_op_done((struct vdi_op_message *)w->msg); + else + __sd_notify_done(cevent); break; } case CPG_EVENT_REQUEST: @@ -1563,13 +1519,8 @@ out: cpg_event_free(cevent); cpg_event_clear_running(); - if (!list_empty(&sys->cpg_event_siblings)) { - if (cpg_event_joining()) - /* io requests need to return SD_RES_NEW_NODE_VER */ - start_cpg_event_work(); - else if (!cpg_event_suspended()) - start_cpg_event_work(); - } + if (!list_empty(&sys->cpg_event_siblings)) + start_cpg_event_work(); } static int check_epoch(struct request *req) @@ -1668,9 +1619,6 @@ void start_cpg_event_work(void) * execute requests (or cpg events). */ if (cpg_event_joining()) { - if (!cpg_event_suspended()) - panic("should not happen\n"); - if (cevent->ctype == CPG_EVENT_REQUEST) { struct request *req = container_of(cevent, struct request, cev); if (is_io_request(req->rq.opcode) && req->rq.flags & SD_FLAG_CMD_DIRECT) { @@ -1773,8 +1721,7 @@ do_retry: if (retry) goto do_retry; - if (cpg_event_running() || cpg_event_suspended() || - list_empty(&sys->cpg_event_siblings)) + if (cpg_event_running() || list_empty(&sys->cpg_event_siblings)) return; cevent = list_first_entry(&sys->cpg_event_siblings, -- 1.7.2.5 |