[sheepdog] [PATCH v2 3/3] sheepkeeper: a new cluster manager specialized for sheepdog

Hitoshi Mitake mitake.hitoshi at lab.ntt.co.jp
Mon Nov 12 09:58:19 CET 2012


This patch adds sheepkeeper, a new cluster manager for sheepdog.

The sheepkeeper cluster manager is designed as master-slave, so it can be
a single point of failure (SPOF). However, I'm planning to make it redundant
by utilizing corosync. If that works out, it can serve as an alternative to
the group management part of ZooKeeper without depending on the JVM.

This patch adds the new directory sheepkeeper/ for it, and the
executable binary will be produced as sheepkeeper/sheepkeeper.
The cluster driver for sheep is added as sheep/cluster/sheepkeeper.c.

If you have some opinions or feature requests for a cluster manager
designed and implemented from scratch, I'd like to hear your comments.

Signed-off-by: Hitoshi Mitake <mitake.hitoshi at lab.ntt.co.jp>
---
v2
 * lots of cleanups and bug fixes
 * sane command line option handling
 * slightly improved leave handling
 * do not call sd_join_handler() in sheepkeeper_join()
 * add a new option --enable-sheepkeeper to the configure script to
   enable/disable building sheepkeeper
8<---
 .gitignore                  |    1 +
 Makefile.am                 |    3 +
 configure.ac                |    8 +-
 include/sheepkeeper.h       |   61 +++++
 sheep/Makefile.am           |    3 +
 sheep/cluster/sheepkeeper.c |  527 +++++++++++++++++++++++++++++++++++++++++++
 sheepkeeper/Makefile.am     |   44 ++++
 sheepkeeper/sheepkeeper.c   |  501 ++++++++++++++++++++++++++++++++++++++++
 8 files changed, 1147 insertions(+), 1 deletions(-)
 create mode 100644 include/sheepkeeper.h
 create mode 100644 sheep/cluster/sheepkeeper.c
 create mode 100644 sheepkeeper/Makefile.am
 create mode 100644 sheepkeeper/sheepkeeper.c

diff --git a/.gitignore b/.gitignore
index dbdbd55..ffc9003 100644
--- a/.gitignore
+++ b/.gitignore
@@ -31,6 +31,7 @@ GSYMS
 collie/collie
 sheep/sheep
 sheepfs/sheepfs
+sheepkeeper/sheepkeeper
 
 # directories
 .deps
diff --git a/Makefile.am b/Makefile.am
index 53d18b9..1ed8072 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -19,6 +19,9 @@ sheepdogsysconf_DATA	=
 
 SUBDIRS			= lib collie sheep include script man
 
+if BUILD_SHEEPKEEPER
+SUBDIRS			+= sheepkeeper
+endif
 if BUILD_SHEEPFS
 SUBDIRS			+= sheepfs
 endif
diff --git a/configure.ac b/configure.ac
index 373c03f..fac12c3 100644
--- a/configure.ac
+++ b/configure.ac
@@ -133,7 +133,8 @@ AC_CONFIG_FILES([Makefile
 		include/Makefile
 		script/Makefile
 		lib/Makefile
-		man/Makefile])
+		man/Makefile
+		sheepkeeper/Makefile])
 
 ### Local business
 
@@ -197,6 +198,11 @@ AC_ARG_ENABLE([accord],
 	[ enable_accord="no" ],)
 AM_CONDITIONAL(BUILD_ACCORD, test x$enable_accord = xyes)
 
+AC_ARG_ENABLE([sheepkeeper],
+	[  --enable-sheepkeeper    : build sheepkeeper and its cluster driver ],,
+	[ enable_sheepkeeper="no" ],)
+AM_CONDITIONAL(BUILD_SHEEPKEEPER, test x$enable_sheepkeeper = xyes)
+
 AC_ARG_WITH([initddir],
 	[  --with-initddir=DIR     : path to init script directory. ],
 	[ INITDDIR="$withval" ],
diff --git a/include/sheepkeeper.h b/include/sheepkeeper.h
new file mode 100644
index 0000000..061f53a
--- /dev/null
+++ b/include/sheepkeeper.h
@@ -0,0 +1,61 @@
+#ifndef SHEEPKEEPER_H
+#define SHEEPKEEPER_H
+
+enum sk_msg_type {	/* sk_msg.type: sheep <-> sheepkeeper messages */
+	SK_MSG_JOIN = 1,
+	SK_MSG_JOIN_REPLY,
+	SK_MSG_JOIN_RETRY,
+	/* NEW_NODE goes to the master, NEW_NODE_FINISH to the other sheep */
+	SK_MSG_NEW_NODE,
+	SK_MSG_NEW_NODE_REPLY,
+	SK_MSG_NEW_NODE_FINISH,
+	/* forwarded to every sheep, including the sender */
+	SK_MSG_NOTIFY,
+	SK_MSG_NOTIFY_FORWARD,
+	SK_MSG_NOTIFY_FORWARD_REPLY,
+	SK_MSG_NOTIFY_FINISH,
+	/* forwarded to every sheep except the sender */
+	SK_MSG_BLOCK,
+	SK_MSG_BLOCK_FORWARD,
+	SK_MSG_BLOCK_FORWARD_REPLY,
+	SK_MSG_BLOCK_FINISH,
+	/* LEAVE_FORWARD carries the struct sd_node of the leaving sheep */
+	SK_MSG_LEAVE,
+	SK_MSG_LEAVE_FORWARD,
+};
+
+struct sk_msg {			/* header preceding every message body */
+	enum sk_msg_type type;	/* NOTE(review): native enum/int on the wire */
+	int body_len;		/* number of body bytes following the header */
+};
+#include <stdbool.h>
+#include "internal_proto.h"
+
+struct sk_msg_join {
+	struct sd_node node;
+	bool master_elected;	/* set by sheepkeeper for the first sheep */
+	char opaque[];		/* opaque data given to sheepkeeper_join() */
+};
+
+struct sk_msg_join_reply {
+	struct sd_node nodes[SD_MAX_NODES];	/* from build_node_array() */
+	int nr_nodes;
+	char opaque[];
+};
+
+struct sk_msg_join_node_finish {
+	struct sd_node new_node;
+	/* membership after new_node has joined */
+	struct sd_node nodes[SD_MAX_NODES];
+	int nr_nodes;
+	char opaque[];
+};
+
+struct sk_msg_notify_forward {
+	struct sd_node from_node;	/* sheep that sent SK_MSG_NOTIFY */
+	char notify_msg[];
+};
+
+#define SHEEPKEEPER_PORT 2501	/* default TCP port */
+
+#endif	/* SHEEPKEEPER_H */
diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index e7b4f53..3e22e13 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -38,6 +38,9 @@ endif
 if BUILD_ACCORD
 sheep_SOURCES		+= cluster/accord.c
 endif
+if BUILD_SHEEPKEEPER
+sheep_SOURCES		+= cluster/sheepkeeper.c
+endif
 
 sheep_SOURCES		+= farm/sha1_file.c farm/trunk.c farm/snap.c farm/farm.c
 
diff --git a/sheep/cluster/sheepkeeper.c b/sheep/cluster/sheepkeeper.c
new file mode 100644
index 0000000..1ec1ac3
--- /dev/null
+++ b/sheep/cluster/sheepkeeper.c
@@ -0,0 +1,527 @@
+/*
+ * Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <stdio.h>
+#include <stdint.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <sys/epoll.h>
+
+#include "cluster.h"
+#include "event.h"
+#include "sheepkeeper.h"
+#include "internal_proto.h"
+
+static int sk_comm_fd;	/* TCP connection to sheepkeeper, see sheepkeeper_init() */
+
+static struct sd_node this_node;	/* copy of the node given to sheepkeeper_join() */
+static bool is_master;	/* set when SK_MSG_NEW_NODE has master_elected */
+
+static int nr_nodes;	/* membership as last reported by sheepkeeper */
+static struct sd_node nodes[SD_MAX_NODES];
+
+enum sk_driver_state {
+	state_pre_join,	/* until SK_MSG_JOIN_REPLY has been processed */
+	state_joined,
+};
+
+static enum sk_driver_state state = state_pre_join;
+
+static void read_msg_pre_join(void)
+{
+	int ret;
+	struct sk_msg msg, rcv;
+	struct sk_msg_join_reply *join_reply;
+	struct iovec iov[2];
+	/* Handle sheepkeeper's answers to SK_MSG_JOIN until JOIN_REPLY. */
+retry:
+	ret = xread(sk_comm_fd, &rcv, sizeof(rcv));
+	if (ret != sizeof(rcv))
+		panic("invalid message from sheepkeeper: %m\n");
+	/* NOTE(review): SK_MSG_JOIN is not re-sent after a JOIN_RETRY */
+	if (rcv.type == SK_MSG_JOIN_RETRY) {
+		vprintf(SDOG_INFO, "join request is rejected, retrying\n");
+		goto retry;
+	} else if (rcv.type == SK_MSG_NEW_NODE) {
+		struct sk_msg_join *join;
+		int join_len = rcv.body_len;
+
+		join = xcalloc(join_len, sizeof(char));
+		ret = xread(sk_comm_fd, join, join_len);
+		if (ret != join_len)
+			panic("invalid message from sheepkeeper: %m\n");
+		/* NOTE(review): the result of the join check is ignored */
+		sd_check_join_cb(&join->node, join->opaque);
+		if (join->master_elected) {
+			vprintf(SDOG_INFO, "elected as master\n");
+			is_master = true;
+		}
+
+		msg.type = SK_MSG_NEW_NODE_REPLY;
+		msg.body_len = join_len;
+		iov[0].iov_base = &msg;
+		iov[0].iov_len = sizeof(struct sk_msg);
+		iov[1].iov_base = join;
+		iov[1].iov_len = join_len;
+
+		ret = writev(sk_comm_fd, iov, 2);
+		if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+			vprintf(SDOG_ERR, "writev() failed\n");
+			exit(1);
+		}
+		free(join);
+		ret = xread(sk_comm_fd, &rcv, sizeof(struct sk_msg));
+		if (ret != sizeof(rcv)) {
+			vprintf(SDOG_ERR, "invalid ret: %d\n", ret);
+			exit(1);
+		}
+	}
+
+	if (rcv.type != SK_MSG_JOIN_REPLY) {
+		vprintf(SDOG_ERR, "invalid message from sheepkeeper, "
+			"received message: %d\n", rcv.type);
+		exit(1);
+	}
+	/* the body must at least hold the fixed-size part of the reply */
+	if (rcv.body_len < (int)sizeof(*join_reply))
+		panic("too short join reply: %d\n", rcv.body_len);
+	join_reply = xmalloc(rcv.body_len);
+	ret = xread(sk_comm_fd, join_reply, rcv.body_len);
+	if (ret != rcv.body_len) {
+		vprintf(SDOG_ERR, "xread() failed\n");
+		exit(1);
+	}
+	if ((unsigned)join_reply->nr_nodes > SD_MAX_NODES)
+		panic("invalid nr_nodes: %d\n", join_reply->nr_nodes);
+	vprintf(SDOG_INFO, "join reply arrived, nr_nodes: %d\n",
+		join_reply->nr_nodes);
+	sd_join_handler(&this_node, join_reply->nodes, join_reply->nr_nodes,
+			CJ_RES_SUCCESS, join_reply->opaque);
+	memcpy(nodes, join_reply->nodes,
+		join_reply->nr_nodes * sizeof(struct sd_node));
+	nr_nodes = join_reply->nr_nodes;
+	free(join_reply);
+	vprintf(SDOG_INFO, "sheepkeeper_join() succeed\n");
+	state = state_joined;
+}
+
+static void read_msg(void)
+{
+	int i, j, ret;
+	struct sk_msg rcv, snd;
+	struct sd_node sender;
+	struct iovec iov[2];
+	struct sk_msg_join *join;
+	struct sk_msg_notify_forward *notify_forward;
+	struct sk_msg_join_node_finish *join_node_finish;
+	struct join_message *jm;
+	/* Handle one message from sheepkeeper once this node has joined. */
+
+	ret = xread(sk_comm_fd, &rcv, sizeof(rcv));
+	if (ret != sizeof(rcv)) {
+		vprintf(SDOG_ERR, "xread() failed\n");
+		exit(1);
+	}
+
+	switch (rcv.type) {
+	case SK_MSG_NEW_NODE:
+		join = xmalloc(rcv.body_len);
+		ret = xread(sk_comm_fd, join, rcv.body_len);
+		if (ret != rcv.body_len) {
+			vprintf(SDOG_ERR, "xread() failed\n");
+			exit(1);
+		}
+		/* only the master calls sd_check_join_cb(); the join is echoed */
+		if (is_master)
+			sd_check_join_cb(&join->node, join->opaque);
+
+		memset(&snd, 0, sizeof(snd));
+		snd.type = SK_MSG_NEW_NODE_REPLY;
+		snd.body_len = rcv.body_len;
+
+		iov[0].iov_base = &snd;
+		iov[0].iov_len = sizeof(snd);
+
+		iov[1].iov_base = join;
+		iov[1].iov_len = rcv.body_len;
+
+		ret = writev(sk_comm_fd, iov, 2);
+		if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+			vprintf(SDOG_ERR, "writev() failed\n");
+			exit(1);
+		}
+		free(join);
+		break;
+
+	case SK_MSG_NEW_NODE_FINISH:
+		join_node_finish = xmalloc(rcv.body_len);
+		ret = xread(sk_comm_fd, join_node_finish, rcv.body_len);
+		if (ret != rcv.body_len) {
+			vprintf(SDOG_ERR, "xread() failed\n");
+			exit(1);
+		}
+		if ((unsigned)join_node_finish->nr_nodes > SD_MAX_NODES)
+			panic("invalid nr_nodes: %d\n", join_node_finish->nr_nodes);
+		jm = (struct join_message *)join_node_finish->opaque;
+		memcpy(nodes, join_node_finish->nodes,
+			join_node_finish->nr_nodes * sizeof(struct sd_node));
+		nr_nodes = join_node_finish->nr_nodes;
+		sd_join_handler(&join_node_finish->new_node, nodes, nr_nodes,
+				CJ_RES_SUCCESS, jm);
+		free(join_node_finish);
+		break;
+
+	case SK_MSG_NOTIFY_FORWARD:
+		notify_forward = xmalloc(rcv.body_len);
+		ret = xread(sk_comm_fd, notify_forward, rcv.body_len);
+		if (ret != rcv.body_len) {
+			vprintf(SDOG_ERR, "xread() failed\n");
+			exit(1);
+		}
+
+		sd_notify_handler(&notify_forward->from_node,
+				notify_forward->notify_msg,
+				rcv.body_len - sizeof(*notify_forward));
+		free(notify_forward);
+		memset(&snd, 0, sizeof(snd));	/* the reply has no body */
+		snd.type = SK_MSG_NOTIFY_FORWARD_REPLY;
+		ret = xwrite(sk_comm_fd, &snd, sizeof(snd));
+		if (ret != sizeof(snd)) {
+			vprintf(SDOG_ERR, "xwrite() failed\n");
+			exit(1);
+		}
+		break;
+
+	case SK_MSG_BLOCK_FORWARD:
+		ret = xread(sk_comm_fd, &sender, sizeof(struct sd_node));
+		if (ret != sizeof(sender)) {
+			vprintf(SDOG_ERR, "xread() failed\n");
+			exit(1);
+		}
+		/* only acknowledged; no local handler is invoked here */
+		snd.type = SK_MSG_BLOCK_FORWARD_REPLY;
+		snd.body_len = 0;
+		ret = xwrite(sk_comm_fd, &snd, sizeof(struct sk_msg));
+		if (ret != sizeof(snd)) {
+			vprintf(SDOG_ERR, "xwrite() failed\n");
+			exit(1);
+		}
+
+		break;
+
+	case SK_MSG_LEAVE_FORWARD:
+		ret = xread(sk_comm_fd, &sender, sizeof(struct sd_node));
+		if (ret != sizeof(sender)) {
+			vprintf(SDOG_ERR, "xread() failed\n");
+			exit(1);
+		}
+
+		for (i = 0; i < nr_nodes; i++) {
+			if (!memcmp(&sender, &nodes[i], sizeof(sender))) {
+				nr_nodes--;
+
+				for (j = i; j < nr_nodes; j++)
+					nodes[j] = nodes[j + 1];
+
+				goto removed;
+			}
+		}
+
+		vprintf(SDOG_ERR, "leave message from unknown node\n");
+		exit(1);
+
+removed:
+		sd_leave_handler(&sender, nodes, nr_nodes);
+		break;
+
+	default:
+		vprintf(SDOG_ERR, "invalid message from sheepkeeper: %d, "
+			"length: %d\n", rcv.type, rcv.body_len);
+		exit(1);
+		break;
+	}
+}
+
+static void read_msg_from_sheepkeeper(void)
+{
+	/* the messages expected before the join completes differ from later */
+	switch (state) {
+	case state_pre_join:
+		read_msg_pre_join();
+		break;
+	case state_joined:
+		read_msg();
+		break;
+
+	default:
+		panic("invalid state of sheepkeeper cluster driver: %d\n",
+			state);
+		break;
+	}
+}
+
+static void sheepkeeper_comm_handler(int fd, int events, void *data)
+{
+	assert(fd == sk_comm_fd);	/* registered by sheepkeeper_join() */
+	assert(data == NULL);
+	if (!(events & EPOLLIN))
+		return;
+	read_msg_from_sheepkeeper();
+}
+
+static void init_addr_port(const char *s, struct sockaddr_in *addr)
+{
+	/* format: <address>[:<port>] */
+	char *sep, *copied, *p;
+	uint8_t tmp_addr[16];
+	bool addr_only = false;
+	long port;
+
+	copied = xcalloc(strlen(s) + 1, sizeof(char));
+	memcpy(copied, s, strlen(s));
+	sep = strchr(copied, ':');
+	if (!sep) {
+		addr_only = true;
+		goto addr_trans;
+	}
+
+	/* neither the address nor the port may be empty */
+	if (sep == copied || *(sep + 1) == '\0')
+		goto invalid_format;
+	*sep = '\0';
+
+addr_trans:
+	memset(addr, 0, sizeof(*addr));
+	addr->sin_family = AF_INET;
+
+	/* str_to_addr() fills 16 bytes; the IPv4 address is at [12..15] */
+	if (!str_to_addr(AF_INET, copied, tmp_addr))
+		goto invalid_format;
+	memcpy(&addr->sin_addr, &tmp_addr[12], 4);
+
+	/*
+	 * NOTE(review): sin_port is not converted with htons(); sheepkeeper's
+	 * bind() does the same, so both sides must be fixed together.
+	 */
+	if (addr_only) {
+		addr->sin_port = SHEEPKEEPER_PORT;
+		goto end;
+	}
+	port = strtol(sep + 1, &p, 10);
+	if (sep + 1 == p || *p != '\0' || port <= 0 || port > 65535)
+		goto invalid_format;
+	addr->sin_port = port;
+end:
+	free(copied);
+	return;
+
+invalid_format:
+	vprintf(SDOG_ERR, "invalid option for host and port: %s\n", s);
+	exit(1);
+}
+
+static int sheepkeeper_init(const char *option)
+{
+	struct sockaddr_in addr;
+	if (!option) {	/* "<address>[:<port>]" is mandatory */
+		vprintf(SDOG_ERR, "address of sheepkeeper is not given\n");
+		return -1;
+	}
+	init_addr_port(option, &addr);
+	sk_comm_fd = socket(AF_INET, SOCK_STREAM, 0);
+	if (sk_comm_fd < 0) {
+		vprintf(SDOG_ERR, "error at socket(): %m\n");
+		return -1;
+	}
+	if (connect(sk_comm_fd, (struct sockaddr *)&addr, sizeof(addr))) {
+		vprintf(SDOG_ERR, "cannot connect to sheepkeeper, is sheepkeeper"
+			" running? errno: %s\n", strerror(errno));
+		close(sk_comm_fd);
+		return -1;
+	}
+	return 0;
+}
+
+static int sheepkeeper_join(const struct sd_node *myself,
+		      void *opaque, size_t opaque_len)
+{
+	int ret, msg_join_len;
+	struct iovec iov[2];
+	struct sk_msg msg;
+	struct sk_msg_join *msg_join;
+
+	msg_join_len = sizeof(struct sk_msg_join) + opaque_len;
+	/* the answer is handled by read_msg_pre_join() via the event loop */
+	memset(&msg, 0, sizeof(struct sk_msg));
+	msg.type = SK_MSG_JOIN;
+	msg.body_len = msg_join_len;
+
+	iov[0].iov_base = &msg;
+	iov[0].iov_len = sizeof(struct sk_msg);
+
+	msg_join = xcalloc(msg_join_len, sizeof(char));
+	this_node = *myself;
+	msg_join->node = this_node;
+	memcpy(msg_join->opaque, opaque, opaque_len);
+	iov[1].iov_base = msg_join;
+	iov[1].iov_len = msg_join_len;
+
+	ret = writev(sk_comm_fd, iov, 2);
+	if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+		vprintf(SDOG_ERR, "sheepkeeper_join() failed, %s\n",
+			strerror(errno));
+		free(msg_join);
+		return -1;
+	}
+	free(msg_join);
+	if (register_event(sk_comm_fd, sheepkeeper_comm_handler, NULL))
+		return -1;
+	return 0;
+}
+
+static int sheepkeeper_leave(void)
+{
+	int ret;
+	struct sk_msg msg;
+	/* announce the leave, then stop watching and close the connection */
+	msg.type = SK_MSG_LEAVE;
+	msg.body_len = 0;
+
+	ret = xwrite(sk_comm_fd, &msg, sizeof(msg));
+	if (ret != sizeof(msg)) {
+		vprintf(SDOG_ERR, "xwrite() failed\n");
+		exit(1);
+	}
+
+	unregister_event(sk_comm_fd);
+	close(sk_comm_fd);
+
+	return 0;
+}
+
+static int sheepkeeper_notify(void *msg, size_t msg_len)
+{
+	int ret;
+	struct sk_msg snd, rcv;
+	struct iovec iov[2];
+	struct sk_msg_notify_forward *notify_forward;
+	/* Broadcast @msg through sheepkeeper; this node receives it too. */
+	/* NOTE(review): replies are read here synchronously, not via the handler */
+	vprintf(SDOG_INFO, "sheepkeeper_notify() called\n");
+
+	snd.type = SK_MSG_NOTIFY;
+	snd.body_len = msg_len;
+
+	iov[0].iov_base = &snd;
+	iov[0].iov_len = sizeof(struct sk_msg);
+
+	iov[1].iov_base = msg;
+	iov[1].iov_len = msg_len;
+
+	ret = writev(sk_comm_fd, iov, 2);
+	if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+		vprintf(SDOG_ERR, "writev() failed\n");
+		exit(1);
+	}
+
+	ret = xread(sk_comm_fd, &rcv, sizeof(rcv));
+	if (ret != sizeof(rcv)) {
+		vprintf(SDOG_ERR, "xread() failed\n");
+		exit(1);
+	}
+
+	if (rcv.type != SK_MSG_NOTIFY_FORWARD) {
+		vprintf(SDOG_ERR, "invalid type: %d\n", rcv.type);
+		exit(1);
+	}
+
+	notify_forward = xmalloc(rcv.body_len);
+	ret = xread(sk_comm_fd, notify_forward, rcv.body_len);
+	if (ret != rcv.body_len) {
+		vprintf(SDOG_ERR, "xread() failed, ret: %d\n", ret);
+		exit(1);
+	}
+
+	sd_notify_handler(&notify_forward->from_node,
+			notify_forward->notify_msg,
+			rcv.body_len - sizeof(*notify_forward));
+	free(notify_forward);
+	snd.type = SK_MSG_NOTIFY_FORWARD_REPLY;
+	snd.body_len = 0;
+	ret = xwrite(sk_comm_fd, &snd, sizeof(snd));
+	if (ret != sizeof(snd)) {
+		vprintf(SDOG_ERR, "xwrite() failed\n");
+		exit(1);
+	}
+	ret = xread(sk_comm_fd, &rcv, sizeof(rcv));
+	if (ret != sizeof(rcv)) {
+		vprintf(SDOG_ERR, "xread() failed\n");
+		exit(1);
+	}
+
+	if (rcv.type != SK_MSG_NOTIFY_FINISH) {
+		vprintf(SDOG_ERR, "invalid type: %d\n", rcv.type);
+		exit(1);
+	}
+
+	return 0;
+}
+
+static void sheepkeeper_block(void)
+{
+	struct sk_msg req = { .type = SK_MSG_BLOCK, .body_len = 0 };
+	struct sk_msg ack;
+
+	/*
+	 * Ask sheepkeeper to forward the block to every other sheep and wait
+	 * for SK_MSG_BLOCK_FINISH before running the local block handler.
+	 * NOTE(review): the answer is read synchronously, not via the handler.
+	 */
+	if (xwrite(sk_comm_fd, &req, sizeof(req)) != sizeof(req)) {
+		vprintf(SDOG_ERR, "xwrite() failed\n");
+		exit(1);
+	}
+
+	if (xread(sk_comm_fd, &ack, sizeof(ack)) != sizeof(ack)) {
+		vprintf(SDOG_ERR, "xread() failed\n");
+		exit(1);
+	}
+
+	if (ack.type != SK_MSG_BLOCK_FINISH) {
+		vprintf(SDOG_ERR, "invalid message from sheepkeeper\n");
+		exit(1);
+	}
+
+	sd_block_handler(&this_node);
+}
+
+static void sheepkeeper_unblock(void *msg, size_t msg_len)
+{
+	sheepkeeper_notify(msg, msg_len);	/* unblock is sent as a plain notify */
+}
+
+static struct cluster_driver cdrv_sheepkeeper = {
+	.name		= "sheepkeeper",
+	/* every callback below talks to sheepkeeper over sk_comm_fd */
+	.init		= sheepkeeper_init,
+	.join		= sheepkeeper_join,
+	.leave		= sheepkeeper_leave,
+	.notify		= sheepkeeper_notify,
+	.block		= sheepkeeper_block,
+	.unblock	= sheepkeeper_unblock,
+};
+
+cdrv_register(cdrv_sheepkeeper);
diff --git a/sheepkeeper/Makefile.am b/sheepkeeper/Makefile.am
new file mode 100644
index 0000000..4e32aa3
--- /dev/null
+++ b/sheepkeeper/Makefile.am
@@ -0,0 +1,44 @@
+#
+# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; see the file COPYING.  If not, write to
+# the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+#
+
+MAINTAINERCLEANFILES	= Makefile.in
+
+AM_CFLAGS		=
+
+INCLUDES		= -I$(top_builddir)/include -I$(top_srcdir)/include
+
+sbin_PROGRAMS		= sheepkeeper
+# single source file; the shared helpers are linked from ../lib below
+sheepkeeper_SOURCES		= sheepkeeper.c
+
+sheepkeeper_LDADD	  	= ../lib/libsheepdog.a
+sheepkeeper_DEPENDENCIES	= ../lib/libsheepdog.a
+
+EXTRA_DIST		=
+# optional static analysis; the leading '-' makes make ignore splint errors
+lint:
+	-splint $(INCLUDES) $(LINT_FLAGS) $(CFLAGS) *.c
+
+all-local:
+	@echo Built sheepkeeper
+
+clean-local:
+	rm -f sheepkeeper *.o gmon.out *.da *.bb *.bbg
+
+# support for GNU Flymake
+check-syntax:
+	$(COMPILE) -fsyntax-only $(CHK_SOURCES)
diff --git a/sheepkeeper/sheepkeeper.c b/sheepkeeper/sheepkeeper.c
new file mode 100644
index 0000000..ebc9c99
--- /dev/null
+++ b/sheepkeeper/sheepkeeper.c
@@ -0,0 +1,501 @@
+/*
+ * Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <errno.h>
+#include <assert.h>
+#include <getopt.h>
+
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/epoll.h>
+
+#include <sys/un.h>
+#include <netinet/in.h>
+
+#include "net.h"
+#include "event.h"
+#include "list.h"
+#include "internal_proto.h"
+#include "util.h"
+#include "sheepkeeper.h"
+
+#define EPOLL_SIZE 4096
+
+enum sheep_state {
+	connected,	/* accepted; no SK_MSG_JOIN yet (xcalloc() zero) */
+	joining,	/* SK_MSG_JOIN received, join not finished */
+	joined,		/* SK_MSG_JOIN_REPLY has been sent */
+};
+
+struct sheep {
+	int fd;
+	struct sd_node node;	/* copied from SK_MSG_JOIN */
+	struct sockaddr_in addr;
+
+	enum sheep_state state;
+	bool master;		/* receives SK_MSG_NEW_NODE for joiners */
+
+	void *kept_join_msg;	/* opaque */
+	int kept_join_msg_len;
+
+	struct list_head sheep_list;
+};
+
+static LIST_HEAD(sheep_list_head);	/* every accepted connection */
+static int nr_joined_sheep;
+
+static bool running;
+
+static int port = SHEEPKEEPER_PORT;
+static uint8_t addr[16];	/* filled by str_to_addr(); IPv4 at [12..15] */
+static int sheep_listen_fd;
+
+static const char *progname;
+
+static int build_node_array(struct sd_node *nodes)
+{
+	int i = 0;
+	struct sheep *s;
+	/* NOTE(review): copies every listed sheep, joined or not */
+	list_for_each_entry(s, &sheep_list_head, sheep_list) {
+		if (i == SD_MAX_NODES)	/* never overrun the caller's array */
+			break;
+		nodes[i++] = s->node;
+	}
+	return i;
+}
+
+static struct sheep *find_sheep_by_nid(struct node_id *id)
+{
+	struct sheep *cur;
+	/* linear scan; sheep that have not sent SK_MSG_JOIN have a zero nid */
+	list_for_each_entry(cur, &sheep_list_head, sheep_list) {
+		if (memcmp(&cur->node.nid, id, sizeof(*id)) == 0)
+			return cur;
+	}
+
+	return NULL;
+}
+
+static void leave_sheep(struct sheep *leaving)
+{
+	int ret;
+	struct sd_node node;
+	struct sheep *s;
+	struct sk_msg snd;
+	struct iovec iov[2];
+	bool was_joined = leaving->state == joined;
+	node = leaving->node;
+	unregister_event(leaving->fd);	/* drop the handler before close() */
+	close(leaving->fd);
+	list_del(&leaving->sheep_list);
+	free(leaving->kept_join_msg);
+	free(leaving);
+	if (!was_joined)	/* no NEW_NODE_FINISH was ever sent for it */
+		return;
+	nr_joined_sheep--;
+	snd.type = SK_MSG_LEAVE_FORWARD;
+	snd.body_len = sizeof(struct sd_node);
+	iov[0].iov_base = &snd;
+	iov[0].iov_len = sizeof(struct sk_msg);
+	iov[1].iov_base = &node;
+	iov[1].iov_len = sizeof(struct sd_node);
+	list_for_each_entry(s, &sheep_list_head, sheep_list) {
+		if (s->state != joined)	/* pre-join sheep reject it */
+			continue;
+		ret = writev(s->fd, iov, 2);
+		if (ret != (iov[0].iov_len + iov[1].iov_len))
+			panic("writev() failed: %m\n");
+	}
+}
+
+static void read_msg_from_sheep(struct sheep *sheep)
+{
+	int ret, fd = sheep->fd;
+	struct sk_msg rcv, snd;
+	struct sk_msg_join *join;
+	struct iovec iov[2];
+	struct sheep *s, *s2;
+	struct sk_msg_notify_forward *notify_forward;
+	struct sk_msg join_reply;
+	struct sk_msg_join_reply *reply_body;
+	struct sk_msg_join_node_finish *finish;
+	/* Handle one message from @sheep (called from sheep_comm_handler). */
+	ret = xread(fd, &rcv, sizeof(struct sk_msg));
+	if (ret == 0) {		/* closed without sending SK_MSG_LEAVE */
+		leave_sheep(sheep);
+		return;
+	}
+	if (ret != sizeof(rcv))
+		panic("xread() failed: %m\n");
+
+	switch (rcv.type) {
+	case SK_MSG_JOIN:
+		join = xcalloc(rcv.body_len, sizeof(char));
+		ret = xread(fd, join, rcv.body_len);
+		if (ret != rcv.body_len)
+			panic("xread() failed: %m\n");
+		sheep->node = join->node;
+		snd.type = SK_MSG_NEW_NODE;
+		snd.body_len = rcv.body_len;
+
+		iov[0].iov_base = &snd;
+		iov[0].iov_len = sizeof(struct sk_msg);
+
+		iov[1].iov_base = join;
+		iov[1].iov_len = rcv.body_len;
+
+		if (!nr_joined_sheep) {
+			/* this sheep is a new leader */
+			/* NOTE(review): a 2nd JOIN before this one completes is elected too */
+			join->master_elected = true;
+			sheep->master = true;
+
+			ret = writev(fd, iov, 2);
+			if (ret != (iov[0].iov_len + iov[1].iov_len))
+				panic("writev() failed: %m\n");
+
+		} else {
+			list_for_each_entry(s, &sheep_list_head, sheep_list) {
+				if (sheep == s)
+					continue;
+				/* NOTE(review): the rejected sheep never re-sends JOIN */
+				if (s->state == joining) {
+					vprintf(SDOG_INFO, "there is already"\
+						" joining node\n");
+					snd.type = SK_MSG_JOIN_RETRY;
+					snd.body_len = 0;
+					ret = xwrite(fd, &snd, sizeof(snd));
+					if (ret != sizeof(snd))
+						panic("xwrite() failed: %m\n");
+					free(join);
+					return;
+				}
+			}
+
+			list_for_each_entry(s, &sheep_list_head, sheep_list) {
+				if (!s->master)
+					continue;
+
+				ret = writev(s->fd, iov, 2);
+				if (ret != (iov[0].iov_len + iov[1].iov_len))
+					panic("writev() failed: %m\n");
+				break;
+			}
+			/* NOTE(review): if the master has left, nobody answers */
+		}
+
+		sheep->state = joining;
+		free(join);
+		break;
+
+	case SK_MSG_NEW_NODE_REPLY:
+		vprintf(SDOG_INFO, "new node reply from %s\n",
+			node_to_str(&sheep->node));
+		join = xcalloc(rcv.body_len, sizeof(char));
+		ret = xread(fd, join, rcv.body_len);
+		if (ret != rcv.body_len)
+			panic("xread() failed: %m, ret: %d\n", ret);
+
+		s = find_sheep_by_nid(&join->node.nid);
+		if (!s) {
+			panic("invalid nid is required, %s\n",
+				node_to_str(&join->node));
+		}
+
+		if (!sheep->master)
+			panic("no master replied new node reply\n");
+
+		s->kept_join_msg_len = rcv.body_len -
+			sizeof(struct sk_msg_join);
+		s->kept_join_msg = xcalloc(s->kept_join_msg_len, sizeof(char));
+		memcpy(s->kept_join_msg, join->opaque, s->kept_join_msg_len);
+		free(join);
+		memset(&join_reply, 0, sizeof(struct sk_msg));
+		join_reply.type = SK_MSG_JOIN_REPLY;
+
+		join_reply.body_len = sizeof(struct sk_msg_join_reply)
+			+ s->kept_join_msg_len;
+		iov[0].iov_base = &join_reply;
+		iov[0].iov_len = sizeof(struct sk_msg);
+
+		reply_body = xcalloc(join_reply.body_len, sizeof(char));
+
+		reply_body->nr_nodes = build_node_array(reply_body->nodes);
+		memcpy(reply_body->opaque, s->kept_join_msg,
+			s->kept_join_msg_len);
+
+		iov[1].iov_base = reply_body;
+		iov[1].iov_len = join_reply.body_len;
+
+		ret = writev(s->fd, iov, 2);
+		if (ret != (iov[0].iov_len + iov[1].iov_len))
+			panic("writev() failed: %m\n");
+
+		free(reply_body);
+
+		nr_joined_sheep++;
+		s->state = joined;
+
+		snd.type = SK_MSG_NEW_NODE_FINISH;
+		snd.body_len = sizeof(*finish) + s->kept_join_msg_len;
+
+		finish = xcalloc(snd.body_len, sizeof(char));
+		finish->new_node = s->node;
+		memcpy(finish->opaque, s->kept_join_msg, s->kept_join_msg_len);
+		finish->nr_nodes = build_node_array(finish->nodes);
+
+		iov[0].iov_base = &snd;
+		iov[0].iov_len = sizeof(snd);
+
+		iov[1].iov_base = finish;
+		iov[1].iov_len = snd.body_len;
+		/* only joined sheep can parse SK_MSG_NEW_NODE_FINISH */
+		list_for_each_entry(s2, &sheep_list_head, sheep_list) {
+			if (s2 == s || s2->state != joined)
+				continue;
+
+			ret = writev(s2->fd, iov, 2);
+			if (ret != (iov[0].iov_len + iov[1].iov_len))
+				panic("writev() failed: %m\n");
+		}
+
+		free(finish);
+
+		break;
+
+	case SK_MSG_NOTIFY:
+		notify_forward = xmalloc(rcv.body_len + sizeof(*notify_forward));
+		ret = xread(fd, notify_forward->notify_msg, rcv.body_len);
+		if (ret != rcv.body_len)
+			panic("xread() failed: %m\n");
+
+		snd.type = SK_MSG_NOTIFY_FORWARD;
+		snd.body_len = rcv.body_len + sizeof(*notify_forward);
+
+		iov[0].iov_base = &snd;
+		iov[0].iov_len = sizeof(snd);
+
+		notify_forward->from_node = sheep->node;
+
+		iov[1].iov_base = notify_forward;
+		iov[1].iov_len = snd.body_len;
+		/* NOTE(review): each reply is read synchronously, in list order */
+		list_for_each_entry(s, &sheep_list_head, sheep_list) {
+			if (s != sheep && s->state != joined)
+				continue;	/* pre-join sheep cannot parse it */
+			ret = writev(s->fd, iov, 2);
+			if (ret != (iov[0].iov_len + iov[1].iov_len))
+				panic("writev() failed: %m\n");
+			ret = xread(s->fd, &rcv, sizeof(rcv));
+			if (ret != sizeof(rcv))
+				panic("xread() failed: %m\n");
+			if (rcv.type != SK_MSG_NOTIFY_FORWARD_REPLY)
+				panic("invalid type received: %d\n", rcv.type);
+		}
+		free(notify_forward);
+		snd.type = SK_MSG_NOTIFY_FINISH;
+		snd.body_len = 0;
+		ret = xwrite(sheep->fd, &snd, sizeof(snd));
+		if (ret != sizeof(snd))
+			panic("xwrite() failed: %m\n");
+		break;
+
+	case SK_MSG_BLOCK:
+		list_for_each_entry(s, &sheep_list_head, sheep_list) {
+			struct sk_msg msg;
+			/* pre-join sheep cannot parse SK_MSG_BLOCK_FORWARD */
+			if (s == sheep || s->state != joined)
+				continue;
+
+			msg.type = SK_MSG_BLOCK_FORWARD;
+			msg.body_len = sizeof(struct sd_node);
+
+			iov[0].iov_base = &msg;
+			iov[0].iov_len = sizeof(struct sk_msg);
+
+			iov[1].iov_base = &sheep->node;
+			iov[1].iov_len = sizeof(struct sd_node);
+
+			ret = writev(s->fd, iov, 2);
+			if (ret != (iov[0].iov_len + iov[1].iov_len))
+				panic("writev() failed: %m\n");
+
+			ret = xread(s->fd, &msg, sizeof(struct sk_msg));
+			if (ret != sizeof(struct sk_msg))
+				panic("xread() failed: %m\n");
+
+			if (msg.type != SK_MSG_BLOCK_FORWARD_REPLY) {
+				panic("invalid message from sheep, %d\n",
+					msg.type);
+			}
+		}
+
+		snd.type = SK_MSG_BLOCK_FINISH;
+		snd.body_len = 0;
+		ret = xwrite(sheep->fd, &snd, sizeof(struct sk_msg));
+		if (ret != sizeof(struct sk_msg))
+			panic("xwrite() failed: %m\n");
+
+		break;
+
+	case SK_MSG_LEAVE:
+		leave_sheep(sheep);
+		break;
+
+	default:
+		panic("invalid message from sheep: %d\n", rcv.type);
+		break;
+	}
+}
+
+static void sheep_comm_handler(int fd, int events, void *data)
+{
+	if (events & EPOLLIN)		/* readable: handle one message */
+		read_msg_from_sheep(data);
+	else if (events & (EPOLLHUP | EPOLLERR))	/* connection lost */
+		leave_sheep(data);
+}
+
+static void sheep_accept_handler(int fd, int events, void *data)
+{
+	struct sheep *new_sheep;
+	socklen_t len = sizeof(struct sockaddr_in);
+
+	new_sheep = xcalloc(1, sizeof(struct sheep));
+	INIT_LIST_HEAD(&new_sheep->sheep_list);
+	new_sheep->fd = accept(fd, (struct sockaddr *)&new_sheep->addr, &len);
+	if (new_sheep->fd < 0) {
+		/* a client that reset before accept() is not fatal */
+		if (errno != ECONNABORTED && errno != EINTR)
+			panic("accept() failed: %m\n");
+		free(new_sheep);
+		return;
+	}
+	if (-1 == set_keepalive(new_sheep->fd))
+		panic("set_keepalive() failed: %m\n");
+
+	list_add_tail(&new_sheep->sheep_list, &sheep_list_head);
+	register_event(new_sheep->fd, sheep_comm_handler, new_sheep);
+	vprintf(SDOG_INFO, "accepted new sheep connection\n");
+}
+
+static struct option const long_options[] = {
+	{ "port", required_argument, NULL, 'p' },	/* listen port */
+	{ "address", required_argument, NULL, 'a' },	/* listen address */
+	{ "foreground", no_argument, NULL, 'f' },	/* do not daemonize */
+	{ "debug", no_argument, NULL, 'd' },		/* log at SDOG_DEBUG */
+
+	{ NULL, 0, NULL, 0 },
+};
+
+static const char *short_options = "p:a:fd";	/* keep in sync with above */
+
+static void exit_handler(void)	/* registered with atexit() in main() */
+{
+	vprintf(SDOG_INFO, "exiting...\n");
+}
+
+int main(int argc, char **argv)
+{
+	int ch, ret, longindex, opt;
+	char *p;
+	bool daemonize = true;
+	struct sockaddr_in listen_addr;
+	int log_level = SDOG_INFO;
+
+	progname = argv[0];
+
+	while ((ch = getopt_long(argc, argv, short_options, long_options,
+				 &longindex)) >= 0) {
+		switch (ch) {
+		case 'p':
+			port = strtol(optarg, &p, 10);
+			if (p == optarg || *p != '\0' ||
+			    port < 1 || port > 65535) {
+				fprintf(stderr, "invalid port: %s\n", optarg);
+				exit(1);
+			}
+			break;
+		case 'a':
+			if (!str_to_addr(AF_INET, (const char *)optarg, addr)) {
+				fprintf(stderr, "invalid address: %s\n",
+					optarg);
+				exit(1);
+			}
+			break;
+		case 'f':
+			daemonize = false;
+			break;
+		case 'd':
+			log_level = SDOG_DEBUG;
+			break;
+		default:
+			fprintf(stderr, "unknown option\n");
+			exit(1);
+			break;
+		}
+	}
+
+	if (daemonize && daemon(0, 0) < 0) {
+		fprintf(stderr, "daemon() failed: %s\n", strerror(errno));
+		exit(1);
+	}
+	/* our log will appear in the default output of syslog */
+	ret = log_init(progname, LOG_SPACE_SIZE, !daemonize, log_level, NULL);
+	if (ret)
+		panic("initialize logger failed: %m\n");
+
+	atexit(exit_handler);
+	init_event(EPOLL_SIZE);
+
+	/* setup inet socket for communication with sheeps */
+	sheep_listen_fd = socket(AF_INET, SOCK_STREAM, 0);
+	if (sheep_listen_fd < 0)
+		panic("socket() failed: %m\n");
+
+	opt = 1;
+	ret = setsockopt(sheep_listen_fd, SOL_SOCKET, SO_REUSEADDR,
+			&opt, sizeof(opt));
+	if (ret == -1)
+		panic("setsockopt() for SO_REUSEADDR failed: %m\n");
+
+	memset(&listen_addr, 0, sizeof(listen_addr));
+	listen_addr.sin_family = AF_INET;
+	/* NOTE(review): no htons(); the sheep driver connects the same way */
+	listen_addr.sin_port = port;
+	/* str_to_addr() leaves the IPv4 address at addr[12..15] */
+	memcpy(&listen_addr.sin_addr, &addr[12], sizeof(listen_addr.sin_addr));
+
+	ret = bind(sheep_listen_fd, (struct sockaddr *)&listen_addr,
+		   sizeof(struct sockaddr_in));
+	if (ret == -1)
+		panic("bind() failed: %m\n");
+
+	ret = listen(sheep_listen_fd, SOMAXCONN);
+	if (ret == -1)
+		panic("listen() failed: %m\n");
+
+	ret = register_event(sheep_listen_fd, sheep_accept_handler, NULL);
+	if (ret)
+		panic("register_event() failed: %m\n");
+
+	running = true;
+	while (running)
+		event_loop(-1);
+
+	return 0;
+}
-- 
1.7.2.5




More information about the sheepdog mailing list