[sheepdog] [RFC PATCH] sheep: introduce 'short thread' to worker threads
Liu Yuan
namei.unix at gmail.com
Fri Jun 29 12:46:30 CEST 2012
From: Liu Yuan <tailai.ly at taobao.com>
This patch introduces a *short thread* abstraction: a thread that is created on demand and
destroyed after serving the work for a gateway or io request. It aims to solve two problems:
1. timeout of IO requests from guests. With on-demand short threads, we guarantee
that there is always one thread available to execute the request ASAP.
2. system halt in the corner case where all gateway and io threads are executing
local requests that ask for the creation of another thread to execute the request
and then sleep-wait for the response.
Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
sheep/sheep.c | 31 +++----------------------------
sheep/work.c | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 55 insertions(+), 28 deletions(-)
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 67db023..11a518f 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -27,9 +27,6 @@
#define DEFAULT_OBJECT_DIR "/tmp"
#define LOG_FILE_NAME "sheep.log"
-static unsigned nr_io_worker = 4;
-static unsigned nr_gateway_worker = 4;
-
LIST_HEAD(cluster_drivers);
static char program_name[] = "sheep";
@@ -38,10 +35,8 @@ static struct option const long_options[] = {
{"debug", no_argument, NULL, 'd'},
{"directio", no_argument, NULL, 'D'},
{"foreground", no_argument, NULL, 'f'},
- {"nr_gateway_worker", required_argument, NULL, 'g'},
{"gateway", no_argument, NULL, 'G'},
{"help", no_argument, NULL, 'h'},
- {"nr_io_worker", required_argument, NULL, 'i'},
{"loglevel", required_argument, NULL, 'l'},
{"myaddr", required_argument, NULL, 'y'},
{"stdout", no_argument, NULL, 'o'},
@@ -52,7 +47,7 @@ static struct option const long_options[] = {
{NULL, 0, NULL, 0},
};
-static const char *short_options = "c:dDfg:Ghi:l:op:v:Wy:z:";
+static const char *short_options = "c:dDfGhl:op:v:Wy:z:";
static void usage(int status)
{
@@ -68,10 +63,8 @@ Options:\n\
-d, --debug include debug messages in the log\n\
-D, --directio use direct IO when accessing the object from object cache\n\
-f, --foreground make the program run in the foreground\n\
- -g, --nr_gateway_worker set the number of workers for Guests' requests (default 4)\n\
-G, --gateway make the progam run as a gateway mode (same as '-v 0')\n\
-h, --help display this help and exit\n\
- -i, --nr_io_worker set the number of workers for inter-sheep requests (default 4)\n\
-l, --loglevel specify the level of logging detail\n\
-o, --stdout log to stdout instead of shared logger\n\
-p, --port specify the TCP port on which to listen\n\
@@ -163,28 +156,10 @@ int main(int argc, char **argv)
dprintf("direct IO mode\n");
sys->use_directio = 1;
break;
- case 'g':
- nr_gateway_worker = strtol(optarg, &p, 10);
- if (optarg == p || nr_gateway_worker < 4 || nr_gateway_worker > UINT32_MAX) {
- fprintf(stderr, "Invalid number of gateway workers '%s': "
- "must be an integer between 4 and %u\n",
- optarg, UINT32_MAX);
- exit(1);
- }
- break;
case 'G':
/* same as '-v 0' */
nr_vnodes = 0;
break;
- case 'i':
- nr_io_worker = strtol(optarg, &p, 10);
- if (optarg == p || nr_io_worker < 4 || nr_io_worker > UINT32_MAX) {
- fprintf(stderr, "Invalid number of internal IO workers '%s': "
- "must be an integer between 4 and %u\n",
- optarg, UINT32_MAX);
- exit(1);
- }
- break;
case 'o':
to_stdout = 1;
break;
@@ -272,8 +247,8 @@ int main(int argc, char **argv)
local_req_init();
- sys->gateway_wqueue = init_work_queue("gateway", nr_gateway_worker);
- sys->io_wqueue = init_work_queue("io", nr_io_worker);
+ sys->gateway_wqueue = init_work_queue("gateway", 0);
+ sys->io_wqueue = init_work_queue("io", 0);
sys->recovery_wqueue = init_work_queue("recovery", 1);
sys->deletion_wqueue = init_work_queue("deletion", 1);
sys->block_wqueue = init_work_queue("block", 1);
diff --git a/sheep/work.c b/sheep/work.c
index f81c41d..708f88c 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -24,6 +24,7 @@
+#include <stdlib.h>
+#include <string.h>
#include <sys/types.h>
#include <sys/eventfd.h>
#include <linux/types.h>
+#include <urcu/uatomic.h>
#include "list.h"
#include "util.h"
@@ -39,10 +40,61 @@ enum wq_state {
WQ_DEAD = (1U << 1),
};
+/*
+ * A short thread is created on demand and destroyed after serving the work
+ * for a gateway or io request. It aims to solve two problems:
+ *
+ * 1. Timeout of IO requests from guests. With on-demand short threads, we
+ *    guarantee that there is always one thread available to execute the
+ *    request as soon as possible.
+ * 2. Sheep halt in the corner case where all gateway and io threads are
+ *    executing local requests that ask for the creation of another thread
+ *    to execute the requests and sleep-wait for the responses.
+ *
+ * struct short_work is the argument handed to a short thread: it is
+ * allocated by create_short_thread() and consumed by run_short_thread().
+ */
+struct short_work {
+ /* work item whose ->fn() the short thread executes */
+ struct work *work;
+ /* worker_info that supplies the thread name and the finished list */
+ struct worker_info *wi;
+};
+
+/*
+ * Thread entry point of a short thread.
+ *
+ * @arg: heap-allocated struct short_work describing the work to run and the
+ *       worker_info it belongs to. This function takes ownership of the
+ *       descriptor and frees it.
+ *
+ * Runs the work's fn() exactly once, appends the work to the worker_info's
+ * finished list under finished_lock and then signals efd.
+ * Always returns NULL.
+ */
+static void *run_short_thread(void *arg)
+{
+	struct short_work *sw = arg;
+	struct work *work = sw->work;
+	struct worker_info *wi = sw->wi;
+	eventfd_t value = 1;
+	static uint64_t idx = 0;
+	int err;
+
+	/* The descriptor only carried the two pointers; release it now. */
+	free(sw);
+
+	/*
+	 * Nobody joins a short thread, so detach it; otherwise the resources
+	 * of every finished thread would be kept around forever.
+	 */
+	err = pthread_detach(pthread_self());
+	if (err)
+		panic("failed to detach short thread: %s\n", strerror(err));
+
+	/*
+	 * Use the value returned by the atomic increment itself: re-reading
+	 * idx afterwards would race with other short threads starting up.
+	 */
+	set_thread_name(wi->name, uatomic_add_return(&idx, 1));
+
+	work->fn(work);
+
+	pthread_mutex_lock(&wi->finished_lock);
+	list_add_tail(&work->w_list, &wi->finished_list);
+	pthread_mutex_unlock(&wi->finished_lock);
+
+	eventfd_write(efd, value);
+	return NULL;
+}
+
+/*
+ * Spawn a short thread that executes @work on behalf of @wi.
+ *
+ * @wi:   worker_info the work belongs to
+ * @work: work item to execute
+ *
+ * A struct short_work descriptor is allocated here and passed to the new
+ * thread as its argument; ownership of the descriptor moves to that thread.
+ * Panics if the thread cannot be created.
+ */
+static inline void create_short_thread(struct worker_info *wi,
+				       struct work *work)
+{
+	pthread_t thread;
+	struct short_work *sw = xmalloc(sizeof *sw);
+	int err;
+
+	sw->work = work;
+	sw->wi = wi;
+
+	/*
+	 * pthread_create() reports failure through its return value, not
+	 * through errno, so "%m" would print an unrelated error here.
+	 */
+	err = pthread_create(&thread, NULL, run_short_thread, sw);
+	if (err)
+		panic("failed to create short thread: %s\n", strerror(err));
+}
+
void queue_work(struct work_queue *q, struct work *work)
{
struct worker_info *wi = container_of(q, struct worker_info, q);
+ if (!wi->nr_threads) {
+ create_short_thread(wi, work);
+ return;
+ }
pthread_mutex_lock(&wi->pending_lock);
list_add_tail(&work->w_list, &wi->q.pending_list);
pthread_mutex_unlock(&wi->pending_lock);
--
1.7.10.2
More information about the sheepdog
mailing list