- __sd_confchg is too large. This cleans it up with a new helper function, for_each_node_list(). - If we fail to allocate memory for a new node, we can't continue, so add_node() now calls panic(). TODO: we also need to call panic() on some failures in del_node(). Signed-off-by: FUJITA Tomonori <fujita.tomonori at lab.ntt.co.jp> --- collie/group.c | 232 +++++++++++++++++++++++++++++--------------------------- 1 files changed, 121 insertions(+), 111 deletions(-) diff --git a/collie/group.c b/collie/group.c index 7a22232..ce8ad53 100644 --- a/collie/group.c +++ b/collie/group.c @@ -328,21 +328,6 @@ static void group_handler(int listen_fd, int events, void *data) cpg_dispatch(sys->handle, CPG_DISPATCH_ALL); } -static void add_node(struct list_head *node_list, uint32_t nodeid, uint32_t pid) -{ - struct node *node; - - node = zalloc(sizeof(*node)); - if (!node) { - eprintf("out of memory\n"); - return; - } - node->nodeid = nodeid; - node->pid = pid; - - list_add_tail(&node->list, node_list); -} - static struct node *find_node(struct list_head *node_list, uint32_t nodeid, uint32_t pid) { struct node *node; @@ -928,99 +913,122 @@ static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name, start_cpg_event_work(); } -static void __sd_confchg(struct cpg_event *cevent) +static void for_each_node_list(struct cpg_address list[], int count, + void (*func)(struct cpg_address *addr, + struct work_confchg *w), + struct work_confchg *w) { - struct work_confchg *w = &cevent->c; - struct node *node; int i; + for (i = 0; i < count; i++) + func(&list[i], w); +} - const struct cpg_address *member_list = w->member_list; - size_t member_list_entries = w->member_list_entries; - const struct cpg_address *left_list = w->left_list; - size_t left_list_entries = w->left_list_entries; - const struct cpg_address *joined_list = w->joined_list; - size_t joined_list_entries = w->joined_list_entries; +static void add_node(struct cpg_address *addr, struct work_confchg *w) +{ + struct node *node; - if (member_list_entries ==
joined_list_entries - left_list_entries && - sys->this_nodeid == member_list[0].nodeid && - sys->this_pid == member_list[0].pid){ - sys->join_finished = 1; - w->first_cpg_node = 1; - } + node = zalloc(sizeof(*node)); + if (!node) + panic("failed to alloc memory for a new node\n"); - if (list_empty(&sys->cpg_node_list)) { - for (i = 0; i < member_list_entries; i++) - add_node(&sys->cpg_node_list, member_list[i].nodeid, member_list[i].pid); - } else { - for (i = 0; i < joined_list_entries; i++) - add_node(&sys->cpg_node_list, joined_list[i].nodeid, joined_list[i].pid); - } + node->nodeid = addr->nodeid; + node->pid = addr->pid; - for (i = 0; i < left_list_entries; i++) { - /* the node must be on sd_node_list or cpg_node_list. */ + list_add_tail(&node->list, &sys->cpg_node_list); +} - node = find_node(&sys->sd_node_list, left_list[i].nodeid, left_list[i].pid); - if (node) { - int nr; - struct sheepdog_node_list_entry e[SD_MAX_NODES]; - struct vm *vm, *n; - int ret, size; - uint64_t oid; - void *buf; - - size = sizeof(*w->failed_vdis) * 64; - w->failed_vdis = malloc(size); - list_for_each_entry_safe(vm, n, &sys->vm_list, list) { - if (memcmp(vm->ent.host_addr, node->ent.addr, - sizeof(node->ent.addr)) != 0) - continue; - if (vm->ent.host_port != node->ent.port) - continue; - - if (w->nr_failed_vdis * sizeof(*w->failed_vdis) >= size) { - size *= 2; - buf = realloc(w->failed_vdis, size); - if (!buf) { - eprintf("out of memory, %d\n", size); - break; - } - w->failed_vdis = buf; +static void del_node(struct cpg_address *addr, struct work_confchg *w) +{ + struct node *node; + + node = find_node(&sys->sd_node_list, addr->nodeid, addr->pid); + if (node) { + int nr; + struct sheepdog_node_list_entry e[SD_MAX_NODES]; + struct vm *vm, *n; + int ret, size; + uint64_t oid; + void *buf; + + size = sizeof(*w->failed_vdis) * 64; + w->failed_vdis = malloc(size); + list_for_each_entry_safe(vm, n, &sys->vm_list, list) { + if (memcmp(vm->ent.host_addr, node->ent.addr, + 
sizeof(node->ent.addr)) != 0) + continue; + if (vm->ent.host_port != node->ent.port) + continue; + + if (w->nr_failed_vdis * sizeof(*w->failed_vdis) >= size) { + size *= 2; + buf = realloc(w->failed_vdis, size); + if (!buf) { + eprintf("out of memory, %d\n", size); + break; } + w->failed_vdis = buf; + } - ret = lookup_vdi((char *)vm->ent.name, - sizeof(vm->ent.name), &oid, 0); - if (ret == SD_RES_SUCCESS) - w->failed_vdis[w->nr_failed_vdis++] = oid_to_bit(oid); - else - eprintf("cannot find vdi %s\n", vm->ent.name); + ret = lookup_vdi((char *)vm->ent.name, + sizeof(vm->ent.name), &oid, 0); + if (ret == SD_RES_SUCCESS) + w->failed_vdis[w->nr_failed_vdis++] = oid_to_bit(oid); + else + eprintf("cannot find vdi %s\n", vm->ent.name); - list_del(&vm->list); - free(vm); - } + list_del(&vm->list); + free(vm); + } + list_del(&node->list); + free(node); + + if (sys->status == SD_STATUS_OK) { + nr = get_ordered_sd_node_list(e); + dprintf("update epoch, %d, %d\n", sys->epoch + 1, nr); + epoch_log_write(sys->epoch + 1, (char *)e, + nr * sizeof(struct sheepdog_node_list_entry)); + + sys->epoch++; + + update_epoch_store(sys->epoch); + } + } else { + node = find_node(&sys->cpg_node_list, addr->nodeid, addr->pid); + if (node) { list_del(&node->list); free(node); + } + } +} - if (sys->status == SD_STATUS_OK) { - nr = get_ordered_sd_node_list(e); - dprintf("update epoch, %d, %d\n", sys->epoch + 1, nr); - epoch_log_write(sys->epoch + 1, (char *)e, - nr * sizeof(struct sheepdog_node_list_entry)); +static int is_my_cpg_addr(struct cpg_address *addr) +{ + return (sys->this_nodeid == addr->nodeid) && + (sys->this_pid == addr->pid); +} - sys->epoch++; +static void __sd_confchg(struct cpg_event *cevent) +{ + struct work_confchg *w = &cevent->c; - update_epoch_store(sys->epoch); - } - } else { - node = find_node(&sys->cpg_node_list, left_list[i].nodeid, - left_list[i].pid); - if (node) { - list_del(&node->list); - free(node); - } - } + if (w->member_list_entries == + w->joined_list_entries - 
w->left_list_entries && + is_my_cpg_addr(w->member_list)) { + sys->join_finished = 1; + w->first_cpg_node = 1; } + if (list_empty(&sys->cpg_node_list)) + for_each_node_list(w->member_list, w->member_list_entries, + add_node, w); + else + for_each_node_list(w->joined_list, w->joined_list_entries, + add_node, w); + + for_each_node_list(w->left_list, w->left_list_entries, + del_node, w); + if (w->first_cpg_node) { struct join_message msg; @@ -1046,35 +1054,37 @@ static void __sd_confchg(struct cpg_event *cevent) print_node_list(&sys->sd_node_list); } -static void __sd_confchg_done(struct cpg_event *cevent) +static void send_join_request(struct cpg_address *addr, struct work_confchg *w) { - struct work_confchg *w = &cevent->c; - int i; + struct join_message msg; - if (w->first_cpg_node) - goto skip_join; + /* if I've just joined in cpg, I'll join in sheepdog. */ + if (!is_my_cpg_addr(addr)) + return; - for (i = 0; i < w->joined_list_entries; i++) { - if (sys->this_nodeid == w->joined_list[i].nodeid && - sys->this_pid == w->joined_list[i].pid) { - struct join_message msg; + msg.header.op = SD_MSG_JOIN; + msg.header.state = DM_INIT; + msg.header.msg_length = sizeof(msg); + msg.header.from = sys->this_node; + msg.header.nodeid = sys->this_nodeid; + msg.header.pid = sys->this_pid; - msg.header.op = SD_MSG_JOIN; - msg.header.state = DM_INIT; - msg.header.msg_length = sizeof(msg); - msg.header.from = sys->this_node; - msg.header.nodeid = sys->this_nodeid; - msg.header.pid = sys->this_pid; + get_global_nr_copies(&msg.nr_sobjs); - get_global_nr_copies(&msg.nr_sobjs); + send_message(sys->handle, (struct message_header *)&msg); - send_message(sys->handle, (struct message_header *)&msg); + vprintf(SDOG_INFO "%u %u\n", sys->this_nodeid, sys->this_pid); +} - eprintf("%d\n", i); +static void __sd_confchg_done(struct cpg_event *cevent) +{ + struct work_confchg *w = &cevent->c; - break; - } - } + if (w->first_cpg_node) + goto skip_join; + + for_each_node_list(w->joined_list, 
w->joined_list_entries, + send_join_request, w); skip_join: /* FIXME: worker threads can't call start_recovery */ -- 1.6.5 |