[Sheepdog] [PATCH] serialize all cpg events

FUJITA Tomonori fujita.tomonori at lab.ntt.co.jp
Tue Apr 20 16:39:07 CEST 2010


On Tue, 20 Apr 2010 15:59:35 +0900
FUJITA Tomonori <fujita.tomonori at lab.ntt.co.jp> wrote:

> There is a bug that deliver and confchg events are not performed in
> order.
> 
> - we link all the cpg events to sys->cpg_event_siblings.
> 
> - the above events are performed serially.

There are some problems in the patch, so here's v2.

=
From: FUJITA Tomonori <fujita.tomonori at lab.ntt.co.jp>
Subject: [PATCH v2] serialize all cpg events

There is a bug where deliver and confchg events are not performed in
order.

- we link all cpg events to sys->cpg_event_siblings (a minimal sketch
  of the scheme follows this list).

- the above events are performed serially.

- we need to ignore deliver events until we join sheepdog.

- we can't use WORK_ORDERED since it blocks qemu I/Os. All cpg events
  are serialized, so we don't need it.

- we can't call join() and vdi_op(), which can sleep for a long time,
  in the main process.

- we need to handle two cases when we finish __sd_deliver_done() for
  an INIT message: a) we have already received the FIN for it, or
  b) we haven't, in which case we need to suspend cpg event execution.

Signed-off-by: FUJITA Tomonori <fujita.tomonori at lab.ntt.co.jp>
---
 collie/collie.h  |    4 +-
 collie/group.c   |  504 +++++++++++++++++++++++++++++-------------------------
 include/bitops.h |    5 +
 3 files changed, 283 insertions(+), 230 deletions(-)

diff --git a/collie/collie.h b/collie/collie.h
index 734b1a4..163da8e 100644
--- a/collie/collie.h
+++ b/collie/collie.h
@@ -80,7 +80,9 @@ struct cluster_info {
 
 	int nr_sobjs;
 
-	struct list_head work_deliver_siblings;
+	struct list_head cpg_event_siblings;
+	struct cpg_event *cur_cevent;
+	unsigned long cpg_event_work_flags;
 };
 
 struct cluster_info *sys;
diff --git a/collie/group.c b/collie/group.c
index d8ebad9..f1d1a19 100644
--- a/collie/group.c
+++ b/collie/group.c
@@ -48,15 +48,13 @@ struct message_header {
 	uint8_t state;
 	uint8_t pad[2];
 	uint32_t msg_length;
-		uint32_t nodeid;
-		uint32_t pid;
+	uint32_t nodeid;
+	uint32_t pid;
 	struct sheepdog_node_list_entry from;
 };
 
 struct join_message {
 	struct message_header header;
-	uint32_t nodeid;
-	uint32_t pid;
 	uint32_t nr_nodes;
 	uint32_t nr_sobjs;
 	uint32_t cluster_status;
@@ -77,9 +75,6 @@ struct vdi_op_message {
 
 struct work_deliver {
 	struct message_header *msg;
-
-	struct work work;
-	struct list_head work_deliver_list;
 };
 
 struct work_confch {
@@ -92,8 +87,23 @@ struct work_confch {
 
 	unsigned long *failed_vdis;
 	int nr_failed_vdis;
+	int first_cpg_node;
+};
+
+enum cpg_event_type {
+	CPG_EVENT_CONCHG,
+	CPG_EVENT_DELIVER,
+};
+
+struct cpg_event {
+	enum cpg_event_type ctype;
+	struct list_head cpg_event_list;
+	unsigned int skip;
 
-	struct work work;
+	union {
+		struct work_confch c;
+		struct work_deliver d;
+	};
 };
 
 #define print_node_list(node_list)				\
@@ -109,6 +119,29 @@ struct work_confch {
 	}							\
 })
 
+enum cpg_event_work_bits {
+	CPG_EVENT_WORK_RUNNING = 1,
+	CPG_EVENT_WORK_SUSPENDED,
+};
+
+#define CPG_EVENT_WORK_FNS(bit, name)					\
+static int cpg_event_##name(void)					\
+{									\
+	return test_bit(CPG_EVENT_WORK_##bit,				\
+		&sys->cpg_event_work_flags);				\
+}									\
+static void cpg_event_clear_##name(void)				\
+{									\
+	clear_bit(CPG_EVENT_WORK_##bit, &sys->cpg_event_work_flags);	\
+}									\
+static void cpg_event_set_##name(void)					\
+{									\
+	set_bit(CPG_EVENT_WORK_##bit, &sys->cpg_event_work_flags);	\
+}
+
+CPG_EVENT_WORK_FNS(RUNNING, running)
+CPG_EVENT_WORK_FNS(SUSPENDED, suspended)
+
 static int node_cmp(const void *a, const void *b)
 {
 	const struct sheepdog_node_list_entry *node1 = a;
@@ -568,14 +601,14 @@ static void update_cluster_info(struct join_message *msg)
 	}
 
 out:
-	ret = move_node_to_sd_list(msg->nodeid, msg->pid, msg->header.from);
+	ret = move_node_to_sd_list(msg->header.nodeid, msg->header.pid, msg->header.from);
 	/*
 	 * this should not happen since __sd_deliver() checks if the
 	 * host from msg on cpg_node_list.
 	 */
 	if (ret)
 		vprintf(SDOG_ERR "nodeid: %x, pid: %d has gone\n",
-			msg->nodeid, msg->pid);
+			msg->header.nodeid, msg->header.pid);
 
 	if (sys->status == SD_STATUS_OK) {
 		nr_nodes = get_ordered_sd_node_list(entry);
@@ -753,65 +786,32 @@ out:
 	req->done(req);
 }
 
-static void handle_join(struct work_deliver *w)
-{
-	struct message_header *m;
-	struct vm *vm;
-	struct sheepdog_vm_list_entry *e;
-	int i, nr = 2000;
-	char *buf;
-
-	buf = malloc(sizeof(*m) + sizeof(*e) * nr);
-	m = (struct message_header *)buf;
-	e = (struct sheepdog_vm_list_entry *)(buf + sizeof(*m));
-
-	i = 0;
-	m->state = DM_CONT;
-	m->pid = ((struct join_message *)w->msg)->pid;
-	m->nodeid = ((struct join_message *)w->msg)->nodeid;
-
-	vprintf(SDOG_DEBUG "%u %u\n", m->pid, m->nodeid);
-
-	list_for_each_entry(vm, &sys->vm_list, list) {
-		*e = vm->ent;
-		vprintf(SDOG_DEBUG "%d %s\n", i, e->name);
-		e++;
-		i++;
-
-		if (!(i % nr)) {
-			m->msg_length = sizeof(*m) + i * sizeof(*e);
-			send_message(sys->handle, m);
-			e = (struct sheepdog_vm_list_entry *)(buf + sizeof(*m));
-			i = 0;
-		}
-	}
-
-	if (i) {
-		m->msg_length = sizeof(*m) + i * sizeof(*e);
-		vprintf(SDOG_DEBUG "%d %d\n", i, m->msg_length);
-		send_message(sys->handle, m);
-	}
-
-	m = w->msg;
-	join((struct join_message *)m);
-	m->state = DM_FIN;
-	send_message(sys->handle, m);
-}
-
-static void __sd_deliver(struct work *work, int idx)
+static void __sd_deliver(struct cpg_event *cevent)
 {
-	struct work_deliver *w = container_of(work, struct work_deliver, work);
+	struct work_deliver *w = &cevent->d;
 	struct message_header *m = w->msg;
 	char name[128];
 	struct node *node;
 
-	dprintf("op: %d, state: %u, size: %d, from: %s\n",
+	dprintf("op: %d, state: %u, size: %d, from: %s, pid: %d\n",
 		m->op, m->state, m->msg_length,
-		addr_to_str(name, sizeof(name), m->from.addr, m->from.port));
+		addr_to_str(name, sizeof(name), m->from.addr, m->from.port),
+		m->pid);
+
+	/*
+	 * we don't want to perform any deliver events until we
+	 * join; we wait for our JOIN message.
+	 */
+	if (!sys->synchronized) {
+		if (m->pid != sys->this_pid || m->nodeid != sys->this_nodeid) {
+			cevent->skip = 1;
+			return;
+		}
+	}
 
 	if (m->op == SD_MSG_JOIN) {
-		uint32_t nodeid = ((struct join_message *)m)->nodeid;
-		uint32_t pid = ((struct join_message *)m)->pid;
+		uint32_t nodeid = m->nodeid;
+		uint32_t pid = m->pid;
 
 		node = find_node(&sys->cpg_node_list, nodeid, pid);
 		if (!node) {
@@ -823,24 +823,21 @@ static void __sd_deliver(struct work *work, int idx)
 			node->ent = m->from;
 	}
 
-	if (m->state == DM_INIT) {
-		if (!is_master())
-			return;
-
+	if (m->state == DM_INIT && is_master()) {
 		switch (m->op) {
 		case SD_MSG_JOIN:
-			handle_join(w);
+			join((struct join_message *)m);
 			break;
 		case SD_MSG_VDI_OP:
 			vdi_op((struct vdi_op_message *)m);
-			m->state = DM_FIN;
-			send_message(sys->handle, m);
 			break;
 		default:
 			eprintf("unknown message %d\n", m->op);
 			break;
 		}
-	} else if (m->state == DM_FIN) {
+	}
+
+	if (m->state == DM_FIN) {
 		switch (m->op) {
 		case SD_MSG_JOIN:
 			update_cluster_info((struct join_message *)m);
@@ -855,27 +852,33 @@ static void __sd_deliver(struct work *work, int idx)
 	}
 }
 
-static void __sd_deliver_done(struct work *work, int idx)
+static void __sd_deliver_done(struct cpg_event *cevent)
 {
-	struct work_deliver *w, *n = NULL;
+	struct work_deliver *w = &cevent->d;
 	struct message_header *m;
+	char name[128];
 
-	w = container_of(work, struct work_deliver, work);
 	m = w->msg;
 
-	list_del(&w->work_deliver_list);
-
-	/*
-	 * for the master node, when I finished one message, if I have
-	 * pending messages, I need to perform the first of them now.
-	 *
-	 * for the non master nodes, when I get one finished message,
-	 * if I can forget it.
-	 */
-	if (m->state == DM_FIN && !list_empty(&sys->work_deliver_siblings)) {
+	dprintf("op: %d, state: %u, size: %d, from: %s\n",
+		m->op, m->state, m->msg_length,
+		addr_to_str(name, sizeof(name), m->from.addr,
+			    m->from.port));
 
-		n = list_first_entry(&sys->work_deliver_siblings,
-				     struct work_deliver, work_deliver_list);
+	if (m->state == DM_INIT && is_master()) {
+		switch (m->op) {
+		case SD_MSG_JOIN:
+			m->state = DM_FIN;
+			send_message(sys->handle, m);
+			break;
+		case SD_MSG_VDI_OP:
+			m->state = DM_FIN;
+			send_message(sys->handle, m);
+			break;
+		default:
+			eprintf("unknown message %d\n", m->op);
+			break;
+		}
 	}
 
 	/*
@@ -885,144 +888,51 @@ static void __sd_deliver_done(struct work *work, int idx)
 
 	if (m->state == DM_FIN && m->op == SD_MSG_JOIN && sys->epoch >= 2)
 		start_recovery(sys->epoch, NULL, 0);
-
-	free(w->msg);
-	free(w);
-
-	if (!n)
-		return;
-
-	if (is_master())
-		queue_work(dobj_queue, &n->work);
-	else {
-		char name[128];
-		m = n->msg;
-
-		dprintf("op: %d, state: %u, size: %d, from: %s\n",
-		m->op, m->state, m->msg_length,
-			addr_to_str(name, sizeof(name), m->from.addr, m->from.port));
-
-		list_del(&n->work_deliver_list);
-		free(n->msg);
-		free(n);
-	}
 }
 
+static void start_cpg_event_work(void);
+
 static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name,
 		       uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
 {
+	struct cpg_event *cevent;
 	struct work_deliver *w;
 	struct message_header *m = msg;
 	char name[128];
 
-	dprintf("op: %d, state: %u, size: %d, from: %s\n",
+	dprintf("op: %d, state: %u, size: %d, from: %s, nodeid: %u, pid: %u\n",
 		m->op, m->state, m->msg_length,
-		addr_to_str(name, sizeof(name), m->from.addr, m->from.port));
-
-	if (m->state == DM_CONT) {
-		struct sheepdog_vm_list_entry *e;
-		int nr, i;
-		struct vm *vm;
+		addr_to_str(name, sizeof(name), m->from.addr, m->from.port),
+		nodeid, pid);
 
-		dprintf("op: %d, state: %u, size: %d, from: %s\n",
-			m->op, m->state, m->msg_length,
-			addr_to_str(name, sizeof(name), m->from.addr, m->from.port));
-
-		if (is_master())
-			return;
-
-		vprintf(SDOG_DEBUG "%u %u %u %u\n",
-			m->nodeid, m->pid, sys->this_nodeid, sys->this_pid);
-
-		if (sys->this_nodeid != m->nodeid ||
-		    sys->this_pid != m->pid)
-			return;
-
-		/* This is my JOIN message. */
-		vprintf(SDOG_DEBUG "we update the vm list\n");
-
-		nr = (m->msg_length - sizeof(*m)) / sizeof(*e);
-		e = (struct sheepdog_vm_list_entry *)((char *)msg + sizeof(*m));
-
-		for (i = 0; i < nr; i++) {
-			vm = zalloc(sizeof(*vm));
-			if (!vm)
-				break;
-
-			vm->ent = e[i];
-			vprintf(SDOG_DEBUG "%d, got %s\n", i, e[i].name);
-			list_add(&vm->list, &sys->vm_list);
-		}
+	cevent = zalloc(sizeof(*cevent));
+	if (!cevent)
 		return;
-	}
 
-	w = zalloc(sizeof(*w));
-	if (!w)
-		return;
+	cevent->ctype = CPG_EVENT_DELIVER;
+	w = &cevent->d;
+
+	vprintf(SDOG_DEBUG "allow new deliver, %p\n", cevent);
 
 	w->msg = zalloc(msg_len);
 	if (!w->msg)
 		return;
 	memcpy(w->msg, msg, msg_len);
-	INIT_LIST_HEAD(&w->work_deliver_list);
-
-	w->work.fn = __sd_deliver;
-	w->work.done = __sd_deliver_done;
-
-	if (is_master()) {
-		if (m->state == DM_INIT) {
-			int run = 0;
-
-			/*
-			 * I can broadcast this message if there is no
-			 * outstanding messages.
-			 */
-			if (list_empty(&sys->work_deliver_siblings))
-				run = 1;
-
-			list_add_tail(&w->work_deliver_list,
-				      &sys->work_deliver_siblings);
-			if (run) {
-				vprintf(SDOG_DEBUG "%u\n", pid);
-				queue_work(dobj_queue, &w->work);
-			} else
-				vprintf(SDOG_DEBUG "%u\n", pid);
 
-			return;
-		} else
-			/*
-			 * must be blocked until the message with
-			 * m->state == DM_INIT is completely finished
-			 * (__sd_deliver_done is called)
-			 */
-			w->work.attr = WORK_ORDERED;
-	} else {
-		if (m->state == DM_INIT) {
-			list_add_tail(&w->work_deliver_list,
-				      &sys->work_deliver_siblings);
-
-			/*
-			 * non master nodes just links it to
-			 * work_deliver_siblings.
-			 */
-			return;
-		} else
-		/*
-		 * __sd_deliver_done() frees requests on
-		 * work_deliver_siblings in order.
-		 */
-		w->work.attr = WORK_ORDERED;
-	}
+	if (cpg_event_suspended() && m->state == DM_FIN) {
+		list_add(&cevent->cpg_event_list, &sys->cpg_event_siblings);
+		cpg_event_clear_suspended();
+	} else
+		list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings);
 
-	queue_work(dobj_queue, &w->work);
+	start_cpg_event_work();
 }
 
-static void __sd_confch(struct work *work, int idx)
+static void __sd_confch(struct cpg_event *cevent)
 {
-	struct work_confch *w = container_of(work, struct work_confch, work);
+	struct work_confch *w = &cevent->c;
 	struct node *node;
 	int i;
-	int init = 0;
 
 	const struct cpg_address *member_list = w->member_list;
 	size_t member_list_entries = w->member_list_entries;
@@ -1035,7 +945,7 @@ static void __sd_confch(struct work *work, int idx)
 	    sys->this_nodeid == member_list[0].nodeid &&
 	    sys->this_pid == member_list[0].pid){
 		sys->synchronized = 1;
-		init = 1;
+		w->first_cpg_node = 1;
 	}
 
 	if (list_empty(&sys->cpg_node_list)) {
@@ -1111,7 +1021,7 @@ static void __sd_confch(struct work *work, int idx)
 		}
 	}
 
-	if (init) {
+	if (w->first_cpg_node) {
 		struct join_message msg;
 
 		/*
@@ -1124,8 +1034,8 @@ static void __sd_confch(struct work *work, int idx)
 		memset(&msg, 0, sizeof(msg));
 
 		msg.header.from = sys->this_node;
-		msg.nodeid = sys->this_nodeid;
-		msg.pid = sys->this_pid;
+		msg.header.nodeid = sys->this_nodeid;
+		msg.header.pid = sys->this_pid;
 		msg.cluster_status = get_cluster_status(&msg.header.from);
 
 		update_cluster_info(&msg);
@@ -1133,17 +1043,28 @@ static void __sd_confch(struct work *work, int idx)
 		return;
 	}
 
-	for (i = 0; i < joined_list_entries; i++) {
-		if (sys->this_nodeid == joined_list[i].nodeid &&
-		    sys->this_pid == joined_list[i].pid) {
+	print_node_list(&sys->sd_node_list);
+}
+
+static void __sd_confch_done(struct cpg_event *cevent)
+{
+	struct work_confch *w = &cevent->c;
+	int i;
+
+	if (w->first_cpg_node)
+		goto skip_join;
+
+	for (i = 0; i < w->joined_list_entries; i++) {
+		if (sys->this_nodeid == w->joined_list[i].nodeid &&
+		    sys->this_pid == w->joined_list[i].pid) {
 			struct join_message msg;
 
 			msg.header.op = SD_MSG_JOIN;
 			msg.header.state = DM_INIT;
 			msg.header.msg_length = sizeof(msg);
 			msg.header.from = sys->this_node;
-			msg.nodeid = sys->this_nodeid;
-			msg.pid = sys->this_pid;
+			msg.header.nodeid = sys->this_nodeid;
+			msg.header.pid = sys->this_pid;
 
 			get_global_nr_copies(&msg.nr_sobjs);
 
@@ -1155,28 +1076,148 @@ static void __sd_confch(struct work *work, int idx)
 		}
 	}
 
-	if (left_list_entries == 0)
-		return;
-
-	print_node_list(&sys->sd_node_list);
-}
-
-static void __sd_confch_done(struct work *work, int idx)
-{
-	struct work_confch *w = container_of(work, struct work_confch, work);
-
+skip_join:
 	/* FIXME: worker threads can't call start_recovery */
 	if (w->left_list_entries) {
 		if (w->left_list_entries > 1)
 			eprintf("we can't handle %Zd\n", w->left_list_entries);
 		start_recovery(sys->epoch, w->failed_vdis, w->nr_failed_vdis);
 	}
+}
 
-	free(w->member_list);
-	free(w->left_list);
-	free(w->joined_list);
-	free(w->failed_vdis);
-	free(w);
+static void cpg_event_free(struct cpg_event *cevent)
+{
+	switch (cevent->ctype) {
+	case CPG_EVENT_CONCHG: {
+		struct work_confch *w = &cevent->c;
+		free(w->member_list);
+		free(w->left_list);
+		free(w->joined_list);
+		free(w->failed_vdis);
+		break;
+	}
+	case CPG_EVENT_DELIVER: {
+		struct work_deliver *w = &cevent->d;
+		free(w->msg);
+		break;
+	}
+	}
+	free(cevent);
+}
+
+static struct work cpg_event_work;
+
+void cpg_event_fn(struct work *w, int idx)
+{
+	struct cpg_event *cevent = sys->cur_cevent;
+
+	vprintf(SDOG_DEBUG "%p, %d %lx\n", cevent, cevent->ctype,
+		sys->cpg_event_work_flags);
+
+	/*
+	 * we can't touch sys->cpg_event_siblings because of a race
+	 * with sd_deliver() and sd_confch()...
+	 */
+
+	switch (cevent->ctype) {
+	case CPG_EVENT_CONCHG:
+		__sd_confch(cevent);
+		break;
+	case CPG_EVENT_DELIVER:
+		vprintf(SDOG_DEBUG "%d\n", cevent->d.msg->state);
+		__sd_deliver(cevent);
+		break;
+	default:
+		vprintf(SDOG_ERR "unknown event %d\n", cevent->ctype);
+	}
+}
+
+void cpg_event_done(struct work *w, int idx)
+{
+	struct cpg_event *cevent;
+
+	if (!sys->cur_cevent)
+		vprintf(SDOG_ERR "bug\n");
+
+	cevent = sys->cur_cevent;
+	sys->cur_cevent = NULL;
+
+	vprintf(SDOG_DEBUG "%p\n", cevent);
+
+	if (cpg_event_suspended()) {
+		cpg_event_clear_running();
+		return;
+	}
+
+	if (cevent->skip)
+		goto out;
+
+	switch (cevent->ctype) {
+	case CPG_EVENT_CONCHG:
+		__sd_confch_done(cevent);
+		break;
+	case CPG_EVENT_DELIVER:
+		if (cevent->d.msg->state == DM_INIT) {
+			struct cpg_event *f_cevent;
+
+			list_for_each_entry(f_cevent, &sys->cpg_event_siblings,
+					    cpg_event_list) {
+				if (f_cevent->ctype == CPG_EVENT_DELIVER &&
+				    f_cevent->d.msg->state == DM_FIN) {
+					vprintf("already got fin %p\n",
+						f_cevent);
+
+					list_del(&f_cevent->cpg_event_list);
+					list_add(&f_cevent->cpg_event_list,
+						 &sys->cpg_event_siblings);
+					goto got_fin;
+				}
+			}
+			cpg_event_set_suspended();
+		}
+	got_fin:
+		__sd_deliver_done(cevent);
+		break;
+	default:
+		vprintf(SDOG_ERR "unknown event %d\n", cevent->ctype);
+	}
+
+out:
+	vprintf(SDOG_DEBUG "free %p\n", cevent);
+	cpg_event_free(cevent);
+	cpg_event_clear_running();
+
+	if (!list_empty(&sys->cpg_event_siblings) && !cpg_event_suspended())
+		start_cpg_event_work();
+}
+
+/* can be called only by the main process */
+static void start_cpg_event_work(void)
+{
+	struct cpg_event *cevent;
+
+	if (cpg_event_running())
+		return;
+
+	if (list_empty(&sys->cpg_event_siblings))
+		vprintf(SDOG_ERR "bug\n");
+
+	cevent = list_first_entry(&sys->cpg_event_siblings,
+				  struct cpg_event, cpg_event_list);
+
+	if (cpg_event_suspended())
+		return;
+
+	list_del(&cevent->cpg_event_list);
+	sys->cur_cevent = cevent;
+
+	cpg_event_set_running();
+
+	INIT_LIST_HEAD(&cpg_event_work.w_list);
+	cpg_event_work.fn = cpg_event_fn;
+	cpg_event_work.done = cpg_event_done;
+
+	queue_work(dobj_queue, &cpg_event_work);
 }
 
 static void sd_confch(cpg_handle_t handle, const struct cpg_name *group_name,
@@ -1187,6 +1228,7 @@ static void sd_confch(cpg_handle_t handle, const struct cpg_name *group_name,
 		      const struct cpg_address *joined_list,
 		      size_t joined_list_entries)
 {
+	struct cpg_event *cevent;
 	struct work_confch *w = NULL;
 	int i, size;
 
@@ -1202,9 +1244,14 @@ static void sd_confch(cpg_handle_t handle, const struct cpg_name *group_name,
 	if (sys->status == SD_STATUS_SHUTDOWN || sys->status == SD_STATUS_INCONSISTENT_EPOCHS)
 		return;
 
-	w = zalloc(sizeof(*w));
-	if (!w)
-		return;
+	cevent = zalloc(sizeof(*cevent));
+	if (!cevent)
+		return; /* should die */
+
+	cevent->ctype = CPG_EVENT_CONCHG;
+	w = &cevent->c;
+
+	vprintf(SDOG_DEBUG "allow new confchg, %p\n", cevent);
 
 	size = sizeof(struct cpg_address) * member_list_entries;
 	w->member_list = zalloc(size);
@@ -1227,11 +1274,9 @@ static void sd_confch(cpg_handle_t handle, const struct cpg_name *group_name,
 	memcpy(w->joined_list, joined_list, size);
 	w->joined_list_entries = joined_list_entries;
 
-	w->work.fn = __sd_confch;
-	w->work.done = __sd_confch_done;
-	w->work.attr = WORK_ORDERED;
+	list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings);
+	start_cpg_event_work();
 
-	queue_work(dobj_queue, &w->work);
 	return;
 err:
 	if (!w)
@@ -1356,7 +1401,8 @@ join_retry:
 	INIT_LIST_HEAD(&sys->cpg_node_list);
 	INIT_LIST_HEAD(&sys->vm_list);
 	INIT_LIST_HEAD(&sys->pending_list);
-	INIT_LIST_HEAD(&sys->work_deliver_siblings);
+
+	INIT_LIST_HEAD(&sys->cpg_event_siblings);
 	cpg_context_set(cpg_handle, sys);
 
 	cpg_fd_get(cpg_handle, &fd);
diff --git a/include/bitops.h b/include/bitops.h
index e3191dd..6c85d5e 100644
--- a/include/bitops.h
+++ b/include/bitops.h
@@ -130,3 +130,8 @@ static inline int test_bit(unsigned int nr, const unsigned long *addr)
 	return ((1UL << (nr % BITS_PER_LONG)) &
 		(((unsigned long *)addr)[nr / BITS_PER_LONG])) != 0;
 }
+
+static inline void clear_bit(unsigned int nr, unsigned long *addr)
+{
+	addr[nr / BITS_PER_LONG] &= ~(1UL << (nr % BITS_PER_LONG));
+}
-- 
1.6.5



