[sheepdog] [PATCH v3 2/2] sheepkeeper: a new cluster manager specialized for sheepdog

Hitoshi Mitake mitake.hitoshi at lab.ntt.co.jp
Wed Nov 14 07:44:08 CET 2012


From: Hitoshi Mitake <h.mitake at gmail.com>

This patch adds sheepkeeper, a new cluster manager for sheepdog.

The sheepkeeper cluster manager uses a master-slave design, so it can be
a SPOF. But I'm planning to make it redundant by utilizing corosync.
If it can be improved that way, it can be an alternative to the group
management part of ZooKeeper, and it doesn't depend on the JVM.

This patch adds the new directory sheepkeeper/ for it, and the
executable binary will be produced as sheepkeeper/sheepkeeper.
The cluster driver for sheep is added as sheep/cluster/sheepkeeper.c.

If you have some opinions or feature requests for a cluster manager
designed and implemented from scratch, I'd like to hear your comments.

Signed-off-by: Hitoshi Mitake <mitake.hitoshi at lab.ntt.co.jp>
---

v2
 * lots of cleaning and bug fix
 * sane command line option handling
 * a little bit improvement of leave handling
 * not call sd_join_handler() in sheepkeeper_join()
 * add new option --enable-sheepkeeper to configure script for
   enable/disable building sheepkeeper

v3
 * lots of cleaning and bug fix
 * suppress some compiler warnings
 * handle leaving of sheep in a saner way. this also reduces the problem
   of odd handling of return values from xread/xwrite
 * default log of sheepkeeper is /var/log/sheepkeeper.log, and users can
   specify a custom location with the -l option
8<---
 .gitignore                  |    1 +
 Makefile.am                 |    3 +
 configure.ac                |    8 +-
 include/sheepkeeper.h       |   64 ++++
 sheep/Makefile.am           |    3 +
 sheep/cluster/sheepkeeper.c |  543 +++++++++++++++++++++++++++++++
 sheepkeeper/Makefile.am     |   44 +++
 sheepkeeper/sheepkeeper.c   |  736 +++++++++++++++++++++++++++++++++++++++++++
 8 files changed, 1401 insertions(+), 1 deletions(-)
 create mode 100644 include/sheepkeeper.h
 create mode 100644 sheep/cluster/sheepkeeper.c
 create mode 100644 sheepkeeper/Makefile.am
 create mode 100644 sheepkeeper/sheepkeeper.c

diff --git a/.gitignore b/.gitignore
index dbdbd55..ffc9003 100644
--- a/.gitignore
+++ b/.gitignore
@@ -31,6 +31,7 @@ GSYMS
 collie/collie
 sheep/sheep
 sheepfs/sheepfs
+sheepkeeper/sheepkeeper
 
 # directories
 .deps
diff --git a/Makefile.am b/Makefile.am
index 53d18b9..1ed8072 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -19,6 +19,9 @@ sheepdogsysconf_DATA	=
 
 SUBDIRS			= lib collie sheep include script man
 
+if BUILD_SHEEPKEEPER
+SUBDIRS			+= sheepkeeper
+endif
 if BUILD_SHEEPFS
 SUBDIRS			+= sheepfs
 endif
diff --git a/configure.ac b/configure.ac
index 373c03f..fac12c3 100644
--- a/configure.ac
+++ b/configure.ac
@@ -133,7 +133,8 @@ AC_CONFIG_FILES([Makefile
 		include/Makefile
 		script/Makefile
 		lib/Makefile
-		man/Makefile])
+		man/Makefile
+		sheepkeeper/Makefile])
 
 ### Local business
 
@@ -197,6 +198,11 @@ AC_ARG_ENABLE([accord],
 	[ enable_accord="no" ],)
 AM_CONDITIONAL(BUILD_ACCORD, test x$enable_accord = xyes)
 
+AC_ARG_ENABLE([sheepkeeper],
+	[  --enable-sheepkeeper    : build sheepkeeper and its cluster driver ],,
+	[ enable_sheepkeeper="no" ],)
+AM_CONDITIONAL(BUILD_SHEEPKEEPER, test x$enable_sheepkeeper = xyes)
+
 AC_ARG_WITH([initddir],
 	[  --with-initddir=DIR     : path to init script directory. ],
 	[ INITDDIR="$withval" ],
diff --git a/include/sheepkeeper.h b/include/sheepkeeper.h
new file mode 100644
index 0000000..ccdbbfd
--- /dev/null
+++ b/include/sheepkeeper.h
@@ -0,0 +1,64 @@
+#ifndef SHEEPKEEPER_H
+#define SHEEPKEEPER_H
+
+/*
+ * Types of the messages exchanged between sheep (the cluster driver) and
+ * the sheepkeeper daemon.  Each message starts with a struct sk_msg header
+ * whose type field holds one of these values.
+ */
+enum sk_msg_type {
+	/* join: a sheep asks sheepkeeper to add it to the cluster */
+	SK_MSG_JOIN = 1,		/* sheep -> sheepkeeper */
+	SK_MSG_JOIN_REPLY,		/* sheepkeeper -> joining sheep */
+	SK_MSG_JOIN_RETRY,		/* sheepkeeper -> sheep, a join is in progress */
+
+	/* join check: sheepkeeper asks a sheep to check a join request */
+	SK_MSG_NEW_NODE,		/* sheepkeeper -> leader (or first sheep) */
+	SK_MSG_NEW_NODE_REPLY,		/* sheep -> sheepkeeper */
+	SK_MSG_NEW_NODE_FINISH,		/* sheepkeeper -> the other sheep */
+
+	/* notification broadcast */
+	SK_MSG_NOTIFY,			/* sheep -> sheepkeeper */
+	SK_MSG_NOTIFY_FORWARD,		/* sheepkeeper -> every sheep */
+	SK_MSG_NOTIFY_FORWARD_REPLY,	/* sheep -> sheepkeeper */
+	SK_MSG_NOTIFY_FINISH,		/* sheepkeeper -> notifying sheep */
+
+	/* block request */
+	SK_MSG_BLOCK,			/* sheep -> sheepkeeper */
+	SK_MSG_BLOCK_FORWARD,		/* sheepkeeper -> the other sheep */
+	SK_MSG_BLOCK_FORWARD_REPLY,	/* sheep -> sheepkeeper */
+	SK_MSG_BLOCK_FINISH,		/* sheepkeeper -> blocking sheep */
+
+	/* leave */
+	SK_MSG_LEAVE,			/* sheep -> sheepkeeper */
+	SK_MSG_LEAVE_FORWARD,		/* sheepkeeper -> remaining sheep */
+
+	/* leader election, started by sheepkeeper when the leader is removed */
+	SK_MSG_LEADER_ELECTION,		/* sheepkeeper -> candidate sheep */
+	SK_MSG_LEADER_ELECTION_REPLY,	/* sheep -> sheepkeeper */
+};
+
+/* Fixed-size header that is sent in front of every message. */
+struct sk_msg {
+	enum sk_msg_type type;
+	int body_len;		/* length in bytes of the body after the header */
+};
+
+#include "internal_proto.h"
+
+/* Body of SK_MSG_JOIN, SK_MSG_NEW_NODE and SK_MSG_NEW_NODE_REPLY. */
+struct sk_msg_join {
+	struct sd_node node;	/* the node that wants to join */
+	bool leader_elected;	/* set by sheepkeeper when no sheep has joined yet */
+	char opaque[0];		/* join payload, passed through without inspection */
+};
+
+/* Body of SK_MSG_JOIN_REPLY, sent to the sheep whose join was checked. */
+struct sk_msg_join_reply {
+	struct sd_node nodes[SD_MAX_NODES];	/* nodes known to sheepkeeper */
+	int nr_nodes;				/* number of valid entries in nodes[] */
+	char opaque[0];				/* join payload from SK_MSG_NEW_NODE_REPLY */
+};
+
+/* Body of SK_MSG_NEW_NODE_FINISH, sent to the sheep other than the new one. */
+struct sk_msg_join_node_finish {
+	struct sd_node new_node;		/* the node that has just joined */
+
+	struct sd_node nodes[SD_MAX_NODES];	/* nodes known to sheepkeeper */
+	int nr_nodes;				/* number of valid entries in nodes[] */
+	char opaque[0];				/* join payload from SK_MSG_NEW_NODE_REPLY */
+};
+
+/* Body of SK_MSG_NOTIFY_FORWARD. */
+struct sk_msg_notify_forward {
+	struct sd_node from_node;	/* the sheep that sent SK_MSG_NOTIFY */
+	char notify_msg[0];		/* the body of that SK_MSG_NOTIFY, unchanged */
+};
+
+#define SHEEPKEEPER_PORT 2501
+
+#endif	/* SHEEPKEEPER_H */
diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index e7b4f53..3e22e13 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -38,6 +38,9 @@ endif
 if BUILD_ACCORD
 sheep_SOURCES		+= cluster/accord.c
 endif
+if BUILD_SHEEPKEEPER
+sheep_SOURCES		+= cluster/sheepkeeper.c
+endif
 
 sheep_SOURCES		+= farm/sha1_file.c farm/trunk.c farm/snap.c farm/farm.c
 
diff --git a/sheep/cluster/sheepkeeper.c b/sheep/cluster/sheepkeeper.c
new file mode 100644
index 0000000..ca5f1da
--- /dev/null
+++ b/sheep/cluster/sheepkeeper.c
@@ -0,0 +1,543 @@
+/*
+ * Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <stdio.h>
+#include <stdint.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <sys/epoll.h>
+
+#include "cluster.h"
+#include "event.h"
+#include "sheepkeeper.h"
+#include "internal_proto.h"
+
+/* socket connected to the sheepkeeper daemon */
+static int sk_comm_fd;
+
+/* copy of the node passed to sheepkeeper_join() */
+static struct sd_node this_node;
+/* true once sheepkeeper has told this sheep that it is the leader */
+static bool is_leader;
+
+/* member list as last reported by sheepkeeper */
+static int nr_nodes;
+static struct sd_node nodes[SD_MAX_NODES];
+
+/* selects which reader handles incoming data on sk_comm_fd */
+enum sk_driver_state {
+	state_pre_join,		/* join request sent, reply not processed yet */
+	state_joined,		/* SK_MSG_JOIN_REPLY has been processed */
+};
+
+static enum sk_driver_state state = state_pre_join;
+
+/*
+ * Handle a message from sheepkeeper while this sheep is still waiting for
+ * the result of its join request.
+ *
+ * Two sequences are accepted:
+ *  - SK_MSG_JOIN_REPLY alone;
+ *  - SK_MSG_NEW_NODE followed by SK_MSG_JOIN_REPLY: sheepkeeper forwarded
+ *    our own join request to us, so it is checked here and sent back as
+ *    SK_MSG_NEW_NODE_REPLY.  When leader_elected is set this sheep becomes
+ *    the leader.
+ * SK_MSG_JOIN_RETRY makes the function wait for the next message.
+ *
+ * On success the member list is stored in nodes[]/nr_nodes and the driver
+ * moves to state_joined.  Protocol and I/O errors terminate the process.
+ */
+static void read_msg_pre_join(void)
+{
+	int ret;
+	struct sk_msg msg, rcv;
+	struct sk_msg_join_reply *join_reply;
+	struct iovec iov[2];
+
+retry:
+	ret = xread(sk_comm_fd, &rcv, sizeof(rcv));
+	if (ret != sizeof(rcv))
+		panic("invalid message from sheepkeeper: %m\n");
+
+	if (rcv.type == SK_MSG_JOIN_RETRY) {
+		/*
+		 * NOTE(review): the join request is not sent again here, the
+		 * code only waits for another message -- confirm this is the
+		 * intended retry protocol.
+		 */
+		vprintf(SDOG_INFO, "join request is rejected, retrying\n");
+		goto retry;
+	} else if (rcv.type == SK_MSG_NEW_NODE) {
+		struct sk_msg_join *join;
+		int join_len;
+
+		join_len = rcv.body_len;
+		if (join_len < (int)sizeof(*join)) {
+			vprintf(SDOG_ERR, "invalid body length: %d\n",
+				join_len);
+			exit(1);
+		}
+
+		join = xcalloc(join_len, sizeof(char));
+		ret = xread(sk_comm_fd, join, join_len);
+		if (ret != join_len) {
+			vprintf(SDOG_ERR, "xread() failed\n");
+			exit(1);
+		}
+
+		/* the same buffer is sent back after the check */
+		sd_check_join_cb(&join->node, join->opaque);
+
+		if (join->leader_elected) {
+			vprintf(SDOG_INFO, "elected as leader\n");
+			is_leader = true;
+		}
+
+		memset(&msg, 0, sizeof(msg));
+		msg.type = SK_MSG_NEW_NODE_REPLY;
+		msg.body_len = join_len;
+
+		iov[0].iov_base = &msg;
+		iov[0].iov_len = sizeof(struct sk_msg);
+
+		iov[1].iov_base = join;
+		iov[1].iov_len = join_len;
+
+		ret = writev(sk_comm_fd, iov, 2);
+		free(join);	/* not referenced after the reply is written */
+		if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+			vprintf(SDOG_ERR, "writev() failed\n");
+			exit(1);
+		}
+
+		ret = xread(sk_comm_fd, &rcv, sizeof(struct sk_msg));
+		if (ret != sizeof(rcv)) {
+			vprintf(SDOG_ERR, "invalid ret: %d\n", ret);
+			exit(1);
+		}
+	}
+
+	if (rcv.type != SK_MSG_JOIN_REPLY) {
+		vprintf(SDOG_INFO, "invalid message from sheepkeeper, "
+			"received message: %d\n", rcv.type);
+		exit(1);
+	}
+
+	if (rcv.body_len < (int)sizeof(*join_reply)) {
+		vprintf(SDOG_ERR, "invalid body length: %d\n", rcv.body_len);
+		exit(1);
+	}
+
+	join_reply = xmalloc(rcv.body_len);
+	ret = xread(sk_comm_fd, join_reply, rcv.body_len);
+	if (ret != rcv.body_len) {
+		vprintf(SDOG_ERR, "xread() failed\n");
+		exit(1);
+	}
+
+	/* nr_nodes comes from the network and bounds the memcpy() below */
+	if (join_reply->nr_nodes < 0 || join_reply->nr_nodes > SD_MAX_NODES) {
+		vprintf(SDOG_ERR, "invalid number of nodes: %d\n",
+			join_reply->nr_nodes);
+		exit(1);
+	}
+
+	vprintf(SDOG_INFO, "join reply arrived, nr_nodes: %d\n",
+		join_reply->nr_nodes);
+	sd_join_handler(&this_node, join_reply->nodes, join_reply->nr_nodes,
+			CJ_RES_SUCCESS, join_reply->opaque);
+
+	memcpy(nodes, join_reply->nodes,
+		join_reply->nr_nodes * sizeof(struct sd_node));
+	nr_nodes = join_reply->nr_nodes;
+
+	/*
+	 * NOTE(review): join_reply is not freed because it is not visible
+	 * here whether sd_join_handler() keeps a pointer into it -- confirm
+	 * and free it if the handler copies its arguments.
+	 */
+
+	vprintf(SDOG_INFO, "sheepkeeper_join() succeed\n");
+	state = state_joined;
+}
+
+/*
+ * Handle one message from sheepkeeper after this sheep has joined.
+ *
+ *  - SK_MSG_NEW_NODE: echo the join request back as SK_MSG_NEW_NODE_REPLY,
+ *    after running sd_check_join_cb() on it when this sheep is the leader.
+ *  - SK_MSG_NEW_NODE_FINISH: replace the member list and call
+ *    sd_join_handler() for the new node.
+ *  - SK_MSG_NOTIFY_FORWARD: call sd_notify_handler() and acknowledge.
+ *  - SK_MSG_BLOCK_FORWARD: read the sender and acknowledge.
+ *  - SK_MSG_LEAVE_FORWARD: drop the sender from the member list and call
+ *    sd_leave_handler().
+ *  - SK_MSG_LEADER_ELECTION: become the leader and acknowledge.
+ *
+ * Protocol and I/O errors terminate the process.
+ */
+static void read_msg(void)
+{
+	int i, j, ret;
+	struct sk_msg rcv, snd;
+	struct sd_node sender;
+	struct iovec iov[2];
+	struct sk_msg_join *join;
+	struct sk_msg_notify_forward *notify_forward;
+	struct sk_msg_join_node_finish *join_node_finish;
+	struct join_message *jm;
+
+	/* every reply below is built in snd; never send indeterminate bytes */
+	memset(&snd, 0, sizeof(snd));
+
+	ret = xread(sk_comm_fd, &rcv, sizeof(rcv));
+	if (ret != sizeof(rcv)) {
+		vprintf(SDOG_ERR, "xread() failed\n");
+		exit(1);
+	}
+
+	switch (rcv.type) {
+	case SK_MSG_NEW_NODE:
+		if (rcv.body_len < (int)sizeof(*join)) {
+			vprintf(SDOG_ERR, "invalid body length: %d\n",
+				rcv.body_len);
+			exit(1);
+		}
+
+		join = xmalloc(rcv.body_len);
+		ret = xread(sk_comm_fd, join, rcv.body_len);
+		if (ret != rcv.body_len) {
+			vprintf(SDOG_ERR, "xread() failed\n");
+			exit(1);
+		}
+
+		if (is_leader)
+			sd_check_join_cb(&join->node, join->opaque);
+
+		snd.type = SK_MSG_NEW_NODE_REPLY;
+		snd.body_len = rcv.body_len;
+
+		iov[0].iov_base = &snd;
+		iov[0].iov_len = sizeof(snd);
+
+		iov[1].iov_base = join;
+		iov[1].iov_len = rcv.body_len;
+
+		ret = writev(sk_comm_fd, iov, 2);
+		free(join);	/* not referenced after the reply is written */
+		if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+			vprintf(SDOG_ERR, "writev() failed\n");
+			exit(1);
+		}
+
+		break;
+
+	case SK_MSG_NEW_NODE_FINISH:
+		if (rcv.body_len < (int)sizeof(*join_node_finish)) {
+			vprintf(SDOG_ERR, "invalid body length: %d\n",
+				rcv.body_len);
+			exit(1);
+		}
+
+		join_node_finish = xmalloc(rcv.body_len);
+		ret = xread(sk_comm_fd, join_node_finish, rcv.body_len);
+		if (ret != rcv.body_len) {
+			vprintf(SDOG_ERR, "xread() failed\n");
+			exit(1);
+		}
+
+		/* nr_nodes comes from the network and bounds the memcpy() */
+		if (join_node_finish->nr_nodes < 0 ||
+		    join_node_finish->nr_nodes > SD_MAX_NODES) {
+			vprintf(SDOG_ERR, "invalid number of nodes: %d\n",
+				join_node_finish->nr_nodes);
+			exit(1);
+		}
+
+		jm = (struct join_message *)join_node_finish->opaque;
+		memcpy(nodes, join_node_finish->nodes,
+			join_node_finish->nr_nodes * sizeof(struct sd_node));
+		nr_nodes = join_node_finish->nr_nodes;
+
+		/*
+		 * NOTE(review): join_node_finish is not freed because it is
+		 * not visible here whether sd_join_handler() keeps a pointer
+		 * into it -- confirm.
+		 */
+		sd_join_handler(&join_node_finish->new_node, nodes, nr_nodes,
+				CJ_RES_SUCCESS, jm);
+
+		break;
+
+	case SK_MSG_NOTIFY_FORWARD:
+		if (rcv.body_len < (int)sizeof(*notify_forward)) {
+			vprintf(SDOG_ERR, "invalid body length: %d\n",
+				rcv.body_len);
+			exit(1);
+		}
+
+		notify_forward = xmalloc(rcv.body_len);
+		ret = xread(sk_comm_fd, notify_forward, rcv.body_len);
+		if (ret != rcv.body_len) {
+			vprintf(SDOG_ERR, "xread() failed\n");
+			exit(1);
+		}
+
+		/*
+		 * NOTE(review): notify_forward is not freed because it is
+		 * not visible here whether sd_notify_handler() keeps the
+		 * message pointer -- confirm.
+		 */
+		sd_notify_handler(&notify_forward->from_node,
+				notify_forward->notify_msg,
+				rcv.body_len - sizeof(*notify_forward));
+
+		snd.type = SK_MSG_NOTIFY_FORWARD_REPLY;
+		snd.body_len = 0;
+		ret = xwrite(sk_comm_fd, &snd, sizeof(snd));
+		if (ret != sizeof(snd)) {
+			vprintf(SDOG_ERR, "xwrite() failed\n");
+			exit(1);
+		}
+
+		break;
+
+	case SK_MSG_BLOCK_FORWARD:
+		ret = xread(sk_comm_fd, &sender, sizeof(struct sd_node));
+		if (ret != sizeof(sender)) {
+			vprintf(SDOG_ERR, "xread() failed\n");
+			exit(1);
+		}
+
+		snd.type = SK_MSG_BLOCK_FORWARD_REPLY;
+		snd.body_len = 0;
+		ret = xwrite(sk_comm_fd, &snd, sizeof(struct sk_msg));
+		if (ret != sizeof(snd)) {
+			vprintf(SDOG_ERR, "xwrite() failed\n");
+			exit(1);
+		}
+
+		break;
+
+	case SK_MSG_LEAVE_FORWARD:
+		/* the body is always one struct sd_node, body_len is not used */
+		ret = xread(sk_comm_fd, &sender, sizeof(struct sd_node));
+		if (ret != sizeof(sender)) {
+			vprintf(SDOG_ERR, "xread() failed\n");
+			exit(1);
+		}
+
+		for (i = 0; i < nr_nodes; i++) {
+			if (!memcmp(&sender, &nodes[i], sizeof(sender))) {
+				nr_nodes--;
+
+				/* close the gap left by the removed entry */
+				for (j = i; j < nr_nodes; j++)
+					nodes[j] = nodes[j + 1];
+
+				goto removed;
+			}
+		}
+
+		vprintf(SDOG_INFO, "leave message from unknown node\n");
+		exit(1);
+
+removed:
+		sd_leave_handler(&sender, nodes, nr_nodes);
+		break;
+
+	case SK_MSG_LEADER_ELECTION:
+		is_leader = true;
+		vprintf(SDOG_INFO, "became new leader\n");
+
+		snd.type = SK_MSG_LEADER_ELECTION_REPLY;
+		snd.body_len = 0;
+
+		ret = xwrite(sk_comm_fd, &snd, sizeof(snd));
+		if (ret != sizeof(snd)) {
+			vprintf(SDOG_ERR, "xwrite() failed: %m\n");
+			exit(1);
+		}
+
+		break;
+
+	default:
+		vprintf(SDOG_ERR, "invalid message from sheepkeeper: %d, "
+			"length: %d\n", rcv.type, rcv.body_len);
+		exit(1);
+		break;
+	}
+}
+
+/*
+ * Pass incoming data from sheepkeeper to the reader that matches the
+ * current driver state.
+ */
+static void read_msg_from_sheepkeeper(void)
+{
+	if (state == state_pre_join) {
+		read_msg_pre_join();
+		return;
+	}
+
+	if (state == state_joined) {
+		read_msg();
+		return;
+	}
+
+	panic("invalid state of sheepkeeper cluster driver: %d\n",
+		state);
+}
+
+/*
+ * Event callback registered for sk_comm_fd by sheepkeeper_join().
+ * Only readable events are acted upon.
+ */
+static void sheepkeeper_comm_handler(int fd, int events, void *data)
+{
+	assert(fd == sk_comm_fd);
+	assert(data == NULL);
+
+	if (!(events & EPOLLIN))
+		return;
+
+	read_msg_from_sheepkeeper();
+}
+
+/*
+ * Parse "<address>[:<port>]" into *addr.
+ *
+ * s:    option string, an IPv4 address optionally followed by ':' and a
+ *       decimal port in the range 1-65535.
+ * addr: filled with AF_INET, the parsed address and the port
+ *       (SHEEPKEEPER_PORT when no port is given).
+ *
+ * The process exits when the string cannot be parsed.
+ */
+static void init_addr_port(const char *s, struct sockaddr_in *addr)
+{
+	/* format: <address>[:<port>] */
+	char *sep, *copied, *p;
+	uint8_t tmp_addr[16];
+	long port_num = SHEEPKEEPER_PORT;
+
+	/* work on a copy, the separator is overwritten below */
+	copied = xcalloc(strlen(s) + 1, sizeof(char));
+	memcpy(copied, s, strlen(s));
+
+	sep = strchr(copied, ':');
+	if (sep) {
+		/* neither the address nor the port part may be empty */
+		if (sep == copied || *(sep + 1) == '\0')
+			goto invalid_format;
+
+		*sep = '\0';
+
+		errno = 0;
+		port_num = strtol(sep + 1, &p, 10);
+		if (sep + 1 == p || *p != '\0' || errno == ERANGE)
+			goto invalid_format;
+		/* reject values that would be truncated by sin_port */
+		if (port_num < 1 || port_num > 65535)
+			goto invalid_format;
+	}
+
+	memset(addr, 0, sizeof(*addr));
+	addr->sin_family = AF_INET;
+
+	if (!str_to_addr(AF_INET, copied, tmp_addr))
+		goto invalid_format;
+
+	/* the IPv4 address is taken from the last 4 bytes of the buffer */
+	memcpy(&addr->sin_addr, &tmp_addr[12], 4);
+
+	/*
+	 * NOTE(review): sin_port is expected in network byte order, but the
+	 * port is stored without htons() here and in the sheepkeeper daemon.
+	 * Both sides must be converted together, so it is left as is.
+	 */
+	addr->sin_port = port_num;
+
+	free(copied);
+	return;
+
+invalid_format:
+	vprintf(SDOG_ERR, "invalid option for host and port: %s\n", s);
+	exit(1);
+}
+
+/*
+ * Cluster driver init: connect to the sheepkeeper daemon.
+ *
+ * option: "<address>[:<port>]" of the daemon.  It is required because
+ *         there is no default address to connect to.
+ *
+ * Returns 0 on success and -1 on failure.  The process exits when option
+ * cannot be parsed (see init_addr_port()).
+ */
+static int sheepkeeper_init(const char *option)
+{
+	struct sockaddr_in addr;
+
+	if (!option) {
+		/* without an option addr would be used uninitialized */
+		vprintf(SDOG_ERR, "sheepkeeper cluster driver requires an"
+			" option: <address>[:<port>]\n");
+		return -1;
+	}
+
+	init_addr_port(option, &addr);
+
+	sk_comm_fd = socket(AF_INET, SOCK_STREAM, 0);
+	if (sk_comm_fd < 0) {
+		vprintf(SDOG_INFO, "error at socket()\n");
+		return -1;
+	}
+
+	if (connect(sk_comm_fd, (struct sockaddr *)&addr, sizeof(addr))) {
+		vprintf(SDOG_ERR, "cannot connect to sheepkeeper,"
+			" is sheepkeeper running?\n");
+		vprintf(SDOG_ERR, "errno: %s\n", strerror(errno));
+		close(sk_comm_fd);
+		return -1;
+	}
+
+	return 0;
+}
+
+/*
+ * Cluster driver join: send SK_MSG_JOIN to sheepkeeper and start
+ * listening for its messages.
+ *
+ * myself:     this node, remembered in this_node.
+ * opaque:     join payload, sent after the node in the message body.
+ * opaque_len: length of opaque in bytes.
+ *
+ * Returns 0 when the request was sent and the event handler registered,
+ * -1 otherwise.  The result of the join arrives later through
+ * sheepkeeper_comm_handler().
+ */
+static int sheepkeeper_join(const struct sd_node *myself,
+		      void *opaque, size_t opaque_len)
+{
+	int ret, msg_join_len;
+	struct iovec iov[2];
+	struct sk_msg msg;
+	struct sk_msg_join *msg_join;
+
+	msg_join_len = sizeof(struct sk_msg_join) + opaque_len;
+
+	memset(&msg, 0, sizeof(struct sk_msg));
+	msg.type = SK_MSG_JOIN;
+	msg.body_len = msg_join_len;
+
+	iov[0].iov_base = &msg;
+	iov[0].iov_len = sizeof(struct sk_msg);
+
+	/* xcalloc() as in the rest of this file: no NULL to dereference */
+	msg_join = xcalloc(msg_join_len, sizeof(char));
+	this_node = *myself;
+	msg_join->node = this_node;
+	memcpy(msg_join->opaque, opaque, opaque_len);
+
+	iov[1].iov_base = msg_join;
+	iov[1].iov_len = msg_join_len;
+
+	ret = writev(sk_comm_fd, iov, 2);
+	free(msg_join);		/* only needed for the write above */
+	if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+		vprintf(SDOG_ERR, "sheepkeeper_join() failed, %s\n",
+			strerror(errno));
+		return -1;
+	}
+
+	if (register_event(sk_comm_fd, sheepkeeper_comm_handler, NULL)) {
+		vprintf(SDOG_ERR, "register_event() failed\n");
+		return -1;
+	}
+
+	return 0;
+}
+
+/*
+ * Cluster driver leave: tell sheepkeeper that this sheep leaves, then stop
+ * watching and close the connection.  Exits when the message cannot be
+ * written; returns 0 otherwise.
+ */
+static int sheepkeeper_leave(void)
+{
+	struct sk_msg leave_msg;
+	int written;
+
+	leave_msg.body_len = 0;
+	leave_msg.type = SK_MSG_LEAVE;
+
+	written = xwrite(sk_comm_fd, &leave_msg, sizeof(leave_msg));
+	if (written != sizeof(leave_msg)) {
+		vprintf(SDOG_INFO, "xwrite() failed\n");
+		exit(1);
+	}
+
+	unregister_event(sk_comm_fd);
+	close(sk_comm_fd);
+
+	return 0;
+}
+
+/*
+ * Cluster driver notify: broadcast msg through sheepkeeper and wait until
+ * the broadcast is finished.
+ *
+ * The exchange is synchronous on sk_comm_fd: send SK_MSG_NOTIFY, read the
+ * SK_MSG_NOTIFY_FORWARD that sheepkeeper sends to every sheep, deliver it
+ * with sd_notify_handler(), acknowledge it, then read
+ * SK_MSG_NOTIFY_FINISH.
+ *
+ * msg/msg_len: notification payload.
+ * Returns 0.  Protocol and I/O errors terminate the process.
+ */
+static int sheepkeeper_notify(void *msg, size_t msg_len)
+{
+	int ret;
+	struct sk_msg snd, rcv;
+	struct iovec iov[2];
+
+	struct sk_msg_notify_forward *notify_forward;
+
+	vprintf(SDOG_INFO, "sheepkeeper_notify() called\n");
+
+	memset(&snd, 0, sizeof(snd));
+	snd.type = SK_MSG_NOTIFY;
+	snd.body_len = msg_len;
+
+	iov[0].iov_base = &snd;
+	iov[0].iov_len = sizeof(struct sk_msg);
+
+	iov[1].iov_base = msg;
+	iov[1].iov_len = msg_len;
+
+	ret = writev(sk_comm_fd, iov, 2);
+	if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+		vprintf(SDOG_ERR, "writev() failed\n");
+		exit(1);
+	}
+
+	/*
+	 * NOTE(review): any other message that sheepkeeper sends before the
+	 * forward is treated as a protocol error here -- confirm that the
+	 * daemon cannot interleave messages at this point.
+	 */
+	ret = xread(sk_comm_fd, &rcv, sizeof(rcv));
+	if (ret != sizeof(rcv)) {
+		vprintf(SDOG_ERR, "xread() failed\n");
+		exit(1);
+	}
+
+	if (rcv.type != SK_MSG_NOTIFY_FORWARD) {
+		vprintf(SDOG_ERR, "invalid type: %d\n", rcv.type);
+		exit(1);
+	}
+
+	if (rcv.body_len < (int)sizeof(*notify_forward)) {
+		vprintf(SDOG_ERR, "invalid body length: %d\n", rcv.body_len);
+		exit(1);
+	}
+
+	notify_forward = xmalloc(rcv.body_len);
+	ret = xread(sk_comm_fd, notify_forward, rcv.body_len);
+	if (ret != rcv.body_len) {
+		vprintf(SDOG_INFO, "xread() failed, ret: %d\n", ret);
+		exit(1);
+	}
+
+	/*
+	 * NOTE(review): notify_forward is not freed because it is not
+	 * visible here whether sd_notify_handler() keeps the message
+	 * pointer -- confirm.
+	 */
+	sd_notify_handler(&notify_forward->from_node,
+			notify_forward->notify_msg,
+			rcv.body_len - sizeof(*notify_forward));
+
+	/* the acknowledgement has no body */
+	snd.type = SK_MSG_NOTIFY_FORWARD_REPLY;
+	snd.body_len = 0;
+	ret = xwrite(sk_comm_fd, &snd, sizeof(snd));
+	if (ret != sizeof(snd)) {
+		vprintf(SDOG_ERR, "xwrite() failed\n");
+		exit(1);
+	}
+
+	ret = xread(sk_comm_fd, &rcv, sizeof(rcv));
+	if (ret != sizeof(rcv)) {
+		vprintf(SDOG_ERR, "xread() failed\n");
+		exit(1);
+	}
+
+	if (rcv.type != SK_MSG_NOTIFY_FINISH) {
+		vprintf(SDOG_ERR, "invalid type: %d\n", rcv.type);
+		exit(1);
+	}
+
+	return 0;
+}
+
+/*
+ * Cluster driver block: send SK_MSG_BLOCK, wait for SK_MSG_BLOCK_FINISH
+ * and then call sd_block_handler() for this node.  Protocol and I/O errors
+ * terminate the process.
+ */
+static void sheepkeeper_block(void)
+{
+	struct sk_msg hdr;
+	int nbytes;
+
+	hdr.body_len = 0;
+	hdr.type = SK_MSG_BLOCK;
+
+	nbytes = xwrite(sk_comm_fd, &hdr, sizeof(hdr));
+	if (nbytes != sizeof(hdr)) {
+		vprintf(SDOG_ERR, "xwrite() failed\n");
+		exit(1);
+	}
+
+	/* the same header buffer receives the answer */
+	nbytes = xread(sk_comm_fd, &hdr, sizeof(hdr));
+	if (nbytes != sizeof(hdr)) {
+		vprintf(SDOG_ERR, "xread() failed\n");
+		exit(1);
+	}
+
+	if (hdr.type != SK_MSG_BLOCK_FINISH) {
+		vprintf(SDOG_ERR, "invalid message from sheepkeeper\n");
+		exit(1);
+	}
+
+	sd_block_handler(&this_node);
+}
+
+/* Cluster driver unblock: implemented as a plain notification. */
+static void sheepkeeper_unblock(void *msg, size_t msg_len)
+{
+	(void)sheepkeeper_notify(msg, msg_len);
+}
+
+/* Operations of the "sheepkeeper" cluster driver. */
+static struct cluster_driver cdrv_sheepkeeper = {
+	.name		= "sheepkeeper",
+
+	.init		= sheepkeeper_init,
+	.join		= sheepkeeper_join,
+	.leave		= sheepkeeper_leave,
+	.notify		= sheepkeeper_notify,
+	.block		= sheepkeeper_block,
+	.unblock	= sheepkeeper_unblock,
+};
+
+cdrv_register(cdrv_sheepkeeper);
diff --git a/sheepkeeper/Makefile.am b/sheepkeeper/Makefile.am
new file mode 100644
index 0000000..4e32aa3
--- /dev/null
+++ b/sheepkeeper/Makefile.am
@@ -0,0 +1,44 @@
+#
+# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; see the file COPYING.  If not, write to
+# the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+#
+
+MAINTAINERCLEANFILES	= Makefile.in
+
+AM_CFLAGS		=
+
+INCLUDES		= -I$(top_builddir)/include -I$(top_srcdir)/include
+
+sbin_PROGRAMS		= sheepkeeper
+
+sheepkeeper_SOURCES		= sheepkeeper.c
+
+sheepkeeper_LDADD	  	= ../lib/libsheepdog.a
+sheepkeeper_DEPENDENCIES	= ../lib/libsheepdog.a
+
+EXTRA_DIST		=
+
+lint:
+	-splint $(INCLUDES) $(LINT_FLAGS) $(CFLAGS) *.c
+
+all-local:
+	@echo Built sheepkeeper
+
+clean-local:
+	rm -f sheepkeeper *.o gmon.out *.da *.bb *.bbg
+
+# support for GNU Flymake
+check-syntax:
+	$(COMPILE) -fsyntax-only $(CHK_SOURCES)
diff --git a/sheepkeeper/sheepkeeper.c b/sheepkeeper/sheepkeeper.c
new file mode 100644
index 0000000..c0cc482
--- /dev/null
+++ b/sheepkeeper/sheepkeeper.c
@@ -0,0 +1,736 @@
+/*
+ * Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <errno.h>
+#include <assert.h>
+#include <getopt.h>
+
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+
+#include <sys/un.h>
+#include <netinet/in.h>
+
+#include "net.h"
+#include "event.h"
+#include "list.h"
+#include "internal_proto.h"
+#include "util.h"
+#include "sheepkeeper.h"
+
+#define EPOLL_SIZE 4096
+
+/* State of the daemon with respect to join handling. */
+enum sheepkeeper_state {
+	SK_STATE_ORDINAL,	/* no join in progress */
+	SK_STATE_JOINING,	/* SK_MSG_NEW_NODE sent, reply not processed yet */
+};
+
+static enum sheepkeeper_state state = SK_STATE_ORDINAL;
+
+/* State of one connected sheep. */
+enum sheep_state {
+	SHEEP_STATE_CONNECTED,	/* accept()ed */
+	SHEEP_STATE_JOINED,	/* set on the sheep that sent SK_MSG_NEW_NODE_REPLY */
+	SHEEP_STATE_LEAVING,	/* marked by remove_sheep(), freed by remove_handler() */
+};
+
+/* One connection from a sheep, linked into sheep_list_head. */
+struct sheep {
+	int fd;				/* socket returned by accept() */
+	struct sd_node node;		/* taken from the body of SK_MSG_JOIN */
+	struct sockaddr_in addr;	/* peer address filled in by accept() */
+
+	enum sheep_state state;
+
+	void *kept_join_msg;	/* opaque */
+	int kept_join_msg_len;	/* length of kept_join_msg in bytes */
+
+	struct list_head sheep_list;
+};
+
+/* every accepted connection, in accept order (list_add_tail) */
+static LIST_HEAD(sheep_list_head);
+/* incremented per processed SK_MSG_NEW_NODE_REPLY, decremented per removal */
+static int nr_joined_sheep;
+
+/* the sheep that checks join requests; NULL when there is none */
+static struct sheep *leader_sheep;
+
+/* main loop condition, set to true in main() */
+static bool running;
+
+/* listen port and address; the last 4 bytes of addr hold the IPv4 address */
+static int port = SHEEPKEEPER_PORT;
+static uint8_t addr[16];
+static int sheep_listen_fd;
+
+/* argv[0], passed to log_init() */
+static const char *progname;
+
+/*
+ * Copy the node of every entry of sheep_list_head into nodes[], in list
+ * order, and return the number of entries written.  The caller provides
+ * the storage.
+ */
+static int build_node_array(struct sd_node *nodes)
+{
+	struct sheep *cur;
+	int count = 0;
+
+	list_for_each_entry(cur, &sheep_list_head, sheep_list)
+		nodes[count++] = cur->node;
+
+	return count;
+}
+
+/*
+ * Return the first sheep in sheep_list_head whose node id is bytewise
+ * equal to *id, or NULL when there is none.
+ */
+static struct sheep *find_sheep_by_nid(struct node_id *id)
+{
+	struct sheep *cur, *found = NULL;
+
+	list_for_each_entry(cur, &sheep_list_head, sheep_list) {
+		if (memcmp(&cur->node.nid, id, sizeof(*id)) == 0) {
+			found = cur;
+			break;
+		}
+	}
+
+	return found;
+}
+
+/* eventfd used to schedule remove_handler() from the event loop */
+static int remove_efd;
+
+/*
+ * Mark a sheep as leaving and signal remove_efd; the actual teardown is
+ * done later by remove_handler().
+ */
+static inline void remove_sheep(struct sheep *sheep)
+{
+	sheep->state = SHEEP_STATE_LEAVING;
+
+	if (eventfd_write(remove_efd, 1) < 0)
+		panic("eventfd_write() failed: %m\n");
+}
+
+/*
+ * Choose a new leader after the previous one was removed.
+ *
+ * Walks sheep_list_head in order and offers leadership to each sheep that
+ * is not leaving: SK_MSG_LEADER_ELECTION is sent and a complete
+ * SK_MSG_LEADER_ELECTION_REPLY header must come back.  The first sheep
+ * that answers correctly becomes leader_sheep; sheep that fail are
+ * scheduled for removal.  leader_sheep stays NULL when nobody answers or
+ * when no sheep has joined.
+ */
+static void leader_election(void)
+{
+	int ret;
+	struct sheep *sheep;
+	struct sk_msg msg;
+
+	assert(!leader_sheep);
+
+	if (!nr_joined_sheep)
+		return;
+
+	list_for_each_entry(sheep, &sheep_list_head, sheep_list) {
+		if (sheep->state == SHEEP_STATE_LEAVING)
+			continue;
+
+		msg.type = SK_MSG_LEADER_ELECTION;
+		msg.body_len = 0;
+
+		ret = xwrite(sheep->fd, &msg, sizeof(msg));
+		if (ret != sizeof(msg)) {
+			vprintf(SDOG_ERR, "xwrite() for SK_MSG_LEADER_ELECTION"
+				" failed: %m\n");
+			remove_sheep(sheep);
+			continue;
+		}
+
+		/* a short read (e.g. closed connection) is a failure too */
+		ret = xread(sheep->fd, &msg, sizeof(msg));
+		if (ret != sizeof(msg)) {
+			vprintf(SDOG_ERR, "xread() for SK_MSG_LEADER_ELECTION_"
+				"REPLY failed: %m\n");
+			remove_sheep(sheep);
+			continue;
+		}
+
+		if (msg.type != SK_MSG_LEADER_ELECTION_REPLY) {
+			vprintf(SDOG_ERR, "invalid type received: %d\n",
+				msg.type);
+			remove_sheep(sheep);
+			continue;
+		}
+
+		leader_sheep = sheep;
+		break;
+	}
+
+	if (!leader_sheep)
+		return;		/* all sheeps leaved */
+
+	vprintf(SDOG_INFO, "new leader elected: %s\n",
+		node_to_str(&leader_sheep->node));
+}
+
+/*
+ * Tear down the connection of a leaving sheep and announce the leave.
+ *
+ * The fd is unregistered and closed, then SK_MSG_LEAVE_FORWARD with the
+ * node of the leaving sheep is written to every other sheep that is not
+ * leaving itself.  Receivers that cannot be written to are scheduled for
+ * removal.  The list entry itself is unlinked and freed by the caller.
+ */
+static void do_remove_sheep(struct sheep *leaving)
+{
+	int ret;
+	struct sd_node node;
+	struct sheep *s;
+	struct sk_msg snd;
+	struct iovec iov[2];
+
+	node = leaving->node;
+
+	unregister_event(leaving->fd);
+	close(leaving->fd);
+
+	/* the body is the node that left, so that is the length to announce */
+	snd.type = SK_MSG_LEAVE_FORWARD;
+	snd.body_len = sizeof(struct sd_node);
+
+	iov[0].iov_base = &snd;
+	iov[0].iov_len = sizeof(struct sk_msg);
+
+	iov[1].iov_base = &node;
+	iov[1].iov_len = sizeof(struct sd_node);
+
+	list_for_each_entry(s, &sheep_list_head, sheep_list) {
+		if (s == leaving)
+			continue;
+
+		if (s->state == SHEEP_STATE_LEAVING)
+			continue;
+
+		ret = writev(s->fd, iov, 2);
+
+		if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+			vprintf(SDOG_ERR, "writev() failed: %m\n");
+			remove_sheep(s);
+		}
+	}
+
+	/*
+	 * NOTE(review): the counter is decremented even for a connection
+	 * that never completed a join, because the previous state is lost
+	 * once the sheep is marked SHEEP_STATE_LEAVING -- confirm.
+	 */
+	nr_joined_sheep--;
+}
+
+/*
+ * Event callback for remove_efd.
+ *
+ * Reads the eventfd counter and removes that many sheep marked
+ * SHEEP_STATE_LEAVING (or fewer when no marked sheep is left).  When the
+ * leader is among them a leader election is started afterwards.
+ */
+static void remove_handler(int fd, int events, void *data)
+{
+	struct sheep *cur, *victim;
+	eventfd_t pending;
+	bool need_election = false;
+
+	vprintf(SDOG_DEBUG, "remove_handler() called\n");
+
+	if (eventfd_read(remove_efd, &pending) < 0)
+		panic("eventfd_read() failed: %m\n");
+
+	do {
+		/* pick the first sheep that is marked as leaving */
+		victim = NULL;
+		list_for_each_entry(cur, &sheep_list_head, sheep_list) {
+			if (cur->state == SHEEP_STATE_LEAVING) {
+				victim = cur;
+				break;
+			}
+		}
+
+		if (!victim)
+			break;
+
+		vprintf(SDOG_DEBUG, "removing the node: %s\n",
+			node_to_str(&victim->node));
+
+		do_remove_sheep(victim);
+
+		if (victim == leader_sheep) {
+			leader_sheep = NULL;
+			need_election = true;
+		}
+
+		list_del(&victim->sheep_list);
+		free(victim);
+	} while (--pending);
+
+	vprintf(SDOG_DEBUG, "removed\n");
+
+
+	if (need_election) {
+		vprintf(SDOG_DEBUG, "leader election\n");
+		leader_election();
+	}
+}
+
+/*
+ * Read and process one message sent by a sheep.
+ *
+ *  - SK_MSG_JOIN: forward the request as SK_MSG_NEW_NODE to the leader, or
+ *    back to the sender when no sheep has joined yet.  While another join
+ *    is in progress the request is answered with SK_MSG_JOIN_RETRY.
+ *  - SK_MSG_NEW_NODE_REPLY: send SK_MSG_JOIN_REPLY to the joining sheep and
+ *    SK_MSG_NEW_NODE_FINISH to the other sheep.
+ *  - SK_MSG_NOTIFY: forward to every sheep, one at a time, waiting for
+ *    each acknowledgement, then send SK_MSG_NOTIFY_FINISH to the sender.
+ *  - SK_MSG_BLOCK: forward to every other sheep, waiting for each
+ *    acknowledgement, then send SK_MSG_BLOCK_FINISH to the sender.
+ *  - SK_MSG_LEAVE: schedule the sender for removal.
+ *
+ * A sheep that causes an I/O or protocol error is scheduled for removal
+ * with remove_sheep().
+ */
+static void read_msg_from_sheep(struct sheep *sheep)
+{
+	int ret, fd = sheep->fd;
+	struct sk_msg rcv, snd;
+	struct sk_msg_join *join;
+	struct iovec iov[2];
+
+	struct sheep *s, *s2;
+
+	struct sk_msg_notify_forward *notify_forward;
+
+	struct sk_msg join_reply;
+	struct sk_msg_join_reply *reply_body;
+
+	struct sk_msg_join_node_finish *finish;
+
+	memset(&rcv, 0, sizeof(rcv));
+	memset(&snd, 0, sizeof(snd));
+
+	/* a short read (e.g. closed connection) is a failure too */
+	ret = xread(fd, &rcv, sizeof(rcv));
+	if (ret != sizeof(rcv)) {
+		vprintf(SDOG_ERR, "xread() failed: %m\n");
+		goto purge_current_sheep;
+	}
+
+	/* body_len is used as an allocation size below */
+	if (rcv.body_len < 0) {
+		vprintf(SDOG_ERR, "invalid body length: %d\n", rcv.body_len);
+		goto purge_current_sheep;
+	}
+
+	switch (rcv.type) {
+	case SK_MSG_JOIN:
+		if (rcv.body_len < (int)sizeof(*join)) {
+			vprintf(SDOG_ERR, "invalid body length: %d\n",
+				rcv.body_len);
+			goto purge_current_sheep;
+		}
+
+		/*
+		 * Always consume the body, even when the request is going to
+		 * be rejected: otherwise the next read would interpret the
+		 * body as a message header.
+		 */
+		join = xmalloc(rcv.body_len);
+		ret = xread(fd, join, rcv.body_len);
+		if (ret != rcv.body_len) {
+			vprintf(SDOG_ERR, "xread() for reading the body of"
+				" SK_MSG_JOIN failed: %m\n");
+
+			free(join);
+			goto purge_current_sheep;
+		}
+
+		if (state == SK_STATE_JOINING) {
+			free(join);
+
+			snd.type = SK_MSG_JOIN_RETRY;
+
+			ret = xwrite(fd, &snd, sizeof(snd));
+			if (ret != sizeof(snd)) {
+				vprintf(SDOG_ERR, "xwrite() failed: %m\n");
+				goto purge_current_sheep;
+			}
+
+			break;
+		}
+
+		if (nr_joined_sheep && !leader_sheep) {
+			/* nobody can check the request */
+			vprintf(SDOG_ERR, "no leader to check the join"
+				" request\n");
+			free(join);
+			goto purge_current_sheep;
+		}
+
+		sheep->node = join->node;
+
+		snd.type = SK_MSG_NEW_NODE;
+		snd.body_len = rcv.body_len;
+
+		iov[0].iov_base = &snd;
+		iov[0].iov_len = sizeof(struct sk_msg);
+
+		iov[1].iov_base = join;
+		iov[1].iov_len = rcv.body_len;
+
+		if (!nr_joined_sheep) {
+			/* this sheep is a new leader */
+			join->leader_elected = true;
+		}
+		ret = writev(!nr_joined_sheep ?
+			fd : leader_sheep->fd, iov, 2);
+		free(join);
+		if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+			vprintf(SDOG_ERR, "writev() for sending"
+				" SK_MSG_NEW_NODE failed: %m\n");
+			goto purge_current_sheep;
+		}
+
+		state = SK_STATE_JOINING;
+		break;
+
+	case SK_MSG_NEW_NODE_REPLY:
+		if (nr_joined_sheep && sheep != leader_sheep) {
+			vprintf(SDOG_ERR, "not leader sheep replied "
+				"SK_MSG_NEW_NODE_REPLY\n");
+			goto purge_current_sheep;
+		}
+
+		vprintf(SDOG_INFO, "new node reply from %s\n",
+			node_to_str(&sheep->node));
+
+		/* kept_join_msg_len below must not become negative */
+		if (rcv.body_len < (int)sizeof(*join)) {
+			vprintf(SDOG_ERR, "invalid body length: %d\n",
+				rcv.body_len);
+			goto purge_current_sheep;
+		}
+
+		join = xmalloc(rcv.body_len);
+		ret = xread(fd, join, rcv.body_len);
+		if (ret != rcv.body_len) {
+			vprintf(SDOG_ERR, "xread() failed: %m\n");
+			free(join);
+
+			goto purge_current_sheep;
+		}
+
+		s = find_sheep_by_nid(&join->node.nid);
+		if (!s) {
+			/* leader is broken */
+			panic("invalid nid is required, %s\n",
+				node_to_str(&join->node));
+		}
+
+		s->kept_join_msg_len = rcv.body_len -
+			sizeof(struct sk_msg_join);
+		s->kept_join_msg = xmalloc(s->kept_join_msg_len);
+		memcpy(s->kept_join_msg, join->opaque, s->kept_join_msg_len);
+		free(join);	/* everything needed has been copied */
+
+		memset(&join_reply, 0, sizeof(struct sk_msg));
+		join_reply.type = SK_MSG_JOIN_REPLY;
+		join_reply.body_len = sizeof(struct sk_msg_join_reply)
+			+ s->kept_join_msg_len;
+		iov[0].iov_base = &join_reply;
+		iov[0].iov_len = sizeof(struct sk_msg);
+
+		reply_body = xmalloc(join_reply.body_len);
+
+		reply_body->nr_nodes = build_node_array(reply_body->nodes);
+		memcpy(reply_body->opaque, s->kept_join_msg,
+			s->kept_join_msg_len);
+
+		iov[1].iov_base = reply_body;
+		iov[1].iov_len = join_reply.body_len;
+
+		ret = writev(s->fd, iov, 2);
+		free(reply_body);
+		if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+			/*
+			 * NOTE(review): the write to the joining sheep
+			 * failed, but the replying sheep is purged and the
+			 * daemon stays in SK_STATE_JOINING -- confirm.
+			 */
+			vprintf(SDOG_ERR, "writev() failed: %m\n");
+			goto purge_current_sheep;
+		}
+
+		snd.type = SK_MSG_NEW_NODE_FINISH;
+		snd.body_len = sizeof(*finish) + s->kept_join_msg_len;
+
+		finish = xmalloc(snd.body_len);
+		finish->new_node = s->node;
+		memcpy(finish->opaque, s->kept_join_msg, s->kept_join_msg_len);
+		finish->nr_nodes = build_node_array(finish->nodes);
+
+		iov[0].iov_base = &snd;
+		iov[0].iov_len = sizeof(snd);
+
+		iov[1].iov_base = finish;
+		iov[1].iov_len = snd.body_len;
+
+		list_for_each_entry(s2, &sheep_list_head, sheep_list) {
+			if (s2->state == SHEEP_STATE_LEAVING)
+				continue;
+
+			if (s2 == s)
+				continue;
+
+			ret = writev(s2->fd, iov, 2);
+
+			/* FIXME: need leave queue? */
+			if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+				vprintf(SDOG_ERR, "writev() failed: %m\n");
+				remove_sheep(s2);
+			}
+		}
+
+		free(finish);
+
+		nr_joined_sheep++;
+		if (nr_joined_sheep == 1) {
+			assert(!leader_sheep);
+			leader_sheep = sheep;
+
+			vprintf(SDOG_INFO, "new leader elected: %s\n",
+				node_to_str(&sheep->node));
+		}
+
+		sheep->state = SHEEP_STATE_JOINED;
+		state = SK_STATE_ORDINAL;
+		break;
+
+	case SK_MSG_NOTIFY:
+		notify_forward =
+			xmalloc(rcv.body_len + sizeof(*notify_forward));
+		ret = xread(fd, notify_forward->notify_msg, rcv.body_len);
+		if (ret != rcv.body_len) {
+			vprintf(SDOG_ERR, "xread() failed: %m\n");
+			free(notify_forward);
+			goto purge_current_sheep;
+		}
+
+		snd.type = SK_MSG_NOTIFY_FORWARD;
+		snd.body_len = rcv.body_len + sizeof(*notify_forward);
+
+		iov[0].iov_base = &snd;
+		iov[0].iov_len = sizeof(snd);
+
+		notify_forward->from_node = sheep->node;
+
+		iov[1].iov_base = notify_forward;
+		iov[1].iov_len = snd.body_len;
+
+		/* the sender receives the forward as well */
+		list_for_each_entry(s, &sheep_list_head, sheep_list) {
+			if (s->state == SHEEP_STATE_LEAVING)
+				continue;
+
+			ret = writev(s->fd, iov, 2);
+			if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+				vprintf(SDOG_ERR, "writev() failed: %m\n");
+				goto notify_failed;
+			}
+
+			ret = xread(s->fd, &rcv, sizeof(rcv));
+			if (ret != sizeof(rcv)) {
+				vprintf(SDOG_ERR, "xread() failed: %m\n");
+				goto notify_failed;
+			}
+
+			if (rcv.type != SK_MSG_NOTIFY_FORWARD_REPLY) {
+				vprintf(SDOG_ERR, "invalid type received: %d\n",
+					rcv.type);
+				goto notify_failed;
+			}
+
+			continue;
+
+notify_failed:
+			remove_sheep(s);
+		}
+
+		free(notify_forward);
+
+		snd.type = SK_MSG_NOTIFY_FINISH;
+		snd.body_len = 0;
+		ret = xwrite(sheep->fd, &snd, sizeof(snd));
+		if (ret != sizeof(snd)) {
+			vprintf(SDOG_ERR, "xwrite() failed: %m\n");
+			goto purge_current_sheep;
+		}
+
+		break;
+
+	case SK_MSG_BLOCK:
+		list_for_each_entry(s, &sheep_list_head, sheep_list) {
+			struct sk_msg msg;
+
+			if (s == sheep)
+				continue;
+
+			if (s->state == SHEEP_STATE_LEAVING)
+				continue;
+
+			msg.type = SK_MSG_BLOCK_FORWARD;
+			msg.body_len = sizeof(struct sd_node);
+
+			iov[0].iov_base = &msg;
+			iov[0].iov_len = sizeof(struct sk_msg);
+
+			iov[1].iov_base = &sheep->node;
+			iov[1].iov_len = sizeof(struct sd_node);
+
+			ret = writev(s->fd, iov, 2);
+			if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+				vprintf(SDOG_ERR, "writev() failed: %m\n");
+				goto block_failed;
+			}
+
+			ret = xread(s->fd, &msg, sizeof(struct sk_msg));
+			if (ret != sizeof(msg)) {
+				vprintf(SDOG_ERR, "xread() failed: %m\n");
+				goto block_failed;
+			}
+
+			if (msg.type != SK_MSG_BLOCK_FORWARD_REPLY) {
+				vprintf(SDOG_ERR, "invalid message from sheep,"
+					" %d\n", msg.type);
+				goto block_failed;
+			}
+
+			/* acknowledged: do not fall into the failure path */
+			continue;
+
+block_failed:	/* FIXME: is this correct behaviour? */
+			remove_sheep(s);
+		}
+
+		snd.type = SK_MSG_BLOCK_FINISH;
+		snd.body_len = 0;
+		ret = xwrite(sheep->fd, &snd, sizeof(struct sk_msg));
+		if (ret != sizeof(snd)) {
+			vprintf(SDOG_ERR, "xwrite() failed: %m\n");
+			goto purge_current_sheep;
+		}
+
+		break;
+
+	case SK_MSG_LEAVE:
+		vprintf(SDOG_INFO, "%s is leaving\n",
+			node_to_str(&sheep->node));
+		goto purge_current_sheep;
+
+		break;
+
+	default:
+		vprintf(SDOG_ERR, "invalid message from sheep, message type:"
+			" %d\n", rcv.type);
+		goto purge_current_sheep;
+
+		break;
+	}
+
+	return;
+
+purge_current_sheep:
+	remove_sheep(sheep);
+}
+
+/*
+ * Event callback for a sheep connection; data is the struct sheep that
+ * was registered with the fd.  Readable events are processed, hang-up and
+ * error events schedule the sheep for removal.
+ */
+static void sheep_comm_handler(int fd, int events, void *data)
+{
+	struct sheep *peer = data;
+
+	vprintf(SDOG_DEBUG, "sheep_comm_handler() called\n");
+
+	if (events & EPOLLIN) {
+		read_msg_from_sheep(peer);
+		return;
+	}
+
+	if ((events & EPOLLHUP) || (events & EPOLLERR)) {
+		vprintf(SDOG_INFO, "epoll() error: %s\n",
+			node_to_str(&peer->node));
+		remove_sheep(peer);
+	}
+}
+
+/*
+ * Event callback for the listening socket: accept one connection, enable
+ * keepalive on it, register it with the event loop and append it to
+ * sheep_list_head in state SHEEP_STATE_CONNECTED.
+ *
+ * On any failure the connection (if accepted) is closed and nothing is
+ * added to the list.
+ */
+static void sheep_accept_handler(int fd, int events, void *data)
+{
+	int ret;
+	struct sheep *new_sheep;
+	socklen_t len;
+
+	new_sheep = xmalloc(sizeof(struct sheep));
+	/* node and kept_join_msg are read before a join fills them in */
+	memset(new_sheep, 0, sizeof(*new_sheep));
+	INIT_LIST_HEAD(&new_sheep->sheep_list);
+	new_sheep->state = SHEEP_STATE_CONNECTED;
+
+	len = sizeof(struct sockaddr_in);
+	new_sheep->fd = accept(fd, (struct sockaddr *)&new_sheep->addr, &len);
+	if (new_sheep->fd < 0) {
+		vprintf(SDOG_ERR, "accept() failed: %m\n");
+		goto clean;
+	}
+
+	if (-1 == set_keepalive(new_sheep->fd)) {
+		vprintf(SDOG_ERR, "set_keepalive() failed: %m\n");
+		goto close_fd;
+	}
+
+	ret = register_event(new_sheep->fd, sheep_comm_handler, new_sheep);
+	if (ret < 0) {
+		vprintf(SDOG_ERR, "register_event() failed: %m\n");
+		goto close_fd;
+	}
+
+	list_add_tail(&new_sheep->sheep_list, &sheep_list_head);
+
+	vprintf(SDOG_INFO, "accepted new sheep connection\n");
+
+	return;
+
+close_fd:
+	close(new_sheep->fd);
+clean:
+	free(new_sheep);
+}
+
+/*
+ * Command line options.  Every long option must declare the same argument
+ * requirement as its short form in short_options.
+ */
+static struct option const long_options[] = {
+	{ "port", required_argument, NULL, 'p' },
+	{ "address", required_argument, NULL, 'a' },
+	{ "foreground", no_argument, NULL, 'f' },
+	{ "debug", no_argument, NULL, 'd' },
+	/* takes the log file path, like -l */
+	{ "log-file", required_argument, NULL, 'l' },
+
+	{ NULL, 0, NULL, 0 },
+};
+
+static const char *short_options = "p:a:fdl:";
+
+/* atexit() hook registered in main(): log that the daemon is exiting. */
+static void exit_handler(void)
+{
+	vprintf(SDOG_INFO, "exiting...\n");
+}
+
+/*
+ * Entry point of the sheepkeeper daemon.
+ *
+ * Options:
+ *   -p, --port <port>        listen port (default SHEEPKEEPER_PORT)
+ *   -a, --address <addr>     IPv4 address to bind to
+ *   -f, --foreground         do not daemonize
+ *   -d, --debug              log at debug level
+ *   -l, --log-file <path>    log file (default /var/log/sheepkeeper.log)
+ *
+ * After option parsing the process daemonizes (unless -f), initializes
+ * logging and the event loop, sets up the removal eventfd and the
+ * listening socket, and then runs the event loop.
+ */
+int main(int argc, char **argv)
+{
+	int ch, ret, longindex, opt;
+	char *p;
+	long port_num;
+	bool daemonize = true;
+	struct sockaddr_in listen_addr;
+	int log_level = SDOG_INFO;
+	const char *log_file = "/var/log/sheepkeeper.log";
+
+	progname = argv[0];
+
+	while ((ch = getopt_long(argc, argv, short_options, long_options,
+				 &longindex)) >= 0) {
+		switch (ch) {
+		case 'p':
+			/* reject trailing garbage and out-of-range values */
+			errno = 0;
+			port_num = strtol(optarg, &p, 10);
+			if (p == optarg || *p != '\0' || errno == ERANGE ||
+			    port_num < 1 || port_num > 65535) {
+				fprintf(stderr, "invalid port: %s\n", optarg);
+				exit(1);
+			}
+			port = port_num;
+			break;
+
+		case 'a':
+			if (!str_to_addr(AF_INET, (const char *)optarg, addr)) {
+				fprintf(stderr, "invalid address: %s\n",
+					optarg);
+				exit(1);
+			}
+			break;
+
+		case 'f':
+			daemonize = false;
+			break;
+
+		case 'd':
+			log_level = SDOG_DEBUG;
+			break;
+
+		case 'l':
+			log_file = optarg;
+			break;
+
+		default:
+			fprintf(stderr, "unknown option\n");
+			exit(1);
+			break;
+		}
+	}
+
+	if (daemonize) {
+		ret = daemon(0, 0);
+
+		if (-1 == ret) {
+			fprintf(stderr, "daemon() failed: %s\n",
+				strerror(errno));
+			exit(1);
+		}
+	}
+
+	ret = log_init(progname, LOG_SPACE_SIZE, !daemonize, log_level,
+		(char *)log_file);
+	if (ret)
+		panic("initialize logger failed: %m\n");
+
+	atexit(exit_handler);
+	init_event(EPOLL_SIZE);
+
+	/* remove_sheep() signals this eventfd, remove_handler() drains it */
+	remove_efd = eventfd(0, EFD_NONBLOCK);
+	if (remove_efd < 0)
+		panic("eventfd() failed: %m\n");
+
+	ret = register_event(remove_efd, remove_handler, NULL);
+	if (ret)
+		panic("register_event() failed: %m\n");
+
+	/* setup inet socket for communication with sheeps */
+	sheep_listen_fd = socket(AF_INET, SOCK_STREAM, 0);
+	if (sheep_listen_fd < 0)
+		panic("socket() failed: %m\n");
+
+	opt = 1;
+	ret = setsockopt(sheep_listen_fd, SOL_SOCKET, SO_REUSEADDR,
+			&opt, sizeof(opt));
+	if (ret == -1)
+		panic("setsockopt() for SO_REUSEADDR failed: %m\n");
+
+	memset(&listen_addr, 0, sizeof(listen_addr));
+	listen_addr.sin_family = AF_INET;
+	/*
+	 * NOTE(review): sin_port is expected in network byte order, but the
+	 * port is stored without htons() here and in the sheep cluster
+	 * driver.  Both sides must be converted together, so it is left as
+	 * is.
+	 */
+	listen_addr.sin_port = port;
+	/* addr stays all-zero without -a, which binds to any address */
+	memcpy(&listen_addr.sin_addr, addr + 12, sizeof(listen_addr.sin_addr));
+
+	ret = bind(sheep_listen_fd, (struct sockaddr *)&listen_addr,
+		sizeof(listen_addr));
+	if (ret == -1)
+		panic("bind() failed: %m\n");
+
+	/* many sheep may connect at once, do not limit the backlog to 1 */
+	ret = listen(sheep_listen_fd, SOMAXCONN);
+	if (ret == -1)
+		panic("listen() failed: %m\n");
+
+	ret = register_event(sheep_listen_fd, sheep_accept_handler, NULL);
+	if (ret)
+		panic("register_event() failed: %m\n");
+
+	running = true;
+
+	while (running)
+		event_loop(-1);
+
+	return 0;
+}
-- 
1.7.2.5




More information about the sheepdog mailing list