[Sheepdog] [PATCH v2 5/6] sheep: use cluster driver
MORITA Kazutaka
morita.kazutaka at lab.ntt.co.jp
Tue Oct 4 14:38:57 CEST 2011
This patch removes all corosync-specific code from group.c and uses a
cluster driver instead.
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
sheep/group.c | 493 ++++++++++++++++++---------------------------------
sheep/sdnet.c | 2 +
sheep/sheep.c | 23 +++-
sheep/sheep_priv.h | 12 +-
4 files changed, 206 insertions(+), 324 deletions(-)
diff --git a/sheep/group.c b/sheep/group.c
index 14dff00..f6743f5 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -15,8 +15,6 @@
#include <arpa/inet.h>
#include <sys/time.h>
#include <sys/epoll.h>
-#include <corosync/cpg.h>
-#include <corosync/cfg.h>
#include "sheepdog_proto.h"
#include "sheep_priv.h"
@@ -26,54 +24,6 @@
#include "work.h"
#include "cluster.h"
-static corosync_cfg_handle_t cfg_handle;
-
-static int nodeid_to_addr(uint32_t nodeid, uint8_t *addr)
-{
- int ret, nr;
- corosync_cfg_node_address_t caddr;
- struct sockaddr_storage *ss = (struct sockaddr_storage *)caddr.address;
- struct sockaddr_in *sin = (struct sockaddr_in *)caddr.address;
- struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)caddr.address;
- void *saddr;
-
- ret = corosync_cfg_get_node_addrs(cfg_handle, nodeid, 1, &nr, &caddr);
- if (ret != CS_OK) {
- vprintf(SDOG_ERR "failed to get addr %d\n", ret);
- return -1;
- }
-
- if (!nr) {
- vprintf(SDOG_ERR "we got no address\n");
- return -1;
- }
-
- if (ss->ss_family == AF_INET6) {
- saddr = &sin6->sin6_addr;
- memcpy(addr, saddr, 16);
- } else if (ss->ss_family == AF_INET) {
- saddr = &sin->sin_addr;
- memset(addr, 0, 16);
- memcpy(addr + 12, saddr, 4);
- } else {
- vprintf(SDOG_ERR "unknown protocol %d\n", ss->ss_family);
- return -1;
- }
-
- return 0;
-}
-
-static void cpg_addr_to_sheepid(const struct cpg_address *cpgs,
- struct sheepid *sheeps, size_t nr)
-{
- int i;
-
- for (i = 0; i < nr; i++) {
- nodeid_to_addr(cpgs[i].nodeid, sheeps[i].addr);
- sheeps[i].pid = cpgs[i].pid;
- }
-}
-
struct node {
struct sheepid sheepid;
struct sheepdog_node_list_entry ent;
@@ -134,23 +84,26 @@ struct mastership_tx_message {
uint32_t epoch;
};
-struct work_deliver {
+struct work_notify {
struct cpg_event cev;
struct message_header *msg;
};
-struct work_confchg {
+struct work_join {
struct cpg_event cev;
- struct cpg_address *member_list;
+ struct sheepid *member_list;
size_t member_list_entries;
- struct cpg_address *left_list;
- size_t left_list_entries;
- struct cpg_address *joined_list;
- size_t joined_list_entries;
+ struct sheepid joined;
+};
- int sd_node_left;
+struct work_leave {
+ struct cpg_event cev;
+
+ struct sheepid *member_list;
+ size_t member_list_entries;
+ struct sheepid left;
};
#define print_node_list(node_list) \
@@ -216,30 +169,6 @@ static inline int master_tx_message(struct message_header *m)
return m->op == SD_MSG_MASTER_TRANSFER;
}
-static int send_message(cpg_handle_t handle, struct message_header *msg)
-{
- struct iovec iov;
- int ret;
-
- iov.iov_base = msg;
- iov.iov_len = msg->msg_length;
-retry:
- ret = cpg_mcast_joined(handle, CPG_TYPE_AGREED, &iov, 1);
- switch (ret) {
- case CPG_OK:
- break;
- case CPG_ERR_TRY_AGAIN:
- dprintf("failed to send message. try again\n");
- sleep(1);
- goto retry;
- default:
- eprintf("failed to send message, %d\n", ret);
- return -1;
- }
- return 0;
-}
-
-
static int get_node_idx(struct sheepdog_node_list_entry *ent,
struct sheepdog_node_list_entry *entries, int nr_nodes)
{
@@ -437,7 +366,7 @@ forward:
list_add(&req->pending_list, &sys->pending_list);
- send_message(sys->handle, (struct message_header *)msg);
+ sys->cdrv->notify(msg, msg->header.msg_length);
free(msg);
}
@@ -450,9 +379,8 @@ static void group_handler(int listen_fd, int events, void *data)
goto out;
}
- ret = cpg_dispatch(sys->handle, CPG_DISPATCH_ALL);
-
- if (ret == CPG_OK)
+ ret = sys->cdrv->dispatch();
+ if (ret == 0)
return;
else
eprintf("oops...some error occured inside corosync\n");
@@ -1058,9 +986,9 @@ out:
req->done(req);
}
-static void __sd_deliver(struct cpg_event *cevent)
+static void __sd_notify(struct cpg_event *cevent)
{
- struct work_deliver *w = container_of(cevent, struct work_deliver, cev);
+ struct work_notify *w = container_of(cevent, struct work_notify, cev);
struct message_header *m = w->msg;
char name[128];
struct node *node;
@@ -1134,10 +1062,10 @@ static int tx_mastership(void)
msg.header.from = sys->this_node;
msg.header.sheepid = sys->this_sheepid;
- return send_message(sys->handle, (struct message_header *)&msg);
+ return sys->cdrv->notify(&msg, msg.header.msg_length);
}
-static void send_join_response(struct work_deliver *w)
+static void send_join_response(struct work_notify *w)
{
struct message_header *m;
struct join_message *jm;
@@ -1166,12 +1094,12 @@ static void send_join_response(struct work_deliver *w)
exit(1);
}
jm->epoch = sys->epoch;
- send_message(sys->handle, m);
+ sys->cdrv->notify(m, m->msg_length);
}
-static void __sd_deliver_done(struct cpg_event *cevent)
+static void __sd_notify_done(struct cpg_event *cevent)
{
- struct work_deliver *w = container_of(cevent, struct work_deliver, cev);
+ struct work_notify *w = container_of(cevent, struct work_notify, cev);
struct message_header *m;
char name[128];
int do_recovery;
@@ -1245,7 +1173,7 @@ static void __sd_deliver_done(struct cpg_event *cevent)
break;
case SD_MSG_VDI_OP:
m->state = DM_FIN;
- send_message(sys->handle, m);
+ sys->cdrv->notify(m, m->msg_length);
break;
default:
eprintf("unknown message %d\n", m->op);
@@ -1261,25 +1189,24 @@ static void __sd_deliver_done(struct cpg_event *cevent)
}
}
-static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name,
- uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
+static void sd_notify_handler(struct sheepid *sender, void *msg, size_t msg_len)
{
struct cpg_event *cevent;
- struct work_deliver *w;
+ struct work_notify *w;
struct message_header *m = msg;
char name[128];
- dprintf("op: %d, state: %u, size: %d, from: %s, nodeid: %x, pid: %u\n",
+ dprintf("op: %d, state: %u, size: %d, from: %s, pid: %lu\n",
m->op, m->state, m->msg_length,
addr_to_str(name, sizeof(name), m->from.addr, m->from.port),
- nodeid, pid);
+ sender->pid);
w = zalloc(sizeof(*w));
if (!w)
return;
cevent = &w->cev;
- cevent->ctype = CPG_EVENT_DELIVER;
+ cevent->ctype = CPG_EVENT_NOTIFY;
vprintf(SDOG_DEBUG "allow new deliver, %p\n", cevent);
@@ -1299,20 +1226,7 @@ static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name,
start_cpg_event_work();
}
-static void for_each_node_list(struct cpg_address list[], int count,
- void (*func)(struct sheepid *id,
- struct work_confchg *w),
- struct work_confchg *w)
-{
- int i;
- struct sheepid sheepid[SD_MAX_NODES];
-
- cpg_addr_to_sheepid(list, sheepid, count);
- for (i = 0; i < count; i++)
- func(sheepid + i, w);
-}
-
-static void add_node(struct sheepid *id, struct work_confchg *w)
+static void add_node(struct sheepid *id)
{
struct node *node;
@@ -1325,7 +1239,7 @@ static void add_node(struct sheepid *id, struct work_confchg *w)
list_add_tail(&node->list, &sys->cpg_node_list);
}
-static void del_node(struct sheepid *id, struct work_confchg *w)
+static int del_node(struct sheepid *id)
{
struct node *node;
@@ -1334,7 +1248,6 @@ static void del_node(struct sheepid *id, struct work_confchg *w)
int nr;
struct sheepdog_node_list_entry e[SD_MAX_NODES];
- w->sd_node_left++;
sys->nr_vnodes = 0;
list_del(&node->list);
@@ -1350,38 +1263,26 @@ static void del_node(struct sheepid *id, struct work_confchg *w)
update_epoch_store(sys->epoch);
}
- } else {
- node = find_node(&sys->cpg_node_list, id);
- if (node) {
- list_del(&node->list);
- free(node);
- }
+ return 1;
}
-}
-static int is_my_cpg_addr(struct cpg_address *addr)
-{
- struct sheepid id;
+ node = find_node(&sys->cpg_node_list, id);
+ if (node) {
+ list_del(&node->list);
+ free(node);
+ }
- cpg_addr_to_sheepid(addr, &id, 1);
- return (sheepid_cmp(&id, &sys->this_sheepid) == 0);
+ return 0;
}
/*
* Check whether the majority of Sheepdog nodes are still alive or not
*/
-static int check_majority(struct cpg_address *left_list,
- size_t left_list_entries)
+static int check_majority(struct sheepid *left)
{
- int nr_nodes = 0, nr_majority, nr_reachable = 0, i, fd;
+ int nr_nodes = 0, nr_majority, nr_reachable = 0, fd;
struct node *node;
char name[INET6_ADDRSTRLEN];
- struct sheepid left_sheepid[SD_MAX_NODES];
-
- if (left_list_entries == 0)
- return 1; /* we don't need this check in this case */
-
- cpg_addr_to_sheepid(left_list, left_sheepid, left_list_entries);
nr_nodes = get_nodes_nr_from(&sys->sd_node_list);
nr_majority = nr_nodes / 2 + 1;
@@ -1392,11 +1293,7 @@ static int check_majority(struct cpg_address *left_list,
return 1;
list_for_each_entry(node, &sys->sd_node_list, list) {
- for (i = 0; i < left_list_entries; i++) {
- if (sheepid_cmp(left_sheepid + i, &node->sheepid) != 0)
- break;
- }
- if (i != left_list_entries)
+ if (sheepid_cmp(&node->sheepid, left) == 0)
continue;
addr_to_str(name, sizeof(name), node->ent.addr, 0);
@@ -1416,26 +1313,22 @@ static int check_majority(struct cpg_address *left_list,
return 0;
}
-static void __sd_confchg(struct cpg_event *cevent)
+static void __sd_leave(struct cpg_event *cevent)
{
- struct work_confchg *w = container_of(cevent, struct work_confchg, cev);
+ struct work_leave *w = container_of(cevent, struct work_leave, cev);
- if (!check_majority(w->left_list, w->left_list_entries)) {
+ if (!check_majority(&w->left)) {
eprintf("perhaps network partition failure has occurred\n");
abort();
}
}
-static void send_join_request(struct sheepid *id, struct work_confchg *w)
+static void send_join_request(struct sheepid *id)
{
struct join_message msg;
struct sheepdog_node_list_entry entries[SD_MAX_NODES];
int nr_entries, i, ret;
- /* if I've just joined in cpg, I'll join in sheepdog. */
- if (sheepid_cmp(id, &sys->this_sheepid) != 0)
- return;
-
memset(&msg, 0, sizeof(msg));
msg.header.proto_ver = SD_SHEEP_PROTO_VER;
msg.header.op = SD_MSG_JOIN;
@@ -1454,34 +1347,29 @@ static void send_join_request(struct sheepid *id, struct work_confchg *w)
msg.nodes[i].ent = entries[i];
}
- send_message(sys->handle, (struct message_header *)&msg);
+ sys->cdrv->notify(&msg, msg.header.msg_length);
vprintf(SDOG_INFO "%s\n", sheepid_to_str(&sys->this_sheepid));
}
-static void __sd_confchg_done(struct cpg_event *cevent)
+static void __sd_join_done(struct cpg_event *cevent)
{
- struct work_confchg *w = container_of(cevent, struct work_confchg, cev);
- int ret;
+ struct work_join *w = container_of(cevent, struct work_join, cev);
+ int ret, i;
int first_cpg_node = 0;
- if (w->member_list_entries ==
- w->joined_list_entries - w->left_list_entries &&
- is_my_cpg_addr(w->member_list)) {
+ if (w->member_list_entries == 1 &&
+ sheepid_cmp(&w->joined, &sys->this_sheepid) == 0) {
sys->join_finished = 1;
get_global_nr_copies(&sys->nr_sobjs);
first_cpg_node = 1;
}
- if (list_empty(&sys->cpg_node_list))
- for_each_node_list(w->member_list, w->member_list_entries,
- add_node, w);
- else
- for_each_node_list(w->joined_list, w->joined_list_entries,
- add_node, w);
-
- for_each_node_list(w->left_list, w->left_list_entries,
- del_node, w);
+ if (list_empty(&sys->cpg_node_list)) {
+ for (i = 0; i < w->member_list_entries; i++)
+ add_node(w->member_list + i);
+ } else
+ add_node(&w->joined);
if (first_cpg_node) {
struct join_message msg;
@@ -1522,35 +1410,40 @@ static void __sd_confchg_done(struct cpg_event *cevent)
print_node_list(&sys->sd_node_list);
- if (first_cpg_node)
- goto skip_join;
+ if (sheepid_cmp(&w->joined, &sys->this_sheepid) == 0)
+ send_join_request(&w->joined);
+}
+
+static void __sd_leave_done(struct cpg_event *cevent)
+{
+ struct work_leave *w = container_of(cevent, struct work_leave, cev);
+ int node_left;
- for_each_node_list(w->joined_list, w->joined_list_entries,
- send_join_request, w);
+ node_left = del_node(&w->left);
-skip_join:
- if (w->sd_node_left && sys->status == SD_STATUS_OK) {
- if (w->sd_node_left > 1)
- panic("we can't handle the departure of multiple nodes %d, %Zd\n",
- w->sd_node_left, w->left_list_entries);
+ print_node_list(&sys->sd_node_list);
+ if (node_left && sys->status == SD_STATUS_OK)
start_recovery(sys->epoch);
- }
}
static void cpg_event_free(struct cpg_event *cevent)
{
switch (cevent->ctype) {
- case CPG_EVENT_CONCHG: {
- struct work_confchg *w = container_of(cevent, struct work_confchg, cev);
+ case CPG_EVENT_JOIN: {
+ struct work_join *w = container_of(cevent, struct work_join, cev);
+ free(w->member_list);
+ free(w);
+ break;
+ }
+ case CPG_EVENT_LEAVE: {
+ struct work_leave *w = container_of(cevent, struct work_leave, cev);
free(w->member_list);
- free(w->left_list);
- free(w->joined_list);
free(w);
break;
}
- case CPG_EVENT_DELIVER: {
- struct work_deliver *w = container_of(cevent, struct work_deliver, cev);
+ case CPG_EVENT_NOTIFY: {
+ struct work_notify *w = container_of(cevent, struct work_notify, cev);
free(w->msg);
free(w);
break;
@@ -1575,14 +1468,16 @@ static void cpg_event_fn(struct work *work, int idx)
*/
switch (cevent->ctype) {
- case CPG_EVENT_CONCHG:
- __sd_confchg(cevent);
+ case CPG_EVENT_JOIN:
break;
- case CPG_EVENT_DELIVER:
+ case CPG_EVENT_LEAVE:
+ __sd_leave(cevent);
+ break;
+ case CPG_EVENT_NOTIFY:
{
- struct work_deliver *w = container_of(cevent, struct work_deliver, cev);
+ struct work_notify *w = container_of(cevent, struct work_notify, cev);
vprintf(SDOG_DEBUG "%d\n", w->msg->state);
- __sd_deliver(cevent);
+ __sd_notify(cevent);
break;
}
case CPG_EVENT_REQUEST:
@@ -1612,12 +1507,15 @@ static void cpg_event_done(struct work *work, int idx)
goto out;
switch (cevent->ctype) {
- case CPG_EVENT_CONCHG:
- __sd_confchg_done(cevent);
+ case CPG_EVENT_JOIN:
+ __sd_join_done(cevent);
+ break;
+ case CPG_EVENT_LEAVE:
+ __sd_leave_done(cevent);
break;
- case CPG_EVENT_DELIVER:
+ case CPG_EVENT_NOTIFY:
{
- struct work_deliver *w = container_of(cevent, struct work_deliver, cev);
+ struct work_notify *w = container_of(cevent, struct work_notify, cev);
if (w->msg->state == DM_FIN && vdi_op_message(w->msg))
vdi_op_done((struct vdi_op_message *)w->msg);
@@ -1633,9 +1531,9 @@ static void cpg_event_done(struct work *work, int idx)
list_for_each_entry(f_cevent, &sys->cpg_event_siblings,
cpg_event_list) {
- struct work_deliver *fw =
- container_of(f_cevent, struct work_deliver, cev);
- if (f_cevent->ctype == CPG_EVENT_DELIVER &&
+ struct work_notify *fw =
+ container_of(f_cevent, struct work_notify, cev);
+ if (f_cevent->ctype == CPG_EVENT_NOTIFY &&
fw->msg->state == DM_FIN) {
vprintf("already got fin %p\n",
f_cevent);
@@ -1651,7 +1549,7 @@ static void cpg_event_done(struct work *work, int idx)
cpg_event_set_joining();
}
got_fin:
- __sd_deliver_done(cevent);
+ __sd_notify_done(cevent);
break;
}
case CPG_EVENT_REQUEST:
@@ -1762,7 +1660,7 @@ void start_cpg_event_work(void)
* thread is running for a deliver for VDI, then we need to
* run io requests.
*/
- if (cpg_event_running() && cevent->ctype == CPG_EVENT_CONCHG)
+ if (cpg_event_running() && is_membership_change_event(cevent->ctype))
return;
/*
@@ -1796,9 +1694,9 @@ do_retry:
list_for_each_entry_safe(cevent, n, &sys->cpg_event_siblings, cpg_event_list) {
struct request *req = container_of(cevent, struct request, cev);
- if (cevent->ctype == CPG_EVENT_DELIVER)
+ if (cevent->ctype == CPG_EVENT_NOTIFY)
continue;
- if (cevent->ctype == CPG_EVENT_CONCHG)
+ if (is_membership_change_event(cevent->ctype))
break;
list_del(&cevent->cpg_event_list);
@@ -1882,7 +1780,7 @@ do_retry:
cevent = list_first_entry(&sys->cpg_event_siblings,
struct cpg_event, cpg_event_list);
- if (cevent->ctype == CPG_EVENT_CONCHG && sys->nr_outstanding_io)
+ if (is_membership_change_event(cevent->ctype) && sys->nr_outstanding_io)
return;
list_del(&cevent->cpg_event_list);
@@ -1897,25 +1795,16 @@ do_retry:
queue_work(sys->cpg_wqueue, &cpg_event_work);
}
-static void sd_confchg(cpg_handle_t handle, const struct cpg_name *group_name,
- const struct cpg_address *member_list,
- size_t member_list_entries,
- const struct cpg_address *left_list,
- size_t left_list_entries,
- const struct cpg_address *joined_list,
- size_t joined_list_entries)
+static void sd_join_handler(struct sheepid *joined, struct sheepid *members,
+ size_t nr_members)
{
struct cpg_event *cevent;
- struct work_confchg *w = NULL;
+ struct work_join *w = NULL;
int i, size;
- dprintf("confchg nodeid %x\n", member_list[0].nodeid);
- dprintf("%zd %zd %zd\n", member_list_entries, left_list_entries,
- joined_list_entries);
- for (i = 0; i < member_list_entries; i++) {
- dprintf("[%x] node_id: %x, pid: %d\n", i,
- member_list[i].nodeid, member_list[i].pid);
- }
+ dprintf("join %s\n", sheepid_to_str(joined));
+ for (i = 0; i < nr_members; i++)
+ dprintf("[%x] %s\n", i, sheepid_to_str(members + i));
if (sys->status == SD_STATUS_SHUTDOWN)
return;
@@ -1925,31 +1814,19 @@ static void sd_confchg(cpg_handle_t handle, const struct cpg_name *group_name,
goto oom;
cevent = &w->cev;
- cevent->ctype = CPG_EVENT_CONCHG;
+ cevent->ctype = CPG_EVENT_JOIN;
vprintf(SDOG_DEBUG "allow new confchg, %p\n", cevent);
- size = sizeof(struct cpg_address) * member_list_entries;
+ size = sizeof(struct sheepid) * nr_members;
w->member_list = zalloc(size);
if (!w->member_list)
goto oom;
- memcpy(w->member_list, member_list, size);
- w->member_list_entries = member_list_entries;
+ memcpy(w->member_list, members, size);
+ w->member_list_entries = nr_members;
- size = sizeof(struct cpg_address) * left_list_entries;
- w->left_list = zalloc(size);
- if (!w->left_list)
- goto oom;
- memcpy(w->left_list, left_list, size);
- w->left_list_entries = left_list_entries;
-
- size = sizeof(struct cpg_address) * joined_list_entries;
- w->joined_list = zalloc(size);
- if (!w->joined_list)
- goto oom;
- memcpy(w->joined_list, joined_list, size);
- w->joined_list_entries = joined_list_entries;
+ w->joined = *joined;
list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings);
start_cpg_event_work();
@@ -1959,111 +1836,94 @@ oom:
if (w) {
if (w->member_list)
free(w->member_list);
- if (w->left_list)
- free(w->left_list);
- if (w->joined_list)
- free(w->joined_list);
+ free(w);
}
panic("failed to allocate memory for a confchg event\n");
}
-static int set_addr(unsigned int nodeid, int port)
+static void sd_leave_handler(struct sheepid *left, struct sheepid *members,
+ size_t nr_members)
{
- int ret, nr;
- corosync_cfg_node_address_t addr;
- struct sockaddr_storage *ss = (struct sockaddr_storage *)addr.address;
- struct sockaddr_in *sin = (struct sockaddr_in *)addr.address;
- struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)addr.address;
- void *saddr;
- char tmp[INET6_ADDRSTRLEN];
-
- memset(sys->this_node.addr, 0, sizeof(sys->this_node.addr));
-
- ret = corosync_cfg_initialize(&cfg_handle, NULL);
- if (ret != CPG_OK) {
- vprintf(SDOG_ERR "failed to initiazize cfg %d\n", ret);
- return -1;
- }
+ struct cpg_event *cevent;
+ struct work_leave *w = NULL;
+ int i, size;
- ret = corosync_cfg_get_node_addrs(cfg_handle, nodeid, 1, &nr, &addr);
- if (ret != CPG_OK) {
- vprintf(SDOG_ERR "failed to get addr %d\n", ret);
- return -1;
- }
+ dprintf("leave %s\n", sheepid_to_str(left));
+ for (i = 0; i < nr_members; i++)
+ dprintf("[%x] %s\n", i, sheepid_to_str(members + i));
- if (!nr) {
- vprintf(SDOG_ERR "we got no address\n");
- return -1;
- }
+ if (sys->status == SD_STATUS_SHUTDOWN)
+ return;
- if (ss->ss_family == AF_INET6) {
- saddr = &sin6->sin6_addr;
- memcpy(sys->this_node.addr, saddr, 16);
- } else if (ss->ss_family == AF_INET) {
- saddr = &sin->sin_addr;
- memcpy(sys->this_node.addr + 12, saddr, 4);
- } else {
- vprintf(SDOG_ERR "unknown protocol %d\n", ss->ss_family);
- return -1;
- }
+ w = zalloc(sizeof(*w));
+ if (!w)
+ goto oom;
- inet_ntop(ss->ss_family, saddr, tmp, sizeof(tmp));
+ cevent = &w->cev;
+ cevent->ctype = CPG_EVENT_LEAVE;
- vprintf(SDOG_INFO "addr = %s, port = %d\n", tmp, port);
- return 0;
+
+ vprintf(SDOG_DEBUG "allow new confchg, %p\n", cevent);
+
+ size = sizeof(struct sheepid) * nr_members;
+ w->member_list = zalloc(size);
+ if (!w->member_list)
+ goto oom;
+ memcpy(w->member_list, members, size);
+ w->member_list_entries = nr_members;
+
+ w->left = *left;
+
+ list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings);
+ start_cpg_event_work();
+
+ return;
+oom:
+ if (w) {
+ if (w->member_list)
+ free(w->member_list);
+ free(w);
+ }
+ panic("failed to allocate memory for a confchg event\n");
}
int create_cluster(int port, int64_t zone)
{
int fd, ret;
- cpg_handle_t cpg_handle;
- struct cpg_name group = { 8, "sheepdog" };
- cpg_callbacks_t cb = {&sd_deliver, &sd_confchg};
- unsigned int nodeid = 0;
-
- ret = cpg_initialize(&cpg_handle, &cb);
- if (ret != CPG_OK) {
- eprintf("Failed to initialize cpg, %d\n", ret);
- eprintf("Is corosync running?\n");
- return -1;
- }
-
- ret = cpg_local_get(cpg_handle, &nodeid);
- if (ret != CPG_OK) {
- eprintf("Failed to get the local node's identifier, %d\n", ret);
- return 1;
+ struct cluster_driver *cdrv;
+ struct cdrv_handlers handlers = {
+ .join_handler = sd_join_handler,
+ .leave_handler = sd_leave_handler,
+ .notify_handler = sd_notify_handler,
+ };
+
+ if (!sys->cdrv) {
+ FOR_EACH_CLUSTER_DRIVER(cdrv) {
+ if (strcmp(cdrv->name, "corosync") == 0) {
+ dprintf("use corosync driver as default\n");
+ sys->cdrv = cdrv;
+ break;
+ }
+ }
}
-join_retry:
- ret = cpg_join(cpg_handle, &group);
- switch (ret) {
- case CPG_OK:
- break;
- case CPG_ERR_TRY_AGAIN:
- dprintf("Failed to join the sheepdog group, try again\n");
- sleep(1);
- goto join_retry;
- case CPG_ERR_SECURITY:
- eprintf("Permission error.\n");
- return -1;
- default:
- eprintf("Failed to join the sheepdog group, %d\n", ret);
+ fd = sys->cdrv->init(&handlers, &sys->this_sheepid);
+ if (fd < 0)
return -1;
- }
- sys->handle = cpg_handle;
- sys->this_sheepid.pid = getpid();
+ ret = sys->cdrv->join();
+ if (ret != 0)
+ return -1;
- ret = set_addr(nodeid, port);
- if (ret)
- return 1;
- memcpy(sys->this_sheepid.addr, sys->this_node.addr,
+ memcpy(sys->this_node.addr, sys->this_sheepid.addr,
sizeof(sys->this_node.addr));
sys->this_node.port = port;
sys->this_node.nr_vnodes = SD_DEFAULT_VNODES;
- if (zone == -1)
- sys->this_node.zone = nodeid;
- else
+ if (zone == -1) {
+ /* use last 4 bytes as zone id */
+ uint8_t *b = sys->this_sheepid.addr + 12;
+ sys->this_node.zone = b[0] | b[1] << 8 | b[2] << 16 | b[3] << 24;
+ } else
sys->this_node.zone = zone;
dprintf("zone id = %u\n", sys->this_node.zone);
@@ -2082,11 +1942,6 @@ join_retry:
INIT_LIST_HEAD(&sys->cpg_event_siblings);
- ret = cpg_fd_get(cpg_handle, &fd);
- if (ret != CPG_OK) {
- eprintf("Failed to retrieve cpg file descriptor, %d\n", ret);
- return 1;
- }
ret = register_event(fd, group_handler, NULL);
if (ret) {
eprintf("Failed to register epoll events, %d\n", ret);
@@ -2110,5 +1965,5 @@ int leave_cluster(void)
msg.epoch = get_latest_epoch();
dprintf("%d\n", msg.epoch);
- return send_message(sys->handle, (struct message_header *)&msg);
+ return sys->cdrv->notify(&msg, msg.header.msg_length);
}
diff --git a/sheep/sdnet.c b/sheep/sdnet.c
index 9985bdb..db2e691 100644
--- a/sheep/sdnet.c
+++ b/sheep/sdnet.c
@@ -11,6 +11,8 @@
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
+#include <netdb.h>
+#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <sys/epoll.h>
#include <fcntl.h>
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 789cc4c..0a73587 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -36,11 +36,12 @@ static struct option const long_options[] = {
{"debug", no_argument, NULL, 'd'},
{"directio", no_argument, NULL, 'D'},
{"zone", required_argument, NULL, 'z'},
+ {"cluster", required_argument, NULL, 'c'},
{"help", no_argument, NULL, 'h'},
{NULL, 0, NULL, 0},
};
-static const char *short_options = "p:fl:dDz:h";
+static const char *short_options = "p:fl:dDz:c:h";
static void usage(int status)
{
@@ -57,6 +58,7 @@ Sheepdog Daemon, version %s\n\
-d, --debug print debug messages\n\
-D, --directio use direct IO\n\
-z, --zone specify the zone id\n\
+ -c, --cluster specify the cluster driver\n\
-h, --help display this help and exit\n\
", PACKAGE_VERSION);
}
@@ -76,6 +78,7 @@ int main(int argc, char **argv)
char path[PATH_MAX];
int64_t zone = -1;
char *p;
+ struct cluster_driver *cdrv;
signal(SIGPIPE, SIG_IGN);
@@ -113,6 +116,24 @@ int main(int argc, char **argv)
}
sys->this_node.zone = zone;
break;
+ case 'c':
+ FOR_EACH_CLUSTER_DRIVER(cdrv) {
+ if (strcmp(cdrv->name, optarg) == 0) {
+ sys->cdrv = cdrv;
+ break;
+ }
+ }
+
+ if (!sys->cdrv) {
+ printf("No such cluster driver, %s\n", optarg);
+ printf("Supported drivers:");
+ FOR_EACH_CLUSTER_DRIVER(cdrv) {
+ printf(" %s", cdrv->name);
+ }
+ printf("\n");
+ exit(1);
+ }
+ break;
case 'h':
usage(0);
break;
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 8b20213..e2fcb40 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -12,7 +12,6 @@
#define __SHEEP_PRIV_H__
#include <inttypes.h>
-#include <corosync/cpg.h>
#include "sheepdog_proto.h"
#include "event.h"
@@ -42,11 +41,15 @@
#define SD_RES_NETWORK_ERROR 0x81 /* Network error between sheeps */
enum cpg_event_type {
- CPG_EVENT_CONCHG,
- CPG_EVENT_DELIVER,
+ CPG_EVENT_JOIN,
+ CPG_EVENT_LEAVE,
+ CPG_EVENT_NOTIFY,
CPG_EVENT_REQUEST,
};
+#define is_membership_change_event(x) \
+ ((x) == CPG_EVENT_JOIN || (x) == CPG_EVENT_LEAVE)
+
struct cpg_event {
enum cpg_event_type ctype;
struct list_head cpg_event_list;
@@ -103,7 +106,8 @@ struct data_object_bmap {
};
struct cluster_info {
- cpg_handle_t handle;
+ struct cluster_driver *cdrv;
+
/* set after finishing the JOIN procedure */
int join_finished;
struct sheepid this_sheepid;
--
1.7.2.5
More information about the sheepdog mailing list