[sheepdog] [PATCH] sheep: remove master node
MORITA Kazutaka
morita.kazutaka at gmail.com
Sat Jul 13 17:08:46 CEST 2013
From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
The current procedure to handle sheep join is as follows.
1. The joining node sends a join request.
2. The master node accepts the request.
3. All the nodes update cluster members.
This procedure has some problems:
- The master election is too complex to maintain.
It is very difficult to make sure that the implementation is
correct.
- The master node can fail while it is accepting the joining node.
The newly elected master has to take over the process, but it's
usually difficult to implement because we have to know what the
previous master did and what it did not before its failure.
This patch changes the sheep join procedure to the following.
1. The joining node sends a join request.
2. Some of the existing nodes accept the request.
3. All the nodes update cluster members.
It is allowed for the multiple nodes to call sd_accept_handler()
against the same join request, but at least one node must have to do
it. With this change, we can eliminate a master, and node failure
while accepting node join is also allowed.
The zookeeper driver still needs master election to handle concurrent
joins. I wrote it as a TODO in the source code.
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
include/shepherd.h | 3 --
sheep/cluster.h | 2 +-
sheep/cluster/corosync.c | 60 +++--------------------
sheep/cluster/local.c | 38 +++++++--------
sheep/cluster/shepherd.c | 43 +++++------------
sheep/cluster/zookeeper.c | 75 ++++++++++++++---------------
sheep/group.c | 12 ++++-
shepherd/shepherd.c | 115 +++------------------------------------------
8 files changed, 88 insertions(+), 260 deletions(-)
diff --git a/include/shepherd.h b/include/shepherd.h
index 9a4cf81..95b4b8b 100644
--- a/include/shepherd.h
+++ b/include/shepherd.h
@@ -23,7 +23,6 @@ enum sph_srv_msg_type {
SPH_SRV_MSG_LEAVE_FORWARD,
SPH_SRV_MSG_REMOVE,
- SPH_SRV_MSG_MASTER_ELECTION,
};
struct sph_msg {
@@ -39,7 +38,6 @@ struct sph_msg {
struct sph_msg_join {
struct sd_node new_node;
- uint8_t master_elected;
struct sd_node nodes[SD_MAX_NODES];
uint32_t nr_nodes;
@@ -116,7 +114,6 @@ static inline const char *sph_srv_msg_to_str(enum sph_srv_msg_type msg)
{ SPH_SRV_MSG_NOTIFY_FORWARD, "SPH_SRV_MSG_NOTIFY_FORWARD" },
{ SPH_SRV_MSG_BLOCK_FORWARD, "SPH_SRV_MSG_BLOCK_FORWARD" },
{ SPH_SRV_MSG_REMOVE, "SPH_SRV_MSG_REMOVE" },
- { SPH_SRV_MSG_MASTER_ELECTION, "SPH_SRV_MSG_MASTER_ELECTION" },
};
for (i = 0; i < ARRAY_SIZE(msgs); i++) {
diff --git a/sheep/cluster.h b/sheep/cluster.h
index dedab1a..74940fc 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -171,7 +171,7 @@ void sd_notify_handler(const struct sd_node *sender, void *msg, size_t msg_len);
bool sd_block_handler(const struct sd_node *sender);
int sd_reconnect_handler(void);
void sd_update_node_handler(struct sd_node *);
-void sd_accept_handler(const struct sd_node *joining,
+bool sd_accept_handler(const struct sd_node *joining,
const struct sd_node *nodes, size_t nr_nodes,
void *opaque);
void recalculate_vnodes(struct sd_node *nodes, int nr_nodes);
diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
index 7d308f7..f8c6d94 100644
--- a/sheep/cluster/corosync.c
+++ b/sheep/cluster/corosync.c
@@ -24,7 +24,6 @@
struct cpg_node {
uint32_t nodeid;
uint32_t pid;
- uint32_t gone;
struct sd_node node;
};
@@ -238,28 +237,6 @@ find_event(enum corosync_event_type type, struct cpg_node *sender)
return find_nonblock_event(type, sender);
}
-static int is_master(struct cpg_node *node)
-{
- int i;
- struct cpg_node *n = node;
- if (!n)
- n = &this_node;
- if (nr_cpg_nodes == 0)
- /* this node should be the first cpg node */
- return 0;
-
- for (i = 0; i < SD_MAX_NODES; i++) {
- if (!cpg_nodes[i].gone)
- goto eq_check;
- }
- return -1;
-
-eq_check:
- if (cpg_node_equal(&cpg_nodes[i], n))
- return i;
- return -1;
-}
-
static void build_node_list(const struct cpg_node *nodes, size_t nr_nodes,
struct sd_node *entries)
{
@@ -282,9 +259,6 @@ static bool __corosync_dispatch_one(struct corosync_event *cevent)
switch (cevent->type) {
case COROSYNC_EVENT_TYPE_JOIN:
- if (is_master(&this_node) < 0)
- return false;
-
if (!cevent->msg)
/* we haven't receive JOIN yet */
return false;
@@ -294,13 +268,14 @@ static bool __corosync_dispatch_one(struct corosync_event *cevent)
return false;
build_node_list(cpg_nodes, nr_cpg_nodes, entries);
- sd_accept_handler(&cevent->sender.node, entries, nr_cpg_nodes,
- cevent->msg);
- send_message(COROSYNC_MSG_TYPE_ACCEPT, &cevent->sender,
- cpg_nodes, nr_cpg_nodes, cevent->msg,
- cevent->msg_len);
+ if (sd_accept_handler(&cevent->sender.node, entries,
+ nr_cpg_nodes, cevent->msg)) {
+ send_message(COROSYNC_MSG_TYPE_ACCEPT, &cevent->sender,
+ cpg_nodes, nr_cpg_nodes, cevent->msg,
+ cevent->msg_len);
- cevent->callbacked = true;
+ cevent->callbacked = true;
+ }
return false;
case COROSYNC_EVENT_TYPE_ACCEPT:
add_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender);
@@ -469,7 +444,6 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,
{
struct corosync_event *cevent;
struct corosync_message *cmsg = msg;
- int master;
sd_dprintf("%d", cmsg->type);
@@ -521,16 +495,6 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,
case COROSYNC_MSG_TYPE_LEAVE:
cevent = xzalloc(sizeof(*cevent));
cevent->type = COROSYNC_EVENT_TYPE_LEAVE;
-
- master = is_master(&cmsg->sender);
- if (master >= 0)
- /*
- * Master is down before new nodes finish joining.
- * We have to revoke its mastership to avoid cluster
- * hanging
- */
- cpg_nodes[master].gone = 1;
-
cevent->sender = cmsg->sender;
cevent->msg_len = cmsg->msg_len;
if (cmsg->msg_len) {
@@ -614,7 +578,6 @@ static void cdrv_cpg_confchg(cpg_handle_t handle,
/* dispatch leave_handler */
for (i = 0; i < left_list_entries; i++) {
- int master;
cevent = find_event(COROSYNC_EVENT_TYPE_JOIN, left_sheep + i);
if (cevent) {
/* the node left before joining */
@@ -633,15 +596,6 @@ static void cdrv_cpg_confchg(cpg_handle_t handle,
}
cevent = xzalloc(sizeof(*cevent));
- master = is_master(&left_sheep[i]);
- if (master >= 0)
- /*
- * Master is down before new nodes finish joining.
- * We have to revoke its mastership to avoid cluster
- * hanging
- */
- cpg_nodes[master].gone = 1;
-
cevent->type = COROSYNC_EVENT_TYPE_LEAVE;
cevent->sender = left_sheep[i];
diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
index 8757996..8b9203a 100644
--- a/sheep/cluster/local.c
+++ b/sheep/cluster/local.c
@@ -408,37 +408,33 @@ static bool local_process_event(void)
if (ev->callbacked)
return false; /* wait for unblock event */
- if (ev->type == EVENT_ACCEPT && lnode_eq(&this_node, &ev->sender)) {
- sd_dprintf("join Sheepdog");
- joined = true;
- }
-
if (!joined) {
- if (ev->type == EVENT_JOIN &&
- lnode_eq(&this_node, &ev->sender)) {
- struct local_node lnodes[SD_MAX_NODES];
-
- get_nodes(lnodes);
+ if (!lnode_eq(&this_node, &ev->sender))
+ goto out;
- if (!lnode_eq(&this_node, &lnodes[0])) {
- sd_dprintf("wait for another node"
- " to accept this node");
- return false;
- }
- } else
+ switch (ev->type) {
+ case EVENT_JOIN:
+ break;
+ case EVENT_ACCEPT:
+ sd_dprintf("join Sheepdog");
+ joined = true;
+ break;
+ default:
goto out;
+ }
}
switch (ev->type) {
case EVENT_JOIN:
/* nodes[nr_nodes - 1] is a sender, so don't include it */
assert(node_eq(&ev->sender.node, &nodes[nr_nodes - 1]));
- sd_accept_handler(&ev->sender.node, nodes, nr_nodes - 1,
- ev->buf);
- ev->type = EVENT_ACCEPT;
- msync(ev, sizeof(*ev), MS_SYNC);
+ if (sd_accept_handler(&ev->sender.node, nodes, nr_nodes - 1,
+ ev->buf)) {
+ ev->type = EVENT_ACCEPT;
+ msync(ev, sizeof(*ev), MS_SYNC);
- shm_queue_notify();
+ shm_queue_notify();
+ }
return false;
case EVENT_ACCEPT:
diff --git a/sheep/cluster/shepherd.c b/sheep/cluster/shepherd.c
index ed79452..a9415b3 100644
--- a/sheep/cluster/shepherd.c
+++ b/sheep/cluster/shepherd.c
@@ -30,7 +30,6 @@
static int sph_comm_fd;
static struct sd_node this_node;
-static bool is_master;
static int nr_nodes;
static struct sd_node nodes[SD_MAX_NODES];
@@ -114,11 +113,8 @@ retry:
* FIXME: member change events must be ordered with nonblocked
* events
*/
- sd_accept_handler(&join->new_node, NULL, 0, join->opaque);
-
- /* FIXME: join->master_elected is needed? */
- assert(join->master_elected);
- is_master = true;
+ if (!sd_accept_handler(&join->new_node, NULL, 0, join->opaque))
+ panic("sd_accept_handler() failed");
snd.type = SPH_CLI_MSG_ACCEPT;
snd.body_len = join_len;
@@ -297,26 +293,12 @@ static void sph_event_handler(int fd, int events, void *data)
;
}
-static void elected_as_master(void)
-{
- sd_dprintf("elected_as_master() called");
-
- is_master = true;
- sd_iprintf("became new master");
-}
-
static void msg_new_node(struct sph_msg *rcv)
{
int ret;
struct sph_msg_join *join;
struct sph_msg snd;
- if (!is_master) {
- sd_printf(SDOG_EMERG, "I am not a master but received"
- " SPH_MSG_NEW_NODE, shepherd is buggy");
- exit(1);
- }
-
join = xzalloc(rcv->body_len);
ret = xread(sph_comm_fd, join, rcv->body_len);
if (ret != rcv->body_len) {
@@ -325,7 +307,13 @@ static void msg_new_node(struct sph_msg *rcv)
}
/* FIXME: member change events must be ordered with nonblocked events */
- sd_accept_handler(&join->new_node, nodes, nr_nodes, join->opaque);
+ if (!sd_accept_handler(&join->new_node, join->nodes, join->nr_nodes,
+ join->opaque))
+ /*
+ * This should succeed always because shepherd should have sent
+ * SPH_SRV_MSG_NEW_NODE only to the already joined node.
+ */
+ panic("sd_accept_handler() failed");
memset(&snd, 0, sizeof(snd));
snd.type = SPH_CLI_MSG_ACCEPT;
@@ -342,7 +330,6 @@ static void msg_new_node(struct sph_msg *rcv)
static void msg_new_node_finish(struct sph_msg *rcv)
{
int ret;
- struct join_message *jm;
struct sph_msg_join_node_finish *join_node_finish;
join_node_finish = xzalloc(rcv->body_len);
@@ -352,7 +339,6 @@ static void msg_new_node_finish(struct sph_msg *rcv)
exit(1);
}
- jm = (struct join_message *)join_node_finish->opaque;
memcpy(nodes, join_node_finish->nodes,
join_node_finish->nr_nodes * sizeof(struct sd_node));
nr_nodes = join_node_finish->nr_nodes;
@@ -361,7 +347,8 @@ static void msg_new_node_finish(struct sph_msg *rcv)
node_to_str(&join_node_finish->new_node));
/* FIXME: member change events must be ordered with nonblocked events */
- sd_join_handler(&join_node_finish->new_node, nodes, nr_nodes, jm);
+ sd_join_handler(&join_node_finish->new_node, nodes, nr_nodes,
+ join_node_finish->opaque);
free(join_node_finish);
}
@@ -441,11 +428,6 @@ static void msg_leave_forward(struct sph_msg *rcv)
do_leave_sheep();
}
-static void msg_master_election(struct sph_msg *rcv)
-{
- elected_as_master();
-}
-
static void (*msg_handlers[])(struct sph_msg *) = {
[SPH_SRV_MSG_NEW_NODE] = msg_new_node,
[SPH_SRV_MSG_NEW_NODE_FINISH] = msg_new_node_finish,
@@ -453,7 +435,6 @@ static void (*msg_handlers[])(struct sph_msg *) = {
[SPH_SRV_MSG_BLOCK_FORWARD] = msg_block_forward,
[SPH_SRV_MSG_REMOVE] = msg_remove,
[SPH_SRV_MSG_LEAVE_FORWARD] = msg_leave_forward,
- [SPH_SRV_MSG_MASTER_ELECTION] = msg_master_election,
};
static void interpret_msg(struct sph_msg *rcv)
@@ -593,8 +574,6 @@ static int shepherd_leave(void)
exit(1);
}
- is_master = false;
-
sd_dprintf("shepherd_leave() is completed");
return 0;
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index f450790..8b63bf7 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -72,15 +72,11 @@ static struct sd_node sd_nodes[SD_MAX_NODES];
static size_t nr_sd_nodes;
static struct rb_root zk_node_root = RB_ROOT;
static pthread_rwlock_t zk_tree_lock = PTHREAD_RWLOCK_INITIALIZER;
-static pthread_rwlock_t zk_compete_master_lock = PTHREAD_RWLOCK_INITIALIZER;
static LIST_HEAD(zk_block_list);
-static uatomic_bool is_master;
-static uatomic_bool stop;
+static bool first_member;
static bool joined;
static bool first_push = true;
-static void zk_compete_master(void);
-
static struct zk_node *zk_tree_insert(struct zk_node *new)
{
struct rb_node **p = &zk_node_root.rb_node;
@@ -563,12 +559,6 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
} else if (type == ZOO_DELETED_EVENT) {
struct zk_node *n;
- ret = sscanf(path, MASTER_ZNONE "/%s", str);
- if (ret == 1) {
- zk_compete_master();
- return;
- }
-
ret = sscanf(path, MEMBER_ZNODE "/%s", str);
if (ret != 1)
return;
@@ -720,27 +710,16 @@ static int zk_verify_last_sheep_join(int seq, int *last_sheep)
* Create sequential node under MASTER_ZNODE.
* Sheep with least sequential number win the competition.
*/
-static void zk_compete_master(void)
+static bool zk_compete_master(void)
{
int rc, last_joined_sheep;
char master_name[MAX_NODE_STR_LEN];
char my_compete_path[MAX_NODE_STR_LEN];
static int master_seq = -1, my_seq;
- /*
- * This is to protect master_seq and my_seq because this function will
- * be called by both main thread and zookeeper's event thread.
- */
- pthread_rwlock_wrlock(&zk_compete_master_lock);
-
- if (uatomic_is_true(&is_master) || uatomic_is_true(&stop))
- goto out_unlock;
-
if (!joined) {
sd_dprintf("start to compete master for the first time");
do {
- if (uatomic_is_true(&stop))
- goto out_unlock;
/* duplicate sequential node has no side-effect */
rc = zk_create_seq_node(MASTER_ZNONE "/",
node_to_str(&this_node.node),
@@ -750,7 +729,7 @@ static void zk_compete_master(void)
} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
CHECK_ZK_RC(rc, MASTER_ZNONE "/");
if (rc != ZOK)
- goto out_unlock;
+ return false;
sd_dprintf("my compete path: %s", my_compete_path);
sscanf(my_compete_path, MASTER_ZNONE "/%"PRId32,
@@ -758,17 +737,17 @@ static void zk_compete_master(void)
}
if (zk_find_master(&master_seq, master_name) != ZOK)
- goto out_unlock;
+ return false;
if (!strcmp(master_name, node_to_str(&this_node.node)))
goto success;
else if (joined) {
sd_dprintf("lost");
- goto out_unlock;
+ return false;
} else {
if (zk_verify_last_sheep_join(my_seq,
&last_joined_sheep) != ZOK)
- goto out_unlock;
+ return false;
if (last_joined_sheep < 0) {
/* all previous sheep has quit, i'm master */
@@ -776,14 +755,20 @@ static void zk_compete_master(void)
goto success;
} else {
sd_dprintf("lost");
- goto out_unlock;
+ return false;
}
}
success:
- uatomic_set_true(&is_master);
sd_dprintf("success");
-out_unlock:
- pthread_rwlock_unlock(&zk_compete_master_lock);
+ return true;
+}
+
+static int zk_member_empty(void)
+{
+ struct String_vector strs;
+
+ zk_get_children(MEMBER_ZNODE, &strs);
+ return (strs.count == 0);
}
static int zk_join(const struct sd_node *myself,
@@ -801,7 +786,9 @@ static int zk_join(const struct sd_node *myself,
exit(1);
}
- zk_compete_master();
+ if (zk_member_empty() && zk_compete_master())
+ first_member = true;
+
RETURN_IF_ERROR(add_join_event(opaque, opaque_len), "");
return ZOK;
@@ -812,7 +799,6 @@ static int zk_leave(void)
char path[PATH_MAX];
sd_iprintf("leaving from cluster");
- uatomic_set_true(&stop);
snprintf(path, sizeof(path), MEMBER_ZNODE"/%s",
node_to_str(&this_node.node));
@@ -839,16 +825,24 @@ static int zk_unblock(void *msg, size_t msg_len)
static void zk_handle_join(struct zk_event *ev)
{
sd_dprintf("sender: %s", node_to_str(&ev->sender.node));
- if (!uatomic_is_true(&is_master)) {
- /* Let's await master acking the join-request */
+ if (!first_member && node_eq(&ev->sender.node, &this_node.node)) {
+ /*
+ * This node doesn't have sd_nodes yet. Let's await another
+ * acking the join-request.
+ *
+ * TODO: If the first member can have a valid sd_node, the node
+ * can call sd_accept_handler and we can remove a master from
+ * this driver.
+ */
queue_pos--;
return;
}
- sd_accept_handler(&ev->sender.node, sd_nodes, nr_sd_nodes, ev->buf);
- push_join_response(ev);
-
- sd_dprintf("I'm the master now");
+ if (sd_accept_handler(&ev->sender.node, sd_nodes, nr_sd_nodes, ev->buf))
+ push_join_response(ev);
+ else
+ /* Let's await another acking the join-request */
+ queue_pos--;
}
static void watch_all_nodes(void)
@@ -1061,6 +1055,7 @@ static void zk_event_handler(int listen_fd, int events, void *data)
sd_eprintf("detect a session timeout. reconnecting...");
handle_session_expire();
sd_iprintf("reconnected");
+ first_member = false;
eventfd_write(efd, 1);
return;
}
@@ -1121,8 +1116,6 @@ static int zk_init(const char *option)
return -1;
}
- uatomic_set_false(&stop);
- uatomic_set_false(&is_master);
if (zk_queue_init() != ZOK)
return -1;
diff --git a/sheep/group.c b/sheep/group.c
index 8b52198..ee7f37a 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -779,14 +779,22 @@ void sd_notify_handler(const struct sd_node *sender, void *data,
* Accept the joining node and pass the cluster info to it.
*
* Note that 'nodes' doesn't contain 'joining'.
+ *
+ * Return true if the joining node is accepted. At least one nodes in the
+ * cluster must call this function and succeed in accept of the joining node.
*/
-void sd_accept_handler(const struct sd_node *joining,
+bool sd_accept_handler(const struct sd_node *joining,
const struct sd_node *nodes, size_t nr_nodes,
void *opaque)
{
struct join_message *jm = opaque;
char str[MAX_NODE_STR_LEN];
+ if (nr_nodes > 0 && node_is_local(joining)) {
+ sd_dprintf("wait for another node to accept this node");
+ return false;
+ }
+
sd_dprintf("check %s, %d", node_to_str(joining), sys->status);
jm->proto_ver = SD_SHEEP_PROTO_VER;
@@ -802,6 +810,8 @@ void sd_accept_handler(const struct sd_node *joining,
joining->nid.port), jm->cluster_status);
jm->cinfo = sys->cinfo;
+
+ return true;
}
static int send_join_request(struct sd_node *ent)
diff --git a/shepherd/shepherd.c b/shepherd/shepherd.c
index 183107a..a7b4f73 100644
--- a/shepherd/shepherd.c
+++ b/shepherd/shepherd.c
@@ -62,19 +62,6 @@ struct sheep {
};
static LIST_HEAD(sheep_list_head);
-/*
- * nr_joined_sheep is a number of sheeps which is in state of
- * SHEEP_STATE_JOINED, not the length of sheep_list_head
- */
-static int nr_joined_sheep;
-
-/*
- * important invariant of shepherd: nr_joined_sheep ? !!master_sheep : true
- *
- * if there is at least one sheep which is in state of SHEEP_STATE_JOINED,
- * master sheep must be elected
- */
-static struct sheep *master_sheep;
static bool running;
static const char *progname;
@@ -122,9 +109,6 @@ static inline void remove_sheep(struct sheep *sheep)
sd_dprintf("remove_sheep() called, removing %s",
node_to_str(&sheep->node));
- if (sheep->state == SHEEP_STATE_JOINED)
- nr_joined_sheep--;
-
sheep->state = SHEEP_STATE_LEAVING;
ret = eventfd_write(remove_efd, 1);
if (ret < 0)
@@ -133,45 +117,6 @@ static inline void remove_sheep(struct sheep *sheep)
event_force_refresh();
}
-static int master_election(void)
-{
- int ret, nr_failed = 0;
- struct sheep *s;
- struct sph_msg msg;
-
- assert(!master_sheep);
-
- if (!nr_joined_sheep)
- return 0;
-
- list_for_each_entry(s, &sheep_list_head, sheep_list) {
- if (s->state != SHEEP_STATE_JOINED)
- continue;
-
- msg.type = SPH_SRV_MSG_MASTER_ELECTION;
- msg.body_len = 0;
-
- ret = xwrite(s->fd, &msg, sizeof(msg));
- if (sizeof(msg) != ret) {
- sd_eprintf("xwrite() for failed: %m");
- goto election_failed;
- }
-
- master_sheep = s;
- break;
-election_failed:
- remove_sheep(s);
- nr_failed++;
- }
-
- if (master_sheep) {
- sd_iprintf("new master elected: %s",
- node_to_str(&master_sheep->node));
- }
-
- return nr_failed;
-}
-
static int notify_remove_sheep(struct sheep *leaving)
{
int ret, failed = 0;
@@ -204,7 +149,6 @@ static void remove_handler(int fd, int events, void *data)
struct sheep *s;
int ret, failed = 0;
eventfd_t val;
- bool election = false;
ret = eventfd_read(remove_efd, &val);
if (ret < 0)
@@ -222,13 +166,6 @@ remove:
sd_printf(SDOG_DEBUG, "removing the node: %s",
node_to_str(&s->node));
- if (s == master_sheep) {
- sd_printf(SDOG_DEBUG, "removing the master");
-
- master_sheep = NULL;
- election = true;
- }
-
if (!is_sd_node_zero(&s->node))
/*
* This condition can be false when the sheep had
@@ -264,13 +201,6 @@ del:
goto remove;
end:
- if (election) {
- sd_dprintf("master is removed, electing new master");
- failed = master_election();
-
- assert(nr_joined_sheep ? !!master_sheep : true);
- }
-
sd_dprintf("nodes which failed during remove_handler(): %d", failed);
}
@@ -344,28 +274,23 @@ static void sph_handle_join(struct sph_msg *msg, struct sheep *sheep)
}
sheep->node = join->new_node;
+ join->nr_nodes = build_node_array(join->nodes);
snd.type = SPH_SRV_MSG_NEW_NODE;
snd.body_len = msg->body_len;
- if (!nr_joined_sheep) {
- /* this sheep is a new master */
- /* FIXME: is this master_elected need? */
- join->master_elected = true;
+ /* elect one node from the already joined nodes */
+ if (join->nr_nodes > 0) {
+ struct sd_node *n = join->nodes + rand() % join->nr_nodes;
+ fd = find_sheep_by_nid(&n->nid)->fd;
}
- assert(nr_joined_sheep ? !!master_sheep : true);
-
- wbytes = writev2(!nr_joined_sheep ? fd : master_sheep->fd,
- &snd, join, msg->body_len);
+ wbytes = writev2(fd, &snd, join, msg->body_len);
free(join);
if (sizeof(snd) + msg->body_len != wbytes) {
sd_eprintf("writev2() failed: %m");
- if (nr_joined_sheep)
- remove_sheep(master_sheep);
-
goto purge_current_sheep;
}
@@ -390,12 +315,6 @@ static void sph_handle_accept(struct sph_msg *msg, struct sheep *sheep)
struct sph_msg_join_reply *join_reply_body;
struct sph_msg_join_node_finish *join_node_finish;
- if (nr_joined_sheep && sheep != master_sheep) {
- sd_eprintf("sheep which is not a master replied "
- "SPH_CLI_MSG_NEW_NODE_REPLY");
- goto purge_current_sheep;
- }
-
sd_dprintf("new node reply from %s", node_to_str(&sheep->node));
join = xzalloc(msg->body_len);
@@ -410,16 +329,7 @@ static void sph_handle_accept(struct sph_msg *msg, struct sheep *sheep)
sd_dprintf("joining node is %s", node_to_str(&join->new_node));
joining_sheep = find_sheep_by_nid(&join->new_node.nid);
- if (!joining_sheep) {
- /* master is broken */
- sd_eprintf("invalid nid is required, %s",
- node_to_str(&join->new_node));
- sd_eprintf("purging master sheep: %s and joining one",
- node_to_str(&master_sheep->node));
-
- remove_sheep(master_sheep);
- goto purge_current_sheep;
- }
+ assert(joining_sheep != NULL);
opaque_len = msg->body_len - sizeof(struct sph_msg_join);
opaque = xzalloc(opaque_len);
@@ -449,7 +359,6 @@ static void sph_handle_accept(struct sph_msg *msg, struct sheep *sheep)
if (sizeof(snd) + snd.body_len != wbytes) {
sd_eprintf("writev2() to master failed: %m");
- remove_sheep(master_sheep);
goto purge_current_sheep;
}
@@ -483,17 +392,7 @@ static void sph_handle_accept(struct sph_msg *msg, struct sheep *sheep)
free(opaque);
joining_sheep->state = SHEEP_STATE_JOINED;
- nr_joined_sheep++;
-
- if (nr_joined_sheep == 1) {
- assert(!master_sheep);
- assert(joining_sheep == sheep);
- master_sheep = sheep;
-
- sd_iprintf("new master elected: %s",
- node_to_str(&sheep->node));
- }
state = SPH_STATE_DEFAULT;
removed += release_joining_sheep();
--
1.7.9.5
More information about the sheepdog
mailing list