[Sheepdog] [RFC PATCH 1/2] introduce cluster driver
MORITA Kazutaka
morita.kazutaka at lab.ntt.co.jp
Thu Sep 29 22:28:57 CEST 2011
This patch abstracts out the cluster management layer of Sheepdog and
introduces a cluster driver interface.
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
sheep/cluster.h | 157 ++++++++++++++++
sheep/group.c | 527 ++++++++++++++++++++--------------------------------
sheep/sdnet.c | 2 +
sheep/sheep.c | 24 +++-
sheep/sheep_priv.h | 16 +-
5 files changed, 397 insertions(+), 329 deletions(-)
create mode 100644 sheep/cluster.h
diff --git a/sheep/cluster.h b/sheep/cluster.h
new file mode 100644
index 0000000..e5fecc5
--- /dev/null
+++ b/sheep/cluster.h
@@ -0,0 +1,157 @@
+/*
+ * Copyright (C) 2011 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#ifndef __CLUSTER_H__
+#define __CLUSTER_H__
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <inttypes.h>
+#include <memory.h>
+
+#include "sheepdog_proto.h"
+#include "sheep.h"
+#include "logger.h"
+
+/*
+ * Identifies one sheep in the cluster: a 16-byte address plus a pid.
+ *
+ * sheepid_find() and sheepid_cmp() compare whole structs with memcmp(),
+ * so two ids are equal only if both fields match byte for byte.
+ */
+struct sheepid {
+ uint8_t addr[16];
+ uint64_t pid;
+};
+
+/*
+ * Callbacks handed to a cluster driver through cluster_driver.init().
+ *
+ * join_handler: 'joined' has joined the cluster.
+ * leave_handler: 'left' has left the cluster.
+ * notify_handler: 'sender' has sent the 'msg_len'-byte message 'msg'.
+ *
+ * 'members'/'nr_members' is the member list delivered with a join or
+ * leave event.
+ * NOTE(review): the join path in group.c treats nr_members == 1 as "I am
+ * the first node", so the list appears to describe the membership after
+ * the change - confirm for leave events.
+ */
+struct cdrv_handlers {
+ void (*join_handler)(struct sheepid *joined, struct sheepid *members,
+ size_t nr_members);
+ void (*leave_handler)(struct sheepid *left, struct sheepid *members,
+ size_t nr_members);
+ void (*notify_handler)(struct sheepid *sender, void *msg, size_t msg_len);
+};
+
+/*
+ * Operations implemented by a cluster driver.
+ *
+ * A driver is linked into the global cluster_drivers list through its
+ * 'list' member.
+ */
+struct cluster_driver {
+ /* driver name (NOTE(review): presumably what --cluster selects by; confirm) */
+ const char *name;
+
+ /*
+ * Initialize the cluster driver
+ *
+ * 'handlers' supplies the callbacks to run for join, leave and
+ * notify events. create_cluster() copies myid->addr out after
+ * init() and join() have run, so the driver is expected to fill
+ * in *myid (NOTE(review): confirm which call sets it).
+ *
+ * On success, this function returns a file descriptor that can
+ * be monitored with poll(2) for cluster events. On error, it
+ * returns -1.
+ */
+ int (*init)(struct cdrv_handlers *handlers, struct sheepid *myid);
+
+ /*
+ * Join the cluster
+ *
+ * This function joins the cluster and notifies all the nodes
+ * of the join event.
+ *
+ * Returns zero on success, -1 on error
+ */
+ int (*join)(void);
+
+ /*
+ * Leave the cluster
+ *
+ * This function leaves the cluster and notifies all the nodes
+ * of the leave event.
+ *
+ * Returns zero on success, -1 on error
+ */
+ int (*leave)(void);
+
+ /*
+ * Send a message to all nodes in the cluster
+ *
+ * This function sends the 'msg_len' bytes at 'msg' to all the
+ * nodes. The messages are delivered through notify_handler()
+ * in cdrv_handlers.
+ *
+ * Returns zero on success, -1 on error
+ */
+ int (*notify)(void *msg, size_t msg_len);
+
+ /*
+ * Dispatch handlers
+ *
+ * This function calls the handlers for the events
+ * (join/leave/notify) that have been delivered in the cluster.
+ * group_handler() calls it from the event callback registered
+ * on the descriptor returned by init().
+ *
+ * Note that the event sequence is totally ordered; all nodes
+ * call the handlers in the same sequence.
+ *
+ * Returns zero on success, -1 on error
+ */
+ int (*dispatch)(void);
+
+ /* linkage in the cluster_drivers list */
+ struct list_head list;
+};
+
+extern struct list_head cluster_drivers;
+
+/*
+ * Register a cluster driver.
+ *
+ * Expands to a static constructor function named regist_<driver> that
+ * adds 'driver' to the cluster_drivers list. 'driver' must be the name
+ * of a struct cluster_driver object: it is pasted into the function name
+ * and its 'list' member is used for the linkage.
+ */
+#define cdrv_register(driver) \
+static void __attribute__((constructor)) regist_ ## driver(void) { \
+ list_add(&driver.list, &cluster_drivers); \
+}
+
+/*
+ * Iterate over every entry of the cluster_drivers list; 'driver' is the
+ * struct cluster_driver pointer used as the loop cursor.
+ */
+#define FOR_EACH_CLUSTER_DRIVER(driver) \
+ list_for_each_entry(driver, &cluster_drivers, list)
+
+
+/*
+ * Look up 'key' in the array 'sheeps' holding 'nr_sheeps' entries.
+ *
+ * Entries are compared byte-wise over the whole struct sheepid.
+ *
+ * Returns the index of the first matching entry, or -1 if none matches.
+ */
+static inline int sheepid_find(struct sheepid *sheeps, size_t nr_sheeps,
+			       struct sheepid *key)
+{
+	size_t i;
+
+	/* size_t index: no signed/unsigned comparison against nr_sheeps */
+	for (i = 0; i < nr_sheeps; i++) {
+		if (memcmp(sheeps + i, key, sizeof(*key)) == 0)
+			return (int)i;
+	}
+	return -1;
+}
+
+/*
+ * Append the 'nr_sheeps2' entries of 'sheeps2' to the array 'sheeps1',
+ * which currently holds 'nr_sheeps1' entries.
+ *
+ * The caller must provide room for the appended entries in sheeps1 and
+ * must track the new entry count itself; nothing is returned.
+ */
+static inline void sheepid_add(struct sheepid *sheeps1, size_t nr_sheeps1,
+			       struct sheepid *sheeps2, size_t nr_sheeps2)
+{
+	struct sheepid *tail = &sheeps1[nr_sheeps1];
+	size_t nbytes = nr_sheeps2 * sizeof(struct sheepid);
+
+	memcpy(tail, sheeps2, nbytes);
+}
+
+/*
+ * Remove each of the 'nr_sheeps2' entries of 'sheeps2' from the array
+ * 'sheeps1' ('nr_sheeps1' entries), closing every gap by shifting the
+ * entries behind it down by one slot.
+ *
+ * Calls panic() if an entry of sheeps2 is not found in sheeps1. The
+ * caller must track the new entry count itself; nothing is returned.
+ */
+static inline void sheepid_del(struct sheepid *sheeps1, size_t nr_sheeps1,
+			       struct sheepid *sheeps2, size_t nr_sheeps2)
+{
+	size_t i;
+	int idx;
+
+	for (i = 0; i < nr_sheeps2; i++) {
+		idx = sheepid_find(sheeps1, nr_sheeps1, sheeps2 + i);
+		if (idx < 0)
+			panic("internal error: cannot find sheepid\n");
+
+		nr_sheeps1--;
+		/*
+		 * Exactly (nr_sheeps1 - idx) entries follow the removed one.
+		 * The parentheses are required: without them the byte count
+		 * becomes "size * count - idx" and memmove() runs past the
+		 * end of the array.
+		 */
+		memmove(sheeps1 + idx, sheeps1 + idx + 1,
+			sizeof(*sheeps1) * (nr_sheeps1 - idx));
+	}
+}
+
+/*
+ * Format 'id' as "ip: <address>, pid: <pid>".
+ *
+ * Returns a pointer to a function-local static buffer, so the result is
+ * overwritten by the next call and must not be freed. Copy it if it has
+ * to outlive another call.
+ */
+static inline char *sheepid_to_str(struct sheepid *id)
+{
+	static char str[256];
+	char name[256];
+	const char *ip;
+
+	ip = addr_to_str(name, sizeof(name), id->addr, 0);
+	snprintf(str, sizeof(str), "ip: %s, pid: %" PRIu64, ip, id->pid);
+
+	return str;
+}
+
+/*
+ * Compare two sheep ids byte-wise over the whole struct.
+ *
+ * Returns zero if they are identical, otherwise the non-zero result of
+ * memcmp().
+ */
+static inline int sheepid_cmp(struct sheepid *id1, struct sheepid *id2)
+{
+	const size_t len = sizeof(struct sheepid);
+	int diff = memcmp(id1, id2, len);
+
+	return diff;
+}
+
+#endif
diff --git a/sheep/group.c b/sheep/group.c
index 8c65d74..95fc799 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -15,8 +15,6 @@
#include <arpa/inet.h>
#include <sys/time.h>
#include <sys/epoll.h>
-#include <corosync/cpg.h>
-#include <corosync/cfg.h>
#include "sheepdog_proto.h"
#include "sheep_priv.h"
@@ -24,10 +22,10 @@
#include "util.h"
#include "logger.h"
#include "work.h"
+#include "cluster.h"
struct node {
- uint32_t nodeid;
- uint32_t pid;
+ struct sheepid sheepid;
struct sheepdog_node_list_entry ent;
struct list_head list;
};
@@ -44,8 +42,7 @@ struct message_header {
uint8_t op;
uint8_t state;
uint32_t msg_length;
- uint32_t nodeid;
- uint32_t pid;
+ struct sheepid sheepid;
struct sheepdog_node_list_entry from;
};
@@ -60,14 +57,12 @@ struct join_message {
uint8_t inc_epoch; /* set non-zero when we increment epoch of all nodes */
uint8_t pad[3];
struct {
- uint32_t nodeid;
- uint32_t pid;
+ struct sheepid sheepid;
struct sheepdog_node_list_entry ent;
} nodes[SD_MAX_NODES];
uint32_t nr_leave_nodes;
struct {
- uint32_t nodeid;
- uint32_t pid;
+ struct sheepid sheepid;
struct sheepdog_node_list_entry ent;
} leave_nodes[SD_MAX_NODES];
};
@@ -89,34 +84,36 @@ struct mastership_tx_message {
uint32_t epoch;
};
-struct work_deliver {
+struct work_notify {
struct cpg_event cev;
struct message_header *msg;
};
-struct work_confchg {
+struct work_join {
struct cpg_event cev;
- struct cpg_address *member_list;
+ struct sheepid *member_list;
size_t member_list_entries;
- struct cpg_address *left_list;
- size_t left_list_entries;
- struct cpg_address *joined_list;
- size_t joined_list_entries;
+ struct sheepid joined;
+};
+
+struct work_leave {
+ struct cpg_event cev;
- int first_cpg_node;
- int sd_node_left;
+ struct sheepid *member_list;
+ size_t member_list_entries;
+ struct sheepid left;
};
#define print_node_list(node_list) \
({ \
struct node *__node; \
- char __name[128]; \
+ char __name[128]; \
list_for_each_entry(__node, node_list, list) { \
- dprintf("%c nodeid: %x, pid: %d, ip: %s\n", \
+ dprintf("%c pid: %ld, ip: %s\n", \
is_myself(__node->ent.addr, __node->ent.port) ? 'l' : ' ', \
- __node->nodeid, __node->pid, \
+ __node->sheepid.pid, \
addr_to_str(__name, sizeof(__name), \
__node->ent.addr, __node->ent.port)); \
} \
@@ -172,30 +169,6 @@ static inline int master_tx_message(struct message_header *m)
return m->op == SD_MSG_MASTER_TRANSFER;
}
-static int send_message(cpg_handle_t handle, struct message_header *msg)
-{
- struct iovec iov;
- int ret;
-
- iov.iov_base = msg;
- iov.iov_len = msg->msg_length;
-retry:
- ret = cpg_mcast_joined(handle, CPG_TYPE_AGREED, &iov, 1);
- switch (ret) {
- case CPG_OK:
- break;
- case CPG_ERR_TRY_AGAIN:
- dprintf("failed to send message. try again\n");
- sleep(1);
- goto retry;
- default:
- eprintf("failed to send message, %d\n", ret);
- return -1;
- }
- return 0;
-}
-
-
static int get_node_idx(struct sheepdog_node_list_entry *ent,
struct sheepdog_node_list_entry *entries, int nr_nodes)
{
@@ -393,7 +366,7 @@ forward:
list_add(&req->pending_list, &sys->pending_list);
- send_message(sys->handle, (struct message_header *)msg);
+ sys->cdrv->notify(msg, msg->header.msg_length);
free(msg);
}
@@ -406,9 +379,8 @@ static void group_handler(int listen_fd, int events, void *data)
goto out;
}
- ret = cpg_dispatch(sys->handle, CPG_DISPATCH_ALL);
-
- if (ret == CPG_OK)
+ ret = sys->cdrv->dispatch();
+ if (ret == 0)
return;
else
eprintf("oops...some error occured inside corosync\n");
@@ -417,12 +389,12 @@ out:
exit(1);
}
-static struct node *find_node(struct list_head *node_list, uint32_t nodeid, uint32_t pid)
+static struct node *find_node(struct list_head *node_list, struct sheepid *id)
{
struct node *node;
list_for_each_entry(node, node_list, list) {
- if (node->nodeid == nodeid && node->pid == pid)
+ if (sheepid_cmp(&node->sheepid, id) == 0)
return node;
}
@@ -511,8 +483,7 @@ static int add_node_to_leave_list(struct message_header *msg)
goto ret;
}
- n->nodeid = msg->nodeid;
- n->pid = msg->pid;
+ n->sheepid = msg->sheepid;
n->ent = msg->from;
list_add_tail(&n->list, &sys->leave_list);
@@ -533,8 +504,7 @@ static int add_node_to_leave_list(struct message_header *msg)
continue;
}
- n->nodeid = jm->leave_nodes[i].nodeid;
- n->pid = jm->leave_nodes[i].pid;
+ n->sheepid = jm->leave_nodes[i].sheepid;
n->ent = jm->leave_nodes[i].ent;
list_add_tail(&n->list, &tmp_list);
@@ -698,8 +668,7 @@ static void join(struct join_message *msg)
msg->ctime = get_cluster_ctime();
msg->nr_nodes = 0;
list_for_each_entry(node, &sys->sd_node_list, list) {
- msg->nodes[msg->nr_nodes].nodeid = node->nodeid;
- msg->nodes[msg->nr_nodes].pid = node->pid;
+ msg->nodes[msg->nr_nodes].sheepid = node->sheepid;
msg->nodes[msg->nr_nodes].ent = node->ent;
msg->nr_nodes++;
}
@@ -768,12 +737,12 @@ static void get_vdi_bitmap_from_sd_list(void)
get_vdi_bitmap_from(&nodes[i]);
}
-static int move_node_to_sd_list(uint32_t nodeid, uint32_t pid,
+static int move_node_to_sd_list(struct sheepid *id,
struct sheepdog_node_list_entry ent)
{
struct node *node;
- node = find_node(&sys->cpg_node_list, nodeid, pid);
+ node = find_node(&sys->cpg_node_list, id);
if (!node)
return 1;
@@ -830,16 +799,15 @@ static void update_cluster_info(struct join_message *msg)
sys->epoch = msg->epoch;
for (i = 0; i < nr_nodes; i++) {
- ret = move_node_to_sd_list(msg->nodes[i].nodeid,
- msg->nodes[i].pid,
+ ret = move_node_to_sd_list(&msg->nodes[i].sheepid,
msg->nodes[i].ent);
/*
* the node belonged to sheepdog when the master build
* the JOIN response however it has gone.
*/
if (ret)
- vprintf(SDOG_INFO "nodeid: %x, pid: %d has gone\n",
- msg->nodes[i].nodeid, msg->nodes[i].pid);
+ vprintf(SDOG_INFO "%s has gone\n",
+ sheepid_to_str(&msg->nodes[i].sheepid));
}
if (msg->cluster_status != SD_STATUS_OK)
@@ -851,14 +819,14 @@ static void update_cluster_info(struct join_message *msg)
update_epoch_log(sys->epoch);
join_finished:
- ret = move_node_to_sd_list(msg->header.nodeid, msg->header.pid, msg->header.from);
+ ret = move_node_to_sd_list(&msg->header.sheepid, msg->header.from);
/*
* this should not happen since __sd_deliver() checks if the
* host from msg on cpg_node_list.
*/
if (ret)
- vprintf(SDOG_ERR "nodeid: %x, pid: %d has gone\n",
- msg->header.nodeid, msg->header.pid);
+ vprintf(SDOG_ERR "%s has gone\n",
+ sheepid_to_str(&msg->header.sheepid));
if (msg->cluster_status == SD_STATUS_OK) {
if (msg->inc_epoch) {
@@ -1018,34 +986,31 @@ out:
req->done(req);
}
-static void __sd_deliver(struct cpg_event *cevent)
+static void __sd_notify(struct cpg_event *cevent)
{
- struct work_deliver *w = container_of(cevent, struct work_deliver, cev);
+ struct work_notify *w = container_of(cevent, struct work_notify, cev);
struct message_header *m = w->msg;
char name[128];
struct node *node;
- dprintf("op: %d, state: %u, size: %d, from: %s, pid: %d\n",
+ dprintf("op: %d, state: %u, size: %d, from: %s, pid: %ld\n",
m->op, m->state, m->msg_length,
addr_to_str(name, sizeof(name), m->from.addr, m->from.port),
- m->pid);
+ m->sheepid.pid);
/*
* we don't want to perform any deliver events except mastership_tx event
* until we join; we wait for our JOIN message.
*/
if (!sys->join_finished && !master_tx_message(m)) {
- if (m->pid != sys->this_pid || m->nodeid != sys->this_nodeid) {
+ if (sheepid_cmp(&m->sheepid, &sys->this_sheepid) != 0) {
cevent->skip = 1;
return;
}
}
if (join_message(m)) {
- uint32_t nodeid = m->nodeid;
- uint32_t pid = m->pid;
-
- node = find_node(&sys->cpg_node_list, nodeid, pid);
+ node = find_node(&sys->cpg_node_list, &m->sheepid);
if (!node) {
dprintf("the node was left before join operation is finished\n");
return;
@@ -1095,13 +1060,12 @@ static int tx_mastership(void)
msg.header.state = DM_FIN;
msg.header.msg_length = sizeof(msg);
msg.header.from = sys->this_node;
- msg.header.nodeid = sys->this_nodeid;
- msg.header.pid = sys->this_pid;
+ msg.header.sheepid = sys->this_sheepid;
- return send_message(sys->handle, (struct message_header *)&msg);
+ return sys->cdrv->notify(&msg, msg.header.msg_length);
}
-static void send_join_response(struct work_deliver *w)
+static void send_join_response(struct work_notify *w)
{
struct message_header *m;
struct join_message *jm;
@@ -1116,8 +1080,7 @@ static void send_join_response(struct work_deliver *w)
if (jm->result == SD_RES_SUCCESS && jm->cluster_status != SD_STATUS_OK) {
jm->nr_leave_nodes = 0;
list_for_each_entry(node, &sys->leave_list, list) {
- jm->leave_nodes[jm->nr_leave_nodes].nodeid = node->nodeid;
- jm->leave_nodes[jm->nr_leave_nodes].pid = node->pid;
+ jm->leave_nodes[jm->nr_leave_nodes].sheepid = node->sheepid;
jm->leave_nodes[jm->nr_leave_nodes].ent = node->ent;
jm->nr_leave_nodes++;
}
@@ -1131,12 +1094,12 @@ static void send_join_response(struct work_deliver *w)
exit(1);
}
jm->epoch = sys->epoch;
- send_message(sys->handle, m);
+ sys->cdrv->notify(m, m->msg_length);
}
-static void __sd_deliver_done(struct cpg_event *cevent)
+static void __sd_notify_done(struct cpg_event *cevent)
{
- struct work_deliver *w = container_of(cevent, struct work_deliver, cev);
+ struct work_notify *w = container_of(cevent, struct work_notify, cev);
struct message_header *m;
char name[128];
int do_recovery;
@@ -1151,7 +1114,7 @@ static void __sd_deliver_done(struct cpg_event *cevent)
update_cluster_info((struct join_message *)m);
break;
case SD_MSG_LEAVE:
- node = find_node(&sys->sd_node_list, m->nodeid, m->pid);
+ node = find_node(&sys->sd_node_list, &m->sheepid);
if (node) {
sys->nr_vnodes = 0;
@@ -1173,7 +1136,7 @@ static void __sd_deliver_done(struct cpg_event *cevent)
*/
if (!sys->join_finished) {
sys->join_finished = 1;
- move_node_to_sd_list(sys->this_nodeid, sys->this_pid, sys->this_node);
+ move_node_to_sd_list(&sys->this_sheepid, sys->this_node);
sys->epoch = get_latest_epoch();
}
@@ -1210,7 +1173,7 @@ static void __sd_deliver_done(struct cpg_event *cevent)
break;
case SD_MSG_VDI_OP:
m->state = DM_FIN;
- send_message(sys->handle, m);
+ sys->cdrv->notify(m, m->msg_length);
break;
default:
eprintf("unknown message %d\n", m->op);
@@ -1226,25 +1189,24 @@ static void __sd_deliver_done(struct cpg_event *cevent)
}
}
-static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name,
- uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
+static void sd_notify_handler(struct sheepid *sender, void *msg, size_t msg_len)
{
struct cpg_event *cevent;
- struct work_deliver *w;
+ struct work_notify *w;
struct message_header *m = msg;
char name[128];
- dprintf("op: %d, state: %u, size: %d, from: %s, nodeid: %x, pid: %u\n",
+ dprintf("op: %d, state: %u, size: %d, from: %s, pid: %lu\n",
m->op, m->state, m->msg_length,
addr_to_str(name, sizeof(name), m->from.addr, m->from.port),
- nodeid, pid);
+ sender->pid);
w = zalloc(sizeof(*w));
if (!w)
return;
cevent = &w->cev;
- cevent->ctype = CPG_EVENT_DELIVER;
+ cevent->ctype = CPG_EVENT_NOTIFY;
vprintf(SDOG_DEBUG "allow new deliver, %p\n", cevent);
@@ -1264,17 +1226,7 @@ static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name,
start_cpg_event_work();
}
-static void for_each_node_list(struct cpg_address list[], int count,
- void (*func)(struct cpg_address *addr,
- struct work_confchg *w),
- struct work_confchg *w)
-{
- int i;
- for (i = 0; i < count; i++)
- func(&list[i], w);
-}
-
-static void add_node(struct cpg_address *addr, struct work_confchg *w)
+static void add_node(struct sheepid *id)
{
struct node *node;
@@ -1282,22 +1234,20 @@ static void add_node(struct cpg_address *addr, struct work_confchg *w)
if (!node)
panic("failed to alloc memory for a new node\n");
- node->nodeid = addr->nodeid;
- node->pid = addr->pid;
+ node->sheepid = *id;
list_add_tail(&node->list, &sys->cpg_node_list);
}
-static void del_node(struct cpg_address *addr, struct work_confchg *w)
+static int del_node(struct sheepid *id)
{
struct node *node;
- node = find_node(&sys->sd_node_list, addr->nodeid, addr->pid);
+ node = find_node(&sys->sd_node_list, id);
if (node) {
int nr;
struct sheepdog_node_list_entry e[SD_MAX_NODES];
- w->sd_node_left++;
sys->nr_vnodes = 0;
list_del(&node->list);
@@ -1313,34 +1263,27 @@ static void del_node(struct cpg_address *addr, struct work_confchg *w)
update_epoch_store(sys->epoch);
}
- } else {
- node = find_node(&sys->cpg_node_list, addr->nodeid, addr->pid);
- if (node) {
- list_del(&node->list);
- free(node);
- }
+ return 1;
}
-}
-static int is_my_cpg_addr(struct cpg_address *addr)
-{
- return (sys->this_nodeid == addr->nodeid) &&
- (sys->this_pid == addr->pid);
+ node = find_node(&sys->cpg_node_list, id);
+ if (node) {
+ list_del(&node->list);
+ free(node);
+ }
+
+ return 0;
}
/*
* Check whether the majority of Sheepdog nodes are still alive or not
*/
-static int check_majority(struct cpg_address *left_list,
- size_t left_list_entries)
+static int check_majority(struct sheepid *left)
{
- int nr_nodes = 0, nr_majority, nr_reachable = 0, i, fd;
+ int nr_nodes = 0, nr_majority, nr_reachable = 0, fd;
struct node *node;
char name[INET6_ADDRSTRLEN];
- if (left_list_entries == 0)
- return 1; /* we don't need this check in this case */
-
nr_nodes = get_nodes_nr_from(&sys->sd_node_list);
nr_majority = nr_nodes / 2 + 1;
@@ -1350,12 +1293,7 @@ static int check_majority(struct cpg_address *left_list,
return 1;
list_for_each_entry(node, &sys->sd_node_list, list) {
- for (i = 0; i < left_list_entries; i++) {
- if (left_list[i].nodeid == node->nodeid &&
- left_list[i].pid == node->pid)
- break;
- }
- if (i != left_list_entries)
+ if (sheepid_cmp(&node->sheepid, left) == 0)
continue;
addr_to_str(name, sizeof(name), node->ent.addr, 0);
@@ -1375,34 +1313,29 @@ static int check_majority(struct cpg_address *left_list,
return 0;
}
-static void __sd_confchg(struct cpg_event *cevent)
+static void __sd_leave(struct cpg_event *cevent)
{
- struct work_confchg *w = container_of(cevent, struct work_confchg, cev);
+ struct work_leave *w = container_of(cevent, struct work_leave, cev);
- if (!check_majority(w->left_list, w->left_list_entries)) {
+ if (!check_majority(&w->left)) {
eprintf("perhaps network partition failure has occurred\n");
abort();
}
}
-static void send_join_request(struct cpg_address *addr, struct work_confchg *w)
+static void send_join_request(struct sheepid *id)
{
struct join_message msg;
struct sheepdog_node_list_entry entries[SD_MAX_NODES];
int nr_entries, i, ret;
- /* if I've just joined in cpg, I'll join in sheepdog. */
- if (!is_my_cpg_addr(addr))
- return;
-
memset(&msg, 0, sizeof(msg));
msg.header.proto_ver = SD_SHEEP_PROTO_VER;
msg.header.op = SD_MSG_JOIN;
msg.header.state = DM_INIT;
msg.header.msg_length = sizeof(msg);
msg.header.from = sys->this_node;
- msg.header.nodeid = sys->this_nodeid;
- msg.header.pid = sys->this_pid;
+ msg.header.sheepid = sys->this_sheepid;
get_global_nr_copies(&msg.nr_sobjs);
@@ -1414,35 +1347,31 @@ static void send_join_request(struct cpg_address *addr, struct work_confchg *w)
msg.nodes[i].ent = entries[i];
}
- send_message(sys->handle, (struct message_header *)&msg);
+ sys->cdrv->notify(&msg, msg.header.msg_length);
- vprintf(SDOG_INFO "%x %u\n", sys->this_nodeid, sys->this_pid);
+ vprintf(SDOG_INFO "%s\n", sheepid_to_str(&sys->this_sheepid));
}
-static void __sd_confchg_done(struct cpg_event *cevent)
+static void __sd_join_done(struct cpg_event *cevent)
{
- struct work_confchg *w = container_of(cevent, struct work_confchg, cev);
- int ret;
+ struct work_join *w = container_of(cevent, struct work_join, cev);
+ int ret, i;
+ int first_cpg_node = 0;
- if (w->member_list_entries ==
- w->joined_list_entries - w->left_list_entries &&
- is_my_cpg_addr(w->member_list)) {
+ if (w->member_list_entries == 1 &&
+ sheepid_cmp(&w->joined, &sys->this_sheepid) == 0) {
sys->join_finished = 1;
get_global_nr_copies(&sys->nr_sobjs);
- w->first_cpg_node = 1;
+ first_cpg_node = 1;
}
- if (list_empty(&sys->cpg_node_list))
- for_each_node_list(w->member_list, w->member_list_entries,
- add_node, w);
- else
- for_each_node_list(w->joined_list, w->joined_list_entries,
- add_node, w);
-
- for_each_node_list(w->left_list, w->left_list_entries,
- del_node, w);
+ if (list_empty(&sys->cpg_node_list)) {
+ for (i = 0; i < w->member_list_entries; i++)
+ add_node(w->member_list + i);
+ } else
+ add_node(&w->joined);
- if (w->first_cpg_node) {
+ if (first_cpg_node) {
struct join_message msg;
struct sheepdog_node_list_entry entries[SD_MAX_NODES];
int nr_entries;
@@ -1454,13 +1383,12 @@ static void __sd_confchg_done(struct cpg_event *cevent)
* becomes the master without sending JOIN.
*/
- vprintf(SDOG_DEBUG "%d %x\n", sys->this_pid, sys->this_nodeid);
+ vprintf(SDOG_DEBUG "%s\n", sheepid_to_str(&sys->this_sheepid));
memset(&msg, 0, sizeof(msg));
msg.header.from = sys->this_node;
- msg.header.nodeid = sys->this_nodeid;
- msg.header.pid = sys->this_pid;
+ msg.header.sheepid = sys->this_sheepid;
nr_entries = ARRAY_SIZE(entries);
ret = read_epoch(&epoch, &ctime, entries, &nr_entries);
@@ -1482,35 +1410,40 @@ static void __sd_confchg_done(struct cpg_event *cevent)
print_node_list(&sys->sd_node_list);
- if (w->first_cpg_node)
- goto skip_join;
+ if (sheepid_cmp(&w->joined, &sys->this_sheepid) == 0)
+ send_join_request(&w->joined);
+}
+
+static void __sd_leave_done(struct cpg_event *cevent)
+{
+ struct work_leave *w = container_of(cevent, struct work_leave, cev);
+ int node_left;
- for_each_node_list(w->joined_list, w->joined_list_entries,
- send_join_request, w);
+ node_left = del_node(&w->left);
-skip_join:
- if (w->sd_node_left && sys->status == SD_STATUS_OK) {
- if (w->sd_node_left > 1)
- panic("we can't handle the departure of multiple nodes %d, %Zd\n",
- w->sd_node_left, w->left_list_entries);
+ print_node_list(&sys->sd_node_list);
+ if (node_left && sys->status == SD_STATUS_OK)
start_recovery(sys->epoch);
- }
}
static void cpg_event_free(struct cpg_event *cevent)
{
switch (cevent->ctype) {
- case CPG_EVENT_CONCHG: {
- struct work_confchg *w = container_of(cevent, struct work_confchg, cev);
+ case CPG_EVENT_JOIN: {
+ struct work_join *w = container_of(cevent, struct work_join, cev);
free(w->member_list);
- free(w->left_list);
- free(w->joined_list);
free(w);
break;
}
- case CPG_EVENT_DELIVER: {
- struct work_deliver *w = container_of(cevent, struct work_deliver, cev);
+ case CPG_EVENT_LEAVE: {
+ struct work_leave *w = container_of(cevent, struct work_leave, cev);
+ free(w->member_list);
+ free(w);
+ break;
+ }
+ case CPG_EVENT_NOTIFY: {
+ struct work_notify *w = container_of(cevent, struct work_notify, cev);
free(w->msg);
free(w);
break;
@@ -1535,14 +1468,16 @@ static void cpg_event_fn(struct work *work, int idx)
*/
switch (cevent->ctype) {
- case CPG_EVENT_CONCHG:
- __sd_confchg(cevent);
+ case CPG_EVENT_JOIN:
+ break;
+ case CPG_EVENT_LEAVE:
+ __sd_leave(cevent);
break;
- case CPG_EVENT_DELIVER:
+ case CPG_EVENT_NOTIFY:
{
- struct work_deliver *w = container_of(cevent, struct work_deliver, cev);
+ struct work_notify *w = container_of(cevent, struct work_notify, cev);
vprintf(SDOG_DEBUG "%d\n", w->msg->state);
- __sd_deliver(cevent);
+ __sd_notify(cevent);
break;
}
case CPG_EVENT_REQUEST:
@@ -1572,12 +1507,15 @@ static void cpg_event_done(struct work *work, int idx)
goto out;
switch (cevent->ctype) {
- case CPG_EVENT_CONCHG:
- __sd_confchg_done(cevent);
+ case CPG_EVENT_JOIN:
+ __sd_join_done(cevent);
break;
- case CPG_EVENT_DELIVER:
+ case CPG_EVENT_LEAVE:
+ __sd_leave_done(cevent);
+ break;
+ case CPG_EVENT_NOTIFY:
{
- struct work_deliver *w = container_of(cevent, struct work_deliver, cev);
+ struct work_notify *w = container_of(cevent, struct work_notify, cev);
if (w->msg->state == DM_FIN && vdi_op_message(w->msg))
vdi_op_done((struct vdi_op_message *)w->msg);
@@ -1593,9 +1531,9 @@ static void cpg_event_done(struct work *work, int idx)
list_for_each_entry(f_cevent, &sys->cpg_event_siblings,
cpg_event_list) {
- struct work_deliver *fw =
- container_of(f_cevent, struct work_deliver, cev);
- if (f_cevent->ctype == CPG_EVENT_DELIVER &&
+ struct work_notify *fw =
+ container_of(f_cevent, struct work_notify, cev);
+ if (f_cevent->ctype == CPG_EVENT_NOTIFY &&
fw->msg->state == DM_FIN) {
vprintf("already got fin %p\n",
f_cevent);
@@ -1611,7 +1549,7 @@ static void cpg_event_done(struct work *work, int idx)
cpg_event_set_joining();
}
got_fin:
- __sd_deliver_done(cevent);
+ __sd_notify_done(cevent);
break;
}
case CPG_EVENT_REQUEST:
@@ -1722,7 +1660,7 @@ void start_cpg_event_work(void)
* thread is running for a deliver for VDI, then we need to
* run io requests.
*/
- if (cpg_event_running() && cevent->ctype == CPG_EVENT_CONCHG)
+ if (cpg_event_running() && is_membership_change_event(cevent->ctype))
return;
/*
@@ -1756,9 +1694,9 @@ do_retry:
list_for_each_entry_safe(cevent, n, &sys->cpg_event_siblings, cpg_event_list) {
struct request *req = container_of(cevent, struct request, cev);
- if (cevent->ctype == CPG_EVENT_DELIVER)
+ if (cevent->ctype == CPG_EVENT_NOTIFY)
continue;
- if (cevent->ctype == CPG_EVENT_CONCHG)
+ if (is_membership_change_event(cevent->ctype))
break;
list_del(&cevent->cpg_event_list);
@@ -1842,7 +1780,7 @@ do_retry:
cevent = list_first_entry(&sys->cpg_event_siblings,
struct cpg_event, cpg_event_list);
- if (cevent->ctype == CPG_EVENT_CONCHG && sys->nr_outstanding_io)
+ if (is_membership_change_event(cevent->ctype) && sys->nr_outstanding_io)
return;
list_del(&cevent->cpg_event_list);
@@ -1857,25 +1795,16 @@ do_retry:
queue_work(sys->cpg_wqueue, &cpg_event_work);
}
-static void sd_confchg(cpg_handle_t handle, const struct cpg_name *group_name,
- const struct cpg_address *member_list,
- size_t member_list_entries,
- const struct cpg_address *left_list,
- size_t left_list_entries,
- const struct cpg_address *joined_list,
- size_t joined_list_entries)
+static void sd_join_handler(struct sheepid *joined, struct sheepid *members,
+ size_t nr_members)
{
struct cpg_event *cevent;
- struct work_confchg *w = NULL;
+ struct work_join *w = NULL;
int i, size;
- dprintf("confchg nodeid %x\n", member_list[0].nodeid);
- dprintf("%zd %zd %zd\n", member_list_entries, left_list_entries,
- joined_list_entries);
- for (i = 0; i < member_list_entries; i++) {
- dprintf("[%x] node_id: %x, pid: %d\n", i,
- member_list[i].nodeid, member_list[i].pid);
- }
+ dprintf("join %s\n", sheepid_to_str(joined));
+ for (i = 0; i < nr_members; i++)
+ dprintf("[%x] %s\n", i, sheepid_to_str(members + i));
if (sys->status == SD_STATUS_SHUTDOWN)
return;
@@ -1885,31 +1814,19 @@ static void sd_confchg(cpg_handle_t handle, const struct cpg_name *group_name,
goto oom;
cevent = &w->cev;
- cevent->ctype = CPG_EVENT_CONCHG;
+ cevent->ctype = CPG_EVENT_JOIN;
vprintf(SDOG_DEBUG "allow new confchg, %p\n", cevent);
- size = sizeof(struct cpg_address) * member_list_entries;
+ size = sizeof(struct sheepid) * nr_members;
w->member_list = zalloc(size);
if (!w->member_list)
goto oom;
- memcpy(w->member_list, member_list, size);
- w->member_list_entries = member_list_entries;
-
- size = sizeof(struct cpg_address) * left_list_entries;
- w->left_list = zalloc(size);
- if (!w->left_list)
- goto oom;
- memcpy(w->left_list, left_list, size);
- w->left_list_entries = left_list_entries;
+ memcpy(w->member_list, members, size);
+ w->member_list_entries = nr_members;
- size = sizeof(struct cpg_address) * joined_list_entries;
- w->joined_list = zalloc(size);
- if (!w->joined_list)
- goto oom;
- memcpy(w->joined_list, joined_list, size);
- w->joined_list_entries = joined_list_entries;
+ w->joined = *joined;
list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings);
start_cpg_event_work();
@@ -1919,111 +1836,83 @@ oom:
if (w) {
if (w->member_list)
free(w->member_list);
- if (w->left_list)
- free(w->left_list);
- if (w->joined_list)
- free(w->joined_list);
+ free(w);
}
panic("failed to allocate memory for a confchg event\n");
}
-static int set_addr(unsigned int nodeid, int port)
+static void sd_leave_handler(struct sheepid *left, struct sheepid *members,
+ size_t nr_members)
{
- int ret, nr;
- corosync_cfg_handle_t handle;
- corosync_cfg_node_address_t addr;
- struct sockaddr_storage *ss = (struct sockaddr_storage *)addr.address;
- struct sockaddr_in *sin = (struct sockaddr_in *)addr.address;
- struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)addr.address;
- void *saddr;
- char tmp[INET6_ADDRSTRLEN];
-
- memset(sys->this_node.addr, 0, sizeof(sys->this_node.addr));
-
- ret = corosync_cfg_initialize(&handle, NULL);
- if (ret != CPG_OK) {
- vprintf(SDOG_ERR "failed to initiazize cfg %d\n", ret);
- return -1;
- }
+ struct cpg_event *cevent;
+ struct work_leave *w = NULL;
+ int i, size;
- ret = corosync_cfg_get_node_addrs(handle, nodeid, 1, &nr, &addr);
- if (ret != CPG_OK) {
- vprintf(SDOG_ERR "failed to get addr %d\n", ret);
- return -1;
- }
+ dprintf("leave %s\n", sheepid_to_str(left));
+ for (i = 0; i < nr_members; i++)
+ dprintf("[%x] %s\n", i, sheepid_to_str(members + i));
- if (!nr) {
- vprintf(SDOG_ERR "we got no address\n");
- return -1;
- }
+ if (sys->status == SD_STATUS_SHUTDOWN)
+ return;
- if (ss->ss_family == AF_INET6) {
- saddr = &sin6->sin6_addr;
- memcpy(sys->this_node.addr, saddr, 16);
- } else if (ss->ss_family == AF_INET) {
- saddr = &sin->sin_addr;
- memcpy(sys->this_node.addr + 12, saddr, 4);
- } else {
- vprintf(SDOG_ERR "unknown protocol %d\n", ss->ss_family);
- return -1;
- }
+ w = zalloc(sizeof(*w));
+ if (!w)
+ goto oom;
- inet_ntop(ss->ss_family, saddr, tmp, sizeof(tmp));
+ cevent = &w->cev;
+ cevent->ctype = CPG_EVENT_LEAVE;
- vprintf(SDOG_INFO "addr = %s, port = %d\n", tmp, port);
- return 0;
+
+ vprintf(SDOG_DEBUG "allow new confchg, %p\n", cevent);
+
+ size = sizeof(struct sheepid) * nr_members;
+ w->member_list = zalloc(size);
+ if (!w->member_list)
+ goto oom;
+ memcpy(w->member_list, members, size);
+ w->member_list_entries = nr_members;
+
+ w->left = *left;
+
+ list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings);
+ start_cpg_event_work();
+
+ return;
+oom:
+ if (w) {
+ if (w->member_list)
+ free(w->member_list);
+ free(w);
+ }
+ panic("failed to allocate memory for a confchg event\n");
}
int create_cluster(int port, int64_t zone)
{
int fd, ret;
- cpg_handle_t cpg_handle;
- struct cpg_name group = { 8, "sheepdog" };
- cpg_callbacks_t cb = {&sd_deliver, &sd_confchg};
- unsigned int nodeid = 0;
-
- ret = cpg_initialize(&cpg_handle, &cb);
- if (ret != CPG_OK) {
- eprintf("Failed to initialize cpg, %d\n", ret);
- eprintf("Is corosync running?\n");
+ struct cdrv_handlers handlers = {
+ .join_handler = sd_join_handler,
+ .leave_handler = sd_leave_handler,
+ .notify_handler = sd_notify_handler,
+ };
+
+ fd = sys->cdrv->init(&handlers, &sys->this_sheepid);
+ if (fd < 0)
return -1;
- }
- ret = cpg_local_get(cpg_handle, &nodeid);
- if (ret != CPG_OK) {
- eprintf("Failed to get the local node's identifier, %d\n", ret);
- return 1;
- }
-
-join_retry:
- ret = cpg_join(cpg_handle, &group);
- switch (ret) {
- case CPG_OK:
- break;
- case CPG_ERR_TRY_AGAIN:
- dprintf("Failed to join the sheepdog group, try again\n");
- sleep(1);
- goto join_retry;
- case CPG_ERR_SECURITY:
- eprintf("Permission error.\n");
+ ret = sys->cdrv->join();
+ if (ret != 0)
return -1;
- default:
- eprintf("Failed to join the sheepdog group, %d\n", ret);
- return -1;
- }
-
- sys->handle = cpg_handle;
- sys->this_nodeid = nodeid;
- sys->this_pid = getpid();
- ret = set_addr(nodeid, port);
- if (ret)
- return 1;
+ memcpy(sys->this_node.addr, sys->this_sheepid.addr,
+ sizeof(sys->this_node.addr));
sys->this_node.port = port;
sys->this_node.nr_vnodes = SD_DEFAULT_VNODES;
- if (zone == -1)
- sys->this_node.zone = nodeid;
- else
+ if (zone == -1) {
+ /* use last 4 bytes as zone id */
+ uint8_t *b = sys->this_sheepid.addr + 12;
+ sys->this_node.zone = b[0] | b[1] << 8 | b[2] << 16 | b[3] << 24;
+ } else
sys->this_node.zone = zone;
dprintf("zone id = %u\n", sys->this_node.zone);
@@ -2042,11 +1931,6 @@ join_retry:
INIT_LIST_HEAD(&sys->cpg_event_siblings);
- ret = cpg_fd_get(cpg_handle, &fd);
- if (ret != CPG_OK) {
- eprintf("Failed to retrieve cpg file descriptor, %d\n", ret);
- return 1;
- }
ret = register_event(fd, group_handler, NULL);
if (ret) {
eprintf("Failed to register epoll events, %d\n", ret);
@@ -2066,10 +1950,9 @@ int leave_cluster(void)
msg.header.state = DM_FIN;
msg.header.msg_length = sizeof(msg);
msg.header.from = sys->this_node;
- msg.header.nodeid = sys->this_nodeid;
- msg.header.pid = sys->this_pid;
+ msg.header.sheepid = sys->this_sheepid;
msg.epoch = get_latest_epoch();
dprintf("%d\n", msg.epoch);
- return send_message(sys->handle, (struct message_header *)&msg);
+ return sys->cdrv->notify(&msg, msg.header.msg_length);
}
diff --git a/sheep/sdnet.c b/sheep/sdnet.c
index 9985bdb..db2e691 100644
--- a/sheep/sdnet.c
+++ b/sheep/sdnet.c
@@ -11,6 +11,8 @@
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
+#include <netdb.h>
+#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <sys/epoll.h>
#include <fcntl.h>
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 50cd841..0a73587 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -26,6 +26,7 @@
#define DEFAULT_OBJECT_DIR "/tmp"
#define LOG_FILE_NAME "sheep.log"
+LIST_HEAD(cluster_drivers);
static char program_name[] = "sheep";
static struct option const long_options[] = {
@@ -35,11 +36,12 @@ static struct option const long_options[] = {
{"debug", no_argument, NULL, 'd'},
{"directio", no_argument, NULL, 'D'},
{"zone", required_argument, NULL, 'z'},
+ {"cluster", required_argument, NULL, 'c'},
{"help", no_argument, NULL, 'h'},
{NULL, 0, NULL, 0},
};
-static const char *short_options = "p:fl:dDz:h";
+static const char *short_options = "p:fl:dDz:c:h";
static void usage(int status)
{
@@ -56,6 +58,7 @@ Sheepdog Daemon, version %s\n\
-d, --debug print debug messages\n\
-D, --directio use direct IO\n\
-z, --zone specify the zone id\n\
+ -c, --cluster specify the cluster driver\n\
-h, --help display this help and exit\n\
", PACKAGE_VERSION);
}
@@ -75,6 +78,7 @@ int main(int argc, char **argv)
char path[PATH_MAX];
int64_t zone = -1;
char *p;
+ struct cluster_driver *cdrv;
signal(SIGPIPE, SIG_IGN);
@@ -112,6 +116,24 @@ int main(int argc, char **argv)
}
sys->this_node.zone = zone;
break;
+ case 'c':
+ FOR_EACH_CLUSTER_DRIVER(cdrv) {
+ if (strcmp(cdrv->name, optarg) == 0) {
+ sys->cdrv = cdrv;
+ break;
+ }
+ }
+
+ if (!sys->cdrv) {
+ printf("No such cluster driver, %s\n", optarg);
+ printf("Supported drivers:");
+ FOR_EACH_CLUSTER_DRIVER(cdrv) {
+ printf(" %s", cdrv->name);
+ }
+ printf("\n");
+ exit(1);
+ }
+ break;
case 'h':
usage(0);
break;
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 6409530..e2fcb40 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -12,7 +12,6 @@
#define __SHEEP_PRIV_H__
#include <inttypes.h>
-#include <corosync/cpg.h>
#include "sheepdog_proto.h"
#include "event.h"
@@ -20,6 +19,7 @@
#include "work.h"
#include "net.h"
#include "sheep.h"
+#include "cluster.h"
#define SD_OP_REMOVE_OBJ 0x91
@@ -41,11 +41,15 @@
#define SD_RES_NETWORK_ERROR 0x81 /* Network error between sheeps */
enum cpg_event_type {
- CPG_EVENT_CONCHG,
- CPG_EVENT_DELIVER,
+ CPG_EVENT_JOIN,
+ CPG_EVENT_LEAVE,
+ CPG_EVENT_NOTIFY,
CPG_EVENT_REQUEST,
};
+#define is_membership_change_event(x) \
+ ((x) == CPG_EVENT_JOIN || (x) == CPG_EVENT_LEAVE)
+
struct cpg_event {
enum cpg_event_type ctype;
struct list_head cpg_event_list;
@@ -102,11 +106,11 @@ struct data_object_bmap {
};
struct cluster_info {
- cpg_handle_t handle;
+ struct cluster_driver *cdrv;
+
/* set after finishing the JOIN procedure */
int join_finished;
- uint32_t this_nodeid;
- uint32_t this_pid;
+ struct sheepid this_sheepid;
struct sheepdog_node_list_entry this_node;
uint32_t epoch;
--
1.7.2.5
More information about the sheepdog
mailing list