This patch abstracts out the cluster management of Sheepdog, and introduces a cluster driver interface. Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp> --- sheep/cluster.h | 157 ++++++++++++++++ sheep/group.c | 527 ++++++++++++++++++++-------------------------------- sheep/sdnet.c | 2 + sheep/sheep.c | 24 +++- sheep/sheep_priv.h | 16 +- 5 files changed, 397 insertions(+), 329 deletions(-) create mode 100644 sheep/cluster.h diff --git a/sheep/cluster.h b/sheep/cluster.h new file mode 100644 index 0000000..e5fecc5 --- /dev/null +++ b/sheep/cluster.h @@ -0,0 +1,157 @@ +/* + * Copyright (C) 2011 Nippon Telegraph and Telephone Corporation. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License version + * 2 as published by the Free Software Foundation. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +#ifndef __CLUSTER_H__ +#define __CLUSTER_H__ + +#include <stdio.h> +#include <stdlib.h> +#include <stdint.h> +#include <inttypes.h> +#include <memory.h> + +#include "sheepdog_proto.h" +#include "sheep.h" +#include "logger.h" + +struct sheepid { + uint8_t addr[16]; + uint64_t pid; +}; + +struct cdrv_handlers { + void (*join_handler)(struct sheepid *joined, struct sheepid *members, + size_t nr_members); + void (*leave_handler)(struct sheepid *left, struct sheepid *members, + size_t nr_members); + void (*notify_handler)(struct sheepid *sender, void *msg, size_t msg_len); +}; + +struct cluster_driver { + const char *name; + + /* + * Initialize the cluster driver + * + * On success, this function returns the file descriptor that + * may be used with poll(2) to monitor cluster events. On + * error, returns -1.
+ */ + int (*init)(struct cdrv_handlers *handlers, struct sheepid *myid); + + /* + * Join the cluster + * + * This function is used to join the cluster, and notifies a + * join event to all the nodes. + * + * Returns zero on success, -1 on error + */ + int (*join)(void); + + /* + * Leave the cluster + * + * This function is used to leave the cluster, and notifies a + * leave event to all the nodes. + * + * Returns zero on success, -1 on error + */ + int (*leave)(void); + + /* + * Notify a message to all nodes in the cluster + * + * This function sends 'msg' to all the nodes. The notified + * messages can be read through notify_handler() in + * cdrv_handlers. + * + * Returns zero on success, -1 on error + */ + int (*notify)(void *msg, size_t msg_len); + + /* + * Dispatch handlers + * + * This function dispatches handlers according to the + * delivered events (join/leave/notify) in the cluster. + * + * Note that the event sequence is totally ordered; all nodes + * call the handlers in the same sequence.
+ * + * Returns zero on success, -1 on error + */ + int (*dispatch)(void); + + struct list_head list; +}; + +extern struct list_head cluster_drivers; + +#define cdrv_register(driver) \ +static void __attribute__((constructor)) regist_ ## driver(void) { \ + list_add(&driver.list, &cluster_drivers); \ +} + +#define FOR_EACH_CLUSTER_DRIVER(driver) \ + list_for_each_entry(driver, &cluster_drivers, list) + + +static inline int sheepid_find(struct sheepid *sheeps, size_t nr_sheeps, + struct sheepid *key) +{ + int i; + + for (i = 0; i < nr_sheeps; i++) { + if (memcmp(sheeps + i, key, sizeof(*key)) == 0) + return i; + } + return -1; +} + +static inline void sheepid_add(struct sheepid *sheeps1, size_t nr_sheeps1, + struct sheepid *sheeps2, size_t nr_sheeps2) +{ + memcpy(sheeps1 + nr_sheeps1, sheeps2, sizeof(*sheeps2) * nr_sheeps2); +} + +static inline void sheepid_del(struct sheepid *sheeps1, size_t nr_sheeps1, + struct sheepid *sheeps2, size_t nr_sheeps2) +{ + int i, idx; + + for (i = 0; i < nr_sheeps2; i++) { + idx = sheepid_find(sheeps1, nr_sheeps1, sheeps2 + i); + if (idx < 0) + panic("internal error: cannot find sheepid\n"); + + nr_sheeps1--; + memmove(sheeps1 + idx, sheeps1 + idx + 1, + sizeof(*sheeps1) * (nr_sheeps1 - idx)); + } +} + +static inline char *sheepid_to_str(struct sheepid *id) +{ + static char str[256]; + char name[256]; + + snprintf(str, sizeof(str), "ip: %s, pid: %" PRIu64, + addr_to_str(name, sizeof(name), id->addr, 0), id->pid); + + return str; +} + +static inline int sheepid_cmp(struct sheepid *id1, struct sheepid *id2) +{ + return memcmp(id1, id2, sizeof(*id1)); +} + +#endif diff --git a/sheep/group.c b/sheep/group.c index 8c65d74..95fc799 100644 --- a/sheep/group.c +++ b/sheep/group.c @@ -15,8 +15,6 @@ #include <arpa/inet.h> #include <sys/time.h> #include <sys/epoll.h> -#include <corosync/cpg.h> -#include <corosync/cfg.h> #include "sheepdog_proto.h" #include "sheep_priv.h" @@ -24,10 +22,10 @@ #include "util.h" #include "logger.h" #include "work.h"
+#include "cluster.h" struct node { - uint32_t nodeid; - uint32_t pid; + struct sheepid sheepid; struct sheepdog_node_list_entry ent; struct list_head list; }; @@ -44,8 +42,7 @@ struct message_header { uint8_t op; uint8_t state; uint32_t msg_length; - uint32_t nodeid; - uint32_t pid; + struct sheepid sheepid; struct sheepdog_node_list_entry from; }; @@ -60,14 +57,12 @@ struct join_message { uint8_t inc_epoch; /* set non-zero when we increment epoch of all nodes */ uint8_t pad[3]; struct { - uint32_t nodeid; - uint32_t pid; + struct sheepid sheepid; struct sheepdog_node_list_entry ent; } nodes[SD_MAX_NODES]; uint32_t nr_leave_nodes; struct { - uint32_t nodeid; - uint32_t pid; + struct sheepid sheepid; struct sheepdog_node_list_entry ent; } leave_nodes[SD_MAX_NODES]; }; @@ -89,34 +84,36 @@ struct mastership_tx_message { uint32_t epoch; }; -struct work_deliver { +struct work_notify { struct cpg_event cev; struct message_header *msg; }; -struct work_confchg { +struct work_join { struct cpg_event cev; - struct cpg_address *member_list; + struct sheepid *member_list; size_t member_list_entries; - struct cpg_address *left_list; - size_t left_list_entries; - struct cpg_address *joined_list; - size_t joined_list_entries; + struct sheepid joined; +}; + +struct work_leave { + struct cpg_event cev; - int first_cpg_node; - int sd_node_left; + struct sheepid *member_list; + size_t member_list_entries; + struct sheepid left; }; #define print_node_list(node_list) \ ({ \ struct node *__node; \ - char __name[128]; \ + char __name[128]; \ list_for_each_entry(__node, node_list, list) { \ - dprintf("%c nodeid: %x, pid: %d, ip: %s\n", \ + dprintf("%c pid: %" PRIu64 ", ip: %s\n", \ is_myself(__node->ent.addr, __node->ent.port) ?
'l' : ' ', \ - __node->nodeid, __node->pid, \ + __node->sheepid.pid, \ addr_to_str(__name, sizeof(__name), \ __node->ent.addr, __node->ent.port)); \ } \ @@ -172,30 +169,6 @@ static inline int master_tx_message(struct message_header *m) return m->op == SD_MSG_MASTER_TRANSFER; } -static int send_message(cpg_handle_t handle, struct message_header *msg) -{ - struct iovec iov; - int ret; - - iov.iov_base = msg; - iov.iov_len = msg->msg_length; -retry: - ret = cpg_mcast_joined(handle, CPG_TYPE_AGREED, &iov, 1); - switch (ret) { - case CPG_OK: - break; - case CPG_ERR_TRY_AGAIN: - dprintf("failed to send message. try again\n"); - sleep(1); - goto retry; - default: - eprintf("failed to send message, %d\n", ret); - return -1; - } - return 0; -} - - static int get_node_idx(struct sheepdog_node_list_entry *ent, struct sheepdog_node_list_entry *entries, int nr_nodes) { @@ -393,7 +366,7 @@ forward: list_add(&req->pending_list, &sys->pending_list); - send_message(sys->handle, (struct message_header *)msg); + sys->cdrv->notify(msg, msg->header.msg_length); free(msg); } @@ -406,9 +379,8 @@ static void group_handler(int listen_fd, int events, void *data) goto out; } - ret = cpg_dispatch(sys->handle, CPG_DISPATCH_ALL); - - if (ret == CPG_OK) + ret = sys->cdrv->dispatch(); + if (ret == 0) return; else eprintf("oops...some error occured inside corosync\n"); @@ -417,12 +389,12 @@ out: exit(1); } -static struct node *find_node(struct list_head *node_list, uint32_t nodeid, uint32_t pid) +static struct node *find_node(struct list_head *node_list, struct sheepid *id) { struct node *node; list_for_each_entry(node, node_list, list) { - if (node->nodeid == nodeid && node->pid == pid) + if (sheepid_cmp(&node->sheepid, id) == 0) return node; } @@ -511,8 +483,7 @@ static int add_node_to_leave_list(struct message_header *msg) goto ret; } - n->nodeid = msg->nodeid; - n->pid = msg->pid; + n->sheepid = msg->sheepid; n->ent = msg->from; list_add_tail(&n->list, &sys->leave_list); @@ -533,8 +504,7 @@ 
static int add_node_to_leave_list(struct message_header *msg) continue; } - n->nodeid = jm->leave_nodes[i].nodeid; - n->pid = jm->leave_nodes[i].pid; + n->sheepid = jm->leave_nodes[i].sheepid; n->ent = jm->leave_nodes[i].ent; list_add_tail(&n->list, &tmp_list); @@ -698,8 +668,7 @@ static void join(struct join_message *msg) msg->ctime = get_cluster_ctime(); msg->nr_nodes = 0; list_for_each_entry(node, &sys->sd_node_list, list) { - msg->nodes[msg->nr_nodes].nodeid = node->nodeid; - msg->nodes[msg->nr_nodes].pid = node->pid; + msg->nodes[msg->nr_nodes].sheepid = node->sheepid; msg->nodes[msg->nr_nodes].ent = node->ent; msg->nr_nodes++; } @@ -768,12 +737,12 @@ static void get_vdi_bitmap_from_sd_list(void) get_vdi_bitmap_from(&nodes[i]); } -static int move_node_to_sd_list(uint32_t nodeid, uint32_t pid, +static int move_node_to_sd_list(struct sheepid *id, struct sheepdog_node_list_entry ent) { struct node *node; - node = find_node(&sys->cpg_node_list, nodeid, pid); + node = find_node(&sys->cpg_node_list, id); if (!node) return 1; @@ -830,16 +799,15 @@ static void update_cluster_info(struct join_message *msg) sys->epoch = msg->epoch; for (i = 0; i < nr_nodes; i++) { - ret = move_node_to_sd_list(msg->nodes[i].nodeid, - msg->nodes[i].pid, + ret = move_node_to_sd_list(&msg->nodes[i].sheepid, msg->nodes[i].ent); /* * the node belonged to sheepdog when the master build * the JOIN response however it has gone. 
*/ if (ret) - vprintf(SDOG_INFO "nodeid: %x, pid: %d has gone\n", - msg->nodes[i].nodeid, msg->nodes[i].pid); + vprintf(SDOG_INFO "%s has gone\n", + sheepid_to_str(&msg->nodes[i].sheepid)); } if (msg->cluster_status != SD_STATUS_OK) @@ -851,14 +819,14 @@ static void update_cluster_info(struct join_message *msg) update_epoch_log(sys->epoch); join_finished: - ret = move_node_to_sd_list(msg->header.nodeid, msg->header.pid, msg->header.from); + ret = move_node_to_sd_list(&msg->header.sheepid, msg->header.from); /* * this should not happen since __sd_deliver() checks if the * host from msg on cpg_node_list. */ if (ret) - vprintf(SDOG_ERR "nodeid: %x, pid: %d has gone\n", - msg->header.nodeid, msg->header.pid); + vprintf(SDOG_ERR "%s has gone\n", + sheepid_to_str(&msg->header.sheepid)); if (msg->cluster_status == SD_STATUS_OK) { if (msg->inc_epoch) { @@ -1018,34 +986,31 @@ out: req->done(req); } -static void __sd_deliver(struct cpg_event *cevent) +static void __sd_notify(struct cpg_event *cevent) { - struct work_deliver *w = container_of(cevent, struct work_deliver, cev); + struct work_notify *w = container_of(cevent, struct work_notify, cev); struct message_header *m = w->msg; char name[128]; struct node *node; - dprintf("op: %d, state: %u, size: %d, from: %s, pid: %d\n", + dprintf("op: %d, state: %u, size: %d, from: %s, pid: %" PRIu64 "\n", m->op, m->state, m->msg_length, addr_to_str(name, sizeof(name), m->from.addr, m->from.port), - m->pid); + m->sheepid.pid); /* * we don't want to perform any deliver events except mastership_tx event * until we join; we wait for our JOIN message.
*/ if (!sys->join_finished && !master_tx_message(m)) { - if (m->pid != sys->this_pid || m->nodeid != sys->this_nodeid) { + if (sheepid_cmp(&m->sheepid, &sys->this_sheepid) != 0) { cevent->skip = 1; return; } } if (join_message(m)) { - uint32_t nodeid = m->nodeid; - uint32_t pid = m->pid; - - node = find_node(&sys->cpg_node_list, nodeid, pid); + node = find_node(&sys->cpg_node_list, &m->sheepid); if (!node) { dprintf("the node was left before join operation is finished\n"); return; @@ -1095,13 +1060,12 @@ static int tx_mastership(void) msg.header.state = DM_FIN; msg.header.msg_length = sizeof(msg); msg.header.from = sys->this_node; - msg.header.nodeid = sys->this_nodeid; - msg.header.pid = sys->this_pid; + msg.header.sheepid = sys->this_sheepid; - return send_message(sys->handle, (struct message_header *)&msg); + return sys->cdrv->notify(&msg, msg.header.msg_length); } -static void send_join_response(struct work_deliver *w) +static void send_join_response(struct work_notify *w) { struct message_header *m; struct join_message *jm; @@ -1116,8 +1080,7 @@ static void send_join_response(struct work_deliver *w) if (jm->result == SD_RES_SUCCESS && jm->cluster_status != SD_STATUS_OK) { jm->nr_leave_nodes = 0; list_for_each_entry(node, &sys->leave_list, list) { - jm->leave_nodes[jm->nr_leave_nodes].nodeid = node->nodeid; - jm->leave_nodes[jm->nr_leave_nodes].pid = node->pid; + jm->leave_nodes[jm->nr_leave_nodes].sheepid = node->sheepid; jm->leave_nodes[jm->nr_leave_nodes].ent = node->ent; jm->nr_leave_nodes++; } @@ -1131,12 +1094,12 @@ static void send_join_response(struct work_deliver *w) exit(1); } jm->epoch = sys->epoch; - send_message(sys->handle, m); + sys->cdrv->notify(m, m->msg_length); } -static void __sd_deliver_done(struct cpg_event *cevent) +static void __sd_notify_done(struct cpg_event *cevent) { - struct work_deliver *w = container_of(cevent, struct work_deliver, cev); + struct work_notify *w = container_of(cevent, struct work_notify, cev); struct message_header 
*m; char name[128]; int do_recovery; @@ -1151,7 +1114,7 @@ static void __sd_deliver_done(struct cpg_event *cevent) update_cluster_info((struct join_message *)m); break; case SD_MSG_LEAVE: - node = find_node(&sys->sd_node_list, m->nodeid, m->pid); + node = find_node(&sys->sd_node_list, &m->sheepid); if (node) { sys->nr_vnodes = 0; @@ -1173,7 +1136,7 @@ static void __sd_deliver_done(struct cpg_event *cevent) */ if (!sys->join_finished) { sys->join_finished = 1; - move_node_to_sd_list(sys->this_nodeid, sys->this_pid, sys->this_node); + move_node_to_sd_list(&sys->this_sheepid, sys->this_node); sys->epoch = get_latest_epoch(); } @@ -1210,7 +1173,7 @@ static void __sd_deliver_done(struct cpg_event *cevent) break; case SD_MSG_VDI_OP: m->state = DM_FIN; - send_message(sys->handle, m); + sys->cdrv->notify(m, m->msg_length); break; default: eprintf("unknown message %d\n", m->op); @@ -1226,25 +1189,24 @@ static void __sd_deliver_done(struct cpg_event *cevent) } } -static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name, - uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) +static void sd_notify_handler(struct sheepid *sender, void *msg, size_t msg_len) { struct cpg_event *cevent; - struct work_deliver *w; + struct work_notify *w; struct message_header *m = msg; char name[128]; - dprintf("op: %d, state: %u, size: %d, from: %s, nodeid: %x, pid: %u\n", + dprintf("op: %d, state: %u, size: %d, from: %s, pid: %" PRIu64 "\n", m->op, m->state, m->msg_length, addr_to_str(name, sizeof(name), m->from.addr, m->from.port), - nodeid, pid); + sender->pid); w = zalloc(sizeof(*w)); if (!w) return; cevent = &w->cev; - cevent->ctype = CPG_EVENT_DELIVER; + cevent->ctype = CPG_EVENT_NOTIFY; vprintf(SDOG_DEBUG "allow new deliver, %p\n", cevent); @@ -1264,17 +1226,7 @@ static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name, start_cpg_event_work(); } -static void for_each_node_list(struct cpg_address list[], int count, - void (*func)(struct cpg_address
*addr, - struct work_confchg *w), - struct work_confchg *w) -{ - int i; - for (i = 0; i < count; i++) - func(&list[i], w); -} - -static void add_node(struct cpg_address *addr, struct work_confchg *w) +static void add_node(struct sheepid *id) { struct node *node; @@ -1282,22 +1234,20 @@ static void add_node(struct cpg_address *addr, struct work_confchg *w) if (!node) panic("failed to alloc memory for a new node\n"); - node->nodeid = addr->nodeid; - node->pid = addr->pid; + node->sheepid = *id; list_add_tail(&node->list, &sys->cpg_node_list); } -static void del_node(struct cpg_address *addr, struct work_confchg *w) +static int del_node(struct sheepid *id) { struct node *node; - node = find_node(&sys->sd_node_list, addr->nodeid, addr->pid); + node = find_node(&sys->sd_node_list, id); if (node) { int nr; struct sheepdog_node_list_entry e[SD_MAX_NODES]; - w->sd_node_left++; sys->nr_vnodes = 0; list_del(&node->list); @@ -1313,34 +1263,27 @@ static void del_node(struct cpg_address *addr, struct work_confchg *w) update_epoch_store(sys->epoch); } - } else { - node = find_node(&sys->cpg_node_list, addr->nodeid, addr->pid); - if (node) { - list_del(&node->list); - free(node); - } + return 1; } -} -static int is_my_cpg_addr(struct cpg_address *addr) -{ - return (sys->this_nodeid == addr->nodeid) && - (sys->this_pid == addr->pid); + node = find_node(&sys->cpg_node_list, id); + if (node) { + list_del(&node->list); + free(node); + } + + return 0; } /* * Check whether the majority of Sheepdog nodes are still alive or not */ -static int check_majority(struct cpg_address *left_list, - size_t left_list_entries) +static int check_majority(struct sheepid *left) { - int nr_nodes = 0, nr_majority, nr_reachable = 0, i, fd; + int nr_nodes = 0, nr_majority, nr_reachable = 0, fd; struct node *node; char name[INET6_ADDRSTRLEN]; - if (left_list_entries == 0) - return 1; /* we don't need this check in this case */ - nr_nodes = get_nodes_nr_from(&sys->sd_node_list); nr_majority = nr_nodes / 2 + 
1; @@ -1350,12 +1293,7 @@ static int check_majority(struct cpg_address *left_list, return 1; list_for_each_entry(node, &sys->sd_node_list, list) { - for (i = 0; i < left_list_entries; i++) { - if (left_list[i].nodeid == node->nodeid && - left_list[i].pid == node->pid) - break; - } - if (i != left_list_entries) + if (sheepid_cmp(&node->sheepid, left) == 0) continue; addr_to_str(name, sizeof(name), node->ent.addr, 0); @@ -1375,34 +1313,29 @@ static int check_majority(struct cpg_address *left_list, return 0; } -static void __sd_confchg(struct cpg_event *cevent) +static void __sd_leave(struct cpg_event *cevent) { - struct work_confchg *w = container_of(cevent, struct work_confchg, cev); + struct work_leave *w = container_of(cevent, struct work_leave, cev); - if (!check_majority(w->left_list, w->left_list_entries)) { + if (!check_majority(&w->left)) { eprintf("perhaps network partition failure has occurred\n"); abort(); } } -static void send_join_request(struct cpg_address *addr, struct work_confchg *w) +static void send_join_request(struct sheepid *id) { struct join_message msg; struct sheepdog_node_list_entry entries[SD_MAX_NODES]; int nr_entries, i, ret; - /* if I've just joined in cpg, I'll join in sheepdog. 
*/ - if (!is_my_cpg_addr(addr)) - return; - memset(&msg, 0, sizeof(msg)); msg.header.proto_ver = SD_SHEEP_PROTO_VER; msg.header.op = SD_MSG_JOIN; msg.header.state = DM_INIT; msg.header.msg_length = sizeof(msg); msg.header.from = sys->this_node; - msg.header.nodeid = sys->this_nodeid; - msg.header.pid = sys->this_pid; + msg.header.sheepid = sys->this_sheepid; get_global_nr_copies(&msg.nr_sobjs); @@ -1414,35 +1347,31 @@ static void send_join_request(struct cpg_address *addr, struct work_confchg *w) msg.nodes[i].ent = entries[i]; } - send_message(sys->handle, (struct message_header *)&msg); + sys->cdrv->notify(&msg, msg.header.msg_length); - vprintf(SDOG_INFO "%x %u\n", sys->this_nodeid, sys->this_pid); + vprintf(SDOG_INFO "%s\n", sheepid_to_str(&sys->this_sheepid)); } -static void __sd_confchg_done(struct cpg_event *cevent) +static void __sd_join_done(struct cpg_event *cevent) { - struct work_confchg *w = container_of(cevent, struct work_confchg, cev); - int ret; + struct work_join *w = container_of(cevent, struct work_join, cev); + int ret, i; + int first_cpg_node = 0; - if (w->member_list_entries == - w->joined_list_entries - w->left_list_entries && - is_my_cpg_addr(w->member_list)) { + if (w->member_list_entries == 1 && + sheepid_cmp(&w->joined, &sys->this_sheepid) == 0) { sys->join_finished = 1; get_global_nr_copies(&sys->nr_sobjs); - w->first_cpg_node = 1; + first_cpg_node = 1; } - if (list_empty(&sys->cpg_node_list)) - for_each_node_list(w->member_list, w->member_list_entries, - add_node, w); - else - for_each_node_list(w->joined_list, w->joined_list_entries, - add_node, w); - - for_each_node_list(w->left_list, w->left_list_entries, - del_node, w); + if (list_empty(&sys->cpg_node_list)) { + for (i = 0; i < w->member_list_entries; i++) + add_node(w->member_list + i); + } else + add_node(&w->joined); - if (w->first_cpg_node) { + if (first_cpg_node) { struct join_message msg; struct sheepdog_node_list_entry entries[SD_MAX_NODES]; int nr_entries; @@ -1454,13 
+1383,12 @@ static void __sd_confchg_done(struct cpg_event *cevent) * becomes the master without sending JOIN. */ - vprintf(SDOG_DEBUG "%d %x\n", sys->this_pid, sys->this_nodeid); + vprintf(SDOG_DEBUG "%s\n", sheepid_to_str(&sys->this_sheepid)); memset(&msg, 0, sizeof(msg)); msg.header.from = sys->this_node; - msg.header.nodeid = sys->this_nodeid; - msg.header.pid = sys->this_pid; + msg.header.sheepid = sys->this_sheepid; nr_entries = ARRAY_SIZE(entries); ret = read_epoch(&epoch, &ctime, entries, &nr_entries); @@ -1482,35 +1410,40 @@ static void __sd_confchg_done(struct cpg_event *cevent) print_node_list(&sys->sd_node_list); - if (w->first_cpg_node) - goto skip_join; + if (sheepid_cmp(&w->joined, &sys->this_sheepid) == 0) + send_join_request(&w->joined); +} + +static void __sd_leave_done(struct cpg_event *cevent) +{ + struct work_leave *w = container_of(cevent, struct work_leave, cev); + int node_left; - for_each_node_list(w->joined_list, w->joined_list_entries, - send_join_request, w); + node_left = del_node(&w->left); -skip_join: - if (w->sd_node_left && sys->status == SD_STATUS_OK) { - if (w->sd_node_left > 1) - panic("we can't handle the departure of multiple nodes %d, %Zd\n", - w->sd_node_left, w->left_list_entries); + print_node_list(&sys->sd_node_list); + if (node_left && sys->status == SD_STATUS_OK) start_recovery(sys->epoch); - } } static void cpg_event_free(struct cpg_event *cevent) { switch (cevent->ctype) { - case CPG_EVENT_CONCHG: { - struct work_confchg *w = container_of(cevent, struct work_confchg, cev); + case CPG_EVENT_JOIN: { + struct work_join *w = container_of(cevent, struct work_join, cev); free(w->member_list); - free(w->left_list); - free(w->joined_list); free(w); break; } - case CPG_EVENT_DELIVER: { - struct work_deliver *w = container_of(cevent, struct work_deliver, cev); + case CPG_EVENT_LEAVE: { + struct work_leave *w = container_of(cevent, struct work_leave, cev); + free(w->member_list); + free(w); + break; + } + case CPG_EVENT_NOTIFY: { 
+ struct work_notify *w = container_of(cevent, struct work_notify, cev); free(w->msg); free(w); break; @@ -1535,14 +1468,16 @@ static void cpg_event_fn(struct work *work, int idx) */ switch (cevent->ctype) { - case CPG_EVENT_CONCHG: - __sd_confchg(cevent); + case CPG_EVENT_JOIN: + break; + case CPG_EVENT_LEAVE: + __sd_leave(cevent); break; - case CPG_EVENT_DELIVER: + case CPG_EVENT_NOTIFY: { - struct work_deliver *w = container_of(cevent, struct work_deliver, cev); + struct work_notify *w = container_of(cevent, struct work_notify, cev); vprintf(SDOG_DEBUG "%d\n", w->msg->state); - __sd_deliver(cevent); + __sd_notify(cevent); break; } case CPG_EVENT_REQUEST: @@ -1572,12 +1507,15 @@ static void cpg_event_done(struct work *work, int idx) goto out; switch (cevent->ctype) { - case CPG_EVENT_CONCHG: - __sd_confchg_done(cevent); + case CPG_EVENT_JOIN: + __sd_join_done(cevent); break; - case CPG_EVENT_DELIVER: + case CPG_EVENT_LEAVE: + __sd_leave_done(cevent); + break; + case CPG_EVENT_NOTIFY: { - struct work_deliver *w = container_of(cevent, struct work_deliver, cev); + struct work_notify *w = container_of(cevent, struct work_notify, cev); if (w->msg->state == DM_FIN && vdi_op_message(w->msg)) vdi_op_done((struct vdi_op_message *)w->msg); @@ -1593,9 +1531,9 @@ static void cpg_event_done(struct work *work, int idx) list_for_each_entry(f_cevent, &sys->cpg_event_siblings, cpg_event_list) { - struct work_deliver *fw = - container_of(f_cevent, struct work_deliver, cev); - if (f_cevent->ctype == CPG_EVENT_DELIVER && + struct work_notify *fw = + container_of(f_cevent, struct work_notify, cev); + if (f_cevent->ctype == CPG_EVENT_NOTIFY && fw->msg->state == DM_FIN) { vprintf("already got fin %p\n", f_cevent); @@ -1611,7 +1549,7 @@ static void cpg_event_done(struct work *work, int idx) cpg_event_set_joining(); } got_fin: - __sd_deliver_done(cevent); + __sd_notify_done(cevent); break; } case CPG_EVENT_REQUEST: @@ -1722,7 +1660,7 @@ void start_cpg_event_work(void) * thread is running 
for a deliver for VDI, then we need to * run io requests. */ - if (cpg_event_running() && cevent->ctype == CPG_EVENT_CONCHG) + if (cpg_event_running() && is_membership_change_event(cevent->ctype)) return; /* @@ -1756,9 +1694,9 @@ do_retry: list_for_each_entry_safe(cevent, n, &sys->cpg_event_siblings, cpg_event_list) { struct request *req = container_of(cevent, struct request, cev); - if (cevent->ctype == CPG_EVENT_DELIVER) + if (cevent->ctype == CPG_EVENT_NOTIFY) continue; - if (cevent->ctype == CPG_EVENT_CONCHG) + if (is_membership_change_event(cevent->ctype)) break; list_del(&cevent->cpg_event_list); @@ -1842,7 +1780,7 @@ do_retry: cevent = list_first_entry(&sys->cpg_event_siblings, struct cpg_event, cpg_event_list); - if (cevent->ctype == CPG_EVENT_CONCHG && sys->nr_outstanding_io) + if (is_membership_change_event(cevent->ctype) && sys->nr_outstanding_io) return; list_del(&cevent->cpg_event_list); @@ -1857,25 +1795,16 @@ do_retry: queue_work(sys->cpg_wqueue, &cpg_event_work); } -static void sd_confchg(cpg_handle_t handle, const struct cpg_name *group_name, - const struct cpg_address *member_list, - size_t member_list_entries, - const struct cpg_address *left_list, - size_t left_list_entries, - const struct cpg_address *joined_list, - size_t joined_list_entries) +static void sd_join_handler(struct sheepid *joined, struct sheepid *members, + size_t nr_members) { struct cpg_event *cevent; - struct work_confchg *w = NULL; + struct work_join *w = NULL; int i, size; - dprintf("confchg nodeid %x\n", member_list[0].nodeid); - dprintf("%zd %zd %zd\n", member_list_entries, left_list_entries, - joined_list_entries); - for (i = 0; i < member_list_entries; i++) { - dprintf("[%x] node_id: %x, pid: %d\n", i, - member_list[i].nodeid, member_list[i].pid); - } + dprintf("join %s\n", sheepid_to_str(joined)); + for (i = 0; i < nr_members; i++) + dprintf("[%x] %s\n", i, sheepid_to_str(members + i)); if (sys->status == SD_STATUS_SHUTDOWN) return; @@ -1885,31 +1814,19 @@ static void 
sd_confchg(cpg_handle_t handle, const struct cpg_name *group_name, goto oom; cevent = &w->cev; - cevent->ctype = CPG_EVENT_CONCHG; + cevent->ctype = CPG_EVENT_JOIN; vprintf(SDOG_DEBUG "allow new confchg, %p\n", cevent); - size = sizeof(struct cpg_address) * member_list_entries; + size = sizeof(struct sheepid) * nr_members; w->member_list = zalloc(size); if (!w->member_list) goto oom; - memcpy(w->member_list, member_list, size); - w->member_list_entries = member_list_entries; - - size = sizeof(struct cpg_address) * left_list_entries; - w->left_list = zalloc(size); - if (!w->left_list) - goto oom; - memcpy(w->left_list, left_list, size); - w->left_list_entries = left_list_entries; + memcpy(w->member_list, members, size); + w->member_list_entries = nr_members; - size = sizeof(struct cpg_address) * joined_list_entries; - w->joined_list = zalloc(size); - if (!w->joined_list) - goto oom; - memcpy(w->joined_list, joined_list, size); - w->joined_list_entries = joined_list_entries; + w->joined = *joined; list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings); start_cpg_event_work(); @@ -1919,111 +1836,83 @@ oom: if (w) { if (w->member_list) free(w->member_list); - if (w->left_list) - free(w->left_list); - if (w->joined_list) - free(w->joined_list); + free(w); } panic("failed to allocate memory for a confchg event\n"); } -static int set_addr(unsigned int nodeid, int port) +static void sd_leave_handler(struct sheepid *left, struct sheepid *members, + size_t nr_members) { - int ret, nr; - corosync_cfg_handle_t handle; - corosync_cfg_node_address_t addr; - struct sockaddr_storage *ss = (struct sockaddr_storage *)addr.address; - struct sockaddr_in *sin = (struct sockaddr_in *)addr.address; - struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)addr.address; - void *saddr; - char tmp[INET6_ADDRSTRLEN]; - - memset(sys->this_node.addr, 0, sizeof(sys->this_node.addr)); - - ret = corosync_cfg_initialize(&handle, NULL); - if (ret != CPG_OK) { - vprintf(SDOG_ERR "failed to 
initiazize cfg %d\n", ret); - return -1; - } + struct cpg_event *cevent; + struct work_leave *w = NULL; + int i, size; - ret = corosync_cfg_get_node_addrs(handle, nodeid, 1, &nr, &addr); - if (ret != CPG_OK) { - vprintf(SDOG_ERR "failed to get addr %d\n", ret); - return -1; - } + dprintf("leave %s\n", sheepid_to_str(left)); + for (i = 0; i < nr_members; i++) + dprintf("[%x] %s\n", i, sheepid_to_str(members + i)); - if (!nr) { - vprintf(SDOG_ERR "we got no address\n"); - return -1; - } + if (sys->status == SD_STATUS_SHUTDOWN) + return; - if (ss->ss_family == AF_INET6) { - saddr = &sin6->sin6_addr; - memcpy(sys->this_node.addr, saddr, 16); - } else if (ss->ss_family == AF_INET) { - saddr = &sin->sin_addr; - memcpy(sys->this_node.addr + 12, saddr, 4); - } else { - vprintf(SDOG_ERR "unknown protocol %d\n", ss->ss_family); - return -1; - } + w = zalloc(sizeof(*w)); + if (!w) + goto oom; - inet_ntop(ss->ss_family, saddr, tmp, sizeof(tmp)); + cevent = &w->cev; + cevent->ctype = CPG_EVENT_LEAVE; - vprintf(SDOG_INFO "addr = %s, port = %d\n", tmp, port); - return 0; + + vprintf(SDOG_DEBUG "allow new confchg, %p\n", cevent); + + size = sizeof(struct sheepid) * nr_members; + w->member_list = zalloc(size); + if (!w->member_list) + goto oom; + memcpy(w->member_list, members, size); + w->member_list_entries = nr_members; + + w->left = *left; + + list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings); + start_cpg_event_work(); + + return; +oom: + if (w) { + if (w->member_list) + free(w->member_list); + free(w); + } + panic("failed to allocate memory for a confchg event\n"); } int create_cluster(int port, int64_t zone) { int fd, ret; - cpg_handle_t cpg_handle; - struct cpg_name group = { 8, "sheepdog" }; - cpg_callbacks_t cb = {&sd_deliver, &sd_confchg}; - unsigned int nodeid = 0; - - ret = cpg_initialize(&cpg_handle, &cb); - if (ret != CPG_OK) { - eprintf("Failed to initialize cpg, %d\n", ret); - eprintf("Is corosync running?\n"); + struct cdrv_handlers handlers = { + 
+ .join_handler = sd_join_handler, + .leave_handler = sd_leave_handler, + .notify_handler = sd_notify_handler, + }; + + fd = sys->cdrv->init(&handlers, &sys->this_sheepid); + if (fd < 0) return -1; - } - ret = cpg_local_get(cpg_handle, &nodeid); - if (ret != CPG_OK) { - eprintf("Failed to get the local node's identifier, %d\n", ret); - return 1; - } - -join_retry: - ret = cpg_join(cpg_handle, &group); - switch (ret) { - case CPG_OK: - break; - case CPG_ERR_TRY_AGAIN: - dprintf("Failed to join the sheepdog group, try again\n"); - sleep(1); - goto join_retry; - case CPG_ERR_SECURITY: - eprintf("Permission error.\n"); + ret = sys->cdrv->join(); + if (ret != 0) return -1; - default: - eprintf("Failed to join the sheepdog group, %d\n", ret); - return -1; - } - - sys->handle = cpg_handle; - sys->this_nodeid = nodeid; - sys->this_pid = getpid(); - ret = set_addr(nodeid, port); - if (ret) - return 1; + memcpy(sys->this_node.addr, sys->this_sheepid.addr, + sizeof(sys->this_node.addr)); sys->this_node.port = port; sys->this_node.nr_vnodes = SD_DEFAULT_VNODES; - if (zone == -1) - sys->this_node.zone = nodeid; - else + if (zone == -1) { + /* use last 4 bytes as zone id (cast before shifting to avoid signed overflow) */ + uint8_t *b = sys->this_sheepid.addr + 12; + sys->this_node.zone = (uint32_t)b[0] | (uint32_t)b[1] << 8 | (uint32_t)b[2] << 16 | (uint32_t)b[3] << 24; + } else sys->this_node.zone = zone; dprintf("zone id = %u\n", sys->this_node.zone); @@ -2042,11 +1931,6 @@ join_retry: INIT_LIST_HEAD(&sys->cpg_event_siblings); - ret = cpg_fd_get(cpg_handle, &fd); - if (ret != CPG_OK) { - eprintf("Failed to retrieve cpg file descriptor, %d\n", ret); - return 1; - } ret = register_event(fd, group_handler, NULL); if (ret) { eprintf("Failed to register epoll events, %d\n", ret); @@ -2066,10 +1950,9 @@ int leave_cluster(void) msg.header.state = DM_FIN; msg.header.msg_length = sizeof(msg); msg.header.from = sys->this_node; - msg.header.nodeid = sys->this_nodeid; - msg.header.pid = sys->this_pid; + msg.header.sheepid = sys->this_sheepid; msg.epoch = get_latest_epoch();
dprintf("%d\n", msg.epoch); - return send_message(sys->handle, (struct message_header *)&msg); + return sys->cdrv->notify(&msg, msg.header.msg_length); } diff --git a/sheep/sdnet.c b/sheep/sdnet.c index 9985bdb..db2e691 100644 --- a/sheep/sdnet.c +++ b/sheep/sdnet.c @@ -11,6 +11,8 @@ #include <stdio.h> #include <stdlib.h> #include <unistd.h> +#include <netdb.h> +#include <arpa/inet.h> #include <netinet/tcp.h> #include <sys/epoll.h> #include <fcntl.h> diff --git a/sheep/sheep.c b/sheep/sheep.c index 50cd841..0a73587 100644 --- a/sheep/sheep.c +++ b/sheep/sheep.c @@ -26,6 +26,7 @@ #define DEFAULT_OBJECT_DIR "/tmp" #define LOG_FILE_NAME "sheep.log" +LIST_HEAD(cluster_drivers); static char program_name[] = "sheep"; static struct option const long_options[] = { @@ -35,11 +36,12 @@ static struct option const long_options[] = { {"debug", no_argument, NULL, 'd'}, {"directio", no_argument, NULL, 'D'}, {"zone", required_argument, NULL, 'z'}, + {"cluster", required_argument, NULL, 'c'}, {"help", no_argument, NULL, 'h'}, {NULL, 0, NULL, 0}, }; -static const char *short_options = "p:fl:dDz:h"; +static const char *short_options = "p:fl:dDz:c:h"; static void usage(int status) { @@ -56,6 +58,7 @@ Sheepdog Daemon, version %s\n\ -d, --debug print debug messages\n\ -D, --directio use direct IO\n\ -z, --zone specify the zone id\n\ + -c, --cluster specify the cluster driver\n\ -h, --help display this help and exit\n\ ", PACKAGE_VERSION); } @@ -75,6 +78,7 @@ int main(int argc, char **argv) char path[PATH_MAX]; int64_t zone = -1; char *p; + struct cluster_driver *cdrv; signal(SIGPIPE, SIG_IGN); @@ -112,6 +116,24 @@ int main(int argc, char **argv) } sys->this_node.zone = zone; break; + case 'c': + FOR_EACH_CLUSTER_DRIVER(cdrv) { + if (strcmp(cdrv->name, optarg) == 0) { + sys->cdrv = cdrv; + break; + } + } + + if (!sys->cdrv) { + printf("No such cluster driver, %s\n", optarg); + printf("Supported drivers:"); + FOR_EACH_CLUSTER_DRIVER(cdrv) { + printf(" %s", cdrv->name); + } + printf("\n"); 
+ exit(1); + } + break; case 'h': usage(0); break; diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h index 6409530..e2fcb40 100644 --- a/sheep/sheep_priv.h +++ b/sheep/sheep_priv.h @@ -12,7 +12,6 @@ #define __SHEEP_PRIV_H__ #include <inttypes.h> -#include <corosync/cpg.h> #include "sheepdog_proto.h" #include "event.h" @@ -20,6 +19,7 @@ #include "work.h" #include "net.h" #include "sheep.h" +#include "cluster.h" #define SD_OP_REMOVE_OBJ 0x91 @@ -41,11 +41,15 @@ #define SD_RES_NETWORK_ERROR 0x81 /* Network error between sheeps */ enum cpg_event_type { - CPG_EVENT_CONCHG, - CPG_EVENT_DELIVER, + CPG_EVENT_JOIN, + CPG_EVENT_LEAVE, + CPG_EVENT_NOTIFY, CPG_EVENT_REQUEST, }; +#define is_membership_change_event(x) \ + ((x) == CPG_EVENT_JOIN || (x) == CPG_EVENT_LEAVE) + struct cpg_event { enum cpg_event_type ctype; struct list_head cpg_event_list; @@ -102,11 +106,11 @@ struct data_object_bmap { }; struct cluster_info { - cpg_handle_t handle; + struct cluster_driver *cdrv; + /* set after finishing the JOIN procedure */ int join_finished; - uint32_t this_nodeid; - uint32_t this_pid; + struct sheepid this_sheepid; struct sheepdog_node_list_entry this_node; uint32_t epoch; -- 1.7.2.5 |