This includes a main function, connection handling, and worker threads.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 collie/collie.c |  116 +++++++++++++++++++
 collie/collie.h |   93 +++++++++++++++
 collie/net.c    |  341 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 collie/work.c   |  234 ++++++++++++++++++++++++++++++++++++++
 collie/work.h   |   20 ++++
 5 files changed, 804 insertions(+), 0 deletions(-)
 create mode 100644 collie/collie.c
 create mode 100644 collie/collie.h
 create mode 100644 collie/net.c
 create mode 100644 collie/work.c
 create mode 100644 collie/work.h

diff --git a/collie/collie.c b/collie/collie.c
new file mode 100644
index 0000000..f58ed96
--- /dev/null
+++ b/collie/collie.c
@@ -0,0 +1,116 @@
+/*
+ * Copyright (C) 2009 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <errno.h>
+#include <getopt.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include "collie.h"
+
+#define EPOLL_SIZE 4096
+#define DEFAULT_OBJECT_DIR "/tmp"
+
+static char program_name[] = "collie";
+
+static struct option const long_options[] = {
+	{"port", required_argument, 0, 'p'},
+	{"foreground", no_argument, 0, 'f'},
+	{"debug", no_argument, 0, 'd'},
+	{"help", no_argument, 0, 'h'},
+	{0, 0, 0, 0},
+};
+
+static const char *short_options = "p:fdh";
+
+/*
+ * Print usage information and terminate the process with @status.
+ *
+ * A zero @status prints the full help text to stdout; any other value
+ * prints a short hint to stderr.
+ */
+static void usage(int status)
+{
+	if (status)
+		fprintf(stderr, "Try `%s --help' for more information.\n",
+			program_name);
+	else {
+		printf("Usage: %s [OPTION] [PATH]\n", program_name);
+		printf("\
+Sheepdog Daemon\n\
+  -p, --port              specify the listen port number\n\
+  -f, --foreground        make the program run in the foreground\n\
+  -d, --debug             print debug messages\n\
+  -h, --help              display this help and exit\n\
+");
+	}
+	exit(status);
+}
+
+/*
+ * Parse the listen port given with -p/--port.
+ *
+ * Returns the port number, or -1 if @str is not a complete decimal
+ * number in the range 1-65535.  atoi() used to turn input such as
+ * "abc" into 0 and accept out-of-range values without any diagnostic.
+ */
+static int parse_port(const char *str)
+{
+	char *end;
+	long val;
+
+	errno = 0;
+	val = strtol(str, &end, 10);
+	if (errno || end == str || *end != '\0' || val < 1 || val > 65535)
+		return -1;
+
+	return (int)val;
+}
+
+/*
+ * Entry point of the sheepdog daemon.
+ *
+ * Usage: collie [OPTION] [PATH]
+ * PATH is the object directory handed to init_store() and defaults to
+ * DEFAULT_OBJECT_DIR.  Logging, optional daemonization, the event loop,
+ * the store, the worker threads, the cluster and the listen sockets are
+ * set up in that order; any failure exits with status 1.
+ */
+int main(int argc, char **argv)
+{
+	int ch, longindex;
+	int ret, port = SD_LISTEN_PORT;
+	char *dir = DEFAULT_OBJECT_DIR;
+	int is_daemon = 1;
+	int is_debug = 0;
+	struct cluster_info *ci;
+
+	while ((ch = getopt_long(argc, argv, short_options, long_options,
+				 &longindex)) >= 0) {
+		switch (ch) {
+		case 'p':
+			port = parse_port(optarg);
+			if (port < 0) {
+				fprintf(stderr, "%s: invalid port number '%s'\n",
+					program_name, optarg);
+				usage(1);
+			}
+			break;
+		case 'f':
+			is_daemon = 0;
+			break;
+		case 'd':
+			is_debug = 1;
+			break;
+		case 'h':
+			usage(0);
+			break;
+		default:
+			usage(1);
+			break;
+		}
+	}
+
+	/* only the first non-option argument (PATH) is used */
+	if (optind != argc)
+		dir = argv[optind];
+
+	ret = log_init(program_name, LOG_SPACE_SIZE, is_daemon, is_debug);
+	if (ret)
+		exit(1);
+
+	if (is_daemon && daemon(0, 0))
+		exit(1);
+
+	ret = init_event(EPOLL_SIZE);
+	if (ret)
+		exit(1);
+
+	ret = init_store(dir);
+	if (ret)
+		exit(1);
+
+	ret = init_worker();
+	if (ret)
+		exit(1);
+
+	ci = create_cluster(port);
+	if (!ci) {
+		eprintf("failed to create sheepdog cluster.\n");
+		exit(1);
+	}
+
+	ret = create_listen_port(port, ci);
+	if (ret)
+		exit(1);
+
+	event_loop(-1);
+
+	return 0;
+}
diff --git a/collie/collie.h b/collie/collie.h
new file mode 100644
index 0000000..5bdbe3c
--- /dev/null
+++ b/collie/collie.h
@@ -0,0 +1,93 @@
+/*
+ * Copyright (C) 2009 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#ifndef __COLLIE_H__
+#define __COLLIE_H__
+
+#include <inttypes.h>
+#include <corosync/cpg.h>
+
+#include "sheepdog_proto.h"
+#include "event.h"
+#include "logger.h"
+#include "work.h"
+#include "net.h"
+
+#define SD_MSG_JOIN		0x01
+#define SD_MSG_VDI_OP		0x02
+#define SD_MSG_MASTER_CHANGED	0x03
+
+struct request;
+struct cluster_info;
+
+/* Per-connection state of a client accepted on the listen port. */
+struct client_info {
+	struct connection conn;
+
+	/* request whose header/payload is currently being received */
+	struct request *rx_req;
+
+	/* request whose response is currently being sent */
+	struct request *tx_req;
+
+	/* every request allocated for this client, linked by r_siblings */
+	struct list_head reqs;
+	/* completed requests waiting to be answered, linked by r_wlist */
+	struct list_head done_reqs;
+
+	struct cluster_info *cluster;
+};
+
+typedef void (*req_end_t) (struct request *);
+
+/* A client request; its payload buffer (if any) directly follows it. */
+struct request {
+	struct sd_req rq;
+	struct sd_rsp rp;
+
+	void *data;
+
+	struct client_info *ci;
+	struct list_head r_siblings;
+	struct list_head r_wlist;
+	struct list_head pending_list;
+
+	req_end_t done;
+	struct work work;
+};
+
+struct cluster_info {
+	cpg_handle_t handle;
+	int synchronized;
+	uint32_t this_nodeid;
+	uint32_t this_pid;
+	struct sheepdog_node_list_entry this_node;
+
+	/* copied into the header of every response sent to clients */
+	uint64_t epoch;
+
+	struct list_head node_list;
+	struct list_head vm_list;
+	struct list_head pending_list;
+};
+
+int create_listen_port(int port, void *data);
+
+int init_store(char *dir);
+
+int add_vdi(struct cluster_info *cluster,
+	    char *name, int len, uint64_t size, uint64_t *added_oid,
+	    uint64_t base_oid, uint32_t tag);
+
+int lookup_vdi(struct cluster_info *cluster, char *filename, uint64_t *oid,
+	       uint32_t tag, int do_lock, int *current);
+
+int build_node_list(struct list_head *node_list,
+		    struct sheepdog_node_list_entry *entries);
+
+struct cluster_info *create_cluster(int port);
+
+void store_queue_request(struct work *work, int idx);
+
+void cluster_queue_request(struct work *work, int idx);
+
+#endif
diff --git a/collie/net.c b/collie/net.c
new file mode 100644
index 0000000..cf2e4bb
--- /dev/null
+++ b/collie/net.c
@@ -0,0 +1,341 @@
+/*
+ * Copyright (C) 2009 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <limits.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <sys/epoll.h>
+#include <sys/socket.h>
+
+#include "collie.h"
+
+/*
+ * Allocate a zeroed request for @ci with room for @data_length payload
+ * bytes directly behind the structure, and link it on ci->reqs.
+ *
+ * @data_length comes straight from the client's header, so it is
+ * bounded before use.  The former "int" parameter silently truncated
+ * the 64-bit wire value: a value that became negative made
+ * sizeof(struct request) + length wrap around to a tiny allocation that
+ * rx() then overflowed, and one that became 0 left req->data NULL.
+ * Lengths above INT_MAX are refused, which also keeps them
+ * representable if conn->rx_length/tx_length are int (declared in
+ * net.h - NOTE(review): confirm).
+ *
+ * Returns NULL for an oversized length or on allocation failure.
+ */
+static struct request *alloc_request(struct client_info *ci,
+				     uint64_t data_length)
+{
+	struct request *req;
+
+	if (data_length > INT_MAX)
+		return NULL;
+
+	req = zalloc(sizeof(*req) + (size_t)data_length);
+	if (!req)
+		return NULL;
+
+	req->ci = ci;
+	if (data_length)
+		req->data = (char *)req + sizeof(*req);
+
+	list_add(&req->r_siblings, &ci->reqs);
+	INIT_LIST_HEAD(&req->r_wlist);
+
+	return req;
+}
+
+/* Unlink @req from its client's request list and release it. */
+static void free_request(struct request *req)
+{
+	list_del(&req->r_siblings);
+	free(req);
+}
+
+/*
+ * Completion callback installed by queue_request() for every work item.
+ *
+ * Operations that are forwarded to the cpg group are not answered
+ * here; every other request is handed to req->done.
+ */
+static void __done(struct work *work, int idx)
+{
+	struct request *req = container_of(work, struct request, work);
+	struct sd_req *hdr = (struct sd_req *)&req->rq;
+
+	switch (hdr->opcode) {
+	case SD_OP_NEW_VDI:
+	case SD_OP_DEL_VDI:
+	case SD_OP_LOCK_VDI:
+	case SD_OP_RELEASE_VDI:
+	case SD_OP_GET_VDI_INFO:
+	case SD_OP_MAKE_FS:
+	case SD_OP_UPDATE_EPOCH:
+	case SD_OP_SHUTDOWN:
+		/* request is forwarded to cpg group */
+		return;
+	}
+
+	req->done(req);
+}
+
+/*
+ * Hand a fully received request to the worker threads.
+ *
+ * Store operations run store_queue_request() and cluster operations run
+ * cluster_queue_request().  A request with an unknown opcode is freed
+ * and the connection is marked closed, in the same way client_rx_handler()
+ * handles an allocation failure (this assumes is_conn_dead() reports
+ * C_IO_CLOSED, as that path already does).  Before, such a request was
+ * left on ci->reqs forever and the client never got a reply.
+ */
+static void queue_request(struct request *req)
+{
+	struct sd_req *hdr = (struct sd_req *)&req->rq;
+	struct client_info *ci = req->ci;
+
+	switch (hdr->opcode) {
+	case SD_OP_CREATE_AND_WRITE_OBJ:
+	case SD_OP_REMOVE_OBJ:
+	case SD_OP_READ_OBJ:
+	case SD_OP_WRITE_OBJ:
+	case SD_OP_SYNC_OBJ:
+	case SD_OP_STAT_SHEEP:
+		req->work.fn = store_queue_request;
+		break;
+	case SD_OP_GET_NODE_LIST:
+	case SD_OP_GET_VM_LIST:
+	case SD_OP_NEW_VDI:
+	case SD_OP_DEL_VDI:
+	case SD_OP_LOCK_VDI:
+	case SD_OP_RELEASE_VDI:
+	case SD_OP_GET_VDI_INFO:
+	case SD_OP_MAKE_FS:
+	case SD_OP_UPDATE_EPOCH:
+	case SD_OP_SHUTDOWN:
+		req->work.fn = cluster_queue_request;
+		break;
+	default:
+		eprintf("unknown operation %d\n", hdr->opcode);
+		free_request(req);
+		ci->conn.c_rx_state = C_IO_CLOSED;
+		return;
+	}
+
+	req->work.done = __done;
+
+	list_del(&req->r_wlist);
+
+	queue_work(&req->work);
+}
+
+/* req->done callback: queue @req for its response and enable EPOLLOUT. */
+static void req_done(struct request *req)
+{
+	list_add(&req->r_wlist, &req->ci->done_reqs);
+	conn_tx_on(&req->ci->conn);
+}
+
+/* Reset the receive side of @ci to expect the next request header. */
+static void init_rx_hdr(struct client_info *ci)
+{
+	ci->conn.c_rx_state = C_IO_HEADER;
+	ci->rx_req = NULL;
+	ci->conn.rx_length = sizeof(struct sd_req);
+	ci->conn.rx_buf = &ci->conn.rx_hdr;
+}
+
+/*
+ * Receive state machine of a client connection: read the fixed-size
+ * header, allocate the request, read the payload of write requests and
+ * queue the complete request.  Returns early while more data is needed.
+ */
+static void client_rx_handler(struct client_info *ci)
+{
+	int ret;
+	uint64_t data_len;
+	struct connection *conn = &ci->conn;
+	struct sd_req *hdr = &conn->rx_hdr;
+	struct request *req;
+
+	switch (conn->c_rx_state) {
+	case C_IO_HEADER:
+		ret = rx(conn, C_IO_DATA_INIT);
+		if (!ret || conn->c_rx_state != C_IO_DATA_INIT)
+			break;
+		/* fallthrough: the header is complete */
+	case C_IO_DATA_INIT:
+		data_len = hdr->data_length;
+
+		req = alloc_request(ci, data_len);
+		if (!req) {
+			conn->c_rx_state = C_IO_CLOSED;
+			break;
+		}
+		ci->rx_req = req;
+
+		/* TODO: use le_to_cpu */
+		memcpy(&req->rq, hdr, sizeof(req->rq));
+
+		if (data_len && hdr->flags & SD_FLAG_CMD_WRITE) {
+			conn->c_rx_state = C_IO_DATA;
+			conn->rx_length = data_len;
+			conn->rx_buf = req->data;
+		} else {
+			conn->c_rx_state = C_IO_END;
+			break;
+		}
+		/* fallthrough: receive the write payload */
+	case C_IO_DATA:
+		rx(conn, C_IO_END);
+		break;
+	default:
+		eprintf("BUG: unknown state %d\n", conn->c_rx_state);
+	}
+
+	if (is_conn_dead(conn) || conn->c_rx_state != C_IO_END)
+		return;
+
+	/* now we have a complete command */
+
+	req = ci->rx_req;
+
+	/* conn->rx_hdr is reused for the next header; use the copy in req */
+	init_rx_hdr(ci);
+
+	if (req->rq.flags & SD_FLAG_CMD_WRITE)
+		req->rp.data_length = 0;
+	else
+		req->rp.data_length = req->rq.data_length;
+
+	req->done = req_done;
+
+	queue_request(req);
+}
+
+/*
+ * If no response is in progress, take the first completed request off
+ * done_reqs and prepare its response header for transmission.
+ */
+static void init_tx_hdr(struct client_info *ci)
+{
+	struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
+	struct request *req;
+
+	if (ci->tx_req || list_empty(&ci->done_reqs))
+		return;
+
+	memset(rsp, 0, sizeof(*rsp));
+
+	req = list_first_entry(&ci->done_reqs, struct request, r_wlist);
+	list_del(&req->r_wlist);
+
+	ci->tx_req = req;
+	ci->conn.tx_length = sizeof(*rsp);
+	ci->conn.c_tx_state = C_IO_HEADER;
+	ci->conn.tx_buf = rsp;
+
+	/* TODO: use cpu_to_le */
+	memcpy(rsp, &req->rp, sizeof(*rsp));
+
+	rsp->epoch = ci->cluster->epoch;
+	rsp->opcode = req->rq.opcode;
+	rsp->id = req->rq.id;
+}
+
+/*
+ * Send the responses of completed requests one after another.  Stops
+ * when done_reqs is empty (EPOLLOUT is then turned off), when tx() does
+ * not finish the current response, or when the connection dies.
+ * TCP_CORK is set while a header and its payload are written.
+ */
+static void client_tx_handler(struct client_info *ci)
+{
+	int ret, opt;
+	struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
+
+again:
+	init_tx_hdr(ci);
+	if (!ci->tx_req) {
+		conn_tx_off(&ci->conn);
+		return;
+	}
+
+	/* best effort: a failure only affects segmentation */
+	opt = 1;
+	setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
+
+	switch (ci->conn.c_tx_state) {
+	case C_IO_HEADER:
+		if (rsp->data_length)
+			ret = tx(&ci->conn, C_IO_DATA_INIT, MSG_MORE);
+		else
+			ret = tx(&ci->conn, C_IO_DATA_INIT, 0);
+
+		if (!ret)
+			break;
+
+		if (rsp->data_length) {
+			ci->conn.tx_length = rsp->data_length;
+			ci->conn.tx_buf = ci->tx_req->data;
+			ci->conn.c_tx_state = C_IO_DATA;
+		} else {
+			ci->conn.c_tx_state = C_IO_END;
+			break;
+		}
+		/* fallthrough: send the payload */
+	case C_IO_DATA:
+		tx(&ci->conn, C_IO_END, 0);
+		break;
+	default:
+		break;
+	}
+
+	opt = 0;
+	setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
+
+	if (is_conn_dead(&ci->conn) || ci->conn.c_tx_state != C_IO_END)
+		return;
+
+	/* the whole response has been sent */
+	free_request(ci->tx_req);
+	ci->tx_req = NULL;
+	goto again;
+}
+
+/*
+ * Close @ci's socket and free it, together with the requests it still
+ * owns outright: a partially received request (rx_req), a partially
+ * sent one (tx_req) and completed ones waiting in done_reqs.  These
+ * used to be leaked.
+ *
+ * NOTE(review): requests that are still queued in or executed by the
+ * worker threads (or forwarded to the cpg group) keep pointers to @ci
+ * and to ci->reqs.  They are neither freed nor cancelled here, so
+ * req_done() for them would touch freed memory.  Fixing that needs
+ * reference counting on client_info.
+ */
+static void destroy_client(struct client_info *ci)
+{
+	struct request *req;
+
+	if (ci->rx_req)
+		free_request(ci->rx_req);
+	if (ci->tx_req)
+		free_request(ci->tx_req);
+
+	while (!list_empty(&ci->done_reqs)) {
+		req = list_first_entry(&ci->done_reqs, struct request,
+				       r_wlist);
+		list_del(&req->r_wlist);
+		free_request(req);
+	}
+
+	close(ci->conn.fd);
+	free(ci);
+}
+
+/* Allocate the per-connection state for an accepted socket @fd. */
+static struct client_info *create_client(int fd, struct cluster_info *cluster)
+{
+	struct client_info *ci;
+
+	ci = zalloc(sizeof(*ci));
+	if (!ci)
+		return NULL;
+
+	ci->conn.fd = fd;
+
+	INIT_LIST_HEAD(&ci->reqs);
+	INIT_LIST_HEAD(&ci->done_reqs);
+
+	init_rx_hdr(ci);
+
+	ci->cluster = cluster;
+
+	return ci;
+}
+
+/* epoll callback for a client socket; tears the client down once dead. */
+static void client_handler(int fd, int events, void *data)
+{
+	struct client_info *ci = (struct client_info *)data;
+
+	if (events & EPOLLIN)
+		client_rx_handler(ci);
+
+	if (!is_conn_dead(&ci->conn) && (events & EPOLLOUT))
+		client_tx_handler(ci);
+
+	if (is_conn_dead(&ci->conn)) {
+		dprintf("closed a connection, %d\n", fd);
+		unregister_event(fd);
+		destroy_client(ci);
+	}
+}
+
+/*
+ * epoll callback for a listen socket: accept one connection, disable
+ * Nagle on it and register it with client_handler().  @data is the
+ * cluster_info passed to create_listen_port().
+ */
+static void listen_handler(int listen_fd, int events, void *data)
+{
+	struct sockaddr_storage from;
+	socklen_t namesize;
+	int fd, ret, opt;
+	struct client_info *ci;
+
+	namesize = sizeof(from);
+	fd = accept(listen_fd, (struct sockaddr *)&from, &namesize);
+	if (fd < 0) {
+		eprintf("can't accept a new connection, %m\n");
+		return;
+	}
+
+	opt = 1;
+	ret = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt));
+	if (ret) {
+		close(fd);
+		return;
+	}
+
+	ci = create_client(fd, data);
+	if (!ci) {
+		close(fd);
+		return;
+	}
+
+	ret = register_event(fd, client_handler, ci);
+	if (ret) {
+		destroy_client(ci);
+		return;
+	}
+
+	dprintf("accepted a new connection, %d\n", fd);
+}
+
+/* Per-socket callback for create_listen_ports(). */
+static int create_listen_port_fn(int fd, void *data)
+{
+	return register_event(fd, listen_handler, data);
+}
+
+/* Open the listen sockets for @port; @data is passed to listen_handler(). */
+int create_listen_port(int port, void *data)
+{
+	return create_listen_ports(port, create_listen_port_fn, data);
+}
diff --git a/collie/work.c b/collie/work.c
new file mode 100644
index 0000000..a0b7fda
--- /dev/null
+++ b/collie/work.c
@@ -0,0 +1,234 @@
+/*
+ * Copyright (C) 2007 FUJITA Tomonori <tomof at acm.org>
+ * Copyright (C) 2007 Mike Christie <michaelc at cs.wisc.edu>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
as
+ * published by the Free Software Foundation, version 2 of the
+ * License.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ */
+#include <errno.h>
+#include <string.h>
+#include <inttypes.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <syscall.h>
+#include <sys/types.h>
+#include <linux/types.h>
+#define _LINUX_FCNTL_H
+#include <linux/signalfd.h>
+
+#include "list.h"
+#include "util.h"
+#include "work.h"
+#include "logger.h"
+#include "event.h"
+
+extern int signalfd(int fd, const sigset_t *mask, int flags);
+
+struct worker_info {
+	pthread_t worker_thread[NR_WORKER_THREAD];
+	/* number of leading worker_thread[] entries that were started */
+	int nr_threads;
+
+	pthread_mutex_t finished_lock;
+	/* protected by finished_lock */
+	struct list_head finished_list;
+
+	/* workers sleep on this; signaled by queue_work()/exit_worker() */
+	pthread_cond_t pending_cond;
+	/* taken by queue_work(), exit_worker() and the workers */
+	pthread_mutex_t pending_lock;
+	/* protected by pending_lock */
+	struct list_head pending_list;
+
+	pthread_mutex_t startup_lock;
+
+	int sig_fd;
+
+	/* protected by pending_lock; tells the workers to exit */
+	int stop;
+};
+
+static struct worker_info __wi;
+
+/*
+ * Event-loop handler for the signalfd: drain the queued SIGUSR2
+ * notifications, then run the ->done callback of every work item the
+ * workers have moved to finished_list.
+ */
+static void bs_thread_request_done(int fd, int events, void *data)
+{
+	int ret;
+	struct worker_info *wi = data;
+	struct work *work;
+	struct signalfd_siginfo siginfo[16];
+	LIST_HEAD(list);
+
+	ret = read(fd, (char *)siginfo, sizeof(siginfo));
+	if (ret <= 0)
+		return;
+
+	pthread_mutex_lock(&wi->finished_lock);
+	list_splice_init(&wi->finished_list, &list);
+	pthread_mutex_unlock(&wi->finished_lock);
+
+	while (!list_empty(&list)) {
+		work = list_first_entry(&list, struct work, w_list);
+		list_del(&work->w_list);
+
+		work->done(work, 0);
+	}
+}
+
+/*
+ * Worker thread body: repeatedly take the oldest item off pending_list,
+ * run its ->fn, move it to finished_list and raise SIGUSR2 so that the
+ * event loop calls its ->done.  Exits once ->stop is set.
+ */
+static void *worker_routine(void *arg)
+{
+	struct worker_info *wi = &__wi;
+	struct work *work;
+	pthread_t *p = arg;
+	int idx = p - wi->worker_thread;
+	sigset_t set;
+
+	/* sigprocmask() is unspecified in a multithreaded process */
+	sigfillset(&set);
+	pthread_sigmask(SIG_BLOCK, &set, NULL);
+
+	pthread_mutex_lock(&wi->startup_lock);
+	dprintf("started this thread %d\n", idx);
+	pthread_mutex_unlock(&wi->startup_lock);
+
+	for (;;) {
+		pthread_mutex_lock(&wi->pending_lock);
+		/*
+		 * ->stop is tested under pending_lock, and exit_worker() sets
+		 * it and broadcasts under the same lock, so the wakeup cannot
+		 * be lost between this test and pthread_cond_wait().
+		 */
+		while (!wi->stop && list_empty(&wi->pending_list))
+			pthread_cond_wait(&wi->pending_cond, &wi->pending_lock);
+
+		if (wi->stop) {
+			pthread_mutex_unlock(&wi->pending_lock);
+			break;
+		}
+
+		work = list_first_entry(&wi->pending_list,
+					struct work, w_list);
+
+		list_del(&work->w_list);
+		pthread_mutex_unlock(&wi->pending_lock);
+
+		work->fn(work, idx);
+
+		pthread_mutex_lock(&wi->finished_lock);
+		list_add_tail(&work->w_list, &wi->finished_list);
+		pthread_mutex_unlock(&wi->finished_lock);
+
+		kill(getpid(), SIGUSR2);
+	}
+
+	pthread_exit(NULL);
+}
+
+/*
+ * Set up the work queue: a non-blocking signalfd for SIGUSR2 registered
+ * with the event loop, and NR_WORKER_THREAD worker threads.
+ *
+ * Returns 0 on success, 1 on failure.  On failure, everything created
+ * so far is torn down, including the signalfd, which used to be leaked.
+ */
+int init_worker(void)
+{
+	int i, ret, flags;
+	sigset_t mask;
+	struct worker_info *wi = &__wi;
+
+	INIT_LIST_HEAD(&wi->pending_list);
+	INIT_LIST_HEAD(&wi->finished_list);
+
+	pthread_cond_init(&wi->pending_cond, NULL);
+
+	pthread_mutex_init(&wi->finished_lock, NULL);
+	pthread_mutex_init(&wi->pending_lock, NULL);
+	pthread_mutex_init(&wi->startup_lock, NULL);
+
+	wi->stop = 0;
+	wi->nr_threads = 0;
+
+	/* SIGUSR2 must stay blocked so that it is only read via sig_fd */
+	sigemptyset(&mask);
+	sigaddset(&mask, SIGUSR2);
+	pthread_sigmask(SIG_BLOCK, &mask, NULL);
+
+	wi->sig_fd = signalfd(-1, &mask, 0);
+	if (wi->sig_fd < 0) {
+		eprintf("failed to create a signal fd, %m\n");
+		goto destroy_cond_mutex;
+	}
+
+	flags = fcntl(wi->sig_fd, F_GETFL);
+	if (flags < 0 || fcntl(wi->sig_fd, F_SETFL, flags | O_NONBLOCK) < 0) {
+		eprintf("failed to make the signal fd non-blocking, %m\n");
+		goto close_sig_fd;
+	}
+
+	ret = register_event(wi->sig_fd, bs_thread_request_done, wi);
+	if (ret) {
+		eprintf("failed to add epoll event\n");
+		goto close_sig_fd;
+	}
+
+	/* hold startup_lock so no worker runs before all are created */
+	pthread_mutex_lock(&wi->startup_lock);
+	for (i = 0; i < NR_WORKER_THREAD; i++) {
+		ret = pthread_create(&wi->worker_thread[i], NULL,
+				     worker_routine, &wi->worker_thread[i]);
+		if (ret) {
+			eprintf("failed to create a worker thread, %d %s\n",
+				i, strerror(ret));
+			goto destroy_threads;
+		}
+		wi->nr_threads++;
+	}
+	pthread_mutex_unlock(&wi->startup_lock);
+
+	return 0;
+
+destroy_threads:
+	pthread_mutex_lock(&wi->pending_lock);
+	wi->stop = 1;
+	pthread_cond_broadcast(&wi->pending_cond);
+	pthread_mutex_unlock(&wi->pending_lock);
+	pthread_mutex_unlock(&wi->startup_lock);
+	for (; i > 0; i--) {
+		pthread_join(wi->worker_thread[i - 1], NULL);
+		eprintf("stopped the worker thread %d\n", i - 1);
+	}
+	wi->nr_threads = 0;
+
+	unregister_event(wi->sig_fd);
+close_sig_fd:
+	close(wi->sig_fd);
+destroy_cond_mutex:
+	pthread_cond_destroy(&wi->pending_cond);
+	pthread_mutex_destroy(&wi->pending_lock);
+	pthread_mutex_destroy(&wi->startup_lock);
+	pthread_mutex_destroy(&wi->finished_lock);
+
+	return 1;
+}
+
+/*
+ * Stop and join all worker threads, then release the signalfd and the
+ * synchronization objects created by init_worker().
+ *
+ * The join loop used to test worker_thread[i] before the bounds check,
+ * reading past the array when all threads existed, and ->stop was set
+ * without pending_lock, so a worker could miss the wakeup.
+ */
+void exit_worker(void)
+{
+	int i;
+	struct worker_info *wi = &__wi;
+
+	pthread_mutex_lock(&wi->pending_lock);
+	wi->stop = 1;
+	pthread_cond_broadcast(&wi->pending_cond);
+	pthread_mutex_unlock(&wi->pending_lock);
+
+	for (i = 0; i < wi->nr_threads; i++)
+		pthread_join(wi->worker_thread[i], NULL);
+	wi->nr_threads = 0;
+
+	unregister_event(wi->sig_fd);
+	close(wi->sig_fd);
+
+	pthread_cond_destroy(&wi->pending_cond);
+	pthread_mutex_destroy(&wi->pending_lock);
+	pthread_mutex_destroy(&wi->startup_lock);
+	pthread_mutex_destroy(&wi->finished_lock);
+
+	wi->stop = 0;
+}
+
+/* Append @work to the pending queue and wake up one worker. */
+void queue_work(struct work *work)
+{
+	struct worker_info *wi = &__wi;
+
+	pthread_mutex_lock(&wi->pending_lock);
+
+	list_add_tail(&work->w_list, &wi->pending_list);
+
+	pthread_mutex_unlock(&wi->pending_lock);
+
+	pthread_cond_signal(&wi->pending_cond);
+}
diff --git a/collie/work.h b/collie/work.h
new file mode 100644
index 0000000..81d8c56
--- /dev/null
+++ b/collie/work.h
@@ -0,0 +1,20 @@
+#ifndef __WORK_H__
+#define __WORK_H__
+
+#define NR_WORKER_THREAD 4
+
+struct work;
+
+typedef void (*work_func_t)(struct work *, int idx);
+
+struct work {
+	struct list_head w_list;
+	work_func_t fn;
+	work_func_t done;
+};
+
+int init_worker(void);
+void exit_worker(void);
+void queue_work(struct work *work);
+
+#endif
-- 
1.5.6.5