[Sheepdog] [PATCH 1/6] collie: core codes of a sheepdog daemon

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Tue Dec 1 19:35:18 CET 2009


This includes a main function, connections handling, and
worker threads.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 collie/collie.c |  116 +++++++++++++++++++
 collie/collie.h |   93 +++++++++++++++
 collie/net.c    |  341 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 collie/work.c   |  234 ++++++++++++++++++++++++++++++++++++++
 collie/work.h   |   20 ++++
 5 files changed, 804 insertions(+), 0 deletions(-)
 create mode 100644 collie/collie.c
 create mode 100644 collie/collie.h
 create mode 100644 collie/net.c
 create mode 100644 collie/work.c
 create mode 100644 collie/work.h

diff --git a/collie/collie.c b/collie/collie.c
new file mode 100644
index 0000000..f58ed96
--- /dev/null
+++ b/collie/collie.c
@@ -0,0 +1,116 @@
+/*
+ * Copyright (C) 2009 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <getopt.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include "collie.h"
+
+#define EPOLL_SIZE 4096
+#define DEFAULT_OBJECT_DIR "/tmp"
+
+static char program_name[] = "collie";
+
+static struct option const long_options[] = {
+	{"port", required_argument, 0, 'p'},
+	{"foreground", no_argument, 0, 'f'},
+	{"debug", no_argument, 0, 'd'},
+	{"help", no_argument, 0, 'h'},
+	{0, 0, 0, 0},
+};
+
+static char *short_options = "p:fdh";
+
+static void usage(int status)
+{
+	if (status)
+		fprintf(stderr, "Try `%s --help' for more information.\n",
+			program_name);
+	else {
+		printf("Usage: %s [OPTION] [PATH]\n", program_name);
+		printf("\
+Sheepdog Daemon\n\
+  -p, --port              specify the listen port number\n\
+  -f, --foreground        make the program run in the foreground\n\
+  -d, --debug             print debug messages\n\
+  -h, --help              display this help and exit\n\
+");
+	}
+	exit(status);
+}
+
+int main(int argc, char **argv)
+{
+	int ch, longindex;
+	int ret, port = SD_LISTEN_PORT;
+	char *dir = DEFAULT_OBJECT_DIR;
+	int is_daemon = 1;
+	int is_debug = 0;
+	struct cluster_info *ci;
+
+	while ((ch = getopt_long(argc, argv, short_options, long_options,
+				 &longindex)) >= 0) {
+		switch (ch) {
+		case 'p':
+			port = atoi(optarg);
+			break;
+		case 'f':
+			is_daemon = 0;
+			break;
+		case 'd':
+			is_debug = 1;
+			break;
+		case 'h':
+			usage(0);
+			break;
+		default:
+			usage(1);
+			break;
+		}
+	}
+
+	if (optind != argc)
+		dir = argv[optind];
+
+	ret = log_init(program_name, LOG_SPACE_SIZE, is_daemon, is_debug);
+	if (ret)
+		exit(1);
+
+	if (is_daemon && daemon(0, 0))
+		exit(1);
+
+	ret = init_event(EPOLL_SIZE);
+	if (ret)
+		exit(1);
+
+	ret = init_store(dir);
+	if (ret)
+		exit(1);
+
+	ret = init_worker();
+	if (ret)
+		exit(1);
+
+	ci = create_cluster(port);
+	if (!ci) {
+		eprintf("failed to create sheepdog cluster.\n");
+		exit(1);
+	}
+
+	ret = create_listen_port(port, ci);
+	if (ret)
+		exit(1);
+
+	event_loop(-1);
+
+	return 0;
+}
diff --git a/collie/collie.h b/collie/collie.h
new file mode 100644
index 0000000..5bdbe3c
--- /dev/null
+++ b/collie/collie.h
@@ -0,0 +1,93 @@
+/*
+ * Copyright (C) 2009 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#ifndef __COLLIE_H__
+#define __COLLIE_H__
+
+#include <inttypes.h>
+#include <corosync/cpg.h>
+
+#include "sheepdog_proto.h"
+#include "event.h"
+#include "logger.h"
+#include "work.h"
+#include "net.h"
+
+#define SD_MSG_JOIN             0x01
+#define SD_MSG_VDI_OP           0x02
+#define SD_MSG_MASTER_CHANGED   0x03
+
+struct client_info {
+	struct connection conn;
+
+	struct request *rx_req;
+
+	struct request *tx_req;
+
+	struct list_head reqs;
+	struct list_head done_reqs;
+
+	struct cluster_info *cluster;
+};
+
+struct request;
+
+typedef void (*req_end_t) (struct request *);
+
+struct request {
+	struct sd_req rq;
+	struct sd_rsp rp;
+
+	void *data;
+
+	struct client_info *ci;
+	struct list_head r_siblings;
+	struct list_head r_wlist;
+	struct list_head pending_list;
+
+	req_end_t done;
+	struct work work;
+};
+
+struct cluster_info {
+	cpg_handle_t handle;
+	int synchronized;
+	uint32_t this_nodeid;
+	uint32_t this_pid;
+	struct sheepdog_node_list_entry this_node;
+
+	uint64_t epoch;
+
+	struct list_head node_list;
+	struct list_head vm_list;
+	struct list_head pending_list;
+};
+
+int create_listen_port(int port, void *data);
+
+int init_store(char *dir);
+
+int add_vdi(struct cluster_info *cluster,
+	    char *name, int len, uint64_t size, uint64_t * added_oid,
+	    uint64_t base_oid, uint32_t tag);
+
+int lookup_vdi(struct cluster_info *cluster, char *filename, uint64_t * oid,
+	       uint32_t tag, int do_lock, int *current);
+
+int build_node_list(struct list_head *node_list,
+		    struct sheepdog_node_list_entry *entries);
+
+struct cluster_info *create_cluster(int port);
+
+void store_queue_request(struct work *work, int idx);
+
+void cluster_queue_request(struct work *work, int idx);
+
+#endif
diff --git a/collie/net.c b/collie/net.c
new file mode 100644
index 0000000..cf2e4bb
--- /dev/null
+++ b/collie/net.c
@@ -0,0 +1,341 @@
+/*
+ * Copyright (C) 2009 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <netinet/tcp.h>
+#include <sys/epoll.h>
+
+#include "collie.h"
+
+static void __done(struct work *work, int idx)
+{
+	struct request *req = container_of(work, struct request, work);
+	struct sd_req *hdr = (struct sd_req *)&req->rq;
+
+	switch (hdr->opcode) {
+	case SD_OP_NEW_VDI:
+	case SD_OP_DEL_VDI:
+	case SD_OP_LOCK_VDI:
+	case SD_OP_RELEASE_VDI:
+	case SD_OP_GET_VDI_INFO:
+	case SD_OP_MAKE_FS:
+	case SD_OP_UPDATE_EPOCH:
+	case SD_OP_SHUTDOWN:
+		/* request is forwarded to cpg group */
+		return;
+	}
+	req->done(req);
+}
+
+static void queue_request(struct request *req)
+{
+	struct sd_req *hdr = (struct sd_req *)&req->rq;
+
+	switch (hdr->opcode) {
+	case SD_OP_CREATE_AND_WRITE_OBJ:
+	case SD_OP_REMOVE_OBJ:
+	case SD_OP_READ_OBJ:
+	case SD_OP_WRITE_OBJ:
+	case SD_OP_SYNC_OBJ:
+	case SD_OP_STAT_SHEEP:
+		req->work.fn = store_queue_request;
+		break;
+	case SD_OP_GET_NODE_LIST:
+	case SD_OP_GET_VM_LIST:
+	case SD_OP_NEW_VDI:
+	case SD_OP_DEL_VDI:
+	case SD_OP_LOCK_VDI:
+	case SD_OP_RELEASE_VDI:
+	case SD_OP_GET_VDI_INFO:
+	case SD_OP_MAKE_FS:
+	case SD_OP_UPDATE_EPOCH:
+	case SD_OP_SHUTDOWN:
+		req->work.fn = cluster_queue_request;
+		break;
+	default:
+		eprintf("unknown operation %d\n", hdr->opcode);
+		return;
+	}
+
+	req->work.done = __done;
+
+	list_del(&req->r_wlist);
+
+	queue_work(&req->work);
+}
+
+static struct request *alloc_request(struct client_info *ci, int data_length)
+{
+	struct request *req;
+
+	req = zalloc(sizeof(struct request) + data_length);
+	if (!req)
+		return NULL;
+
+	req->ci = ci;
+	if (data_length)
+		req->data = (char *)req + sizeof(*req);
+
+	list_add(&req->r_siblings, &ci->reqs);
+	INIT_LIST_HEAD(&req->r_wlist);
+
+	return req;
+}
+
+static void free_request(struct request *req)
+{
+	list_del(&req->r_siblings);
+	free(req);
+}
+
+static void req_done(struct request *req)
+{
+	list_add(&req->r_wlist, &req->ci->done_reqs);
+	conn_tx_on(&req->ci->conn);
+}
+
+static void init_rx_hdr(struct client_info *ci)
+{
+	ci->conn.c_rx_state = C_IO_HEADER;
+	ci->rx_req = NULL;
+	ci->conn.rx_length = sizeof(struct sd_req);
+	ci->conn.rx_buf = &ci->conn.rx_hdr;
+}
+
+static void client_rx_handler(struct client_info *ci)
+{
+	int ret;
+	uint64_t data_len;
+	struct connection *conn = &ci->conn;
+	struct sd_req *hdr = &conn->rx_hdr;
+	struct request *req;
+
+	switch (conn->c_rx_state) {
+	case C_IO_HEADER:
+		ret = rx(conn, C_IO_DATA_INIT);
+		if (!ret || conn->c_rx_state != C_IO_DATA_INIT)
+			break;
+	case C_IO_DATA_INIT:
+		data_len = hdr->data_length;
+
+		req = alloc_request(ci, data_len);
+		if (!req) {
+			conn->c_rx_state = C_IO_CLOSED;
+			break;
+		}
+		ci->rx_req = req;
+
+		/* use le_to_cpu */
+		memcpy(&req->rq, hdr, sizeof(req->rq));
+
+		if (data_len && hdr->flags & SD_FLAG_CMD_WRITE) {
+			conn->c_rx_state = C_IO_DATA;
+			conn->rx_length = data_len;
+			conn->rx_buf = req->data;
+		} else {
+			conn->c_rx_state = C_IO_END;
+			break;
+		}
+	case C_IO_DATA:
+		ret = rx(conn, C_IO_END);
+		break;
+	default:
+		eprintf("BUG: unknown state %d\n", conn->c_rx_state);
+	}
+
+	if (is_conn_dead(conn) || conn->c_rx_state != C_IO_END)
+		return;
+
+	/* now we have a complete command */
+
+	req = ci->rx_req;
+
+	init_rx_hdr(ci);
+
+	if (hdr->flags & SD_FLAG_CMD_WRITE)
+		req->rp.data_length = 0;
+	else
+		req->rp.data_length = hdr->data_length;
+
+	req->done = req_done;
+
+	queue_request(req);
+}
+
+static void init_tx_hdr(struct client_info *ci)
+{
+	struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
+	struct request *req;
+
+	if (ci->tx_req || list_empty(&ci->done_reqs))
+		return;
+
+	memset(rsp, 0, sizeof(*rsp));
+
+	req = list_first_entry(&ci->done_reqs, struct request, r_wlist);
+	list_del(&req->r_wlist);
+
+	ci->tx_req = req;
+	ci->conn.tx_length = sizeof(*rsp);
+	ci->conn.c_tx_state = C_IO_HEADER;
+	ci->conn.tx_buf = rsp;
+
+	/* use cpu_to_le */
+	memcpy(rsp, &req->rp, sizeof(*rsp));
+
+	rsp->epoch = ci->cluster->epoch;
+	rsp->opcode = req->rq.opcode;
+	rsp->id = req->rq.id;
+}
+
+static void client_tx_handler(struct client_info *ci)
+{
+	int ret, opt;
+	struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
+
+again:
+	init_tx_hdr(ci);
+	if (!ci->tx_req) {
+		conn_tx_off(&ci->conn);
+		return;
+	}
+
+	opt = 1;
+	setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
+
+	switch (ci->conn.c_tx_state) {
+	case C_IO_HEADER:
+		if (rsp->data_length)
+			ret = tx(&ci->conn, C_IO_DATA_INIT, MSG_MORE);
+		else
+			ret = tx(&ci->conn, C_IO_DATA_INIT, 0);
+
+		if (!ret)
+			break;
+
+		if (rsp->data_length) {
+			ci->conn.tx_length = rsp->data_length;
+			ci->conn.tx_buf = ci->tx_req->data;
+			ci->conn.c_tx_state = C_IO_DATA;
+		} else {
+			ci->conn.c_tx_state = C_IO_END;
+			break;
+		}
+	case C_IO_DATA:
+		ret = tx(&ci->conn, C_IO_END, 0);
+		if (!ret)
+			break;
+	default:
+		break;
+	}
+
+	opt = 0;
+	setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
+
+	if (is_conn_dead(&ci->conn) || ci->conn.c_tx_state != C_IO_END)
+		return;
+
+	if (ci->conn.c_tx_state == C_IO_END) {
+		free_request(ci->tx_req);
+		ci->tx_req = NULL;
+		goto again;
+	}
+}
+
+static void destroy_client(struct client_info *ci)
+{
+	close(ci->conn.fd);
+	free(ci);
+}
+
+static struct client_info *create_client(int fd, struct cluster_info *cluster)
+{
+	struct client_info *ci;
+
+	ci = zalloc(sizeof(*ci));
+	if (!ci)
+		return NULL;
+
+	ci->conn.fd = fd;
+
+	INIT_LIST_HEAD(&ci->reqs);
+	INIT_LIST_HEAD(&ci->done_reqs);
+
+	init_rx_hdr(ci);
+
+	ci->cluster = cluster;
+
+	return ci;
+}
+
+static void client_handler(int fd, int events, void *data)
+{
+	struct client_info *ci = (struct client_info *)data;
+
+	if (events & EPOLLIN)
+		client_rx_handler(ci);
+
+	if (!is_conn_dead(&ci->conn) && events & EPOLLOUT)
+		client_tx_handler(ci);
+
+	if (is_conn_dead(&ci->conn)) {
+		dprintf("closed a connection, %d\n", fd);
+		unregister_event(fd);
+		destroy_client(ci);
+	}
+}
+
+static void listen_handler(int listen_fd, int events, void *data)
+{
+	struct sockaddr_storage from;
+	socklen_t namesize;
+	int fd, ret, opt;
+	struct client_info *ci;
+
+	namesize = sizeof(from);
+	fd = accept(listen_fd, (struct sockaddr *)&from, &namesize);
+	if (fd < 0) {
+		eprintf("can't accept a new connection, %m\n");
+		return;
+	}
+
+	opt = 1;
+	ret = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt));
+	if (ret) {
+		close(fd);
+		return;
+	}
+
+	ci = create_client(fd, data);
+	if (!ci) {
+		close(fd);
+		return;
+	}
+
+	ret = register_event(fd, client_handler, ci);
+	if (ret) {
+		destroy_client(ci);
+		return;
+	}
+
+	dprintf("accepted a new connection, %d\n", fd);
+}
+
+static int create_listen_port_fn(int fd, void *data)
+{
+	return register_event(fd, listen_handler, data);
+}
+
+int create_listen_port(int port, void *data)
+{
+	return create_listen_ports(port, create_listen_port_fn, data);
+}
diff --git a/collie/work.c b/collie/work.c
new file mode 100644
index 0000000..a0b7fda
--- /dev/null
+++ b/collie/work.c
@@ -0,0 +1,234 @@
+/*
+ * Copyright (C) 2007 FUJITA Tomonori <tomof at acm.org>
+ * Copyright (C) 2007 Mike Christie <michaelc at cs.wisc.edu>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation, version 2 of the
+ * License.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ */
+#include <errno.h>
+#include <string.h>
+#include <inttypes.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <syscall.h>
+#include <sys/types.h>
+#include <linux/types.h>
+#define _LINUX_FCNTL_H
+#include <linux/signalfd.h>
+
+#include "list.h"
+#include "util.h"
+#include "work.h"
+#include "logger.h"
+#include "event.h"
+
+extern int signalfd(int fd, const sigset_t *mask, int flags);
+
+struct worker_info {
+	pthread_t worker_thread[NR_WORKER_THREAD];
+
+	pthread_mutex_t finished_lock;
+	struct list_head finished_list;
+
+	/* wokers sleep on this and signaled by tgtd */
+	pthread_cond_t pending_cond;
+	/* locked by tgtd and workers */
+	pthread_mutex_t pending_lock;
+	/* protected by pending_lock */
+	struct list_head pending_list;
+
+	pthread_mutex_t startup_lock;
+
+	int sig_fd;
+
+	int stop;
+};
+
+static struct worker_info __wi;
+
+static void bs_thread_request_done(int fd, int events, void *data)
+{
+	int ret;
+	struct worker_info *wi = data;
+	struct work *work;
+	struct signalfd_siginfo siginfo[16];
+	LIST_HEAD(list);
+
+	ret = read(fd, (char *)siginfo, sizeof(siginfo));
+	if (ret <= 0)
+		return;
+
+	pthread_mutex_lock(&wi->finished_lock);
+	list_splice_init(&wi->finished_list, &list);
+	pthread_mutex_unlock(&wi->finished_lock);
+
+	while (!list_empty(&list)) {
+		work = list_first_entry(&list, struct work, w_list);
+		list_del(&work->w_list);
+
+		work->done(work, 0);
+	}
+}
+
+static void *worker_routine(void *arg)
+{
+	struct worker_info *wi = &__wi;
+	struct work *work;
+	pthread_t *p = arg;
+	int idx = p - wi->worker_thread;
+	sigset_t set;
+
+	sigfillset(&set);
+	sigprocmask(SIG_BLOCK, &set, NULL);
+
+	pthread_mutex_lock(&wi->startup_lock);
+	dprintf("started this thread %d\n", idx);
+	pthread_mutex_unlock(&wi->startup_lock);
+
+	while (!wi->stop) {
+		pthread_mutex_lock(&wi->pending_lock);
+retest:
+		if (list_empty(&wi->pending_list)) {
+			pthread_cond_wait(&wi->pending_cond, &wi->pending_lock);
+			if (wi->stop) {
+				pthread_mutex_unlock(&wi->pending_lock);
+				pthread_exit(NULL);
+			}
+			goto retest;
+		}
+
+		work = list_first_entry(&wi->pending_list,
+				       struct work, w_list);
+
+		list_del(&work->w_list);
+		pthread_mutex_unlock(&wi->pending_lock);
+
+		work->fn(work, idx);
+
+		pthread_mutex_lock(&wi->finished_lock);
+		list_add_tail(&work->w_list, &wi->finished_list);
+		pthread_mutex_unlock(&wi->finished_lock);
+
+		kill(getpid(), SIGUSR2);
+	}
+
+	pthread_exit(NULL);
+}
+
+int init_worker(void)
+{
+	int i, ret;
+	sigset_t mask;
+	struct worker_info *wi = &__wi;
+
+	INIT_LIST_HEAD(&wi->pending_list);
+	INIT_LIST_HEAD(&wi->finished_list);
+
+	pthread_cond_init(&wi->pending_cond, NULL);
+
+	pthread_mutex_init(&wi->finished_lock, NULL);
+	pthread_mutex_init(&wi->pending_lock, NULL);
+	pthread_mutex_init(&wi->startup_lock, NULL);
+
+	sigemptyset(&mask);
+	sigaddset(&mask, SIGUSR2);
+	sigprocmask(SIG_BLOCK, &mask, NULL);
+
+	wi->sig_fd = signalfd(-1, &mask, 0);
+	if (wi->sig_fd < 0) {
+		eprintf("failed to create a signal fd, %m\n");
+		return 1;
+	}
+
+	ret = fcntl(wi->sig_fd, F_GETFL);
+	ret = fcntl(wi->sig_fd, F_SETFL, ret | O_NONBLOCK);
+
+	ret = register_event(wi->sig_fd, bs_thread_request_done, wi);
+	if (ret) {
+		eprintf("failed to add epoll event\n");
+		goto destroy_cond_mutex;
+	}
+
+	pthread_mutex_lock(&wi->startup_lock);
+	for (i = 0; i < NR_WORKER_THREAD; i++) {
+		ret = pthread_create(&wi->worker_thread[i], NULL,
+				     worker_routine, &wi->worker_thread[i]);
+
+		if (ret) {
+			eprintf("failed to create a worker thread, %d %s\n",
+				i, strerror(ret));
+			if (ret)
+				goto destroy_threads;
+		}
+	}
+	pthread_mutex_unlock(&wi->startup_lock);
+
+	return 0;
+destroy_threads:
+
+	wi->stop = 1;
+	pthread_mutex_unlock(&wi->startup_lock);
+	for (; i > 0; i--) {
+		pthread_join(wi->worker_thread[i - 1], NULL);
+		eprintf("stopped the worker thread %d\n", i - 1);
+	}
+
+	unregister_event(wi->sig_fd);
+destroy_cond_mutex:
+	pthread_cond_destroy(&wi->pending_cond);
+	pthread_mutex_destroy(&wi->pending_lock);
+	pthread_mutex_destroy(&wi->startup_lock);
+	pthread_mutex_destroy(&wi->finished_lock);
+
+	return 1;
+}
+
+void exit_worker(void)
+{
+	int i;
+	struct worker_info *wi = &__wi;
+
+	wi->stop = 1;
+	pthread_cond_broadcast(&wi->pending_cond);
+
+	for (i = 0; wi->worker_thread[i] &&
+		     i < ARRAY_SIZE(wi->worker_thread); i++)
+		pthread_join(wi->worker_thread[i], NULL);
+
+	pthread_cond_destroy(&wi->pending_cond);
+	pthread_mutex_destroy(&wi->pending_lock);
+	pthread_mutex_destroy(&wi->startup_lock);
+	pthread_mutex_destroy(&wi->finished_lock);
+
+	unregister_event(wi->sig_fd);
+
+	wi->stop = 0;
+}
+
+void queue_work(struct work *work)
+{
+	struct worker_info *wi = &__wi;
+
+	pthread_mutex_lock(&wi->pending_lock);
+
+	list_add_tail(&work->w_list, &wi->pending_list);
+
+	pthread_mutex_unlock(&wi->pending_lock);
+
+	pthread_cond_signal(&wi->pending_cond);
+}
diff --git a/collie/work.h b/collie/work.h
new file mode 100644
index 0000000..81d8c56
--- /dev/null
+++ b/collie/work.h
@@ -0,0 +1,20 @@
+#ifndef __WORK_H__
+#define __WORK_H__
+
+#define NR_WORKER_THREAD 4
+
+struct work;
+
+typedef void (*work_func_t)(struct work *, int idx);
+
+struct work {
+	struct list_head w_list;
+	work_func_t fn;
+	work_func_t done;
+};
+
+int init_worker(void);
+void exit_worker(void);
+void queue_work(struct work *work);
+
+#endif
-- 
1.5.6.5




More information about the sheepdog mailing list