[sheepdog] [PATCH] sheep: simplify the cluster driver interface for blocking events

Christoph Hellwig hch at infradead.org
Tue Jun 5 14:07:34 CEST 2012


Let sd_block_handler deal with the details of how to process an incoming
blocking event.  By passing the sender node structure we can easily
ignore the event on other nodes, and by keeping a local "operation in
progress" flag in group.c we can replace the callbacked flag in the
on-the-wire events with a much simpler mechanism.

The only slightly complicated bit is that zk_notify_blocked in the zookeeper
backend can now go negative for a short period of time, so we explicitly
have to check that it is positive in two places.

Signed-off-by: Christoph Hellwig <hch at lst.de>

diff --git a/sheep/cluster.h b/sheep/cluster.h
index 07e5f7b..80865cf 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -11,6 +11,7 @@
 #ifndef __CLUSTER_H__
 #define __CLUSTER_H__
 
+#include <stdbool.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <stdint.h>
@@ -186,7 +187,7 @@ void sd_join_handler(struct sd_node *joined, struct sd_node *members,
 void sd_leave_handler(struct sd_node *left, struct sd_node *members,
 		size_t nr_members);
 void sd_notify_handler(struct sd_node *sender, void *msg, size_t msg_len);
-void sd_block_handler(void);
+bool sd_block_handler(struct sd_node *sender);
 enum cluster_join_result sd_check_join_cb(struct sd_node *joining,
 		void *opaque);
 
diff --git a/sheep/cluster/accord.c b/sheep/cluster/accord.c
index abb6db1..013b978 100644
--- a/sheep/cluster/accord.c
+++ b/sheep/cluster/accord.c
@@ -47,8 +47,6 @@ struct acrd_event {
 	uint64_t ids[SD_MAX_NODES];
 
 	enum cluster_join_result join_result;
-
-	int callbacked; /* set non-zero after sd_block_handler() was called */
 };
 
 static struct sd_node this_node;
@@ -564,14 +562,8 @@ static void acrd_handler(int listen_fd, int events, void *data)
 		sd_leave_handler(&ev.sender, ev.nodes, ev.nr_nodes);
 		break;
 	case EVENT_BLOCK:
-		if (node_cmp(&ev.sender, &this_node) == 0 && !ev.callbacked) {
-			ev.callbacked = 1;
-
-			acrd_queue_push_back(ahandle, &ev);
-			sd_block_handler();
-		} else {
-			acrd_queue_push_back(ahandle, NULL);
-		}
+		acrd_queue_push_back(ahandle, NULL);
+		sd_block_handler(&ev.sender);
 		break;
 	case EVENT_NOTIFY:
 		sd_notify_handler(&ev.sender, ev.buf, ev.buf_len);
diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
index 77c6710..d42a4cb 100644
--- a/sheep/cluster/corosync.c
+++ b/sheep/cluster/corosync.c
@@ -308,13 +308,9 @@ static int __corosync_dispatch_one(struct corosync_event *cevent)
 		sd_leave_handler(&cevent->sender.ent, entries, nr_cpg_nodes);
 		break;
 	case COROSYNC_EVENT_TYPE_BLOCK:
-		if (cpg_node_equal(&cevent->sender, &this_node) &&
-		    !cevent->callbacked) {
-			sd_block_handler();
-			cevent->callbacked = 1;
-		}
+		sd_block_handler(&cevent->sender.ent);
 
-		/* block the rest messages until unblock message comes */
+		/* block other messages until the unblock message comes */
 		return 0;
 	case COROSYNC_EVENT_TYPE_NOTIFY:
 		sd_notify_handler(&cevent->sender.ent, cevent->msg,
diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
index 2c3ce24..dd3936b 100644
--- a/sheep/cluster/local.c
+++ b/sheep/cluster/local.c
@@ -52,8 +52,6 @@ struct local_event {
 	pid_t pids[SD_MAX_NODES];
 
 	enum cluster_join_result join_result;
-
-	int callbacked; /* set non-zero after sd_block_handler() was called */
 };
 
 
@@ -405,10 +403,7 @@ static void local_handler(int listen_fd, int events, void *data)
 		shm_queue_pop();
 		break;
 	case EVENT_BLOCK:
-		if (node_eq(&ev->sender, &this_node) && !ev->callbacked) {
-			sd_block_handler();
-			ev->callbacked = 1;
-		}
+		sd_block_handler(&ev->sender);
 		break;
 	case EVENT_NOTIFY:
 		sd_notify_handler(&ev->sender, ev->buf, ev->buf_len);
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 3dde1e4..6bd8e3a 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -61,8 +61,6 @@ struct zk_event {
 
 	enum cluster_join_result join_result;
 
-	int callbacked; /* set non-zero after sd_block_handler() was called */
-
 	size_t buf_len;
 	uint8_t buf[SD_MAX_EVENT_BUF_SIZE];
 };
@@ -243,7 +241,7 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 	eventfd_t value = 1;
 
 	/* process leave event */
-	if (!uatomic_read(&zk_notify_blocked) &&
+	if (uatomic_read(&zk_notify_blocked) <= 0 &&
 	     uatomic_read(&nr_zk_levents)) {
 		nr_levents = uatomic_sub_return(&nr_zk_levents, 1) + 1;
 		dprintf("nr_zk_levents:%d, head:%u\n", nr_levents, zk_levent_head);
@@ -498,7 +496,6 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
 	ev.type = type;
 	ev.sender = *znode;
 	ev.buf_len = buf_len;
-	ev.callbacked = 0;
 	if (buf)
 		memcpy(ev.buf, buf, buf_len);
 	zk_queue_push(zh, &ev);
@@ -515,7 +512,6 @@ static int leave_event(zhandle_t *zh, struct zk_node *znode)
 	ev->type = EVENT_LEAVE;
 	ev->sender = *znode;
 	ev->buf_len = 0;
-	ev->callbacked = 0;
 
 	nr_levents = uatomic_add_return(&nr_zk_levents, 1);
 	dprintf("nr_zk_levents:%d, tail:%u\n", nr_levents, zk_levent_tail);
@@ -714,7 +710,7 @@ static void zk_handler(int listen_fd, int events, void *data)
 	if (ret < 0)
 		return;
 
-	if (uatomic_read(&zk_notify_blocked))
+	if (uatomic_read(&zk_notify_blocked) > 0)
 		return;
 
 	ret = zk_queue_pop(zhandle, &ev);
@@ -820,16 +816,9 @@ static void zk_handler(int listen_fd, int events, void *data)
 		break;
 	case EVENT_BLOCK:
 		dprintf("BLOCK\n");
-		if (node_eq(&ev.sender.node, &this_node.node)
-				&& !ev.callbacked) {
-			uatomic_inc(&zk_notify_blocked);
-			ev.callbacked = 1;
-			zk_queue_push_back(zhandle, &ev);
-			sd_block_handler();
-		} else {
-			zk_queue_push_back(zhandle, NULL);
-		}
-
+		zk_queue_push_back(zhandle, NULL);
+		if (sd_block_handler(&ev.sender.node))
+ 			uatomic_inc(&zk_notify_blocked);
 		break;
 	case EVENT_NOTIFY:
 		dprintf("NOTIFY\n");
diff --git a/sheep/group.c b/sheep/group.c
index c2679f2..d00a121 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -251,6 +251,11 @@ int get_nr_copies(struct vnode_info *vnode_info)
 	return min(vnode_info->nr_zones, sys->nr_copies);
 }
 
+/*
+ * Indicator if a cluster operation is currently running.
+ */
+static bool cluster_op_running = false;
+
 static struct vdi_op_message *prepare_cluster_msg(struct request *req,
 		size_t *sizep)
 {
@@ -295,6 +300,8 @@ static void cluster_op_done(struct work *work)
 	struct vdi_op_message *msg;
 	size_t size;
 
+	cluster_op_running = false;
+
 	msg = prepare_cluster_msg(req, &size);
 	if (!msg)
 		panic();
@@ -305,20 +312,33 @@ static void cluster_op_done(struct work *work)
 }
 
 /*
- * Perform a blocked cluster operation.
+ * Perform a blocked cluster operation if we were the node requesting it
+ * and do not have any other operation pending.
  *
- * Must run in the main thread as it access unlocked state like
+ * If this method returns false the caller must call the method again for
+ * the same event once it gets notified again.
+ *
+ * Must run in the main thread as it accesses unlocked state like
  * sys->pending_list.
  */
-void sd_block_handler(void)
+bool sd_block_handler(struct sd_node *sender)
 {
-	struct request *req = list_first_entry(&sys->pending_list,
-						struct request, pending_list);
+	struct request *req;
+
+	if (!node_eq(sender, &sys->this_node))
+		return false;
+	if (cluster_op_running)
+		return false;
+
+	cluster_op_running = true;
 
+	req = list_first_entry(&sys->pending_list,
+				struct request, pending_list);
 	req->work.fn = do_cluster_request;
 	req->work.done = cluster_op_done;
 
 	queue_work(sys->block_wqueue, &req->work);
+	return true;
 }
 
 /*



More information about the sheepdog mailing list