[sheepdog] [PATCH v2 7/7] sheep: rename sdnet.c to request.c

Liu Yuan namei.unix at gmail.com
Thu Jul 19 04:02:45 CEST 2012


From: Liu Yuan <tailai.ly at taobao.com>

For sheep itself, "sd" as a file name prefix means nothing. Renaming the file
to request.c gives us more consistent file naming.

Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
 sheep/Makefile.am |    2 +-
 sheep/request.c   |  878 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 sheep/sdnet.c     |  878 -----------------------------------------------------
 3 files changed, 879 insertions(+), 879 deletions(-)
 create mode 100644 sheep/request.c
 delete mode 100644 sheep/sdnet.c

diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index f8017b4..16c79f0 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -24,7 +24,7 @@ INCLUDES		= -I$(top_builddir)/include -I$(top_srcdir)/include \
 
 sbin_PROGRAMS		= sheep
 
-sheep_SOURCES		= sheep.c group.c sdnet.c gateway.c store.c vdi.c work.c \
+sheep_SOURCES		= sheep.c group.c request.c gateway.c store.c vdi.c work.c \
 			  journal.c ops.c recovery.c cluster/local.c \
 			  object_cache.c object_list_cache.c sockfd_cache.c
 
diff --git a/sheep/request.c b/sheep/request.c
new file mode 100644
index 0000000..35ac488
--- /dev/null
+++ b/sheep/request.c
@@ -0,0 +1,878 @@
+/*
+ * Copyright (C) 2009-2011 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <assert.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <netdb.h>
+#include <pthread.h>
+#include <sys/eventfd.h>
+#include <arpa/inet.h>
+#include <netinet/tcp.h>
+#include <sys/epoll.h>
+#include <fcntl.h>
+
+#include "sheep_priv.h"
+
+static void requeue_request(struct request *req);
+
+static int is_access_local(struct request *req, uint64_t oid)
+{
+	struct sd_vnode *obj_vnodes[SD_MAX_COPIES];
+	int nr_copies;
+	int i;
+
+	nr_copies = get_nr_copies(req->vnodes);
+	oid_to_vnodes(req->vnodes, oid, nr_copies, obj_vnodes);
+
+	for (i = 0; i < nr_copies; i++) {
+		if (vnode_is_local(obj_vnodes[i]))
+			return 1;
+	}
+
+	return 0;
+}
+
+static void io_op_done(struct work *work)
+{
+	struct request *req = container_of(work, struct request, work);
+
+	if (req->rp.result == SD_RES_EIO) {
+		req->rp.result = SD_RES_NETWORK_ERROR;
+
+		eprintf("leaving sheepdog cluster\n");
+		leave_cluster();
+	}
+
+	put_request(req);
+	return;
+}
+
+static void gateway_op_done(struct work *work)
+{
+	struct request *req = container_of(work, struct request, work);
+	struct sd_req *hdr = &req->rq;
+
+	switch (req->rp.result) {
+	case SD_RES_OLD_NODE_VER:
+		if (req->rp.epoch > sys->epoch) {
+			list_add_tail(&req->request_list,
+				      &sys->wait_rw_queue);
+			/*
+			 * Gateway of this node is expected to process this
+			 * request later when epoch is lifted.
+			 */
+			return;
+		}
+		/*FALLTHRU*/
+	case SD_RES_NEW_NODE_VER:
+	case SD_RES_NETWORK_ERROR:
+	case SD_RES_WAIT_FOR_JOIN:
+	case SD_RES_WAIT_FOR_FORMAT:
+		dprintf("retrying failed I/O request "
+			"op %s result %d epoch %d, sys epoch %d\n",
+			op_name(req->op),
+			req->rp.result,
+			req->rq.epoch,
+			sys->epoch);
+		goto retry;
+	case SD_RES_EIO:
+		if (is_access_local(req, hdr->obj.oid)) {
+			eprintf("leaving sheepdog cluster\n");
+			leave_cluster();
+			goto retry;
+		}
+		break;
+	case SD_RES_SUCCESS:
+		break;
+	}
+
+	put_request(req);
+	return;
+retry:
+	requeue_request(req);
+}
+
+static void local_op_done(struct work *work)
+{
+	struct request *req = container_of(work, struct request, work);
+
+	if (has_process_main(req->op)) {
+		req->rp.result = do_process_main(req->op, &req->rq,
+						 &req->rp, req->data);
+	}
+
+	put_request(req);
+}
+
+static int check_request_epoch(struct request *req)
+{
+	if (before(req->rq.epoch, sys->epoch)) {
+		eprintf("old node version %u, %u (%s)\n",
+			sys->epoch, req->rq.epoch, op_name(req->op));
+		/* ask gateway to retry. */
+		req->rp.result = SD_RES_OLD_NODE_VER;
+		req->rp.epoch = sys->epoch;
+		put_request(req);
+		return -1;
+	} else if (after(req->rq.epoch, sys->epoch)) {
+		eprintf("new node version %u, %u (%s)\n",
+			sys->epoch, req->rq.epoch, op_name(req->op));
+
+		/* put on local wait queue, waiting for local epoch
+		   to be lifted */
+		req->rp.result = SD_RES_NEW_NODE_VER;
+		list_add_tail(&req->request_list, &sys->wait_rw_queue);
+		return -1;
+	}
+
+	return 0;
+}
+
+static bool request_in_recovery(struct request *req)
+{
+	/*
+	 * Request from recovery should go down the Farm even if
+	 * oid_in_recovery() returns true because we should also try snap
+	 * cache of the Farm and return the error code back if not found.
+	 */
+	if (oid_in_recovery(req->local_oid) &&
+	    !(req->rq.flags & SD_FLAG_CMD_RECOVERY)) {
+		/*
+		 * Put request on wait queues of local node
+		 */
+		if (is_recovery_init()) {
+			req->rp.result = SD_RES_OBJ_RECOVERING;
+			list_add_tail(&req->request_list,
+				      &sys->wait_rw_queue);
+		} else {
+			list_add_tail(&req->request_list,
+				      &sys->wait_obj_queue);
+		}
+		return true;
+	}
+	return false;
+}
+
+void resume_wait_epoch_requests(void)
+{
+	struct request *req, *t;
+	LIST_HEAD(pending_list);
+
+	list_splice_init(&sys->wait_rw_queue, &pending_list);
+
+	list_for_each_entry_safe(req, t, &pending_list, request_list) {
+		switch (req->rp.result) {
+		case SD_RES_OLD_NODE_VER:
+			/*
+			 * Gateway retries to send the request when
+			 * its epoch changes.
+			 */
+			assert(is_gateway_op(req->op));
+			req->rq.epoch = sys->epoch;
+			list_del(&req->request_list);
+			requeue_request(req);
+			break;
+		case SD_RES_NEW_NODE_VER:
+			/* Peer retries the request locally when its epoch changes. */
+			assert(!is_gateway_op(req->op));
+			list_del(&req->request_list);
+			requeue_request(req);
+			break;
+		default:
+			break;
+		}
+	}
+
+	list_splice_init(&pending_list, &sys->wait_rw_queue);
+}
+
+void resume_wait_recovery_requests(void)
+{
+	struct request *req, *t;
+	LIST_HEAD(pending_list);
+
+	list_splice_init(&sys->wait_rw_queue, &pending_list);
+
+	list_for_each_entry_safe(req, t, &pending_list, request_list) {
+		if (req->rp.result != SD_RES_OBJ_RECOVERING)
+			continue;
+
+		dprintf("resume wait oid %" PRIx64 "\n", req->local_oid);
+		list_del(&req->request_list);
+		requeue_request(req);
+	}
+
+	list_splice_init(&pending_list, &sys->wait_rw_queue);
+}
+
+void resume_wait_obj_requests(uint64_t oid)
+{
+	struct request *req, *t;
+	LIST_HEAD(pending_list);
+
+	list_splice_init(&sys->wait_obj_queue, &pending_list);
+
+	list_for_each_entry_safe(req, t, &pending_list, request_list) {
+		if (req->local_oid != oid)
+			continue;
+
+		/* the object requested by a pending request has been
+		 * recovered, notify the pending request. */
+		dprintf("retry %" PRIx64 "\n", req->local_oid);
+		list_del(&req->request_list);
+		requeue_request(req);
+	}
+	list_splice_init(&pending_list, &sys->wait_obj_queue);
+}
+
+void flush_wait_obj_requests(void)
+{
+	struct request *req, *n;
+	LIST_HEAD(pending_list);
+
+	list_splice_init(&sys->wait_obj_queue, &pending_list);
+
+	list_for_each_entry_safe(req, n, &pending_list, request_list) {
+		list_del(&req->request_list);
+		requeue_request(req);
+	}
+}
+
+static void queue_peer_request(struct request *req)
+{
+	req->local_oid = req->rq.obj.oid;
+	if (req->local_oid) {
+		if (check_request_epoch(req) < 0)
+			return;
+		if (request_in_recovery(req))
+			return;
+	}
+
+	if (req->rq.flags & SD_FLAG_CMD_RECOVERY)
+		req->rq.epoch = req->rq.obj.tgt_epoch;
+
+	req->work.fn = do_process_work;
+	req->work.done = io_op_done;
+	queue_work(sys->io_wqueue, &req->work);
+}
+
+static void queue_gateway_request(struct request *req)
+{
+	struct sd_req *hdr = &req->rq;
+
+	if (is_access_local(req, hdr->obj.oid))
+		req->local_oid = hdr->obj.oid;
+
+	/*
+	 * If we go for a cached object, we don't care if it is being recovered
+	 */
+	if (sys->enable_write_cache &&
+	    req->rq.flags & SD_FLAG_CMD_CACHE &&
+	    object_is_cached(req->rq.obj.oid))
+		goto queue_work;
+
+	if (req->local_oid)
+		if (request_in_recovery(req))
+			return;
+
+queue_work:
+	req->work.fn = do_process_work;
+	req->work.done = gateway_op_done;
+	queue_work(sys->gateway_wqueue, &req->work);
+}
+
+static void queue_local_request(struct request *req)
+{
+	req->work.fn = do_process_work;
+	req->work.done = local_op_done;
+	queue_work(sys->io_wqueue, &req->work);
+}
+
+static void queue_request(struct request *req)
+{
+	struct sd_req *hdr = &req->rq;
+	struct sd_rsp *rsp = &req->rp;
+
+	/*
+	 * Check the protocol version for all internal commands, and public
+	 * commands that have it set.  We can't enforce it on all public
+	 * ones as it isn't a mandatory part of the public protocol.
+	 */
+	if (hdr->opcode >= 0x80) {
+		if (hdr->proto_ver != SD_SHEEP_PROTO_VER) {
+			rsp->result = SD_RES_VER_MISMATCH;
+			goto done;
+		}
+	} else if (hdr->proto_ver) {
+		if (hdr->proto_ver != SD_PROTO_VER) {
+			rsp->result = SD_RES_VER_MISMATCH;
+			goto done;
+		}
+	}
+
+	req->op = get_sd_op(hdr->opcode);
+	if (!req->op) {
+		eprintf("invalid opcode %d\n", hdr->opcode);
+		rsp->result = SD_RES_INVALID_PARMS;
+		goto done;
+	}
+
+	dprintf("%s\n", op_name(req->op));
+
+	switch (sys->status) {
+	case SD_STATUS_SHUTDOWN:
+		rsp->result = SD_RES_SHUTDOWN;
+		goto done;
+	case SD_STATUS_WAIT_FOR_FORMAT:
+		if (!is_force_op(req->op)) {
+			rsp->result = SD_RES_WAIT_FOR_FORMAT;
+			goto done;
+		}
+		break;
+	case SD_STATUS_WAIT_FOR_JOIN:
+		if (!is_force_op(req->op)) {
+			rsp->result = SD_RES_WAIT_FOR_JOIN;
+			goto done;
+		}
+		break;
+	case SD_STATUS_HALT:
+		if (!is_force_op(req->op)) {
+			rsp->result = SD_RES_HALT;
+			goto done;
+		}
+		break;
+	default:
+		break;
+	}
+
+	/*
+	 * force operations shouldn't access req->vnodes in their
+	 * process_work() and process_main() because they can be
+	 * called before we set up current_vnode_info
+	 */
+	if (!is_force_op(req->op))
+		req->vnodes = get_vnode_info();
+
+	if (is_peer_op(req->op)) {
+		queue_peer_request(req);
+	} else if (is_gateway_op(req->op)) {
+		hdr->epoch = sys->epoch;
+		queue_gateway_request(req);
+	} else if (is_local_op(req->op)) {
+		hdr->epoch = sys->epoch;
+		queue_local_request(req);
+	} else if (is_cluster_op(req->op)) {
+		hdr->epoch = sys->epoch;
+		queue_cluster_request(req);
+	} else {
+		eprintf("unknown operation %d\n", hdr->opcode);
+		rsp->result = SD_RES_SYSTEM_ERROR;
+		goto done;
+	}
+
+	return;
+done:
+	put_request(req);
+}
+
+static void requeue_request(struct request *req)
+{
+	if (req->vnodes)
+		put_vnode_info(req->vnodes);
+	queue_request(req);
+}
+
+static void clear_client_info(struct client_info *ci);
+
+static struct request *alloc_local_request(void *data, int data_length)
+{
+	struct request *req;
+
+	req = xzalloc(sizeof(struct request));
+	if (data_length) {
+		req->data_length = data_length;
+		req->data = data;
+	}
+
+	req->local = 1;
+
+	INIT_LIST_HEAD(&req->request_list);
+
+	return req;
+}
+
+/*
+ * Exec the request locally and synchronously.
+ *
+ * This function takes advantage of gateway's retry mechanism.
+ */
+int exec_local_req(struct sd_req *rq, void *data)
+{
+	struct request *req;
+	eventfd_t value = 1;
+	int ret;
+
+	req = alloc_local_request(data, rq->data_length);
+	req->rq = *rq;
+	req->wait_efd = eventfd(0, 0);
+
+	pthread_mutex_lock(&sys->wait_req_lock);
+	list_add_tail(&req->request_list, &sys->wait_req_queue);
+	pthread_mutex_unlock(&sys->wait_req_lock);
+
+	eventfd_write(sys->req_efd, value);
+
+	ret = eventfd_read(req->wait_efd, &value);
+	if (ret < 0)
+		eprintf("event fd read error %m");
+
+	close(req->wait_efd);
+	ret = req->rp.result;
+	free(req);
+
+	return ret;
+}
+
+static struct request *alloc_request(struct client_info *ci, int data_length)
+{
+	struct request *req;
+
+	req = zalloc(sizeof(struct request));
+	if (!req)
+		return NULL;
+
+	req->ci = ci;
+	ci->refcnt++;
+	if (data_length) {
+		req->data_length = data_length;
+		req->data = valloc(data_length);
+		if (!req->data) {
+			free(req);
+			return NULL;
+		}
+	}
+
+	INIT_LIST_HEAD(&req->request_list);
+	uatomic_set(&req->refcnt, 1);
+
+	uatomic_inc(&sys->nr_outstanding_reqs);
+
+	return req;
+}
+
+static void free_request(struct request *req)
+{
+	uatomic_dec(&sys->nr_outstanding_reqs);
+
+	req->ci->refcnt--;
+	put_vnode_info(req->vnodes);
+	free(req->data);
+	free(req);
+}
+
+void put_request(struct request *req)
+{
+	struct client_info *ci = req->ci;
+	eventfd_t value = 1;
+
+	if (uatomic_sub_return(&req->refcnt, 1) > 0)
+		return;
+
+	if (req->local) {
+		req->done = 1;
+		eventfd_write(req->wait_efd, value);
+	} else {
+		if (conn_tx_on(&ci->conn)) {
+			free_request(req);
+			clear_client_info(ci);
+		} else {
+			list_add(&req->request_list, &ci->done_reqs);
+		}
+	}
+}
+
+static void init_rx_hdr(struct client_info *ci)
+{
+	ci->conn.c_rx_state = C_IO_HEADER;
+	ci->rx_req = NULL;
+	ci->conn.rx_length = sizeof(struct sd_req);
+	ci->conn.rx_buf = &ci->conn.rx_hdr;
+}
+
+static inline int begin_rx(struct client_info *ci)
+{
+	int ret;
+	uint64_t data_len;
+	struct connection *conn = &ci->conn;
+	struct sd_req *hdr = &conn->rx_hdr;
+	struct request *req;
+
+	switch (conn->c_rx_state) {
+	case C_IO_HEADER:
+		ret = rx(conn, C_IO_DATA_INIT);
+		if (!ret || conn->c_rx_state != C_IO_DATA_INIT)
+			break;
+	case C_IO_DATA_INIT:
+		data_len = hdr->data_length;
+
+		req = alloc_request(ci, data_len);
+		if (!req) {
+			conn->c_rx_state = C_IO_CLOSED;
+			break;
+		}
+		ci->rx_req = req;
+
+		/* use le_to_cpu */
+		memcpy(&req->rq, hdr, sizeof(req->rq));
+
+		if (data_len && hdr->flags & SD_FLAG_CMD_WRITE) {
+			conn->c_rx_state = C_IO_DATA;
+			conn->rx_length = data_len;
+			conn->rx_buf = req->data;
+		} else {
+			conn->c_rx_state = C_IO_END;
+			break;
+		}
+	case C_IO_DATA:
+		ret = rx(conn, C_IO_END);
+		break;
+	default:
+		eprintf("bug: unknown state %d\n", conn->c_rx_state);
+	}
+
+	if (is_conn_dead(conn)) {
+		clear_client_info(ci);
+		return -1;
+	}
+
+	/* Short read happens */
+	if (conn->c_rx_state != C_IO_END)
+		return -1;
+
+	return 0;
+}
+
+static inline void finish_rx(struct client_info *ci)
+{
+	struct request *req;
+	struct sd_req *hdr = &ci->conn.rx_hdr;
+
+	req = ci->rx_req;
+	init_rx_hdr(ci);
+	if (hdr->flags & SD_FLAG_CMD_WRITE)
+		req->rp.data_length = 0;
+	else
+		req->rp.data_length = hdr->data_length;
+
+	dprintf("%d, %s:%d\n", ci->conn.fd, ci->conn.ipstr, ci->conn.port);
+	queue_request(req);
+}
+
+static void do_client_rx(struct client_info *ci)
+{
+	if (begin_rx(ci) < 0)
+		return;
+
+	finish_rx(ci);
+}
+
+static void init_tx_hdr(struct client_info *ci)
+{
+	struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
+	struct request *req;
+
+	assert(!list_empty(&ci->done_reqs));
+
+	memset(rsp, 0, sizeof(*rsp));
+
+	req = list_first_entry(&ci->done_reqs, struct request, request_list);
+	list_del(&req->request_list);
+
+	ci->tx_req = req;
+	ci->conn.tx_length = sizeof(*rsp);
+	ci->conn.c_tx_state = C_IO_HEADER;
+	ci->conn.tx_buf = rsp;
+
+	/* use cpu_to_le */
+	memcpy(rsp, &req->rp, sizeof(*rsp));
+
+	rsp->epoch = sys->epoch;
+	rsp->opcode = req->rq.opcode;
+	rsp->id = req->rq.id;
+}
+
+static inline int begin_tx(struct client_info *ci)
+{
+	int ret, opt;
+	struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
+
+	/* If short send happens, we don't need init hdr */
+	if (!ci->tx_req)
+		init_tx_hdr(ci);
+
+	opt = 1;
+	setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
+
+	switch (ci->conn.c_tx_state) {
+	case C_IO_HEADER:
+		ret = tx(&ci->conn, C_IO_DATA_INIT, 0);
+		if (!ret)
+			break;
+
+		if (rsp->data_length) {
+			ci->conn.tx_length = rsp->data_length;
+			ci->conn.tx_buf = ci->tx_req->data;
+			ci->conn.c_tx_state = C_IO_DATA;
+		} else {
+			ci->conn.c_tx_state = C_IO_END;
+			break;
+		}
+	case C_IO_DATA:
+		ret = tx(&ci->conn, C_IO_END, 0);
+		if (!ret)
+			break;
+	default:
+		break;
+	}
+
+	opt = 0;
+	setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
+
+	if (is_conn_dead(&ci->conn)) {
+		clear_client_info(ci);
+		return -1;
+	}
+	return 0;
+}
+
+/* Return 1 if short send happens or we have more data to send */
+static inline int finish_tx(struct client_info *ci)
+{
+	/* Finish sending one response */
+	if (ci->conn.c_tx_state == C_IO_END) {
+		dprintf("connection from: %d, %s:%d\n", ci->conn.fd,
+			ci->conn.ipstr, ci->conn.port);
+		free_request(ci->tx_req);
+		ci->tx_req = NULL;
+	}
+	if (ci->tx_req || !list_empty(&ci->done_reqs))
+		return 1;
+	return 0;
+}
+
+static void do_client_tx(struct client_info *ci)
+{
+	if (list_empty(&ci->done_reqs)) {
+		if (conn_tx_off(&ci->conn))
+			clear_client_info(ci);
+		return;
+	}
+again:
+	if (begin_tx(ci) < 0)
+		return;
+
+	if (finish_tx(ci))
+		goto again;
+
+	/* Let's go sleep, and put_request() will wake me up */
+	if (conn_tx_off(&ci->conn))
+		clear_client_info(ci);
+}
+
+static void destroy_client(struct client_info *ci)
+{
+	dprintf("connection from: %s:%d\n", ci->conn.ipstr, ci->conn.port);
+	close(ci->conn.fd);
+	free(ci);
+}
+
+static void clear_client_info(struct client_info *ci)
+{
+	struct request *req, *t;
+
+	dprintf("connection seems to be dead\n");
+
+	if (ci->rx_req) {
+		free_request(ci->rx_req);
+		ci->rx_req = NULL;
+	}
+
+	if (ci->tx_req) {
+		free_request(ci->tx_req);
+		ci->tx_req = NULL;
+	}
+
+	list_for_each_entry_safe(req, t, &ci->done_reqs, request_list) {
+		list_del(&req->request_list);
+		free_request(req);
+	}
+
+	unregister_event(ci->conn.fd);
+
+	dprintf("refcnt:%d, fd:%d, %s:%d\n",
+		ci->refcnt, ci->conn.fd,
+		ci->conn.ipstr, ci->conn.port);
+
+	if (ci->refcnt)
+		return;
+
+	destroy_client(ci);
+}
+
+static struct client_info *create_client(int fd, struct cluster_info *cluster)
+{
+	struct client_info *ci;
+	struct sockaddr_storage from;
+	socklen_t namesize = sizeof(from);
+
+	ci = zalloc(sizeof(*ci));
+	if (!ci)
+		return NULL;
+
+	if (getpeername(fd, (struct sockaddr *)&from, &namesize))
+		return NULL;
+
+	switch (from.ss_family) {
+	case AF_INET:
+		ci->conn.port = ntohs(((struct sockaddr_in *)&from)->sin_port);
+		inet_ntop(AF_INET, &((struct sockaddr_in *)&from)->sin_addr,
+				ci->conn.ipstr, sizeof(ci->conn.ipstr));
+		break;
+	case AF_INET6:
+		ci->conn.port = ntohs(((struct sockaddr_in6 *)&from)->sin6_port);
+		inet_ntop(AF_INET6, &((struct sockaddr_in6 *)&from)->sin6_addr,
+				ci->conn.ipstr, sizeof(ci->conn.ipstr));
+		break;
+	}
+
+	ci->conn.fd = fd;
+	ci->conn.events = EPOLLIN;
+	ci->refcnt = 0;
+
+	INIT_LIST_HEAD(&ci->done_reqs);
+
+	init_rx_hdr(ci);
+
+	return ci;
+}
+
+static void client_handler(int fd, int events, void *data)
+{
+	struct client_info *ci = (struct client_info *)data;
+
+	dprintf("%x, rx %d, tx %d\n", events, ci->conn.c_rx_state,
+		ci->conn.c_tx_state);
+
+	if (events & (EPOLLERR | EPOLLHUP) || is_conn_dead(&ci->conn))
+		return clear_client_info(ci);
+
+	if (events & EPOLLIN)
+		do_client_rx(ci);
+
+	if (events & EPOLLOUT)
+		do_client_tx(ci);
+}
+
+static void listen_handler(int listen_fd, int events, void *data)
+{
+	struct sockaddr_storage from;
+	socklen_t namesize;
+	int fd, ret;
+	struct client_info *ci;
+
+	if (sys_stat_shutdown()) {
+		dprintf("unregistering connection %d\n", listen_fd);
+		unregister_event(listen_fd);
+		return;
+	}
+
+	namesize = sizeof(from);
+	fd = accept(listen_fd, (struct sockaddr *)&from, &namesize);
+	if (fd < 0) {
+		eprintf("failed to accept a new connection: %m\n");
+		return;
+	}
+
+	ret = set_keepalive(fd);
+	if (ret) {
+		close(fd);
+		return;
+	}
+
+	ret = set_nodelay(fd);
+	if (ret) {
+		close(fd);
+		return;
+	}
+
+	ret = set_nonblocking(fd);
+	if (ret) {
+		close(fd);
+		return;
+	}
+
+	ci = create_client(fd, data);
+	if (!ci) {
+		close(fd);
+		return;
+	}
+
+	ret = register_event(fd, client_handler, ci);
+	if (ret) {
+		destroy_client(ci);
+		return;
+	}
+
+	dprintf("accepted a new connection: %d\n", fd);
+}
+
+static int create_listen_port_fn(int fd, void *data)
+{
+	return register_event(fd, listen_handler, data);
+}
+
+int create_listen_port(int port, void *data)
+{
+	return create_listen_ports(port, create_listen_port_fn, data);
+}
+
+
+static void req_handler(int listen_fd, int events, void *data)
+{
+	eventfd_t value;
+	struct request *req, *t;
+	LIST_HEAD(pending_list);
+	int ret;
+
+	if (events & EPOLLERR)
+		eprintf("request handler error\n");
+
+	ret = eventfd_read(listen_fd, &value);
+	if (ret < 0)
+		return;
+
+	pthread_mutex_lock(&sys->wait_req_lock);
+	list_splice_init(&sys->wait_req_queue, &pending_list);
+	pthread_mutex_unlock(&sys->wait_req_lock);
+
+	list_for_each_entry_safe(req, t, &pending_list, request_list) {
+		list_del(&req->request_list);
+		queue_request(req);
+	}
+}
+
+void local_req_init(void)
+{
+	pthread_mutex_init(&sys->wait_req_lock, NULL);
+	sys->req_efd = eventfd(0, EFD_NONBLOCK);
+	register_event(sys->req_efd, req_handler, NULL);
+}
diff --git a/sheep/sdnet.c b/sheep/sdnet.c
deleted file mode 100644
index 7b53b04..0000000
--- a/sheep/sdnet.c
+++ /dev/null
@@ -1,878 +0,0 @@
-/*
- * Copyright (C) 2009-2011 Nippon Telegraph and Telephone Corporation.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License version
- * 2 as published by the Free Software Foundation.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-#include <assert.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <netdb.h>
-#include <pthread.h>
-#include <sys/eventfd.h>
-#include <arpa/inet.h>
-#include <netinet/tcp.h>
-#include <sys/epoll.h>
-#include <fcntl.h>
-
-#include "sheep_priv.h"
-
-static void requeue_request(struct request *req);
-
-static int is_access_local(struct request *req, uint64_t oid)
-{
-	struct sd_vnode *obj_vnodes[SD_MAX_COPIES];
-	int nr_copies;
-	int i;
-
-	nr_copies = get_nr_copies(req->vnodes);
-	oid_to_vnodes(req->vnodes, oid, nr_copies, obj_vnodes);
-
-	for (i = 0; i < nr_copies; i++) {
-		if (vnode_is_local(obj_vnodes[i]))
-			return 1;
-	}
-
-	return 0;
-}
-
-static void io_op_done(struct work *work)
-{
-	struct request *req = container_of(work, struct request, work);
-
-	if (req->rp.result == SD_RES_EIO) {
-		req->rp.result = SD_RES_NETWORK_ERROR;
-
-		eprintf("leaving sheepdog cluster\n");
-		leave_cluster();
-	}
-
-	put_request(req);
-	return;
-}
-
-static void gateway_op_done(struct work *work)
-{
-	struct request *req = container_of(work, struct request, work);
-	struct sd_req *hdr = &req->rq;
-
-	switch (req->rp.result) {
-	case SD_RES_OLD_NODE_VER:
-		if (req->rp.epoch > sys->epoch) {
-			list_add_tail(&req->request_list,
-				      &sys->wait_rw_queue);
-			/*
-			 * Gateway of this node is expected to process this
-			 * request later when epoch is lifted.
-			 */
-			return;
-		}
-		/*FALLTHRU*/
-	case SD_RES_NEW_NODE_VER:
-	case SD_RES_NETWORK_ERROR:
-	case SD_RES_WAIT_FOR_JOIN:
-	case SD_RES_WAIT_FOR_FORMAT:
-		dprintf("retrying failed I/O request "
-			"op %s result %d epoch %d, sys epoch %d\n",
-			op_name(req->op),
-			req->rp.result,
-			req->rq.epoch,
-			sys->epoch);
-		goto retry;
-	case SD_RES_EIO:
-		if (is_access_local(req, hdr->obj.oid)) {
-			eprintf("leaving sheepdog cluster\n");
-			leave_cluster();
-			goto retry;
-		}
-		break;
-	case SD_RES_SUCCESS:
-		break;
-	}
-
-	put_request(req);
-	return;
-retry:
-	requeue_request(req);
-}
-
-static void local_op_done(struct work *work)
-{
-	struct request *req = container_of(work, struct request, work);
-
-	if (has_process_main(req->op)) {
-		req->rp.result = do_process_main(req->op, &req->rq,
-						 &req->rp, req->data);
-	}
-
-	put_request(req);
-}
-
-static int check_request_epoch(struct request *req)
-{
-	if (before(req->rq.epoch, sys->epoch)) {
-		eprintf("old node version %u, %u (%s)\n",
-			sys->epoch, req->rq.epoch, op_name(req->op));
-		/* ask gateway to retry. */
-		req->rp.result = SD_RES_OLD_NODE_VER;
-		req->rp.epoch = sys->epoch;
-		put_request(req);
-		return -1;
-	} else if (after(req->rq.epoch, sys->epoch)) {
-		eprintf("new node version %u, %u (%s)\n",
-			sys->epoch, req->rq.epoch, op_name(req->op));
-
-		/* put on local wait queue, waiting for local epoch
-		   to be lifted */
-		req->rp.result = SD_RES_NEW_NODE_VER;
-		list_add_tail(&req->request_list, &sys->wait_rw_queue);
-		return -1;
-	}
-
-	return 0;
-}
-
-static bool request_in_recovery(struct request *req)
-{
-	/*
-	 * Request from recovery should go down the Farm even if
-	 * oid_in_recovery() returns true because we should also try snap
-	 * cache of the Farm and return the error code back if not found.
-	 */
-	if (oid_in_recovery(req->local_oid) &&
-	    !(req->rq.flags & SD_FLAG_CMD_RECOVERY)) {
-		/*
-		 * Put request on wait queues of local node
-		 */
-		if (is_recovery_init()) {
-			req->rp.result = SD_RES_OBJ_RECOVERING;
-			list_add_tail(&req->request_list,
-				      &sys->wait_rw_queue);
-		} else {
-			list_add_tail(&req->request_list,
-				      &sys->wait_obj_queue);
-		}
-		return true;
-	}
-	return false;
-}
-
-void resume_wait_epoch_requests(void)
-{
-	struct request *req, *t;
-	LIST_HEAD(pending_list);
-
-	list_splice_init(&sys->wait_rw_queue, &pending_list);
-
-	list_for_each_entry_safe(req, t, &pending_list, request_list) {
-		switch (req->rp.result) {
-		case SD_RES_OLD_NODE_VER:
-			/*
-			 * Gateway retries to send the request when
-			 * its epoch changes.
-			 */
-			assert(is_gateway_op(req->op));
-			req->rq.epoch = sys->epoch;
-			list_del(&req->request_list);
-			requeue_request(req);
-			break;
-		case SD_RES_NEW_NODE_VER:
-			/* Peer retries the request locally when its epoch changes. */
-			assert(!is_gateway_op(req->op));
-			list_del(&req->request_list);
-			requeue_request(req);
-			break;
-		default:
-			break;
-		}
-	}
-
-	list_splice_init(&pending_list, &sys->wait_rw_queue);
-}
-
-void resume_wait_recovery_requests(void)
-{
-	struct request *req, *t;
-	LIST_HEAD(pending_list);
-
-	list_splice_init(&sys->wait_rw_queue, &pending_list);
-
-	list_for_each_entry_safe(req, t, &pending_list, request_list) {
-		if (req->rp.result != SD_RES_OBJ_RECOVERING)
-			continue;
-
-		dprintf("resume wait oid %" PRIx64 "\n", req->local_oid);
-		list_del(&req->request_list);
-		requeue_request(req);
-	}
-
-	list_splice_init(&pending_list, &sys->wait_rw_queue);
-}
-
-void resume_wait_obj_requests(uint64_t oid)
-{
-	struct request *req, *t;
-	LIST_HEAD(pending_list);
-
-	list_splice_init(&sys->wait_obj_queue, &pending_list);
-
-	list_for_each_entry_safe(req, t, &pending_list, request_list) {
-		if (req->local_oid != oid)
-			continue;
-
-		/* the object requested by a pending request has been
-		 * recovered, notify the pending request. */
-		dprintf("retry %" PRIx64 "\n", req->local_oid);
-		list_del(&req->request_list);
-		requeue_request(req);
-	}
-	list_splice_init(&pending_list, &sys->wait_obj_queue);
-}
-
-void flush_wait_obj_requests(void)
-{
-	struct request *req, *n;
-	LIST_HEAD(pending_list);
-
-	list_splice_init(&sys->wait_obj_queue, &pending_list);
-
-	list_for_each_entry_safe(req, n, &pending_list, request_list) {
-		list_del(&req->request_list);
-		requeue_request(req);
-	}
-}
-
-static void queue_peer_request(struct request *req)
-{
-	req->local_oid = req->rq.obj.oid;
-	if (req->local_oid) {
-		if (check_request_epoch(req) < 0)
-			return;
-		if (request_in_recovery(req))
-			return;
-	}
-
-	if (req->rq.flags & SD_FLAG_CMD_RECOVERY)
-		req->rq.epoch = req->rq.obj.tgt_epoch;
-
-	req->work.fn = do_process_work;
-	req->work.done = io_op_done;
-	queue_work(sys->io_wqueue, &req->work);
-}
-
-static void queue_gateway_request(struct request *req)
-{
-	struct sd_req *hdr = &req->rq;
-
-	if (is_access_local(req, hdr->obj.oid))
-		req->local_oid = hdr->obj.oid;
-
-	/*
-	 * If we go for a cached object, we don't care if it is being recovered
-	 */
-	if (sys->enable_write_cache &&
-	    req->rq.flags & SD_FLAG_CMD_CACHE &&
-	    object_is_cached(req->rq.obj.oid))
-		goto queue_work;
-
-	if (req->local_oid)
-		if (request_in_recovery(req))
-			return;
-
-queue_work:
-	req->work.fn = do_process_work;
-	req->work.done = gateway_op_done;
-	queue_work(sys->gateway_wqueue, &req->work);
-}
-
-static void queue_local_request(struct request *req)
-{
-	req->work.fn = do_process_work;
-	req->work.done = local_op_done;
-	queue_work(sys->io_wqueue, &req->work);
-}
-
-static void queue_request(struct request *req)
-{
-	struct sd_req *hdr = &req->rq;
-	struct sd_rsp *rsp = &req->rp;
-
-	/*
-	 * Check the protocol version for all internal commands, and public
-	 * commands that have it set.  We can't enforce it on all public
-	 * ones as it isn't a mandatory part of the public protocol.
-	 */
-	if (hdr->opcode >= 0x80) {
-		if (hdr->proto_ver != SD_SHEEP_PROTO_VER) {
-			rsp->result = SD_RES_VER_MISMATCH;
-			goto done;
-		}
-	} else if (hdr->proto_ver) {
-		if (hdr->proto_ver != SD_PROTO_VER) {
-			rsp->result = SD_RES_VER_MISMATCH;
-			goto done;
-		}
-	}
-
-	req->op = get_sd_op(hdr->opcode);
-	if (!req->op) {
-		eprintf("invalid opcode %d\n", hdr->opcode);
-		rsp->result = SD_RES_INVALID_PARMS;
-		goto done;
-	}
-
-	dprintf("%s\n", op_name(req->op));
-
-	switch (sys->status) {
-	case SD_STATUS_SHUTDOWN:
-		rsp->result = SD_RES_SHUTDOWN;
-		goto done;
-	case SD_STATUS_WAIT_FOR_FORMAT:
-		if (!is_force_op(req->op)) {
-			rsp->result = SD_RES_WAIT_FOR_FORMAT;
-			goto done;
-		}
-		break;
-	case SD_STATUS_WAIT_FOR_JOIN:
-		if (!is_force_op(req->op)) {
-			rsp->result = SD_RES_WAIT_FOR_JOIN;
-			goto done;
-		}
-		break;
-	case SD_STATUS_HALT:
-		if (!is_force_op(req->op)) {
-			rsp->result = SD_RES_HALT;
-			goto done;
-		}
-		break;
-	default:
-		break;
-	}
-
-	/*
-	 * force operations shouldn't access req->vnodes in their
-	 * process_work() and process_main() because they can be
-	 * called before we set up current_vnode_info
-	 */
-	if (!is_force_op(req->op))
-		req->vnodes = get_vnode_info();
-
-	if (is_peer_op(req->op)) {
-		queue_peer_request(req);
-	} else if (is_gateway_op(req->op)) {
-		hdr->epoch = sys->epoch;
-		queue_gateway_request(req);
-	} else if (is_local_op(req->op)) {
-		hdr->epoch = sys->epoch;
-		queue_local_request(req);
-	} else if (is_cluster_op(req->op)) {
-		hdr->epoch = sys->epoch;
-		queue_cluster_request(req);
-	} else {
-		eprintf("unknown operation %d\n", hdr->opcode);
-		rsp->result = SD_RES_SYSTEM_ERROR;
-		goto done;
-	}
-
-	return;
-done:
-	put_request(req);
-}
-
-static void requeue_request(struct request *req)
-{
-	if (req->vnodes)
-		put_vnode_info(req->vnodes);
-	queue_request(req);
-}
-
-static void clear_client_info(struct client_info *ci);
-
-static struct request *alloc_local_request(void *data, int data_length)
-{
-	struct request *req;
-
-	req = xzalloc(sizeof(struct request));
-	if (data_length) {
-		req->data_length = data_length;
-		req->data = data;
-	}
-
-	req->local = 1;
-
-	INIT_LIST_HEAD(&req->request_list);
-
-	return req;
-}
-
-/*
- * Exec the request locally and synchronously.
- *
- * This function takes advantage of gateway's retry mechanism.
- */
-int exec_local_req(struct sd_req *rq, void *data)
-{
-	struct request *req;
-	eventfd_t value = 1;
-	int ret;
-
-	req = alloc_local_request(data, rq->data_length);
-	req->rq = *rq;
-	req->wait_efd = eventfd(0, 0);
-
-	pthread_mutex_lock(&sys->wait_req_lock);
-	list_add_tail(&req->request_list, &sys->wait_req_queue);
-	pthread_mutex_unlock(&sys->wait_req_lock);
-
-	eventfd_write(sys->req_efd, value);
-
-	ret = eventfd_read(req->wait_efd, &value);
-	if (ret < 0)
-		eprintf("event fd read error %m");
-
-	close(req->wait_efd);
-	ret = req->rp.result;
-	free(req);
-
-	return ret;
-}
-
-static struct request *alloc_request(struct client_info *ci, int data_length)
-{
-	struct request *req;
-
-	req = zalloc(sizeof(struct request));
-	if (!req)
-		return NULL;
-
-	req->ci = ci;
-	ci->refcnt++;
-	if (data_length) {
-		req->data_length = data_length;
-		req->data = valloc(data_length);
-		if (!req->data) {
-			free(req);
-			return NULL;
-		}
-	}
-
-	INIT_LIST_HEAD(&req->request_list);
-	uatomic_set(&req->refcnt, 1);
-
-	uatomic_inc(&sys->nr_outstanding_reqs);
-
-	return req;
-}
-
-static void free_request(struct request *req)
-{
-	uatomic_dec(&sys->nr_outstanding_reqs);
-
-	req->ci->refcnt--;
-	put_vnode_info(req->vnodes);
-	free(req->data);
-	free(req);
-}
-
-void put_request(struct request *req)
-{
-	struct client_info *ci = req->ci;
-	eventfd_t value = 1;
-
-	if (uatomic_sub_return(&req->refcnt, 1) > 0)
-		return;
-
-	if (req->local) {
-		req->done = 1;
-		eventfd_write(req->wait_efd, value);
-	} else {
-		if (conn_tx_on(&ci->conn)) {
-			free_request(req);
-			clear_client_info(ci);
-		} else {
-			list_add(&req->request_list, &ci->done_reqs);
-		}
-	}
-}
-
-static void init_rx_hdr(struct client_info *ci)
-{
-	ci->conn.c_rx_state = C_IO_HEADER;
-	ci->rx_req = NULL;
-	ci->conn.rx_length = sizeof(struct sd_req);
-	ci->conn.rx_buf = &ci->conn.rx_hdr;
-}
-
-static inline int begin_rx(struct client_info *ci)
-{
-	int ret;
-	uint64_t data_len;
-	struct connection *conn = &ci->conn;
-	struct sd_req *hdr = &conn->rx_hdr;
-	struct request *req;
-
-	switch (conn->c_rx_state) {
-	case C_IO_HEADER:
-		ret = rx(conn, C_IO_DATA_INIT);
-		if (!ret || conn->c_rx_state != C_IO_DATA_INIT)
-			break;
-	case C_IO_DATA_INIT:
-		data_len = hdr->data_length;
-
-		req = alloc_request(ci, data_len);
-		if (!req) {
-			conn->c_rx_state = C_IO_CLOSED;
-			break;
-		}
-		ci->rx_req = req;
-
-		/* use le_to_cpu */
-		memcpy(&req->rq, hdr, sizeof(req->rq));
-
-		if (data_len && hdr->flags & SD_FLAG_CMD_WRITE) {
-			conn->c_rx_state = C_IO_DATA;
-			conn->rx_length = data_len;
-			conn->rx_buf = req->data;
-		} else {
-			conn->c_rx_state = C_IO_END;
-			break;
-		}
-	case C_IO_DATA:
-		ret = rx(conn, C_IO_END);
-		break;
-	default:
-		eprintf("bug: unknown state %d\n", conn->c_rx_state);
-	}
-
-	if (is_conn_dead(conn)) {
-		clear_client_info(ci);
-		return -1;
-	}
-
-	/* Short read happens */
-	if (conn->c_rx_state != C_IO_END)
-		return -1;
-
-	return 0;
-}
-
-static inline void finish_rx(struct client_info *ci)
-{
-	struct request *req;
-	struct sd_req *hdr = &ci->conn.rx_hdr;
-
-	req = ci->rx_req;
-	init_rx_hdr(ci);
-	if (hdr->flags & SD_FLAG_CMD_WRITE)
-		req->rp.data_length = 0;
-	else
-		req->rp.data_length = hdr->data_length;
-
-	dprintf("%d, %s:%d\n", ci->conn.fd, ci->conn.ipstr, ci->conn.port);
-	queue_request(req);
-}
-
-static void do_client_rx(struct client_info *ci)
-{
-	if (begin_rx(ci) < 0)
-		return;
-
-	finish_rx(ci);
-}
-
-static void init_tx_hdr(struct client_info *ci)
-{
-	struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
-	struct request *req;
-
-	assert(!list_empty(&ci->done_reqs));
-
-	memset(rsp, 0, sizeof(*rsp));
-
-	req = list_first_entry(&ci->done_reqs, struct request, request_list);
-	list_del(&req->request_list);
-
-	ci->tx_req = req;
-	ci->conn.tx_length = sizeof(*rsp);
-	ci->conn.c_tx_state = C_IO_HEADER;
-	ci->conn.tx_buf = rsp;
-
-	/* use cpu_to_le */
-	memcpy(rsp, &req->rp, sizeof(*rsp));
-
-	rsp->epoch = sys->epoch;
-	rsp->opcode = req->rq.opcode;
-	rsp->id = req->rq.id;
-}
-
-static inline int begin_tx(struct client_info *ci)
-{
-	int ret, opt;
-	struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
-
-	/* If short send happens, we don't need init hdr */
-	if (!ci->tx_req)
-		init_tx_hdr(ci);
-
-	opt = 1;
-	setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
-
-	switch (ci->conn.c_tx_state) {
-	case C_IO_HEADER:
-		ret = tx(&ci->conn, C_IO_DATA_INIT, 0);
-		if (!ret)
-			break;
-
-		if (rsp->data_length) {
-			ci->conn.tx_length = rsp->data_length;
-			ci->conn.tx_buf = ci->tx_req->data;
-			ci->conn.c_tx_state = C_IO_DATA;
-		} else {
-			ci->conn.c_tx_state = C_IO_END;
-			break;
-		}
-	case C_IO_DATA:
-		ret = tx(&ci->conn, C_IO_END, 0);
-		if (!ret)
-			break;
-	default:
-		break;
-	}
-
-	opt = 0;
-	setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
-
-	if (is_conn_dead(&ci->conn)) {
-		clear_client_info(ci);
-		return -1;
-	}
-	return 0;
-}
-
-/* Return 1 if short send happens or we have more data to send */
-static inline int finish_tx(struct client_info *ci)
-{
-	/* Finish sending one response */
-	if (ci->conn.c_tx_state == C_IO_END) {
-		dprintf("connection from: %d, %s:%d\n", ci->conn.fd,
-			ci->conn.ipstr, ci->conn.port);
-		free_request(ci->tx_req);
-		ci->tx_req = NULL;
-	}
-	if (ci->tx_req || !list_empty(&ci->done_reqs))
-		return 1;
-	return 0;
-}
-
-static void do_client_tx(struct client_info *ci)
-{
-	if (list_empty(&ci->done_reqs)) {
-		if (conn_tx_off(&ci->conn))
-			clear_client_info(ci);
-		return;
-	}
-again:
-	if (begin_tx(ci) < 0)
-		return;
-
-	if (finish_tx(ci))
-		goto again;
-
-	/* Let's go sleep, and put_request() will wake me up */
-	if (conn_tx_off(&ci->conn))
-		clear_client_info(ci);
-}
-
-static void destroy_client(struct client_info *ci)
-{
-	dprintf("connection from: %s:%d\n", ci->conn.ipstr, ci->conn.port);
-	close(ci->conn.fd);
-	free(ci);
-}
-
-static void clear_client_info(struct client_info *ci)
-{
-	struct request *req, *t;
-
-	dprintf("connection seems to be dead\n");
-
-	if (ci->rx_req) {
-		free_request(ci->rx_req);
-		ci->rx_req = NULL;
-	}
-
-	if (ci->tx_req) {
-		free_request(ci->tx_req);
-		ci->tx_req = NULL;
-	}
-
-	list_for_each_entry_safe(req, t, &ci->done_reqs, request_list) {
-		list_del(&req->request_list);
-		free_request(req);
-	}
-
-	unregister_event(ci->conn.fd);
-
-	dprintf("refcnt:%d, fd:%d, %s:%d\n",
-		ci->refcnt, ci->conn.fd,
-		ci->conn.ipstr, ci->conn.port);
-
-	if (ci->refcnt)
-		return;
-
-	destroy_client(ci);
-}
-
-static struct client_info *create_client(int fd, struct cluster_info *cluster)
-{
-	struct client_info *ci;
-	struct sockaddr_storage from;
-	socklen_t namesize = sizeof(from);
-
-	ci = zalloc(sizeof(*ci));
-	if (!ci)
-		return NULL;
-
-	if (getpeername(fd, (struct sockaddr *)&from, &namesize))
-		return NULL;
-
-	switch (from.ss_family) {
-	case AF_INET:
-		ci->conn.port = ntohs(((struct sockaddr_in *)&from)->sin_port);
-		inet_ntop(AF_INET, &((struct sockaddr_in *)&from)->sin_addr,
-				ci->conn.ipstr, sizeof(ci->conn.ipstr));
-		break;
-	case AF_INET6:
-		ci->conn.port = ntohs(((struct sockaddr_in6 *)&from)->sin6_port);
-		inet_ntop(AF_INET6, &((struct sockaddr_in6 *)&from)->sin6_addr,
-				ci->conn.ipstr, sizeof(ci->conn.ipstr));
-		break;
-	}
-
-	ci->conn.fd = fd;
-	ci->conn.events = EPOLLIN;
-	ci->refcnt = 0;
-
-	INIT_LIST_HEAD(&ci->done_reqs);
-
-	init_rx_hdr(ci);
-
-	return ci;
-}
-
-static void client_handler(int fd, int events, void *data)
-{
-	struct client_info *ci = (struct client_info *)data;
-
-	dprintf("%x, rx %d, tx %d\n", events, ci->conn.c_rx_state,
-		ci->conn.c_tx_state);
-
-	if (events & (EPOLLERR | EPOLLHUP) || is_conn_dead(&ci->conn))
-		return clear_client_info(ci);
-
-	if (events & EPOLLIN)
-		do_client_rx(ci);
-
-	if (events & EPOLLOUT)
-		do_client_tx(ci);
-}
-
-static void listen_handler(int listen_fd, int events, void *data)
-{
-	struct sockaddr_storage from;
-	socklen_t namesize;
-	int fd, ret;
-	struct client_info *ci;
-
-	if (sys_stat_shutdown()) {
-		dprintf("unregistering connection %d\n", listen_fd);
-		unregister_event(listen_fd);
-		return;
-	}
-
-	namesize = sizeof(from);
-	fd = accept(listen_fd, (struct sockaddr *)&from, &namesize);
-	if (fd < 0) {
-		eprintf("failed to accept a new connection: %m\n");
-		return;
-	}
-
-	ret = set_keepalive(fd);
-	if (ret) {
-		close(fd);
-		return;
-	}
-
-	ret = set_nodelay(fd);
-	if (ret) {
-		close(fd);
-		return;
-	}
-
-	ret = set_nonblocking(fd);
-	if (ret) {
-		close(fd);
-		return;
-	}
-
-	ci = create_client(fd, data);
-	if (!ci) {
-		close(fd);
-		return;
-	}
-
-	ret = register_event(fd, client_handler, ci);
-	if (ret) {
-		destroy_client(ci);
-		return;
-	}
-
-	dprintf("accepted a new connection: %d\n", fd);
-}
-
-static int create_listen_port_fn(int fd, void *data)
-{
-	return register_event(fd, listen_handler, data);
-}
-
-int create_listen_port(int port, void *data)
-{
-	return create_listen_ports(port, create_listen_port_fn, data);
-}
-
-
-static void req_handler(int listen_fd, int events, void *data)
-{
-	eventfd_t value;
-	struct request *req, *t;
-	LIST_HEAD(pending_list);
-	int ret;
-
-	if (events & EPOLLERR)
-		eprintf("request handler error\n");
-
-	ret = eventfd_read(listen_fd, &value);
-	if (ret < 0)
-		return;
-
-	pthread_mutex_lock(&sys->wait_req_lock);
-	list_splice_init(&sys->wait_req_queue, &pending_list);
-	pthread_mutex_unlock(&sys->wait_req_lock);
-
-	list_for_each_entry_safe(req, t, &pending_list, request_list) {
-		list_del(&req->request_list);
-		queue_request(req);
-	}
-}
-
-void local_req_init(void)
-{
-	pthread_mutex_init(&sys->wait_req_lock, NULL);
-	sys->req_efd = eventfd(0, EFD_NONBLOCK);
-	register_event(sys->req_efd, req_handler, NULL);
-}
-- 
1.7.10.2



