[sheepdog] [PATCH] sheep: process events directly

Christoph Hellwig hch at infradead.org
Mon May 28 13:41:10 CEST 2012


Now that events don't wait for outstanding I/O requests there is no good
need to delay their processing in a queue.  In fact we can already handle
notification and leave events directly without problems, only the vdi_inuse
bitmap update in the join event prevents us from process all events directly.

This patch moves the vdi_inuse update into the blocking workqueue, so that
we can make sure it is processed before any VDI events, while we can update
the cluster status and start recovery independently and remove the complex
code to queue and serialize events.

Signed-off-by: Christoph Hellwig <hch at lst.de>

---
 sheep/group.c      |  418 +++++++++++------------------------------------------
 sheep/sheep.c      |    3 
 sheep/sheep_priv.h |   16 --
 3 files changed, 90 insertions(+), 347 deletions(-)

Index: sheepdog/sheep/group.c
===================================================================
--- sheepdog.orig/sheep/group.c	2012-05-28 13:18:06.804556498 +0200
+++ sheepdog/sheep/group.c	2012-05-28 13:27:04.108570257 +0200
@@ -62,31 +62,11 @@ struct vdi_op_message {
 	uint8_t data[0];
 };
 
-struct work_notify {
-	struct event_struct cev;
-
-	struct sd_node sender;
-
-	struct request *req;
-	void *msg;
-};
-
-struct work_join {
-	struct event_struct cev;
-
-	struct sd_node *member_list;
-	size_t member_list_entries;
-	struct sd_node joined;
-
-	struct join_message *jm;
-};
-
-struct work_leave {
-	struct event_struct cev;
-
-	struct sd_node *member_list;
-	size_t member_list_entries;
-	struct sd_node left;
+struct vdi_bitmap_work {
+	struct work work;
+	DECLARE_BITMAP(vdi_inuse, SD_NR_VDIS);
+	size_t nr_members;
+	struct sd_node members[];
 };
 
 #define print_node_list(nodes, nr_nodes)			\
@@ -102,7 +82,6 @@ struct work_leave {
 	}							\
 })
 
-static int event_running;
 static struct vnode_info *current_vnode_info;
 
 static size_t get_join_message_size(struct join_message *jm)
@@ -550,6 +529,36 @@ out:
 	return ret;
 }
 
+static void do_get_vdi_bitmap(struct work *work)
+{
+	struct vdi_bitmap_work *w =
+		container_of(work, struct vdi_bitmap_work, work);
+	int i;
+
+	for (i = 0; i < w->nr_members; i++) {
+		/* We should not fetch vdi_bitmap from myself */
+		if (node_eq(&w->members[i], &sys->this_node))
+			continue;
+
+		get_vdi_bitmap_from(&w->members[i]);
+
+		/*
+		 * If a new comer try to join the running cluster, it only
+		 * need read one copy of bitmap from one of other members.
+		 */
+		if (sys_stat_wait_format())
+			break;
+	}
+}
+
+static void get_vdi_bitmap_done(struct work *work)
+{
+	struct vdi_bitmap_work *w =
+		container_of(work, struct vdi_bitmap_work, work);
+
+	free(w);
+}
+
 static void update_node_info(struct sd_node *nodes, size_t nr_nodes)
 {
 	print_node_list(nodes, nr_nodes);
@@ -666,107 +675,39 @@ static void update_cluster_info(struct j
 	}
 }
 
-static void __sd_notify(struct event_struct *cevent)
-{
-}
-
-static void __sd_notify_done(struct event_struct *cevent)
-{
-	struct work_notify *w = container_of(cevent, struct work_notify, cev);
-	struct vdi_op_message *msg = w->msg;
-	struct request *req = w->req;
-	int ret = msg->rsp.result;
-	struct sd_op_template *op = get_sd_op(msg->req.opcode);
-
-	if (ret == SD_RES_SUCCESS && has_process_main(op))
-		ret = do_process_main(op, (const struct sd_req *)&msg->req,
-				      (struct sd_rsp *)&msg->rsp, msg->data);
-
-	if (!req)
-		return;
-
-	msg->rsp.result = ret;
-	if (has_process_main(req->op))
-		memcpy(req->data, msg->data, msg->rsp.data_length);
-	memcpy(&req->rp, &msg->rsp, sizeof(req->rp));
-	req_done(req);
-}
-
 /*
  * Pass on a notification message from the cluster driver.
  *
- * Must run in the main thread as it access unlocked state like
+ * Must run in the main thread as it accesses unlocked state like
  * sys->pending_list.
  */
-void sd_notify_handler(struct sd_node *sender, void *msg, size_t msg_len)
+void sd_notify_handler(struct sd_node *sender, void *data, size_t data_len)
 {
-	struct event_struct *cevent;
-	struct work_notify *w;
-
-	dprintf("size: %zd, from: %s\n", msg_len, node_to_str(sender));
-
-	w = zalloc(sizeof(*w));
-	if (!w)
-		return;
-
-	cevent = &w->cev;
-	cevent->ctype = EVENT_NOTIFY;
-
-	vprintf(SDOG_DEBUG, "allow new deliver %p\n", cevent);
+	struct vdi_op_message *msg = data;
+	struct sd_op_template *op = get_sd_op(msg->req.opcode);
+	int ret = msg->rsp.result;
+	struct request *req = NULL;
 
-	w->sender = *sender;
-	if (msg_len) {
-		w->msg = zalloc(msg_len);
-		if (!w->msg)
-			return;
-		memcpy(w->msg, msg, msg_len);
-	} else
-		w->msg = NULL;
+	dprintf("size: %zd, from: %s\n", data_len, node_to_str(sender));
 
 	if (is_myself(sender->addr, sender->port)) {
-		w->req = list_first_entry(&sys->pending_list, struct request,
-					  pending_list);
-		list_del(&w->req->pending_list);
+		req = list_first_entry(&sys->pending_list, struct request,
+				       pending_list);
+		list_del(&req->pending_list);
 	}
 
-	list_add_tail(&cevent->event_list, &sys->event_queue);
-
-	process_request_event_queues();
-}
-
-static void __sd_join(struct event_struct *cevent)
-{
-	struct work_join *w = container_of(cevent, struct work_join, cev);
-	struct join_message *msg = w->jm;
-	int i;
-
-	if (msg->cluster_status != SD_STATUS_OK &&
-	    msg->cluster_status != SD_STATUS_HALT)
-		return;
-
-	if (sys_stat_ok())
-		return;
-
-	for (i = 0; i < w->member_list_entries; i++) {
-		/* We should not fetch vdi_bitmap from myself */
-		if (node_eq(w->member_list + i, &sys->this_node))
-			continue;
-
-		get_vdi_bitmap_from(w->member_list + i);
+	if (ret == SD_RES_SUCCESS && has_process_main(op))
+		ret = do_process_main(op, &msg->req, &msg->rsp, msg->data);
 
-		/*
-		 * If a new comer try to join the running cluster, it only
-		 * need read one copy of bitmap from one of other members.
-		 */
-		if (sys_stat_wait_format())
-			break;
+	if (req) {
+		msg->rsp.result = ret;
+		if (has_process_main(req->op))
+			memcpy(req->data, msg->data, msg->rsp.data_length);
+		memcpy(&req->rp, &msg->rsp, sizeof(req->rp));
+		req_done(req);
 	}
 }
 
-static void __sd_leave(struct event_struct *cevent)
-{
-}
-
 enum cluster_join_result sd_check_join_cb(struct sd_node *joining, void *opaque)
 {
 	struct join_message *jm = opaque;
@@ -871,130 +812,6 @@ static int send_join_request(struct sd_n
 	return ret;
 }
 
-static void __sd_join_done(struct event_struct *cevent)
-{
-	struct work_join *w = container_of(cevent, struct work_join, cev);
-	struct join_message *jm = w->jm;
-	struct node *node, *t;
-
-	print_node_list(sys->nodes, sys->nr_nodes);
-
-	sys_stat_set(jm->cluster_status);
-
-	if (sys_can_recover() && jm->inc_epoch) {
-		list_for_each_entry_safe(node, t, &sys->leave_list, list) {
-			list_del(&node->list);
-		}
-		start_recovery(sys->epoch);
-	}
-
-	if (sys_stat_halt()) {
-		if (current_vnode_info->nr_zones >= sys->nr_copies)
-			sys_stat_set(SD_STATUS_OK);
-	}
-
-	if (node_eq(&w->joined, &sys->this_node))
-		/* this output is used for testing */
-		vprintf(SDOG_DEBUG, "join Sheepdog cluster\n");
-}
-
-static void __sd_leave_done(struct event_struct *cevent)
-{
-	if (sys_can_recover())
-		start_recovery(sys->epoch);
-
-	if (sys_can_halt()) {
-		if (current_vnode_info->nr_zones < sys->nr_copies)
-			sys_stat_set(SD_STATUS_HALT);
-	}
-}
-
-static void event_free(struct event_struct *cevent)
-{
-	switch (cevent->ctype) {
-	case EVENT_JOIN: {
-		struct work_join *w = container_of(cevent, struct work_join, cev);
-		free(w->member_list);
-		free(w->jm);
-		free(w);
-		break;
-	}
-	case EVENT_LEAVE: {
-		struct work_leave *w = container_of(cevent, struct work_leave, cev);
-		free(w->member_list);
-		free(w);
-		break;
-	}
-	case EVENT_NOTIFY: {
-		struct work_notify *w = container_of(cevent, struct work_notify, cev);
-		free(w->msg);
-		free(w);
-		break;
-	}
-	default:
-		break;
-	}
-}
-
-static struct work event_work;
-
-static void event_fn(struct work *work)
-{
-	struct event_struct *cevent = sys->cur_cevent;
-
-	/*
-	 * we can't touch sys->event_queue because of a race
-	 * with sd_deliver() and sd_confchg()...
-	 */
-
-	switch (cevent->ctype) {
-	case EVENT_JOIN:
-		__sd_join(cevent);
-		break;
-	case EVENT_LEAVE:
-		__sd_leave(cevent);
-		break;
-	case EVENT_NOTIFY:
-		__sd_notify(cevent);
-		break;
-	default:
-		vprintf(SDOG_ERR, "unknown event %d\n", cevent->ctype);
-	}
-}
-
-static void event_done(struct work *work)
-{
-	struct event_struct *cevent;
-
-	if (!sys->cur_cevent)
-		vprintf(SDOG_ERR, "bug\n");
-
-	cevent = sys->cur_cevent;
-	sys->cur_cevent = NULL;
-
-	vprintf(SDOG_DEBUG, "%p\n", cevent);
-
-	switch (cevent->ctype) {
-	case EVENT_JOIN:
-		__sd_join_done(cevent);
-		break;
-	case EVENT_LEAVE:
-		__sd_leave_done(cevent);
-		break;
-	case EVENT_NOTIFY:
-		__sd_notify_done(cevent);
-		break;
-	default:
-		vprintf(SDOG_ERR, "unknown event %d\n", cevent->ctype);
-	}
-
-	vprintf(SDOG_DEBUG, "free %p\n", cevent);
-	event_free(cevent);
-	event_running = 0;
-
-	process_request_event_queues();
-}
-
 int is_access_to_busy_objects(uint64_t oid)
 {
 	struct request *req;
@@ -1049,7 +866,8 @@ static inline void set_consistency_check
 	}
 }
 
-static void process_request_queue(void)
+/* can be called only by the main process */
+void process_request_event_queues(void)
 {
 	struct request *req, *n;
 
@@ -1080,47 +898,13 @@ static void process_request_queue(void)
 	}
 }
 
-static inline void process_event_queue(void)
-{
-	struct event_struct *cevent;
-	/*
-	 * we need to serialize events so we don't call queue_work
-	 * if one event is running by executing event_fn() or event_done().
-	 */
-	if (event_running)
-		return;
-
-	cevent = list_first_entry(&sys->event_queue,
-			struct event_struct, event_list);
-	list_del(&cevent->event_list);
-	sys->cur_cevent = cevent;
-
-	event_running = 1;
-
-	event_work.fn = event_fn;
-	event_work.done = event_done;
-
-	queue_work(sys->event_wqueue, &event_work);
-}
-
-/* can be called only by the main process */
-void process_request_event_queues(void)
-{
-	if (!list_empty(&sys->event_queue))
-		process_event_queue();
-	else
-		process_request_queue();
-}
-
 void sd_join_handler(struct sd_node *joined, struct sd_node *members,
 		size_t nr_members, enum cluster_join_result result,
 		void *opaque)
 {
-	struct event_struct *cevent;
-	struct work_join *w = NULL;
-	int i, size;
+	int i;
 	int nr, nr_local, nr_leave;
-	struct node *n;
+	struct node *n, *t;
 	struct join_message *jm = opaque;
 	uint32_t le = get_latest_epoch();
 
@@ -1147,33 +931,37 @@ void sd_join_handler(struct sd_node *joi
 
 		update_cluster_info(jm, joined, members, nr_members);
 
-		w = zalloc(sizeof(*w));
-		if (!w)
-			panic("failed to allocate memory");
-
-		cevent = &w->cev;
-		cevent->ctype = EVENT_JOIN;
-
-		vprintf(SDOG_DEBUG, "allow new confchg %p\n", cevent);
-
-		size = sizeof(struct sd_node) * nr_members;
-		w->member_list = zalloc(size);
-		if (!w->member_list)
-			panic("failed to allocate memory");
+		if (!sys_stat_ok() &&
+		    (jm->cluster_status == SD_STATUS_OK ||
+		     jm->cluster_status != SD_STATUS_HALT)) {
+		     	int array_len = nr_members * sizeof(struct sd_node);
+			struct vdi_bitmap_work *w;
+
+			w = xmalloc(sizeof(*w) + array_len);
+			w->nr_members = nr_members;
+			memcpy(w->members, members, array_len);
+
+			w->work.fn = do_get_vdi_bitmap;
+			w->work.done = get_vdi_bitmap_done;
+			queue_work(sys->block_wqueue, &w->work);
+		}
 
-		memcpy(w->member_list, members, size);
-		w->member_list_entries = nr_members;
+		sys_stat_set(jm->cluster_status);
 
-		w->joined = *joined;
+		if (sys_can_recover() && jm->inc_epoch) {
+			list_for_each_entry_safe(n, t, &sys->leave_list, list)
+				list_del(&n->list);
+			start_recovery(sys->epoch);
+		}
 
-		size = get_join_message_size(opaque);
-		w->jm = zalloc(size);
-		if (!w->jm)
-			panic("failed to allocate memory\n");
-		memcpy(w->jm, opaque, size);
+		if (sys_stat_halt()) {
+			if (current_vnode_info->nr_zones >= sys->nr_copies)
+				sys_stat_set(SD_STATUS_OK);
+		}
 
-		list_add_tail(&cevent->event_list, &sys->event_queue);
-		process_request_event_queues();
+		if (node_eq(joined, &sys->this_node))
+			/* this output is used for testing */
+			vprintf(SDOG_DEBUG, "join Sheepdog cluster\n");
 		break;
 	case CJ_RES_FAIL:
 	case CJ_RES_JOIN_LATER:
@@ -1250,9 +1038,7 @@ void sd_join_handler(struct sd_node *joi
 void sd_leave_handler(struct sd_node *left, struct sd_node *members,
 		size_t nr_members)
 {
-	struct event_struct *cevent;
-	struct work_leave *w = NULL;
-	int i, size;
+	int i;
 
 	dprintf("leave %s\n", node_to_str(left));
 	for (i = 0; i < nr_members; i++)
@@ -1266,38 +1052,13 @@ void sd_leave_handler(struct sd_node *le
 	if (sys_can_recover()) {
 		uatomic_inc(&sys->epoch);
 		update_epoch_log(sys->epoch, sys->nodes, sys->nr_nodes);
+		start_recovery(sys->epoch);
 	}
 
-	w = zalloc(sizeof(*w));
-	if (!w)
-		goto oom;
-
-	cevent = &w->cev;
-	cevent->ctype = EVENT_LEAVE;
-
-
-	vprintf(SDOG_DEBUG, "allow new confchg %p\n", cevent);
-
-	size = sizeof(struct sd_node) * nr_members;
-	w->member_list = zalloc(size);
-	if (!w->member_list)
-		goto oom;
-	memcpy(w->member_list, members, size);
-	w->member_list_entries = nr_members;
-
-	w->left = *left;
-
-	list_add_tail(&cevent->event_list, &sys->event_queue);
-	process_request_event_queues();
-
-	return;
-oom:
-	if (w) {
-		if (w->member_list)
-			free(w->member_list);
-		free(w);
+	if (sys_can_halt()) {
+		if (current_vnode_info->nr_zones < sys->nr_copies)
+			sys_stat_set(SD_STATUS_HALT);
 	}
-	panic("failed to allocate memory for a confchg event\n");
 }
 
 int create_cluster(int port, int64_t zone, int nr_vnodes)
@@ -1342,7 +1103,6 @@ int create_cluster(int port, int64_t zon
 	INIT_LIST_HEAD(&sys->blocking_conn_list);
 
 	INIT_LIST_HEAD(&sys->request_queue);
-	INIT_LIST_HEAD(&sys->event_queue);
 	INIT_LIST_HEAD(&sys->wait_rw_queue);
 	INIT_LIST_HEAD(&sys->wait_obj_queue);
 
Index: sheepdog/sheep/sheep.c
===================================================================
--- sheepdog.orig/sheep/sheep.c	2012-05-28 13:17:24.628555418 +0200
+++ sheepdog/sheep/sheep.c	2012-05-28 13:27:04.108570257 +0200
@@ -260,14 +260,13 @@ int main(int argc, char **argv)
 		exit(1);
 	}
 
-	sys->event_wqueue = init_work_queue(1);
 	sys->gateway_wqueue = init_work_queue(nr_gateway_worker);
 	sys->io_wqueue = init_work_queue(nr_io_worker);
 	sys->recovery_wqueue = init_work_queue(1);
 	sys->deletion_wqueue = init_work_queue(1);
 	sys->flush_wqueue = init_work_queue(1);
 	sys->block_wqueue = init_work_queue(1);
-	if (!sys->event_wqueue || !sys->gateway_wqueue || !sys->io_wqueue ||
+	if (!sys->gateway_wqueue || !sys->io_wqueue ||
 	    !sys->recovery_wqueue || !sys->deletion_wqueue ||
 	    !sys->flush_wqueue || !sys->block_wqueue)
 		exit(1);
Index: sheepdog/sheep/sheep_priv.h
===================================================================
--- sheepdog.orig/sheep/sheep_priv.h	2012-05-28 13:18:06.736556497 +0200
+++ sheepdog/sheep/sheep_priv.h	2012-05-28 13:27:04.112570258 +0200
@@ -36,19 +36,6 @@
 
 #define SD_RES_NETWORK_ERROR    0x81 /* Network error between sheep */
 
-enum event_type {
-	EVENT_JOIN,
-	EVENT_LEAVE,
-	EVENT_NOTIFY,
-};
-
-#define is_membership_change_event(x) \
-	((x) == EVENT_JOIN || (x) == EVENT_LEAVE)
-
-struct event_struct {
-	enum event_type ctype;
-	struct list_head event_list;
-};
 
 struct client_info {
 	struct connection conn;
@@ -136,10 +123,8 @@ struct cluster_info {
 	int nr_copies;
 
 	struct list_head request_queue;
-	struct list_head event_queue;
 	struct list_head wait_rw_queue;
 	struct list_head wait_obj_queue;
-	struct event_struct *cur_cevent;
 	int nr_outstanding_reqs;
 	unsigned int outstanding_data_size;
 
@@ -148,7 +133,6 @@ struct cluster_info {
 	int use_directio;
 	uint8_t async_flush;
 
-	struct work_queue *event_wqueue;
 	struct work_queue *gateway_wqueue;
 	struct work_queue *io_wqueue;
 	struct work_queue *deletion_wqueue;



More information about the sheepdog mailing list