[sheepdog] [PATCH v4 3/3] sheepkeeper: a new cluster manager specialized for sheepdog

Hitoshi Mitake h.mitake at gmail.com
Mon Dec 24 08:44:28 CET 2012


This patch adds sheepkeeper, a new cluster manager for sheepdog.

sheepkeeper cluster manager is designed as master-slave. So it can be
SPOF. But I'm planning to let it be redundant by utilizing corosync.
If it can be improved, this can be an alternative of the group
management part of ZooKeeper, and it doesn't depend on JVM.

This patch adds the new directory sheepkeeper/ for it, and the
executable binary will be produced as sheepkeeper/sheepkeeper.
The cluster driver for sheep is added as sheep/cluster/sheepkeeper.c.

Signed-off-by: Hitoshi Mitake <mitake.hitoshi at lab.ntt.co.jp>
---
v2
 * lots of cleaning and bug fix
 * sane command line option handling
 * a little bit improvement of leave handling
 * not call sd_join_handler() in sheepkeeper_join()
 * add new option --enable-sheepkeeper to configure script for
   enable/disable building sheepkeeper

v3
 * lots of cleaning and bug fix
 * suppress some compiler warnings
 * handle leave of sheeps in saner way. this also reduces the problem
   of odd handling of return values from xread/xwrite
 * default log of sheepkeeper is /var/log/sheepkeeper.c, and users can
   specify custom location with -l option

v4
 * tons of cleaning and bug fix
 * documentation: https://github.com/mitake/sheepdog/wiki/sheepkeeper-design-note
 * stabilizing. The unpassed tests are: 008 (long), 015, 043, and 044 (long).

8<---
 .gitignore                  |    1 +
 Makefile.am                 |    2 +-
 configure.ac                |    3 +-
 include/Makefile.am         |    3 +-
 include/net.h               |    1 +
 include/sheepkeeper.h       |   78 ++++
 include/sheepkeeper_msg.def |   17 +
 lib/net.c                   |   17 +
 sheep/Makefile.am           |    3 +-
 sheep/cluster/sheepkeeper.c |  713 +++++++++++++++++++++++++++++++++
 sheepkeeper/Makefile.am     |   44 ++
 sheepkeeper/sheepkeeper.c   |  923 +++++++++++++++++++++++++++++++++++++++++++
 12 files changed, 1801 insertions(+), 4 deletions(-)
 create mode 100644 include/sheepkeeper.h
 create mode 100644 include/sheepkeeper_msg.def
 create mode 100644 sheep/cluster/sheepkeeper.c
 create mode 100644 sheepkeeper/Makefile.am
 create mode 100644 sheepkeeper/sheepkeeper.c

diff --git a/.gitignore b/.gitignore
index dbdbd55..ffc9003 100644
--- a/.gitignore
+++ b/.gitignore
@@ -31,6 +31,7 @@ GSYMS
 collie/collie
 sheep/sheep
 sheepfs/sheepfs
+sheepkeeper/sheepkeeper
 
 # directories
 .deps
diff --git a/Makefile.am b/Makefile.am
index 53d18b9..f97cab8 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -17,7 +17,7 @@ sheepdogsysconfdir	= ${SHEEPDOGCONFDIR}
 
 sheepdogsysconf_DATA	= 
 
-SUBDIRS			= lib collie sheep include script man
+SUBDIRS			= lib collie sheep include script man sheepkeeper
 
 if BUILD_SHEEPFS
 SUBDIRS			+= sheepfs
diff --git a/configure.ac b/configure.ac
index 9e03e23..2688b6a 100644
--- a/configure.ac
+++ b/configure.ac
@@ -133,7 +133,8 @@ AC_CONFIG_FILES([Makefile
 		include/Makefile
 		script/Makefile
 		lib/Makefile
-		man/Makefile])
+		man/Makefile
+		sheepkeeper/Makefile])
 
 ### Local business
 
diff --git a/include/Makefile.am b/include/Makefile.am
index 6413047..aee2f1b 100644
--- a/include/Makefile.am
+++ b/include/Makefile.am
@@ -2,4 +2,5 @@ MAINTAINERCLEANFILES    = Makefile.in config.h.in
 
 noinst_HEADERS          = bitops.h event.h logger.h sheepdog_proto.h util.h \
 			  list.h net.h sheep.h exits.h strbuf.h rbtree.h \
-			  sha1.h option.h internal_proto.h
+			  sha1.h option.h internal_proto.h sheepkeeper.h sheepkeeper_msg.def
+>>>>>>> sheepkeeper: a new cluster manager specialized for sheepdog
diff --git a/include/net.h b/include/net.h
index 3665f5e..9d025e4 100644
--- a/include/net.h
+++ b/include/net.h
@@ -50,6 +50,7 @@ int create_unix_domain_socket(const char *unix_path,
 
 char *addr_to_str(char *str, int size, const uint8_t *addr, uint16_t port);
 uint8_t *str_to_addr(int af, const char *ipstr, uint8_t *addr);
+char *sockaddr_in_to_str(struct sockaddr_in *sockaddr);
 int set_nonblocking(int fd);
 int set_nodelay(int fd);
 int set_keepalive(int fd);
diff --git a/include/sheepkeeper.h b/include/sheepkeeper.h
new file mode 100644
index 0000000..ec0a466
--- /dev/null
+++ b/include/sheepkeeper.h
@@ -0,0 +1,78 @@
+#ifndef SHEEPKEEPER_H
+#define SHEEPKEEPER_H
+
+enum sk_msg_type {
+	_SK_MSG_PADDING = 0,	/* for making first member 1 */
+#define MSG(x) x,
+#include "sheepkeeper_msg.def"
+#undef MSG
+	_NR_SK_MSG,
+};
+
+#define NR_SK_MSG (_NR_SK_MSG - 1)
+
+struct sk_msg {
+	uint32_t type; /* original type: enum sk_msg_type */
+	uint32_t body_len;
+};
+
+#include "internal_proto.h"
+
+struct sk_msg_join {
+	uint32_t res;		/* original type: enum cluster_join_result */
+	struct sd_node node;
+	uint8_t master_elected;
+	uint8_t opaque[0];
+};
+
+struct sk_msg_join_reply {
+	uint32_t res;		/* original type: enum cluster_join_result */
+	struct sd_node nodes[SD_MAX_NODES];
+	uint32_t nr_nodes;
+	uint8_t opaque[0];
+};
+
+struct sk_msg_join_node_finish {
+	uint32_t res;		/* original type: enum cluster_join_result */
+	struct sd_node new_node;
+
+	struct sd_node nodes[SD_MAX_NODES];
+	uint32_t nr_nodes;
+	uint8_t opaque[0];
+};
+
+struct sk_msg_notify {
+	uint8_t unblock;
+	uint8_t notify_msg[0];
+};
+
+struct sk_msg_notify_forward {
+	struct sd_node from_node;
+	uint8_t unblock;
+	uint8_t notify_msg[0];
+};
+
+#define SHEEPKEEPER_PORT 2501
+
+static inline const char *sk_msg_to_str(enum sk_msg_type type)
+/* CAUTION: non reentrant */
+{
+	static char unknown[64];
+
+	/*
+	 * I know this MSG() seems odd. The do statement is for suppressing
+	 * error of checkpatch.pl.
+	 */
+#define MSG(x) do {				\
+		if (x == type)			\
+			return #x;		\
+	} while (0);
+#include "sheepkeeper_msg.def"
+#undef MSG
+
+	memset(unknown, 0, 64);
+	snprintf(unknown, 64, "<unknown sheepkeeper message: %d>", type);
+	return unknown;
+}
+
+#endif	/* SHEEPKEEPER_H */
diff --git a/include/sheepkeeper_msg.def b/include/sheepkeeper_msg.def
new file mode 100644
index 0000000..b7665c4
--- /dev/null
+++ b/include/sheepkeeper_msg.def
@@ -0,0 +1,17 @@
+MSG(SK_MSG_JOIN)
+MSG(SK_MSG_JOIN_REPLY)
+MSG(SK_MSG_JOIN_RETRY)
+MSG(SK_MSG_NEW_NODE)
+MSG(SK_MSG_NEW_NODE_REPLY)
+MSG(SK_MSG_NEW_NODE_FINISH)
+
+MSG(SK_MSG_NOTIFY)
+MSG(SK_MSG_NOTIFY_FORWARD)
+
+MSG(SK_MSG_BLOCK)
+MSG(SK_MSG_BLOCK_FORWARD)
+
+MSG(SK_MSG_LEAVE)
+MSG(SK_MSG_LEAVE_FORWARD)
+
+MSG(SK_MSG_MASTER_ELECTION)
diff --git a/lib/net.c b/lib/net.c
index 9b7fea8..b7bfe73 100644
--- a/lib/net.c
+++ b/lib/net.c
@@ -412,6 +412,23 @@ char *addr_to_str(char *str, int size, const uint8_t *addr, uint16_t port)
 	return str;
 }
 
+char *sockaddr_in_to_str(struct sockaddr_in *sockaddr)
+{
+	int i, si;
+	static char str[32];
+	uint8_t *addr;
+
+	si = 0;
+	memset(str, 0, 32);
+
+	addr = (uint8_t *)&sockaddr->sin_addr.s_addr;
+	for (i = 0; i < 4; i++)
+		si += sprintf(str + si, i != 3 ? "%d." : "%d", addr[i]);
+	sprintf(str + si, ":%u", sockaddr->sin_port);
+
+	return str;
+}
+
 uint8_t *str_to_addr(int af, const char *ipstr, uint8_t *addr)
 {
 	int addr_start_idx = 0;
diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index 0ae19de..43faf28 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -27,7 +27,8 @@ sbin_PROGRAMS		= sheep
 sheep_SOURCES		= sheep.c group.c request.c gateway.c store.c vdi.c work.c \
 			  journal.c ops.c recovery.c cluster/local.c \
 			  object_cache.c object_list_cache.c sockfd_cache.c \
-			  plain_store.c config.c migrate.c journal_file.c
+			  plain_store.c config.c migrate.c journal_file.c \
+			  cluster/sheepkeeper.c
 
 if BUILD_COROSYNC
 sheep_SOURCES		+= cluster/corosync.c
diff --git a/sheep/cluster/sheepkeeper.c b/sheep/cluster/sheepkeeper.c
new file mode 100644
index 0000000..3f6c874
--- /dev/null
+++ b/sheep/cluster/sheepkeeper.c
@@ -0,0 +1,713 @@
+/*
+ * Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <stdio.h>
+#include <stdint.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+
+#include "cluster.h"
+#include "event.h"
+#include "sheepkeeper.h"
+#include "internal_proto.h"
+
+static int sk_comm_fd;
+
+static struct sd_node this_node;
+static bool is_master;
+
+static int nr_nodes;
+static struct sd_node nodes[SD_MAX_NODES];
+
+enum sk_driver_state {
+	STATE_PRE_JOIN,
+	STATE_JOINED,
+};
+
+static enum sk_driver_state state = STATE_PRE_JOIN;
+
+static char *kept_opaque;
+static size_t kept_opaque_len;
+
+static int _sheepkeeper_join(void)
+{
+	int ret, msg_join_len;
+	struct iovec iov[2];
+	struct sk_msg msg;
+	struct sk_msg_join *msg_join;
+
+	msg_join_len = sizeof(struct sk_msg_join) + kept_opaque_len;
+
+	memset(&msg, 0, sizeof(msg));
+	msg.type = SK_MSG_JOIN;
+	msg.body_len = msg_join_len;
+
+	iov[0].iov_base = &msg;
+	iov[0].iov_len = sizeof(msg);
+
+	msg_join = xzalloc(msg_join_len);
+	msg_join->node = this_node;
+	memcpy(msg_join->opaque, kept_opaque, kept_opaque_len);
+
+	iov[1].iov_base = msg_join;
+	iov[1].iov_len = msg_join_len;
+
+	ret = writev(sk_comm_fd, iov, 2);
+	if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+		vprintf(SDOG_ERR, "sheepkeeper_join() failed, %s\n",
+			strerror(errno));
+		free(msg_join);
+
+		return -1;
+	}
+
+	free(msg_join);
+	return 0;
+}
+
+static void read_msg_pre_join(void)
+{
+	int ret;
+	struct sk_msg snd, rcv;
+	struct sk_msg_join_reply *join_reply;
+	struct iovec iov[2];
+	enum cluster_join_result res;
+
+	static int first = 1;
+
+retry:
+	if (!first)
+		/* FIXME */
+		_sheepkeeper_join();
+
+	ret = xread(sk_comm_fd, &rcv, sizeof(rcv));
+	if (ret != sizeof(rcv)) {
+		vprintf(SDOG_INFO, "reading message from sheepkeeper failed:" \
+			" %m\n");
+		exit(1);
+	}
+
+	if (rcv.type == SK_MSG_JOIN_RETRY) {
+		vprintf(SDOG_INFO, "join request is rejected, retrying\n");
+		first = 0;
+
+		goto retry;
+	} else if (rcv.type == SK_MSG_NEW_NODE) {
+		struct sk_msg_join *join;
+		int join_len;
+
+		join_len = rcv.body_len;
+		join = xzalloc(join_len);
+		ret = xread(sk_comm_fd, join, join_len);
+		if (ret != join_len) {
+			vprintf(SDOG_ERR, "xread() failed: %m\n");
+			exit(1);
+		}
+
+		res = sd_check_join_cb(&join->node, join->opaque);
+		if (res == CJ_RES_FAIL) {
+			vprintf(SDOG_ERR, "sd_check_join_cb() failed\n");
+			exit(1);
+		}
+		assert(res == CJ_RES_SUCCESS);
+
+		/* FIXME: join->master_elected is needed? */
+		assert(join->master_elected);
+		is_master = true;
+
+		snd.type = SK_MSG_NEW_NODE_REPLY;
+		snd.body_len = join_len;
+
+		iov[0].iov_base = &snd;
+		iov[0].iov_len = sizeof(snd);
+
+		iov[1].iov_base = join;
+		iov[1].iov_len = join_len;
+
+		ret = writev(sk_comm_fd, iov, 2);
+		if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+			vprintf(SDOG_ERR, "writev() failed: %m\n");
+			exit(1);
+		}
+
+		free(join);
+
+		ret = xread(sk_comm_fd, &rcv, sizeof(rcv));
+		if (ret != sizeof(rcv)) {
+			vprintf(SDOG_ERR, "invalid ret: %d\n", ret);
+			exit(1);
+		}
+	}
+
+	if (rcv.type != SK_MSG_JOIN_REPLY) {
+		vprintf(SDOG_ERR, "unexpected message from sheepkeeper, " \
+			"received message: %s\n", sk_msg_to_str(rcv.type));
+
+		/*
+		 * In this case, the state of this sheep in sheepkeeper must be
+		 * SHEEP_STATE_CONNECTED. Messages other than SK_MSG_JOIN_REPLY
+		 * mean bugs of sheepkeeper.
+		 */
+		exit(1);
+	}
+
+	join_reply = xzalloc(rcv.body_len);
+	ret = xread(sk_comm_fd, join_reply, rcv.body_len);
+	if (ret != rcv.body_len) {
+		vprintf(SDOG_ERR, "xread() failed: %m\n");
+		exit(1);
+	}
+
+	vprintf(SDOG_INFO, "join reply arrived, nr_nodes: %d\n",
+		join_reply->nr_nodes);
+
+	if (join_reply->res == CJ_RES_MASTER_TRANSFER) {
+		is_master = true;
+
+		nr_nodes = 1;
+		nodes[0] = this_node;
+	} else {
+		memcpy(nodes, join_reply->nodes,
+			join_reply->nr_nodes * sizeof(struct sd_node));
+
+		nr_nodes = join_reply->nr_nodes;
+	}
+
+	sd_join_handler(&this_node, nodes, nr_nodes,
+			join_reply->res, join_reply->opaque);
+
+	free(join_reply);
+
+	vprintf(SDOG_INFO, "sheepkeeper_join() succeed\n");
+	state = STATE_JOINED;
+}
+
+struct sk_event {
+	struct sd_node sender;
+
+	void *msg;
+	int msg_len;
+
+	bool callbacked, removed;
+
+	struct list_head event_list;
+};
+
+static LIST_HEAD(nonblocked_event_list);
+static LIST_HEAD(blocked_event_list);
+
+static int sk_event_fd;
+
+static bool sk_process_event(void)
+{
+	struct sk_event *ev;
+	bool nonblock;
+
+	if (!list_empty(&nonblocked_event_list)) {
+		ev = list_first_entry(&nonblocked_event_list,
+				struct sk_event, event_list);
+		nonblock = true;
+	} else if (!list_empty(&blocked_event_list)) {
+		ev = list_first_entry(&blocked_event_list,
+				struct sk_event, event_list);
+		nonblock = false;
+	} else
+		return false;
+
+	if (ev->removed)
+		goto remove;
+
+	if (ev->callbacked)
+		return false;
+
+	if (nonblock) {
+		vprintf(SDOG_DEBUG, "processing nonblock event\n");
+
+		sd_notify_handler(&ev->sender, ev->msg, ev->msg_len);
+	} else {
+		vprintf(SDOG_DEBUG, "processing block event\n");
+
+		ev->callbacked = sd_block_handler(&ev->sender);
+		return false;
+	}
+
+remove:
+	list_del(&ev->event_list);
+	free(ev->msg);
+	free(ev);
+
+	return true;
+}
+
+static void push_sk_event(bool nonblock, struct sd_node *sender,
+			void *msg, int msg_len)
+{
+	int ret;
+	struct sk_event *ev;
+
+	vprintf(SDOG_DEBUG, "push_sk_event() called, pushing %sblocking"\
+		" event\n", nonblock ? "non" : "");
+
+	ev = xzalloc(sizeof(*ev));
+
+	ev->sender = *sender;
+	if (msg_len) {
+		ev->msg = xzalloc(msg_len);
+		memcpy(ev->msg, msg, msg_len);
+		ev->msg_len = msg_len;
+	}
+
+	ev->removed = false;
+	ev->callbacked = false;
+
+	INIT_LIST_HEAD(&ev->event_list);
+
+	if (nonblock)
+		list_add(&ev->event_list, &nonblocked_event_list);
+	else
+		list_add(&ev->event_list, &blocked_event_list);
+
+	ret = eventfd_write(sk_event_fd, 1);
+	if (ret) {
+		vprintf(SDOG_ERR, "eventfd_write() failed: %m\n");
+		exit(1);
+	}
+}
+
+static void remove_one_block_event(void)
+{
+	struct sk_event *ev;
+
+	if (list_empty(&blocked_event_list))
+		/* FIXME: should I treat this case as an error? */
+		return;
+
+	ev = list_first_entry(&blocked_event_list,
+			struct sk_event, event_list);
+	ev->removed = true;
+	eventfd_write(sk_event_fd, 1);
+
+	vprintf(SDOG_DEBUG, "unblock a blocking event\n");
+}
+
+static void sk_event_handler(int fd, int events, void *data)
+{
+	int ret;
+	eventfd_t val;
+
+	ret = eventfd_read(fd, &val);
+	if (ret < 0)
+		panic("eventfd_read() failed: %m\n");
+
+	while (sk_process_event())
+		;
+}
+
+static void elected_as_master(void)
+{
+	vprintf(SDOG_DEBUG, "elected_as_master() called\n");
+
+	/* assert(!is_master); */
+	is_master = true;
+	vprintf(SDOG_INFO, "became new master\n");
+}
+
+static void read_msg(void)
+{
+	int i, j, ret;
+	struct sk_msg snd, rcv;
+	struct sd_node sender;
+	struct iovec iov[2];
+	struct sk_msg_join *join;
+	struct sk_msg_notify_forward *notify_forward;
+	struct sk_msg_join_node_finish *join_node_finish;
+	struct join_message *jm;
+
+	enum cluster_join_result res;
+
+	vprintf(SDOG_INFO, "read_msg() called\n");
+
+	ret = xread(sk_comm_fd, &rcv, sizeof(rcv));
+	if (ret != sizeof(rcv)) {
+		vprintf(SDOG_ERR, "xread() failed: %m\n");
+		exit(1);
+	}
+
+	switch (rcv.type) {
+	case SK_MSG_NEW_NODE:
+		if (!is_master) {
+			vprintf(SDOG_EMERG, "I am not a master but received" \
+				" SK_MSG_NEW_NODE, sheepkeeper is buggy\n");
+			exit(1);
+		}
+
+		join = xzalloc(rcv.body_len);
+		ret = xread(sk_comm_fd, join, rcv.body_len);
+		if (ret != rcv.body_len) {
+			vprintf(SDOG_ERR, "xread() failed: %m\n");
+			exit(1);
+		}
+
+		res = sd_check_join_cb(&join->node, join->opaque);
+
+		join->res = res;
+
+		memset(&snd, 0, sizeof(snd));
+		snd.type = SK_MSG_NEW_NODE_REPLY;
+		snd.body_len = rcv.body_len;
+
+		iov[0].iov_base = &snd;
+		iov[0].iov_len = sizeof(snd);
+
+		iov[1].iov_base = join;
+		iov[1].iov_len = rcv.body_len;
+
+		ret = writev(sk_comm_fd, iov, 2);
+		if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+			vprintf(SDOG_ERR, "writev() failed: %m\n");
+			exit(1);
+		}
+		free(join);
+
+		if (res == CJ_RES_MASTER_TRANSFER) {
+			vprintf(SDOG_ERR, "failed to join sheepdog cluster: "
+				"please retry when master is up\n");
+			exit(1);
+		}
+
+		break;
+
+	case SK_MSG_NEW_NODE_FINISH:
+		join_node_finish = xzalloc(rcv.body_len);
+		ret = xread(sk_comm_fd, join_node_finish, rcv.body_len);
+		if (ret != rcv.body_len) {
+			vprintf(SDOG_ERR, "xread() failed: %m\n");
+			exit(1);
+		}
+
+		jm = (struct join_message *)join_node_finish->opaque;
+		memcpy(nodes, join_node_finish->nodes,
+			join_node_finish->nr_nodes * sizeof(struct sd_node));
+		nr_nodes = join_node_finish->nr_nodes;
+
+		vprintf(SDOG_INFO, "new node: %s\n",
+			node_to_str(&join_node_finish->new_node));
+		sd_join_handler(&join_node_finish->new_node, nodes, nr_nodes,
+				join_node_finish->res, jm);
+
+		free(join_node_finish);
+
+		break;
+
+	case SK_MSG_NOTIFY_FORWARD:
+		notify_forward = xzalloc(rcv.body_len);
+		ret = xread(sk_comm_fd, notify_forward, rcv.body_len);
+		if (ret != rcv.body_len) {
+			vprintf(SDOG_ERR, "xread() failed: %m\n");
+			exit(1);
+		}
+
+		if (notify_forward->unblock)
+			remove_one_block_event();
+
+		push_sk_event(true, &notify_forward->from_node,
+			notify_forward->notify_msg,
+			rcv.body_len - sizeof(*notify_forward));
+
+		free(notify_forward);
+		break;
+
+	case SK_MSG_BLOCK_FORWARD:
+		ret = xread(sk_comm_fd, &sender, sizeof(sender));
+		if (ret != sizeof(sender)) {
+			vprintf(SDOG_ERR, "xread() failed: %m\n");
+			exit(1);
+		}
+
+		push_sk_event(false, &sender, NULL, 0);
+
+		break;
+
+	case SK_MSG_LEAVE_FORWARD:
+		ret = xread(sk_comm_fd, &sender, sizeof(sender));
+		if (ret != sizeof(sender)) {
+			vprintf(SDOG_ERR, "xread() failed: %m\n");
+			exit(1);
+		}
+
+		vprintf(SDOG_INFO, "leaving node: %s\n", node_to_str(&sender));
+
+		for (i = 0; i < nr_nodes; i++) {
+			if (node_eq(&sender, &nodes[i])) {
+				for (j = i; j < nr_nodes; j++)
+					nodes[j] = nodes[j + 1];
+
+				nr_nodes--;
+
+				goto removed;
+			}
+		}
+
+		vprintf(SDOG_INFO, "leave message from unknown node: %s\n",
+			node_to_str(&sender));
+		break;
+
+removed:
+		vprintf(SDOG_DEBUG, "calling sd_leave_handler(), sender: %s\n",
+			node_to_str(&sender));
+		sd_leave_handler(&sender, nodes, nr_nodes);
+		break;
+
+	case SK_MSG_MASTER_ELECTION:
+		elected_as_master();
+		break;
+
+	default:
+		vprintf(SDOG_ERR, "invalid message from sheepkeeper: %s, "
+			"length: %d\n", sk_msg_to_str(rcv.type), rcv.body_len);
+		exit(1);
+		break;
+	}
+}
+
+static void read_msg_from_sheepkeeper(void)
+{
+	switch (state) {
+	case STATE_PRE_JOIN:
+		read_msg_pre_join();
+		break;
+
+	case STATE_JOINED:
+		read_msg();
+		break;
+
+	default:
+		panic("invalid state of sheepkeeper cluster driver: %d\n",
+			state);
+		break;
+	};
+}
+
+static void sheepkeeper_comm_handler(int fd, int events, void *data)
+{
+	assert(fd == sk_comm_fd);
+	assert(data == NULL);
+
+	if (events & EPOLLIN)
+		read_msg_from_sheepkeeper();
+	else if (events & EPOLLHUP || events & EPOLLERR) {
+		vprintf(SDOG_ERR, "connection to sheepkeeper"
+			" caused an error: %m");
+		exit(1);
+	}
+}
+
+static void init_addr_port(const char *s, struct sockaddr_in *addr)
+{
+	/* format: <address>[:<port>] */
+	char *sep, *copied, *p;
+	uint8_t tmp_addr[16];
+	bool addr_only = false;
+
+	copied = strdup(s);
+	if (!copied)
+		panic("strdup() failed: %m");
+
+	sep = strchr(copied, ':');
+	if (!sep) {
+		addr_only = true;
+		goto addr_trans;
+	}
+
+	if (sep == copied)
+		goto invalid_format;
+
+	if (*(sep + 1) == '\0')
+		goto invalid_format;
+
+	*sep = '\0';
+
+addr_trans:
+	memset(addr, 0, sizeof(*addr));
+	addr->sin_family = AF_INET;
+
+	if (!str_to_addr(AF_INET, copied, tmp_addr))
+		goto invalid_format;
+
+	memcpy(&addr->sin_addr, &tmp_addr[12], 4);
+	if (addr_only) {
+		addr->sin_port = SHEEPKEEPER_PORT;
+		goto end;
+	}
+
+	addr->sin_port = strtol(sep + 1, &p, 10);
+	if (sep + 1 == p)
+		goto invalid_format;
+	if (*p != '\0')
+		goto invalid_format;
+
+end:
+	free(copied);
+	return;
+
+invalid_format:
+	vprintf(SDOG_ERR, "invalid option for host and port: %s\n", s);
+	exit(1);
+}
+
+static int sheepkeeper_init(const char *option)
+{
+	int ret;
+	struct sockaddr_in addr;
+
+	sk_comm_fd = socket(AF_INET, SOCK_STREAM, 0);
+	if (sk_comm_fd < 0) {
+		vprintf(SDOG_INFO, "error at socket()\n");
+		return -1;
+	}
+
+	if (option)
+		init_addr_port(option, &addr);
+
+	if (connect(sk_comm_fd, &addr, sizeof(struct sockaddr_in))) {
+		vprintf(SDOG_ERR, "cannot connect to sheepkeeper,"
+			" is sheepkeeper running?\n");
+		vprintf(SDOG_ERR, "errno: %s\n", strerror(errno));
+		return -1;
+	}
+
+	sk_event_fd = eventfd(0, EFD_NONBLOCK);
+	ret = register_event(sk_event_fd, sk_event_handler, NULL);
+	if (ret) {
+		vprintf(SDOG_ERR, "register_event() failed: %m\n");
+		exit(1);
+	}
+
+	return 0;
+}
+
+static int sheepkeeper_join(const struct sd_node *myself,
+		      void *opaque, size_t opaque_len)
+{
+	int ret;
+	/* keep opaque for retrying */
+	kept_opaque = xzalloc(opaque_len);
+	memcpy(kept_opaque, opaque, opaque_len);
+	kept_opaque_len = opaque_len;
+	this_node = *myself;
+
+	vprintf(SDOG_INFO, "sheepkeeper_join() called, myself is %s\n",
+		node_to_str(myself));
+
+	ret = _sheepkeeper_join();
+	register_event(sk_comm_fd, sheepkeeper_comm_handler, NULL);
+
+	return ret;
+}
+
+static int sheepkeeper_leave(void)
+{
+	int ret;
+	struct sk_msg msg;
+
+	vprintf(SDOG_DEBUG, "sheepkeeper_leave() called\n");
+	msg.type = SK_MSG_LEAVE;
+	msg.body_len = 0;
+
+	ret = xwrite(sk_comm_fd, &msg, sizeof(msg));
+	if (ret != sizeof(msg)) {
+		vprintf(SDOG_INFO, "xwrite() failed: %m\n");
+		exit(1);
+	}
+
+	state = STATE_PRE_JOIN;
+	is_master = false;
+
+	return 0;
+}
+
+static int _sheepkeeper_notify(bool unblock, void *msg, size_t msg_len)
+{
+	int ret;
+	struct sk_msg snd;
+	struct iovec iov[2];
+
+	struct sk_msg_notify *notify;
+
+	vprintf(SDOG_INFO, "_sheepkeeper_notify() called\n");
+
+	snd.type = SK_MSG_NOTIFY;
+	snd.body_len = msg_len + sizeof(*notify);
+
+	iov[0].iov_base = &snd;
+	iov[0].iov_len = sizeof(snd);
+
+	notify = xzalloc(snd.body_len);
+	notify->unblock = unblock;
+	memcpy(notify->notify_msg, msg, msg_len);
+
+	iov[1].iov_base = notify;
+	iov[1].iov_len = snd.body_len;
+
+	ret = writev(sk_comm_fd, iov, 2);
+	if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+		vprintf(SDOG_ERR, "writev() failed: %m\n");
+		exit(1);
+	}
+	free(notify);
+
+	return 0;
+}
+
+static int sheepkeeper_notify(void *msg, size_t msg_len)
+{
+	return _sheepkeeper_notify(false, msg, msg_len);
+}
+
+static void sheepkeeper_block(void)
+{
+	int ret;
+	struct sk_msg msg;
+
+	msg.type = SK_MSG_BLOCK;
+	msg.body_len = 0;
+
+	ret = xwrite(sk_comm_fd, &msg, sizeof(msg));
+	if (ret != sizeof(msg)) {
+		vprintf(SDOG_ERR, "xwrite() failed: %m\n");
+		exit(1);
+	}
+}
+
+static void sheepkeeper_unblock(void *msg, size_t msg_len)
+{
+	_sheepkeeper_notify(true, msg, msg_len);
+}
+
+static struct cluster_driver cdrv_sheepkeeper = {
+	.name		= "sheepkeeper",
+
+	.init		= sheepkeeper_init,
+	.join		= sheepkeeper_join,
+	.leave		= sheepkeeper_leave,
+	.notify		= sheepkeeper_notify,
+	.block		= sheepkeeper_block,
+	.unblock	= sheepkeeper_unblock,
+};
+
+cdrv_register(cdrv_sheepkeeper);
diff --git a/sheepkeeper/Makefile.am b/sheepkeeper/Makefile.am
new file mode 100644
index 0000000..4e32aa3
--- /dev/null
+++ b/sheepkeeper/Makefile.am
@@ -0,0 +1,44 @@
+#
+# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; see the file COPYING.  If not, write to
+# the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+#
+
+MAINTAINERCLEANFILES	= Makefile.in
+
+AM_CFLAGS		=
+
+INCLUDES		= -I$(top_builddir)/include -I$(top_srcdir)/include
+
+sbin_PROGRAMS		= sheepkeeper
+
+sheepkeeper_SOURCES		= sheepkeeper.c
+
+sheepkeeper_LDADD	  	= ../lib/libsheepdog.a
+sheepkeeper_DEPENDENCIES	= ../lib/libsheepdog.a
+
+EXTRA_DIST		=
+
+lint:
+	-splint $(INCLUDES) $(LINT_FLAGS) $(CFLAGS) *.c
+
+all-local:
+	@echo Built sheepkeeper
+
+clean-local:
+	rm -f sheepkeeper *.o gmon.out *.da *.bb *.bbg
+
+# support for GNU Flymake
+check-syntax:
+	$(COMPILE) -fsyntax-only $(CHK_SOURCES)
diff --git a/sheepkeeper/sheepkeeper.c b/sheepkeeper/sheepkeeper.c
new file mode 100644
index 0000000..cec9e13
--- /dev/null
+++ b/sheepkeeper/sheepkeeper.c
@@ -0,0 +1,923 @@
+/*
+ * Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <errno.h>
+#include <assert.h>
+#include <getopt.h>
+
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+
+#include <sys/un.h>
+#include <netinet/in.h>
+
+#include "net.h"
+#include "event.h"
+#include "list.h"
+#include "internal_proto.h"
+#include "sheep.h"
+#include "util.h"
+#include "sheepkeeper.h"
+
+#define EPOLL_SIZE 4096
+
+enum sheepkeeper_state {
+	SK_STATE_ORDINAL,
+	SK_STATE_JOINING,
+};
+
+static enum sheepkeeper_state state = SK_STATE_ORDINAL;
+
+enum sheep_state {
+	SHEEP_STATE_CONNECTED,	/* accept()ed */
+	SHEEP_STATE_JOINED,
+	SHEEP_STATE_LEAVING,
+};
+
+struct sheep {
+	int fd;
+	struct sd_node node;
+	struct sockaddr_in addr;
+
+	enum sheep_state state;
+
+	struct list_head sheep_list;
+	struct list_head join_wait_list;
+};
+
+static LIST_HEAD(sheep_list_head);
+/*
+ * nr_joined_sheep is a number of sheeps which is in state of
+ * SHEEP_STATE_JOINED, not the length of sheep_list_head
+ */
+static int nr_joined_sheep;
+
+/*
+ * important invariant of sheepkeeper: nr_joined_sheep ? !!master_sheep : true
+ *
+ * if there is at least one sheep which is in state of SHEEP_STATE_JOINED,
+ * master sheep must be elected
+ */
+static struct sheep *master_sheep;
+
+static bool running;
+
+static int port = SHEEPKEEPER_PORT;
+static uint8_t addr[16];
+static int sheep_listen_fd;
+
+static const char *progname;
+
+static bool is_sd_node_zero(struct sd_node *node)
+{
+	static struct sd_node zero_node;
+	return !memcmp(node, &zero_node, sizeof(*node));
+}
+
+static const char *sheep_state_to_str(enum sheep_state s)
+{
+	switch (s) {
+	case SHEEP_STATE_CONNECTED:
+		return "CONNECTED";
+	case SHEEP_STATE_JOINED:
+		return "JOINED";
+	case SHEEP_STATE_LEAVING:
+		return "LEAVING";
+
+	default:
+		return "unknown sheep state, maybe bug";
+		break;
+	}
+}
+
+static void dump_sheep_list(void)
+{
+	struct sheep *s;
+
+	vprintf(SDOG_DEBUG, "dump sheep list...\n");
+
+	list_for_each_entry(s, &sheep_list_head, sheep_list) {
+		vprintf(SDOG_DEBUG, "node: %s\n", node_to_str(&s->node));
+		vprintf(SDOG_DEBUG, "state: %s\n",
+			sheep_state_to_str(s->state));
+
+		if (s == master_sheep)
+			vprintf(SDOG_DEBUG, "master\n");
+
+		vprintf(SDOG_DEBUG, "\n");
+	}
+
+	vprintf(SDOG_DEBUG, "dump done\n");
+}
+
+static int build_node_array(struct sd_node *nodes)
+{
+	int i;
+	struct sheep *s;
+
+	i = 0;
+	list_for_each_entry(s, &sheep_list_head, sheep_list) {
+		nodes[i++] = s->node;
+	}
+
+	return i;
+}
+
+static struct sheep *find_sheep_by_nid(struct node_id *id)
+{
+	struct sheep *s;
+
+	list_for_each_entry(s, &sheep_list_head, sheep_list) {
+		if (!node_id_cmp(&s->node.nid, id))
+			return s;
+	}
+
+	return NULL;
+}
+
+static int remove_efd;
+
+static inline void remove_sheep(struct sheep *sheep)
+{
+	int ret;
+
+	vprintf(SDOG_DEBUG, "remove_sheep() called, removing %s\n",
+		node_to_str(&sheep->node));
+
+	if (sheep->state == SHEEP_STATE_JOINED)
+		nr_joined_sheep--;
+
+	sheep->state = SHEEP_STATE_LEAVING;
+	ret = eventfd_write(remove_efd, 1);
+	if (ret < 0)
+		panic("eventfd_write() failed: %m\n");
+
+	unregister_event(sheep->fd);
+	close(sheep->fd);
+}
+
+static int master_election(void)
+{
+	int ret, nr_failed = 0;
+	struct sheep *s;
+	struct sk_msg msg;
+
+	assert(!master_sheep);
+
+	if (!nr_joined_sheep)
+		return 0;
+
+	list_for_each_entry(s, &sheep_list_head, sheep_list) {
+		if (s->state != SHEEP_STATE_JOINED)
+			continue;
+
+		msg.type = SK_MSG_MASTER_ELECTION;
+		msg.body_len = 0;
+
+		ret = xwrite(s->fd, &msg, sizeof(msg));
+		if (sizeof(msg) != ret) {
+			vprintf(SDOG_ERR, "xwrite() for SK_MSG_MASTER_ELECTION"\
+				" failed: %m\n");
+			goto election_failed;
+		}
+
+		master_sheep = s;
+		break;
+
+election_failed:
+		remove_sheep(s);
+		nr_failed++;
+	}
+
+	if (master_sheep) {
+		vprintf(SDOG_INFO, "new master elected: %s\n",
+			node_to_str(&master_sheep->node));
+	}
+
+	return nr_failed;
+}
+
+static int do_remove_sheep(struct sheep *leaving)
+{
+	int ret, failed = 0;
+	struct sheep *s;
+	struct sk_msg snd;
+	struct iovec iov[2];
+
+	snd.type = SK_MSG_LEAVE_FORWARD;
+	snd.body_len = sizeof(struct sd_node);
+
+	iov[0].iov_base = &snd;
+	iov[0].iov_len = sizeof(snd);
+
+	iov[1].iov_base = &leaving->node;
+	iov[1].iov_len = sizeof(struct sd_node);
+
+	list_for_each_entry(s, &sheep_list_head, sheep_list) {
+		if (s->state != SHEEP_STATE_JOINED)
+			continue;
+
+		ret = writev(s->fd, iov, 2);
+
+		if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+			vprintf(SDOG_ERR, "writev() failed: %m\n");
+
+			remove_sheep(s);
+			failed++;
+		}
+	}
+
+	return failed;
+}
+
+static void remove_handler(int fd, int events, void *data)
+{
+	struct sheep *s;
+	int ret, failed = 0;
+	eventfd_t val;
+	bool election = false;
+
+	vprintf(SDOG_DEBUG, "remove_handler() called\n");
+
+	ret = eventfd_read(remove_efd, &val);
+	if (ret < 0)
+		panic("eventfd_read() failed: %m\n");
+
+	vprintf(SDOG_DEBUG, "removed sheeps: %lu\n", val);
+	assert(0 < val);
+
+
+remove:
+	/* FIXME */
+	list_for_each_entry(s, &sheep_list_head, sheep_list) {
+		if (s->state != SHEEP_STATE_LEAVING)
+			continue;
+
+		vprintf(SDOG_DEBUG, "removing the node: %s\n",
+			node_to_str(&s->node));
+
+		if (s == master_sheep) {
+			vprintf(SDOG_DEBUG, "removing the master\n");
+
+			master_sheep = NULL;
+			election = true;
+		}
+
+		if (!is_sd_node_zero(&s->node))
+			/*
+			 * This condition can be false when the sheep had
+			 * transited from CONNECTED to LEAVING directly.
+			 * (sd_node of sheep in CONNECTED state doesn't have
+			 * any information, because the member is initialized
+			 * when SK_MSG_NEW_NODE from master sheep is accepted.)
+			 *
+			 * sheep in CONNECTED state doesn't have to be removed
+			 * with do_remove_sheep(), because other sheeps don't
+			 * know its existence.
+			 */
+			do_remove_sheep(s);
+
+		goto del;
+	}
+
+	goto end;
+
+del:
+	vprintf(SDOG_DEBUG, "removed node: %s\n", node_to_str(&s->node));
+	list_del(&s->sheep_list);
+	list_del(&s->join_wait_list);
+	free(s);
+
+	if (--val)
+		goto remove;
+
+end:
+	if (election) {
+		vprintf(SDOG_DEBUG, "master is removed, electing new master\n");
+		failed = master_election();
+
+		vprintf(SDOG_DEBUG, "election was done");
+		assert(nr_joined_sheep ? !!master_sheep : true);
+	}
+
+	vprintf(SDOG_DEBUG, "remove_handler(): failed node: %d\n", failed);
+
+	return;
+}
+
+static void (*msg_handlers[NR_SK_MSG])
+			(struct sk_msg *, struct sheep *);
+
+static LIST_HEAD(join_wait_queue);
+
+static int release_joining_sheep(void)
+{
+	ssize_t wbytes;
+	struct sheep *waiting;
+	struct sk_msg snd;
+	int nr_failed = 0;
+
+	vprintf(SDOG_DEBUG, "release_joining_sheep() called\n");
+
+retry:
+	if (list_empty(&join_wait_queue))
+		return nr_failed;
+
+	waiting = list_first_entry(&join_wait_queue, struct sheep,
+				join_wait_list);
+	list_del(&waiting->join_wait_list);
+	INIT_LIST_HEAD(&waiting->join_wait_list);
+
+	memset(&snd, 0, sizeof(snd));
+	snd.type = SK_MSG_JOIN_RETRY;
+
+	wbytes = xwrite(waiting->fd, &snd, sizeof(snd));
+	if (sizeof(snd) != wbytes) {
+		vprintf(SDOG_ERR, "xwrite() failed: %m\n");
+		remove_sheep(waiting);
+
+		vprintf(SDOG_DEBUG, "node %s is failed to join\n",
+			node_to_str(&waiting->node));
+		nr_failed++;
+
+		goto retry;
+	}
+
+	return nr_failed;
+}
+
+static void msg_join(struct sk_msg *msg, struct sheep *sheep)
+{
+	int fd = sheep->fd;
+	ssize_t rbytes, wbytes;
+	struct iovec iov[2];
+
+	struct sk_msg snd;
+	struct sk_msg_join *join;
+
+	vprintf(SDOG_DEBUG, "msg_join() called\n");
+
+	if (state == SK_STATE_JOINING) {
+		/* we have to trash opaque from the sheep */
+		char *buf;
+		buf = xzalloc(msg->body_len);
+		rbytes = xread(fd, buf, msg->body_len);
+		if (rbytes != msg->body_len) {
+			vprintf(SDOG_ERR, "xread() failed: %m\n");
+			goto purge_current_sheep;
+		}
+		free(buf);
+
+		list_add(&sheep->join_wait_list, &join_wait_queue);
+
+		vprintf(SDOG_DEBUG, "there is already a joining sheep\n");
+		return;
+	}
+
+	join = xzalloc(msg->body_len);
+	rbytes = xread(fd, join, msg->body_len);
+	if (msg->body_len != rbytes) {
+		vprintf(SDOG_ERR, "xread() for reading the body of" \
+			" SK_MSG_JOIN failed: %m\n");
+
+		free(join);
+		goto purge_current_sheep;
+	}
+
+	sheep->node = join->node;
+
+	snd.type = SK_MSG_NEW_NODE;
+	snd.body_len = msg->body_len;
+
+	iov[0].iov_base = &snd;
+	iov[0].iov_len = sizeof(snd);
+
+	iov[1].iov_base = join;
+	iov[1].iov_len = msg->body_len;
+
+	if (!nr_joined_sheep) {
+		/* this sheep is a new master */
+		/* FIXME: is this master_elected need? */
+		join->master_elected = true;
+	}
+
+	assert(nr_joined_sheep ? !!master_sheep : true);
+
+	wbytes = writev(!nr_joined_sheep ? fd : master_sheep->fd, iov, 2);
+	free(join);
+
+	if (wbytes != (iov[0].iov_len + iov[1].iov_len)) {
+		vprintf(SDOG_ERR, "writev() for sending "	\
+			"SK_MSG_NEW_NODE failed: %m\n");
+
+		if (nr_joined_sheep)
+			remove_sheep(master_sheep);
+
+		goto purge_current_sheep;
+	}
+
+	state = SK_STATE_JOINING;
+	return;
+
+purge_current_sheep:
+	remove_sheep(sheep);
+}
+
+static void msg_new_node_reply(struct sk_msg *msg, struct sheep *sheep)
+{
+	int fd = sheep->fd, removed = 0;
+	ssize_t rbytes, wbytes;
+	struct iovec iov[2];
+
+	char *opaque;
+	int opaque_len;
+
+	struct sk_msg_join *join;
+	struct sheep *s, *joining_sheep;
+	struct sk_msg snd;
+	struct sk_msg_join_reply *join_reply_body;
+	struct sk_msg_join_node_finish *join_node_finish;
+
+	enum cluster_join_result join_result;
+
+	if (nr_joined_sheep && sheep != master_sheep) {
+		vprintf(SDOG_ERR, "sheep which is not master replied "	\
+			"SK_MSG_NEW_NODE_REPLY\n");
+		goto purge_current_sheep;
+	}
+
+	vprintf(SDOG_DEBUG, "new node reply from %s\n",
+		node_to_str(&sheep->node));
+
+	join = xzalloc(msg->body_len);
+	rbytes = xread(fd, join, msg->body_len);
+	if (msg->body_len != rbytes) {
+		vprintf(SDOG_ERR, "xread() failed: %m\n");
+		free(join);
+
+		goto purge_current_sheep;
+	}
+
+	join_result = join->res;
+
+	vprintf(SDOG_DEBUG, "joining node is %s\n", node_to_str(&join->node));
+
+	joining_sheep = find_sheep_by_nid(&join->node.nid);
+	if (!joining_sheep) {
+		/* master is broken */
+		vprintf(SDOG_ERR, "invalid nid is required, %s\n",
+			node_to_str(&join->node));
+		vprintf(SDOG_ERR, "purging master sheep: %s and joining one\n",
+			node_to_str(&master_sheep->node));
+
+		remove_sheep(master_sheep);
+		goto purge_current_sheep;
+	}
+
+	opaque_len = msg->body_len - sizeof(struct sk_msg_join);
+	opaque = xzalloc(opaque_len);
+	memcpy(opaque, join->opaque, opaque_len);
+
+	vprintf(SDOG_DEBUG, "length of opaque: %d\n", opaque_len);
+	memset(&snd, 0, sizeof(snd));
+	snd.type = SK_MSG_JOIN_REPLY;
+	snd.body_len = sizeof(struct sk_msg_join_reply) + opaque_len;
+	iov[0].iov_base = &snd;
+	iov[0].iov_len = sizeof(snd);
+
+	join_reply_body = xzalloc(snd.body_len);
+
+	join_reply_body->nr_nodes = build_node_array(join_reply_body->nodes);
+	memcpy(join_reply_body->opaque, opaque, opaque_len);
+	join_reply_body->res = join_result;
+
+	iov[1].iov_base = join_reply_body;
+	iov[1].iov_len = snd.body_len;
+
+	wbytes = writev(joining_sheep->fd, iov, 2);
+	free(join_reply_body);
+	free(join);
+
+	if (wbytes != (iov[0].iov_len + iov[1].iov_len)) {
+		vprintf(SDOG_ERR, "writev() to master failed: %m\n");
+
+		remove_sheep(master_sheep);
+		goto purge_current_sheep;
+	}
+
+	snd.type = SK_MSG_NEW_NODE_FINISH;
+	snd.body_len = sizeof(*join_node_finish) + opaque_len;
+
+	join_node_finish = xzalloc(snd.body_len);
+	join_node_finish->new_node = joining_sheep->node;
+	memcpy(join_node_finish->opaque, opaque, opaque_len);
+	join_node_finish->nr_nodes = build_node_array(join_node_finish->nodes);
+	join_node_finish->res = join_result;
+
+	iov[0].iov_base = &snd;
+	iov[0].iov_len = sizeof(snd);
+
+	iov[1].iov_base = join_node_finish;
+	iov[1].iov_len = snd.body_len;
+
+	list_for_each_entry(s, &sheep_list_head, sheep_list) {
+		if (s->state != SHEEP_STATE_JOINED)
+			continue;
+
+		if (s == joining_sheep)
+			continue;
+
+		wbytes = writev(s->fd, iov, 2);
+
+		if (wbytes != (iov[0].iov_len + iov[1].iov_len)) {
+			vprintf(SDOG_ERR, "writev() failed: %m\n");
+			remove_sheep(s);
+			removed++;
+		}
+	}
+
+	free(join_node_finish);
+	free(opaque);
+
+	joining_sheep->state = SHEEP_STATE_JOINED;
+	nr_joined_sheep++;
+
+	if (nr_joined_sheep == 1) {
+		assert(!master_sheep);
+		assert(joining_sheep == sheep);
+
+		master_sheep = sheep;
+
+		vprintf(SDOG_INFO, "new master elected: %s\n",
+			node_to_str(&sheep->node));
+	}
+	state = SK_STATE_ORDINAL;
+
+	removed += release_joining_sheep();
+	return;
+
+purge_current_sheep:
+	state = SK_STATE_ORDINAL;
+
+	remove_sheep(sheep);
+}
+
+static void msg_notify(struct sk_msg *msg, struct sheep *sheep)
+{
+	ssize_t rbytes, wbytes;
+	int fd = sheep->fd, removed = 0;
+	struct iovec iov[2];
+
+	struct sk_msg snd;
+	struct sk_msg_notify *notify;
+	int notify_msg_len;
+	struct sk_msg_notify_forward *notify_forward;
+	struct sheep *s;
+
+	notify = xzalloc(msg->body_len);
+	rbytes = xread(fd, notify, msg->body_len);
+	if (rbytes != msg->body_len) {
+		vprintf(SDOG_ERR, "xread() failed: %m\n");
+		goto purge_current_sheep;
+	}
+
+	notify_forward = xzalloc(msg->body_len + sizeof(*notify_forward));
+	notify_msg_len = msg->body_len - sizeof(*notify);
+
+	memcpy(notify_forward->notify_msg, notify->notify_msg, notify_msg_len);
+	notify_forward->unblock = notify->unblock;
+	free(notify);
+
+	memset(&snd, 0, sizeof(snd));
+	snd.type = SK_MSG_NOTIFY_FORWARD;
+	snd.body_len = notify_msg_len + sizeof(*notify_forward);
+
+	iov[0].iov_base = &snd;
+	iov[0].iov_len = sizeof(snd);
+
+	notify_forward->from_node = sheep->node;
+
+	iov[1].iov_base = notify_forward;
+	iov[1].iov_len = snd.body_len;
+
+	list_for_each_entry(s, &sheep_list_head, sheep_list) {
+		if (s->state != SHEEP_STATE_JOINED)
+			continue;
+
+		wbytes = writev(s->fd, iov, 2);
+		if ((iov[0].iov_len + iov[1].iov_len) != wbytes) {
+			vprintf(SDOG_ERR, "writev() failed: %m\n");
+			goto notify_failed;
+		}
+
+		continue;
+
+notify_failed:
+		remove_sheep(s);
+		removed++;
+	}
+
+	free(notify_forward);
+	return;
+
+purge_current_sheep:
+	remove_sheep(sheep);
+}
+
+static void msg_block(struct sk_msg *msg, struct sheep *sheep)
+{
+	int removed = 0;
+	ssize_t wbytes;
+	struct iovec iov[2];
+
+	struct sheep *s;
+	struct sk_msg snd;
+
+	memset(&snd, 0, sizeof(snd));
+	snd.type = SK_MSG_BLOCK_FORWARD;
+	snd.body_len = sizeof(struct sd_node);
+
+	iov[0].iov_base = &snd;
+	iov[0].iov_len = sizeof(snd);
+
+	iov[1].iov_base = &sheep->node;
+	iov[1].iov_len = sizeof(struct sd_node);
+
+	list_for_each_entry(s, &sheep_list_head, sheep_list) {
+		if (s->state != SHEEP_STATE_JOINED)
+			continue;
+
+		wbytes = writev(s->fd, iov, 2);
+		if ((iov[0].iov_len + iov[1].iov_len) != wbytes) {
+			vprintf(SDOG_ERR, "writev() failed: %m\n");
+			goto block_failed;
+		}
+
+		continue;
+
+block_failed:	/* FIXME: is this correct behaviour? */
+		remove_sheep(s);
+		removed++;
+	}
+
+	return;
+}
+
+static void msg_leave(struct sk_msg *msg, struct sheep *sheep)
+{
+	vprintf(SDOG_INFO, "%s is leaving\n", node_to_str(&sheep->node));
+	remove_sheep(sheep);
+}
+
+static void msg_invalid(struct sk_msg *msg, struct sheep *sheep)
+{
+	vprintf(SDOG_ERR, "msg_invalid() is called\n");
+	vprintf(SDOG_ERR, "received invalid message with type:"		\
+		" %s from sheep: %s (sockaddr: %s)\n", sk_msg_to_str(msg->type),
+		node_to_str(&sheep->node), sockaddr_in_to_str(&sheep->addr));
+
+	remove_sheep(sheep);
+}
+
+static void init_msg_handlers(void)
+{
+	int i;
+	for (i = 0; i < NR_SK_MSG; i++)
+		msg_handlers[i] = msg_invalid;
+
+	msg_handlers[SK_MSG_JOIN] = msg_join;
+	msg_handlers[SK_MSG_NEW_NODE_REPLY] = msg_new_node_reply;
+	msg_handlers[SK_MSG_NOTIFY] = msg_notify;
+	msg_handlers[SK_MSG_BLOCK] = msg_block;
+	msg_handlers[SK_MSG_LEAVE] = msg_leave;
+}
+
+static void read_msg_from_sheep(struct sheep *sheep)
+{
+	int ret;
+	struct sk_msg rcv;
+
+	memset(&rcv, 0, sizeof(rcv));
+	ret = xread(sheep->fd, &rcv, sizeof(rcv));
+
+	if (ret != sizeof(rcv)) {
+		vprintf(SDOG_ERR, "xread() failed: %m, ");
+		goto remove;
+	}
+
+	if (!(0 < rcv.type && rcv.type < NR_SK_MSG)) {
+		vprintf(SDOG_ERR, "invalid message type: %d, ", rcv.type);
+		vprintf(SDOG_ERR, "from node: %s\n",
+			node_to_str(&sheep->node));
+		vprintf(SDOG_ERR, "from node (sockaddr): %s\n",
+			sockaddr_in_to_str(&sheep->addr));
+		vprintf(SDOG_ERR, "read bytes: %d, body length: %d\n",
+			ret, rcv.body_len);
+		goto remove;
+	}
+
+	vprintf(SDOG_INFO, "received op: %s\n", sk_msg_to_str(rcv.type));
+	return msg_handlers[rcv.type](&rcv, sheep);
+
+remove:
+	vprintf(SDOG_ERR, "removing node: %s\n", node_to_str(&sheep->node));
+	remove_sheep(sheep);
+}
+
+static void sheep_comm_handler(int fd, int events, void *data)
+{
+	vprintf(SDOG_DEBUG, "sheep_comm_handler() called\n");
+
+	if (events & EPOLLIN)
+		read_msg_from_sheep(data);
+	else if (events & EPOLLHUP || events & EPOLLERR) {
+		vprintf(SDOG_INFO, "epoll() error: %s\n",
+			node_to_str(&((struct sheep *)data)->node));
+		remove_sheep(data);
+	}
+}
+
+static void sheep_accept_handler(int fd, int events, void *data)
+{
+	int ret;
+	struct sheep *new_sheep;
+	socklen_t len;
+
+	new_sheep = xzalloc(sizeof(struct sheep));
+	INIT_LIST_HEAD(&new_sheep->sheep_list);
+
+	len = sizeof(struct sockaddr_in);
+	new_sheep->fd = accept(fd, (struct sockaddr *)&new_sheep->addr, &len);
+	if (new_sheep->fd < 0) {
+		vprintf(SDOG_ERR, "accept() failed: %m\n");
+		goto clean;
+	}
+
+	if (-1 == set_keepalive(new_sheep->fd)) {
+		vprintf(SDOG_ERR, "set_keepalive() failed: %m\n");
+		goto clean;
+	}
+
+	ret = register_event(new_sheep->fd, sheep_comm_handler, new_sheep);
+	if (ret < 0) {
+		vprintf(SDOG_ERR, "register_event() failed: %m\n");
+		goto clean;
+	}
+
+	list_add_tail(&new_sheep->sheep_list, &sheep_list_head);
+	new_sheep->state = SHEEP_STATE_CONNECTED;
+
+	INIT_LIST_HEAD(&new_sheep->join_wait_list);
+
+	vprintf(SDOG_INFO, "accepted new sheep connection\n");
+	return;
+
+clean:
+	free(new_sheep);
+}
+
+static struct option const long_options[] = {
+	{ "port", required_argument, NULL, 'p' },
+	{ "address", required_argument, NULL, 'a' },
+	{ "foreground", no_argument, NULL, 'f' },
+	{ "debug", no_argument, NULL, 'd' },
+	{ "log-file", no_argument, NULL, 'l' },
+
+	{ NULL, 0, NULL, 0 },
+};
+
+static const char *short_options = "p:a:fdl:";
+
+static void exit_handler(void)
+{
+	vprintf(SDOG_INFO, "exiting...\n");
+}
+
+int main(int argc, char **argv)
+{
+	int ch, ret, longindex, opt;
+	char *p;
+	bool daemonize = true;
+	struct sockaddr_in listen_addr;
+	int log_level = SDOG_INFO;
+	const char *log_file = "/var/log/sheepkeeper.log";
+
+	progname = argv[0];
+
+	while ((ch = getopt_long(argc, argv, short_options, long_options,
+				 &longindex)) >= 0) {
+		switch (ch) {
+		case 'p':
+			port = strtol(optarg, &p, 10);
+			if (p == optarg) {
+				fprintf(stderr, "invalid port: %s\n", optarg);
+				exit(1);
+			}
+			break;
+
+		case 'a':
+			if (!str_to_addr(AF_INET, (const char *)optarg, addr)) {
+				fprintf(stderr, "invalid address: %s\n",
+					optarg);
+				exit(1);
+			}
+			break;
+
+		case 'f':
+			daemonize = false;
+			break;
+
+		case 'd':
+			log_level = SDOG_DEBUG;
+			break;
+
+		case 'l':
+			log_file = optarg;
+			break;
+
+		default:
+			fprintf(stderr, "unknown option\n");
+			exit(1);
+			break;
+		}
+	}
+
+	if (daemonize) {
+		ret = daemon(0, 0);
+
+		if (-1 == ret) {
+			fprintf(stderr, "daemon() failed: %s\n",
+				strerror(errno));
+			exit(1);
+		}
+	}
+
+	ret = log_init(progname, LOG_SPACE_SIZE, !daemonize, log_level,
+		(char *)log_file);
+	if (ret)
+		panic("initialize logger failed: %m\n");
+
+	atexit(exit_handler);
+	init_event(EPOLL_SIZE);
+
+	remove_efd = eventfd(0, EFD_NONBLOCK);
+	if (remove_efd < 0)
+		panic("eventfd() failed: %m\n");
+
+	ret = register_event(remove_efd, remove_handler, NULL);
+	if (ret)
+		panic("register_event() failed: %m\n");
+
+	/* setup inet socket for communication with sheeps */
+	sheep_listen_fd = socket(AF_INET, SOCK_STREAM, 0);
+	if (sheep_listen_fd < 0)
+		panic("socket() failed: %m\n");
+
+	init_msg_handlers();
+
+	opt = 1;
+	ret = setsockopt(sheep_listen_fd, SOL_SOCKET, SO_REUSEADDR,
+			&opt, sizeof(opt));
+	if (ret == -1)
+		panic("setsockopt() for SO_REUSEADDR failed: %m\n");
+
+	memset(&listen_addr, 0, sizeof(listen_addr));
+	listen_addr.sin_family = AF_INET;
+	listen_addr.sin_port = port;
+	memcpy(&listen_addr.sin_addr, addr + 12, sizeof(listen_addr.sin_addr));
+
+	ret = bind(sheep_listen_fd, &listen_addr, sizeof(struct sockaddr_in));
+	if (ret == -1)
+		panic("bind() failed: %m\n");
+
+	ret = listen(sheep_listen_fd, 1);
+	if (ret == -1)
+		panic("listen() failed: %m\n");
+
+	ret = register_event(sheep_listen_fd, sheep_accept_handler, NULL);
+	if (ret)
+		panic("register_event() failed: %m\n");
+
+	running = true;
+
+	while (running)
+		event_loop(-1);
+
+	return 0;
+}
-- 
1.7.5.1




More information about the sheepdog mailing list