[Sheepdog] [PATCH 1/2] add puppy (tiny dog program)
MORITA Kazutaka
morita.kazutaka at lab.ntt.co.jp
Thu Nov 12 21:50:42 CET 2009
Puppy is a C implementation of the dog program.
It uses corosync instead of JGroups for cluster communication,
so a JVM is no longer required.
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
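Note for reviewers who have not used corosync before: puppy is built on
the closed process group (CPG) API.  The sketch below is not part of the
patch; it only illustrates the pattern that group.c and puppy.c follow,
namely initialize a handle with deliver and configuration-change
callbacks, join a named group, then dispatch incoming events.  The group
name "example" and the blocking dispatch loop are illustrative
assumptions; puppy instead registers the CPG file descriptor with its own
event loop and multicasts messages with cpg_mcast_joined().

  /* minimal CPG sketch, build with: cc cpg_example.c -lcpg */
  #include <stdio.h>
  #include <corosync/cpg.h>

  static void example_deliver(cpg_handle_t handle,
                              const struct cpg_name *group,
                              uint32_t nodeid, uint32_t pid,
                              void *msg, size_t msg_len)
  {
          /* called on every member for each message multicast to the group */
          printf("message from %x:%u, %zu bytes\n", nodeid, pid, msg_len);
  }

  static void example_confchg(cpg_handle_t handle,
                              const struct cpg_name *group,
                              const struct cpg_address *members, size_t nr_members,
                              const struct cpg_address *left, size_t nr_left,
                              const struct cpg_address *joined, size_t nr_joined)
  {
          /* called whenever a node joins or leaves the group */
          printf("membership: %zu members, %zu joined, %zu left\n",
                 nr_members, nr_joined, nr_left);
  }

  int main(void)
  {
          cpg_handle_t handle;
          cpg_callbacks_t cb = {
                  .cpg_deliver_fn = example_deliver,
                  .cpg_confchg_fn = example_confchg,
          };
          struct cpg_name group = { 7, "example" };  /* arbitrary group name */

          if (cpg_initialize(&handle, &cb) != CS_OK) {
                  fprintf(stderr, "is corosync running?\n");
                  return 1;
          }
          if (cpg_join(handle, &group) != CS_OK) {
                  fprintf(stderr, "failed to join the group\n");
                  return 1;
          }
          /* same dispatch flag as group.c uses */
          for (;;)
                  cpg_dispatch(handle, CPG_DISPATCH_ALL);
  }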
puppy/Makefile.in | 31 ++++
puppy/group.c | 422 +++++++++++++++++++++++++++++++++++++++++++++++++++++
puppy/net.c | 355 ++++++++++++++++++++++++++++++++++++++++++++
puppy/puppy.c | 186 +++++++++++++++++++++++
puppy/puppy.h | 127 ++++++++++++++++
puppy/vdi.c | 256 ++++++++++++++++++++++++++++++++
6 files changed, 1377 insertions(+), 0 deletions(-)
create mode 100644 puppy/Makefile.in
create mode 100644 puppy/group.c
create mode 100644 puppy/net.c
create mode 100644 puppy/puppy.c
create mode 100644 puppy/puppy.h
create mode 100644 puppy/vdi.c
diff --git a/puppy/Makefile.in b/puppy/Makefile.in
new file mode 100644
index 0000000..8468066
--- /dev/null
+++ b/puppy/Makefile.in
@@ -0,0 +1,31 @@
+CFLAGS += -g -O2 -Wall -Wstrict-prototypes -I../include
+CFLAGS += -D_GNU_SOURCE
+LIBS += -lcrypto -lcpg
+
+PROGRAMS = puppy
+PUPPY_OBJS = puppy.o net.o vdi.o group.o ../lib/event.o ../lib/net.o ../lib/logger.o
+PUPPY_DEP = $(PUPPY_OBJS:.o=.d)
+
+prefix = @prefix@
+exec_prefix = @exec_prefix@
+bindir = @bindir@
+
+.PHONY:all
+all: $(PROGRAMS)
+
+puppy: $(PUPPY_OBJS)
+ $(CC) $^ -o $@ $(LIBS)
+
+-include $(PUPPY_DEP)
+
+%.o: %.c
+ $(CC) -c $(CFLAGS) $*.c -o $*.o
+ @$(CC) -MM $(CFLAGS) -MF $*.d -MT $*.o $*.c
+
+.PHONY:clean
+clean:
+ rm -f *.[od] $(PROGRAMS)
+
+.PHONY:install
+install: $(PROGRAMS)
+ install $< $(bindir)
diff --git a/puppy/group.c b/puppy/group.c
new file mode 100644
index 0000000..982a9b4
--- /dev/null
+++ b/puppy/group.c
@@ -0,0 +1,422 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/time.h>
+#include <corosync/cpg.h>
+
+#include "puppy.h"
+#include "list.h"
+#include "util.h"
+#include "event.h"
+#include "sheepdog_proto.h"
+#include "net.h"
+#include "meta.h"
+#include "logger.h"
+
+static struct vm *lookup_vm(struct list_head *entries, char *name)
+{
+ struct vm *vm;
+
+ list_for_each_entry(vm, entries, list) {
+ if (!strcmp((char *)vm->ent.name, name))
+ return vm;
+ }
+
+ return NULL;
+}
+
+static void group_handler(int listen_fd, int events, void *data)
+{
+ struct cluster_info *ci = data;
+ cpg_dispatch(ci->handle, CPG_DISPATCH_ALL);
+}
+
+struct cluster_info *create_cluster(cpg_handle_t handle, uint32_t this_nodeid,
+ uint32_t this_pid,
+ struct sheepdog_node_list_entry *this_node)
+{
+ int fd;
+ struct cluster_info *ci;
+ ci = zalloc(sizeof(*ci));
+ if (!ci)
+ return NULL;
+
+ ci->handle = handle;
+ ci->this_nodeid = this_nodeid;
+ ci->this_pid = this_pid;
+ ci->this_node = *this_node;
+ ci->synchronized = 0;
+ INIT_LIST_HEAD(&ci->node_list);
+ INIT_LIST_HEAD(&ci->vm_list);
+ INIT_LIST_HEAD(&ci->pending_list);
+ cpg_context_set(handle, ci);
+
+ cpg_fd_get(handle, &fd);
+ register_event(fd, group_handler, ci);
+ return ci;
+}
+
+static void print_node_list(struct cluster_info *ci)
+{
+ struct node *node;
+ list_for_each_entry(node, &ci->node_list, list) {
+ dprintf("%c nodeid: %x, pid: %d, ip: %d.%d.%d.%d:%d\n",
+ node_cmp(&node->ent, &ci->this_node) ? ' ' : 'l',
+ node->nodeid, node->pid,
+ node->ent.addr[12], node->ent.addr[13],
+ node->ent.addr[14], node->ent.addr[15], node->ent.port);
+ }
+}
+
+static void add_node(struct cluster_info *ci, uint32_t nodeid, uint32_t pid,
+ struct sheepdog_node_list_entry *sd_ent)
+{
+ struct node *node;
+
+ node = zalloc(sizeof(*node));
+ if (!node) {
+ eprintf("out of memory\n");
+ return;
+ }
+ node->nodeid = nodeid;
+ node->pid = pid;
+ node->ent = *sd_ent;
+ list_add_tail(&node->list, &ci->node_list);
+ ci->epoch++;
+}
+
+static int is_master(struct cluster_info *ci)
+{
+ struct node *node;
+
+ if (!ci->synchronized)
+ return 0;
+
+ if (list_empty(&ci->node_list))
+ return 1;
+
+ node = list_first_entry(&ci->node_list, struct node, list);
+ if (node_cmp(&node->ent, &ci->this_node) == 0)
+ return 1;
+
+ return 0;
+}
+
+static void join(struct cluster_info *ci, cpg_handle_t handle,
+ struct join_message *msg)
+{
+ if (!ci->synchronized)
+ return;
+
+ if (!is_master(ci))
+ return;
+
+ struct node *node;
+
+ list_for_each_entry(node, &ci->node_list, list) {
+ msg->nodes[msg->nr_nodes].nodeid = node->nodeid;
+ msg->nodes[msg->nr_nodes].pid = node->pid;
+ msg->nodes[msg->nr_nodes].ent = node->ent;
+ msg->nr_nodes++;
+ }
+}
+
+static void update_cluster_info(struct cluster_info *ci,
+ struct join_message *msg)
+{
+ int i;
+ int nr_nodes = msg->nr_nodes;
+ struct node *node, *e;
+
+ if (ci->synchronized)
+ goto out;
+
+ list_for_each_entry_safe(node, e, &ci->node_list, list) {
+ list_del(&node->list);
+ free(node);
+ }
+
+ INIT_LIST_HEAD(&ci->node_list);
+ for (i = 0; i < nr_nodes; i++)
+ add_node(ci, msg->nodes[i].nodeid, msg->nodes[i].pid,
+ &msg->nodes[i].ent);
+
+ ci->synchronized = 1;
+
+out:
+ add_node(ci, msg->nodeid, msg->pid, &msg->header.from);
+ print_node_list(ci);
+}
+
+static void vdi_op(struct cluster_info *ci, struct vdi_op_message *msg)
+{
+ const struct sd_vdi_req *hdr = &msg->req;
+ struct sd_vdi_rsp *rsp = &msg->rsp;
+ void *data = msg->data;
+ int ret = SD_RES_SUCCESS, is_current;
+ uint64_t oid = 0;
+ struct sheepdog_super_block *sb;
+ struct timeval tv;
+ struct sheepdog_node_list_entry entries[SD_MAX_NODES];
+ int nr_nodes;
+
+ switch (hdr->opcode) {
+ case SD_OP_NEW_VDI:
+ ret = add_vdi(ci, data, strlen(data), hdr->vdi_size, &oid,
+ hdr->base_oid, hdr->tag);
+ break;
+ case SD_OP_LOCK_VDI:
+ case SD_OP_GET_VDI_INFO:
+ ret = lookup_vdi(ci, data, &oid, hdr->tag, 1, &is_current);
+ if (ret < 0)
+ break;
+ if (is_current)
+ rsp->flags = SD_VDI_RSP_FLAG_CURRENT;
+ break;
+ case SD_OP_RELEASE_VDI:
+ break;
+ case SD_OP_MAKE_FS:
+ sb = zalloc(sizeof(*sb));
+ if (!sb) {
+ ret = -1;
+ break;
+ }
+ gettimeofday(&tv, NULL);
+ sb->ctime = (uint64_t) tv.tv_sec << 32 | tv.tv_usec * 1000;
+ sb->default_nr_copies = 3;
+
+ nr_nodes = build_node_list(&ci->node_list, entries);
+ ret = write_object(entries, nr_nodes, ci->epoch,
+ SD_DIR_OID, (char *)sb, sizeof(*sb), 0,
+ sb->default_nr_copies, 1);
+ break;
+ case SD_OP_UPDATE_EPOCH:
+ break;
+ case SD_OP_GET_EPOCH:
+ rsp->epoch = 1;
+ break;
+ default:
+ ret = SD_RES_SYSTEM_ERROR;
+ eprintf("opcode %d is not implemented\n", hdr->opcode);
+ break;
+ }
+
+ rsp->oid = oid;
+ rsp->result = ret;
+}
+
+static void vdi_op_done(struct cluster_info *ci, struct vdi_op_message *msg)
+{
+ const struct sd_vdi_req *hdr = &msg->req;
+ struct sd_vdi_rsp *rsp = &msg->rsp;
+ void *data = msg->data;
+ struct vm *vm;
+ struct request *req;
+ int ret = msg->rsp.result;
+
+ switch (hdr->opcode) {
+ case SD_OP_NEW_VDI:
+ break;
+ case SD_OP_LOCK_VDI:
+ if (lookup_vm(&ci->vm_list, (char *)data)) {
+ ret = SD_RES_VDI_LOCKED;
+ break;
+ }
+
+ vm = zalloc(sizeof(*vm));
+ if (!vm) {
+ ret = SD_RES_UNKNOWN;
+ break;
+ }
+ strcpy((char *)vm->ent.name, (char *)data);
+ memcpy(vm->ent.host_addr, msg->header.from.addr,
+ sizeof(vm->ent.host_addr));
+ vm->ent.host_port = msg->header.from.port;
+
+ list_add(&vm->list, &ci->vm_list);
+ break;
+ case SD_OP_RELEASE_VDI:
+ vm = lookup_vm(&ci->vm_list, (char *)data);
+ if (!vm) {
+ ret = SD_RES_VDI_NOT_LOCKED;
+ break;
+ }
+
+ list_del(&vm->list);
+ break;
+ case SD_OP_GET_VDI_INFO:
+ break;
+ case SD_OP_UPDATE_EPOCH:
+ break;
+ case SD_OP_GET_EPOCH:
+ break;
+ case SD_OP_MAKE_FS:
+ break;
+ default:
+ eprintf("unknown operation %d\n", hdr->opcode);
+ ret = SD_RES_UNKNOWN;
+ }
+
+ if (node_cmp(&ci->this_node, &msg->header.from) != 0)
+ return;
+
+ req = list_first_entry(&ci->pending_list, struct request, pending_list);
+
+ rsp->result = ret;
+ memcpy(req->data, data, rsp->data_length);
+ memcpy(&req->rp, rsp, sizeof(req->rp));
+ list_del(&req->pending_list);
+ req->done(req);
+}
+
+void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name,
+ uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
+{
+ struct cluster_info *ci;
+ struct message_header *m = msg;
+
+ dprintf("op: %d, done: %d, size: %d, from: %d.%d.%d.%d:%d\n",
+ m->op, m->done, m->msg_length,
+ m->from.addr[12], m->from.addr[13],
+ m->from.addr[14], m->from.addr[15], m->from.port);
+
+ cpg_context_get(handle, (void **)&ci);
+ if (!m->done) {
+ if (!is_master(ci))
+ return;
+
+ switch (m->op) {
+ case SD_MSG_JOIN:
+ join(ci, handle, msg);
+ break;
+ case SD_MSG_VDI_OP:
+ vdi_op(ci, msg);
+ break;
+ default:
+ eprintf("unknown message %d\n", m->op);
+ break;
+ }
+
+ m->done = 1;
+ send_message(handle, m);
+ } else {
+ switch (m->op) {
+ case SD_MSG_JOIN:
+ update_cluster_info(ci, msg);
+ break;
+ case SD_MSG_VDI_OP:
+ vdi_op_done(ci, msg);
+ break;
+ default:
+ eprintf("unknown message %d\n", m->op);
+ break;
+ }
+ }
+}
+
+void sd_confch(cpg_handle_t handle, const struct cpg_name *group_name,
+ const struct cpg_address *member_list,
+ size_t member_list_entries, const struct cpg_address *left_list,
+ size_t left_list_entries, const struct cpg_address *joined_list,
+ size_t joined_list_entries)
+{
+ struct cluster_info *ci;
+ struct node *node, *e;
+ int i;
+
+ dprintf("confchg nodeid %x\n", member_list[0].nodeid);
+ dprintf("%zu %zu %zu\n", member_list_entries, left_list_entries,
+ joined_list_entries);
+ for (i = 0; i < member_list_entries; i++) {
+ dprintf("[%d] node_id: %d, pid: %d, reason: %d\n", i,
+ member_list[i].nodeid, member_list[i].pid,
+ member_list[i].reason);
+ }
+
+ cpg_context_get(handle, (void **)&ci);
+
+ if (member_list_entries == joined_list_entries - left_list_entries &&
+ ci->this_nodeid == member_list[0].nodeid &&
+ ci->this_pid == member_list[0].pid)
+ ci->synchronized = 1;
+
+ for (i = 0; i < left_list_entries; i++) {
+ list_for_each_entry_safe(node, e, &ci->node_list, list) {
+ if (node->nodeid != left_list[i].nodeid ||
+ node->pid != left_list[i].pid)
+ continue;
+
+ list_del(&node->list);
+ free(node);
+ ci->epoch++;
+ }
+ }
+
+ for (i = 0; i < joined_list_entries; i++) {
+ if (ci->this_nodeid == joined_list[0].nodeid &&
+ ci->this_pid == joined_list[0].pid) {
+ struct join_message msg;
+
+ msg.header.op = SD_MSG_JOIN;
+ msg.header.done = 0;
+ msg.header.msg_length = sizeof(msg);
+ msg.nodeid = ci->this_nodeid;
+ msg.pid = ci->this_pid;
+ msg.header.from = ci->this_node;
+
+ send_message(ci->handle, (struct message_header *)&msg);
+ }
+ }
+
+ if (left_list_entries == 0)
+ return;
+
+ print_node_list(ci);
+}
+
+int send_message(cpg_handle_t handle, struct message_header *msg)
+{
+ struct iovec iov;
+ int ret;
+
+ iov.iov_base = msg;
+ iov.iov_len = msg->msg_length;
+retry:
+ ret = cpg_mcast_joined(handle, CPG_TYPE_AGREED, &iov, 1);
+ switch (ret) {
+ case CS_OK:
+ break;
+ case CS_ERR_TRY_AGAIN:
+ dprintf("failed to send message. try again\n");
+ sleep(1);
+ goto retry;
+ default:
+ eprintf("failed to send message, %d\n", ret);
+ return -1;
+ }
+ return 0;
+}
+
+int build_node_list(struct list_head *node_list,
+ struct sheepdog_node_list_entry *entries)
+{
+ struct node *node;
+ int nr = 0;
+
+ list_for_each_entry(node, node_list, list) {
+ if (entries)
+ memcpy(entries + nr, &node->ent, sizeof(*entries));
+ nr++;
+ }
+ if (entries)
+ qsort(entries, nr, sizeof(*entries), node_cmp);
+
+ return nr;
+}
+
+int node_cmp(const void *a, const void *b)
+{
+ const struct sheepdog_node_list_entry *node1 = a;
+ const struct sheepdog_node_list_entry *node2 = b;
+ return memcmp(node1->id, node2->id, sizeof(node1->id));
+}
diff --git a/puppy/net.c b/puppy/net.c
new file mode 100644
index 0000000..f011acf
--- /dev/null
+++ b/puppy/net.c
@@ -0,0 +1,355 @@
+#include <errno.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <sys/socket.h>
+#include <sys/epoll.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+
+#include "sheepdog_proto.h"
+#include "util.h"
+#include "event.h"
+#include "net.h"
+#include "list.h"
+#include "puppy.h"
+#include "meta.h"
+#include "logger.h"
+
+static int get_node_idx(struct sheepdog_node_list_entry *ent,
+ struct sheepdog_node_list_entry *entries, int nr_nodes)
+{
+ ent = bsearch(ent, entries, nr_nodes, sizeof(*ent), node_cmp);
+ if (!ent)
+ return -1;
+
+ return ent - entries;
+}
+
+static void get_node_list(struct cluster_info *cluster, struct sd_node_req *req,
+ struct sd_node_rsp *rsp, void *data)
+{
+ int nr_nodes;
+ struct node *node;
+
+ nr_nodes = build_node_list(&cluster->node_list, data);
+ rsp->data_length = nr_nodes * sizeof(struct sheepdog_node_list_entry);
+ rsp->nr_nodes = nr_nodes;
+ rsp->local_idx = get_node_idx(&cluster->this_node, data, nr_nodes);
+
+ if (list_empty(&cluster->node_list)) {
+ rsp->master_idx = -1;
+ return;
+ }
+ node = list_first_entry(&cluster->node_list, struct node, list);
+ rsp->master_idx = get_node_idx(&node->ent, data, nr_nodes);
+}
+
+static void get_vm_list(struct cluster_info *cluster, struct sd_rsp *rsp,
+ void *data)
+{
+ int nr_vms;
+ struct vm *vm;
+
+ struct sheepdog_vm_list_entry *p = data;
+ list_for_each_entry(vm, &cluster->vm_list, list) {
+ *p++ = vm->ent;
+ }
+
+ nr_vms = p - (struct sheepdog_vm_list_entry *)data;
+ rsp->data_length = nr_vms * sizeof(struct sheepdog_vm_list_entry);
+}
+
+void queue_request(struct request *req)
+{
+ struct sd_req *hdr = (struct sd_req *)&req->rq;
+ struct sd_rsp *rsp = (struct sd_rsp *)&req->rp;
+ struct cluster_info *cluster = req->ci->cluster;
+ struct vdi_op_message *msg;
+ int ret = SD_RES_SUCCESS;
+
+ eprintf("%p %x\n", req, hdr->opcode);
+
+ switch (hdr->opcode) {
+ case SD_OP_GET_NODE_LIST:
+ get_node_list(cluster, (struct sd_node_req *)hdr,
+ (struct sd_node_rsp *)rsp, req->data);
+ break;
+ case SD_OP_GET_VM_LIST:
+ get_vm_list(cluster, rsp, req->data);
+ break;
+ default:
+ /* forward request to group */
+ goto forward;
+ }
+
+ rsp->result = ret;
+ req->done(req);
+ return;
+
+forward:
+ msg = zalloc(sizeof(*msg) + hdr->data_length);
+ if (!msg) {
+ eprintf("out of memory\n");
+ return;
+ }
+
+ msg->header.op = SD_MSG_VDI_OP;
+ msg->header.done = 0;
+ msg->header.msg_length = sizeof(*msg) + hdr->data_length;
+ msg->header.from = cluster->this_node;
+ msg->req = *((struct sd_vdi_req *)&req->rq);
+ msg->rsp = *((struct sd_vdi_rsp *)&req->rp);
+ if (hdr->flags & SD_FLAG_CMD_WRITE)
+ memcpy(msg->data, req->data, hdr->data_length);
+
+ list_add(&req->pending_list, &cluster->pending_list);
+ send_message(cluster->handle, (struct message_header *)msg);
+}
+
+static struct request *alloc_request(struct client_info *ci, int data_length)
+{
+ struct request *req;
+
+ req = zalloc(sizeof(struct request) + data_length);
+ if (!req)
+ return NULL;
+
+ req->ci = ci;
+ if (data_length)
+ req->data = (char *)req + sizeof(*req);
+
+ list_add(&req->r_siblings, &ci->reqs);
+ INIT_LIST_HEAD(&req->r_wlist);
+
+ return req;
+}
+
+static void free_request(struct request *req)
+{
+ list_del(&req->r_siblings);
+ free(req);
+}
+
+static void init_rx_hdr(struct client_info *ci)
+{
+ ci->conn.c_rx_state = C_IO_HEADER;
+ ci->rx_req = NULL;
+ ci->conn.rx_length = sizeof(struct sd_req);
+ ci->conn.rx_buf = &ci->conn.rx_hdr;
+}
+
+static void req_done(struct request *req)
+{
+ list_add(&req->r_wlist, &req->ci->done_reqs);
+ conn_tx_on(&req->ci->conn);
+}
+
+static void client_rx_handler(struct client_info *ci)
+{
+ int ret;
+ uint64_t data_len;
+ struct connection *conn = &ci->conn;
+ struct sd_req *hdr = &conn->rx_hdr;
+ struct request *req;
+
+ switch (conn->c_rx_state) {
+ case C_IO_HEADER:
+ ret = rx(conn, C_IO_DATA_INIT);
+ if (!ret || conn->c_rx_state != C_IO_DATA_INIT)
+ break;
+ case C_IO_DATA_INIT:
+ data_len = hdr->data_length;
+
+ req = alloc_request(ci, data_len);
+ if (!req) {
+ conn->c_rx_state = C_IO_CLOSED;
+ break;
+ }
+ ci->rx_req = req;
+
+ /* use le_to_cpu */
+ memcpy(&req->rq, hdr, sizeof(req->rq));
+
+ if (data_len && hdr->flags & SD_FLAG_CMD_WRITE) {
+ conn->c_rx_state = C_IO_DATA;
+ conn->rx_length = data_len;
+ conn->rx_buf = req->data;
+ } else {
+ conn->c_rx_state = C_IO_END;
+ break;
+ }
+ case C_IO_DATA:
+ ret = rx(conn, C_IO_END);
+ break;
+ default:
+ eprintf("BUG: unknown state %d\n", conn->c_rx_state);
+ }
+
+ if (is_conn_dead(conn) || conn->c_rx_state != C_IO_END)
+ return;
+
+ /* now we have a complete command */
+
+ req = ci->rx_req;
+
+ init_rx_hdr(ci);
+
+ if (hdr->flags & SD_FLAG_CMD_WRITE)
+ req->rp.data_length = 0;
+ else
+ req->rp.data_length = hdr->data_length;
+
+ req->done = req_done;
+
+ queue_request(req);
+}
+
+static void init_tx_hdr(struct client_info *ci)
+{
+ struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
+ struct request *req;
+
+ if (ci->tx_req || list_empty(&ci->done_reqs))
+ return;
+
+ memset(rsp, 0, sizeof(*rsp));
+
+ req = list_first_entry(&ci->done_reqs, struct request, r_wlist);
+ list_del(&req->r_wlist);
+
+ ci->tx_req = req;
+ ci->conn.tx_length = sizeof(*rsp);
+ ci->conn.c_tx_state = C_IO_HEADER;
+ ci->conn.tx_buf = rsp;
+
+ memcpy(rsp, &req->rp, sizeof(*rsp));
+
+ rsp->epoch = ci->cluster->epoch;
+ rsp->opcode = req->rq.opcode;
+ rsp->id = req->rq.id;
+}
+
+static void client_tx_handler(struct client_info *ci)
+{
+ int ret;
+ struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
+
+again:
+ init_tx_hdr(ci);
+ if (!ci->tx_req) {
+ conn_tx_off(&ci->conn);
+ return;
+ }
+
+ switch (ci->conn.c_tx_state) {
+ case C_IO_HEADER:
+ if (rsp->data_length)
+ ret = tx(&ci->conn, C_IO_DATA_INIT, MSG_MORE);
+ else
+ ret = tx(&ci->conn, C_IO_DATA_INIT, 0);
+
+ if (!ret)
+ break;
+
+ if (rsp->data_length) {
+ ci->conn.tx_length = rsp->data_length;
+ ci->conn.tx_buf = ci->tx_req->data;
+ ci->conn.c_tx_state = C_IO_DATA;
+ } else {
+ ci->conn.c_tx_state = C_IO_END;
+ break;
+ }
+ case C_IO_DATA:
+ ret = tx(&ci->conn, C_IO_END, 0);
+ if (!ret)
+ break;
+ default:
+ break;
+ }
+
+ if (is_conn_dead(&ci->conn) || ci->conn.c_tx_state != C_IO_END)
+ return;
+
+ if (ci->conn.c_tx_state == C_IO_END) {
+ free_request(ci->tx_req);
+ ci->tx_req = NULL;
+ goto again;
+ }
+}
+
+static struct client_info *create_client(int fd, struct cluster_info *cluster)
+{
+ struct client_info *ci;
+
+ ci = zalloc(sizeof(*ci));
+ if (!ci)
+ return NULL;
+
+ ci->conn.fd = fd;
+
+ ci->conn.c_rx_state = C_IO_HEADER;
+ ci->conn.rx_length = sizeof(struct sd_req);
+ ci->conn.rx_buf = &ci->conn.rx_hdr;
+
+ INIT_LIST_HEAD(&ci->reqs);
+ INIT_LIST_HEAD(&ci->done_reqs);
+
+ ci->cluster = cluster;
+
+ return ci;
+}
+
+static void destroy_client(struct client_info *ci)
+{
+ close(ci->conn.fd);
+ free(ci);
+}
+
+static void client_handler(int fd, int events, void *data)
+{
+ struct client_info *ci = (struct client_info *)data;
+
+ if (events & EPOLLIN)
+ client_rx_handler(ci);
+
+ if (!is_conn_dead(&ci->conn) && events & EPOLLOUT)
+ client_tx_handler(ci);
+
+ if (is_conn_dead(&ci->conn)) {
+ unregister_event(fd);
+ destroy_client(ci);
+ }
+}
+
+void listen_handler(int listen_fd, int events, void *data)
+{
+ struct sockaddr_storage from;
+ socklen_t namesize;
+ int fd, ret;
+ struct client_info *ci;
+
+ namesize = sizeof(from);
+ fd = accept(listen_fd, (struct sockaddr *)&from, &namesize);
+ if (fd < 0) {
+ eprintf("can't accept a new connection, %m\n");
+ return;
+ }
+
+ ci = create_client(fd, data);
+ if (!ci) {
+ close(fd);
+ return;
+ }
+
+ ret = register_event(fd, client_handler, ci);
+ if (ret) {
+ destroy_client(ci);
+ return;
+ }
+}
diff --git a/puppy/puppy.c b/puppy/puppy.c
new file mode 100644
index 0000000..1401f19
--- /dev/null
+++ b/puppy/puppy.c
@@ -0,0 +1,186 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <netdb.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <errno.h>
+#include <getopt.h>
+#include <openssl/sha.h>
+#include <corosync/cpg.h>
+
+#include "puppy.h"
+#include "list.h"
+#include "util.h"
+#include "event.h"
+#include "sheepdog_proto.h"
+#include "net.h"
+#include "logger.h"
+
+static char program_name[] = "puppy";
+
+static int sheepport = SHEEP_LISTEN_PORT;
+
+static struct option const long_options[] = {
+ {"dport", required_argument, 0, 'D'},
+ {"sport", required_argument, 0, 's'},
+ {"foreground", no_argument, 0, 'f'},
+ {"debug", no_argument, 0, 'd'},
+ {"help", no_argument, 0, 'h'},
+ {0, 0, 0, 0},
+};
+
+static char *short_options = "D:s:fdh";
+
+static void usage(int status)
+{
+ if (status)
+ fprintf(stderr, "Try `%s --help' for more information.\n",
+ program_name);
+ else {
+ printf("Usage: %s [OPTION]\n", program_name);
+ printf("\
+Sheepdog cluster monitor\n\
+ -D, --dport specify the dog (our) listen port number\n\
+ -s, --sport specify the sheep listen port number\n\
+ -f, --foreground make the program run in the foreground\n\
+ -d, --debug print debug messages\n\
+ -h, --help display this help and exit\n\
+");
+ }
+ exit(status);
+}
+
+static int create_listen_ports_fn(int fd, void *data)
+{
+ return register_event(fd, listen_handler, data);
+}
+
+int main(int argc, char **argv)
+{
+ int ch, longindex;
+ int ret, port = DOG_LISTEN_PORT;
+ int is_daemon = 1;
+ int is_debug = 0;
+ cpg_handle_t cpg_handle = 0;
+ SHA_CTX ctx;
+ struct cluster_info *ci;
+ cpg_callbacks_t cb = { &sd_deliver, &sd_confch };
+ unsigned int nodeid = 0;
+ struct cpg_name group = { 8, "sheepdog" };
+ struct sheepdog_node_list_entry ent;
+ char name[INET6_ADDRSTRLEN];
+ struct addrinfo hints, *res;
+
+ while ((ch = getopt_long(argc, argv, short_options, long_options,
+ &longindex)) >= 0) {
+ switch (ch) {
+ case 'D':
+ port = atoi(optarg);
+ break;
+ case 's':
+ sheepport = atoi(optarg);
+ break;
+ case 'f':
+ is_daemon = 0;
+ break;
+ case 'd':
+ is_debug = 1;
+ break;
+ case 'h':
+ usage(0);
+ break;
+ default:
+ usage(1);
+ break;
+ }
+ }
+
+ ret = log_init(program_name, LOG_SPACE_SIZE, is_daemon, is_debug);
+ if (ret)
+ exit(1);
+
+ if (is_daemon && daemon(0, 0))
+ exit(1);
+
+ ret = init_event(1024);
+ if (ret)
+ exit(1);
+
+ ret = cpg_initialize(&cpg_handle, &cb);
+ if (ret != CS_OK) {
+ eprintf("Failed to initialize cpg, %d\n", ret);
+ eprintf("Is corosync running?\n");
+ return -1;
+ }
+
+join_retry:
+ ret = cpg_join(cpg_handle, &group);
+ switch (ret) {
+ case CS_OK:
+ break;
+ case CS_ERR_TRY_AGAIN:
+ dprintf("Failed to join the sheepdog group, try again\n");
+ sleep(1);
+ goto join_retry;
+ case CS_ERR_SECURITY:
+ eprintf("Permission error.\n");
+ exit(1);
+ default:
+ eprintf("Failed to join the sheepdog group, %d\n", ret);
+ exit(1);
+ break;
+ }
+
+ ret = cpg_local_get(cpg_handle, &nodeid);
+ if (ret != CS_OK) {
+ eprintf("Failed to get the local node's identifier, %d\n", ret);
+ exit(1);
+ }
+
+ memset(&ent, 0, sizeof(ent));
+
+ gethostname(name, sizeof(name));
+
+ memset(&hints, 0, sizeof(hints));
+
+ hints.ai_socktype = SOCK_STREAM;
+ ret = getaddrinfo(name, NULL, &hints, &res);
+ if (ret)
+ exit(1);
+
+ if (res->ai_family == AF_INET) {
+ struct sockaddr_in *addr = (struct sockaddr_in *)res->ai_addr;
+ memset(ent.addr, 0, sizeof(ent.addr));
+ memcpy(ent.addr + 12, &addr->sin_addr, 4);
+ } else if (res->ai_family == AF_INET6) {
+ struct sockaddr_in6 *addr = (struct sockaddr_in6 *)res->ai_addr;
+ memcpy(ent.addr, &addr->sin6_addr, 16);
+ } else {
+ eprintf("unknown address family\n");
+ exit(1);
+ }
+
+ freeaddrinfo(res);
+
+ ent.port = sheepport;
+
+ SHA1_Init(&ctx);
+ SHA1_Update(&ctx, ent.addr, sizeof(ent.addr));
+ SHA1_Update(&ctx, &ent.port, sizeof(ent.port));
+ SHA1_Final(ent.id, &ctx);
+
+ ci = create_cluster(cpg_handle, nodeid, getpid(), &ent);
+ if (!ci) {
+ eprintf("failed to create sheepdog cluster.\n");
+ exit(1);
+ }
+
+ ret = create_listen_ports(port, create_listen_ports_fn, ci);
+ if (ret)
+ exit(1);
+
+ event_loop(-1);
+
+ cpg_finalize(cpg_handle);
+ return 0;
+}
diff --git a/puppy/puppy.h b/puppy/puppy.h
new file mode 100644
index 0000000..14f0f46
--- /dev/null
+++ b/puppy/puppy.h
@@ -0,0 +1,127 @@
+#include <corosync/cpg.h>
+
+#include "sheepdog_proto.h"
+#include "net.h"
+#include "list.h"
+
+#define SD_MSG_JOIN 0x01
+#define SD_MSG_VDI_OP 0x02
+#define SD_MSG_MASTER_CHANGED 0x03
+
+/* too much duplication */
+
+struct client_info {
+ struct connection conn;
+
+ char hostname[INET6_ADDRSTRLEN];
+ int port;
+
+ struct request *rx_req;
+
+ struct request *tx_req;
+
+ struct list_head reqs;
+ struct list_head done_reqs;
+
+ struct cluster_info *cluster;
+};
+
+struct request;
+
+typedef void (*req_end_t) (struct request *);
+
+struct request {
+ struct sd_req rq;
+ struct sd_rsp rp;
+
+ void *data;
+
+ struct client_info *ci;
+ struct list_head r_siblings;
+ struct list_head r_wlist;
+ struct list_head pending_list;
+
+ req_end_t done;
+};
+
+struct vm {
+ struct sheepdog_vm_list_entry ent;
+ struct list_head list;
+};
+
+struct node {
+ uint32_t nodeid;
+ uint32_t pid;
+ struct sheepdog_node_list_entry ent;
+ struct list_head list;
+};
+
+struct cluster_info {
+ cpg_handle_t handle;
+ int synchronized;
+ uint32_t this_nodeid;
+ uint32_t this_pid;
+ struct sheepdog_node_list_entry this_node;
+
+ uint64_t epoch;
+
+ struct list_head node_list;
+ struct list_head vm_list;
+ struct list_head pending_list;
+};
+
+struct message_header {
+ uint8_t op;
+ uint8_t done;
+ uint8_t pad[2];
+ uint32_t msg_length;
+ struct sheepdog_node_list_entry from;
+};
+
+struct join_message {
+ struct message_header header;
+ uint32_t nodeid;
+ uint32_t pid;
+ uint32_t nr_nodes;
+ struct sheepdog_node_list_entry master_node;
+ struct {
+ uint32_t nodeid;
+ uint32_t pid;
+ struct sheepdog_node_list_entry ent;
+ } nodes[SD_MAX_NODES];
+};
+
+struct vdi_op_message {
+ struct message_header header;
+ struct sd_vdi_req req;
+ struct sd_vdi_rsp rsp;
+ uint8_t data[0];
+};
+
+int add_vdi(struct cluster_info *cluster,
+ char *name, int len, uint64_t size, uint64_t * added_oid,
+ uint64_t base_oid, uint32_t tag);
+
+int lookup_vdi(struct cluster_info *cluster, char *filename, uint64_t * oid,
+ uint32_t tag, int do_lock, int *current);
+
+int build_node_list(struct list_head *node_list,
+ struct sheepdog_node_list_entry *entries);
+int send_message(cpg_handle_t handle, struct message_header *msg);
+int node_cmp(const void *a, const void *b);
+
+void listen_handler(int listen_fd, int events, void *data);
+struct cluster_info *create_cluster(cpg_handle_t handle,
+ uint32_t this_nodeid,
+ uint32_t this_pid,
+ struct sheepdog_node_list_entry
+ *this_node);
+void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name,
+ uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len);
+void sd_confch(cpg_handle_t handle, const struct cpg_name *group_name,
+ const struct cpg_address *member_list,
+ size_t member_list_entries,
+ const struct cpg_address *left_list,
+ size_t left_list_entries,
+ const struct cpg_address *joined_list,
+ size_t joined_list_entries);
diff --git a/puppy/vdi.c b/puppy/vdi.c
new file mode 100644
index 0000000..82232e0
--- /dev/null
+++ b/puppy/vdi.c
@@ -0,0 +1,256 @@
+#include <errno.h>
+#include <inttypes.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/time.h>
+#include <openssl/sha.h>
+
+#include "meta.h"
+#include "util.h"
+#include "list.h"
+#include "sheepdog_proto.h"
+#include "puppy.h"
+#include "net.h"
+#include "logger.h"
+
+static int sheepdog_match(struct sheepdog_dir_entry *ent, char *name, int len)
+{
+ if (!ent->name_len)
+ return 0;
+ if (ent->name_len != len)
+ return 0;
+ return !memcmp(ent->name, name, len);
+}
+
+/* TODO: should be performed atomically */
+static int create_inode_obj(struct sheepdog_node_list_entry *entries,
+ int nr_nodes, uint64_t epoch, int copies,
+ uint64_t oid, uint64_t size, uint64_t base_oid)
+{
+ struct sheepdog_inode inode, base;
+ struct timeval tv;
+ int ret;
+
+ if (base_oid) {
+ ret = read_object(entries, nr_nodes, epoch,
+ base_oid, (char *)&base, sizeof(base), 0,
+ copies);
+ if (ret < 0)
+ return SD_RES_BASE_VDI_READ;
+ }
+
+ gettimeofday(&tv, NULL);
+
+ memset(&inode, 0, sizeof(inode));
+
+ inode.oid = oid;
+ inode.vdi_size = size;
+ inode.block_size = SD_DATA_OBJ_SIZE;
+ inode.ctime = (uint64_t) tv.tv_sec << 32 | tv.tv_usec * 1000;
+ inode.nr_copies = copies;
+
+ if (base_oid) {
+ int i;
+
+ eprintf("%zd %zd\n", sizeof(inode.data_oid),
+ ARRAY_SIZE(base.child_oid));
+ inode.parent_oid = base_oid;
+ memcpy(inode.data_oid, base.data_oid,
+ MAX_DATA_OBJS * sizeof(uint64_t));
+
+ for (i = 0; i < ARRAY_SIZE(base.child_oid); i++) {
+ if (!base.child_oid[i]) {
+ base.child_oid[i] = oid;
+ break;
+ }
+ }
+
+ if (i == ARRAY_SIZE(base.child_oid))
+ return SD_RES_NO_BASE_VDI;
+
+ ret = write_object(entries, nr_nodes,
+ epoch, base_oid, (char *)&base,
+ sizeof(base), 0, copies, 0);
+ if (ret < 0)
+ return SD_RES_BASE_VDI_WRITE;
+ }
+
+ ret = write_object(entries, nr_nodes, epoch,
+ oid, (char *)&inode, sizeof(inode), 0, copies, 1);
+ if (ret < 0)
+ return SD_RES_VDI_WRITE;
+
+ return ret;
+}
+
+#define DIR_BUF_LEN (UINT64_C(1) << 20)
+
+/*
+ * TODO: handle larger buffer
+ */
+int add_vdi(struct cluster_info *cluster, char *name, int len, uint64_t size,
+ uint64_t *added_oid, uint64_t base_oid, uint32_t tag)
+{
+ struct sheepdog_node_list_entry entries[SD_MAX_NODES];
+ int nr_nodes;
+ struct sheepdog_dir_entry *prv, *ent;
+ uint64_t oid = 0;
+ char *buf;
+ int ret, rest;
+ struct sheepdog_super_block *sb;
+ int copies;
+
+ nr_nodes = build_node_list(&cluster->node_list, entries);
+
+ eprintf("%s (%d) %" PRIu64 ", base: %" PRIu64 "\n", name, len, size,
+ base_oid);
+
+ buf = zalloc(DIR_BUF_LEN);
+ if (!buf)
+ return 1;
+
+ ret = read_object(entries, nr_nodes, cluster->epoch,
+ SD_DIR_OID, buf, DIR_BUF_LEN, 0, nr_nodes);
+ if (ret < 0) {
+ ret = SD_RES_DIR_READ;
+ goto out;
+ }
+
+ sb = (struct sheepdog_super_block *)buf;
+ copies = sb->default_nr_copies;
+
+ ret = read_object(entries, nr_nodes, cluster->epoch,
+ SD_DIR_OID, buf, DIR_BUF_LEN, sizeof(*sb), nr_nodes);
+ if (ret < 0) {
+ ret = SD_RES_DIR_READ;
+ goto out;
+ }
+
+ ent = (struct sheepdog_dir_entry *)buf;
+ rest = ret;
+ while (rest > 0) {
+ if (!ent->name_len)
+ break;
+
+ if (sheepdog_match(ent, name, len) && !tag) {
+ ret = SD_RES_VDI_EXIST;
+ goto out;
+ }
+ oid = ent->oid;
+ prv = ent;
+ ent = next_entry(prv);
+ rest -= ((char *)ent - (char *)prv);
+ }
+
+ /* need to check if the buffer is large enough here. */
+ oid += (1 << 18);
+
+ ret = create_inode_obj(entries, nr_nodes, cluster->epoch, copies,
+ oid, size, base_oid);
+ if (ret)
+ goto out;
+
+ ent->oid = oid;
+ ent->tag = tag;
+
+ ent->flags = FLAG_CURRENT;
+ ent->name_len = len;
+ memcpy(ent->name, name, len);
+
+ if (tag) {
+ struct sheepdog_dir_entry *e = (struct sheepdog_dir_entry *)buf;
+
+ while (e < ent) {
+ if (sheepdog_match(e, name, len))
+ e->flags &= ~FLAG_CURRENT;
+ e = next_entry(e);
+ }
+ }
+
+ ent = next_entry(ent);
+
+ ret = write_object(entries, nr_nodes, cluster->epoch,
+ SD_DIR_OID, buf, (char *)ent - buf, sizeof(*sb),
+ copies, 0);
+ if (ret) {
+ ret = SD_RES_DIR_WRITE;
+ goto out;
+ }
+
+ *added_oid = oid;
+out:
+ free(buf);
+
+ return ret;
+}
+
+int del_vdi(struct cluster_info *cluster, char *name, int len)
+{
+ return 0;
+}
+
+int lookup_vdi(struct cluster_info *cluster,
+ char *filename, uint64_t * oid, uint32_t tag, int do_lock,
+ int *current)
+{
+ struct sheepdog_node_list_entry entries[SD_MAX_NODES];
+ int nr_nodes;
+ int rest, ret;
+ char *buf;
+ struct sheepdog_dir_entry *prv, *ent;
+
+ nr_nodes = build_node_list(&cluster->node_list, entries);
+
+ *current = 0;
+ buf = zalloc(DIR_BUF_LEN);
+ if (!buf)
+ return 1;
+
+ ret = read_object(entries, nr_nodes, cluster->epoch,
+ SD_DIR_OID, buf, DIR_BUF_LEN,
+ sizeof(struct sheepdog_super_block), nr_nodes);
+ if (ret < 0) {
+ ret = SD_RES_DIR_READ;
+ goto out;
+ }
+
+ eprintf("looking for %s %zd, %d\n", filename, strlen(filename), ret);
+
+ ent = (struct sheepdog_dir_entry *)buf;
+ rest = ret;
+ ret = SD_RES_NO_VDI;
+ while (rest > 0) {
+ if (!ent->name_len)
+ break;
+
+ eprintf("%s %d %" PRIu64 "\n", ent->name, ent->name_len,
+ ent->oid);
+
+ if (sheepdog_match(ent, filename, strlen(filename))) {
+ if (ent->tag != tag && tag != -1) {
+ ret = SD_RES_NO_TAG;
+ goto next;
+ }
+ if (ent->tag != tag && !(ent->flags & FLAG_CURRENT)) {
+ /* current vdi must exist */
+ ret = SD_RES_SYSTEM_ERROR;
+ goto next;
+ }
+
+ *oid = ent->oid;
+ ret = 0;
+
+ if (ent->flags & FLAG_CURRENT)
+ *current = 1;
+ break;
+ }
+next:
+ prv = ent;
+ ent = next_entry(prv);
+ rest -= ((char *)ent - (char *)prv);
+ }
+out:
+ free(buf);
+ return ret;
+}
--
1.5.6.5