[sheepdog] [PATCH, RFC] sheep: rewrite blocked notifications

Christoph Hellwig hch at infradead.org
Wed May 16 09:04:05 CEST 2012


The prime AIM of this patch is to fix the racy access to sys->pending_list
in do_cluster_op, but it actually cleans up the surrounding code massively
as well.

It contains three tightly related changes:

 - split a new ->block operation from ->notify.  It is used to tell the
   cluster driver to block new events, but does not contain a message by
   itself yet.
 - the block_cb callback previously passed to ->notify is not passed to
   ->block any more, but a new sd_block_handler callback is provided
   that can be called from the cluster driver in main thread context.
   sd_block_handler takes care of grabbing the first request from
   sys->pending_list in the main thread, and then scheduling a workqueue
   to handle the cluster operation.
 - a new ->unblock cluster operation is added which is called from the
   ->done handler of the block workqueue to tell the cluster driver
   to unblock the event processing, as well as sending the message with
   the results from the main processing (or simply the cluster-wide
   notification if there is no work routine in the ops table).

Signed-off-by: Christoph Hellwig <hch at lst.de>

---
 sheep/cluster.h           |   23 ++++++---
 sheep/cluster/accord.c    |   45 ++++++++----------
 sheep/cluster/corosync.c  |   78 ++++++-------------------------
 sheep/cluster/local.c     |   54 ++++++++++-----------
 sheep/cluster/zookeeper.c |   99 ++++++++++++++++------------------------
 sheep/group.c             |  113 ++++++++++++++++++++++++++++------------------
 sheep/sdnet.c             |    2 
 sheep/sheep.c             |    3 -
 sheep/sheep_priv.h        |    1 
 9 files changed, 195 insertions(+), 223 deletions(-)

Index: sheepdog/sheep/cluster.h
===================================================================
--- sheepdog.orig/sheep/cluster.h	2012-05-16 08:20:02.419903970 +0200
+++ sheepdog/sheep/cluster.h	2012-05-16 08:43:40.103890448 +0200
@@ -76,15 +76,23 @@ struct cluster_driver {
 	 * This function sends 'msg' to all the nodes.  The notified messages
 	 * can be read through sd_notify_handler().
 	 *
-	 * If 'block_cb' is specified, block_cb() is called before 'msg' is
-	 * notified to all the nodes.  All the cluster events including this
-	 * notification are blocked until block_cb() returns or this blocking
-	 * node leaves the cluster.  The sheep daemon can sleep in block_cb(),
-	 * so this callback must be not called from the dispatch (main) thread.
-	 *
 	 * Returns zero on success, -1 on error
 	 */
-	int (*notify)(void *msg, size_t msg_len, void (*block_cb)(void *arg));
+	int (*notify)(void *msg, size_t msg_len);
+
+	/*
+	 * Send a message to all nodes to block further events.
+	 *
+	 * Once the cluster driver has ensured that events are blocked on all
+	 * nodes it needs to call sd_block_handler() on the node where ->block
+	 * was called.
+	 */
+	void (*block)(void);
+
+	/*
+	 * Unblock events on all nodes, and send a message to all nodes.
+	 */
+	void (*unblock)(void *msg, size_t msg_len);
 
 	/*
 	 * Dispatch handlers
@@ -189,6 +197,7 @@ void sd_join_handler(struct sd_node *joi
 void sd_leave_handler(struct sd_node *left, struct sd_node *members,
 		size_t nr_members);
 void sd_notify_handler(struct sd_node *sender, void *msg, size_t msg_len);
+void sd_block_handler(void);
 enum cluster_join_result sd_check_join_cb(struct sd_node *joining,
 		void *opaque);
 
Index: sheepdog/sheep/cluster/corosync.c
===================================================================
--- sheepdog.orig/sheep/cluster/corosync.c	2012-05-16 08:20:02.419903970 +0200
+++ sheepdog/sheep/cluster/corosync.c	2012-05-16 08:21:23.147903200 +0200
@@ -29,10 +29,7 @@ static struct cpg_name cpg_group = { 8,
 static corosync_cfg_handle_t cfg_handle;
 static struct cpg_node this_node;
 
-static struct work_queue *corosync_block_wq;
-
 static LIST_HEAD(corosync_event_list);
-static LIST_HEAD(corosync_block_list);
 
 static struct cpg_node cpg_nodes[SD_MAX_NODES];
 static size_t nr_cpg_nodes;
@@ -205,24 +202,6 @@ retry:
 	return 0;
 }
 
-static void corosync_block(struct work *work)
-{
-	struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
-
-	bm->cb(bm->msg);
-}
-
-static void corosync_block_done(struct work *work)
-{
-	struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
-
-	send_message(COROSYNC_MSG_TYPE_UNBLOCK, 0, &this_node, NULL, 0,
-		     bm->msg, bm->msg_len);
-
-	free(bm->msg);
-	free(bm);
-}
-
 static struct corosync_event *find_block_event(enum corosync_event_type type,
 					       struct cpg_node *sender)
 {
@@ -276,7 +255,6 @@ static void build_node_list(struct cpg_n
  */
 static int __corosync_dispatch_one(struct corosync_event *cevent)
 {
-	struct corosync_block_msg *bm;
 	enum cluster_join_result res;
 	struct sd_node entries[SD_MAX_NODES];
 	int idx;
@@ -343,18 +321,7 @@ static int __corosync_dispatch_one(struc
 		if (cevent->blocked) {
 			if (cpg_node_equal(&cevent->sender, &this_node) &&
 			    !cevent->callbacked) {
-				/* call a block callback function from a worker thread */
-				if (list_empty(&corosync_block_list))
-					panic("cannot call block callback\n");
-
-				bm = list_first_entry(&corosync_block_list,
-						      typeof(*bm), list);
-				list_del(&bm->list);
-
-				bm->work.fn = corosync_block;
-				bm->work.done = corosync_block_done;
-				queue_work(corosync_block_wq, &bm->work);
-
+			    	sd_block_handler();
 				cevent->callbacked = 1;
 			}
 
@@ -653,12 +620,6 @@ static int corosync_init(const char *opt
 		return -1;
 	}
 
-	corosync_block_wq = init_work_queue(1);
-	if (!corosync_block_wq) {
-		eprintf("failed to create corosync workqueue: %m\n");
-		return -1;
-	}
-
 	return fd;
 }
 
@@ -698,31 +659,22 @@ static int corosync_leave(void)
 			    NULL, 0);
 }
 
-static int corosync_notify(void *msg, size_t msg_len, void (*block_cb)(void *))
+static void corosync_block(void)
 {
-	int ret;
-	struct corosync_block_msg *bm;
-
-	if (block_cb) {
-		bm = zalloc(sizeof(*bm));
-		if (!bm)
-			panic("failed to allocate memory\n");
-		bm->msg = zalloc(msg_len);
-		if (!bm->msg)
-			panic("failed to allocate memory\n");
+	send_message(COROSYNC_MSG_TYPE_BLOCK, 0, &this_node, NULL, 0,
+			    NULL, 0);
+}
 
-		memcpy(bm->msg, msg, msg_len);
-		bm->msg_len = msg_len;
-		bm->cb = block_cb;
-		list_add_tail(&bm->list, &corosync_block_list);
-
-		ret = send_message(COROSYNC_MSG_TYPE_BLOCK, 0, &this_node,
-				   NULL, 0, NULL, 0);
-	} else
-		ret = send_message(COROSYNC_MSG_TYPE_NOTIFY, 0, &this_node,
-				   NULL, 0, msg, msg_len);
+static void corosync_unblock(void *msg, size_t msg_len)
+{
+	send_message(COROSYNC_MSG_TYPE_UNBLOCK, 0, &this_node, NULL, 0,
+		     msg, msg_len);
+}
 
-	return ret;
+static int corosync_notify(void *msg, size_t msg_len)
+{
+	return send_message(COROSYNC_MSG_TYPE_NOTIFY, 0, &this_node,
+			   NULL, 0, msg, msg_len);
 }
 
 static int corosync_dispatch(void)
@@ -743,6 +695,8 @@ struct cluster_driver cdrv_corosync = {
 	.join       = corosync_join,
 	.leave      = corosync_leave,
 	.notify     = corosync_notify,
+	.block      = corosync_block,
+	.unblock    = corosync_unblock,
 	.dispatch   = corosync_dispatch,
 };
 
Index: sheepdog/sheep/group.c
===================================================================
--- sheepdog.orig/sheep/group.c	2012-05-16 08:20:02.419903970 +0200
+++ sheepdog/sheep/group.c	2012-05-16 08:21:23.151903200 +0200
@@ -209,29 +209,70 @@ int get_nr_copies(struct vnode_info *vno
 	return min(vnode_info->nr_zones, sys->nr_copies);
 }
 
+static struct vdi_op_message *prepare_cluster_msg(struct request *req,
+		size_t *sizep)
+{
+	struct vdi_op_message *msg;
+	size_t size;
+
+	if (has_process_main(req->op))
+		size = sizeof(*msg) + req->rq.data_length;
+	else
+		size = sizeof(*msg);
+
+	msg = zalloc(size);
+	if (!msg) {
+		eprintf("failed to allocate memory\n");
+		return NULL;
+	}
+
+	memcpy(&msg->req, &req->rq, sizeof(struct sd_req));
+	memcpy(&msg->rsp, &req->rp, sizeof(struct sd_rsp));
+
+	if (has_process_main(req->op))
+		memcpy(msg->data, req->data, req->rq.data_length);
+
+	*sizep = size;
+	return msg;
+}
+
+static void do_cluster_request(struct work *work)
+{
+	struct request *req = container_of(work, struct request, work);
+	int ret;
+
+	ret = do_process_work(req->op, &req->rq, &req->rp, req->data);
+	req->rp.result = ret;
+}
+
+static void cluster_op_done(struct work *work)
+{
+	struct request *req = container_of(work, struct request, work);
+	struct vdi_op_message *msg;
+	size_t size;
+
+	msg = prepare_cluster_msg(req, &size);
+	if (!msg)
+		panic();
+
+	sys->cdrv->unblock(msg, size);
+}
+
 /*
  * Perform a blocked cluster operation.
  *
  * Must run in the main thread as it access unlocked state like
  * sys->pending_list.
  */
-static void do_cluster_op(void *arg)
+void sd_block_handler(void)
 {
-	struct vdi_op_message *msg = arg;
-	int ret;
-	struct request *req;
-	void *data;
-
-	req = list_first_entry(&sys->pending_list, struct request, pending_list);
+	struct request *req = list_first_entry(&sys->pending_list,
+						struct request, pending_list);
 
-	if (has_process_main(req->op))
-		data = msg->data;
-	else
-		data = req->data;
-	ret = do_process_work(req->op, (const struct sd_req *)&msg->req,
-			      (struct sd_rsp *)&msg->rsp, data);
+	req->work.fn = do_cluster_request;
+	req->work.done = cluster_op_done;
 
-	msg->rsp.result = ret;
+	queue_work(sys->block_wqueue, &req->work);
 }
 
 /*
@@ -241,40 +282,28 @@ static void do_cluster_op(void *arg)
  * Must run in the main thread as it access unlocked state like
  * sys->pending_list.
  */
-static void do_cluster_request(struct request *req)
+static void queue_cluster_request(struct request *req)
 {
-	struct sd_req *hdr = &req->rq;
-	struct vdi_op_message *msg;
-	size_t size;
+	eprintf("%p %x\n", req, req->rq.opcode);
 
-	eprintf("%p %x\n", req, hdr->opcode);
+	if (has_process_work(req->op)) {
+		list_add_tail(&req->pending_list, &sys->pending_list);
+		sys->cdrv->block();
+	} else {
+		struct vdi_op_message *msg;
+		size_t size;
 
-	if (has_process_main(req->op))
-		size = sizeof(*msg) + hdr->data_length;
-	else
-		size = sizeof(*msg);
-
-	msg = zalloc(size);
-	if (!msg) {
-		eprintf("failed to allocate memory\n");
-		return;
-	}
-
-	msg->req = *((struct sd_vdi_req *)&req->rq);
-	msg->rsp = *((struct sd_vdi_rsp *)&req->rp);
-	if (has_process_main(req->op))
-		memcpy(msg->data, req->data, hdr->data_length);
+		msg = prepare_cluster_msg(req, &size);
+		if (!msg)
+			return;
 
-	list_add_tail(&req->pending_list, &sys->pending_list);
+		list_add_tail(&req->pending_list, &sys->pending_list);
 
-	if (has_process_work(req->op))
-		sys->cdrv->notify(msg, size, do_cluster_op);
-	else {
 		msg->rsp.result = SD_RES_SUCCESS;
-		sys->cdrv->notify(msg, size, NULL);
-	}
+		sys->cdrv->notify(msg, size);
 
-	free(msg);
+		free(msg);
+	}
 }
 
 static void group_handler(int listen_fd, int events, void *data)
@@ -1075,7 +1104,7 @@ static void process_request_queue(void)
 			 * directly from the main thread.  It's the cluster
 			 * drivers job to ensure we avoid blocking on I/O here.
 			 */
-			do_cluster_request(req);
+			queue_cluster_request(req);
 		} else { /* is_local_op(req->op) */
 			queue_work(sys->io_wqueue, &req->work);
 		}
Index: sheepdog/sheep/sdnet.c
===================================================================
--- sheepdog.orig/sheep/sdnet.c	2012-05-16 08:20:02.419903970 +0200
+++ sheepdog/sheep/sdnet.c	2012-05-16 08:21:23.151903200 +0200
@@ -318,7 +318,7 @@ static void queue_request(struct request
 		req->work.fn = do_local_request;
 		req->work.done = local_op_done;
 	} else if (is_cluster_op(req->op)) {
-		/* directly executed in the main thread */;
+		;
 	} else {
 		eprintf("unknown operation %d\n", hdr->opcode);
 		rsp->result = SD_RES_SYSTEM_ERROR;
Index: sheepdog/sheep/sheep.c
===================================================================
--- sheepdog.orig/sheep/sheep.c	2012-05-16 08:20:02.419903970 +0200
+++ sheepdog/sheep/sheep.c	2012-05-16 08:21:23.151903200 +0200
@@ -258,9 +258,10 @@ int main(int argc, char **argv)
 	sys->recovery_wqueue = init_work_queue(1);
 	sys->deletion_wqueue = init_work_queue(1);
 	sys->flush_wqueue = init_work_queue(1);
+	sys->block_wqueue = init_work_queue(1);
 	if (!sys->event_wqueue || !sys->gateway_wqueue || !sys->io_wqueue ||
 	    !sys->recovery_wqueue || !sys->deletion_wqueue ||
-	    !sys->flush_wqueue)
+	    !sys->flush_wqueue || !sys->block_wqueue)
 		exit(1);
 
 	ret = init_signal();
Index: sheepdog/sheep/sheep_priv.h
===================================================================
--- sheepdog.orig/sheep/sheep_priv.h	2012-05-16 08:20:02.419903970 +0200
+++ sheepdog/sheep/sheep_priv.h	2012-05-16 08:21:23.151903200 +0200
@@ -149,6 +149,7 @@ struct cluster_info {
 	struct work_queue *deletion_wqueue;
 	struct work_queue *recovery_wqueue;
 	struct work_queue *flush_wqueue;
+	struct work_queue *block_wqueue;
 };
 
 struct siocb {
Index: sheepdog/sheep/cluster/local.c
===================================================================
--- sheepdog.orig/sheep/cluster/local.c	2012-05-16 08:20:02.419903970 +0200
+++ sheepdog/sheep/cluster/local.c	2012-05-16 08:40:30.647892257 +0200
@@ -53,10 +53,8 @@ struct local_event {
 
 	enum cluster_join_result join_result;
 
-	void (*block_cb)(void *arg);
-
 	int blocked; /* set non-zero when sheep must block this event */
-	int callbacked; /* set non-zero if sheep already called block_cb() */
+	int callbacked; /* set non-zero after sd_block_handler() was called */
 };
 
 
@@ -215,7 +213,7 @@ static void shm_queue_init(void)
 
 static void add_event(enum local_event_type type,
 		      struct sd_node *node, void *buf,
-		      size_t buf_len, void (*block_cb)(void *arg))
+		      size_t buf_len, int blocked)
 {
 	int idx;
 	struct sd_node *n;
@@ -250,8 +248,7 @@ static void add_event(enum local_event_t
 		memmove(p, p + 1, sizeof(*p) * (ev.nr_nodes - idx));
 		break;
 	case EVENT_NOTIFY:
-		ev.blocked = !!block_cb;
-		ev.block_cb = block_cb;
+		ev.blocked = blocked;
 		break;
 	}
 
@@ -273,7 +270,7 @@ static void check_pids(void *arg)
 
 	for (i = 0; i < nr; i++)
 		if (!process_exists(pids[i]))
-			add_event(EVENT_LEAVE, nodes + i, NULL, 0, NULL);
+			add_event(EVENT_LEAVE, nodes + i, NULL, 0, 0);
 
 	shm_queue_unlock();
 
@@ -329,7 +326,7 @@ static int local_join(struct sd_node *my
 
 	shm_queue_lock();
 
-	add_event(EVENT_JOIN, &this_node, opaque, opaque_len, NULL);
+	add_event(EVENT_JOIN, &this_node, opaque, opaque_len, 0);
 
 	shm_queue_unlock();
 
@@ -340,25 +337,34 @@ static int local_leave(void)
 {
 	shm_queue_lock();
 
-	add_event(EVENT_LEAVE, &this_node, NULL, 0, NULL);
+	add_event(EVENT_LEAVE, &this_node, NULL, 0, 0);
 
 	shm_queue_unlock();
 
 	return 0;
 }
 
-static int local_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg))
+static int local_notify(void *msg, size_t msg_len)
 {
 	shm_queue_lock();
 
-	add_event(EVENT_NOTIFY, &this_node, msg, msg_len, block_cb);
+	add_event(EVENT_NOTIFY, &this_node, msg, msg_len, 0);
 
 	shm_queue_unlock();
 
 	return 0;
 }
 
-static void local_block(struct work *work)
+static void local_block(void)
+{
+	shm_queue_lock();
+
+	add_event(EVENT_NOTIFY, &this_node, NULL, 0, 1);
+
+	shm_queue_unlock();
+}
+
+static void local_unblock(void *msg, size_t msg_len)
 {
 	struct local_event *ev;
 
@@ -366,8 +372,10 @@ static void local_block(struct work *wor
 
 	ev = shm_queue_peek();
 
-	ev->block_cb(ev->buf);
 	ev->blocked = 0;
+	ev->buf_len = msg_len;
+	if (msg)
+		memcpy(ev->buf, msg, msg_len);
 	msync(ev, sizeof(*ev), MS_SYNC);
 
 	shm_queue_notify();
@@ -375,20 +383,12 @@ static void local_block(struct work *wor
 	shm_queue_unlock();
 }
 
-static void local_block_done(struct work *work)
-{
-}
-
 static int local_dispatch(void)
 {
 	int ret;
 	struct signalfd_siginfo siginfo;
 	struct local_event *ev;
 	enum cluster_join_result res;
-	static struct work work = {
-		.fn = local_block,
-		.done = local_block_done,
-	};
 
 	dprintf("read siginfo\n");
 	ret = read(sigfd, &siginfo, sizeof(siginfo));
@@ -438,12 +438,10 @@ static int local_dispatch(void)
 		break;
 	case EVENT_NOTIFY:
 		if (ev->blocked) {
-			if (node_eq(&ev->sender, &this_node)) {
-				if (!ev->callbacked) {
-					queue_work(local_block_wq, &work);
-
-					ev->callbacked = 1;
-				}
+			if (node_eq(&ev->sender, &this_node) &&
+			    !ev->callbacked) {
+				sd_block_handler();
+				ev->callbacked = 1;
 			}
 			goto out;
 		}
@@ -466,6 +464,8 @@ struct cluster_driver cdrv_local = {
 	.join       = local_join,
 	.leave      = local_leave,
 	.notify     = local_notify,
+	.block      = local_block,
+	.unblock    = local_unblock,
 	.dispatch   = local_dispatch,
 };
 
Index: sheepdog/sheep/cluster/zookeeper.c
===================================================================
--- sheepdog.orig/sheep/cluster/zookeeper.c	2012-05-16 07:18:11.715939357 +0200
+++ sheepdog/sheep/cluster/zookeeper.c	2012-05-16 08:40:23.707892324 +0200
@@ -76,10 +76,8 @@ struct zk_event {
 
 	enum cluster_join_result join_result;
 
-	void (*block_cb)(void *arg);
-
 	int blocked; /* set non-zero when sheep must block this event */
-	int callbacked; /* set non-zero if sheep already called block_cb() */
+	int callbacked; /* set non-zero after sd_block_handler() was called */
 
 	size_t buf_len;
 	uint8_t buf[MAX_EVENT_BUF_SIZE];
@@ -498,53 +496,46 @@ static void zk_member_init(zhandle_t *zh
 /* ZooKeeper driver APIs */
 
 static zhandle_t *zhandle;
-
-static struct work_queue *zk_block_wq;
-
 static struct zk_node this_node;
 
 static int add_event(zhandle_t *zh, enum zk_event_type type,
 		     struct zk_node *znode, void *buf,
-		     size_t buf_len, void (*block_cb)(void *arg))
+		     size_t buf_len, int blocked)
 {
-	int nr_levents;
-	struct zk_event ev, *lev;
-	eventfd_t value = 1;
+	struct zk_event ev;
 
 	ev.type = type;
 	ev.sender = *znode;
 	ev.buf_len = buf_len;
 	ev.callbacked = 0;
-	ev.blocked = 0;
+	ev.blocked = blocked;
 	if (buf)
 		memcpy(ev.buf, buf, buf_len);
+	zk_queue_push(zh, &ev);
+	return 0;
+}
 
-	switch (type) {
-	case EVENT_JOIN:
-		ev.blocked = 1;
-		break;
-	case EVENT_LEAVE:
-		lev = &zk_levents[zk_levent_tail%SD_MAX_NODES];
+static int leave_event(zhandle_t *zh, struct zk_node *znode)
+{
+	int nr_levents;
+	struct zk_event *ev;
+	const eventfd_t value = 1;
 
-		memcpy(lev, &ev, sizeof(ev));
+	ev = &zk_levents[zk_levent_tail % SD_MAX_NODES];
+	ev->type = EVENT_LEAVE;
+	ev->sender = *znode;
+	ev->buf_len = 0;
+	ev->callbacked = 0;
+	ev->blocked = 0;
 
-		nr_levents = uatomic_add_return(&nr_zk_levents, 1);
-		dprintf("nr_zk_levents:%d, tail:%u\n", nr_levents, zk_levent_tail);
+	nr_levents = uatomic_add_return(&nr_zk_levents, 1);
+	dprintf("nr_zk_levents:%d, tail:%u\n", nr_levents, zk_levent_tail);
 
-		zk_levent_tail++;
+	zk_levent_tail++;
 
-		/* manual notify */
-		dprintf("write event to efd:%d\n", efd);
-		eventfd_write(efd, value);
-		goto out;
-	case EVENT_NOTIFY:
-		ev.blocked = !!block_cb;
-		ev.block_cb = block_cb;
-		break;
-	}
-
-	zk_queue_push(zh, &ev);
-out:
+	/* manual notify */
+	dprintf("write event to efd:%d\n", efd);
+	eventfd_write(efd, value);
 	return 0;
 }
 
@@ -586,7 +577,7 @@ static void watcher(zhandle_t *zh, int t
 		str_to_node(p, &znode.node);
 		dprintf("zk_nodes leave:%s\n", node_to_str(&znode.node));
 
-		add_event(zh, EVENT_LEAVE, &znode, NULL, 0, NULL);
+		leave_event(zh, &znode);
 		return;
 	}
 
@@ -674,12 +665,6 @@ static int zk_init(const char *option, u
 		return -1;
 	}
 
-	zk_block_wq = init_work_queue(1);
-	if (!zk_block_wq) {
-		eprintf("failed to create zookeeper workqueue: %m\n");
-		return -1;
-	}
-
 	return efd;
 }
 
@@ -704,7 +689,7 @@ static int zk_join(struct sd_node *mysel
 
 	dprintf("clientid:%ld\n", cid->client_id);
 
-	rc = add_event(zhandle, EVENT_JOIN, &this_node, opaque, opaque_len, NULL);
+	rc = add_event(zhandle, EVENT_JOIN, &this_node, opaque, opaque_len, 1);
 
 	return rc;
 }
@@ -717,12 +702,17 @@ static int zk_leave(void)
 	return zk_delete(zhandle, path, -1);
 }
 
-static int zk_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg))
+static int zk_notify(void *msg, size_t msg_len)
+{
+	return add_event(zhandle, EVENT_NOTIFY, &this_node, msg, msg_len, 0);
+}
+
+static void zk_block(void)
 {
-	return add_event(zhandle, EVENT_NOTIFY, &this_node, msg, msg_len, block_cb);
+	add_event(zhandle, EVENT_NOTIFY, &this_node, NULL, 0, 1);
 }
 
-static void zk_block(struct work *work)
+static void zk_unblock(void *msg, size_t msg_len)
 {
 	int rc;
 	struct zk_event ev;
@@ -731,8 +721,10 @@ static void zk_block(struct work *work)
 	rc = zk_queue_pop(zhandle, &ev);
 	assert(rc == 0);
 
-	ev.block_cb(ev.buf);
 	ev.blocked = 0;
+	ev.buf_len = msg_len;
+	if (msg)
+		memcpy(ev.buf, msg, msg_len);
 
 	zk_queue_push_back(zhandle, &ev);
 
@@ -743,10 +735,6 @@ static void zk_block(struct work *work)
 	eventfd_write(efd, value);
 }
 
-static void zk_block_done(struct work *work)
-{
-}
-
 static int zk_dispatch(void)
 {
 	int ret, rc, retry;
@@ -755,10 +743,6 @@ static int zk_dispatch(void)
 	struct zk_event ev;
 	struct zk_node *n;
 	enum cluster_join_result res;
-	static struct work work = {
-		.fn = zk_block,
-		.done = zk_block_done,
-	};
 
 	dprintf("read event\n");
 	ret = eventfd_read(efd, &value);
@@ -866,13 +850,10 @@ static int zk_dispatch(void)
 		if (ev.blocked) {
 			if (node_eq(&ev.sender.node, &this_node.node)
 					&& !ev.callbacked) {
-				ev.callbacked = 1;
-
 				uatomic_inc(&zk_notify_blocked);
-
+				ev.callbacked = 1;
 				zk_queue_push_back(zhandle, &ev);
-
-				queue_work(zk_block_wq, &work);
+				sd_block_handler();
 			} else
 				zk_queue_push_back(zhandle, NULL);
 
@@ -893,6 +874,8 @@ struct cluster_driver cdrv_zookeeper = {
 	.join       = zk_join,
 	.leave      = zk_leave,
 	.notify     = zk_notify,
+	.block      = zk_block,
+	.unblock    = zk_unblock,
 	.dispatch   = zk_dispatch,
 };
 
Index: sheepdog/sheep/cluster/accord.c
===================================================================
--- sheepdog.orig/sheep/cluster/accord.c	2012-05-15 09:15:08.859954176 +0200
+++ sheepdog/sheep/cluster/accord.c	2012-05-16 08:40:37.235892186 +0200
@@ -46,10 +46,8 @@ struct acrd_event {
 
 	enum cluster_join_result join_result;
 
-	void (*block_cb)(void *arg);
-
 	int blocked; /* set non-zero when sheep must block this event */
-	int callbacked; /* set non-zero if sheep already called block_cb() */
+	int callbacked; /* set non-zero after sd_block_handler() was called */
 };
 
 static struct sd_node this_node;
@@ -246,7 +244,7 @@ again:
 
 static int add_event(struct acrd_handle *ah, enum acrd_event_type type,
 		     struct sd_node *node, void *buf,
-		     size_t buf_len, void (*block_cb)(void *arg))
+		     size_t buf_len, int blocked)
 {
 	int idx;
 	struct sd_node *n;
@@ -257,6 +255,7 @@ static int add_event(struct acrd_handle
 
 	ev.type = type;
 	ev.sender = *node;
+	ev.blocked = blocked;
 	ev.buf_len = buf_len;
 	if (buf)
 		memcpy(ev.buf, buf, buf_len);
@@ -265,7 +264,6 @@ static int add_event(struct acrd_handle
 
 	switch (type) {
 	case EVENT_JOIN:
-		ev.blocked = 1;
 		ev.nodes[ev.nr_nodes] = *node;
 		ev.ids[ev.nr_nodes] = this_id; /* must be local node */
 		ev.nr_nodes++;
@@ -282,8 +280,6 @@ static int add_event(struct acrd_handle
 		memmove(i, i + 1, sizeof(*i) * (ev.nr_nodes - idx));
 		break;
 	case EVENT_NOTIFY:
-		ev.blocked = !!block_cb;
-		ev.block_cb = block_cb;
 		break;
 	}
 
@@ -412,8 +408,7 @@ static void __acrd_leave(struct work *wo
 
 	for (i = 0; i < nr_nodes; i++) {
 		if (ids[i] == info->left_nodeid) {
-			add_event(ah, EVENT_LEAVE, nodes + i, NULL, 0,
-				  NULL);
+			add_event(ah, EVENT_LEAVE, nodes + i, NULL, 0, 0);
 			break;
 		}
 	}
@@ -517,20 +512,25 @@ static int accord_join(struct sd_node *m
 {
 	this_node = *myself;
 
-	return add_event(ahandle, EVENT_JOIN, &this_node, opaque, opaque_len, NULL);
+	return add_event(ahandle, EVENT_JOIN, &this_node, opaque, opaque_len, 1);
 }
 
 static int accord_leave(void)
 {
-	return add_event(ahandle, EVENT_LEAVE, &this_node, NULL, 0, NULL);
+	return add_event(ahandle, EVENT_LEAVE, &this_node, NULL, 0, 0);
 }
 
-static int accord_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg))
+static int accord_notify(void *msg, size_t msg_len)
 {
-	return add_event(ahandle, EVENT_NOTIFY, &this_node, msg, msg_len, block_cb);
+	return add_event(ahandle, EVENT_NOTIFY, &this_node, msg, msg_len, 0);
 }
 
-static void acrd_block(struct work *work)
+static void accord_block(void)
+{
+	return add_event(ahandle, EVENT_NOTIFY, &this_node, NULL, 0, 1);
+}
+
+static void acrd_unblock(void *msg, size_t msg_len)
 {
 	struct acrd_event ev;
 
@@ -538,28 +538,22 @@ static void acrd_block(struct work *work
 
 	acrd_queue_pop(ahandle, &ev);
 
-	ev.block_cb(ev.buf);
 	ev.blocked = 0;
+	ev.buf_len = msg_len;
+	if (msg)
+		memcpy(ev.buf, msg, msg_len);
 
 	acrd_queue_push_back(ahandle, &ev);
 
 	pthread_mutex_unlock(&queue_lock);
 }
 
-static void acrd_block_done(struct work *work)
-{
-}
-
 static int accord_dispatch(void)
 {
 	int ret;
 	eventfd_t value;
 	struct acrd_event ev;
 	enum cluster_join_result res;
-	static struct work work = {
-		.fn = acrd_block,
-		.done = acrd_block_done,
-	};
 
 	dprintf("read event\n");
 	ret = eventfd_read(efd, &value);
@@ -612,11 +606,10 @@ static int accord_dispatch(void)
 	case EVENT_NOTIFY:
 		if (ev.blocked) {
 			if (node_cmp(&ev.sender, &this_node) == 0 && !ev.callbacked) {
-				queue_work(acrd_wq, &work);
-
 				ev.callbacked = 1;
 
 				acrd_queue_push_back(ahandle, &ev);
+				sd_block_handler();
 			} else
 				acrd_queue_push_back(ahandle, NULL);
 
@@ -639,6 +632,8 @@ struct cluster_driver cdrv_accord = {
 	.join       = accord_join,
 	.leave      = accord_leave,
 	.notify     = accord_notify,
+	.block      = accord_block,
+	.unblock    = accord_unblock,
 	.dispatch   = accord_dispatch,
 };
 



More information about the sheepdog mailing list