This patch uses a generic sheepid instead of a corosync-specific node id. This patch is necessary to remove the dependency on corosync. Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp> --- sheep/group.c | 175 ++++++++++++++++++++++++++++++++-------------------- sheep/sheep_priv.h | 4 +- 2 files changed, 109 insertions(+), 70 deletions(-) diff --git a/sheep/group.c b/sheep/group.c index 8c65d74..b5bd6fd 100644 --- a/sheep/group.c +++ b/sheep/group.c @@ -24,10 +24,58 @@ #include "util.h" #include "logger.h" #include "work.h" +#include "cluster.h" + +static corosync_cfg_handle_t cfg_handle; + +static int nodeid_to_addr(uint32_t nodeid, uint8_t *addr) +{ + int ret, nr; + corosync_cfg_node_address_t caddr; + struct sockaddr_storage *ss = (struct sockaddr_storage *)caddr.address; + struct sockaddr_in *sin = (struct sockaddr_in *)caddr.address; + struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)caddr.address; + void *saddr; + + ret = corosync_cfg_get_node_addrs(cfg_handle, nodeid, 1, &nr, &caddr); + if (ret != CS_OK) { + vprintf(SDOG_ERR "failed to get addr %d\n", ret); + return -1; + } + + if (!nr) { + vprintf(SDOG_ERR "we got no address\n"); + return -1; + } + + if (ss->ss_family == AF_INET6) { + saddr = &sin6->sin6_addr; + memcpy(addr, saddr, 16); + } else if (ss->ss_family == AF_INET) { + saddr = &sin->sin_addr; + memset(addr, 0, 16); + memcpy(addr + 12, saddr, 4); + } else { + vprintf(SDOG_ERR "unknown protocol %d\n", ss->ss_family); + return -1; + } + + return 0; +} + +static void cpg_addr_to_sheepid(const struct cpg_address *cpgs, + struct sheepid *sheeps, size_t nr) +{ + int i; + + for (i = 0; i < nr; i++) { + nodeid_to_addr(cpgs[i].nodeid, sheeps[i].addr); + sheeps[i].pid = cpgs[i].pid; + } +} struct node { - uint32_t nodeid; - uint32_t pid; + struct sheepid sheepid; struct sheepdog_node_list_entry ent; struct list_head list; }; @@ -44,8 +92,7 @@ struct message_header { uint8_t op; uint8_t state; uint32_t msg_length; - uint32_t nodeid; - 
uint32_t pid; + struct sheepid sheepid; struct sheepdog_node_list_entry from; }; @@ -60,14 +107,12 @@ struct join_message { uint8_t inc_epoch; /* set non-zero when we increment epoch of all nodes */ uint8_t pad[3]; struct { - uint32_t nodeid; - uint32_t pid; + struct sheepid sheepid; struct sheepdog_node_list_entry ent; } nodes[SD_MAX_NODES]; uint32_t nr_leave_nodes; struct { - uint32_t nodeid; - uint32_t pid; + struct sheepid sheepid; struct sheepdog_node_list_entry ent; } leave_nodes[SD_MAX_NODES]; }; @@ -112,11 +157,11 @@ struct work_confchg { #define print_node_list(node_list) \ ({ \ struct node *__node; \ - char __name[128]; \ + char __name[128]; \ list_for_each_entry(__node, node_list, list) { \ - dprintf("%c nodeid: %x, pid: %d, ip: %s\n", \ + dprintf("%c pid: %ld, ip: %s\n", \ is_myself(__node->ent.addr, __node->ent.port) ? 'l' : ' ', \ - __node->nodeid, __node->pid, \ + __node->sheepid.pid, \ addr_to_str(__name, sizeof(__name), \ __node->ent.addr, __node->ent.port)); \ } \ @@ -417,12 +462,12 @@ out: exit(1); } -static struct node *find_node(struct list_head *node_list, uint32_t nodeid, uint32_t pid) +static struct node *find_node(struct list_head *node_list, struct sheepid *id) { struct node *node; list_for_each_entry(node, node_list, list) { - if (node->nodeid == nodeid && node->pid == pid) + if (sheepid_cmp(&node->sheepid, id) == 0) return node; } @@ -511,8 +556,7 @@ static int add_node_to_leave_list(struct message_header *msg) goto ret; } - n->nodeid = msg->nodeid; - n->pid = msg->pid; + n->sheepid = msg->sheepid; n->ent = msg->from; list_add_tail(&n->list, &sys->leave_list); @@ -533,8 +577,7 @@ static int add_node_to_leave_list(struct message_header *msg) continue; } - n->nodeid = jm->leave_nodes[i].nodeid; - n->pid = jm->leave_nodes[i].pid; + n->sheepid = jm->leave_nodes[i].sheepid; n->ent = jm->leave_nodes[i].ent; list_add_tail(&n->list, &tmp_list); @@ -698,8 +741,7 @@ static void join(struct join_message *msg) msg->ctime = get_cluster_ctime(); 
msg->nr_nodes = 0; list_for_each_entry(node, &sys->sd_node_list, list) { - msg->nodes[msg->nr_nodes].nodeid = node->nodeid; - msg->nodes[msg->nr_nodes].pid = node->pid; + msg->nodes[msg->nr_nodes].sheepid = node->sheepid; msg->nodes[msg->nr_nodes].ent = node->ent; msg->nr_nodes++; } @@ -768,12 +810,12 @@ static void get_vdi_bitmap_from_sd_list(void) get_vdi_bitmap_from(&nodes[i]); } -static int move_node_to_sd_list(uint32_t nodeid, uint32_t pid, +static int move_node_to_sd_list(struct sheepid *id, struct sheepdog_node_list_entry ent) { struct node *node; - node = find_node(&sys->cpg_node_list, nodeid, pid); + node = find_node(&sys->cpg_node_list, id); if (!node) return 1; @@ -830,16 +872,15 @@ static void update_cluster_info(struct join_message *msg) sys->epoch = msg->epoch; for (i = 0; i < nr_nodes; i++) { - ret = move_node_to_sd_list(msg->nodes[i].nodeid, - msg->nodes[i].pid, + ret = move_node_to_sd_list(&msg->nodes[i].sheepid, msg->nodes[i].ent); /* * the node belonged to sheepdog when the master build * the JOIN response however it has gone. */ if (ret) - vprintf(SDOG_INFO "nodeid: %x, pid: %d has gone\n", - msg->nodes[i].nodeid, msg->nodes[i].pid); + vprintf(SDOG_INFO "%s has gone\n", + sheepid_to_str(&msg->nodes[i].sheepid)); } if (msg->cluster_status != SD_STATUS_OK) @@ -851,14 +892,14 @@ static void update_cluster_info(struct join_message *msg) update_epoch_log(sys->epoch); join_finished: - ret = move_node_to_sd_list(msg->header.nodeid, msg->header.pid, msg->header.from); + ret = move_node_to_sd_list(&msg->header.sheepid, msg->header.from); /* * this should not happen since __sd_deliver() checks if the * host from msg on cpg_node_list. 
*/ if (ret) - vprintf(SDOG_ERR "nodeid: %x, pid: %d has gone\n", - msg->header.nodeid, msg->header.pid); + vprintf(SDOG_ERR "%s has gone\n", + sheepid_to_str(&msg->header.sheepid)); if (msg->cluster_status == SD_STATUS_OK) { if (msg->inc_epoch) { @@ -1025,27 +1066,24 @@ static void __sd_deliver(struct cpg_event *cevent) char name[128]; struct node *node; - dprintf("op: %d, state: %u, size: %d, from: %s, pid: %d\n", + dprintf("op: %d, state: %u, size: %d, from: %s, pid: %ld\n", m->op, m->state, m->msg_length, addr_to_str(name, sizeof(name), m->from.addr, m->from.port), - m->pid); + m->sheepid.pid); /* * we don't want to perform any deliver events except mastership_tx event * until we join; we wait for our JOIN message. */ if (!sys->join_finished && !master_tx_message(m)) { - if (m->pid != sys->this_pid || m->nodeid != sys->this_nodeid) { + if (sheepid_cmp(&m->sheepid, &sys->this_sheepid) != 0) { cevent->skip = 1; return; } } if (join_message(m)) { - uint32_t nodeid = m->nodeid; - uint32_t pid = m->pid; - - node = find_node(&sys->cpg_node_list, nodeid, pid); + node = find_node(&sys->cpg_node_list, &m->sheepid); if (!node) { dprintf("the node was left before join operation is finished\n"); return; @@ -1095,8 +1133,7 @@ static int tx_mastership(void) msg.header.state = DM_FIN; msg.header.msg_length = sizeof(msg); msg.header.from = sys->this_node; - msg.header.nodeid = sys->this_nodeid; - msg.header.pid = sys->this_pid; + msg.header.sheepid = sys->this_sheepid; return send_message(sys->handle, (struct message_header *)&msg); } @@ -1116,8 +1153,7 @@ static void send_join_response(struct work_deliver *w) if (jm->result == SD_RES_SUCCESS && jm->cluster_status != SD_STATUS_OK) { jm->nr_leave_nodes = 0; list_for_each_entry(node, &sys->leave_list, list) { - jm->leave_nodes[jm->nr_leave_nodes].nodeid = node->nodeid; - jm->leave_nodes[jm->nr_leave_nodes].pid = node->pid; + jm->leave_nodes[jm->nr_leave_nodes].sheepid = node->sheepid; jm->leave_nodes[jm->nr_leave_nodes].ent = 
node->ent; jm->nr_leave_nodes++; } @@ -1151,7 +1187,7 @@ static void __sd_deliver_done(struct cpg_event *cevent) update_cluster_info((struct join_message *)m); break; case SD_MSG_LEAVE: - node = find_node(&sys->sd_node_list, m->nodeid, m->pid); + node = find_node(&sys->sd_node_list, &m->sheepid); if (node) { sys->nr_vnodes = 0; @@ -1173,7 +1209,7 @@ static void __sd_deliver_done(struct cpg_event *cevent) */ if (!sys->join_finished) { sys->join_finished = 1; - move_node_to_sd_list(sys->this_nodeid, sys->this_pid, sys->this_node); + move_node_to_sd_list(&sys->this_sheepid, sys->this_node); sys->epoch = get_latest_epoch(); } @@ -1265,16 +1301,19 @@ static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name, } static void for_each_node_list(struct cpg_address list[], int count, - void (*func)(struct cpg_address *addr, + void (*func)(struct sheepid *id, struct work_confchg *w), struct work_confchg *w) { int i; + struct sheepid sheepid[SD_MAX_NODES]; + + cpg_addr_to_sheepid(list, sheepid, count); for (i = 0; i < count; i++) - func(&list[i], w); + func(sheepid + i, w); } -static void add_node(struct cpg_address *addr, struct work_confchg *w) +static void add_node(struct sheepid *id, struct work_confchg *w) { struct node *node; @@ -1282,17 +1321,16 @@ static void add_node(struct cpg_address *addr, struct work_confchg *w) if (!node) panic("failed to alloc memory for a new node\n"); - node->nodeid = addr->nodeid; - node->pid = addr->pid; + node->sheepid = *id; list_add_tail(&node->list, &sys->cpg_node_list); } -static void del_node(struct cpg_address *addr, struct work_confchg *w) +static void del_node(struct sheepid *id, struct work_confchg *w) { struct node *node; - node = find_node(&sys->sd_node_list, addr->nodeid, addr->pid); + node = find_node(&sys->sd_node_list, id); if (node) { int nr; struct sheepdog_node_list_entry e[SD_MAX_NODES]; @@ -1314,7 +1352,7 @@ static void del_node(struct cpg_address *addr, struct work_confchg *w) 
update_epoch_store(sys->epoch); } } else { - node = find_node(&sys->cpg_node_list, addr->nodeid, addr->pid); + node = find_node(&sys->cpg_node_list, id); if (node) { list_del(&node->list); free(node); @@ -1324,8 +1362,10 @@ static void del_node(struct cpg_address *addr, struct work_confchg *w) static int is_my_cpg_addr(struct cpg_address *addr) { - return (sys->this_nodeid == addr->nodeid) && - (sys->this_pid == addr->pid); + struct sheepid id; + + cpg_addr_to_sheepid(addr, &id, 1); + return (sheepid_cmp(&id, &sys->this_sheepid) == 0); } /* @@ -1337,10 +1377,13 @@ static int check_majority(struct cpg_address *left_list, int nr_nodes = 0, nr_majority, nr_reachable = 0, i, fd; struct node *node; char name[INET6_ADDRSTRLEN]; + struct sheepid left_sheepid[SD_MAX_NODES]; if (left_list_entries == 0) return 1; /* we don't need this check in this case */ + cpg_addr_to_sheepid(left_list, left_sheepid, left_list_entries); + nr_nodes = get_nodes_nr_from(&sys->sd_node_list); nr_majority = nr_nodes / 2 + 1; @@ -1351,8 +1394,7 @@ static int check_majority(struct cpg_address *left_list, list_for_each_entry(node, &sys->sd_node_list, list) { for (i = 0; i < left_list_entries; i++) { - if (left_list[i].nodeid == node->nodeid && - left_list[i].pid == node->pid) + if (sheepid_cmp(left_sheepid + i, &node->sheepid) == 0) break; } if (i != left_list_entries) @@ -1385,14 +1427,14 @@ static void __sd_confchg(struct cpg_event *cevent) } } -static void send_join_request(struct cpg_address *addr, struct work_confchg *w) +static void send_join_request(struct sheepid *id, struct work_confchg *w) { struct join_message msg; struct sheepdog_node_list_entry entries[SD_MAX_NODES]; int nr_entries, i, ret; /* if I've just joined in cpg, I'll join in sheepdog. 
*/ - if (!is_my_cpg_addr(addr)) + if (sheepid_cmp(id, &sys->this_sheepid) != 0) return; memset(&msg, 0, sizeof(msg)); @@ -1401,8 +1443,7 @@ static void send_join_request(struct cpg_address *addr, struct work_confchg *w) msg.header.state = DM_INIT; msg.header.msg_length = sizeof(msg); msg.header.from = sys->this_node; - msg.header.nodeid = sys->this_nodeid; - msg.header.pid = sys->this_pid; + msg.header.sheepid = sys->this_sheepid; get_global_nr_copies(&msg.nr_sobjs); @@ -1416,7 +1457,7 @@ static void send_join_request(struct cpg_address *addr, struct work_confchg *w) send_message(sys->handle, (struct message_header *)&msg); - vprintf(SDOG_INFO "%x %u\n", sys->this_nodeid, sys->this_pid); + vprintf(SDOG_INFO "%s\n", sheepid_to_str(&sys->this_sheepid)); } static void __sd_confchg_done(struct cpg_event *cevent) @@ -1454,13 +1495,12 @@ static void __sd_confchg_done(struct cpg_event *cevent) * becomes the master without sending JOIN. */ - vprintf(SDOG_DEBUG "%d %x\n", sys->this_pid, sys->this_nodeid); + vprintf(SDOG_DEBUG "%s\n", sheepid_to_str(&sys->this_sheepid)); memset(&msg, 0, sizeof(msg)); msg.header.from = sys->this_node; - msg.header.nodeid = sys->this_nodeid; - msg.header.pid = sys->this_pid; + msg.header.sheepid = sys->this_sheepid; nr_entries = ARRAY_SIZE(entries); ret = read_epoch(&epoch, &ctime, entries, &nr_entries); @@ -1930,7 +1970,6 @@ oom: static int set_addr(unsigned int nodeid, int port) { int ret, nr; - corosync_cfg_handle_t handle; corosync_cfg_node_address_t addr; struct sockaddr_storage *ss = (struct sockaddr_storage *)addr.address; struct sockaddr_in *sin = (struct sockaddr_in *)addr.address; @@ -1940,13 +1979,13 @@ static int set_addr(unsigned int nodeid, int port) memset(sys->this_node.addr, 0, sizeof(sys->this_node.addr)); - ret = corosync_cfg_initialize(&handle, NULL); + ret = corosync_cfg_initialize(&cfg_handle, NULL); if (ret != CPG_OK) { vprintf(SDOG_ERR "failed to initiazize cfg %d\n", ret); return -1; } - ret = 
corosync_cfg_get_node_addrs(handle, nodeid, 1, &nr, &addr); + ret = corosync_cfg_get_node_addrs(cfg_handle, nodeid, 1, &nr, &addr); if (ret != CPG_OK) { vprintf(SDOG_ERR "failed to get addr %d\n", ret); return -1; @@ -2013,12 +2052,13 @@ join_retry: } sys->handle = cpg_handle; - sys->this_nodeid = nodeid; - sys->this_pid = getpid(); + sys->this_sheepid.pid = getpid(); ret = set_addr(nodeid, port); if (ret) return 1; + memcpy(sys->this_sheepid.addr, sys->this_node.addr, + sizeof(sys->this_node.addr)); sys->this_node.port = port; sys->this_node.nr_vnodes = SD_DEFAULT_VNODES; if (zone == -1) @@ -2066,8 +2106,7 @@ int leave_cluster(void) msg.header.state = DM_FIN; msg.header.msg_length = sizeof(msg); msg.header.from = sys->this_node; - msg.header.nodeid = sys->this_nodeid; - msg.header.pid = sys->this_pid; + msg.header.sheepid = sys->this_sheepid; msg.epoch = get_latest_epoch(); dprintf("%d\n", msg.epoch); diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h index 6409530..8b20213 100644 --- a/sheep/sheep_priv.h +++ b/sheep/sheep_priv.h @@ -20,6 +20,7 @@ #include "work.h" #include "net.h" #include "sheep.h" +#include "cluster.h" #define SD_OP_REMOVE_OBJ 0x91 @@ -105,8 +106,7 @@ struct cluster_info { cpg_handle_t handle; /* set after finishing the JOIN procedure */ int join_finished; - uint32_t this_nodeid; - uint32_t this_pid; + struct sheepid this_sheepid; struct sheepdog_node_list_entry this_node; uint32_t epoch; -- 1.7.2.5 |