A master node gathers epoch information from all the nodes and builds
a tree from this information.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 collie/Makefile |    3 +-
 collie/collie.h |    3 +
 collie/group.c  |  170 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 175 insertions(+), 1 deletions(-)

diff --git a/collie/Makefile b/collie/Makefile
index 2bfc9c6..9314b27 100644
--- a/collie/Makefile
+++ b/collie/Makefile
@@ -5,7 +5,8 @@ CFLAGS += -D_GNU_SOURCE -DSD_VERSION=\"$(VERSION)\"
 LIBS += -lpthread -lcpg
 
 PROGRAMS = collie
-COLLIE_OBJS = collie.o net.o vdi.o group.o store.o work.o ../lib/event.o ../lib/net.o ../lib/logger.o
+COLLIE_OBJS = collie.o net.o vdi.o group.o store.o work.o ../lib/event.o \
+	../lib/net.o ../lib/logger.o ../lib/tree.o
 COLLIE_DEP = $(COLLIE_OBJS:.o=.d)
 
 .PHONY:all
diff --git a/collie/collie.h b/collie/collie.h
index 968925e..ce8e6e4 100644
--- a/collie/collie.h
+++ b/collie/collie.h
@@ -19,6 +19,7 @@
 #include "logger.h"
 #include "work.h"
 #include "net.h"
+#include "tree.h"
 
 #define SD_MSG_JOIN 0x01
 #define SD_MSG_VDI_OP 0x02
@@ -70,6 +71,8 @@ struct cluster_info {
 	struct list_head vm_list;
 	struct list_head pending_list;
 
+	struct tree_vertex epoch_tree_root;
+
 	int nr_sobjs;
 };
 
diff --git a/collie/group.c b/collie/group.c
index 0de23e6..02cb2d5 100644
--- a/collie/group.c
+++ b/collie/group.c
@@ -43,6 +43,16 @@ struct message_header {
 	struct sheepdog_node_list_entry from;
 };
 
+/* One epoch recorded by add_epoch_log(), linked into the epoch tree. */
+struct epoch_tree {
+	uint64_t ctime;
+	uint32_t epoch;
+	uint16_t nr_nodes;
+	uint16_t updated;
+	struct tree_vertex vertex;
+	struct sheepdog_node_list_entry nodes[SD_MAX_NODES];
+};
+
 struct join_message {
 	struct message_header header;
 	uint32_t nodeid;
@@ -286,9 +296,57 @@ static int is_master(void)
 	return 0;
 }
 
+/*
+ * Record one epoch in the tree rooted at sys->epoch_tree_root.
+ *
+ * If a vertex with the same hval is already present, only its updated flag
+ * is set (when is_updated is non-zero).  Otherwise a new vertex is allocated,
+ * initialized with hval and parent_hval, and added to the tree.
+ *
+ * Returns 0 on success, 1 when nr_nodes is out of range or on allocation
+ * failure.
+ */
+static int add_epoch_log(int epoch, uint64_t parent_hval, uint64_t hval,
+			 int nr_nodes, struct sheepdog_node_list_entry *nodes,
+			 int is_updated, uint64_t ctime)
+{
+	struct tree_vertex *v;
+	struct epoch_tree *tree;
+
+	v = tree_find(hval, &sys->epoch_tree_root);
+	if (v) {
+		tree = container_of(v, struct epoch_tree, vertex);
+		if (is_updated)
+			tree->updated = is_updated;
+		return 0;
+	}
+
+	/* tree->nodes has room for SD_MAX_NODES entries only */
+	if (nr_nodes < 0 || nr_nodes > SD_MAX_NODES) {
+		eprintf("invalid number of nodes, %d\n", nr_nodes);
+		return 1;
+	}
+
+	tree = zalloc(sizeof(*tree));
+	if (!tree) {
+		eprintf("out of memory\n");
+		return 1;
+	}
+	tree->epoch = epoch;
+	tree->ctime = ctime;
+	tree->updated = is_updated;
+	tree->nr_nodes = nr_nodes;
+	memcpy(tree->nodes, nodes, sizeof(nodes[0]) * nr_nodes);
+	INIT_TREE_VERTEX(&tree->vertex, hval, parent_hval);
+	tree_add(&tree->vertex, &sys->epoch_tree_root);
+	return 0;
+}
+
 static void join(struct join_message *msg)
 {
 	struct node *node;
+	struct sheepdog_node_list_entry entries[SD_MAX_NODES];
+	int fd = -1;	/* connection to the joining node, if one is opened */
 
 	if (!sys->synchronized)
 		return;
@@ -299,8 +357,116 @@ static void join(struct join_message *msg)
 
 	if (msg->nr_sobjs)
 		sys->nr_sobjs = msg->nr_sobjs;
+
+	/*
+	 * Gather the epoch log of the joining node (from the local store when
+	 * the message is ours, over the network otherwise) and record each
+	 * epoch with add_epoch_log().
+	 */
+	{
+		struct sd_epoch_req hdr;
+		struct sd_epoch_rsp *rsp = (struct sd_epoch_rsp *)&hdr;
+		uint32_t epoch_list[SD_MAX_NODES];
+		unsigned int rlen, wlen;
+		int ret, i, n, local;
+		uint64_t pre_hval;
+		char name[128];
+
+		local = (msg->header.from.id == sys->this_node.id);
+		if (local) {
+			n = ARRAY_SIZE(epoch_list);
+			ret = get_epoch_list(epoch_list, &n);
+		} else {
+			addr_to_str(name, sizeof(name), msg->header.from.addr, 0);
+
+			fd = connect_to(name, msg->header.from.port);
+			if (fd < 0)
+				goto out;
+
+			memset(&hdr, 0, sizeof(hdr));
+
+			hdr.opcode = SD_OP_GET_EPOCH_LIST;
+			hdr.epoch = sys->epoch;
+			hdr.data_length = sizeof(epoch_list);
+
+			rlen = hdr.data_length;
+			wlen = 0;
+			ret = exec_req(fd, (struct sd_req *)&hdr, epoch_list,
+				       &wlen, &rlen);
+			if (ret != 0) {
+				eprintf("failed to send request %s\n", name);
+				goto out;
+			}
+			n = rlen / sizeof(uint32_t);
+			/* never read past epoch_list, whatever the peer says */
+			if (n > (int)ARRAY_SIZE(epoch_list))
+				n = ARRAY_SIZE(epoch_list);
+			ret = rsp->result;
+		}
+
+		if (ret != SD_RES_SUCCESS) {
+			eprintf("failed to get epoch list, %x\n", ret);
+			goto out;
+		}
+
+		pre_hval = 0;
+		dprintf("nr_epoch = %d\n", n);
+
+		for (i = 0; i < n; i++) {
+			int size;
+			uint32_t is_updated;
+			uint64_t ctime;
+			uint64_t hval;
+
+			if (local) {
+				size = ARRAY_SIZE(entries);
+				ret = read_epoch(epoch_list[i], &is_updated, &ctime,
+						 &hval, entries, &size);
+			} else {
+				memset(&hdr, 0, sizeof(hdr));
+				hdr.opcode = SD_OP_READ_EPOCH;
+				hdr.epoch = sys->epoch;
+				hdr.data_length = sizeof(entries);
+				hdr.request_epoch = epoch_list[i];
+
+				rlen = hdr.data_length;
+				wlen = 0;
+				ret = exec_req(fd, (struct sd_req *)&hdr, entries,
+					       &wlen, &rlen);
+				if (ret != 0) {
+					eprintf("failed to send request, %x\n", ret);
+					break;
+				}
+				size = rlen / sizeof(*entries);
+				is_updated = rsp->is_updated;
+				hval = rsp->hval;
+				ctime = rsp->ctime;
+				ret = rsp->result;
+			}
+
+			if (ret != SD_RES_SUCCESS) {
+				eprintf("failed to read epoch, %x\n", ret);
+				break;
+			}
+			if (size <= 0) {
+				pre_hval = 0;
+				continue;
+			}
+
+			dprintf("add_epoch_log, %d, %llu -> %llu, %d\n",
+				epoch_list[i], (unsigned long long)pre_hval,
+				(unsigned long long)hval, is_updated);
+			if (add_epoch_log(epoch_list[i], pre_hval, hval, size,
+					  entries, is_updated, ctime))
+				break;
+
+			pre_hval = hval;
+		}
+	}
+
 	msg->epoch = sys->epoch;
 	msg->nr_sobjs = sys->nr_sobjs;
+	msg->nr_nodes = 0;
 	list_for_each_entry(node, &sys->cpg_node_list, list) {
 		if (node->nodeid == msg->nodeid && node->pid == msg->pid)
 			continue;
@@ -312,6 +478,9 @@ static void join(struct join_message *msg)
 		msg->nodes[msg->nr_nodes].ent = node->ent;
 		msg->nr_nodes++;
 	}
+out:
+	if (fd >= 0)
+		close(fd);
 }
 
 static void update_cluster_info(struct join_message *msg)
@@ -865,6 +1034,7 @@ join_retry:
 	INIT_LIST_HEAD(&sys->cpg_node_list);
 	INIT_LIST_HEAD(&sys->vm_list);
 	INIT_LIST_HEAD(&sys->pending_list);
+	INIT_TREE_VERTEX(&sys->epoch_tree_root, 0, 0);
 	cpg_context_set(cpg_handle, sys);
 
 	cpg_fd_get(cpg_handle, &fd);
-- 
1.5.6.5