[Sheepdog] [PATCH] serialize all cpg events

FUJITA Tomonori fujita.tomonori at lab.ntt.co.jp
Tue Apr 20 08:59:35 CEST 2010


There is a bug where deliver and confchg events are not performed in
order. This patch fixes that as follows:

- we link all the cpg events to sys->cpg_event_siblings.

- the queued events are then performed serially, one at a time.

Signed-off-by: FUJITA Tomonori <fujita.tomonori at lab.ntt.co.jp>
---
 collie/collie.h |    4 +-
 collie/group.c  |  421 ++++++++++++++++++++++++++-----------------------------
 2 files changed, 203 insertions(+), 222 deletions(-)

diff --git a/collie/collie.h b/collie/collie.h
index 734b1a4..164f700 100644
--- a/collie/collie.h
+++ b/collie/collie.h
@@ -80,7 +80,9 @@ struct cluster_info {
 
 	int nr_sobjs;
 
-	struct list_head work_deliver_siblings;
+	struct list_head cpg_event_siblings;
+	struct cpg_event *cur_cevent;
+	unsigned int cpg_event_work_flags;
 };
 
 struct cluster_info *sys;
diff --git a/collie/group.c b/collie/group.c
index d8ebad9..6b2e532 100644
--- a/collie/group.c
+++ b/collie/group.c
@@ -77,9 +77,6 @@ struct vdi_op_message {
 
 struct work_deliver {
 	struct message_header *msg;
-
-	struct work work;
-	struct list_head work_deliver_list;
 };
 
 struct work_confch {
@@ -92,8 +89,22 @@ struct work_confch {
 
 	unsigned long *failed_vdis;
 	int nr_failed_vdis;
+	int first_cpg_node;
+};
+
+enum cpg_event_type {
+	CPG_EVENT_CONCHG,
+	CPG_EVENT_DELIVER,
+};
+
+struct cpg_event {
+	enum cpg_event_type ctype;
+	struct list_head cpg_event_list;
 
-	struct work work;
+	union {
+		struct work_confch c;
+		struct work_deliver d;
+	};
 };
 
 #define print_node_list(node_list)				\
@@ -753,61 +764,17 @@ out:
 	req->done(req);
 }
 
-static void handle_join(struct work_deliver *w)
+static void __sd_deliver(struct cpg_event *cevent)
 {
-	struct message_header *m;
-	struct vm *vm;
-	struct sheepdog_vm_list_entry *e;
-	int i, nr = 2000;
-	char *buf;
-
-	buf = malloc(sizeof(*m) + sizeof(*e) * nr);
-	m = (struct message_header *)buf;
-	e = (struct sheepdog_vm_list_entry *)(buf + sizeof(*m));
-
-	i = 0;
-	m->state = DM_CONT;
-	m->pid = ((struct join_message *)w->msg)->pid;
-	m->nodeid = ((struct join_message *)w->msg)->nodeid;
-
-	vprintf(SDOG_DEBUG "%u %u\n", m->pid, m->nodeid);
-
-	list_for_each_entry(vm, &sys->vm_list, list) {
-		*e = vm->ent;
-		vprintf(SDOG_DEBUG "%d %s\n", i, e->name);
-		e++;
-		i++;
-
-		if (!(i % nr)) {
-			m->msg_length = sizeof(*m) + i * sizeof(*e);
-			send_message(sys->handle, m);
-			e = (struct sheepdog_vm_list_entry *)(buf + sizeof(*m));
-			i = 0;
-		}
-	}
-
-	if (i) {
-		m->msg_length = sizeof(*m) + i * sizeof(*e);
-		vprintf(SDOG_DEBUG "%d %d\n", i, m->msg_length);
-		send_message(sys->handle, m);
-	}
-
-	m = w->msg;
-	join((struct join_message *)m);
-	m->state = DM_FIN;
-	send_message(sys->handle, m);
-}
-
-static void __sd_deliver(struct work *work, int idx)
-{
-	struct work_deliver *w = container_of(work, struct work_deliver, work);
+	struct work_deliver *w = &cevent->d;
 	struct message_header *m = w->msg;
 	char name[128];
 	struct node *node;
 
-	dprintf("op: %d, state: %u, size: %d, from: %s\n",
+	dprintf("op: %d, state: %u, size: %d, from: %s, pid: %d\n",
 		m->op, m->state, m->msg_length,
-		addr_to_str(name, sizeof(name), m->from.addr, m->from.port));
+		addr_to_str(name, sizeof(name), m->from.addr, m->from.port),
+		m->pid);
 
 	if (m->op == SD_MSG_JOIN) {
 		uint32_t nodeid = ((struct join_message *)m)->nodeid;
@@ -823,24 +790,7 @@ static void __sd_deliver(struct work *work, int idx)
 			node->ent = m->from;
 	}
 
-	if (m->state == DM_INIT) {
-		if (!is_master())
-			return;
-
-		switch (m->op) {
-		case SD_MSG_JOIN:
-			handle_join(w);
-			break;
-		case SD_MSG_VDI_OP:
-			vdi_op((struct vdi_op_message *)m);
-			m->state = DM_FIN;
-			send_message(sys->handle, m);
-			break;
-		default:
-			eprintf("unknown message %d\n", m->op);
-			break;
-		}
-	} else if (m->state == DM_FIN) {
+	if (m->state == DM_FIN) {
 		switch (m->op) {
 		case SD_MSG_JOIN:
 			update_cluster_info((struct join_message *)m);
@@ -855,27 +805,35 @@ static void __sd_deliver(struct work *work, int idx)
 	}
 }
 
-static void __sd_deliver_done(struct work *work, int idx)
+static void __sd_deliver_done(struct cpg_event *cevent)
 {
-	struct work_deliver *w, *n = NULL;
+	struct work_deliver *w = &cevent->d;
 	struct message_header *m;
+	char name[128];
 
-	w = container_of(work, struct work_deliver, work);
 	m = w->msg;
 
-	list_del(&w->work_deliver_list);
-
-	/*
-	 * for the master node, when I finished one message, if I have
-	 * pending messages, I need to perform the first of them now.
-	 *
-	 * for the non master nodes, when I get one finished message,
-	 * if I can forget it.
-	 */
-	if (m->state == DM_FIN && !list_empty(&sys->work_deliver_siblings)) {
+	dprintf("op: %d, state: %u, size: %d, from: %s\n",
+		m->op, m->state, m->msg_length,
+		addr_to_str(name, sizeof(name), m->from.addr,
+			    m->from.port));
 
-		n = list_first_entry(&sys->work_deliver_siblings,
-				     struct work_deliver, work_deliver_list);
+	if (m->state == DM_INIT && is_master()) {
+		switch (m->op) {
+		case SD_MSG_JOIN:
+			join((struct join_message *)m);
+			m->state = DM_FIN;
+			send_message(sys->handle, m);
+			break;
+		case SD_MSG_VDI_OP:
+			vdi_op((struct vdi_op_message *)m);
+			m->state = DM_FIN;
+			send_message(sys->handle, m);
+			break;
+		default:
+			eprintf("unknown message %d\n", m->op);
+			break;
+		}
 	}
 
 	/*
@@ -887,142 +845,51 @@ static void __sd_deliver_done(struct work *work, int idx)
 		start_recovery(sys->epoch, NULL, 0);
 
 	free(w->msg);
-	free(w);
-
-	if (!n)
-		return;
-
-	if (is_master())
-		queue_work(dobj_queue, &n->work);
-	else {
-		char name[128];
-		m = n->msg;
-
-		dprintf("op: %d, state: %u, size: %d, from: %s\n",
-		m->op, m->state, m->msg_length,
-			addr_to_str(name, sizeof(name), m->from.addr, m->from.port));
-
-		list_del(&n->work_deliver_list);
-		free(n->msg);
-		free(n);
-	}
+	free(cevent);
 }
 
+static void start_cpg_event_work(void);
+
 static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name,
 		       uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
 {
+	struct cpg_event *cevent;
 	struct work_deliver *w;
 	struct message_header *m = msg;
 	char name[128];
 
-	dprintf("op: %d, state: %u, size: %d, from: %s\n",
+	dprintf("op: %d, state: %u, size: %d, from: %s, nodeid: %u, pid: %u\n",
 		m->op, m->state, m->msg_length,
-		addr_to_str(name, sizeof(name), m->from.addr, m->from.port));
-
-	if (m->state == DM_CONT) {
-		struct sheepdog_vm_list_entry *e;
-		int nr, i;
-		struct vm *vm;
-
-		dprintf("op: %d, state: %u, size: %d, from: %s\n",
-			m->op, m->state, m->msg_length,
-			addr_to_str(name, sizeof(name), m->from.addr, m->from.port));
+		addr_to_str(name, sizeof(name), m->from.addr, m->from.port),
+		nodeid, pid);
 
-		if (is_master())
-			return;
-
-		vprintf(SDOG_DEBUG "%u %u %u %u\n",
-			m->nodeid, m->pid, sys->this_nodeid, sys->this_pid);
-
-		if (sys->this_nodeid != m->nodeid ||
-		    sys->this_pid != m->pid)
-			return;
-
-		/* This is my JOIN message. */
-		vprintf(SDOG_DEBUG "we update the vm list\n");
-
-		nr = (m->msg_length - sizeof(*m)) / sizeof(*e);
-		e = (struct sheepdog_vm_list_entry *)((char *)msg + sizeof(*m));
-
-		for (i = 0; i < nr; i++) {
-			vm = zalloc(sizeof(*vm));
-			if (!vm)
-				break;
-
-			vm->ent = e[i];
-			vprintf(SDOG_DEBUG "%d, got %s\n", i, e[i].name);
-			list_add(&vm->list, &sys->vm_list);
-		}
+	cevent = zalloc(sizeof(*cevent));
+	if (!cevent)
 		return;
-	}
 
-	w = zalloc(sizeof(*w));
-	if (!w)
-		return;
+	cevent->ctype = CPG_EVENT_DELIVER;
+	w = &cevent->d;
+
+	vprintf(SDOG_DEBUG "allow new deliver, %p\n", cevent);
 
 	w->msg = zalloc(msg_len);
 	if (!w->msg)
 		return;
 	memcpy(w->msg, msg, msg_len);
-	INIT_LIST_HEAD(&w->work_deliver_list);
-
-	w->work.fn = __sd_deliver;
-	w->work.done = __sd_deliver_done;
-
-	if (is_master()) {
-		if (m->state == DM_INIT) {
-			int run = 0;
-
-			/*
-			 * I can broadcast this message if there is no
-			 * outstanding messages.
-			 */
-			if (list_empty(&sys->work_deliver_siblings))
-				run = 1;
-
-			list_add_tail(&w->work_deliver_list,
-				      &sys->work_deliver_siblings);
-			if (run) {
-				vprintf(SDOG_DEBUG "%u\n", pid);
-				queue_work(dobj_queue, &w->work);
-			} else
-				vprintf(SDOG_DEBUG "%u\n", pid);
 
-			return;
-		} else
-			/*
-			 * must be blocked until the message with
-			 * m->state == DM_INIT is completely finished
-			 * (__sd_deliver_done is called)
-			 */
-			w->work.attr = WORK_ORDERED;
-	} else {
-		if (m->state == DM_INIT) {
-			list_add_tail(&w->work_deliver_list,
-				      &sys->work_deliver_siblings);
-
-			/*
-			 * non master nodes just links it to
-			 * work_deliver_siblings.
-			 */
-			return;
-		} else
-		/*
-		 * __sd_deliver_done() frees requests on
-		 * work_deliver_siblings in order.
-		 */
-		w->work.attr = WORK_ORDERED;
-	}
+	if (m->state == DM_FIN)
+		list_add(&cevent->cpg_event_list, &sys->cpg_event_siblings);
+	else
+		list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings);
 
-	queue_work(dobj_queue, &w->work);
+	start_cpg_event_work();
 }
 
-static void __sd_confch(struct work *work, int idx)
+static void __sd_confch(struct cpg_event *cevent)
 {
-	struct work_confch *w = container_of(work, struct work_confch, work);
+	struct work_confch *w = &cevent->c;
 	struct node *node;
 	int i;
-	int init = 0;
 
 	const struct cpg_address *member_list = w->member_list;
 	size_t member_list_entries = w->member_list_entries;
@@ -1035,7 +902,7 @@ static void __sd_confch(struct work *work, int idx)
 	    sys->this_nodeid == member_list[0].nodeid &&
 	    sys->this_pid == member_list[0].pid){
 		sys->synchronized = 1;
-		init = 1;
+		w->first_cpg_node = 1;
 	}
 
 	if (list_empty(&sys->cpg_node_list)) {
@@ -1111,7 +978,7 @@ static void __sd_confch(struct work *work, int idx)
 		}
 	}
 
-	if (init) {
+	if (w->first_cpg_node) {
 		struct join_message msg;
 
 		/*
@@ -1133,9 +1000,20 @@ static void __sd_confch(struct work *work, int idx)
 		return;
 	}
 
-	for (i = 0; i < joined_list_entries; i++) {
-		if (sys->this_nodeid == joined_list[i].nodeid &&
-		    sys->this_pid == joined_list[i].pid) {
+	print_node_list(&sys->sd_node_list);
+}
+
+static void __sd_confch_done(struct cpg_event *cevent)
+{
+	struct work_confch *w = &cevent->c;
+	int i;
+
+	if (w->first_cpg_node)
+		goto skip_join;
+
+	for (i = 0; i < w->joined_list_entries; i++) {
+		if (sys->this_nodeid == w->joined_list[i].nodeid &&
+		    sys->this_pid == w->joined_list[i].pid) {
 			struct join_message msg;
 
 			msg.header.op = SD_MSG_JOIN;
@@ -1155,16 +1033,7 @@ static void __sd_confch(struct work *work, int idx)
 		}
 	}
 
-	if (left_list_entries == 0)
-		return;
-
-	print_node_list(&sys->sd_node_list);
-}
-
-static void __sd_confch_done(struct work *work, int idx)
-{
-	struct work_confch *w = container_of(work, struct work_confch, work);
-
+skip_join:
 	/* FIXME: worker threads can't call start_recovery */
 	if (w->left_list_entries) {
 		if (w->left_list_entries > 1)
@@ -1176,7 +1045,112 @@ static void __sd_confch_done(struct work *work, int idx)
 	free(w->left_list);
 	free(w->joined_list);
 	free(w->failed_vdis);
-	free(w);
+	free(cevent);
+}
+
+static struct work cpg_event_work;
+
+#define CPG_EVENT_WORK_FLAG_RUNNING   (1U << 0)
+#define CPG_EVENT_WORK_FLAG_SUSPENDED (1U << 1)
+
+
+void cpg_event_fn(struct work *w, int idx)
+{
+	struct cpg_event *cevent = sys->cur_cevent;
+
+	vprintf(SDOG_DEBUG "%p, %d %x\n", cevent, cevent->ctype,
+		sys->cpg_event_work_flags);
+
+	/*
+	 * we can't touch sys->cpg_event_siblings because of a race
+	 * with sd_deliver() and sd_confch()...
+	 */
+
+	switch (cevent->ctype) {
+	case CPG_EVENT_CONCHG:
+		__sd_confch(cevent);
+		break;
+	case CPG_EVENT_DELIVER:
+		vprintf(SDOG_DEBUG "%d\n", cevent->d.msg->state);
+		__sd_deliver(cevent);
+		break;
+	default:
+		vprintf(SDOG_ERR "unknown event %d\n", cevent->ctype);
+	}
+}
+
+void cpg_event_done(struct work *w, int idx)
+{
+	struct cpg_event *cevent;
+
+	if (!sys->cur_cevent)
+		vprintf(SDOG_ERR "bug\n");
+
+	cevent = sys->cur_cevent;
+	sys->cur_cevent = NULL;
+
+	vprintf(SDOG_DEBUG "%p\n", cevent);
+
+	if (sys->cpg_event_work_flags & CPG_EVENT_WORK_FLAG_SUSPENDED) {
+		sys->cpg_event_work_flags &= ~CPG_EVENT_WORK_FLAG_RUNNING;
+		return;
+	}
+
+	switch (cevent->ctype) {
+	case CPG_EVENT_CONCHG:
+		__sd_confch_done(cevent);
+		break;
+	case CPG_EVENT_DELIVER:
+		if (cevent->d.msg->state == DM_INIT)
+			sys->cpg_event_work_flags |= CPG_EVENT_WORK_FLAG_SUSPENDED;
+		__sd_deliver_done(cevent);
+		break;
+	default:
+		vprintf(SDOG_ERR "unknown event %d\n", cevent->ctype);
+	}
+
+	vprintf(SDOG_DEBUG "free %p\n", cevent);
+
+	sys->cpg_event_work_flags &= ~CPG_EVENT_WORK_FLAG_RUNNING;
+
+	if (!list_empty(&sys->cpg_event_siblings))
+		start_cpg_event_work();
+}
+
+/* can be called only by the main process */
+static void start_cpg_event_work(void)
+{
+	struct cpg_event *cevent;
+
+	if (sys->cpg_event_work_flags & CPG_EVENT_WORK_FLAG_RUNNING)
+		return;
+
+	if (list_empty(&sys->cpg_event_siblings))
+		vprintf(SDOG_ERR "bug\n");
+
+	cevent = list_first_entry(&sys->cpg_event_siblings,
+				  struct cpg_event, cpg_event_list);
+
+	if (sys->cpg_event_work_flags & CPG_EVENT_WORK_FLAG_SUSPENDED) {
+		if (cevent->ctype != CPG_EVENT_DELIVER)
+			return;
+		if (cevent->d.msg->state != DM_FIN)
+			return;
+	}
+
+	list_del(&cevent->cpg_event_list);
+	sys->cur_cevent = cevent;
+
+	sys->cpg_event_work_flags &= ~CPG_EVENT_WORK_FLAG_SUSPENDED;
+	sys->cpg_event_work_flags |= CPG_EVENT_WORK_FLAG_RUNNING;
+
+	INIT_LIST_HEAD(&cpg_event_work.w_list);
+	/* we should remove this */
+	cpg_event_work.attr = WORK_ORDERED;
+	cpg_event_work.fn = cpg_event_fn;
+	cpg_event_work.done = cpg_event_done;
+
+	queue_work(dobj_queue, &cpg_event_work);
 }
 
 static void sd_confch(cpg_handle_t handle, const struct cpg_name *group_name,
@@ -1187,6 +1161,7 @@ static void sd_confch(cpg_handle_t handle, const struct cpg_name *group_name,
 		      const struct cpg_address *joined_list,
 		      size_t joined_list_entries)
 {
+	struct cpg_event *cevent;
 	struct work_confch *w = NULL;
 	int i, size;
 
@@ -1202,9 +1177,14 @@ static void sd_confch(cpg_handle_t handle, const struct cpg_name *group_name,
 	if (sys->status == SD_STATUS_SHUTDOWN || sys->status == SD_STATUS_INCONSISTENT_EPOCHS)
 		return;
 
-	w = zalloc(sizeof(*w));
-	if (!w)
-		return;
+	cevent = zalloc(sizeof(*cevent));
+	if (!cevent)
+		return; /* should die */
+
+	cevent->ctype = CPG_EVENT_CONCHG;
+	w = &cevent->c;
+
+	vprintf(SDOG_DEBUG "allow new confchg, %p\n", cevent);
 
 	size = sizeof(struct cpg_address) * member_list_entries;
 	w->member_list = zalloc(size);
@@ -1227,11 +1207,9 @@ static void sd_confch(cpg_handle_t handle, const struct cpg_name *group_name,
 	memcpy(w->joined_list, joined_list, size);
 	w->joined_list_entries = joined_list_entries;
 
-	w->work.fn = __sd_confch;
-	w->work.done = __sd_confch_done;
-	w->work.attr = WORK_ORDERED;
+	list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings);
+	start_cpg_event_work();
 
-	queue_work(dobj_queue, &w->work);
 	return;
 err:
 	if (!w)
@@ -1356,7 +1334,8 @@ join_retry:
 	INIT_LIST_HEAD(&sys->cpg_node_list);
 	INIT_LIST_HEAD(&sys->vm_list);
 	INIT_LIST_HEAD(&sys->pending_list);
-	INIT_LIST_HEAD(&sys->work_deliver_siblings);
+
+	INIT_LIST_HEAD(&sys->cpg_event_siblings);
 	cpg_context_set(cpg_handle, sys);
 
 	cpg_fd_get(cpg_handle, &fd);
-- 
1.6.5




More information about the sheepdog mailing list