[Sheepdog] [PATCH 1/2] add puppy (tiny dog program)

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Thu Nov 12 21:50:42 CET 2009


Puppy is a C implementation of the dog program.
It uses corosync instead of JGroups for cluster communication,
so a JVM is no longer required.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
  puppy/Makefile.in |   31 ++++
  puppy/group.c     |  422 +++++++++++++++++++++++++++++++++++++++++++++++++++++
  puppy/net.c       |  355 ++++++++++++++++++++++++++++++++++++++++++++
  puppy/puppy.c     |  186 +++++++++++++++++++++++
  puppy/puppy.h     |  127 ++++++++++++++++
  puppy/vdi.c       |  256 ++++++++++++++++++++++++++++++++
  6 files changed, 1377 insertions(+), 0 deletions(-)
  create mode 100644 puppy/Makefile.in
  create mode 100644 puppy/group.c
  create mode 100644 puppy/net.c
  create mode 100644 puppy/puppy.c
  create mode 100644 puppy/puppy.h
  create mode 100644 puppy/vdi.c

diff --git a/puppy/Makefile.in b/puppy/Makefile.in
new file mode 100644
index 0000000..8468066
--- /dev/null
+++ b/puppy/Makefile.in
@@ -0,0 +1,31 @@
+# Build rules for the puppy program.
+CFLAGS += -g -O2 -Wall -Wstrict-prototypes -I../include
+CFLAGS += -D_GNU_SOURCE
+LIBS += -lcrypto -lcpg
+
+PROGRAMS = puppy
+# Objects linked into puppy: local sources plus helpers from ../lib.
+PUPPY_OBJS = puppy.o net.o vdi.o group.o ../lib/event.o ../lib/net.o ../lib/logger.o
+PUPPY_DEP = $(PUPPY_OBJS:.o=.d)
+
+prefix = @prefix@
+exec_prefix = @exec_prefix@
+bindir = @bindir@
+
+.PHONY:all
+all: $(PROGRAMS)
+
+puppy: $(PUPPY_OBJS)
+	$(CC) $^ -o $@ $(LIBS)
+
+-include $(PUPPY_DEP)
+
+# Compile each source, then write its .d dependency file (included above).
+%.o: %.c
+	$(CC) -c $(CFLAGS) $*.c -o $*.o
+	@$(CC) -MM $(CFLAGS) -MF $*.d -MT $*.o $*.c
+
+.PHONY:clean
+clean:
+	rm -f *.[od] $(PROGRAMS)
+
+.PHONY:install
+install: $(PROGRAMS)
+	install $< $(bindir)
diff --git a/puppy/group.c b/puppy/group.c
new file mode 100644
index 0000000..982a9b4
--- /dev/null
+++ b/puppy/group.c
@@ -0,0 +1,422 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/time.h>
+#include <corosync/cpg.h>
+
+#include "puppy.h"
+#include "list.h"
+#include "util.h"
+#include "event.h"
+#include "sheepdog_proto.h"
+#include "net.h"
+#include "meta.h"
+#include "logger.h"
+
+/*
+ * Search the 'entries' list for the vm whose name equals 'name'.
+ * Returns the matching entry, or NULL when there is none.
+ */
+static struct vm *lookup_vm(struct list_head *entries, char *name)
+{
+	struct vm *cur;
+
+	list_for_each_entry(cur, entries, list) {
+		if (strcmp((char *)cur->ent.name, name) != 0)
+			continue;
+		return cur;
+	}
+
+	return NULL;
+}
+
+/*
+ * Event callback registered for the cpg file descriptor (see
+ * create_cluster()).  'data' is the cluster_info; the callback asks cpg to
+ * dispatch with CPG_DISPATCH_ALL.
+ */
+static void group_handler(int listen_fd, int events, void *data)
+{
+	struct cluster_info *ci = data;
+	cpg_dispatch(ci->handle, CPG_DISPATCH_ALL);
+}
+
+/*
+ * Allocate the cluster state for this process, attach it to the cpg handle
+ * as its context and register the cpg file descriptor with the event loop.
+ *
+ * Returns the new cluster_info, or NULL when the allocation or any of the
+ * cpg / event-loop setup steps fails.
+ */
+struct cluster_info *create_cluster(cpg_handle_t handle, uint32_t this_nodeid,
+				    uint32_t this_pid,
+				    struct sheepdog_node_list_entry *this_node)
+{
+	int fd;
+	struct cluster_info *ci;
+
+	ci = zalloc(sizeof(*ci));
+	if (!ci)
+		return NULL;
+
+	ci->handle = handle;
+	ci->this_nodeid = this_nodeid;
+	ci->this_pid = this_pid;
+	ci->this_node = *this_node;
+	ci->synchronized = 0;
+	INIT_LIST_HEAD(&ci->node_list);
+	INIT_LIST_HEAD(&ci->vm_list);
+	INIT_LIST_HEAD(&ci->pending_list);
+
+	if (cpg_context_set(handle, ci) != CS_OK) {
+		eprintf("failed to set the cpg context\n");
+		goto err;
+	}
+
+	/* fd is only meaningful when cpg_fd_get() succeeds */
+	if (cpg_fd_get(handle, &fd) != CS_OK) {
+		eprintf("failed to get the cpg file descriptor\n");
+		goto err_context;
+	}
+
+	if (register_event(fd, group_handler, ci)) {
+		eprintf("failed to register the cpg event handler\n");
+		goto err_context;
+	}
+	return ci;
+
+err_context:
+	/* do not leave a dangling context pointer on the handle */
+	cpg_context_set(handle, NULL);
+err:
+	free(ci);
+	return NULL;
+}
+
+/*
+ * Debug-print every entry of the cluster's node list.  The entry that
+ * compares equal to this node is prefixed with 'l', all others with ' '.
+ */
+static void print_node_list(struct cluster_info *ci)
+{
+	struct node *n;
+
+	list_for_each_entry(n, &ci->node_list, list) {
+		char mark = 'l';
+
+		if (node_cmp(&n->ent, &ci->this_node))
+			mark = ' ';
+
+		dprintf("%c nodeid: %x, pid: %d, ip: %d.%d.%d.%d:%d\n",
+			mark,
+			n->nodeid, n->pid,
+			n->ent.addr[12], n->ent.addr[13],
+			n->ent.addr[14], n->ent.addr[15], n->ent.port);
+	}
+}
+
+/*
+ * Append a node described by (nodeid, pid, sd_ent) to the tail of the
+ * cluster's node list and bump the epoch.  On allocation failure the error
+ * is logged and neither the list nor the epoch changes.
+ */
+static void add_node(struct cluster_info *ci, uint32_t nodeid, uint32_t pid,
+		     struct sheepdog_node_list_entry *sd_ent)
+{
+	struct node *n = zalloc(sizeof(*n));
+
+	if (!n) {
+		eprintf("out of memory\n");
+		return;
+	}
+
+	n->ent = *sd_ent;
+	n->pid = pid;
+	n->nodeid = nodeid;
+
+	list_add_tail(&n->list, &ci->node_list);
+	ci->epoch++;
+}
+
+/*
+ * Return 1 when this node acts as master, 0 otherwise.  An unsynchronized
+ * node is never master; a synchronized node is master when the node list is
+ * empty or when its first entry compares equal to this node.
+ */
+static int is_master(struct cluster_info *ci)
+{
+	struct node *first;
+
+	if (!ci->synchronized)
+		return 0;
+	if (list_empty(&ci->node_list))
+		return 1;
+
+	first = list_first_entry(&ci->node_list, struct node, list);
+	return node_cmp(&first->ent, &ci->this_node) == 0;
+}
+
+/*
+ * Fill a join request with the current node list so that the joining node
+ * can rebuild its membership from the reply.  Does nothing unless this node
+ * is synchronized and is the master.
+ */
+static void join(struct cluster_info *ci, cpg_handle_t handle,
+		 struct join_message *msg)
+{
+	struct node *node;
+
+	if (!ci->synchronized)
+		return;
+
+	if (!is_master(ci))
+		return;
+
+	/*
+	 * The count that arrived with the request is not meaningful; the
+	 * reply always starts from an empty list.
+	 */
+	msg->nr_nodes = 0;
+
+	list_for_each_entry(node, &ci->node_list, list) {
+		/* nodes[] holds at most SD_MAX_NODES entries */
+		if (msg->nr_nodes >= SD_MAX_NODES) {
+			eprintf("too many nodes, join message truncated\n");
+			break;
+		}
+		msg->nodes[msg->nr_nodes].nodeid = node->nodeid;
+		msg->nodes[msg->nr_nodes].pid = node->pid;
+		msg->nodes[msg->nr_nodes].ent = node->ent;
+		msg->nr_nodes++;
+	}
+}
+
+/*
+ * Apply a completed join message.  A node that is not yet synchronized
+ * replaces its node list with the list carried in the message and becomes
+ * synchronized; every node then appends the joining node itself.
+ *
+ * A message whose node count exceeds the capacity of msg->nodes[] is
+ * logged and ignored by an unsynchronized node, instead of reading past
+ * the array.
+ */
+static void update_cluster_info(struct cluster_info *ci,
+				struct join_message *msg)
+{
+	uint32_t i;
+	uint32_t nr_nodes = msg->nr_nodes;
+	struct node *node, *e;
+
+	if (ci->synchronized)
+		goto out;
+
+	if (nr_nodes > SD_MAX_NODES) {
+		eprintf("invalid number of nodes in join message, %u\n",
+			nr_nodes);
+		return;
+	}
+
+	list_for_each_entry_safe(node, e, &ci->node_list, list) {
+		list_del(&node->list);
+		free(node);
+	}
+
+	INIT_LIST_HEAD(&ci->node_list);
+	for (i = 0; i < nr_nodes; i++)
+		add_node(ci, msg->nodes[i].nodeid, msg->nodes[i].pid,
+			 &msg->nodes[i].ent);
+
+	ci->synchronized = 1;
+
+out:
+	add_node(ci, msg->nodeid, msg->pid, &msg->header.from);
+	print_node_list(ci);
+}
+
+/*
+ * Execute a forwarded VDI request and store the outcome (oid and result)
+ * in msg->rsp.  sd_deliver() calls this for SD_MSG_VDI_OP messages that are
+ * not yet marked done.
+ */
+static void vdi_op(struct cluster_info *ci, struct vdi_op_message *msg)
+{
+	const struct sd_vdi_req *hdr = &msg->req;
+	struct sd_vdi_rsp *rsp = &msg->rsp;
+	void *data = msg->data;
+	int ret = SD_RES_SUCCESS, is_current = 0;
+	uint64_t oid = 0;
+	struct sheepdog_super_block *sb;
+	struct timeval tv;
+	struct sheepdog_node_list_entry entries[SD_MAX_NODES];
+	int nr_nodes;
+
+	switch (hdr->opcode) {
+	case SD_OP_NEW_VDI:
+		ret = add_vdi(ci, data, strlen(data), hdr->vdi_size, &oid,
+			      hdr->base_oid, hdr->tag);
+		break;
+	case SD_OP_LOCK_VDI:
+	case SD_OP_GET_VDI_INFO:
+		ret = lookup_vdi(ci, data, &oid, hdr->tag, 1, &is_current);
+		if (ret < 0)
+			break;
+		if (is_current)
+			rsp->flags = SD_VDI_RSP_FLAG_CURRENT;
+		break;
+	case SD_OP_RELEASE_VDI:
+		break;
+	case SD_OP_MAKE_FS:
+		sb = zalloc(sizeof(*sb));
+		if (!sb) {
+			ret = -1;
+			break;
+		}
+		gettimeofday(&tv, NULL);
+		/* seconds in the upper 32 bits, nanoseconds in the lower */
+		sb->ctime = (uint64_t) tv.tv_sec << 32 | tv.tv_usec * 1000;
+		sb->default_nr_copies = 3;
+
+		nr_nodes = build_node_list(&ci->node_list, entries);
+		ret = write_object(entries, nr_nodes, ci->epoch,
+				   SD_DIR_OID, (char *)sb, sizeof(*sb), 0,
+				   sb->default_nr_copies, 1);
+		/* sb was only a staging buffer for the write */
+		free(sb);
+		break;
+	case SD_OP_UPDATE_EPOCH:
+		break;
+	case SD_OP_GET_EPOCH:
+		rsp->epoch = 1;
+		break;
+	default:
+		ret = SD_RES_SYSTEM_ERROR;
+		eprintf("opcode %d is not implemented\n", hdr->opcode);
+		break;
+	}
+
+	rsp->oid = oid;
+	rsp->result = ret;
+}
+
+/*
+ * Apply a completed VDI operation.  Every node updates its vm (lock) list;
+ * the node that originated the message additionally completes its oldest
+ * pending request with the response carried in the message.
+ */
+static void vdi_op_done(struct cluster_info *ci, struct vdi_op_message *msg)
+{
+	const struct sd_vdi_req *hdr = &msg->req;
+	struct sd_vdi_rsp *rsp = &msg->rsp;
+	void *data = msg->data;
+	struct vm *vm;
+	struct request *req;
+	size_t name_len;
+	int ret = msg->rsp.result;
+
+	switch (hdr->opcode) {
+	case SD_OP_NEW_VDI:
+		break;
+	case SD_OP_LOCK_VDI:
+		if (lookup_vm(&ci->vm_list, (char *)data)) {
+			ret = SD_RES_VDI_LOCKED;
+			break;
+		}
+
+		/* refuse names that do not fit, instead of overflowing */
+		name_len = strlen((char *)data);
+		if (name_len >= sizeof(vm->ent.name)) {
+			eprintf("vdi name is too long\n");
+			ret = SD_RES_UNKNOWN;
+			break;
+		}
+
+		vm = zalloc(sizeof(*vm));
+		if (!vm) {
+			ret = SD_RES_UNKNOWN;
+			break;
+		}
+		memcpy(vm->ent.name, data, name_len + 1);
+		memcpy(vm->ent.host_addr, msg->header.from.addr,
+		       sizeof(vm->ent.host_addr));
+		vm->ent.host_port = msg->header.from.port;
+
+		list_add(&vm->list, &ci->vm_list);
+		break;
+	case SD_OP_RELEASE_VDI:
+		vm = lookup_vm(&ci->vm_list, (char *)data);
+		if (!vm) {
+			ret = SD_RES_VDI_NOT_LOCKED;
+			break;
+		}
+
+		list_del(&vm->list);
+		/* the entry was allocated by the matching SD_OP_LOCK_VDI */
+		free(vm);
+		break;
+	case SD_OP_GET_VDI_INFO:
+		break;
+	case SD_OP_UPDATE_EPOCH:
+		break;
+	case SD_OP_GET_EPOCH:
+		break;
+	case SD_OP_MAKE_FS:
+		break;
+	default:
+		eprintf("unknown operation %d\n", hdr->opcode);
+		ret = SD_RES_UNKNOWN;
+	}
+
+	/* only the originating node has a client waiting for the answer */
+	if (node_cmp(&ci->this_node, &msg->header.from) != 0)
+		return;
+
+	if (list_empty(&ci->pending_list)) {
+		eprintf("no pending request for a completed operation\n");
+		return;
+	}
+
+	req = list_first_entry(&ci->pending_list, struct request, pending_list);
+
+	rsp->result = ret;
+	if (rsp->data_length)
+		memcpy(req->data, data, rsp->data_length);
+	memcpy(&req->rp, rsp, sizeof(req->rp));
+	list_del(&req->pending_list);
+	req->done(req);
+}
+
+/*
+ * cpg deliver callback.  'msg' starts with a struct message_header.
+ *
+ * A message with done == 0 is a request: only the master handles it, then
+ * marks it done and multicasts it again.  A message with done != 0 is the
+ * result: every node applies it to its local state.
+ */
+void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name,
+		uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
+{
+	struct cluster_info *ci;
+	struct message_header *m = msg;
+
+	dprintf("op: %d, done: %d, size: %d, from: %d.%d.%d.%d:%d\n",
+		m->op, m->done, m->msg_length,
+		m->from.addr[12], m->from.addr[13],
+		m->from.addr[14], m->from.addr[15], m->from.port);
+
+	cpg_context_get(handle, (void **)&ci);
+	if (!m->done) {
+		/* requests are processed by the master only */
+		if (!is_master(ci))
+			return;
+
+		switch (m->op) {
+		case SD_MSG_JOIN:
+			join(ci, handle, msg);
+			break;
+		case SD_MSG_VDI_OP:
+			vdi_op(ci, msg);
+			break;
+		default:
+			eprintf("unknown message %d\n", m->op);
+			break;
+		}
+
+		/* send the (possibly modified) message back as the result */
+		m->done = 1;
+		send_message(handle, m);
+	} else {
+		switch (m->op) {
+		case SD_MSG_JOIN:
+			update_cluster_info(ci, msg);
+			break;
+		case SD_MSG_VDI_OP:
+			vdi_op_done(ci, msg);
+			break;
+		default:
+			eprintf("unknown message %d\n", m->op);
+			break;
+		}
+	}
+}
+
+/*
+ * cpg configuration-change callback.
+ *
+ * - Marks this node synchronized when it is the only member, i.e. the
+ *   member list consists solely of this process joining.
+ * - Drops every node listed in left_list from the node list, bumping the
+ *   epoch for each removed node.
+ * - When this process appears in joined_list, multicasts a join request.
+ */
+void sd_confch(cpg_handle_t handle, const struct cpg_name *group_name,
+	       const struct cpg_address *member_list,
+	       size_t member_list_entries, const struct cpg_address *left_list,
+	       size_t left_list_entries, const struct cpg_address *joined_list,
+	       size_t joined_list_entries)
+{
+	struct cluster_info *ci;
+	struct node *node, *e;
+	size_t i;
+
+	/* member_list[0] exists only when the list is not empty */
+	if (member_list_entries) {
+		dprintf("confchg nodeid %x\n", member_list[0].nodeid);
+	}
+	dprintf("%zu %zu %zu\n", member_list_entries, left_list_entries,
+		joined_list_entries);
+	for (i = 0; i < member_list_entries; i++) {
+		dprintf("[%zu] node_id: %d, pid: %d, reason: %d\n", i,
+			member_list[i].nodeid, member_list[i].pid,
+			member_list[i].reason);
+	}
+
+	cpg_context_get(handle, (void **)&ci);
+
+	if (member_list_entries &&
+	    member_list_entries == joined_list_entries - left_list_entries &&
+	    ci->this_nodeid == member_list[0].nodeid &&
+	    ci->this_pid == member_list[0].pid)
+		ci->synchronized = 1;
+
+	for (i = 0; i < left_list_entries; i++) {
+		list_for_each_entry_safe(node, e, &ci->node_list, list) {
+			if (node->nodeid != left_list[i].nodeid ||
+			    node->pid != left_list[i].pid)
+				continue;
+
+			list_del(&node->list);
+			free(node);
+			ci->epoch++;
+		}
+	}
+
+	for (i = 0; i < joined_list_entries; i++) {
+		/* look at every joined entry, not only the first one */
+		if (ci->this_nodeid == joined_list[i].nodeid &&
+		    ci->this_pid == joined_list[i].pid) {
+			struct join_message msg;
+
+			/* never put uninitialized stack bytes on the wire */
+			memset(&msg, 0, sizeof(msg));
+			msg.header.op = SD_MSG_JOIN;
+			msg.header.done = 0;
+			msg.header.msg_length = sizeof(msg);
+			msg.nodeid = ci->this_nodeid;
+			msg.pid = ci->this_pid;
+			msg.header.from = ci->this_node;
+
+			send_message(ci->handle, (struct message_header *)&msg);
+		}
+	}
+
+	if (left_list_entries == 0)
+		return;
+
+	print_node_list(ci);
+}
+
+/*
+ * Multicast 'msg' (msg->msg_length bytes) to the joined group with
+ * CPG_TYPE_AGREED.  Retries once per second for as long as cpg answers
+ * CS_ERR_TRY_AGAIN.  Returns 0 on success, -1 on any other error.
+ */
+int send_message(cpg_handle_t handle, struct message_header *msg)
+{
+	struct iovec iov;
+	int ret;
+
+	iov.iov_base = msg;
+	iov.iov_len = msg->msg_length;
+
+	for (;;) {
+		ret = cpg_mcast_joined(handle, CPG_TYPE_AGREED, &iov, 1);
+		if (ret == CS_OK)
+			return 0;
+
+		if (ret != CS_ERR_TRY_AGAIN) {
+			eprintf("failed to send message, %d\n", ret);
+			return -1;
+		}
+
+		dprintf("failed to send message. try again\n");
+		sleep(1);
+	}
+}
+
+/*
+ * Count the nodes on 'node_list'.  When 'entries' is not NULL, each node's
+ * entry is also copied into it and the array is sorted with node_cmp().
+ * Returns the number of nodes.
+ */
+int build_node_list(struct list_head *node_list,
+		    struct sheepdog_node_list_entry *entries)
+{
+	struct node *cur;
+	int count = 0;
+
+	list_for_each_entry(cur, node_list, list) {
+		if (entries)
+			entries[count] = cur->ent;
+		count++;
+	}
+
+	if (!entries)
+		return count;
+
+	qsort(entries, count, sizeof(*entries), node_cmp);
+	return count;
+}
+
+/*
+ * qsort()/bsearch() comparator for struct sheepdog_node_list_entry:
+ * orders entries by a bytewise comparison of their 'id' fields.
+ */
+int node_cmp(const void *a, const void *b)
+{
+	const struct sheepdog_node_list_entry *lhs = a, *rhs = b;
+
+	return memcmp(lhs->id, rhs->id, sizeof(lhs->id));
+}
diff --git a/puppy/net.c b/puppy/net.c
new file mode 100644
index 0000000..f011acf
--- /dev/null
+++ b/puppy/net.c
@@ -0,0 +1,355 @@
+#include <errno.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <sys/socket.h>
+#include <sys/epoll.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+
+#include "sheepdog_proto.h"
+#include "util.h"
+#include "event.h"
+#include "net.h"
+#include "list.h"
+#include "puppy.h"
+#include "meta.h"
+#include "logger.h"
+
+/*
+ * Binary-search 'entries' (nr_nodes elements, ordered by node_cmp()) for
+ * 'ent'.  Returns its index, or -1 when it is not present.
+ */
+static int get_node_idx(struct sheepdog_node_list_entry *ent,
+			struct sheepdog_node_list_entry *entries, int nr_nodes)
+{
+	struct sheepdog_node_list_entry *found;
+
+	found = bsearch(ent, entries, nr_nodes, sizeof(*ent), node_cmp);
+	if (found)
+		return found - entries;
+
+	return -1;
+}
+
+/*
+ * Answer SD_OP_GET_NODE_LIST locally: copy the sorted node list into 'data'
+ * and fill in the count, the index of this node and the index of the master
+ * (the first entry of the unsorted node list; -1 when the list is empty).
+ *
+ * NOTE(review): 'data' is the request buffer sized by the client; nothing
+ * here checks that it can hold nr_nodes entries -- confirm with callers.
+ */
+static void get_node_list(struct cluster_info *cluster, struct sd_node_req *req,
+			  struct sd_node_rsp *rsp, void *data)
+{
+	int nr_nodes;
+	struct node *node;
+
+	nr_nodes = build_node_list(&cluster->node_list, data);
+	rsp->data_length = nr_nodes * sizeof(struct sheepdog_node_list_entry);
+	rsp->nr_nodes = nr_nodes;
+	rsp->local_idx = get_node_idx(&cluster->this_node, data, nr_nodes);
+
+	if (list_empty(&cluster->node_list)) {
+		rsp->master_idx = -1;
+		return;
+	}
+	node = list_first_entry(&cluster->node_list, struct node, list);
+	rsp->master_idx = get_node_idx(&node->ent, data, nr_nodes);
+}
+
+/*
+ * Answer SD_OP_GET_VM_LIST locally: copy every entry of the cluster's vm
+ * list into 'data' and set rsp->data_length to the number of bytes written.
+ */
+static void get_vm_list(struct cluster_info *cluster, struct sd_rsp *rsp,
+			void *data)
+{
+	struct sheepdog_vm_list_entry *out = data;
+	struct vm *cur;
+	int count = 0;
+
+	list_for_each_entry(cur, &cluster->vm_list, list)
+		out[count++] = cur->ent;
+
+	rsp->data_length = count * sizeof(struct sheepdog_vm_list_entry);
+}
+
+/*
+ * Handle a fully received client request.
+ *
+ * Node-list and vm-list queries are answered directly from local state.
+ * Every other opcode is wrapped in a vdi_op_message and multicast to the
+ * group; the request is parked on the cluster's pending list until the
+ * completed message comes back (see vdi_op_done()).
+ *
+ * If the request cannot be forwarded it is completed immediately with
+ * SD_RES_SYSTEM_ERROR so the client is not left waiting.
+ */
+void queue_request(struct request *req)
+{
+	struct sd_req *hdr = (struct sd_req *)&req->rq;
+	struct sd_rsp *rsp = (struct sd_rsp *)&req->rp;
+	struct cluster_info *cluster = req->ci->cluster;
+	struct vdi_op_message *msg;
+	int ret = SD_RES_SUCCESS;
+
+	eprintf("%p %x\n", req, hdr->opcode);
+
+	switch (hdr->opcode) {
+	case SD_OP_GET_NODE_LIST:
+		get_node_list(cluster, (struct sd_node_req *)hdr,
+			      (struct sd_node_rsp *)rsp, req->data);
+		break;
+	case SD_OP_GET_VM_LIST:
+		get_vm_list(cluster, rsp, req->data);
+		break;
+	default:
+		/* forward request to group */
+		goto forward;
+	}
+
+	rsp->result = ret;
+	req->done(req);
+	return;
+
+forward:
+	msg = zalloc(sizeof(*msg) + hdr->data_length);
+	if (!msg) {
+		eprintf("out of memory\n");
+		goto fail;
+	}
+
+	msg->header.op = SD_MSG_VDI_OP;
+	msg->header.done = 0;
+	msg->header.msg_length = sizeof(*msg) + hdr->data_length;
+	msg->header.from = cluster->this_node;
+	msg->req = *((struct sd_vdi_req *)&req->rq);
+	msg->rsp = *((struct sd_vdi_rsp *)&req->rp);
+	if (hdr->flags & SD_FLAG_CMD_WRITE)
+		memcpy(msg->data, req->data, hdr->data_length);
+
+	/*
+	 * vdi_op_done() completes the first pending entry, so requests must
+	 * be queued in the order they are sent (FIFO).
+	 */
+	list_add_tail(&req->pending_list, &cluster->pending_list);
+	ret = send_message(cluster->handle, (struct message_header *)msg);
+	/* the local copy is no longer needed once the send has returned */
+	free(msg);
+	if (ret) {
+		list_del(&req->pending_list);
+		goto fail;
+	}
+	return;
+
+fail:
+	rsp->data_length = 0;
+	rsp->result = SD_RES_SYSTEM_ERROR;
+	req->done(req);
+}
+
+/*
+ * Allocate a zeroed request for client 'ci' with 'data_length' bytes of
+ * payload space placed directly after the struct, and link it on ci->reqs.
+ * req->data stays NULL when data_length is 0.  Returns NULL on allocation
+ * failure.
+ *
+ * NOTE(review): the caller passes a 64-bit length taken from the client's
+ * header into this int parameter -- confirm whether an upper bound is
+ * enforced elsewhere.
+ */
+static struct request *alloc_request(struct client_info *ci, int data_length)
+{
+	struct request *req;
+
+	req = zalloc(sizeof(struct request) + data_length);
+	if (!req)
+		return NULL;
+
+	req->ci = ci;
+	if (data_length)
+		req->data = (char *)req + sizeof(*req);
+
+	list_add(&req->r_siblings, &ci->reqs);
+	INIT_LIST_HEAD(&req->r_wlist);
+
+	return req;
+}
+
+/* Unlink 'req' from its client's request list and free it (payload included). */
+static void free_request(struct request *req)
+{
+	list_del(&req->r_siblings);
+	free(req);
+}
+
+/*
+ * Reset the receive side of a client so that the next read fills
+ * conn.rx_hdr with one struct sd_req; no request is in progress afterwards.
+ */
+static void init_rx_hdr(struct client_info *ci)
+{
+	ci->conn.c_rx_state = C_IO_HEADER;
+	ci->rx_req = NULL;
+	ci->conn.rx_length = sizeof(struct sd_req);
+	ci->conn.rx_buf = &ci->conn.rx_hdr;
+}
+
+/*
+ * Completion callback installed on every request: queue it on the client's
+ * done list and turn transmission on for the connection so that
+ * client_tx_handler() sends the response.
+ */
+static void req_done(struct request *req)
+{
+	list_add(&req->r_wlist, &req->ci->done_reqs);
+	conn_tx_on(&req->ci->conn);
+}
+
+/*
+ * Receive state machine for one client connection.
+ *
+ * C_IO_HEADER    -> read the fixed-size sd_req header
+ * C_IO_DATA_INIT -> allocate the request and decide whether a payload follows
+ * C_IO_DATA      -> read the payload (write commands only)
+ * C_IO_END       -> a complete command is available
+ *
+ * The switch cases fall through on purpose so that one call can advance
+ * through several states.  Once a command is complete the receive state is
+ * reset and the request is handed to queue_request().
+ */
+static void client_rx_handler(struct client_info *ci)
+{
+	int ret;
+	uint64_t data_len;
+	struct connection *conn = &ci->conn;
+	struct sd_req *hdr = &conn->rx_hdr;
+	struct request *req;
+
+	switch (conn->c_rx_state) {
+	case C_IO_HEADER:
+		ret = rx(conn, C_IO_DATA_INIT);
+		if (!ret || conn->c_rx_state != C_IO_DATA_INIT)
+			break;
+		/* fall through: the header is complete */
+	case C_IO_DATA_INIT:
+		data_len = hdr->data_length;
+
+		req = alloc_request(ci, data_len);
+		if (!req) {
+			conn->c_rx_state = C_IO_CLOSED;
+			break;
+		}
+		ci->rx_req = req;
+
+		/* use le_to_cpu */
+		memcpy(&req->rq, hdr, sizeof(req->rq));
+
+		if (data_len && hdr->flags & SD_FLAG_CMD_WRITE) {
+			conn->c_rx_state = C_IO_DATA;
+			conn->rx_length = data_len;
+			conn->rx_buf = req->data;
+		} else {
+			/* nothing to receive; the buffer is for the reply */
+			conn->c_rx_state = C_IO_END;
+			break;
+		}
+		/* fall through: start reading the payload */
+	case C_IO_DATA:
+		ret = rx(conn, C_IO_END);
+		break;
+	default:
+		eprintf("BUG: unknown state %d\n", conn->c_rx_state);
+	}
+
+	if (is_conn_dead(conn) || conn->c_rx_state != C_IO_END)
+		return;
+
+	/* now we have a complete command */
+
+	req = ci->rx_req;
+
+	init_rx_hdr(ci);
+
+	/* a write command returns no data; a read returns data_length bytes */
+	if (hdr->flags & SD_FLAG_CMD_WRITE)
+		req->rp.data_length = 0;
+	else
+		req->rp.data_length = hdr->data_length;
+
+	req->done = req_done;
+
+	queue_request(req);
+}
+
+/*
+ * If no response is being transmitted and a completed request is waiting,
+ * take the first one off the done list and prepare conn.tx_hdr from its
+ * response.  epoch, opcode and id are filled in here from the cluster state
+ * and the original request.
+ */
+static void init_tx_hdr(struct client_info *ci)
+{
+	struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
+	struct request *req;
+
+	if (ci->tx_req || list_empty(&ci->done_reqs))
+		return;
+
+	memset(rsp, 0, sizeof(*rsp));
+
+	req = list_first_entry(&ci->done_reqs, struct request, r_wlist);
+	list_del(&req->r_wlist);
+
+	ci->tx_req = req;
+	ci->conn.tx_length = sizeof(*rsp);
+	ci->conn.c_tx_state = C_IO_HEADER;
+	ci->conn.tx_buf = rsp;
+
+	memcpy(rsp, &req->rp, sizeof(*rsp));
+
+	rsp->epoch = ci->cluster->epoch;
+	rsp->opcode = req->rq.opcode;
+	rsp->id = req->rq.id;
+}
+
+/*
+ * Transmit state machine for one client connection.  Sends the response
+ * header and then, when data_length is non-zero, the payload.  After a
+ * response has been sent completely its request is freed and the next
+ * completed request, if any, is started.  Transmission is turned off when
+ * nothing is left to send.
+ */
+static void client_tx_handler(struct client_info *ci)
+{
+	int ret;
+	struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
+
+again:
+	init_tx_hdr(ci);
+	if (!ci->tx_req) {
+		conn_tx_off(&ci->conn);
+		return;
+	}
+
+	switch (ci->conn.c_tx_state) {
+	case C_IO_HEADER:
+		/* MSG_MORE: a payload follows the header */
+		if (rsp->data_length)
+			ret = tx(&ci->conn, C_IO_DATA_INIT, MSG_MORE);
+		else
+			ret = tx(&ci->conn, C_IO_DATA_INIT, 0);
+
+		if (!ret)
+			break;
+
+		if (rsp->data_length) {
+			ci->conn.tx_length = rsp->data_length;
+			ci->conn.tx_buf = ci->tx_req->data;
+			ci->conn.c_tx_state = C_IO_DATA;
+		} else {
+			ci->conn.c_tx_state = C_IO_END;
+			break;
+		}
+		/* fall through: start sending the payload */
+	case C_IO_DATA:
+		ret = tx(&ci->conn, C_IO_END, 0);
+		if (!ret)
+			break;
+	default:
+		break;
+	}
+
+	if (is_conn_dead(&ci->conn) || ci->conn.c_tx_state != C_IO_END)
+		return;
+
+	/* always true here: the state was checked just above */
+	if (ci->conn.c_tx_state == C_IO_END) {
+		free_request(ci->tx_req);
+		ci->tx_req = NULL;
+		goto again;
+	}
+}
+
+/*
+ * Allocate the state for a newly accepted connection 'fd' that belongs to
+ * 'cluster'.  The receive side starts out waiting for a request header.
+ * Returns NULL on allocation failure.
+ */
+static struct client_info *create_client(int fd, struct cluster_info *cluster)
+{
+	struct client_info *client = zalloc(sizeof(*client));
+
+	if (!client)
+		return NULL;
+
+	client->conn.fd = fd;
+	client->cluster = cluster;
+
+	/* same receive setup as after every completed command */
+	init_rx_hdr(client);
+
+	INIT_LIST_HEAD(&client->reqs);
+	INIT_LIST_HEAD(&client->done_reqs);
+
+	return client;
+}
+
+/*
+ * Close the client's socket and free its state.
+ *
+ * NOTE(review): requests still linked on ci->reqs (or parked on the
+ * cluster's pending list) are not released here and keep a pointer to
+ * 'ci' -- confirm how outstanding requests are meant to be handled.
+ */
+static void destroy_client(struct client_info *ci)
+{
+	close(ci->conn.fd);
+	free(ci);
+}
+
+/*
+ * Event callback for a client socket: run the receive and/or transmit
+ * state machine depending on the reported events, and tear the client down
+ * once its connection is dead.
+ */
+static void client_handler(int fd, int events, void *data)
+{
+	struct client_info *ci = (struct client_info *)data;
+
+	if (events & EPOLLIN)
+		client_rx_handler(ci);
+
+	/* skip transmission when receiving already killed the connection */
+	if (!is_conn_dead(&ci->conn) && events & EPOLLOUT)
+		client_tx_handler(ci);
+
+	if (is_conn_dead(&ci->conn)) {
+		unregister_event(fd);
+		destroy_client(ci);
+	}
+}
+
+/*
+ * Event callback for a listening socket: accept one connection, create the
+ * client state for it ('data' is the cluster_info) and register the new
+ * socket with the event loop.  The socket is closed again when either of
+ * the last two steps fails.
+ */
+void listen_handler(int listen_fd, int events, void *data)
+{
+	struct sockaddr_storage peer;
+	socklen_t peer_len = sizeof(peer);
+	struct client_info *client;
+	int fd;
+
+	fd = accept(listen_fd, (struct sockaddr *)&peer, &peer_len);
+	if (fd < 0) {
+		eprintf("can't accept a new connection, %m\n");
+		return;
+	}
+
+	client = create_client(fd, data);
+	if (!client) {
+		close(fd);
+		return;
+	}
+
+	/* destroy_client() also closes fd */
+	if (register_event(fd, client_handler, client))
+		destroy_client(client);
+}
diff --git a/puppy/puppy.c b/puppy/puppy.c
new file mode 100644
index 0000000..1401f19
--- /dev/null
+++ b/puppy/puppy.c
@@ -0,0 +1,186 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <netdb.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <errno.h>
+#include <getopt.h>
+#include <openssl/sha.h>
+#include <corosync/cpg.h>
+
+#include "puppy.h"
+#include "list.h"
+#include "util.h"
+#include "event.h"
+#include "sheepdog_proto.h"
+#include "net.h"
+#include "logger.h"
+
+/* Name used in usage output and passed to log_init(). */
+static char program_name[] = "puppy";
+
+/* Sheep listen port; overridden by -s/--sport and stored in the node entry. */
+static int sheepport = SHEEP_LISTEN_PORT;
+
+/* Long options; each maps to the short option of the same meaning below. */
+static struct option const long_options[] = {
+	{"dport", required_argument, 0, 'D'},
+	{"sport", required_argument, 0, 's'},
+	{"foreground", no_argument, 0, 'f'},
+	{"debug", no_argument, 0, 'd'},
+	{"help", no_argument, 0, 'h'},
+	{0, 0, 0, 0},
+};
+
+/* getopt string: -D and -s take an argument. */
+static char *short_options = "D:s:fdh";
+
+/*
+ * Print a hint to stderr (status != 0) or the full help text to stdout
+ * (status == 0), then exit with 'status'.  Does not return.
+ */
+static void usage(int status)
+{
+	if (status)
+		fprintf(stderr, "Try `%s --help' for more information.\n",
+			program_name);
+	else {
+		printf("Usage: %s [OPTION]\n", program_name);
+		printf("\
+Sheepdog cluster monitor\n\
+  -D, --dport             specify the dog (our) listen port number\n\
+  -s, --sport             specify the sheep listen port number\n\
+  -f, --foreground        make the program run in the foreground\n\
+  -d, --debug             print debug messages\n\
+  -h, --help              display this help and exit\n\
+");
+	}
+	exit(status);
+}
+
+/*
+ * Callback handed to create_listen_ports(): registers listening socket
+ * 'fd' with the event loop, passing 'data' through to listen_handler().
+ */
+static int create_listen_ports_fn(int fd, void *data)
+{
+	return register_event(fd, listen_handler, data);
+}
+
+/*
+ * Entry point: parse the command line, set up logging and the event loop,
+ * join the corosync "sheepdog" group, build this node's entry (address,
+ * sheep port and the SHA1 of both as id), create the cluster state, open
+ * the listen ports and run the event loop.
+ */
+int main(int argc, char **argv)
+{
+	int ch, longindex;
+	int ret, port = DOG_LISTEN_PORT;
+	int is_daemon = 1;
+	int is_debug = 0;
+	cpg_handle_t cpg_handle = 0;
+	SHA_CTX ctx;
+	struct cluster_info *ci;
+	cpg_callbacks_t cb = { &sd_deliver, &sd_confch };
+	unsigned int nodeid = 0;
+	struct cpg_name group = { 8, "sheepdog" };
+	struct sheepdog_node_list_entry ent;
+	/* sized for a host name, not for a printed address */
+	char name[NI_MAXHOST];
+	struct addrinfo hints, *res;
+
+	while ((ch = getopt_long(argc, argv, short_options, long_options,
+				 &longindex)) >= 0) {
+		switch (ch) {
+		case 'D':
+			port = atoi(optarg);
+			break;
+		case 's':
+			sheepport = atoi(optarg);
+			break;
+		case 'f':
+			is_daemon = 0;
+			break;
+		case 'd':
+			is_debug = 1;
+			break;
+		case 'h':
+			usage(0);
+			break;
+		default:
+			usage(1);
+			break;
+		}
+	}
+
+	ret = log_init(program_name, LOG_SPACE_SIZE, is_daemon, is_debug);
+	if (ret)
+		exit(1);
+
+	if (is_daemon && daemon(0, 0))
+		exit(1);
+
+	ret = init_event(1024);
+	if (ret)
+		exit(1);
+
+	ret = cpg_initialize(&cpg_handle, &cb);
+	if (ret != CS_OK) {
+		eprintf("Failed to initialize cpg, %d\n", ret);
+		eprintf("Is corosync running?\n");
+		return -1;
+	}
+
+join_retry:
+	ret = cpg_join(cpg_handle, &group);
+	switch (ret) {
+	case CS_OK:
+		break;
+	case CS_ERR_TRY_AGAIN:
+		dprintf("Failed to join the sheepdog group, try again\n");
+		sleep(1);
+		goto join_retry;
+	case CS_ERR_SECURITY:
+		eprintf("Permission error.\n");
+		exit(1);
+	default:
+		eprintf("Failed to join the sheepdog group, %d\n", ret);
+		exit(1);
+		break;
+	}
+
+	ret = cpg_local_get(cpg_handle, &nodeid);
+	if (ret != CS_OK) {
+		eprintf("Failed to get the local node's identifier, %d\n", ret);
+		exit(1);
+	}
+
+	memset(&ent, 0, sizeof(ent));
+
+	if (gethostname(name, sizeof(name)) < 0) {
+		eprintf("Failed to get the host name, %m\n");
+		exit(1);
+	}
+	/* a truncated name is not guaranteed to be NUL-terminated */
+	name[sizeof(name) - 1] = '\0';
+
+	memset(&hints, 0, sizeof(hints));
+
+	hints.ai_socktype = SOCK_STREAM;
+	ret = getaddrinfo(name, NULL, &hints, &res);
+	if (ret) {
+		eprintf("Failed to resolve %s, %s\n", name, gai_strerror(ret));
+		exit(1);
+	}
+
+	/* IPv4 addresses are stored in the last 4 bytes of the 16-byte field */
+	if (res->ai_family == AF_INET) {
+		struct sockaddr_in *addr = (struct sockaddr_in *)res->ai_addr;
+		memset(ent.addr, 0, sizeof(ent.addr));
+		memcpy(ent.addr + 12, &addr->sin_addr, 4);
+	} else if (res->ai_family == AF_INET6) {
+		struct sockaddr_in6 *addr = (struct sockaddr_in6 *)res->ai_addr;
+		memcpy(ent.addr, &addr->sin6_addr, 16);
+	} else {
+		eprintf("unknown address family\n");
+		exit(1);
+	}
+
+	freeaddrinfo(res);
+
+	ent.port = sheepport;
+
+	/* the node id is the SHA1 of the address followed by the port */
+	SHA1_Init(&ctx);
+	SHA1_Update(&ctx, ent.addr, sizeof(ent.addr));
+	SHA1_Update(&ctx, &ent.port, sizeof(ent.port));
+	SHA1_Final(ent.id, &ctx);
+
+	ci = create_cluster(cpg_handle, nodeid, getpid(), &ent);
+	if (!ci) {
+		eprintf("failed to create sheepdog cluster.\n");
+		exit(1);
+	}
+
+	ret = create_listen_ports(port, create_listen_ports_fn, ci);
+	if (ret)
+		exit(1);
+
+	event_loop(-1);
+
+	cpg_finalize(cpg_handle);
+	return 0;
+}
diff --git a/puppy/puppy.h b/puppy/puppy.h
new file mode 100644
index 0000000..14f0f46
--- /dev/null
+++ b/puppy/puppy.h
@@ -0,0 +1,127 @@
+#include <corosync/cpg.h>
+
+#include "sheepdog_proto.h"
+#include "net.h"
+#include "list.h"
+
+#define SD_MSG_JOIN             0x01
+#define SD_MSG_VDI_OP           0x02
+#define SD_MSG_MASTER_CHANGED   0x03
+
+/* too much duplication */
+
+/* Per-connection state of one client. */
+struct client_info {
+	struct connection conn;
+
+	/* not referenced by the sources in this patch */
+	char hostname[INET6_ADDRSTRLEN];
+	int port;
+
+	/* request currently being received, if any */
+	struct request *rx_req;
+
+	/* request whose response is currently being sent, if any */
+	struct request *tx_req;
+
+	/* every request allocated for this client (request.r_siblings) */
+	struct list_head reqs;
+	/* completed requests waiting to be sent (request.r_wlist) */
+	struct list_head done_reqs;
+
+	struct cluster_info *cluster;
+};
+
+struct request;
+
+/* Completion callback invoked once a request's response is ready. */
+typedef void (*req_end_t) (struct request *);
+
+/* One client command together with its response. */
+struct request {
+	struct sd_req rq;
+	struct sd_rsp rp;
+
+	/* payload buffer allocated right behind the struct; NULL if none */
+	void *data;
+
+	struct client_info *ci;
+	/* link on client_info.reqs */
+	struct list_head r_siblings;
+	/* link on client_info.done_reqs */
+	struct list_head r_wlist;
+	/* link on cluster_info.pending_list while forwarded to the group */
+	struct list_head pending_list;
+
+	req_end_t done;
+};
+
+/* One entry of cluster_info.vm_list; created on SD_OP_LOCK_VDI. */
+struct vm {
+	struct sheepdog_vm_list_entry ent;
+	struct list_head list;
+};
+
+/* One cluster member on cluster_info.node_list. */
+struct node {
+	/* cpg node id and process id of the member */
+	uint32_t nodeid;
+	uint32_t pid;
+	struct sheepdog_node_list_entry ent;
+	struct list_head list;
+};
+
+/* Cluster state of this process; stored as the cpg handle's context. */
+struct cluster_info {
+	cpg_handle_t handle;
+	/* non-zero once the node list reflects the cluster membership */
+	int synchronized;
+	uint32_t this_nodeid;
+	uint32_t this_pid;
+	struct sheepdog_node_list_entry this_node;
+
+	/* incremented whenever a node is added to or removed from node_list */
+	uint64_t epoch;
+
+	struct list_head node_list;
+	struct list_head vm_list;
+	/* requests forwarded to the group and not yet completed */
+	struct list_head pending_list;
+};
+
+/* Common header of every message multicast to the group. */
+struct message_header {
+	/* one of the SD_MSG_* values */
+	uint8_t op;
+	/* 0 for a request, 1 once the master has processed it */
+	uint8_t done;
+	uint8_t pad[2];
+	/* total message size in bytes, header included */
+	uint32_t msg_length;
+	/* node that originated the message */
+	struct sheepdog_node_list_entry from;
+};
+
+/*
+ * SD_MSG_JOIN payload.  The joining node fills in nodeid and pid; the
+ * master fills nodes[0..nr_nodes) with the current membership.
+ */
+struct join_message {
+	struct message_header header;
+	uint32_t nodeid;
+	uint32_t pid;
+	uint32_t nr_nodes;
+	/* not referenced by the sources in this patch */
+	struct sheepdog_node_list_entry master_node;
+	struct {
+		uint32_t nodeid;
+		uint32_t pid;
+		struct sheepdog_node_list_entry ent;
+	} nodes[SD_MAX_NODES];
+};
+
+/*
+ * SD_MSG_VDI_OP payload: a forwarded client request, the response filled in
+ * by the master, and the variable-length request/response data.
+ */
+struct vdi_op_message {
+	struct message_header header;
+	struct sd_vdi_req req;
+	struct sd_vdi_rsp rsp;
+	uint8_t data[0];
+};
+
+/* vdi.c: VDI directory handling */
+int add_vdi(struct cluster_info *cluster,
+	    char *name, int len, uint64_t size, uint64_t * added_oid,
+	    uint64_t base_oid, uint32_t tag);
+
+int lookup_vdi(struct cluster_info *cluster, char *filename, uint64_t * oid,
+	       uint32_t tag, int do_lock, int *current);
+
+/* group.c: node list helpers and group messaging */
+int build_node_list(struct list_head *node_list,
+		    struct sheepdog_node_list_entry *entries);
+int send_message(cpg_handle_t handle, struct message_header *msg);
+int node_cmp(const void *a, const void *b);
+
+/* net.c: accept callback for listening sockets */
+void listen_handler(int listen_fd, int events, void *data);
+/* group.c: cluster setup and cpg callbacks */
+struct cluster_info *create_cluster(cpg_handle_t handle,
+				    uint32_t this_nodeid,
+				    uint32_t this_pid,
+				    struct sheepdog_node_list_entry
+				    *this_node);
+void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name,
+		uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len);
+void sd_confch(cpg_handle_t handle, const struct cpg_name *group_name,
+	       const struct cpg_address *member_list,
+	       size_t member_list_entries,
+	       const struct cpg_address *left_list,
+	       size_t left_list_entries,
+	       const struct cpg_address *joined_list,
+	       size_t joined_list_entries);
diff --git a/puppy/vdi.c b/puppy/vdi.c
new file mode 100644
index 0000000..82232e0
--- /dev/null
+++ b/puppy/vdi.c
@@ -0,0 +1,256 @@
+#include <errno.h>
+#include <inttypes.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/time.h>
+#include <openssl/sha.h>
+
+#include "meta.h"
+#include "util.h"
+#include "list.h"
+#include "sheepdog_proto.h"
+#include "puppy.h"
+#include "net.h"
+#include "logger.h"
+
+/*
+ * Return non-zero when directory entry 'ent' is named exactly 'name'
+ * ('len' bytes).  An entry with a zero name_len never matches.
+ */
+static int sheepdog_match(struct sheepdog_dir_entry *ent, char *name, int len)
+{
+	if (!ent->name_len || ent->name_len != len)
+		return 0;
+
+	return memcmp(ent->name, name, len) == 0;
+}
+
+/*
+ * Create the inode object 'oid' for a new VDI of 'size' bytes.
+ *
+ * When 'base_oid' is non-zero, the base inode is read, its data_oid table
+ * is copied into the new inode, and 'oid' is recorded in the first free
+ * child_oid slot of the base, which is then written back.
+ *
+ * Returns an SD_RES_* code on failure, otherwise the result of the final
+ * write_object() call.
+ *
+ * TODO: should be performed atomically
+ */
+static int create_inode_obj(struct sheepdog_node_list_entry *entries,
+			    int nr_nodes, uint64_t epoch, int copies,
+			    uint64_t oid, uint64_t size, uint64_t base_oid)
+{
+	struct sheepdog_inode inode, base;
+	struct timeval tv;
+	int ret;
+
+	if (base_oid) {
+		ret = read_object(entries, nr_nodes, epoch,
+				  base_oid, (char *)&base, sizeof(base), 0,
+				  copies);
+		if (ret < 0)
+			return SD_RES_BASE_VDI_READ;
+	}
+
+	gettimeofday(&tv, NULL);
+
+	memset(&inode, 0, sizeof(inode));
+
+	inode.oid = oid;
+	inode.vdi_size = size;
+	inode.block_size = SD_DATA_OBJ_SIZE;
+	/* seconds in the upper 32 bits, nanoseconds in the lower */
+	inode.ctime = (uint64_t) tv.tv_sec << 32 | tv.tv_usec * 1000;
+	inode.nr_copies = copies;
+
+	if (base_oid) {
+		int i;
+
+		eprintf("%zd %zd\n", sizeof(inode.data_oid),
+			ARRAY_SIZE(base.child_oid));
+		inode.parent_oid = base_oid;
+		memcpy(inode.data_oid, base.data_oid,
+		       MAX_DATA_OBJS * sizeof(uint64_t));
+
+		/* claim the first unused child slot of the base inode */
+		for (i = 0; i < ARRAY_SIZE(base.child_oid); i++) {
+			if (!base.child_oid[i]) {
+				base.child_oid[i] = oid;
+				break;
+			}
+		}
+
+		/* every child slot is already taken */
+		if (i == ARRAY_SIZE(base.child_oid))
+			return SD_RES_NO_BASE_VDI;
+
+		ret = write_object(entries, nr_nodes,
+				   epoch, base_oid, (char *)&base,
+				   sizeof(base), 0, copies, 0);
+		if (ret < 0)
+			return SD_RES_BASE_VDI_WRITE;
+	}
+
+	ret = write_object(entries, nr_nodes, epoch,
+			   oid, (char *)&inode, sizeof(inode), 0, copies, 1);
+	if (ret < 0)
+		return SD_RES_VDI_WRITE;
+
+	return ret;
+}
+
+#define DIR_BUF_LEN (UINT64_C(1) << 20)
+
+/*
+ * Add a VDI called 'name' ('len' bytes) of 'size' bytes to the directory
+ * object and create its inode object.
+ *
+ * The directory object is read twice: once from offset 0 to obtain the
+ * default number of copies from the super block, then from the offset just
+ * behind the super block to obtain the entries.  The new object id is the
+ * id of the last existing entry plus (1 << 18).  With a non-zero 'tag' the
+ * new entry becomes the current one and FLAG_CURRENT is cleared on the
+ * earlier entries of the same name; with a zero 'tag' an existing entry of
+ * that name is an error.
+ *
+ * On success the new object id is stored in *added_oid.  Returns 0 on
+ * success, 1 when the buffer cannot be allocated, or an SD_RES_* code.
+ *
+ * TODO: handle larger buffer
+ */
+int add_vdi(struct cluster_info *cluster, char *name, int len, uint64_t size,
+	    uint64_t *added_oid, uint64_t base_oid, uint32_t tag)
+{
+	struct sheepdog_node_list_entry entries[SD_MAX_NODES];
+	int nr_nodes;
+	struct sheepdog_dir_entry *prv, *ent;
+	uint64_t oid = 0;
+	char *buf;
+	int ret, rest;
+	struct sheepdog_super_block *sb;
+	int copies;
+
+	nr_nodes = build_node_list(&cluster->node_list, entries);
+
+	eprintf("%s (%d) %" PRIu64 ", base: %" PRIu64 "\n", name, len, size,
+		base_oid);
+
+	buf = zalloc(DIR_BUF_LEN);
+	if (!buf)
+		return 1;
+
+	ret = read_object(entries, nr_nodes, cluster->epoch,
+			  SD_DIR_OID, buf, DIR_BUF_LEN, 0, nr_nodes);
+	if (ret < 0) {
+		ret = SD_RES_DIR_READ;
+		goto out;
+	}
+
+	sb = (struct sheepdog_super_block *)buf;
+	copies = sb->default_nr_copies;
+
+	/* read again, this time starting behind the super block */
+	ret = read_object(entries, nr_nodes, cluster->epoch,
+			  SD_DIR_OID, buf, DIR_BUF_LEN, sizeof(*sb), nr_nodes);
+	if (ret < 0) {
+		ret = SD_RES_DIR_READ;
+		goto out;
+	}
+
+	/* walk to the first unused entry, remembering the last oid seen */
+	ent = (struct sheepdog_dir_entry *)buf;
+	rest = ret;
+	while (rest > 0) {
+		if (!ent->name_len)
+			break;
+
+		if (sheepdog_match(ent, name, len) && !tag) {
+			ret = SD_RES_VDI_EXIST;
+			goto out;
+		}
+		oid = ent->oid;
+		prv = ent;
+		ent = next_entry(prv);
+		rest -= ((char *)ent - (char *)prv);
+	}
+
+	/* need to check if the buffer is large enough here. */
+	/*
+	 * NOTE(review): nothing verifies that 'ent' plus the new entry still
+	 * lies inside buf[DIR_BUF_LEN] before it is written below.
+	 */
+	oid += (1 << 18);
+
+	ret = create_inode_obj(entries, nr_nodes, cluster->epoch, copies,
+			       oid, size, base_oid);
+	if (ret)
+		goto out;
+
+	ent->oid = oid;
+	ent->tag = tag;
+
+	ent->flags = FLAG_CURRENT;
+	ent->name_len = len;
+	memcpy(ent->name, name, len);
+
+	if (tag) {
+		struct sheepdog_dir_entry *e = (struct sheepdog_dir_entry *)buf;
+
+		/* older entries of the same name are no longer current */
+		while (e < ent) {
+			if (sheepdog_match(e, name, len))
+				e->flags &= ~FLAG_CURRENT;
+			e = next_entry(e);
+		}
+	}
+
+	ent = next_entry(ent);
+
+	/* write back everything up to and including the new entry */
+	ret = write_object(entries, nr_nodes, cluster->epoch,
+			   SD_DIR_OID, buf, (char *)ent - buf, sizeof(*sb),
+			   copies, 0);
+	if (ret) {
+		ret = SD_RES_DIR_WRITE;
+		goto out;
+	}
+
+	*added_oid = oid;
+out:
+	free(buf);
+
+	return ret;
+}
+
+/* Placeholder: VDI deletion is not implemented; always returns 0. */
+int del_vdi(struct cluster_info *cluster, char *name, int len)
+{
+	return 0;
+}
+
+/*
+ * Look 'filename' up in the directory object.
+ *
+ * On a match *oid receives the object id and *current is set to 1 when the
+ * entry carries FLAG_CURRENT.  A 'tag' of -1 (all bits set once converted
+ * to uint32_t) skips the exact tag comparison.  'do_lock' is not used by
+ * this function.
+ *
+ * Returns 0 on a match, 1 when the buffer cannot be allocated, or an
+ * SD_RES_* code (SD_RES_NO_VDI when no entry has that name).
+ */
+int lookup_vdi(struct cluster_info *cluster,
+	       char *filename, uint64_t * oid, uint32_t tag, int do_lock,
+	       int *current)
+{
+	struct sheepdog_node_list_entry entries[SD_MAX_NODES];
+	int nr_nodes;
+	int rest, ret;
+	char *buf;
+	struct sheepdog_dir_entry *prv, *ent;
+
+	nr_nodes = build_node_list(&cluster->node_list, entries);
+
+	*current = 0;
+	buf = zalloc(DIR_BUF_LEN);
+	if (!buf)
+		return 1;
+
+	/* read the directory entries that follow the super block */
+	ret = read_object(entries, nr_nodes, cluster->epoch,
+			  SD_DIR_OID, buf, DIR_BUF_LEN,
+			  sizeof(struct sheepdog_super_block), nr_nodes);
+	if (ret < 0) {
+		ret = SD_RES_DIR_READ;
+		goto out;
+	}
+
+	eprintf("looking for %s %zd, %d\n", filename, strlen(filename), ret);
+
+	ent = (struct sheepdog_dir_entry *)buf;
+	rest = ret;
+	ret = SD_RES_NO_VDI;
+	while (rest > 0) {
+		/* an entry without a name ends the directory */
+		if (!ent->name_len)
+			break;
+
+		eprintf("%s %d %" PRIu64 "\n", ent->name, ent->name_len,
+			ent->oid);
+
+		if (sheepdog_match(ent, filename, strlen(filename))) {
+			if (ent->tag != tag && tag != -1) {
+				ret = SD_RES_NO_TAG;
+				goto next;
+			}
+			if (ent->tag != tag && !(ent->flags & FLAG_CURRENT)) {
+				/* current vdi must exist */
+				ret = SD_RES_SYSTEM_ERROR;
+				goto next;
+			}
+
+			*oid = ent->oid;
+			ret = 0;
+
+			if (ent->flags & FLAG_CURRENT)
+				*current = 1;
+			break;
+		}
+next:
+		prv = ent;
+		ent = next_entry(prv);
+		rest -= ((char *)ent - (char *)prv);
+	}
+out:
+	free(buf);
+	return ret;
+}
-- 
1.5.6.5




More information about the sheepdog mailing list