[Stgt-devel] [PATCH 7/7] iser core

Pete Wyckoff pw
Mon Jul 30 21:01:05 CEST 2007


Core iSCSI RDMA support.  Adds the iscsi_rdma.c file implementing
the RDMA transport.

Adds a Makefile bit that gets turned on if you define ISCSI_RDMA.
No behavior change if that is not defined.  But requires RDMA API
headers and libraries if it is defined.

Adds iSCSI parameters defined in the draft iSER specification.
No change unless the initiator explicitly asks for "RDMAExtensions".

Adds some checks against conn->tp->rdma in iscsid.c to do
RDMA-specific things that did not warrant new iscsi/transport.h
functions.  This includes handling the lack of a status collapse
in iSER, redirecting R2T and DATA_IN outgoing packets to RDMA
handlers, pushing TX state machine without going through epoll,
and some login negotiation changes.

Errata documented in doc/README.iser.

Signed-off-by: Pete Wyckoff <pw at osc.edu>
---
 usr/Makefile           |    5 +
 usr/iscsi/iscsi_if.h   |    5 +
 usr/iscsi/iscsi_rdma.c | 1451 ++++++++++++++++++++++++++++++++++++++++++++++++
 usr/iscsi/iscsid.c     |   97 +++-
 usr/iscsi/param.c      |   38 ++
 usr/iscsi/session.c    |    2 +
 usr/iscsi/target.c     |    8 +
 usr/iscsi/transport.c  |    3 +
 usr/iscsi/transport.h  |    3 +
 9 files changed, 1603 insertions(+), 9 deletions(-)
 create mode 100644 usr/iscsi/iscsi_rdma.c

diff --git a/usr/Makefile b/usr/Makefile
index f48f259..0f2a116 100644
--- a/usr/Makefile
+++ b/usr/Makefile
@@ -37,6 +37,11 @@ TGTD_OBJS += $(addprefix iscsi/, conn.o param.o session.o iscsid.o target.o \
 	chap.o transport.o iscsi_tcp.o isns.o libcrc32c.o)
 TGTD_OBJS += bs_sync.o
 LIBS += -lcrypto -lpthread
+ifneq ($(ISCSI_RDMA),)
+CFLAGS += -DISCSI_RDMA
+TGTD_OBJS += iscsi/iscsi_rdma.o
+LIBS += -libverbs -lrdmacm
+endif
 endif
 
 INCLUDES += -I.
diff --git a/usr/iscsi/iscsi_if.h b/usr/iscsi/iscsi_if.h
index 58a76a2..b4f1e04 100644
--- a/usr/iscsi/iscsi_if.h
+++ b/usr/iscsi/iscsi_if.h
@@ -215,6 +215,11 @@ enum iscsi_param {
 	ISCSI_PARAM_OFMARKINT,
 	ISCSI_PARAM_IFMARKINT,
 	ISCSI_PARAM_MAXCONNECTIONS,
+	/* iSER draft */
+	ISCSI_PARAM_RDMA_EXTENSIONS,
+	ISCSI_PARAM_TARGET_RDSL,
+	ISCSI_PARAM_INITIATOR_RDSL,
+	ISCSI_PARAM_MAX_OUTST_PDU,
 	/* must always be last */
 	ISCSI_PARAM_MAX,
 };
diff --git a/usr/iscsi/iscsi_rdma.c b/usr/iscsi/iscsi_rdma.c
new file mode 100644
index 0000000..88b2b48
--- /dev/null
+++ b/usr/iscsi/iscsi_rdma.c
@@ -0,0 +1,1451 @@
+/*
+ * iSCSI extensions for RDMA (iSER) data path
+ *
+ * Copyright (C) 2007 Dennis Dalessandro (dennis at osc.edu)
+ * Copyright (C) 2007 Ananth Devulapalli (ananth at osc.edu)
+ * Copyright (C) 2007 Pete Wyckoff (pw at osc.edu)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation, version 2 of the
+ * License.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ */
+
+#include <errno.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/epoll.h>
+#include <sys/socket.h>
+#include <stdarg.h>
+#include <math.h>
+#include <sys/types.h>
+#include <libgen.h>
+#include <sys/errno.h>
+#include <time.h>
+#include <assert.h>
+#include <poll.h>
+
+#if 0 && !defined(NDEBUG)
+#include <valgrind/memcheck.h>
+#else
+#define VALGRIND_MAKE_MEM_DEFINED(addr,len)
+#endif
+
+#include "infiniband/verbs.h"
+#include "rdma/rdma_cma.h"
+#include "util.h"
+#include "iscsid.h"
+#include "tgtd.h"
+#include "list.h"
+
+#ifndef NDEBUG
+#define iser_debug_level 10	/* messages with lvl above this are dropped */
+#define iser_out(lvl,fmt,args...) \
+	do { \
+		if ((lvl) <= iser_debug_level) \
+			iser_print_info(fmt,##args); \
+	} while (0)
+#else	/* NDEBUG: debug output compiles away; arguments are not evaluated */
+#define iser_out(lvl,fmt,...) do { } while (0)
+#endif
+
+/*
+ * iSER header at the front of each message buffer; the IB-extended
+ * version from the kernel.  Stags and VAs are in big-endian format.
+ */
+struct iser_hdr {
+	uint8_t   flags;	/* high nibble: message type; ISER_WSV/ISER_RSV bits below */
+	uint8_t   rsvd[3];
+	uint32_t  write_stag; /* write rkey */
+	uint64_t  write_va;	/* read by iser_parse_hdr() when ISER_WSV is set */
+	uint32_t  read_stag;  /* read rkey */
+	uint64_t  read_va;	/* read by iser_parse_hdr() when ISER_RSV is set */
+} __attribute__((packed));
+
+#define ISER_WSV	(0x08)	/* flag: write_stag/write_va are to be taken from the header */
+#define ISER_RSV	(0x04)	/* flag: read_stag/read_va are to be taken from the header */
+#define ISCSI_CTRL	(0x10)	/* type (flags & 0xF0): iSCSI control PDU */
+#define ISER_HELLO	(0x20)	/* type: iSER Hello; iser_parse_hdr() rejects it */
+#define ISER_HELLORPLY	(0x30)	/* presumably the Hello reply type; not used in the visible code */
+
+struct recvlist {	/* one receive buffer and the work request that posts it */
+	struct list_head list;	/* linked on conn_info.recvl */
+	struct ibv_sge		sge;	/* addr/length/lkey describing buf */
+	void			*buf;	/* slice of conn_info.srbuf */
+	struct ibv_recv_wr	wr;	/* wr_id is set to the address of this element */
+	int			conn_index;	/* owning slot in cl.conn[] */
+	unsigned long		bytes_recvd;	/* wc->byte_len of the latest completion */
+};
+
+struct sendlist {	/* one send buffer and the work request that posts it */
+	struct list_head list;	/* linked on conn_info.sendl */
+	struct ibv_sge		sge;	/* length is rewritten per message in iscsi_iser_write_end() */
+	void			*buf;	/* slice of conn_info.srbuf */
+	struct ibv_send_wr	wr;	/* wr_id is set to the address of this element */
+	int			conn_index;	/* owning slot in cl.conn[] */
+	int			free; /* if 1 can use if 0 already in use */
+};
+
+/* pre-registered memory for RDMA ops: one element per MEM_SZ slice of cl.regbuf */
+struct mempool {
+	struct list_head list;	/* put on cl.freel by iser_init_mempool() */
+	void *buf;	/* start of this element's slice */
+};
+
+/* one RDMA read/write slot; rem_va & rem_stag already available in struct ibv_send_wr */
+struct rdmalist {
+	struct list_head list;	/* on conn_info.rdmal, or rdmal_write_busy while a write is pending */
+	struct ibv_sge		sge;	/* filled in by iser_post_rdma_wr() */
+	void			*buf;	/* local buffer, set by iser_find_rdma_slot() */
+	struct ibv_send_wr	wr;	/* wr_id is set to the address of this element */
+	int			conn_index;	/* owning slot in cl.conn[] */
+	int			free;	/* 1 when the slot may be claimed */
+	struct iscsi_task	*task;	/* task served; handle_wc() matches busy writes on it */
+};
+
+/*
+ * One of these for each iscsi_connection, adds more fields needed for iser.
+ */
+struct conn_info {
+	int			valid, busy;	/* busy: slot claimed; valid: connection established */
+	int			conn_num;  /* index into cl.conn[] */
+	struct ibv_qp		*qp_hndl;	/* copy of cma_id->qp */
+	struct rdma_cm_id	*cma_id;	/* id taken from the connect-request event */
+	struct iscsi_connection *iscsi_conn;	/* allocated in iser_conn_established() */
+	size_t			rlen;	/* bytes per receive buffer */
+	size_t			slen;	/* bytes per send buffer */
+	/* read and write from the initiator's point of view */
+	uint32_t rem_read_stag, rem_write_stag;
+	uint64_t rem_read_va, rem_write_va;
+	/*
+	 * TODO: mv readb/writeb to some task like struct. In case of
+	 * multithreaded apps they will fail.
+	 */
+	uint32_t readb;	/* offset already consumed in the current receive buffer */
+	uint32_t writeb;	/* bytes staged so far in the current send buffer */
+
+	void *srbuf;    /* all sends and recvs on this connection (not rdma) */
+	void *listbuf;  /* space for the send, recv, rdma list elements */
+	struct ibv_mr *srmr;   /* mr for registered srbuf */
+
+	/* point to the lists */
+	struct list_head rdmal, sendl, recvl;
+	struct list_head rdmal_write_busy;  /* rdma writes in progress */
+
+	/* points to the current recvlist, sendlist items for each conn */
+	struct	recvlist	*rcv_comm_event;
+	struct	sendlist	*send_comm_event;
+
+	/* to chain this connection onto the list of those ready to tx */
+	struct list_head conn_tx_ready;
+};
+
+#define ISCSI_LISTEN_PORT	3260
+
+#define MAX_CONN 256	/* slots in cl.conn[] */
+#define MAX_WQE 250	/* max_send_wr and max_recv_wr requested per QP */
+#define MAX_SSIZE 8192	/* bytes per send buffer, iSER header included */
+#define MAX_RSIZE 8192	/* data room per receive buffer, on top of the headers */
+#define ISER_INITIAL_POST 10  /*XXX make this a global variable and set it to what gets negotiated at login*/
+
+#define MEMPOOL_SZ (64)	/* number of pre-registered RDMA buffers */
+/* 512K + sizeof(iscsi_task) + AHS size */
+#define MEM_SZ (roundup((1 << 19) + sizeof(struct iscsi_task) + 260+28+48, \
+			4096))
+
+#define uint64_from_ptr(p) ((uint64_t)(uintptr_t)(p))
+#define ptr_from_int64(p) ((void *)(uintptr_t)(p))	/* inverse of uint64_from_ptr; fully parenthesized */
+
+/* global connection list variables */
+struct rdma_conn_list {
+	struct conn_info conn[MAX_CONN];	/* indexed by the "fd" handed to iscsid */
+	/*variables that all connections will share*/
+	struct ibv_context	*ibv_hndl;	/* taken from the first connect request */
+	struct rdma_event_channel *rdma_evt_channel;
+	struct rdma_cm_id	*cma_listen_id;
+	struct ibv_pd		*prot_id;	/* protection domain for every MR and QP */
+	struct ibv_cq		*cq_hndl;	/* single CQ wired to send and recv of every QP */
+	struct ibv_comp_channel *cq_channel;	/* its fd is watched by the main event loop */
+	struct list_head conn_tx_ready;  /* conns with tasks ready to tx */
+
+	/* prereg rdma bufs */
+	void *regbuf;	/* MEMPOOL_SZ buffers of MEM_SZ bytes, registered as regmr */
+	void *membuf;	/* backing store for the struct mempool elements */
+	struct ibv_mr *regmr;
+	struct list_head freel, allocl;
+	int num_tx_ready;	/* counter registered with tgt_counter_event_add() */
+};
+static struct rdma_conn_list cl;
+
+static void iser_cqe_handler(int fd, int events, void *data);
+static ssize_t iser_rdma_read_completion(struct rdmalist *rdma);
+static int handle_wc(struct ibv_wc *wc);
+static void iser_progress(int *counter, void *data);
+static void iser_event_modify(int ep, int events);
+
+#ifndef NDEBUG
+static void iser_print_info(const char *fmt, ...)	/* printf-style debug line on stderr; backs iser_out() */
+{
+	va_list ap;
+	fprintf(stderr, "[iSER]:");	/* fixed prefix */
+	va_start(ap, fmt);
+	vfprintf(stderr, fmt, ap);
+	va_end(ap);
+	fprintf(stderr, ".\n");	/* terminator is added here, so callers pass no newline */
+}
+#endif
+
+static void openfab_error(const char *msg, int code)	/* print msg and the strerror() text for errno value code */
+{
+	/* strerror_r() has incompatible GNU/XSI variants; the GNU one may leave the buffer unwritten */
+	const char *error_string = strerror(code);
+	fprintf(stderr,"----------ERROR----------\n");
+	fprintf(stderr, "%s : %s\n", msg, error_string);
+	fprintf(stderr,"----------ERROR----------\n");
+}
+
+static int conn_find_free(void)	/* claim the first idle slot of cl.conn[]; returns its index or -1 */
+{
+	int i;
+
+	for (i=0; i < MAX_CONN; i++)
+		if (cl.conn[i].busy == 0) {
+			cl.conn[i].busy = 1;	/* marked taken before returning */
+			return i;
+		}
+	return -1;	/* every slot is in use */
+}
+
+static int iser_match_qp(uint32_t qp_num)	/* index of the valid connection whose QP has this number, or -1 */
+{
+	int i;
+
+	iser_out(5, "%s() Looking for QP num %d", __func__, qp_num);
+	for (i=0; i<MAX_CONN; i++) {
+		if (cl.conn[i].valid == 1) { /* don't look at invalid conns */
+			if (qp_num == cl.conn[i].qp_hndl->qp_num) {
+				return i;
+			}
+		}
+	}
+	return -1;	/* no valid connection uses that QP */
+}
+
+static int iser_init_comm(struct conn_info *conn, ssize_t ssize,
+			  ssize_t rsize, int id)
+{
+	/* Carve one registered buffer into ISER_INITIAL_POST recv (rsize) and
+	 * send (ssize) slots and build the list elements, tagged with conn
+	 * index id.  Returns 0, or negative with partial allocations freed. */
+	int i;
+	unsigned long size;
+	uint8_t *srbuf, *listbuf;
+	struct rdmalist *rdmal;
+	struct sendlist *sendl;
+	struct recvlist *recvl;
+
+	size = (rsize + ssize) * ISER_INITIAL_POST;
+	conn->srbuf = malloc(size);
+	if (!conn->srbuf) {
+		eprintf("malloc srbuf %lu\n", size);
+		return -ENOMEM;
+	}
+	conn->srmr = ibv_reg_mr(cl.prot_id, conn->srbuf, size,
+				IBV_ACCESS_LOCAL_WRITE);
+	if (!conn->srmr) {
+		eprintf("register srbuf\n");
+		free(conn->srbuf);	/* was leaked on this path */
+		return -1;
+	}
+
+	INIT_LIST_HEAD(&conn->sendl);
+	INIT_LIST_HEAD(&conn->recvl);
+	INIT_LIST_HEAD(&conn->rdmal);
+	INIT_LIST_HEAD(&conn->rdmal_write_busy);
+
+	size = ISER_INITIAL_POST * (sizeof(struct rdmalist)
+		+ sizeof(struct sendlist) + sizeof(struct recvlist));
+	conn->listbuf = malloc(size);
+	if (!conn->listbuf) {
+		eprintf("malloc listbuf %lu\n", size);
+		ibv_dereg_mr(conn->srmr);	/* undo the registration above */
+		free(conn->srbuf);
+		return -ENOMEM;
+	}
+
+	srbuf = conn->srbuf;
+	listbuf = conn->listbuf;
+	for (i=0; i<ISER_INITIAL_POST; i++){
+		rdmal = (void *) listbuf;	/* three list elements per slot, back to back */
+		listbuf += sizeof(*rdmal);
+		sendl = (void *) listbuf;
+		listbuf += sizeof(*sendl);
+		recvl = (void *) listbuf;
+		listbuf += sizeof(*recvl);
+
+		recvl->buf = srbuf;	/* each recv buffer is followed by its send buffer */
+		srbuf += rsize;
+		sendl->buf = srbuf;
+		srbuf += ssize;
+
+		rdmal->conn_index = id;
+		sendl->conn_index = id;
+		recvl->conn_index = id;
+		sendl->free = 1;
+		rdmal->free = 1;
+
+		list_add_tail(&sendl->list, &conn->sendl);
+		list_add_tail(&recvl->list, &conn->recvl);
+		list_add_tail(&rdmal->list, &conn->rdmal);
+	}
+	conn->slen = ssize;
+	conn->rlen = rsize;
+	return 0;
+}
+
+static int iser_init_mempool(void)	/* build the shared pre-registered RDMA buffer pool; 0 or negative */
+{
+	int i = 0;
+	struct mempool *freel;
+	uint8_t *regbuf, *membuf;
+	size_t size = MEM_SZ * MEMPOOL_SZ;
+
+	cl.regbuf = malloc(size);
+	if (!cl.regbuf) {
+		eprintf("malloc regbuf %zu\n", size);
+		return -ENOMEM;
+	}
+	cl.regmr = ibv_reg_mr(cl.prot_id, cl.regbuf, size,
+			      IBV_ACCESS_LOCAL_WRITE);
+	if (!cl.regmr) {
+		eprintf("register regbuf\n");
+		free(cl.regbuf);	/* do not leak the pool on failure */
+		return -1;
+	}
+	INIT_LIST_HEAD(&cl.freel);
+	INIT_LIST_HEAD(&cl.allocl);
+	size = MEMPOOL_SZ * sizeof(struct mempool);
+	cl.membuf = malloc(size);
+	if (!cl.membuf) {
+		eprintf("malloc membuf %zu\n", size);
+		ibv_dereg_mr(cl.regmr);	/* unwind the registration and the pool */
+		free(cl.regbuf);
+		return -ENOMEM;
+	}
+
+	membuf = cl.membuf;
+	regbuf = cl.regbuf;
+	for (i = 0; i < MEMPOOL_SZ; i++) {
+		freel = (void *)membuf;	/* element i describes the i-th MEM_SZ slice */
+		membuf += sizeof(*freel);
+		freel->buf = regbuf;
+		regbuf += MEM_SZ;
+		list_add_tail(&freel->list, &cl.freel);
+	}
+	return 0;
+}
+
+static int iser_init_wr(struct conn_info *conn)	/* fill in all send WRs and post every recv WR; 0 or -1 */
+{
+	int ret;
+	struct sendlist *sendl;
+	struct recvlist *recvl;
+	struct ibv_recv_wr *bad_wr;
+
+	iser_out(8, "%s() Entry", __func__);
+	/* send WRs are only prepared here; they are posted later, one per message */
+	list_for_each_entry(sendl, &conn->sendl, list) {
+		sendl->sge.addr = uint64_from_ptr(sendl->buf);
+		sendl->sge.length = conn->slen;
+		sendl->sge.lkey = conn->srmr->lkey;
+
+		memset(&(sendl->wr), 0, sizeof(sendl->wr));
+		sendl->wr.wr_id = uint64_from_ptr(sendl);
+		sendl->wr.sg_list = &(sendl->sge);
+		sendl->wr.num_sge = 1;
+		sendl->wr.opcode = IBV_WR_SEND;
+		sendl->wr.send_flags = IBV_SEND_SIGNALED;
+	}
+	/* recv WRs are posted right away so the initiator's first PDUs can land */
+	list_for_each_entry(recvl, &conn->recvl, list) {
+		recvl->sge.addr = uint64_from_ptr(recvl->buf);
+		recvl->sge.length = conn->rlen;
+		recvl->sge.lkey = conn->srmr->lkey;
+
+		memset(&(recvl->wr), 0, sizeof(recvl->wr));
+		recvl->wr.wr_id = uint64_from_ptr(recvl);
+		recvl->wr.sg_list = &(recvl->sge);
+		recvl->wr.num_sge = 1;
+		recvl->wr.next = NULL;
+
+		ret = ibv_post_recv(conn->qp_hndl, &recvl->wr, &bad_wr);
+		if (ret) {
+			openfab_error("Can't post RECV", ret);	/* the error code is the return value, not errno */
+			return -1;
+		}
+
+		iser_out(5, "%s() Send/Recv Init - Recv Posted wr_id %p",
+			 __func__, recvl);
+	}
+
+	iser_out(8, "%s() Exit", __func__);
+	return 0;
+}
+
+static void iser_accept_connection(struct rdma_cm_event *event)	/* handle a connect request: build QP and buffers, then rdma_accept() */
+{
+	int ret, cqe_num;
+	struct ibv_device_attr device_attr;
+	struct ibv_qp_init_attr qp_init_attr;
+	int conn_index;
+	ssize_t rsize;
+	struct conn_info *ci;
+	struct rdma_conn_param conn_param = {
+		.responder_resources = 1,
+		.initiator_depth = 1,
+		.retry_count = 5,
+	};
+	int need_notify;
+
+	iser_out(8, "%s() Entry", __func__);
+
+	if (cl.ibv_hndl == NULL) {
+		/* first connection: allocate the PD and buffer pool shared by all */
+		cl.prot_id = ibv_alloc_pd(event->id->verbs);
+		if (!cl.prot_id) {
+			openfab_error("Unable to create PD", errno);
+			return;	/* ibv_hndl stays NULL, so the next request retries */
+		}
+		ret = iser_init_mempool();
+		if (ret) {
+			iser_out(4, "%s iser_init_mempool failed", __func__);
+			ibv_dealloc_pd(cl.prot_id);
+			return;
+		}
+		cl.ibv_hndl = event->id->verbs;	/* shared setup is complete */
+	}
+
+	need_notify = 0;
+	if (cl.cq_channel == NULL) {
+		ret = ibv_query_device(cl.ibv_hndl, &device_attr);
+		if (ret) {	/* failure is reported as an errno value, never negative */
+			openfab_error("Unable to query device for max CQs",
+					ret);
+			exit(1);
+		}
+		iser_out(5, "%s() Max %d completion queue entries", __func__,
+					device_attr.max_cqe);
+		cqe_num = device_attr.max_cqe;
+
+		cl.cq_channel = ibv_create_comp_channel(cl.ibv_hndl);
+		if (!cl.cq_channel) {
+			openfab_error("Unable to create CQ channel", errno);
+			exit(1);
+		}
+
+		cl.cq_hndl = ibv_create_cq(cl.ibv_hndl, cqe_num, NULL,
+						cl.cq_channel, 0);
+		if (!cl.cq_hndl) {
+			openfab_error("Unable to create CQ", errno);
+			exit(1);
+		}
+
+		ret = tgt_event_add(cl.cq_channel->fd, EPOLLIN,
+				iser_cqe_handler, NULL);
+		if (ret) {
+			eprintf("Unable to add CQ channel FD to poll %m");
+			exit(1);
+		}
+
+		need_notify = 1;
+	}
+
+	/* now it gets specific for each connection */
+	conn_index = conn_find_free();
+	if (conn_index < 0) {
+		iser_out(0, "Unable to find a free connection");
+		return;
+	}
+
+	iser_out(2, "%s() Using connection index %d", __func__, conn_index);
+	ci = &cl.conn[conn_index];
+	ci->cma_id = event->id;
+
+	/* create qp next */
+	memset(&qp_init_attr, 0, sizeof(qp_init_attr));
+	/* wire both send and recv to the same CQ */
+	qp_init_attr.send_cq =  cl.cq_hndl;
+	qp_init_attr.recv_cq  = cl.cq_hndl;
+	qp_init_attr.cap.max_send_wr = MAX_WQE;
+	qp_init_attr.cap.max_recv_wr = MAX_WQE;
+	qp_init_attr.cap.max_send_sge = 1;  /* scatter/gather entries */
+	qp_init_attr.cap.max_recv_sge = 1;
+	qp_init_attr.qp_type = IBV_QPT_RC;
+	/* only generate completion queue entries if requested */
+	qp_init_attr.sq_sig_all = 0;
+
+	ret = rdma_create_qp(ci->cma_id, cl.prot_id, &qp_init_attr);
+	if (ret) {
+		eprintf("create qp failed\n");
+		goto release_slot;
+	}
+	ci->qp_hndl = ci->cma_id->qp;
+	VALGRIND_MAKE_MEM_DEFINED(ci->qp_hndl, sizeof(*ci->qp_hndl));
+
+	iser_out(5, "%s() QP num = %d", __func__, ci->qp_hndl->qp_num);
+
+	/* Leave room for iser hdr, iscsi hdr, ahs extcdb, ahs rhdr,
+	 * immediate data.  Should negotiate max immediate data.  */
+	rsize = sizeof(struct iser_hdr) + sizeof(struct iscsi_hdr)
+		+ sizeof(struct iscsi_ecdb_ahdr)
+		+ sizeof(struct iscsi_rlength_ahdr) + MAX_RSIZE;
+	ret = iser_init_comm(ci, MAX_SSIZE, rsize, conn_index);
+	if (ret) {
+		eprintf("Unable to init send/recv mem regions");
+		goto release_slot;
+	}
+
+	ret = iser_init_wr(ci);
+	if (ret) {
+		/* should never happen */
+		eprintf("Unable to init work requests");
+		goto release_slot;
+	}
+
+	if (need_notify) {
+		iser_out(5, "%s() Need to set notif", __func__);
+		ret = ibv_req_notify_cq(cl.cq_hndl, 0);  /*get CQE for recv*/
+		if (ret) {
+			openfab_error("Can't req notify", ret);
+			goto release_slot;
+		}
+	}
+
+	/* now we can actually accept the connection */
+	ret = rdma_accept(ci->cma_id, &conn_param);
+	if (ret) {
+		eprintf("rdma_accept failed\n");
+		goto release_slot;
+	}
+	return;
+release_slot:	/* give the slot back; QP and buffers built so far are not torn down here */
+	ci->cma_id = NULL;
+	ci->busy = 0;
+}
+
+/*
+ * Finish putting the connection together, now that the other side
+ * has ACKed our acceptance: allocate the iscsi_connection and mark it valid.
+ */
+static void iser_conn_established(struct rdma_cm_event *event)
+{
+	int i, conn_index = -1;
+	struct conn_info *ci;
+
+	for (i=0; i<MAX_CONN; i++)	/* skip released slots: their cma_id is stale */
+		if (cl.conn[i].busy && cl.conn[i].cma_id == event->id) {
+			conn_index = i;
+			break;
+		}
+	if (conn_index == -1) {
+		eprintf("cma id %p not found\n", event->id);
+		return;
+	}
+	ci = &cl.conn[conn_index];
+
+	iser_out(2, "%s() Connection fully accepted at index %d", __func__,
+		conn_index);
+
+	ci->rcv_comm_event = NULL;
+	ci->send_comm_event = NULL;
+	ci->readb = ci->writeb = 0;
+	ci->iscsi_conn = conn_alloc();
+	if (!ci->iscsi_conn) {
+		eprintf("unable to allocate iscsi_conn\n");
+		return;
+	}
+
+	ci->iscsi_conn->fd = conn_index;	/* the slot index stands in for a socket fd */
+	ci->iscsi_conn->tp = &iscsi_iser;
+	conn_read_pdu(ci->iscsi_conn); /* just to set state */
+	INIT_LIST_HEAD(&ci->conn_tx_ready);
+	ci->valid = 1;  /* now can mark the connection ready */
+}
+
+static void iser_disconnect(struct rdma_cm_event *ev)	/* tear down the connection whose QP belongs to ev->id */
+{
+	int ind, ret, i;
+	struct conn_info *ci;
+
+	iser_out(8, "%s() Entry", __func__);
+
+	ind = iser_match_qp(ev->id->qp->qp_num);
+
+	if (ind < 0) {
+		eprintf("Unable to look up connection");
+		return;
+	}
+	ci = &cl.conn[ind];
+
+	iser_out(5, "%s() Connection Index = %d", __func__, ind);
+
+	/* invalidate conn */
+	ci->valid = 0;
+	list_del_init(&ci->conn_tx_ready);
+
+	/* disconnect */
+	ret = rdma_disconnect(ci->cma_id);
+	if (ret) {	/* same -1/errno versus -errno handling as in iscsi_iser_init() */
+		openfab_error("Unable to disconnect cma id", ret == -1 ? errno : -ret);
+	} else {
+		iser_out(5, "%s() RDMA disconnect complete", __func__);
+	}
+
+	/* flush receive work requests */
+	for (i=0; i<ISER_INITIAL_POST; ) {	/* i counts flushed WRs, not loop passes */
+		struct ibv_wc wc;
+		ret = ibv_poll_cq(cl.cq_hndl, 1, &wc);
+		if (ret < 0) {
+			eprintf("ibv_poll_cq %d", ret);
+			exit(1);
+		} else if (ret == 0) {
+			usleep(1000);	/* nothing yet; wait for the flush completions */
+		} else {
+			VALGRIND_MAKE_MEM_DEFINED(&wc, sizeof(wc));
+			if (wc.status == IBV_WC_WR_FLUSH_ERR) {
+				++i;
+			} else if (wc.status == IBV_WC_SUCCESS) {
+				/* handle somebody else's WR */
+				ret = handle_wc(&wc);
+				if (ret)
+					eprintf("oops\n");
+			} else {
+				eprintf("bad WC status %d for wr_id 0x%lx\n",
+					wc.status, wc.wr_id);
+				exit(1);
+			}
+		}
+	}
+
+	/* free iscsi_conn */
+	if (ci->iscsi_conn->refcount != 1)
+		eprintf("iscsi conn refcount = %d\n", ci->iscsi_conn->refcount);
+	conn_put(ci->iscsi_conn);
+
+	iser_out(5, "%s() iscsi_conn is deallocated", __func__);
+
+	/* release mr and free the lists */
+	ret = ibv_dereg_mr(ci->srmr);
+	if (ret)
+		eprintf("ibv_dereg_mr\n");
+	free(ci->srbuf);
+	free(ci->listbuf);
+
+/*	ret = ibv_dereg_mr(ci->regmr);
+	if (ret)
+		eprintf("ibv_dereg_mr failed\n");
+	free(ci->regbuf);
+	free(ci->membuf); */
+
+	iser_out(5, "%s() Done freeing recv, send and rdma lists", __func__);
+
+	/* then can destroy QP */
+	ret = ibv_destroy_qp(ci->qp_hndl); /* XXX: efence cribs: free err */
+	if (ret) {	/* ibv_destroy_qp() returns the errno value itself */
+		openfab_error("Unable to destroy QP", ret);
+	}
+
+	ci->busy = 0;	/* slot may now be handed out again by conn_find_free() */
+	iser_out(8, "%s() Exit", __func__);
+}
+
+/*
+ * Handle RDMA connection events: fetch one CM event, dispatch it, ack it.
+ */
+void iser_handle_rdmacm(int fd __attribute__((unused)),
+			int events __attribute__((unused)),
+			void *data __attribute__((unused)))
+{
+	int ret;
+	struct rdma_cm_event *event;
+	struct rdma_cm_id *destroy_cm_id = NULL;	/* set only for a disconnect */
+
+	iser_out(8, "%s() Entry", __func__);
+
+	ret = rdma_get_cm_event(cl.rdma_evt_channel, &event);
+	if (ret) {
+		eprintf("rdma_get_cm_event failed\n");
+		return;
+	}
+
+	VALGRIND_MAKE_MEM_DEFINED(event, sizeof(*event));
+	switch (event->event) {
+	case RDMA_CM_EVENT_CONNECT_REQUEST:
+		iser_accept_connection(event);
+		break;
+	case RDMA_CM_EVENT_ESTABLISHED:
+		iser_conn_established(event);
+		break;
+	case RDMA_CM_EVENT_DISCONNECTED:
+		iser_disconnect(event);
+		destroy_cm_id = event->id;	/* saved here, destroyed after the ack below */
+		break;
+	default:
+		eprintf("unknown event %d\n", event->event);
+		break;
+	}
+
+	ret = rdma_ack_cm_event(event);
+	if (ret) {
+		eprintf("ack cm event failed\n");
+		return;	/* note: a pending destroy_cm_id is not destroyed on this path */
+	}
+
+	if (destroy_cm_id) {
+		ret = rdma_destroy_id(destroy_cm_id);
+		if (ret)
+			eprintf("rdma_destroy_id failed\n");
+	}
+}
+
+/*
+ * Deal with just one work completion: wr_id names the list element, opcode picks the action.
+ * TODO: When getting a completion event set the xxx_comm_event to NULL
+ */
+static int handle_wc(struct ibv_wc *wc)
+{
+	int ret = 0;
+	struct recvlist *recvl;
+	struct sendlist *sendl;
+	struct rdmalist *rdmal, *rc, *rn;
+	struct conn_info *ci;
+	struct ibv_recv_wr *bad_wr;
+
+	switch (wc->opcode) {
+	case IBV_WC_SEND:	/* a SEND finished: its slot can be reused */
+		iser_out(5, "%s() Outgoing Cmd - COMPLETE", __func__);
+		sendl = ptr_from_int64(wc->wr_id);
+		ci = &cl.conn[sendl->conn_index];
+		if (ci->iscsi_conn->state == STATE_CLOSE)
+			goto close_err;
+
+		sendl->free = 1;
+		break;
+
+	case IBV_WC_RDMA_WRITE:	/* the signaled (last) write of a task finished */
+		iser_out(5, "%s() Outgoing Data - COMPLETE", __func__);
+		rdmal = ptr_from_int64(wc->wr_id);
+		ci = &cl.conn[rdmal->conn_index];
+		if (ci->iscsi_conn->state == STATE_CLOSE)
+			goto close_err;
+
+		dprintf("putting ep %d back on tx ready list\n",
+			rdmal->conn_index);
+		iser_event_modify(rdmal->conn_index, EPOLLIN | EPOLLOUT);
+
+		/*
+		 * Free this one and the previous unsignaled writes that
+		 * must have completed by now.
+		 */
+		list_for_each_entry_safe(rc, rn, &ci->rdmal_write_busy, list) {
+			if (rc->task == rdmal->task) {
+				rc->free = 1;
+				list_del(&rc->list);
+				list_add(&rc->list, &ci->rdmal);
+			}
+		}
+		break;
+
+	case IBV_WC_RDMA_READ:	/* data pulled from the initiator has arrived */
+		iser_out(5, "%s() Incoming RDMA READ data", __func__);
+		rdmal = ptr_from_int64(wc->wr_id);
+		ci = &cl.conn[rdmal->conn_index];
+		if (ci->iscsi_conn->state == STATE_CLOSE)
+			goto close_err;
+
+		assert(rdmal->sge.length == wc->byte_len);
+		iser_rdma_read_completion(rdmal);
+		rdmal->free = 1;
+		break;
+
+	case IBV_WC_RECV:	/* an incoming message landed in a receive buffer */
+		iser_out(5, "%s() Incoming Cmd - COMPLETE (%d bytes)",
+			 __func__, wc->byte_len);
+		recvl = ptr_from_int64(wc->wr_id);
+		ci = &cl.conn[recvl->conn_index];
+		if (ci->iscsi_conn->state == STATE_CLOSE)
+			goto close_err;
+
+		recvl->bytes_recvd = wc->byte_len;
+		VALGRIND_MAKE_MEM_DEFINED(recvl->buf, recvl->bytes_recvd);
+
+		/*
+		 * Global pointer to the working receive on this connection
+		 * for reads from iscsid.c.
+		 */
+		ci->rcv_comm_event = recvl;
+		iscsi_rx_handler(recvl->conn_index, ci->iscsi_conn);
+		ci->rcv_comm_event = NULL;
+
+		ret = ibv_post_recv(ci->qp_hndl, &recvl->wr, &bad_wr);	/* hand the buffer back to the QP */
+		if (ret) {
+			eprintf("ibv_post_recv failed\n");
+			exit(1);
+		}
+
+		iser_out(5, "%s() Done processing recv %p, reposted it",
+			 __func__, recvl);
+		break;
+
+	default:
+		eprintf("unexpected opcode %d\n", wc->opcode);
+		exit(1);
+	}
+
+	/*
+	 * XXX:
+	 *	The impl assumes single threaded backing store. If there is
+	 *	more than one thread, then event driven mechanism will break,
+	 *	which won't trigger tx_handler, and will disrupt request
+	 *	reply sequence.
+	 *
+	 * TODO:
+	 *	handle breach of MAX OUTSTANDING requests
+	 */
+
+	return ret;
+
+close_err:	/* completion for a connection already in STATE_CLOSE: currently fatal */
+	eprintf("conn state set to closed .. IMPLEMENT ME\n");
+	/* conn_close(conn, fd); */
+	exit(1);
+}
+
+/*
+ * Called directly from main event loop when a CQ notification is
+ * available.  Acks the event, re-arms the CQ, then drains it via handle_wc().
+ */
+static void iser_cqe_handler(int fd __attribute__((unused)),
+			     int events __attribute__((unused)),
+			     void *data __attribute__((unused)))
+{
+	int ret;
+	struct ibv_wc wc;
+	void *cq_context;
+
+	iser_out(8, "%s() Entry", __func__);
+
+	ret = ibv_get_cq_event(cl.cq_channel, &cl.cq_hndl, &cq_context);
+	if (ret != 0){
+		eprintf("notification, but no CQ event\n");
+		exit(1);
+	}
+
+	ibv_ack_cq_events(cl.cq_hndl, 1);
+
+	ret = ibv_req_notify_cq(cl.cq_hndl, 0);	/* re-armed before polling, not after */
+	if (ret) {
+		openfab_error("Can't req notify", ret);
+		exit(1);
+	}
+
+	/* drain the CQ */
+	for (;;) {
+		ret = ibv_poll_cq(cl.cq_hndl, 1, &wc);
+		if (ret < 0) {
+			eprintf("ibv_poll_cq %d\n", ret);
+			exit(1);
+		} else if (ret == 0) {
+			break;	/* CQ is empty */
+		}
+
+		VALGRIND_MAKE_MEM_DEFINED(&wc, sizeof(wc));
+		if (wc.status != IBV_WC_SUCCESS) {
+			eprintf("bad WC status %d for wr_id 0x%lx\n", wc.status,
+				wc.wr_id);
+			exit(1);
+		}
+		ret = handle_wc(&wc);
+		if (ret)
+			break;	/* stop draining if the handler reported a problem */
+	}
+
+	iser_out(8, "%s() Exit", __func__);
+}
+
+static int iser_parse_hdr(struct conn_info *ci, struct recvlist *recvl)	/* decode the iSER header of recvl->buf; 0 if control PDU, else -1 */
+{
+	int ret;
+	struct iser_hdr *hdr = recvl->buf;
+
+	switch (hdr->flags & 0xF0) {	/* high nibble carries the message type */
+	case ISCSI_CTRL:
+		iser_out(4, "iSCSI control type PDU");
+		if (hdr->flags & ISER_RSV) {	/* remember the advertised read stag/VA */
+			ci->rem_read_stag = be32_to_cpu(hdr->read_stag);
+			ci->rem_read_va = be64_to_cpu(hdr->read_va);
+			iser_out(4, "rem_read_stag %x rem_read_va %lx",
+				 ci->rem_read_stag, ci->rem_read_va);
+		}
+		if (hdr->flags & ISER_WSV) {	/* remember the advertised write stag/VA */
+			ci->rem_write_stag = be32_to_cpu(hdr->write_stag);
+			ci->rem_write_va = be64_to_cpu(hdr->write_va);
+			iser_out(4, "rem_write_stag %x rem_write_va %lx",
+				 ci->rem_write_stag, ci->rem_write_va);
+		}
+		ret = 0;
+		break;
+	case ISER_HELLO:
+		iser_out(4, "iSER Hello message??");
+		ret = -1;
+		break;
+	default:
+		iser_out(4, "Malformed iser hdr");
+		ret = -1;
+		break;
+	}
+
+	ci->readb = sizeof(*hdr);	/* header counted as consumed, on the error paths too */
+	return ret;
+}
+
+static size_t iscsi_iser_read(int ind, void *buf, size_t nbytes)	/* copy up to nbytes of the current recv into buf; returns bytes copied */
+{
+	int ret;
+	struct conn_info *ci;
+	struct recvlist *recvl;
+
+	iser_out(4, "in %s buf %p nbytes %zu", __func__, buf, nbytes);
+
+	ci = &cl.conn[ind];
+	if (!ci->valid) {
+		eprintf("conn %d not valid\n", ind);
+		exit(1);
+	}
+
+	recvl = ci->rcv_comm_event;	/* set by handle_wc() around iscsi_rx_handler() */
+	assert(recvl != NULL);
+	/* first read of a message: strip and parse the iSER header */
+	if (ci->readb == 0) {
+		if (recvl->bytes_recvd < sizeof(struct iser_hdr))
+			return 0;
+
+		ret = iser_parse_hdr(ci, recvl);
+		if (ret != 0)
+			return 0;
+	}
+	/* clamp the request to what is left in this receive */
+	if (ci->readb + nbytes > recvl->bytes_recvd) {
+		if (ci->readb > recvl->bytes_recvd)
+			nbytes = 0;	/* offset is past the data: nothing valid to copy */
+		else
+			nbytes = recvl->bytes_recvd - ci->readb;
+	}
+
+	/* TODO: can this copy be eliminated? */
+	memcpy(buf, (char *)recvl->buf + ci->readb, nbytes);
+	ci->readb += nbytes;
+
+	if (ci->readb == recvl->bytes_recvd) {	/* message fully consumed */
+		memset(recvl->buf, 0, ci->readb); /* TODO: for debugging */
+		ci->readb = 0;
+		/* TODO: should we dequeue the recvl?? */
+	}
+
+	iser_out(4, "%s ret %zu", __func__, nbytes);
+	return nbytes;
+}
+
+static size_t iscsi_iser_write_begin(int ind, void *buf, size_t nbytes)	/* stage nbytes into the pending send buffer; returns nbytes, or 0 if no slot */
+{
+	struct conn_info *ci;
+	struct sendlist *send;
+
+	iser_out(4, "in %s nbytes %zu", __func__, nbytes);
+
+	ci = &cl.conn[ind];
+	if (!ci->valid) {
+		eprintf("conn %d not valid\n", ind);
+		exit(1);
+	}
+
+	if (ci->send_comm_event == NULL) {
+		int found = 0;
+		/* find one, first time here */
+		list_for_each_entry(send, &ci->sendl, list) {
+			if (send->free) {
+				found = 1;
+				break;
+			}
+		}
+		if (!found) {
+			eprintf("Unable to find send slot\n");
+			return 0;
+		}
+		send->free = 0;
+		ci->send_comm_event = send;
+		iser_out(4, "%s: alloc new send event %p", __func__, send);
+	} else {
+		send = ci->send_comm_event;
+		iser_out(4, "%s: reuse existing send event %p", __func__, send);
+	}
+
+	if (ci->writeb == 0) {
+		/* insert iser hdr */
+		struct iser_hdr *hdr = send->buf;
+
+		memset(hdr, 0, sizeof(*hdr));
+		hdr->flags = ISCSI_CTRL;
+		ci->writeb = sizeof(*hdr);
+	}
+
+	/* checked after the header is counted, so header + payload must fit */
+	if (ci->writeb + nbytes > MAX_SSIZE) {
+		eprintf("send buf overflow %u + %zu", ci->writeb, nbytes);
+		exit(1);
+	}
+	memcpy((char *)send->buf + ci->writeb, buf, nbytes);
+	ci->writeb += nbytes;
+
+	return nbytes;
+}
+
+static void iscsi_iser_write_end(int ind)	/* post the send buffer staged by iscsi_iser_write_begin() */
+{
+	int ret;
+	struct ibv_send_wr *bad_wr;
+	struct conn_info *ci = NULL;
+	struct sendlist *send = NULL;
+
+	ci = &cl.conn[ind];
+	if (!ci->valid) {
+		eprintf("conn %d not valid\n", ind);
+		exit(1);
+	}
+
+	send = ci->send_comm_event; /* should be set from func above */
+	if (!send) {	/* nothing staged, e.g. write_begin found no free slot */
+		eprintf("no send in progress on conn %d\n", ind);
+		return;
+	}
+	iser_out(4, "%s: continue send event %p, writeb %d", __func__, send,
+		 ci->writeb);
+	send->sge.length = ci->writeb;	/* send only the bytes actually staged */
+	ret = ibv_post_send(ci->qp_hndl, &send->wr, &bad_wr);
+	if (ret) {
+		openfab_error("Can't post SEND", ret);	/* the error code is the return value, not errno */
+		return;
+	}
+
+	ci->writeb = 0;  /* reset count */
+	ci->send_comm_event = NULL;
+	/* wake up the progress engine to do the done */
+	dprintf("waking up progress to finish cmd\n");
+	++cl.num_tx_ready;
+	iser_out(4, "%s exit %p", __func__, send);
+	return;
+}
+
+static size_t iscsi_iser_close(int ep __attribute__((unused)) )	/* close hook: stub that only logs */
+{
+	eprintf("not implemented\n");
+	return 0;	/* always 0; nothing is torn down here */
+}
+
+static int iscsi_iser_show(int ep, char *buf, int rest)	/* write a one-line description of connection ep into buf (rest bytes) */
+{
+	struct conn_info *ci = &cl.conn[ep];
+
+	snprintf(buf, rest, "RDMA connection, QP %p", ci->qp_hndl);
+	return 0;	/* always 0; the snprintf() result is not reported */
+}
+
+static int iscsi_iser_init(void)	/* listen for iSER connections and hook the CM channel into the event loop; 0 or negative */
+{
+	int ret;
+	struct sockaddr_in sock_addr;
+	short int port = ISCSI_LISTEN_PORT;
+
+	iser_out(8, "%s() Entry", __func__);
+
+	memset(&cl, 0, sizeof(cl));
+
+	memset(&sock_addr, 0, sizeof(sock_addr));
+	sock_addr.sin_family = AF_INET;
+	sock_addr.sin_port = htons(port);
+	sock_addr.sin_addr.s_addr = INADDR_ANY;
+
+	cl.rdma_evt_channel = rdma_create_event_channel();
+	if (!cl.rdma_evt_channel) {	/* NULL on failure; must not reach rdma_create_id() */
+		openfab_error("Unable to create rdma event channel", errno);
+		return -1;
+	}
+	ret = rdma_create_id(cl.rdma_evt_channel, &cl.cma_listen_id, NULL,
+			     RDMA_PS_TCP);
+	if (ret) {
+		openfab_error("Unable to create rdma id", ret == -1 ? errno : -ret);
+		return -1;
+	}
+
+	ret = rdma_bind_addr(cl.cma_listen_id, (struct sockaddr *) &sock_addr);
+	if (ret) {
+		if (ret == -1)
+			openfab_error("can't bind address", errno);
+		else
+			openfab_error("can't bind address", -ret);
+		return -1;
+	}
+
+	/* 0 means maximum backlog */
+	ret = rdma_listen(cl.cma_listen_id, 0);
+	if (ret) {
+		if (ret == -1)
+			openfab_error("Can't listen", errno);
+		else
+			openfab_error("Can't listen", -ret);
+		return -1;
+	}
+
+	iser_out(5, "%s() Listening on %d for an iSER connection", __func__,
+			port);
+	iser_out(5, "%s() FD we want to watch is %d", __func__,
+			cl.cma_listen_id->channel->fd);
+
+	ret = tgt_event_add(cl.cma_listen_id->channel->fd, EPOLLIN,
+			    iser_handle_rdmacm, NULL);
+	if (ret) {
+		eprintf("Unable to add RDMA event channel FD to poll %m\n");
+		return -1;
+	}
+
+	INIT_LIST_HEAD(&cl.conn_tx_ready);
+	cl.num_tx_ready = 0;
+	ret = tgt_counter_event_add(&cl.num_tx_ready, iser_progress, NULL);
+	if (ret)
+		return ret;
+	cl.cq_channel = NULL;	/* CQ and its channel are created on the first connection */
+	iser_out(8, "%s() Exit", __func__);
+
+	return 0;
+}
+
+
+static struct rdmalist *iser_find_rdma_slot(struct conn_info *ci, uint8_t *buf,
+					    ssize_t size)	/* claim a free rdmalist for buf; size is not used here */
+{
+	struct rdmalist *rdmal;
+	int found = 0;
+
+	list_for_each_entry(rdmal, &ci->rdmal, list) {
+		if (rdmal->free) {
+			found = 1;
+			break;
+		}
+	}
+	if (!found) {
+		eprintf("Unable to find rdma slot\n");
+		exit(1);	/* never returns NULL: running out of slots is fatal */
+	}
+
+	rdmal->free = 0;
+	rdmal->buf = buf;
+	return rdmal;
+}
+
+/*
+ * Fill in rdma's SGE/WR for size bytes of rdma->buf and post it; returns ibv_post_send()'s result.
+ * signaled_flag is IBV_SEND_SIGNALED or 0; op is IBV_WR_RDMA_WRITE or IBV_WR_RDMA_READ.
+ */
+static int iser_post_rdma_wr(struct conn_info *ci, struct rdmalist *rdma,
+			     ssize_t size, int op, int signaled_flag,
+			     uint64_t remote_va, uint32_t remote_rkey,
+			     struct iscsi_task *task)
+{
+	int ret;
+	struct ibv_send_wr *bad_wr;
+
+	iser_out(4, "in %s size %zd rem_va %lx rem_stag %x", __func__, size,
+		 remote_va, remote_rkey);
+
+	rdma->task = task;	/* lets handle_wc() find every write of this task */
+
+	rdma->sge.addr = uint64_from_ptr(rdma->buf);
+	rdma->sge.length = size;
+	rdma->sge.lkey = cl.regmr->lkey;	/* buf is expected to lie inside cl.regbuf */
+
+	memset(&rdma->wr, 0, sizeof(rdma->wr));
+	rdma->wr.wr_id = uint64_from_ptr(rdma);
+	rdma->wr.sg_list = &rdma->sge;
+	rdma->wr.num_sge = 1;
+	rdma->wr.wr.rdma.remote_addr = remote_va;
+	rdma->wr.wr.rdma.rkey = remote_rkey;
+	rdma->wr.opcode = op;
+	rdma->wr.send_flags = signaled_flag;
+
+	ret = ibv_post_send(ci->qp_hndl, &rdma->wr, &bad_wr);
+	if (ret)	/* the error code is the return value, not errno */
+		openfab_error("Can't post SEND", ret);
+
+	iser_out(8, "%s() Exit rdma = %p", __func__, rdma);
+	return ret;
+}
+
+/*
+ * Convert the iscsi data-in response to an RDMA write and send it.
+ * ind indexes cl.conn, rsp holds the data-in header and payload, task is
+ * the task whose data is being sent.  Returns 0 when posted, -1 if no rdma
+ * slot could be claimed; exits if the write itself cannot be posted.
+ */
+static int iser_rdma_write(int ind, struct iscsi_pdu *rsp,
+			   struct iscsi_task *task)
+{
+	uint32_t offset;
+	int ret;
+	struct rdmalist *rdmal;
+	struct conn_info *ci = &cl.conn[ind];
+	struct iscsi_data_rsp *datain = (struct iscsi_data_rsp *) &rsp->bhs;
+	int last_rdma = (task->offset == task->len);
+
+	iser_out(4, "in %s size %d, %p", __func__, rsp->datasize, rsp);
+
+	rdmal = iser_find_rdma_slot(ci, rsp->data, rsp->datasize);
+	if (rdmal == NULL) {
+		eprintf("iser_find_rdma_slot failed\n");
+		return -1;
+	}
+
+	/* to find unsignaled ones when the last one completes */
+	list_del(&rdmal->list);
+	list_add(&rdmal->list, &ci->rdmal_write_busy);
+
+	offset = be32_to_cpu(datain->offset);
+	/* only signal on the last RDMA write */
+	dprintf("offset %d len %d last %d\n", task->offset, task->len,
+		last_rdma);
+
+	ret = iser_post_rdma_wr(ci, rdmal, rsp->datasize, IBV_WR_RDMA_WRITE,
+				last_rdma ? IBV_SEND_SIGNALED : 0,
+				ci->rem_read_va + offset, ci->rem_read_stag,
+				task);
+	if (ret) {	/* a failed post may come back as a positive errno */
+		eprintf("iser_post_rdma_wr failed\n");
+		exit(1);
+	}
+
+	/* iscsi thinks we are txing, but really we're waiting for this
+	 * rdma to finish before sending the completion.  Then we'll stick
+	 * ourselves back on the list.
+	 */
+	if (last_rdma) {
+		dprintf("removing ep %d from tx ready list\n", ind);
+		iser_event_modify(ind, EPOLLIN);
+	} else {
+		/* poke ourselves to do the next rdma */
+		++cl.num_tx_ready;
+	}
+	return ret;
+}
+
+/* Issue the RDMA read that the R2T in rsp stands for; 0 if posted, else -1. */
+static int iser_rdma_read(int ind, struct iscsi_pdu *rsp)
+{
+	struct conn_info *ci = &cl.conn[ind];
+	struct iscsi_connection *conn = ci->iscsi_conn;
+	struct iscsi_task *task = conn->tx_task;
+	struct iscsi_r2t_rsp *r2t = (struct iscsi_r2t_rsp *) &rsp->bhs;
+	uint8_t *buf = task->data + task->offset;	/* where the data lands */
+	uint32_t len = be32_to_cpu(r2t->data_length);
+	struct rdmalist *rdma;
+	int ret;
+
+	iser_out(4, "in %s", __func__);
+
+	rdma = iser_find_rdma_slot(ci, buf, len);
+	if (rdma == NULL) {
+		iser_out(4, "iser_find_rdma_slot failed");
+		return -1;
+	}
+
+	ret = iser_post_rdma_wr(ci, rdma, len, IBV_WR_RDMA_READ,
+				IBV_SEND_SIGNALED, ci->rem_write_va,
+				ci->rem_write_stag, task);
+	if (ret) {
+		/* failure may come back as a positive errno: force it < 0 */
+		iser_out(4, "iser_post_rdma_wr failed");
+		return -1;
+	}
+
+	/*
+	 * Initiator registers the entire buffer, but gives us a VA that
+	 * is advanced by immediate + unsolicited data amounts.  Advance
+	 * rem_write_va as we read, knowing that the target always grabs
+	 * segments in order.
+	 */
+	ci->rem_write_va += len;
+
+	return 0;
+}
+
+static ssize_t iser_rdma_read_completion(struct rdmalist *rdma)
+{
+	int ret = 0;
+	struct conn_info *ci = &cl.conn[rdma->conn_index];
+	struct iscsi_connection *conn = ci->iscsi_conn;
+	struct iscsi_task *task;
+	/* RDMA read done: credit the bytes to rdma->task and run the task */
+	iser_out(4, "in %s", __func__);
+
+	/* task is no longer conn->tx_task, look it up on the session list */
+	list_for_each_entry(task, &conn->session->cmd_list, c_hlist) {
+		if (task == rdma->task)
+			goto found;
+	}
+	return -EINVAL;	/* rdma->task is not on the session's cmd_list */
+
+found:
+	/* equivalent of iscsi_data_out_rx_start + _done */
+	conn->rx_buffer = ptr_from_int64(rdma->sge.addr);
+	conn->rx_size = rdma->sge.length;
+	task->offset += rdma->sge.length;
+	task->r2t_count -= rdma->sge.length;
+	VALGRIND_MAKE_MEM_DEFINED(conn->rx_buffer, conn->rx_size);
+
+	dprintf("more bytes %u arrived, now r2t_count %d\n", rdma->sge.length,
+		task->r2t_count);
+
+	/*
+	 * We solicited this data, so hdr->ttt is what we asked for.  Bypass
+	 * data_out_rx_done and just run the task.  If more r2t are needed,
+	 * this will generate them.
+	 */
+	ret = iscsi_scsi_cmd_execute(task);
+
+	conn->rx_task = NULL;
+	conn_read_pdu(conn);	/* presumably re-arms rx for the next PDU */
+
+	iser_out(4, "%s exit conn %p conn->rx_buffer %p ret %d", __func__,
+		 conn, conn->rx_buffer, ret);
+	return ret;	/* whatever iscsi_scsi_cmd_execute() reported */
+}
+
+/*
+ * Counter-event callback: called from tgtd when cl.num_tx_ready is non-zero.
+ * It consumes one count and then sweeps cl.conn_tx_ready over and over,
+ * pushing tx on every listed connection, until a sweep finds nothing queued.
+ * Neither counter nor data is used.
+ */
+static void iser_progress(int *counter, void *data)
+{
+	struct conn_info *cur, *next;
+
+	iser_out(8, "%s() Entry", __func__);
+
+	if (cl.num_tx_ready != 0) {
+		--cl.num_tx_ready;
+		/* entries are never unlinked here, only by the code called
+		 * below, so keep sweeping until the list is empty */
+		while (!list_empty(&cl.conn_tx_ready)) {
+			list_for_each_entry_safe(cur, next, &cl.conn_tx_ready,
+						 conn_tx_ready) {
+				struct iscsi_connection *conn = cur->iscsi_conn;
+
+				dprintf("trying tx on fd %d conn %p\n",
+					conn->fd, conn);
+				iscsi_tx_handler(conn->fd, conn);
+			}
+		}
+	}
+
+	iser_out(8, "%s() Exit", __func__);
+}
+
+
+/* Pop a buffer off the free pool; NULL (slow path still TODO) when empty. */
+void *iser_malloc(size_t sz)
+{
+	struct mempool *entry;
+
+	if (!list_empty(&cl.freel)) {
+		/* requests larger than MEM_SZ are not supported */
+		assert(sz <= MEM_SZ);
+		entry = list_entry(cl.freel.next, struct mempool, list);
+		list_del(&entry->list);
+		list_add(&entry->list, &cl.allocl);	/* track as in use */
+		iser_out(4, "meml %p buf %p", entry, entry->buf);
+		return entry->buf;
+	}
+
+	/* TODO: take slow path: allocate & register */
+	eprintf("free list empty\n");
+	return NULL;
+}
+
+/* Return buf to the free pool; logs and does nothing if buf is unknown. */
+void iser_free(void *buf)
+{
+	struct mempool *entry;
+
+	/* linear search of the in-use list for the entry that owns buf */
+	list_for_each_entry(entry, &cl.allocl, list) {
+		if (entry->buf != buf)
+			continue;
+
+		/* found it: move the entry back onto the free list */
+		list_del(&entry->list);
+		list_add(&entry->list, &cl.freel);
+		return;
+	}
+
+	eprintf("couldn't locate buf %p\n", buf);
+}
+
+static void iser_event_modify(int ep, int events)
+{
+	/* EPOLLOUT queues ep for tx and bumps the counter; else dequeue it */
+	struct conn_info *ci = &cl.conn[ep];
+
+	dprintf("ep %d events %d\n", ep, events);
+	if (!(events & EPOLLOUT)) {
+		dprintf("removing ep %d from tx ready list\n", ep);
+		list_del_init(&ci->conn_tx_ready);
+		return;
+	}
+	if (!list_empty(&ci->conn_tx_ready)) {
+		eprintf("ep %d was already on the list.  Track?\n", ep);
+	} else {
+		dprintf("adding ep %d to tx ready list\n", ep);
+		list_add(&ci->conn_tx_ready, &cl.conn_tx_ready);
+	}
+	++cl.num_tx_ready;
+}
+
+struct iscsi_transport iscsi_iser = {	/* ops table of the iser transport */
+	.name			= "iser",
+	.rdma			= 1,	/* iscsid.c branches on tp->rdma */
+	.ep_init		= iscsi_iser_init,
+	.ep_read		= iscsi_iser_read,
+	.ep_write_begin		= iscsi_iser_write_begin,
+	.ep_write_end		= iscsi_iser_write_end,
+	.ep_close		= iscsi_iser_close,
+	.ep_show		= iscsi_iser_show,
+	.ep_rdma_write		= iser_rdma_write,	/* sends DATA_IN PDUs */
+	.ep_rdma_read		= iser_rdma_read,	/* acts on R2T PDUs */
+	.ep_malloc		= iser_malloc,
+	.ep_free		= iser_free,
+	.ep_event_modify	= iser_event_modify,
+};
diff --git a/usr/iscsi/iscsid.c b/usr/iscsi/iscsid.c
index 4f9194a..15c3623 100644
--- a/usr/iscsi/iscsid.c
+++ b/usr/iscsi/iscsid.c
@@ -270,7 +270,7 @@ static void login_security_done(struct iscsi_connection *conn)
 static void text_scan_login(struct iscsi_connection *conn)
 {
 	char *key, *value, *data;
-	int datasize, idx;
+	int datasize, idx, is_rdma = 0;
 	struct iscsi_login_rsp *rsp = (struct iscsi_login_rsp *)&conn->rsp.bhs;
 
 	data = conn->req.data;
@@ -289,6 +289,9 @@ static void text_scan_login(struct iscsi_connection *conn)
 			if (idx == ISCSI_PARAM_MAX_RECV_DLENGTH)
 				idx = ISCSI_PARAM_MAX_XMIT_DLENGTH;
 
+			if (idx == ISCSI_PARAM_RDMA_EXTENSIONS)
+				is_rdma = 1;
+
 			if (param_str_to_val(session_keys, idx, value, &val) < 0) {
 				if (conn->session_param[idx].state
 				    == KEY_STATE_START) {
@@ -335,6 +338,10 @@ static void text_scan_login(struct iscsi_connection *conn)
 			text_key_add(conn, key, "NotUnderstood");
 	}
 
+	/* do not offer, initiator must explicitly request */
+	if (!is_rdma)
+		conn->session_param[ISCSI_PARAM_RDMA_EXTENSIONS].val = 0;
+
 out:
 	return;
 }
@@ -354,6 +361,13 @@ static int text_check_param(struct iscsi_connection *conn)
 					p[i].state = KEY_STATE_DONE;
 					continue;
 				}
+				if (p[ISCSI_PARAM_RDMA_EXTENSIONS].val == 1) {
+					if (i == ISCSI_PARAM_MAX_RECV_DLENGTH)
+						continue;
+				} else {
+					if (i >= ISCSI_PARAM_RDMA_EXTENSIONS)
+						continue;
+				}
 				memset(buf, 0, sizeof(buf));
 				param_val_to_str(session_keys, i, p[i].val,
 						 buf);
@@ -435,7 +449,7 @@ static void login_start(struct iscsi_connection *conn)
 			return;
 		}
 
-		if (ip_acl(conn->tid, conn->fd)) {
+		if (!conn->tp->rdma && ip_acl(conn->tid, conn->fd)) {
 			rsp->status_class = ISCSI_STATUS_CLS_INITIATOR_ERR;
 			rsp->status_detail = ISCSI_LOGIN_STATUS_TGT_NOT_FOUND;
 			conn->state = STATE_EXIT;
@@ -469,12 +483,43 @@ static void login_start(struct iscsi_connection *conn)
 
 static void login_finish(struct iscsi_connection *conn)
 {
+	struct iscsi_login_rsp *rsp = (struct iscsi_login_rsp *)&conn->rsp.bhs;
+	int ret;
+
 	switch (conn->session_type) {
 	case SESSION_NORMAL:
-		if (!conn->session)
+	       /*
+		* XXX: call into transport to initialize resources.  It looks
+		* at conn->session_param to know how big buffers to pin, e.g.
+		*/
+		/* ret = conn->tp->alloc_resources(conn); */
+		ret = 0;
+		if (ret) {
+			rsp->flags = 0;
+			rsp->status_class = ISCSI_STATUS_CLS_TARGET_ERR;
+			rsp->status_detail = ISCSI_LOGIN_STATUS_NO_RESOURCES;
+			conn->state = STATE_EXIT;
+			break;
+		}
+
+		if (!conn->session) {
 			session_create(conn);
+		} else {
+			if (conn->tp->rdma ^ conn->session->rdma) {
+				eprintf("new conn rdma %d, but session %d\n",
+					conn->tp->rdma, conn->session->rdma);
+				rsp->flags = 0;
+				rsp->status_class =
+					ISCSI_STATUS_CLS_INITIATOR_ERR;
+				rsp->status_detail =
+					ISCSI_LOGIN_STATUS_INVALID_REQUEST;
+				conn->state = STATE_EXIT;
+				break;
+			}
+		}
 		memcpy(conn->isid, conn->session->isid, sizeof(conn->isid));
 		conn->tsih = conn->session->tsih;
+
 		break;
 	case SESSION_DISCOVERY:
 		/* set a dummy tsih value */
@@ -636,8 +681,11 @@ static void cmnd_exec_login(struct iscsi_connection *conn)
 			default:
 				goto init_err;
 			}
-			if (!stay && !nsg_disagree)
+			if (!stay && !nsg_disagree) {
 				login_finish(conn);
+				if (rsp->status_class)
+					return;
+			}
 			break;
 		default:
 			goto init_err;
@@ -916,7 +964,7 @@ static int iscsi_data_rsp_build(struct iscsi_task *task)
 		rsp->flags = ISCSI_FLAG_CMD_FINAL;
 
 		/* collapse status into final packet if successful */
-		if (task->result == 0 && task->dir != BIDIRECTIONAL) {
+		if (task->result == 0 && task->dir != BIDIRECTIONAL && !conn->tp->rdma) {
 			rsp->flags |= ISCSI_FLAG_DATA_STATUS;
 			rsp->cmd_status = task->result;
 			rsp->statsn = cpu_to_be32(conn->stat_sn++);
@@ -1643,7 +1691,7 @@ static int iscsi_scsi_cmd_tx_done(struct iscsi_connection *conn)
 		break;
 	case ISCSI_OP_SCSI_DATA_IN:
 		if (task->offset < task->len || task->result != 0
-		   || task->dir == BIDIRECTIONAL) {
+		   || task->dir == BIDIRECTIONAL || conn->tp->rdma) {
 			dprintf("more data or sense or bidir %x\n", hdr->itt);
 			list_add_tail(&task->c_list, &task->conn->tx_clist);
 			return 0;
@@ -1932,6 +1980,31 @@ void iscsi_tx_handler(int fd, struct iscsi_connection *conn)
 			return;
 	}
 
+	/*
+	 * For rdma, grab the data-in or r2t packet and convert to
+	 * an RDMA operation.
+	 */
+	if (conn->tp->rdma && conn->state == STATE_SCSI) {
+		switch (conn->rsp.bhs.opcode) {
+		case ISCSI_OP_R2T:
+			ret = conn->tp->ep_rdma_read(fd, &conn->rsp);
+			if (ret < 0)
+				conn->state = STATE_CLOSE;
+			goto finish;
+
+		case ISCSI_OP_SCSI_DATA_IN:
+			ret = conn->tp->ep_rdma_write(fd, &conn->rsp,
+						      conn->tx_task);
+			if (ret < 0)
+				conn->state = STATE_CLOSE;
+			goto finish;
+
+		default:
+			break;
+		}
+	}
+
+again:
 	switch (conn->tx_iostate) {
 	case IOSTATE_TX_BHS:
 		ret = do_send(fd, conn, IOSTATE_TX_INIT_AHS);
@@ -2007,11 +2080,15 @@ void iscsi_tx_handler(int fd, struct iscsi_connection *conn)
 		exit(1);
 	}
 
-	if (ret < 0 ||
-	    conn->tx_iostate != IOSTATE_TX_END ||
-	    conn->state == STATE_CLOSE)
+	if (ret < 0 || conn->state == STATE_CLOSE)
 		return;
 
+	if (conn->tx_iostate != IOSTATE_TX_END) {
+		if (conn->tp->rdma)
+			goto again;  /* avoid event loop, just push */
+		return;
+	}
+
 	if (conn->tx_size) {
 		eprintf("error %d %d %d\n", conn->state, conn->tx_iostate,
 			conn->tx_size);
@@ -2019,6 +2096,8 @@ void iscsi_tx_handler(int fd, struct iscsi_connection *conn)
 	}
 
 	conn->tp->ep_write_end(fd);
+
+finish:
 	cmnd_finish(conn);
 
 	switch (conn->state) {
diff --git a/usr/iscsi/param.c b/usr/iscsi/param.c
index 9eac62c..76236d1 100644
--- a/usr/iscsi/param.c
+++ b/usr/iscsi/param.c
@@ -118,6 +118,18 @@ static int minimum_check_val(struct iscsi_key *key, unsigned int *val)
 	return 0;
 }
 
+static int min_or_zero_check_val(struct iscsi_key *key, unsigned int *val)
+{
+	/* zero is always acceptable; anything else must lie in [min, max] */
+	int out_of_range = *val != 0 && (*val < key->min || key->max < *val);
+
+	if (out_of_range)
+		*val = key->min;	/* clamp instead of rejecting */
+
+	/* success is reported even after clamping */
+	return 0;
+}
+
 static int maximum_check_val(struct iscsi_key *key, unsigned int *val)
 {
 	int err = 0;
@@ -140,6 +152,16 @@ static int minimum_set_val(struct param *param, int idx, unsigned int *val)
 	return 0;
 }
 
+static int min_or_zero_set_val(struct param *param, int idx, unsigned int *val)
+{
+	/* a non-zero offer no larger than the stored value replaces it ... */
+	if (*val != 0 && *val <= param[idx].val)
+		param[idx].val = *val;
+	else	/* ... otherwise the stored value is handed back */
+		*val = param[idx].val;
+	return 0;
+}
+
 static int maximum_set_val(struct param *param, int idx, unsigned int *val)
 {
 	if (param[idx].val > *val)
@@ -265,6 +287,13 @@ static struct iscsi_key_ops minimum_ops = {
 	.set_val = minimum_set_val,
 };
 
+static struct iscsi_key_ops min_or_zero_ops = {	/* zero-or-range keys */
+	.val_to_str = range_val_to_str,
+	.str_to_val = range_str_to_val,
+	.check_val = min_or_zero_check_val,	/* accepts 0 or [min, max] */
+	.set_val = min_or_zero_set_val,
+};
+
 static struct iscsi_key_ops maximum_ops = {
 	.val_to_str = range_val_to_str,
 	.str_to_val = range_str_to_val,
@@ -345,6 +374,15 @@ struct iscsi_key session_keys[] = {
 	{"IFMarkInt", 2048, 1, 65535, &marker_ops},
 	[ISCSI_PARAM_MAXCONNECTIONS] =
 	{"MaxConnections", 1, 1, 65535, &minimum_ops},
+	/* iSER draft */
+	[ISCSI_PARAM_RDMA_EXTENSIONS] =
+	{"RDMAExtensions", 0, 0, 1, &and_ops},
+	[ISCSI_PARAM_TARGET_RDSL] =
+	{"TargetRecvDataSegmentLength", 8192, 512, 16777215, &minimum_ops},
+	[ISCSI_PARAM_INITIATOR_RDSL] =
+	{"InitiatorRecvDataSegmentLength", 8192, 512, 16777215, &minimum_ops},
+	[ISCSI_PARAM_MAX_OUTST_PDU] =
+	{"MaxOutstandingUnexpectedPDUs", 0, 2, 4294967295U, &min_or_zero_ops},
 	[ISCSI_PARAM_MAX] =
 	{NULL,},
 };
diff --git a/usr/iscsi/session.c b/usr/iscsi/session.c
index dfb94d0..36beadc 100644
--- a/usr/iscsi/session.c
+++ b/usr/iscsi/session.c
@@ -128,6 +128,8 @@ int session_create(struct iscsi_connection *conn)
 	memcpy(session->isid, conn->isid, sizeof(session->isid));
 	session->tsih = last_tsih = tsih;
 
+	session->rdma = conn->tp->rdma;
+
 	conn_add_to_session(conn, session);
 
 	dprintf("session_create: %#" PRIx64 "\n", sid64(conn->isid, session->tsih));
diff --git a/usr/iscsi/target.c b/usr/iscsi/target.c
index 2653839..d47f05f 100644
--- a/usr/iscsi/target.c
+++ b/usr/iscsi/target.c
@@ -283,6 +283,14 @@ int iscsi_target_create(struct target *t)
 		[ISCSI_PARAM_OFMARKINT] = {0, 2048},
 		[ISCSI_PARAM_IFMARKINT] = {0, 2048},
 		[ISCSI_PARAM_MAXCONNECTIONS] = {0, 1},
+#ifdef ISCSI_RDMA
+		[ISCSI_PARAM_RDMA_EXTENSIONS] = {0, 1},
+#else
+		[ISCSI_PARAM_RDMA_EXTENSIONS] = {0, 0},
+#endif
+		[ISCSI_PARAM_TARGET_RDSL] = {0, 262144},
+		[ISCSI_PARAM_INITIATOR_RDSL] = {0, 262144},
+		[ISCSI_PARAM_MAX_OUTST_PDU] =  {0, 0},  /* not in open-iscsi */
 	};
 
 	target = malloc(sizeof(*target));
diff --git a/usr/iscsi/transport.c b/usr/iscsi/transport.c
index ba232ed..e17b554 100644
--- a/usr/iscsi/transport.c
+++ b/usr/iscsi/transport.c
@@ -29,6 +29,9 @@
 
 struct iscsi_transport *iscsi_transports[] = {
 	&iscsi_tcp,
+#ifdef ISCSI_RDMA
+	&iscsi_iser,
+#endif
 	NULL,
 };
 
diff --git a/usr/iscsi/transport.h b/usr/iscsi/transport.h
index bfba784..ebb9a23 100644
--- a/usr/iscsi/transport.h
+++ b/usr/iscsi/transport.h
@@ -23,5 +23,8 @@ struct iscsi_transport {
 };
 
 extern struct iscsi_transport iscsi_tcp;
+#ifdef ISCSI_RDMA
+extern struct iscsi_transport iscsi_iser;
+#endif
 
 #endif
-- 
1.5.2.4




More information about the stgt mailing list