This is against the next branch and depends on the fix that I've just sent: http://lists.wpkg.org/pipermail/sheepdog/2010-February/000242.html = From: FUJITA Tomonori <fujita.tomonori at lab.ntt.co.jp> Subject: [PATCH] make the cluster_info global There will only ever be one cluster_info, so it should be global. Passing it around just makes the code needlessly complicated. Let's make the cluster_info global. Signed-off-by: FUJITA Tomonori <fujita.tomonori at lab.ntt.co.jp> --- collie/collie.c | 9 +- collie/collie.h | 16 ++-- collie/group.c | 251 ++++++++++++++++++++++++------------------------------- collie/net.c | 4 +- collie/store.c | 107 +++++++++++------------- collie/vdi.c | 31 ++++---- 6 files changed, 186 insertions(+), 232 deletions(-) diff --git a/collie/collie.c b/collie/collie.c index da2a68a..6f87035 100644 --- a/collie/collie.c +++ b/collie/collie.c @@ -48,6 +48,8 @@ Sheepdog Daemon, version %s\n\ exit(status); } +struct cluster_info __sys, *sys = &__sys; + int main(int argc, char **argv) { int ch, longindex; @@ -55,7 +57,6 @@ int main(int argc, char **argv) char *dir = DEFAULT_OBJECT_DIR; int is_daemon = 1; int is_debug = 0; - struct cluster_info *ci; while ((ch = getopt_long(argc, argv, short_options, long_options, &longindex)) >= 0) { @@ -100,13 +101,13 @@ int main(int argc, char **argv) if (!dobj_queue) exit(1); - ci = create_cluster(port); - if (!ci) { + ret = create_cluster(port); + if (ret) { eprintf("failed to create sheepdog cluster.\n"); exit(1); } - ret = create_listen_port(port, ci); + ret = create_listen_port(port, sys); if (ret) exit(1); diff --git a/collie/collie.h b/collie/collie.h index 183f771..098d4c9 100644 --- a/collie/collie.h +++ b/collie/collie.h @@ -33,8 +33,6 @@ struct client_info { struct list_head reqs; struct list_head done_reqs; - - struct cluster_info *cluster; }; struct request; @@ -73,23 +71,25 @@ struct cluster_info { int nr_sobjs; }; +struct cluster_info *sys; + int create_listen_port(int port, void *data); int init_store(char 
*dir); -int add_vdi(struct cluster_info *ci, char *buf, int len, uint64_t size, +int add_vdi(char *buf, int len, uint64_t size, uint64_t *added_oid, uint64_t base_oid, uint32_t tag, int copies, uint16_t flags); -int lookup_vdi(struct cluster_info *cluster, char *filename, uint64_t * oid, +int lookup_vdi(char *filename, uint64_t * oid, uint32_t tag, int do_lock, int *current); -int make_super_object(struct cluster_info *ci, struct sd_vdi_req *hdr); +int make_super_object(struct sd_vdi_req *hdr); int build_node_list(struct list_head *node_list, struct sheepdog_node_list_entry *entries); -struct cluster_info *create_cluster(int port); +int create_cluster(int port); void so_queue_request(struct work *work, int idx); @@ -97,7 +97,7 @@ void store_queue_request(struct work *work, int idx); void cluster_queue_request(struct work *work, int idx); -int update_epoch_store(struct cluster_info *cluster, uint32_t epoch); +int update_epoch_store(uint32_t epoch); extern int nr_sobjs; @@ -107,6 +107,6 @@ extern struct work_queue *dobj_queue; int epoch_log_write(uint32_t epoch, char *buf, int len); int epoch_log_read(uint32_t epoch, char *buf, int len); -int start_recovery(struct cluster_info *ci, uint32_t epoch, int add); +int start_recovery(uint32_t epoch, int add); #endif diff --git a/collie/group.c b/collie/group.c index 9cbc7ce..9775648 100644 --- a/collie/group.c +++ b/collie/group.c @@ -69,7 +69,6 @@ struct vdi_op_message { struct work_deliver { struct message_header *msg; - struct cluster_info *ci; struct work work; }; @@ -81,7 +80,6 @@ struct work_confch { struct cpg_address *joined_list; size_t joined_list_entries; - struct cluster_info *ci; struct work work; }; @@ -131,33 +129,32 @@ static int get_node_idx(struct sheepdog_node_list_entry *ent, return ent - entries; } -static void get_node_list(struct cluster_info *cluster, struct sd_node_req *req, +static void get_node_list(struct sd_node_req *req, struct sd_node_rsp *rsp, void *data) { int nr_nodes; struct node *node; - 
nr_nodes = build_node_list(&cluster->node_list, data); + nr_nodes = build_node_list(&sys->node_list, data); rsp->data_length = nr_nodes * sizeof(struct sheepdog_node_list_entry); rsp->nr_nodes = nr_nodes; - rsp->local_idx = get_node_idx(&cluster->this_node, data, nr_nodes); + rsp->local_idx = get_node_idx(&sys->this_node, data, nr_nodes); - if (list_empty(&cluster->node_list)) { + if (list_empty(&sys->node_list)) { rsp->master_idx = -1; return; } - node = list_first_entry(&cluster->node_list, struct node, list); + node = list_first_entry(&sys->node_list, struct node, list); rsp->master_idx = get_node_idx(&node->ent, data, nr_nodes); } -static void get_vm_list(struct cluster_info *cluster, struct sd_rsp *rsp, - void *data) +static void get_vm_list(struct sd_rsp *rsp, void *data) { int nr_vms; struct vm *vm; struct sheepdog_vm_list_entry *p = data; - list_for_each_entry(vm, &cluster->vm_list, list) { + list_for_each_entry(vm, &sys->vm_list, list) { *p++ = vm->ent; } @@ -170,7 +167,6 @@ void cluster_queue_request(struct work *work, int idx) struct request *req = container_of(work, struct request, work); struct sd_req *hdr = (struct sd_req *)&req->rq; struct sd_rsp *rsp = (struct sd_rsp *)&req->rp; - struct cluster_info *cluster = req->ci->cluster; struct vdi_op_message *msg; int ret = SD_RES_SUCCESS; @@ -178,11 +174,11 @@ void cluster_queue_request(struct work *work, int idx) switch (hdr->opcode) { case SD_OP_GET_NODE_LIST: - get_node_list(cluster, (struct sd_node_req *)hdr, + get_node_list((struct sd_node_req *)hdr, (struct sd_node_rsp *)rsp, req->data); break; case SD_OP_GET_VM_LIST: - get_vm_list(cluster, rsp, req->data); + get_vm_list(rsp, req->data); break; default: /* forward request to group */ @@ -202,15 +198,15 @@ forward: msg->header.op = SD_MSG_VDI_OP; msg->header.done = 0; msg->header.msg_length = sizeof(*msg) + hdr->data_length; - msg->header.from = cluster->this_node; + msg->header.from = sys->this_node; msg->req = *((struct sd_vdi_req *)&req->rq); 
msg->rsp = *((struct sd_vdi_rsp *)&req->rp); if (hdr->flags & SD_FLAG_CMD_WRITE) memcpy(msg->data, req->data, hdr->data_length); - list_add(&req->pending_list, &cluster->pending_list); + list_add(&req->pending_list, &sys->pending_list); - send_message(cluster->handle, (struct message_header *)msg); + send_message(sys->handle, (struct message_header *)msg); free(msg); } @@ -229,23 +225,22 @@ static struct vm *lookup_vm(struct list_head *entries, char *name) static void group_handler(int listen_fd, int events, void *data) { - struct cluster_info *ci = data; - cpg_dispatch(ci->handle, CPG_DISPATCH_ALL); + cpg_dispatch(sys->handle, CPG_DISPATCH_ALL); } -static void print_node_list(struct cluster_info *ci) +static void print_node_list(void) { struct node *node; - list_for_each_entry(node, &ci->node_list, list) { + list_for_each_entry(node, &sys->node_list, list) { dprintf("%c nodeid: %x, pid: %d, ip: %d.%d.%d.%d:%d\n", - node_cmp(&node->ent, &ci->this_node) ? ' ' : 'l', + node_cmp(&node->ent, &sys->this_node) ? 
' ' : 'l', node->nodeid, node->pid, node->ent.addr[12], node->ent.addr[13], node->ent.addr[14], node->ent.addr[15], node->ent.port); } } -static void add_node(struct cluster_info *ci, uint32_t nodeid, uint32_t pid, +static void add_node(uint32_t nodeid, uint32_t pid, struct sheepdog_node_list_entry *sd_ent) { struct node *node; @@ -258,42 +253,42 @@ static void add_node(struct cluster_info *ci, uint32_t nodeid, uint32_t pid, node->nodeid = nodeid; node->pid = pid; node->ent = *sd_ent; - list_add_tail(&node->list, &ci->node_list); + list_add_tail(&node->list, &sys->node_list); } -static int is_master(struct cluster_info *ci) +static int is_master(void) { struct node *node; - if (!ci->synchronized) + if (!sys->synchronized) return 0; - if (list_empty(&ci->node_list)) + if (list_empty(&sys->node_list)) return 1; - node = list_first_entry(&ci->node_list, struct node, list); - if (node_cmp(&node->ent, &ci->this_node) == 0) + node = list_first_entry(&sys->node_list, struct node, list); + if (node_cmp(&node->ent, &sys->this_node) == 0) return 1; return 0; } -static void join(struct cluster_info *ci, struct join_message *msg) +static void join(struct join_message *msg) { struct node *node; - if (!ci->synchronized) + if (!sys->synchronized) return; - if (!is_master(ci)) + if (!is_master()) return; if (msg->nr_sobjs) - ci->nr_sobjs = msg->nr_sobjs; + sys->nr_sobjs = msg->nr_sobjs; - msg->epoch = ci->epoch; - msg->nr_sobjs = ci->nr_sobjs; - list_for_each_entry(node, &ci->node_list, list) { + msg->epoch = sys->epoch; + msg->nr_sobjs = sys->nr_sobjs; + list_for_each_entry(node, &sys->node_list, list) { msg->nodes[msg->nr_nodes].nodeid = node->nodeid; msg->nodes[msg->nr_nodes].pid = node->pid; msg->nodes[msg->nr_nodes].ent = node->ent; @@ -301,60 +296,59 @@ static void join(struct cluster_info *ci, struct join_message *msg) } } -static void update_cluster_info(struct cluster_info *ci, - struct join_message *msg) +static void update_cluster_info(struct join_message *msg) { int i; 
int ret, nr_nodes = msg->nr_nodes; struct node *node, *e; struct sheepdog_node_list_entry entry[SD_MAX_NODES]; - if (!ci->nr_sobjs) - ci->nr_sobjs = msg->nr_sobjs; + if (!sys->nr_sobjs) + sys->nr_sobjs = msg->nr_sobjs; - if (ci->synchronized) + if (sys->synchronized) goto out; - list_for_each_entry_safe(node, e, &ci->node_list, list) { + list_for_each_entry_safe(node, e, &sys->node_list, list) { list_del(&node->list); free(node); } - INIT_LIST_HEAD(&ci->node_list); + INIT_LIST_HEAD(&sys->node_list); for (i = 0; i < nr_nodes; i++) - add_node(ci, msg->nodes[i].nodeid, msg->nodes[i].pid, + add_node(msg->nodes[i].nodeid, msg->nodes[i].pid, &msg->nodes[i].ent); - ci->epoch = msg->epoch; - ci->synchronized = 1; + sys->epoch = msg->epoch; + sys->synchronized = 1; - nr_nodes = build_node_list(&ci->node_list, entry); + nr_nodes = build_node_list(&sys->node_list, entry); - ret = epoch_log_write(ci->epoch, (char *)entry, + ret = epoch_log_write(sys->epoch, (char *)entry, nr_nodes * sizeof(struct sheepdog_node_list_entry)); if (ret < 0) - eprintf("can't write epoch %u\n", ci->epoch); + eprintf("can't write epoch %u\n", sys->epoch); /* we are ready for object operations */ - update_epoch_store(ci, ci->epoch); + update_epoch_store(sys->epoch); out: - add_node(ci, msg->nodeid, msg->pid, &msg->header.from); + add_node(msg->nodeid, msg->pid, &msg->header.from); - nr_nodes = build_node_list(&ci->node_list, entry); + nr_nodes = build_node_list(&sys->node_list, entry); - ret = epoch_log_write(ci->epoch + 1, (char *)entry, + ret = epoch_log_write(sys->epoch + 1, (char *)entry, nr_nodes * sizeof(struct sheepdog_node_list_entry)); if (ret < 0) - eprintf("can't write epoch %u\n", ci->epoch + 1); + eprintf("can't write epoch %u\n", sys->epoch + 1); - ci->epoch++; + sys->epoch++; - update_epoch_store(ci, ci->epoch); + update_epoch_store(sys->epoch); - print_node_list(ci); + print_node_list(); } -static void vdi_op(struct cluster_info *ci, struct vdi_op_message *msg) +static void 
vdi_op(struct vdi_op_message *msg) { const struct sd_vdi_req *hdr = &msg->req; struct sd_vdi_rsp *rsp = &msg->rsp; @@ -364,12 +358,12 @@ static void vdi_op(struct cluster_info *ci, struct vdi_op_message *msg) switch (hdr->opcode) { case SD_OP_NEW_VDI: - ret = add_vdi(ci, data, hdr->data_length, hdr->vdi_size, &oid, + ret = add_vdi(data, hdr->data_length, hdr->vdi_size, &oid, hdr->base_oid, hdr->tag, hdr->copies, hdr->flags); break; case SD_OP_LOCK_VDI: case SD_OP_GET_VDI_INFO: - ret = lookup_vdi(ci, data, &oid, hdr->tag, 1, &is_current); + ret = lookup_vdi(data, &oid, hdr->tag, 1, &is_current); if (ret != SD_RES_SUCCESS) break; if (is_current) @@ -378,7 +372,7 @@ static void vdi_op(struct cluster_info *ci, struct vdi_op_message *msg) case SD_OP_RELEASE_VDI: break; case SD_OP_MAKE_FS: - ret = make_super_object(ci, &msg->req); + ret = make_super_object(&msg->req); break; default: ret = SD_RES_SYSTEM_ERROR; @@ -390,7 +384,7 @@ static void vdi_op(struct cluster_info *ci, struct vdi_op_message *msg) rsp->result = ret; } -static void vdi_op_done(struct cluster_info *ci, struct vdi_op_message *msg) +static void vdi_op_done(struct vdi_op_message *msg) { const struct sd_vdi_req *hdr = &msg->req; struct sd_vdi_rsp *rsp = &msg->rsp; @@ -403,7 +397,7 @@ static void vdi_op_done(struct cluster_info *ci, struct vdi_op_message *msg) case SD_OP_NEW_VDI: break; case SD_OP_LOCK_VDI: - if (lookup_vm(&ci->vm_list, (char *)data)) { + if (lookup_vm(&sys->vm_list, (char *)data)) { ret = SD_RES_VDI_LOCKED; break; } @@ -418,10 +412,10 @@ static void vdi_op_done(struct cluster_info *ci, struct vdi_op_message *msg) sizeof(vm->ent.host_addr)); vm->ent.host_port = msg->header.from.port; - list_add(&vm->list, &ci->vm_list); + list_add(&vm->list, &sys->vm_list); break; case SD_OP_RELEASE_VDI: - vm = lookup_vm(&ci->vm_list, (char *)data); + vm = lookup_vm(&sys->vm_list, (char *)data); if (!vm) { ret = SD_RES_VDI_NOT_LOCKED; break; @@ -434,8 +428,8 @@ static void vdi_op_done(struct cluster_info 
*ci, struct vdi_op_message *msg) break; case SD_OP_MAKE_FS: if (ret == SD_RES_SUCCESS) { - ci->nr_sobjs = ((struct sd_so_req *)hdr)->copies; - eprintf("%d\n", ci->nr_sobjs); + sys->nr_sobjs = ((struct sd_so_req *)hdr)->copies; + eprintf("%d\n", sys->nr_sobjs); } break; @@ -444,10 +438,10 @@ static void vdi_op_done(struct cluster_info *ci, struct vdi_op_message *msg) ret = SD_RES_UNKNOWN; } - if (node_cmp(&ci->this_node, &msg->header.from) != 0) + if (node_cmp(&sys->this_node, &msg->header.from) != 0) return; - req = list_first_entry(&ci->pending_list, struct request, pending_list); + req = list_first_entry(&sys->pending_list, struct request, pending_list); rsp->result = ret; memcpy(req->data, data, rsp->data_length); @@ -460,7 +454,6 @@ static void __sd_deliver(struct work *work, int idx) { struct work_deliver *w = container_of(work, struct work_deliver, work); struct message_header *m = w->msg; - struct cluster_info *ci = w->ci; dprintf("op: %d, done: %d, size: %d, from: %d.%d.%d.%d:%d\n", m->op, m->done, m->msg_length, @@ -468,15 +461,15 @@ static void __sd_deliver(struct work *work, int idx) m->from.addr[14], m->from.addr[15], m->from.port); if (!m->done) { - if (!is_master(ci)) + if (!is_master()) return; switch (m->op) { case SD_MSG_JOIN: - join(ci, (struct join_message *)m); + join((struct join_message *)m); break; case SD_MSG_VDI_OP: - vdi_op(ci, (struct vdi_op_message *)m); + vdi_op((struct vdi_op_message *)m); break; default: eprintf("unknown message %d\n", m->op); @@ -484,14 +477,14 @@ static void __sd_deliver(struct work *work, int idx) } m->done = 1; - send_message(ci->handle, m); + send_message(sys->handle, m); } else { switch (m->op) { case SD_MSG_JOIN: - update_cluster_info(ci, (struct join_message *)m); + update_cluster_info((struct join_message *)m); break; case SD_MSG_VDI_OP: - vdi_op_done(ci, (struct vdi_op_message *)m); + vdi_op_done((struct vdi_op_message *)m); break; default: eprintf("unknown message %d\n", m->op); @@ -524,13 +517,11 @@ static 
void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name, { struct work_deliver *w; struct message_header *m = msg; - struct cluster_info *ci; dprintf("op: %d, done: %d, size: %d, from: %d.%d.%d.%d:%d\n", m->op, m->done, m->msg_length, m->from.addr[12], m->from.addr[13], m->from.addr[14], m->from.addr[15], m->from.port); - cpg_context_get(handle, (void **)&ci); w = zalloc(sizeof(*w)); if (!w) @@ -541,8 +532,6 @@ static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name, return; memcpy(w->msg, msg, msg_len); - w->ci = ci; - w->work.fn = __sd_deliver; w->work.done = __sd_deliver_done; @@ -555,7 +544,6 @@ static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name, static void __sd_confch(struct work *work, int idx) { struct work_confch *w = container_of(work, struct work_confch, work); - struct cluster_info *ci = w->ci; struct node *node, *e; int i; @@ -567,12 +555,12 @@ static void __sd_confch(struct work *work, int idx) size_t joined_list_entries = w->joined_list_entries; if (member_list_entries == joined_list_entries - left_list_entries && - ci->this_nodeid == member_list[0].nodeid && - ci->this_pid == member_list[0].pid) - ci->synchronized = 1; + sys->this_nodeid == member_list[0].nodeid && + sys->this_pid == member_list[0].pid) + sys->synchronized = 1; for (i = 0; i < left_list_entries; i++) { - list_for_each_entry_safe(node, e, &ci->node_list, list) { + list_for_each_entry_safe(node, e, &sys->node_list, list) { int nr; unsigned pid; struct sheepdog_node_list_entry e[SD_MAX_NODES]; @@ -586,30 +574,33 @@ static void __sd_confch(struct work *work, int idx) list_del(&node->list); free(node); - nr = build_node_list(&ci->node_list, e); - epoch_log_write(ci->epoch + 1, (char *)e, + nr = build_node_list(&sys->node_list, e); + epoch_log_write(sys->epoch + 1, (char *)e, nr * sizeof(struct sheepdog_node_list_entry)); - ci->epoch++; + sys->epoch++; - update_epoch_store(ci, ci->epoch); + update_epoch_store(sys->epoch); } } 
for (i = 0; i < joined_list_entries; i++) { - if (ci->this_nodeid == joined_list[i].nodeid && - ci->this_pid == joined_list[i].pid) { + if (sys->this_nodeid == joined_list[i].nodeid && + sys->this_pid == joined_list[i].pid) { struct join_message msg; msg.header.op = SD_MSG_JOIN; msg.header.done = 0; msg.header.msg_length = sizeof(msg); - msg.header.from = ci->this_node; - msg.nodeid = ci->this_nodeid; - msg.pid = ci->this_pid; + msg.header.from = sys->this_node; + msg.nodeid = sys->this_nodeid; + msg.pid = sys->this_pid; msg.nr_sobjs = nr_sobjs; - send_message(ci->handle, (struct message_header *)&msg); + send_message(sys->handle, (struct message_header *)&msg); + + eprintf("%d\n", i); + break; } } @@ -617,19 +608,18 @@ static void __sd_confch(struct work *work, int idx) if (left_list_entries == 0) return; - print_node_list(ci); + print_node_list(); } static void __sd_confch_done(struct work *work, int idx) { struct work_confch *w = container_of(work, struct work_confch, work); - struct cluster_info *ci = w->ci; /* FIXME: worker threads can't call start_recovery */ if (w->left_list_entries) { if (w->left_list_entries > 1) eprintf("we can't handle %Zd\n", w->left_list_entries); - start_recovery(ci, ci->epoch, 0); + start_recovery(sys->epoch, 0); } free(w->member_list); @@ -647,7 +637,6 @@ static void sd_confch(cpg_handle_t handle, const struct cpg_name *group_name, size_t joined_list_entries) { struct work_confch *w = NULL; - struct cluster_info *ci; int i, size; dprintf("confchg nodeid %x\n", member_list[0].nodeid); @@ -659,8 +648,6 @@ static void sd_confch(cpg_handle_t handle, const struct cpg_name *group_name, member_list[i].reason); } - cpg_context_get(handle, (void **)&ci); - w = zalloc(sizeof(*w)); if (!w) return; @@ -686,8 +673,6 @@ static void sd_confch(cpg_handle_t handle, const struct cpg_name *group_name, memcpy(w->joined_list, joined_list, size); w->joined_list_entries = joined_list_entries; - w->ci = ci; - w->work.fn = __sd_confch; w->work.done = 
__sd_confch_done; w->work.attr = WORK_ORDERED; @@ -723,24 +708,10 @@ int build_node_list(struct list_head *node_list, return nr; } -static void update_node_list_idx(struct cluster_info *ci, - struct sheepdog_node_list_entry *e, int nr) -{ - int i; - - for (i = 0; i < nr; i++) { - if (e->id == ci->this_node.id) { - ci->node_list_idx = i; - break; - } - } -} - -struct cluster_info *create_cluster(int port) +int create_cluster(int port) { int fd, ret; cpg_handle_t cpg_handle; - struct cluster_info *ci; struct addrinfo hints, *res, *res0; char name[INET6_ADDRSTRLEN]; struct cpg_name group = { 8, "sheepdog" }; @@ -749,15 +720,11 @@ struct cluster_info *create_cluster(int port) uint64_t hval; int i; - ci = zalloc(sizeof(*ci)); - if (!ci) - return NULL; - ret = cpg_initialize(&cpg_handle, &cb); if (ret != CS_OK) { eprintf("Failed to initialize cpg, %d\n", ret); eprintf("Is corosync running?\n"); - return NULL; + return -1; } join_retry: @@ -784,11 +751,9 @@ join_retry: exit(1); } - ci->handle = cpg_handle; - ci->this_nodeid = nodeid; - ci->this_pid = getpid(); - - memset(&ci->this_node, 0, sizeof(ci->this_node)); + sys->handle = cpg_handle; + sys->this_nodeid = nodeid; + sys->this_pid = getpid(); gethostname(name, sizeof(name)); @@ -807,8 +772,8 @@ join_retry: if (((char *) &addr->sin_addr)[0] == 127) continue; - memset(ci->this_node.addr, 0, 12); - memcpy(ci->this_node.addr + 12, &addr->sin_addr, 4); + memset(sys->this_node.addr, 0, 12); + memcpy(sys->this_node.addr + 12, &addr->sin_addr, 4); break; } else if (res->ai_family == AF_INET6) { struct sockaddr_in6 *addr; @@ -820,7 +785,7 @@ join_retry: if (memcmp(&addr->sin6_addr, localhost, 16) == 0) continue; - memcpy(ci->this_node.addr, &addr->sin6_addr, 16); + memcpy(sys->this_node.addr, &addr->sin6_addr, 16); break; } else dprintf("unknown address family\n"); @@ -828,27 +793,27 @@ join_retry: if (res == NULL) { eprintf("failed to get address info\n"); - return NULL; + return -1; } freeaddrinfo(res0); - ci->this_node.port 
= port; + sys->this_node.port = port; - hval = fnv_64a_buf(&ci->this_node.port, sizeof(ci->this_node.port), + hval = fnv_64a_buf(&sys->this_node.port, sizeof(sys->this_node.port), FNV1A_64_INIT); - for (i = ARRAY_SIZE(ci->this_node.addr) - 1; i >= 0; i--) - hval = fnv_64a_buf(&ci->this_node.addr[i], 1, hval); + for (i = ARRAY_SIZE(sys->this_node.addr) - 1; i >= 0; i--) + hval = fnv_64a_buf(&sys->this_node.addr[i], 1, hval); - ci->this_node.id = hval; + sys->this_node.id = hval; - ci->synchronized = 0; - INIT_LIST_HEAD(&ci->node_list); - INIT_LIST_HEAD(&ci->vm_list); - INIT_LIST_HEAD(&ci->pending_list); - cpg_context_set(cpg_handle, ci); + sys->synchronized = 0; + INIT_LIST_HEAD(&sys->node_list); + INIT_LIST_HEAD(&sys->vm_list); + INIT_LIST_HEAD(&sys->pending_list); + cpg_context_set(cpg_handle, sys); cpg_fd_get(cpg_handle, &fd); - register_event(fd, group_handler, ci); - return ci; + register_event(fd, group_handler, NULL); + return 0; } diff --git a/collie/net.c b/collie/net.c index 2acb5f5..84eecdc 100644 --- a/collie/net.c +++ b/collie/net.c @@ -198,7 +198,7 @@ static void init_tx_hdr(struct client_info *ci) /* use cpu_to_le */ memcpy(rsp, &req->rp, sizeof(*rsp)); - rsp->epoch = ci->cluster->epoch; + rsp->epoch = sys->epoch; rsp->opcode = req->rq.opcode; rsp->id = req->rq.id; } @@ -278,8 +278,6 @@ static struct client_info *create_client(int fd, struct cluster_info *cluster) init_rx_hdr(ci); - ci->cluster = cluster; - return ci; } diff --git a/collie/store.c b/collie/store.c index 131b9e8..6776254 100644 --- a/collie/store.c +++ b/collie/store.c @@ -93,7 +93,7 @@ static int get_obj_list(struct request *req) snprintf(path, sizeof(path), "%s%08u/", obj_path, hdr->obj_ver); - dprintf("%d\n", req->ci->cluster->this_node.port); + dprintf("%d\n", sys->this_node.port); dir = opendir(path); if (!dir) { @@ -136,7 +136,7 @@ static int get_obj_list(struct request *req) return SD_RES_SUCCESS; } -static int read_from_one(struct cluster_info *cluster, uint64_t oid, +static 
int read_from_one(uint64_t oid, unsigned *ori_rlen, void *buf, uint64_t offset) { int i, n, nr, fd, ret; @@ -148,7 +148,7 @@ static int read_from_one(struct cluster_info *cluster, uint64_t oid, e = zalloc(SD_MAX_NODES * sizeof(struct sheepdog_node_list_entry)); again: - nr = build_node_list(&cluster->node_list, e); + nr = build_node_list(&sys->node_list, e); for (i = 0; i < nr; i++) { n = obj_to_sheep(e, nr, oid, i); @@ -158,7 +158,7 @@ again: e[n].addr[14], e[n].addr[15]); /* FIXME: do like store_queue_request_local() */ - if (e[n].id == cluster->this_node.id) + if (e[n].id == sys->this_node.id) continue; fd = connect_to(name, e[n].port); @@ -168,7 +168,7 @@ again: memset(&hdr, 0, sizeof(hdr)); hdr.opcode = SD_OP_READ_OBJ; hdr.oid = oid; - hdr.epoch = cluster->epoch; + hdr.epoch = sys->epoch; rlen = *ori_rlen; wlen = 0; @@ -203,24 +203,21 @@ again: return -1; } -static int read_from_other_sheeps(struct cluster_info *cluster, - uint64_t oid, char *buf, int copies) +static int read_from_other_sheeps(uint64_t oid, char *buf, int copies) { int ret; unsigned int rlen; rlen = SD_DATA_OBJ_SIZE; - ret = read_from_one(cluster, oid, &rlen, buf, 0); + ret = read_from_one(oid, &rlen, buf, 0); return ret; } -static int store_queue_request_local(struct cluster_info *cluster, - struct request *req, char *buf, uint32_t epoch); +static int store_queue_request_local(struct request *req, char *buf, uint32_t epoch); -static int forward_obj_req(struct cluster_info *cluster, struct request *req, - char *buf) +static int forward_obj_req(struct request *req, char *buf) { int i, n, nr, fd, ret; unsigned wlen, rlen; @@ -234,13 +231,13 @@ static int forward_obj_req(struct cluster_info *cluster, struct request *req, e = zalloc(SD_MAX_NODES * sizeof(struct sheepdog_node_list_entry)); again: - nr = build_node_list(&cluster->node_list, e); + nr = build_node_list(&sys->node_list, e); copies = hdr->copies; /* temporary hack */ if (!copies) - copies = cluster->nr_sobjs; + copies = sys->nr_sobjs; 
for (i = 0; i < copies; i++) { n = obj_to_sheep(e, nr, oid, i); @@ -250,8 +247,8 @@ again: e[n].addr[14], e[n].addr[15]); /* TODO: we can do better; we need to chech this first */ - if (e[n].id == cluster->this_node.id) { - ret = store_queue_request_local(cluster, req, buf, cluster->epoch); + if (e[n].id == sys->this_node.id) { + ret = store_queue_request_local(req, buf, sys->epoch); memcpy(rsp, &req->rp, sizeof(*rsp)); rsp->result = ret; goto done; @@ -272,7 +269,7 @@ again: } hdr2.flags |= SD_FLAG_CMD_FORWARD; - hdr2.epoch = cluster->epoch; + hdr2.epoch = sys->epoch; ret = exec_req(fd, (struct sd_req *)&hdr2, req->data, &wlen, &rlen); @@ -301,21 +298,21 @@ again: return (hdr->flags & SD_FLAG_CMD_WRITE) ? SD_RES_SUCCESS: rsp->result; } -static int check_epoch(struct cluster_info *cluster, struct request *req) +static int check_epoch(struct request *req) { struct sd_req *hdr = (struct sd_req *)&req->rq; uint32_t req_epoch = hdr->epoch; uint32_t opcode = hdr->opcode; int ret = SD_RES_SUCCESS; - if (before(req_epoch, cluster->epoch)) { + if (before(req_epoch, sys->epoch)) { ret = SD_RES_OLD_NODE_VER; eprintf("old node version %u %u, %x\n", - cluster->epoch, req_epoch, opcode); - } else if (after(req_epoch, cluster->epoch)) { + sys->epoch, req_epoch, opcode); + } else if (after(req_epoch, sys->epoch)) { ret = SD_RES_NEW_NODE_VER; eprintf("new node version %u %u %x\n", - cluster->epoch, req_epoch, opcode); + sys->epoch, req_epoch, opcode); } return ret; @@ -342,23 +339,23 @@ static int ob_open(uint32_t epoch, uint64_t oid, int aflags, int *ret) return fd; } -static int is_my_obj(struct cluster_info *ci, uint64_t oid, int copies) +static int is_my_obj(uint64_t oid, int copies) { int i, n, nr; struct sheepdog_node_list_entry e[SD_MAX_NODES]; - nr = build_node_list(&ci->node_list, e); + nr = build_node_list(&sys->node_list, e); for (i = 0; i < copies; i++) { n = obj_to_sheep(e, nr, oid, i); - if (e[n].id == ci->this_node.id) + if (e[n].id == sys->this_node.id) return 1; } 
return 0; } -int update_epoch_store(struct cluster_info *ci, uint32_t epoch) +int update_epoch_store(uint32_t epoch) { int ret; char new[1024], old[1024]; @@ -388,7 +385,7 @@ int update_epoch_store(struct cluster_info *ci, uint32_t epoch) oid = strtoull(d->d_name, NULL, 16); /* TODO: use proper object coipes */ - if (is_my_obj(ci, oid, ci->nr_sobjs)) { + if (is_my_obj(oid, sys->nr_sobjs)) { snprintf(new, sizeof(new), "%s%08u/%s", obj_path, epoch, d->d_name); snprintf(old, sizeof(old), "%s%08u/%s", obj_path, epoch - 1, @@ -402,8 +399,7 @@ int update_epoch_store(struct cluster_info *ci, uint32_t epoch) return 0; } -static int store_queue_request_local(struct cluster_info *cluster, - struct request *req, char *buf, uint32_t epoch) +static int store_queue_request_local(struct request *req, char *buf, uint32_t epoch) { int fd = -1, copies; int ret = SD_RES_SUCCESS; @@ -455,8 +451,7 @@ static int store_queue_request_local(struct cluster_info *cluster, if (hdr->flags & SD_FLAG_CMD_COW) { dprintf("%" PRIu64 "\n", hdr->cow_oid); - ret = read_from_other_sheeps(cluster, - hdr->cow_oid, buf, + ret = read_from_other_sheeps(hdr->cow_oid, buf, hdr->copies); if (ret) { ret = 1; @@ -535,27 +530,26 @@ out: void store_queue_request(struct work *work, int idx) { struct request *req = container_of(work, struct request, work); - struct cluster_info *cluster = req->ci->cluster; int ret = SD_RES_SUCCESS; char *buf = zero_block + idx * SD_DATA_OBJ_SIZE; struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq; struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&req->rp; uint64_t oid = hdr->oid; uint32_t opcode = hdr->opcode; - uint32_t epoch = cluster->epoch; + uint32_t epoch = sys->epoch; uint32_t req_epoch = hdr->epoch; struct sd_node_rsp *nrsp = (struct sd_node_rsp *)&req->rp; dprintf("%d, %x, %" PRIx64" , %u, %u\n", idx, opcode, oid, epoch, req_epoch); - if (list_empty(&cluster->node_list)) { + if (list_empty(&sys->node_list)) { /* we haven't got SD_OP_GET_NODE_LIST response yet. 
*/ ret = SD_RES_SYSTEM_ERROR; goto out; } if (hdr->flags & SD_FLAG_CMD_FORWARD) { - ret = check_epoch(cluster, req); + ret = check_epoch(req); if (ret != SD_RES_SUCCESS) goto out; } @@ -571,11 +565,11 @@ void store_queue_request(struct work *work, int idx) } if (!(hdr->flags & SD_FLAG_CMD_FORWARD)) { - ret = forward_obj_req(cluster, req, buf); + ret = forward_obj_req(req, buf); goto out; } - ret = store_queue_request_local(cluster, req, buf, epoch); + ret = store_queue_request_local(req, buf, epoch); out: if (ret != SD_RES_SUCCESS) { dprintf("failed, %d, %x, %" PRIx64" , %u, %u, %x\n", @@ -735,7 +729,6 @@ void so_queue_request(struct work *work, int idx) struct request *req = container_of(work, struct request, work); struct sd_so_req *hdr = (struct sd_so_req *)&req->rq; struct sd_so_rsp *rsp = (struct sd_so_rsp *)&req->rp; - struct cluster_info *cluster = req->ci->cluster; int nfd = -1, fd = -1, ret, result = SD_RES_SUCCESS; uint32_t opcode = hdr->opcode; uint64_t last_oid = 0; @@ -743,7 +736,7 @@ void so_queue_request(struct work *work, int idx) char oldname[1024]; uint16_t id = 0; - if (list_empty(&cluster->node_list)) { + if (list_empty(&sys->node_list)) { /* we haven't got SD_OP_GET_NODE_LIST response yet. 
*/ result = SD_RES_SYSTEM_ERROR; goto out; @@ -759,12 +752,12 @@ void so_queue_request(struct work *work, int idx) int local = 0; e = zalloc(SD_MAX_NODES * sizeof(struct sheepdog_node_list_entry)); - nr = build_node_list(&cluster->node_list, e); + nr = build_node_list(&sys->node_list, e); - for (i = 0; i < cluster->nr_sobjs; i++) { + for (i = 0; i < sys->nr_sobjs; i++) { n = obj_to_sheep(e, nr, SD_DIR_OID, i); - if (e[n].id == cluster->this_node.id) { + if (e[n].id == sys->this_node.id) { local = 1; break; } @@ -792,7 +785,7 @@ void so_queue_request(struct work *work, int idx) memset(&hdr2, 0, sizeof(hdr2)); hdr2.opcode = SD_OP_SO_READ_VDIS; - hdr2.epoch = cluster->epoch; + hdr2.epoch = sys->epoch; hdr2.data_length = hdr->data_length; wlen = 0; @@ -817,7 +810,7 @@ void so_queue_request(struct work *work, int idx) } if (opcode != SD_OP_SO_READ_VDIS) { - result = check_epoch(cluster, req); + result = check_epoch(req); if (result != SD_RES_SUCCESS) goto out; } @@ -1067,7 +1060,6 @@ struct recovery_work { struct work work; struct list_head rw_siblings; - struct cluster_info *ci; int count; char *buf; @@ -1103,7 +1095,7 @@ static void recover_one(struct work *work, int idx) memset(&hdr, 0, sizeof(hdr)); hdr.opcode = SD_OP_READ_OBJ; hdr.oid = oid; - hdr.epoch = rw->ci->epoch; + hdr.epoch = sys->epoch; hdr.flags = 0; hdr.data_length = rlen; @@ -1136,7 +1128,7 @@ static void recover_one_done(struct work *work, int idx) } if (rw->iteration) { - if (++rw->iteration <= rw->ci->nr_sobjs) { + if (++rw->iteration <= sys->nr_sobjs) { free(rw->buf); rw->done = 0; @@ -1195,7 +1187,7 @@ static int fill_obj_list(struct recovery_work *rw, memset(&hdr, 0, sizeof(hdr)); hdr.opcode = SD_OP_GET_OBJ_LIST; - hdr.epoch = rw->ci->epoch; + hdr.epoch = sys->epoch; hdr.oid = start_hash; hdr.cow_oid = end_hash; hdr.obj_ver = epoch - 1; @@ -1251,12 +1243,12 @@ static void __start_recovery(struct work *work, int idx) goto fail; old_nr /= sizeof(struct sheepdog_node_list_entry); - if 
(!rw->ci->nr_sobjs || cur_nr < rw->ci->nr_sobjs || old_nr < rw->ci->nr_sobjs) + if (!sys->nr_sobjs || cur_nr < sys->nr_sobjs || old_nr < sys->nr_sobjs) goto fail; if (cur_nr < old_nr) { for (i = 0; i < old_nr; i++) { - if (old_entry[i].id == rw->ci->this_node.id) { + if (old_entry[i].id == sys->this_node.id) { my_idx = i; break; } @@ -1277,18 +1269,18 @@ static void __start_recovery(struct work *work, int idx) dprintf("%u %u %u\n", my_idx, ch_idx, node_distance(my_idx, ch_idx, old_nr)); - if (node_distance(my_idx, ch_idx, old_nr) > rw->ci->nr_sobjs) + if (node_distance(my_idx, ch_idx, old_nr) > sys->nr_sobjs) return; - n = node_from_distance(my_idx, rw->ci->nr_sobjs, old_nr); + n = node_from_distance(my_idx, sys->nr_sobjs, old_nr); - dprintf("%d %d\n", n, rw->ci->nr_sobjs); + dprintf("%d %d\n", n, sys->nr_sobjs); start_hash = old_entry[(n - 1 + old_nr) % old_nr].id; end_hash = old_entry[n].id; /* FIXME */ - if (node_distance(my_idx, ch_idx, old_nr) == rw->ci->nr_sobjs) { + if (node_distance(my_idx, ch_idx, old_nr) == sys->nr_sobjs) { n++; n %= old_nr; } @@ -1296,7 +1288,7 @@ static void __start_recovery(struct work *work, int idx) fill_obj_list(rw, old_entry + n, start_hash, end_hash); } else { for (i = 0; i < cur_nr; i++) { - if (cur_entry[i].id == rw->ci->this_node.id) { + if (cur_entry[i].id == sys->this_node.id) { my_idx = i; break; } @@ -1338,7 +1330,7 @@ static void __start_recovery_done(struct work *work, int idx) if (!rw->count) { if (rw->iteration) { - if (++rw->iteration <= rw->ci->nr_sobjs) { + if (++rw->iteration <= sys->nr_sobjs) { free(rw->buf); rw->work.fn = __start_recovery; @@ -1364,7 +1356,7 @@ static void __start_recovery_done(struct work *work, int idx) queue_work(dobj_queue, &rw->work); } -int start_recovery(struct cluster_info *ci, uint32_t epoch, int add) +int start_recovery(uint32_t epoch, int add) { struct recovery_work *rw; @@ -1377,7 +1369,6 @@ int start_recovery(struct cluster_info *ci, uint32_t epoch, int add) return -1; rw->epoch = 
epoch; - rw->ci = ci; rw->count = 0; if (add) diff --git a/collie/vdi.c b/collie/vdi.c index 503b29b..f30a14a 100644 --- a/collie/vdi.c +++ b/collie/vdi.c @@ -80,7 +80,7 @@ static int create_inode_obj(struct sheepdog_node_list_entry *entries, /* * TODO: handle larger buffer */ -int add_vdi(struct cluster_info *ci, char *name, int len, uint64_t size, +int add_vdi(char *name, int len, uint64_t size, uint64_t *added_oid, uint64_t base_oid, uint32_t tag, int copies, uint16_t flags) { @@ -93,28 +93,28 @@ int add_vdi(struct cluster_info *ci, char *name, int len, uint64_t size, memset(&req, 0, sizeof(req)); - nr_nodes = build_node_list(&ci->node_list, entries); + nr_nodes = build_node_list(&sys->node_list, entries); dprintf("%s (%d) %" PRIu64 ", base: %" PRIu64 "\n", name, len, size, base_oid); - nr_reqs = ci->nr_sobjs; + nr_reqs = sys->nr_sobjs; if (nr_reqs > nr_nodes) nr_reqs = nr_nodes; memset(&req, 0, sizeof(req)); - eprintf("%d %d\n", copies, ci->nr_sobjs); + eprintf("%d %d\n", copies, sys->nr_sobjs); /* qemu doesn't specify the copies, then we use the default. 
*/ if (!copies) - copies = ci->nr_sobjs; + copies = sys->nr_sobjs; req.opcode = SD_OP_SO_NEW_VDI; req.copies = copies; req.tag = tag; req.flags |= flags; - ret = exec_reqs(entries, nr_nodes, ci->epoch, + ret = exec_reqs(entries, nr_nodes, sys->epoch, SD_DIR_OID, (struct sd_req *)&req, name, len, 0, nr_reqs, nr_reqs); @@ -127,19 +127,18 @@ int add_vdi(struct cluster_info *ci, char *name, int len, uint64_t size, dprintf("%s (%d) %" PRIu64 ", base: %" PRIu64 "\n", name, len, size, oid); - ret = create_inode_obj(entries, nr_nodes, ci->epoch, copies, + ret = create_inode_obj(entries, nr_nodes, sys->epoch, copies, oid, size, base_oid); return ret; } -int del_vdi(struct cluster_info *cluster, char *name, int len) +int del_vdi(char *name, int len) { return 0; } -int lookup_vdi(struct cluster_info *ci, - char *filename, uint64_t * oid, uint32_t tag, int do_lock, +int lookup_vdi(char *filename, uint64_t * oid, uint32_t tag, int do_lock, int *current) { struct sheepdog_node_list_entry entries[SD_MAX_NODES]; @@ -150,13 +149,13 @@ int lookup_vdi(struct cluster_info *ci, memset(&req, 0, sizeof(req)); - nr_nodes = build_node_list(&ci->node_list, entries); + nr_nodes = build_node_list(&sys->node_list, entries); *current = 0; dprintf("looking for %s %zd\n", filename, strlen(filename)); - nr_reqs = ci->nr_sobjs; + nr_reqs = sys->nr_sobjs; if (nr_reqs > nr_nodes) nr_reqs = nr_nodes; @@ -165,7 +164,7 @@ int lookup_vdi(struct cluster_info *ci, req.opcode = SD_OP_SO_LOOKUP_VDI; req.tag = tag; - ret = exec_reqs(entries, nr_nodes, ci->epoch, + ret = exec_reqs(entries, nr_nodes, sys->epoch, SD_DIR_OID, (struct sd_req *)&req, filename, strlen(filename), 0, nr_reqs, 1); @@ -182,7 +181,7 @@ int lookup_vdi(struct cluster_info *ci, } /* todo: cleanup with the above */ -int make_super_object(struct cluster_info *ci, struct sd_vdi_req *hdr) +int make_super_object(struct sd_vdi_req *hdr) { struct timeval tv; int nr_nodes, ret; @@ -197,9 +196,9 @@ int make_super_object(struct cluster_info *ci, 
struct sd_vdi_req *hdr) req.ctime = (uint64_t)tv.tv_sec << 32 | tv.tv_usec * 1000; req.copies = ((struct sd_obj_req *)hdr)->copies; - nr_nodes = build_node_list(&ci->node_list, entries); + nr_nodes = build_node_list(&sys->node_list, entries); - ret = exec_reqs(entries, nr_nodes, ci->epoch, + ret = exec_reqs(entries, nr_nodes, sys->epoch, SD_DIR_OID, (struct sd_req *)&req, NULL, 0, 0, req.copies, req.copies); -- 1.6.5 |