[sheepdog] [PATCH] sheepkeeper: a new cluster manager specialized for sheepdog

Hitoshi Mitake h.mitake at gmail.com
Sun Oct 21 15:42:36 CEST 2012


This patch adds sheepkeeper, a new cluster manager for sheepdog.

This is still very incomplete, very buggy and very inefficient. To give
an idea of how incomplete it is: it doesn't free() obsolete memory
areas and doesn't check the return values of system calls, both of
which must be done. In addition, it can't handle many important
situations of nodes joining and leaving, and it handles almost no
errors. The death of a single node can kill the entire system.

The sheepkeeper cluster manager has a master-slave design, so it can be
a single point of failure (SPOF). I'm planning to make it redundant by
utilizing corosync.

This patch adds the new directory sheepkeeper/ for it, and the
executable binary will be produced as sheepkeeper/sheepkeeper.
The cluster driver for sheep is added as sheep/cluster/sheepkeeper.c.

I also have to note that this patch makes an odd change to lib/event.c:
a new dummy member is added to struct event_info. This member is added
to avoid memory corruption caused by another part of this patch. I
haven't been able to pinpoint the bug yet; I'll try to find it later
and remove the dummy member.

If it can be improved, this can be an alternative to the group
management part of ZooKeeper, one that doesn't depend on the JVM.

If you have some opinions or feature requests for a cluster manager
designed and implemented from scratch, I'd like to hear your comments.

Signed-off-by: Hitoshi Mitake <mitake.hitoshi at lab.ntt.co.jp>
---
 .gitignore                  |    1 +
 Makefile.am                 |    2 +-
 configure.ac                |    3 +-
 include/sheepkeeper.h       |   61 +++++
 lib/event.c                 |    2 +
 sheep/Makefile.am           |    2 +
 sheep/cluster/sheepkeeper.c |  435 +++++++++++++++++++++++++++++++++++
 sheepkeeper/Makefile.am     |   44 ++++
 sheepkeeper/sheepkeeper.c   |  530 +++++++++++++++++++++++++++++++++++++++++++
 9 files changed, 1078 insertions(+), 2 deletions(-)
 create mode 100644 include/sheepkeeper.h
 create mode 100644 sheep/cluster/sheepkeeper.c
 create mode 100644 sheepkeeper/Makefile.am
 create mode 100644 sheepkeeper/sheepkeeper.c

diff --git a/.gitignore b/.gitignore
index dbdbd55..ffc9003 100644
--- a/.gitignore
+++ b/.gitignore
@@ -31,6 +31,7 @@ GSYMS
 collie/collie
 sheep/sheep
 sheepfs/sheepfs
+sheepkeeper/sheepkeeper
 
 # directories
 .deps
diff --git a/Makefile.am b/Makefile.am
index f1b97eb..8914408 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -17,7 +17,7 @@ sheepdogsysconfdir	= ${SHEEPDOGCONFDIR}
 
 sheepdogsysconf_DATA	= 
 
-SUBDIRS			= lib collie sheep include script man
+SUBDIRS			= lib collie sheep include script man sheepkeeper
 
 if BUILD_SHEEPFS
 SUBDIRS			+= sheepfs
diff --git a/configure.ac b/configure.ac
index 3cb47c6..4cfb4d0 100644
--- a/configure.ac
+++ b/configure.ac
@@ -133,7 +133,8 @@ AC_CONFIG_FILES([Makefile
 		include/Makefile
 		script/Makefile
 		lib/Makefile
-		man/Makefile])
+		man/Makefile
+		sheepkeeper/Makefile])
 
 ### Local business
 
diff --git a/include/sheepkeeper.h b/include/sheepkeeper.h
new file mode 100644
index 0000000..061f53a
--- /dev/null
+++ b/include/sheepkeeper.h
@@ -0,0 +1,61 @@
+#ifndef SHEEPKEEPER_H
+#define SHEEPKEEPER_H
+
+enum sk_msg_type {
+	SK_MSG_JOIN = 1,
+	SK_MSG_JOIN_REPLY,
+	SK_MSG_JOIN_RETRY,
+
+	SK_MSG_NEW_NODE,
+	SK_MSG_NEW_NODE_REPLY,
+	SK_MSG_NEW_NODE_FINISH,
+
+	SK_MSG_NOTIFY,
+	SK_MSG_NOTIFY_FORWARD,
+	SK_MSG_NOTIFY_FORWARD_REPLY,
+	SK_MSG_NOTIFY_FINISH,
+
+	SK_MSG_BLOCK,
+	SK_MSG_BLOCK_FORWARD,
+	SK_MSG_BLOCK_FORWARD_REPLY,
+	SK_MSG_BLOCK_FINISH,
+
+	SK_MSG_LEAVE,
+	SK_MSG_LEAVE_FORWARD,
+};
+
+struct sk_msg {
+	enum sk_msg_type type;
+	int body_len;
+};
+
+#include "internal_proto.h"
+
+struct sk_msg_join {
+	struct sd_node node;
+	bool master_elected;
+	char opaque[0];
+};
+
+struct sk_msg_join_reply {
+	struct sd_node nodes[SD_MAX_NODES];
+	int nr_nodes;
+	char opaque[0];
+};
+
+struct sk_msg_join_node_finish {
+	struct sd_node new_node;
+
+	struct sd_node nodes[SD_MAX_NODES];
+	int nr_nodes;
+	char opaque[0];
+};
+
+struct sk_msg_notify_forward {
+	struct sd_node from_node;
+	char notify_msg[0];
+};
+
+#define SHEEPKEEPER_PORT 2501
+
+#endif	/* SHEEPKEEPER_H */
diff --git a/lib/event.c b/lib/event.c
index 2154a56..1244399 100644
--- a/lib/event.c
+++ b/lib/event.c
@@ -66,6 +66,8 @@ void add_timer(struct timer *t, unsigned int mseconds)
 }
 
 struct event_info {
+	int dummy;/* FIXME: this must be removed */
+
 	event_handler_t handler;
 	int fd;
 	void *data;
diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index ed927d8..4106276 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -39,6 +39,8 @@ if BUILD_ACCORD
 sheep_SOURCES		+= cluster/accord.c
 endif
 
+sheep_SOURCES		+= cluster/sheepkeeper.c
+
 sheep_SOURCES		+= farm/sha1_file.c farm/trunk.c farm/snap.c farm/farm.c
 
 if BUILD_TRACE
diff --git a/sheep/cluster/sheepkeeper.c b/sheep/cluster/sheepkeeper.c
new file mode 100644
index 0000000..2e0d862
--- /dev/null
+++ b/sheep/cluster/sheepkeeper.c
@@ -0,0 +1,435 @@
+/*
+ * Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <stdio.h>
+#include <stdint.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <sys/epoll.h>
+
+#include "cluster.h"
+#include "event.h"
+#include "sheepkeeper.h"
+#include "internal_proto.h"
+
+static int sk_comm_fd;
+
+static struct sd_node this_node;
+static bool is_master;
+
+static int nr_nodes;
+static struct sd_node nodes[SD_MAX_NODES];
+
+static void read_msg_from_sheepkeeper(void)
+{
+	int i, j, ret;
+	struct sk_msg rcv, snd;
+	struct sd_node sender;
+	struct iovec iov[2];
+	struct sk_msg_join *join;
+	struct sk_msg_notify_forward *notify_forward;
+	struct sk_msg_join_node_finish *join_node_finish;
+	struct join_message *jm;
+
+
+	ret = xread(sk_comm_fd, &rcv, sizeof(rcv));
+	if (ret != sizeof(rcv)) {
+		vprintf(SDOG_ERR, "xread() failed\n");
+		exit(1);
+	}
+
+	switch (rcv.type) {
+	case SK_MSG_NEW_NODE:
+		join = xmalloc(rcv.body_len);
+		ret = xread(sk_comm_fd, join, rcv.body_len);
+		if (ret != rcv.body_len) {
+			vprintf(SDOG_ERR, "xread() failed\n");
+			exit(1);
+		}
+
+		if (is_master)
+			sd_check_join_cb(&join->node, join->opaque);
+
+		bzero(&snd, sizeof(snd));
+		snd.type = SK_MSG_NEW_NODE_REPLY;
+		snd.body_len = rcv.body_len;
+
+		iov[0].iov_base = &snd;
+		iov[0].iov_len = sizeof(snd);
+
+		iov[1].iov_base = join;
+		iov[1].iov_len = rcv.body_len;
+
+		ret = writev(sk_comm_fd, iov, 2);
+		if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+			vprintf(SDOG_ERR, "writev() failed\n");
+			exit(1);
+		}
+
+		break;
+
+	case SK_MSG_NEW_NODE_FINISH:
+		join_node_finish = xmalloc(rcv.body_len);
+		ret = xread(sk_comm_fd, join_node_finish, rcv.body_len);
+		if (ret != rcv.body_len) {
+			vprintf(SDOG_ERR, "xread() failed\n");
+			exit(1);
+		}
+
+		jm = (struct join_message *)join_node_finish->opaque;
+		memcpy(nodes, join_node_finish->nodes,
+			join_node_finish->nr_nodes * sizeof(struct sd_node));
+		nr_nodes = join_node_finish->nr_nodes;
+
+		sd_join_handler(&join_node_finish->new_node, nodes, nr_nodes,
+				CJ_RES_SUCCESS, jm);
+
+		break;
+
+	case SK_MSG_NOTIFY_FORWARD:
+		notify_forward = xmalloc(rcv.body_len);
+		ret = xread(sk_comm_fd, notify_forward, rcv.body_len);
+		if (ret != rcv.body_len) {
+			vprintf(SDOG_ERR, "xread() failed\n");
+			exit(1);
+		}
+
+		sd_notify_handler(&notify_forward->from_node,
+				notify_forward->notify_msg,
+				rcv.body_len - sizeof(*notify_forward));
+
+		snd.type = SK_MSG_NOTIFY_FORWARD_REPLY;
+		ret = xwrite(sk_comm_fd, &snd, sizeof(snd));
+		if (ret != sizeof(snd)) {
+			vprintf(SDOG_ERR, "xwrite() failed\n");
+			exit(1);
+		}
+
+		break;
+
+	case SK_MSG_BLOCK_FORWARD:
+		ret = xread(sk_comm_fd, &sender, sizeof(struct sd_node));
+		if (ret != sizeof(sender)) {
+			vprintf(SDOG_ERR, "xread() failed\n");
+			exit(1);
+		}
+
+		snd.type = SK_MSG_BLOCK_FORWARD_REPLY;
+		snd.body_len = 0;
+		ret = xwrite(sk_comm_fd, &snd, sizeof(struct sk_msg));
+		if (ret != sizeof(snd)) {
+			vprintf(SDOG_ERR, "xwrite() failed\n");
+			exit(1);
+		}
+
+		break;
+
+	case SK_MSG_LEAVE_FORWARD:
+		ret = xread(sk_comm_fd, &sender, sizeof(struct sd_node));
+		if (ret != sizeof(sender)) {
+			vprintf(SDOG_ERR, "xread() failed\n");
+			exit(1);
+		}
+
+		for (i = 0; i < nr_nodes; i++) {
+			if (!memcmp(&sender, &nodes[i], sizeof(sender))) {
+				nr_nodes--;
+
+				for (j = i; j < nr_nodes; j++)
+					nodes[j] = nodes[j + 1];
+
+				goto removed;
+			}
+		}
+
+		vprintf(SDOG_INFO, "leave message from unknown node\n");
+		exit(1);
+
+removed:
+		sd_leave_handler(&sender, nodes, nr_nodes);
+		break;
+
+	default:
+		vprintf(SDOG_ERR, "invalid message from sheepkeeper: %d, "
+			"length: %d\n", rcv.type, rcv.body_len);
+		exit(1);
+		break;
+	}
+}
+
+static void sheepkeeper_comm_handler(int fd, int events, void *data)
+{
+	if (events & EPOLLIN)
+		read_msg_from_sheepkeeper();
+}
+
+static int sheepkeeper_init(const char *option)
+{
+	struct sockaddr_in addr;
+
+	sk_comm_fd = socket(AF_INET, SOCK_STREAM, 0);
+	if (sk_comm_fd < 0) {
+		vprintf(SDOG_INFO, "error at socket()\n");
+		return -1;
+	}
+
+	bzero(&addr, sizeof(struct sockaddr_in));
+	addr.sin_family = AF_INET;
+	addr.sin_port = SHEEPKEEPER_PORT;
+
+	if (connect(sk_comm_fd, &addr, sizeof(struct sockaddr_in))) {
+		vprintf(SDOG_ERR, "cannot connect to sheepkeeper,"
+			" is sheepkeeper running?\n");
+		vprintf(SDOG_ERR, "errno: %s\n", strerror(errno));
+		return -1;
+	}
+
+	return 0;
+}
+
+/* Join the cluster via sheepkeeper; returns 0, or -1 if sending fails. */
+static int sheepkeeper_join(struct sd_node *myself,
+		      void *opaque, size_t opaque_len)
+{
+	int ret, msg_join_len = sizeof(struct sk_msg_join) + opaque_len;
+	struct iovec iov[2];
+	struct sk_msg msg, rcv;
+	struct sk_msg_join *msg_join;
+	struct sk_msg_join_reply *join_reply;
+
+	bzero(&msg, sizeof(struct sk_msg));
+	msg.type = SK_MSG_JOIN;
+	msg.body_len = msg_join_len;
+	iov[0].iov_base = &msg;
+	iov[0].iov_len = sizeof(struct sk_msg);
+
+	/* body = struct sk_msg_join + opaque; opaque_len alone was too small */
+	msg_join = xcalloc(msg_join_len, sizeof(char));
+	this_node = *myself;
+	msg_join->node = this_node;
+	memcpy(msg_join->opaque, opaque, opaque_len);
+	iov[1].iov_base = msg_join;
+	iov[1].iov_len = msg_join_len;
+
+retry:
+	ret = writev(sk_comm_fd, iov, 2);
+	if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+		vprintf(SDOG_ERR, "sheepkeeper_join() failed, %s\n",
+			strerror(errno));
+		return -1;
+	}
+
+	ret = xread(sk_comm_fd, &rcv, sizeof(struct sk_msg));
+	if (ret != sizeof(rcv)) {
+		vprintf(SDOG_ERR, "xread() failed\n");
+		exit(1);
+	}
+
+	if (rcv.type == SK_MSG_JOIN_RETRY) {
+		vprintf(SDOG_INFO, "join request is rejected, retrying\n");
+		goto retry;
+	} else if (rcv.type == SK_MSG_NEW_NODE) {
+		struct sk_msg_join *join;
+		int join_len = rcv.body_len;	/* length the header announces */
+
+		join = xcalloc(join_len, sizeof(char));
+		ret = xread(sk_comm_fd, join, join_len);
+		if (ret != join_len) {
+			vprintf(SDOG_ERR, "xread() failed\n");
+			exit(1);
+		}
+
+		ret = sd_check_join_cb(&join->node, join->opaque);
+		if (join->master_elected) {
+			vprintf(SDOG_INFO, "elected as master\n");
+			is_master = true;
+		}
+
+		msg.type = SK_MSG_NEW_NODE_REPLY;
+		msg.body_len = join_len;
+		iov[0].iov_base = &msg;
+		iov[0].iov_len = sizeof(struct sk_msg);
+		iov[1].iov_base = join;
+		iov[1].iov_len = join_len;
+		ret = writev(sk_comm_fd, iov, 2);
+		if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+			vprintf(SDOG_ERR, "writev() failed\n");
+			exit(1);
+		}
+
+		ret = xread(sk_comm_fd, &rcv, sizeof(struct sk_msg));
+		if (ret != sizeof(rcv)) {
+			vprintf(SDOG_ERR, "invalid ret: %d\n", ret);
+			exit(1);
+		}
+	}
+
+	if (rcv.type != SK_MSG_JOIN_REPLY) {
+		vprintf(SDOG_INFO, "invalid message from sheepkeeper, "
+			"received message: %d\n", rcv.type);
+		exit(1);
+	}
+
+	join_reply = xmalloc(rcv.body_len);
+	ret = xread(sk_comm_fd, join_reply, rcv.body_len);
+	if (ret != rcv.body_len) {
+		vprintf(SDOG_ERR, "xread() failed\n");
+		exit(1);
+	}
+
+	vprintf(SDOG_INFO, "join reply arrived, nr_nodes: %d\n",
+		join_reply->nr_nodes);
+	sd_join_handler(&this_node, join_reply->nodes, join_reply->nr_nodes,
+			CJ_RES_SUCCESS, join_reply->opaque);
+	memcpy(nodes, join_reply->nodes,
+		join_reply->nr_nodes * sizeof(struct sd_node));
+	nr_nodes = join_reply->nr_nodes;
+
+	register_event(sk_comm_fd, sheepkeeper_comm_handler, NULL);
+	vprintf(SDOG_INFO, "sheepkeeper_join() succeed\n");
+	return 0;
+}
+
+static int sheepkeeper_leave(void)
+{
+	int ret;
+	struct sk_msg msg;
+
+	msg.type = SK_MSG_LEAVE;
+	msg.body_len = 0;
+
+	ret = xwrite(sk_comm_fd, &msg, sizeof(msg));
+	if (ret != sizeof(msg)) {
+		vprintf(SDOG_INFO, "xwrite() failed\n");
+		exit(1);
+	}
+
+	return 0;
+}
+
+static int sheepkeeper_notify(void *msg, size_t msg_len)
+{
+	int ret;
+	struct sk_msg snd, rcv;
+	struct iovec iov[2];
+
+	struct sk_msg_notify_forward *notify_forward;
+
+	vprintf(SDOG_INFO, "sheepkeeper_notify() called\n");
+
+	snd.type = SK_MSG_NOTIFY;
+	snd.body_len = msg_len;
+
+	iov[0].iov_base = &snd;
+	iov[0].iov_len = sizeof(struct sk_msg);
+
+	iov[1].iov_base = msg;
+	iov[1].iov_len = msg_len;
+
+	ret = writev(sk_comm_fd, iov, 2);
+	if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+		vprintf(SDOG_ERR, "writev() failed\n");
+		exit(1);
+	}
+
+	ret = xread(sk_comm_fd, &rcv, sizeof(rcv));
+	if (ret != sizeof(rcv)) {
+		vprintf(SDOG_ERR, "xread() failed\n");
+		exit(1);
+	}
+
+	if (rcv.type != SK_MSG_NOTIFY_FORWARD) {
+		vprintf(SDOG_ERR, "invalid type: %d\n", rcv.type);
+		exit(1);
+	}
+
+	notify_forward = xmalloc(rcv.body_len);
+	ret = xread(sk_comm_fd, notify_forward, rcv.body_len);
+	if (ret != rcv.body_len) {
+		vprintf(SDOG_INFO, "xread() failed, ret: %d\n", ret);
+		exit(1);
+	}
+
+	sd_notify_handler(&notify_forward->from_node,
+			notify_forward->notify_msg,
+			rcv.body_len - sizeof(struct sd_node));
+
+	snd.type = SK_MSG_NOTIFY_FORWARD_REPLY;
+	ret = xwrite(sk_comm_fd, &snd, sizeof(snd));
+	if (ret != sizeof(snd)) {
+		vprintf(SDOG_ERR, "xwrite() failed\n");
+		exit(1);
+	}
+
+	ret = xread(sk_comm_fd, &rcv, sizeof(rcv));
+	if (ret != sizeof(rcv)) {
+		vprintf(SDOG_ERR, "xread() failed\n");
+		exit(1);
+	}
+
+	if (rcv.type != SK_MSG_NOTIFY_FINISH) {
+		vprintf(SDOG_ERR, "invalid type: %d\n", rcv.type);
+		exit(1);
+	}
+
+	return 0;
+}
+
+static void sheepkeeper_block(void)
+{
+	int ret;
+	struct sk_msg msg;
+
+	msg.type = SK_MSG_BLOCK;
+	msg.body_len = 0;
+
+	ret = xwrite(sk_comm_fd, &msg, sizeof(msg));
+	if (ret != sizeof(msg)) {
+		vprintf(SDOG_ERR, "xwrite() failed\n");
+		exit(1);
+	}
+
+	ret = xread(sk_comm_fd, &msg, sizeof(msg));
+	if (ret != sizeof(msg)) {
+		vprintf(SDOG_ERR, "xread() failed\n");
+		exit(1);
+	}
+
+	if (msg.type != SK_MSG_BLOCK_FINISH) {
+		vprintf(SDOG_ERR, "invalid message from sheepkeeper\n");
+		exit(1);
+	}
+
+	sd_block_handler(&this_node);
+}
+
+static void sheepkeeper_unblock(void *msg, size_t msg_len)
+{
+	sheepkeeper_notify(msg, msg_len);
+}
+
+static struct cluster_driver cdrv_sheepkeeper = {
+	.name		= "sheepkeeper",
+
+	.init		= sheepkeeper_init,
+	.join		= sheepkeeper_join,
+	.leave		= sheepkeeper_leave,
+	.notify		= sheepkeeper_notify,
+	.block		= sheepkeeper_block,
+	.unblock	= sheepkeeper_unblock,
+};
+
+cdrv_register(cdrv_sheepkeeper);
diff --git a/sheepkeeper/Makefile.am b/sheepkeeper/Makefile.am
new file mode 100644
index 0000000..4e32aa3
--- /dev/null
+++ b/sheepkeeper/Makefile.am
@@ -0,0 +1,44 @@
+#
+# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; see the file COPYING.  If not, write to
+# the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+#
+
+MAINTAINERCLEANFILES	= Makefile.in
+
+AM_CFLAGS		=
+
+INCLUDES		= -I$(top_builddir)/include -I$(top_srcdir)/include
+
+sbin_PROGRAMS		= sheepkeeper
+
+sheepkeeper_SOURCES		= sheepkeeper.c
+
+sheepkeeper_LDADD	  	= ../lib/libsheepdog.a
+sheepkeeper_DEPENDENCIES	= ../lib/libsheepdog.a
+
+EXTRA_DIST		=
+
+lint:
+	-splint $(INCLUDES) $(LINT_FLAGS) $(CFLAGS) *.c
+
+all-local:
+	@echo Built sheepkeeper
+
+clean-local:
+	rm -f sheepkeeper *.o gmon.out *.da *.bb *.bbg
+
+# support for GNU Flymake
+check-syntax:
+	$(COMPILE) -fsyntax-only $(CHK_SOURCES)
diff --git a/sheepkeeper/sheepkeeper.c b/sheepkeeper/sheepkeeper.c
new file mode 100644
index 0000000..3249958
--- /dev/null
+++ b/sheepkeeper/sheepkeeper.c
@@ -0,0 +1,530 @@
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <errno.h>
+#include <assert.h>
+#include <getopt.h>
+
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/epoll.h>
+
+#include <sys/un.h>
+#include <netinet/in.h>
+
+#include "net.h"
+#include "event.h"
+#include "list.h"
+#include "internal_proto.h"
+#include "sheepkeeper.h"
+
+static bool daemonized;
+
+static inline char *node_to_str(struct sd_node *id)
+{
+	static char str[256];
+	char name[256];
+	int af = AF_INET6;
+	uint8_t *addr = id->nid.addr;
+
+	/* Find address family type */
+	if (addr[12]) {
+		int  oct_no = 0;
+		while (!addr[oct_no] && oct_no++ < 12)
+			;
+		if (oct_no == 12)
+			af = AF_INET;
+	}
+
+	snprintf(str, sizeof(str), "%s ip:%s port:%d",
+		(af == AF_INET) ? "IPv4" : "IPv6",
+		addr_to_str(name, sizeof(name), id->nid.addr, 0), id->nid.port);
+
+	return str;
+}
+
+#define die(fmt, ...) do {						\
+		if (!daemonized) {					\
+			fprintf(stderr, "%s, %d (errno: %s): " fmt,	\
+				__FILE__, __LINE__, strerror(errno),	\
+				## __VA_ARGS__);			\
+		} else {						\
+			syslog(LOG_ERR, "%s, %d (errno: %s): " fmt,	\
+				__FILE__, __LINE__, strerror(errno),	\
+				## __VA_ARGS__);			\
+		}							\
+									\
+		exit(1);						\
+	} while (0)
+
+#define log(fmt, ...) do {						\
+		if (!daemonized) {					\
+			fprintf(stdout, "%s, %d: " fmt,			\
+				__FILE__, __LINE__,			\
+				## __VA_ARGS__);			\
+		} else {						\
+			syslog(LOG_INFO, "%s, %d: " fmt,		\
+				__FILE__, __LINE__,			\
+				## __VA_ARGS__);			\
+		}							\
+	} while (0)
+
+#define debug_printf(fmt, ...) do {					\
+		if (!daemonized) {					\
+			fprintf(stdout, "%s, %d (errno: %s): " fmt,	\
+				__FILE__, __LINE__, strerror(errno),	\
+				## __VA_ARGS__);			\
+		} else {						\
+			syslog(LOG_DEBUG, "%s, %d (errno: %s): " fmt,	\
+				__FILE__, __LINE__, strerror(errno),	\
+				## __VA_ARGS__);			\
+		}							\
+	} while (0)
+
+static void *_xcalloc(size_t nmemb, size_t size)
+{
+	void *ret;
+
+	ret = calloc(nmemb, size);
+	if (!ret)
+		die("error at calloc()\n");
+
+	return ret;
+}
+
+#define EPOLL_SIZE 4096
+
+enum sheep_state {
+	joining,
+	joined,
+};
+
+struct sheep {
+	int fd;
+	struct sd_node node;
+	struct sockaddr_in addr;
+
+	enum sheep_state state;
+
+	bool master;
+
+	void *kept_join_msg;	/* opaque */
+	int kept_join_msg_len;
+
+	struct list_head sheep_list;
+};
+
+static LIST_HEAD(sheep_list_head);
+static int nr_joined_sheep;
+
+static bool running;
+
+static int port;
+static uint8_t addr[16];
+static int sheep_listen_fd;
+
+static const char *progname;
+
+static int build_node_array(struct sd_node *nodes)
+{
+	int i;
+	struct sheep *s;
+
+	i = 0;
+	list_for_each_entry(s, &sheep_list_head, sheep_list) {
+		nodes[i++] = s->node;
+	}
+
+	return i;
+}
+
+static struct sheep *find_sheep_by_nid(struct node_id *id)
+{
+	struct sheep *s;
+
+	list_for_each_entry(s, &sheep_list_head, sheep_list) {
+		if (!memcmp(&s->node.nid, id, sizeof(struct node_id)))
+			return s;
+	}
+
+	return NULL;
+}
+
+static void read_msg_from_sheep(struct sheep *sheep)
+{
+	int ret, fd = sheep->fd;
+	struct sk_msg rcv, snd;
+	struct sk_msg_join *join;
+	struct iovec iov[2];
+
+	struct sheep *s, *s2;
+
+	struct sk_msg_notify_forward *notify_forward;
+	struct sd_node node;
+
+	struct sk_msg join_reply;
+	struct sk_msg_join_reply *reply_body;
+
+	struct sk_msg_join_node_finish *finish;
+
+	ret = xread(fd, &rcv, sizeof(struct sk_msg));
+	if (ret != sizeof(rcv))
+		die("xread() failed\n");
+
+	switch (rcv.type) {
+	case SK_MSG_JOIN:
+		join = _xcalloc(rcv.body_len, sizeof(char));
+		ret = xread(fd, join, rcv.body_len);
+
+		sheep->node = join->node;
+
+		snd.type = SK_MSG_NEW_NODE;
+		snd.body_len = rcv.body_len;
+
+		iov[0].iov_base = &snd;
+		iov[0].iov_len = sizeof(struct sk_msg);
+
+		iov[1].iov_base = join;
+		iov[1].iov_len = rcv.body_len;
+
+		if (!nr_joined_sheep) {
+			/* this sheep is a new leader */
+
+			join->master_elected = true;
+			sheep->master = true;
+
+			ret = writev(fd, iov, 2);
+			if (ret != (iov[0].iov_len + iov[1].iov_len))
+				die("writev() failed\n");
+
+		} else {
+			list_for_each_entry(s, &sheep_list_head, sheep_list) {
+				if (sheep == s)
+					continue;
+
+				if (s->state == joining) {
+					log("there is already joining node\n");
+					snd.type = SK_MSG_JOIN_RETRY;
+
+					ret = xwrite(fd, &snd, sizeof(snd));
+					if (ret != sizeof(snd))
+						die("xwrite() failed\n");
+
+					return;
+				}
+			}
+
+			list_for_each_entry(s, &sheep_list_head, sheep_list) {
+				if (!s->master)
+					continue;
+
+				ret = writev(s->fd, iov, 2);
+				if (ret != (iov[0].iov_len + iov[1].iov_len))
+					die("writev() failed\n");
+				break;
+			}
+
+		}
+
+		sheep->state = joining;
+
+		break;
+
+	case SK_MSG_NEW_NODE_REPLY:
+		log("new node reply from %s\n", node_to_str(&sheep->node));
+		join = _xcalloc(rcv.body_len, sizeof(char));
+		ret = xread(fd, join, rcv.body_len);
+		if (ret != rcv.body_len)
+			die("xread() failed, ret: %d\n", ret);
+
+		s = find_sheep_by_nid(&join->node.nid);
+		if (!s) {
+			die("invalid nid is required, %s\n",
+				node_to_str(&join->node));
+		}
+
+		if (!sheep->master)
+			die("no master replied new node reply\n");
+
+		s->kept_join_msg_len = rcv.body_len -
+			sizeof(struct sk_msg_join);
+		s->kept_join_msg = _xcalloc(s->kept_join_msg_len, sizeof(char));
+		memcpy(s->kept_join_msg, join->opaque, s->kept_join_msg_len);
+
+		log("reserved opaque, length: %d\n", s->kept_join_msg_len);
+
+		bzero(&join_reply, sizeof(struct sk_msg));
+		join_reply.type = SK_MSG_JOIN_REPLY;
+
+		join_reply.body_len = sizeof(struct sk_msg_join_reply)
+			+ s->kept_join_msg_len;
+		iov[0].iov_base = &join_reply;
+		iov[0].iov_len = sizeof(struct sk_msg);
+
+		reply_body = _xcalloc(join_reply.body_len, sizeof(char));
+
+		reply_body->nr_nodes = build_node_array(reply_body->nodes);
+		memcpy(reply_body->opaque, s->kept_join_msg,
+			s->kept_join_msg_len);
+
+		iov[1].iov_base = reply_body;
+		iov[1].iov_len = join_reply.body_len;
+
+		ret = writev(s->fd, iov, 2);
+		if (ret != (iov[0].iov_len + iov[1].iov_len))
+			die("writev() failed\n");
+
+		free(reply_body);
+
+		nr_joined_sheep++;
+		s->state = joined;
+
+		snd.type = SK_MSG_NEW_NODE_FINISH;
+		snd.body_len = sizeof(*finish) + s->kept_join_msg_len;
+
+		finish = _xcalloc(snd.body_len, sizeof(char));
+		finish->new_node = s->node;
+		memcpy(finish->opaque, s->kept_join_msg, s->kept_join_msg_len);
+		finish->nr_nodes = build_node_array(finish->nodes);
+
+		iov[0].iov_base = &snd;
+		iov[0].iov_len = sizeof(snd);
+
+		iov[1].iov_base = finish;
+		iov[1].iov_len = snd.body_len;
+
+		list_for_each_entry(s2, &sheep_list_head, sheep_list) {
+			if (s2 == s)
+				continue;
+
+			ret = writev(s2->fd, iov, 2);
+			if (ret != (iov[0].iov_len + iov[1].iov_len))
+				die("writev() failed\n");
+		}
+
+		free(finish);
+
+		break;
+
+	case SK_MSG_NOTIFY:
+		notify_forward = malloc(rcv.body_len + sizeof(*notify_forward));
+		ret = xread(fd, notify_forward->notify_msg, rcv.body_len);
+		if (ret != rcv.body_len)
+			die("xread() failed\n");
+
+		snd.type = SK_MSG_NOTIFY_FORWARD;
+		snd.body_len = rcv.body_len + sizeof(*notify_forward);
+
+		iov[0].iov_base = &snd;
+		iov[0].iov_len = sizeof(snd);
+
+		notify_forward->from_node = sheep->node;
+
+		iov[1].iov_base = notify_forward;
+		iov[1].iov_len = snd.body_len;
+
+		list_for_each_entry(s, &sheep_list_head, sheep_list) {
+			ret = writev(s->fd, iov, 2);
+			if (ret != (iov[0].iov_len + iov[1].iov_len))
+				die("writev() failed\n");
+
+			ret = xread(s->fd, &rcv, sizeof(rcv));
+			if (ret != sizeof(rcv))
+				die("xread() failed\n");
+
+			if (rcv.type != SK_MSG_NOTIFY_FORWARD_REPLY)
+				die("invalid type received: %d\n", rcv.type);
+
+			log("forwarding...\n");
+		}
+
+		snd.type = SK_MSG_NOTIFY_FINISH;
+		ret = xwrite(sheep->fd, &snd, sizeof(snd));
+		if (ret != sizeof(snd))
+			die("xwrite() failed\n");
+
+		break;
+
+	case SK_MSG_BLOCK:
+		list_for_each_entry(s, &sheep_list_head, sheep_list) {
+			struct sk_msg msg;
+
+			if (s == sheep)
+				continue;
+
+			msg.type = SK_MSG_BLOCK_FORWARD;
+			msg.body_len = sizeof(struct sd_node);
+
+			iov[0].iov_base = &msg;
+			iov[0].iov_len = sizeof(struct sk_msg);
+
+			iov[1].iov_base = &sheep->node;
+			iov[1].iov_len = sizeof(struct sd_node);
+
+			ret = writev(s->fd, iov, 2);
+			if (ret != (iov[0].iov_len + iov[1].iov_len))
+				die("writev() failed\n");
+
+			ret = xread(s->fd, &msg, sizeof(struct sk_msg));
+			if (ret != sizeof(struct sk_msg))
+				die("xread() failed\n");
+
+			if (msg.type != SK_MSG_BLOCK_FORWARD_REPLY) {
+				die("invalid message from sheep, %d\n",
+					msg.type);
+			}
+		}
+
+		snd.type = SK_MSG_BLOCK_FINISH;
+		snd.body_len = 0;
+		ret = xwrite(sheep->fd, &snd, sizeof(struct sk_msg));
+		if (ret != sizeof(struct sk_msg))
+			die("xwrite() failed\n");
+
+		break;
+
+	case SK_MSG_LEAVE:
+		node = sheep->node;
+
+		close(sheep->fd);
+		list_del(&sheep->sheep_list);
+
+		snd.type = SK_MSG_LEAVE_FORWARD;
+		snd.body_len = sizeof(struct sk_msg);
+
+		iov[0].iov_base = &snd;
+		iov[0].iov_len = sizeof(struct sk_msg);
+
+		iov[1].iov_base = &node;
+		iov[1].iov_len = sizeof(struct sd_node);
+
+		list_for_each_entry(s, &sheep_list_head, sheep_list) {
+			ret = writev(s->fd, iov, 2);
+
+			if (ret != (iov[0].iov_len + iov[1].iov_len))
+				die("writev() failed\n");
+		}
+
+		free(sheep);
+
+		break;
+
+	default:
+		die("invalid message from sheep: %d\n", rcv.type);
+		break;
+	}
+}
+
+static void sheep_comm_handler(int fd, int events, void *data)
+{
+	if (events & EPOLLIN)
+		read_msg_from_sheep(data);
+}
+
+static void sheep_accept_handler(int fd, int events, void *data)
+{
+	struct sheep *new_sheep;
+	socklen_t len;
+
+	new_sheep = _xcalloc(1, sizeof(struct sheep));
+	INIT_LIST_HEAD(&new_sheep->sheep_list);
+
+	len = sizeof(struct sockaddr_in);
+	new_sheep->fd = accept(fd, &new_sheep->addr, &len);
+	if (new_sheep->fd < 0)
+		die("accept() failed\n");
+
+	if (-1 == set_keepalive(new_sheep->fd))
+		die("set_keepalive() failed\n");
+
+	list_add_tail(&new_sheep->sheep_list, &sheep_list_head);
+	register_event(new_sheep->fd, sheep_comm_handler, new_sheep);
+
+	log("accepted new sheep connection\n");
+}
+
+static struct option const long_options[] = {	/* table for getopt_long() */
+	{ "port", required_argument, NULL, 'p' },
+	{ "address", required_argument, NULL, 'a' },
+	{ "not-daemonize", no_argument, NULL, 'n' },
+	{ NULL, 0, NULL, 0 },	/* all-zero terminator getopt_long() requires */
+};
+static const char *short_options = "p:a:n";
+
+static void exit_handler(void)
+{
+	log("exit_handler() called, exiting...\n");
+}
+
+/* Parse options, listen for sheep, daemonize unless -n, run the event loop. */
+int main(int argc, char **argv)
+{
+	int ch, ret, longindex;
+	char *p;
+	bool daemonize = true;
+	struct sockaddr_in listen_addr;
+
+	progname = argv[0];
+	atexit(exit_handler);
+	init_event(EPOLL_SIZE);
+
+	while ((ch = getopt_long(argc, argv, short_options, long_options,
+				 &longindex)) >= 0) {
+		switch (ch) {
+		case 'p':
+			port = strtol(optarg, &p, 10);
+			if (p == optarg || *p || port < 0 || port > 65535)
+				die("invalid port: %s\n", optarg);
+			break;
+		case 'a':
+			if (!str_to_addr(AF_INET, optarg, addr))
+				die("invalid address: %s\n", optarg);
+			break;
+		case 'n':
+			daemonize = false;
+			break;
+		}
+	}
+
+	/* setup inet socket for communication with sheeps */
+	sheep_listen_fd = socket(AF_INET, SOCK_STREAM, 0);
+	if (sheep_listen_fd < 0)
+		die("socket() failed\n");
+
+	bzero(&listen_addr, sizeof(struct sockaddr_in));
+	listen_addr.sin_family = AF_INET;
+	listen_addr.sin_port = port;
+	/*
+	 * sin_addr is 4 bytes; copying sizeof(addr) (16) overran listen_addr.
+	 * NOTE(review): use addr + 12 if str_to_addr() stores IPv4 there.
+	 */
+	memcpy(&listen_addr.sin_addr, addr, sizeof(listen_addr.sin_addr));
+
+	ret = bind(sheep_listen_fd, (struct sockaddr *)&listen_addr,
+		   sizeof(listen_addr));
+	if (ret == -1)
+		die("bind() failed\n");
+
+	ret = listen(sheep_listen_fd, 1);
+	if (ret == -1)
+		die("listen() failed\n");
+
+	register_event(sheep_listen_fd, sheep_accept_handler, NULL);
+
+	if (daemonize) {
+		openlog(progname, LOG_PID, LOG_USER);
+		if (daemon(0, 0))
+			die("error at daemon()");
+		daemonized = true;
+	}
+
+	running = true;
+	while (running)
+		event_loop(-1);
+
+	return 0;
+}
-- 
1.7.5.1




More information about the sheepdog mailing list