[sheepdog] [PATCH UPDATE] sheep: queue local gateway request instead of directly calling forward_*_obj_req

levin li levin108 at gmail.com
Tue Jun 26 12:45:05 CEST 2012


From: levin li <xingke.lwp at taobao.com>

In read_object() and write_object() we call forward_*_obj_req() directly
to forward the request to peer nodes, and remove_object() sends the
request to each node itself, so none of them has the retry mechanism
that the gateway provides. Queue a local gateway request instead, so
that these routines take advantage of the gateway's retry mechanism.

Signed-off-by: levin li <xingke.lwp at taobao.com>
---
 sheep/group.c      |    1 +
 sheep/sdnet.c      |  101 ++++++++++++++++++++++++++++++++++++++++++++++++----
 sheep/sheep.c      |    2 ++
 sheep/sheep_priv.h |    9 +++++
 sheep/store.c      |  101 +++++++++++++++-------------------------------------
 5 files changed, 136 insertions(+), 78 deletions(-)

diff --git a/sheep/group.c b/sheep/group.c
index b448809..2431c97 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -1161,6 +1161,7 @@ int create_cluster(int port, int64_t zone, int nr_vnodes,
 
 	INIT_LIST_HEAD(&sys->blocking_conn_list);
 
+	INIT_LIST_HEAD(&sys->wait_req_queue);
 	INIT_LIST_HEAD(&sys->wait_rw_queue);
 	INIT_LIST_HEAD(&sys->wait_obj_queue);
 
diff --git a/sheep/sdnet.c b/sheep/sdnet.c
index 34d65cf..8b2a460 100644
--- a/sheep/sdnet.c
+++ b/sheep/sdnet.c
@@ -13,6 +13,9 @@
 #include <stdlib.h>
 #include <unistd.h>
 #include <netdb.h>
+#include <errno.h>
+#include <pthread.h>
+#include <sys/eventfd.h>
 #include <arpa/inet.h>
 #include <netinet/tcp.h>
 #include <sys/epoll.h>
@@ -384,6 +387,98 @@ static void requeue_request(struct request *req)
 static void client_incref(struct client_info *ci);
 static void client_decref(struct client_info *ci);
 
+/*
+ * Allocate a request that originates inside this daemon rather than on a
+ * client connection.  @data is borrowed: the caller keeps ownership and
+ * must keep the buffer alive until the request completes.
+ *
+ * Outstanding-request accounting is intentionally not done here: this may
+ * run in a worker thread, while the counters are updated without locking
+ * elsewhere.  req_handler() does the accounting on the event loop.
+ *
+ * Returns NULL if the request cannot be allocated.
+ */
+static struct request *alloc_local_request(void *data, int data_length)
+{
+	struct request *req;
+
+	req = zalloc(sizeof(struct request));
+	if (!req)
+		return NULL;
+
+	if (data_length) {
+		req->data_length = data_length;
+		req->data = data;
+	}
+	req->local = 1;
+	INIT_LIST_HEAD(&req->request_list);
+
+	return req;
+}
+
+/*
+ * Queue a request on the local gateway and wait for it to complete, so
+ * that internally generated I/O gets the same retry handling as client
+ * requests.
+ *
+ * @rq:          request header; copied, so it may live on the caller's stack.
+ * @data:        payload or result buffer; borrowed until this returns.
+ * @data_length: length of @data in bytes; also stored in the header.
+ *
+ * Returns the result code the request completed with (req->rp.result), or
+ * an SD_RES_* error if the request could not be issued or waited for.
+ *
+ * Blocks until req_done() signals wait_efd, so it must not be called from
+ * the thread that runs req_handler() and req_done(), or it will deadlock.
+ */
+int exec_local_req(struct sd_req *rq, void *data, int data_length)
+{
+	struct request *req;
+	eventfd_t value = 1;
+	int ret;
+
+	req = alloc_local_request(data, data_length);
+	if (!req) {
+		eprintf("failed to allocate a local request\n");
+		return SD_RES_NO_MEM;
+	}
+	req->rq = *rq;
+	req->rq.data_length = data_length;
+
+	req->wait_efd = eventfd(0, 0);
+	if (req->wait_efd < 0) {
+		eprintf("failed to create an event fd: %m\n");
+		free(req);
+		return SD_RES_SYSTEM_ERROR;
+	}
+
+	pthread_mutex_lock(&sys->wait_req_lock);
+	list_add_tail(&req->request_list, &sys->wait_req_queue);
+	eventfd_write(sys->req_efd, value);
+	pthread_mutex_unlock(&sys->wait_req_lock);
+
+	/*
+	 * From here on req is owned by the event loop until req_done() writes
+	 * wait_efd; freeing it any earlier would leave a dangling pointer.
+	 * Retry the wait if a signal interrupts it.
+	 */
+	do {
+		ret = eventfd_read(req->wait_efd, &value);
+	} while (ret < 0 && errno == EINTR);
+
+	if (ret < 0) {
+		/* req may still be in flight: leak it rather than free it */
+		eprintf("event fd read error %m\n");
+		return SD_RES_SYSTEM_ERROR;
+	}
+
+	close(req->wait_efd);
+	ret = req->rp.result;
+	free(req);
+
+	return ret;
+}
+
 static struct request *alloc_request(struct client_info *ci, int data_length)
 {
 	struct request *req;
@@ -424,15 +519,30 @@ static void free_request(struct request *req)
 void req_done(struct request *req)
 {
 	struct client_info *ci = req->ci;
+	eventfd_t value = 1;
 
-	if (conn_tx_on(&ci->conn)) {
-		dprintf("connection seems to be dead\n");
-		free_request(req);
+	if (req->local) {
+		/*
+		 * Undo req_handler()'s accounting before waking the waiter:
+		 * once wait_efd is written, exec_local_req() may free req.
+		 */
+		req->done = 1;
+		sys->nr_outstanding_reqs--;
+		sys->outstanding_data_size -= req->data_length;
+
+		if (eventfd_write(req->wait_efd, value) < 0)
+			eprintf("failed to wake up local request waiter: %m\n");
 	} else {
-		list_add(&req->request_list, &ci->done_reqs);
-	}
+		/* client request: queue it on the connection's done list */
+		if (conn_tx_on(&ci->conn)) {
+			dprintf("connection seems to be dead\n");
+			free_request(req);
+		} else {
+			list_add(&req->request_list, &ci->done_reqs);
+		}
 
-	client_decref(ci);
+		client_decref(ci);
+	}
 }
 
 static void init_rx_hdr(struct client_info *ci)
@@ -805,3 +915,62 @@ int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx, uint32_t epoch)
 
 	return fd;
 }
+
+/*
+ * Event-loop handler for sys->req_efd: move every request queued by
+ * exec_local_req() onto the normal request path.
+ */
+static void req_handler(int listen_fd, int events, void *data)
+{
+	eventfd_t value;
+	struct request *req, *t;
+	LIST_HEAD(pending_list);
+	int ret;
+
+	if (events & EPOLLERR)
+		eprintf("request handler error\n");
+
+	ret = eventfd_read(listen_fd, &value);
+	if (ret < 0)
+		return;
+
+	/* Take the whole queue at once so the lock is held only briefly. */
+	pthread_mutex_lock(&sys->wait_req_lock);
+	list_splice_init(&sys->wait_req_queue, &pending_list);
+	pthread_mutex_unlock(&sys->wait_req_lock);
+
+	list_for_each_entry_safe(req, t, &pending_list, request_list) {
+		list_del(&req->request_list);
+
+		/*
+		 * Account on the event loop, not in exec_local_req(), which may
+		 * run in a worker thread; req_done() undoes this and is expected
+		 * to run on this same thread.
+		 */
+		sys->nr_outstanding_reqs++;
+		sys->outstanding_data_size += req->data_length;
+
+		queue_request(req);
+	}
+}
+
+/*
+ * Set up the lock and eventfd used to hand locally generated requests
+ * to the event loop.  exec_local_req() cannot work without them, so any
+ * failure here is fatal.
+ */
+void local_req_init(void)
+{
+	pthread_mutex_init(&sys->wait_req_lock, NULL);
+
+	sys->req_efd = eventfd(0, EFD_NONBLOCK);
+	if (sys->req_efd < 0) {
+		eprintf("failed to create an event fd: %m\n");
+		exit(1);
+	}
+
+	if (register_event(sys->req_efd, req_handler, NULL) < 0) {
+		eprintf("failed to register the local request handler\n");
+		exit(1);
+	}
+}
diff --git a/sheep/sheep.c b/sheep/sheep.c
index a2cd43e..6e4c317 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -275,6 +275,8 @@ int main(int argc, char **argv)
 		exit(1);
 	}
 
+	local_req_init();
+
 	sys->gateway_wqueue = init_work_queue("gateway", nr_gateway_worker);
 	sys->io_wqueue = init_work_queue("io", nr_io_worker);
 	sys->recovery_wqueue = init_work_queue("recovery", 1);
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index afdaad8..21fd7ff 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -73,6 +73,10 @@ struct request {
 	struct list_head request_list;
 	struct list_head pending_list;
 
+	int local;
+	int done;
+	int wait_efd;
+
 	uint64_t local_oid;
 
 	struct vnode_info *vnodes;
@@ -118,7 +122,10 @@ struct cluster_info {
 	struct list_head blocking_conn_list;
 
 	int nr_copies;
+	int req_efd;
 
+	pthread_mutex_t wait_req_lock;
+	struct list_head wait_req_queue;
 	struct list_head wait_rw_queue;
 	struct list_head wait_obj_queue;
 	int nr_outstanding_reqs;
@@ -294,6 +301,8 @@ int remove_object(struct vnode_info *vnodes, uint32_t node_version,
 
 void del_sheep_fd(int fd);
 int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx, uint32_t epoch);
+int exec_local_req(struct sd_req *rq, void *data, int data_length);
+void local_req_init(void);
 
 int prealloc(int fd, uint32_t size);
 
diff --git a/sheep/store.c b/sheep/store.c
index d5a080b..3378c86 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -517,8 +517,7 @@ int write_object(struct vnode_info *vnodes, uint32_t epoch,
 		 uint64_t oid, char *data, unsigned int datalen,
 		 uint64_t offset, uint16_t flags, int nr_copies, int create)
 {
-	struct request write_req;
-	struct sd_req *hdr = &write_req.rq;
+	struct sd_req hdr;
 	int ret;
 
 	if (sys->enable_write_cache && object_is_cached(oid)) {
@@ -531,23 +530,18 @@ int write_object(struct vnode_info *vnodes, uint32_t epoch,
 		}
 	}
 
-	memset(&write_req, 0, sizeof(write_req));
-	hdr->opcode = create ? SD_OP_CREATE_AND_WRITE_OBJ : SD_OP_WRITE_OBJ;
-	hdr->flags = SD_FLAG_CMD_WRITE;
-	hdr->data_length = datalen;
-	hdr->epoch = epoch;
+	memset(&hdr, 0, sizeof(hdr));
+	hdr.opcode = create ? SD_OP_CREATE_AND_WRITE_OBJ : SD_OP_WRITE_OBJ;
+	hdr.flags = SD_FLAG_CMD_WRITE;
 
-	hdr->obj.oid = oid;
-	hdr->obj.offset = offset;
-	hdr->obj.copies = nr_copies;
+	hdr.obj.oid = oid;
+	hdr.obj.offset = offset;
+	hdr.obj.copies = nr_copies;
 
-	write_req.data = data;
-	write_req.op = get_sd_op(hdr->opcode);
-	write_req.vnodes = vnodes;
-
-	ret = forward_write_obj_req(&write_req);
+	ret = exec_local_req(&hdr, data, datalen);
 	if (ret != SD_RES_SUCCESS)
-		eprintf("failed to forward write object %x\n", ret);
+		eprintf("failed to write object %" PRIx64 ", %x\n", oid, ret);
+
 	return ret;
 }
 
@@ -559,8 +553,7 @@ int read_object(struct vnode_info *vnodes, uint32_t epoch,
 		uint64_t oid, char *data, unsigned int datalen,
 		uint64_t offset, int nr_copies)
 {
-	struct request read_req;
-	struct sd_req *hdr = &read_req.rq;
+	struct sd_req hdr;
 	int ret;
 
 	if (sys->enable_write_cache && object_is_cached(oid)) {
@@ -573,23 +566,17 @@ int read_object(struct vnode_info *vnodes, uint32_t epoch,
 		}
 		return ret;
 	}
-	memset(&read_req, 0, sizeof(read_req));
-forward_read:
-	hdr->opcode = SD_OP_READ_OBJ;
-	hdr->data_length = datalen;
-	hdr->epoch = epoch;
-
-	hdr->obj.oid = oid;
-	hdr->obj.offset = offset;
-	hdr->obj.copies = nr_copies;
 
-	read_req.data = data;
-	read_req.op = get_sd_op(hdr->opcode);
-	read_req.vnodes = vnodes;
+forward_read:
+	memset(&hdr, 0, sizeof(hdr));
+	hdr.opcode = SD_OP_READ_OBJ;
+	hdr.obj.oid = oid;
+	hdr.obj.offset = offset;
+	hdr.obj.copies = nr_copies;
 
-	ret = forward_read_obj_req(&read_req);
+	ret = exec_local_req(&hdr, data, datalen);
 	if (ret != SD_RES_SUCCESS)
-		eprintf("failed to forward read object %x\n", ret);
+		eprintf("failed to read object %" PRIx64 ", %x\n", oid, ret);
 
 	return ret;
 }
@@ -597,49 +584,19 @@ forward_read:
 int remove_object(struct vnode_info *vnodes, uint32_t epoch,
 		  uint64_t oid, int nr)
 {
-	struct sd_vnode *obj_vnodes[SD_MAX_COPIES];
-	int err = 0, i = 0;
-
-	oid_to_vnodes(vnodes, oid, nr, obj_vnodes);
-	for (i = 0; i < nr; i++) {
-		struct sd_req hdr;
-		struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
-		struct sd_vnode *v;
-		unsigned wlen = 0, rlen = 0;
-		char name[128];
-		int fd, ret;
-
-		v = obj_vnodes[i];
-		addr_to_str(name, sizeof(name), v->addr, 0);
-
-		fd = connect_to(name, v->port);
-		if (fd < 0) {
-			rsp->result = SD_RES_NETWORK_ERROR;
-			return -1;
-		}
-
-		memset(&hdr, 0, sizeof(hdr));
-		hdr.epoch = epoch;
-		hdr.opcode = SD_OP_REMOVE_OBJ;
-		hdr.flags = 0;
-		hdr.data_length = rlen;
-
-		hdr.obj.oid = oid;
-
-		ret = exec_req(fd, &hdr, NULL, &wlen, &rlen);
-		close(fd);
-
-		if (ret)
-			return -1;
+	struct sd_req hdr;	/* goes through the local gateway, no longer to each node directly */
+	int ret;	/* NOTE(review): vnodes and epoch are no longer used */
 
-		if (rsp->result != SD_RES_SUCCESS)
-			err = 1;
-	}
+	memset(&hdr, 0, sizeof(hdr));
+	hdr.opcode = SD_OP_REMOVE_OBJ;
+	hdr.obj.oid = oid;
+	hdr.obj.copies = nr;	/* replica count, previously passed to oid_to_vnodes() */
 
-	if (err)
-		return -1;
+	ret = exec_local_req(&hdr, NULL, 0);	/* blocks until the request completes */
+	if (ret != SD_RES_SUCCESS)
+		eprintf("failed to remove object %" PRIx64 ", %x\n", oid, ret);
 
-	return 0;
+	return ret;	/* NOTE(review): SD_RES_* code now, was 0/-1 before; check callers */
 }
 
 int set_cluster_copies(uint8_t copies)
-- 
1.7.10




More information about the sheepdog mailing list