This patch removes all corosync stuff from group.c, and uses a cluster driver instead of it. Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp> --- sheep/group.c | 493 ++++++++++++++++++--------------------------------- sheep/sdnet.c | 2 + sheep/sheep.c | 23 +++- sheep/sheep_priv.h | 12 +- 4 files changed, 206 insertions(+), 324 deletions(-) diff --git a/sheep/group.c b/sheep/group.c index 14dff00..f6743f5 100644 --- a/sheep/group.c +++ b/sheep/group.c @@ -15,8 +15,6 @@ #include <arpa/inet.h> #include <sys/time.h> #include <sys/epoll.h> -#include <corosync/cpg.h> -#include <corosync/cfg.h> #include "sheepdog_proto.h" #include "sheep_priv.h" @@ -26,54 +24,6 @@ #include "work.h" #include "cluster.h" -static corosync_cfg_handle_t cfg_handle; - -static int nodeid_to_addr(uint32_t nodeid, uint8_t *addr) -{ - int ret, nr; - corosync_cfg_node_address_t caddr; - struct sockaddr_storage *ss = (struct sockaddr_storage *)caddr.address; - struct sockaddr_in *sin = (struct sockaddr_in *)caddr.address; - struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)caddr.address; - void *saddr; - - ret = corosync_cfg_get_node_addrs(cfg_handle, nodeid, 1, &nr, &caddr); - if (ret != CS_OK) { - vprintf(SDOG_ERR "failed to get addr %d\n", ret); - return -1; - } - - if (!nr) { - vprintf(SDOG_ERR "we got no address\n"); - return -1; - } - - if (ss->ss_family == AF_INET6) { - saddr = &sin6->sin6_addr; - memcpy(addr, saddr, 16); - } else if (ss->ss_family == AF_INET) { - saddr = &sin->sin_addr; - memset(addr, 0, 16); - memcpy(addr + 12, saddr, 4); - } else { - vprintf(SDOG_ERR "unknown protocol %d\n", ss->ss_family); - return -1; - } - - return 0; -} - -static void cpg_addr_to_sheepid(const struct cpg_address *cpgs, - struct sheepid *sheeps, size_t nr) -{ - int i; - - for (i = 0; i < nr; i++) { - nodeid_to_addr(cpgs[i].nodeid, sheeps[i].addr); - sheeps[i].pid = cpgs[i].pid; - } -} - struct node { struct sheepid sheepid; struct sheepdog_node_list_entry ent; @@ -134,23 +84,26 @@ 
struct mastership_tx_message { uint32_t epoch; }; -struct work_deliver { +struct work_notify { struct cpg_event cev; struct message_header *msg; }; -struct work_confchg { +struct work_join { struct cpg_event cev; - struct cpg_address *member_list; + struct sheepid *member_list; size_t member_list_entries; - struct cpg_address *left_list; - size_t left_list_entries; - struct cpg_address *joined_list; - size_t joined_list_entries; + struct sheepid joined; +}; - int sd_node_left; +struct work_leave { + struct cpg_event cev; + + struct sheepid *member_list; + size_t member_list_entries; + struct sheepid left; }; #define print_node_list(node_list) \ @@ -216,30 +169,6 @@ static inline int master_tx_message(struct message_header *m) return m->op == SD_MSG_MASTER_TRANSFER; } -static int send_message(cpg_handle_t handle, struct message_header *msg) -{ - struct iovec iov; - int ret; - - iov.iov_base = msg; - iov.iov_len = msg->msg_length; -retry: - ret = cpg_mcast_joined(handle, CPG_TYPE_AGREED, &iov, 1); - switch (ret) { - case CPG_OK: - break; - case CPG_ERR_TRY_AGAIN: - dprintf("failed to send message. 
try again\n"); - sleep(1); - goto retry; - default: - eprintf("failed to send message, %d\n", ret); - return -1; - } - return 0; -} - - static int get_node_idx(struct sheepdog_node_list_entry *ent, struct sheepdog_node_list_entry *entries, int nr_nodes) { @@ -437,7 +366,7 @@ forward: list_add(&req->pending_list, &sys->pending_list); - send_message(sys->handle, (struct message_header *)msg); + sys->cdrv->notify(msg, msg->header.msg_length); free(msg); } @@ -450,9 +379,8 @@ static void group_handler(int listen_fd, int events, void *data) goto out; } - ret = cpg_dispatch(sys->handle, CPG_DISPATCH_ALL); - - if (ret == CPG_OK) + ret = sys->cdrv->dispatch(); + if (ret == 0) return; else eprintf("oops...some error occured inside corosync\n"); @@ -1058,9 +986,9 @@ out: req->done(req); } -static void __sd_deliver(struct cpg_event *cevent) +static void __sd_notify(struct cpg_event *cevent) { - struct work_deliver *w = container_of(cevent, struct work_deliver, cev); + struct work_notify *w = container_of(cevent, struct work_notify, cev); struct message_header *m = w->msg; char name[128]; struct node *node; @@ -1134,10 +1062,10 @@ static int tx_mastership(void) msg.header.from = sys->this_node; msg.header.sheepid = sys->this_sheepid; - return send_message(sys->handle, (struct message_header *)&msg); + return sys->cdrv->notify(&msg, msg.header.msg_length); } -static void send_join_response(struct work_deliver *w) +static void send_join_response(struct work_notify *w) { struct message_header *m; struct join_message *jm; @@ -1166,12 +1094,12 @@ static void send_join_response(struct work_deliver *w) exit(1); } jm->epoch = sys->epoch; - send_message(sys->handle, m); + sys->cdrv->notify(m, m->msg_length); } -static void __sd_deliver_done(struct cpg_event *cevent) +static void __sd_notify_done(struct cpg_event *cevent) { - struct work_deliver *w = container_of(cevent, struct work_deliver, cev); + struct work_notify *w = container_of(cevent, struct work_notify, cev); struct 
message_header *m; char name[128]; int do_recovery; @@ -1245,7 +1173,7 @@ static void __sd_deliver_done(struct cpg_event *cevent) break; case SD_MSG_VDI_OP: m->state = DM_FIN; - send_message(sys->handle, m); + sys->cdrv->notify(m, m->msg_length); break; default: eprintf("unknown message %d\n", m->op); @@ -1261,25 +1189,24 @@ static void __sd_deliver_done(struct cpg_event *cevent) } } -static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name, - uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) +static void sd_notify_handler(struct sheepid *sender, void *msg, size_t msg_len) { struct cpg_event *cevent; - struct work_deliver *w; + struct work_notify *w; struct message_header *m = msg; char name[128]; - dprintf("op: %d, state: %u, size: %d, from: %s, nodeid: %x, pid: %u\n", + dprintf("op: %d, state: %u, size: %d, from: %s, pid: %lu\n", m->op, m->state, m->msg_length, addr_to_str(name, sizeof(name), m->from.addr, m->from.port), - nodeid, pid); + sender->pid); w = zalloc(sizeof(*w)); if (!w) return; cevent = &w->cev; - cevent->ctype = CPG_EVENT_DELIVER; + cevent->ctype = CPG_EVENT_NOTIFY; vprintf(SDOG_DEBUG "allow new deliver, %p\n", cevent); @@ -1299,20 +1226,7 @@ static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name, start_cpg_event_work(); } -static void for_each_node_list(struct cpg_address list[], int count, - void (*func)(struct sheepid *id, - struct work_confchg *w), - struct work_confchg *w) -{ - int i; - struct sheepid sheepid[SD_MAX_NODES]; - - cpg_addr_to_sheepid(list, sheepid, count); - for (i = 0; i < count; i++) - func(sheepid + i, w); -} - -static void add_node(struct sheepid *id, struct work_confchg *w) +static void add_node(struct sheepid *id) { struct node *node; @@ -1325,7 +1239,7 @@ static void add_node(struct sheepid *id, struct work_confchg *w) list_add_tail(&node->list, &sys->cpg_node_list); } -static void del_node(struct sheepid *id, struct work_confchg *w) +static int del_node(struct sheepid 
*id) { struct node *node; @@ -1334,7 +1248,6 @@ static void del_node(struct sheepid *id, struct work_confchg *w) int nr; struct sheepdog_node_list_entry e[SD_MAX_NODES]; - w->sd_node_left++; sys->nr_vnodes = 0; list_del(&node->list); @@ -1350,38 +1263,26 @@ static void del_node(struct sheepid *id, struct work_confchg *w) update_epoch_store(sys->epoch); } - } else { - node = find_node(&sys->cpg_node_list, id); - if (node) { - list_del(&node->list); - free(node); - } + return 1; } -} -static int is_my_cpg_addr(struct cpg_address *addr) -{ - struct sheepid id; + node = find_node(&sys->cpg_node_list, id); + if (node) { + list_del(&node->list); + free(node); + } - cpg_addr_to_sheepid(addr, &id, 1); - return (sheepid_cmp(&id, &sys->this_sheepid) == 0); + return 0; } /* * Check whether the majority of Sheepdog nodes are still alive or not */ -static int check_majority(struct cpg_address *left_list, - size_t left_list_entries) +static int check_majority(struct sheepid *left) { - int nr_nodes = 0, nr_majority, nr_reachable = 0, i, fd; + int nr_nodes = 0, nr_majority, nr_reachable = 0, fd; struct node *node; char name[INET6_ADDRSTRLEN]; - struct sheepid left_sheepid[SD_MAX_NODES]; - - if (left_list_entries == 0) - return 1; /* we don't need this check in this case */ - - cpg_addr_to_sheepid(left_list, left_sheepid, left_list_entries); nr_nodes = get_nodes_nr_from(&sys->sd_node_list); nr_majority = nr_nodes / 2 + 1; @@ -1392,11 +1293,7 @@ static int check_majority(struct cpg_address *left_list, return 1; list_for_each_entry(node, &sys->sd_node_list, list) { - for (i = 0; i < left_list_entries; i++) { - if (sheepid_cmp(left_sheepid + i, &node->sheepid) != 0) - break; - } - if (i != left_list_entries) + if (sheepid_cmp(&node->sheepid, left) == 0) continue; addr_to_str(name, sizeof(name), node->ent.addr, 0); @@ -1416,26 +1313,22 @@ static int check_majority(struct cpg_address *left_list, return 0; } -static void __sd_confchg(struct cpg_event *cevent) +static void 
__sd_leave(struct cpg_event *cevent) { - struct work_confchg *w = container_of(cevent, struct work_confchg, cev); + struct work_leave *w = container_of(cevent, struct work_leave, cev); - if (!check_majority(w->left_list, w->left_list_entries)) { + if (!check_majority(&w->left)) { eprintf("perhaps network partition failure has occurred\n"); abort(); } } -static void send_join_request(struct sheepid *id, struct work_confchg *w) +static void send_join_request(struct sheepid *id) { struct join_message msg; struct sheepdog_node_list_entry entries[SD_MAX_NODES]; int nr_entries, i, ret; - /* if I've just joined in cpg, I'll join in sheepdog. */ - if (sheepid_cmp(id, &sys->this_sheepid) != 0) - return; - memset(&msg, 0, sizeof(msg)); msg.header.proto_ver = SD_SHEEP_PROTO_VER; msg.header.op = SD_MSG_JOIN; @@ -1454,34 +1347,29 @@ static void send_join_request(struct sheepid *id, struct work_confchg *w) msg.nodes[i].ent = entries[i]; } - send_message(sys->handle, (struct message_header *)&msg); + sys->cdrv->notify(&msg, msg.header.msg_length); vprintf(SDOG_INFO "%s\n", sheepid_to_str(&sys->this_sheepid)); } -static void __sd_confchg_done(struct cpg_event *cevent) +static void __sd_join_done(struct cpg_event *cevent) { - struct work_confchg *w = container_of(cevent, struct work_confchg, cev); - int ret; + struct work_join *w = container_of(cevent, struct work_join, cev); + int ret, i; int first_cpg_node = 0; - if (w->member_list_entries == - w->joined_list_entries - w->left_list_entries && - is_my_cpg_addr(w->member_list)) { + if (w->member_list_entries == 1 && + sheepid_cmp(&w->joined, &sys->this_sheepid) == 0) { sys->join_finished = 1; get_global_nr_copies(&sys->nr_sobjs); first_cpg_node = 1; } - if (list_empty(&sys->cpg_node_list)) - for_each_node_list(w->member_list, w->member_list_entries, - add_node, w); - else - for_each_node_list(w->joined_list, w->joined_list_entries, - add_node, w); - - for_each_node_list(w->left_list, w->left_list_entries, - del_node, w); + if 
(list_empty(&sys->cpg_node_list)) { + for (i = 0; i < w->member_list_entries; i++) + add_node(w->member_list + i); + } else + add_node(&w->joined); if (first_cpg_node) { struct join_message msg; @@ -1522,35 +1410,40 @@ static void __sd_confchg_done(struct cpg_event *cevent) print_node_list(&sys->sd_node_list); - if (first_cpg_node) - goto skip_join; + if (sheepid_cmp(&w->joined, &sys->this_sheepid) == 0) + send_join_request(&w->joined); +} + +static void __sd_leave_done(struct cpg_event *cevent) +{ + struct work_leave *w = container_of(cevent, struct work_leave, cev); + int node_left; - for_each_node_list(w->joined_list, w->joined_list_entries, - send_join_request, w); + node_left = del_node(&w->left); -skip_join: - if (w->sd_node_left && sys->status == SD_STATUS_OK) { - if (w->sd_node_left > 1) - panic("we can't handle the departure of multiple nodes %d, %Zd\n", - w->sd_node_left, w->left_list_entries); + print_node_list(&sys->sd_node_list); + if (node_left && sys->status == SD_STATUS_OK) start_recovery(sys->epoch); - } } static void cpg_event_free(struct cpg_event *cevent) { switch (cevent->ctype) { - case CPG_EVENT_CONCHG: { - struct work_confchg *w = container_of(cevent, struct work_confchg, cev); + case CPG_EVENT_JOIN: { + struct work_join *w = container_of(cevent, struct work_join, cev); + free(w->member_list); + free(w); + break; + } + case CPG_EVENT_LEAVE: { + struct work_leave *w = container_of(cevent, struct work_leave, cev); free(w->member_list); - free(w->left_list); - free(w->joined_list); free(w); break; } - case CPG_EVENT_DELIVER: { - struct work_deliver *w = container_of(cevent, struct work_deliver, cev); + case CPG_EVENT_NOTIFY: { + struct work_notify *w = container_of(cevent, struct work_notify, cev); free(w->msg); free(w); break; @@ -1575,14 +1468,16 @@ static void cpg_event_fn(struct work *work, int idx) */ switch (cevent->ctype) { - case CPG_EVENT_CONCHG: - __sd_confchg(cevent); + case CPG_EVENT_JOIN: break; - case CPG_EVENT_DELIVER: + case 
CPG_EVENT_LEAVE: + __sd_leave(cevent); + break; + case CPG_EVENT_NOTIFY: { - struct work_deliver *w = container_of(cevent, struct work_deliver, cev); + struct work_notify *w = container_of(cevent, struct work_notify, cev); vprintf(SDOG_DEBUG "%d\n", w->msg->state); - __sd_deliver(cevent); + __sd_notify(cevent); break; } case CPG_EVENT_REQUEST: @@ -1612,12 +1507,15 @@ static void cpg_event_done(struct work *work, int idx) goto out; switch (cevent->ctype) { - case CPG_EVENT_CONCHG: - __sd_confchg_done(cevent); + case CPG_EVENT_JOIN: + __sd_join_done(cevent); + break; + case CPG_EVENT_LEAVE: + __sd_leave_done(cevent); break; - case CPG_EVENT_DELIVER: + case CPG_EVENT_NOTIFY: { - struct work_deliver *w = container_of(cevent, struct work_deliver, cev); + struct work_notify *w = container_of(cevent, struct work_notify, cev); if (w->msg->state == DM_FIN && vdi_op_message(w->msg)) vdi_op_done((struct vdi_op_message *)w->msg); @@ -1633,9 +1531,9 @@ static void cpg_event_done(struct work *work, int idx) list_for_each_entry(f_cevent, &sys->cpg_event_siblings, cpg_event_list) { - struct work_deliver *fw = - container_of(f_cevent, struct work_deliver, cev); - if (f_cevent->ctype == CPG_EVENT_DELIVER && + struct work_notify *fw = + container_of(f_cevent, struct work_notify, cev); + if (f_cevent->ctype == CPG_EVENT_NOTIFY && fw->msg->state == DM_FIN) { vprintf("already got fin %p\n", f_cevent); @@ -1651,7 +1549,7 @@ static void cpg_event_done(struct work *work, int idx) cpg_event_set_joining(); } got_fin: - __sd_deliver_done(cevent); + __sd_notify_done(cevent); break; } case CPG_EVENT_REQUEST: @@ -1762,7 +1660,7 @@ void start_cpg_event_work(void) * thread is running for a deliver for VDI, then we need to * run io requests. 
*/ - if (cpg_event_running() && cevent->ctype == CPG_EVENT_CONCHG) + if (cpg_event_running() && is_membership_change_event(cevent->ctype)) return; /* @@ -1796,9 +1694,9 @@ do_retry: list_for_each_entry_safe(cevent, n, &sys->cpg_event_siblings, cpg_event_list) { struct request *req = container_of(cevent, struct request, cev); - if (cevent->ctype == CPG_EVENT_DELIVER) + if (cevent->ctype == CPG_EVENT_NOTIFY) continue; - if (cevent->ctype == CPG_EVENT_CONCHG) + if (is_membership_change_event(cevent->ctype)) break; list_del(&cevent->cpg_event_list); @@ -1882,7 +1780,7 @@ do_retry: cevent = list_first_entry(&sys->cpg_event_siblings, struct cpg_event, cpg_event_list); - if (cevent->ctype == CPG_EVENT_CONCHG && sys->nr_outstanding_io) + if (is_membership_change_event(cevent->ctype) && sys->nr_outstanding_io) return; list_del(&cevent->cpg_event_list); @@ -1897,25 +1795,16 @@ do_retry: queue_work(sys->cpg_wqueue, &cpg_event_work); } -static void sd_confchg(cpg_handle_t handle, const struct cpg_name *group_name, - const struct cpg_address *member_list, - size_t member_list_entries, - const struct cpg_address *left_list, - size_t left_list_entries, - const struct cpg_address *joined_list, - size_t joined_list_entries) +static void sd_join_handler(struct sheepid *joined, struct sheepid *members, + size_t nr_members) { struct cpg_event *cevent; - struct work_confchg *w = NULL; + struct work_join *w = NULL; int i, size; - dprintf("confchg nodeid %x\n", member_list[0].nodeid); - dprintf("%zd %zd %zd\n", member_list_entries, left_list_entries, - joined_list_entries); - for (i = 0; i < member_list_entries; i++) { - dprintf("[%x] node_id: %x, pid: %d\n", i, - member_list[i].nodeid, member_list[i].pid); - } + dprintf("join %s\n", sheepid_to_str(joined)); + for (i = 0; i < nr_members; i++) + dprintf("[%x] %s\n", i, sheepid_to_str(members + i)); if (sys->status == SD_STATUS_SHUTDOWN) return; @@ -1925,31 +1814,19 @@ static void sd_confchg(cpg_handle_t handle, const struct cpg_name 
*group_name, goto oom; cevent = &w->cev; - cevent->ctype = CPG_EVENT_CONCHG; + cevent->ctype = CPG_EVENT_JOIN; vprintf(SDOG_DEBUG "allow new confchg, %p\n", cevent); - size = sizeof(struct cpg_address) * member_list_entries; + size = sizeof(struct sheepid) * nr_members; w->member_list = zalloc(size); if (!w->member_list) goto oom; - memcpy(w->member_list, member_list, size); - w->member_list_entries = member_list_entries; + memcpy(w->member_list, members, size); + w->member_list_entries = nr_members; - size = sizeof(struct cpg_address) * left_list_entries; - w->left_list = zalloc(size); - if (!w->left_list) - goto oom; - memcpy(w->left_list, left_list, size); - w->left_list_entries = left_list_entries; - - size = sizeof(struct cpg_address) * joined_list_entries; - w->joined_list = zalloc(size); - if (!w->joined_list) - goto oom; - memcpy(w->joined_list, joined_list, size); - w->joined_list_entries = joined_list_entries; + w->joined = *joined; list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings); start_cpg_event_work(); @@ -1959,111 +1836,94 @@ oom: if (w) { if (w->member_list) free(w->member_list); - if (w->left_list) - free(w->left_list); - if (w->joined_list) - free(w->joined_list); + free(w); } panic("failed to allocate memory for a confchg event\n"); } -static int set_addr(unsigned int nodeid, int port) +static void sd_leave_handler(struct sheepid *left, struct sheepid *members, + size_t nr_members) { - int ret, nr; - corosync_cfg_node_address_t addr; - struct sockaddr_storage *ss = (struct sockaddr_storage *)addr.address; - struct sockaddr_in *sin = (struct sockaddr_in *)addr.address; - struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)addr.address; - void *saddr; - char tmp[INET6_ADDRSTRLEN]; - - memset(sys->this_node.addr, 0, sizeof(sys->this_node.addr)); - - ret = corosync_cfg_initialize(&cfg_handle, NULL); - if (ret != CPG_OK) { - vprintf(SDOG_ERR "failed to initiazize cfg %d\n", ret); - return -1; - } + struct cpg_event *cevent; + struct 
work_leave *w = NULL; + int i, size; - ret = corosync_cfg_get_node_addrs(cfg_handle, nodeid, 1, &nr, &addr); - if (ret != CPG_OK) { - vprintf(SDOG_ERR "failed to get addr %d\n", ret); - return -1; - } + dprintf("leave %s\n", sheepid_to_str(left)); + for (i = 0; i < nr_members; i++) + dprintf("[%x] %s\n", i, sheepid_to_str(members + i)); - if (!nr) { - vprintf(SDOG_ERR "we got no address\n"); - return -1; - } + if (sys->status == SD_STATUS_SHUTDOWN) + return; - if (ss->ss_family == AF_INET6) { - saddr = &sin6->sin6_addr; - memcpy(sys->this_node.addr, saddr, 16); - } else if (ss->ss_family == AF_INET) { - saddr = &sin->sin_addr; - memcpy(sys->this_node.addr + 12, saddr, 4); - } else { - vprintf(SDOG_ERR "unknown protocol %d\n", ss->ss_family); - return -1; - } + w = zalloc(sizeof(*w)); + if (!w) + goto oom; - inet_ntop(ss->ss_family, saddr, tmp, sizeof(tmp)); + cevent = &w->cev; + cevent->ctype = CPG_EVENT_LEAVE; - vprintf(SDOG_INFO "addr = %s, port = %d\n", tmp, port); - return 0; + + vprintf(SDOG_DEBUG "allow new confchg, %p\n", cevent); + + size = sizeof(struct sheepid) * nr_members; + w->member_list = zalloc(size); + if (!w->member_list) + goto oom; + memcpy(w->member_list, members, size); + w->member_list_entries = nr_members; + + w->left = *left; + + list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings); + start_cpg_event_work(); + + return; +oom: + if (w) { + if (w->member_list) + free(w->member_list); + free(w); + } + panic("failed to allocate memory for a confchg event\n"); } int create_cluster(int port, int64_t zone) { int fd, ret; - cpg_handle_t cpg_handle; - struct cpg_name group = { 8, "sheepdog" }; - cpg_callbacks_t cb = {&sd_deliver, &sd_confchg}; - unsigned int nodeid = 0; - - ret = cpg_initialize(&cpg_handle, &cb); - if (ret != CPG_OK) { - eprintf("Failed to initialize cpg, %d\n", ret); - eprintf("Is corosync running?\n"); - return -1; - } - - ret = cpg_local_get(cpg_handle, &nodeid); - if (ret != CPG_OK) { - eprintf("Failed to get the 
local node's identifier, %d\n", ret); - return 1; + struct cluster_driver *cdrv; + struct cdrv_handlers handlers = { + .join_handler = sd_join_handler, + .leave_handler = sd_leave_handler, + .notify_handler = sd_notify_handler, + }; + + if (!sys->cdrv) { + FOR_EACH_CLUSTER_DRIVER(cdrv) { + if (strcmp(cdrv->name, "corosync") == 0) { + dprintf("use corosync driver as default\n"); + sys->cdrv = cdrv; + break; + } + } } -join_retry: - ret = cpg_join(cpg_handle, &group); - switch (ret) { - case CPG_OK: - break; - case CPG_ERR_TRY_AGAIN: - dprintf("Failed to join the sheepdog group, try again\n"); - sleep(1); - goto join_retry; - case CPG_ERR_SECURITY: - eprintf("Permission error.\n"); - return -1; - default: - eprintf("Failed to join the sheepdog group, %d\n", ret); + fd = sys->cdrv ? sys->cdrv->init(&handlers, &sys->this_sheepid) : -1; + if (fd < 0) return -1; - } - sys->handle = cpg_handle; - sys->this_sheepid.pid = getpid(); + ret = sys->cdrv->join(); + if (ret != 0) + return -1; - ret = set_addr(nodeid, port); - if (ret) - return 1; - memcpy(sys->this_sheepid.addr, sys->this_node.addr, + memcpy(sys->this_node.addr, sys->this_sheepid.addr, sizeof(sys->this_node.addr)); sys->this_node.port = port; sys->this_node.nr_vnodes = SD_DEFAULT_VNODES; - if (zone == -1) - sys->this_node.zone = nodeid; - else + if (zone == -1) { + /* use last 4 bytes as zone id; widen before shifting so b[3] << 24 cannot overflow a signed int */ + uint8_t *b = sys->this_sheepid.addr + 12; + sys->this_node.zone = b[0] | b[1] << 8 | b[2] << 16 | (uint32_t)b[3] << 24; + } else sys->this_node.zone = zone; dprintf("zone id = %u\n", sys->this_node.zone); @@ -2082,11 +1942,6 @@ join_retry: INIT_LIST_HEAD(&sys->cpg_event_siblings); - ret = cpg_fd_get(cpg_handle, &fd); - if (ret != CPG_OK) { - eprintf("Failed to retrieve cpg file descriptor, %d\n", ret); - return 1; - } ret = register_event(fd, group_handler, NULL); if (ret) { eprintf("Failed to register epoll events, %d\n", ret); @@ -2110,5 +1965,5 @@ int leave_cluster(void) msg.epoch = get_latest_epoch(); dprintf("%d\n", msg.epoch); - return
send_message(sys->handle, (struct message_header *)&msg); + return sys->cdrv->notify(&msg, msg.header.msg_length); } diff --git a/sheep/sdnet.c b/sheep/sdnet.c index 9985bdb..db2e691 100644 --- a/sheep/sdnet.c +++ b/sheep/sdnet.c @@ -11,6 +11,8 @@ #include <stdio.h> #include <stdlib.h> #include <unistd.h> +#include <netdb.h> +#include <arpa/inet.h> #include <netinet/tcp.h> #include <sys/epoll.h> #include <fcntl.h> diff --git a/sheep/sheep.c b/sheep/sheep.c index 789cc4c..0a73587 100644 --- a/sheep/sheep.c +++ b/sheep/sheep.c @@ -36,11 +36,12 @@ static struct option const long_options[] = { {"debug", no_argument, NULL, 'd'}, {"directio", no_argument, NULL, 'D'}, {"zone", required_argument, NULL, 'z'}, + {"cluster", required_argument, NULL, 'c'}, {"help", no_argument, NULL, 'h'}, {NULL, 0, NULL, 0}, }; -static const char *short_options = "p:fl:dDz:h"; +static const char *short_options = "p:fl:dDz:c:h"; static void usage(int status) { @@ -57,6 +58,7 @@ Sheepdog Daemon, version %s\n\ -d, --debug print debug messages\n\ -D, --directio use direct IO\n\ -z, --zone specify the zone id\n\ + -c, --cluster specify the cluster driver\n\ -h, --help display this help and exit\n\ ", PACKAGE_VERSION); } @@ -76,6 +78,7 @@ int main(int argc, char **argv) char path[PATH_MAX]; int64_t zone = -1; char *p; + struct cluster_driver *cdrv; signal(SIGPIPE, SIG_IGN); @@ -113,6 +116,24 @@ int main(int argc, char **argv) } sys->this_node.zone = zone; break; + case 'c': + FOR_EACH_CLUSTER_DRIVER(cdrv) { + if (strcmp(cdrv->name, optarg) == 0) { + sys->cdrv = cdrv; + break; + } + } + + if (!sys->cdrv) { + printf("No such cluster driver, %s\n", optarg); + printf("Supported drivers:"); + FOR_EACH_CLUSTER_DRIVER(cdrv) { + printf(" %s", cdrv->name); + } + printf("\n"); + exit(1); + } + break; case 'h': usage(0); break; diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h index 8b20213..e2fcb40 100644 --- a/sheep/sheep_priv.h +++ b/sheep/sheep_priv.h @@ -12,7 +12,6 @@ #define __SHEEP_PRIV_H__ #include 
<inttypes.h> -#include <corosync/cpg.h> #include "sheepdog_proto.h" #include "event.h" @@ -42,11 +41,15 @@ #define SD_RES_NETWORK_ERROR 0x81 /* Network error between sheeps */ enum cpg_event_type { - CPG_EVENT_CONCHG, - CPG_EVENT_DELIVER, + CPG_EVENT_JOIN, + CPG_EVENT_LEAVE, + CPG_EVENT_NOTIFY, CPG_EVENT_REQUEST, }; +#define is_membership_change_event(x) \ + ((x) == CPG_EVENT_JOIN || (x) == CPG_EVENT_LEAVE) + struct cpg_event { enum cpg_event_type ctype; struct list_head cpg_event_list; @@ -103,7 +106,8 @@ struct data_object_bmap { }; struct cluster_info { - cpg_handle_t handle; + struct cluster_driver *cdrv; + /* set after finishing the JOIN procedure */ int join_finished; struct sheepid this_sheepid; -- 1.7.2.5 |