[sheepdog] [PATCH] sheep: remove block/unblock callbacks

Liu Yuan namei.unix at gmail.com
Wed Jun 6 11:33:21 CEST 2012


From: Liu Yuan <tailai.ly at taobao.com>

update: clean up struct cluster_driver too
------------------------------------------- >8

This patch tries to completely remove block/unblock as well as the
sd_block_handler() code from both the cluster drivers and the core sheep code,
simplifying the code a lot and also boosting performance a lot. We actually have
a very nice construct for handling blocking requests: our top/bottom style work
queue. So the requests can be nicely ordered by a block_wqueue.

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 sheep/cluster.h           |   14 ---------
 sheep/cluster/accord.c    |   39 -------------------------
 sheep/cluster/corosync.c  |   59 ++++----------------------------------
 sheep/cluster/local.c     |   40 --------------------------
 sheep/cluster/zookeeper.c |   50 +-------------------------------
 sheep/group.c             |   69 +++++++++++----------------------------------
 6 files changed, 24 insertions(+), 247 deletions(-)

diff --git a/sheep/cluster.h b/sheep/cluster.h
index 07e5f7b..970e6ac 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -83,20 +83,6 @@ struct cluster_driver {
 	 */
 	int (*notify)(void *msg, size_t msg_len);
 
-	/*
-	 * Send a message to all nodes to block further events.
-	 *
-	 * Once the cluster driver has ensured that events are blocked on all
-	 * nodes it needs to call sd_block_handler() on the node where ->block
-	 * was called.
-	 */
-	void (*block)(void);
-
-	/*
-	 * Unblock events on all nodes, and send a a message to all nodes.
-	 */
-	void (*unblock)(void *msg, size_t msg_len);
-
 	struct list_head list;
 };
 
diff --git a/sheep/cluster/accord.c b/sheep/cluster/accord.c
index c3e0320..27ddb99 100644
--- a/sheep/cluster/accord.c
+++ b/sheep/cluster/accord.c
@@ -30,7 +30,6 @@ enum acrd_event_type {
 	EVENT_JOIN_REQUEST = 1,
 	EVENT_JOIN_RESPONSE,
 	EVENT_LEAVE,
-	EVENT_BLOCK,
 	EVENT_NOTIFY,
 };
 
@@ -46,8 +45,6 @@ struct acrd_event {
 	uint64_t ids[SD_MAX_NODES];
 
 	enum cluster_join_result join_result;
-
-	int callbacked; /* set non-zero after sd_block_handler() was called */
 };
 
 static struct sd_node this_node;
@@ -279,7 +276,6 @@ static int add_event(struct acrd_handle *ah, enum acrd_event_type type,
 		memmove(i, i + 1, sizeof(*i) * (ev.nr_nodes - idx));
 		break;
 	case EVENT_NOTIFY:
-	case EVENT_BLOCK:
 		break;
 	case EVENT_JOIN_RESPONSE:
 		abort();
@@ -427,29 +423,6 @@ static int accord_notify(void *msg, size_t msg_len)
 	return add_event(ahandle, EVENT_NOTIFY, &this_node, msg, msg_len);
 }
 
-static void accord_block(void)
-{
-	add_event(ahandle, EVENT_BLOCK, &this_node, NULL, 0);
-}
-
-static void accord_unblock(void *msg, size_t msg_len)
-{
-	struct acrd_event ev;
-
-	pthread_mutex_lock(&queue_lock);
-
-	acrd_queue_pop(ahandle, &ev);
-
-	ev.type = EVENT_NOTIFY;
-	ev.buf_len = msg_len;
-	if (msg)
-		memcpy(ev.buf, msg, msg_len);
-
-	acrd_queue_push_back(ahandle, &ev);
-
-	pthread_mutex_unlock(&queue_lock);
-}
-
 static void acrd_handler(int listen_fd, int events, void *data)
 {
 	int ret;
@@ -510,16 +483,6 @@ static void acrd_handler(int listen_fd, int events, void *data)
 	case EVENT_LEAVE:
 		sd_leave_handler(&ev.sender, ev.nodes, ev.nr_nodes);
 		break;
-	case EVENT_BLOCK:
-		if (node_cmp(&ev.sender, &this_node) == 0 && !ev.callbacked) {
-			ev.callbacked = 1;
-
-			acrd_queue_push_back(ahandle, &ev);
-			sd_block_handler();
-		} else {
-			acrd_queue_push_back(ahandle, NULL);
-		}
-		break;
 	case EVENT_NOTIFY:
 		sd_notify_handler(&ev.sender, ev.buf, ev.buf_len);
 		break;
@@ -592,8 +555,6 @@ struct cluster_driver cdrv_accord = {
 	.join       = accord_join,
 	.leave      = accord_leave,
 	.notify     = accord_notify,
-	.block      = accord_block,
-	.unblock    = accord_unblock,
 };
 
 cdrv_register(cdrv_accord);
diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
index e6655a0..6ecbf27 100644
--- a/sheep/cluster/corosync.c
+++ b/sheep/cluster/corosync.c
@@ -43,7 +43,6 @@ enum corosync_event_type {
 	COROSYNC_EVENT_TYPE_JOIN_REQUEST,
 	COROSYNC_EVENT_TYPE_JOIN_RESPONSE,
 	COROSYNC_EVENT_TYPE_LEAVE,
-	COROSYNC_EVENT_TYPE_BLOCK,
 	COROSYNC_EVENT_TYPE_NOTIFY,
 };
 
@@ -53,8 +52,6 @@ enum corosync_message_type {
 	COROSYNC_MSG_TYPE_JOIN_RESPONSE,
 	COROSYNC_MSG_TYPE_LEAVE,
 	COROSYNC_MSG_TYPE_NOTIFY,
-	COROSYNC_MSG_TYPE_BLOCK,
-	COROSYNC_MSG_TYPE_UNBLOCK,
 };
 
 struct corosync_event {
@@ -291,8 +288,8 @@ static int __corosync_dispatch_one(struct corosync_event *cevent)
 		case CJ_RES_JOIN_LATER:
 			build_node_list(cpg_nodes, nr_cpg_nodes, entries);
 			sd_join_handler(&cevent->sender.ent, entries,
-						       nr_cpg_nodes, cevent->result,
-						       cevent->msg);
+					nr_cpg_nodes, cevent->result,
+					cevent->msg);
 			break;
 		}
 		break;
@@ -307,21 +304,11 @@ static int __corosync_dispatch_one(struct corosync_event *cevent)
 		build_node_list(cpg_nodes, nr_cpg_nodes, entries);
 		sd_leave_handler(&cevent->sender.ent, entries, nr_cpg_nodes);
 		break;
-	case COROSYNC_EVENT_TYPE_BLOCK:
-		if (cpg_node_equal(&cevent->sender, &this_node) &&
-		    !cevent->callbacked) {
-			sd_block_handler();
-			cevent->callbacked = 1;
-		}
-
-		/* block the rest messages until unblock message comes */
-		return 0;
 	case COROSYNC_EVENT_TYPE_NOTIFY:
 		sd_notify_handler(&cevent->sender.ent, cevent->msg,
-						 cevent->msg_len);
+				  cevent->msg_len);
 		break;
 	}
-
 	return 1;
 }
 
@@ -330,7 +317,8 @@ static void __corosync_dispatch(void)
 	struct corosync_event *cevent;
 
 	while (!list_empty(&corosync_event_list)) {
-		cevent = list_first_entry(&corosync_event_list, typeof(*cevent), list);
+		cevent = list_first_entry(&corosync_event_list, typeof(*cevent),
+					  list);
 
 		/* update join status */
 		if (!join_finished) {
@@ -362,7 +350,6 @@ static void __corosync_dispatch(void)
 		} else {
 			switch (cevent->type) {
 			case COROSYNC_MSG_TYPE_JOIN_REQUEST:
-			case COROSYNC_MSG_TYPE_BLOCK:
 				return;
 			default:
 				break;
@@ -420,16 +407,12 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,
 		cevent->sender = cmsg->sender;
 		cevent->msg_len = cmsg->msg_len;
 		break;
-	case COROSYNC_MSG_TYPE_BLOCK:
 	case COROSYNC_MSG_TYPE_NOTIFY:
 		cevent = zalloc(sizeof(*cevent));
 		if (!cevent)
 			panic("failed to allocate memory\n");
 
-		if (cmsg->type == COROSYNC_MSG_TYPE_BLOCK)
-			cevent->type = COROSYNC_EVENT_TYPE_BLOCK;
-		else
-			cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
+		cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
 
 		cevent->sender = cmsg->sender;
 		cevent->msg_len = cmsg->msg_len;
@@ -480,14 +463,6 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,
 		       sizeof(*cmsg->nodes) * cmsg->nr_nodes);
 
 		break;
-	case COROSYNC_MSG_TYPE_UNBLOCK:
-		cevent = update_event(COROSYNC_EVENT_TYPE_BLOCK, &cmsg->sender,
-				      cmsg->msg, cmsg->msg_len);
-		if (!cevent)
-			break;
-
-		cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
-		break;
 	}
 
 	__corosync_dispatch();
@@ -542,14 +517,6 @@ static void cdrv_cpg_confchg(cpg_handle_t handle,
 			continue;
 		}
 
-		cevent = find_event(COROSYNC_EVENT_TYPE_BLOCK, left_sheep + i);
-		if (cevent) {
-			/* the node left before sending UNBLOCK */
-			list_del(&cevent->list);
-			free(cevent->msg);
-			free(cevent);
-		}
-
 		cevent = zalloc(sizeof(*cevent));
 		if (!cevent)
 			panic("failed to allocate memory\n");
@@ -640,18 +607,6 @@ static int corosync_leave(void)
 			    NULL, 0);
 }
 
-static void corosync_block(void)
-{
-	send_message(COROSYNC_MSG_TYPE_BLOCK, 0, &this_node, NULL, 0,
-			    NULL, 0);
-}
-
-static void corosync_unblock(void *msg, size_t msg_len)
-{
-	send_message(COROSYNC_MSG_TYPE_UNBLOCK, 0, &this_node, NULL, 0,
-		     msg, msg_len);
-}
-
 static int corosync_notify(void *msg, size_t msg_len)
 {
 	return send_message(COROSYNC_MSG_TYPE_NOTIFY, 0, &this_node,
@@ -739,8 +694,6 @@ struct cluster_driver cdrv_corosync = {
 	.join       = corosync_join,
 	.leave      = corosync_leave,
 	.notify     = corosync_notify,
-	.block      = corosync_block,
-	.unblock    = corosync_unblock,
 };
 
 cdrv_register(cdrv_corosync);
diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
index 4f2d8e8..a1f0cc6 100644
--- a/sheep/cluster/local.c
+++ b/sheep/cluster/local.c
@@ -36,7 +36,6 @@ enum local_event_type {
 	EVENT_JOIN_REQUEST = 1,
 	EVENT_JOIN_RESPONSE,
 	EVENT_LEAVE,
-	EVENT_BLOCK,
 	EVENT_NOTIFY,
 };
 
@@ -52,8 +51,6 @@ struct local_event {
 	pid_t pids[SD_MAX_NODES];
 
 	enum cluster_join_result join_result;
-
-	int callbacked; /* set non-zero after sd_block_handler() was called */
 };
 
 
@@ -244,7 +241,6 @@ static void add_event(enum local_event_type type, struct sd_node *node,
 		memmove(p, p + 1, sizeof(*p) * (ev.nr_nodes - idx));
 		break;
 	case EVENT_NOTIFY:
-	case EVENT_BLOCK:
 		break;
 	case EVENT_JOIN_RESPONSE:
 		abort();
@@ -314,34 +310,6 @@ static int local_notify(void *msg, size_t msg_len)
 	return 0;
 }
 
-static void local_block(void)
-{
-	shm_queue_lock();
-
-	add_event(EVENT_BLOCK, &this_node, NULL, 0);
-
-	shm_queue_unlock();
-}
-
-static void local_unblock(void *msg, size_t msg_len)
-{
-	struct local_event *ev;
-
-	shm_queue_lock();
-
-	ev = shm_queue_peek();
-
-	ev->type = EVENT_NOTIFY;
-	ev->buf_len = msg_len;
-	if (msg)
-		memcpy(ev->buf, msg, msg_len);
-	msync(ev, sizeof(*ev), MS_SYNC);
-
-	shm_queue_notify();
-
-	shm_queue_unlock();
-}
-
 static void local_handler(int listen_fd, int events, void *data)
 {
 	struct signalfd_siginfo siginfo;
@@ -404,12 +372,6 @@ static void local_handler(int listen_fd, int events, void *data)
 		sd_leave_handler(&ev->sender, ev->nodes, ev->nr_nodes);
 		shm_queue_pop();
 		break;
-	case EVENT_BLOCK:
-		if (node_eq(&ev->sender, &this_node) && !ev->callbacked) {
-			sd_block_handler();
-			ev->callbacked = 1;
-		}
-		break;
 	case EVENT_NOTIFY:
 		sd_notify_handler(&ev->sender, ev->buf, ev->buf_len);
 		shm_queue_pop();
@@ -467,8 +429,6 @@ struct cluster_driver cdrv_local = {
 	.join       = local_join,
 	.leave      = local_leave,
 	.notify     = local_notify,
-	.block      = local_block,
-	.unblock    = local_unblock,
 };
 
 cdrv_register(cdrv_local);
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 8499cb4..7924920 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -44,7 +44,6 @@ enum zk_event_type {
 	EVENT_JOIN_REQUEST = 1,
 	EVENT_JOIN_RESPONSE,
 	EVENT_LEAVE,
-	EVENT_BLOCK,
 	EVENT_NOTIFY,
 };
 
@@ -60,8 +59,6 @@ struct zk_event {
 
 	enum cluster_join_result join_result;
 
-	int callbacked; /* set non-zero after sd_block_handler() was called */
-
 	size_t buf_len;
 	uint8_t buf[SD_MAX_EVENT_BUF_SIZE];
 };
@@ -82,7 +79,7 @@ static size_t nr_zk_nodes;
 
 static inline int is_blocking_event(struct zk_event *ev)
 {
-	return ev->type == EVENT_BLOCK || ev->type == EVENT_JOIN_REQUEST;
+	return ev->type == EVENT_JOIN_REQUEST;
 }
 
 /* zookeeper API wrapper */
@@ -497,7 +494,6 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
 	ev.type = type;
 	ev.sender = *znode;
 	ev.buf_len = buf_len;
-	ev.callbacked = 0;
 	if (buf)
 		memcpy(ev.buf, buf, buf_len);
 	zk_queue_push(zh, &ev);
@@ -514,7 +510,6 @@ static int leave_event(zhandle_t *zh, struct zk_node *znode)
 	ev->type = EVENT_LEAVE;
 	ev->sender = *znode;
 	ev->buf_len = 0;
-	ev->callbacked = 0;
 
 	nr_levents = uatomic_add_return(&nr_zk_levents, 1);
 	dprintf("nr_zk_levents:%d, tail:%u\n", nr_levents, zk_levent_tail);
@@ -611,34 +606,6 @@ static int zk_notify(void *msg, size_t msg_len)
 	return add_event(zhandle, EVENT_NOTIFY, &this_node, msg, msg_len);
 }
 
-static void zk_block(void)
-{
-	add_event(zhandle, EVENT_BLOCK, &this_node, NULL, 0);
-}
-
-static void zk_unblock(void *msg, size_t msg_len)
-{
-	int rc;
-	struct zk_event ev;
-	eventfd_t value = 1;
-
-	rc = zk_queue_pop(zhandle, &ev);
-	assert(rc == 0);
-
-	ev.type = EVENT_NOTIFY;
-	ev.buf_len = msg_len;
-	if (msg)
-		memcpy(ev.buf, msg, msg_len);
-
-	zk_queue_push_back(zhandle, &ev);
-
-	uatomic_dec(&zk_notify_blocked);
-
-	/* this notify is necessary */
-	dprintf("write event to efd:%d\n", efd);
-	eventfd_write(efd, value);
-}
-
 static void zk_handler(int listen_fd, int events, void *data)
 {
 	int ret, rc;
@@ -764,19 +731,6 @@ static void zk_handler(int listen_fd, int events, void *data)
 		build_node_list(zk_node_btroot);
 		sd_leave_handler(&ev.sender.node, sd_nodes, nr_sd_nodes);
 		break;
-	case EVENT_BLOCK:
-		dprintf("BLOCK\n");
-		if (node_eq(&ev.sender.node, &this_node.node)
-				&& !ev.callbacked) {
-			uatomic_inc(&zk_notify_blocked);
-			ev.callbacked = 1;
-			zk_queue_push_back(zhandle, &ev);
-			sd_block_handler();
-		} else {
-			zk_queue_push_back(zhandle, NULL);
-		}
-
-		break;
 	case EVENT_NOTIFY:
 		dprintf("NOTIFY\n");
 		sd_notify_handler(&ev.sender.node, ev.buf, ev.buf_len);
@@ -835,8 +789,6 @@ struct cluster_driver cdrv_zookeeper = {
 	.join       = zk_join,
 	.leave      = zk_leave,
 	.notify     = zk_notify,
-	.block      = zk_block,
-	.unblock    = zk_unblock,
 };
 
 cdrv_register(cdrv_zookeeper);
diff --git a/sheep/group.c b/sheep/group.c
index c2679f2..638200f 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -252,7 +252,7 @@ int get_nr_copies(struct vnode_info *vnode_info)
 }
 
 static struct vdi_op_message *prepare_cluster_msg(struct request *req,
-		size_t *sizep)
+						  size_t *sizep)
 {
 	struct vdi_op_message *msg;
 	size_t size;
@@ -264,11 +264,7 @@ static struct vdi_op_message *prepare_cluster_msg(struct request *req,
 
 	assert(size <= SD_MAX_EVENT_BUF_SIZE);
 
-	msg = zalloc(size);
-	if (!msg) {
-		eprintf("failed to allocate memory\n");
-		return NULL;
-	}
+	msg = xzalloc(size);
 
 	memcpy(&msg->req, &req->rq, sizeof(struct sd_req));
 	memcpy(&msg->rsp, &req->rp, sizeof(struct sd_rsp));
@@ -283,9 +279,11 @@ static struct vdi_op_message *prepare_cluster_msg(struct request *req,
 static void do_cluster_request(struct work *work)
 {
 	struct request *req = container_of(work, struct request, work);
-	int ret;
+	int ret = SD_RES_SUCCESS;
+
+	if (has_process_work(req->op))
+		ret = do_process_work(req);
 
-	ret = do_process_work(req);
 	req->rp.result = ret;
 }
 
@@ -296,32 +294,14 @@ static void cluster_op_done(struct work *work)
 	size_t size;
 
 	msg = prepare_cluster_msg(req, &size);
-	if (!msg)
-		panic();
 
-	sys->cdrv->unblock(msg, size);
+	/* Kick off the cluster to process_main() */
+	sys->cdrv->notify(msg, size);
 
 	free(msg);
 }
 
 /*
- * Perform a blocked cluster operation.
- *
- * Must run in the main thread as it access unlocked state like
- * sys->pending_list.
- */
-void sd_block_handler(void)
-{
-	struct request *req = list_first_entry(&sys->pending_list,
-						struct request, pending_list);
-
-	req->work.fn = do_cluster_request;
-	req->work.done = cluster_op_done;
-
-	queue_work(sys->block_wqueue, &req->work);
-}
-
-/*
  * Execute a cluster operation by letting the cluster driver send it to all
  * nodes in the cluster.
  *
@@ -332,24 +312,10 @@ void queue_cluster_request(struct request *req)
 {
 	eprintf("%p %x\n", req, req->rq.opcode);
 
-	if (has_process_work(req->op)) {
-		list_add_tail(&req->pending_list, &sys->pending_list);
-		sys->cdrv->block();
-	} else {
-		struct vdi_op_message *msg;
-		size_t size;
-
-		msg = prepare_cluster_msg(req, &size);
-		if (!msg)
-			return;
-
-		list_add_tail(&req->pending_list, &sys->pending_list);
-
-		msg->rsp.result = SD_RES_SUCCESS;
-		sys->cdrv->notify(msg, size);
-
-		free(msg);
-	}
+	list_add_tail(&req->pending_list, &sys->pending_list);
+	req->work.fn = do_cluster_request;
+	req->work.done = cluster_op_done;
+	queue_work(sys->block_wqueue, &req->work);
 }
 
 static inline int get_nodes_nr_from(struct list_head *l)
@@ -748,20 +714,19 @@ void sd_notify_handler(struct sd_node *sender, void *data, size_t data_len)
 	struct vdi_op_message *msg = data;
 	struct sd_op_template *op = get_sd_op(msg->req.opcode);
 	int ret = msg->rsp.result;
-	struct request *req = NULL;
 
 	dprintf("size: %zd, from: %s\n", data_len, node_to_str(sender));
 
+	if (ret == SD_RES_SUCCESS && has_process_main(op))
+		ret = do_process_main(op, &msg->req, &msg->rsp, msg->data);
+
 	if (is_myself(sender->addr, sender->port)) {
+		struct request *req;
+
 		req = list_first_entry(&sys->pending_list, struct request,
 				       pending_list);
 		list_del(&req->pending_list);
-	}
-
-	if (ret == SD_RES_SUCCESS && has_process_main(op))
-		ret = do_process_main(op, &msg->req, &msg->rsp, msg->data);
 
-	if (req) {
 		msg->rsp.result = ret;
 		if (has_process_main(req->op))
 			memcpy(req->data, msg->data, msg->rsp.data_length);
-- 
1.7.10.2




More information about the sheepdog mailing list