From: levin li <xingke.lwp at taobao.com> In read/write/remove_object(), we directly call forward_*_obj_req() to forward the request to peer nodes, but without any retrying mechanism as the gateway does, so we should queue a local gateway request for this routine to make it take advantage of the retrying mechanism of gateway. Signed-off-by: levin li <xingke.lwp at taobao.com> --- sheep/group.c | 1 + sheep/sdnet.c | 101 ++++++++++++++++++++++++++++++++++++++++++++++++---- sheep/sheep.c | 2 ++ sheep/sheep_priv.h | 9 +++++ sheep/store.c | 101 +++++++++++++++------------------------------------- 5 files changed, 136 insertions(+), 78 deletions(-) diff --git a/sheep/group.c b/sheep/group.c index b448809..2431c97 100644 --- a/sheep/group.c +++ b/sheep/group.c @@ -1161,6 +1161,7 @@ int create_cluster(int port, int64_t zone, int nr_vnodes, INIT_LIST_HEAD(&sys->blocking_conn_list); + INIT_LIST_HEAD(&sys->wait_req_queue); INIT_LIST_HEAD(&sys->wait_rw_queue); INIT_LIST_HEAD(&sys->wait_obj_queue); diff --git a/sheep/sdnet.c b/sheep/sdnet.c index 34d65cf..8b2a460 100644 --- a/sheep/sdnet.c +++ b/sheep/sdnet.c @@ -13,6 +13,8 @@ #include <stdlib.h> #include <unistd.h> #include <netdb.h> +#include <pthread.h> +#include <sys/eventfd.h> #include <arpa/inet.h> #include <netinet/tcp.h> #include <sys/epoll.h> @@ -384,6 +386,53 @@ static void requeue_request(struct request *req) static void client_incref(struct client_info *ci); static void client_decref(struct client_info *ci); +static struct request *alloc_local_request(void *data, int data_length) +{ + struct request *req; + + req = zalloc(sizeof(struct request)); + if (data_length) { + req->data_length = data_length; + req->data = data; + } + + req->local = 1; + + INIT_LIST_HEAD(&req->request_list); + + sys->nr_outstanding_reqs++; + sys->outstanding_data_size += data_length; + + return req; +} + +int exec_local_req(struct sd_req *rq, void *data, int data_length) +{ + struct request *req; + eventfd_t value = 1; + int ret; + + req =
alloc_local_request(data, data_length); + req->rq = *rq; + req->rq.data_length = data_length; + req->wait_efd = eventfd(0, 0); + + pthread_mutex_lock(&sys->wait_req_lock); + list_add_tail(&req->request_list, &sys->wait_req_queue); + eventfd_write(sys->req_efd, value); + pthread_mutex_unlock(&sys->wait_req_lock); + + ret = eventfd_read(req->wait_efd, &value); + if (ret < 0) + eprintf("event fd read error %m"); + + close(req->wait_efd); + ret = req->rp.result; + free(req); + + return ret; +} + static struct request *alloc_request(struct client_info *ci, int data_length) { struct request *req; @@ -424,15 +473,24 @@ static void free_request(struct request *req) void req_done(struct request *req) { struct client_info *ci = req->ci; + eventfd_t value = 1; - if (conn_tx_on(&ci->conn)) { - dprintf("connection seems to be dead\n"); - free_request(req); + if (req->local) { + req->done = 1; + sys->nr_outstanding_reqs--; + sys->outstanding_data_size -= req->data_length; + + eventfd_write(req->wait_efd, value); } else { - list_add(&req->request_list, &ci->done_reqs); - } + if (conn_tx_on(&ci->conn)) { + dprintf("connection seems to be dead\n"); + free_request(req); + } else { + list_add(&req->request_list, &ci->done_reqs); + } - client_decref(ci); + client_decref(ci); + } } static void init_rx_hdr(struct client_info *ci) @@ -805,3 +863,34 @@ int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx, uint32_t epoch) return fd; } + +static void req_handler(int listen_fd, int events, void *data) +{ + eventfd_t value; + struct request *req, *t; + LIST_HEAD(pending_list); + int ret; + + if (events & EPOLLERR) + eprintf("request handler error\n"); + + ret = eventfd_read(listen_fd, &value); + if (ret < 0) + return; + + pthread_mutex_lock(&sys->wait_req_lock); + list_splice_init(&sys->wait_req_queue, &pending_list); + pthread_mutex_unlock(&sys->wait_req_lock); + + list_for_each_entry_safe(req, t, &pending_list, request_list) { + list_del(&req->request_list); + queue_request(req); + } 
+} + +void local_req_init(void) +{ + pthread_mutex_init(&sys->wait_req_lock, NULL); + sys->req_efd = eventfd(0, EFD_NONBLOCK); + register_event(sys->req_efd, req_handler, NULL); +} diff --git a/sheep/sheep.c b/sheep/sheep.c index a2cd43e..6e4c317 100644 --- a/sheep/sheep.c +++ b/sheep/sheep.c @@ -275,6 +275,8 @@ int main(int argc, char **argv) exit(1); } + local_req_init(); + sys->gateway_wqueue = init_work_queue("gateway", nr_gateway_worker); sys->io_wqueue = init_work_queue("io", nr_io_worker); sys->recovery_wqueue = init_work_queue("recovery", 1); diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h index afdaad8..21fd7ff 100644 --- a/sheep/sheep_priv.h +++ b/sheep/sheep_priv.h @@ -73,6 +73,10 @@ struct request { struct list_head request_list; struct list_head pending_list; + int local; + int done; + int wait_efd; + uint64_t local_oid; struct vnode_info *vnodes; @@ -118,7 +122,10 @@ struct cluster_info { struct list_head blocking_conn_list; int nr_copies; + int req_efd; + pthread_mutex_t wait_req_lock; + struct list_head wait_req_queue; struct list_head wait_rw_queue; struct list_head wait_obj_queue; int nr_outstanding_reqs; @@ -294,6 +301,8 @@ int remove_object(struct vnode_info *vnodes, uint32_t node_version, void del_sheep_fd(int fd); int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx, uint32_t epoch); +int exec_local_req(struct sd_req *rq, void *data, int data_length); +void local_req_init(void); int prealloc(int fd, uint32_t size); diff --git a/sheep/store.c b/sheep/store.c index d5a080b..3378c86 100644 --- a/sheep/store.c +++ b/sheep/store.c @@ -517,8 +517,7 @@ int write_object(struct vnode_info *vnodes, uint32_t epoch, uint64_t oid, char *data, unsigned int datalen, uint64_t offset, uint16_t flags, int nr_copies, int create) { - struct request write_req; - struct sd_req *hdr = &write_req.rq; + struct sd_req hdr; int ret; if (sys->enable_write_cache && object_is_cached(oid)) { @@ -531,23 +530,18 @@ int write_object(struct vnode_info *vnodes, 
uint32_t epoch, } } - memset(&write_req, 0, sizeof(write_req)); - hdr->opcode = create ? SD_OP_CREATE_AND_WRITE_OBJ : SD_OP_WRITE_OBJ; - hdr->flags = SD_FLAG_CMD_WRITE; - hdr->data_length = datalen; - hdr->epoch = epoch; + memset(&hdr, 0, sizeof(hdr)); + hdr.opcode = create ? SD_OP_CREATE_AND_WRITE_OBJ : SD_OP_WRITE_OBJ; + hdr.flags = SD_FLAG_CMD_WRITE; - hdr->obj.oid = oid; - hdr->obj.offset = offset; - hdr->obj.copies = nr_copies; + hdr.obj.oid = oid; + hdr.obj.offset = offset; + hdr.obj.copies = nr_copies; - write_req.data = data; - write_req.op = get_sd_op(hdr->opcode); - write_req.vnodes = vnodes; - - ret = forward_write_obj_req(&write_req); + ret = exec_local_req(&hdr, data, datalen); if (ret != SD_RES_SUCCESS) - eprintf("failed to forward write object %x\n", ret); + eprintf("failed to write object %" PRIx64 ", %x\n", oid, ret); + return ret; } @@ -559,8 +553,7 @@ int read_object(struct vnode_info *vnodes, uint32_t epoch, uint64_t oid, char *data, unsigned int datalen, uint64_t offset, int nr_copies) { - struct request read_req; - struct sd_req *hdr = &read_req.rq; + struct sd_req hdr; int ret; if (sys->enable_write_cache && object_is_cached(oid)) { @@ -573,23 +566,17 @@ int read_object(struct vnode_info *vnodes, uint32_t epoch, } return ret; } - memset(&read_req, 0, sizeof(read_req)); -forward_read: - hdr->opcode = SD_OP_READ_OBJ; - hdr->data_length = datalen; - hdr->epoch = epoch; - - hdr->obj.oid = oid; - hdr->obj.offset = offset; - hdr->obj.copies = nr_copies; - read_req.data = data; - read_req.op = get_sd_op(hdr->opcode); - read_req.vnodes = vnodes; +forward_read: + memset(&hdr, 0, sizeof(hdr)); + hdr.opcode = SD_OP_READ_OBJ; + hdr.obj.oid = oid; + hdr.obj.offset = offset; + hdr.obj.copies = nr_copies; - ret = forward_read_obj_req(&read_req); + ret = exec_local_req(&hdr, data, datalen); if (ret != SD_RES_SUCCESS) - eprintf("failed to forward read object %x\n", ret); + eprintf("failed to read object %" PRIx64 ", %x\n", oid, ret); return ret; } @@ -597,49 
+584,19 @@ forward_read: int remove_object(struct vnode_info *vnodes, uint32_t epoch, uint64_t oid, int nr) { - struct sd_vnode *obj_vnodes[SD_MAX_COPIES]; - int err = 0, i = 0; - - oid_to_vnodes(vnodes, oid, nr, obj_vnodes); - for (i = 0; i < nr; i++) { - struct sd_req hdr; - struct sd_rsp *rsp = (struct sd_rsp *)&hdr; - struct sd_vnode *v; - unsigned wlen = 0, rlen = 0; - char name[128]; - int fd, ret; - - v = obj_vnodes[i]; - addr_to_str(name, sizeof(name), v->addr, 0); - - fd = connect_to(name, v->port); - if (fd < 0) { - rsp->result = SD_RES_NETWORK_ERROR; - return -1; - } - - memset(&hdr, 0, sizeof(hdr)); - hdr.epoch = epoch; - hdr.opcode = SD_OP_REMOVE_OBJ; - hdr.flags = 0; - hdr.data_length = rlen; - - hdr.obj.oid = oid; - - ret = exec_req(fd, &hdr, NULL, &wlen, &rlen); - close(fd); - - if (ret) - return -1; + struct sd_req hdr; + int ret; - if (rsp->result != SD_RES_SUCCESS) - err = 1; - } + memset(&hdr, 0, sizeof(hdr)); + hdr.opcode = SD_OP_REMOVE_OBJ; + hdr.obj.oid = oid; + hdr.obj.copies = nr; - if (err) - return -1; + ret = exec_local_req(&hdr, NULL, 0); + if (ret != SD_RES_SUCCESS) + eprintf("failed to remove object %" PRIx64 ", %x\n", oid, ret); - return 0; + return ret; } int set_cluster_copies(uint8_t copies) -- 1.7.10 |