[Sheepdog] [PATCH 1/6] collie: core codes of a sheepdog daemon
MORITA Kazutaka
morita.kazutaka at lab.ntt.co.jp
Tue Dec 1 19:35:18 CET 2009
This includes a main function, connections handling, and
worker threads.
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
collie/collie.c | 116 +++++++++++++++++++
collie/collie.h | 93 +++++++++++++++
collie/net.c | 341 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
collie/work.c | 234 ++++++++++++++++++++++++++++++++++++++
collie/work.h | 20 ++++
5 files changed, 804 insertions(+), 0 deletions(-)
create mode 100644 collie/collie.c
create mode 100644 collie/collie.h
create mode 100644 collie/net.c
create mode 100644 collie/work.c
create mode 100644 collie/work.h
diff --git a/collie/collie.c b/collie/collie.c
new file mode 100644
index 0000000..f58ed96
--- /dev/null
+++ b/collie/collie.c
@@ -0,0 +1,116 @@
+/*
+ * Copyright (C) 2009 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <getopt.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include "collie.h"
+
+#define EPOLL_SIZE 4096
+#define DEFAULT_OBJECT_DIR "/tmp"
+
+static char program_name[] = "collie";
+
+static struct option const long_options[] = {
+ {"port", required_argument, 0, 'p'},
+ {"foreground", no_argument, 0, 'f'},
+ {"debug", no_argument, 0, 'd'},
+ {"help", no_argument, 0, 'h'},
+ {0, 0, 0, 0},
+};
+
+static char *short_options = "p:fdh";
+
+static void usage(int status)
+{
+ if (status)
+ fprintf(stderr, "Try `%s --help' for more information.\n",
+ program_name);
+ else {
+ printf("Usage: %s [OPTION] [PATH]\n", program_name);
+ printf("\
+Sheepdog Daemon\n\
+ -p, --port specify the listen port number\n\
+ -f, --foreground make the program run in the foreground\n\
+ -d, --debug print debug messages\n\
+ -h, --help display this help and exit\n\
+");
+ }
+ exit(status);
+}
+
+int main(int argc, char **argv)
+{
+ int ch, longindex;
+ int ret, port = SD_LISTEN_PORT;
+ char *dir = DEFAULT_OBJECT_DIR;
+ int is_daemon = 1;
+ int is_debug = 0;
+ struct cluster_info *ci;
+
+ while ((ch = getopt_long(argc, argv, short_options, long_options,
+ &longindex)) >= 0) {
+ switch (ch) {
+ case 'p':
+ port = atoi(optarg);
+ break;
+ case 'f':
+ is_daemon = 0;
+ break;
+ case 'd':
+ is_debug = 1;
+ break;
+ case 'h':
+ usage(0);
+ break;
+ default:
+ usage(1);
+ break;
+ }
+ }
+
+ if (optind != argc)
+ dir = argv[optind];
+
+ ret = log_init(program_name, LOG_SPACE_SIZE, is_daemon, is_debug);
+ if (ret)
+ exit(1);
+
+ if (is_daemon && daemon(0, 0))
+ exit(1);
+
+ ret = init_event(EPOLL_SIZE);
+ if (ret)
+ exit(1);
+
+ ret = init_store(dir);
+ if (ret)
+ exit(1);
+
+ ret = init_worker();
+ if (ret)
+ exit(1);
+
+ ci = create_cluster(port);
+ if (!ci) {
+ eprintf("failed to create sheepdog cluster.\n");
+ exit(1);
+ }
+
+ ret = create_listen_port(port, ci);
+ if (ret)
+ exit(1);
+
+ event_loop(-1);
+
+ return 0;
+}
diff --git a/collie/collie.h b/collie/collie.h
new file mode 100644
index 0000000..5bdbe3c
--- /dev/null
+++ b/collie/collie.h
@@ -0,0 +1,93 @@
+/*
+ * Copyright (C) 2009 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#ifndef __COLLIE_H__
+#define __COLLIE_H__
+
+#include <inttypes.h>
+#include <corosync/cpg.h>
+
+#include "sheepdog_proto.h"
+#include "event.h"
+#include "logger.h"
+#include "work.h"
+#include "net.h"
+
+#define SD_MSG_JOIN 0x01
+#define SD_MSG_VDI_OP 0x02
+#define SD_MSG_MASTER_CHANGED 0x03
+
+struct client_info {
+ struct connection conn;
+
+ struct request *rx_req;
+
+ struct request *tx_req;
+
+ struct list_head reqs;
+ struct list_head done_reqs;
+
+ struct cluster_info *cluster;
+};
+
+struct request;
+
+typedef void (*req_end_t) (struct request *);
+
+struct request {
+ struct sd_req rq;
+ struct sd_rsp rp;
+
+ void *data;
+
+ struct client_info *ci;
+ struct list_head r_siblings;
+ struct list_head r_wlist;
+ struct list_head pending_list;
+
+ req_end_t done;
+ struct work work;
+};
+
+struct cluster_info {
+ cpg_handle_t handle;
+ int synchronized;
+ uint32_t this_nodeid;
+ uint32_t this_pid;
+ struct sheepdog_node_list_entry this_node;
+
+ uint64_t epoch;
+
+ struct list_head node_list;
+ struct list_head vm_list;
+ struct list_head pending_list;
+};
+
+int create_listen_port(int port, void *data);
+
+int init_store(char *dir);
+
+int add_vdi(struct cluster_info *cluster,
+ char *name, int len, uint64_t size, uint64_t * added_oid,
+ uint64_t base_oid, uint32_t tag);
+
+int lookup_vdi(struct cluster_info *cluster, char *filename, uint64_t * oid,
+ uint32_t tag, int do_lock, int *current);
+
+int build_node_list(struct list_head *node_list,
+ struct sheepdog_node_list_entry *entries);
+
+struct cluster_info *create_cluster(int port);
+
+void store_queue_request(struct work *work, int idx);
+
+void cluster_queue_request(struct work *work, int idx);
+
+#endif
diff --git a/collie/net.c b/collie/net.c
new file mode 100644
index 0000000..cf2e4bb
--- /dev/null
+++ b/collie/net.c
@@ -0,0 +1,341 @@
+/*
+ * Copyright (C) 2009 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <netinet/tcp.h>
+#include <sys/epoll.h>
+
+#include "collie.h"
+
+static void __done(struct work *work, int idx)
+{
+ struct request *req = container_of(work, struct request, work);
+ struct sd_req *hdr = (struct sd_req *)&req->rq;
+
+ switch (hdr->opcode) {
+ case SD_OP_NEW_VDI:
+ case SD_OP_DEL_VDI:
+ case SD_OP_LOCK_VDI:
+ case SD_OP_RELEASE_VDI:
+ case SD_OP_GET_VDI_INFO:
+ case SD_OP_MAKE_FS:
+ case SD_OP_UPDATE_EPOCH:
+ case SD_OP_SHUTDOWN:
+ /* request is forwarded to cpg group */
+ return;
+ }
+ req->done(req);
+}
+
+static void queue_request(struct request *req)
+{
+ struct sd_req *hdr = (struct sd_req *)&req->rq;
+
+ switch (hdr->opcode) {
+ case SD_OP_CREATE_AND_WRITE_OBJ:
+ case SD_OP_REMOVE_OBJ:
+ case SD_OP_READ_OBJ:
+ case SD_OP_WRITE_OBJ:
+ case SD_OP_SYNC_OBJ:
+ case SD_OP_STAT_SHEEP:
+ req->work.fn = store_queue_request;
+ break;
+ case SD_OP_GET_NODE_LIST:
+ case SD_OP_GET_VM_LIST:
+ case SD_OP_NEW_VDI:
+ case SD_OP_DEL_VDI:
+ case SD_OP_LOCK_VDI:
+ case SD_OP_RELEASE_VDI:
+ case SD_OP_GET_VDI_INFO:
+ case SD_OP_MAKE_FS:
+ case SD_OP_UPDATE_EPOCH:
+ case SD_OP_SHUTDOWN:
+ req->work.fn = cluster_queue_request;
+ break;
+ default:
+ eprintf("unknown operation %d\n", hdr->opcode);
+ return;
+ }
+
+ req->work.done = __done;
+
+ list_del(&req->r_wlist);
+
+ queue_work(&req->work);
+}
+
+static struct request *alloc_request(struct client_info *ci, int data_length)
+{
+ struct request *req;
+
+ req = zalloc(sizeof(struct request) + data_length);
+ if (!req)
+ return NULL;
+
+ req->ci = ci;
+ if (data_length)
+ req->data = (char *)req + sizeof(*req);
+
+ list_add(&req->r_siblings, &ci->reqs);
+ INIT_LIST_HEAD(&req->r_wlist);
+
+ return req;
+}
+
+static void free_request(struct request *req)
+{
+ list_del(&req->r_siblings);
+ free(req);
+}
+
+static void req_done(struct request *req)
+{
+ list_add(&req->r_wlist, &req->ci->done_reqs);
+ conn_tx_on(&req->ci->conn);
+}
+
+static void init_rx_hdr(struct client_info *ci)
+{
+ ci->conn.c_rx_state = C_IO_HEADER;
+ ci->rx_req = NULL;
+ ci->conn.rx_length = sizeof(struct sd_req);
+ ci->conn.rx_buf = &ci->conn.rx_hdr;
+}
+
+static void client_rx_handler(struct client_info *ci)
+{
+ int ret;
+ uint64_t data_len;
+ struct connection *conn = &ci->conn;
+ struct sd_req *hdr = &conn->rx_hdr;
+ struct request *req;
+
+ switch (conn->c_rx_state) {
+ case C_IO_HEADER:
+ ret = rx(conn, C_IO_DATA_INIT);
+ if (!ret || conn->c_rx_state != C_IO_DATA_INIT)
+ break;
+ case C_IO_DATA_INIT:
+ data_len = hdr->data_length;
+
+ req = alloc_request(ci, data_len);
+ if (!req) {
+ conn->c_rx_state = C_IO_CLOSED;
+ break;
+ }
+ ci->rx_req = req;
+
+ /* use le_to_cpu */
+ memcpy(&req->rq, hdr, sizeof(req->rq));
+
+ if (data_len && hdr->flags & SD_FLAG_CMD_WRITE) {
+ conn->c_rx_state = C_IO_DATA;
+ conn->rx_length = data_len;
+ conn->rx_buf = req->data;
+ } else {
+ conn->c_rx_state = C_IO_END;
+ break;
+ }
+ case C_IO_DATA:
+ ret = rx(conn, C_IO_END);
+ break;
+ default:
+ eprintf("BUG: unknown state %d\n", conn->c_rx_state);
+ }
+
+ if (is_conn_dead(conn) || conn->c_rx_state != C_IO_END)
+ return;
+
+ /* now we have a complete command */
+
+ req = ci->rx_req;
+
+ init_rx_hdr(ci);
+
+ if (hdr->flags & SD_FLAG_CMD_WRITE)
+ req->rp.data_length = 0;
+ else
+ req->rp.data_length = hdr->data_length;
+
+ req->done = req_done;
+
+ queue_request(req);
+}
+
+static void init_tx_hdr(struct client_info *ci)
+{
+ struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
+ struct request *req;
+
+ if (ci->tx_req || list_empty(&ci->done_reqs))
+ return;
+
+ memset(rsp, 0, sizeof(*rsp));
+
+ req = list_first_entry(&ci->done_reqs, struct request, r_wlist);
+ list_del(&req->r_wlist);
+
+ ci->tx_req = req;
+ ci->conn.tx_length = sizeof(*rsp);
+ ci->conn.c_tx_state = C_IO_HEADER;
+ ci->conn.tx_buf = rsp;
+
+ /* use cpu_to_le */
+ memcpy(rsp, &req->rp, sizeof(*rsp));
+
+ rsp->epoch = ci->cluster->epoch;
+ rsp->opcode = req->rq.opcode;
+ rsp->id = req->rq.id;
+}
+
+static void client_tx_handler(struct client_info *ci)
+{
+ int ret, opt;
+ struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
+
+again:
+ init_tx_hdr(ci);
+ if (!ci->tx_req) {
+ conn_tx_off(&ci->conn);
+ return;
+ }
+
+ opt = 1;
+ setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
+
+ switch (ci->conn.c_tx_state) {
+ case C_IO_HEADER:
+ if (rsp->data_length)
+ ret = tx(&ci->conn, C_IO_DATA_INIT, MSG_MORE);
+ else
+ ret = tx(&ci->conn, C_IO_DATA_INIT, 0);
+
+ if (!ret)
+ break;
+
+ if (rsp->data_length) {
+ ci->conn.tx_length = rsp->data_length;
+ ci->conn.tx_buf = ci->tx_req->data;
+ ci->conn.c_tx_state = C_IO_DATA;
+ } else {
+ ci->conn.c_tx_state = C_IO_END;
+ break;
+ }
+ case C_IO_DATA:
+ ret = tx(&ci->conn, C_IO_END, 0);
+ if (!ret)
+ break;
+ default:
+ break;
+ }
+
+ opt = 0;
+ setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
+
+ if (is_conn_dead(&ci->conn) || ci->conn.c_tx_state != C_IO_END)
+ return;
+
+ if (ci->conn.c_tx_state == C_IO_END) {
+ free_request(ci->tx_req);
+ ci->tx_req = NULL;
+ goto again;
+ }
+}
+
+static void destroy_client(struct client_info *ci)
+{
+ close(ci->conn.fd);
+ free(ci);
+}
+
+static struct client_info *create_client(int fd, struct cluster_info *cluster)
+{
+ struct client_info *ci;
+
+ ci = zalloc(sizeof(*ci));
+ if (!ci)
+ return NULL;
+
+ ci->conn.fd = fd;
+
+ INIT_LIST_HEAD(&ci->reqs);
+ INIT_LIST_HEAD(&ci->done_reqs);
+
+ init_rx_hdr(ci);
+
+ ci->cluster = cluster;
+
+ return ci;
+}
+
+static void client_handler(int fd, int events, void *data)
+{
+ struct client_info *ci = (struct client_info *)data;
+
+ if (events & EPOLLIN)
+ client_rx_handler(ci);
+
+ if (!is_conn_dead(&ci->conn) && events & EPOLLOUT)
+ client_tx_handler(ci);
+
+ if (is_conn_dead(&ci->conn)) {
+ dprintf("closed a connection, %d\n", fd);
+ unregister_event(fd);
+ destroy_client(ci);
+ }
+}
+
+static void listen_handler(int listen_fd, int events, void *data)
+{
+ struct sockaddr_storage from;
+ socklen_t namesize;
+ int fd, ret, opt;
+ struct client_info *ci;
+
+ namesize = sizeof(from);
+ fd = accept(listen_fd, (struct sockaddr *)&from, &namesize);
+ if (fd < 0) {
+ eprintf("can't accept a new connection, %m\n");
+ return;
+ }
+
+ opt = 1;
+ ret = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt));
+ if (ret) {
+ close(fd);
+ return;
+ }
+
+ ci = create_client(fd, data);
+ if (!ci) {
+ close(fd);
+ return;
+ }
+
+ ret = register_event(fd, client_handler, ci);
+ if (ret) {
+ destroy_client(ci);
+ return;
+ }
+
+ dprintf("accepted a new connection, %d\n", fd);
+}
+
+static int create_listen_port_fn(int fd, void *data)
+{
+ return register_event(fd, listen_handler, data);
+}
+
+int create_listen_port(int port, void *data)
+{
+ return create_listen_ports(port, create_listen_port_fn, data);
+}
diff --git a/collie/work.c b/collie/work.c
new file mode 100644
index 0000000..a0b7fda
--- /dev/null
+++ b/collie/work.c
@@ -0,0 +1,234 @@
+/*
+ * Copyright (C) 2007 FUJITA Tomonori <tomof at acm.org>
+ * Copyright (C) 2007 Mike Christie <michaelc at cs.wisc.edu>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation, version 2 of the
+ * License.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ */
+#include <errno.h>
+#include <string.h>
+#include <inttypes.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <syscall.h>
+#include <sys/types.h>
+#include <linux/types.h>
+#define _LINUX_FCNTL_H
+#include <linux/signalfd.h>
+
+#include "list.h"
+#include "util.h"
+#include "work.h"
+#include "logger.h"
+#include "event.h"
+
+extern int signalfd(int fd, const sigset_t *mask, int flags);
+
+struct worker_info {
+ pthread_t worker_thread[NR_WORKER_THREAD];
+
+ pthread_mutex_t finished_lock;
+ struct list_head finished_list;
+
+ /* wokers sleep on this and signaled by tgtd */
+ pthread_cond_t pending_cond;
+ /* locked by tgtd and workers */
+ pthread_mutex_t pending_lock;
+ /* protected by pending_lock */
+ struct list_head pending_list;
+
+ pthread_mutex_t startup_lock;
+
+ int sig_fd;
+
+ int stop;
+};
+
+static struct worker_info __wi;
+
+static void bs_thread_request_done(int fd, int events, void *data)
+{
+ int ret;
+ struct worker_info *wi = data;
+ struct work *work;
+ struct signalfd_siginfo siginfo[16];
+ LIST_HEAD(list);
+
+ ret = read(fd, (char *)siginfo, sizeof(siginfo));
+ if (ret <= 0)
+ return;
+
+ pthread_mutex_lock(&wi->finished_lock);
+ list_splice_init(&wi->finished_list, &list);
+ pthread_mutex_unlock(&wi->finished_lock);
+
+ while (!list_empty(&list)) {
+ work = list_first_entry(&list, struct work, w_list);
+ list_del(&work->w_list);
+
+ work->done(work, 0);
+ }
+}
+
+static void *worker_routine(void *arg)
+{
+ struct worker_info *wi = &__wi;
+ struct work *work;
+ pthread_t *p = arg;
+ int idx = p - wi->worker_thread;
+ sigset_t set;
+
+ sigfillset(&set);
+ sigprocmask(SIG_BLOCK, &set, NULL);
+
+ pthread_mutex_lock(&wi->startup_lock);
+ dprintf("started this thread %d\n", idx);
+ pthread_mutex_unlock(&wi->startup_lock);
+
+ while (!wi->stop) {
+ pthread_mutex_lock(&wi->pending_lock);
+retest:
+ if (list_empty(&wi->pending_list)) {
+ pthread_cond_wait(&wi->pending_cond, &wi->pending_lock);
+ if (wi->stop) {
+ pthread_mutex_unlock(&wi->pending_lock);
+ pthread_exit(NULL);
+ }
+ goto retest;
+ }
+
+ work = list_first_entry(&wi->pending_list,
+ struct work, w_list);
+
+ list_del(&work->w_list);
+ pthread_mutex_unlock(&wi->pending_lock);
+
+ work->fn(work, idx);
+
+ pthread_mutex_lock(&wi->finished_lock);
+ list_add_tail(&work->w_list, &wi->finished_list);
+ pthread_mutex_unlock(&wi->finished_lock);
+
+ kill(getpid(), SIGUSR2);
+ }
+
+ pthread_exit(NULL);
+}
+
+int init_worker(void)
+{
+ int i, ret;
+ sigset_t mask;
+ struct worker_info *wi = &__wi;
+
+ INIT_LIST_HEAD(&wi->pending_list);
+ INIT_LIST_HEAD(&wi->finished_list);
+
+ pthread_cond_init(&wi->pending_cond, NULL);
+
+ pthread_mutex_init(&wi->finished_lock, NULL);
+ pthread_mutex_init(&wi->pending_lock, NULL);
+ pthread_mutex_init(&wi->startup_lock, NULL);
+
+ sigemptyset(&mask);
+ sigaddset(&mask, SIGUSR2);
+ sigprocmask(SIG_BLOCK, &mask, NULL);
+
+ wi->sig_fd = signalfd(-1, &mask, 0);
+ if (wi->sig_fd < 0) {
+ eprintf("failed to create a signal fd, %m\n");
+ return 1;
+ }
+
+ ret = fcntl(wi->sig_fd, F_GETFL);
+ ret = fcntl(wi->sig_fd, F_SETFL, ret | O_NONBLOCK);
+
+ ret = register_event(wi->sig_fd, bs_thread_request_done, wi);
+ if (ret) {
+ eprintf("failed to add epoll event\n");
+ goto destroy_cond_mutex;
+ }
+
+ pthread_mutex_lock(&wi->startup_lock);
+ for (i = 0; i < NR_WORKER_THREAD; i++) {
+ ret = pthread_create(&wi->worker_thread[i], NULL,
+ worker_routine, &wi->worker_thread[i]);
+
+ if (ret) {
+ eprintf("failed to create a worker thread, %d %s\n",
+ i, strerror(ret));
+ if (ret)
+ goto destroy_threads;
+ }
+ }
+ pthread_mutex_unlock(&wi->startup_lock);
+
+ return 0;
+destroy_threads:
+
+ wi->stop = 1;
+ pthread_mutex_unlock(&wi->startup_lock);
+ for (; i > 0; i--) {
+ pthread_join(wi->worker_thread[i - 1], NULL);
+ eprintf("stopped the worker thread %d\n", i - 1);
+ }
+
+ unregister_event(wi->sig_fd);
+destroy_cond_mutex:
+ pthread_cond_destroy(&wi->pending_cond);
+ pthread_mutex_destroy(&wi->pending_lock);
+ pthread_mutex_destroy(&wi->startup_lock);
+ pthread_mutex_destroy(&wi->finished_lock);
+
+ return 1;
+}
+
+void exit_worker(void)
+{
+ int i;
+ struct worker_info *wi = &__wi;
+
+ wi->stop = 1;
+ pthread_cond_broadcast(&wi->pending_cond);
+
+ for (i = 0; wi->worker_thread[i] &&
+ i < ARRAY_SIZE(wi->worker_thread); i++)
+ pthread_join(wi->worker_thread[i], NULL);
+
+ pthread_cond_destroy(&wi->pending_cond);
+ pthread_mutex_destroy(&wi->pending_lock);
+ pthread_mutex_destroy(&wi->startup_lock);
+ pthread_mutex_destroy(&wi->finished_lock);
+
+ unregister_event(wi->sig_fd);
+
+ wi->stop = 0;
+}
+
+void queue_work(struct work *work)
+{
+ struct worker_info *wi = &__wi;
+
+ pthread_mutex_lock(&wi->pending_lock);
+
+ list_add_tail(&work->w_list, &wi->pending_list);
+
+ pthread_mutex_unlock(&wi->pending_lock);
+
+ pthread_cond_signal(&wi->pending_cond);
+}
diff --git a/collie/work.h b/collie/work.h
new file mode 100644
index 0000000..81d8c56
--- /dev/null
+++ b/collie/work.h
@@ -0,0 +1,20 @@
+#ifndef __WORK_H__
+#define __WORK_H__
+
+#define NR_WORKER_THREAD 4
+
+struct work;
+
+typedef void (*work_func_t)(struct work *, int idx);
+
+struct work {
+ struct list_head w_list;
+ work_func_t fn;
+ work_func_t done;
+};
+
+int init_worker(void);
+void exit_worker(void);
+void queue_work(struct work *work);
+
+#endif
--
1.5.6.5
More information about the sheepdog
mailing list