Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp> --- sheep/group.c | 60 ++++++++++++++++++++------------------------------------ 1 files changed, 22 insertions(+), 38 deletions(-) diff --git a/sheep/group.c b/sheep/group.c index 7e4531d..4db644d 100644 --- a/sheep/group.c +++ b/sheep/group.c @@ -29,15 +29,7 @@ struct node { struct list_head list; }; -struct message_header { - uint8_t proto_ver; - uint8_t pad[3]; - uint32_t msg_length; - struct sheepdog_node_list_entry from; -}; - struct join_message { - struct message_header header; uint16_t nr_nodes; uint16_t nr_leave_nodes; uint32_t nr_sobjs; @@ -47,7 +39,8 @@ struct join_message { uint64_t ctime; uint32_t result; uint8_t inc_epoch; /* set non-zero when we increment epoch of all nodes */ - uint8_t pad[3]; + uint8_t proto_ver; + uint8_t pad[2]; union { struct sheepdog_node_list_entry nodes[SD_MAX_NODES]; struct sheepdog_node_list_entry leave_nodes[SD_MAX_NODES]; @@ -55,7 +48,6 @@ struct join_message { }; struct vdi_op_message { - struct message_header header; struct sd_vdi_req req; struct sd_vdi_rsp rsp; uint8_t data[0]; @@ -64,6 +56,8 @@ struct vdi_op_message { struct work_notify { struct cpg_event cev; + struct sheepdog_node_list_entry sender; + struct message_header *msg; }; @@ -260,8 +254,6 @@ forward: return; } - msg->header.msg_length = sizeof(*msg) + hdr->data_length; - msg->header.from = sys->this_node; msg->req = *((struct sd_vdi_req *)&req->rq); msg->rsp = *((struct sd_vdi_rsp *)&req->rp); if (hdr->flags & SD_FLAG_CMD_WRITE) @@ -269,7 +261,7 @@ forward: list_add(&req->pending_list, &sys->pending_list); - sys->cdrv->notify(msg, msg->header.msg_length, vdi_op); + sys->cdrv->notify(msg, sizeof(*msg) + hdr->data_length, vdi_op); free(msg); } @@ -459,18 +451,17 @@ out: return ret; } -static void join(struct join_message *msg) +static void join(struct sheepdog_node_list_entry *joining, struct join_message *msg) { - if (msg->header.proto_ver != SD_SHEEP_PROTO_VER) { + if (msg->proto_ver != SD_SHEEP_PROTO_VER) { eprintf("joining node send a wrong version message\n"); msg->result = SD_RES_VER_MISMATCH; return; } - msg->result = get_cluster_status(&msg->header.from, msg->nodes, - msg->nr_nodes, msg->ctime, - msg->epoch, &msg->cluster_status, - &msg->inc_epoch); + msg->result = get_cluster_status(joining, msg->nodes, msg->nr_nodes, + msg->ctime, msg->epoch, + &msg->cluster_status, &msg->inc_epoch); msg->nr_sobjs = sys->nr_sobjs; msg->cluster_flags = sys->flags; msg->ctime = get_cluster_ctime(); @@ -546,6 +537,7 @@ static int update_epoch_log(int epoch) } static void update_cluster_info(struct join_message *msg, + struct sheepdog_node_list_entry *joined, struct sheepdog_node_list_entry *nodes, size_t nr_nodes) { @@ -566,7 +558,7 @@ static void update_cluster_info(struct join_message *msg, /* add nodes execept for newly joined one */ for (i = 0; i < nr_nodes; i++) { - if (node_cmp(nodes + i, &msg->header.from) == 0) + if (node_cmp(nodes + i, joined) == 0) continue; sys->nodes[sys->nr_nodes++] = nodes[i]; @@ -600,7 +592,7 @@ static void update_cluster_info(struct join_message *msg, update_epoch_log(sys->epoch); join_finished: - sys->nodes[sys->nr_nodes++] = msg->header.from; + sys->nodes[sys->nr_nodes++] = *joined; qsort(sys->nodes, sys->nr_nodes, sizeof(*sys->nodes), node_cmp); sys->nr_vnodes = nodes_to_vnodes(sys->nodes, sys->nr_nodes, sys->vnodes); @@ -780,7 +772,7 @@ static void __sd_notify_done(struct cpg_event *cevent) ret = SD_RES_UNKNOWN; } out: - if (!is_myself(msg->header.from.addr, msg->header.from.port)) + if (!is_myself(w->sender.addr, w->sender.port)) return; req = list_first_entry(&sys->pending_list, struct request, pending_list); @@ -797,13 +789,8 @@ static void sd_notify_handler(struct sheepdog_node_list_entry *sender, { struct cpg_event *cevent; struct work_notify *w; - struct message_header *m = msg; - char name[128]; - dprintf("size: %d, from: %s, pid: %u\n", - m->msg_length, - addr_to_str(name, sizeof(name), m->from.addr, m->from.port), - sender->port); + dprintf("size: %zd, from: %s\n", msg_len, node_to_str(sender)); w = zalloc(sizeof(*w)); if (!w) @@ -814,6 +801,7 @@ static void sd_notify_handler(struct sheepdog_node_list_entry *sender, vprintf(SDOG_DEBUG, "allow new deliver, %p\n", cevent); + w->sender = *sender; if (msg_len) { w->msg = zalloc(msg_len); if (!w->msg) @@ -910,22 +898,20 @@ static enum cluster_join_result sd_check_join_cb( vprintf(SDOG_DEBUG, "%s\n", node_to_str(&sys->this_node)); - jm->header.from = sys->this_node; - nr_entries = ARRAY_SIZE(entries); ret = read_epoch(&epoch, &ctime, entries, &nr_entries); if (ret == SD_RES_SUCCESS) { sys->epoch = epoch; jm->ctime = ctime; - get_cluster_status(&jm->header.from, entries, nr_entries, - ctime, epoch, &jm->cluster_status, NULL); + get_cluster_status(joining, entries, nr_entries, ctime, + epoch, &jm->cluster_status, NULL); } else jm->cluster_status = SD_STATUS_WAIT_FOR_FORMAT; return CJ_RES_SUCCESS; } - join(jm); + join(joining, jm); dprintf("%d, %d\n", jm->result, jm->cluster_status); if (jm->result == SD_RES_SUCCESS && jm->cluster_status != SD_STATUS_OK) { @@ -957,9 +943,7 @@ static int send_join_request(struct sheepdog_node_list_entry *ent) int nr_entries, ret; memset(&msg, 0, sizeof(msg)); - msg.header.proto_ver = SD_SHEEP_PROTO_VER; - msg.header.msg_length = sizeof(msg); - msg.header.from = *ent; + msg.proto_ver = SD_SHEEP_PROTO_VER; get_cluster_copies(&msg.nr_sobjs); get_cluster_flags(&msg.cluster_flags); @@ -969,7 +953,7 @@ static int send_join_request(struct sheepdog_node_list_entry *ent) if (ret == SD_RES_SUCCESS) msg.nr_nodes = nr_entries; - ret = sys->cdrv->join(ent, sd_check_join_cb, &msg, msg.header.msg_length); + ret = sys->cdrv->join(ent, sd_check_join_cb, &msg, sizeof(msg)); vprintf(SDOG_INFO, "%s\n", node_to_str(&sys->this_node)); @@ -984,7 +968,7 @@ static void __sd_join_done(struct cpg_event *cevent) print_node_list(sys->nodes, sys->nr_nodes); - update_cluster_info(jm, w->member_list, w->member_list_entries); + update_cluster_info(jm, &w->joined, w->member_list, w->member_list_entries); if (sys_can_recover()) { list_for_each_entry_safe(node, t, &sys->leave_list, list) { -- 1.7.2.5 |