[Sheepdog] [PATCH 1/3] sheep: use multiple work queues

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Thu Sep 1 18:37:50 CEST 2011


Currently, Sheepdog uses only one work queue.  If many VMs send
forwarding requests to sheep daemons at the same time and those
requests consume all the worker threads, Sheepdog freezes, because no
worker thread is left to serve the I/O requests that the forwarded
requests are waiting for.

The simplest way to solve this problem is to use multiple work queues
and to put forwarding requests and I/O requests into different
queues.

This also solves the problem of sheep daemons using too many socket
descriptors in large cluster environments: the per-worker socket
cache now only needs entries for the (fewer) gateway worker threads.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 sheep/group.c      |    7 +++++--
 sheep/sdnet.c      |    6 +++---
 sheep/sheep.c      |    9 +++++++--
 sheep/sheep_priv.h |    9 ++++++++-
 sheep/store.c      |   10 +++++-----
 sheep/vdi.c        |    6 +++---
 sheep/work.c       |   21 +++++++++------------
 sheep/work.h       |    5 +++--
 8 files changed, 43 insertions(+), 30 deletions(-)

diff --git a/sheep/group.c b/sheep/group.c
index 1fcaea4..6690125 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -1547,7 +1547,10 @@ do_retry:
 				}
 			}
 		}
-		queue_work(&req->work);
+		if (req->rq.flags & SD_FLAG_CMD_DIRECT)
+			queue_work(sys->io_wqueue, &req->work);
+		else
+			queue_work(sys->gateway_wqueue, &req->work);
 	}
 
 	while (!list_empty(&failed_req_list)) {
@@ -1577,7 +1580,7 @@ do_retry:
 	cpg_event_work.fn = cpg_event_fn;
 	cpg_event_work.done = cpg_event_done;
 
-	queue_work(&cpg_event_work);
+	queue_work(sys->cpg_wqueue, &cpg_event_work);
 }
 
 static void sd_confchg(cpg_handle_t handle, const struct cpg_name *group_name,
diff --git a/sheep/sdnet.c b/sheep/sdnet.c
index aae4df4..1ce8c80 100644
--- a/sheep/sdnet.c
+++ b/sheep/sdnet.c
@@ -805,14 +805,14 @@ int remove_object(struct sheepdog_vnode_list_entry *e,
 int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx,
 		 uint32_t epoch, int worker_idx)
 {
-	static int cached_fds[NR_WORKER_THREAD][SD_MAX_NODES];
+	static int cached_fds[NR_GW_WORKER_THREAD][SD_MAX_NODES];
 	static uint32_t cached_epoch = 0;
 	int i, j, fd, ret;
 	char name[INET6_ADDRSTRLEN];
 
 	if (cached_epoch == 0) {
 		/* initialize */
-		for (i = 0; i < NR_WORKER_THREAD; i++) {
+		for (i = 0; i < NR_GW_WORKER_THREAD; i++) {
 			for (j = 0; j < SD_MAX_NODES; j++)
 				cached_fds[i][j] = -1;
 		}
@@ -825,7 +825,7 @@ int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx,
 		return -1;
 	}
 	if (after(epoch, cached_epoch)) {
-		for (i = 0; i < NR_WORKER_THREAD; i++) {
+		for (i = 0; i < NR_GW_WORKER_THREAD; i++) {
 			for (j = 0; j < SD_MAX_NODES; j++) {
 				if (cached_fds[i][j] >= 0)
 					close(cached_fds[i][j]);
diff --git a/sheep/sheep.c b/sheep/sheep.c
index da0cb7b..82f1031 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -145,8 +145,13 @@ int main(int argc, char **argv)
 	if (ret)
 		exit(1);
 
-	ret = init_work_queue(NR_WORKER_THREAD);
-	if (ret)
+	sys->cpg_wqueue = init_work_queue(1);
+	sys->gateway_wqueue = init_work_queue(NR_GW_WORKER_THREAD);
+	sys->io_wqueue = init_work_queue(NR_IO_WORKER_THREAD);
+	sys->recovery_wqueue = init_work_queue(1);
+	sys->deletion_wqueue = init_work_queue(1);
+	if (!sys->cpg_wqueue || !sys->gateway_wqueue || !sys->io_wqueue ||
+	    !sys->recovery_wqueue || !sys->deletion_wqueue)
 		exit(1);
 
 	ret = create_listen_port(port, sys);
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index f8f8c65..dc56f54 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -140,6 +140,12 @@ struct cluster_info {
 	uint32_t recovered_epoch;
 
 	int use_directio;
+
+	struct work_queue *cpg_wqueue;
+	struct work_queue *gateway_wqueue;
+	struct work_queue *io_wqueue;
+	struct work_queue *deletion_wqueue;
+	struct work_queue *recovery_wqueue;
 };
 
 extern struct cluster_info *sys;
@@ -193,7 +199,8 @@ int update_epoch_store(uint32_t epoch);
 int set_global_nr_copies(uint32_t copies);
 int get_global_nr_copies(uint32_t *copies);
 
-#define NR_WORKER_THREAD 64
+#define NR_GW_WORKER_THREAD 4
+#define NR_IO_WORKER_THREAD 4
 
 int epoch_log_write(uint32_t epoch, char *buf, int len);
 int epoch_log_read(uint32_t epoch, char *buf, int len);
diff --git a/sheep/store.c b/sheep/store.c
index a7c7e3b..992c8f5 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -1465,7 +1465,7 @@ static void recover_timer(void *data)
 		return;
 	}
 
-	queue_work(&rw->work);
+	queue_work(sys->recovery_wqueue, &rw->work);
 }
 
 void resume_recovery_work(void)
@@ -1483,7 +1483,7 @@ void resume_recovery_work(void)
 		return;
 
 	suspended_recovery_work = NULL;
-	queue_work(&rw->work);
+	queue_work(sys->recovery_wqueue, &rw->work);
 }
 
 int is_recoverying_oid(uint64_t oid)
@@ -1575,7 +1575,7 @@ static void recover_done(struct work *work, int idx)
 			return;
 		}
 		resume_pending_requests();
-		queue_work(&rw->work);
+		queue_work(sys->recovery_wqueue, &rw->work);
 		return;
 	}
 
@@ -1595,7 +1595,7 @@ static void recover_done(struct work *work, int idx)
 		list_del(&rw->rw_siblings);
 
 		recovering_work = rw;
-		queue_work(&rw->work);
+		queue_work(sys->recovery_wqueue, &rw->work);
 	}
 }
 
@@ -1800,7 +1800,7 @@ int start_recovery(uint32_t epoch)
 		list_add_tail(&rw->rw_siblings, &recovery_work_list);
 	else {
 		recovering_work = rw;
-		queue_work(&rw->work);
+		queue_work(sys->recovery_wqueue, &rw->work);
 	}
 
 	return 0;
diff --git a/sheep/vdi.c b/sheep/vdi.c
index 1d2c5e1..392d429 100644
--- a/sheep/vdi.c
+++ b/sheep/vdi.c
@@ -489,7 +489,7 @@ static void delete_one_done(struct work *work, int idx)
 
 	dw->done++;
 	if (dw->done < dw->count) {
-		queue_work(&dw->work);
+		queue_work(sys->deletion_wqueue, &dw->work);
 		return;
 	}
 
@@ -502,7 +502,7 @@ static void delete_one_done(struct work *work, int idx)
 		dw = list_first_entry(&deletion_work_list,
 				      struct deletion_work, dw_siblings);
 
-		queue_work(&dw->work);
+		queue_work(sys->deletion_wqueue, &dw->work);
 	}
 }
 
@@ -644,7 +644,7 @@ int start_deletion(uint32_t vid, uint32_t epoch)
 	}
 
 	list_add_tail(&dw->dw_siblings, &deletion_work_list);
-	queue_work(&dw->work);
+	queue_work(sys->deletion_wqueue, &dw->work);
 out:
 	free(entries);
 
diff --git a/sheep/work.c b/sheep/work.c
index 1b30036..a417107 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -126,18 +126,16 @@ static void __queue_work(struct work_queue *q, struct work *work, int enabled)
 		list_add_tail(&work->w_list, &wi->q.blocked_list);
 }
 
-static struct work_queue *wqueue;
-
-void queue_work(struct work *work)
+void queue_work(struct work_queue *q, struct work *work)
 {
 	int enabled;
 
-	if (!list_empty(&wqueue->blocked_list))
+	if (!list_empty(&q->blocked_list))
 		enabled = 0;
 	else
-		enabled = work_enabled(wqueue, work);
+		enabled = work_enabled(q, work);
 
-	__queue_work(wqueue, work, enabled);
+	__queue_work(q, work, enabled);
 }
 
 static void work_post_done(struct work_queue *q, enum work_attr attr)
@@ -258,18 +256,18 @@ static int init_eventfd(void)
 	return 0;
 }
 
-int init_work_queue(int nr)
+struct work_queue *init_work_queue(int nr)
 {
 	int i, ret;
 	struct worker_info *wi;
 
 	ret = init_eventfd();
 	if (ret)
-		return -1;
+		return NULL;
 
 	wi = zalloc(sizeof(*wi) + nr * sizeof(pthread_t));
 	if (!wi)
-		return -1;
+		return NULL;
 
 	wi->nr_threads = nr;
 
@@ -298,9 +296,8 @@ int init_work_queue(int nr)
 	pthread_mutex_unlock(&wi->startup_lock);
 
 	list_add(&wi->worker_info_siblings, &worker_info_list);
-	wqueue = &wi->q;
 
-	return 0;
+	return &wi->q;
 destroy_threads:
 
 	wi->q.wq_state |= WQ_DEAD;
@@ -316,7 +313,7 @@ destroy_threads:
 	pthread_mutex_destroy(&wi->startup_lock);
 	pthread_mutex_destroy(&wi->finished_lock);
 
-	return -1;
+	return NULL;
 }
 
 #ifdef COMPILE_UNUSED_CODE
diff --git a/sheep/work.h b/sheep/work.h
index 05367cb..a980600 100644
--- a/sheep/work.h
+++ b/sheep/work.h
@@ -2,6 +2,7 @@
 #define __WORK_H__
 
 struct work;
+struct work_queue;
 
 typedef void (*work_func_t)(struct work *, int idx);
 
@@ -17,7 +18,7 @@ struct work {
 	enum work_attr attr;
 };
 
-int init_work_queue(int nr);
-void queue_work(struct work *work);
+struct work_queue *init_work_queue(int nr);
+void queue_work(struct work_queue *q, struct work *work);
 
 #endif
-- 
1.7.2.5




More information about the sheepdog mailing list