[Sheepdog] [PATCH] split __sd_confchg
FUJITA Tomonori
fujita.tomonori at lab.ntt.co.jp
Wed Apr 21 11:05:57 CEST 2010
- __sd_confchg looks too large. This cleans it up with a new helper
function, for_each_node_list().
- If we fail to add a new node because of OOM, we can't continue, so add_node() now calls panic().
TODO: we need to call panic() on some failures in del_node().
Signed-off-by: FUJITA Tomonori <fujita.tomonori at lab.ntt.co.jp>
---
collie/group.c | 232 +++++++++++++++++++++++++++++---------------------------
1 files changed, 121 insertions(+), 111 deletions(-)
diff --git a/collie/group.c b/collie/group.c
index 7a22232..ce8ad53 100644
--- a/collie/group.c
+++ b/collie/group.c
@@ -328,21 +328,6 @@ static void group_handler(int listen_fd, int events, void *data)
cpg_dispatch(sys->handle, CPG_DISPATCH_ALL);
}
-static void add_node(struct list_head *node_list, uint32_t nodeid, uint32_t pid)
-{
- struct node *node;
-
- node = zalloc(sizeof(*node));
- if (!node) {
- eprintf("out of memory\n");
- return;
- }
- node->nodeid = nodeid;
- node->pid = pid;
-
- list_add_tail(&node->list, node_list);
-}
-
static struct node *find_node(struct list_head *node_list, uint32_t nodeid, uint32_t pid)
{
struct node *node;
@@ -928,99 +913,122 @@ static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name,
start_cpg_event_work();
}
-static void __sd_confchg(struct cpg_event *cevent)
+static void for_each_node_list(struct cpg_address list[], int count,
+ void (*func)(struct cpg_address *addr,
+ struct work_confchg *w),
+ struct work_confchg *w)
{
- struct work_confchg *w = &cevent->c;
- struct node *node;
int i;
+ for (i = 0; i < count; i++)
+ func(&list[i], w);
+}
- const struct cpg_address *member_list = w->member_list;
- size_t member_list_entries = w->member_list_entries;
- const struct cpg_address *left_list = w->left_list;
- size_t left_list_entries = w->left_list_entries;
- const struct cpg_address *joined_list = w->joined_list;
- size_t joined_list_entries = w->joined_list_entries;
+static void add_node(struct cpg_address *addr, struct work_confchg *w)
+{
+ struct node *node;
- if (member_list_entries == joined_list_entries - left_list_entries &&
- sys->this_nodeid == member_list[0].nodeid &&
- sys->this_pid == member_list[0].pid){
- sys->join_finished = 1;
- w->first_cpg_node = 1;
- }
+ node = zalloc(sizeof(*node));
+ if (!node)
+ panic("failed to alloc memory for a new node\n");
- if (list_empty(&sys->cpg_node_list)) {
- for (i = 0; i < member_list_entries; i++)
- add_node(&sys->cpg_node_list, member_list[i].nodeid, member_list[i].pid);
- } else {
- for (i = 0; i < joined_list_entries; i++)
- add_node(&sys->cpg_node_list, joined_list[i].nodeid, joined_list[i].pid);
- }
+ node->nodeid = addr->nodeid;
+ node->pid = addr->pid;
- for (i = 0; i < left_list_entries; i++) {
- /* the node must be on sd_node_list or cpg_node_list. */
+ list_add_tail(&node->list, &sys->cpg_node_list);
+}
- node = find_node(&sys->sd_node_list, left_list[i].nodeid, left_list[i].pid);
- if (node) {
- int nr;
- struct sheepdog_node_list_entry e[SD_MAX_NODES];
- struct vm *vm, *n;
- int ret, size;
- uint64_t oid;
- void *buf;
-
- size = sizeof(*w->failed_vdis) * 64;
- w->failed_vdis = malloc(size);
- list_for_each_entry_safe(vm, n, &sys->vm_list, list) {
- if (memcmp(vm->ent.host_addr, node->ent.addr,
- sizeof(node->ent.addr)) != 0)
- continue;
- if (vm->ent.host_port != node->ent.port)
- continue;
-
- if (w->nr_failed_vdis * sizeof(*w->failed_vdis) >= size) {
- size *= 2;
- buf = realloc(w->failed_vdis, size);
- if (!buf) {
- eprintf("out of memory, %d\n", size);
- break;
- }
- w->failed_vdis = buf;
+static void del_node(struct cpg_address *addr, struct work_confchg *w)
+{
+ struct node *node;
+
+ node = find_node(&sys->sd_node_list, addr->nodeid, addr->pid);
+ if (node) {
+ int nr;
+ struct sheepdog_node_list_entry e[SD_MAX_NODES];
+ struct vm *vm, *n;
+ int ret, size;
+ uint64_t oid;
+ void *buf;
+
+ size = sizeof(*w->failed_vdis) * 64;
+ w->failed_vdis = malloc(size);
+ list_for_each_entry_safe(vm, n, &sys->vm_list, list) {
+ if (memcmp(vm->ent.host_addr, node->ent.addr,
+ sizeof(node->ent.addr)) != 0)
+ continue;
+ if (vm->ent.host_port != node->ent.port)
+ continue;
+
+ if (w->nr_failed_vdis * sizeof(*w->failed_vdis) >= size) {
+ size *= 2;
+ buf = realloc(w->failed_vdis, size);
+ if (!buf) {
+ eprintf("out of memory, %d\n", size);
+ break;
}
+ w->failed_vdis = buf;
+ }
- ret = lookup_vdi((char *)vm->ent.name,
- sizeof(vm->ent.name), &oid, 0);
- if (ret == SD_RES_SUCCESS)
- w->failed_vdis[w->nr_failed_vdis++] = oid_to_bit(oid);
- else
- eprintf("cannot find vdi %s\n", vm->ent.name);
+ ret = lookup_vdi((char *)vm->ent.name,
+ sizeof(vm->ent.name), &oid, 0);
+ if (ret == SD_RES_SUCCESS)
+ w->failed_vdis[w->nr_failed_vdis++] = oid_to_bit(oid);
+ else
+ eprintf("cannot find vdi %s\n", vm->ent.name);
- list_del(&vm->list);
- free(vm);
- }
+ list_del(&vm->list);
+ free(vm);
+ }
+ list_del(&node->list);
+ free(node);
+
+ if (sys->status == SD_STATUS_OK) {
+ nr = get_ordered_sd_node_list(e);
+ dprintf("update epoch, %d, %d\n", sys->epoch + 1, nr);
+ epoch_log_write(sys->epoch + 1, (char *)e,
+ nr * sizeof(struct sheepdog_node_list_entry));
+
+ sys->epoch++;
+
+ update_epoch_store(sys->epoch);
+ }
+ } else {
+ node = find_node(&sys->cpg_node_list, addr->nodeid, addr->pid);
+ if (node) {
list_del(&node->list);
free(node);
+ }
+ }
+}
- if (sys->status == SD_STATUS_OK) {
- nr = get_ordered_sd_node_list(e);
- dprintf("update epoch, %d, %d\n", sys->epoch + 1, nr);
- epoch_log_write(sys->epoch + 1, (char *)e,
- nr * sizeof(struct sheepdog_node_list_entry));
+static int is_my_cpg_addr(struct cpg_address *addr)
+{
+ return (sys->this_nodeid == addr->nodeid) &&
+ (sys->this_pid == addr->pid);
+}
- sys->epoch++;
+static void __sd_confchg(struct cpg_event *cevent)
+{
+ struct work_confchg *w = &cevent->c;
- update_epoch_store(sys->epoch);
- }
- } else {
- node = find_node(&sys->cpg_node_list, left_list[i].nodeid,
- left_list[i].pid);
- if (node) {
- list_del(&node->list);
- free(node);
- }
- }
+ if (w->member_list_entries ==
+ w->joined_list_entries - w->left_list_entries &&
+ is_my_cpg_addr(w->member_list)) {
+ sys->join_finished = 1;
+ w->first_cpg_node = 1;
}
+ if (list_empty(&sys->cpg_node_list))
+ for_each_node_list(w->member_list, w->member_list_entries,
+ add_node, w);
+ else
+ for_each_node_list(w->joined_list, w->joined_list_entries,
+ add_node, w);
+
+ for_each_node_list(w->left_list, w->left_list_entries,
+ del_node, w);
+
if (w->first_cpg_node) {
struct join_message msg;
@@ -1046,35 +1054,37 @@ static void __sd_confchg(struct cpg_event *cevent)
print_node_list(&sys->sd_node_list);
}
-static void __sd_confchg_done(struct cpg_event *cevent)
+static void send_join_request(struct cpg_address *addr, struct work_confchg *w)
{
- struct work_confchg *w = &cevent->c;
- int i;
+ struct join_message msg;
- if (w->first_cpg_node)
- goto skip_join;
+ /* if I've just joined in cpg, I'll join in sheepdog. */
+ if (!is_my_cpg_addr(addr))
+ return;
- for (i = 0; i < w->joined_list_entries; i++) {
- if (sys->this_nodeid == w->joined_list[i].nodeid &&
- sys->this_pid == w->joined_list[i].pid) {
- struct join_message msg;
+ msg.header.op = SD_MSG_JOIN;
+ msg.header.state = DM_INIT;
+ msg.header.msg_length = sizeof(msg);
+ msg.header.from = sys->this_node;
+ msg.header.nodeid = sys->this_nodeid;
+ msg.header.pid = sys->this_pid;
- msg.header.op = SD_MSG_JOIN;
- msg.header.state = DM_INIT;
- msg.header.msg_length = sizeof(msg);
- msg.header.from = sys->this_node;
- msg.header.nodeid = sys->this_nodeid;
- msg.header.pid = sys->this_pid;
+ get_global_nr_copies(&msg.nr_sobjs);
- get_global_nr_copies(&msg.nr_sobjs);
+ send_message(sys->handle, (struct message_header *)&msg);
- send_message(sys->handle, (struct message_header *)&msg);
+ vprintf(SDOG_INFO "%u %u\n", sys->this_nodeid, sys->this_pid);
+}
- eprintf("%d\n", i);
+static void __sd_confchg_done(struct cpg_event *cevent)
+{
+ struct work_confchg *w = &cevent->c;
- break;
- }
- }
+ if (w->first_cpg_node)
+ goto skip_join;
+
+ for_each_node_list(w->joined_list, w->joined_list_entries,
+ send_join_request, w);
skip_join:
/* FIXME: worker threads can't call start_recovery */
--
1.6.5
More information about the sheepdog
mailing list