[sheepdog] [PATCH 3/3] sheep: use single list for request wait queue

Liu Yuan namei.unix at gmail.com
Tue Mar 26 07:30:08 CET 2013


From: Liu Yuan <tailai.ly at taobao.com>

Multiple queues aim to provide finer-grained request retry control but end up
being much more error-prone. We don't need multiple queues because
 1) a VM mostly just issues several IO requests in one go and won't progress
    until getting the responses to those previous IO requests, so most likely
    we don't have a huge number of requests on the wait queues.
 2) a single queue is very easy to make correct

This patch
 a) fixes a bug found by refined 043
 b) uses a single list instead of multiple lists for the wait queue
 c) adds a comment explaining request retry
 d) refactors the code and renames the helpers a bit

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 include/sheepdog_proto.h |    2 +-
 sheep/group.c            |    3 +-
 sheep/recovery.c         |   27 +++--------
 sheep/request.c          |  119 +++++++++++++++++++++++-----------------------
 sheep/sheep_priv.h       |   11 ++---
 5 files changed, 73 insertions(+), 89 deletions(-)

diff --git a/include/sheepdog_proto.h b/include/sheepdog_proto.h
index ee813d6..fe3738b 100644
--- a/include/sheepdog_proto.h
+++ b/include/sheepdog_proto.h
@@ -70,7 +70,7 @@
 #define SD_RES_NO_STORE         0x20 /* No targeted backend store */
 #define SD_RES_NO_SUPPORT       0x21 /* Operation is not supported by backend store */
 #define SD_RES_NODE_IN_RECOVERY 0x22 /*	Targeted node is in recovery */
-#define SD_RES_OBJ_RECOVERING     0x23 /* Object is recovering */
+/* #define SD_RES_OBJ_RECOVERING 0x23  Object is recovering */
 #define SD_RES_KILLED           0x24 /* Node is killed */
 #define SD_RES_OID_EXIST        0x25 /* Object ID exists already */
 #define SD_RES_AGAIN            0x26 /* Ask to try again */
diff --git a/sheep/group.c b/sheep/group.c
index 9d8a9ac..b2ea2bf 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -1258,8 +1258,7 @@ int create_cluster(int port, int64_t zone, int nr_vnodes,
 	INIT_LIST_HEAD(&sys->delayed_nodes);
 
 	INIT_LIST_HEAD(&sys->local_req_queue);
-	INIT_LIST_HEAD(&sys->wait_rw_queue);
-	INIT_LIST_HEAD(&sys->wait_obj_queue);
+	INIT_LIST_HEAD(&sys->req_wait_queue);
 
 	ret = send_join_request(&sys->this_node);
 	if (ret != 0)
diff --git a/sheep/recovery.c b/sheep/recovery.c
index deba08a..7cbeea2 100644
--- a/sheep/recovery.c
+++ b/sheep/recovery.c
@@ -218,13 +218,6 @@ bool node_in_recovery(void)
 	return !!recovering_work;
 }
 
-bool is_recovery_init(void)
-{
-	struct recovery_work *rw = recovering_work;
-
-	return rw->state == RW_INIT;
-}
-
 static inline void prepare_schedule_oid(uint64_t oid)
 {
 	struct recovery_work *rw = recovering_work;
@@ -314,7 +307,7 @@ static inline void run_next_rw(struct recovery_work *rw)
 	rw = next_rw;
 	next_rw = NULL;
 	recovering_work = rw;
-	flush_wait_obj_requests();
+	wakeup_all_requests();
 	queue_work(sys->recovery_wqueue, &rw->work);
 	sd_dprintf("recovery work is superseded");
 }
@@ -352,6 +345,8 @@ static inline void finish_recovery(struct recovery_work *rw)
 	if (sd_store->end_recover)
 		sd_store->end_recover(sys->epoch - 1, rw->old_vinfo);
 
+	wakeup_all_requests();
+
 	/* notify recovery completion to other nodes */
 	rw->work.fn = notify_recovery_completion_work;
 	rw->work.done = notify_recovery_completion_main;
@@ -466,16 +461,15 @@ static void recover_object_main(struct work *work)
 	if (rw->stop) {
 		/*
 		 * Stop this recovery process and wait for epoch to be
-		 * lifted and flush wait_obj queue to requeue those
+		 * lifted and flush wait queue to requeue those
 		 * requests
 		 */
-		flush_wait_obj_requests();
+		wakeup_all_requests();
 		sd_dprintf("recovery is stopped");
 		return;
 	}
 
-	resume_wait_obj_requests(rw->oids[rw->done++]);
-
+	wakeup_requests_on_oid(rw->oids[rw->done++]);
 	if (rw->done < rw->count) {
 		recover_next_object(rw);
 		return;
@@ -497,13 +491,6 @@ static void finish_object_list(struct work *work)
 		finish_recovery(rw);
 		return;
 	}
-	/*
-	 * We have got the object list to be recovered locally, most of
-	 * objects are actually already being there, so let's resume
-	 * requests in the hope that most requests will be processed
-	 * without any problem.
-	 */
-	resume_wait_recovery_requests();
 	rw->work.fn = recover_object_work;
 	rw->work.done = recover_object_main;
 	recover_next_object(rw);
@@ -677,7 +664,7 @@ int start_recovery(struct vnode_info *cur_vinfo, struct vnode_info *old_vinfo)
 		queue_work(sys->recovery_wqueue, &rw->work);
 	}
 
-	resume_wait_epoch_requests();
+	wakeup_requests_on_epoch();
 
 	return 0;
 }
diff --git a/sheep/request.c b/sheep/request.c
index 57edb1f..bd0f170 100644
--- a/sheep/request.c
+++ b/sheep/request.c
@@ -25,6 +25,12 @@
 
 static void requeue_request(struct request *req);
 
+static void del_requeue_request(struct request *req)
+{
+	list_del(&req->request_list);
+	requeue_request(req);
+}
+
 static bool is_access_local(struct request *req, uint64_t oid)
 {
 	const struct sd_vnode *obj_vnodes[SD_MAX_COPIES];
@@ -65,6 +71,33 @@ static void io_op_done(struct work *work)
 	return;
 }
 
+/*
+ * There are 4 cases that a request needs to sleep on wait queues for requeue:
+ *
+ *	1. Epoch of request sender is older than system epoch of receiver
+ *	   In this case, we respond to the sender with SD_RES_OLD_NODE_VER so
+ *	   that the sender puts the request into its own wait queue to wait
+ *	   for its system epoch to get lifted and then resends the request.
+ *
+ *      2. Epoch of request sender is newer than system epoch of receiver
+ *         In this case, we put the request into wait queue of receiver, to wait
+ *         system epoch of receiver to get lifted, then retry this request on
+ *         its own.
+ *
+ *      3. Object requested doesn't exist and recovery work is at RW_INIT state
+ *         In this case, we check whether the requested object exists, if so,
+ *         go process the request directly, if not put the request into wait
+ *         queue of the receiver to wait for the finish of this oid recovery.
+ *
+ *      4. Object requested doesn't exist and is being recovered
+ *         In this case, we put the request into wait queue of receiver and when
+ *         we recover an object we try to wake up the request on this oid.
+ */
+static inline void sleep_on_wait_queue(struct request *req)
+{
+	list_add_tail(&req->request_list, &sys->req_wait_queue);
+}
+
 static void gateway_op_done(struct work *work)
 {
 	struct request *req = container_of(work, struct request, work);
@@ -73,12 +106,11 @@ static void gateway_op_done(struct work *work)
 	switch (req->rp.result) {
 	case SD_RES_OLD_NODE_VER:
 		if (req->rp.epoch > sys->epoch) {
-			list_add_tail(&req->request_list,
-				      &sys->wait_rw_queue);
 			/*
 			 * Gateway of this node is expected to process this
 			 * request later when epoch is lifted.
 			 */
+			sleep_on_wait_queue(req);
 			return;
 		}
 		/*FALLTHRU*/
@@ -128,7 +160,7 @@ static int check_request_epoch(struct request *req)
 	if (before(req->rq.epoch, sys->epoch)) {
 		sd_eprintf("old node version %u, %u (%s)",
 			   sys->epoch, req->rq.epoch, op_name(req->op));
-		/* ask gateway to retry. */
+		/* Ask for sleeping req on requester's wait queue */
 		req->rp.result = SD_RES_OLD_NODE_VER;
 		req->rp.epoch = sys->epoch;
 		put_request(req);
@@ -136,13 +168,9 @@ static int check_request_epoch(struct request *req)
 	} else if (after(req->rq.epoch, sys->epoch)) {
 		sd_eprintf("new node version %u, %u (%s)",
 			   sys->epoch, req->rq.epoch, op_name(req->op));
-
-		/*
-		 * put on local wait queue, waiting for local epoch
-		 * to be lifted
-		 */
+		/* Wait for local epoch to be lifted */
 		req->rp.result = SD_RES_NEW_NODE_VER;
-		list_add_tail(&req->request_list, &sys->wait_rw_queue);
+		sleep_on_wait_queue(req);
 		return -1;
 	}
 
@@ -168,26 +196,20 @@ static bool request_in_recovery(struct request *req)
 	 */
 	if (oid_in_recovery(req->local_oid) &&
 	    !(req->rq.flags & SD_FLAG_CMD_RECOVERY)) {
-		/* Put request on wait queues of local node */
-		if (is_recovery_init()) {
-			sd_dprintf("%"PRIx64" on rw_queue", req->local_oid);
-			req->rp.result = SD_RES_OBJ_RECOVERING;
-			list_add_tail(&req->request_list, &sys->wait_rw_queue);
-		} else {
-			sd_dprintf("%"PRIx64" on obj_queue", req->local_oid);
-			list_add_tail(&req->request_list, &sys->wait_obj_queue);
-		}
+		sd_dprintf("%"PRIx64" wait on oid", req->local_oid);
+		sleep_on_wait_queue(req);
 		return true;
 	}
 	return false;
 }
 
-void resume_wait_epoch_requests(void)
+/* Wakeup requests because of epoch mismatch */
+void wakeup_requests_on_epoch(void)
 {
 	struct request *req, *t;
 	LIST_HEAD(pending_list);
 
-	list_splice_init(&sys->wait_rw_queue, &pending_list);
+	list_splice_init(&sys->req_wait_queue, &pending_list);
 
 	list_for_each_entry_safe(req, t, &pending_list, request_list) {
 		switch (req->rp.result) {
@@ -197,75 +219,54 @@ void resume_wait_epoch_requests(void)
 			 * its epoch changes.
 			 */
 			assert(is_gateway_op(req->op));
+			sd_dprintf("gateway %"PRIx64, req->rq.obj.oid);
 			req->rq.epoch = sys->epoch;
-			list_del(&req->request_list);
-			requeue_request(req);
+			del_requeue_request(req);
 			break;
 		case SD_RES_NEW_NODE_VER:
-			/* Peer retries the request locally when its epoch changes. */
+			/*
+			 * Peer retries the request locally when its epoch
+			 * changes.
+			 */
 			assert(!is_gateway_op(req->op));
-			list_del(&req->request_list);
-			requeue_request(req);
+			sd_dprintf("peer %"PRIx64, req->rq.obj.oid);
+			del_requeue_request(req);
 			break;
 		default:
 			break;
 		}
 	}
 
-	list_splice_init(&pending_list, &sys->wait_rw_queue);
+	list_splice_init(&pending_list, &sys->req_wait_queue);
 }
 
-void resume_wait_recovery_requests(void)
+/* Wakeup the requests on the oid that was previously being recovered */
+void wakeup_requests_on_oid(uint64_t oid)
 {
 	struct request *req, *t;
 	LIST_HEAD(pending_list);
 
-	list_splice_init(&sys->wait_rw_queue, &pending_list);
-
-	list_for_each_entry_safe(req, t, &pending_list, request_list) {
-		if (req->rp.result != SD_RES_OBJ_RECOVERING)
-			continue;
-
-		sd_dprintf("resume wait oid %" PRIx64, req->local_oid);
-		list_del(&req->request_list);
-		requeue_request(req);
-	}
-
-	list_splice_init(&pending_list, &sys->wait_rw_queue);
-}
-
-void resume_wait_obj_requests(uint64_t oid)
-{
-	struct request *req, *t;
-	LIST_HEAD(pending_list);
-
-	list_splice_init(&sys->wait_obj_queue, &pending_list);
+	list_splice_init(&sys->req_wait_queue, &pending_list);
 
 	list_for_each_entry_safe(req, t, &pending_list, request_list) {
 		if (req->local_oid != oid)
 			continue;
-
-		/*
-		 * the object requested by a pending request has been
-		 * recovered, notify the pending request.
-		 */
 		sd_dprintf("retry %" PRIx64, req->local_oid);
-		list_del(&req->request_list);
-		requeue_request(req);
+		del_requeue_request(req);
 	}
-	list_splice_init(&pending_list, &sys->wait_obj_queue);
+	list_splice_init(&pending_list, &sys->req_wait_queue);
 }
 
-void flush_wait_obj_requests(void)
+void wakeup_all_requests(void)
 {
 	struct request *req, *n;
 	LIST_HEAD(pending_list);
 
-	list_splice_init(&sys->wait_obj_queue, &pending_list);
+	list_splice_init(&sys->req_wait_queue, &pending_list);
 
 	list_for_each_entry_safe(req, n, &pending_list, request_list) {
-		list_del(&req->request_list);
-		requeue_request(req);
+		sd_dprintf("%"PRIx64, req->rq.obj.oid);
+		del_requeue_request(req);
 	}
 }
 
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 6180442..35cea77 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -98,8 +98,7 @@ struct cluster_info {
 
 	pthread_mutex_t local_req_lock;
 	struct list_head local_req_queue;
-	struct list_head wait_rw_queue;
-	struct list_head wait_obj_queue;
+	struct list_head req_wait_queue;
 	int nr_outstanding_reqs;
 
 	uint32_t recovered_epoch;
@@ -261,10 +260,9 @@ void wait_get_vdis_done(void);
 int get_nr_copies(struct vnode_info *vnode_info);
 
 void resume_pending_requests(void);
-void resume_wait_epoch_requests(void);
-void resume_wait_obj_requests(uint64_t oid);
-void resume_wait_recovery_requests(void);
-void flush_wait_obj_requests(void);
+void wakeup_requests_on_epoch(void);
+void wakeup_requests_on_oid(uint64_t oid);
+void wakeup_all_requests(void);
 void resume_suspended_recovery(void);
 
 int create_cluster(int port, int64_t zone, int nr_vnodes,
@@ -304,7 +302,6 @@ int objlist_cache_cleanup(uint32_t vid);
 int start_recovery(struct vnode_info *cur_vinfo, struct vnode_info *old_vinfo);
 void resume_recovery_work(void);
 bool oid_in_recovery(uint64_t oid);
-bool is_recovery_init(void);
 bool node_in_recovery(void);
 
 int read_backend_object(uint64_t oid, char *data, unsigned int datalen,
-- 
1.7.9.5




More information about the sheepdog mailing list