[stgt] [PATCH] nonblocking epoll_wait loop, sched events, ISER/IB polling

Alexander Nezhinsky nezhinsky at gmail.com
Thu Sep 25 18:09:09 CEST 2008


This patch introduces a few interconnected improvements that
ultimately lead to a significant reduction in the interrupt rate
for iser/ib, while adding flexibility to the tgtd event processing scheme.

First, it implements a kind of "scheduler" list to replace the
counter events. Event handlers can schedule other events, which are
placed on the scheduler's list. This has some small advantages in
itself: the same event descriptor is used for all events in the
system, events are executed in the order in which they were scheduled,
they can be removed from the list, and several instances of the same
event may be scheduled.

But the most important change is to the logic of the processing loop
as a whole. It works as follows:

The scheduler walks the events on the list and processes them.
Processing may schedule more items, but the scheduler does not process
these new items in the same pass (in order to avoid infinite/long loops).

If, after processing all "old" events, some "new" ones have been
scheduled, epoll_wait() is executed in a non-blocking manner. This
guarantees that file descriptors that became ready during scheduler
processing are handled, while if there are no ready fds, the remaining
scheduler events are processed immediately.

But if nothing new has been added after processing the scheduler list,
then epoll_wait() blocks, as in the current scheme.

This way we can be very flexible, because event handlers and deferred
work cannot block one another. Potentially long handlers can easily be
split into shorter ones without blocking the entire target.

Finally, the IB completion queue (CQ) is the first to benefit, because
we can postpone interrupt re-arming until no completion entries
remain, and schedule CQ "draining" after all events are serviced.

When a completion event is handled, the CQ is polled so that up to a
given number (currently set to 8) of work completions (WCs) are processed.

If more completions are left on the CQ, essentially the same
handler is scheduled to carry out the next round of polling,
but in the meantime other events in the system can be serviced.

If the CQ is found empty, interrupts are re-armed and a handler is
scheduled to consume any completions that may have sneaked in
between the moment the CQ was seen empty and the moment
the interrupts were re-armed.

Thus, with iSER IB there is a marked reduction in the interrupt rate.
Here are typical results:
Current target: 62-65 KIOPS, ~110,000 interrupts/sec, CPU% ~68%
Patched target: 65-70 KIOPS,  ~65,000 interrupts/sec, CPU% ~65%

Signed-off-by: Alexander Nezhinsky <nezhinsky at gmail.com>
---

diff --git a/usr/iscsi/iscsi_rdma.c b/usr/iscsi/iscsi_rdma.c
index 46e6ea8..23bb4db 100644
--- a/usr/iscsi/iscsi_rdma.c
+++ b/usr/iscsi/iscsi_rdma.c
@@ -144,6 +144,8 @@ struct conn_info {
 	/* but count so we can drain CQ on close */
 	int recvl_posted;
 
+	struct tgt_event tx_sched;
+
 	/* login phase resources, freed at full-feature */
 	void *srbuf_login;
 	void *listbuf_login;
@@ -194,6 +196,8 @@ struct iser_device {
 	void *mempool_listbuf;
 	struct ibv_mr *mempool_mr;
 
+	struct tgt_event poll_sched;
+
 	/* free and allocated mempool entries */
 	struct list_head mempool_free, mempool_alloc;
 };
@@ -217,10 +221,6 @@ static struct list_head iser_conn_list;
 /* if any task needs an rdma read or write slot to proceed */
 static int waiting_rdma_slot;
 
-/* progress available, used with tgt_counter_event */
-static int num_tx_ready;
-static int num_rx_ready;
-
 #define uint64_from_ptr(p) (uint64_t)(uintptr_t)(p)
 #define ptr_from_int64(p) (void *)(unsigned long)(p)
 
@@ -251,6 +251,9 @@ static int num_rx_ready;
 #define RDMA_PER_CONN 20
 #define RDMA_TRANSFER_SIZE (512 * 1024)
 
+
+#define MAX_POLL_WC 8
+
 /*
  * Number of allocatable data buffers, each of this size.  Do at least 128
  * for linux iser.  The mempool size is rounded up at initialization time
@@ -270,13 +273,17 @@ static inline struct conn_info *RDMA_CONN(struct iscsi_connection *conn)
 	return container_of(conn, struct conn_info, iscsi_conn);
 }
 
-static void iser_cqe_handler(int fd, int events, void *data);
-static void iser_rx_progress(int *counter, void *data);
+static void iser_cqe_handler(int fd __attribute__((unused)),
+			     int events __attribute__((unused)),
+			     void *data);
 static void iser_rdma_read_completion(struct rdmalist *rdma);
 static void iscsi_rdma_release(struct iscsi_connection *conn);
 static int iscsi_rdma_show(struct iscsi_connection *conn, char *buf,
 			   int rest);
 static void iscsi_rdma_event_modify(struct iscsi_connection *conn, int events);
+static void iser_sched_poll_cq(struct tgt_event *tev);
+static void iser_sched_consume_cq(struct tgt_event *tev);
+static void iser_sched_tx(struct tgt_event *evt);
 
 /*
  * Called when ready for full feature, builds resources.
@@ -612,6 +619,8 @@ static int iser_device_init(struct iser_device *dev)
 		goto out;
 	}
 
+	tgt_init_sched_event(&dev->poll_sched, iser_sched_poll_cq, dev);
+
 	ret = ibv_req_notify_cq(dev->cq, 0);
 	if (ret) {
 		eprintf("ibv_req_notify failed: %s\n", strerror(ret));
@@ -691,6 +700,9 @@ static void iser_accept_connection(struct rdma_cm_event *event)
 	ci->login_phase = LOGIN_PHASE_START;
 	INIT_LIST_HEAD(&ci->conn_tx_ready);
 	list_add(&ci->iser_conn_list, &temp_conn);
+
+	tgt_init_sched_event(&ci->tx_sched, iser_sched_tx, ci);
+
 	/* initiator sits at dst, we are src */
 	memcpy(&ci->peer_addr, &event->id->route.addr.dst_addr,
 	       sizeof(ci->peer_addr));
@@ -940,7 +952,7 @@ static void handle_wc(struct ibv_wc *wc)
 		list_add(&rdmal->list, &ci->rdmal);
 		if (waiting_rdma_slot) {
 			waiting_rdma_slot = 0;
-			num_tx_ready = 1;
+			tgt_add_sched_event(&ci->tx_sched);
 		}
 		break;
 
@@ -957,7 +969,7 @@ static void handle_wc(struct ibv_wc *wc)
 		list_add(&rdmal->list, &ci->rdmal);
 		if (waiting_rdma_slot) {
 			waiting_rdma_slot = 0;
-			num_tx_ready = 1;
+			tgt_add_sched_event(&ci->tx_sched);
 		}
 		break;
 
@@ -974,85 +986,14 @@ close_err:
 }
 
 /*
- * Called directly from main event loop when a CQ notification is
- * available.
- */
-static void iser_cqe_handler(int fd __attribute__((unused)),
-			     int events __attribute__((unused)),
-			     void *data)
-{
-	int ret;
-	void *cq_context;
-	struct iser_device *dev = data;
-
-	ret = ibv_get_cq_event(dev->cq_channel, &dev->cq, &cq_context);
-	if (ret != 0) {
-		eprintf("notification, but no CQ event\n");
-		exit(1);
-	}
-
-	ibv_ack_cq_events(dev->cq, 1);
-
-	ret = ibv_req_notify_cq(dev->cq, 0);
-	if (ret) {
-		eprintf("ibv_req_notify_cq: %s\n", strerror(ret));
-		exit(1);
-	}
-
-	iser_rx_progress(NULL, dev);
-}
-
-/*
- * Called from tgtd when num_tx_ready (counter) non-zero.  Walks the
- * list of active connections and tries to push tx on each, until nothing
- * is ready anymore.  No progress limit here.
- */
-static void iser_tx_progress(int *counter __attribute__((unused)),
-			     void *data __attribute__((unused)))
-{
-	int reloop, ret;
-	struct conn_info *ci, *cin;
-	struct iscsi_connection *conn;
-
-	dprintf("entry\n");
-	num_tx_ready = 0;
-
-	do {
-		reloop = 0;
-		list_for_each_entry_safe(ci, cin, &conn_tx_ready,
-					 conn_tx_ready) {
-			conn = &ci->iscsi_conn;
-			if (conn->state == STATE_CLOSE) {
-				dprintf("ignoring tx for closed conn\n");
-			} else {
-				dprintf("trying tx\n");
-				ret = iscsi_tx_handler(conn);
-				if (conn->state == STATE_CLOSE) {
-					conn_close(conn);
-					dprintf("connection %p closed\n", ci);
-				} else {
-					if (ret == 0) {
-						reloop = 1;
-					} else {
-						/* but leave on tx ready list */
-						waiting_rdma_slot = 1;
-					}
-				}
-			}
-		}
-	} while (reloop);
-}
-
-/*
  * Could read as many entries as possible without blocking, but
  * that just fills up a list of tasks.  Instead pop out of here
  * so that tx progress, like issuing rdma reads and writes, can
  * happen periodically.
  */
-#define MAX_RX_PROGRESS 8
-static void iser_rx_progress_one(struct iser_device *dev)
+static int iser_poll_cq(struct iser_device *dev, int max_wc)
 {
-	int ret, numwc = 0;
+	int ret = 0, numwc = 0;
 	struct ibv_wc wc;
 	struct conn_info *ci;
 	struct recvlist *recvl;
@@ -1069,8 +1010,8 @@ static void iser_rx_progress_one(struct iser_device *dev)
 		VALGRIND_MAKE_MEM_DEFINED(&wc, sizeof(wc));
 		if (wc.status == IBV_WC_SUCCESS) {
 			handle_wc(&wc);
-			if (++numwc == MAX_RX_PROGRESS) {
-				num_rx_ready = 1;
+			if (++numwc == max_wc) {
+				ret = 1;
 				break;
 			}
 		} else if (wc.status == IBV_WC_WR_FLUSH_ERR) {
@@ -1089,23 +1030,114 @@ static void iser_rx_progress_one(struct iser_device *dev)
 				wc.status, (unsigned long long) wc.wr_id);
 		}
 	}
+	return ret;
+}
+
+static void iser_poll_cq_armable(struct iser_device *dev)
+{
+	int ret;
+
+	ret = iser_poll_cq(dev, MAX_POLL_WC);
+	if (ret < 0)
+		exit(1);
+
+	if (ret == 0) {
+		/* no more completions on cq, arm the completion interrupts */
+		ret = ibv_req_notify_cq(dev->cq, 0);
+		if (ret) {
+			eprintf("ibv_req_notify_cq: %s\n", strerror(ret));
+			exit(1);
+		}
+		dev->poll_sched.sched_handler = iser_sched_consume_cq;
+	} else
+		dev->poll_sched.sched_handler = iser_sched_poll_cq;
+
+	tgt_add_sched_event(&dev->poll_sched);
+}
+
+/* Scheduled to poll cq after a completion event has been 
+   received and acknowledged, if no more completions are found
+   the interrupts are re-armed */
+static void iser_sched_poll_cq(struct tgt_event *tev)
+{
+	struct iser_device *dev = tev->data;
+	iser_poll_cq_armable(dev);
+}
+
+/* Scheduled to consume completion events that could arrive
+   after the cq had been seen empty but just before 
+   the notification interrupts were re-armed. 
+   Intended to consume those remaining completions only,
+   this function does not re-arm interrupts. */
+static void iser_sched_consume_cq(struct tgt_event *tev)
+{
+	struct iser_device *dev = tev->data;
+	int ret;
+
+	ret = iser_poll_cq(dev, MAX_POLL_WC);
+	if (ret < 0)
+		exit(1);
+}
+
+/*
+ * Called directly from main event loop when a CQ notification is
+ * available.
+ */
+static void iser_cqe_handler(int fd __attribute__((unused)),
+			     int events __attribute__((unused)),
+			     void *data)
+{
+	struct iser_device *dev = data;
+	void *cq_context;
+	int ret;
+
+	ret = ibv_get_cq_event(dev->cq_channel, &dev->cq, &cq_context);
+	if (ret != 0) {
+		eprintf("notification, but no CQ event\n");
+		exit(1);
+	}
+
+	ibv_ack_cq_events(dev->cq, 1);
+
+	/* if a poll was previously scheduled, remove it,
+	   as it will be scheduled when necessary */
+	if (dev->poll_sched.scheduled)
+		tgt_remove_sched_event(&dev->poll_sched);
+
+	iser_poll_cq_armable(dev);
 }
 
 /*
- * Only one progress counter, must look across all devs.
+ * Called from tgtd as a scheduled event
+ * tries to push tx on a connection, until nothing
+ * is ready anymore.  No progress limit here.
  */
-static void iser_rx_progress(int *counter __attribute__((unused)), void *data)
+static void iser_sched_tx(struct tgt_event *evt)
 {
-	struct iser_device *dev;
+	struct conn_info *ci = evt->data;
+	struct iscsi_connection *conn = &ci->iscsi_conn;
+	int ret;
 
 	dprintf("entry\n");
-	num_rx_ready = 0;
-	if (data == NULL) {
-		list_for_each_entry(dev, &iser_dev_list, list)
-			iser_rx_progress_one(dev);
-	} else {
-		dev = data;
-		iser_rx_progress_one(dev);
+
+	if (conn->state == STATE_CLOSE) {
+		dprintf("ignoring tx for closed conn\n");
+		return;
+	}
+
+	for (;;) {
+		dprintf("trying tx\n");
+		ret = iscsi_tx_handler(conn);
+		if (conn->state == STATE_CLOSE) {
+			conn_close(conn);
+			dprintf("connection %p closed\n", ci);
+			break;
+		}
+		if (ret != 0) {
+			/* but leave on tx ready list */
+			waiting_rdma_slot = 1;
+			break;
+		}
 	}
 }
 
@@ -1165,10 +1197,7 @@ static int iscsi_rdma_init(void)
 	INIT_LIST_HEAD(&iser_dev_list);
 	INIT_LIST_HEAD(&iser_conn_list);
 	INIT_LIST_HEAD(&temp_conn);
-	num_tx_ready = 0;
-	num_rx_ready = 0;
-	ret = tgt_counter_event_add(&num_tx_ready, iser_tx_progress, NULL);
-	ret = tgt_counter_event_add(&num_rx_ready, iser_rx_progress, NULL);
+
 	return ret;
 }
 
@@ -1397,10 +1426,6 @@ static void iscsi_iser_write_end(struct iscsi_connection *conn)
 
 	ci->writeb = 0;  /* reset count */
 	ci->send_comm_event = NULL;
-
-	/* wake up the progress engine to do the done */
-	dprintf("inc progress to finish cmd\n");
-	num_tx_ready = 1;
 }
 
 /*
@@ -1505,7 +1530,7 @@ static int iscsi_rdma_rdma_write(struct iscsi_connection *conn)
 		iscsi_rdma_event_modify(conn, EPOLLIN);
 	} else {
 		/* poke ourselves to do the next rdma */
-		num_tx_ready = 1;
+		tgt_add_sched_event(&ci->tx_sched);
 	}
 
 	return ret;
@@ -1628,7 +1653,7 @@ static void iscsi_rdma_event_modify(struct iscsi_connection *conn, int events)
 			dprintf("tx ready adding %p\n", ci);
 			list_add(&ci->conn_tx_ready, &conn_tx_ready);
 		}
-		num_tx_ready = 1;
+		tgt_add_sched_event(&ci->tx_sched);
 	} else {
 		dprintf("tx ready removing %p\n", ci);
 		list_del_init(&ci->conn_tx_ready);
diff --git a/usr/tgtd.c b/usr/tgtd.c
index 0b1cb4c..ac3ff76 100644
--- a/usr/tgtd.c
+++ b/usr/tgtd.c
@@ -38,26 +38,13 @@
 #include "work.h"
 #include "util.h"
 
-struct tgt_event {
-	union {
-		event_handler_t *handler;
-		counter_event_handler_t *counter_handler;
-	};
-	union {
-		int fd;
-		int *counter;
-	};
-	void *data;
-	struct list_head e_list;
-};
-
 unsigned long pagesize, pageshift, pagemask;
 
 int system_active = 1;
 static int ep_fd;
 static char program_name[] = "tgtd";
 static LIST_HEAD(tgt_events_list);
-static LIST_HEAD(tgt_counter_events_list);
+static LIST_HEAD(tgt_sched_events_list);
 
 static struct option const long_options[] =
 {
@@ -136,22 +123,6 @@ int tgt_event_add(int fd, int events, event_handler_t handler, void *data)
 	return err;
 }
 
-int tgt_counter_event_add(int *counter, counter_event_handler_t handler,
-			  void *data)
-{
-	struct tgt_event *tev;
-
-	tev = zalloc(sizeof(*tev));
-	if (!tev)
-		return -ENOMEM;
-
-	tev->data = data;
-	tev->counter_handler = handler;
-	tev->counter = counter;
-	list_add(&tev->e_list, &tgt_counter_events_list);
-	return 0;
-}
-
 static struct tgt_event *tgt_event_lookup(int fd)
 {
 	struct tgt_event *tev;
@@ -163,17 +134,6 @@ static struct tgt_event *tgt_event_lookup(int fd)
 	return NULL;
 }
 
-static struct tgt_event *tgt_counter_event_lookup(int *counter)
-{
-	struct tgt_event *tev;
-
-	list_for_each_entry(tev, &tgt_counter_events_list, e_list) {
-		if (tev->counter == counter)
-			return tev;
-	}
-	return NULL;
-}
-
 void tgt_event_del(int fd)
 {
 	struct tgt_event *tev;
@@ -189,20 +149,6 @@ void tgt_event_del(int fd)
 	free(tev);
 }
 
-void tgt_counter_event_del(int *counter)
-{
-	struct tgt_event *tev;
-
-	tev = tgt_counter_event_lookup(counter);
-	if (!tev) {
-		eprintf("Cannot find counter event %p\n", counter);
-		return;
-	}
-
-	list_del(&tev->e_list);
-	free(tev);
-}
-
 int tgt_event_modify(int fd, int events)
 {
 	struct epoll_event ev;
@@ -221,26 +167,54 @@ int tgt_event_modify(int fd, int events)
 	return epoll_ctl(ep_fd, EPOLL_CTL_MOD, fd, &ev);
 }
 
-static void event_loop(void)
+void tgt_init_sched_event(struct tgt_event *evt,
+			  sched_event_handler_t sched_handler, void *data)
 {
-	int nevent, i, done, timeout = TGTD_TICK_PERIOD * 1000;
-	struct epoll_event events[1024];
+	evt->sched_handler = sched_handler;
+	evt->scheduled = 0;
+	evt->data = data;
+	INIT_LIST_HEAD(&evt->e_list);
+}
+
+void tgt_add_sched_event(struct tgt_event *evt)
+{
+	if (!evt->scheduled) {
+		evt->scheduled = 1;
+		list_add_tail(&evt->e_list, &tgt_sched_events_list);
+	}
+}
+
+void tgt_remove_sched_event(struct tgt_event *evt)
+{
+	if (evt->scheduled) {
+		evt->scheduled = 0;
+		list_del_init(&evt->e_list);
+	}
+}
+
+static void tgt_exec_scheduled(void)
+{
+	struct list_head *last_sched;
 	struct tgt_event *tev, *tevn;
 
-retry:
-	/*
-	 * Check the counter events to see if they have any work to run.
-	 */
-	do {
-		done = 1;
-		list_for_each_entry_safe(tev, tevn, &tgt_counter_events_list,
-					e_list) {
-			if (*tev->counter) {
-				done = 0;
-				tev->counter_handler(tev->counter, tev->data);
-			}
-		}
-	} while (!done);
+	if (list_empty(&tgt_sched_events_list))
+		return;
+
+	/* execute only work scheduled till now */
+	last_sched = tgt_sched_events_list.prev;
+	list_for_each_entry_safe(tev, tevn, &tgt_sched_events_list, e_list) {
+		tgt_remove_sched_event(tev);
+		tev->sched_handler(tev);
+		if (&tev->e_list == last_sched)
+			break;
+	}
+}
+
+static void tgt_poll_events(int timeout)
+{
+	int nevent, i;
+	struct tgt_event *tev;
+	struct epoll_event events[1024];
 
 	nevent = epoll_wait(ep_fd, events, ARRAY_SIZE(events), timeout);
 	if (nevent < 0) {
@@ -255,9 +229,19 @@ retry:
 		}
 	} else
 		schedule();
+}
 
-	if (system_active)
-		goto retry;
+static void event_loop(void)
+{
+	int timeout, wait_timeout = TGTD_TICK_PERIOD * 1000;
+
+	while (system_active) {
+		tgt_exec_scheduled();
+		/* wait if no scheduled work, poll if there is */
+		timeout = list_empty(&tgt_sched_events_list) ?
+				wait_timeout : 0;
+		tgt_poll_events(timeout);
+	}
 }
 
 static int lld_init(int *use_kernel, char *args)
diff --git a/usr/tgtd.h b/usr/tgtd.h
index 4febcd3..da751c8 100644
--- a/usr/tgtd.h
+++ b/usr/tgtd.h
@@ -206,13 +206,20 @@ extern int tgt_bind_host_to_target(int tid, int host_no);
 extern int tgt_unbind_host_to_target(int tid, int host_no);
 extern int tgt_bound_target_lookup(int host_no);
 
-typedef void (event_handler_t)(int fd, int events, void *data);
-typedef void (counter_event_handler_t)(int *counter, void *data);
+struct tgt_event;
+typedef void (*sched_event_handler_t)(struct tgt_event *tev);
+
+extern void tgt_init_sched_event(struct tgt_event *evt,
+			  sched_event_handler_t sched_handler, void *data);
+
+typedef void (*event_handler_t)(int fd, int events, void *data);
+
 extern int tgt_event_add(int fd, int events, event_handler_t handler, void *data);
-extern int tgt_counter_event_add(int *counter, counter_event_handler_t handler,
-				 void *data);
 extern void tgt_event_del(int fd);
-extern void tgt_counter_event_del(int *counter);
+
+extern void tgt_add_sched_event(struct tgt_event *evt);
+extern void tgt_remove_sched_event(struct tgt_event *evt);
+
 extern int tgt_event_modify(int fd, int events);
 extern int target_cmd_queue(int tid, struct scsi_cmd *cmd);
 extern void target_cmd_done(struct scsi_cmd *cmd);
@@ -262,4 +269,17 @@ extern int dtd_load_unload(int tid, uint64_t lun, int load, char *file);
 extern int register_backingstore_template(struct backingstore_template *bst);
 extern struct backingstore_template *get_backingstore_template(const char *name);
 
+struct tgt_event {
+	union {
+		event_handler_t handler;
+		sched_event_handler_t sched_handler;
+	};
+	union {
+		int fd;
+		int scheduled;
+	};
+	void *data;
+	struct list_head e_list;
+};
+
 #endif
--
To unsubscribe from this list: send the line "unsubscribe stgt" in
the body of a message to majordomo at vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html



More information about the stgt mailing list