[sheepdog] [PATCH v3 2/2] sheepkeeper: a new cluster manager specialized for sheepdog

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Fri Nov 16 09:25:59 CET 2012


At Wed, 14 Nov 2012 15:44:08 +0900,
Hitoshi Mitake wrote:
> 
> From: Hitoshi Mitake <h.mitake at gmail.com>
> 
> This patch adds sheepkeeper, a new cluster manager for sheepdog.
> 
> sheepkeeper cluster manager is designed as master-slave. So it can be
> SPOF. But I'm planning to let it be redundant by utilizing corosync.
> If it can be improved, this can be an alternative of the group
> management part of ZooKeeper, and it doesn't depend on JVM.
> 
> This patch adds the new directory sheepkeeper/ for it, and the
> executable binary will be produced as sheepkeeper/sheepkeeper.
> The cluster driver for sheep is added as sheep/cluster/sheepkeeper.c.
> 
> If you have some opinions or feature requests for a cluster manager
> designed and implemented from scratch, I'd like to hear your comments.
> 
> Signed-off-by: Hitoshi Mitake <mitake.hitoshi at lab.ntt.co.jp>
> ---
> 
> v2
>  * lots of cleaning and bug fix
>  * sane command line option handling
>  * a little bit improvement of leave handling
>  * not call sd_join_handler() in sheepkeeper_join()
>  * add new option --enable-sheepkeeper to configure script for
>    enable/disable building sheepkeeper
> 
> v3
>  * lots of cleaning and bug fix
>  * suppress some compiler warnings
>  * handle leave of sheeps in saner way. this also reduces the problem
>    of odd handling of return values from xread/xwrite
>  * default log of sheepkeeper is /var/log/sheepkeeper.c, and users can
>    specify custom location with -l option

I didn't notice that you wrote change logs here, sorry.  But it's
better to write them in the cover letter.


>  AM_CONDITIONAL(BUILD_ACCORD, test x$enable_accord = xyes)
>  
> +AC_ARG_ENABLE([sheepkeeper],
> +	[  --enable-sheepkeeper    : build sheepkeeper and its cluster driver ],,
> +	[ enable_sheepkeeper="no" ],)
> +AM_CONDITIONAL(BUILD_SHEEPKEEPER, test x$enable_sheepkeeper = xyes)
> +

Sheepkeeper doesn't require any external library, does it?  If that's
the case, I think we don't need this configure option.


> +
> +struct sk_msg {
> +	enum sk_msg_type type;
> +	int body_len;
> +};
> +
> +#include "internal_proto.h"
> +
> +struct sk_msg_join {
> +	struct sd_node node;
> +	bool leader_elected;
> +	char opaque[0];
> +};
> +
> +struct sk_msg_join_reply {
> +	struct sd_node nodes[SD_MAX_NODES];
> +	int nr_nodes;
> +	char opaque[0];
> +};
> +
> +struct sk_msg_join_node_finish {
> +	struct sd_node new_node;
> +
> +	struct sd_node nodes[SD_MAX_NODES];
> +	int nr_nodes;
> +	char opaque[0];
> +};
> +
> +struct sk_msg_notify_forward {
> +	struct sd_node from_node;
> +	char notify_msg[0];
> +};

These structures can be shared between different architectures, no?  If
so, please use the types defined in stdint.h and align them to 8-byte
offsets.

> +
> +static void read_msg(void)
> +{
> +	int i, j, ret;
> +	struct sk_msg rcv, snd;
> +	struct sd_node sender;
> +	struct iovec iov[2];
> +	struct sk_msg_join *join;
> +	struct sk_msg_notify_forward *notify_forward;
> +	struct sk_msg_join_node_finish *join_node_finish;
> +	struct join_message *jm;
> +
> +
> +	ret = xread(sk_comm_fd, &rcv, sizeof(rcv));
> +	if (ret != sizeof(rcv)) {
> +		vprintf(SDOG_ERR, "xread() failed\n");
> +		exit(1);
> +	}
> +
> +	switch (rcv.type) {
> +	case SK_MSG_NEW_NODE:
> +		join = xmalloc(rcv.body_len);
> +		ret = xread(sk_comm_fd, join, rcv.body_len);
> +		if (ret != rcv.body_len) {
> +			vprintf(SDOG_ERR, "xread() failed\n");
> +			exit(1);
> +		}
> +
> +		if (is_leader)
> +			sd_check_join_cb(&join->node, join->opaque);
> +
> +		memset(&snd, 0, sizeof(snd));
> +		snd.type = SK_MSG_NEW_NODE_REPLY;
> +		snd.body_len = rcv.body_len;
> +
> +		iov[0].iov_base = &snd;
> +		iov[0].iov_len = sizeof(snd);
> +
> +		iov[1].iov_base = join;
> +		iov[1].iov_len = rcv.body_len;
> +
> +		ret = writev(sk_comm_fd, iov, 2);
> +		if (ret != (iov[0].iov_len + iov[1].iov_len)) {
> +			vprintf(SDOG_ERR, "writev() failed\n");
> +			exit(1);
> +		}
> +
> +		break;
> +
> +	case SK_MSG_NEW_NODE_FINISH:
> +		join_node_finish = xmalloc(rcv.body_len);
> +		ret = xread(sk_comm_fd, join_node_finish, rcv.body_len);
> +		if (ret != rcv.body_len) {
> +			vprintf(SDOG_ERR, "xread() failed\n");
> +			exit(1);
> +		}
> +
> +		jm = (struct join_message *)join_node_finish->opaque;
> +		memcpy(nodes, join_node_finish->nodes,
> +			join_node_finish->nr_nodes * sizeof(struct sd_node));
> +		nr_nodes = join_node_finish->nr_nodes;
> +
> +		sd_join_handler(&join_node_finish->new_node, nodes, nr_nodes,
> +				CJ_RES_SUCCESS, jm);
> +
> +		break;
> +
> +	case SK_MSG_NOTIFY_FORWARD:
> +		notify_forward = xmalloc(rcv.body_len);
> +		ret = xread(sk_comm_fd, notify_forward, rcv.body_len);
> +		if (ret != rcv.body_len) {
> +			vprintf(SDOG_ERR, "xread() failed\n");
> +			exit(1);
> +		}
> +
> +		sd_notify_handler(&notify_forward->from_node,
> +				notify_forward->notify_msg,
> +				rcv.body_len - sizeof(*notify_forward));
> +
> +		snd.type = SK_MSG_NOTIFY_FORWARD_REPLY;

snd.body_len = 0 is missing?

> +		ret = xwrite(sk_comm_fd, &snd, sizeof(snd));
> +		if (ret != sizeof(snd)) {
> +			vprintf(SDOG_ERR, "xwrite() failed\n");
> +			exit(1);
> +		}
> +
> +		break;
> +
> +	case SK_MSG_BLOCK_FORWARD:
> +		ret = xread(sk_comm_fd, &sender, sizeof(struct sd_node));
> +		if (ret != sizeof(sender)) {
> +			vprintf(SDOG_ERR, "xread() failed\n");
> +			exit(1);
> +		}
> +
> +		snd.type = SK_MSG_BLOCK_FORWARD_REPLY;
> +		snd.body_len = 0;
> +		ret = xwrite(sk_comm_fd, &snd, sizeof(struct sk_msg));
> +		if (ret != sizeof(snd)) {
> +			vprintf(SDOG_ERR, "xwrite() failed\n");
> +			exit(1);
> +		}
> +
> +		break;
> +
> +	case SK_MSG_LEAVE_FORWARD:
> +		ret = xread(sk_comm_fd, &sender, sizeof(struct sd_node));
> +		if (ret != sizeof(sender)) {
> +			vprintf(SDOG_ERR, "xread() failed\n");
> +			exit(1);
> +		}
> +
> +		for (i = 0; i < nr_nodes; i++) {
> +			if (!memcmp(&sender, &nodes[i], sizeof(sender))) {
> +				nr_nodes--;
> +
> +				for (j = i; j < nr_nodes; j++)
> +					nodes[j] = nodes[j + 1];
> +
> +				goto removed;
> +			}
> +		}
> +
> +		vprintf(SDOG_INFO, "leave message from unknown node\n");
> +		exit(1);
> +
> +removed:
> +		sd_leave_handler(&sender, nodes, nr_nodes);
> +		break;
> +
> +	case SK_MSG_LEADER_ELECTION:
> +		is_leader = true;
> +		vprintf(SDOG_INFO, "became new leader\n");
> +
> +		snd.type = SK_MSG_LEADER_ELECTION_REPLY;
> +		snd.body_len = 0;
> +
> +		ret = xwrite(sk_comm_fd, &snd, sizeof(snd));
> +		if (-1 == ret) {
> +			vprintf(SDOG_ERR, "xwrite() failed: %m\n");
> +			exit(1);
> +		}
> +
> +		break;
> +
> +	default:
> +		vprintf(SDOG_ERR, "invalid message from sheepkeeper: %d, "
> +			"length: %d\n", rcv.type, rcv.body_len);
> +		exit(1);
> +		break;
> +	}
> +}
> +
> +static void read_msg_from_sheepkeeper(void)
> +{
> +	switch (state) {
> +	case state_pre_join:
> +		read_msg_pre_join();
> +		break;
> +
> +	case state_joined:
> +		read_msg();
> +		break;
> +
> +	default:
> +		panic("invalid state of sheepkeeper cluster driver: %d\n",
> +			state);
> +		break;
> +	};
> +}
> +
> +static void sheepkeeper_comm_handler(int fd, int events, void *data)
> +{
> +	assert(fd == sk_comm_fd);
> +	assert(data == NULL);
> +
> +	if (events & EPOLLIN)
> +		read_msg_from_sheepkeeper();
> +}
> +
> +static void init_addr_port(const char *s, struct sockaddr_in *addr)
> +{
> +	/* format: <address>[:<port>] */
> +	char *sep, *copied, *p;
> +	uint8_t tmp_addr[16];
> +	bool addr_only = false;
> +
> +	copied = xcalloc(strlen(s) + 1, sizeof(char));
> +	memcpy(copied, s, strlen(s));

copied = strdup(s) looks simpler.


> +
> +static void read_msg_from_sheep(struct sheep *sheep)
> +{
> +	int ret, fd = sheep->fd;
> +	struct sk_msg rcv, snd;
> +	struct sk_msg_join *join;
> +	struct iovec iov[2];
> +
> +	struct sheep *s, *s2;
> +
> +	struct sk_msg_notify_forward *notify_forward;
> +
> +	struct sk_msg join_reply;
> +	struct sk_msg_join_reply *reply_body;
> +
> +	struct sk_msg_join_node_finish *finish;
> +
> +	memset(&rcv, 0, sizeof(rcv));
> +
> +	ret = xread(fd, &rcv, sizeof(rcv));
> +	if (-1 == ret) {
> +		vprintf(SDOG_ERR, "xread() failed: %m\n");
> +		goto purge_current_sheep;
> +	}
> +
> +	switch (rcv.type) {
> +	case SK_MSG_JOIN:
> +		if (state == SK_STATE_JOINING) {
> +			memset(&snd, 0, sizeof(snd));
> +			snd.type = SK_MSG_JOIN_RETRY;
> +
> +			ret = xwrite(fd, &snd, sizeof(snd));
> +			if (-1 == ret) {
> +				vprintf(SDOG_ERR, "xwrite() failed: %m\n");
> +				goto purge_current_sheep;
> +			}
> +
> +			break;
> +		}
> +
> +		join = xmalloc(rcv.body_len);
> +		ret = xread(fd, join, rcv.body_len);
> +		if (-1 == ret) {
> +			vprintf(SDOG_ERR, "xread() for reading the body of" \
> +				" SK_MSG_JOIN failed: %m\n");
> +
> +			free(join);
> +			goto purge_current_sheep;
> +		}
> +
> +		sheep->node = join->node;
> +
> +		snd.type = SK_MSG_NEW_NODE;
> +		snd.body_len = rcv.body_len;
> +
> +		iov[0].iov_base = &snd;
> +		iov[0].iov_len = sizeof(struct sk_msg);
> +
> +		iov[1].iov_base = join;
> +		iov[1].iov_len = rcv.body_len;
> +
> +		if (!nr_joined_sheep) {
> +			/* this sheep is a new leader */
> +			join->leader_elected = true;
> +		}
> +		ret = writev(!nr_joined_sheep ?
> +			fd : leader_sheep->fd, iov, 2);
> +		free(join);
> +		if (ret != (iov[0].iov_len + iov[1].iov_len)) {
> +			vprintf(SDOG_ERR, "writev() for sending"	\
> +				" SK_MSG_NEW_NODE failed: %m\n");
> +			goto purge_current_sheep;
> +		}
> +
> +		state = SK_STATE_JOINING;
> +		break;
> +
> +	case SK_MSG_NEW_NODE_REPLY:
> +		if (nr_joined_sheep && sheep != leader_sheep) {
> +			vprintf(SDOG_ERR, "not leader sheep replied "	\
> +				"SK_MSG_NEW_NODE_REPLY\n");
> +			goto purge_current_sheep;
> +		}
> +
> +		vprintf(SDOG_INFO, "new node reply from %s\n",
> +			node_to_str(&sheep->node));
> +
> +		join = xmalloc(rcv.body_len);
> +		ret = xread(fd, join, rcv.body_len);
> +		if (-1 == ret) {
> +			vprintf(SDOG_ERR, "xread() failed: %m\n");
> +			free(join);
> +
> +			goto purge_current_sheep;
> +		}
> +
> +		s = find_sheep_by_nid(&join->node.nid);
> +		if (!s) {
> +			/* leader is broken */
> +			panic("invalid nid is required, %s\n",
> +				node_to_str(&join->node));
> +		}
> +
> +		s->kept_join_msg_len = rcv.body_len -
> +			sizeof(struct sk_msg_join);
> +		s->kept_join_msg = xmalloc(s->kept_join_msg_len);
> +		memcpy(s->kept_join_msg, join->opaque, s->kept_join_msg_len);
> +
> +		memset(&join_reply, 0, sizeof(struct sk_msg));
> +		join_reply.type = SK_MSG_JOIN_REPLY;
> +		join_reply.body_len = sizeof(struct sk_msg_join_reply)
> +			+ s->kept_join_msg_len;
> +		iov[0].iov_base = &join_reply;
> +		iov[0].iov_len = sizeof(struct sk_msg);
> +
> +		reply_body = xmalloc(join_reply.body_len);

Initialize with zero or use xzalloc.  Otherwise, uninitialized data
will be sent to another machine.

> +
> +		reply_body->nr_nodes = build_node_array(reply_body->nodes);
> +		memcpy(reply_body->opaque, s->kept_join_msg,
> +			s->kept_join_msg_len);
> +
> +		iov[1].iov_base = reply_body;
> +		iov[1].iov_len = join_reply.body_len;
> +
> +		ret = writev(s->fd, iov, 2);
> +		free(reply_body);
> +		if (ret != (iov[0].iov_len + iov[1].iov_len)) {
> +			vprintf(SDOG_ERR, "writev() failed: %m\n");
> +			goto purge_current_sheep;
> +		}
> +
> +		snd.type = SK_MSG_NEW_NODE_FINISH;
> +		snd.body_len = sizeof(*finish) + s->kept_join_msg_len;
> +
> +		finish = xmalloc(snd.body_len);

Same here.

Anyway, the current sheepkeeper is too unstable.  Can you send this
series once it passes the following test?

* setup sheepkeeper
 $ valgrind ./sheepkeeper/sheepkeeper -f

* run tests
 $ cd tests
 $ DRIVER=sheepkeeper:127.0.0.1 ./check -e -valgrind

Thanks,

Kazutaka



More information about the sheepdog mailing list