[sheepdog] [PATCH v3 2/2] sheepkeeper: a new cluster manager specialized for sheepdog

Hitoshi Mitake h.mitake at gmail.com
Sat Nov 17 13:56:48 CET 2012


At Fri, 16 Nov 2012 17:25:59 +0900,
MORITA Kazutaka wrote:
> 
> 
> >  AM_CONDITIONAL(BUILD_ACCORD, test x$enable_accord = xyes)
> >  
> > +AC_ARG_ENABLE([sheepkeeper],
> > +	[  --enable-sheepkeeper    : build sheepkeeper and its cluster driver ],,
> > +	[ enable_sheepkeeper="no" ],)
> > +AM_CONDITIONAL(BUILD_SHEEPKEEPER, test x$enable_sheepkeeper = xyes)
> > +
> 
> Sheepkeeper requires no external library, doesn't it?  If yes, I think
> we don't need this configure option.
>

Ah, you're right: the configuration option serves no purpose. I'll remove it in the next version.

> 
> 
> > +
> > +struct sk_msg {
> > +	enum sk_msg_type type;
> > +	int body_len;
> > +};
> > +
> > +#include "internal_proto.h"
> > +
> > +struct sk_msg_join {
> > +	struct sd_node node;
> > +	bool leader_elected;
> > +	char opaque[0];
> > +};
> > +
> > +struct sk_msg_join_reply {
> > +	struct sd_node nodes[SD_MAX_NODES];
> > +	int nr_nodes;
> > +	char opaque[0];
> > +};
> > +
> > +struct sk_msg_join_node_finish {
> > +	struct sd_node new_node;
> > +
> > +	struct sd_node nodes[SD_MAX_NODES];
> > +	int nr_nodes;
> > +	char opaque[0];
> > +};
> > +
> > +struct sk_msg_notify_forward {
> > +	struct sd_node from_node;
> > +	char notify_msg[0];
> > +};
> 
> These structure can be shared between different architecture, no?  If
> yes, please use types defined in stdint.h and aligned them to 8 bytes
> offset.
>

OK, I'll use the types defined in stdint.h in the next version.

> 
> > +
> > +static void read_msg(void)
> > +{
> > +	int i, j, ret;
> > +	struct sk_msg rcv, snd;
> > +	struct sd_node sender;
> > +	struct iovec iov[2];
> > +	struct sk_msg_join *join;
> > +	struct sk_msg_notify_forward *notify_forward;
> > +	struct sk_msg_join_node_finish *join_node_finish;
> > +	struct join_message *jm;
> > +
> > +
> > +	ret = xread(sk_comm_fd, &rcv, sizeof(rcv));
> > +	if (ret != sizeof(rcv)) {
> > +		vprintf(SDOG_ERR, "xread() failed\n");
> > +		exit(1);
> > +	}
> > +
> > +	switch (rcv.type) {
> > +	case SK_MSG_NEW_NODE:
> > +		join = xmalloc(rcv.body_len);
> > +		ret = xread(sk_comm_fd, join, rcv.body_len);
> > +		if (ret != rcv.body_len) {
> > +			vprintf(SDOG_ERR, "xread() failed\n");
> > +			exit(1);
> > +		}
> > +
> > +		if (is_leader)
> > +			sd_check_join_cb(&join->node, join->opaque);
> > +
> > +		memset(&snd, 0, sizeof(snd));
> > +		snd.type = SK_MSG_NEW_NODE_REPLY;
> > +		snd.body_len = rcv.body_len;
> > +
> > +		iov[0].iov_base = &snd;
> > +		iov[0].iov_len = sizeof(snd);
> > +
> > +		iov[1].iov_base = join;
> > +		iov[1].iov_len = rcv.body_len;
> > +
> > +		ret = writev(sk_comm_fd, iov, 2);
> > +		if (ret != (iov[0].iov_len + iov[1].iov_len)) {
> > +			vprintf(SDOG_ERR, "writev() failed\n");
> > +			exit(1);
> > +		}
> > +
> > +		break;
> > +
> > +	case SK_MSG_NEW_NODE_FINISH:
> > +		join_node_finish = xmalloc(rcv.body_len);
> > +		ret = xread(sk_comm_fd, join_node_finish, rcv.body_len);
> > +		if (ret != rcv.body_len) {
> > +			vprintf(SDOG_ERR, "xread() failed\n");
> > +			exit(1);
> > +		}
> > +
> > +		jm = (struct join_message *)join_node_finish->opaque;
> > +		memcpy(nodes, join_node_finish->nodes,
> > +			join_node_finish->nr_nodes * sizeof(struct sd_node));
> > +		nr_nodes = join_node_finish->nr_nodes;
> > +
> > +		sd_join_handler(&join_node_finish->new_node, nodes, nr_nodes,
> > +				CJ_RES_SUCCESS, jm);
> > +
> > +		break;
> > +
> > +	case SK_MSG_NOTIFY_FORWARD:
> > +		notify_forward = xmalloc(rcv.body_len);
> > +		ret = xread(sk_comm_fd, notify_forward, rcv.body_len);
> > +		if (ret != rcv.body_len) {
> > +			vprintf(SDOG_ERR, "xread() failed\n");
> > +			exit(1);
> > +		}
> > +
> > +		sd_notify_handler(&notify_forward->from_node,
> > +				notify_forward->notify_msg,
> > +				rcv.body_len - sizeof(*notify_forward));
> > +
> > +		snd.type = SK_MSG_NOTIFY_FORWARD_REPLY;
> 
> snd.body_len = 0 is missing?

SK_MSG_NOTIFY_FORWARD_REPLY has no body, so we don't have to set its
length.

> 
> > +		ret = xwrite(sk_comm_fd, &snd, sizeof(snd));
> > +		if (ret != sizeof(snd)) {
> > +			vprintf(SDOG_ERR, "xwrite() failed\n");
> > +			exit(1);
> > +		}
> > +
> > +		break;
> > +
> > +	case SK_MSG_BLOCK_FORWARD:
> > +		ret = xread(sk_comm_fd, &sender, sizeof(struct sd_node));
> > +		if (ret != sizeof(sender)) {
> > +			vprintf(SDOG_ERR, "xread() failed\n");
> > +			exit(1);
> > +		}
> > +
> > +		snd.type = SK_MSG_BLOCK_FORWARD_REPLY;
> > +		snd.body_len = 0;
> > +		ret = xwrite(sk_comm_fd, &snd, sizeof(struct sk_msg));
> > +		if (ret != sizeof(snd)) {
> > +			vprintf(SDOG_ERR, "xwrite() failed\n");
> > +			exit(1);
> > +		}
> > +
> > +		break;
> > +
> > +	case SK_MSG_LEAVE_FORWARD:
> > +		ret = xread(sk_comm_fd, &sender, sizeof(struct sd_node));
> > +		if (ret != sizeof(sender)) {
> > +			vprintf(SDOG_ERR, "xread() failed\n");
> > +			exit(1);
> > +		}
> > +
> > +		for (i = 0; i < nr_nodes; i++) {
> > +			if (!memcmp(&sender, &nodes[i], sizeof(sender))) {
> > +				nr_nodes--;
> > +
> > +				for (j = i; j < nr_nodes; j++)
> > +					nodes[j] = nodes[j + 1];
> > +
> > +				goto removed;
> > +			}
> > +		}
> > +
> > +		vprintf(SDOG_INFO, "leave message from unknown node\n");
> > +		exit(1);
> > +
> > +removed:
> > +		sd_leave_handler(&sender, nodes, nr_nodes);
> > +		break;
> > +
> > +	case SK_MSG_LEADER_ELECTION:
> > +		is_leader = true;
> > +		vprintf(SDOG_INFO, "became new leader\n");
> > +
> > +		snd.type = SK_MSG_LEADER_ELECTION_REPLY;
> > +		snd.body_len = 0;
> > +
> > +		ret = xwrite(sk_comm_fd, &snd, sizeof(snd));
> > +		if (-1 == ret) {
> > +			vprintf(SDOG_ERR, "xwrite() failed: %m\n");
> > +			exit(1);
> > +		}
> > +
> > +		break;
> > +
> > +	default:
> > +		vprintf(SDOG_ERR, "invalid message from sheepkeeper: %d, "
> > +			"length: %d\n", rcv.type, rcv.body_len);
> > +		exit(1);
> > +		break;
> > +	}
> > +}
> > +
> > +static void read_msg_from_sheepkeeper(void)
> > +{
> > +	switch (state) {
> > +	case state_pre_join:
> > +		read_msg_pre_join();
> > +		break;
> > +
> > +	case state_joined:
> > +		read_msg();
> > +		break;
> > +
> > +	default:
> > +		panic("invalid state of sheepkeeper cluster driver: %d\n",
> > +			state);
> > +		break;
> > +	};
> > +}
> > +
> > +static void sheepkeeper_comm_handler(int fd, int events, void *data)
> > +{
> > +	assert(fd == sk_comm_fd);
> > +	assert(data == NULL);
> > +
> > +	if (events & EPOLLIN)
> > +		read_msg_from_sheepkeeper();
> > +}
> > +
> > +static void init_addr_port(const char *s, struct sockaddr_in *addr)
> > +{
> > +	/* format: <address>[:<port>] */
> > +	char *sep, *copied, *p;
> > +	uint8_t tmp_addr[16];
> > +	bool addr_only = false;
> > +
> > +	copied = xcalloc(strlen(s) + 1, sizeof(char));
> > +	memcpy(copied, s, strlen(s));
> 
> copied = strdup(s) looks simpler.

Thanks for your advice; strdup() is more suitable for this
situation. I'll fix this in the next version.

> 
> 
> > +
> > +static void read_msg_from_sheep(struct sheep *sheep)
> > +{
> > +	int ret, fd = sheep->fd;
> > +	struct sk_msg rcv, snd;
> > +	struct sk_msg_join *join;
> > +	struct iovec iov[2];
> > +
> > +	struct sheep *s, *s2;
> > +
> > +	struct sk_msg_notify_forward *notify_forward;
> > +
> > +	struct sk_msg join_reply;
> > +	struct sk_msg_join_reply *reply_body;
> > +
> > +	struct sk_msg_join_node_finish *finish;
> > +
> > +	memset(&rcv, 0, sizeof(rcv));
> > +
> > +	ret = xread(fd, &rcv, sizeof(rcv));
> > +	if (-1 == ret) {
> > +		vprintf(SDOG_ERR, "xread() failed: %m\n");
> > +		goto purge_current_sheep;
> > +	}
> > +
> > +	switch (rcv.type) {
> > +	case SK_MSG_JOIN:
> > +		if (state == SK_STATE_JOINING) {
> > +			memset(&snd, 0, sizeof(snd));
> > +			snd.type = SK_MSG_JOIN_RETRY;
> > +
> > +			ret = xwrite(fd, &snd, sizeof(snd));
> > +			if (-1 == ret) {
> > +				vprintf(SDOG_ERR, "xwrite() failed: %m\n");
> > +				goto purge_current_sheep;
> > +			}
> > +
> > +			break;
> > +		}
> > +
> > +		join = xmalloc(rcv.body_len);
> > +		ret = xread(fd, join, rcv.body_len);
> > +		if (-1 == ret) {
> > +			vprintf(SDOG_ERR, "xread() for reading the body of" \
> > +				" SK_MSG_JOIN failed: %m\n");
> > +
> > +			free(join);
> > +			goto purge_current_sheep;
> > +		}
> > +
> > +		sheep->node = join->node;
> > +
> > +		snd.type = SK_MSG_NEW_NODE;
> > +		snd.body_len = rcv.body_len;
> > +
> > +		iov[0].iov_base = &snd;
> > +		iov[0].iov_len = sizeof(struct sk_msg);
> > +
> > +		iov[1].iov_base = join;
> > +		iov[1].iov_len = rcv.body_len;
> > +
> > +		if (!nr_joined_sheep) {
> > +			/* this sheep is a new leader */
> > +			join->leader_elected = true;
> > +		}
> > +		ret = writev(!nr_joined_sheep ?
> > +			fd : leader_sheep->fd, iov, 2);
> > +		free(join);
> > +		if (ret != (iov[0].iov_len + iov[1].iov_len)) {
> > +			vprintf(SDOG_ERR, "writev() for sending"	\
> > +				" SK_MSG_NEW_NODE failed: %m\n");
> > +			goto purge_current_sheep;
> > +		}
> > +
> > +		state = SK_STATE_JOINING;
> > +		break;
> > +
> > +	case SK_MSG_NEW_NODE_REPLY:
> > +		if (nr_joined_sheep && sheep != leader_sheep) {
> > +			vprintf(SDOG_ERR, "not leader sheep replied "	\
> > +				"SK_MSG_NEW_NODE_REPLY\n");
> > +			goto purge_current_sheep;
> > +		}
> > +
> > +		vprintf(SDOG_INFO, "new node reply from %s\n",
> > +			node_to_str(&sheep->node));
> > +
> > +		join = xmalloc(rcv.body_len);
> > +		ret = xread(fd, join, rcv.body_len);
> > +		if (-1 == ret) {
> > +			vprintf(SDOG_ERR, "xread() failed: %m\n");
> > +			free(join);
> > +
> > +			goto purge_current_sheep;
> > +		}
> > +
> > +		s = find_sheep_by_nid(&join->node.nid);
> > +		if (!s) {
> > +			/* leader is broken */
> > +			panic("invalid nid is required, %s\n",
> > +				node_to_str(&join->node));
> > +		}
> > +
> > +		s->kept_join_msg_len = rcv.body_len -
> > +			sizeof(struct sk_msg_join);
> > +		s->kept_join_msg = xmalloc(s->kept_join_msg_len);
> > +		memcpy(s->kept_join_msg, join->opaque, s->kept_join_msg_len);
> > +
> > +		memset(&join_reply, 0, sizeof(struct sk_msg));
> > +		join_reply.type = SK_MSG_JOIN_REPLY;
> > +		join_reply.body_len = sizeof(struct sk_msg_join_reply)
> > +			+ s->kept_join_msg_len;
> > +		iov[0].iov_base = &join_reply;
> > +		iov[0].iov_len = sizeof(struct sk_msg);
> > +
> > +		reply_body = xmalloc(join_reply.body_len);
> 
> Initialize with zero or use xzalloc.  Otherwise, uninitialized data
> will be sent to another machine.
> 
> > +
> > +		reply_body->nr_nodes = build_node_array(reply_body->nodes);
> > +		memcpy(reply_body->opaque, s->kept_join_msg,
> > +			s->kept_join_msg_len);
> > +
> > +		iov[1].iov_base = reply_body;
> > +		iov[1].iov_len = join_reply.body_len;
> > +
> > +		ret = writev(s->fd, iov, 2);
> > +		free(reply_body);
> > +		if (ret != (iov[0].iov_len + iov[1].iov_len)) {
> > +			vprintf(SDOG_ERR, "writev() failed: %m\n");
> > +			goto purge_current_sheep;
> > +		}
> > +
> > +		snd.type = SK_MSG_NEW_NODE_FINISH;
> > +		snd.body_len = sizeof(*finish) + s->kept_join_msg_len;
> > +
> > +		finish = xmalloc(snd.body_len);
> 
> Same here.

OK, I'll use xzalloc() for them in the next version.

> 
> Anyway, the current sheepkeeper is too unstable.  Can you send this
> series after you pass the following test?
> 
> * setup sheepkeeper
>  $ valgrind ./sheepkeeper/sheepkeeper -f
> 
> * run tests
>  $ cd tests
>  $ DRIVER=sheepkeeper:127.0.0.1 ./check -e -valgrind
> 

Sorry for my buggy implementation. I'll verify it with the above tests and
documentation before the next post.

Thanks for your review,
Hitoshi



More information about the sheepdog mailing list