[Sheepdog] [PATCH] split __sd_confchg

FUJITA Tomonori fujita.tomonori at lab.ntt.co.jp
Wed Apr 21 11:05:57 CEST 2010


- __sd_confchg looks too large. This cleans it up by splitting the
per-node work into callbacks (add_node, del_node) that are driven by a
new helper function, for_each_node_list().

- If we fail to add a new node because we are out of memory, we can't
continue, so add_node() now calls panic() instead of returning silently.

TODO: we also need to call panic() on some of the failures in del_node().

Signed-off-by: FUJITA Tomonori <fujita.tomonori at lab.ntt.co.jp>
---
 collie/group.c |  232 +++++++++++++++++++++++++++++---------------------------
 1 files changed, 121 insertions(+), 111 deletions(-)

diff --git a/collie/group.c b/collie/group.c
index 7a22232..ce8ad53 100644
--- a/collie/group.c
+++ b/collie/group.c
@@ -328,21 +328,6 @@ static void group_handler(int listen_fd, int events, void *data)
 	cpg_dispatch(sys->handle, CPG_DISPATCH_ALL);
 }
 
-static void add_node(struct list_head *node_list, uint32_t nodeid, uint32_t pid)
-{
-	struct node *node;
-
-	node = zalloc(sizeof(*node));
-	if (!node) {
-		eprintf("out of memory\n");
-		return;
-	}
-	node->nodeid = nodeid;
-	node->pid = pid;
-
-	list_add_tail(&node->list, node_list);
-}
-
 static struct node *find_node(struct list_head *node_list, uint32_t nodeid, uint32_t pid)
 {
 	struct node *node;
@@ -928,99 +913,122 @@ static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name,
 	start_cpg_event_work();
 }
 
-static void __sd_confchg(struct cpg_event *cevent)
+static void for_each_node_list(struct cpg_address list[], int count,
+			       void (*func)(struct cpg_address *addr,
+					    struct work_confchg *w),
+			       struct work_confchg *w)
 {
-	struct work_confchg *w = &cevent->c;
-	struct node *node;
 	int i;
+	for (i = 0; i < count; i++)
+		func(&list[i], w);
+}
 
-	const struct cpg_address *member_list = w->member_list;
-	size_t member_list_entries = w->member_list_entries;
-	const struct cpg_address *left_list = w->left_list;
-	size_t left_list_entries = w->left_list_entries;
-	const struct cpg_address *joined_list = w->joined_list;
-	size_t joined_list_entries = w->joined_list_entries;
+static void add_node(struct cpg_address *addr, struct work_confchg *w)
+{
+	struct node *node;
 
-	if (member_list_entries == joined_list_entries - left_list_entries &&
-	    sys->this_nodeid == member_list[0].nodeid &&
-	    sys->this_pid == member_list[0].pid){
-		sys->join_finished = 1;
-		w->first_cpg_node = 1;
-	}
+	node = zalloc(sizeof(*node));
+	if (!node)
+		panic("failed to alloc memory for a new node\n");
 
-	if (list_empty(&sys->cpg_node_list)) {
-		for (i = 0; i < member_list_entries; i++)
-			add_node(&sys->cpg_node_list, member_list[i].nodeid, member_list[i].pid);
-	} else {
-		for (i = 0; i < joined_list_entries; i++)
-			add_node(&sys->cpg_node_list, joined_list[i].nodeid, joined_list[i].pid);
-	}
+	node->nodeid = addr->nodeid;
+	node->pid = addr->pid;
 
-	for (i = 0; i < left_list_entries; i++) {
-		/* the node must be on sd_node_list or cpg_node_list. */
+	list_add_tail(&node->list, &sys->cpg_node_list);
+}
 
-		node = find_node(&sys->sd_node_list, left_list[i].nodeid, left_list[i].pid);
-		if (node) {
-			int nr;
-			struct sheepdog_node_list_entry e[SD_MAX_NODES];
-			struct vm *vm, *n;
-			int ret, size;
-			uint64_t oid;
-			void *buf;
-
-			size = sizeof(*w->failed_vdis) * 64;
-			w->failed_vdis = malloc(size);
-			list_for_each_entry_safe(vm, n, &sys->vm_list, list) {
-				if (memcmp(vm->ent.host_addr, node->ent.addr,
-					   sizeof(node->ent.addr)) != 0)
-					continue;
-				if (vm->ent.host_port != node->ent.port)
-					continue;
-
-				if (w->nr_failed_vdis * sizeof(*w->failed_vdis) >= size) {
-					size *= 2;
-					buf = realloc(w->failed_vdis, size);
-					if (!buf) {
-						eprintf("out of memory, %d\n", size);
-						break;
-					}
-					w->failed_vdis = buf;
+static void del_node(struct cpg_address *addr, struct work_confchg *w)
+{
+	struct node *node;
+
+	node = find_node(&sys->sd_node_list, addr->nodeid, addr->pid);
+	if (node) {
+		int nr;
+		struct sheepdog_node_list_entry e[SD_MAX_NODES];
+		struct vm *vm, *n;
+		int ret, size;
+		uint64_t oid;
+		void *buf;
+
+		size = sizeof(*w->failed_vdis) * 64;
+		w->failed_vdis = malloc(size);
+		list_for_each_entry_safe(vm, n, &sys->vm_list, list) {
+			if (memcmp(vm->ent.host_addr, node->ent.addr,
+				   sizeof(node->ent.addr)) != 0)
+				continue;
+			if (vm->ent.host_port != node->ent.port)
+				continue;
+
+			if (w->nr_failed_vdis * sizeof(*w->failed_vdis) >= size) {
+				size *= 2;
+				buf = realloc(w->failed_vdis, size);
+				if (!buf) {
+					eprintf("out of memory, %d\n", size);
+					break;
 				}
+				w->failed_vdis = buf;
+			}
 
-				ret = lookup_vdi((char *)vm->ent.name,
-						 sizeof(vm->ent.name), &oid, 0);
-				if (ret == SD_RES_SUCCESS)
-					w->failed_vdis[w->nr_failed_vdis++] = oid_to_bit(oid);
-				else
-					eprintf("cannot find vdi %s\n", vm->ent.name);
+			ret = lookup_vdi((char *)vm->ent.name,
+					 sizeof(vm->ent.name), &oid, 0);
+			if (ret == SD_RES_SUCCESS)
+				w->failed_vdis[w->nr_failed_vdis++] = oid_to_bit(oid);
+			else
+				eprintf("cannot find vdi %s\n", vm->ent.name);
 
-				list_del(&vm->list);
-				free(vm);
-			}
+			list_del(&vm->list);
+			free(vm);
+		}
 
+		list_del(&node->list);
+		free(node);
+
+		if (sys->status == SD_STATUS_OK) {
+			nr = get_ordered_sd_node_list(e);
+			dprintf("update epoch, %d, %d\n", sys->epoch + 1, nr);
+			epoch_log_write(sys->epoch + 1, (char *)e,
+					nr * sizeof(struct sheepdog_node_list_entry));
+
+			sys->epoch++;
+
+			update_epoch_store(sys->epoch);
+		}
+	} else {
+		node = find_node(&sys->cpg_node_list, addr->nodeid, addr->pid);
+		if (node) {
 			list_del(&node->list);
 			free(node);
+		}
+	}
+}
 
-			if (sys->status == SD_STATUS_OK) {
-				nr = get_ordered_sd_node_list(e);
-				dprintf("update epoch, %d, %d\n", sys->epoch + 1, nr);
-				epoch_log_write(sys->epoch + 1, (char *)e,
-						nr * sizeof(struct sheepdog_node_list_entry));
+static int is_my_cpg_addr(struct cpg_address *addr)
+{
+	return (sys->this_nodeid == addr->nodeid) &&
+		(sys->this_pid == addr->pid);
+}
 
-				sys->epoch++;
+static void __sd_confchg(struct cpg_event *cevent)
+{
+	struct work_confchg *w = &cevent->c;
 
-				update_epoch_store(sys->epoch);
-			}
-		} else {
-			node = find_node(&sys->cpg_node_list, left_list[i].nodeid,
-					 left_list[i].pid);
-			if (node) {
-				list_del(&node->list);
-				free(node);
-			}
-		}
+	if (w->member_list_entries ==
+	    w->joined_list_entries - w->left_list_entries &&
+	    is_my_cpg_addr(w->member_list)) {
+		sys->join_finished = 1;
+		w->first_cpg_node = 1;
 	}
 
+	if (list_empty(&sys->cpg_node_list))
+		for_each_node_list(w->member_list, w->member_list_entries,
+				   add_node, w);
+	else
+		for_each_node_list(w->joined_list, w->joined_list_entries,
+				   add_node, w);
+
+	for_each_node_list(w->left_list, w->left_list_entries,
+			   del_node, w);
+
 	if (w->first_cpg_node) {
 		struct join_message msg;
 
@@ -1046,35 +1054,37 @@ static void __sd_confchg(struct cpg_event *cevent)
 	print_node_list(&sys->sd_node_list);
 }
 
-static void __sd_confchg_done(struct cpg_event *cevent)
+static void send_join_request(struct cpg_address *addr, struct work_confchg *w)
 {
-	struct work_confchg *w = &cevent->c;
-	int i;
+	struct join_message msg;
 
-	if (w->first_cpg_node)
-		goto skip_join;
+	/* if I've just joined in cpg, I'll join in sheepdog. */
+	if (!is_my_cpg_addr(addr))
+		return;
 
-	for (i = 0; i < w->joined_list_entries; i++) {
-		if (sys->this_nodeid == w->joined_list[i].nodeid &&
-		    sys->this_pid == w->joined_list[i].pid) {
-			struct join_message msg;
+	msg.header.op = SD_MSG_JOIN;
+	msg.header.state = DM_INIT;
+	msg.header.msg_length = sizeof(msg);
+	msg.header.from = sys->this_node;
+	msg.header.nodeid = sys->this_nodeid;
+	msg.header.pid = sys->this_pid;
 
-			msg.header.op = SD_MSG_JOIN;
-			msg.header.state = DM_INIT;
-			msg.header.msg_length = sizeof(msg);
-			msg.header.from = sys->this_node;
-			msg.header.nodeid = sys->this_nodeid;
-			msg.header.pid = sys->this_pid;
+	get_global_nr_copies(&msg.nr_sobjs);
 
-			get_global_nr_copies(&msg.nr_sobjs);
+	send_message(sys->handle, (struct message_header *)&msg);
 
-			send_message(sys->handle, (struct message_header *)&msg);
+	vprintf(SDOG_INFO "%u %u\n", sys->this_nodeid, sys->this_pid);
+}
 
-			eprintf("%d\n", i);
+static void __sd_confchg_done(struct cpg_event *cevent)
+{
+	struct work_confchg *w = &cevent->c;
 
-			break;
-		}
-	}
+	if (w->first_cpg_node)
+		goto skip_join;
+
+	for_each_node_list(w->joined_list, w->joined_list_entries,
+			   send_join_request, w);
 
 skip_join:
 	/* FIXME: worker threads can't call start_recovery */
-- 
1.6.5




More information about the sheepdog mailing list