[sheepdog] [PATCH v2 2/3] sheep: introduce async request handling
Liu Yuan
namei.unix at gmail.com
Tue Dec 17 07:31:57 CET 2013
This patch introduces async request framework that has the following usage
patern:
iocb = local_req_init();
loop:
...
exec_local_req_async(req, iocb);
...
ret = local_req_wait(iocb);
wait until all the reqs finish, if any req fails, just return it from
local_req_wait();
Request throttling should be handled by the async user.
Signed-off-by: Liu Yuan <namei.unix at gmail.com>
---
sheep/request.c | 85 ++++++++++++++++++++++++++++++++++++++++++++++++----
sheep/sheep.c | 2 +-
sheep/sheep_priv.h | 13 +++++++-
3 files changed, 92 insertions(+), 8 deletions(-)
diff --git a/sheep/request.c b/sheep/request.c
index 97c95ae..c542221 100644
--- a/sheep/request.c
+++ b/sheep/request.c
@@ -499,6 +499,15 @@ static void free_local_request(struct request *req)
free(req);
}
+static void submit_local_request(struct request *req)
+{
+ pthread_mutex_lock(&sys->local_req_lock);
+ list_add_tail(&req->request_list, &sys->local_req_queue);
+ pthread_mutex_unlock(&sys->local_req_lock);
+
+ eventfd_xwrite(sys->local_req_efd, 1);
+}
+
/*
* Exec the request locally and synchronously.
*
@@ -514,16 +523,13 @@ worker_fn int exec_local_req(struct sd_req *rq, void *data)
req->rq = *rq;
req->local_req_efd = eventfd(0, 0);
if (req->local_req_efd < 0) {
+ sd_err("eventfd failed, %m");
/* Fake the result to ask for retry */
req->rp.result = SD_RES_NETWORK_ERROR;
goto out;
}
- pthread_mutex_lock(&sys->local_req_lock);
- list_add_tail(&req->request_list, &sys->local_req_queue);
- pthread_mutex_unlock(&sys->local_req_lock);
-
- eventfd_xwrite(sys->local_req_efd, 1);
+ submit_local_request(req);
eventfd_xread(req->local_req_efd);
out:
/* fill rq with response header as exec_req does */
@@ -536,6 +542,73 @@ out:
return ret;
}
+static void local_req_async_handler(int fd, int events, void *data)
+{
+ struct request *req = data;
+ struct request_iocb *iocb = req->iocb;
+
+ if (events & EPOLLERR)
+ sd_err("request handler error");
+ eventfd_xread(fd);
+
+ if (unlikely(req->rp.result != SD_RES_SUCCESS))
+ iocb->result = req->rp.result;
+
+ if (uatomic_sub_return(&iocb->count, 1) == 0)
+ eventfd_xwrite(iocb->efd, 1);
+
+ unregister_event(req->local_req_efd);
+ close(req->local_req_efd);
+ free_local_request(req);
+}
+
+worker_fn struct request_iocb *local_req_init(void)
+{
+ struct request_iocb *iocb = xzalloc(sizeof(*iocb));
+
+ iocb->efd = eventfd(0, 0);
+ if (iocb->efd < 0) {
+ sd_err("eventfd failed, %m");
+ free(iocb);
+ return NULL;
+ }
+ iocb->result = SD_RES_SUCCESS;
+ return iocb;
+}
+
+worker_fn int local_req_wait(struct request_iocb *iocb)
+{
+ int ret;
+
+ eventfd_xread(iocb->efd);
+
+ ret = iocb->result;
+ close(iocb->efd);
+ free(iocb);
+ return ret;
+}
+
+worker_fn int exec_local_req_async(struct sd_req *rq, void *data,
+ struct request_iocb *iocb)
+{
+ struct request *req;
+
+ req = alloc_local_request(data, rq->data_length);
+ req->rq = *rq;
+ req->local_req_efd = eventfd(0, EFD_NONBLOCK);
+ if (req->local_req_efd < 0) {
+ sd_err("eventfd failed, %m");
+ return SD_RES_SYSTEM_ERROR;
+ }
+
+ uatomic_inc(&iocb->count);
+ req->iocb = iocb;
+ register_event(req->local_req_efd, local_req_async_handler, req);
+ submit_local_request(req);
+
+ return SD_RES_SUCCESS;
+}
+
static struct request *alloc_request(struct client_info *ci, int data_length)
{
struct request *req;
@@ -903,7 +976,7 @@ static void local_req_handler(int listen_fd, int events, void *data)
}
}
-void local_req_init(void)
+void local_request_init(void)
{
pthread_mutex_init(&sys->local_req_lock, NULL);
sys->local_req_efd = eventfd(0, EFD_NONBLOCK);
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 9dff267..e1d4a39 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -801,7 +801,7 @@ int main(int argc, char **argv)
if (ret)
exit(1);
- local_req_init();
+ local_request_init();
ret = init_signal();
if (ret)
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 685aeb3..92b72f7 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -84,6 +84,12 @@ enum REQUST_STATUS {
REQUEST_DROPPED
};
+struct request_iocb {
+ uint32_t count;
+ int efd;
+ int result;
+};
+
struct request {
struct sd_req rq;
struct sd_rsp rp;
@@ -100,6 +106,7 @@ struct request {
refcnt_t refcnt;
bool local;
int local_req_efd;
+ struct request_iocb *iocb;
uint64_t local_oid;
@@ -367,8 +374,12 @@ int sd_read_object(uint64_t oid, char *data, unsigned int datalen,
uint64_t offset);
int sd_remove_object(uint64_t oid);
+struct request_iocb *local_req_init(void);
int exec_local_req(struct sd_req *rq, void *data);
-void local_req_init(void);
+int exec_local_req_async(struct sd_req *rq, void *, struct request_iocb *);
+int local_req_wait(struct request_iocb *iocb);
+
+void local_request_init(void);
int prealloc(int fd, uint32_t size);
--
1.7.9.5
More information about the sheepdog
mailing list