[sheepdog] [PATCH v3] sheep: remove master node
Liu Yuan
namei.unix at gmail.com
Tue Jul 23 06:44:15 CEST 2013
On Fri, Jul 19, 2013 at 01:01:01PM +0900, MORITA Kazutaka wrote:
> From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
>
> The current procedure for handling a sheep join is as follows.
>
> 1. The joining node sends a join request.
> 2. The master node accepts the request.
> 3. All the nodes update cluster members.
>
> This procedure has some problems:
>
> - The master election is too complex to maintain.
> It is very difficult to make sure that the implementation is
> correct.
>
> - The master node can fail while it is accepting the joining node.
> The newly elected master has to take over the process, but this is
> usually difficult to implement because we have to know what the
> previous master had and had not done before its failure.
>
> This patch changes the sheep join procedure to the following.
>
> 1. The joining node sends a join request.
> 2. Some of the existing nodes accept the request.
> 3. All the nodes update cluster members.
>
> Multiple nodes are allowed to call sd_join_handler() for the same
> join request, but at least one node must do so. With this change,
> we can eliminate the master, and node failure while accepting a
> node join is also tolerated.
>
> Removing the master from the zookeeper driver is not easy since it
> doesn't expect multiple nodes to send EVENT_ACCEPT. I'll leave this
> for another day.
>
> Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
> ---
>
> v3:
> - remove the zk driver
>
> v2:
> - rebase onto the master branch
>
>
> include/shepherd.h | 3 --
> sheep/cluster.h | 2 +-
> sheep/cluster/corosync.c | 60 +++---------------------
> sheep/cluster/local.c | 38 +++++++--------
> sheep/cluster/shepherd.c | 43 +++++------------
> sheep/group.c | 12 ++++-
> shepherd/shepherd.c | 115 +++-------------------------------------------
> 7 files changed, 54 insertions(+), 219 deletions(-)
>
> diff --git a/include/shepherd.h b/include/shepherd.h
> index 9a4cf81..95b4b8b 100644
> --- a/include/shepherd.h
> +++ b/include/shepherd.h
> @@ -23,7 +23,6 @@ enum sph_srv_msg_type {
> SPH_SRV_MSG_LEAVE_FORWARD,
>
> SPH_SRV_MSG_REMOVE,
> - SPH_SRV_MSG_MASTER_ELECTION,
> };
>
> struct sph_msg {
> @@ -39,7 +38,6 @@ struct sph_msg {
>
> struct sph_msg_join {
> struct sd_node new_node;
> - uint8_t master_elected;
>
> struct sd_node nodes[SD_MAX_NODES];
> uint32_t nr_nodes;
> @@ -116,7 +114,6 @@ static inline const char *sph_srv_msg_to_str(enum sph_srv_msg_type msg)
> { SPH_SRV_MSG_NOTIFY_FORWARD, "SPH_SRV_MSG_NOTIFY_FORWARD" },
> { SPH_SRV_MSG_BLOCK_FORWARD, "SPH_SRV_MSG_BLOCK_FORWARD" },
> { SPH_SRV_MSG_REMOVE, "SPH_SRV_MSG_REMOVE" },
> - { SPH_SRV_MSG_MASTER_ELECTION, "SPH_SRV_MSG_MASTER_ELECTION" },
> };
>
> for (i = 0; i < ARRAY_SIZE(msgs); i++) {
> diff --git a/sheep/cluster.h b/sheep/cluster.h
> index 2ec0a14..354896e 100644
> --- a/sheep/cluster.h
> +++ b/sheep/cluster.h
> @@ -171,7 +171,7 @@ void sd_notify_handler(const struct sd_node *sender, void *msg, size_t msg_len);
> bool sd_block_handler(const struct sd_node *sender);
> int sd_reconnect_handler(void);
> void sd_update_node_handler(struct sd_node *);
> -void sd_join_handler(const struct sd_node *joining,
> +bool sd_join_handler(const struct sd_node *joining,
> const struct sd_node *nodes, size_t nr_nodes,
> void *opaque);
> void recalculate_vnodes(struct sd_node *nodes, int nr_nodes);
> diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
> index 577d0cb..ef403bf 100644
> --- a/sheep/cluster/corosync.c
> +++ b/sheep/cluster/corosync.c
> @@ -24,7 +24,6 @@
> struct cpg_node {
> uint32_t nodeid;
> uint32_t pid;
> - uint32_t gone;
> struct sd_node node;
> };
>
> @@ -238,28 +237,6 @@ find_event(enum corosync_event_type type, struct cpg_node *sender)
> return find_nonblock_event(type, sender);
> }
>
> -static int is_master(struct cpg_node *node)
> -{
> - int i;
> - struct cpg_node *n = node;
> - if (!n)
> - n = &this_node;
> - if (nr_cpg_nodes == 0)
> - /* this node should be the first cpg node */
> - return 0;
> -
> - for (i = 0; i < SD_MAX_NODES; i++) {
> - if (!cpg_nodes[i].gone)
> - goto eq_check;
> - }
> - return -1;
> -
> -eq_check:
> - if (cpg_node_equal(&cpg_nodes[i], n))
> - return i;
> - return -1;
> -}
> -
> static void build_node_list(const struct cpg_node *nodes, size_t nr_nodes,
> struct sd_node *entries)
> {
> @@ -282,9 +259,6 @@ static bool __corosync_dispatch_one(struct corosync_event *cevent)
>
> switch (cevent->type) {
> case COROSYNC_EVENT_TYPE_JOIN:
> - if (is_master(&this_node) < 0)
> - return false;
> -
> if (!cevent->msg)
> /* we haven't receive JOIN yet */
> return false;
> @@ -294,13 +268,14 @@ static bool __corosync_dispatch_one(struct corosync_event *cevent)
> return false;
>
> build_node_list(cpg_nodes, nr_cpg_nodes, entries);
> - sd_join_handler(&cevent->sender.node, entries, nr_cpg_nodes,
> - cevent->msg);
> - send_message(COROSYNC_MSG_TYPE_ACCEPT, &cevent->sender,
> - cpg_nodes, nr_cpg_nodes, cevent->msg,
> - cevent->msg_len);
> + if (sd_join_handler(&cevent->sender.node, entries,
> + nr_cpg_nodes, cevent->msg)) {
> + send_message(COROSYNC_MSG_TYPE_ACCEPT, &cevent->sender,
> + cpg_nodes, nr_cpg_nodes, cevent->msg,
> + cevent->msg_len);
>
> - cevent->callbacked = true;
> + cevent->callbacked = true;
> + }
> return false;
> case COROSYNC_EVENT_TYPE_ACCEPT:
> add_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender);
> @@ -469,7 +444,6 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,
> {
> struct corosync_event *cevent;
> struct corosync_message *cmsg = msg;
> - int master;
>
> sd_dprintf("%d", cmsg->type);
>
> @@ -521,16 +495,6 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,
> case COROSYNC_MSG_TYPE_LEAVE:
> cevent = xzalloc(sizeof(*cevent));
> cevent->type = COROSYNC_EVENT_TYPE_LEAVE;
> -
> - master = is_master(&cmsg->sender);
> - if (master >= 0)
> - /*
> - * Master is down before new nodes finish joining.
> - * We have to revoke its mastership to avoid cluster
> - * hanging
> - */
> - cpg_nodes[master].gone = 1;
> -
> cevent->sender = cmsg->sender;
> cevent->msg_len = cmsg->msg_len;
> if (cmsg->msg_len) {
> @@ -614,7 +578,6 @@ static void cdrv_cpg_confchg(cpg_handle_t handle,
>
> /* dispatch leave_handler */
> for (i = 0; i < left_list_entries; i++) {
> - int master;
> cevent = find_event(COROSYNC_EVENT_TYPE_JOIN, left_sheep + i);
> if (cevent) {
> /* the node left before joining */
> @@ -633,15 +596,6 @@ static void cdrv_cpg_confchg(cpg_handle_t handle,
> }
>
> cevent = xzalloc(sizeof(*cevent));
> - master = is_master(&left_sheep[i]);
> - if (master >= 0)
> - /*
> - * Master is down before new nodes finish joining.
> - * We have to revoke its mastership to avoid cluster
> - * hanging
> - */
> - cpg_nodes[master].gone = 1;
> -
> cevent->type = COROSYNC_EVENT_TYPE_LEAVE;
> cevent->sender = left_sheep[i];
>
> diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
> index 9d109ca..9135ab7 100644
> --- a/sheep/cluster/local.c
> +++ b/sheep/cluster/local.c
> @@ -408,37 +408,33 @@ static bool local_process_event(void)
> if (ev->callbacked)
> return false; /* wait for unblock event */
>
> - if (ev->type == EVENT_ACCEPT && lnode_eq(&this_node, &ev->sender)) {
> - sd_dprintf("join Sheepdog");
> - joined = true;
> - }
> -
> if (!joined) {
> - if (ev->type == EVENT_JOIN &&
> - lnode_eq(&this_node, &ev->sender)) {
> - struct local_node lnodes[SD_MAX_NODES];
> -
> - get_nodes(lnodes);
> + if (!lnode_eq(&this_node, &ev->sender))
> + goto out;
>
> - if (!lnode_eq(&this_node, &lnodes[0])) {
> - sd_dprintf("wait for another node"
> - " to accept this node");
> - return false;
> - }
> - } else
> + switch (ev->type) {
> + case EVENT_JOIN:
> + break;
> + case EVENT_ACCEPT:
> + sd_dprintf("join Sheepdog");
> + joined = true;
> + break;
> + default:
> goto out;
> + }
> }
>
> switch (ev->type) {
> case EVENT_JOIN:
> /* nodes[nr_nodes - 1] is a sender, so don't include it */
> assert(node_eq(&ev->sender.node, &nodes[nr_nodes - 1]));
> - sd_join_handler(&ev->sender.node, nodes, nr_nodes - 1,
> - ev->buf);
> - ev->type = EVENT_ACCEPT;
> - msync(ev, sizeof(*ev), MS_SYNC);
> + if (sd_join_handler(&ev->sender.node, nodes, nr_nodes - 1,
> + ev->buf)) {
> + ev->type = EVENT_ACCEPT;
> + msync(ev, sizeof(*ev), MS_SYNC);
>
> - shm_queue_notify();
> + shm_queue_notify();
> + }
>
> return false;
> case EVENT_ACCEPT:
> diff --git a/sheep/cluster/shepherd.c b/sheep/cluster/shepherd.c
> index 49fcf8d..b2ab92d 100644
> --- a/sheep/cluster/shepherd.c
> +++ b/sheep/cluster/shepherd.c
> @@ -30,7 +30,6 @@
> static int sph_comm_fd;
>
> static struct sd_node this_node;
> -static bool is_master;
>
> static int nr_nodes;
> static struct sd_node nodes[SD_MAX_NODES];
> @@ -114,11 +113,8 @@ retry:
> * FIXME: member change events must be ordered with nonblocked
> * events
> */
> - sd_join_handler(&join->new_node, NULL, 0, join->opaque);
> -
> - /* FIXME: join->master_elected is needed? */
> - assert(join->master_elected);
> - is_master = true;
> + if (!sd_join_handler(&join->new_node, NULL, 0, join->opaque))
> + panic("sd_accept_handler() failed");
>
> snd.type = SPH_CLI_MSG_ACCEPT;
> snd.body_len = join_len;
> @@ -297,26 +293,12 @@ static void sph_event_handler(int fd, int events, void *data)
> ;
> }
>
> -static void elected_as_master(void)
> -{
> - sd_dprintf("elected_as_master() called");
> -
> - is_master = true;
> - sd_iprintf("became new master");
> -}
> -
> static void msg_new_node(struct sph_msg *rcv)
> {
> int ret;
> struct sph_msg_join *join;
> struct sph_msg snd;
>
> - if (!is_master) {
> - sd_printf(SDOG_EMERG, "I am not a master but received"
> - " SPH_MSG_NEW_NODE, shepherd is buggy");
> - exit(1);
> - }
> -
> join = xzalloc(rcv->body_len);
> ret = xread(sph_comm_fd, join, rcv->body_len);
> if (ret != rcv->body_len) {
> @@ -325,7 +307,13 @@ static void msg_new_node(struct sph_msg *rcv)
> }
>
> /* FIXME: member change events must be ordered with nonblocked events */
> - sd_join_handler(&join->new_node, nodes, nr_nodes, join->opaque);
> + if (!sd_join_handler(&join->new_node, join->nodes, join->nr_nodes,
> + join->opaque))
> + /*
> + * This should always succeed because shepherd should have sent
> + * SPH_SRV_MSG_NEW_NODE only to an already-joined node.
> + */
> + panic("sd_join_handler() failed");
>
> memset(&snd, 0, sizeof(snd));
> snd.type = SPH_CLI_MSG_ACCEPT;
> @@ -342,7 +330,6 @@ static void msg_new_node(struct sph_msg *rcv)
> static void msg_new_node_finish(struct sph_msg *rcv)
> {
> int ret;
> - struct join_message *jm;
> struct sph_msg_join_node_finish *join_node_finish;
>
> join_node_finish = xzalloc(rcv->body_len);
> @@ -352,7 +339,6 @@ static void msg_new_node_finish(struct sph_msg *rcv)
> exit(1);
> }
>
> - jm = (struct join_message *)join_node_finish->opaque;
> memcpy(nodes, join_node_finish->nodes,
> join_node_finish->nr_nodes * sizeof(struct sd_node));
> nr_nodes = join_node_finish->nr_nodes;
> @@ -361,7 +347,8 @@ static void msg_new_node_finish(struct sph_msg *rcv)
> node_to_str(&join_node_finish->new_node));
>
> /* FIXME: member change events must be ordered with nonblocked events */
> - sd_accept_handler(&join_node_finish->new_node, nodes, nr_nodes, jm);
> + sd_accept_handler(&join_node_finish->new_node, nodes, nr_nodes,
> + join_node_finish->opaque);
>
> free(join_node_finish);
> }
> @@ -441,11 +428,6 @@ static void msg_leave_forward(struct sph_msg *rcv)
> do_leave_sheep();
> }
>
> -static void msg_master_election(struct sph_msg *rcv)
> -{
> - elected_as_master();
> -}
> -
> static void (*msg_handlers[])(struct sph_msg *) = {
> [SPH_SRV_MSG_NEW_NODE] = msg_new_node,
> [SPH_SRV_MSG_NEW_NODE_FINISH] = msg_new_node_finish,
> @@ -453,7 +435,6 @@ static void (*msg_handlers[])(struct sph_msg *) = {
> [SPH_SRV_MSG_BLOCK_FORWARD] = msg_block_forward,
> [SPH_SRV_MSG_REMOVE] = msg_remove,
> [SPH_SRV_MSG_LEAVE_FORWARD] = msg_leave_forward,
> - [SPH_SRV_MSG_MASTER_ELECTION] = msg_master_election,
> };
>
> static void interpret_msg(struct sph_msg *rcv)
> @@ -593,8 +574,6 @@ static int shepherd_leave(void)
> exit(1);
> }
>
> - is_master = false;
> -
> sd_dprintf("shepherd_leave() is completed");
>
> return 0;
> diff --git a/sheep/group.c b/sheep/group.c
> index 4a2a83b..054292d 100644
> --- a/sheep/group.c
> +++ b/sheep/group.c
> @@ -779,14 +779,22 @@ void sd_notify_handler(const struct sd_node *sender, void *data,
> * Accept the joining node and pass the cluster info to it.
> *
> * Note that 'nodes' doesn't contain 'joining'.
> + *
> + * Return true if the joining node is accepted. At least one node in the
> + * cluster must call this function and succeed in accepting the joining node.
> */
> -void sd_join_handler(const struct sd_node *joining,
> +bool sd_join_handler(const struct sd_node *joining,
> const struct sd_node *nodes, size_t nr_nodes,
> void *opaque)
> {
> struct join_message *jm = opaque;
> char str[MAX_NODE_STR_LEN];
>
> + if (nr_nodes > 0 && node_is_local(joining)) {
> + sd_dprintf("wait for another node to accept this node");
> + return false;
> + }
> +
Is there any case where nr_nodes == 0? If not, we don't need this check.
Thanks
Yuan
More information about the sheepdog
mailing list