[sheepdog] [PATCH v2 7/7] sheep: rename sdnet.c to request.c
Liu Yuan
namei.unix at gmail.com
Thu Jul 19 04:02:45 CEST 2012
From: Liu Yuan <tailai.ly at taobao.com>
For sheep itself, "sd" in a file name means nothing. Renaming sdnet.c to
request.c gives us more consistent file naming.
Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
sheep/Makefile.am | 2 +-
sheep/request.c | 878 +++++++++++++++++++++++++++++++++++++++++++++++++++++
sheep/sdnet.c | 878 -----------------------------------------------------
3 files changed, 879 insertions(+), 879 deletions(-)
create mode 100644 sheep/request.c
delete mode 100644 sheep/sdnet.c
diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index f8017b4..16c79f0 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -24,7 +24,7 @@ INCLUDES = -I$(top_builddir)/include -I$(top_srcdir)/include \
sbin_PROGRAMS = sheep
-sheep_SOURCES = sheep.c group.c sdnet.c gateway.c store.c vdi.c work.c \
+sheep_SOURCES = sheep.c group.c request.c gateway.c store.c vdi.c work.c \
journal.c ops.c recovery.c cluster/local.c \
object_cache.c object_list_cache.c sockfd_cache.c
diff --git a/sheep/request.c b/sheep/request.c
new file mode 100644
index 0000000..35ac488
--- /dev/null
+++ b/sheep/request.c
@@ -0,0 +1,878 @@
+/*
+ * Copyright (C) 2009-2011 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <assert.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <netdb.h>
+#include <pthread.h>
+#include <sys/eventfd.h>
+#include <arpa/inet.h>
+#include <netinet/tcp.h>
+#include <sys/epoll.h>
+#include <fcntl.h>
+
+#include "sheep_priv.h"
+
+static void requeue_request(struct request *req);
+
+static int is_access_local(struct request *req, uint64_t oid)
+{
+ struct sd_vnode *obj_vnodes[SD_MAX_COPIES];
+ int nr_copies;
+ int i;
+
+ nr_copies = get_nr_copies(req->vnodes);
+ oid_to_vnodes(req->vnodes, oid, nr_copies, obj_vnodes);
+
+ for (i = 0; i < nr_copies; i++) {
+ if (vnode_is_local(obj_vnodes[i]))
+ return 1;
+ }
+
+ return 0;
+}
+
+static void io_op_done(struct work *work)
+{
+ struct request *req = container_of(work, struct request, work);
+
+ if (req->rp.result == SD_RES_EIO) {
+ req->rp.result = SD_RES_NETWORK_ERROR;
+
+ eprintf("leaving sheepdog cluster\n");
+ leave_cluster();
+ }
+
+ put_request(req);
+ return;
+}
+
+static void gateway_op_done(struct work *work)
+{
+ struct request *req = container_of(work, struct request, work);
+ struct sd_req *hdr = &req->rq;
+
+ switch (req->rp.result) {
+ case SD_RES_OLD_NODE_VER:
+ if (req->rp.epoch > sys->epoch) {
+ list_add_tail(&req->request_list,
+ &sys->wait_rw_queue);
+ /*
+ * Gateway of this node is expected to process this
+ * request later when epoch is lifted.
+ */
+ return;
+ }
+ /*FALLTHRU*/
+ case SD_RES_NEW_NODE_VER:
+ case SD_RES_NETWORK_ERROR:
+ case SD_RES_WAIT_FOR_JOIN:
+ case SD_RES_WAIT_FOR_FORMAT:
+ dprintf("retrying failed I/O request "
+ "op %s result %d epoch %d, sys epoch %d\n",
+ op_name(req->op),
+ req->rp.result,
+ req->rq.epoch,
+ sys->epoch);
+ goto retry;
+ case SD_RES_EIO:
+ if (is_access_local(req, hdr->obj.oid)) {
+ eprintf("leaving sheepdog cluster\n");
+ leave_cluster();
+ goto retry;
+ }
+ break;
+ case SD_RES_SUCCESS:
+ break;
+ }
+
+ put_request(req);
+ return;
+retry:
+ requeue_request(req);
+}
+
+static void local_op_done(struct work *work)
+{
+ struct request *req = container_of(work, struct request, work);
+
+ if (has_process_main(req->op)) {
+ req->rp.result = do_process_main(req->op, &req->rq,
+ &req->rp, req->data);
+ }
+
+ put_request(req);
+}
+
+static int check_request_epoch(struct request *req)
+{
+ if (before(req->rq.epoch, sys->epoch)) {
+ eprintf("old node version %u, %u (%s)\n",
+ sys->epoch, req->rq.epoch, op_name(req->op));
+ /* ask gateway to retry. */
+ req->rp.result = SD_RES_OLD_NODE_VER;
+ req->rp.epoch = sys->epoch;
+ put_request(req);
+ return -1;
+ } else if (after(req->rq.epoch, sys->epoch)) {
+ eprintf("new node version %u, %u (%s)\n",
+ sys->epoch, req->rq.epoch, op_name(req->op));
+
+ /* put on local wait queue, waiting for local epoch
+ to be lifted */
+ req->rp.result = SD_RES_NEW_NODE_VER;
+ list_add_tail(&req->request_list, &sys->wait_rw_queue);
+ return -1;
+ }
+
+ return 0;
+}
+
+static bool request_in_recovery(struct request *req)
+{
+ /*
+ * Request from recovery should go down the Farm even if
+ * oid_in_recovery() returns true because we should also try snap
+ * cache of the Farm and return the error code back if not found.
+ */
+ if (oid_in_recovery(req->local_oid) &&
+ !(req->rq.flags & SD_FLAG_CMD_RECOVERY)) {
+ /*
+ * Put request on wait queues of local node
+ */
+ if (is_recovery_init()) {
+ req->rp.result = SD_RES_OBJ_RECOVERING;
+ list_add_tail(&req->request_list,
+ &sys->wait_rw_queue);
+ } else {
+ list_add_tail(&req->request_list,
+ &sys->wait_obj_queue);
+ }
+ return true;
+ }
+ return false;
+}
+
+void resume_wait_epoch_requests(void)
+{
+ struct request *req, *t;
+ LIST_HEAD(pending_list);
+
+ list_splice_init(&sys->wait_rw_queue, &pending_list);
+
+ list_for_each_entry_safe(req, t, &pending_list, request_list) {
+ switch (req->rp.result) {
+ case SD_RES_OLD_NODE_VER:
+ /*
+ * Gateway retries to send the request when
+ * its epoch changes.
+ */
+ assert(is_gateway_op(req->op));
+ req->rq.epoch = sys->epoch;
+ list_del(&req->request_list);
+ requeue_request(req);
+ break;
+ case SD_RES_NEW_NODE_VER:
+ /* Peer retries the request locally when its epoch changes. */
+ assert(!is_gateway_op(req->op));
+ list_del(&req->request_list);
+ requeue_request(req);
+ break;
+ default:
+ break;
+ }
+ }
+
+ list_splice_init(&pending_list, &sys->wait_rw_queue);
+}
+
+void resume_wait_recovery_requests(void)
+{
+ struct request *req, *t;
+ LIST_HEAD(pending_list);
+
+ list_splice_init(&sys->wait_rw_queue, &pending_list);
+
+ list_for_each_entry_safe(req, t, &pending_list, request_list) {
+ if (req->rp.result != SD_RES_OBJ_RECOVERING)
+ continue;
+
+ dprintf("resume wait oid %" PRIx64 "\n", req->local_oid);
+ list_del(&req->request_list);
+ requeue_request(req);
+ }
+
+ list_splice_init(&pending_list, &sys->wait_rw_queue);
+}
+
+void resume_wait_obj_requests(uint64_t oid)
+{
+ struct request *req, *t;
+ LIST_HEAD(pending_list);
+
+ list_splice_init(&sys->wait_obj_queue, &pending_list);
+
+ list_for_each_entry_safe(req, t, &pending_list, request_list) {
+ if (req->local_oid != oid)
+ continue;
+
+ /* the object requested by a pending request has been
+ * recovered, notify the pending request. */
+ dprintf("retry %" PRIx64 "\n", req->local_oid);
+ list_del(&req->request_list);
+ requeue_request(req);
+ }
+ list_splice_init(&pending_list, &sys->wait_obj_queue);
+}
+
+void flush_wait_obj_requests(void)
+{
+ struct request *req, *n;
+ LIST_HEAD(pending_list);
+
+ list_splice_init(&sys->wait_obj_queue, &pending_list);
+
+ list_for_each_entry_safe(req, n, &pending_list, request_list) {
+ list_del(&req->request_list);
+ requeue_request(req);
+ }
+}
+
+static void queue_peer_request(struct request *req)
+{
+ req->local_oid = req->rq.obj.oid;
+ if (req->local_oid) {
+ if (check_request_epoch(req) < 0)
+ return;
+ if (request_in_recovery(req))
+ return;
+ }
+
+ if (req->rq.flags & SD_FLAG_CMD_RECOVERY)
+ req->rq.epoch = req->rq.obj.tgt_epoch;
+
+ req->work.fn = do_process_work;
+ req->work.done = io_op_done;
+ queue_work(sys->io_wqueue, &req->work);
+}
+
+static void queue_gateway_request(struct request *req)
+{
+ struct sd_req *hdr = &req->rq;
+
+ if (is_access_local(req, hdr->obj.oid))
+ req->local_oid = hdr->obj.oid;
+
+ /*
+ * If we go for a cached object, we don't care if it is being recovered
+ */
+ if (sys->enable_write_cache &&
+ req->rq.flags & SD_FLAG_CMD_CACHE &&
+ object_is_cached(req->rq.obj.oid))
+ goto queue_work;
+
+ if (req->local_oid)
+ if (request_in_recovery(req))
+ return;
+
+queue_work:
+ req->work.fn = do_process_work;
+ req->work.done = gateway_op_done;
+ queue_work(sys->gateway_wqueue, &req->work);
+}
+
+static void queue_local_request(struct request *req)
+{
+ req->work.fn = do_process_work;
+ req->work.done = local_op_done;
+ queue_work(sys->io_wqueue, &req->work);
+}
+
+static void queue_request(struct request *req)
+{
+ struct sd_req *hdr = &req->rq;
+ struct sd_rsp *rsp = &req->rp;
+
+ /*
+ * Check the protocol version for all internal commands, and public
+ * commands that have it set. We can't enforce it on all public
+ * ones as it isn't a mandatory part of the public protocol.
+ */
+ if (hdr->opcode >= 0x80) {
+ if (hdr->proto_ver != SD_SHEEP_PROTO_VER) {
+ rsp->result = SD_RES_VER_MISMATCH;
+ goto done;
+ }
+ } else if (hdr->proto_ver) {
+ if (hdr->proto_ver != SD_PROTO_VER) {
+ rsp->result = SD_RES_VER_MISMATCH;
+ goto done;
+ }
+ }
+
+ req->op = get_sd_op(hdr->opcode);
+ if (!req->op) {
+ eprintf("invalid opcode %d\n", hdr->opcode);
+ rsp->result = SD_RES_INVALID_PARMS;
+ goto done;
+ }
+
+ dprintf("%s\n", op_name(req->op));
+
+ switch (sys->status) {
+ case SD_STATUS_SHUTDOWN:
+ rsp->result = SD_RES_SHUTDOWN;
+ goto done;
+ case SD_STATUS_WAIT_FOR_FORMAT:
+ if (!is_force_op(req->op)) {
+ rsp->result = SD_RES_WAIT_FOR_FORMAT;
+ goto done;
+ }
+ break;
+ case SD_STATUS_WAIT_FOR_JOIN:
+ if (!is_force_op(req->op)) {
+ rsp->result = SD_RES_WAIT_FOR_JOIN;
+ goto done;
+ }
+ break;
+ case SD_STATUS_HALT:
+ if (!is_force_op(req->op)) {
+ rsp->result = SD_RES_HALT;
+ goto done;
+ }
+ break;
+ default:
+ break;
+ }
+
+ /*
+ * force operations shouldn't access req->vnodes in their
+ * process_work() and process_main() because they can be
+ * called before we set up current_vnode_info
+ */
+ if (!is_force_op(req->op))
+ req->vnodes = get_vnode_info();
+
+ if (is_peer_op(req->op)) {
+ queue_peer_request(req);
+ } else if (is_gateway_op(req->op)) {
+ hdr->epoch = sys->epoch;
+ queue_gateway_request(req);
+ } else if (is_local_op(req->op)) {
+ hdr->epoch = sys->epoch;
+ queue_local_request(req);
+ } else if (is_cluster_op(req->op)) {
+ hdr->epoch = sys->epoch;
+ queue_cluster_request(req);
+ } else {
+ eprintf("unknown operation %d\n", hdr->opcode);
+ rsp->result = SD_RES_SYSTEM_ERROR;
+ goto done;
+ }
+
+ return;
+done:
+ put_request(req);
+}
+
+static void requeue_request(struct request *req)
+{
+ if (req->vnodes)
+ put_vnode_info(req->vnodes);
+ queue_request(req);
+}
+
+static void clear_client_info(struct client_info *ci);
+
+static struct request *alloc_local_request(void *data, int data_length)
+{
+ struct request *req;
+
+ req = xzalloc(sizeof(struct request));
+ if (data_length) {
+ req->data_length = data_length;
+ req->data = data;
+ }
+
+ req->local = 1;
+
+ INIT_LIST_HEAD(&req->request_list);
+
+ return req;
+}
+
+/*
+ * Exec the request locally and synchronously.
+ *
+ * This function takes advantage of gateway's retry mechanism.
+ */
+int exec_local_req(struct sd_req *rq, void *data)
+{
+ struct request *req;
+ eventfd_t value = 1;
+ int ret;
+
+ req = alloc_local_request(data, rq->data_length);
+ req->rq = *rq;
+ req->wait_efd = eventfd(0, 0);
+
+ pthread_mutex_lock(&sys->wait_req_lock);
+ list_add_tail(&req->request_list, &sys->wait_req_queue);
+ pthread_mutex_unlock(&sys->wait_req_lock);
+
+ eventfd_write(sys->req_efd, value);
+
+ ret = eventfd_read(req->wait_efd, &value);
+ if (ret < 0)
+ eprintf("event fd read error %m");
+
+ close(req->wait_efd);
+ ret = req->rp.result;
+ free(req);
+
+ return ret;
+}
+
+static struct request *alloc_request(struct client_info *ci, int data_length)
+{
+ struct request *req;
+
+ req = zalloc(sizeof(struct request));
+ if (!req)
+ return NULL;
+
+ req->ci = ci;
+ ci->refcnt++;
+ if (data_length) {
+ req->data_length = data_length;
+ req->data = valloc(data_length);
+ if (!req->data) {
+ free(req);
+ return NULL;
+ }
+ }
+
+ INIT_LIST_HEAD(&req->request_list);
+ uatomic_set(&req->refcnt, 1);
+
+ uatomic_inc(&sys->nr_outstanding_reqs);
+
+ return req;
+}
+
+static void free_request(struct request *req)
+{
+ uatomic_dec(&sys->nr_outstanding_reqs);
+
+ req->ci->refcnt--;
+ put_vnode_info(req->vnodes);
+ free(req->data);
+ free(req);
+}
+
+void put_request(struct request *req)
+{
+ struct client_info *ci = req->ci;
+ eventfd_t value = 1;
+
+ if (uatomic_sub_return(&req->refcnt, 1) > 0)
+ return;
+
+ if (req->local) {
+ req->done = 1;
+ eventfd_write(req->wait_efd, value);
+ } else {
+ if (conn_tx_on(&ci->conn)) {
+ free_request(req); /* drop req's ci ref before clearing ci */
+ clear_client_info(ci); /* destroys ci only once refcnt is 0 */
+ } else {
+ list_add(&req->request_list, &ci->done_reqs);
+ }
+ }
+}
+
+static void init_rx_hdr(struct client_info *ci)
+{
+ ci->conn.c_rx_state = C_IO_HEADER;
+ ci->rx_req = NULL;
+ ci->conn.rx_length = sizeof(struct sd_req);
+ ci->conn.rx_buf = &ci->conn.rx_hdr;
+}
+
+static inline int begin_rx(struct client_info *ci)
+{
+ int ret;
+ uint64_t data_len;
+ struct connection *conn = &ci->conn;
+ struct sd_req *hdr = &conn->rx_hdr;
+ struct request *req;
+
+ switch (conn->c_rx_state) {
+ case C_IO_HEADER:
+ ret = rx(conn, C_IO_DATA_INIT);
+ if (!ret || conn->c_rx_state != C_IO_DATA_INIT)
+ break;
+ case C_IO_DATA_INIT:
+ data_len = hdr->data_length;
+
+ req = alloc_request(ci, data_len);
+ if (!req) {
+ conn->c_rx_state = C_IO_CLOSED;
+ break;
+ }
+ ci->rx_req = req;
+
+ /* use le_to_cpu */
+ memcpy(&req->rq, hdr, sizeof(req->rq));
+
+ if (data_len && hdr->flags & SD_FLAG_CMD_WRITE) {
+ conn->c_rx_state = C_IO_DATA;
+ conn->rx_length = data_len;
+ conn->rx_buf = req->data;
+ } else {
+ conn->c_rx_state = C_IO_END;
+ break;
+ }
+ case C_IO_DATA:
+ ret = rx(conn, C_IO_END);
+ break;
+ default:
+ eprintf("bug: unknown state %d\n", conn->c_rx_state);
+ }
+
+ if (is_conn_dead(conn)) {
+ clear_client_info(ci);
+ return -1;
+ }
+
+ /* Short read happens */
+ if (conn->c_rx_state != C_IO_END)
+ return -1;
+
+ return 0;
+}
+
+static inline void finish_rx(struct client_info *ci)
+{
+ struct request *req;
+ struct sd_req *hdr = &ci->conn.rx_hdr;
+
+ req = ci->rx_req;
+ init_rx_hdr(ci);
+ if (hdr->flags & SD_FLAG_CMD_WRITE)
+ req->rp.data_length = 0;
+ else
+ req->rp.data_length = hdr->data_length;
+
+ dprintf("%d, %s:%d\n", ci->conn.fd, ci->conn.ipstr, ci->conn.port);
+ queue_request(req);
+}
+
+static void do_client_rx(struct client_info *ci)
+{
+ if (begin_rx(ci) < 0)
+ return;
+
+ finish_rx(ci);
+}
+
+static void init_tx_hdr(struct client_info *ci)
+{
+ struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
+ struct request *req;
+
+ assert(!list_empty(&ci->done_reqs));
+
+ memset(rsp, 0, sizeof(*rsp));
+
+ req = list_first_entry(&ci->done_reqs, struct request, request_list);
+ list_del(&req->request_list);
+
+ ci->tx_req = req;
+ ci->conn.tx_length = sizeof(*rsp);
+ ci->conn.c_tx_state = C_IO_HEADER;
+ ci->conn.tx_buf = rsp;
+
+ /* use cpu_to_le */
+ memcpy(rsp, &req->rp, sizeof(*rsp));
+
+ rsp->epoch = sys->epoch;
+ rsp->opcode = req->rq.opcode;
+ rsp->id = req->rq.id;
+}
+
+static inline int begin_tx(struct client_info *ci)
+{
+ int ret, opt;
+ struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
+
+ /* If short send happens, we don't need init hdr */
+ if (!ci->tx_req)
+ init_tx_hdr(ci);
+
+ opt = 1;
+ setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
+
+ switch (ci->conn.c_tx_state) {
+ case C_IO_HEADER:
+ ret = tx(&ci->conn, C_IO_DATA_INIT, 0);
+ if (!ret)
+ break;
+
+ if (rsp->data_length) {
+ ci->conn.tx_length = rsp->data_length;
+ ci->conn.tx_buf = ci->tx_req->data;
+ ci->conn.c_tx_state = C_IO_DATA;
+ } else {
+ ci->conn.c_tx_state = C_IO_END;
+ break;
+ }
+ case C_IO_DATA:
+ ret = tx(&ci->conn, C_IO_END, 0);
+ if (!ret)
+ break;
+ default:
+ break;
+ }
+
+ opt = 0;
+ setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
+
+ if (is_conn_dead(&ci->conn)) {
+ clear_client_info(ci);
+ return -1;
+ }
+ return 0;
+}
+
+/* Return 1 if short send happens or we have more data to send */
+static inline int finish_tx(struct client_info *ci)
+{
+ /* Finish sending one response */
+ if (ci->conn.c_tx_state == C_IO_END) {
+ dprintf("connection from: %d, %s:%d\n", ci->conn.fd,
+ ci->conn.ipstr, ci->conn.port);
+ free_request(ci->tx_req);
+ ci->tx_req = NULL;
+ }
+ if (ci->tx_req || !list_empty(&ci->done_reqs))
+ return 1;
+ return 0;
+}
+
+static void do_client_tx(struct client_info *ci)
+{
+ if (list_empty(&ci->done_reqs)) {
+ if (conn_tx_off(&ci->conn))
+ clear_client_info(ci);
+ return;
+ }
+again:
+ if (begin_tx(ci) < 0)
+ return;
+
+ if (finish_tx(ci))
+ goto again;
+
+ /* Let's go sleep, and put_request() will wake me up */
+ if (conn_tx_off(&ci->conn))
+ clear_client_info(ci);
+}
+
+static void destroy_client(struct client_info *ci)
+{
+ dprintf("connection from: %s:%d\n", ci->conn.ipstr, ci->conn.port);
+ close(ci->conn.fd);
+ free(ci);
+}
+
+static void clear_client_info(struct client_info *ci)
+{
+ struct request *req, *t;
+
+ dprintf("connection seems to be dead\n");
+
+ if (ci->rx_req) {
+ free_request(ci->rx_req);
+ ci->rx_req = NULL;
+ }
+
+ if (ci->tx_req) {
+ free_request(ci->tx_req);
+ ci->tx_req = NULL;
+ }
+
+ list_for_each_entry_safe(req, t, &ci->done_reqs, request_list) {
+ list_del(&req->request_list);
+ free_request(req);
+ }
+
+ unregister_event(ci->conn.fd);
+
+ dprintf("refcnt:%d, fd:%d, %s:%d\n",
+ ci->refcnt, ci->conn.fd,
+ ci->conn.ipstr, ci->conn.port);
+
+ if (ci->refcnt)
+ return;
+
+ destroy_client(ci);
+}
+
+static struct client_info *create_client(int fd, struct cluster_info *cluster)
+{
+ struct client_info *ci;
+ struct sockaddr_storage from;
+ socklen_t namesize = sizeof(from);
+
+ ci = zalloc(sizeof(*ci));
+ if (!ci)
+ return NULL;
+
+ if (getpeername(fd, (struct sockaddr *)&from, &namesize))
+ return NULL;
+
+ switch (from.ss_family) {
+ case AF_INET:
+ ci->conn.port = ntohs(((struct sockaddr_in *)&from)->sin_port);
+ inet_ntop(AF_INET, &((struct sockaddr_in *)&from)->sin_addr,
+ ci->conn.ipstr, sizeof(ci->conn.ipstr));
+ break;
+ case AF_INET6:
+ ci->conn.port = ntohs(((struct sockaddr_in6 *)&from)->sin6_port);
+ inet_ntop(AF_INET6, &((struct sockaddr_in6 *)&from)->sin6_addr,
+ ci->conn.ipstr, sizeof(ci->conn.ipstr));
+ break;
+ }
+
+ ci->conn.fd = fd;
+ ci->conn.events = EPOLLIN;
+ ci->refcnt = 0;
+
+ INIT_LIST_HEAD(&ci->done_reqs);
+
+ init_rx_hdr(ci);
+
+ return ci;
+}
+
+static void client_handler(int fd, int events, void *data)
+{
+ struct client_info *ci = (struct client_info *)data;
+
+ dprintf("%x, rx %d, tx %d\n", events, ci->conn.c_rx_state,
+ ci->conn.c_tx_state);
+
+ if (events & (EPOLLERR | EPOLLHUP) || is_conn_dead(&ci->conn))
+ return clear_client_info(ci);
+
+ if (events & EPOLLIN)
+ do_client_rx(ci);
+
+ if (events & EPOLLOUT)
+ do_client_tx(ci);
+}
+
+static void listen_handler(int listen_fd, int events, void *data)
+{
+ struct sockaddr_storage from;
+ socklen_t namesize;
+ int fd, ret;
+ struct client_info *ci;
+
+ if (sys_stat_shutdown()) {
+ dprintf("unregistering connection %d\n", listen_fd);
+ unregister_event(listen_fd);
+ return;
+ }
+
+ namesize = sizeof(from);
+ fd = accept(listen_fd, (struct sockaddr *)&from, &namesize);
+ if (fd < 0) {
+ eprintf("failed to accept a new connection: %m\n");
+ return;
+ }
+
+ ret = set_keepalive(fd);
+ if (ret) {
+ close(fd);
+ return;
+ }
+
+ ret = set_nodelay(fd);
+ if (ret) {
+ close(fd);
+ return;
+ }
+
+ ret = set_nonblocking(fd);
+ if (ret) {
+ close(fd);
+ return;
+ }
+
+ ci = create_client(fd, data);
+ if (!ci) {
+ close(fd);
+ return;
+ }
+
+ ret = register_event(fd, client_handler, ci);
+ if (ret) {
+ destroy_client(ci);
+ return;
+ }
+
+ dprintf("accepted a new connection: %d\n", fd);
+}
+
+static int create_listen_port_fn(int fd, void *data)
+{
+ return register_event(fd, listen_handler, data);
+}
+
+int create_listen_port(int port, void *data)
+{
+ return create_listen_ports(port, create_listen_port_fn, data);
+}
+
+
+static void req_handler(int listen_fd, int events, void *data)
+{
+ eventfd_t value;
+ struct request *req, *t;
+ LIST_HEAD(pending_list);
+ int ret;
+
+ if (events & EPOLLERR)
+ eprintf("request handler error\n");
+
+ ret = eventfd_read(listen_fd, &value);
+ if (ret < 0)
+ return;
+
+ pthread_mutex_lock(&sys->wait_req_lock);
+ list_splice_init(&sys->wait_req_queue, &pending_list);
+ pthread_mutex_unlock(&sys->wait_req_lock);
+
+ list_for_each_entry_safe(req, t, &pending_list, request_list) {
+ list_del(&req->request_list);
+ queue_request(req);
+ }
+}
+
+void local_req_init(void)
+{
+ pthread_mutex_init(&sys->wait_req_lock, NULL);
+ sys->req_efd = eventfd(0, EFD_NONBLOCK);
+ register_event(sys->req_efd, req_handler, NULL);
+}
diff --git a/sheep/sdnet.c b/sheep/sdnet.c
deleted file mode 100644
index 7b53b04..0000000
--- a/sheep/sdnet.c
+++ /dev/null
@@ -1,878 +0,0 @@
-/*
- * Copyright (C) 2009-2011 Nippon Telegraph and Telephone Corporation.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License version
- * 2 as published by the Free Software Foundation.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-#include <assert.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <netdb.h>
-#include <pthread.h>
-#include <sys/eventfd.h>
-#include <arpa/inet.h>
-#include <netinet/tcp.h>
-#include <sys/epoll.h>
-#include <fcntl.h>
-
-#include "sheep_priv.h"
-
-static void requeue_request(struct request *req);
-
-static int is_access_local(struct request *req, uint64_t oid)
-{
- struct sd_vnode *obj_vnodes[SD_MAX_COPIES];
- int nr_copies;
- int i;
-
- nr_copies = get_nr_copies(req->vnodes);
- oid_to_vnodes(req->vnodes, oid, nr_copies, obj_vnodes);
-
- for (i = 0; i < nr_copies; i++) {
- if (vnode_is_local(obj_vnodes[i]))
- return 1;
- }
-
- return 0;
-}
-
-static void io_op_done(struct work *work)
-{
- struct request *req = container_of(work, struct request, work);
-
- if (req->rp.result == SD_RES_EIO) {
- req->rp.result = SD_RES_NETWORK_ERROR;
-
- eprintf("leaving sheepdog cluster\n");
- leave_cluster();
- }
-
- put_request(req);
- return;
-}
-
-static void gateway_op_done(struct work *work)
-{
- struct request *req = container_of(work, struct request, work);
- struct sd_req *hdr = &req->rq;
-
- switch (req->rp.result) {
- case SD_RES_OLD_NODE_VER:
- if (req->rp.epoch > sys->epoch) {
- list_add_tail(&req->request_list,
- &sys->wait_rw_queue);
- /*
- * Gateway of this node is expected to process this
- * request later when epoch is lifted.
- */
- return;
- }
- /*FALLTHRU*/
- case SD_RES_NEW_NODE_VER:
- case SD_RES_NETWORK_ERROR:
- case SD_RES_WAIT_FOR_JOIN:
- case SD_RES_WAIT_FOR_FORMAT:
- dprintf("retrying failed I/O request "
- "op %s result %d epoch %d, sys epoch %d\n",
- op_name(req->op),
- req->rp.result,
- req->rq.epoch,
- sys->epoch);
- goto retry;
- case SD_RES_EIO:
- if (is_access_local(req, hdr->obj.oid)) {
- eprintf("leaving sheepdog cluster\n");
- leave_cluster();
- goto retry;
- }
- break;
- case SD_RES_SUCCESS:
- break;
- }
-
- put_request(req);
- return;
-retry:
- requeue_request(req);
-}
-
-static void local_op_done(struct work *work)
-{
- struct request *req = container_of(work, struct request, work);
-
- if (has_process_main(req->op)) {
- req->rp.result = do_process_main(req->op, &req->rq,
- &req->rp, req->data);
- }
-
- put_request(req);
-}
-
-static int check_request_epoch(struct request *req)
-{
- if (before(req->rq.epoch, sys->epoch)) {
- eprintf("old node version %u, %u (%s)\n",
- sys->epoch, req->rq.epoch, op_name(req->op));
- /* ask gateway to retry. */
- req->rp.result = SD_RES_OLD_NODE_VER;
- req->rp.epoch = sys->epoch;
- put_request(req);
- return -1;
- } else if (after(req->rq.epoch, sys->epoch)) {
- eprintf("new node version %u, %u (%s)\n",
- sys->epoch, req->rq.epoch, op_name(req->op));
-
- /* put on local wait queue, waiting for local epoch
- to be lifted */
- req->rp.result = SD_RES_NEW_NODE_VER;
- list_add_tail(&req->request_list, &sys->wait_rw_queue);
- return -1;
- }
-
- return 0;
-}
-
-static bool request_in_recovery(struct request *req)
-{
- /*
- * Request from recovery should go down the Farm even if
- * oid_in_recovery() returns true because we should also try snap
- * cache of the Farm and return the error code back if not found.
- */
- if (oid_in_recovery(req->local_oid) &&
- !(req->rq.flags & SD_FLAG_CMD_RECOVERY)) {
- /*
- * Put request on wait queues of local node
- */
- if (is_recovery_init()) {
- req->rp.result = SD_RES_OBJ_RECOVERING;
- list_add_tail(&req->request_list,
- &sys->wait_rw_queue);
- } else {
- list_add_tail(&req->request_list,
- &sys->wait_obj_queue);
- }
- return true;
- }
- return false;
-}
-
-void resume_wait_epoch_requests(void)
-{
- struct request *req, *t;
- LIST_HEAD(pending_list);
-
- list_splice_init(&sys->wait_rw_queue, &pending_list);
-
- list_for_each_entry_safe(req, t, &pending_list, request_list) {
- switch (req->rp.result) {
- case SD_RES_OLD_NODE_VER:
- /*
- * Gateway retries to send the request when
- * its epoch changes.
- */
- assert(is_gateway_op(req->op));
- req->rq.epoch = sys->epoch;
- list_del(&req->request_list);
- requeue_request(req);
- break;
- case SD_RES_NEW_NODE_VER:
- /* Peer retries the request locally when its epoch changes. */
- assert(!is_gateway_op(req->op));
- list_del(&req->request_list);
- requeue_request(req);
- break;
- default:
- break;
- }
- }
-
- list_splice_init(&pending_list, &sys->wait_rw_queue);
-}
-
-void resume_wait_recovery_requests(void)
-{
- struct request *req, *t;
- LIST_HEAD(pending_list);
-
- list_splice_init(&sys->wait_rw_queue, &pending_list);
-
- list_for_each_entry_safe(req, t, &pending_list, request_list) {
- if (req->rp.result != SD_RES_OBJ_RECOVERING)
- continue;
-
- dprintf("resume wait oid %" PRIx64 "\n", req->local_oid);
- list_del(&req->request_list);
- requeue_request(req);
- }
-
- list_splice_init(&pending_list, &sys->wait_rw_queue);
-}
-
-void resume_wait_obj_requests(uint64_t oid)
-{
- struct request *req, *t;
- LIST_HEAD(pending_list);
-
- list_splice_init(&sys->wait_obj_queue, &pending_list);
-
- list_for_each_entry_safe(req, t, &pending_list, request_list) {
- if (req->local_oid != oid)
- continue;
-
- /* the object requested by a pending request has been
- * recovered, notify the pending request. */
- dprintf("retry %" PRIx64 "\n", req->local_oid);
- list_del(&req->request_list);
- requeue_request(req);
- }
- list_splice_init(&pending_list, &sys->wait_obj_queue);
-}
-
-void flush_wait_obj_requests(void)
-{
- struct request *req, *n;
- LIST_HEAD(pending_list);
-
- list_splice_init(&sys->wait_obj_queue, &pending_list);
-
- list_for_each_entry_safe(req, n, &pending_list, request_list) {
- list_del(&req->request_list);
- requeue_request(req);
- }
-}
-
-static void queue_peer_request(struct request *req)
-{
- req->local_oid = req->rq.obj.oid;
- if (req->local_oid) {
- if (check_request_epoch(req) < 0)
- return;
- if (request_in_recovery(req))
- return;
- }
-
- if (req->rq.flags & SD_FLAG_CMD_RECOVERY)
- req->rq.epoch = req->rq.obj.tgt_epoch;
-
- req->work.fn = do_process_work;
- req->work.done = io_op_done;
- queue_work(sys->io_wqueue, &req->work);
-}
-
-static void queue_gateway_request(struct request *req)
-{
- struct sd_req *hdr = &req->rq;
-
- if (is_access_local(req, hdr->obj.oid))
- req->local_oid = hdr->obj.oid;
-
- /*
- * If we go for a cached object, we don't care if it is being recovered
- */
- if (sys->enable_write_cache &&
- req->rq.flags & SD_FLAG_CMD_CACHE &&
- object_is_cached(req->rq.obj.oid))
- goto queue_work;
-
- if (req->local_oid)
- if (request_in_recovery(req))
- return;
-
-queue_work:
- req->work.fn = do_process_work;
- req->work.done = gateway_op_done;
- queue_work(sys->gateway_wqueue, &req->work);
-}
-
-static void queue_local_request(struct request *req)
-{
- req->work.fn = do_process_work;
- req->work.done = local_op_done;
- queue_work(sys->io_wqueue, &req->work);
-}
-
-static void queue_request(struct request *req)
-{
- struct sd_req *hdr = &req->rq;
- struct sd_rsp *rsp = &req->rp;
-
- /*
- * Check the protocol version for all internal commands, and public
- * commands that have it set. We can't enforce it on all public
- * ones as it isn't a mandatory part of the public protocol.
- */
- if (hdr->opcode >= 0x80) {
- if (hdr->proto_ver != SD_SHEEP_PROTO_VER) {
- rsp->result = SD_RES_VER_MISMATCH;
- goto done;
- }
- } else if (hdr->proto_ver) {
- if (hdr->proto_ver != SD_PROTO_VER) {
- rsp->result = SD_RES_VER_MISMATCH;
- goto done;
- }
- }
-
- req->op = get_sd_op(hdr->opcode);
- if (!req->op) {
- eprintf("invalid opcode %d\n", hdr->opcode);
- rsp->result = SD_RES_INVALID_PARMS;
- goto done;
- }
-
- dprintf("%s\n", op_name(req->op));
-
- switch (sys->status) {
- case SD_STATUS_SHUTDOWN:
- rsp->result = SD_RES_SHUTDOWN;
- goto done;
- case SD_STATUS_WAIT_FOR_FORMAT:
- if (!is_force_op(req->op)) {
- rsp->result = SD_RES_WAIT_FOR_FORMAT;
- goto done;
- }
- break;
- case SD_STATUS_WAIT_FOR_JOIN:
- if (!is_force_op(req->op)) {
- rsp->result = SD_RES_WAIT_FOR_JOIN;
- goto done;
- }
- break;
- case SD_STATUS_HALT:
- if (!is_force_op(req->op)) {
- rsp->result = SD_RES_HALT;
- goto done;
- }
- break;
- default:
- break;
- }
-
- /*
- * force operations shouldn't access req->vnodes in their
- * process_work() and process_main() because they can be
- * called before we set up current_vnode_info
- */
- if (!is_force_op(req->op))
- req->vnodes = get_vnode_info();
-
- if (is_peer_op(req->op)) {
- queue_peer_request(req);
- } else if (is_gateway_op(req->op)) {
- hdr->epoch = sys->epoch;
- queue_gateway_request(req);
- } else if (is_local_op(req->op)) {
- hdr->epoch = sys->epoch;
- queue_local_request(req);
- } else if (is_cluster_op(req->op)) {
- hdr->epoch = sys->epoch;
- queue_cluster_request(req);
- } else {
- eprintf("unknown operation %d\n", hdr->opcode);
- rsp->result = SD_RES_SYSTEM_ERROR;
- goto done;
- }
-
- return;
-done:
- put_request(req);
-}
-
-static void requeue_request(struct request *req)
-{
- if (req->vnodes)
- put_vnode_info(req->vnodes);
- queue_request(req);
-}
-
-static void clear_client_info(struct client_info *ci);
-
-static struct request *alloc_local_request(void *data, int data_length)
-{
- struct request *req;
-
- req = xzalloc(sizeof(struct request));
- if (data_length) {
- req->data_length = data_length;
- req->data = data;
- }
-
- req->local = 1;
-
- INIT_LIST_HEAD(&req->request_list);
-
- return req;
-}
-
-/*
- * Exec the request locally and synchronously.
- *
- * This function takes advantage of gateway's retry mechanism.
- */
-int exec_local_req(struct sd_req *rq, void *data)
-{
- struct request *req;
- eventfd_t value = 1;
- int ret;
-
- req = alloc_local_request(data, rq->data_length);
- req->rq = *rq;
- req->wait_efd = eventfd(0, 0);
-
- pthread_mutex_lock(&sys->wait_req_lock);
- list_add_tail(&req->request_list, &sys->wait_req_queue);
- pthread_mutex_unlock(&sys->wait_req_lock);
-
- eventfd_write(sys->req_efd, value);
-
- ret = eventfd_read(req->wait_efd, &value);
- if (ret < 0)
- eprintf("event fd read error %m");
-
- close(req->wait_efd);
- ret = req->rp.result;
- free(req);
-
- return ret;
-}
-
-static struct request *alloc_request(struct client_info *ci, int data_length)
-{
- struct request *req;
-
- req = zalloc(sizeof(struct request));
- if (!req)
- return NULL;
-
- req->ci = ci;
- ci->refcnt++;
- if (data_length) {
- req->data_length = data_length;
- req->data = valloc(data_length);
- if (!req->data) {
- free(req);
- return NULL;
- }
- }
-
- INIT_LIST_HEAD(&req->request_list);
- uatomic_set(&req->refcnt, 1);
-
- uatomic_inc(&sys->nr_outstanding_reqs);
-
- return req;
-}
-
-static void free_request(struct request *req)
-{
- uatomic_dec(&sys->nr_outstanding_reqs);
-
- req->ci->refcnt--;
- put_vnode_info(req->vnodes);
- free(req->data);
- free(req);
-}
-
-void put_request(struct request *req)
-{
- struct client_info *ci = req->ci;
- eventfd_t value = 1;
-
- if (uatomic_sub_return(&req->refcnt, 1) > 0)
- return;
-
- if (req->local) {
- req->done = 1;
- eventfd_write(req->wait_efd, value);
- } else {
- if (conn_tx_on(&ci->conn)) {
- free_request(req);
- clear_client_info(ci);
- } else {
- list_add(&req->request_list, &ci->done_reqs);
- }
- }
-}
-
-static void init_rx_hdr(struct client_info *ci)
-{
- ci->conn.c_rx_state = C_IO_HEADER;
- ci->rx_req = NULL;
- ci->conn.rx_length = sizeof(struct sd_req);
- ci->conn.rx_buf = &ci->conn.rx_hdr;
-}
-
-static inline int begin_rx(struct client_info *ci)
-{
- int ret;
- uint64_t data_len;
- struct connection *conn = &ci->conn;
- struct sd_req *hdr = &conn->rx_hdr;
- struct request *req;
-
- switch (conn->c_rx_state) {
- case C_IO_HEADER:
- ret = rx(conn, C_IO_DATA_INIT);
- if (!ret || conn->c_rx_state != C_IO_DATA_INIT)
- break;
- case C_IO_DATA_INIT:
- data_len = hdr->data_length;
-
- req = alloc_request(ci, data_len);
- if (!req) {
- conn->c_rx_state = C_IO_CLOSED;
- break;
- }
- ci->rx_req = req;
-
- /* use le_to_cpu */
- memcpy(&req->rq, hdr, sizeof(req->rq));
-
- if (data_len && hdr->flags & SD_FLAG_CMD_WRITE) {
- conn->c_rx_state = C_IO_DATA;
- conn->rx_length = data_len;
- conn->rx_buf = req->data;
- } else {
- conn->c_rx_state = C_IO_END;
- break;
- }
- case C_IO_DATA:
- ret = rx(conn, C_IO_END);
- break;
- default:
- eprintf("bug: unknown state %d\n", conn->c_rx_state);
- }
-
- if (is_conn_dead(conn)) {
- clear_client_info(ci);
- return -1;
- }
-
- /* Short read happens */
- if (conn->c_rx_state != C_IO_END)
- return -1;
-
- return 0;
-}
-
-static inline void finish_rx(struct client_info *ci)
-{
- struct request *req;
- struct sd_req *hdr = &ci->conn.rx_hdr;
-
- req = ci->rx_req;
- init_rx_hdr(ci);
- if (hdr->flags & SD_FLAG_CMD_WRITE)
- req->rp.data_length = 0;
- else
- req->rp.data_length = hdr->data_length;
-
- dprintf("%d, %s:%d\n", ci->conn.fd, ci->conn.ipstr, ci->conn.port);
- queue_request(req);
-}
-
-static void do_client_rx(struct client_info *ci)
-{
- if (begin_rx(ci) < 0)
- return;
-
- finish_rx(ci);
-}
-
-static void init_tx_hdr(struct client_info *ci)
-{
- struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
- struct request *req;
-
- assert(!list_empty(&ci->done_reqs));
-
- memset(rsp, 0, sizeof(*rsp));
-
- req = list_first_entry(&ci->done_reqs, struct request, request_list);
- list_del(&req->request_list);
-
- ci->tx_req = req;
- ci->conn.tx_length = sizeof(*rsp);
- ci->conn.c_tx_state = C_IO_HEADER;
- ci->conn.tx_buf = rsp;
-
- /* use cpu_to_le */
- memcpy(rsp, &req->rp, sizeof(*rsp));
-
- rsp->epoch = sys->epoch;
- rsp->opcode = req->rq.opcode;
- rsp->id = req->rq.id;
-}
-
-static inline int begin_tx(struct client_info *ci)
-{
- int ret, opt;
- struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
-
- /* If short send happens, we don't need init hdr */
- if (!ci->tx_req)
- init_tx_hdr(ci);
-
- opt = 1;
- setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
-
- switch (ci->conn.c_tx_state) {
- case C_IO_HEADER:
- ret = tx(&ci->conn, C_IO_DATA_INIT, 0);
- if (!ret)
- break;
-
- if (rsp->data_length) {
- ci->conn.tx_length = rsp->data_length;
- ci->conn.tx_buf = ci->tx_req->data;
- ci->conn.c_tx_state = C_IO_DATA;
- } else {
- ci->conn.c_tx_state = C_IO_END;
- break;
- }
- case C_IO_DATA:
- ret = tx(&ci->conn, C_IO_END, 0);
- if (!ret)
- break;
- default:
- break;
- }
-
- opt = 0;
- setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
-
- if (is_conn_dead(&ci->conn)) {
- clear_client_info(ci);
- return -1;
- }
- return 0;
-}
-
-/* Return 1 if short send happens or we have more data to send */
-static inline int finish_tx(struct client_info *ci)
-{
- /* Finish sending one response */
- if (ci->conn.c_tx_state == C_IO_END) {
- dprintf("connection from: %d, %s:%d\n", ci->conn.fd,
- ci->conn.ipstr, ci->conn.port);
- free_request(ci->tx_req);
- ci->tx_req = NULL;
- }
- if (ci->tx_req || !list_empty(&ci->done_reqs))
- return 1;
- return 0;
-}
-
-static void do_client_tx(struct client_info *ci)
-{
- if (list_empty(&ci->done_reqs)) {
- if (conn_tx_off(&ci->conn))
- clear_client_info(ci);
- return;
- }
-again:
- if (begin_tx(ci) < 0)
- return;
-
- if (finish_tx(ci))
- goto again;
-
- /* Let's go sleep, and put_request() will wake me up */
- if (conn_tx_off(&ci->conn))
- clear_client_info(ci);
-}
-
-static void destroy_client(struct client_info *ci)
-{
- dprintf("connection from: %s:%d\n", ci->conn.ipstr, ci->conn.port);
- close(ci->conn.fd);
- free(ci);
-}
-
-static void clear_client_info(struct client_info *ci)
-{
- struct request *req, *t;
-
- dprintf("connection seems to be dead\n");
-
- if (ci->rx_req) {
- free_request(ci->rx_req);
- ci->rx_req = NULL;
- }
-
- if (ci->tx_req) {
- free_request(ci->tx_req);
- ci->tx_req = NULL;
- }
-
- list_for_each_entry_safe(req, t, &ci->done_reqs, request_list) {
- list_del(&req->request_list);
- free_request(req);
- }
-
- unregister_event(ci->conn.fd);
-
- dprintf("refcnt:%d, fd:%d, %s:%d\n",
- ci->refcnt, ci->conn.fd,
- ci->conn.ipstr, ci->conn.port);
-
- if (ci->refcnt)
- return;
-
- destroy_client(ci);
-}
-
-static struct client_info *create_client(int fd, struct cluster_info *cluster)
-{
- struct client_info *ci;
- struct sockaddr_storage from;
- socklen_t namesize = sizeof(from);
-
- ci = zalloc(sizeof(*ci));
- if (!ci)
- return NULL;
-
- if (getpeername(fd, (struct sockaddr *)&from, &namesize))
- return NULL;
-
- switch (from.ss_family) {
- case AF_INET:
- ci->conn.port = ntohs(((struct sockaddr_in *)&from)->sin_port);
- inet_ntop(AF_INET, &((struct sockaddr_in *)&from)->sin_addr,
- ci->conn.ipstr, sizeof(ci->conn.ipstr));
- break;
- case AF_INET6:
- ci->conn.port = ntohs(((struct sockaddr_in6 *)&from)->sin6_port);
- inet_ntop(AF_INET6, &((struct sockaddr_in6 *)&from)->sin6_addr,
- ci->conn.ipstr, sizeof(ci->conn.ipstr));
- break;
- }
-
- ci->conn.fd = fd;
- ci->conn.events = EPOLLIN;
- ci->refcnt = 0;
-
- INIT_LIST_HEAD(&ci->done_reqs);
-
- init_rx_hdr(ci);
-
- return ci;
-}
-
-static void client_handler(int fd, int events, void *data)
-{
- struct client_info *ci = (struct client_info *)data;
-
- dprintf("%x, rx %d, tx %d\n", events, ci->conn.c_rx_state,
- ci->conn.c_tx_state);
-
- if (events & (EPOLLERR | EPOLLHUP) || is_conn_dead(&ci->conn))
- return clear_client_info(ci);
-
- if (events & EPOLLIN)
- do_client_rx(ci);
-
- if (events & EPOLLOUT)
- do_client_tx(ci);
-}
-
-static void listen_handler(int listen_fd, int events, void *data)
-{
- struct sockaddr_storage from;
- socklen_t namesize;
- int fd, ret;
- struct client_info *ci;
-
- if (sys_stat_shutdown()) {
- dprintf("unregistering connection %d\n", listen_fd);
- unregister_event(listen_fd);
- return;
- }
-
- namesize = sizeof(from);
- fd = accept(listen_fd, (struct sockaddr *)&from, &namesize);
- if (fd < 0) {
- eprintf("failed to accept a new connection: %m\n");
- return;
- }
-
- ret = set_keepalive(fd);
- if (ret) {
- close(fd);
- return;
- }
-
- ret = set_nodelay(fd);
- if (ret) {
- close(fd);
- return;
- }
-
- ret = set_nonblocking(fd);
- if (ret) {
- close(fd);
- return;
- }
-
- ci = create_client(fd, data);
- if (!ci) {
- close(fd);
- return;
- }
-
- ret = register_event(fd, client_handler, ci);
- if (ret) {
- destroy_client(ci);
- return;
- }
-
- dprintf("accepted a new connection: %d\n", fd);
-}
-
-static int create_listen_port_fn(int fd, void *data)
-{
- return register_event(fd, listen_handler, data);
-}
-
-int create_listen_port(int port, void *data)
-{
- return create_listen_ports(port, create_listen_port_fn, data);
-}
-
-
-static void req_handler(int listen_fd, int events, void *data)
-{
- eventfd_t value;
- struct request *req, *t;
- LIST_HEAD(pending_list);
- int ret;
-
- if (events & EPOLLERR)
- eprintf("request handler error\n");
-
- ret = eventfd_read(listen_fd, &value);
- if (ret < 0)
- return;
-
- pthread_mutex_lock(&sys->wait_req_lock);
- list_splice_init(&sys->wait_req_queue, &pending_list);
- pthread_mutex_unlock(&sys->wait_req_lock);
-
- list_for_each_entry_safe(req, t, &pending_list, request_list) {
- list_del(&req->request_list);
- queue_request(req);
- }
-}
-
-void local_req_init(void)
-{
- pthread_mutex_init(&sys->wait_req_lock, NULL);
- sys->req_efd = eventfd(0, EFD_NONBLOCK);
- register_event(sys->req_efd, req_handler, NULL);
-}
--
1.7.10.2
More information about the sheepdog mailing list