We need to split the node list into a sheepdog node list and a corosync node list, because corosync leave messages may arrive while sheepdog is still processing join messages, which would corrupt the node list information. Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp> --- collie/collie.h | 3 +- collie/group.c | 114 ++++++++++++++++++++++++++++++++++++++++--------------- collie/store.c | 12 +++--- collie/vdi.c | 6 +- 4 files changed, 94 insertions(+), 41 deletions(-) diff --git a/collie/collie.h b/collie/collie.h index 098d4c9..d24a6c4 100644 --- a/collie/collie.h +++ b/collie/collie.h @@ -63,7 +63,8 @@ struct cluster_info { uint32_t epoch; - struct list_head node_list; + struct list_head cpg_node_list; + struct list_head sd_node_list; int node_list_idx; struct list_head vm_list; struct list_head pending_list; diff --git a/collie/group.c b/collie/group.c index c3508e8..9b0fde3 100644 --- a/collie/group.c +++ b/collie/group.c @@ -135,16 +135,16 @@ static void get_node_list(struct sd_node_req *req, int nr_nodes; struct node *node; - nr_nodes = build_node_list(&sys->node_list, data); + nr_nodes = build_node_list(&sys->sd_node_list, data); rsp->data_length = nr_nodes * sizeof(struct sheepdog_node_list_entry); rsp->nr_nodes = nr_nodes; rsp->local_idx = get_node_idx(&sys->this_node, data, nr_nodes); - if (list_empty(&sys->node_list)) { + if (list_empty(&sys->sd_node_list)) { rsp->master_idx = -1; return; } - node = list_first_entry(&sys->node_list, struct node, list); + node = list_first_entry(&sys->sd_node_list, struct node, list); rsp->master_idx = get_node_idx(&node->ent, data, nr_nodes); } @@ -228,11 +228,11 @@ static void group_handler(int listen_fd, int events, void *data) cpg_dispatch(sys->handle, CPG_DISPATCH_ALL); } -static void print_node_list(void) +static void print_node_list(struct list_head *node_list) { struct node *node; char name[128]; - list_for_each_entry(node, &sys->node_list, list) { + list_for_each_entry(node, node_list, list) { dprintf("%c nodeid: 
%x, pid: %d, ip: %s\n", node_cmp(&node->ent, &sys->this_node) ? ' ' : 'l', node->nodeid, node->pid, @@ -240,7 +240,7 @@ static void print_node_list(void) } } -static void add_node(uint32_t nodeid, uint32_t pid, +static void add_node(struct list_head *node_list, uint32_t nodeid, uint32_t pid, struct sheepdog_node_list_entry *sd_ent) { struct node *node; @@ -252,8 +252,21 @@ static void add_node(uint32_t nodeid, uint32_t pid, } node->nodeid = nodeid; node->pid = pid; - node->ent = *sd_ent; - list_add_tail(&node->list, &sys->node_list); + if (sd_ent) + node->ent = *sd_ent; + list_add_tail(&node->list, node_list); +} + +static struct node *find_node(struct list_head *node_list, uint32_t nodeid, uint32_t pid) +{ + struct node *node; + + list_for_each_entry(node, node_list, list) { + if (node->nodeid == nodeid && node->pid == pid) + return node; + } + + return NULL; } static int is_master(void) @@ -263,10 +276,10 @@ static int is_master(void) if (!sys->synchronized) return 0; - if (list_empty(&sys->node_list)) + if (list_empty(&sys->sd_node_list)) return 1; - node = list_first_entry(&sys->node_list, struct node, list); + node = list_first_entry(&sys->sd_node_list, struct node, list); if (node_cmp(&node->ent, &sys->this_node) == 0) return 1; @@ -288,7 +301,12 @@ static void join(struct join_message *msg) msg->epoch = sys->epoch; msg->nr_sobjs = sys->nr_sobjs; - list_for_each_entry(node, &sys->node_list, list) { + list_for_each_entry(node, &sys->cpg_node_list, list) { + if (node->nodeid == msg->nodeid && node->pid == msg->pid) + continue; + if (node->ent.id == 0) + continue; + msg->nodes[msg->nr_nodes].nodeid = node->nodeid; msg->nodes[msg->nr_nodes].pid = node->pid; msg->nodes[msg->nr_nodes].ent = node->ent; @@ -309,20 +327,29 @@ static void update_cluster_info(struct join_message *msg) if (sys->synchronized) goto out; - list_for_each_entry_safe(node, e, &sys->node_list, list) { + list_for_each_entry_safe(node, e, &sys->sd_node_list, list) { list_del(&node->list); 
free(node); } - INIT_LIST_HEAD(&sys->node_list); - for (i = 0; i < nr_nodes; i++) - add_node(msg->nodes[i].nodeid, msg->nodes[i].pid, + INIT_LIST_HEAD(&sys->sd_node_list); + for (i = 0; i < nr_nodes; i++) { + node = find_node(&sys->cpg_node_list, msg->nodes[i].nodeid, + msg->nodes[i].pid); + if (!node) + continue; + + if (!node->ent.id) + node->ent = msg->nodes[i].ent; + + add_node(&sys->sd_node_list, msg->nodes[i].nodeid, msg->nodes[i].pid, &msg->nodes[i].ent); + } sys->epoch = msg->epoch; sys->synchronized = 1; - nr_nodes = build_node_list(&sys->node_list, entry); + nr_nodes = build_node_list(&sys->sd_node_list, entry); ret = epoch_log_write(sys->epoch, (char *)entry, nr_nodes * sizeof(struct sheepdog_node_list_entry)); @@ -332,9 +359,9 @@ static void update_cluster_info(struct join_message *msg) /* we are ready for object operations */ update_epoch_store(sys->epoch); out: - add_node(msg->nodeid, msg->pid, &msg->header.from); + add_node(&sys->sd_node_list, msg->nodeid, msg->pid, &msg->header.from); - nr_nodes = build_node_list(&sys->node_list, entry); + nr_nodes = build_node_list(&sys->sd_node_list, entry); ret = epoch_log_write(sys->epoch + 1, (char *)entry, nr_nodes * sizeof(struct sheepdog_node_list_entry)); @@ -345,7 +372,7 @@ out: update_epoch_store(sys->epoch); - print_node_list(); + print_node_list(&sys->sd_node_list); } static void vdi_op(struct vdi_op_message *msg) @@ -455,11 +482,26 @@ static void __sd_deliver(struct work *work, int idx) struct work_deliver *w = container_of(work, struct work_deliver, work); struct message_header *m = w->msg; char name[128]; + struct node *node; dprintf("op: %d, done: %d, size: %d, from: %s\n", m->op, m->done, m->msg_length, addr_to_str(name, sizeof(name), m->from.addr, m->from.port)); + if (m->op == SD_MSG_JOIN) { + uint32_t nodeid = ((struct join_message *)m)->nodeid; + uint32_t pid = ((struct join_message *)m)->pid; + + node = find_node(&sys->cpg_node_list, nodeid, pid); + if (!node) { + dprintf("the node was left 
before join operation is finished\n"); + return; + } + + if (!node->ent.id) + node->ent = m->from; + } + if (!m->done) { if (!is_master()) return; @@ -544,7 +586,7 @@ static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name, static void __sd_confch(struct work *work, int idx) { struct work_confch *w = container_of(work, struct work_confch, work); - struct node *node, *e; + struct node *node; int i; const struct cpg_address *member_list = w->member_list; @@ -559,22 +601,31 @@ static void __sd_confch(struct work *work, int idx) sys->this_pid == member_list[0].pid) sys->synchronized = 1; + if (list_empty(&sys->cpg_node_list)) { + for (i = 0; i < member_list_entries; i++) + add_node(&sys->cpg_node_list, member_list[i].nodeid, member_list[i].pid, NULL); + } else { + for (i = 0; i < joined_list_entries; i++) + add_node(&sys->cpg_node_list, joined_list[i].nodeid, joined_list[i].pid, NULL); + } + for (i = 0; i < left_list_entries; i++) { - list_for_each_entry_safe(node, e, &sys->node_list, list) { + node = find_node(&sys->cpg_node_list, left_list[i].nodeid, left_list[i].pid); + if (node) { + list_del(&node->list); + free(node); + } else + eprintf("System error\n"); + + node = find_node(&sys->sd_node_list, left_list[i].nodeid, left_list[i].pid); + if (node) { int nr; - unsigned pid; struct sheepdog_node_list_entry e[SD_MAX_NODES]; - if (node->nodeid != left_list[i].nodeid || - node->pid != left_list[i].pid) - continue; - - pid = node->pid; - list_del(&node->list); free(node); - nr = build_node_list(&sys->node_list, e); + nr = build_node_list(&sys->sd_node_list, e); epoch_log_write(sys->epoch + 1, (char *)e, nr * sizeof(struct sheepdog_node_list_entry)); @@ -608,7 +659,7 @@ static void __sd_confch(struct work *work, int idx) if (left_list_entries == 0) return; - print_node_list(); + print_node_list(&sys->sd_node_list); } static void __sd_confch_done(struct work *work, int idx) @@ -808,7 +859,8 @@ join_retry: sys->this_node.id = hval; sys->synchronized = 
0; - INIT_LIST_HEAD(&sys->node_list); + INIT_LIST_HEAD(&sys->sd_node_list); + INIT_LIST_HEAD(&sys->cpg_node_list); INIT_LIST_HEAD(&sys->vm_list); INIT_LIST_HEAD(&sys->pending_list); cpg_context_set(cpg_handle, sys); diff --git a/collie/store.c b/collie/store.c index 05b19c6..1c416d2 100644 --- a/collie/store.c +++ b/collie/store.c @@ -148,7 +148,7 @@ static int read_from_one(uint64_t oid, e = zalloc(SD_MAX_NODES * sizeof(struct sheepdog_node_list_entry)); again: - nr = build_node_list(&sys->node_list, e); + nr = build_node_list(&sys->sd_node_list, e); for (i = 0; i < nr; i++) { n = obj_to_sheep(e, nr, oid, i); @@ -229,7 +229,7 @@ static int forward_obj_req(struct request *req, char *buf) e = zalloc(SD_MAX_NODES * sizeof(struct sheepdog_node_list_entry)); again: - nr = build_node_list(&sys->node_list, e); + nr = build_node_list(&sys->sd_node_list, e); copies = hdr->copies; @@ -340,7 +340,7 @@ static int is_my_obj(uint64_t oid, int copies) int i, n, nr; struct sheepdog_node_list_entry e[SD_MAX_NODES]; - nr = build_node_list(&sys->node_list, e); + nr = build_node_list(&sys->sd_node_list, e); for (i = 0; i < copies; i++) { n = obj_to_sheep(e, nr, oid, i); @@ -538,7 +538,7 @@ void store_queue_request(struct work *work, int idx) dprintf("%d, %x, %" PRIx64" , %u, %u\n", idx, opcode, oid, epoch, req_epoch); - if (list_empty(&sys->node_list)) { + if (list_empty(&sys->sd_node_list)) { /* we haven't got SD_OP_GET_NODE_LIST response yet. */ ret = SD_RES_SYSTEM_ERROR; goto out; @@ -732,7 +732,7 @@ void so_queue_request(struct work *work, int idx) char oldname[1024]; uint16_t id = 0; - if (list_empty(&sys->node_list)) { + if (list_empty(&sys->sd_node_list)) { /* we haven't got SD_OP_GET_NODE_LIST response yet. 
*/ result = SD_RES_SYSTEM_ERROR; goto out; @@ -748,7 +748,7 @@ void so_queue_request(struct work *work, int idx) int local = 0; e = zalloc(SD_MAX_NODES * sizeof(struct sheepdog_node_list_entry)); - nr = build_node_list(&sys->node_list, e); + nr = build_node_list(&sys->sd_node_list, e); for (i = 0; i < sys->nr_sobjs; i++) { n = obj_to_sheep(e, nr, SD_DIR_OID, i); diff --git a/collie/vdi.c b/collie/vdi.c index f30a14a..5904488 100644 --- a/collie/vdi.c +++ b/collie/vdi.c @@ -93,7 +93,7 @@ int add_vdi(char *name, int len, uint64_t size, memset(&req, 0, sizeof(req)); - nr_nodes = build_node_list(&sys->node_list, entries); + nr_nodes = build_node_list(&sys->sd_node_list, entries); dprintf("%s (%d) %" PRIu64 ", base: %" PRIu64 "\n", name, len, size, base_oid); @@ -149,7 +149,7 @@ int lookup_vdi(char *filename, uint64_t * oid, uint32_t tag, int do_lock, memset(&req, 0, sizeof(req)); - nr_nodes = build_node_list(&sys->node_list, entries); + nr_nodes = build_node_list(&sys->sd_node_list, entries); *current = 0; @@ -196,7 +196,7 @@ int make_super_object(struct sd_vdi_req *hdr) req.ctime = (uint64_t)tv.tv_sec << 32 | tv.tv_usec * 1000; req.copies = ((struct sd_obj_req *)hdr)->copies; - nr_nodes = build_node_list(&sys->node_list, entries); + nr_nodes = build_node_list(&sys->sd_node_list, entries); ret = exec_reqs(entries, nr_nodes, sys->epoch, SD_DIR_OID, (struct sd_req *)&req, NULL, 0, 0, req.copies, -- 1.5.6.5 |