This depends on the logging patch that I've just sent. From: FUJITA Tomonori <fujita.tomonori at lab.ntt.co.jp> Subject: [PATCH] simplify confchg management - avoid allocating memory every time the confchg handler is called, since we can't handle a memory allocation failure in the confchg handler. - avoid using the work queue mechanism. This simplifies the code and also solves the problem that we can't call the recovery code from worker threads. Signed-off-by: FUJITA Tomonori <fujita.tomonori at lab.ntt.co.jp> --- collie/collie.h | 1 + collie/group.c | 184 ++++++++++++++++++++++++------------------------------- 2 files changed, 82 insertions(+), 103 deletions(-) diff --git a/collie/collie.h b/collie/collie.h index 5cd2383..af2c8e9 100644 --- a/collie/collie.h +++ b/collie/collie.h @@ -71,6 +71,7 @@ struct cluster_info { struct list_head pending_list; int nr_sobjs; + int got_confchg; }; struct cluster_info *sys; diff --git a/collie/group.c b/collie/group.c index a49c1be..48e607f 100644 --- a/collie/group.c +++ b/collie/group.c @@ -72,17 +72,6 @@ struct work_deliver { struct work work; }; -struct work_confch { - struct cpg_address *member_list; - size_t member_list_entries; - struct cpg_address *left_list; - size_t left_list_entries; - struct cpg_address *joined_list; - size_t joined_list_entries; - - struct work work; -}; - static int node_cmp(const void *a, const void *b) { const struct sheepdog_node_list_entry *node1 = a; @@ -242,11 +231,6 @@ static struct vm *lookup_vm(struct list_head *entries, char *name) return NULL; } -static void group_handler(int listen_fd, int events, void *data) -{ - cpg_dispatch(sys->handle, CPG_DISPATCH_ALL); -} - static void print_node_list(struct list_head *node_list) { struct node *node; @@ -736,41 +720,44 @@ static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name, queue_work(dobj_queue, &w->work); } -static void __sd_confch(struct work *work, int idx) +static size_t saved_left_count; +static size_t saved_joined_count;
+static size_t saved_member_count; +static struct cpg_address saved_left[SD_MAX_NODES]; +static struct cpg_address saved_joined[SD_MAX_NODES]; +static struct cpg_address saved_member[SD_MAX_NODES]; + +static void process_confchg(void) { - struct work_confch *w = container_of(work, struct work_confch, work); struct node *node; int i; - const struct cpg_address *member_list = w->member_list; - size_t member_list_entries = w->member_list_entries; - const struct cpg_address *left_list = w->left_list; - size_t left_list_entries = w->left_list_entries; - const struct cpg_address *joined_list = w->joined_list; - size_t joined_list_entries = w->joined_list_entries; - - if (member_list_entries == joined_list_entries - left_list_entries && - sys->this_nodeid == member_list[0].nodeid && - sys->this_pid == member_list[0].pid) + if (saved_member_count == saved_joined_count - saved_left_count && + sys->this_nodeid == saved_member[0].nodeid && + sys->this_pid == saved_member[0].pid) sys->synchronized = 1; if (list_empty(&sys->cpg_node_list)) { - for (i = 0; i < member_list_entries; i++) - add_node(&sys->cpg_node_list, member_list[i].nodeid, member_list[i].pid, NULL); + for (i = 0; i < saved_member_count; i++) + add_node(&sys->cpg_node_list, saved_member[i].nodeid, + saved_member[i].pid, NULL); } else { - for (i = 0; i < joined_list_entries; i++) - add_node(&sys->cpg_node_list, joined_list[i].nodeid, joined_list[i].pid, NULL); + for (i = 0; i < saved_joined_count; i++) + add_node(&sys->cpg_node_list, saved_joined[i].nodeid, + saved_joined[i].pid, NULL); } - for (i = 0; i < left_list_entries; i++) { - node = find_node(&sys->cpg_node_list, left_list[i].nodeid, left_list[i].pid); + for (i = 0; i < saved_left_count; i++) { + node = find_node(&sys->cpg_node_list, saved_left[i].nodeid, + saved_left[i].pid); if (node) { list_del(&node->list); free(node); } else eprintf("System error\n"); - node = find_node(&sys->sd_node_list, left_list[i].nodeid, left_list[i].pid); + node = 
find_node(&sys->sd_node_list, saved_left[i].nodeid, + saved_left[i].pid); if (node) { int nr; struct sheepdog_node_list_entry e[SD_MAX_NODES]; @@ -791,9 +778,9 @@ static void __sd_confch(struct work *work, int idx) } } - for (i = 0; i < joined_list_entries; i++) { - if (sys->this_nodeid == joined_list[i].nodeid && - sys->this_pid == joined_list[i].pid) { + for (i = 0; i < saved_joined_count; i++) { + if (sys->this_nodeid == saved_joined[i].nodeid && + sys->this_pid == saved_joined[i].pid) { struct join_message msg; msg.header.op = SD_MSG_JOIN; @@ -812,39 +799,26 @@ static void __sd_confch(struct work *work, int idx) } } - if (left_list_entries == 0) + if (saved_left_count) { + if (saved_left_count > 0) + vprintf(SDOG_ERR "can't handle %zd left members\n", + saved_left_count); + start_recovery(sys->epoch, 0); + } else return; print_node_list(&sys->sd_node_list); } -static void __sd_confch_done(struct work *work, int idx) +static void sd_confchg(cpg_handle_t handle, const struct cpg_name *group_name, + const struct cpg_address *member_list, + size_t member_list_entries, + const struct cpg_address *left_list, + size_t left_list_entries, + const struct cpg_address *joined_list, + size_t joined_list_entries) { - struct work_confch *w = container_of(work, struct work_confch, work); - - /* FIXME: worker threads can't call start_recovery */ - if (w->left_list_entries) { - if (w->left_list_entries > 1) - eprintf("we can't handle %Zd\n", w->left_list_entries); - start_recovery(sys->epoch, 0); - } - - free(w->member_list); - free(w->left_list); - free(w->joined_list); - free(w); -} - -static void sd_confch(cpg_handle_t handle, const struct cpg_name *group_name, - const struct cpg_address *member_list, - size_t member_list_entries, - const struct cpg_address *left_list, - size_t left_list_entries, - const struct cpg_address *joined_list, - size_t joined_list_entries) -{ - struct work_confch *w = NULL; - int i, size; + int i; dprintf("confchg nodeid %x\n", 
member_list[0].nodeid); dprintf("%zd %zd %zd\n", member_list_entries, left_list_entries, @@ -858,47 +832,51 @@ static void sd_confch(cpg_handle_t handle, const struct cpg_name *group_name, if (sys->status == SD_STATUS_SHUTDOWN || sys->status == SD_STATUS_INCONSISTENT_EPOCHS) return; - w = zalloc(sizeof(*w)); - if (!w) - return; + if (member_list_entries > SD_MAX_NODES) { + vprintf(SDOG_WARNING "too many members %Zd\n", member_list_entries); + member_list_entries = SD_MAX_NODES; + } - size = sizeof(struct cpg_address) * member_list_entries; - w->member_list = zalloc(size); - if (!w->member_list) - goto err; - memcpy(w->member_list, member_list, size); - w->member_list_entries = member_list_entries; - - size = sizeof(struct cpg_address) * left_list_entries; - w->left_list = zalloc(size); - if (!w->left_list) - goto err; - memcpy(w->left_list, left_list, size); - w->left_list_entries = left_list_entries; - - size = sizeof(struct cpg_address) * joined_list_entries; - w->joined_list = zalloc(size); - if (!w->joined_list) - goto err; - memcpy(w->joined_list, joined_list, size); - w->joined_list_entries = joined_list_entries; - - w->work.fn = __sd_confch; - w->work.done = __sd_confch_done; - w->work.attr = WORK_ORDERED; + if (left_list_entries > SD_MAX_NODES) { + vprintf(SDOG_WARNING "too many left %Zd\n", left_list_entries); + left_list_entries = SD_MAX_NODES; + } - queue_work(dobj_queue, &w->work); - return; -err: - if (!w) + if (joined_list_entries > SD_MAX_NODES) { + vprintf(SDOG_WARNING "too many left %Zd\n", joined_list_entries); + joined_list_entries = SD_MAX_NODES; + } + + saved_member_count = member_list_entries; + saved_left_count = left_list_entries; + saved_joined_count = joined_list_entries; + + memcpy(saved_member, member_list, + sizeof(struct cpg_address) * saved_member_count); + + memcpy(saved_left, left_list, + sizeof(struct cpg_address) * saved_left_count); + + memcpy(saved_joined, joined_list, + sizeof(struct cpg_address) * saved_joined_count); + + 
sys->got_confchg = 1; +} + +static void group_handler(int listen_fd, int events, void *data) +{ + int ret; + + sys->got_confchg = 0; + + ret = cpg_dispatch(sys->handle, CPG_DISPATCH_ONE); + if (ret != CPG_OK) { + vprintf(SDOG_WARNING "cpg_dispatch failed %d\n", ret); return; + } - if (w->member_list) - free(w->member_list); - if (w->left_list) - free(w->left_list); - if (w->joined_list) - free(w->joined_list); + if (sys->got_confchg) + process_confchg(); } int build_node_list(struct list_head *node_list, @@ -925,7 +903,7 @@ int create_cluster(int port) struct addrinfo hints, *res, *res0; char name[INET6_ADDRSTRLEN]; struct cpg_name group = { 8, "sheepdog" }; - cpg_callbacks_t cb = { &sd_deliver, &sd_confch }; + cpg_callbacks_t cb = { &sd_deliver, &sd_confchg }; unsigned int nodeid = 0; uint64_t hval; int i; -- 1.6.5 |