[sheepdog] [PATCH v2 2/3] sheep: introduce async request handling

Liu Yuan namei.unix at gmail.com
Tue Dec 17 07:31:57 CET 2013


This patch introduces an async request framework with the following usage
pattern:

iocb = local_req_init();

loop:
    ...
    exec_local_req_async(req, data, iocb);
    ...

ret = local_req_wait(iocb);

local_req_wait() waits until all the requests finish. If any request fails,
the error result of a failed request is returned from local_req_wait().

Request throttling should be handled by the async user.

Signed-off-by: Liu Yuan <namei.unix at gmail.com>
---
 sheep/request.c    |   85 ++++++++++++++++++++++++++++++++++++++++++++++++----
 sheep/sheep.c      |    2 +-
 sheep/sheep_priv.h |   13 +++++++-
 3 files changed, 92 insertions(+), 8 deletions(-)

diff --git a/sheep/request.c b/sheep/request.c
index 97c95ae..c542221 100644
--- a/sheep/request.c
+++ b/sheep/request.c
@@ -499,6 +499,15 @@ static void free_local_request(struct request *req)
 	free(req);
 }
 
+static void submit_local_request(struct request *req)
+{
+	pthread_mutex_lock(&sys->local_req_lock);
+	list_add_tail(&req->request_list, &sys->local_req_queue);
+	pthread_mutex_unlock(&sys->local_req_lock);
+
+	eventfd_xwrite(sys->local_req_efd, 1);
+}
+
 /*
  * Exec the request locally and synchronously.
  *
@@ -514,16 +523,13 @@ worker_fn int exec_local_req(struct sd_req *rq, void *data)
 	req->rq = *rq;
 	req->local_req_efd = eventfd(0, 0);
 	if (req->local_req_efd < 0) {
+		sd_err("eventfd failed, %m");
 		/* Fake the result to ask for retry */
 		req->rp.result = SD_RES_NETWORK_ERROR;
 		goto out;
 	}
 
-	pthread_mutex_lock(&sys->local_req_lock);
-	list_add_tail(&req->request_list, &sys->local_req_queue);
-	pthread_mutex_unlock(&sys->local_req_lock);
-
-	eventfd_xwrite(sys->local_req_efd, 1);
+	submit_local_request(req);
 	eventfd_xread(req->local_req_efd);
 out:
 	/* fill rq with response header as exec_req does */
@@ -536,6 +542,73 @@ out:
 	return ret;
 }
 
+static void local_req_async_handler(int fd, int events, void *data)
+{
+	struct request *req = data;
+	struct request_iocb *iocb = req->iocb;
+
+	if (events & EPOLLERR)
+		sd_err("request handler error");
+	eventfd_xread(fd);
+
+	if (unlikely(req->rp.result != SD_RES_SUCCESS))
+		iocb->result = req->rp.result;
+
+	if (uatomic_sub_return(&iocb->count, 1) == 0)
+		eventfd_xwrite(iocb->efd, 1);
+
+	unregister_event(req->local_req_efd);
+	close(req->local_req_efd);
+	free_local_request(req);
+}
+
+worker_fn struct request_iocb *local_req_init(void)
+{
+	struct request_iocb *iocb = xzalloc(sizeof(*iocb));
+
+	iocb->efd = eventfd(0, 0);
+	if (iocb->efd < 0) {
+		sd_err("eventfd failed, %m");
+		free(iocb);
+		return NULL;
+	}
+	iocb->result = SD_RES_SUCCESS;
+	return iocb;
+}
+
+worker_fn int local_req_wait(struct request_iocb *iocb)
+{
+	int ret;
+
+	eventfd_xread(iocb->efd);
+
+	ret = iocb->result;
+	close(iocb->efd);
+	free(iocb);
+	return ret;
+}
+
+worker_fn int exec_local_req_async(struct sd_req *rq, void *data,
+				   struct request_iocb *iocb)
+{
+	struct request *req;
+
+	req = alloc_local_request(data, rq->data_length);
+	req->rq = *rq;
+	req->local_req_efd = eventfd(0, EFD_NONBLOCK);
+	if (req->local_req_efd < 0) {
+		sd_err("eventfd failed, %m");
+		return SD_RES_SYSTEM_ERROR;
+	}
+
+	uatomic_inc(&iocb->count);
+	req->iocb = iocb;
+	register_event(req->local_req_efd, local_req_async_handler, req);
+	submit_local_request(req);
+
+	return SD_RES_SUCCESS;
+}
+
 static struct request *alloc_request(struct client_info *ci, int data_length)
 {
 	struct request *req;
@@ -903,7 +976,7 @@ static void local_req_handler(int listen_fd, int events, void *data)
 	}
 }
 
-void local_req_init(void)
+void local_request_init(void)
 {
 	pthread_mutex_init(&sys->local_req_lock, NULL);
 	sys->local_req_efd = eventfd(0, EFD_NONBLOCK);
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 9dff267..e1d4a39 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -801,7 +801,7 @@ int main(int argc, char **argv)
 	if (ret)
 		exit(1);
 
-	local_req_init();
+	local_request_init();
 
 	ret = init_signal();
 	if (ret)
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 685aeb3..92b72f7 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -84,6 +84,12 @@ enum REQUST_STATUS {
 	REQUEST_DROPPED
 };
 
+struct request_iocb {
+	uint32_t count;
+	int efd;
+	int result;
+};
+
 struct request {
 	struct sd_req rq;
 	struct sd_rsp rp;
@@ -100,6 +106,7 @@ struct request {
 	refcnt_t refcnt;
 	bool local;
 	int local_req_efd;
+	struct request_iocb *iocb;
 
 	uint64_t local_oid;
 
@@ -367,8 +374,12 @@ int sd_read_object(uint64_t oid, char *data, unsigned int datalen,
 		   uint64_t offset);
 int sd_remove_object(uint64_t oid);
 
+struct request_iocb *local_req_init(void);
 int exec_local_req(struct sd_req *rq, void *data);
-void local_req_init(void);
+int exec_local_req_async(struct sd_req *rq, void *, struct request_iocb *);
+int local_req_wait(struct request_iocb *iocb);
+
+void local_request_init(void);
 
 int prealloc(int fd, uint32_t size);
 
-- 
1.7.9.5




More information about the sheepdog mailing list