[Sheepdog] [PATCH 2/6] collie: add cluster manager
MORITA Kazutaka
morita.kazutaka at lab.ntt.co.jp
Tue Dec 1 19:35:19 CET 2009
This was originally part of dog (puppy).
Cluster communication and VDI manipulation are supported.
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
collie/group.c | 761 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
collie/vdi.c | 259 +++++++++++++++++++
2 files changed, 1020 insertions(+), 0 deletions(-)
create mode 100644 collie/group.c
create mode 100644 collie/vdi.c
diff --git a/collie/group.c b/collie/group.c
new file mode 100644
index 0000000..a718afd
--- /dev/null
+++ b/collie/group.c
@@ -0,0 +1,761 @@
+/*
+ * Copyright (C) 2009 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <netdb.h>
+#include <sys/time.h>
+#include <corosync/cpg.h>
+
+#include "sheepdog_proto.h"
+#include "collie.h"
+#include "list.h"
+#include "util.h"
+#include "meta.h"
+#include "logger.h"
+#include "work.h"
+
+struct vm { /* one locked VDI; kept on cluster_info.vm_list */
+ struct sheepdog_vm_list_entry ent; /* VDI name plus address/port of the node that took the lock */
+ struct list_head list; /* link in the vm_list */
+};
+
+struct node { /* one cluster member; kept on cluster_info.node_list */
+ uint32_t nodeid; /* cpg node id, matched against cpg_address.nodeid on leave */
+ uint32_t pid; /* cpg pid, matched against cpg_address.pid on leave */
+ struct sheepdog_node_list_entry ent; /* sheepdog-level identity (id, addr, port) */
+ struct list_head list; /* link in the node_list; the first entry is treated as master */
+};
+
+struct message_header { /* common prefix of every message multicast to the group */
+ uint8_t op; /* SD_MSG_JOIN or SD_MSG_VDI_OP */
+ uint8_t done; /* 0: waiting for the master; 1: processed by the master and re-sent */
+ uint8_t pad[2];
+ uint32_t msg_length; /* total number of bytes handed to cpg_mcast_joined() */
+ struct sheepdog_node_list_entry from; /* node that originated the message */
+};
+
+struct join_message { /* SD_MSG_JOIN payload: join request, later filled in by the master */
+ struct message_header header;
+ uint32_t nodeid; /* cpg node id of the joining node */
+ uint32_t pid; /* cpg pid of the joining node */
+ struct sheepdog_node_list_entry master_node; /* NOTE(review): not referenced anywhere in this file */
+ uint32_t epoch; /* master's epoch, copied in by join() */
+ uint32_t nr_nodes; /* number of valid entries in nodes[] */
+ struct { /* membership snapshot written by the master in join() */
+ uint32_t nodeid;
+ uint32_t pid;
+ struct sheepdog_node_list_entry ent;
+ } nodes[SD_MAX_NODES];
+};
+
+struct vdi_op_message { /* SD_MSG_VDI_OP payload: a client VDI request forwarded to the group */
+ struct message_header header;
+ struct sd_vdi_req req; /* copy of the client's request header */
+ struct sd_vdi_rsp rsp; /* response header; result and oid are set by vdi_op() */
+ uint8_t data[0]; /* request payload, req data_length bytes, allocated past the struct */
+};
+
+struct work_deliver { /* deferred handling of one delivered cpg message */
+ struct message_header *msg; /* heap copy of the message; freed in __sd_deliver_done() */
+
+ struct cluster_info *ci; /* cluster context fetched with cpg_context_get() */
+ struct work work; /* work item: fn = __sd_deliver, done = __sd_deliver_done */
+};
+
+struct work_confch { /* deferred handling of one cpg configuration change */
+ struct cpg_address *member_list; /* heap copy of the current member list */
+ size_t member_list_entries;
+ struct cpg_address *left_list; /* heap copy of the members that left */
+ size_t left_list_entries;
+ struct cpg_address *joined_list; /* heap copy of the members that joined */
+ size_t joined_list_entries;
+
+ struct cluster_info *ci; /* cluster context fetched with cpg_context_get() */
+ struct work work; /* work item: fn = __sd_confch, done = __sd_confch_done */
+};
+
+/*
+ * Compare two sheepdog_node_list_entry objects by their id bytes.
+ * The signature matches what qsort() and bsearch() expect.
+ */
+static int node_cmp(const void *a, const void *b)
+{
+	const struct sheepdog_node_list_entry *lhs = a, *rhs = b;
+
+	return memcmp(lhs->id, rhs->id, sizeof(lhs->id));
+}
+
+/*
+ * Multicast msg (msg->msg_length bytes) to the joined cpg group with
+ * agreed ordering.  CS_ERR_TRY_AGAIN is retried once per second without
+ * limit.  Returns 0 on success, -1 on any other cpg error.
+ */
+static int send_message(cpg_handle_t handle, struct message_header *msg)
+{
+	struct iovec iov;
+	int ret;
+
+	iov.iov_base = msg;
+	iov.iov_len = msg->msg_length;
+
+	for (;;) {
+		ret = cpg_mcast_joined(handle, CPG_TYPE_AGREED, &iov, 1);
+		if (ret != CS_ERR_TRY_AGAIN)
+			break;
+
+		dprintf("failed to send message. try again\n");
+		sleep(1);
+	}
+
+	if (ret != CS_OK) {
+		eprintf("failed to send message, %d\n", ret);
+		return -1;
+	}
+
+	return 0;
+}
+
+
+
+/*
+ * Find ent in the array entries[0..nr_nodes), which must already be sorted
+ * with node_cmp.  Returns the index of the match, or -1 if there is none.
+ */
+static int get_node_idx(struct sheepdog_node_list_entry *ent,
+			struct sheepdog_node_list_entry *entries, int nr_nodes)
+{
+	struct sheepdog_node_list_entry *hit;
+
+	hit = bsearch(ent, entries, nr_nodes, sizeof(*ent), node_cmp);
+
+	return hit ? hit - entries : -1;
+}
+
+/*
+ * Answer SD_OP_GET_NODE_LIST: write the sorted node entries into data and
+ * fill in the entry count, payload length, and the indexes of the local
+ * node and of the master (first node on the list; -1 when the list is
+ * empty).  req is not used.
+ */
+static void get_node_list(struct cluster_info *cluster, struct sd_node_req *req,
+			  struct sd_node_rsp *rsp, void *data)
+{
+	struct node *first;
+	int count = build_node_list(&cluster->node_list, data);
+
+	rsp->data_length = count * sizeof(struct sheepdog_node_list_entry);
+	rsp->nr_nodes = count;
+	rsp->local_idx = get_node_idx(&cluster->this_node, data, count);
+
+	if (!list_empty(&cluster->node_list)) {
+		first = list_first_entry(&cluster->node_list, struct node, list);
+		rsp->master_idx = get_node_idx(&first->ent, data, count);
+	} else {
+		rsp->master_idx = -1;
+	}
+}
+
+/*
+ * Answer SD_OP_GET_VM_LIST: copy every entry of cluster->vm_list into data
+ * and record the resulting payload length in rsp.
+ */
+static void get_vm_list(struct cluster_info *cluster, struct sd_rsp *rsp,
+			void *data)
+{
+	struct sheepdog_vm_list_entry *out = data;
+	struct vm *vm;
+	int count = 0;
+
+	list_for_each_entry(vm, &cluster->vm_list, list)
+		out[count++] = vm->ent;
+
+	rsp->data_length = count * sizeof(struct sheepdog_vm_list_entry);
+}
+
+/*
+ * Work handler for a client request that needs cluster state.
+ *
+ * SD_OP_GET_NODE_LIST and SD_OP_GET_VM_LIST are answered directly from the
+ * local lists.  Every other opcode is wrapped in an SD_MSG_VDI_OP message
+ * and multicast to the group; the request is parked on
+ * cluster->pending_list and completed by vdi_op_done() once the processed
+ * message is delivered back to this node.
+ *
+ * idx is not used.
+ */
+void cluster_queue_request(struct work *work, int idx)
+{
+	struct request *req = container_of(work, struct request, work);
+	struct sd_req *hdr = (struct sd_req *)&req->rq;
+	struct sd_rsp *rsp = (struct sd_rsp *)&req->rp;
+	struct cluster_info *cluster = req->ci->cluster;
+	struct vdi_op_message *msg;
+	int ret = SD_RES_SUCCESS;
+
+	eprintf("%p %x\n", req, hdr->opcode);
+
+	switch (hdr->opcode) {
+	case SD_OP_GET_NODE_LIST:
+		get_node_list(cluster, (struct sd_node_req *)hdr,
+			      (struct sd_node_rsp *)rsp, req->data);
+		break;
+	case SD_OP_GET_VM_LIST:
+		get_vm_list(cluster, rsp, req->data);
+		break;
+	default:
+		/* forward request to group */
+		goto forward;
+	}
+
+	rsp->result = ret;
+	return;
+
+forward:
+	msg = zalloc(sizeof(*msg) + hdr->data_length);
+	if (!msg) {
+		eprintf("out of memory\n");
+		/* do not leave a stale result in the response header */
+		rsp->result = SD_RES_SYSTEM_ERROR;
+		return;
+	}
+
+	msg->header.op = SD_MSG_VDI_OP;
+	msg->header.done = 0;
+	msg->header.msg_length = sizeof(*msg) + hdr->data_length;
+	msg->header.from = cluster->this_node;
+	msg->req = *((struct sd_vdi_req *)&req->rq);
+	msg->rsp = *((struct sd_vdi_rsp *)&req->rp);
+	if (hdr->flags & SD_FLAG_CMD_WRITE)
+		memcpy(msg->data, req->data, hdr->data_length);
+
+	/*
+	 * Messages are delivered in the order they were sent and
+	 * vdi_op_done() completes the FIRST pending request, so requests
+	 * must be queued FIFO.  list_add() would put the newest request at
+	 * the head and hand replies to the wrong request whenever more than
+	 * one is outstanding.
+	 */
+	list_add_tail(&req->pending_list, &cluster->pending_list);
+
+	if (send_message(cluster->handle, (struct message_header *)msg) < 0) {
+		/*
+		 * No reply will ever arrive for this request; unlink it so
+		 * that it cannot swallow the reply of a later request.
+		 * NOTE(review): how a request that fails here is reported
+		 * back to the client is decided outside this file - confirm.
+		 */
+		list_del(&req->pending_list);
+		rsp->result = SD_RES_SYSTEM_ERROR;
+	}
+
+	free(msg);
+}
+
+/*
+ * Return the vm on the list whose entry name equals name, or NULL when no
+ * entry matches.
+ */
+static struct vm *lookup_vm(struct list_head *entries, char *name)
+{
+	struct vm *cur;
+
+	list_for_each_entry(cur, entries, list) {
+		if (strcmp((char *)cur->ent.name, name) == 0)
+			return cur;
+	}
+
+	return NULL;
+}
+
+/*
+ * Event callback registered for the cpg file descriptor: dispatch every
+ * pending cpg callback.  listen_fd and events are not used.
+ */
+static void group_handler(int listen_fd, int events, void *data)
+{
+	struct cluster_info *cluster = data;
+
+	cpg_dispatch(cluster->handle, CPG_DISPATCH_ALL);
+}
+
+/*
+ * Debug-print every node on ci->node_list.  The line for the local node is
+ * prefixed with 'l', all others with a blank.
+ */
+static void print_node_list(struct cluster_info *ci)
+{
+	struct node *n;
+
+	list_for_each_entry(n, &ci->node_list, list) {
+		dprintf("%c nodeid: %x, pid: %d, ip: %d.%d.%d.%d:%d\n",
+			node_cmp(&n->ent, &ci->this_node) == 0 ? 'l' : ' ',
+			n->nodeid, n->pid,
+			n->ent.addr[12], n->ent.addr[13],
+			n->ent.addr[14], n->ent.addr[15], n->ent.port);
+	}
+}
+
+/*
+ * Append a new member to the tail of ci->node_list and bump the epoch.
+ * On allocation failure an error is logged and nothing is changed.
+ */
+static void add_node(struct cluster_info *ci, uint32_t nodeid, uint32_t pid,
+		     struct sheepdog_node_list_entry *sd_ent)
+{
+	struct node *n = zalloc(sizeof(*n));
+
+	if (n == NULL) {
+		eprintf("out of memory\n");
+		return;
+	}
+
+	n->ent = *sd_ent;
+	n->pid = pid;
+	n->nodeid = nodeid;
+
+	list_add_tail(&n->list, &ci->node_list);
+	ci->epoch++;
+}
+
+/*
+ * Return 1 when this node acts as master, 0 otherwise.  An unsynchronized
+ * node is never master; a synchronized node is master when the node list
+ * is empty or when its first entry is the local node.
+ */
+static int is_master(struct cluster_info *ci)
+{
+	struct node *first;
+
+	if (!ci->synchronized)
+		return 0;
+
+	if (list_empty(&ci->node_list))
+		return 1;
+
+	first = list_first_entry(&ci->node_list, struct node, list);
+
+	return node_cmp(&first->ent, &ci->this_node) == 0;
+}
+
+/*
+ * Master-side handling of a join request: store the current epoch and a
+ * snapshot of the node list in msg so that the joining node can adopt it.
+ * Does nothing unless this node is synchronized and master.
+ */
+static void join(struct cluster_info *ci, struct join_message *msg)
+{
+	struct node *node;
+
+	if (!ci->synchronized)
+		return;
+
+	if (!is_master(ci))
+		return;
+
+	msg->epoch = ci->epoch;
+
+	/*
+	 * The sender does not initialise nr_nodes, so start from zero
+	 * instead of trusting the received value as an index into nodes[].
+	 */
+	msg->nr_nodes = 0;
+	list_for_each_entry(node, &ci->node_list, list) {
+		if (msg->nr_nodes >= SD_MAX_NODES) {
+			eprintf("too many nodes\n");
+			break;
+		}
+		msg->nodes[msg->nr_nodes].nodeid = node->nodeid;
+		msg->nodes[msg->nr_nodes].pid = node->pid;
+		msg->nodes[msg->nr_nodes].ent = node->ent;
+		msg->nr_nodes++;
+	}
+}
+
+/*
+ * Handle a join message that the master has already processed.
+ *
+ * A node that is not yet synchronized replaces its node list and epoch
+ * with the snapshot carried in msg and becomes synchronized.  In every
+ * case the joining node itself is then appended to the list.
+ */
+static void update_cluster_info(struct cluster_info *ci,
+				struct join_message *msg)
+{
+	uint32_t i;
+	struct node *node, *e;
+
+	if (!ci->synchronized) {
+		/* nr_nodes comes off the wire; never index past nodes[] */
+		if (msg->nr_nodes > SD_MAX_NODES) {
+			eprintf("invalid join message, nr_nodes %u\n",
+				(unsigned int)msg->nr_nodes);
+			return;
+		}
+
+		list_for_each_entry_safe(node, e, &ci->node_list, list) {
+			list_del(&node->list);
+			free(node);
+		}
+
+		INIT_LIST_HEAD(&ci->node_list);
+		for (i = 0; i < msg->nr_nodes; i++)
+			add_node(ci, msg->nodes[i].nodeid, msg->nodes[i].pid,
+				 &msg->nodes[i].ent);
+
+		/* add_node() bumped the epoch; the master's value wins */
+		ci->epoch = msg->epoch;
+		ci->synchronized = 1;
+	}
+
+	add_node(ci, msg->nodeid, msg->pid, &msg->header.from);
+	print_node_list(ci);
+}
+
+/*
+ * Master-side execution of a forwarded VDI request.  The outcome is
+ * written to msg->rsp (oid and result); the caller re-broadcasts the
+ * message afterwards.
+ */
+static void vdi_op(struct cluster_info *ci, struct vdi_op_message *msg)
+{
+	const struct sd_vdi_req *hdr = &msg->req;
+	struct sd_vdi_rsp *rsp = &msg->rsp;
+	void *data = msg->data;
+	int ret = SD_RES_SUCCESS, is_current;
+	uint64_t oid = 0;
+	struct sheepdog_super_block *sb;
+	struct timeval tv;
+	struct sheepdog_node_list_entry entries[SD_MAX_NODES];
+	int nr_nodes;
+
+	switch (hdr->opcode) {
+	case SD_OP_NEW_VDI:
+		ret = add_vdi(ci, data, strlen(data), hdr->vdi_size, &oid,
+			      hdr->base_oid, hdr->tag);
+		break;
+	case SD_OP_LOCK_VDI:
+	case SD_OP_GET_VDI_INFO:
+		ret = lookup_vdi(ci, data, &oid, hdr->tag, 1, &is_current);
+		if (ret < 0)
+			break;
+		if (is_current)
+			rsp->flags = SD_VDI_RSP_FLAG_CURRENT;
+		break;
+	case SD_OP_RELEASE_VDI:
+		break;
+	case SD_OP_MAKE_FS:
+		sb = zalloc(sizeof(*sb));
+		if (!sb) {
+			ret = -1;
+			break;
+		}
+		gettimeofday(&tv, NULL);
+		/* seconds in the upper 32 bits, nanoseconds in the lower */
+		sb->ctime = (uint64_t) tv.tv_sec << 32 | tv.tv_usec * 1000;
+		sb->default_nr_copies = 3;
+
+		nr_nodes = build_node_list(&ci->node_list, entries);
+		ret = write_object(entries, nr_nodes, ci->epoch,
+				   SD_DIR_OID, (char *)sb, sizeof(*sb), 0,
+				   sb->default_nr_copies, 1);
+		/* the super block is only a staging buffer; do not leak it */
+		free(sb);
+		break;
+	case SD_OP_UPDATE_EPOCH:
+		break;
+	default:
+		ret = SD_RES_SYSTEM_ERROR;
+		eprintf("opcode %d is not implemented\n", hdr->opcode);
+		break;
+	}
+
+	rsp->oid = oid;
+	rsp->result = ret;
+}
+
+/*
+ * Handle a VDI message that the master has already processed.
+ *
+ * Every node applies the lock/release bookkeeping to its vm_list.  The
+ * node that originated the request additionally copies the response into
+ * the first request on its pending list and completes that request.
+ */
+static void vdi_op_done(struct cluster_info *ci, struct vdi_op_message *msg)
+{
+	const struct sd_vdi_req *hdr = &msg->req;
+	struct sd_vdi_rsp *rsp = &msg->rsp;
+	void *data = msg->data;
+	struct vm *vm;
+	struct request *req;
+	int ret = msg->rsp.result;
+
+	switch (hdr->opcode) {
+	case SD_OP_NEW_VDI:
+		break;
+	case SD_OP_LOCK_VDI:
+		/* the master's lookup failed: there is nothing to lock */
+		if (ret != SD_RES_SUCCESS)
+			break;
+
+		if (lookup_vm(&ci->vm_list, (char *)data)) {
+			ret = SD_RES_VDI_LOCKED;
+			break;
+		}
+
+		vm = zalloc(sizeof(*vm));
+		if (!vm) {
+			ret = SD_RES_UNKNOWN;
+			break;
+		}
+		/* the name arrives off the wire; refuse one that cannot fit */
+		if (strlen((char *)data) >= sizeof(vm->ent.name)) {
+			eprintf("vdi name is too long\n");
+			free(vm);
+			ret = SD_RES_UNKNOWN;
+			break;
+		}
+		strcpy((char *)vm->ent.name, (char *)data);
+		memcpy(vm->ent.host_addr, msg->header.from.addr,
+		       sizeof(vm->ent.host_addr));
+		vm->ent.host_port = msg->header.from.port;
+
+		list_add(&vm->list, &ci->vm_list);
+		break;
+	case SD_OP_RELEASE_VDI:
+		vm = lookup_vm(&ci->vm_list, (char *)data);
+		if (!vm) {
+			ret = SD_RES_VDI_NOT_LOCKED;
+			break;
+		}
+
+		list_del(&vm->list);
+		free(vm);
+		break;
+	case SD_OP_GET_VDI_INFO:
+		break;
+	case SD_OP_UPDATE_EPOCH:
+		break;
+	case SD_OP_MAKE_FS:
+		break;
+	default:
+		eprintf("unknown operation %d\n", hdr->opcode);
+		ret = SD_RES_UNKNOWN;
+	}
+
+	/* only the originating node has a request waiting for this reply */
+	if (node_cmp(&ci->this_node, &msg->header.from) != 0)
+		return;
+
+	/* list_first_entry() on an empty list yields a bogus pointer */
+	if (list_empty(&ci->pending_list)) {
+		eprintf("no pending request for operation %d\n", hdr->opcode);
+		return;
+	}
+
+	req = list_first_entry(&ci->pending_list, struct request, pending_list);
+
+	rsp->result = ret;
+	memcpy(req->data, data, rsp->data_length);
+	memcpy(&req->rp, rsp, sizeof(req->rp));
+	list_del(&req->pending_list);
+	req->done(req);
+}
+
+/*
+ * Work function for a delivered message.
+ *
+ * A message that is not yet done is handled by the master only: it runs
+ * the operation, marks the message done and multicasts it again.  A done
+ * message is applied by every node.  idx is not used.
+ */
+static void __sd_deliver(struct work *work, int idx)
+{
+	struct work_deliver *dw = container_of(work, struct work_deliver, work);
+	struct cluster_info *ci = dw->ci;
+	struct message_header *m = dw->msg;
+
+	dprintf("op: %d, done: %d, size: %d, from: %d.%d.%d.%d:%d\n",
+		m->op, m->done, m->msg_length,
+		m->from.addr[12], m->from.addr[13],
+		m->from.addr[14], m->from.addr[15], m->from.port);
+
+	if (m->done) {
+		/* second phase: everybody applies the master's result */
+		switch (m->op) {
+		case SD_MSG_JOIN:
+			update_cluster_info(ci, (struct join_message *)m);
+			break;
+		case SD_MSG_VDI_OP:
+			vdi_op_done(ci, (struct vdi_op_message *)m);
+			break;
+		default:
+			eprintf("unknown message %d\n", m->op);
+			break;
+		}
+		return;
+	}
+
+	/* first phase: only the master acts on the request */
+	if (!is_master(ci))
+		return;
+
+	switch (m->op) {
+	case SD_MSG_JOIN:
+		join(ci, (struct join_message *)m);
+		break;
+	case SD_MSG_VDI_OP:
+		vdi_op(ci, (struct vdi_op_message *)m);
+		break;
+	default:
+		eprintf("unknown message %d\n", m->op);
+		break;
+	}
+
+	m->done = 1;
+	send_message(ci->handle, m);
+}
+
+/*
+ * Completion callback of a deliver work item: release the message copy
+ * and the work item itself.  idx is not used.
+ */
+static void __sd_deliver_done(struct work *work, int idx)
+{
+	struct work_deliver *dw = container_of(work, struct work_deliver, work);
+	struct message_header *copy = dw->msg;
+
+	free(copy);
+	free(dw);
+}
+
+/*
+ * cpg deliver callback.  Copies the message to the heap and queues a work
+ * item for it, because msg is only borrowed for the duration of the
+ * callback.  The message is dropped when memory cannot be allocated.
+ */
+static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name,
+		       uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
+{
+	struct work_deliver *w;
+	struct message_header *m = msg;
+	struct cluster_info *ci;
+
+	dprintf("op: %d, done: %d, size: %d, from: %d.%d.%d.%d:%d\n",
+		m->op, m->done, m->msg_length,
+		m->from.addr[12], m->from.addr[13],
+		m->from.addr[14], m->from.addr[15], m->from.port);
+	cpg_context_get(handle, (void **)&ci);
+
+	w = zalloc(sizeof(*w));
+	if (!w)
+		return;
+
+	w->msg = zalloc(msg_len);
+	if (!w->msg) {
+		/* the work item was never queued, so nobody else frees it */
+		free(w);
+		return;
+	}
+	memcpy(w->msg, msg, msg_len);
+
+	w->ci = ci;
+
+	w->work.fn = __sd_deliver;
+	w->work.done = __sd_deliver_done;
+
+	queue_work(&w->work);
+}
+
+/*
+ * Work function for a cpg configuration change.
+ *
+ *  - Marks this node synchronized when the membership arithmetic below
+ *    holds and the first member is this node.
+ *  - Removes every node named in the left list and bumps the epoch for
+ *    each removal.
+ *  - Sends a join request when this node is among the joined members.
+ *
+ * idx is not used.
+ */
+static void __sd_confch(struct work *work, int idx)
+{
+	struct work_confch *w = container_of(work, struct work_confch, work);
+	struct cluster_info *ci = w->ci;
+	struct node *node, *e;
+	size_t i;
+
+	const struct cpg_address *member_list = w->member_list;
+	size_t member_list_entries = w->member_list_entries;
+	const struct cpg_address *left_list = w->left_list;
+	size_t left_list_entries = w->left_list_entries;
+	const struct cpg_address *joined_list = w->joined_list;
+	size_t joined_list_entries = w->joined_list_entries;
+
+	/* member_list[0] may only be read when the list is not empty */
+	if (member_list_entries > 0 &&
+	    member_list_entries == joined_list_entries - left_list_entries &&
+	    ci->this_nodeid == member_list[0].nodeid &&
+	    ci->this_pid == member_list[0].pid)
+		ci->synchronized = 1;
+
+	for (i = 0; i < left_list_entries; i++) {
+		list_for_each_entry_safe(node, e, &ci->node_list, list) {
+			if (node->nodeid != left_list[i].nodeid ||
+			    node->pid != left_list[i].pid)
+				continue;
+
+			list_del(&node->list);
+			free(node);
+			ci->epoch++;
+		}
+	}
+
+	for (i = 0; i < joined_list_entries; i++) {
+		struct join_message msg;
+
+		/*
+		 * Compare entry i, not entry 0: otherwise a node that is
+		 * not first in the joined list never announces itself, and
+		 * a node that is first announces itself once per entry.
+		 */
+		if (ci->this_nodeid != joined_list[i].nodeid ||
+		    ci->this_pid != joined_list[i].pid)
+			continue;
+
+		/* never multicast uninitialised stack bytes */
+		memset(&msg, 0, sizeof(msg));
+		msg.header.op = SD_MSG_JOIN;
+		msg.header.done = 0;
+		msg.header.msg_length = sizeof(msg);
+		msg.header.from = ci->this_node;
+		msg.nodeid = ci->this_nodeid;
+		msg.pid = ci->this_pid;
+
+		send_message(ci->handle, (struct message_header *)&msg);
+		break;
+	}
+
+	if (left_list_entries != 0)
+		print_node_list(ci);
+}
+
+/*
+ * Completion callback of a configuration-change work item: release the
+ * three list copies and the work item itself.  idx is not used.
+ */
+static void __sd_confch_done(struct work *work, int idx)
+{
+	struct work_confch *cw = container_of(work, struct work_confch, work);
+
+	free(cw->joined_list);
+	free(cw->left_list);
+	free(cw->member_list);
+	free(cw);
+}
+
+/*
+ * cpg configuration-change callback.  Copies the member, left and joined
+ * lists to the heap and queues a work item for them, because the lists
+ * are only borrowed for the duration of the callback.  The change is
+ * dropped when memory cannot be allocated.
+ */
+static void sd_confch(cpg_handle_t handle, const struct cpg_name *group_name,
+		      const struct cpg_address *member_list,
+		      size_t member_list_entries,
+		      const struct cpg_address *left_list,
+		      size_t left_list_entries,
+		      const struct cpg_address *joined_list,
+		      size_t joined_list_entries)
+{
+	struct work_confch *w = NULL;
+	struct cluster_info *ci;
+	int i, size;
+
+	dprintf("confchg nodeid %x\n", member_list[0].nodeid);
+	dprintf("%zd %zd %zd\n", member_list_entries, left_list_entries,
+		joined_list_entries);
+	for (i = 0; i < member_list_entries; i++) {
+		dprintf("[%d] node_id: %d, pid: %d, reason: %d\n", i,
+			member_list[i].nodeid, member_list[i].pid,
+			member_list[i].reason);
+	}
+
+	cpg_context_get(handle, (void **)&ci);
+
+	w = zalloc(sizeof(*w));
+	if (!w)
+		return;
+
+	size = sizeof(struct cpg_address) * member_list_entries;
+	w->member_list = zalloc(size);
+	if (!w->member_list)
+		goto err;
+	memcpy(w->member_list, member_list, size);
+	w->member_list_entries = member_list_entries;
+
+	size = sizeof(struct cpg_address) * left_list_entries;
+	w->left_list = zalloc(size);
+	if (!w->left_list)
+		goto err;
+	memcpy(w->left_list, left_list, size);
+	w->left_list_entries = left_list_entries;
+
+	size = sizeof(struct cpg_address) * joined_list_entries;
+	w->joined_list = zalloc(size);
+	if (!w->joined_list)
+		goto err;
+	memcpy(w->joined_list, joined_list, size);
+	w->joined_list_entries = joined_list_entries;
+
+	w->ci = ci;
+
+	w->work.fn = __sd_confch;
+	w->work.done = __sd_confch_done;
+
+	queue_work(&w->work);
+
+	return;
+err:
+	/*
+	 * w is never NULL here.  Lists that were not allocated are still
+	 * NULL from zalloc(), and free(NULL) is a no-op.  The work item was
+	 * never queued, so it has to be released here as well.
+	 */
+	free(w->member_list);
+	free(w->left_list);
+	free(w->joined_list);
+	free(w);
+}
+
+/*
+ * Count the nodes on node_list.  When entries is not NULL, also copy each
+ * node's entry into it and sort the result with node_cmp.
+ * Returns the number of nodes.
+ */
+int build_node_list(struct list_head *node_list,
+		    struct sheepdog_node_list_entry *entries)
+{
+	struct node *n;
+	int count = 0;
+
+	if (!entries) {
+		/* the caller only wants the count */
+		list_for_each_entry(n, node_list, list)
+			count++;
+		return count;
+	}
+
+	list_for_each_entry(n, node_list, list)
+		entries[count++] = n->ent;
+
+	qsort(entries, count, sizeof(*entries), node_cmp);
+
+	return count;
+}
+
+/*
+ * Join the "sheepdog" cpg group and build the local cluster context.
+ *
+ * The local node entry is made of the address the host name resolves to
+ * (IPv4 is stored in the last four bytes of addr), the given port, and an
+ * id that is the SHA-1 of address and port.  The cpg file descriptor is
+ * registered with the event loop.
+ *
+ * Returns the new context, or NULL when memory is exhausted or cpg cannot
+ * be initialised.  Every later failure terminates the process.
+ */
+struct cluster_info *create_cluster(int port)
+{
+	int fd, ret;
+	cpg_handle_t cpg_handle;
+	struct cluster_info *ci;
+	struct addrinfo hints, *res;
+	char name[INET6_ADDRSTRLEN];
+	SHA_CTX ctx;
+	struct cpg_name group = { 8, "sheepdog" };
+	cpg_callbacks_t cb = { &sd_deliver, &sd_confch };
+	unsigned int nodeid = 0;
+
+	ci = zalloc(sizeof(*ci));
+	if (!ci)
+		return NULL;
+
+	ret = cpg_initialize(&cpg_handle, &cb);
+	if (ret != CS_OK) {
+		eprintf("Failed to initialize cpg, %d\n", ret);
+		eprintf("Is corosync running?\n");
+		/* the context was never published; do not leak it */
+		free(ci);
+		return NULL;
+	}
+
+join_retry:
+	ret = cpg_join(cpg_handle, &group);
+	switch (ret) {
+	case CS_OK:
+		break;
+	case CS_ERR_TRY_AGAIN:
+		dprintf("Failed to join the sheepdog group, try again\n");
+		sleep(1);
+		goto join_retry;
+	case CS_ERR_SECURITY:
+		eprintf("Permission error.\n");
+		exit(1);
+	default:
+		eprintf("Failed to join the sheepdog group, %d\n", ret);
+		exit(1);
+		break;
+	}
+
+	ret = cpg_local_get(cpg_handle, &nodeid);
+	if (ret != CS_OK) {
+		eprintf("Failed to get the local node's identifier, %d\n", ret);
+		exit(1);
+	}
+
+	ci->handle = cpg_handle;
+	ci->this_nodeid = nodeid;
+	ci->this_pid = getpid();
+
+	memset(&ci->this_node, 0, sizeof(ci->this_node));
+
+	if (gethostname(name, sizeof(name)) < 0) {
+		eprintf("Failed to get the host name\n");
+		exit(1);
+	}
+	/* gethostname() may leave a truncated name unterminated */
+	name[sizeof(name) - 1] = '\0';
+
+	memset(&hints, 0, sizeof(hints));
+
+	hints.ai_socktype = SOCK_STREAM;
+	ret = getaddrinfo(name, NULL, &hints, &res);
+	if (ret) {
+		eprintf("Failed to resolve %s, %s\n", name, gai_strerror(ret));
+		exit(1);
+	}
+
+	if (res->ai_family == AF_INET) {
+		struct sockaddr_in *addr = (struct sockaddr_in *)res->ai_addr;
+		memset(ci->this_node.addr, 0, sizeof(ci->this_node.addr));
+		memcpy(ci->this_node.addr + 12, &addr->sin_addr, 4);
+	} else if (res->ai_family == AF_INET6) {
+		struct sockaddr_in6 *addr = (struct sockaddr_in6 *)res->ai_addr;
+		memcpy(ci->this_node.addr, &addr->sin6_addr, 16);
+	} else {
+		eprintf("unknown address family\n");
+		exit(1);
+	}
+
+	freeaddrinfo(res);
+
+	ci->this_node.port = port;
+
+	SHA1_Init(&ctx);
+	SHA1_Update(&ctx, ci->this_node.addr, sizeof(ci->this_node.addr));
+	SHA1_Update(&ctx, &ci->this_node.port, sizeof(ci->this_node.port));
+	SHA1_Final(ci->this_node.id, &ctx);
+
+	ci->synchronized = 0;
+	INIT_LIST_HEAD(&ci->node_list);
+	INIT_LIST_HEAD(&ci->vm_list);
+	INIT_LIST_HEAD(&ci->pending_list);
+	cpg_context_set(cpg_handle, ci);
+
+	/* on failure fd would be registered uninitialised */
+	ret = cpg_fd_get(cpg_handle, &fd);
+	if (ret != CS_OK) {
+		eprintf("Failed to get the cpg file descriptor, %d\n", ret);
+		exit(1);
+	}
+	register_event(fd, group_handler, ci);
+	return ci;
+}
diff --git a/collie/vdi.c b/collie/vdi.c
new file mode 100644
index 0000000..184a22e
--- /dev/null
+++ b/collie/vdi.c
@@ -0,0 +1,259 @@
+/*
+ * Copyright (C) 2009 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/time.h>
+#include <openssl/sha.h>
+
+#include "sheepdog_proto.h"
+#include "meta.h"
+#include "collie.h"
+
+/*
+ * Return non-zero when the directory entry is in use (name_len != 0) and
+ * its name equals the len bytes at name; return 0 otherwise.
+ */
+static int sheepdog_match(struct sheepdog_dir_entry *ent, char *name, int len)
+{
+	if (ent->name_len == 0 || ent->name_len != len)
+		return 0;
+
+	return memcmp(ent->name, name, len) == 0;
+}
+
+/*
+ * Write the inode object of a new VDI.
+ *
+ * With a non-zero base_oid the new inode inherits the base's data object
+ * table, and the base inode is rewritten with the new oid stored in its
+ * first free child slot.
+ *
+ * Returns the result of the final write_object() on success, otherwise
+ * one of SD_RES_BASE_VDI_READ, SD_RES_NO_BASE_VDI, SD_RES_BASE_VDI_WRITE
+ * or SD_RES_VDI_WRITE.
+ *
+ * TODO: should be performed atomically
+ */
+static int create_inode_obj(struct sheepdog_node_list_entry *entries,
+			    int nr_nodes, uint64_t epoch, int copies,
+			    uint64_t oid, uint64_t size, uint64_t base_oid)
+{
+	struct sheepdog_inode inode, base;
+	struct timeval now;
+	int rc, slot;
+
+	if (base_oid) {
+		rc = read_object(entries, nr_nodes, epoch,
+				 base_oid, (char *)&base, sizeof(base), 0,
+				 copies);
+		if (rc < 0)
+			return SD_RES_BASE_VDI_READ;
+	}
+
+	gettimeofday(&now, NULL);
+
+	memset(&inode, 0, sizeof(inode));
+	inode.oid = oid;
+	inode.vdi_size = size;
+	inode.block_size = SD_DATA_OBJ_SIZE;
+	inode.ctime = (uint64_t) now.tv_sec << 32 | now.tv_usec * 1000;
+	inode.nr_copies = copies;
+
+	if (base_oid) {
+		eprintf("%zd %zd\n", sizeof(inode.data_oid),
+			ARRAY_SIZE(base.child_oid));
+		inode.parent_oid = base_oid;
+		memcpy(inode.data_oid, base.data_oid,
+		       MAX_DATA_OBJS * sizeof(uint64_t));
+
+		/* look for the first unused child slot of the base */
+		for (slot = 0; slot < ARRAY_SIZE(base.child_oid); slot++) {
+			if (!base.child_oid[slot])
+				break;
+		}
+
+		if (slot == ARRAY_SIZE(base.child_oid))
+			return SD_RES_NO_BASE_VDI;
+
+		base.child_oid[slot] = oid;
+
+		rc = write_object(entries, nr_nodes,
+				  epoch, base_oid, (char *)&base,
+				  sizeof(base), 0, copies, 0);
+		if (rc < 0)
+			return SD_RES_BASE_VDI_WRITE;
+	}
+
+	rc = write_object(entries, nr_nodes, epoch,
+			  oid, (char *)&inode, sizeof(inode), 0, copies, 1);
+	if (rc < 0)
+		return SD_RES_VDI_WRITE;
+
+	return rc;
+}
+
+#define DIR_BUF_LEN (UINT64_C(1) << 20)
+
+/*
+ * Create a new VDI: read the directory object, pick the next oid, write
+ * the inode object and append a directory entry for it.
+ *
+ * A name that already exists is rejected with SD_RES_VDI_EXIST unless tag
+ * is non-zero; with a tag, older entries of the same name lose their
+ * FLAG_CURRENT bit.  On success *added_oid receives the new oid and 0 is
+ * returned.  Returns 1 when the directory buffer cannot be allocated.
+ *
+ * TODO: handle larger buffer
+ */
+int add_vdi(struct cluster_info *cluster, char *name, int len, uint64_t size,
+	    uint64_t *added_oid, uint64_t base_oid, uint32_t tag)
+{
+	struct sheepdog_node_list_entry entries[SD_MAX_NODES];
+	struct sheepdog_dir_entry *ent, *prev, *scan;
+	struct sheepdog_super_block *sb;
+	uint64_t oid = 0;
+	char *buf;
+	int nr_nodes, copies, rest, ret;
+
+	nr_nodes = build_node_list(&cluster->node_list, entries);
+
+	eprintf("%s (%d) %" PRIu64 ", base: %" PRIu64 "\n", name, len, size,
+		base_oid);
+
+	buf = zalloc(DIR_BUF_LEN);
+	if (!buf)
+		return 1;
+
+	/* first read: the super block, for the default number of copies */
+	ret = read_object(entries, nr_nodes, cluster->epoch,
+			  SD_DIR_OID, buf, DIR_BUF_LEN, 0, nr_nodes);
+	if (ret < 0) {
+		ret = SD_RES_DIR_READ;
+		goto out;
+	}
+
+	sb = (struct sheepdog_super_block *)buf;
+	copies = sb->default_nr_copies;
+
+	/* second read: the directory entries that follow the super block */
+	ret = read_object(entries, nr_nodes, cluster->epoch,
+			  SD_DIR_OID, buf, DIR_BUF_LEN, sizeof(*sb), nr_nodes);
+	if (ret < 0) {
+		ret = SD_RES_DIR_READ;
+		goto out;
+	}
+
+	/* walk to the first unused entry, remembering the last oid seen */
+	ent = (struct sheepdog_dir_entry *)buf;
+	for (rest = ret; rest > 0; rest -= ((char *)ent - (char *)prev)) {
+		if (!ent->name_len)
+			break;
+
+		if (sheepdog_match(ent, name, len) && !tag) {
+			ret = SD_RES_VDI_EXIST;
+			goto out;
+		}
+		oid = ent->oid;
+		prev = ent;
+		ent = next_entry(prev);
+	}
+
+	/* need to check if the buffer is large enough here. */
+	oid += (1 << 18);
+
+	ret = create_inode_obj(entries, nr_nodes, cluster->epoch, copies,
+			       oid, size, base_oid);
+	if (ret)
+		goto out;
+
+	ent->oid = oid;
+	ent->tag = tag;
+
+	ent->flags = FLAG_CURRENT;
+	ent->name_len = len;
+	memcpy(ent->name, name, len);
+
+	if (tag) {
+		/* the new entry is now the current one for this name */
+		for (scan = (struct sheepdog_dir_entry *)buf; scan < ent;
+		     scan = next_entry(scan)) {
+			if (sheepdog_match(scan, name, len))
+				scan->flags &= ~FLAG_CURRENT;
+		}
+	}
+
+	ent = next_entry(ent);
+
+	ret = write_object(entries, nr_nodes, cluster->epoch,
+			   SD_DIR_OID, buf, (char *)ent - buf, sizeof(*sb),
+			   copies, 0);
+	if (ret) {
+		ret = SD_RES_DIR_WRITE;
+		goto out;
+	}
+
+	*added_oid = oid;
+out:
+	free(buf);
+
+	return ret;
+}
+
+int del_vdi(struct cluster_info *cluster, char *name, int len)
+{
+ return 0; /* stub: nothing is deleted; all arguments are ignored and 0 is always returned */
+}
+
+/*
+ * Look up a VDI by name in the directory object.
+ *
+ * An entry matches when its tag equals tag, or when tag is -1 and the
+ * entry carries FLAG_CURRENT.  On a match *oid receives the entry's oid,
+ * *current is set to 1 if the entry is the current one, and 0 is
+ * returned.  Otherwise the result is SD_RES_NO_VDI, SD_RES_NO_TAG,
+ * SD_RES_SYSTEM_ERROR or SD_RES_DIR_READ; 1 is returned when the
+ * directory buffer cannot be allocated.  do_lock is not used.
+ */
+int lookup_vdi(struct cluster_info *cluster,
+	       char *filename, uint64_t * oid, uint32_t tag, int do_lock,
+	       int *current)
+{
+	struct sheepdog_node_list_entry entries[SD_MAX_NODES];
+	struct sheepdog_dir_entry *ent, *prev;
+	size_t name_len;
+	char *buf;
+	int nr_nodes, rest, ret;
+
+	nr_nodes = build_node_list(&cluster->node_list, entries);
+
+	*current = 0;
+	buf = zalloc(DIR_BUF_LEN);
+	if (!buf)
+		return 1;
+
+	ret = read_object(entries, nr_nodes, cluster->epoch,
+			  SD_DIR_OID, buf, DIR_BUF_LEN,
+			  sizeof(struct sheepdog_super_block), nr_nodes);
+	if (ret < 0) {
+		ret = SD_RES_DIR_READ;
+		goto out;
+	}
+
+	name_len = strlen(filename);
+	eprintf("looking for %s %zd, %d\n", filename, name_len, ret);
+
+	ent = (struct sheepdog_dir_entry *)buf;
+	rest = ret;
+	ret = SD_RES_NO_VDI;
+	while (rest > 0 && ent->name_len) {
+		eprintf("%s %d %" PRIu64 "\n", ent->name, ent->name_len,
+			ent->oid);
+
+		if (sheepdog_match(ent, filename, name_len)) {
+			if (ent->tag != tag && tag != -1) {
+				ret = SD_RES_NO_TAG;
+			} else if (ent->tag != tag &&
+				   !(ent->flags & FLAG_CURRENT)) {
+				/* current vdi must exist */
+				ret = SD_RES_SYSTEM_ERROR;
+			} else {
+				*oid = ent->oid;
+				ret = 0;
+
+				if (ent->flags & FLAG_CURRENT)
+					*current = 1;
+				break;
+			}
+		}
+
+		/* no usable match: advance to the next directory entry */
+		prev = ent;
+		ent = next_entry(prev);
+		rest -= ((char *)ent - (char *)prev);
+	}
+out:
+	free(buf);
+	return ret;
+}
--
1.5.6.5
More information about the sheepdog
mailing list