[sheepdog] [PATCH] sheepkeeper: a new cluster manager specialized for sheepdog

MORITA Kazutaka morita.kazutaka at gmail.com
Sun Nov 11 20:57:17 CET 2012


> +	switch (rcv.type) {
> +	case SK_MSG_NEW_NODE:
> +		join = xmalloc(rcv.body_len);
> +		ret = xread(sk_comm_fd, join, rcv.body_len);
> +		if (ret != rcv.body_len) {
> +			vprintf(SDOG_ERR, "xread() failed\n");
> +			exit(1);
> +		}
> +
> +		if (is_master)
> +			sd_check_join_cb(&join->node, join->opaque);
> +
> +		bzero(&snd, sizeof(snd));

bzero is deprecated.  Please use memset.


> +static int sheepkeeper_init(const char *option)
> +{
> +	struct sockaddr_in addr;
> +
> +	sk_comm_fd = socket(AF_INET, SOCK_STREAM, 0);
> +	if (sk_comm_fd < 0) {
> +		vprintf(SDOG_INFO, "error at socket()\n");
> +		return -1;
> +	}
> +
> +	bzero(&addr, sizeof(struct sockaddr_in));

Use memset.

> +	addr.sin_family = AF_INET;
> +	addr.sin_port = SHEEPKEEPER_PORT;

The address and port of sheepkeeper need to be configurable.

> +
> +	if (connect(sk_comm_fd, &addr, sizeof(struct sockaddr_in))) {
> +		vprintf(SDOG_ERR, "cannot connect to sheepkeeper,"
> +			" is sheepkeeper running?\n");
> +		vprintf(SDOG_ERR, "errno: %s\n", strerror(errno));
> +		return -1;
> +	}
> +
> +	return 0;
> +}
> +
> +static int sheepkeeper_join(struct sd_node *myself,

Should be "const struct sd_node *myself".

> +		      void *opaque, size_t opaque_len)
> +{
> +	int ret, msg_join_len;
> +	struct iovec iov[2];
> +	struct sk_msg msg, rcv;
> +	struct sk_msg_join *msg_join;
> +	struct sk_msg_join_reply *join_reply;
> +
> +	msg_join_len = sizeof(struct sk_msg_join) + opaque_len;
> +
> +	bzero(&msg, sizeof(struct sk_msg));

Use memset.

> +	msg.type = SK_MSG_JOIN;
> +	msg.body_len = msg_join_len;
> +
> +	iov[0].iov_base = &msg;
> +	iov[0].iov_len = sizeof(struct sk_msg);
> +
> +	msg_join = calloc(opaque_len, sizeof(char));

The allocation size should be msg_join_len (sizeof(struct sk_msg_join) +
opaque_len), not opaque_len.

> +	this_node = *myself;
> +	msg_join->node = this_node;
> +	memcpy(msg_join->opaque, opaque, opaque_len);
> +
> +	iov[1].iov_base = msg_join;
> +	iov[1].iov_len = msg_join_len;
> +
> +retry:
> +	ret = writev(sk_comm_fd, iov, 2);
> +	if (ret != (iov[0].iov_len + iov[1].iov_len)) {
> +		vprintf(SDOG_ERR, "sheepkeeper_join() failed, %s\n",
> +			strerror(errno));
> +		return -1;
> +	}
> +
> +	ret = xread(sk_comm_fd, &rcv, sizeof(struct sk_msg));
> +	if (rcv.type == SK_MSG_JOIN_RETRY) {
> +		vprintf(SDOG_INFO, "join request is rejected, retrying\n");
> +		goto retry;
> +	} else if (rcv.type == SK_MSG_NEW_NODE) {
> +		struct sk_msg_join *join;
> +		int join_len;
> +
> +		join_len = msg.body_len;
> +		join = xcalloc(join_len, sizeof(char));
> +		ret = xread(sk_comm_fd, join, join_len);
> +
> +		ret = sd_check_join_cb(&join->node, join->opaque);
> +
> +		if (join->master_elected) {
> +			vprintf(SDOG_INFO, "elected as master\n");
> +			is_master = true;
> +		}
> +
> +		msg.type = SK_MSG_NEW_NODE_REPLY;
> +		msg.body_len = join_len;
> +
> +		iov[0].iov_base = &msg;
> +		iov[0].iov_len = sizeof(struct sk_msg);
> +
> +		iov[1].iov_base = join;
> +		iov[1].iov_len = join_len;
> +
> +		ret = writev(sk_comm_fd, iov, 2);
> +		if (ret != (iov[0].iov_len + iov[1].iov_len)) {
> +			vprintf(SDOG_ERR, "writev() failed\n");
> +			exit(1);
> +		}
> +
> +		ret = xread(sk_comm_fd, &rcv, sizeof(struct sk_msg));
> +		if (ret != sizeof(rcv)) {
> +			vprintf(SDOG_ERR, "invalid ret: %d\n", ret);
> +			exit(1);
> +		}
> +	}
> +
> +	if (rcv.type != SK_MSG_JOIN_REPLY) {
> +		vprintf(SDOG_INFO, "invalid message from sheepkeeper, "
> +			"received message: %d\n", rcv.type);
> +		exit(1);
> +	}
> +
> +	join_reply = xmalloc(rcv.body_len);
> +	ret = xread(sk_comm_fd, join_reply, rcv.body_len);
> +	if (ret != rcv.body_len) {
> +		vprintf(SDOG_ERR, "xread() failed\n");
> +		exit(1);
> +	}
> +
> +	vprintf(SDOG_INFO, "join reply arrived, nr_nodes: %d\n",
> +		join_reply->nr_nodes);
> +	sd_join_handler(&this_node, join_reply->nodes, join_reply->nr_nodes,
> +			CJ_RES_SUCCESS, join_reply->opaque);

The current code doesn't expect a cluster driver to call
sd_join_handler from within its join handler.  To fix the problem, you
need to initialize the work queues before calling create_cluster.

Alternatively, can we move all xread calls on sk_comm_fd into
read_msg_from_sheepkeeper?  Then I think we could remove the
sd_join_handler call from here.


> diff --git a/sheepkeeper/sheepkeeper.c b/sheepkeeper/sheepkeeper.c
> new file mode 100644
> index 0000000..3249958
> --- /dev/null
> +++ b/sheepkeeper/sheepkeeper.c
> @@ -0,0 +1,530 @@
> +
> +#include <stdio.h>
> +#include <stdlib.h>
> +#include <string.h>
> +#include <stdbool.h>
> +#include <stdint.h>
> +#include <errno.h>
> +#include <assert.h>
> +#include <getopt.h>
> +
> +#include <unistd.h>
> +#include <sys/socket.h>
> +#include <sys/types.h>
> +#include <sys/epoll.h>
> +
> +#include <sys/un.h>
> +#include <netinet/in.h>
> +
> +#include "net.h"
> +#include "event.h"
> +#include "list.h"
> +#include "internal_proto.h"
> +#include "sheepkeeper.h"
> +
> +static bool daemonized;
> +
> +static inline char *node_to_str(struct sd_node *id)
> +{
> +	static char str[256];
> +	char name[256];
> +	int af = AF_INET6;
> +	uint8_t *addr = id->nid.addr;
> +
> +	/* Find address family type */
> +	if (addr[12]) {
> +		int  oct_no = 0;
> +		while (!addr[oct_no] && oct_no++ < 12)
> +			;
> +		if (oct_no == 12)
> +			af = AF_INET;
> +	}
> +
> +	snprintf(str, sizeof(str), "%s ip:%s port:%d",
> +		(af == AF_INET) ? "IPv4" : "IPv6",
> +		addr_to_str(name, sizeof(name), id->nid.addr, 0), id->nid.port);
> +
> +	return str;
> +}

How about moving node_to_str from sheep/cluster.h to
include/internal_proto.h rather than redefining it?

> +
> +#define die(fmt, ...) do {						\
> +		if (!daemonized) {					\
> +			fprintf(stderr, "%s, %d (errno: %s): " fmt,	\
> +				__FILE__, __LINE__, strerror(errno),	\
> +				## __VA_ARGS__);			\
> +		} else {						\
> +			syslog(LOG_ERR, "%s, %d (errno: %s): " fmt,	\
> +				__FILE__, __LINE__, strerror(errno),	\
> +				## __VA_ARGS__);			\
> +		}							\
> +									\
> +		exit(1);						\
> +	} while (0)

Can we use panic()?


> +
> +#define log(fmt, ...) do {						\
> +		if (!daemonized) {					\
> +			fprintf(stdout, "%s, %d: " fmt,			\
> +				__FILE__, __LINE__,			\
> +				## __VA_ARGS__);			\
> +		} else {						\
> +			syslog(LOG_INFO, "%s, %d: " fmt,		\
> +				__FILE__, __LINE__,			\
> +				## __VA_ARGS__);			\
> +		}							\
> +	} while (0)

Can we use eprintf?

> +
> +#define debug_printf(fmt, ...) do {					\
> +		if (!daemonized) {					\
> +			fprintf(stdout, "%s, %d (errno: %s): " fmt,	\
> +				__FILE__, __LINE__, strerror(errno),	\
> +				## __VA_ARGS__);			\
> +		} else {						\
> +			syslog(LOG_DEBUG, "%s, %d (errno: %s): " fmt,	\
> +				__FILE__, __LINE__, strerror(errno),	\
> +				## __VA_ARGS__);			\
> +		}							\
> +	} while (0)

Can we use dprintf?

> +
> +static void *_xcalloc(size_t nmemb, size_t size)
> +{
> +	void *ret;
> +
> +	ret = calloc(nmemb, size);
> +	if (!ret)
> +		die("error at calloc()\n");
> +
> +	return ret;
> +}

Can we use xcalloc in util.c?


> +	case SK_MSG_NEW_NODE_REPLY:
> +		log("new node reply from %s\n", node_to_str(&sheep->node));
> +		join = _xcalloc(rcv.body_len, sizeof(char));
> +		ret = xread(fd, join, rcv.body_len);
> +		if (ret != rcv.body_len)
> +			die("xread() failed, ret: %d\n", ret);
> +
> +		s = find_sheep_by_nid(&join->node.nid);
> +		if (!s) {
> +			die("invalid nid is required, %s\n",
> +				node_to_str(&join->node));
> +		}
> +
> +		if (!sheep->master)
> +			die("no master replied new node reply\n");
> +
> +		s->kept_join_msg_len = rcv.body_len -
> +			sizeof(struct sk_msg_join);
> +		s->kept_join_msg = _xcalloc(s->kept_join_msg_len, sizeof(char));
> +		memcpy(s->kept_join_msg, join->opaque, s->kept_join_msg_len);
> +
> +		log("reserved opaque, length: %d\n", s->kept_join_msg_len);
> +
> +		bzero(&join_reply, sizeof(struct sk_msg));

Use memset.


> +
> +static void sheep_comm_handler(int fd, int events, void *data)
> +{
> +	if (events & EPOLLIN)
> +		read_msg_from_sheep(data);
> +}

Should we handle EPOLLERR and EPOLLHUP, too?


> +static struct option const long_options[] = {
> +	{ "port", required_argument, NULL, 'p' },
> +	{ "address", required_argument, NULL, 'a' },
> +	{ "not-daemonize", no_argument, NULL, 'n' },

Use "foreground" and 'f', as sheep does.

The last element of long_options has to be filled with zeros so that
getopt_long can detect the end of the array.


> +
> +	/* setup inet socket for communication with sheeps */
> +	sheep_listen_fd = socket(AF_INET, SOCK_STREAM, 0);
> +	if (sheep_listen_fd < 0)
> +		die("socket() failed\n");

Turn on SO_REUSEADDR to allow us to reuse the port immediately.


Thanks,

Kazutaka



More information about the sheepdog mailing list