[Sheepdog] [PATCH 1/7] sheep: move node membership management into cluster driver
MORITA Kazutaka
morita.kazutaka at lab.ntt.co.jp
Wed Oct 12 15:22:13 CEST 2011
Currently, Sheepdog has two node lists; sd_node_list and
cpg_node_list. The former is used for consistent hashing and seen
from users. The latter is managed in the cluster driver and notified
in join_handler/leave_handler. But this design is too complex. We
should move all the cluster management stuff into the cluster driver.
Main changes of this patch are as follows:
- make join process one phase
Node joining was really complex: cpg_confchg() reports the newly
joining node, the node multicasts an SD_MSG_JOIN message, and the
master node receives it and multicasts the response. Moreover, we
couldn't allow any I/O events between the two multicasts. This patch
moves all of these steps into the cluster driver.
- add check_join_handler() to cdrv_handlers
This new handler is called on one of the Sheepdog nodes (e.g. in
the case of the corosync driver, the master server will call this).
check_join_handler() checks whether the joining node may join the
cluster, and returns the result.
- use sheepdog_node_list_entry in the arguments of
join_handler()/leave_handler()
We can use the notified node list for consistent hashing now.
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
sheep/cluster.h | 82 +++-----
sheep/cluster/corosync.c | 460 ++++++++++++++++++++++++++++++++------------
sheep/group.c | 484 +++++++++++++++++++++++++--------------------
sheep/sheep_priv.h | 1 -
4 files changed, 634 insertions(+), 393 deletions(-)
diff --git a/sheep/cluster.h b/sheep/cluster.h
index 89d0566..a6689c6 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -21,17 +21,28 @@
#include "sheep.h"
#include "logger.h"
-struct sheepid {
- uint8_t addr[16];
- uint64_t pid;
+enum cluster_join_result { /* verdict returned by check_join_handler() and passed on to join_handler() */
+ CJ_RES_SUCCESS, /* Success */
+ CJ_RES_FAIL, /* Fail to join. The joining node has an invalid epoch. */
+ CJ_RES_JOIN_LATER, /* Fail to join. The joining node should
+ * be added after the cluster starts working. */
+ CJ_RES_MASTER_TRANSFER, /* Transfer mastership. The joining
+ * node has a newer epoch, so this node
+ * will leave the cluster (restart later). */
};
struct cdrv_handlers {
- void (*join_handler)(struct sheepid *joined, struct sheepid *members,
- size_t nr_members);
- void (*leave_handler)(struct sheepid *left, struct sheepid *members,
+ enum cluster_join_result (*check_join_handler)(
+ struct sheepdog_node_list_entry *joining, void *opaque);
+ void (*join_handler)(struct sheepdog_node_list_entry *joined,
+ struct sheepdog_node_list_entry *members,
+ size_t nr_members, enum cluster_join_result result,
+ void *opaque);
+ void (*leave_handler)(struct sheepdog_node_list_entry *left,
+ struct sheepdog_node_list_entry *members,
size_t nr_members);
- void (*notify_handler)(struct sheepid *sender, void *msg, size_t msg_len);
+ void (*notify_handler)(struct sheepdog_node_list_entry *sender,
+ void *msg, size_t msg_len);
};
struct cluster_driver {
@@ -44,17 +55,23 @@ struct cluster_driver {
* may be used with the poll(2) to monitor cluster events. On
* error, returns -1.
*/
- int (*init)(struct cdrv_handlers *handlers, struct sheepid *myid);
+ int (*init)(struct cdrv_handlers *handlers, uint8_t *myaddr);
/*
* Join the cluster
*
* This function is used to join the cluster, and notifies a
- * join event to all the nodes.
+ * join event to all the nodes. The copy of 'opaque' is
+ * passed to check_join_handler() and join_handler().
+ * check_join_handler() is called on only one of the nodes
+ * which already participate in the cluster. If the content of
+ * 'opaque' is changed in check_join_handler(), the updated
+ * 'opaque' must be passed to join_handler().
*
* Returns zero on success, -1 on error
*/
- int (*join)(void);
+ int (*join)(struct sheepdog_node_list_entry *myself,
+ void *opaque, size_t opaque_len);
/*
* Leave the cluster
@@ -112,54 +129,15 @@ static void __attribute__((constructor)) regist_ ## driver(void) { \
list_for_each_entry(driver, &cluster_drivers, list)
-static inline int sheepid_find(struct sheepid *sheeps, size_t nr_sheeps,
- struct sheepid *key)
-{
- int i;
-
- for (i = 0; i < nr_sheeps; i++) {
- if (memcmp(sheeps + i, key, sizeof(*key)) == 0)
- return i;
- }
- return -1;
-}
-
-static inline void sheepid_add(struct sheepid *sheeps1, size_t nr_sheeps1,
- struct sheepid *sheeps2, size_t nr_sheeps2)
-{
- memcpy(sheeps1 + nr_sheeps1, sheeps2, sizeof(*sheeps2) * nr_sheeps2);
-}
-
-static inline void sheepid_del(struct sheepid *sheeps1, size_t nr_sheeps1,
- struct sheepid *sheeps2, size_t nr_sheeps2)
-{
- int i, idx;
-
- for (i = 0; i < nr_sheeps2; i++) {
- idx = sheepid_find(sheeps1, nr_sheeps1, sheeps2 + i);
- if (idx < 0)
- panic("internal error: cannot find sheepid\n");
-
- nr_sheeps1--;
- memmove(sheeps1 + idx, sheeps1 + idx + 1,
- sizeof(*sheeps1) * nr_sheeps1 - idx);
- }
-}
-
-static inline char *sheepid_to_str(struct sheepid *id)
+static inline char *node_to_str(struct sheepdog_node_list_entry *id)
{
static char str[256];
char name[256];
- snprintf(str, sizeof(str), "ip: %s, pid: %" PRIu64,
- addr_to_str(name, sizeof(name), id->addr, 0), id->pid);
+ snprintf(str, sizeof(str), "ip: %s, port: %d",
+ addr_to_str(name, sizeof(name), id->addr, 0), id->port);
return str;
}
-static inline int sheepid_cmp(struct sheepid *id1, struct sheepid *id2)
-{
- return memcmp(id1, id2, sizeof(*id1));
-}
-
#endif
diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
index 7aa3d02..99cd69f 100644
--- a/sheep/cluster/corosync.c
+++ b/sheep/cluster/corosync.c
@@ -16,11 +16,17 @@
#include "cluster.h"
#include "work.h"
+struct cpg_node { /* one cluster member as tracked by the corosync driver */
+ uint32_t nodeid; /* nodeid and pid together identify a member, see cpg_node_equal() */
+ uint32_t pid;
+ struct sheepdog_node_list_entry ent; /* entry handed to the cdrv_handlers callbacks */
+};
+
static cpg_handle_t cpg_handle;
static struct cpg_name cpg_group = { 9, "sheepdog" };
static corosync_cfg_handle_t cfg_handle;
-static struct sheepid this_sheepid;
+static struct cpg_node this_node;
static struct work_queue *corosync_block_wq;
@@ -29,6 +35,9 @@ static struct cdrv_handlers corosync_handlers;
static LIST_HEAD(corosync_event_list);
static LIST_HEAD(corosync_block_list);
+static struct cpg_node cpg_nodes[SD_MAX_NODES];
+static size_t nr_cpg_nodes;
+
/* event types which are dispatched in corosync_dispatch() */
enum corosync_event_type {
COROSYNC_EVENT_TYPE_JOIN,
@@ -38,6 +47,8 @@ enum corosync_event_type {
/* multicast message type */
enum corosync_message_type {
+ COROSYNC_MSG_TYPE_JOIN_REQUEST,
+ COROSYNC_MSG_TYPE_JOIN_RESPONSE,
COROSYNC_MSG_TYPE_NOTIFY,
COROSYNC_MSG_TYPE_BLOCK,
COROSYNC_MSG_TYPE_UNBLOCK,
@@ -46,19 +57,31 @@ enum corosync_message_type {
struct corosync_event {
enum corosync_event_type type;
- struct sheepid members[SD_MAX_NODES];
- size_t nr_members;
-
- struct sheepid sender;
+ struct cpg_node sender;
void *msg;
size_t msg_len;
+ enum cluster_join_result result;
+ uint32_t nr_nodes;
+ struct cpg_node nodes[SD_MAX_NODES];
+
int blocked;
int callbacked;
+ int first_node;
struct list_head list;
};
+struct corosync_message { /* fixed header that send_message() puts in front of the payload */
+ struct cpg_node sender;
+ enum corosync_message_type type : 4;
+ enum cluster_join_result result : 4; /* copied into the event on JOIN_RESPONSE */
+ uint32_t msg_len; /* length in bytes of the payload in msg[] */
+ uint32_t nr_nodes; /* number of entries of nodes[] filled in by the sender */
+ struct cpg_node nodes[SD_MAX_NODES];
+ uint8_t msg[0]; /* payload starts right after the header */
+};
+
struct corosync_block_msg {
void *msg;
size_t msg_len;
@@ -68,6 +91,44 @@ struct corosync_block_msg {
struct list_head list;
};
+/* Two cpg nodes denote the same member iff both nodeid and pid match. */
+static int cpg_node_equal(struct cpg_node *a, struct cpg_node *b)
+{
+	if (a->nodeid != b->nodeid)
+		return 0;
+
+	return a->pid == b->pid;
+}
+
+/*
+ * Search the first 'nr_nodes' entries of 'nodes' for 'key'.
+ *
+ * Returns the index of the first entry equal to 'key' (as defined by
+ * cpg_node_equal()), or -1 when no entry matches.
+ */
+static inline int find_cpg_node(struct cpg_node *nodes, size_t nr_nodes,
+				struct cpg_node *key)
+{
+	int pos = 0;
+
+	while (pos < nr_nodes) {
+		if (cpg_node_equal(&nodes[pos], key))
+			return pos;
+		pos++;
+	}
+
+	return -1;
+}
+
+/*
+ * Store a copy of 'added' in slot 'nr_nodes' of 'nodes'.
+ *
+ * 'nr_nodes' is passed by value, so the caller's element count is NOT
+ * updated here; the caller must increment its own counter afterwards.
+ */
+static inline void add_cpg_node(struct cpg_node *nodes, size_t nr_nodes,
+				struct cpg_node *added)
+{
+	nodes[nr_nodes] = *added;
+}
+
+/*
+ * Remove 'deled' from the first 'nr_nodes' entries of 'nodes' by shifting
+ * the entries behind it down by one slot.  Logs and returns without
+ * touching the array when 'deled' is not found.
+ *
+ * 'nr_nodes' is passed by value, so the caller's element count is NOT
+ * updated here; the caller must decrement its own counter afterwards.
+ */
+static inline void del_cpg_node(struct cpg_node *nodes, size_t nr_nodes,
+				struct cpg_node *deled)
+{
+	int idx;
+
+	idx = find_cpg_node(nodes, nr_nodes, deled);
+	if (idx < 0) {
+		dprintf("cannot find node\n");
+		return;
+	}
+
+	nr_nodes--;
+	/*
+	 * Move exactly the (nr_nodes - idx) entries that follow the removed
+	 * one.  The length must be parenthesized: without it the expression
+	 * is (sizeof * nr_nodes) - idx bytes, which can run past the array.
+	 */
+	memmove(nodes + idx, nodes + idx + 1,
+		sizeof(*nodes) * (nr_nodes - idx));
+}
+
static int nodeid_to_addr(uint32_t nodeid, uint8_t *addr)
{
int ret, nr;
@@ -103,24 +164,26 @@ static int nodeid_to_addr(uint32_t nodeid, uint8_t *addr)
return 0;
}
-static void cpg_addr_to_sheepid(const struct cpg_address *cpgs,
- struct sheepid *sheeps, size_t nr)
-{
- int i;
-
- for (i = 0; i < nr; i++) {
- nodeid_to_addr(cpgs[i].nodeid, sheeps[i].addr);
- sheeps[i].pid = cpgs[i].pid;
- }
-}
-
-static int send_message(uint64_t type, void *msg, size_t msg_len)
+static int send_message(enum corosync_message_type type,
+ enum cluster_join_result result,
+ struct cpg_node *sender, struct cpg_node *nodes,
+ size_t nr_nodes, void *msg, size_t msg_len)
{
struct iovec iov[2];
int ret, iov_cnt = 1;
+ struct corosync_message cmsg = {
+ .type = type,
+ .msg_len = msg_len,
+ .result = result,
+ .sender = *sender,
+ .nr_nodes = nr_nodes
+ };
- iov[0].iov_base = &type;
- iov[0].iov_len = sizeof(type);
+ if (nodes)
+ memcpy(cmsg.nodes, nodes, sizeof(*nodes) * nr_nodes);
+
+ iov[0].iov_base = &cmsg;
+ iov[0].iov_len = sizeof(cmsg);
if (msg) {
iov[1].iov_base = msg;
iov[1].iov_len = msg_len;
@@ -153,13 +216,15 @@ static void corosync_block_done(struct work *work, int idx)
{
struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
- send_message(COROSYNC_MSG_TYPE_UNBLOCK, bm->msg, bm->msg_len);
+ send_message(COROSYNC_MSG_TYPE_UNBLOCK, 0, &this_node, NULL, 0,
+ bm->msg, bm->msg_len);
free(bm->msg);
free(bm);
}
-static struct corosync_event *find_block_event(struct sheepid *sender)
+static struct corosync_event *find_block_event(enum corosync_event_type type,
+ struct cpg_node *sender)
{
struct corosync_event *cevent;
@@ -167,67 +232,197 @@ static struct corosync_event *find_block_event(struct sheepid *sender)
if (!cevent->blocked)
continue;
- if (cevent->type == COROSYNC_EVENT_TYPE_NOTIFY &&
- sheepid_cmp(&cevent->sender, sender) == 0)
+ if (cevent->type == type &&
+ cpg_node_equal(&cevent->sender, sender))
return cevent;
}
return NULL;
}
+/*
+ * Returns 1 when this node is the master, i.e. when it is the first entry
+ * of cpg_nodes or when cpg_nodes is still empty; returns 0 otherwise.
+ */
+static int is_master(void)
+{
+	/* with an empty list this node should be the first cpg node */
+	return nr_cpg_nodes == 0 || cpg_node_equal(&cpg_nodes[0], &this_node);
+}
+
+/*
+ * Copy the sheepdog node entry of each of the first 'nr_nodes' cpg nodes
+ * into 'entries', keeping the order of 'nodes'.
+ */
+static void build_node_list(struct cpg_node *nodes, size_t nr_nodes,
+			    struct sheepdog_node_list_entry *entries)
+{
+	struct cpg_node *end = nodes + nr_nodes;
+
+	while (nodes < end)
+		*entries++ = (nodes++)->ent;
+}
+
+/*
+ * Process one dispatch event
+ *
+ * JOIN, still blocked: only the master acts, and only once the payload of
+ * the JOIN_REQUEST has been attached to the event. It runs
+ * check_join_handler() a single time and multicasts the JOIN_RESPONSE; the
+ * event stays queued (0 is returned) in every path of this branch.
+ * JOIN, unblocked: the sender is appended to cpg_nodes on SUCCESS or
+ * MASTER_TRANSFER, then join_handler() is called for every result.
+ * LEAVE: a sender that is not in cpg_nodes is ignored; otherwise its entry
+ * is removed and leave_handler() is called with the remaining nodes.
+ * NOTIFY, still blocked: the sending node queues its block callback once;
+ * the event stays queued. NOTIFY, unblocked: notify_handler() is called.
+ *
+ * Returns 1 if the event is processed, 0 if it must stay at the head of
+ * the event list.
+ */
+static int __corosync_dispatch_one(struct corosync_event *cevent)
+{
+ struct corosync_block_msg *bm;
+ enum cluster_join_result res;
+ struct sheepdog_node_list_entry entries[SD_MAX_NODES];
+ int idx;
+
+ switch (cevent->type) {
+ case COROSYNC_EVENT_TYPE_JOIN:
+ if (cevent->blocked) {
+ if (!is_master())
+ return 0;
+
+ if (!cevent->msg)
+ /* we haven't received JOIN_REQUEST yet */
+ return 0;
+
+ if (cevent->callbacked)
+ /* check_join() must be called only once */
+ return 0;
+
+ build_node_list(cpg_nodes, nr_cpg_nodes, entries); /* NOTE(review): entries is not read again in this branch */
+ res = corosync_handlers.check_join_handler(&cevent->sender.ent,
+ cevent->msg);
+ if (res == CJ_RES_MASTER_TRANSFER)
+ nr_cpg_nodes = 0; /* the response then carries an empty node list */
+
+ send_message(COROSYNC_MSG_TYPE_JOIN_RESPONSE, res,
+ &cevent->sender, cpg_nodes, nr_cpg_nodes,
+ cevent->msg, cevent->msg_len);
+
+ if (res == CJ_RES_MASTER_TRANSFER) {
+ eprintf("Restart me later when master is up, please. Bye.\n");
+ exit(1);
+ }
+
+ cevent->callbacked = 1;
+ return 0;
+ }
+
+ switch (cevent->result) {
+ case CJ_RES_SUCCESS:
+ case CJ_RES_MASTER_TRANSFER:
+ add_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender);
+ nr_cpg_nodes++; /* add_cpg_node() does not update the count itself */
+ /* fall through */
+ case CJ_RES_FAIL:
+ case CJ_RES_JOIN_LATER:
+ build_node_list(cpg_nodes, nr_cpg_nodes, entries);
+ corosync_handlers.join_handler(&cevent->sender.ent, entries,
+ nr_cpg_nodes, cevent->result,
+ cevent->msg);
+ break;
+ }
+ break;
+ case COROSYNC_EVENT_TYPE_LEAVE:
+ idx = find_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender);
+ if (idx < 0)
+ break;
+ cevent->sender.ent = cpg_nodes[idx].ent; /* take the node entry from our own list */
+
+ del_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender);
+ nr_cpg_nodes--; /* del_cpg_node() does not update the count itself */
+ build_node_list(cpg_nodes, nr_cpg_nodes, entries);
+ corosync_handlers.leave_handler(&cevent->sender.ent,
+ entries, nr_cpg_nodes);
+ break;
+ case COROSYNC_EVENT_TYPE_NOTIFY:
+ if (cevent->blocked) {
+ if (cpg_node_equal(&cevent->sender, &this_node) &&
+ !cevent->callbacked) {
+ /* call a block callback function from a worker thread */
+ if (list_empty(&corosync_block_list))
+ panic("cannot call block callback\n");
+
+ bm = list_first_entry(&corosync_block_list,
+ typeof(*bm), list);
+ list_del(&bm->list);
+
+ bm->work.fn = corosync_block;
+ bm->work.done = corosync_block_done;
+ queue_work(corosync_block_wq, &bm->work);
+
+ cevent->callbacked = 1;
+ }
+
+ /* block the remaining messages until the unblock message comes */
+ return 0;
+ }
+
+ corosync_handlers.notify_handler(&cevent->sender.ent, cevent->msg,
+ cevent->msg_len);
+ break;
+ }
+
+ return 1;
+}
+
static void __corosync_dispatch(void)
{
struct corosync_event *cevent;
- struct corosync_block_msg *bm;
+ static int join_finished;
+ int done;
while (!list_empty(&corosync_event_list)) {
cevent = list_first_entry(&corosync_event_list, typeof(*cevent), list);
- switch (cevent->type) {
- case COROSYNC_EVENT_TYPE_JOIN:
- corosync_handlers.join_handler(&cevent->sender,
- cevent->members,
- cevent->nr_members);
- break;
- case COROSYNC_EVENT_TYPE_LEAVE:
- corosync_handlers.leave_handler(&cevent->sender,
- cevent->members,
- cevent->nr_members);
- break;
- case COROSYNC_EVENT_TYPE_NOTIFY:
- if (cevent->blocked) {
- if (sheepid_cmp(&cevent->sender, &this_sheepid) == 0 &&
- !cevent->callbacked) {
- /* call a block callback function from a worker thread */
- if (list_empty(&corosync_block_list))
- panic("cannot call block callback\n");
-
- bm = list_first_entry(&corosync_block_list,
- typeof(*bm), list);
- list_del(&bm->list);
-
- bm->work.fn = corosync_block;
- bm->work.done = corosync_block_done;
- queue_work(corosync_block_wq, &bm->work);
-
- cevent->callbacked = 1;
+ /* update join status */
+ if (!join_finished && cevent->type == COROSYNC_EVENT_TYPE_JOIN) {
+ if (cevent->first_node) {
+ join_finished = 1;
+ nr_cpg_nodes = 0;
+ }
+ if (!cevent->blocked && cpg_node_equal(&cevent->sender, &this_node)) {
+ if (cevent->result == CJ_RES_SUCCESS ||
+ cevent->result == CJ_RES_MASTER_TRANSFER) {
+ join_finished = 1;
+ nr_cpg_nodes = cevent->nr_nodes;
+ memcpy(cpg_nodes, cevent->nodes,
+ sizeof(*cevent->nodes) * cevent->nr_nodes);
}
-
- /* block the rest messages until unblock message comes */
- goto out;
}
+ }
+
+ if (join_finished)
+ done = __corosync_dispatch_one(cevent);
+ else
+ done = !cevent->blocked;
- corosync_handlers.notify_handler(&cevent->sender,
- cevent->msg,
- cevent->msg_len);
+ if (!done)
break;
- }
list_del(&cevent->list);
free(cevent);
}
-out:
- return;
+}
+
+/*
+ * Replace the payload of the queued blocked event of the given 'type'
+ * and 'sender' with a copy of 'msg' ('msg_len' bytes); a zero 'msg_len'
+ * drops the stored payload.
+ *
+ * Returns the updated event, or NULL when no such blocked event is queued.
+ */
+static struct corosync_event *update_block_event(enum corosync_event_type type,
+						 struct cpg_node *sender,
+						 void *msg, size_t msg_len)
+{
+	struct corosync_event *ev = find_block_event(type, sender);
+
+	if (!ev)
+		/* block message was casted before this node joins */
+		return NULL;
+
+	ev->msg_len = msg_len;
+	if (!msg_len) {
+		free(ev->msg);
+		ev->msg = NULL;
+		return ev;
+	}
+
+	ev->msg = realloc(ev->msg, msg_len);
+	if (!ev->msg)
+		panic("oom\n");
+	memcpy(ev->msg, msg, msg_len);
+
+	return ev;
+}
static void cdrv_cpg_deliver(cpg_handle_t handle,
@@ -236,57 +431,67 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,
void *msg, size_t msg_len)
{
struct corosync_event *cevent;
- uint64_t type;
- struct sheepid sender;
-
- nodeid_to_addr(nodeid, sender.addr);
- sender.pid = pid;
-
- memcpy(&type, msg, sizeof(type));
- msg = (uint8_t *)msg + sizeof(type);
- msg_len -= sizeof(type);
+ struct corosync_message *cmsg = msg;
cevent = zalloc(sizeof(*cevent));
if (!cevent)
panic("oom\n");
- switch (type) {
+ switch (cmsg->type) {
+ case COROSYNC_MSG_TYPE_JOIN_REQUEST:
+ free(cevent); /* we don't add a new cluster event in this case */
+
+ cevent = update_block_event(COROSYNC_EVENT_TYPE_JOIN, &cmsg->sender,
+ cmsg->msg, cmsg->msg_len);
+ if (!cevent)
+ break;
+
+ cevent->sender = cmsg->sender;
+ cevent->msg_len = cmsg->msg_len;
+ break;
case COROSYNC_MSG_TYPE_BLOCK:
cevent->blocked = 1;
/* fall through */
case COROSYNC_MSG_TYPE_NOTIFY:
cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
- cevent->sender = sender;
- cevent->msg_len = msg_len;
- if (msg_len) {
- cevent->msg = zalloc(msg_len);
+
+ cevent->sender = cmsg->sender;
+ cevent->msg_len = cmsg->msg_len;
+ if (cmsg->msg_len) {
+ cevent->msg = zalloc(cmsg->msg_len);
if (!cevent->msg)
panic("oom\n");
- memcpy(cevent->msg, msg, msg_len);
+ memcpy(cevent->msg, cmsg->msg, cmsg->msg_len);
} else
cevent->msg = NULL;
list_add_tail(&cevent->list, &corosync_event_list);
break;
+ case COROSYNC_MSG_TYPE_JOIN_RESPONSE:
+ free(cevent); /* we don't add a new cluster event in this case */
+
+ cevent = update_block_event(COROSYNC_EVENT_TYPE_JOIN, &cmsg->sender,
+ cmsg->msg, cmsg->msg_len);
+ if (!cevent)
+ break;
+
+ cevent->blocked = 0;
+
+ cevent->result = cmsg->result;
+ cevent->nr_nodes = cmsg->nr_nodes;
+ memcpy(cevent->nodes, cmsg->nodes,
+ sizeof(*cmsg->nodes) * cmsg->nr_nodes);
+
+ break;
case COROSYNC_MSG_TYPE_UNBLOCK:
free(cevent); /* we don't add a new cluster event in this case */
- cevent = find_block_event(&sender);
+ cevent = update_block_event(COROSYNC_EVENT_TYPE_NOTIFY,
+ &cmsg->sender, cmsg->msg, cmsg->msg_len);
if (!cevent)
- /* block message was casted before this node joins */
break;
cevent->blocked = 0;
- cevent->msg_len = msg_len;
- if (msg_len) {
- cevent->msg = realloc(cevent->msg, msg_len);
- if (!cevent->msg)
- panic("oom\n");
- memcpy(cevent->msg, msg, msg_len);
- } else {
- free(cevent->msg);
- cevent->msg = NULL;
- }
break;
}
@@ -304,27 +509,32 @@ static void cdrv_cpg_confchg(cpg_handle_t handle,
{
struct corosync_event *cevent;
int i;
- struct sheepid member_sheeps[SD_MAX_NODES];
- struct sheepid joined_sheeps[SD_MAX_NODES];
- struct sheepid left_sheeps[SD_MAX_NODES];
-
- /* convert cpg_address to sheepid*/
- cpg_addr_to_sheepid(member_list, member_sheeps, member_list_entries);
- cpg_addr_to_sheepid(left_list, left_sheeps, left_list_entries);
- cpg_addr_to_sheepid(joined_list, joined_sheeps, joined_list_entries);
-
- /* calculate a start member list */
- sheepid_del(member_sheeps, member_list_entries,
- joined_sheeps, joined_list_entries);
- member_list_entries -= joined_list_entries;
+ struct cpg_node joined_sheeps[SD_MAX_NODES];
+ struct cpg_node left_sheeps[SD_MAX_NODES];
- sheepid_add(member_sheeps, member_list_entries,
- left_sheeps, left_list_entries);
- member_list_entries += left_list_entries;
+ /* convert cpg_address to cpg_node */
+ for (i = 0; i < left_list_entries; i++) {
+ left_sheeps[i].nodeid = left_list[i].nodeid;
+ left_sheeps[i].pid = left_list[i].pid;
+ }
+ for (i = 0; i < joined_list_entries; i++) {
+ joined_sheeps[i].nodeid = joined_list[i].nodeid;
+ joined_sheeps[i].pid = joined_list[i].pid;
+ }
/* dispatch leave_handler */
for (i = 0; i < left_list_entries; i++) {
- cevent = find_block_event(left_sheeps + i);
+ cevent = find_block_event(COROSYNC_EVENT_TYPE_JOIN,
+ left_sheeps + i);
+ if (cevent) {
+ /* the node left before joining */
+ list_del(&cevent->list);
+ free(cevent);
+ continue;
+ }
+
+ cevent = find_block_event(COROSYNC_EVENT_TYPE_NOTIFY,
+ left_sheeps + i);
if (cevent) {
/* the node left before sending UNBLOCK */
list_del(&cevent->list);
@@ -335,14 +545,8 @@ static void cdrv_cpg_confchg(cpg_handle_t handle,
if (!cevent)
panic("oom\n");
- sheepid_del(member_sheeps, member_list_entries,
- left_sheeps + i, 1);
- member_list_entries--;
-
cevent->type = COROSYNC_EVENT_TYPE_LEAVE;
cevent->sender = left_sheeps[i];
- memcpy(cevent->members, member_sheeps, sizeof(member_sheeps));
- cevent->nr_members = member_list_entries;
list_add_tail(&cevent->list, &corosync_event_list);
}
@@ -353,14 +557,12 @@ static void cdrv_cpg_confchg(cpg_handle_t handle,
if (!cevent)
panic("oom\n");
- sheepid_add(member_sheeps, member_list_entries,
- joined_sheeps, 1);
- member_list_entries++;
-
cevent->type = COROSYNC_EVENT_TYPE_JOIN;
cevent->sender = joined_sheeps[i];
- memcpy(cevent->members, member_sheeps, sizeof(member_sheeps));
- cevent->nr_members = member_list_entries;
+ cevent->blocked = 1; /* FIXME: add explanation */
+ if (member_list_entries == joined_list_entries - left_list_entries &&
+ cpg_node_equal(&joined_sheeps[0], &this_node))
+ cevent->first_node = 1;
list_add_tail(&cevent->list, &corosync_event_list);
}
@@ -368,7 +570,7 @@ static void cdrv_cpg_confchg(cpg_handle_t handle,
__corosync_dispatch();
}
-static int corosync_init(struct cdrv_handlers *handlers, struct sheepid *myid)
+static int corosync_init(struct cdrv_handlers *handlers, uint8_t *myaddr)
{
int ret, fd;
uint32_t nodeid;
@@ -398,14 +600,14 @@ static int corosync_init(struct cdrv_handlers *handlers, struct sheepid *myid)
return -1;
}
- ret = nodeid_to_addr(nodeid, myid->addr);
+ ret = nodeid_to_addr(nodeid, myaddr);
if (ret < 0) {
eprintf("failed to get local address\n");
return -1;
}
- myid->pid = getpid();
- this_sheepid = *myid;
+ this_node.nodeid = nodeid;
+ this_node.pid = getpid();
ret = cpg_fd_get(cpg_handle, &fd);
if (ret != CPG_OK) {
@@ -418,7 +620,8 @@ static int corosync_init(struct cdrv_handlers *handlers, struct sheepid *myid)
return fd;
}
-static int corosync_join(void)
+static int corosync_join(struct sheepdog_node_list_entry *myself,
+ void *opaque, size_t opaque_len)
{
int ret;
retry:
@@ -438,7 +641,12 @@ retry:
return -1;
}
- return 0;
+ this_node.ent = *myself;
+
+ ret = send_message(COROSYNC_MSG_TYPE_JOIN_REQUEST, 0, &this_node,
+ NULL, 0, opaque, opaque_len);
+
+ return ret;
}
static int corosync_leave(void)
@@ -472,9 +680,11 @@ static int corosync_notify(void *msg, size_t msg_len, void (*block_cb)(void *))
bm->cb = block_cb;
list_add_tail(&bm->list, &corosync_block_list);
- ret = send_message(COROSYNC_MSG_TYPE_BLOCK, NULL, 0);
+ ret = send_message(COROSYNC_MSG_TYPE_BLOCK, 0, &this_node,
+ NULL, 0, NULL, 0);
} else
- ret = send_message(COROSYNC_MSG_TYPE_NOTIFY, msg, msg_len);
+ ret = send_message(COROSYNC_MSG_TYPE_NOTIFY, 0, &this_node,
+ NULL, 0, msg, msg_len);
return ret;
}
diff --git a/sheep/group.c b/sheep/group.c
index 0b18fd2..42c8d71 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -25,7 +25,6 @@
#include "cluster.h"
struct node {
- struct sheepid sheepid;
struct sheepdog_node_list_entry ent;
struct list_head list;
};
@@ -42,7 +41,6 @@ struct message_header {
uint8_t op;
uint8_t state;
uint32_t msg_length;
- struct sheepid sheepid;
struct sheepdog_node_list_entry from;
};
@@ -57,12 +55,10 @@ struct join_message {
uint8_t inc_epoch; /* set non-zero when we increment epoch of all nodes */
uint8_t pad[3];
struct {
- struct sheepid sheepid;
struct sheepdog_node_list_entry ent;
} nodes[SD_MAX_NODES];
uint32_t nr_leave_nodes;
struct {
- struct sheepid sheepid;
struct sheepdog_node_list_entry ent;
} leave_nodes[SD_MAX_NODES];
};
@@ -93,17 +89,19 @@ struct work_notify {
struct work_join {
struct cpg_event cev;
- struct sheepid *member_list;
+ struct sheepdog_node_list_entry *member_list;
size_t member_list_entries;
- struct sheepid joined;
+ struct sheepdog_node_list_entry joined;
+
+ struct join_message jm;
};
struct work_leave {
struct cpg_event cev;
- struct sheepid *member_list;
+ struct sheepdog_node_list_entry *member_list;
size_t member_list_entries;
- struct sheepid left;
+ struct sheepdog_node_list_entry left;
};
#define print_node_list(node_list) \
@@ -111,11 +109,11 @@ struct work_leave {
struct node *__node; \
char __name[128]; \
list_for_each_entry(__node, node_list, list) { \
- dprintf("%c pid: %ld, ip: %s\n", \
+ dprintf("%c ip: %s, port: %d\n", \
is_myself(__node->ent.addr, __node->ent.port) ? 'l' : ' ', \
- __node->sheepid.pid, \
addr_to_str(__name, sizeof(__name), \
- __node->ent.addr, __node->ent.port)); \
+ __node->ent.addr, __node->ent.port), \
+ __node->ent.port); \
} \
})
@@ -389,12 +387,13 @@ out:
exit(1);
}
-static struct node *find_node(struct list_head *node_list, struct sheepid *id)
+static struct node *find_node(struct list_head *node_list,
+ struct sheepdog_node_list_entry *ent)
{
struct node *node;
list_for_each_entry(node, node_list, list) {
- if (sheepid_cmp(&node->sheepid, id) == 0)
+ if (node_cmp(&node->ent, ent) == 0)
return node;
}
@@ -483,7 +482,6 @@ static int add_node_to_leave_list(struct message_header *msg)
goto ret;
}
- n->sheepid = msg->sheepid;
n->ent = msg->from;
list_add_tail(&n->list, &sys->leave_list);
@@ -504,7 +502,6 @@ static int add_node_to_leave_list(struct message_header *msg)
continue;
}
- n->sheepid = jm->leave_nodes[i].sheepid;
n->ent = jm->leave_nodes[i].ent;
list_add_tail(&n->list, &tmp_list);
@@ -647,7 +644,6 @@ static int get_cluster_status(struct sheepdog_node_list_entry *from,
static void join(struct join_message *msg)
{
- struct node *node;
struct sheepdog_node_list_entry entry[SD_MAX_NODES];
int i;
@@ -666,12 +662,6 @@ static void join(struct join_message *msg)
&msg->inc_epoch);
msg->nr_sobjs = sys->nr_sobjs;
msg->ctime = get_cluster_ctime();
- msg->nr_nodes = 0;
- list_for_each_entry(node, &sys->sd_node_list, list) {
- msg->nodes[msg->nr_nodes].sheepid = node->sheepid;
- msg->nodes[msg->nr_nodes].ent = node->ent;
- msg->nr_nodes++;
- }
}
static int get_vdi_bitmap_from(struct sheepdog_node_list_entry *node)
@@ -737,18 +727,16 @@ static void get_vdi_bitmap_from_sd_list(void)
get_vdi_bitmap_from(&nodes[i]);
}
-static int move_node_to_sd_list(struct sheepid *id,
- struct sheepdog_node_list_entry ent)
+static int move_node_to_sd_list(struct sheepdog_node_list_entry ent)
{
struct node *node;
- node = find_node(&sys->cpg_node_list, id);
+ node = zalloc(sizeof(*node));
if (!node)
- return 1;
+ panic("failed to alloc memory for a new node\n");
node->ent = ent;
- list_del(&node->list);
list_add_tail(&node->list, &sys->sd_node_list);
sys->nr_vnodes = 0;
@@ -771,22 +759,14 @@ static int update_epoch_log(int epoch)
return ret;
}
-static void update_cluster_info(struct join_message *msg)
+static void update_cluster_info(struct join_message *msg,
+ struct sheepdog_node_list_entry *nodes,
+ size_t nr_nodes)
{
int i;
- int ret, nr_nodes = msg->nr_nodes;
+ int ret;
eprintf("status = %d, epoch = %d, %d, %d\n", msg->cluster_status, msg->epoch, msg->result, sys->join_finished);
- if (msg->result != SD_RES_SUCCESS) {
- if (is_myself(msg->header.from.addr, msg->header.from.port)) {
- eprintf("failed to join sheepdog, %d\n", msg->result);
- leave_cluster();
- eprintf("Restart me later when master is up, please.Bye.\n");
- exit(1);
- /* sys->status = SD_STATUS_JOIN_FAILED; */
- }
- return;
- }
if (sys->status == SD_STATUS_JOIN_FAILED)
return;
@@ -799,15 +779,16 @@ static void update_cluster_info(struct join_message *msg)
sys->epoch = msg->epoch;
for (i = 0; i < nr_nodes; i++) {
- ret = move_node_to_sd_list(&msg->nodes[i].sheepid,
- msg->nodes[i].ent);
+ if (node_cmp(nodes + i, &msg->header.from) == 0)
+ continue;
+ ret = move_node_to_sd_list(nodes[i]);
/*
* the node belonged to sheepdog when the master build
* the JOIN response however it has gone.
*/
if (ret)
vprintf(SDOG_INFO "%s has gone\n",
- sheepid_to_str(&msg->nodes[i].sheepid));
+ node_to_str(&nodes[i]));
}
if (msg->cluster_status != SD_STATUS_OK)
@@ -819,14 +800,14 @@ static void update_cluster_info(struct join_message *msg)
update_epoch_log(sys->epoch);
join_finished:
- ret = move_node_to_sd_list(&msg->header.sheepid, msg->header.from);
+ ret = move_node_to_sd_list(msg->header.from);
/*
* this should not happen since __sd_deliver() checks if the
* host from msg on cpg_node_list.
*/
if (ret)
vprintf(SDOG_ERR "%s has gone\n",
- sheepid_to_str(&msg->header.sheepid));
+ node_to_str(&msg->header.from));
if (msg->cluster_status == SD_STATUS_OK) {
if (msg->inc_epoch) {
@@ -995,24 +976,24 @@ static void __sd_notify(struct cpg_event *cevent)
char name[128];
struct node *node;
- dprintf("op: %d, state: %u, size: %d, from: %s, pid: %ld\n",
+ dprintf("op: %d, state: %u, size: %d, from: %s, port: %d\n",
m->op, m->state, m->msg_length,
addr_to_str(name, sizeof(name), m->from.addr, m->from.port),
- m->sheepid.pid);
+ m->from.port);
/*
* we don't want to perform any deliver events except mastership_tx event
* until we join; we wait for our JOIN message.
*/
if (!sys->join_finished && !master_tx_message(m)) {
- if (sheepid_cmp(&m->sheepid, &sys->this_sheepid) != 0) {
+ if (node_cmp(&m->from, &sys->this_node) != 0) {
cevent->skip = 1;
return;
}
}
if (join_message(m)) {
- node = find_node(&sys->cpg_node_list, &m->sheepid);
+ node = find_node(&sys->cpg_node_list, &m->from);
if (!node) {
dprintf("the node was left before join operation is finished\n");
return;
@@ -1020,70 +1001,6 @@ static void __sd_notify(struct cpg_event *cevent)
node->ent = m->from;
}
-
- if (m->state == DM_FIN) {
- switch (m->op) {
- case SD_MSG_JOIN:
- if (((struct join_message *)m)->cluster_status == SD_STATUS_OK)
- if (sys->status != SD_STATUS_OK) {
- struct join_message *msg = (struct join_message *)m;
- int i;
-
- get_vdi_bitmap_from_sd_list();
- get_vdi_bitmap_from(&m->from);
- for (i = 0; i < msg->nr_nodes;i++)
- get_vdi_bitmap_from(&msg->nodes[i].ent);
- }
- break;
- }
- }
-
-}
-
-static int tx_mastership(void)
-{
- struct mastership_tx_message msg;
- memset(&msg, 0, sizeof(msg));
- msg.header.proto_ver = SD_SHEEP_PROTO_VER;
- msg.header.op = SD_MSG_MASTER_TRANSFER;
- msg.header.state = DM_FIN;
- msg.header.msg_length = sizeof(msg);
- msg.header.from = sys->this_node;
- msg.header.sheepid = sys->this_sheepid;
-
- return sys->cdrv->notify(&msg, msg.header.msg_length, NULL);
-}
-
-static void send_join_response(struct work_notify *w)
-{
- struct message_header *m;
- struct join_message *jm;
- struct node *node;
-
- m = w->msg;
- jm = (struct join_message *)m;
- join(jm);
- m->state = DM_FIN;
-
- dprintf("%d, %d\n", jm->result, jm->cluster_status);
- if (jm->result == SD_RES_SUCCESS && jm->cluster_status != SD_STATUS_OK) {
- jm->nr_leave_nodes = 0;
- list_for_each_entry(node, &sys->leave_list, list) {
- jm->leave_nodes[jm->nr_leave_nodes].sheepid = node->sheepid;
- jm->leave_nodes[jm->nr_leave_nodes].ent = node->ent;
- jm->nr_leave_nodes++;
- }
- print_node_list(&sys->leave_list);
- } else if (jm->result != SD_RES_SUCCESS &&
- jm->epoch > sys->epoch &&
- jm->cluster_status == SD_STATUS_WAIT_FOR_JOIN) {
- eprintf("Transfer mastership.\n");
- tx_mastership();
- eprintf("Restart me later when master is up, please.Bye.\n");
- exit(1);
- }
- jm->epoch = sys->epoch;
- sys->cdrv->notify(m, m->msg_length, NULL);
}
static void __sd_notify_done(struct cpg_event *cevent)
@@ -1100,10 +1017,9 @@ static void __sd_notify_done(struct cpg_event *cevent)
if (m->state == DM_FIN) {
switch (m->op) {
case SD_MSG_JOIN:
- update_cluster_info((struct join_message *)m);
break;
case SD_MSG_LEAVE:
- node = find_node(&sys->sd_node_list, &m->sheepid);
+ node = find_node(&sys->sd_node_list, &m->from);
if (node) {
sys->nr_vnodes = 0;
@@ -1125,7 +1041,7 @@ static void __sd_notify_done(struct cpg_event *cevent)
*/
if (!sys->join_finished) {
sys->join_finished = 1;
- move_node_to_sd_list(&sys->this_sheepid, sys->this_node);
+ move_node_to_sd_list(sys->this_node);
sys->epoch = get_latest_epoch();
}
@@ -1158,7 +1074,6 @@ static void __sd_notify_done(struct cpg_event *cevent)
if (m->state == DM_INIT && is_master()) {
switch (m->op) {
case SD_MSG_JOIN:
- send_join_response(w);
break;
default:
eprintf("unknown message %d\n", m->op);
@@ -1174,17 +1089,18 @@ static void __sd_notify_done(struct cpg_event *cevent)
}
}
-static void sd_notify_handler(struct sheepid *sender, void *msg, size_t msg_len)
+static void sd_notify_handler(struct sheepdog_node_list_entry *sender,
+ void *msg, size_t msg_len)
{
struct cpg_event *cevent;
struct work_notify *w;
struct message_header *m = msg;
char name[128];
- dprintf("op: %d, state: %u, size: %d, from: %s, pid: %lu\n",
+ dprintf("op: %d, state: %u, size: %d, from: %s, pid: %u\n",
m->op, m->state, m->msg_length,
addr_to_str(name, sizeof(name), m->from.addr, m->from.port),
- sender->pid);
+ sender->port);
w = zalloc(sizeof(*w));
if (!w)
@@ -1212,7 +1128,7 @@ static void sd_notify_handler(struct sheepid *sender, void *msg, size_t msg_len)
start_cpg_event_work();
}
-static void add_node(struct sheepid *id)
+static void add_node(struct sheepdog_node_list_entry *ent)
{
struct node *node;
@@ -1220,16 +1136,16 @@ static void add_node(struct sheepid *id)
if (!node)
panic("failed to alloc memory for a new node\n");
- node->sheepid = *id;
+ node->ent = *ent;
list_add_tail(&node->list, &sys->cpg_node_list);
}
-static int del_node(struct sheepid *id)
+static int del_node(struct sheepdog_node_list_entry *ent)
{
struct node *node;
- node = find_node(&sys->sd_node_list, id);
+ node = find_node(&sys->sd_node_list, ent);
if (node) {
int nr;
struct sheepdog_node_list_entry e[SD_MAX_NODES];
@@ -1252,7 +1168,7 @@ static int del_node(struct sheepid *id)
return 1;
}
- node = find_node(&sys->cpg_node_list, id);
+ node = find_node(&sys->cpg_node_list, ent);
if (node) {
list_del(&node->list);
free(node);
@@ -1264,7 +1180,7 @@ static int del_node(struct sheepid *id)
/*
* Check whether the majority of Sheepdog nodes are still alive or not
*/
-static int check_majority(struct sheepid *left)
+static int check_majority(struct sheepdog_node_list_entry *left)
{
int nr_nodes = 0, nr_majority, nr_reachable = 0, fd;
struct node *node;
@@ -1279,7 +1195,7 @@ static int check_majority(struct sheepid *left)
return 1;
list_for_each_entry(node, &sys->sd_node_list, list) {
- if (sheepid_cmp(&node->sheepid, left) == 0)
+ if (node_cmp(&node->ent, left) == 0)
continue;
addr_to_str(name, sizeof(name), node->ent.addr, 0);
@@ -1299,6 +1215,23 @@ static int check_majority(struct sheepid *left)
return 0;
}
+/*
+ * Handler run from cpg_event_fn() for CPG_EVENT_JOIN events.
+ *
+ * Only acts when the join message reports SD_STATUS_OK while this node's own
+ * status is not SD_STATUS_OK yet.  In that case it calls
+ * get_vdi_bitmap_from_sd_list() and then get_vdi_bitmap_from() for every
+ * entry of the member list carried by the event (presumably to collect the
+ * VDI bitmaps held by the existing members).
+ */
+static void __sd_join(struct cpg_event *cevent)
+{
+	struct work_join *work = container_of(cevent, struct work_join, cev);
+	int idx = 0;
+
+	/* the joined cluster must be running and we must not be running yet */
+	if (work->jm.cluster_status != SD_STATUS_OK ||
+	    sys->status == SD_STATUS_OK)
+		return;
+
+	get_vdi_bitmap_from_sd_list();
+	while (idx < work->member_list_entries) {
+		get_vdi_bitmap_from(&work->member_list[idx]);
+		idx++;
+	}
+}
+
static void __sd_leave(struct cpg_event *cevent)
{
struct work_leave *w = container_of(cevent, struct work_leave, cev);
@@ -1309,7 +1242,7 @@ static void __sd_leave(struct cpg_event *cevent)
}
}
-static void send_join_request(struct sheepid *id)
+static int send_join_request(struct sheepdog_node_list_entry *ent)
{
struct join_message msg;
struct sheepdog_node_list_entry entries[SD_MAX_NODES];
@@ -1320,8 +1253,7 @@ static void send_join_request(struct sheepid *id)
msg.header.op = SD_MSG_JOIN;
msg.header.state = DM_INIT;
msg.header.msg_length = sizeof(msg);
- msg.header.from = sys->this_node;
- msg.header.sheepid = sys->this_sheepid;
+ msg.header.from = *ent;
get_global_nr_copies(&msg.nr_sobjs);
@@ -1333,22 +1265,24 @@ static void send_join_request(struct sheepid *id)
msg.nodes[i].ent = entries[i];
}
- sys->cdrv->notify(&msg, msg.header.msg_length, NULL);
+ ret = sys->cdrv->join(ent, &msg, msg.header.msg_length);
+
+ vprintf(SDOG_INFO "%s\n", node_to_str(&sys->this_node));
- vprintf(SDOG_INFO "%s\n", sheepid_to_str(&sys->this_sheepid));
+ return ret;
}
static void __sd_join_done(struct cpg_event *cevent)
{
struct work_join *w = container_of(cevent, struct work_join, cev);
- int ret, i;
- int first_cpg_node = 0;
+ struct join_message *jm = &w->jm;
+ struct node *node, *t;
+ int i;
if (w->member_list_entries == 1 &&
- sheepid_cmp(&w->joined, &sys->this_sheepid) == 0) {
+ node_cmp(&w->joined, &sys->this_node) == 0) {
sys->join_finished = 1;
get_global_nr_copies(&sys->nr_sobjs);
- first_cpg_node = 1;
}
if (list_empty(&sys->cpg_node_list)) {
@@ -1357,47 +1291,16 @@ static void __sd_join_done(struct cpg_event *cevent)
} else
add_node(&w->joined);
- if (first_cpg_node) {
- struct join_message msg;
- struct sheepdog_node_list_entry entries[SD_MAX_NODES];
- int nr_entries;
- uint64_t ctime;
- uint32_t epoch;
-
- /*
- * If I'm the first sheep joins in colosync, I
- * becomes the master without sending JOIN.
- */
-
- vprintf(SDOG_DEBUG "%s\n", sheepid_to_str(&sys->this_sheepid));
-
- memset(&msg, 0, sizeof(msg));
-
- msg.header.from = sys->this_node;
- msg.header.sheepid = sys->this_sheepid;
-
- nr_entries = ARRAY_SIZE(entries);
- ret = read_epoch(&epoch, &ctime, entries, &nr_entries);
- if (ret == SD_RES_SUCCESS) {
- sys->epoch = epoch;
- msg.ctime = ctime;
- get_cluster_status(&msg.header.from, entries, nr_entries,
- ctime, epoch, &msg.cluster_status, NULL);
- } else
- msg.cluster_status = SD_STATUS_WAIT_FOR_FORMAT;
-
- update_cluster_info(&msg);
+ print_node_list(&sys->sd_node_list);
- if (sys->status == SD_STATUS_OK) /* sheepdog starts with one node */
- start_recovery(sys->epoch);
+ update_cluster_info(jm, w->member_list, w->member_list_entries);
- return;
+ if (sys->status == SD_STATUS_OK) {
+ list_for_each_entry_safe(node, t, &sys->leave_list, list) {
+ list_del(&node->list);
+ }
+ start_recovery(sys->epoch);
}
-
- print_node_list(&sys->sd_node_list);
-
- if (sheepid_cmp(&w->joined, &sys->this_sheepid) == 0)
- send_join_request(&w->joined);
}
static void __sd_leave_done(struct cpg_event *cevent)
@@ -1455,6 +1358,7 @@ static void cpg_event_fn(struct work *work, int idx)
switch (cevent->ctype) {
case CPG_EVENT_JOIN:
+ __sd_join(cevent);
break;
case CPG_EVENT_LEAVE:
__sd_leave(cevent);
@@ -1746,62 +1650,214 @@ do_retry:
queue_work(sys->cpg_wqueue, &cpg_event_work);
}
-static void sd_join_handler(struct sheepid *joined, struct sheepid *members,
- size_t nr_members)
+/*
+ * Decide whether the node 'joining' may join the cluster.
+ *
+ * 'opaque' points to the join_message sent by the joining node; it is
+ * updated in place (cluster status, ctime, epoch, state, leave nodes).
+ *
+ * Returns:
+ *   CJ_RES_SUCCESS         - the join is accepted (always the case when
+ *                            'joining' is this node itself)
+ *   CJ_RES_JOIN_LATER      - join() reported SD_RES_OLD_NODE_VER or
+ *                            SD_RES_NEW_NODE_VER
+ *   CJ_RES_MASTER_TRANSFER - join() failed, the joining node has a newer
+ *                            epoch than ours and the cluster status is
+ *                            SD_STATUS_WAIT_FOR_JOIN
+ *   CJ_RES_FAIL            - any other failure
+ */
+static enum cluster_join_result sd_check_join_handler(struct sheepdog_node_list_entry *joining,
+						      void *opaque)
+{
+	struct message_header *m = opaque;
+	struct join_message *jm;
+	struct node *node;
+
+	jm = (struct join_message *)m;
+
+	if (node_cmp(joining, &sys->this_node) == 0) {
+		struct sheepdog_node_list_entry entries[SD_MAX_NODES];
+		int nr_entries;
+		uint64_t ctime;
+		uint32_t epoch;
+		int ret;
+
+		/*
+		 * If I'm the first sheep to join in corosync, I
+		 * become the master without sending JOIN.
+		 */
+
+		vprintf(SDOG_DEBUG "%s\n", node_to_str(&sys->this_node));
+
+		jm->header.from = sys->this_node;
+
+		/* derive the cluster status from the locally stored epoch */
+		nr_entries = ARRAY_SIZE(entries);
+		ret = read_epoch(&epoch, &ctime, entries, &nr_entries);
+		if (ret == SD_RES_SUCCESS) {
+			sys->epoch = epoch;
+			jm->ctime = ctime;
+			get_cluster_status(&jm->header.from, entries, nr_entries,
+					   ctime, epoch, &jm->cluster_status, NULL);
+		} else
+			jm->cluster_status = SD_STATUS_WAIT_FOR_FORMAT;
+
+		return CJ_RES_SUCCESS;
+	}
+
+	join(jm);
+	m->state = DM_FIN;
+
+	dprintf("%d, %d\n", jm->result, jm->cluster_status);
+	if (jm->result == SD_RES_SUCCESS && jm->cluster_status != SD_STATUS_OK) {
+		/* hand the current leave list over to the joining node */
+		jm->nr_leave_nodes = 0;
+		list_for_each_entry(node, &sys->leave_list, list) {
+			jm->leave_nodes[jm->nr_leave_nodes].ent = node->ent;
+			jm->nr_leave_nodes++;
+		}
+		print_node_list(&sys->leave_list);
+	} else if (jm->result != SD_RES_SUCCESS &&
+		   jm->epoch > sys->epoch &&
+		   jm->cluster_status == SD_STATUS_WAIT_FOR_JOIN) {
+		eprintf("Transfer mastership. %d, %d\n", jm->epoch, sys->epoch);
+		return CJ_RES_MASTER_TRANSFER;
+	}
+	jm->epoch = sys->epoch;
+
+	if (jm->result == SD_RES_SUCCESS)
+		return CJ_RES_SUCCESS;
+	/*
+	 * A single result can never equal both codes at once, so the two
+	 * version-mismatch cases must be combined with '||'; with '&&'
+	 * CJ_RES_JOIN_LATER could never be returned.
+	 */
+	else if (jm->result == SD_RES_OLD_NODE_VER ||
+		 jm->result == SD_RES_NEW_NODE_VER)
+		return CJ_RES_JOIN_LATER;
+	else
+		return CJ_RES_FAIL;
+}
+
+static void sd_join_handler(struct sheepdog_node_list_entry *joined,
+ struct sheepdog_node_list_entry *members,
+ size_t nr_members, enum cluster_join_result result,
+ void *opaque)
{
struct cpg_event *cevent;
struct work_join *w = NULL;
int i, size;
+ int nr, nr_local, nr_leave;
+ struct node *n;
+ struct join_message *jm;
+ int le = get_latest_epoch();
- dprintf("join %s\n", sheepid_to_str(joined));
- for (i = 0; i < nr_members; i++)
- dprintf("[%x] %s\n", i, sheepid_to_str(members + i));
+ if (node_cmp(joined, &sys->this_node) == 0) {
+ if (result == CJ_RES_FAIL) {
+ eprintf("failed to join sheepdog\n");
+ sys->cdrv->leave();
+ exit(1);
+ } else if (result == CJ_RES_JOIN_LATER) {
+ eprintf("Restart me later when master is up, please .Bye.\n");
+ sys->cdrv->leave();
+ exit(1);
+ }
+ }
- if (sys->status == SD_STATUS_SHUTDOWN)
- return;
+ switch (result) {
+ case CJ_RES_SUCCESS:
+ dprintf("join %s\n", node_to_str(joined));
+ for (i = 0; i < nr_members; i++)
+ dprintf("[%x] %s\n", i, node_to_str(members + i));
- w = zalloc(sizeof(*w));
- if (!w)
- goto oom;
+ if (sys->status == SD_STATUS_SHUTDOWN)
+ break;
- cevent = &w->cev;
- cevent->ctype = CPG_EVENT_JOIN;
+ w = zalloc(sizeof(*w));
+ if (!w)
+ panic("oom");
+ cevent = &w->cev;
+ cevent->ctype = CPG_EVENT_JOIN;
- vprintf(SDOG_DEBUG "allow new confchg, %p\n", cevent);
- size = sizeof(struct sheepid) * nr_members;
- w->member_list = zalloc(size);
- if (!w->member_list)
- goto oom;
- memcpy(w->member_list, members, size);
- w->member_list_entries = nr_members;
+ vprintf(SDOG_DEBUG "allow new confchg, %p\n", cevent);
- w->joined = *joined;
+ size = sizeof(struct sheepdog_node_list_entry) * nr_members;
+ w->member_list = zalloc(size);
+ if (!w->member_list)
+ panic("oom");
- list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings);
- start_cpg_event_work();
+ memcpy(w->member_list, members, size);
+ w->member_list_entries = nr_members;
- return;
-oom:
- if (w) {
- if (w->member_list)
- free(w->member_list);
- free(w);
+ w->joined = *joined;
+
+ memcpy(&w->jm, opaque, sizeof(w->jm));
+
+ list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings);
+ start_cpg_event_work();
+ break;
+ case CJ_RES_FAIL:
+ case CJ_RES_JOIN_LATER:
+ if (sys->status != SD_STATUS_WAIT_FOR_JOIN)
+ break;
+
+ n = zalloc(sizeof(*n));
+ if (!n)
+ panic("oom\n");
+
+ if (find_entry_list(joined, &sys->leave_list)
+ || !find_entry_epoch(joined, le)) {
+ free(n);
+ break;
+ }
+
+ n->ent = *joined;
+
+ list_add_tail(&n->list, &sys->leave_list);
+
+ nr_local = get_nodes_nr_epoch(sys->epoch);
+ nr = nr_members;
+ nr_leave = get_nodes_nr_from(&sys->leave_list);
+
+ dprintf("%d == %d + %d \n", nr_local, nr, nr_leave);
+ if (nr_local == nr + nr_leave) {
+ sys->status = SD_STATUS_OK;
+ update_epoch_log(sys->epoch);
+ update_epoch_store(sys->epoch);
+ }
+ break;
+ case CJ_RES_MASTER_TRANSFER:
+ jm = (struct join_message *)opaque;
+ nr = jm->nr_leave_nodes;
+ for (i = 0; i < nr; i++) {
+ n = zalloc(sizeof(*n));
+ if (!n)
+ panic("oom\n");
+
+ if (find_entry_list(&jm->leave_nodes[i].ent, &sys->leave_list)
+ || !find_entry_epoch(&jm->leave_nodes[i].ent, le)) {
+ free(n);
+ continue;
+ }
+
+ n->ent = jm->leave_nodes[i].ent;
+
+ list_add_tail(&n->list, &sys->leave_list);
+ }
+
+ /* Sheep needs this to identify itself as master.
+ * Now mastership transfer is done.
+ */
+ if (!sys->join_finished) {
+ sys->join_finished = 1;
+ move_node_to_sd_list(sys->this_node);
+ sys->epoch = get_latest_epoch();
+ }
+
+ nr_local = get_nodes_nr_epoch(sys->epoch);
+ nr = nr_members;
+ nr_leave = get_nodes_nr_from(&sys->leave_list);
+
+ dprintf("%d == %d + %d \n", nr_local, nr, nr_leave);
+ if (nr_local == nr + nr_leave) {
+ sys->status = SD_STATUS_OK;
+ update_epoch_log(sys->epoch);
+ update_epoch_store(sys->epoch);
+ }
+ break;
}
- panic("failed to allocate memory for a confchg event\n");
}
-static void sd_leave_handler(struct sheepid *left, struct sheepid *members,
+static void sd_leave_handler(struct sheepdog_node_list_entry *left,
+ struct sheepdog_node_list_entry *members,
size_t nr_members)
{
struct cpg_event *cevent;
struct work_leave *w = NULL;
int i, size;
- dprintf("leave %s\n", sheepid_to_str(left));
+ dprintf("leave %s\n", node_to_str(left));
for (i = 0; i < nr_members; i++)
- dprintf("[%x] %s\n", i, sheepid_to_str(members + i));
+ dprintf("[%x] %s\n", i, node_to_str(members + i));
if (sys->status == SD_STATUS_SHUTDOWN)
return;
@@ -1816,7 +1872,7 @@ static void sd_leave_handler(struct sheepid *left, struct sheepid *members,
vprintf(SDOG_DEBUG "allow new confchg, %p\n", cevent);
- size = sizeof(struct sheepid) * nr_members;
+ size = sizeof(struct sheepdog_node_list_entry) * nr_members;
w->member_list = zalloc(size);
if (!w->member_list)
goto oom;
@@ -1843,6 +1899,7 @@ int create_cluster(int port, int64_t zone)
int fd, ret;
struct cluster_driver *cdrv;
struct cdrv_handlers handlers = {
+ .check_join_handler = sd_check_join_handler,
.join_handler = sd_join_handler,
.leave_handler = sd_leave_handler,
.notify_handler = sd_notify_handler,
@@ -1858,26 +1915,24 @@ int create_cluster(int port, int64_t zone)
}
}
- fd = sys->cdrv->init(&handlers, &sys->this_sheepid);
+ fd = sys->cdrv->init(&handlers, sys->this_node.addr);
if (fd < 0)
return -1;
- ret = sys->cdrv->join();
- if (ret != 0)
- return -1;
-
- memcpy(sys->this_node.addr, sys->this_sheepid.addr,
- sizeof(sys->this_node.addr));
sys->this_node.port = port;
sys->this_node.nr_vnodes = SD_DEFAULT_VNODES;
if (zone == -1) {
/* use last 4 bytes as zone id */
- uint8_t *b = sys->this_sheepid.addr + 12;
+ uint8_t *b = sys->this_node.addr + 12;
sys->this_node.zone = b[0] | b[1] << 8 | b[2] << 16 | b[3] << 24;
} else
sys->this_node.zone = zone;
dprintf("zone id = %u\n", sys->this_node.zone);
+ ret = send_join_request(&sys->this_node);
+ if (ret != 0)
+ return -1;
+
if (get_latest_epoch() == 0)
sys->status = SD_STATUS_WAIT_FOR_FORMAT;
else
@@ -1912,7 +1967,6 @@ int leave_cluster(void)
msg.header.state = DM_FIN;
msg.header.msg_length = sizeof(msg);
msg.header.from = sys->this_node;
- msg.header.sheepid = sys->this_sheepid;
msg.epoch = get_latest_epoch();
dprintf("%d\n", msg.epoch);
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index b292548..5613f12 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -110,7 +110,6 @@ struct cluster_info {
/* set after finishing the JOIN procedure */
int join_finished;
- struct sheepid this_sheepid;
struct sheepdog_node_list_entry this_node;
uint32_t epoch;
--
1.7.2.5
More information about the sheepdog
mailing list