This is originally a part of dog (puppy). Cluster communication and VDI manipulation are supported. Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp> --- collie/group.c | 761 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ collie/vdi.c | 259 +++++++++++++++++++ 2 files changed, 1020 insertions(+), 0 deletions(-) create mode 100644 collie/group.c create mode 100644 collie/vdi.c diff --git a/collie/group.c b/collie/group.c new file mode 100644 index 0000000..a718afd --- /dev/null +++ b/collie/group.c @@ -0,0 +1,761 @@ +/* + * Copyright (C) 2009 Nippon Telegraph and Telephone Corporation. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License version + * 2 as published by the Free Software Foundation. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <netdb.h> +#include <sys/time.h> +#include <corosync/cpg.h> + +#include "sheepdog_proto.h" +#include "collie.h" +#include "list.h" +#include "util.h" +#include "meta.h" +#include "logger.h" +#include "work.h" + +struct vm { + struct sheepdog_vm_list_entry ent; + struct list_head list; +}; + +struct node { + uint32_t nodeid; + uint32_t pid; + struct sheepdog_node_list_entry ent; + struct list_head list; +}; + +struct message_header { + uint8_t op; + uint8_t done; + uint8_t pad[2]; + uint32_t msg_length; + struct sheepdog_node_list_entry from; +}; + +struct join_message { + struct message_header header; + uint32_t nodeid; + uint32_t pid; + struct sheepdog_node_list_entry master_node; + uint32_t epoch; + uint32_t nr_nodes; + struct { + uint32_t nodeid; + uint32_t pid; + struct sheepdog_node_list_entry ent; + } nodes[SD_MAX_NODES]; +}; + +struct vdi_op_message { + struct message_header header; + struct sd_vdi_req req; + struct sd_vdi_rsp rsp; + uint8_t data[0]; +}; + +struct work_deliver { + struct message_header *msg; + + struct cluster_info *ci; + struct work work; +}; + +struct work_confch { + struct cpg_address *member_list; + size_t member_list_entries; + struct cpg_address *left_list; + size_t left_list_entries; + struct cpg_address *joined_list; + size_t joined_list_entries; + + struct cluster_info *ci; + struct work work; +}; + +static int node_cmp(const void *a, const void *b) +{ + const struct sheepdog_node_list_entry *node1 = a; + const struct sheepdog_node_list_entry *node2 = b; + return memcmp(node1->id, node2->id, sizeof(node1->id)); +} + +static int send_message(cpg_handle_t handle, struct message_header *msg) +{ + struct iovec iov; + int ret; + + iov.iov_base = msg; + iov.iov_len = msg->msg_length; +retry: + ret = cpg_mcast_joined(handle, CPG_TYPE_AGREED, &iov, 1); + switch (ret) { + case CS_OK: + break; + case CS_ERR_TRY_AGAIN: + dprintf("failed to send message. try again\n"); + sleep(1); + goto retry; + default: + eprintf("failed to send message, %d\n", ret); + return -1; + } + return 0; +} + + +static int get_node_idx(struct sheepdog_node_list_entry *ent, + struct sheepdog_node_list_entry *entries, int nr_nodes) +{ + ent = bsearch(ent, entries, nr_nodes, sizeof(*ent), node_cmp); + if (!ent) + return -1; + + return ent - entries; +} + +static void get_node_list(struct cluster_info *cluster, struct sd_node_req *req, + struct sd_node_rsp *rsp, void *data) +{ + int nr_nodes; + struct node *node; + + nr_nodes = build_node_list(&cluster->node_list, data); + rsp->data_length = nr_nodes * sizeof(struct sheepdog_node_list_entry); + rsp->nr_nodes = nr_nodes; + rsp->local_idx = get_node_idx(&cluster->this_node, data, nr_nodes); + + if (list_empty(&cluster->node_list)) { + rsp->master_idx = -1; + return; + } + node = list_first_entry(&cluster->node_list, struct node, list); + rsp->master_idx = get_node_idx(&node->ent, data, nr_nodes); +} + +static void get_vm_list(struct cluster_info *cluster, struct sd_rsp *rsp, + void *data) +{ + int nr_vms; + struct vm *vm; + + struct sheepdog_vm_list_entry *p = data; + list_for_each_entry(vm, &cluster->vm_list, list) { + *p++ = vm->ent; + } + + nr_vms = p - (struct sheepdog_vm_list_entry *)data; + rsp->data_length = nr_vms * sizeof(struct sheepdog_vm_list_entry); +} + +void cluster_queue_request(struct work *work, int idx) +{ + struct request *req = container_of(work, struct request, work); + struct sd_req *hdr = (struct sd_req *)&req->rq; + struct sd_rsp *rsp = (struct sd_rsp *)&req->rp; + struct cluster_info *cluster = req->ci->cluster; + struct vdi_op_message *msg; + int ret = SD_RES_SUCCESS; + + eprintf("%p %x\n", req, hdr->opcode); + + switch (hdr->opcode) { + case SD_OP_GET_NODE_LIST: + get_node_list(cluster, (struct sd_node_req *)hdr, + (struct sd_node_rsp *)rsp, req->data); + break; + case SD_OP_GET_VM_LIST: + get_vm_list(cluster, rsp, req->data); + break; + default: + /* forward request to group */ + goto forward; + } + + rsp->result = ret; + return; + +forward: + msg = zalloc(sizeof(*msg) + hdr->data_length); + if (!msg) { + eprintf("out of memory\n"); + return; + } + + msg->header.op = SD_MSG_VDI_OP; + msg->header.done = 0; + msg->header.msg_length = sizeof(*msg) + hdr->data_length; + msg->header.from = cluster->this_node; + msg->req = *((struct sd_vdi_req *)&req->rq); + msg->rsp = *((struct sd_vdi_rsp *)&req->rp); + if (hdr->flags & SD_FLAG_CMD_WRITE) + memcpy(msg->data, req->data, hdr->data_length); + + list_add(&req->pending_list, &cluster->pending_list); + + send_message(cluster->handle, (struct message_header *)msg); + + free(msg); +} + +static struct vm *lookup_vm(struct list_head *entries, char *name) +{ + struct vm *vm; + + list_for_each_entry(vm, entries, list) { + if (!strcmp((char *)vm->ent.name, name)) + return vm; + } + + return NULL; +} + +static void group_handler(int listen_fd, int events, void *data) +{ + struct cluster_info *ci = data; + cpg_dispatch(ci->handle, CPG_DISPATCH_ALL); +} + +static void print_node_list(struct cluster_info *ci) +{ + struct node *node; + list_for_each_entry(node, &ci->node_list, list) { + dprintf("%c nodeid: %x, pid: %d, ip: %d.%d.%d.%d:%d\n", + node_cmp(&node->ent, &ci->this_node) ? ' ' : 'l', + node->nodeid, node->pid, + node->ent.addr[12], node->ent.addr[13], + node->ent.addr[14], node->ent.addr[15], node->ent.port); + } +} + +static void add_node(struct cluster_info *ci, uint32_t nodeid, uint32_t pid, + struct sheepdog_node_list_entry *sd_ent) +{ + struct node *node; + + node = zalloc(sizeof(*node)); + if (!node) { + eprintf("out of memory\n"); + return; + } + node->nodeid = nodeid; + node->pid = pid; + node->ent = *sd_ent; + list_add_tail(&node->list, &ci->node_list); + ci->epoch++; +} + +static int is_master(struct cluster_info *ci) +{ + struct node *node; + + if (!ci->synchronized) + return 0; + + if (list_empty(&ci->node_list)) + return 1; + + node = list_first_entry(&ci->node_list, struct node, list); + if (node_cmp(&node->ent, &ci->this_node) == 0) + return 1; + + return 0; +} + +static void join(struct cluster_info *ci, struct join_message *msg) +{ + struct node *node; + + if (!ci->synchronized) + return; + + if (!is_master(ci)) + return; + + msg->epoch = ci->epoch; + list_for_each_entry(node, &ci->node_list, list) { + msg->nodes[msg->nr_nodes].nodeid = node->nodeid; + msg->nodes[msg->nr_nodes].pid = node->pid; + msg->nodes[msg->nr_nodes].ent = node->ent; + msg->nr_nodes++; + } +} + +static void update_cluster_info(struct cluster_info *ci, + struct join_message *msg) +{ + int i; + int nr_nodes = msg->nr_nodes; + struct node *node, *e; + + if (ci->synchronized) + goto out; + + list_for_each_entry_safe(node, e, &ci->node_list, list) { + list_del(&node->list); + free(node); + } + + INIT_LIST_HEAD(&ci->node_list); + for (i = 0; i < nr_nodes; i++) + add_node(ci, msg->nodes[i].nodeid, msg->nodes[i].pid, + &msg->nodes[i].ent); + + ci->epoch = msg->epoch; + ci->synchronized = 1; + +out: + add_node(ci, msg->nodeid, msg->pid, &msg->header.from); + print_node_list(ci); +} + +static void vdi_op(struct cluster_info *ci, struct vdi_op_message *msg) +{ + const struct sd_vdi_req *hdr = &msg->req; + struct sd_vdi_rsp *rsp = &msg->rsp; + void *data = msg->data; + int ret = SD_RES_SUCCESS, is_current; + uint64_t oid = 0; + struct sheepdog_super_block *sb; + struct timeval tv; + struct sheepdog_node_list_entry entries[SD_MAX_NODES]; + int nr_nodes; + + switch (hdr->opcode) { + case SD_OP_NEW_VDI: + ret = add_vdi(ci, data, strlen(data), hdr->vdi_size, &oid, + hdr->base_oid, hdr->tag); + break; + case SD_OP_LOCK_VDI: + case SD_OP_GET_VDI_INFO: + ret = lookup_vdi(ci, data, &oid, hdr->tag, 1, &is_current); + if (ret < 0) + break; + if (is_current) + rsp->flags = SD_VDI_RSP_FLAG_CURRENT; + break; + case SD_OP_RELEASE_VDI: + break; + case SD_OP_MAKE_FS: + sb = zalloc(sizeof(*sb)); + if (!sb) { + ret = -1; + break; + } + gettimeofday(&tv, NULL); + sb->ctime = (uint64_t) tv.tv_sec << 32 | tv.tv_usec * 1000; + sb->default_nr_copies = 3; + + nr_nodes = build_node_list(&ci->node_list, entries); + ret = write_object(entries, nr_nodes, ci->epoch, + SD_DIR_OID, (char *)sb, sizeof(*sb), 0, + sb->default_nr_copies, 1); + break; + case SD_OP_UPDATE_EPOCH: + break; + default: + ret = SD_RES_SYSTEM_ERROR; + eprintf("opcode %d is not implemented\n", hdr->opcode); + break; + } + + rsp->oid = oid; + rsp->result = ret; +} + +static void vdi_op_done(struct cluster_info *ci, struct vdi_op_message *msg) +{ + const struct sd_vdi_req *hdr = &msg->req; + struct sd_vdi_rsp *rsp = &msg->rsp; + void *data = msg->data; + struct vm *vm; + struct request *req; + int ret = msg->rsp.result; + + switch (hdr->opcode) { + case SD_OP_NEW_VDI: + break; + case SD_OP_LOCK_VDI: + if (lookup_vm(&ci->vm_list, (char *)data)) { + ret = SD_RES_VDI_LOCKED; + break; + } + + vm = zalloc(sizeof(*vm)); + if (!vm) { + ret = SD_RES_UNKNOWN; + break; + } + strcpy((char *)vm->ent.name, (char *)data); + memcpy(vm->ent.host_addr, msg->header.from.addr, + sizeof(vm->ent.host_addr)); + vm->ent.host_port = msg->header.from.port; + + list_add(&vm->list, &ci->vm_list); + break; + case SD_OP_RELEASE_VDI: + vm = lookup_vm(&ci->vm_list, (char *)data); + if (!vm) { + ret = SD_RES_VDI_NOT_LOCKED; + break; + } + + list_del(&vm->list); + free(vm); + break; + case SD_OP_GET_VDI_INFO: + break; + case SD_OP_UPDATE_EPOCH: + break; + case SD_OP_MAKE_FS: + break; + default: + eprintf("unknown operation %d\n", hdr->opcode); + ret = SD_RES_UNKNOWN; + } + + if (node_cmp(&ci->this_node, &msg->header.from) != 0) + return; + + req = list_first_entry(&ci->pending_list, struct request, pending_list); + + rsp->result = ret; + memcpy(req->data, data, rsp->data_length); + memcpy(&req->rp, rsp, sizeof(req->rp)); + list_del(&req->pending_list); + req->done(req); +} + +static void __sd_deliver(struct work *work, int idx) +{ + struct work_deliver *w = container_of(work, struct work_deliver, work); + struct message_header *m = w->msg; + struct cluster_info *ci = w->ci; + + dprintf("op: %d, done: %d, size: %d, from: %d.%d.%d.%d:%d\n", + m->op, m->done, m->msg_length, + m->from.addr[12], m->from.addr[13], + m->from.addr[14], m->from.addr[15], m->from.port); + + if (!m->done) { + if (!is_master(ci)) + return; + + switch (m->op) { + case SD_MSG_JOIN: + join(ci, (struct join_message *)m); + break; + case SD_MSG_VDI_OP: + vdi_op(ci, (struct vdi_op_message *)m); + break; + default: + eprintf("unknown message %d\n", m->op); + break; + } + + m->done = 1; + send_message(ci->handle, m); + } else { + switch (m->op) { + case SD_MSG_JOIN: + update_cluster_info(ci, (struct join_message *)m); + break; + case SD_MSG_VDI_OP: + vdi_op_done(ci, (struct vdi_op_message *)m); + break; + default: + eprintf("unknown message %d\n", m->op); + break; + } + } +} + +static void __sd_deliver_done(struct work *work, int idx) +{ + struct work_deliver *w = container_of(work, struct work_deliver, work); + + free(w->msg); + free(w); +} + +static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name, + uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) +{ + struct work_deliver *w; + struct message_header *m = msg; + struct cluster_info *ci; + + dprintf("op: %d, done: %d, size: %d, from: %d.%d.%d.%d:%d\n", + m->op, m->done, m->msg_length, + m->from.addr[12], m->from.addr[13], + m->from.addr[14], m->from.addr[15], m->from.port); + cpg_context_get(handle, (void **)&ci); + + w = zalloc(sizeof(*w)); + if (!w) + return; + + w->msg = zalloc(msg_len); + if (!w->msg) + return; + memcpy(w->msg, msg, msg_len); + + w->ci = ci; + + w->work.fn = __sd_deliver; + w->work.done = __sd_deliver_done; + + queue_work(&w->work); +} + +static void __sd_confch(struct work *work, int idx) +{ + struct work_confch *w = container_of(work, struct work_confch, work); + struct cluster_info *ci = w->ci; + struct node *node, *e; + int i; + + const struct cpg_address *member_list = w->member_list; + size_t member_list_entries = w->member_list_entries; + const struct cpg_address *left_list = w->left_list; + size_t left_list_entries = w->left_list_entries; + const struct cpg_address *joined_list = w->joined_list; + size_t joined_list_entries = w->joined_list_entries; + + if (member_list_entries == joined_list_entries - left_list_entries && + ci->this_nodeid == member_list[0].nodeid && + ci->this_pid == member_list[0].pid) + ci->synchronized = 1; + + for (i = 0; i < left_list_entries; i++) { + list_for_each_entry_safe(node, e, &ci->node_list, list) { + if (node->nodeid != left_list[i].nodeid || + node->pid != left_list[i].pid) + continue; + + list_del(&node->list); + free(node); + ci->epoch++; + } + } + + for (i = 0; i < joined_list_entries; i++) { + if (ci->this_nodeid == joined_list[0].nodeid && + ci->this_pid == joined_list[0].pid) { + struct join_message msg; + + msg.header.op = SD_MSG_JOIN; + msg.header.done = 0; + msg.header.msg_length = sizeof(msg); + msg.header.from = ci->this_node; + msg.nodeid = ci->this_nodeid; + msg.pid = ci->this_pid; + + send_message(ci->handle, (struct message_header *)&msg); + } + } + + if (left_list_entries == 0) + return; + + print_node_list(ci); +} + +static void __sd_confch_done(struct work *work, int idx) +{ + struct work_confch *w = container_of(work, struct work_confch, work); + + free(w->member_list); + free(w->left_list); + free(w->joined_list); + free(w); +} + +static void sd_confch(cpg_handle_t handle, const struct cpg_name *group_name, + const struct cpg_address *member_list, + size_t member_list_entries, + const struct cpg_address *left_list, + size_t left_list_entries, + const struct cpg_address *joined_list, + size_t joined_list_entries) +{ + struct work_confch *w = NULL; + struct cluster_info *ci; + int i, size; + + dprintf("confchg nodeid %x\n", member_list[0].nodeid); + dprintf("%zd %zd %zd\n", member_list_entries, left_list_entries, + joined_list_entries); + for (i = 0; i < member_list_entries; i++) { + dprintf("[%d] node_id: %d, pid: %d, reason: %d\n", i, + member_list[i].nodeid, member_list[i].pid, + member_list[i].reason); + } + + cpg_context_get(handle, (void **)&ci); + + w = zalloc(sizeof(*w)); + if (!w) + return; + + size = sizeof(struct cpg_address) * member_list_entries; + w->member_list = zalloc(size); + if (!w->member_list) + goto err; + memcpy(w->member_list, member_list, size); + w->member_list_entries = member_list_entries; + + size = sizeof(struct cpg_address) * left_list_entries; + w->left_list = zalloc(size); + if (!w->left_list) + goto err; + memcpy(w->left_list, left_list, size); + w->left_list_entries = left_list_entries; + + size = sizeof(struct cpg_address) * joined_list_entries; + w->joined_list = zalloc(size); + if (!w->joined_list) + goto err; + memcpy(w->joined_list, joined_list, size); + w->joined_list_entries = joined_list_entries; + + w->ci = ci; + + w->work.fn = __sd_confch; + w->work.done = __sd_confch_done; + + queue_work(&w->work); + + return; +err: + if (!w) + return; + + if (w->member_list) + free(w->member_list); + if (w->left_list) + free(w->left_list); + if (w->joined_list) + free(w->joined_list); +} + +int build_node_list(struct list_head *node_list, + struct sheepdog_node_list_entry *entries) +{ + struct node *node; + int nr = 0; + + list_for_each_entry(node, node_list, list) { + if (entries) + memcpy(entries + nr, &node->ent, sizeof(*entries)); + nr++; + } + if (entries) + qsort(entries, nr, sizeof(*entries), node_cmp); + + return nr; +} + +struct cluster_info *create_cluster(int port) +{ + int fd, ret; + cpg_handle_t cpg_handle; + struct cluster_info *ci; + struct addrinfo hints, *res; + char name[INET6_ADDRSTRLEN]; + SHA_CTX ctx; + struct cpg_name group = { 8, "sheepdog" }; + cpg_callbacks_t cb = { &sd_deliver, &sd_confch }; + unsigned int nodeid = 0; + + ci = zalloc(sizeof(*ci)); + if (!ci) + return NULL; + + ret = cpg_initialize(&cpg_handle, &cb); + if (ret != CS_OK) { + eprintf("Failed to initialize cpg, %d\n", ret); + eprintf("Is corosync running?\n"); + return NULL; + } + +join_retry: + ret = cpg_join(cpg_handle, &group); + switch (ret) { + case CS_OK: + break; + case CS_ERR_TRY_AGAIN: + dprintf("Failed to join the sheepdog group, try again\n"); + sleep(1); + goto join_retry; + case CS_ERR_SECURITY: + eprintf("Permission error.\n"); + exit(1); + default: + eprintf("Failed to join the sheepdog group, %d\n", ret); + exit(1); + break; + } + + ret = cpg_local_get(cpg_handle, &nodeid); + if (ret != CS_OK) { + eprintf("Failed to get the local node's identifier, %d\n", ret); + exit(1); + } + + ci->handle = cpg_handle; + ci->this_nodeid = nodeid; + ci->this_pid = getpid(); + + memset(&ci->this_node, 0, sizeof(ci->this_node)); + + gethostname(name, sizeof(name)); + + memset(&hints, 0, sizeof(hints)); + + hints.ai_socktype = SOCK_STREAM; + ret = getaddrinfo(name, NULL, &hints, &res); + if (ret) + exit(1); + + if (res->ai_family == AF_INET) { + struct sockaddr_in *addr = (struct sockaddr_in *)res->ai_addr; + memset(ci->this_node.addr, 0, sizeof(ci->this_node.addr)); + memcpy(ci->this_node.addr + 12, &addr->sin_addr, 4); + } else if (res->ai_family == AF_INET6) { + struct sockaddr_in6 *addr = (struct sockaddr_in6 *)res->ai_addr; + memcpy(ci->this_node.addr, &addr->sin6_addr, 16); + } else { + eprintf("unknown address family\n"); + exit(1); + } + + freeaddrinfo(res); + + ci->this_node.port = port; + + SHA1_Init(&ctx); + SHA1_Update(&ctx, ci->this_node.addr, sizeof(ci->this_node.addr)); + SHA1_Update(&ctx, &ci->this_node.port, sizeof(ci->this_node.port)); + SHA1_Final(ci->this_node.id, &ctx); + + ci->synchronized = 0; + INIT_LIST_HEAD(&ci->node_list); + INIT_LIST_HEAD(&ci->vm_list); + INIT_LIST_HEAD(&ci->pending_list); + cpg_context_set(cpg_handle, ci); + + cpg_fd_get(cpg_handle, &fd); + register_event(fd, group_handler, ci); + return ci; +} diff --git a/collie/vdi.c b/collie/vdi.c new file mode 100644 index 0000000..184a22e --- /dev/null +++ b/collie/vdi.c @@ -0,0 +1,259 @@ +/* + * Copyright (C) 2009 Nippon Telegraph and Telephone Corporation. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License version + * 2 as published by the Free Software Foundation. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +#include <stdio.h> +#include <stdlib.h> +#include <sys/time.h> +#include <openssl/sha.h> + +#include "sheepdog_proto.h" +#include "meta.h" +#include "collie.h" + +static int sheepdog_match(struct sheepdog_dir_entry *ent, char *name, int len) +{ + if (!ent->name_len) + return 0; + if (ent->name_len != len) + return 0; + return !memcmp(ent->name, name, len); +} + +/* TODO: should be performed atomically */ +static int create_inode_obj(struct sheepdog_node_list_entry *entries, + int nr_nodes, uint64_t epoch, int copies, + uint64_t oid, uint64_t size, uint64_t base_oid) +{ + struct sheepdog_inode inode, base; + struct timeval tv; + int ret; + + if (base_oid) { + ret = read_object(entries, nr_nodes, epoch, + base_oid, (char *)&base, sizeof(base), 0, + copies); + if (ret < 0) + return SD_RES_BASE_VDI_READ; + } + + gettimeofday(&tv, NULL); + + memset(&inode, 0, sizeof(inode)); + + inode.oid = oid; + inode.vdi_size = size; + inode.block_size = SD_DATA_OBJ_SIZE; + inode.ctime = (uint64_t) tv.tv_sec << 32 | tv.tv_usec * 1000; + inode.nr_copies = copies; + + if (base_oid) { + int i; + + eprintf("%zd %zd\n", sizeof(inode.data_oid), + ARRAY_SIZE(base.child_oid)); + inode.parent_oid = base_oid; + memcpy(inode.data_oid, base.data_oid, + MAX_DATA_OBJS * sizeof(uint64_t)); + + for (i = 0; i < ARRAY_SIZE(base.child_oid); i++) { + if (!base.child_oid[i]) { + base.child_oid[i] = oid; + break; + } + } + + if (i == ARRAY_SIZE(base.child_oid)) + return SD_RES_NO_BASE_VDI; + + ret = write_object(entries, nr_nodes, + epoch, base_oid, (char *)&base, + sizeof(base), 0, copies, 0); + if (ret < 0) + return SD_RES_BASE_VDI_WRITE; + } + + ret = write_object(entries, nr_nodes, epoch, + oid, (char *)&inode, sizeof(inode), 0, copies, 1); + if (ret < 0) + return SD_RES_VDI_WRITE; + + return ret; +} + +#define DIR_BUF_LEN (UINT64_C(1) << 20) + +/* + * TODO: handle larger buffer + */ +int add_vdi(struct cluster_info *cluster, char *name, int len, uint64_t size, + uint64_t *added_oid, uint64_t base_oid, uint32_t tag) +{ + struct sheepdog_node_list_entry entries[SD_MAX_NODES]; + int nr_nodes; + struct sheepdog_dir_entry *prv, *ent; + uint64_t oid = 0; + char *buf; + int ret, rest; + struct sheepdog_super_block *sb; + int copies; + + nr_nodes = build_node_list(&cluster->node_list, entries); + + eprintf("%s (%d) %" PRIu64 ", base: %" PRIu64 "\n", name, len, size, + base_oid); + + buf = zalloc(DIR_BUF_LEN); + if (!buf) + return 1; + + ret = read_object(entries, nr_nodes, cluster->epoch, + SD_DIR_OID, buf, DIR_BUF_LEN, 0, nr_nodes); + if (ret < 0) { + ret = SD_RES_DIR_READ; + goto out; + } + + sb = (struct sheepdog_super_block *)buf; + copies = sb->default_nr_copies; + + ret = read_object(entries, nr_nodes, cluster->epoch, + SD_DIR_OID, buf, DIR_BUF_LEN, sizeof(*sb), nr_nodes); + if (ret < 0) { + ret = SD_RES_DIR_READ; + goto out; + } + + ent = (struct sheepdog_dir_entry *)buf; + rest = ret; + while (rest > 0) { + if (!ent->name_len) + break; + + if (sheepdog_match(ent, name, len) && !tag) { + ret = SD_RES_VDI_EXIST; + goto out; + } + oid = ent->oid; + prv = ent; + ent = next_entry(prv); + rest -= ((char *)ent - (char *)prv); + } + + /* need to check if the buffer is large enough here. */ + oid += (1 << 18); + + ret = create_inode_obj(entries, nr_nodes, cluster->epoch, copies, + oid, size, base_oid); + if (ret) + goto out; + + ent->oid = oid; + ent->tag = tag; + + ent->flags = FLAG_CURRENT; + ent->name_len = len; + memcpy(ent->name, name, len); + + if (tag) { + struct sheepdog_dir_entry *e = (struct sheepdog_dir_entry *)buf; + + while (e < ent) { + if (sheepdog_match(e, name, len)) + e->flags &= ~FLAG_CURRENT; + e = next_entry(e); + } + } + + ent = next_entry(ent); + + ret = write_object(entries, nr_nodes, cluster->epoch, + SD_DIR_OID, buf, (char *)ent - buf, sizeof(*sb), + copies, 0); + if (ret) { + ret = SD_RES_DIR_WRITE; + goto out; + } + + *added_oid = oid; +out: + free(buf); + + return ret; +} + +int del_vdi(struct cluster_info *cluster, char *name, int len) +{ + return 0; +} + +int lookup_vdi(struct cluster_info *cluster, + char *filename, uint64_t * oid, uint32_t tag, int do_lock, + int *current) +{ + struct sheepdog_node_list_entry entries[SD_MAX_NODES]; + int nr_nodes; + int rest, ret; + char *buf; + struct sheepdog_dir_entry *prv, *ent; + + nr_nodes = build_node_list(&cluster->node_list, entries); + + *current = 0; + buf = zalloc(DIR_BUF_LEN); + if (!buf) + return 1; + + ret = read_object(entries, nr_nodes, cluster->epoch, + SD_DIR_OID, buf, DIR_BUF_LEN, + sizeof(struct sheepdog_super_block), nr_nodes); + if (ret < 0) { + ret = SD_RES_DIR_READ; + goto out; + } + + eprintf("looking for %s %zd, %d\n", filename, strlen(filename), ret); + + ent = (struct sheepdog_dir_entry *)buf; + rest = ret; + ret = SD_RES_NO_VDI; + while (rest > 0) { + if (!ent->name_len) + break; + + eprintf("%s %d %" PRIu64 "\n", ent->name, ent->name_len, + ent->oid); + + if (sheepdog_match(ent, filename, strlen(filename))) { + if (ent->tag != tag && tag != -1) { + ret = SD_RES_NO_TAG; + goto next; + } + if (ent->tag != tag && !(ent->flags & FLAG_CURRENT)) { + /* current vdi must exsit */ + ret = SD_RES_SYSTEM_ERROR; + goto next; + } + + *oid = ent->oid; + ret = 0; + + if (ent->flags & FLAG_CURRENT) + *current = 1; + break; + } +next: + prv = ent; + ent = next_entry(prv); + rest -= ((char *)ent - (char *)prv); + } +out: + free(buf); + return ret; +} -- 1.5.6.5 |