Puppy is a C implementation of the dog program. It uses corosync instead of JGroups for the cluster communication, so a JVM is no longer required. Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp> --- puppy/Makefile.in | 31 ++++ puppy/group.c | 422 +++++++++++++++++++++++++++++++++++++++++++++++++++++ puppy/net.c | 355 ++++++++++++++++++++++++++++++++++++++++++++ puppy/puppy.c | 186 +++++++++++++++++++++++ puppy/puppy.h | 127 ++++++++++++++++ puppy/vdi.c | 256 ++++++++++++++++++++++++++++++++ 6 files changed, 1377 insertions(+), 0 deletions(-) create mode 100644 puppy/Makefile.in create mode 100644 puppy/group.c create mode 100644 puppy/net.c create mode 100644 puppy/puppy.c create mode 100644 puppy/puppy.h create mode 100644 puppy/vdi.c diff --git a/puppy/Makefile.in b/puppy/Makefile.in new file mode 100644 index 0000000..8468066 --- /dev/null +++ b/puppy/Makefile.in @@ -0,0 +1,31 @@ +CFLAGS += -g -O2 -Wall -Wstrict-prototypes -I../include +CFLAGS += -D_GNU_SOURCE +LIBS += -lcrypto -lcpg + +PROGRAMS = puppy +PUPPY_OBJS = puppy.o net.o net.o vdi.o group.o ../lib/event.o ../lib/net.o ../lib/logger.o +PUPPY_DEP = $(PUPPY_OBJS:.o=.d) + +prefix = @prefix@ +exec_prefix = @exec_prefix@ +bindir = @bindir@ + +.PHONY:all +all: $(PROGRAMS) + +puppy: $(PUPPY_OBJS) + $(CC) $^ -o $@ $(LIBS) + +-include $(PUPPY_DEP) + +%.o: %.c + $(CC) -c $(CFLAGS) $*.c -o $*.o + @$(CC) -MM $(CFLAGS) -MF $*.d -MT $*.o $*.c + +.PHONY:clean +clean: + rm -f *.[od] $(PROGRAMS) + +.PHONY:install +install: $(PROGRAMS) + install $< $(bindir) diff --git a/puppy/group.c b/puppy/group.c new file mode 100644 index 0000000..982a9b4 --- /dev/null +++ b/puppy/group.c @@ -0,0 +1,422 @@ +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <sys/time.h> +#include <corosync/cpg.h> + +#include "puppy.h" +#include "list.h" +#include "util.h" +#include "event.h" +#include "sheepdog_proto.h" +#include "net.h" +#include "meta.h" +#include "logger.h" + +static struct vm *lookup_vm(struct 
list_head *entries, char *name) +{ + struct vm *vm; + + list_for_each_entry(vm, entries, list) { + if (!strcmp((char *)vm->ent.name, name)) + return vm; + } + + return NULL; +} + +static void group_handler(int listen_fd, int events, void *data) +{ + struct cluster_info *ci = data; + cpg_dispatch(ci->handle, CPG_DISPATCH_ALL); +} + +struct cluster_info *create_cluster(cpg_handle_t handle, uint32_t this_nodeid, + uint32_t this_pid, + struct sheepdog_node_list_entry *this_node) +{ + int fd; + struct cluster_info *ci; + ci = zalloc(sizeof(*ci)); + if (!ci) + return NULL; + + ci->handle = handle; + ci->this_nodeid = this_nodeid; + ci->this_pid = this_pid; + ci->this_node = *this_node; + ci->synchronized = 0; + INIT_LIST_HEAD(&ci->node_list); + INIT_LIST_HEAD(&ci->vm_list); + INIT_LIST_HEAD(&ci->pending_list); + cpg_context_set(handle, ci); + + cpg_fd_get(handle, &fd); + register_event(fd, group_handler, ci); + return ci; +} + +static void print_node_list(struct cluster_info *ci) +{ + struct node *node; + list_for_each_entry(node, &ci->node_list, list) { + dprintf("%c nodeid: %x, pid: %d, ip: %d.%d.%d.%d:%d\n", + node_cmp(&node->ent, &ci->this_node) ? 
' ' : 'l', + node->nodeid, node->pid, + node->ent.addr[12], node->ent.addr[13], + node->ent.addr[14], node->ent.addr[15], node->ent.port); + } +} + +static void add_node(struct cluster_info *ci, uint32_t nodeid, uint32_t pid, + struct sheepdog_node_list_entry *sd_ent) +{ + struct node *node; + + node = zalloc(sizeof(*node)); + if (!node) { + eprintf("out of memory\n"); + return; + } + node->nodeid = nodeid; + node->pid = pid; + node->ent = *sd_ent; + list_add_tail(&node->list, &ci->node_list); + ci->epoch++; +} + +static int is_master(struct cluster_info *ci) +{ + struct node *node; + + if (!ci->synchronized) + return 0; + + if (list_empty(&ci->node_list)) + return 1; + + node = list_first_entry(&ci->node_list, struct node, list); + if (node_cmp(&node->ent, &ci->this_node) == 0) + return 1; + + return 0; +} + +static void join(struct cluster_info *ci, cpg_handle_t handle, + struct join_message *msg) +{ + if (!ci->synchronized) + return; + + if (!is_master(ci)) + return; + + struct node *node; + + list_for_each_entry(node, &ci->node_list, list) { + msg->nodes[msg->nr_nodes].nodeid = node->nodeid; + msg->nodes[msg->nr_nodes].pid = node->pid; + msg->nodes[msg->nr_nodes].ent = node->ent; + msg->nr_nodes++; + } +} + +static void update_cluster_info(struct cluster_info *ci, + struct join_message *msg) +{ + int i; + int nr_nodes = msg->nr_nodes; + struct node *node, *e; + + if (ci->synchronized) + goto out; + + list_for_each_entry_safe(node, e, &ci->node_list, list) { + list_del(&node->list); + free(node); + } + + INIT_LIST_HEAD(&ci->node_list); + for (i = 0; i < nr_nodes; i++) + add_node(ci, msg->nodes[i].nodeid, msg->nodes[i].pid, + &msg->nodes[i].ent); + + ci->synchronized = 1; + +out: + add_node(ci, msg->nodeid, msg->pid, &msg->header.from); + print_node_list(ci); +} + +static void vdi_op(struct cluster_info *ci, struct vdi_op_message *msg) +{ + const struct sd_vdi_req *hdr = &msg->req; + struct sd_vdi_rsp *rsp = &msg->rsp; + void *data = msg->data; + int ret = 
SD_RES_SUCCESS, is_current; + uint64_t oid = 0; + struct sheepdog_super_block *sb; + struct timeval tv; + struct sheepdog_node_list_entry entries[SD_MAX_NODES]; + int nr_nodes; + + switch (hdr->opcode) { + case SD_OP_NEW_VDI: + ret = add_vdi(ci, data, strlen(data), hdr->vdi_size, &oid, + hdr->base_oid, hdr->tag); + break; + case SD_OP_LOCK_VDI: + case SD_OP_GET_VDI_INFO: + ret = lookup_vdi(ci, data, &oid, hdr->tag, 1, &is_current); + if (ret < 0) + break; + if (is_current) + rsp->flags = SD_VDI_RSP_FLAG_CURRENT; + break; + case SD_OP_RELEASE_VDI: + break; + case SD_OP_MAKE_FS: + sb = zalloc(sizeof(*sb)); + if (!sb) { + ret = -1; + break; + } + gettimeofday(&tv, NULL); + sb->ctime = (uint64_t) tv.tv_sec << 32 | tv.tv_usec * 1000; + sb->default_nr_copies = 3; + + nr_nodes = build_node_list(&ci->node_list, entries); + ret = write_object(entries, nr_nodes, ci->epoch, + SD_DIR_OID, (char *)sb, sizeof(*sb), 0, + sb->default_nr_copies, 1); + break; + case SD_OP_UPDATE_EPOCH: + break; + case SD_OP_GET_EPOCH: + rsp->epoch = 1; + break; + default: + ret = SD_RES_SYSTEM_ERROR; + eprintf("opcode %d is not implemented\n", hdr->opcode); + break; + } + + rsp->oid = oid; + rsp->result = ret; +} + +static void vdi_op_done(struct cluster_info *ci, struct vdi_op_message *msg) +{ + const struct sd_vdi_req *hdr = &msg->req; + struct sd_vdi_rsp *rsp = &msg->rsp; + void *data = msg->data; + struct vm *vm; + struct request *req; + int ret = msg->rsp.result; + + switch (hdr->opcode) { + case SD_OP_NEW_VDI: + break; + case SD_OP_LOCK_VDI: + if (lookup_vm(&ci->vm_list, (char *)data)) { + ret = SD_RES_VDI_LOCKED; + break; + } + + vm = zalloc(sizeof(*vm)); + if (!vm) { + ret = SD_RES_UNKNOWN; + break; + } + strcpy((char *)vm->ent.name, (char *)data); + memcpy(vm->ent.host_addr, msg->header.from.addr, + sizeof(vm->ent.host_addr)); + vm->ent.host_port = msg->header.from.port; + + list_add(&vm->list, &ci->vm_list); + break; + case SD_OP_RELEASE_VDI: + vm = lookup_vm(&ci->vm_list, (char *)data); + 
if (!vm) { + ret = SD_RES_VDI_NOT_LOCKED; + break; + } + + list_del(&vm->list); + break; + case SD_OP_GET_VDI_INFO: + break; + case SD_OP_UPDATE_EPOCH: + break; + case SD_OP_GET_EPOCH: + break; + case SD_OP_MAKE_FS: + break; + default: + eprintf("unknown operation %d\n", hdr->opcode); + ret = SD_RES_UNKNOWN; + } + + if (node_cmp(&ci->this_node, &msg->header.from) != 0) + return; + + req = list_first_entry(&ci->pending_list, struct request, pending_list); + + rsp->result = ret; + memcpy(req->data, data, rsp->data_length); + memcpy(&req->rp, rsp, sizeof(req->rp)); + list_del(&req->pending_list); + req->done(req); +} + +void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name, + uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) +{ + struct cluster_info *ci; + struct message_header *m = msg; + + dprintf("op: %d, done: %d, size: %d, from: %d.%d.%d.%d:%d\n", + m->op, m->done, m->msg_length, + m->from.addr[12], m->from.addr[13], + m->from.addr[14], m->from.addr[15], m->from.port); + + cpg_context_get(handle, (void **)&ci); + if (!m->done) { + if (!is_master(ci)) + return; + + switch (m->op) { + case SD_MSG_JOIN: + join(ci, handle, msg); + break; + case SD_MSG_VDI_OP: + vdi_op(ci, msg); + break; + default: + eprintf("unknown message %d\n", m->op); + break; + } + + m->done = 1; + send_message(handle, m); + } else { + switch (m->op) { + case SD_MSG_JOIN: + update_cluster_info(ci, msg); + break; + case SD_MSG_VDI_OP: + vdi_op_done(ci, msg); + break; + default: + eprintf("unknown message %d\n", m->op); + break; + } + } +} + +void sd_confch(cpg_handle_t handle, const struct cpg_name *group_name, + const struct cpg_address *member_list, + size_t member_list_entries, const struct cpg_address *left_list, + size_t left_list_entries, const struct cpg_address *joined_list, + size_t joined_list_entries) +{ + struct cluster_info *ci; + struct node *node, *e; + int i; + + dprintf("confchg nodeid %x\n", member_list[0].nodeid); + dprintf("%ld %ld %ld\n", 
member_list_entries, left_list_entries, + joined_list_entries); + for (i = 0; i < member_list_entries; i++) { + dprintf("[%d] node_id: %d, pid: %d, reason: %d\n", i, + member_list[i].nodeid, member_list[i].pid, + member_list[i].reason); + } + + cpg_context_get(handle, (void **)&ci); + + if (member_list_entries == joined_list_entries - left_list_entries && + ci->this_nodeid == member_list[0].nodeid && + ci->this_pid == member_list[0].pid) + ci->synchronized = 1; + + for (i = 0; i < left_list_entries; i++) { + list_for_each_entry_safe(node, e, &ci->node_list, list) { + if (node->nodeid != left_list[i].nodeid || + node->pid != left_list[i].pid) + continue; + + list_del(&node->list); + free(node); + ci->epoch++; + } + } + + for (i = 0; i < joined_list_entries; i++) { + if (ci->this_nodeid == joined_list[0].nodeid && + ci->this_pid == joined_list[0].pid) { + struct join_message msg; + + msg.header.op = SD_MSG_JOIN; + msg.header.done = 0; + msg.header.msg_length = sizeof(msg); + msg.nodeid = ci->this_nodeid; + msg.pid = ci->this_pid; + msg.header.from = ci->this_node; + + send_message(ci->handle, (struct message_header *)&msg); + } + } + + if (left_list_entries == 0) + return; + + print_node_list(ci); +} + +int send_message(cpg_handle_t handle, struct message_header *msg) +{ + struct iovec iov; + int ret; + + iov.iov_base = msg; + iov.iov_len = msg->msg_length; +retry: + ret = cpg_mcast_joined(handle, CPG_TYPE_AGREED, &iov, 1); + switch (ret) { + case CS_OK: + break; + case CS_ERR_TRY_AGAIN: + dprintf("failed to send message. 
try again\n"); + sleep(1); + goto retry; + default: + eprintf("failed to send message, %d\n", ret); + return -1; + } + return 0; +} + +int build_node_list(struct list_head *node_list, + struct sheepdog_node_list_entry *entries) +{ + struct node *node; + int nr = 0; + + list_for_each_entry(node, node_list, list) { + if (entries) + memcpy(entries + nr, &node->ent, sizeof(*entries)); + nr++; + } + if (entries) + qsort(entries, nr, sizeof(*entries), node_cmp); + + return nr; +} + +int node_cmp(const void *a, const void *b) +{ + const struct sheepdog_node_list_entry *node1 = a; + const struct sheepdog_node_list_entry *node2 = b; + return memcmp(node1->id, node2->id, sizeof(node1->id)); +} diff --git a/puppy/net.c b/puppy/net.c new file mode 100644 index 0000000..f011acf --- /dev/null +++ b/puppy/net.c @@ -0,0 +1,355 @@ +#include <errno.h> +#include <fcntl.h> +#include <netdb.h> +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <arpa/inet.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <sys/socket.h> +#include <sys/epoll.h> +#include <sys/stat.h> +#include <sys/types.h> + +#include "sheepdog_proto.h" +#include "util.h" +#include "event.h" +#include "net.h" +#include "list.h" +#include "puppy.h" +#include "meta.h" +#include "logger.h" + +static int get_node_idx(struct sheepdog_node_list_entry *ent, + struct sheepdog_node_list_entry *entries, int nr_nodes) +{ + ent = bsearch(ent, entries, nr_nodes, sizeof(*ent), node_cmp); + if (!ent) + return -1; + + return ent - entries; +} + +static void get_node_list(struct cluster_info *cluster, struct sd_node_req *req, + struct sd_node_rsp *rsp, void *data) +{ + int nr_nodes; + struct node *node; + + nr_nodes = build_node_list(&cluster->node_list, data); + rsp->data_length = nr_nodes * sizeof(struct sheepdog_node_list_entry); + rsp->nr_nodes = nr_nodes; + rsp->local_idx = get_node_idx(&cluster->this_node, data, nr_nodes); + + if (list_empty(&cluster->node_list)) { + rsp->master_idx = -1; + 
return; + } + node = list_first_entry(&cluster->node_list, struct node, list); + rsp->master_idx = get_node_idx(&node->ent, data, nr_nodes); +} + +static void get_vm_list(struct cluster_info *cluster, struct sd_rsp *rsp, + void *data) +{ + int nr_vms; + struct vm *vm; + + struct sheepdog_vm_list_entry *p = data; + list_for_each_entry(vm, &cluster->vm_list, list) { + *p++ = vm->ent; + } + + nr_vms = p - (struct sheepdog_vm_list_entry *)data; + rsp->data_length = nr_vms * sizeof(struct sheepdog_vm_list_entry); +} + +void queue_request(struct request *req) +{ + struct sd_req *hdr = (struct sd_req *)&req->rq; + struct sd_rsp *rsp = (struct sd_rsp *)&req->rp; + struct cluster_info *cluster = req->ci->cluster; + struct vdi_op_message *msg; + int ret = SD_RES_SUCCESS; + + eprintf("%p %x\n", req, hdr->opcode); + + switch (hdr->opcode) { + case SD_OP_GET_NODE_LIST: + get_node_list(cluster, (struct sd_node_req *)hdr, + (struct sd_node_rsp *)rsp, req->data); + break; + case SD_OP_GET_VM_LIST: + get_vm_list(cluster, rsp, req->data); + break; + default: + /* forward request to group */ + goto forward; + } + + rsp->result = ret; + req->done(req); + return; + +forward: + msg = zalloc(sizeof(*msg) + hdr->data_length); + if (!msg) { + eprintf("out of memory\n"); + return; + } + + msg->header.op = SD_MSG_VDI_OP; + msg->header.done = 0; + msg->header.msg_length = sizeof(*msg) + hdr->data_length; + msg->header.from = cluster->this_node; + msg->req = *((struct sd_vdi_req *)&req->rq); + msg->rsp = *((struct sd_vdi_rsp *)&req->rp); + if (hdr->flags & SD_FLAG_CMD_WRITE) + memcpy(msg->data, req->data, hdr->data_length); + + list_add(&req->pending_list, &cluster->pending_list); + send_message(cluster->handle, (struct message_header *)msg); +} + +static struct request *alloc_request(struct client_info *ci, int data_length) +{ + struct request *req; + + req = zalloc(sizeof(struct request) + data_length); + if (!req) + return NULL; + + req->ci = ci; + if (data_length) + req->data = (char *)req 
+ sizeof(*req); + + list_add(&req->r_siblings, &ci->reqs); + INIT_LIST_HEAD(&req->r_wlist); + + return req; +} + +static void free_request(struct request *req) +{ + list_del(&req->r_siblings); + free(req); +} + +static void init_rx_hdr(struct client_info *ci) +{ + ci->conn.c_rx_state = C_IO_HEADER; + ci->rx_req = NULL; + ci->conn.rx_length = sizeof(struct sd_req); + ci->conn.rx_buf = &ci->conn.rx_hdr; +} + +static void req_done(struct request *req) +{ + list_add(&req->r_wlist, &req->ci->done_reqs); + conn_tx_on(&req->ci->conn); +} + +static void client_rx_handler(struct client_info *ci) +{ + int ret; + uint64_t data_len; + struct connection *conn = &ci->conn; + struct sd_req *hdr = &conn->rx_hdr; + struct request *req; + + switch (conn->c_rx_state) { + case C_IO_HEADER: + ret = rx(conn, C_IO_DATA_INIT); + if (!ret || conn->c_rx_state != C_IO_DATA_INIT) + break; + case C_IO_DATA_INIT: + data_len = hdr->data_length; + + req = alloc_request(ci, data_len); + if (!req) { + conn->c_rx_state = C_IO_CLOSED; + break; + } + ci->rx_req = req; + + /* use le_to_cpu */ + memcpy(&req->rq, hdr, sizeof(req->rq)); + + if (data_len && hdr->flags & SD_FLAG_CMD_WRITE) { + conn->c_rx_state = C_IO_DATA; + conn->rx_length = data_len; + conn->rx_buf = req->data; + } else { + conn->c_rx_state = C_IO_END; + break; + } + case C_IO_DATA: + ret = rx(conn, C_IO_END); + break; + default: + eprintf("BUG: unknown state %d\n", conn->c_rx_state); + } + + if (is_conn_dead(conn) || conn->c_rx_state != C_IO_END) + return; + + /* now we have a complete command */ + + req = ci->rx_req; + + init_rx_hdr(ci); + + if (hdr->flags & SD_FLAG_CMD_WRITE) + req->rp.data_length = 0; + else + req->rp.data_length = hdr->data_length; + + req->done = req_done; + + queue_request(req); +} + +static void init_tx_hdr(struct client_info *ci) +{ + struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr; + struct request *req; + + if (ci->tx_req || list_empty(&ci->done_reqs)) + return; + + memset(rsp, 0, sizeof(*rsp)); + + req 
= list_first_entry(&ci->done_reqs, struct request, r_wlist); + list_del(&req->r_wlist); + + ci->tx_req = req; + ci->conn.tx_length = sizeof(*rsp); + ci->conn.c_tx_state = C_IO_HEADER; + ci->conn.tx_buf = rsp; + + memcpy(rsp, &req->rp, sizeof(*rsp)); + + rsp->epoch = ci->cluster->epoch; + rsp->opcode = req->rq.opcode; + rsp->id = req->rq.id; +} + +static void client_tx_handler(struct client_info *ci) +{ + int ret; + struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr; + +again: + init_tx_hdr(ci); + if (!ci->tx_req) { + conn_tx_off(&ci->conn); + return; + } + + switch (ci->conn.c_tx_state) { + case C_IO_HEADER: + if (rsp->data_length) + ret = tx(&ci->conn, C_IO_DATA_INIT, MSG_MORE); + else + ret = tx(&ci->conn, C_IO_DATA_INIT, 0); + + if (!ret) + break; + + if (rsp->data_length) { + ci->conn.tx_length = rsp->data_length; + ci->conn.tx_buf = ci->tx_req->data; + ci->conn.c_tx_state = C_IO_DATA; + } else { + ci->conn.c_tx_state = C_IO_END; + break; + } + case C_IO_DATA: + ret = tx(&ci->conn, C_IO_END, 0); + if (!ret) + break; + default: + break; + } + + if (is_conn_dead(&ci->conn) || ci->conn.c_tx_state != C_IO_END) + return; + + if (ci->conn.c_tx_state == C_IO_END) { + free_request(ci->tx_req); + ci->tx_req = NULL; + goto again; + } +} + +static struct client_info *create_client(int fd, struct cluster_info *cluster) +{ + struct client_info *ci; + + ci = zalloc(sizeof(*ci)); + if (!ci) + return NULL; + + ci->conn.fd = fd; + + ci->conn.c_rx_state = C_IO_HEADER; + ci->conn.rx_length = sizeof(struct sd_req); + ci->conn.rx_buf = &ci->conn.rx_hdr; + + INIT_LIST_HEAD(&ci->reqs); + INIT_LIST_HEAD(&ci->done_reqs); + + ci->cluster = cluster; + + return ci; +} + +static void destroy_client(struct client_info *ci) +{ + close(ci->conn.fd); + free(ci); +} + +static void client_handler(int fd, int events, void *data) +{ + struct client_info *ci = (struct client_info *)data; + + if (events & EPOLLIN) + client_rx_handler(ci); + + if (!is_conn_dead(&ci->conn) && events & EPOLLOUT) + 
client_tx_handler(ci); + + if (is_conn_dead(&ci->conn)) { + unregister_event(fd); + destroy_client(ci); + } +} + +void listen_handler(int listen_fd, int events, void *data) +{ + struct sockaddr_storage from; + socklen_t namesize; + int fd, ret; + struct client_info *ci; + + namesize = sizeof(from); + fd = accept(listen_fd, (struct sockaddr *)&from, &namesize); + if (fd < 0) { + eprintf("can't accept a new connection, %m\n"); + return; + } + + ci = create_client(fd, data); + if (!ci) { + close(fd); + return; + } + + ret = register_event(fd, client_handler, ci); + if (ret) { + destroy_client(ci); + return; + } +} diff --git a/puppy/puppy.c b/puppy/puppy.c new file mode 100644 index 0000000..1401f19 --- /dev/null +++ b/puppy/puppy.c @@ -0,0 +1,186 @@ +#include <stdio.h> +#include <stdlib.h> +#include <netdb.h> +#include <unistd.h> +#include <pthread.h> +#include <errno.h> +#include <getopt.h> +#include <openssl/sha.h> +#include <corosync/cpg.h> + +#include "puppy.h" +#include "list.h" +#include "util.h" +#include "event.h" +#include "sheepdog_proto.h" +#include "net.h" +#include "logger.h" + +static char program_name[] = "puppy"; + +static int sheepport = SHEEP_LISTEN_PORT; + +static struct option const long_options[] = { + {"dport", required_argument, 0, 'D'}, + {"sport", required_argument, 0, 's'}, + {"foreground", no_argument, 0, 'f'}, + {"debug", no_argument, 0, 'd'}, + {"help", no_argument, 0, 'h'}, + {0, 0, 0, 0}, +}; + +static char *short_options = "D:s:fdh"; + +static void usage(int status) +{ + if (status) + fprintf(stderr, "Try `%s --help' for more information.\n", + program_name); + else { + printf("Usage: %s [OPTION]\n", program_name); + printf("\ +Sheepdog cluster monitor\n\ + -D, --dport specify the dog (our) listen port number\n\ + -s, --sport specify the sheep listen port number\n\ + -f, --foreground make the program run in the foreground\n\ + -d, --debug print debug messages\n\ + -h, --help display this help and exit\n\ +"); + } + exit(status); +} + 
+static int create_listen_ports_fn(int fd, void *data) +{ + return register_event(fd, listen_handler, data); +} + +int main(int argc, char **argv) +{ + int ch, longindex; + int ret, port = DOG_LISTEN_PORT; + int is_daemon = 1; + int is_debug = 0; + cpg_handle_t cpg_handle = 0; + SHA_CTX ctx; + struct cluster_info *ci; + cpg_callbacks_t cb = { &sd_deliver, &sd_confch }; + unsigned int nodeid = 0; + struct cpg_name group = { 8, "sheepdog" }; + struct sheepdog_node_list_entry ent; + char name[INET6_ADDRSTRLEN]; + struct addrinfo hints, *res; + + while ((ch = getopt_long(argc, argv, short_options, long_options, + &longindex)) >= 0) { + switch (ch) { + case 'D': + port = atoi(optarg); + break; + case 's': + sheepport = atoi(optarg); + break; + case 'f': + is_daemon = 0; + break; + case 'd': + is_debug = 1; + break; + case 'h': + usage(0); + break; + default: + usage(1); + break; + } + } + + ret = log_init(program_name, LOG_SPACE_SIZE, is_daemon, is_debug); + if (ret) + exit(1); + + if (is_daemon && daemon(0, 0)) + exit(1); + + ret = init_event(1024); + if (ret) + exit(1); + + ret = cpg_initialize(&cpg_handle, &cb); + if (ret != CS_OK) { + eprintf("Failed to initialize cpg, %d\n", ret); + eprintf("Is corosync running?\n"); + return -1; + } + +join_retry: + ret = cpg_join(cpg_handle, &group); + switch (ret) { + case CS_OK: + break; + case CS_ERR_TRY_AGAIN: + dprintf("Failed to join the sheepdog group, try again\n"); + sleep(1); + goto join_retry; + case CS_ERR_SECURITY: + eprintf("Permission error.\n"); + exit(1); + default: + eprintf("Failed to join the sheepdog group, %d\n", ret); + exit(1); + break; + } + + ret = cpg_local_get(cpg_handle, &nodeid); + if (ret != CS_OK) { + eprintf("Failed to get the local node's identifier, %d\n", ret); + exit(1); + } + + memset(&ent, 0, sizeof(ent)); + + gethostname(name, sizeof(name)); + + memset(&hints, 0, sizeof(hints)); + + hints.ai_socktype = SOCK_STREAM; + ret = getaddrinfo(name, NULL, &hints, &res); + if (ret) + exit(1); + + if 
(res->ai_family == AF_INET) { + struct sockaddr_in *addr = (struct sockaddr_in *)res->ai_addr; + memset(ent.addr, 0, sizeof(ent.addr)); + memcpy(ent.addr + 12, &addr->sin_addr, 4); + } else if (res->ai_family == AF_INET6) { + struct sockaddr_in6 *addr = (struct sockaddr_in6 *)res->ai_addr; + memcpy(ent.addr, &addr->sin6_addr, 16); + } else { + eprintf("unknown address family\n"); + exit(1); + } + + freeaddrinfo(res); + + ent.port = sheepport; + + SHA1_Init(&ctx); + SHA1_Update(&ctx, ent.addr, sizeof(ent.addr)); + SHA1_Update(&ctx, &ent.port, sizeof(ent.port)); + SHA1_Final(ent.id, &ctx); + + ci = create_cluster(cpg_handle, nodeid, getpid(), &ent); + if (!ci) { + eprintf("failed to create sheepdog cluster.\n"); + exit(1); + } + + ret = create_listen_ports(port, create_listen_ports_fn, ci); + if (ret) + exit(1); + + event_loop(-1); + + cpg_finalize(cpg_handle); + return 0; +} diff --git a/puppy/puppy.h b/puppy/puppy.h new file mode 100644 index 0000000..14f0f46 --- /dev/null +++ b/puppy/puppy.h @@ -0,0 +1,127 @@ +#include <corosync/cpg.h> + +#include "sheepdog_proto.h" +#include "net.h" +#include "list.h" + +#define SD_MSG_JOIN 0x01 +#define SD_MSG_VDI_OP 0x02 +#define SD_MSG_MASTER_CHANGED 0x03 + +/* too much duplication */ + +struct client_info { + struct connection conn; + + char hostname[INET6_ADDRSTRLEN]; + int port; + + struct request *rx_req; + + struct request *tx_req; + + struct list_head reqs; + struct list_head done_reqs; + + struct cluster_info *cluster; +}; + +struct request; + +typedef void (*req_end_t) (struct request *); + +struct request { + struct sd_req rq; + struct sd_rsp rp; + + void *data; + + struct client_info *ci; + struct list_head r_siblings; + struct list_head r_wlist; + struct list_head pending_list; + + req_end_t done; +}; + +struct vm { + struct sheepdog_vm_list_entry ent; + struct list_head list; +}; + +struct node { + uint32_t nodeid; + uint32_t pid; + struct sheepdog_node_list_entry ent; + struct list_head list; +}; + +struct 
cluster_info { + cpg_handle_t handle; + int synchronized; + uint32_t this_nodeid; + uint32_t this_pid; + struct sheepdog_node_list_entry this_node; + + uint64_t epoch; + + struct list_head node_list; + struct list_head vm_list; + struct list_head pending_list; +}; + +struct message_header { + uint8_t op; + uint8_t done; + uint8_t pad[2]; + uint32_t msg_length; + struct sheepdog_node_list_entry from; +}; + +struct join_message { + struct message_header header; + uint32_t nodeid; + uint32_t pid; + uint32_t nr_nodes; + struct sheepdog_node_list_entry master_node; + struct { + uint32_t nodeid; + uint32_t pid; + struct sheepdog_node_list_entry ent; + } nodes[SD_MAX_NODES]; +}; + +struct vdi_op_message { + struct message_header header; + struct sd_vdi_req req; + struct sd_vdi_rsp rsp; + uint8_t data[0]; +}; + +int add_vdi(struct cluster_info *cluster, + char *name, int len, uint64_t size, uint64_t * added_oid, + uint64_t base_oid, uint32_t tag); + +int lookup_vdi(struct cluster_info *cluster, char *filename, uint64_t * oid, + uint32_t tag, int do_lock, int *current); + +int build_node_list(struct list_head *node_list, + struct sheepdog_node_list_entry *entries); +int send_message(cpg_handle_t handle, struct message_header *msg); +int node_cmp(const void *a, const void *b); + +void listen_handler(int listen_fd, int events, void *data); +struct cluster_info *create_cluster(cpg_handle_t handle, + uint32_t this_nodeid, + uint32_t this_pid, + struct sheepdog_node_list_entry + *this_node); +void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name, + uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len); +void sd_confch(cpg_handle_t handle, const struct cpg_name *group_name, + const struct cpg_address *member_list, + size_t member_list_entries, + const struct cpg_address *left_list, + size_t left_list_entries, + const struct cpg_address *joined_list, + size_t joined_list_entries); diff --git a/puppy/vdi.c b/puppy/vdi.c new file mode 100644 index 
0000000..82232e0 --- /dev/null +++ b/puppy/vdi.c @@ -0,0 +1,256 @@ +#include <errno.h> +#include <inttypes.h> +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <sys/time.h> +#include <openssl/sha.h> + +#include "meta.h" +#include "util.h" +#include "list.h" +#include "sheepdog_proto.h" +#include "puppy.h" +#include "net.h" +#include "logger.h" + +static int sheepdog_match(struct sheepdog_dir_entry *ent, char *name, int len) +{ + if (!ent->name_len) + return 0; + if (ent->name_len != len) + return 0; + return !memcmp(ent->name, name, len); +} + +/* TODO: should be performed atomically */ +static int create_inode_obj(struct sheepdog_node_list_entry *entries, + int nr_nodes, uint64_t epoch, int copies, + uint64_t oid, uint64_t size, uint64_t base_oid) +{ + struct sheepdog_inode inode, base; + struct timeval tv; + int ret; + + if (base_oid) { + ret = read_object(entries, nr_nodes, epoch, + base_oid, (char *)&base, sizeof(base), 0, + copies); + if (ret < 0) + return SD_RES_BASE_VDI_READ; + } + + gettimeofday(&tv, NULL); + + memset(&inode, 0, sizeof(inode)); + + inode.oid = oid; + inode.vdi_size = size; + inode.block_size = SD_DATA_OBJ_SIZE; + inode.ctime = (uint64_t) tv.tv_sec << 32 | tv.tv_usec * 1000; + inode.nr_copies = copies; + + if (base_oid) { + int i; + + eprintf("%zd %zd\n", sizeof(inode.data_oid), + ARRAY_SIZE(base.child_oid)); + inode.parent_oid = base_oid; + memcpy(inode.data_oid, base.data_oid, + MAX_DATA_OBJS * sizeof(uint64_t)); + + for (i = 0; i < ARRAY_SIZE(base.child_oid); i++) { + if (!base.child_oid[i]) { + base.child_oid[i] = oid; + break; + } + } + + if (i == ARRAY_SIZE(base.child_oid)) + return SD_RES_NO_BASE_VDI; + + ret = write_object(entries, nr_nodes, + epoch, base_oid, (char *)&base, + sizeof(base), 0, copies, 0); + if (ret < 0) + return SD_RES_BASE_VDI_WRITE; + } + + ret = write_object(entries, nr_nodes, epoch, + oid, (char *)&inode, sizeof(inode), 0, copies, 1); + if (ret < 0) + return SD_RES_VDI_WRITE; + + return ret; 
+} + +#define DIR_BUF_LEN (UINT64_C(1) << 20) + +/* + * TODO: handle larger buffer + */ +int add_vdi(struct cluster_info *cluster, char *name, int len, uint64_t size, + uint64_t *added_oid, uint64_t base_oid, uint32_t tag) +{ + struct sheepdog_node_list_entry entries[SD_MAX_NODES]; + int nr_nodes; + struct sheepdog_dir_entry *prv, *ent; + uint64_t oid = 0; + char *buf; + int ret, rest; + struct sheepdog_super_block *sb; + int copies; + + nr_nodes = build_node_list(&cluster->node_list, entries); + + eprintf("%s (%d) %" PRIu64 ", base: %" PRIu64 "\n", name, len, size, + base_oid); + + buf = zalloc(DIR_BUF_LEN); + if (!buf) + return 1; + + ret = read_object(entries, nr_nodes, cluster->epoch, + SD_DIR_OID, buf, DIR_BUF_LEN, 0, nr_nodes); + if (ret < 0) { + ret = SD_RES_DIR_READ; + goto out; + } + + sb = (struct sheepdog_super_block *)buf; + copies = sb->default_nr_copies; + + ret = read_object(entries, nr_nodes, cluster->epoch, + SD_DIR_OID, buf, DIR_BUF_LEN, sizeof(*sb), nr_nodes); + if (ret < 0) { + ret = SD_RES_DIR_READ; + goto out; + } + + ent = (struct sheepdog_dir_entry *)buf; + rest = ret; + while (rest > 0) { + if (!ent->name_len) + break; + + if (sheepdog_match(ent, name, len) && !tag) { + ret = SD_RES_VDI_EXIST; + goto out; + } + oid = ent->oid; + prv = ent; + ent = next_entry(prv); + rest -= ((char *)ent - (char *)prv); + } + + /* need to check if the buffer is large enough here. 
*/ + oid += (1 << 18); + + ret = create_inode_obj(entries, nr_nodes, cluster->epoch, copies, + oid, size, base_oid); + if (ret) + goto out; + + ent->oid = oid; + ent->tag = tag; + + ent->flags = FLAG_CURRENT; + ent->name_len = len; + memcpy(ent->name, name, len); + + if (tag) { + struct sheepdog_dir_entry *e = (struct sheepdog_dir_entry *)buf; + + while (e < ent) { + if (sheepdog_match(e, name, len)) + e->flags &= ~FLAG_CURRENT; + e = next_entry(e); + } + } + + ent = next_entry(ent); + + ret = write_object(entries, nr_nodes, cluster->epoch, + SD_DIR_OID, buf, (char *)ent - buf, sizeof(*sb), + copies, 0); + if (ret) { + ret = SD_RES_DIR_WRITE; + goto out; + } + + *added_oid = oid; +out: + free(buf); + + return ret; +} + +int del_vdi(struct cluster_info *cluster, char *name, int len) +{ + return 0; +} + +int lookup_vdi(struct cluster_info *cluster, + char *filename, uint64_t * oid, uint32_t tag, int do_lock, + int *current) +{ + struct sheepdog_node_list_entry entries[SD_MAX_NODES]; + int nr_nodes; + int rest, ret; + char *buf; + struct sheepdog_dir_entry *prv, *ent; + + nr_nodes = build_node_list(&cluster->node_list, entries); + + *current = 0; + buf = zalloc(DIR_BUF_LEN); + if (!buf) + return 1; + + ret = read_object(entries, nr_nodes, cluster->epoch, + SD_DIR_OID, buf, DIR_BUF_LEN, + sizeof(struct sheepdog_super_block), nr_nodes); + if (ret < 0) { + ret = SD_RES_DIR_READ; + goto out; + } + + eprintf("looking for %s %zd, %d\n", filename, strlen(filename), ret); + + ent = (struct sheepdog_dir_entry *)buf; + rest = ret; + ret = SD_RES_NO_VDI; + while (rest > 0) { + if (!ent->name_len) + break; + + eprintf("%s %d %" PRIu64 "\n", ent->name, ent->name_len, + ent->oid); + + if (sheepdog_match(ent, filename, strlen(filename))) { + if (ent->tag != tag && tag != -1) { + ret = SD_RES_NO_TAG; + goto next; + } + if (ent->tag != tag && !(ent->flags & FLAG_CURRENT)) { + /* current vdi must exsit */ + ret = SD_RES_SYSTEM_ERROR; + goto next; + } + + *oid = ent->oid; + ret = 0; + + 
if (ent->flags & FLAG_CURRENT) + *current = 1; + break; + } +next: + prv = ent; + ent = next_entry(prv); + rest -= ((char *)ent - (char *)prv); + } +out: + free(buf); + return ret; +} -- 1.5.6.5 |